Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfbdc41a

Browse files
committed
[refer #PGPRO-4197] Correctly handle timeouts
1 parent1a8558c commitfbdc41a

File tree

3 files changed

+35
-12
lines changed

3 files changed

+35
-12
lines changed

‎pg_query_state.c

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ PG_MODULE_MAGIC;
3636
#definePG_QS_MODULE_KEY0xCA94B108
3737
#definePG_QUERY_STATE_KEY0
3838

39-
#defineMIN_TIMEOUT 5000
40-
4139
#defineTEXT_CSTR_CMP(text,cstr) \
4240
(memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4341

@@ -516,11 +514,14 @@ pg_query_state(PG_FUNCTION_ARGS)
516514
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
517515
LockAcquire(&tag,ExclusiveLock, false, false);
518516

519-
for (i=0;pg_atomic_read_u32(&counterpart_userid->n_peers)!=0&&i<MIN_TIMEOUT/1000;i++)
517+
for (i=0;pg_atomic_read_u32(&counterpart_userid->n_peers)!=0&&i <=MAX_TIMEOUT/1000;i++)
520518
{
521519
pg_usleep(1000000);/* wait one second */
522520
CHECK_FOR_INTERRUPTS();
523521
}
522+
if (i>MAX_TIMEOUT/1000)
523+
elog(WARNING,"pg_query_state: last request was interrupted");
524+
524525
pg_atomic_write_u32(&counterpart_userid->n_peers,1);
525526

526527
counterpart_user_id=GetRemoteBackendUserId(proc);
@@ -741,15 +742,15 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
741742
{
742743
intrc=0;
743744
longdelay=timeout;
745+
instr_timestart_time;
746+
instr_timecur_time;
747+
748+
INSTR_TIME_SET_CURRENT(start_time);
744749

745750
for (;;)
746751
{
747-
instr_timestart_time;
748-
instr_timecur_time;
749752
shm_mq_resultmq_receive_result;
750753

751-
INSTR_TIME_SET_CURRENT(start_time);
752-
753754
mq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
754755
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
755756
returnmq_receive_result;
@@ -772,6 +773,8 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
772773
INSTR_TIME_SUBTRACT(cur_time,start_time);
773774

774775
delay=timeout- (long)INSTR_TIME_GET_MILLISEC(cur_time);
776+
if (delay <=0)
777+
returnSHM_MQ_WOULD_BLOCK;
775778

776779
CHECK_FOR_INTERRUPTS();
777780
ResetLatch(MyLatch);
@@ -970,6 +973,9 @@ GetRemoteBackendQueryStates(PGPROC *leader,
970973
PGPROC*proc= (PGPROC*)lfirst(iter);
971974
if (!proc|| !proc->pid)
972975
continue;
976+
977+
pg_atomic_add_fetch_u32(&counterpart_userid->n_peers,1);
978+
973979
sig_result=SendProcSignal(proc->pid,
974980
QueryStatePollReason,
975981
proc->backendId);
@@ -980,7 +986,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
980986
continue;
981987
}
982988

983-
pg_atomic_add_fetch_u32(&counterpart_userid->n_peers,1);
984989
alive_procs=lappend(alive_procs,proc);
985990
}
986991

@@ -1018,7 +1023,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10181023
mq_receive_result=shm_mq_receive_with_timeout(mqh,
10191024
&len,
10201025
(void**)&msg,
1021-
MIN_TIMEOUT);
1026+
MAX_TIMEOUT);
10221027
if (mq_receive_result!=SHM_MQ_SUCCESS)
10231028
/* counterpart is died, not consider it */
10241029
continue;

‎pg_query_state.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
#defineTIMINIG_OFF_WARNING 1
2323
#defineBUFFERS_OFF_WARNING 2
2424

25+
#defineMAX_TIMEOUT 5000/* 5 seconds */
26+
27+
2528
/*
2629
* Result status on query state request from asked backend
2730
*/

‎signal_handler.c

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ void
157157
SendQueryState(void)
158158
{
159159
shm_mq_handle*mqh;
160+
instr_timestart_time;
161+
instr_timecur_time;
162+
longdelay=MAX_TIMEOUT;
163+
164+
INSTR_TIME_SET_CURRENT(start_time);
160165

161166
/* wait until caller sets this process as sender to message queue */
162167
for (;;)
@@ -165,12 +170,22 @@ SendQueryState(void)
165170
break;
166171

167172
#ifPG_VERSION_NUM<100000
168-
WaitLatch(MyLatch,WL_LATCH_SET,0);
173+
WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay);
169174
#elifPG_VERSION_NUM<120000
170-
WaitLatch(MyLatch,WL_LATCH_SET,0,PG_WAIT_IPC);
175+
WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay,PG_WAIT_IPC);
171176
#else
172-
WaitLatch(MyLatch,WL_LATCH_SET |WL_EXIT_ON_PM_DEATH,0,PG_WAIT_IPC);
177+
WaitLatch(MyLatch,WL_LATCH_SET |WL_EXIT_ON_PM_DEATH |WL_TIMEOUT,delay,PG_WAIT_IPC);
173178
#endif
179+
INSTR_TIME_SET_CURRENT(cur_time);
180+
INSTR_TIME_SUBTRACT(cur_time,start_time);
181+
182+
delay=MAX_TIMEOUT- (long)INSTR_TIME_GET_MILLISEC(cur_time);
183+
if (delay <=0)
184+
{
185+
elog(WARNING,"pg_query_state: failed to receive request from leader");
186+
DetachPeer();
187+
return;
188+
}
174189
CHECK_FOR_INTERRUPTS();
175190
ResetLatch(MyLatch);
176191
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp