44 *
55 * Copyright (c) 2016-2016, Postgres Professional
66 *
7- * IDENTIFICATION
87 * contrib/pg_query_state/pg_query_state.c
8+ * IDENTIFICATION
99 */
1010
1111#include "pg_query_state.h"
1212
1313#include "access/htup_details.h"
1414#include "catalog/pg_type.h"
1515#include "funcapi.h"
16+ #include "executor/execParallel.h"
1617#include "executor/executor.h"
1718#include "miscadmin.h"
19+ #include "nodes/nodeFuncs.h"
20+ #include "nodes/print.h"
1821#include "pgstat.h"
22+ #include "postmaster/bgworker.h"
1923#include "storage/ipc.h"
2024#include "storage/procarray.h"
2125#include "storage/procsignal.h"
@@ -63,6 +67,7 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6367List * QueryDescStack = NIL ;
6468static ProcSignalReason QueryStatePollReason ;
6569static ProcSignalReason RolePollReason ;
70+ static ProcSignalReason WorkerPollReason ;
6671static bool module_initialized = false;
6772static const char * be_state_str []= {/* BackendState -> string repr */
6873"undefined" ,/* STATE_UNDEFINED */
@@ -93,8 +98,10 @@ typedef struct
9398pid_t traceable ;
9499}trace_request ;
95100
96- static void SendCurrentRoleOid (void );
97- Oid GetRemoteBackendUser (pid_t pid ,int * error_code );
101+ static void SendCurrentUserId (void );
102+ Oid GetRemoteBackendUserId (PGPROC * proc ,int * error_code );
103+ static void SendWorkerPids (void );
104+ List * GetRemoteBackendWorkers (PGPROC * proc ,int * error_code );
98105
99106/* Shared memory variables */
100107shm_toc * toc = NULL ;
@@ -187,8 +194,10 @@ _PG_init(void)
187194
188195/* Register interrupt on custom signal of polling query state */
189196QueryStatePollReason = RegisterCustomProcSignalHandler (SendQueryState );
190- RolePollReason = RegisterCustomProcSignalHandler (SendCurrentRoleOid );
191- if (QueryStatePollReason == INVALID_PROCSIGNAL || RolePollReason == INVALID_PROCSIGNAL )
197+ RolePollReason = RegisterCustomProcSignalHandler (SendCurrentUserId );
198+ WorkerPollReason = RegisterCustomProcSignalHandler (SendWorkerPids );
199+ if (QueryStatePollReason == INVALID_PROCSIGNAL || RolePollReason == INVALID_PROCSIGNAL
200+ || WorkerPollReason == INVALID_PROCSIGNAL )
192201{
193202ereport (WARNING , (errcode (ERRCODE_INSUFFICIENT_RESOURCES ),
194203errmsg ("pg_query_state isn't loaded: insufficient custom ProcSignal slots" )));
@@ -615,9 +624,6 @@ pg_query_state(PG_FUNCTION_ARGS)
615624init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
616625LockAcquire (& tag ,ExclusiveLock , false, false);
617626
618- int error_code ;
619- Oid user_id = GetRemoteBackendUser (pid ,& error_code );
620-
621627/* fill in caller's user data */
622628caller -> user_id = GetUserId ();
623629caller -> superuser = superuser ();
@@ -827,56 +833,55 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap, lon
827833{
828834
829835#ifdef HAVE_INT64_TIMESTAMP
830- #define GetNowLong ()((long ) GetCurrentTimestamp() / 1000)
836+ #define GetNowFloat ()((float8 ) GetCurrentTimestamp() / 1000.0 )
831837#else
832- #define GetNowLong ()1000 * GetCurrentTimestamp()
838+ #define GetNowFloat ()1000.0 * GetCurrentTimestamp()
833839#endif
834840
835- long endtime = GetNowLong ()+ timeout ;
841+ float8 endtime = GetNowFloat ()+ timeout ;
842+ int rc = 0 ;
836843
837844for (;;)
838845{
839- int rc ;
840846long delay ;
841847shm_mq_result mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
842848
843849if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
844850return mq_receive_result ;
845851
846- delay = endtime - GetNowLong ();
852+ if (rc & WL_TIMEOUT )
853+ return SHM_MQ_WOULD_BLOCK ;
854+
855+ delay = (long ) (endtime - GetNowFloat ());
847856rc = WaitLatch (MyLatch ,WL_LATCH_SET |WL_TIMEOUT ,delay );
848857CHECK_FOR_INTERRUPTS ();
849858ResetLatch (MyLatch );
850-
851- if (rc & WL_TIMEOUT )
852- return SHM_MQ_WOULD_BLOCK ;
853859}
854860}
855861
856862static void
857- SendCurrentRoleOid (void )
863+ SendCurrentUserId (void )
858864{
859865shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
860- Oid role_oid = GetUserId ();
866+ Oid user_oid = GetUserId ();
861867
862- shm_mq_send (mqh ,sizeof (Oid ),& role_oid , false);
868+ shm_mq_send (mqh ,sizeof (Oid ),& user_oid , false);
863869}
864870
865871#define NOT_BACKEND_PROCESS 1
866872#define COULD_NOT_SEND_SIGNAL 2
867873#define INVALID_MQ_READ 3
868874
869875Oid
870- GetRemoteBackendUser ( pid_t pid ,int * error_code )
876+ GetRemoteBackendUserId ( PGPROC * proc ,int * error_code )
871877{
872- PGPROC * proc = BackendPidGetProc (pid );
873878int sig_result ;
874- shm_mq_handle * mqh ;
875- shm_mq_result mq_receive_result ;
879+ shm_mq_handle * mqh ;
880+ shm_mq_result mq_receive_result ;
876881Oid * result ;
877882Size res_len ;
878883
879- if (proc == NULL || proc -> backendId == InvalidBackendId )
884+ if (proc -> backendId == InvalidBackendId )
880885{
881886* error_code = NOT_BACKEND_PROCESS ;
882887return InvalidOid ;
@@ -886,7 +891,7 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
886891shm_mq_set_sender (mq ,proc );
887892shm_mq_set_receiver (mq ,MyProc );
888893
889- sig_result = SendProcSignal (pid ,RolePollReason ,proc -> backendId );
894+ sig_result = SendProcSignal (proc -> pid ,RolePollReason ,proc -> backendId );
890895if (sig_result == -1 )
891896{
892897* error_code = COULD_NOT_SEND_SIGNAL ;
@@ -905,3 +910,113 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
905910
906911return * result ;
907912}
913+
914+ static bool
915+ extract_worker_handles (PlanState * node ,List * * result )
916+ {
917+ if (node == NULL )
918+ return false;
919+
920+ if (IsA (node ,GatherState ))
921+ {
922+ GatherState * gather_node = (GatherState * )node ;
923+ int i ;
924+
925+ if (gather_node -> pei )
926+ {
927+ for (i = 0 ;i < gather_node -> pei -> pcxt -> nworkers_launched ;i ++ )
928+ {
929+ pid_t pid ;
930+ BackgroundWorkerHandle * bgwh = gather_node -> pei -> pcxt -> worker [i ].bgwhandle ;
931+ BgwHandleStatus status ;
932+
933+ if (!bgwh )
934+ continue ;
935+
936+ status = GetBackgroundWorkerPid (bgwh ,& pid );
937+ if (status == BGWH_STARTED )
938+ * result = lcons_int (pid ,* result );
939+ }
940+ }
941+ }
942+ return planstate_tree_walker (node ,extract_worker_handles , (void * )result );
943+ }
944+
945+ typedef struct
946+ {
947+ int num ;
948+ pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
949+ }workers_msg ;
950+
951+ static void
952+ SendWorkerPids (void )
953+ {
954+ ListCell * iter ;
955+ List * all_workers = NIL ;
956+ workers_msg * msg ;
957+ int msg_len ;
958+ int i ;
959+ shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
960+
961+ foreach (iter ,QueryDescStack )
962+ {
963+ QueryDesc * curQueryDesc = (QueryDesc * )lfirst (iter );
964+ List * bgworker_pids = NIL ;
965+
966+ extract_worker_handles (curQueryDesc -> planstate ,& bgworker_pids );
967+ all_workers = list_concat (all_workers ,bgworker_pids );
968+ }
969+
970+ msg_len = offsetof(workers_msg ,pids )+ sizeof (pid_t )* list_length (all_workers );
971+ msg = palloc (msg_len );
972+ msg -> num = list_length (all_workers );
973+ i = 0 ;
974+ foreach (iter ,all_workers )
975+ msg -> pids [i ++ ]= lfirst_int (iter );
976+
977+ shm_mq_send (mqh ,msg_len ,msg , false);
978+ }
979+
980+ List *
981+ GetRemoteBackendWorkers (PGPROC * proc ,int * error_code )
982+ {
983+ int sig_result ;
984+ shm_mq_handle * mqh ;
985+ shm_mq_result mq_receive_result ;
986+ workers_msg * msg ;
987+ Size msg_len ;
988+ int i ;
989+ List * result = NIL ;
990+
991+ if (proc -> backendId == InvalidBackendId )
992+ {
993+ * error_code = NOT_BACKEND_PROCESS ;
994+ return InvalidOid ;
995+ }
996+
997+ mq = shm_mq_create (mq ,QUEUE_SIZE );
998+ shm_mq_set_sender (mq ,proc );
999+ shm_mq_set_receiver (mq ,MyProc );
1000+
1001+ sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
1002+ if (sig_result == -1 )
1003+ {
1004+ * error_code = COULD_NOT_SEND_SIGNAL ;
1005+ return InvalidOid ;
1006+ }
1007+
1008+ mqh = shm_mq_attach (mq ,NULL ,NULL );
1009+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,& msg_len , (void * * )& msg ,1000 );
1010+ if (mq_receive_result != SHM_MQ_SUCCESS )
1011+ {
1012+ * error_code = INVALID_MQ_READ ;
1013+ return InvalidOid ;
1014+ }
1015+
1016+ for (i = 0 ;i < msg -> num ;i ++ )
1017+ result = lcons_int (msg -> pids [i ],result );
1018+
1019+ shm_mq_detach (mq );
1020+
1021+ return result ;
1022+ }