Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7e38e46

Browse files
author
Maksim Milyutin
committed
Add extraction parallel worker pids from external backend
1 parent3080468 commit7e38e46

File tree

1 file changed

+105
-51
lines changed

1 file changed

+105
-51
lines changed

‎pg_query_state.c

Lines changed: 105 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6666

6767
/* Global variables */
6868
List*QueryDescStack=NIL;
69-
staticProcSignalReasonUserIdPollReason;
70-
staticProcSignalReasonQueryStatePollReason;
71-
staticProcSignalReasonWorkerPollReason;
69+
staticProcSignalReasonUserIdPollReason=INVALID_PROCSIGNAL;
70+
staticProcSignalReasonQueryStatePollReason=INVALID_PROCSIGNAL;
71+
staticProcSignalReasonWorkerPollReason=INVALID_PROCSIGNAL;
7272
staticboolmodule_initialized= false;
7373
staticconstchar*be_state_str[]= {/* BackendState -> string repr */
7474
"undefined",/* STATE_UNDEFINED */
@@ -107,9 +107,9 @@ typedef struct
107107
}trace_request;
108108

109109
staticvoidSendCurrentUserId(void);
110-
staticvoidSendWorkerPids(void);
110+
staticvoidSendBgWorkerPids(void);
111111
staticOidGetRemoteBackendUserId(PGPROC*proc);
112-
staticList*GetRemoteBackendWorkers(PGPROC*proc,int*error_code);
112+
staticList*GetRemoteBackendWorkers(PGPROC*proc);
113113

114114
/* Shared memory variables */
115115
shm_toc*toc=NULL;
@@ -208,7 +208,7 @@ _PG_init(void)
208208
/* Register interrupt on custom signal of polling query state */
209209
UserIdPollReason=RegisterCustomProcSignalHandler(SendCurrentUserId);
210210
QueryStatePollReason=RegisterCustomProcSignalHandler(SendQueryState);
211-
WorkerPollReason=RegisterCustomProcSignalHandler(SendWorkerPids);
211+
WorkerPollReason=RegisterCustomProcSignalHandler(SendBgWorkerPids);
212212
if (QueryStatePollReason==INVALID_PROCSIGNAL
213213
||WorkerPollReason==INVALID_PROCSIGNAL
214214
||UserIdPollReason==INVALID_PROCSIGNAL)
@@ -571,21 +571,22 @@ pg_query_state(PG_FUNCTION_ARGS)
571571

572572
if (SRF_IS_FIRSTCALL())
573573
{
574-
LOCKTAGtag;
575-
boolverbose=PG_GETARG_BOOL(1),
576-
costs=PG_GETARG_BOOL(2),
577-
timing=PG_GETARG_BOOL(3),
578-
buffers=PG_GETARG_BOOL(4),
579-
triggers=PG_GETARG_BOOL(5);
574+
LOCKTAGtag;
575+
boolverbose=PG_GETARG_BOOL(1),
576+
costs=PG_GETARG_BOOL(2),
577+
timing=PG_GETARG_BOOL(3),
578+
buffers=PG_GETARG_BOOL(4),
579+
triggers=PG_GETARG_BOOL(5);
580580
text*format_text=PG_GETARG_TEXT_P(6);
581-
ExplainFormatformat;
581+
ExplainFormatformat;
582582
PGPROC*proc;
583-
Oidcounterpart_user_id;
583+
Oidcounterpart_user_id;
584584
shm_mq_handle*mqh;
585-
shm_mq_resultmq_receive_result;
586-
intsend_signal_result;
587-
Sizelen;
585+
shm_mq_resultmq_receive_result;
586+
intsend_signal_result;
587+
Sizelen;
588588
shm_mq_msg*msg;
589+
List*bg_worker_pids=NIL;
589590

590591
if (!module_initialized)
591592
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -631,6 +632,8 @@ pg_query_state(PG_FUNCTION_ARGS)
631632
params->triggers=triggers;
632633
params->format=format;
633634

635+
bg_worker_pids=GetRemoteBackendWorkers(proc);
636+
634637
/* prepare message queue to transfer data */
635638
mq=shm_mq_create(mq,QUEUE_SIZE);
636639
shm_mq_set_sender(mq,proc);
@@ -843,8 +846,13 @@ GetRemoteBackendUserId(PGPROC *proc)
843846
{
844847
Oidresult;
845848

849+
Assert(proc&&proc->backendId!=InvalidBackendId);
850+
Assert(UserIdPollReason!=INVALID_PROCSIGNAL);
851+
Assert(counterpart_userid);
852+
846853
counterpart_userid->userid=InvalidOid;
847854
counterpart_userid->caller=MyLatch;
855+
pg_write_barrier();
848856

849857
SendProcSignal(proc->pid,UserIdPollReason,proc->backendId);
850858
for (;;)
@@ -864,8 +872,54 @@ GetRemoteBackendUserId(PGPROC *proc)
864872
returnresult;
865873
}
866874

875+
/*
876+
* Receive a message from a shared message queue until timeout is exceeded.
877+
*
878+
* Parameter `*nbytes` is set to the message length and *data to point to the
879+
* message payload. If timeout is exceeded SHM_MQ_WOULD_BLOCK is returned.
880+
*/
881+
staticshm_mq_result
882+
shm_mq_receive_with_timeout(shm_mq_handle*mqh,
883+
Size*nbytesp,
884+
void**datap,
885+
longtimeout)
886+
{
887+
888+
#ifdefHAVE_INT64_TIMESTAMP
889+
#defineGetNowFloat()((float8) GetCurrentTimestamp() / 1000.0)
890+
#else
891+
#defineGetNowFloat()1000.0 * GetCurrentTimestamp()
892+
#endif
893+
894+
float8endtime=GetNowFloat()+timeout;
895+
intrc=0;
896+
897+
for (;;)
898+
{
899+
longdelay;
900+
shm_mq_resultmq_receive_result;
901+
902+
mq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
903+
904+
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
905+
returnmq_receive_result;
906+
907+
if (rc&WL_TIMEOUT)
908+
returnSHM_MQ_WOULD_BLOCK;
909+
910+
delay= (long) (endtime-GetNowFloat());
911+
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay);
912+
CHECK_FOR_INTERRUPTS();
913+
ResetLatch(MyLatch);
914+
}
915+
}
916+
917+
/*
918+
* Extract to *result pids of all parallel workers running from leader process
919+
* that executes plan tree whose state root is `node`.
920+
*/
867921
staticbool
868-
extract_worker_handles(PlanState*node,List**result)
922+
extract_running_bgworkers(PlanState*node,List**result)
869923
{
870924
if (node==NULL)
871925
return false;
@@ -879,10 +933,11 @@ extract_worker_handles(PlanState *node, List **result)
879933
{
880934
for (i=0;i<gather_node->pei->pcxt->nworkers_launched;i++)
881935
{
882-
pid_tpid;
883-
BackgroundWorkerHandle*bgwh=gather_node->pei->pcxt->worker[i].bgwhandle;
884-
BgwHandleStatusstatus;
936+
pid_tpid;
937+
BackgroundWorkerHandle*bgwh;
938+
BgwHandleStatusstatus;
885939

940+
bgwh=gather_node->pei->pcxt->worker[i].bgwhandle;
886941
if (!bgwh)
887942
continue;
888943

@@ -892,37 +947,40 @@ extract_worker_handles(PlanState *node, List **result)
892947
}
893948
}
894949
}
895-
returnplanstate_tree_walker(node,extract_worker_handles, (void*)result);
950+
returnplanstate_tree_walker(node,extract_running_bgworkers, (void*)result);
896951
}
897952

898953
typedefstruct
899954
{
900-
intnum;
901-
pid_tpids[FLEXIBLE_ARRAY_MEMBER];
902-
}workers_msg;
955+
intnumber;
956+
pid_tpids[FLEXIBLE_ARRAY_MEMBER];
957+
}BgWorkerPids;
903958

904959
staticvoid
905-
SendWorkerPids(void)
960+
SendBgWorkerPids(void)
906961
{
907-
ListCell*iter;
908-
List*all_workers=NIL;
909-
workers_msg*msg;
910-
intmsg_len;
911-
inti;
912-
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
962+
ListCell*iter;
963+
List*all_workers=NIL;
964+
BgWorkerPids*msg;
965+
intmsg_len;
966+
inti;
967+
shm_mq_handle*mqh;
968+
969+
mqh=shm_mq_attach(mq,NULL,NULL);
913970

914971
foreach(iter,QueryDescStack)
915972
{
916973
QueryDesc*curQueryDesc= (QueryDesc*)lfirst(iter);
917974
List*bgworker_pids=NIL;
918975

919-
extract_worker_handles(curQueryDesc->planstate,&bgworker_pids);
976+
extract_running_bgworkers(curQueryDesc->planstate,&bgworker_pids);
920977
all_workers=list_concat(all_workers,bgworker_pids);
921978
}
922979

923-
msg_len= offsetof(workers_msg,pids)+sizeof(pid_t)*list_length(all_workers);
980+
msg_len= offsetof(BgWorkerPids,pids)
981+
+sizeof(pid_t)*list_length(all_workers);
924982
msg=palloc(msg_len);
925-
msg->num=list_length(all_workers);
983+
msg->number=list_length(all_workers);
926984
i=0;
927985
foreach(iter,all_workers)
928986
msg->pids[i++]=lfirst_int(iter);
@@ -931,44 +989,40 @@ SendWorkerPids(void)
931989
}
932990

933991
/*
992+
* Extracts all parallel worker pids running by process `proc`
993+
*/
934994
List*
935-
GetRemoteBackendWorkers(PGPROC *proc, int *error_code)
995+
GetRemoteBackendWorkers(PGPROC*proc)
936996
{
937-
intsig_result;
997+
intsig_result;
938998
shm_mq_handle*mqh;
939-
shm_mq_result mq_receive_result;
940-
workers_msg*msg;
941-
Sizemsg_len;
942-
inti;
999+
shm_mq_resultmq_receive_result;
1000+
BgWorkerPids*msg;
1001+
Sizemsg_len;
1002+
inti;
9431003
List*result=NIL;
9441004

945-
if (proc->backendId == InvalidBackendId)
946-
{
947-
return NIL;
948-
}
1005+
Assert(proc&&proc->backendId!=InvalidBackendId);
1006+
Assert(WorkerPollReason!=INVALID_PROCSIGNAL);
1007+
Assert(mq);
9491008

9501009
mq=shm_mq_create(mq,QUEUE_SIZE);
9511010
shm_mq_set_sender(mq,proc);
9521011
shm_mq_set_receiver(mq,MyProc);
9531012

9541013
sig_result=SendProcSignal(proc->pid,WorkerPollReason,proc->backendId);
9551014
if (sig_result==-1)
956-
{
9571015
returnNIL;
958-
}
9591016

9601017
mqh=shm_mq_attach(mq,NULL,NULL);
9611018
mq_receive_result=shm_mq_receive_with_timeout(mqh,&msg_len, (void**)&msg,1000);
9621019
if (mq_receive_result!=SHM_MQ_SUCCESS)
963-
{
9641020
returnNIL;
965-
}
9661021

967-
for (i = 0; i < msg->num; i++)
1022+
for (i=0;i<msg->number;i++)
9681023
result=lcons_int(msg->pids[i],result);
9691024

9701025
shm_mq_detach(mq);
9711026

9721027
returnresult;
9731028
}
974-
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp