@@ -110,14 +110,13 @@ static void SendCurrentUserId(void);
110110static void SendBgWorkerPids (void );
111111static Oid GetRemoteBackendUserId (PGPROC * proc );
112112static List * GetRemoteBackendWorkers (PGPROC * proc );
113- static shm_mq_msg * GetRemoteBackendQueryState (PGPROC * proc ,
114- List * parallel_workers ,
115- bool verbose ,
116- bool costs ,
117- bool timing ,
118- bool buffers ,
119- bool triggers ,
120- ExplainFormat format );
113+ static List * GetRemoteBackendQueryStates (List * procs ,
114+ bool verbose ,
115+ bool costs ,
116+ bool timing ,
117+ bool buffers ,
118+ bool triggers ,
119+ ExplainFormat format );
121120
122121/* Shared memory variables */
123122shm_toc * toc = NULL ;
@@ -563,12 +562,19 @@ PG_FUNCTION_INFO_V1(pg_query_state);
563562Datum
564563pg_query_state (PG_FUNCTION_ARGS )
565564{
566- /* multicall context type */
567565typedef struct
568566{
569- ListCell * cursor ;
570- int index ;
567+ PGPROC * proc ;
568+ ListCell * frame_cursor ;
569+ int frame_index ;
571570List * stack ;
571+ }proc_state ;
572+
573+ /* multicall context type */
574+ typedef struct
575+ {
576+ ListCell * proc_cursor ;
577+ List * procs ;
572578}pg_qs_fctx ;
573579
574580FuncCallContext * funcctx ;
@@ -591,6 +597,7 @@ pg_query_state(PG_FUNCTION_ARGS)
591597Oid counterpart_user_id ;
592598shm_mq_msg * msg ;
593599List * bg_worker_procs = NIL ;
600+ List * msgs ;
594601
595602if (!module_initialized )
596603ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -630,14 +637,14 @@ pg_query_state(PG_FUNCTION_ARGS)
630637
631638bg_worker_procs = GetRemoteBackendWorkers (proc );
632639
633- msg = GetRemoteBackendQueryState ( proc ,
634- bg_worker_procs ,
635- verbose ,
636- costs ,
637- timing ,
638- buffers ,
639- triggers ,
640- format );
640+ msgs = GetRemoteBackendQueryStates ( lcons ( proc , bg_worker_procs ) ,
641+ verbose ,
642+ costs ,
643+ timing ,
644+ buffers ,
645+ triggers ,
646+ format );
647+ msg = ( shm_mq_msg * ) linitial ( msgs );
641648
642649funcctx = SRF_FIRSTCALL_INIT ();
643650switch (msg -> result_code )
@@ -661,8 +668,9 @@ pg_query_state(PG_FUNCTION_ARGS)
661668SRF_RETURN_DONE (funcctx );
662669case QS_RETURNED :
663670{
664- List * qs_stack ;
665671TupleDesc tupdesc ;
672+ ListCell * i ;
673+ int64 max_calls = 0 ;
666674
667675/* print warnings if exist */
668676if (msg -> warnings & TIMINIG_OFF_WARNING )
@@ -676,13 +684,28 @@ pg_query_state(PG_FUNCTION_ARGS)
676684
677685/* save stack of calls and current cursor in multicall context */
678686fctx = (pg_qs_fctx * )palloc (sizeof (pg_qs_fctx ));
679- qs_stack = deserialize_stack (msg -> stack ,msg -> stack_depth );
680- fctx -> stack = qs_stack ;
681- fctx -> index = 0 ;
682- fctx -> cursor = list_head (qs_stack );
687+ fctx -> procs = NIL ;
688+ foreach (i ,msgs )
689+ {
690+ List * qs_stack ;
691+ shm_mq_msg * msg = (shm_mq_msg * )lfirst (i );
692+ proc_state * p_state = (proc_state * )palloc (sizeof (proc_state ));
693+
694+ qs_stack = deserialize_stack (msg -> stack ,msg -> stack_depth );
695+
696+ p_state -> proc = msg -> proc ;
697+ p_state -> stack = qs_stack ;
698+ p_state -> frame_index = 0 ;
699+ p_state -> frame_cursor = list_head (qs_stack );
700+
701+ fctx -> procs = lappend (fctx -> procs ,p_state );
702+
703+ max_calls += list_length (qs_stack );
704+ }
705+ fctx -> proc_cursor = list_head (fctx -> procs );
683706
684707funcctx -> user_fctx = fctx ;
685- funcctx -> max_calls = list_length ( qs_stack ) ;
708+ funcctx -> max_calls = max_calls ;
686709
687710/* Make tuple descriptor */
688711tupdesc = CreateTemplateTupleDesc (N_ATTRS , false);
@@ -706,24 +729,31 @@ pg_query_state(PG_FUNCTION_ARGS)
706729
707730if (funcctx -> call_cntr < funcctx -> max_calls )
708731{
709- HeapTuple tuple ;
710- Datum values [N_ATTRS ];
711- bool nulls [N_ATTRS ];
712- stack_frame * frame = (stack_frame * )lfirst (fctx -> cursor );
732+ HeapTuple tuple ;
733+ Datum values [N_ATTRS ];
734+ bool nulls [N_ATTRS ];
735+ proc_state * p_state = (proc_state * )lfirst (fctx -> proc_cursor );
736+ stack_frame * frame = (stack_frame * )lfirst (p_state -> frame_cursor );
713737
714738/* Make and return next tuple to caller */
715739MemSet (values ,0 ,sizeof (values ));
716740MemSet (nulls ,0 ,sizeof (nulls ));
717- values [0 ]= Int32GetDatum (pid );
718- values [1 ]= Int32GetDatum (fctx -> index );
741+ values [0 ]= Int32GetDatum (p_state -> proc -> pid );
742+ values [1 ]= Int32GetDatum (p_state -> frame_index );
719743values [2 ]= PointerGetDatum (frame -> query );
720744values [3 ]= PointerGetDatum (frame -> plan );
721- nulls [4 ]= true;
745+ if (p_state -> proc -> pid == pid )
746+ nulls [4 ]= true;
747+ else
748+ values [4 ]= Int32GetDatum (pid );
722749tuple = heap_form_tuple (funcctx -> tuple_desc ,values ,nulls );
723750
724751/* increment cursor */
725- fctx -> cursor = lnext (fctx -> cursor );
726- fctx -> index ++ ;
752+ p_state -> frame_cursor = lnext (p_state -> frame_cursor );
753+ p_state -> frame_index ++ ;
754+
755+ if (p_state -> frame_cursor == NULL )
756+ fctx -> proc_cursor = lnext (fctx -> proc_cursor );
727757
728758SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (tuple ));
729759}
@@ -1017,22 +1047,26 @@ GetRemoteBackendWorkers(PGPROC *proc)
10171047}
10181048
10191049static shm_mq_msg *
1020- GetRemoteBackendQueryState (PGPROC * proc ,
1021- List * parallel_workers ,
1022- bool verbose ,
1023- bool costs ,
1024- bool timing ,
1025- bool buffers ,
1026- bool triggers ,
1027- ExplainFormat format )
1050+ copy_msg (shm_mq_msg * msg )
10281051{
1029- shm_mq_msg * msg ;
1030- shm_mq_handle * mqh ;
1031- shm_mq_result mq_receive_result ;
1032- int sig_result ;
1033- Size len ;
1052+ shm_mq_msg * result = palloc (msg -> length );
1053+
1054+ memcpy (result ,msg ,msg -> length );
1055+ return result ;
1056+ }
1057+
1058+ static List *
1059+ GetRemoteBackendQueryStates (List * procs ,
1060+ bool verbose ,
1061+ bool costs ,
1062+ bool timing ,
1063+ bool buffers ,
1064+ bool triggers ,
1065+ ExplainFormat format )
1066+ {
1067+ List * result = NIL ;
1068+ ListCell * i ;
10341069
1035- Assert (proc && proc -> backendId != InvalidBackendId );
10361070Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
10371071Assert (mq );
10381072
@@ -1045,26 +1079,41 @@ GetRemoteBackendQueryState(PGPROC *proc,
10451079params -> format = format ;
10461080pg_write_barrier ();
10471081
1048- /* prepare message queue to transfer data */
1049- mq = shm_mq_create (mq ,QUEUE_SIZE );
1050- shm_mq_set_sender (mq ,proc );
1051- shm_mq_set_receiver (mq ,MyProc );
1082+ foreach (i ,procs )
1083+ {
1084+ PGPROC * proc = (PGPROC * )lfirst (i );
1085+ shm_mq_msg * msg ;
1086+ shm_mq_handle * mqh ;
1087+ shm_mq_result mq_receive_result ;
1088+ int sig_result ;
1089+ Size len ;
10521090
1053- /* send signal to specified backend to extract its state */
1054- sig_result = SendProcSignal (proc -> pid ,QueryStatePollReason ,proc -> backendId );
1055- if (sig_result == -1 )
1056- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1057- errmsg ("invalid send signal" )));
1091+ Assert (proc && proc -> backendId != InvalidBackendId );
10581092
1059- /* retrieve data from message queue */
1060- mqh = shm_mq_attach (mq ,NULL ,NULL );
1061- mq_receive_result = shm_mq_receive_with_timeout (mqh ,& len , (void * * )& msg ,5000 );
1062- if (mq_receive_result != SHM_MQ_SUCCESS )
1063- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1064- errmsg ("invalid read from message queue" )));
1065- shm_mq_detach (mq );
1093+ /* prepare message queue to transfer data */
1094+ mq = shm_mq_create (mq ,QUEUE_SIZE );
1095+ shm_mq_set_sender (mq ,proc );
1096+ shm_mq_set_receiver (mq ,MyProc );
1097+
1098+ /* send signal to specified backend to extract its state */
1099+ sig_result = SendProcSignal (proc -> pid ,QueryStatePollReason ,proc -> backendId );
1100+ if (sig_result == -1 )
1101+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1102+ errmsg ("invalid send signal" )));
1103+
1104+ /* retrieve data from message queue */
1105+ mqh = shm_mq_attach (mq ,NULL ,NULL );
1106+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,& len , (void * * )& msg ,5000 );
1107+ if (mq_receive_result != SHM_MQ_SUCCESS )
1108+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1109+ errmsg ("invalid read from message queue" )));
10661110
1067- Assert ( len == msg -> length );
1111+ result = lappend ( result , copy_msg ( msg ) );
10681112
1069- return msg ;
1113+ shm_mq_detach (mq );
1114+
1115+ Assert (len == msg -> length );
1116+ }
1117+
1118+ return result ;
10701119}