Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit305bb17

Browse files
committed
Improve deadlock detection algorithm by taking in account hidden dependencies between transactions caused by lack of vacant workers in apply pool
1 parent82d2e2d commit305bb17

File tree

4 files changed

+69
-26
lines changed

4 files changed

+69
-26
lines changed

‎contrib/mmts/bgwpool.c‎

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ static void BgwPoolMainLoop(Datum arg)
3535
work=malloc(size);
3636
pool->pending-=1;
3737
pool->active+=1;
38+
if (pool->lastPeakTime==0&&pool->active==pool->nWorkers&&pool->pending!=0) {
39+
pool->lastPeakTime=MtmGetSystemTime();
40+
}
3841
if (pool->head+size+4>pool->size) {
3942
memcpy(work,pool->queue,size);
4043
pool->head=INTALIGN(size);
@@ -48,17 +51,19 @@ static void BgwPoolMainLoop(Datum arg)
4851
if (pool->producerBlocked) {
4952
pool->producerBlocked= false;
5053
PGSemaphoreUnlock(&pool->overflow);
54+
pool->lastPeakTime=0;
5155
}
5256
SpinLockRelease(&pool->lock);
5357
pool->executor(id,work,size);
5458
free(work);
5559
SpinLockAcquire(&pool->lock);
5660
pool->active-=1;
61+
pool->lastPeakTime=0;
5762
SpinLockRelease(&pool->lock);
5863
}
5964
}
6065

61-
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize)
66+
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize,size_tnWorkers)
6267
{
6368
pool->queue= (char*)ShmemAlloc(queueSize);
6469
pool->executor=executor;
@@ -73,8 +78,15 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7378
pool->size=queueSize;
7479
pool->active=0;
7580
pool->pending=0;
81+
pool->nWorkers=nWorkers;
82+
pool->lastPeakTime=0;
7683
strcpy(pool->dbname,dbname);
7784
}
85+
86+
timestamp_tBgwGetLastPeekTime(BgwPool*pool)
87+
{
88+
returnpool->lastPeakTime;
89+
}
7890

7991
voidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor)
8092
{
@@ -123,12 +135,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
123135
if ((pool->head <=pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
124136
|| (pool->head>pool->tail&&pool->head-pool->tail<size+4))
125137
{
126-
pool->producerBlocked= true;
138+
if (pool->lastPeakTime==0) {
139+
pool->lastPeakTime=MtmGetSystemTime();
140+
}
141+
pool->producerBlocked= true;
127142
SpinLockRelease(&pool->lock);
128143
PGSemaphoreLock(&pool->overflow);
129144
SpinLockAcquire(&pool->lock);
130145
}else {
131146
pool->pending+=1;
147+
if (pool->lastPeakTime==0&&pool->active==pool->nWorkers&&pool->pending!=0) {
148+
pool->lastPeakTime=MtmGetSystemTime();
149+
}
132150
*(int*)&pool->queue[pool->tail]=size;
133151
if (pool->size-pool->tail >=size+4) {
134152
memcpy(&pool->queue[pool->tail+4],work,size);

‎contrib/mmts/bgwpool.h‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
99

10+
typedefuint64timestamp_t;
11+
1012
#defineMAX_DBNAME_LEN 30
1113
#defineMULTIMASTER_BGW_RESTART_TIMEOUT 1/* seconds */
1214

@@ -21,6 +23,8 @@ typedef struct
2123
size_tsize;
2224
size_tactive;
2325
size_tpending;
26+
size_tnWorkers;
27+
time_tlastPeakTime;
2428
boolproducerBlocked;
2529
chardbname[MAX_DBNAME_LEN];
2630
char*queue;
@@ -30,10 +34,12 @@ typedef BgwPool*(*BgwPoolConstructor)(void);
3034

3135
externvoidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor);
3236

33-
externvoidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize);
37+
externvoidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize,size_tnWorkers);
3438

3539
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
3640

3741
externsize_tBgwPoolGetQueueSize(BgwPool*pool);
3842

43+
externtimestamp_tBgwGetLastPeekTime(BgwPool*pool);
44+
3945
#endif

‎contrib/mmts/multimaster.c‎

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,18 @@ void MtmUnlockNode(int nodeId)
256256
*/
257257

258258

259-
timestamp_tMtmGetCurrentTime(void)
259+
timestamp_tMtmGetSystemTime(void)
260260
{
261261
structtimevaltv;
262262
gettimeofday(&tv,NULL);
263263
return (timestamp_t)tv.tv_sec*USEC+tv.tv_usec+Mtm->timeShift;
264264
}
265265

266+
timestamp_tMtmGetCurrentTime(void)
267+
{
268+
returnMtmGetSystemTime()+Mtm->timeShift;
269+
}
270+
266271
voidMtmSleep(timestamp_tinterval)
267272
{
268273
structtimespects;
@@ -1046,7 +1051,7 @@ void MtmRecoveryCompleted(void)
10461051
MtmLock(LW_EXCLUSIVE);
10471052
Mtm->recoverySlot=0;
10481053
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
1049-
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime=time(NULL);
1054+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
10501055
/* Mode will be changed to online once all locagical reciever are connected */
10511056
MtmSwitchClusterMode(MTM_CONNECTED);
10521057
MtmUnlock();
@@ -1135,7 +1140,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11351140
/* We are lucky: caugth-up without locking cluster! */
11361141
}
11371142
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1138-
Mtm->nodes[nodeId-1].lastStatusChangeTime=time(NULL);
1143+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
11391144
Mtm->nNodes+=1;
11401145
caughtUp= true;
11411146
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
@@ -1280,15 +1285,15 @@ bool MtmRefreshClusterStatus(bool nowait)
12801285
if (mask&1) {
12811286
Mtm->nNodes-=1;
12821287
BIT_SET(Mtm->disabledNodeMask,i);
1283-
Mtm->nodes[i].lastStatusChangeTime=time(NULL);
1288+
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
12841289
}
12851290
}
12861291
mask=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
12871292
for (i=0;mask!=0;i++,mask >>=1) {
12881293
if (mask&1) {
12891294
Mtm->nNodes+=1;
12901295
BIT_CLEAR(Mtm->disabledNodeMask,i);
1291-
Mtm->nodes[i].lastStatusChangeTime=time(NULL);
1296+
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
12921297
}
12931298
}
12941299
MtmCheckQuorum();
@@ -1328,7 +1333,7 @@ void MtmOnNodeDisconnect(int nodeId)
13281333
{
13291334
MtmTransState*ts;
13301335

1331-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MtmNodeDisableDelay>time(NULL)) {
1336+
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)>MtmGetSystemTime()) {
13321337
/* Avoid false detection of node failure and prevent node status blinking */
13331338
return;
13341339
}
@@ -1343,7 +1348,7 @@ void MtmOnNodeDisconnect(int nodeId)
13431348
if (!MtmRefreshClusterStatus(false)) {
13441349
MtmLock(LW_EXCLUSIVE);
13451350
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1346-
Mtm->nodes[nodeId-1].lastStatusChangeTime=time(NULL);
1351+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
13471352
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
13481353
Mtm->nNodes-=1;
13491354
MtmCheckQuorum();
@@ -1511,14 +1516,14 @@ static void MtmInitialize()
15111516
for (i=0;i<MtmNodes;i++) {
15121517
Mtm->nodes[i].oldestSnapshot=0;
15131518
Mtm->nodes[i].transDelay=0;
1514-
Mtm->nodes[i].lastStatusChangeTime=time(NULL);
1519+
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
15151520
Mtm->nodes[i].con=MtmConnections[i];
15161521
Mtm->nodes[i].flushPos=0;
15171522
}
15181523
PGSemaphoreCreate(&Mtm->votingSemaphore);
15191524
PGSemaphoreReset(&Mtm->votingSemaphore);
15201525
SpinLockInit(&Mtm->spinlock);
1521-
BgwPoolInit(&Mtm->pool,MtmExecutor,MtmDatabaseName,MtmQueueSize);
1526+
BgwPoolInit(&Mtm->pool,MtmExecutor,MtmDatabaseName,MtmQueueSize,MtmWorkers);
15221527
RegisterXactCallback(MtmXactCallback,NULL);
15231528
MtmTx.snapshot=INVALID_CSN;
15241529
MtmTx.xid=InvalidTransactionId;
@@ -1682,10 +1687,10 @@ _PG_init(void)
16821687

16831688
DefineCustomIntVariable(
16841689
"multimaster.node_disable_delay",
1685-
"Minamal amount of time (sec) between node status change",
1690+
"Minamal amount of time (msec) between node status change",
16861691
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
16871692
&MtmNodeDisableDelay,
1688-
1,
1693+
1000,
16891694
1,
16901695
INT_MAX,
16911696
PGC_BACKEND,
@@ -2033,7 +2038,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20332038
{
20342039
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nNodes);
20352040
}
2036-
Mtm->nodes[nodeId-1].lastStatusChangeTime=time(NULL);
2041+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
20372042
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
20382043
Mtm->nNodes-=1;
20392044
MtmCheckQuorum();
@@ -2084,15 +2089,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20842089
if (MtmIsRecoverySession) {
20852090
MTM_LOG1("%d: Node %d start recovery of node %d",MyProcPid,MtmNodeId,MtmReplicationNodeId);
20862091
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
2087-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=time(NULL);
2092+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
20882093
BIT_SET(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
20892094
Mtm->nNodes-=1;
20902095
MtmCheckQuorum();
20912096
}
20922097
}elseif (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
20932098
if (recoveryCompleted) {
20942099
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication",MtmNodeId,MtmReplicationNodeId);
2095-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=time(NULL);
2100+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
20962101
BIT_CLEAR(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
20972102
Mtm->nNodes+=1;
20982103
MtmCheckQuorum();
@@ -2239,7 +2244,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
22392244
}
22402245
if (!nowait) {
22412246
/* Just wait some time until logical repication channels will be reestablished */
2242-
MtmSleep(MtmNodeDisableDelay);
2247+
MtmSleep(MSEC_TO_USEC(MtmNodeDisableDelay));
22432248
}
22442249
PG_RETURN_BOOL(online);
22452250
}
@@ -2298,7 +2303,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22982303
usrfctx->values[4]=Int64GetDatum(lag);
22992304
usrfctx->nulls[4]=lag<0;
23002305
usrfctx->values[5]=Int64GetDatum(Mtm->transCount ?Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount :0);
2301-
usrfctx->values[6]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2306+
usrfctx->values[6]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USEC));
23022307
usrfctx->values[7]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23032308
usrfctx->nodeId+=1;
23042309

@@ -3061,6 +3066,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
30613066
MtmGetGtid(pgxact->xid,&gtid);
30623067
hasDeadlock=MtmGraphFindLoop(&graph,&gtid);
30633068
elog(WARNING,"Distributed deadlock check for %u:%u = %d",gtid.node,gtid.xid,hasDeadlock);
3069+
if (!hasDeadlock) {
3070+
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
3071+
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
3072+
* refelected in lock graph
3073+
*/
3074+
timestamp_tlastPeekTime=BgwGetLastPeekTime(&Mtm->pool);
3075+
if (lastPeekTime!=0&&MtmGetSystemTime()-lastPeekTime >=MSEC_TO_USEC(DeadlockTimeout)) {
3076+
hasDeadlock= true;
3077+
elog(WARNING,"Apply workers were blocked more than %d msec",
3078+
(int)USEC_TO_MSEC(MtmGetSystemTime()-lastPeekTime));
3079+
}
3080+
}
30643081
}
30653082
returnhasDeadlock;
30663083
}

‎contrib/mmts/multimaster.h‎

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848

4949
#defineUSEC 1000000
5050

51+
#defineUSEC_TO_MSEC(t) ((t)/1000)
52+
#defineMSEC_TO_USEC(t) ((t)*1000)
53+
5154
#defineNatts_mtm_ddl_log 2
5255
#defineAnum_mtm_ddl_log_issued1
5356
#defineAnum_mtm_ddl_log_query2
@@ -72,8 +75,6 @@ typedef uint64 csn_t; /* commit serial number */
7275
#definePGLOGICAL_CAUGHT_UP 0x04
7376

7477

75-
typedefuint64timestamp_t;
76-
7778
/* Identifier of global transaction */
7879
typedefstruct
7980
{
@@ -122,9 +123,9 @@ typedef struct
122123
typedefstruct
123124
{
124125
MtmConnectionInfocon;
125-
time_ttransDelay;
126-
time_tlastStatusChangeTime;
127-
XLogRecPtrflushPos;
126+
timestamp_ttransDelay;
127+
timestamp_tlastStatusChangeTime;
128+
XLogRecPtrflushPos;
128129
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
129130
}MtmNodeInfo;
130131

@@ -232,8 +233,9 @@ extern void MtmRecoverNode(int nodeId);
232233
externvoidMtmOnNodeDisconnect(intnodeId);
233234
externvoidMtmOnNodeConnect(intnodeId);
234235
externvoidMtmWakeUpBackend(MtmTransState*ts);
235-
externtimestamp_tMtmGetCurrentTime(void);
236-
externvoidMtmSleep(timestamp_tinterval);
236+
externtimestamp_tMtmGetSystemTime(void);/* non-adjusted current system time */
237+
externtimestamp_tMtmGetCurrentTime(void);/* adjusted current system time */
238+
externvoidMtmSleep(timestamp_tinterval);
237239
externvoidMtmAbortTransaction(MtmTransState*ts);
238240
externvoidMtmSetCurrentTransactionGID(charconst*gid);
239241
externcsn_tMtmGetTransactionCSN(TransactionIdxid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp