@@ -89,7 +89,8 @@ static void SendCurrentUserId(void);
8989static void SendBgWorkerPids (void );
9090static Oid GetRemoteBackendUserId (PGPROC * proc );
9191static List * GetRemoteBackendWorkers (PGPROC * proc );
92- static List * GetRemoteBackendQueryStates (List * procs ,
92+ static List * GetRemoteBackendQueryStates (PGPROC * leader ,
93+ List * pworkers ,
9394bool verbose ,
9495bool costs ,
9596bool timing ,
@@ -533,7 +534,8 @@ pg_query_state(PG_FUNCTION_ARGS)
533534
534535bg_worker_procs = GetRemoteBackendWorkers (proc );
535536
536- msgs = GetRemoteBackendQueryStates (lcons (proc ,bg_worker_procs ),
537+ msgs = GetRemoteBackendQueryStates (proc ,
538+ bg_worker_procs ,
537539verbose ,
538540costs ,
539541timing ,
@@ -855,9 +857,7 @@ GetRemoteBackendWorkers(PGPROC *proc)
855857return NIL ;
856858
857859mqh = shm_mq_attach (mq ,NULL ,NULL );
858- mq_receive_result = shm_mq_receive_with_timeout (mqh ,& msg_len ,
859- (void * * )& msg ,
860- MIN_TIMEOUT );
860+ mq_receive_result = shm_mq_receive (mqh ,& msg_len , (void * * )& msg , false);
861861if (mq_receive_result != SHM_MQ_SUCCESS )
862862return NIL ;
863863
@@ -884,7 +884,8 @@ copy_msg(shm_mq_msg *msg)
884884}
885885
886886static List *
887- GetRemoteBackendQueryStates (List * procs ,
887+ GetRemoteBackendQueryStates (PGPROC * leader ,
888+ List * pworkers ,
888889bool verbose ,
889890bool costs ,
890891bool timing ,
@@ -895,6 +896,11 @@ GetRemoteBackendQueryStates(List *procs,
895896List * result = NIL ;
896897List * alive_procs = NIL ;
897898ListCell * iter ;
899+ int sig_result ;
900+ shm_mq_handle * mqh ;
901+ shm_mq_result mq_receive_result ;
902+ shm_mq_msg * msg ;
903+ Size len ;
898904
899905Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
900906Assert (mq );
@@ -912,10 +918,14 @@ GetRemoteBackendQueryStates(List *procs,
912918 * send signal `QueryStatePollReason` to all processes and define all alive
913919 * ones
914920 */
915- foreach (iter ,procs )
921+ sig_result = SendProcSignal (leader -> pid ,
922+ QueryStatePollReason ,
923+ leader -> backendId );
924+ if (sig_result == -1 )
925+ gotosignal_error ;
926+ foreach (iter ,pworkers )
916927{
917928PGPROC * proc = (PGPROC * )lfirst (iter );
918- int sig_result ;
919929
920930sig_result = SendProcSignal (proc -> pid ,
921931QueryStatePollReason ,
@@ -930,16 +940,24 @@ GetRemoteBackendQueryStates(List *procs,
930940alive_procs = lappend (alive_procs ,proc );
931941}
932942
943+ /* extract query state from leader process */
944+ mq = shm_mq_create (mq ,QUEUE_SIZE );
945+ shm_mq_set_sender (mq ,leader );
946+ shm_mq_set_receiver (mq ,MyProc );
947+ mqh = shm_mq_attach (mq ,NULL ,NULL );
948+ mq_receive_result = shm_mq_receive (mqh ,& len , (void * * )& msg , false);
949+ if (mq_receive_result != SHM_MQ_SUCCESS )
950+ gotomq_error ;
951+ Assert (len == msg -> length );
952+ result = lappend (result ,copy_msg (msg ));
953+ shm_mq_detach (mq );
954+
933955/*
934- * collect results from all alivedprocesses
956+ * collect results from all alivedparallel workers
935957 */
936958foreach (iter ,alive_procs )
937959{
938960PGPROC * proc = (PGPROC * )lfirst (iter );
939- shm_mq_handle * mqh ;
940- shm_mq_result mq_receive_result ;
941- shm_mq_msg * msg ;
942- Size len ;
943961
944962/* prepare message queue to transfer data */
945963mq = shm_mq_create (mq ,QUEUE_SIZE );
@@ -953,7 +971,7 @@ GetRemoteBackendQueryStates(List *procs,
953971mq_receive_result = shm_mq_receive_with_timeout (mqh ,
954972& len ,
955973(void * * )& msg ,
956- 2 * MIN_TIMEOUT );
974+ MIN_TIMEOUT );
957975if (mq_receive_result != SHM_MQ_SUCCESS )
958976/* counterpart is died, not consider it */
959977continue ;
@@ -971,4 +989,7 @@ GetRemoteBackendQueryStates(List *procs,
971989signal_error :
972990ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
973991errmsg ("invalid send signal" )));
992+ mq_error :
993+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
994+ errmsg ("error in message queue data transmitting" )));
974995}