Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit404cd45

Browse files
committed
[refer #4197]] Protect queue access by lock
1 parent9ad9d93 commit404cd45

File tree

3 files changed

+55
-18
lines changed

3 files changed

+55
-18
lines changed

‎pg_query_state.c

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
PG_MODULE_MAGIC;
3434
#endif
3535

36-
#definePG_QS_MODULE_KEY0xCA94B108
37-
#definePG_QUERY_STATE_KEY0
38-
3936
#defineTEXT_CSTR_CMP(text,cstr) \
4037
(memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4138

@@ -78,6 +75,7 @@ static const char*be_state_str[] = {/* BackendState -> string repr */
7875
"idle in transaction (aborted)",/* STATE_IDLEINTRANSACTION_ABORTED */
7976
"disabled",/* STATE_DISABLED */
8077
};
78+
staticintreqid=0;
8179

8280
typedefstruct
8381
{
@@ -376,20 +374,29 @@ search_be_status(int pid)
376374
returnNULL;
377375
}
378376

379-
/*
380-
* Init userlock
381-
*/
382-
staticvoid
383-
init_lock_tag(LOCKTAG*tag,uint32key)
377+
378+
void
379+
UnlockShmem(LOCKTAG*tag)
384380
{
381+
LockRelease(tag,ExclusiveLock, false);
382+
}
383+
384+
void
385+
LockShmem(LOCKTAG*tag,uint32key)
386+
{
387+
LockAcquireResultresult;
385388
tag->locktag_field1=PG_QS_MODULE_KEY;
386389
tag->locktag_field2=key;
387390
tag->locktag_field3=0;
388391
tag->locktag_field4=0;
389392
tag->locktag_type=LOCKTAG_USERLOCK;
390393
tag->locktag_lockmethodid=USER_LOCKMETHOD;
394+
result=LockAcquire(tag,ExclusiveLock, false, false);
395+
Assert(result==LOCKACQUIRE_OK);
391396
}
392397

398+
399+
393400
/*
394401
* Structure of stack frame of fucntion call which transfers through message queue
395402
*/
@@ -512,8 +519,7 @@ pg_query_state(PG_FUNCTION_ARGS)
512519
* init and acquire lock so that any other concurrent calls of this fuction
513520
* can not occupy shared queue for transfering query state
514521
*/
515-
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
516-
LockAcquire(&tag,ExclusiveLock, false, false);
522+
LockShmem(&tag,PG_QS_RCV_KEY);
517523

518524
INSTR_TIME_SET_CURRENT(start_time);
519525

@@ -532,6 +538,8 @@ pg_query_state(PG_FUNCTION_ARGS)
532538
}
533539
}
534540
pg_atomic_write_u32(&counterpart_userid->n_peers,1);
541+
params->reqid=++reqid;
542+
pg_write_barrier();
535543

536544
counterpart_user_id=GetRemoteBackendUserId(proc);
537545
if (!(superuser()||GetUserId()==counterpart_user_id))
@@ -553,7 +561,7 @@ pg_query_state(PG_FUNCTION_ARGS)
553561
if (list_length(msgs)==0)
554562
{
555563
elog(WARNING,"backend does not reply");
556-
LockRelease(&tag,ExclusiveLock, false);
564+
UnlockShmem(&tag);
557565
SRF_RETURN_DONE(funcctx);
558566
}
559567

@@ -570,12 +578,12 @@ pg_query_state(PG_FUNCTION_ARGS)
570578
else
571579
elog(INFO,"backend is not running query");
572580

573-
LockRelease(&tag,ExclusiveLock, false);
581+
UnlockShmem(&tag);
574582
SRF_RETURN_DONE(funcctx);
575583
}
576584
caseSTAT_DISABLED:
577585
elog(INFO,"query execution statistics disabled");
578-
LockRelease(&tag,ExclusiveLock, false);
586+
UnlockShmem(&tag);
579587
SRF_RETURN_DONE(funcctx);
580588
caseQS_RETURNED:
581589
{
@@ -636,7 +644,7 @@ pg_query_state(PG_FUNCTION_ARGS)
636644
TupleDescInitEntry(tupdesc, (AttrNumber)5,"leader_pid",INT4OID,-1,0);
637645
funcctx->tuple_desc=BlessTupleDesc(tupdesc);
638646

639-
LockRelease(&tag,ExclusiveLock, false);
647+
UnlockShmem(&tag);
640648
MemoryContextSwitchTo(oldcontext);
641649
}
642650
break;
@@ -828,6 +836,7 @@ extract_running_bgworkers(PlanState *node, List **result)
828836

829837
typedefstruct
830838
{
839+
intreqid;
831840
intnumber;
832841
pid_tpids[FLEXIBLE_ARRAY_MEMBER];
833842
}BgWorkerPids;
@@ -841,6 +850,9 @@ SendBgWorkerPids(void)
841850
intmsg_len;
842851
inti;
843852
shm_mq_handle*mqh;
853+
LOCKTAGtag;
854+
855+
LockShmem(&tag,PG_QS_SND_KEY);
844856

845857
mqh=shm_mq_attach(mq,NULL,NULL);
846858

@@ -856,6 +868,7 @@ SendBgWorkerPids(void)
856868
msg_len= offsetof(BgWorkerPids,pids)
857869
+sizeof(pid_t)*list_length(all_workers);
858870
msg=palloc(msg_len);
871+
msg->reqid=params->reqid;
859872
msg->number=list_length(all_workers);
860873
i=0;
861874
foreach(iter,all_workers)
@@ -867,6 +880,7 @@ SendBgWorkerPids(void)
867880
}
868881

869882
shm_mq_send(mqh,msg_len,msg, false);
883+
UnlockShmem(&tag);
870884
}
871885

872886
/*
@@ -882,22 +896,25 @@ GetRemoteBackendWorkers(PGPROC *proc)
882896
Sizemsg_len;
883897
inti;
884898
List*result=NIL;
899+
LOCKTAGtag;
885900

886901
Assert(proc&&proc->backendId!=InvalidBackendId);
887902
Assert(WorkerPollReason!=INVALID_PROCSIGNAL);
888903
Assert(mq);
889904

905+
LockShmem(&tag,PG_QS_SND_KEY);
890906
mq=shm_mq_create(mq,QUEUE_SIZE);
891907
shm_mq_set_sender(mq,proc);
892908
shm_mq_set_receiver(mq,MyProc);
909+
UnlockShmem(&tag);
893910

894911
sig_result=SendProcSignal(proc->pid,WorkerPollReason,proc->backendId);
895912
if (sig_result==-1)
896913
gotosignal_error;
897914

898915
mqh=shm_mq_attach(mq,NULL,NULL);
899916
mq_receive_result=shm_mq_receive(mqh,&msg_len, (void**)&msg, false);
900-
if (mq_receive_result!=SHM_MQ_SUCCESS||msg==NULL||msg_len!=sizeof(int)+msg->number*sizeof(pid_t))
917+
if (mq_receive_result!=SHM_MQ_SUCCESS||msg==NULL||msg->reqid!=reqid||msg_len!=offsetof(BgWorkerPids,pids)+msg->number*sizeof(pid_t))
901918
gotomq_error;
902919

903920
for (i=0;i<msg->number;i++)
@@ -952,7 +969,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
952969
shm_mq_resultmq_receive_result;
953970
shm_mq_msg*msg;
954971
Sizelen;
955-
staticintreqid=0;
972+
LOCKTAGtag;
956973

957974
Assert(QueryStatePollReason!=INVALID_PROCSIGNAL);
958975
Assert(mq);
@@ -964,13 +981,14 @@ GetRemoteBackendQueryStates(PGPROC *leader,
964981
params->buffers=buffers;
965982
params->triggers=triggers;
966983
params->format=format;
967-
params->reqid=++reqid;
968984
pg_write_barrier();
969985

970986
/* initialize message queue that will transfer query states */
987+
LockShmem(&tag,PG_QS_SND_KEY);
971988
mq=shm_mq_create(mq,QUEUE_SIZE);
972989
shm_mq_set_sender(mq,leader);
973990
shm_mq_set_receiver(mq,MyProc);
991+
UnlockShmem(&tag);
974992

975993
/*
976994
* send signal `QueryStatePollReason` to all processes and define all alive
@@ -1028,11 +1046,13 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10281046

10291047
/* prepare message queue to transfer data */
10301048
elog(DEBUG1,"Wait response from worker %d",proc->pid);
1049+
LockShmem(&tag,PG_QS_SND_KEY);
10311050
mq=shm_mq_create(mq,QUEUE_SIZE);
10321051
shm_mq_set_sender(mq,proc);
10331052
shm_mq_set_receiver(mq,MyProc);/* this function notifies the
10341053
counterpart to come into data
10351054
transfer */
1055+
UnlockShmem(&tag);
10361056

10371057
/* retrieve result data from message queue */
10381058
mqh=shm_mq_attach(mq,NULL,NULL);

‎pg_query_state.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
#defineTIMINIG_OFF_WARNING 1
2323
#defineBUFFERS_OFF_WARNING 2
2424

25+
#definePG_QS_MODULE_KEY0xCA94B108
26+
#definePG_QS_RCV_KEY 0
27+
#definePG_QS_SND_KEY 1
28+
2529
/* Receive timeout should be larger than send timeout to let workers stop waiting before polling process */
2630
#defineMAX_RCV_TIMEOUT 6000/* 6 seconds */
2731
#defineMAX_SND_TIMEOUT 3000/* 3 seconds */
@@ -34,7 +38,7 @@ typedef enum
3438
{
3539
QUERY_NOT_RUNNING,/* Backend doesn't execute any query */
3640
STAT_DISABLED,/* Collection of execution statistics is disabled */
37-
QS_RETURNED/* Backendsuccesfully returned its query state */
41+
QS_RETURNED/* Backendsuccx[esfully returned its query state */
3842
}PG_QS_RequestResult;
3943

4044
/*
@@ -48,6 +52,7 @@ typedef struct
4852
PG_QS_RequestResultresult_code;
4953
intwarnings;/* bitmap of warnings */
5054
intstack_depth;
55+
charfiller[1024*1024];
5156
charstack[FLEXIBLE_ARRAY_MEMBER];/* sequencially laid out stack frames in form of
5257
text records */
5358
}shm_mq_msg;
@@ -77,5 +82,7 @@ extern shm_mq *mq;
7782
/* signal_handler.c */
7883
externvoidSendQueryState(void);
7984
externvoidDetachPeer(void);
85+
externvoidUnlockShmem(LOCKTAG*tag);
86+
externvoidLockShmem(LOCKTAG*tag,uint32key);
8087

8188
#endif

‎signal_handler.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ SendQueryState(void)
161161
instr_timecur_time;
162162
int64delay=MAX_SND_TIMEOUT;
163163
intreqid=params->reqid;
164+
LOCKTAGtag;
164165

165166
INSTR_TIME_SET_CURRENT(start_time);
166167

@@ -190,9 +191,17 @@ SendQueryState(void)
190191
CHECK_FOR_INTERRUPTS();
191192
ResetLatch(MyLatch);
192193
}
194+
195+
LockShmem(&tag,PG_QS_SND_KEY);
196+
193197
elog(DEBUG1,"Worker %d receives pg_query_state request from %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
194198
mqh=shm_mq_attach(mq,NULL,NULL);
195199

200+
if (reqid!=params->reqid||shm_mq_get_sender(mq)!=MyProc)
201+
{
202+
UnlockShmem(&tag);
203+
return;
204+
}
196205
/* check if module is enabled */
197206
if (!pg_qs_enable)
198207
{
@@ -233,4 +242,5 @@ SendQueryState(void)
233242
}
234243
elog(DEBUG1,"Worker %d sends response for pg_query_state to %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
235244
DetachPeer();
245+
UnlockShmem(&tag);
236246
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp