Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb132ee1

Browse files
author
Maksim Milyutin
committed
Add parallel workers' state to result output
1 parent94b971c commitb132ee1

File tree

4 files changed

+120
-68
lines changed

4 files changed

+120
-68
lines changed

‎pg_query_state.c

Lines changed: 115 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,13 @@ static void SendCurrentUserId(void);
110110
staticvoidSendBgWorkerPids(void);
111111
staticOidGetRemoteBackendUserId(PGPROC*proc);
112112
staticList*GetRemoteBackendWorkers(PGPROC*proc);
113-
staticshm_mq_msg*GetRemoteBackendQueryState(PGPROC*proc,
114-
List*parallel_workers,
115-
boolverbose,
116-
boolcosts,
117-
booltiming,
118-
boolbuffers,
119-
booltriggers,
120-
ExplainFormatformat);
113+
staticList*GetRemoteBackendQueryStates(List*procs,
114+
boolverbose,
115+
boolcosts,
116+
booltiming,
117+
boolbuffers,
118+
booltriggers,
119+
ExplainFormatformat);
121120

122121
/* Shared memory variables */
123122
shm_toc*toc=NULL;
@@ -563,12 +562,19 @@ PG_FUNCTION_INFO_V1(pg_query_state);
563562
Datum
564563
pg_query_state(PG_FUNCTION_ARGS)
565564
{
566-
/* multicall context type */
567565
typedefstruct
568566
{
569-
ListCell*cursor;
570-
intindex;
567+
PGPROC*proc;
568+
ListCell*frame_cursor;
569+
intframe_index;
571570
List*stack;
571+
}proc_state;
572+
573+
/* multicall context type */
574+
typedefstruct
575+
{
576+
ListCell*proc_cursor;
577+
List*procs;
572578
}pg_qs_fctx;
573579

574580
FuncCallContext*funcctx;
@@ -591,6 +597,7 @@ pg_query_state(PG_FUNCTION_ARGS)
591597
Oidcounterpart_user_id;
592598
shm_mq_msg*msg;
593599
List*bg_worker_procs=NIL;
600+
List*msgs;
594601

595602
if (!module_initialized)
596603
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -630,14 +637,14 @@ pg_query_state(PG_FUNCTION_ARGS)
630637

631638
bg_worker_procs=GetRemoteBackendWorkers(proc);
632639

633-
msg=GetRemoteBackendQueryState(proc,
634-
bg_worker_procs,
635-
verbose,
636-
costs,
637-
timing,
638-
buffers,
639-
triggers,
640-
format);
640+
msgs=GetRemoteBackendQueryStates(lcons(proc,bg_worker_procs),
641+
verbose,
642+
costs,
643+
timing,
644+
buffers,
645+
triggers,
646+
format);
647+
msg= (shm_mq_msg*)linitial(msgs);
641648

642649
funcctx=SRF_FIRSTCALL_INIT();
643650
switch (msg->result_code)
@@ -661,8 +668,9 @@ pg_query_state(PG_FUNCTION_ARGS)
661668
SRF_RETURN_DONE(funcctx);
662669
caseQS_RETURNED:
663670
{
664-
List*qs_stack;
665671
TupleDesctupdesc;
672+
ListCell*i;
673+
int64max_calls=0;
666674

667675
/* print warnings if exist */
668676
if (msg->warnings&TIMINIG_OFF_WARNING)
@@ -676,13 +684,28 @@ pg_query_state(PG_FUNCTION_ARGS)
676684

677685
/* save stack of calls and current cursor in multicall context */
678686
fctx= (pg_qs_fctx*)palloc(sizeof(pg_qs_fctx));
679-
qs_stack=deserialize_stack(msg->stack,msg->stack_depth);
680-
fctx->stack=qs_stack;
681-
fctx->index=0;
682-
fctx->cursor=list_head(qs_stack);
687+
fctx->procs=NIL;
688+
foreach(i,msgs)
689+
{
690+
List*qs_stack;
691+
shm_mq_msg*msg= (shm_mq_msg*)lfirst(i);
692+
proc_state*p_state= (proc_state*)palloc(sizeof(proc_state));
693+
694+
qs_stack=deserialize_stack(msg->stack,msg->stack_depth);
695+
696+
p_state->proc=msg->proc;
697+
p_state->stack=qs_stack;
698+
p_state->frame_index=0;
699+
p_state->frame_cursor=list_head(qs_stack);
700+
701+
fctx->procs=lappend(fctx->procs,p_state);
702+
703+
max_calls+=list_length(qs_stack);
704+
}
705+
fctx->proc_cursor=list_head(fctx->procs);
683706

684707
funcctx->user_fctx=fctx;
685-
funcctx->max_calls=list_length(qs_stack);
708+
funcctx->max_calls=max_calls;
686709

687710
/* Make tuple descriptor */
688711
tupdesc=CreateTemplateTupleDesc(N_ATTRS, false);
@@ -706,24 +729,31 @@ pg_query_state(PG_FUNCTION_ARGS)
706729

707730
if (funcctx->call_cntr<funcctx->max_calls)
708731
{
709-
HeapTupletuple;
710-
Datumvalues[N_ATTRS];
711-
boolnulls[N_ATTRS];
712-
stack_frame*frame= (stack_frame*)lfirst(fctx->cursor);
732+
HeapTupletuple;
733+
Datumvalues[N_ATTRS];
734+
boolnulls[N_ATTRS];
735+
proc_state*p_state= (proc_state*)lfirst(fctx->proc_cursor);
736+
stack_frame*frame= (stack_frame*)lfirst(p_state->frame_cursor);
713737

714738
/* Make and return next tuple to caller */
715739
MemSet(values,0,sizeof(values));
716740
MemSet(nulls,0,sizeof(nulls));
717-
values[0]=Int32GetDatum(pid);
718-
values[1]=Int32GetDatum(fctx->index);
741+
values[0]=Int32GetDatum(p_state->proc->pid);
742+
values[1]=Int32GetDatum(p_state->frame_index);
719743
values[2]=PointerGetDatum(frame->query);
720744
values[3]=PointerGetDatum(frame->plan);
721-
nulls[4]= true;
745+
if (p_state->proc->pid==pid)
746+
nulls[4]= true;
747+
else
748+
values[4]=Int32GetDatum(pid);
722749
tuple=heap_form_tuple(funcctx->tuple_desc,values,nulls);
723750

724751
/* increment cursor */
725-
fctx->cursor=lnext(fctx->cursor);
726-
fctx->index++;
752+
p_state->frame_cursor=lnext(p_state->frame_cursor);
753+
p_state->frame_index++;
754+
755+
if (p_state->frame_cursor==NULL)
756+
fctx->proc_cursor=lnext(fctx->proc_cursor);
727757

728758
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(tuple));
729759
}
@@ -1017,22 +1047,26 @@ GetRemoteBackendWorkers(PGPROC *proc)
10171047
}
10181048

10191049
staticshm_mq_msg*
1020-
GetRemoteBackendQueryState(PGPROC*proc,
1021-
List*parallel_workers,
1022-
boolverbose,
1023-
boolcosts,
1024-
booltiming,
1025-
boolbuffers,
1026-
booltriggers,
1027-
ExplainFormatformat)
1050+
copy_msg(shm_mq_msg*msg)
10281051
{
1029-
shm_mq_msg*msg;
1030-
shm_mq_handle*mqh;
1031-
shm_mq_resultmq_receive_result;
1032-
intsig_result;
1033-
Sizelen;
1052+
shm_mq_msg*result=palloc(msg->length);
1053+
1054+
memcpy(result,msg,msg->length);
1055+
returnresult;
1056+
}
1057+
1058+
staticList*
1059+
GetRemoteBackendQueryStates(List*procs,
1060+
boolverbose,
1061+
boolcosts,
1062+
booltiming,
1063+
boolbuffers,
1064+
booltriggers,
1065+
ExplainFormatformat)
1066+
{
1067+
List*result=NIL;
1068+
ListCell*i;
10341069

1035-
Assert(proc&&proc->backendId!=InvalidBackendId);
10361070
Assert(QueryStatePollReason!=INVALID_PROCSIGNAL);
10371071
Assert(mq);
10381072

@@ -1045,26 +1079,41 @@ GetRemoteBackendQueryState(PGPROC *proc,
10451079
params->format=format;
10461080
pg_write_barrier();
10471081

1048-
/* prepare message queue to transfer data */
1049-
mq=shm_mq_create(mq,QUEUE_SIZE);
1050-
shm_mq_set_sender(mq,proc);
1051-
shm_mq_set_receiver(mq,MyProc);
1082+
foreach(i,procs)
1083+
{
1084+
PGPROC*proc= (PGPROC*)lfirst(i);
1085+
shm_mq_msg*msg;
1086+
shm_mq_handle*mqh;
1087+
shm_mq_resultmq_receive_result;
1088+
intsig_result;
1089+
Sizelen;
10521090

1053-
/* send signal to specified backend to extract its state */
1054-
sig_result=SendProcSignal(proc->pid,QueryStatePollReason,proc->backendId);
1055-
if (sig_result==-1)
1056-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1057-
errmsg("invalid send signal")));
1091+
Assert(proc&&proc->backendId!=InvalidBackendId);
10581092

1059-
/* retrieve data from message queue */
1060-
mqh=shm_mq_attach(mq,NULL,NULL);
1061-
mq_receive_result=shm_mq_receive_with_timeout(mqh,&len, (void**)&msg,5000);
1062-
if (mq_receive_result!=SHM_MQ_SUCCESS)
1063-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1064-
errmsg("invalid read from message queue")));
1065-
shm_mq_detach(mq);
1093+
/* prepare message queue to transfer data */
1094+
mq=shm_mq_create(mq,QUEUE_SIZE);
1095+
shm_mq_set_sender(mq,proc);
1096+
shm_mq_set_receiver(mq,MyProc);
1097+
1098+
/* send signal to specified backend to extract its state */
1099+
sig_result=SendProcSignal(proc->pid,QueryStatePollReason,proc->backendId);
1100+
if (sig_result==-1)
1101+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1102+
errmsg("invalid send signal")));
1103+
1104+
/* retrieve data from message queue */
1105+
mqh=shm_mq_attach(mq,NULL,NULL);
1106+
mq_receive_result=shm_mq_receive_with_timeout(mqh,&len, (void**)&msg,5000);
1107+
if (mq_receive_result!=SHM_MQ_SUCCESS)
1108+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1109+
errmsg("invalid read from message queue")));
10661110

1067-
Assert(len==msg->length);
1111+
result=lappend(result,copy_msg(msg));
10681112

1069-
returnmsg;
1113+
shm_mq_detach(mq);
1114+
1115+
Assert(len==msg->length);
1116+
}
1117+
1118+
returnresult;
10701119
}

‎pg_query_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ typedef enum
3737
typedefstruct
3838
{
3939
intlength;/* size of message record, for sanity check */
40+
PGPROC*proc;
4041
PG_QS_RequestResultresult_code;
4142
intwarnings;/* bitmap of warnings */
4243
intstack_depth;

‎signal_handler.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,15 @@ SendQueryState(void)
158158
/* check if module is enabled */
159159
if (!pg_qs_enable)
160160
{
161-
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,STAT_DISABLED };
161+
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
162162

163163
shm_mq_send(mqh,msg.length,&msg, false);
164164
}
165165

166166
/* check if backend doesn't execute any query */
167167
elseif (list_length(QueryDescStack)==0)
168168
{
169-
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,QUERY_NOT_RUNNING };
169+
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
170170

171171
shm_mq_send(mqh,msg.length,&msg, false);
172172
}
@@ -179,6 +179,7 @@ SendQueryState(void)
179179
shm_mq_msg*msg=palloc(msglen);
180180

181181
msg->length=msglen;
182+
msg->proc=MyProc;
182183
msg->result_code=QS_RETURNED;
183184

184185
msg->warnings=0;

‎tests/test_cases.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def test_concurrent_access(config):
152152
acurs1,acurs2,acurs3=acon1.cursor(),acon2.cursor(),acon3.cursor()
153153
query='select count(*) from foo join bar on foo.c1=bar.c1'
154154

155+
set_guc(acon3,'max_parallel_workers_per_gather',0)
155156
acurs3.execute(query)
156157
time.sleep(0.1)
157158
acurs1.callproc('pg_query_state', (acon3.get_backend_pid(),))

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp