@@ -110,14 +110,13 @@ static void SendCurrentUserId(void);
110
110
static void SendBgWorkerPids (void );
111
111
static Oid GetRemoteBackendUserId (PGPROC * proc );
112
112
static List * GetRemoteBackendWorkers (PGPROC * proc );
113
- static shm_mq_msg * GetRemoteBackendQueryState (PGPROC * proc ,
114
- List * parallel_workers ,
115
- bool verbose ,
116
- bool costs ,
117
- bool timing ,
118
- bool buffers ,
119
- bool triggers ,
120
- ExplainFormat format );
113
+ static List * GetRemoteBackendQueryStates (List * procs ,
114
+ bool verbose ,
115
+ bool costs ,
116
+ bool timing ,
117
+ bool buffers ,
118
+ bool triggers ,
119
+ ExplainFormat format );
121
120
122
121
/* Shared memory variables */
123
122
shm_toc * toc = NULL ;
@@ -563,12 +562,19 @@ PG_FUNCTION_INFO_V1(pg_query_state);
563
562
Datum
564
563
pg_query_state (PG_FUNCTION_ARGS )
565
564
{
566
- /* multicall context type */
567
565
typedef struct
568
566
{
569
- ListCell * cursor ;
570
- int index ;
567
+ PGPROC * proc ;
568
+ ListCell * frame_cursor ;
569
+ int frame_index ;
571
570
List * stack ;
571
+ }proc_state ;
572
+
573
+ /* multicall context type */
574
+ typedef struct
575
+ {
576
+ ListCell * proc_cursor ;
577
+ List * procs ;
572
578
}pg_qs_fctx ;
573
579
574
580
FuncCallContext * funcctx ;
@@ -591,6 +597,7 @@ pg_query_state(PG_FUNCTION_ARGS)
591
597
Oid counterpart_user_id ;
592
598
shm_mq_msg * msg ;
593
599
List * bg_worker_procs = NIL ;
600
+ List * msgs ;
594
601
595
602
if (!module_initialized )
596
603
ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -630,14 +637,14 @@ pg_query_state(PG_FUNCTION_ARGS)
630
637
631
638
bg_worker_procs = GetRemoteBackendWorkers (proc );
632
639
633
- msg = GetRemoteBackendQueryState ( proc ,
634
- bg_worker_procs ,
635
- verbose ,
636
- costs ,
637
- timing ,
638
- buffers ,
639
- triggers ,
640
- format );
640
+ msgs = GetRemoteBackendQueryStates ( lcons ( proc , bg_worker_procs ) ,
641
+ verbose ,
642
+ costs ,
643
+ timing ,
644
+ buffers ,
645
+ triggers ,
646
+ format );
647
+ msg = ( shm_mq_msg * ) linitial ( msgs );
641
648
642
649
funcctx = SRF_FIRSTCALL_INIT ();
643
650
switch (msg -> result_code )
@@ -661,8 +668,9 @@ pg_query_state(PG_FUNCTION_ARGS)
661
668
SRF_RETURN_DONE (funcctx );
662
669
case QS_RETURNED :
663
670
{
664
- List * qs_stack ;
665
671
TupleDesc tupdesc ;
672
+ ListCell * i ;
673
+ int64 max_calls = 0 ;
666
674
667
675
/* print warnings if exist */
668
676
if (msg -> warnings & TIMINIG_OFF_WARNING )
@@ -676,13 +684,28 @@ pg_query_state(PG_FUNCTION_ARGS)
676
684
677
685
/* save stack of calls and current cursor in multicall context */
678
686
fctx = (pg_qs_fctx * )palloc (sizeof (pg_qs_fctx ));
679
- qs_stack = deserialize_stack (msg -> stack ,msg -> stack_depth );
680
- fctx -> stack = qs_stack ;
681
- fctx -> index = 0 ;
682
- fctx -> cursor = list_head (qs_stack );
687
+ fctx -> procs = NIL ;
688
+ foreach (i ,msgs )
689
+ {
690
+ List * qs_stack ;
691
+ shm_mq_msg * msg = (shm_mq_msg * )lfirst (i );
692
+ proc_state * p_state = (proc_state * )palloc (sizeof (proc_state ));
693
+
694
+ qs_stack = deserialize_stack (msg -> stack ,msg -> stack_depth );
695
+
696
+ p_state -> proc = msg -> proc ;
697
+ p_state -> stack = qs_stack ;
698
+ p_state -> frame_index = 0 ;
699
+ p_state -> frame_cursor = list_head (qs_stack );
700
+
701
+ fctx -> procs = lappend (fctx -> procs ,p_state );
702
+
703
+ max_calls += list_length (qs_stack );
704
+ }
705
+ fctx -> proc_cursor = list_head (fctx -> procs );
683
706
684
707
funcctx -> user_fctx = fctx ;
685
- funcctx -> max_calls = list_length ( qs_stack ) ;
708
+ funcctx -> max_calls = max_calls ;
686
709
687
710
/* Make tuple descriptor */
688
711
tupdesc = CreateTemplateTupleDesc (N_ATTRS , false);
@@ -706,24 +729,31 @@ pg_query_state(PG_FUNCTION_ARGS)
706
729
707
730
if (funcctx -> call_cntr < funcctx -> max_calls )
708
731
{
709
- HeapTuple tuple ;
710
- Datum values [N_ATTRS ];
711
- bool nulls [N_ATTRS ];
712
- stack_frame * frame = (stack_frame * )lfirst (fctx -> cursor );
732
+ HeapTuple tuple ;
733
+ Datum values [N_ATTRS ];
734
+ bool nulls [N_ATTRS ];
735
+ proc_state * p_state = (proc_state * )lfirst (fctx -> proc_cursor );
736
+ stack_frame * frame = (stack_frame * )lfirst (p_state -> frame_cursor );
713
737
714
738
/* Make and return next tuple to caller */
715
739
MemSet (values ,0 ,sizeof (values ));
716
740
MemSet (nulls ,0 ,sizeof (nulls ));
717
- values [0 ]= Int32GetDatum (pid );
718
- values [1 ]= Int32GetDatum (fctx -> index );
741
+ values [0 ]= Int32GetDatum (p_state -> proc -> pid );
742
+ values [1 ]= Int32GetDatum (p_state -> frame_index );
719
743
values [2 ]= PointerGetDatum (frame -> query );
720
744
values [3 ]= PointerGetDatum (frame -> plan );
721
- nulls [4 ]= true;
745
+ if (p_state -> proc -> pid == pid )
746
+ nulls [4 ]= true;
747
+ else
748
+ values [4 ]= Int32GetDatum (pid );
722
749
tuple = heap_form_tuple (funcctx -> tuple_desc ,values ,nulls );
723
750
724
751
/* increment cursor */
725
- fctx -> cursor = lnext (fctx -> cursor );
726
- fctx -> index ++ ;
752
+ p_state -> frame_cursor = lnext (p_state -> frame_cursor );
753
+ p_state -> frame_index ++ ;
754
+
755
+ if (p_state -> frame_cursor == NULL )
756
+ fctx -> proc_cursor = lnext (fctx -> proc_cursor );
727
757
728
758
SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (tuple ));
729
759
}
@@ -1017,22 +1047,26 @@ GetRemoteBackendWorkers(PGPROC *proc)
1017
1047
}
1018
1048
1019
1049
static shm_mq_msg *
1020
- GetRemoteBackendQueryState (PGPROC * proc ,
1021
- List * parallel_workers ,
1022
- bool verbose ,
1023
- bool costs ,
1024
- bool timing ,
1025
- bool buffers ,
1026
- bool triggers ,
1027
- ExplainFormat format )
1050
+ copy_msg (shm_mq_msg * msg )
1028
1051
{
1029
- shm_mq_msg * msg ;
1030
- shm_mq_handle * mqh ;
1031
- shm_mq_result mq_receive_result ;
1032
- int sig_result ;
1033
- Size len ;
1052
+ shm_mq_msg * result = palloc (msg -> length );
1053
+
1054
+ memcpy (result ,msg ,msg -> length );
1055
+ return result ;
1056
+ }
1057
+
1058
+ static List *
1059
+ GetRemoteBackendQueryStates (List * procs ,
1060
+ bool verbose ,
1061
+ bool costs ,
1062
+ bool timing ,
1063
+ bool buffers ,
1064
+ bool triggers ,
1065
+ ExplainFormat format )
1066
+ {
1067
+ List * result = NIL ;
1068
+ ListCell * i ;
1034
1069
1035
- Assert (proc && proc -> backendId != InvalidBackendId );
1036
1070
Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
1037
1071
Assert (mq );
1038
1072
@@ -1045,26 +1079,41 @@ GetRemoteBackendQueryState(PGPROC *proc,
1045
1079
params -> format = format ;
1046
1080
pg_write_barrier ();
1047
1081
1048
- /* prepare message queue to transfer data */
1049
- mq = shm_mq_create (mq ,QUEUE_SIZE );
1050
- shm_mq_set_sender (mq ,proc );
1051
- shm_mq_set_receiver (mq ,MyProc );
1082
+ foreach (i ,procs )
1083
+ {
1084
+ PGPROC * proc = (PGPROC * )lfirst (i );
1085
+ shm_mq_msg * msg ;
1086
+ shm_mq_handle * mqh ;
1087
+ shm_mq_result mq_receive_result ;
1088
+ int sig_result ;
1089
+ Size len ;
1052
1090
1053
- /* send signal to specified backend to extract its state */
1054
- sig_result = SendProcSignal (proc -> pid ,QueryStatePollReason ,proc -> backendId );
1055
- if (sig_result == -1 )
1056
- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1057
- errmsg ("invalid send signal" )));
1091
+ Assert (proc && proc -> backendId != InvalidBackendId );
1058
1092
1059
- /* retrieve data from message queue */
1060
- mqh = shm_mq_attach (mq ,NULL ,NULL );
1061
- mq_receive_result = shm_mq_receive_with_timeout (mqh ,& len , (void * * )& msg ,5000 );
1062
- if (mq_receive_result != SHM_MQ_SUCCESS )
1063
- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1064
- errmsg ("invalid read from message queue" )));
1065
- shm_mq_detach (mq );
1093
+ /* prepare message queue to transfer data */
1094
+ mq = shm_mq_create (mq ,QUEUE_SIZE );
1095
+ shm_mq_set_sender (mq ,proc );
1096
+ shm_mq_set_receiver (mq ,MyProc );
1097
+
1098
+ /* send signal to specified backend to extract its state */
1099
+ sig_result = SendProcSignal (proc -> pid ,QueryStatePollReason ,proc -> backendId );
1100
+ if (sig_result == -1 )
1101
+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1102
+ errmsg ("invalid send signal" )));
1103
+
1104
+ /* retrieve data from message queue */
1105
+ mqh = shm_mq_attach (mq ,NULL ,NULL );
1106
+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,& len , (void * * )& msg ,5000 );
1107
+ if (mq_receive_result != SHM_MQ_SUCCESS )
1108
+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1109
+ errmsg ("invalid read from message queue" )));
1066
1110
1067
- Assert ( len == msg -> length );
1111
+ result = lappend ( result , copy_msg ( msg ) );
1068
1112
1069
- return msg ;
1113
+ shm_mq_detach (mq );
1114
+
1115
+ Assert (len == msg -> length );
1116
+ }
1117
+
1118
+ return result ;
1070
1119
}