3333PG_MODULE_MAGIC ;
3434#endif
3535
36- #define PG_QS_MODULE_KEY 0xCA94B108
37- #define PG_QUERY_STATE_KEY 0
38-
3936#define TEXT_CSTR_CMP (text ,cstr ) \
4037(memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4138
@@ -78,6 +75,7 @@ static const char*be_state_str[] = {/* BackendState -> string repr */
7875"idle in transaction (aborted)" ,/* STATE_IDLEINTRANSACTION_ABORTED */
7976"disabled" ,/* STATE_DISABLED */
8077};
78+ static int reqid = 0 ;
8179
8280typedef struct
8381{
@@ -376,20 +374,29 @@ search_be_status(int pid)
376374return NULL ;
377375}
378376
379- /*
380- * Init userlock
381- */
382- static void
383- init_lock_tag (LOCKTAG * tag ,uint32 key )
377+
378+ void
379+ UnlockShmem (LOCKTAG * tag )
384380{
381+ LockRelease (tag ,ExclusiveLock , false);
382+ }
383+
384+ void
385+ LockShmem (LOCKTAG * tag ,uint32 key )
386+ {
387+ LockAcquireResult result ;
385388tag -> locktag_field1 = PG_QS_MODULE_KEY ;
386389tag -> locktag_field2 = key ;
387390tag -> locktag_field3 = 0 ;
388391tag -> locktag_field4 = 0 ;
389392tag -> locktag_type = LOCKTAG_USERLOCK ;
390393tag -> locktag_lockmethodid = USER_LOCKMETHOD ;
394+ result = LockAcquire (tag ,ExclusiveLock , false, false);
395+ Assert (result == LOCKACQUIRE_OK );
391396}
392397
398+
399+
393400/*
394401 * Structure of stack frame of fucntion call which transfers through message queue
395402 */
@@ -512,8 +519,7 @@ pg_query_state(PG_FUNCTION_ARGS)
512519 * init and acquire lock so that any other concurrent calls of this fuction
513520 * can not occupy shared queue for transfering query state
514521 */
515- init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
516- LockAcquire (& tag ,ExclusiveLock , false, false);
522+ LockShmem (& tag ,PG_QS_RCV_KEY );
517523
518524INSTR_TIME_SET_CURRENT (start_time );
519525
@@ -532,6 +538,8 @@ pg_query_state(PG_FUNCTION_ARGS)
532538}
533539}
534540pg_atomic_write_u32 (& counterpart_userid -> n_peers ,1 );
541+ params -> reqid = ++ reqid ;
542+ pg_write_barrier ();
535543
536544counterpart_user_id = GetRemoteBackendUserId (proc );
537545if (!(superuser ()|| GetUserId ()== counterpart_user_id ))
@@ -553,7 +561,7 @@ pg_query_state(PG_FUNCTION_ARGS)
553561if (list_length (msgs )== 0 )
554562{
555563elog (WARNING ,"backend does not reply" );
556- LockRelease (& tag , ExclusiveLock , false );
564+ UnlockShmem (& tag );
557565SRF_RETURN_DONE (funcctx );
558566}
559567
@@ -570,12 +578,12 @@ pg_query_state(PG_FUNCTION_ARGS)
570578else
571579elog (INFO ,"backend is not running query" );
572580
573- LockRelease (& tag , ExclusiveLock , false );
581+ UnlockShmem (& tag );
574582SRF_RETURN_DONE (funcctx );
575583}
576584case STAT_DISABLED :
577585elog (INFO ,"query execution statistics disabled" );
578- LockRelease (& tag , ExclusiveLock , false );
586+ UnlockShmem (& tag );
579587SRF_RETURN_DONE (funcctx );
580588case QS_RETURNED :
581589{
@@ -636,7 +644,7 @@ pg_query_state(PG_FUNCTION_ARGS)
636644TupleDescInitEntry (tupdesc , (AttrNumber )5 ,"leader_pid" ,INT4OID ,-1 ,0 );
637645funcctx -> tuple_desc = BlessTupleDesc (tupdesc );
638646
639- LockRelease (& tag , ExclusiveLock , false );
647+ UnlockShmem (& tag );
640648MemoryContextSwitchTo (oldcontext );
641649}
642650break ;
@@ -828,6 +836,7 @@ extract_running_bgworkers(PlanState *node, List **result)
828836
829837typedef struct
830838{
839+ int reqid ;
831840int number ;
832841pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
833842}BgWorkerPids ;
@@ -841,6 +850,9 @@ SendBgWorkerPids(void)
841850int msg_len ;
842851int i ;
843852shm_mq_handle * mqh ;
853+ LOCKTAG tag ;
854+
855+ LockShmem (& tag ,PG_QS_SND_KEY );
844856
845857mqh = shm_mq_attach (mq ,NULL ,NULL );
846858
@@ -856,6 +868,7 @@ SendBgWorkerPids(void)
856868msg_len = offsetof(BgWorkerPids ,pids )
857869+ sizeof (pid_t )* list_length (all_workers );
858870msg = palloc (msg_len );
871+ msg -> reqid = params -> reqid ;
859872msg -> number = list_length (all_workers );
860873i = 0 ;
861874foreach (iter ,all_workers )
@@ -867,6 +880,7 @@ SendBgWorkerPids(void)
867880}
868881
869882shm_mq_send (mqh ,msg_len ,msg , false);
883+ UnlockShmem (& tag );
870884}
871885
872886/*
@@ -882,22 +896,25 @@ GetRemoteBackendWorkers(PGPROC *proc)
882896Size msg_len ;
883897int i ;
884898List * result = NIL ;
899+ LOCKTAG tag ;
885900
886901Assert (proc && proc -> backendId != InvalidBackendId );
887902Assert (WorkerPollReason != INVALID_PROCSIGNAL );
888903Assert (mq );
889904
905+ LockShmem (& tag ,PG_QS_SND_KEY );
890906mq = shm_mq_create (mq ,QUEUE_SIZE );
891907shm_mq_set_sender (mq ,proc );
892908shm_mq_set_receiver (mq ,MyProc );
909+ UnlockShmem (& tag );
893910
894911sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
895912if (sig_result == -1 )
896913gotosignal_error ;
897914
898915mqh = shm_mq_attach (mq ,NULL ,NULL );
899916mq_receive_result = shm_mq_receive (mqh ,& msg_len , (void * * )& msg , false);
900- if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg_len != sizeof ( int )+ msg -> number * sizeof (pid_t ))
917+ if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg -> reqid != reqid || msg_len != offsetof( BgWorkerPids , pids )+ msg -> number * sizeof (pid_t ))
901918gotomq_error ;
902919
903920for (i = 0 ;i < msg -> number ;i ++ )
@@ -952,7 +969,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
952969shm_mq_result mq_receive_result ;
953970shm_mq_msg * msg ;
954971Size len ;
955- static int reqid = 0 ;
972+ LOCKTAG tag ;
956973
957974Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
958975Assert (mq );
@@ -964,13 +981,14 @@ GetRemoteBackendQueryStates(PGPROC *leader,
964981params -> buffers = buffers ;
965982params -> triggers = triggers ;
966983params -> format = format ;
967- params -> reqid = ++ reqid ;
968984pg_write_barrier ();
969985
970986/* initialize message queue that will transfer query states */
987+ LockShmem (& tag ,PG_QS_SND_KEY );
971988mq = shm_mq_create (mq ,QUEUE_SIZE );
972989shm_mq_set_sender (mq ,leader );
973990shm_mq_set_receiver (mq ,MyProc );
991+ UnlockShmem (& tag );
974992
975993/*
976994 * send signal `QueryStatePollReason` to all processes and define all alive
@@ -1028,11 +1046,13 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10281046
10291047/* prepare message queue to transfer data */
10301048elog (DEBUG1 ,"Wait response from worker %d" ,proc -> pid );
1049+ LockShmem (& tag ,PG_QS_SND_KEY );
10311050mq = shm_mq_create (mq ,QUEUE_SIZE );
10321051shm_mq_set_sender (mq ,proc );
10331052shm_mq_set_receiver (mq ,MyProc );/* this function notifies the
10341053 counterpart to come into data
10351054 transfer */
1055+ UnlockShmem (& tag );
10361056
10371057/* retrieve result data from message queue */
10381058mqh = shm_mq_attach (mq ,NULL ,NULL );