@@ -110,6 +110,14 @@ static void SendCurrentUserId(void);
110110static void SendBgWorkerPids (void );
111111static Oid GetRemoteBackendUserId (PGPROC * proc );
112112static List * GetRemoteBackendWorkers (PGPROC * proc );
113+ static shm_mq_msg * GetRemoteBackendQueryState (PGPROC * proc ,
114+ List * parallel_workers ,
115+ bool verbose ,
116+ bool costs ,
117+ bool timing ,
118+ bool buffers ,
119+ bool triggers ,
120+ ExplainFormat format );
113121
114122/* Shared memory variables */
115123shm_toc * toc = NULL ;
@@ -581,12 +589,8 @@ pg_query_state(PG_FUNCTION_ARGS)
581589ExplainFormat format ;
582590PGPROC * proc ;
583591Oid counterpart_user_id ;
584- shm_mq_handle * mqh ;
585- shm_mq_result mq_receive_result ;
586- int send_signal_result ;
587- Size len ;
588592shm_mq_msg * msg ;
589- List * bg_worker_pids = NIL ;
593+ List * bg_worker_procs = NIL ;
590594
591595if (!module_initialized )
592596ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -624,36 +628,16 @@ pg_query_state(PG_FUNCTION_ARGS)
624628ereport (ERROR , (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
625629errmsg ("permission denied" )));
626630
627- /* fill in parameters of query state request */
628- params -> verbose = verbose ;
629- params -> costs = costs ;
630- params -> timing = timing ;
631- params -> buffers = buffers ;
632- params -> triggers = triggers ;
633- params -> format = format ;
634-
635- bg_worker_pids = GetRemoteBackendWorkers (proc );
636-
637- /* prepare message queue to transfer data */
638- mq = shm_mq_create (mq ,QUEUE_SIZE );
639- shm_mq_set_sender (mq ,proc );
640- shm_mq_set_receiver (mq ,MyProc );
641-
642- /* send signal to specified backend to extract its state */
643- send_signal_result = SendProcSignal (pid ,QueryStatePollReason ,proc -> backendId );
644- if (send_signal_result == -1 )
645- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
646- errmsg ("invalid send signal" )));
647-
648- /* retrieve data from message queue */
649- mqh = shm_mq_attach (mq ,NULL ,NULL );
650- mq_receive_result = shm_mq_receive (mqh ,& len , (void * * )& msg , false);
651- if (mq_receive_result != SHM_MQ_SUCCESS )
652- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
653- errmsg ("invalid read from message queue" )));
654- shm_mq_detach (mq );
655-
656- Assert (len == msg -> length );
631+ bg_worker_procs = GetRemoteBackendWorkers (proc );
632+
633+ msg = GetRemoteBackendQueryState (proc ,
634+ bg_worker_procs ,
635+ verbose ,
636+ costs ,
637+ timing ,
638+ buffers ,
639+ triggers ,
640+ format );
657641
658642funcctx = SRF_FIRSTCALL_INIT ();
659643switch (msg -> result_code )
@@ -989,9 +973,9 @@ SendBgWorkerPids(void)
989973}
990974
991975/*
992- * Extracts all parallel workerpids running by process `proc`
976+ * Extracts all parallel worker`proc`s running by process `proc`
993977 */
994- List *
978+ static List *
995979GetRemoteBackendWorkers (PGPROC * proc )
996980{
997981int sig_result ;
@@ -1020,9 +1004,67 @@ GetRemoteBackendWorkers(PGPROC *proc)
10201004return NIL ;
10211005
10221006for (i = 0 ;i < msg -> number ;i ++ )
1023- result = lcons_int (msg -> pids [i ],result );
1007+ {
1008+ pid_t pid = msg -> pids [i ];
1009+ PGPROC * proc = BackendPidGetProc (pid );
1010+
1011+ result = lcons (proc ,result );
1012+ }
10241013
10251014shm_mq_detach (mq );
10261015
10271016return result ;
10281017}
1018+
1019+ static shm_mq_msg *
1020+ GetRemoteBackendQueryState (PGPROC * proc ,
1021+ List * parallel_workers ,
1022+ bool verbose ,
1023+ bool costs ,
1024+ bool timing ,
1025+ bool buffers ,
1026+ bool triggers ,
1027+ ExplainFormat format )
1028+ {
1029+ shm_mq_msg * msg ;
1030+ shm_mq_handle * mqh ;
1031+ shm_mq_result mq_receive_result ;
1032+ int sig_result ;
1033+ Size len ;
1034+
1035+ Assert (proc && proc -> backendId != InvalidBackendId );
1036+ Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
1037+ Assert (mq );
1038+
1039+ /* fill in parameters of query state request */
1040+ params -> verbose = verbose ;
1041+ params -> costs = costs ;
1042+ params -> timing = timing ;
1043+ params -> buffers = buffers ;
1044+ params -> triggers = triggers ;
1045+ params -> format = format ;
1046+ pg_write_barrier ();
1047+
1048+ /* prepare message queue to transfer data */
1049+ mq = shm_mq_create (mq ,QUEUE_SIZE );
1050+ shm_mq_set_sender (mq ,proc );
1051+ shm_mq_set_receiver (mq ,MyProc );
1052+
1053+ /* send signal to specified backend to extract its state */
1054+ sig_result = SendProcSignal (proc -> pid ,QueryStatePollReason ,proc -> backendId );
1055+ if (sig_result == -1 )
1056+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1057+ errmsg ("invalid send signal" )));
1058+
1059+ /* retrieve data from message queue */
1060+ mqh = shm_mq_attach (mq ,NULL ,NULL );
1061+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,& len , (void * * )& msg ,5000 );
1062+ if (mq_receive_result != SHM_MQ_SUCCESS )
1063+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1064+ errmsg ("invalid read from message queue" )));
1065+ shm_mq_detach (mq );
1066+
1067+ Assert (len == msg -> length );
1068+
1069+ return msg ;
1070+ }