4
4
*
5
5
* Copyright (c) 2016-2016, Postgres Professional
6
6
*
7
- * IDENTIFICATION
8
7
* contrib/pg_query_state/pg_query_state.c
8
+ * IDENTIFICATION
9
9
*/
10
10
11
11
#include "pg_query_state.h"
12
12
13
13
#include "access/htup_details.h"
14
14
#include "catalog/pg_type.h"
15
15
#include "funcapi.h"
16
+ #include "executor/execParallel.h"
16
17
#include "executor/executor.h"
17
18
#include "miscadmin.h"
19
+ #include "nodes/nodeFuncs.h"
20
+ #include "nodes/print.h"
18
21
#include "pgstat.h"
22
+ #include "postmaster/bgworker.h"
19
23
#include "storage/ipc.h"
20
24
#include "storage/procarray.h"
21
25
#include "storage/procsignal.h"
@@ -63,6 +67,7 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
63
67
List * QueryDescStack = NIL ;
64
68
static ProcSignalReason QueryStatePollReason ;
65
69
static ProcSignalReason RolePollReason ;
70
+ static ProcSignalReason WorkerPollReason ;
66
71
static bool module_initialized = false;
67
72
static const char * be_state_str []= {/* BackendState -> string repr */
68
73
"undefined" ,/* STATE_UNDEFINED */
@@ -93,8 +98,10 @@ typedef struct
93
98
pid_t traceable ;
94
99
}trace_request ;
95
100
96
- static void SendCurrentRoleOid (void );
97
- Oid GetRemoteBackendUser (pid_t pid ,int * error_code );
101
+ static void SendCurrentUserId (void );
102
+ Oid GetRemoteBackendUserId (PGPROC * proc ,int * error_code );
103
+ static void SendWorkerPids (void );
104
+ List * GetRemoteBackendWorkers (PGPROC * proc ,int * error_code );
98
105
99
106
/* Shared memory variables */
100
107
shm_toc * toc = NULL ;
@@ -187,8 +194,10 @@ _PG_init(void)
187
194
188
195
/* Register interrupt on custom signal of polling query state */
189
196
QueryStatePollReason = RegisterCustomProcSignalHandler (SendQueryState );
190
- RolePollReason = RegisterCustomProcSignalHandler (SendCurrentRoleOid );
191
- if (QueryStatePollReason == INVALID_PROCSIGNAL || RolePollReason == INVALID_PROCSIGNAL )
197
+ RolePollReason = RegisterCustomProcSignalHandler (SendCurrentUserId );
198
+ WorkerPollReason = RegisterCustomProcSignalHandler (SendWorkerPids );
199
+ if (QueryStatePollReason == INVALID_PROCSIGNAL || RolePollReason == INVALID_PROCSIGNAL
200
+ || WorkerPollReason == INVALID_PROCSIGNAL )
192
201
{
193
202
ereport (WARNING , (errcode (ERRCODE_INSUFFICIENT_RESOURCES ),
194
203
errmsg ("pg_query_state isn't loaded: insufficient custom ProcSignal slots" )));
@@ -615,9 +624,6 @@ pg_query_state(PG_FUNCTION_ARGS)
615
624
init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
616
625
LockAcquire (& tag ,ExclusiveLock , false, false);
617
626
618
- int error_code ;
619
- Oid user_id = GetRemoteBackendUser (pid ,& error_code );
620
-
621
627
/* fill in caller's user data */
622
628
caller -> user_id = GetUserId ();
623
629
caller -> superuser = superuser ();
@@ -827,56 +833,55 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap, lon
827
833
{
828
834
829
835
#ifdef HAVE_INT64_TIMESTAMP
830
- #define GetNowLong ()((long ) GetCurrentTimestamp() / 1000)
836
+ #define GetNowFloat ()((float8 ) GetCurrentTimestamp() / 1000.0 )
831
837
#else
832
- #define GetNowLong ()1000 * GetCurrentTimestamp()
838
+ #define GetNowFloat ()1000.0 * GetCurrentTimestamp()
833
839
#endif
834
840
835
- long endtime = GetNowLong ()+ timeout ;
841
+ float8 endtime = GetNowFloat ()+ timeout ;
842
+ int rc = 0 ;
836
843
837
844
for (;;)
838
845
{
839
- int rc ;
840
846
long delay ;
841
847
shm_mq_result mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
842
848
843
849
if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
844
850
return mq_receive_result ;
845
851
846
- delay = endtime - GetNowLong ();
852
+ if (rc & WL_TIMEOUT )
853
+ return SHM_MQ_WOULD_BLOCK ;
854
+
855
+ delay = (long ) (endtime - GetNowFloat ());
847
856
rc = WaitLatch (MyLatch ,WL_LATCH_SET |WL_TIMEOUT ,delay );
848
857
CHECK_FOR_INTERRUPTS ();
849
858
ResetLatch (MyLatch );
850
-
851
- if (rc & WL_TIMEOUT )
852
- return SHM_MQ_WOULD_BLOCK ;
853
859
}
854
860
}
855
861
856
862
static void
857
- SendCurrentRoleOid (void )
863
+ SendCurrentUserId (void )
858
864
{
859
865
shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
860
- Oid role_oid = GetUserId ();
866
+ Oid user_oid = GetUserId ();
861
867
862
- shm_mq_send (mqh ,sizeof (Oid ),& role_oid , false);
868
+ shm_mq_send (mqh ,sizeof (Oid ),& user_oid , false);
863
869
}
864
870
865
871
#define NOT_BACKEND_PROCESS 1
866
872
#define COULD_NOT_SEND_SIGNAL 2
867
873
#define INVALID_MQ_READ 3
868
874
869
875
Oid
870
- GetRemoteBackendUser ( pid_t pid ,int * error_code )
876
+ GetRemoteBackendUserId ( PGPROC * proc ,int * error_code )
871
877
{
872
- PGPROC * proc = BackendPidGetProc (pid );
873
878
int sig_result ;
874
- shm_mq_handle * mqh ;
875
- shm_mq_result mq_receive_result ;
879
+ shm_mq_handle * mqh ;
880
+ shm_mq_result mq_receive_result ;
876
881
Oid * result ;
877
882
Size res_len ;
878
883
879
- if (proc == NULL || proc -> backendId == InvalidBackendId )
884
+ if (proc -> backendId == InvalidBackendId )
880
885
{
881
886
* error_code = NOT_BACKEND_PROCESS ;
882
887
return InvalidOid ;
@@ -886,7 +891,7 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
886
891
shm_mq_set_sender (mq ,proc );
887
892
shm_mq_set_receiver (mq ,MyProc );
888
893
889
- sig_result = SendProcSignal (pid ,RolePollReason ,proc -> backendId );
894
+ sig_result = SendProcSignal (proc -> pid ,RolePollReason ,proc -> backendId );
890
895
if (sig_result == -1 )
891
896
{
892
897
* error_code = COULD_NOT_SEND_SIGNAL ;
@@ -905,3 +910,113 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
905
910
906
911
return * result ;
907
912
}
913
+
914
+ static bool
915
+ extract_worker_handles (PlanState * node ,List * * result )
916
+ {
917
+ if (node == NULL )
918
+ return false;
919
+
920
+ if (IsA (node ,GatherState ))
921
+ {
922
+ GatherState * gather_node = (GatherState * )node ;
923
+ int i ;
924
+
925
+ if (gather_node -> pei )
926
+ {
927
+ for (i = 0 ;i < gather_node -> pei -> pcxt -> nworkers_launched ;i ++ )
928
+ {
929
+ pid_t pid ;
930
+ BackgroundWorkerHandle * bgwh = gather_node -> pei -> pcxt -> worker [i ].bgwhandle ;
931
+ BgwHandleStatus status ;
932
+
933
+ if (!bgwh )
934
+ continue ;
935
+
936
+ status = GetBackgroundWorkerPid (bgwh ,& pid );
937
+ if (status == BGWH_STARTED )
938
+ * result = lcons_int (pid ,* result );
939
+ }
940
+ }
941
+ }
942
+ return planstate_tree_walker (node ,extract_worker_handles , (void * )result );
943
+ }
944
+
945
+ typedef struct
946
+ {
947
+ int num ;
948
+ pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
949
+ }workers_msg ;
950
+
951
+ static void
952
+ SendWorkerPids (void )
953
+ {
954
+ ListCell * iter ;
955
+ List * all_workers = NIL ;
956
+ workers_msg * msg ;
957
+ int msg_len ;
958
+ int i ;
959
+ shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
960
+
961
+ foreach (iter ,QueryDescStack )
962
+ {
963
+ QueryDesc * curQueryDesc = (QueryDesc * )lfirst (iter );
964
+ List * bgworker_pids = NIL ;
965
+
966
+ extract_worker_handles (curQueryDesc -> planstate ,& bgworker_pids );
967
+ all_workers = list_concat (all_workers ,bgworker_pids );
968
+ }
969
+
970
+ msg_len = offsetof(workers_msg ,pids )+ sizeof (pid_t )* list_length (all_workers );
971
+ msg = palloc (msg_len );
972
+ msg -> num = list_length (all_workers );
973
+ i = 0 ;
974
+ foreach (iter ,all_workers )
975
+ msg -> pids [i ++ ]= lfirst_int (iter );
976
+
977
+ shm_mq_send (mqh ,msg_len ,msg , false);
978
+ }
979
+
980
+ List *
981
+ GetRemoteBackendWorkers (PGPROC * proc ,int * error_code )
982
+ {
983
+ int sig_result ;
984
+ shm_mq_handle * mqh ;
985
+ shm_mq_result mq_receive_result ;
986
+ workers_msg * msg ;
987
+ Size msg_len ;
988
+ int i ;
989
+ List * result = NIL ;
990
+
991
+ if (proc -> backendId == InvalidBackendId )
992
+ {
993
+ * error_code = NOT_BACKEND_PROCESS ;
994
+ return InvalidOid ;
995
+ }
996
+
997
+ mq = shm_mq_create (mq ,QUEUE_SIZE );
998
+ shm_mq_set_sender (mq ,proc );
999
+ shm_mq_set_receiver (mq ,MyProc );
1000
+
1001
+ sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
1002
+ if (sig_result == -1 )
1003
+ {
1004
+ * error_code = COULD_NOT_SEND_SIGNAL ;
1005
+ return InvalidOid ;
1006
+ }
1007
+
1008
+ mqh = shm_mq_attach (mq ,NULL ,NULL );
1009
+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,& msg_len , (void * * )& msg ,1000 );
1010
+ if (mq_receive_result != SHM_MQ_SUCCESS )
1011
+ {
1012
+ * error_code = INVALID_MQ_READ ;
1013
+ return InvalidOid ;
1014
+ }
1015
+
1016
+ for (i = 0 ;i < msg -> num ;i ++ )
1017
+ result = lcons_int (msg -> pids [i ],result );
1018
+
1019
+ shm_mq_detach (mq );
1020
+
1021
+ return result ;
1022
+ }