@@ -66,9 +66,9 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6666
6767/* Global variables */
6868List * QueryDescStack = NIL ;
69- static ProcSignalReason UserIdPollReason ;
70- static ProcSignalReason QueryStatePollReason ;
71- static ProcSignalReason WorkerPollReason ;
69+ static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL ;
70+ static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL ;
71+ static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL ;
7272static bool module_initialized = false;
7373static const char * be_state_str []= {/* BackendState -> string repr */
7474"undefined" ,/* STATE_UNDEFINED */
@@ -107,9 +107,9 @@ typedef struct
107107}trace_request ;
108108
109109static void SendCurrentUserId (void );
110- static void SendWorkerPids (void );
110+ static void SendBgWorkerPids (void );
111111static Oid GetRemoteBackendUserId (PGPROC * proc );
112- static List * GetRemoteBackendWorkers (PGPROC * proc , int * error_code );
112+ static List * GetRemoteBackendWorkers (PGPROC * proc );
113113
114114/* Shared memory variables */
115115shm_toc * toc = NULL ;
@@ -208,7 +208,7 @@ _PG_init(void)
208208/* Register interrupt on custom signal of polling query state */
209209UserIdPollReason = RegisterCustomProcSignalHandler (SendCurrentUserId );
210210QueryStatePollReason = RegisterCustomProcSignalHandler (SendQueryState );
211- WorkerPollReason = RegisterCustomProcSignalHandler (SendWorkerPids );
211+ WorkerPollReason = RegisterCustomProcSignalHandler (SendBgWorkerPids );
212212if (QueryStatePollReason == INVALID_PROCSIGNAL
213213|| WorkerPollReason == INVALID_PROCSIGNAL
214214|| UserIdPollReason == INVALID_PROCSIGNAL )
@@ -571,21 +571,22 @@ pg_query_state(PG_FUNCTION_ARGS)
571571
572572if (SRF_IS_FIRSTCALL ())
573573{
574- LOCKTAG tag ;
575- bool verbose = PG_GETARG_BOOL (1 ),
576- costs = PG_GETARG_BOOL (2 ),
577- timing = PG_GETARG_BOOL (3 ),
578- buffers = PG_GETARG_BOOL (4 ),
579- triggers = PG_GETARG_BOOL (5 );
574+ LOCKTAG tag ;
575+ bool verbose = PG_GETARG_BOOL (1 ),
576+ costs = PG_GETARG_BOOL (2 ),
577+ timing = PG_GETARG_BOOL (3 ),
578+ buffers = PG_GETARG_BOOL (4 ),
579+ triggers = PG_GETARG_BOOL (5 );
580580text * format_text = PG_GETARG_TEXT_P (6 );
581- ExplainFormat format ;
581+ ExplainFormat format ;
582582PGPROC * proc ;
583- Oid counterpart_user_id ;
583+ Oid counterpart_user_id ;
584584shm_mq_handle * mqh ;
585- shm_mq_result mq_receive_result ;
586- int send_signal_result ;
587- Size len ;
585+ shm_mq_result mq_receive_result ;
586+ int send_signal_result ;
587+ Size len ;
588588shm_mq_msg * msg ;
589+ List * bg_worker_pids = NIL ;
589590
590591if (!module_initialized )
591592ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -631,6 +632,8 @@ pg_query_state(PG_FUNCTION_ARGS)
631632params -> triggers = triggers ;
632633params -> format = format ;
633634
635+ bg_worker_pids = GetRemoteBackendWorkers (proc );
636+
634637/* prepare message queue to transfer data */
635638mq = shm_mq_create (mq ,QUEUE_SIZE );
636639shm_mq_set_sender (mq ,proc );
@@ -843,8 +846,13 @@ GetRemoteBackendUserId(PGPROC *proc)
843846{
844847Oid result ;
845848
849+ Assert (proc && proc -> backendId != InvalidBackendId );
850+ Assert (UserIdPollReason != INVALID_PROCSIGNAL );
851+ Assert (counterpart_userid );
852+
846853counterpart_userid -> userid = InvalidOid ;
847854counterpart_userid -> caller = MyLatch ;
855+ pg_write_barrier ();
848856
849857SendProcSignal (proc -> pid ,UserIdPollReason ,proc -> backendId );
850858for (;;)
@@ -864,8 +872,54 @@ GetRemoteBackendUserId(PGPROC *proc)
864872return result ;
865873}
866874
875+ /*
876+ * Receive a message from a shared message queue until timeout is exceeded.
877+ *
878+ * Parameter `*nbytes` is set to the message length and *data to point to the
879+ * message payload. If timeout is exceeded SHM_MQ_WOULD_BLOCK is returned.
880+ */
881+ static shm_mq_result
882+ shm_mq_receive_with_timeout (shm_mq_handle * mqh ,
883+ Size * nbytesp ,
884+ void * * datap ,
885+ long timeout )
886+ {
887+
888+ #ifdef HAVE_INT64_TIMESTAMP
889+ #define GetNowFloat ()((float8) GetCurrentTimestamp() / 1000.0)
890+ #else
891+ #define GetNowFloat ()1000.0 * GetCurrentTimestamp()
892+ #endif
893+
894+ float8 endtime = GetNowFloat ()+ timeout ;
895+ int rc = 0 ;
896+
897+ for (;;)
898+ {
899+ long delay ;
900+ shm_mq_result mq_receive_result ;
901+
902+ mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
903+
904+ if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
905+ return mq_receive_result ;
906+
907+ if (rc & WL_TIMEOUT )
908+ return SHM_MQ_WOULD_BLOCK ;
909+
910+ delay = (long ) (endtime - GetNowFloat ());
911+ rc = WaitLatch (MyLatch ,WL_LATCH_SET |WL_TIMEOUT ,delay );
912+ CHECK_FOR_INTERRUPTS ();
913+ ResetLatch (MyLatch );
914+ }
915+ }
916+
917+ /*
918+ * Extract to *result pids of all parallel workers running from leader process
919+ * that executes plan tree whose state root is `node`.
920+ */
867921static bool
868- extract_worker_handles (PlanState * node ,List * * result )
922+ extract_running_bgworkers (PlanState * node ,List * * result )
869923{
870924if (node == NULL )
871925return false;
@@ -879,10 +933,11 @@ extract_worker_handles(PlanState *node, List **result)
879933{
880934for (i = 0 ;i < gather_node -> pei -> pcxt -> nworkers_launched ;i ++ )
881935{
882- pid_t pid ;
883- BackgroundWorkerHandle * bgwh = gather_node -> pei -> pcxt -> worker [ i ]. bgwhandle ;
884- BgwHandleStatus status ;
936+ pid_t pid ;
937+ BackgroundWorkerHandle * bgwh ;
938+ BgwHandleStatus status ;
885939
940+ bgwh = gather_node -> pei -> pcxt -> worker [i ].bgwhandle ;
886941if (!bgwh )
887942continue ;
888943
@@ -892,37 +947,40 @@ extract_worker_handles(PlanState *node, List **result)
892947}
893948}
894949}
895- return planstate_tree_walker (node ,extract_worker_handles , (void * )result );
950+ return planstate_tree_walker (node ,extract_running_bgworkers , (void * )result );
896951}
897952
898953typedef struct
899954{
900- int num ;
901- pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
902- }workers_msg ;
955+ int number ;
956+ pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
957+ }BgWorkerPids ;
903958
904959static void
905- SendWorkerPids (void )
960+ SendBgWorkerPids (void )
906961{
907- ListCell * iter ;
908- List * all_workers = NIL ;
909- workers_msg * msg ;
910- int msg_len ;
911- int i ;
912- shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
962+ ListCell * iter ;
963+ List * all_workers = NIL ;
964+ BgWorkerPids * msg ;
965+ int msg_len ;
966+ int i ;
967+ shm_mq_handle * mqh ;
968+
969+ mqh = shm_mq_attach (mq ,NULL ,NULL );
913970
914971foreach (iter ,QueryDescStack )
915972{
916973QueryDesc * curQueryDesc = (QueryDesc * )lfirst (iter );
917974List * bgworker_pids = NIL ;
918975
919- extract_worker_handles (curQueryDesc -> planstate ,& bgworker_pids );
976+ extract_running_bgworkers (curQueryDesc -> planstate ,& bgworker_pids );
920977all_workers = list_concat (all_workers ,bgworker_pids );
921978}
922979
923- msg_len = offsetof(workers_msg ,pids )+ sizeof (pid_t )* list_length (all_workers );
980+ msg_len = offsetof(BgWorkerPids ,pids )
981+ + sizeof (pid_t )* list_length (all_workers );
924982msg = palloc (msg_len );
925- msg -> num = list_length (all_workers );
983+ msg -> number = list_length (all_workers );
926984i = 0 ;
927985foreach (iter ,all_workers )
928986msg -> pids [i ++ ]= lfirst_int (iter );
@@ -931,44 +989,40 @@ SendWorkerPids(void)
931989}
932990
933991/*
992+ * Extracts all parallel worker pids running by process `proc`
993+ */
934994List *
935- GetRemoteBackendWorkers(PGPROC *proc, int *error_code )
995+ GetRemoteBackendWorkers (PGPROC * proc )
936996{
937- intsig_result;
997+ int sig_result ;
938998shm_mq_handle * mqh ;
939- shm_mq_result mq_receive_result;
940- workers_msg *msg;
941- Sizemsg_len;
942- inti;
999+ shm_mq_result mq_receive_result ;
1000+ BgWorkerPids * msg ;
1001+ Size msg_len ;
1002+ int i ;
9431003List * result = NIL ;
9441004
945- if (proc->backendId == InvalidBackendId)
946- {
947- return NIL;
948- }
1005+ Assert (proc && proc -> backendId != InvalidBackendId );
1006+ Assert (WorkerPollReason != INVALID_PROCSIGNAL );
1007+ Assert (mq );
9491008
9501009mq = shm_mq_create (mq ,QUEUE_SIZE );
9511010shm_mq_set_sender (mq ,proc );
9521011shm_mq_set_receiver (mq ,MyProc );
9531012
9541013sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
9551014if (sig_result == -1 )
956- {
9571015return NIL ;
958- }
9591016
9601017mqh = shm_mq_attach (mq ,NULL ,NULL );
9611018mq_receive_result = shm_mq_receive_with_timeout (mqh ,& msg_len , (void * * )& msg ,1000 );
9621019if (mq_receive_result != SHM_MQ_SUCCESS )
963- {
9641020return NIL ;
965- }
9661021
967- for (i = 0; i < msg->num ; i++)
1022+ for (i = 0 ;i < msg -> number ;i ++ )
9681023result = lcons_int (msg -> pids [i ],result );
9691024
9701025shm_mq_detach (mq );
9711026
9721027return result ;
9731028}
974- */