Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit94b971c

Browse files
author
Maksim Milyutin
committed
Separate getting of query state as single functio
1 parent7e38e46 commit94b971c

File tree

1 file changed

+80
-38
lines changed

1 file changed

+80
-38
lines changed

‎pg_query_state.c

Lines changed: 80 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ static void SendCurrentUserId(void);
110110
staticvoidSendBgWorkerPids(void);
111111
staticOidGetRemoteBackendUserId(PGPROC*proc);
112112
staticList*GetRemoteBackendWorkers(PGPROC*proc);
113+
staticshm_mq_msg*GetRemoteBackendQueryState(PGPROC*proc,
114+
List*parallel_workers,
115+
boolverbose,
116+
boolcosts,
117+
booltiming,
118+
boolbuffers,
119+
booltriggers,
120+
ExplainFormatformat);
113121

114122
/* Shared memory variables */
115123
shm_toc*toc=NULL;
@@ -581,12 +589,8 @@ pg_query_state(PG_FUNCTION_ARGS)
581589
ExplainFormatformat;
582590
PGPROC*proc;
583591
Oidcounterpart_user_id;
584-
shm_mq_handle*mqh;
585-
shm_mq_resultmq_receive_result;
586-
intsend_signal_result;
587-
Sizelen;
588592
shm_mq_msg*msg;
589-
List*bg_worker_pids=NIL;
593+
List*bg_worker_procs=NIL;
590594

591595
if (!module_initialized)
592596
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -624,36 +628,16 @@ pg_query_state(PG_FUNCTION_ARGS)
624628
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
625629
errmsg("permission denied")));
626630

627-
/* fill in parameters of query state request */
628-
params->verbose=verbose;
629-
params->costs=costs;
630-
params->timing=timing;
631-
params->buffers=buffers;
632-
params->triggers=triggers;
633-
params->format=format;
634-
635-
bg_worker_pids=GetRemoteBackendWorkers(proc);
636-
637-
/* prepare message queue to transfer data */
638-
mq=shm_mq_create(mq,QUEUE_SIZE);
639-
shm_mq_set_sender(mq,proc);
640-
shm_mq_set_receiver(mq,MyProc);
641-
642-
/* send signal to specified backend to extract its state */
643-
send_signal_result=SendProcSignal(pid,QueryStatePollReason,proc->backendId);
644-
if (send_signal_result==-1)
645-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
646-
errmsg("invalid send signal")));
647-
648-
/* retrieve data from message queue */
649-
mqh=shm_mq_attach(mq,NULL,NULL);
650-
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&msg, false);
651-
if (mq_receive_result!=SHM_MQ_SUCCESS)
652-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
653-
errmsg("invalid read from message queue")));
654-
shm_mq_detach(mq);
655-
656-
Assert(len==msg->length);
631+
bg_worker_procs=GetRemoteBackendWorkers(proc);
632+
633+
msg=GetRemoteBackendQueryState(proc,
634+
bg_worker_procs,
635+
verbose,
636+
costs,
637+
timing,
638+
buffers,
639+
triggers,
640+
format);
657641

658642
funcctx=SRF_FIRSTCALL_INIT();
659643
switch (msg->result_code)
@@ -989,9 +973,9 @@ SendBgWorkerPids(void)
989973
}
990974

991975
/*
992-
* Extracts all parallel workerpids running by process `proc`
976+
* Extracts all parallel worker`proc`s running by process `proc`
993977
*/
994-
List*
978+
staticList*
995979
GetRemoteBackendWorkers(PGPROC*proc)
996980
{
997981
intsig_result;
@@ -1020,9 +1004,67 @@ GetRemoteBackendWorkers(PGPROC *proc)
10201004
returnNIL;
10211005

10221006
for (i=0;i<msg->number;i++)
1023-
result=lcons_int(msg->pids[i],result);
1007+
{
1008+
pid_tpid=msg->pids[i];
1009+
PGPROC*proc=BackendPidGetProc(pid);
1010+
1011+
result=lcons(proc,result);
1012+
}
10241013

10251014
shm_mq_detach(mq);
10261015

10271016
returnresult;
10281017
}
1018+
1019+
staticshm_mq_msg*
1020+
GetRemoteBackendQueryState(PGPROC*proc,
1021+
List*parallel_workers,
1022+
boolverbose,
1023+
boolcosts,
1024+
booltiming,
1025+
boolbuffers,
1026+
booltriggers,
1027+
ExplainFormatformat)
1028+
{
1029+
shm_mq_msg*msg;
1030+
shm_mq_handle*mqh;
1031+
shm_mq_resultmq_receive_result;
1032+
intsig_result;
1033+
Sizelen;
1034+
1035+
Assert(proc&&proc->backendId!=InvalidBackendId);
1036+
Assert(QueryStatePollReason!=INVALID_PROCSIGNAL);
1037+
Assert(mq);
1038+
1039+
/* fill in parameters of query state request */
1040+
params->verbose=verbose;
1041+
params->costs=costs;
1042+
params->timing=timing;
1043+
params->buffers=buffers;
1044+
params->triggers=triggers;
1045+
params->format=format;
1046+
pg_write_barrier();
1047+
1048+
/* prepare message queue to transfer data */
1049+
mq=shm_mq_create(mq,QUEUE_SIZE);
1050+
shm_mq_set_sender(mq,proc);
1051+
shm_mq_set_receiver(mq,MyProc);
1052+
1053+
/* send signal to specified backend to extract its state */
1054+
sig_result=SendProcSignal(proc->pid,QueryStatePollReason,proc->backendId);
1055+
if (sig_result==-1)
1056+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1057+
errmsg("invalid send signal")));
1058+
1059+
/* retrieve data from message queue */
1060+
mqh=shm_mq_attach(mq,NULL,NULL);
1061+
mq_receive_result=shm_mq_receive_with_timeout(mqh,&len, (void**)&msg,5000);
1062+
if (mq_receive_result!=SHM_MQ_SUCCESS)
1063+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1064+
errmsg("invalid read from message queue")));
1065+
shm_mq_detach(mq);
1066+
1067+
Assert(len==msg->length);
1068+
1069+
returnmsg;
1070+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp