11/*
22 * pg_query_state.c
3- *Extract information about query stateof other backend
3+ *Extract information about query statefrom other backend
44 *
55 * Copyright (c) 2016-2016, Postgres Professional
66 *
3131PG_MODULE_MAGIC ;
3232#endif
3333
34- #define QUEUE_SIZE (16 * 1024)
3534#define PG_QS_MODULE_KEY 0xCA94B108
3635#define PG_QUERY_STATE_KEY 0
3736#define EXECUTOR_TRACE_KEY 1
@@ -66,7 +65,6 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6665/* Global variables */
6766List * QueryDescStack = NIL ;
6867static ProcSignalReason QueryStatePollReason ;
69- static ProcSignalReason RolePollReason ;
7068static ProcSignalReason WorkerPollReason ;
7169static bool module_initialized = false;
7270static const char * be_state_str []= {/* BackendState -> string repr */
@@ -98,8 +96,6 @@ typedef struct
9896pid_t traceable ;
9997}trace_request ;
10098
101- static void SendCurrentUserId (void );
102- Oid GetRemoteBackendUserId (PGPROC * proc );
10399static void SendWorkerPids (void );
104100List * GetRemoteBackendWorkers (PGPROC * proc ,int * error_code );
105101
@@ -108,6 +104,7 @@ shm_toc*toc = NULL;
108104pg_qs_params * params = NULL ;
109105trace_request * trace_req = NULL ;
110106shm_mq * mq = NULL ;
107+ void * grbui_shm = NULL ;
111108
112109/*
113110 * Estimate amount of shared memory needed.
@@ -121,11 +118,12 @@ pg_qs_shmem_size()
121118
122119shm_toc_initialize_estimator (& e );
123120
124- nkeys = 3 ;
121+ nkeys = 4 ;
125122
126123shm_toc_estimate_chunk (& e ,sizeof (trace_request ));
127124shm_toc_estimate_chunk (& e ,sizeof (pg_qs_params ));
128125shm_toc_estimate_chunk (& e , (Size )QUEUE_SIZE );
126+ shm_toc_estimate_chunk (& e ,grbui_EstimateShmemSize ());
129127
130128shm_toc_estimate_keys (& e ,nkeys );
131129size = shm_toc_estimate (& e );
@@ -156,6 +154,8 @@ pg_qs_shmem_startup(void)
156154MemSet (trace_req ,0 ,sizeof (trace_request ));
157155mq = shm_toc_allocate (toc ,QUEUE_SIZE );
158156shm_toc_insert (toc ,num_toc ++ ,mq );
157+ grbui_shm = shm_toc_allocate (toc ,grbui_EstimateShmemSize ());
158+ shm_toc_insert (toc ,num_toc ++ ,grbui_shm );
159159}
160160else
161161{
@@ -164,7 +164,9 @@ pg_qs_shmem_startup(void)
164164params = shm_toc_lookup (toc ,num_toc ++ );
165165trace_req = shm_toc_lookup (toc ,num_toc ++ );
166166mq = shm_toc_lookup (toc ,num_toc ++ );
167+ grbui_shm = shm_toc_lookup (toc ,num_toc ++ );
167168}
169+ grbui_ShmemInit (grbui_shm ,found );
168170
169171if (prev_shmem_startup_hook )
170172prev_shmem_startup_hook ();
@@ -186,14 +188,13 @@ _PG_init(void)
186188 * the postmaster process.) We'll allocate or attach to the shared
187189 * resources in qs_shmem_startup().
188190 */
189- RequestAddinShmemSpace (QUEUE_SIZE );
191+ RequestAddinShmemSpace (pg_qs_shmem_size () );
190192
191193/* Register interrupt on custom signal of polling query state */
194+ RegisterGetRemoteBackendUserId ();
192195QueryStatePollReason = RegisterCustomProcSignalHandler (SendQueryState );
193- RolePollReason = RegisterCustomProcSignalHandler (SendCurrentUserId );
194196WorkerPollReason = RegisterCustomProcSignalHandler (SendWorkerPids );
195- if (QueryStatePollReason == INVALID_PROCSIGNAL || RolePollReason == INVALID_PROCSIGNAL
196- || WorkerPollReason == INVALID_PROCSIGNAL )
197+ if (QueryStatePollReason == INVALID_PROCSIGNAL || WorkerPollReason == INVALID_PROCSIGNAL )
197198{
198199ereport (WARNING , (errcode (ERRCODE_INSUFFICIENT_RESOURCES ),
199200errmsg ("pg_query_state isn't loaded: insufficient custom ProcSignal slots" )));
@@ -802,85 +803,6 @@ executor_continue(PG_FUNCTION_ARGS)
802803PG_RETURN_VOID ();
803804}
804805
805- static shm_mq_result
806- shm_mq_receive_with_timeout (shm_mq_handle * mqh ,Size * nbytesp ,void * * datap ,long timeout )
807- {
808-
809- #ifdef HAVE_INT64_TIMESTAMP
810- #define GetNowFloat ()((float8) GetCurrentTimestamp() / 1000.0)
811- #else
812- #define GetNowFloat ()1000.0 * GetCurrentTimestamp()
813- #endif
814-
815- float8 endtime = GetNowFloat ()+ timeout ;
816- int rc = 0 ;
817-
818- for (;;)
819- {
820- long delay ;
821- shm_mq_result mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
822-
823- if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
824- return mq_receive_result ;
825-
826- if (rc & WL_TIMEOUT )
827- return SHM_MQ_WOULD_BLOCK ;
828-
829- delay = (long ) (endtime - GetNowFloat ());
830- rc = WaitLatch (MyLatch ,WL_LATCH_SET |WL_TIMEOUT ,delay );
831- CHECK_FOR_INTERRUPTS ();
832- ResetLatch (MyLatch );
833- }
834- }
835-
836- static void
837- SendCurrentUserId (void )
838- {
839- shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
840- Oid user_oid = GetUserId ();
841-
842- shm_mq_send (mqh ,sizeof (Oid ),& user_oid , false);
843- }
844-
845- #define NOT_BACKEND_PROCESS 1
846- #define COULD_NOT_SEND_SIGNAL 2
847- #define INVALID_MQ_READ 3
848-
849- /*
850- * Extract effective user id of external backend session
851- * Assume `proc` is valid backend and doesn't point to current process
852- */
853- Oid
854- GetRemoteBackendUserId (PGPROC * proc )
855- {
856- int sig_result ;
857- shm_mq_handle * mqh ;
858- shm_mq_result mq_receive_result ;
859- Oid * result ;
860- Size res_len ;
861-
862- Assert (proc && proc != MyProc && proc -> backendId != InvalidBackendId );
863-
864- mq = shm_mq_create (mq ,QUEUE_SIZE );
865- shm_mq_set_sender (mq ,proc );
866- shm_mq_set_receiver (mq ,MyProc );
867-
868- sig_result = SendProcSignal (proc -> pid ,RolePollReason ,proc -> backendId );
869- if (sig_result == -1 )
870- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
871- errmsg ("invalid send signal" )));
872-
873- mqh = shm_mq_attach (mq ,NULL ,NULL );
874- mq_receive_result = shm_mq_receive_with_timeout (mqh ,& res_len , (void * * )& result ,1000 );
875- if (mq_receive_result != SHM_MQ_SUCCESS )
876- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
877- errmsg ("invalid read from message queue" )));
878-
879- shm_mq_detach (mq );
880-
881- return * result ;
882- }
883-
884806static bool
885807extract_worker_handles (PlanState * node ,List * * result )
886808{
@@ -947,46 +869,43 @@ SendWorkerPids(void)
947869shm_mq_send (mqh ,msg_len ,msg , false);
948870}
949871
950- List *
951- GetRemoteBackendWorkers (PGPROC * proc ,int * error_code )
952- {
953- int sig_result ;
954- shm_mq_handle * mqh ;
955- shm_mq_result mq_receive_result ;
956- workers_msg * msg ;
957- Size msg_len ;
958- int i ;
959- List * result = NIL ;
960-
961- if (proc -> backendId == InvalidBackendId )
962- {
963- * error_code = NOT_BACKEND_PROCESS ;
964- return NIL ;
965- }
966-
967- mq = shm_mq_create (mq ,QUEUE_SIZE );
968- shm_mq_set_sender (mq ,proc );
969- shm_mq_set_receiver (mq ,MyProc );
970-
971- sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
972- if (sig_result == -1 )
973- {
974- * error_code = COULD_NOT_SEND_SIGNAL ;
975- return NIL ;
976- }
977-
978- mqh = shm_mq_attach (mq ,NULL ,NULL );
979- mq_receive_result = shm_mq_receive_with_timeout (mqh ,& msg_len , (void * * )& msg ,1000 );
980- if (mq_receive_result != SHM_MQ_SUCCESS )
981- {
982- * error_code = INVALID_MQ_READ ;
983- return NIL ;
984- }
985-
986- for (i = 0 ;i < msg -> num ;i ++ )
987- result = lcons_int (msg -> pids [i ],result );
988-
989- shm_mq_detach (mq );
990-
991- return result ;
992- }
872+ // List *
873+ // GetRemoteBackendWorkers(PGPROC *proc, int *error_code)
874+ // {
875+ // intsig_result;
876+ // shm_mq_handle*mqh;
877+ // shm_mq_result mq_receive_result;
878+ // workers_msg*msg;
879+ // Sizemsg_len;
880+ // inti;
881+ // List*result = NIL;
882+
883+ // if (proc->backendId == InvalidBackendId)
884+ // {
885+ // return NIL;
886+ // }
887+
888+ // mq = shm_mq_create(mq, QUEUE_SIZE);
889+ // shm_mq_set_sender(mq, proc);
890+ // shm_mq_set_receiver(mq, MyProc);
891+
892+ // sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->backendId);
893+ // if (sig_result == -1)
894+ // {
895+ // return NIL;
896+ // }
897+
898+ // mqh = shm_mq_attach(mq, NULL, NULL);
899+ // mq_receive_result = shm_mq_receive_with_timeout(mqh, &msg_len, (void **) &msg, 1000);
900+ // if (mq_receive_result != SHM_MQ_SUCCESS)
901+ // {
902+ // return NIL;
903+ // }
904+
905+ // for (i = 0; i < msg->num; i++)
906+ // result = lcons_int(msg->pids[i], result);
907+
908+ // shm_mq_detach(mq);
909+
910+ // return result;
911+ // }