33
33
PG_MODULE_MAGIC ;
34
34
#endif
35
35
36
- #define PG_QS_MODULE_KEY 0xCA94B108
37
- #define PG_QUERY_STATE_KEY 0
38
-
39
36
#define TEXT_CSTR_CMP (text ,cstr ) \
40
37
(memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
41
38
@@ -78,6 +75,7 @@ static const char*be_state_str[] = {/* BackendState -> string repr */
78
75
"idle in transaction (aborted)" ,/* STATE_IDLEINTRANSACTION_ABORTED */
79
76
"disabled" ,/* STATE_DISABLED */
80
77
};
78
+ static int reqid = 0 ;
81
79
82
80
typedef struct
83
81
{
@@ -376,20 +374,29 @@ search_be_status(int pid)
376
374
return NULL ;
377
375
}
378
376
379
- /*
380
- * Init userlock
381
- */
382
- static void
383
- init_lock_tag (LOCKTAG * tag ,uint32 key )
377
+
378
+ void
379
+ UnlockShmem (LOCKTAG * tag )
384
380
{
381
+ LockRelease (tag ,ExclusiveLock , false);
382
+ }
383
+
384
+ void
385
+ LockShmem (LOCKTAG * tag ,uint32 key )
386
+ {
387
+ LockAcquireResult result ;
385
388
tag -> locktag_field1 = PG_QS_MODULE_KEY ;
386
389
tag -> locktag_field2 = key ;
387
390
tag -> locktag_field3 = 0 ;
388
391
tag -> locktag_field4 = 0 ;
389
392
tag -> locktag_type = LOCKTAG_USERLOCK ;
390
393
tag -> locktag_lockmethodid = USER_LOCKMETHOD ;
394
+ result = LockAcquire (tag ,ExclusiveLock , false, false);
395
+ Assert (result == LOCKACQUIRE_OK );
391
396
}
392
397
398
+
399
+
393
400
/*
394
401
* Structure of stack frame of fucntion call which transfers through message queue
395
402
*/
@@ -512,8 +519,7 @@ pg_query_state(PG_FUNCTION_ARGS)
512
519
* init and acquire lock so that any other concurrent calls of this fuction
513
520
* can not occupy shared queue for transfering query state
514
521
*/
515
- init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
516
- LockAcquire (& tag ,ExclusiveLock , false, false);
522
+ LockShmem (& tag ,PG_QS_RCV_KEY );
517
523
518
524
INSTR_TIME_SET_CURRENT (start_time );
519
525
@@ -532,6 +538,8 @@ pg_query_state(PG_FUNCTION_ARGS)
532
538
}
533
539
}
534
540
pg_atomic_write_u32 (& counterpart_userid -> n_peers ,1 );
541
+ params -> reqid = ++ reqid ;
542
+ pg_write_barrier ();
535
543
536
544
counterpart_user_id = GetRemoteBackendUserId (proc );
537
545
if (!(superuser ()|| GetUserId ()== counterpart_user_id ))
@@ -553,7 +561,7 @@ pg_query_state(PG_FUNCTION_ARGS)
553
561
if (list_length (msgs )== 0 )
554
562
{
555
563
elog (WARNING ,"backend does not reply" );
556
- LockRelease (& tag , ExclusiveLock , false );
564
+ UnlockShmem (& tag );
557
565
SRF_RETURN_DONE (funcctx );
558
566
}
559
567
@@ -570,12 +578,12 @@ pg_query_state(PG_FUNCTION_ARGS)
570
578
else
571
579
elog (INFO ,"backend is not running query" );
572
580
573
- LockRelease (& tag , ExclusiveLock , false );
581
+ UnlockShmem (& tag );
574
582
SRF_RETURN_DONE (funcctx );
575
583
}
576
584
case STAT_DISABLED :
577
585
elog (INFO ,"query execution statistics disabled" );
578
- LockRelease (& tag , ExclusiveLock , false );
586
+ UnlockShmem (& tag );
579
587
SRF_RETURN_DONE (funcctx );
580
588
case QS_RETURNED :
581
589
{
@@ -636,7 +644,7 @@ pg_query_state(PG_FUNCTION_ARGS)
636
644
TupleDescInitEntry (tupdesc , (AttrNumber )5 ,"leader_pid" ,INT4OID ,-1 ,0 );
637
645
funcctx -> tuple_desc = BlessTupleDesc (tupdesc );
638
646
639
- LockRelease (& tag , ExclusiveLock , false );
647
+ UnlockShmem (& tag );
640
648
MemoryContextSwitchTo (oldcontext );
641
649
}
642
650
break ;
@@ -828,6 +836,7 @@ extract_running_bgworkers(PlanState *node, List **result)
828
836
829
837
typedef struct
830
838
{
839
+ int reqid ;
831
840
int number ;
832
841
pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
833
842
}BgWorkerPids ;
@@ -841,6 +850,9 @@ SendBgWorkerPids(void)
841
850
int msg_len ;
842
851
int i ;
843
852
shm_mq_handle * mqh ;
853
+ LOCKTAG tag ;
854
+
855
+ LockShmem (& tag ,PG_QS_SND_KEY );
844
856
845
857
mqh = shm_mq_attach (mq ,NULL ,NULL );
846
858
@@ -856,6 +868,7 @@ SendBgWorkerPids(void)
856
868
msg_len = offsetof(BgWorkerPids ,pids )
857
869
+ sizeof (pid_t )* list_length (all_workers );
858
870
msg = palloc (msg_len );
871
+ msg -> reqid = params -> reqid ;
859
872
msg -> number = list_length (all_workers );
860
873
i = 0 ;
861
874
foreach (iter ,all_workers )
@@ -867,6 +880,7 @@ SendBgWorkerPids(void)
867
880
}
868
881
869
882
shm_mq_send (mqh ,msg_len ,msg , false);
883
+ UnlockShmem (& tag );
870
884
}
871
885
872
886
/*
@@ -882,22 +896,25 @@ GetRemoteBackendWorkers(PGPROC *proc)
882
896
Size msg_len ;
883
897
int i ;
884
898
List * result = NIL ;
899
+ LOCKTAG tag ;
885
900
886
901
Assert (proc && proc -> backendId != InvalidBackendId );
887
902
Assert (WorkerPollReason != INVALID_PROCSIGNAL );
888
903
Assert (mq );
889
904
905
+ LockShmem (& tag ,PG_QS_SND_KEY );
890
906
mq = shm_mq_create (mq ,QUEUE_SIZE );
891
907
shm_mq_set_sender (mq ,proc );
892
908
shm_mq_set_receiver (mq ,MyProc );
909
+ UnlockShmem (& tag );
893
910
894
911
sig_result = SendProcSignal (proc -> pid ,WorkerPollReason ,proc -> backendId );
895
912
if (sig_result == -1 )
896
913
gotosignal_error ;
897
914
898
915
mqh = shm_mq_attach (mq ,NULL ,NULL );
899
916
mq_receive_result = shm_mq_receive (mqh ,& msg_len , (void * * )& msg , false);
900
- if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg_len != sizeof ( int )+ msg -> number * sizeof (pid_t ))
917
+ if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg -> reqid != reqid || msg_len != offsetof( BgWorkerPids , pids )+ msg -> number * sizeof (pid_t ))
901
918
gotomq_error ;
902
919
903
920
for (i = 0 ;i < msg -> number ;i ++ )
@@ -952,7 +969,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
952
969
shm_mq_result mq_receive_result ;
953
970
shm_mq_msg * msg ;
954
971
Size len ;
955
- static int reqid = 0 ;
972
+ LOCKTAG tag ;
956
973
957
974
Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
958
975
Assert (mq );
@@ -964,13 +981,14 @@ GetRemoteBackendQueryStates(PGPROC *leader,
964
981
params -> buffers = buffers ;
965
982
params -> triggers = triggers ;
966
983
params -> format = format ;
967
- params -> reqid = ++ reqid ;
968
984
pg_write_barrier ();
969
985
970
986
/* initialize message queue that will transfer query states */
987
+ LockShmem (& tag ,PG_QS_SND_KEY );
971
988
mq = shm_mq_create (mq ,QUEUE_SIZE );
972
989
shm_mq_set_sender (mq ,leader );
973
990
shm_mq_set_receiver (mq ,MyProc );
991
+ UnlockShmem (& tag );
974
992
975
993
/*
976
994
* send signal `QueryStatePollReason` to all processes and define all alive
@@ -1028,11 +1046,13 @@ GetRemoteBackendQueryStates(PGPROC *leader,
1028
1046
1029
1047
/* prepare message queue to transfer data */
1030
1048
elog (DEBUG1 ,"Wait response from worker %d" ,proc -> pid );
1049
+ LockShmem (& tag ,PG_QS_SND_KEY );
1031
1050
mq = shm_mq_create (mq ,QUEUE_SIZE );
1032
1051
shm_mq_set_sender (mq ,proc );
1033
1052
shm_mq_set_receiver (mq ,MyProc );/* this function notifies the
1034
1053
counterpart to come into data
1035
1054
transfer */
1055
+ UnlockShmem (& tag );
1036
1056
1037
1057
/* retrieve result data from message queue */
1038
1058
mqh = shm_mq_attach (mq ,NULL ,NULL );