Commit 97c667c

knizhnik authored and kelvich committed

Improve deadlock detection algorithm by taking into account hidden dependencies between transactions caused by lack of vacant workers in the apply pool

1 parent 46a5c82, commit 97c667c

File tree: 4 files changed, +69 -26 lines

  bgwpool.c
  bgwpool.h
  multimaster.c
  multimaster.h

bgwpool.c

Lines changed: 20 additions & 2 deletions
@@ -35,6 +35,9 @@ static void BgwPoolMainLoop(Datum arg)
         work = malloc(size);
         pool->pending -= 1;
         pool->active += 1;
+        if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
+            pool->lastPeakTime = MtmGetSystemTime();
+        }
         if (pool->head + size + 4 > pool->size) {
             memcpy(work, pool->queue, size);
             pool->head = INTALIGN(size);
@@ -48,17 +51,19 @@ static void BgwPoolMainLoop(Datum arg)
         if (pool->producerBlocked) {
             pool->producerBlocked = false;
             PGSemaphoreUnlock(&pool->overflow);
+            pool->lastPeakTime = 0;
         }
         SpinLockRelease(&pool->lock);
         pool->executor(id, work, size);
         free(work);
         SpinLockAcquire(&pool->lock);
         pool->active -= 1;
+        pool->lastPeakTime = 0;
         SpinLockRelease(&pool->lock);
     }
 }
 
-void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize)
+void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers)
 {
     pool->queue = (char*)ShmemAlloc(queueSize);
     pool->executor = executor;
@@ -73,8 +78,15 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
     pool->size = queueSize;
     pool->active = 0;
     pool->pending = 0;
+    pool->nWorkers = nWorkers;
+    pool->lastPeakTime = 0;
     strcpy(pool->dbname, dbname);
 }
+
+timestamp_t BgwGetLastPeekTime(BgwPool* pool)
+{
+    return pool->lastPeakTime;
+}
 
 void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
 {
@@ -123,12 +135,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
     if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
         || (pool->head > pool->tail && pool->head - pool->tail < size + 4))
     {
-        pool->producerBlocked = true;
+        if (pool->lastPeakTime == 0) {
+            pool->lastPeakTime = MtmGetSystemTime();
+        }
+        pool->producerBlocked = true;
         SpinLockRelease(&pool->lock);
         PGSemaphoreLock(&pool->overflow);
         SpinLockAcquire(&pool->lock);
     } else {
         pool->pending += 1;
+        if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
+            pool->lastPeakTime = MtmGetSystemTime();
+        }
         *(int*)&pool->queue[pool->tail] = size;
         if (pool->size - pool->tail >= size + 4) {
             memcpy(&pool->queue[pool->tail+4], work, size);
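Note (illustration, not part of the commit): the heart of the bgwpool.c change is the lastPeakTime bookkeeping. It records when the pool first becomes saturated (all workers busy, or the producer blocked on a full queue, with work still pending) and is cleared as soon as any worker finishes. A minimal standalone sketch of the same pattern, using hypothetical names (PoolState, get_system_time_usec) in place of BgwPool and MtmGetSystemTime:

#include <stddef.h>
#include <stdint.h>

typedef uint64_t timestamp_t;

typedef struct
{
    size_t      active;        /* workers currently executing a job                */
    size_t      pending;       /* jobs queued but not yet picked up                */
    size_t      nWorkers;      /* total number of workers in the pool              */
    timestamp_t lastPeakTime;  /* 0 = not saturated, else saturation start (usec)  */
} PoolState;

/* hypothetical stand-in for MtmGetSystemTime() */
extern timestamp_t get_system_time_usec(void);

/* Call under the pool lock whenever active/pending change. */
static void
update_peak_time(PoolState *pool)
{
    if (pool->active == pool->nWorkers && pool->pending != 0)
    {
        if (pool->lastPeakTime == 0)
            pool->lastPeakTime = get_system_time_usec();  /* saturation begins */
    }
    else
        pool->lastPeakTime = 0;   /* a worker (or queue slot) is free again: reset */
}

Keeping only the start of the saturated interval makes the later deadlock check a single subtraction against the current time.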

bgwpool.h

Lines changed: 7 additions & 1 deletion
@@ -7,6 +7,8 @@
 
 typedef void (*BgwPoolExecutor)(int id, void* work, size_t size);
 
+typedef uint64 timestamp_t;
+
 #define MAX_DBNAME_LEN 30
 #define MULTIMASTER_BGW_RESTART_TIMEOUT 1 /* seconds */
 
@@ -21,6 +23,8 @@ typedef struct
     size_t size;
     size_t active;
     size_t pending;
+    size_t nWorkers;
+    time_t lastPeakTime;
     bool producerBlocked;
     char dbname[MAX_DBNAME_LEN];
     char* queue;
@@ -30,10 +34,12 @@ typedef BgwPool*(*BgwPoolConstructor)(void);
 
 extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
 
-extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize);
+extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers);
 
 extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
 
 extern size_t BgwPoolGetQueueSize(BgwPool* pool);
 
+extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
+
 #endif

multimaster.c

Lines changed: 33 additions & 16 deletions
@@ -255,13 +255,18 @@ void MtmUnlockNode(int nodeId)
 */
 
 
-timestamp_t MtmGetCurrentTime(void)
+timestamp_t MtmGetSystemTime(void)
 {
     struct timeval tv;
     gettimeofday(&tv, NULL);
     return (timestamp_t)tv.tv_sec*USEC + tv.tv_usec + Mtm->timeShift;
 }
 
+timestamp_t MtmGetCurrentTime(void)
+{
+    return MtmGetSystemTime() + Mtm->timeShift;
+}
+
 void MtmSleep(timestamp_t interval)
 {
     struct timespec ts;
@@ -1045,7 +1050,7 @@ void MtmRecoveryCompleted(void)
     MtmLock(LW_EXCLUSIVE);
     Mtm->recoverySlot = 0;
     BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
-    Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = time(NULL);
+    Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
     /* Mode will be changed to online once all locagical reciever are connected */
     MtmSwitchClusterMode(MTM_CONNECTED);
     MtmUnlock();
@@ -1134,7 +1139,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
         /* We are lucky: caugth-up without locking cluster! */
     }
     BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
-    Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+    Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
     Mtm->nNodes += 1;
     caughtUp = true;
 } else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1279,15 +1284,15 @@ bool MtmRefreshClusterStatus(bool nowait)
     if (mask & 1) {
         Mtm->nNodes -= 1;
         BIT_SET(Mtm->disabledNodeMask, i);
-        Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+        Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
     }
 }
 mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
 for (i = 0; mask != 0; i++, mask >>= 1) {
     if (mask & 1) {
         Mtm->nNodes += 1;
         BIT_CLEAR(Mtm->disabledNodeMask, i);
-        Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+        Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
     }
 }
 MtmCheckQuorum();
@@ -1327,7 +1332,7 @@ void MtmOnNodeDisconnect(int nodeId)
 {
     MtmTransState *ts;
 
-    if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MtmNodeDisableDelay > time(NULL)) {
+    if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) > MtmGetSystemTime()) {
         /* Avoid false detection of node failure and prevent node status blinking */
         return;
     }
@@ -1342,7 +1347,7 @@ void MtmOnNodeDisconnect(int nodeId)
     if (!MtmRefreshClusterStatus(false)) {
         MtmLock(LW_EXCLUSIVE);
         if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
-            Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_SET(Mtm->disabledNodeMask, nodeId-1);
             Mtm->nNodes -= 1;
             MtmCheckQuorum();
@@ -1510,14 +1515,14 @@ static void MtmInitialize()
     for (i = 0; i < MtmNodes; i++) {
         Mtm->nodes[i].oldestSnapshot = 0;
         Mtm->nodes[i].transDelay = 0;
-        Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+        Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
         Mtm->nodes[i].con = MtmConnections[i];
         Mtm->nodes[i].flushPos = 0;
     }
     PGSemaphoreCreate(&Mtm->votingSemaphore);
     PGSemaphoreReset(&Mtm->votingSemaphore);
     SpinLockInit(&Mtm->spinlock);
-    BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize);
+    BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize, MtmWorkers);
     RegisterXactCallback(MtmXactCallback, NULL);
     MtmTx.snapshot = INVALID_CSN;
     MtmTx.xid = InvalidTransactionId;
@@ -1681,10 +1686,10 @@ _PG_init(void)
 
     DefineCustomIntVariable(
         "multimaster.node_disable_delay",
-        "Minamal amount of time (sec) between node status change",
+        "Minamal amount of time (msec) between node status change",
         "This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
         &MtmNodeDisableDelay,
-        1,
+        1000,
         1,
         INT_MAX,
         PGC_BACKEND,
@@ -2032,7 +2037,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
     {
         elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nNodes);
     }
-    Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+    Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
     BIT_SET(Mtm->disabledNodeMask, nodeId-1);
     Mtm->nNodes -= 1;
     MtmCheckQuorum();
@@ -2083,15 +2088,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
     if (MtmIsRecoverySession) {
         MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
         if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
-            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
             Mtm->nNodes -= 1;
             MtmCheckQuorum();
         }
     } else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
         if (recoveryCompleted) {
             MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
-            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
             Mtm->nNodes += 1;
             MtmCheckQuorum();
@@ -2238,7 +2243,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
     }
     if (!nowait) {
         /* Just wait some time until logical repication channels will be reestablished */
-        MtmSleep(MtmNodeDisableDelay);
+        MtmSleep(MSEC_TO_USEC(MtmNodeDisableDelay));
     }
     PG_RETURN_BOOL(online);
 }
@@ -2297,7 +2302,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
     usrfctx->values[4] = Int64GetDatum(lag);
     usrfctx->nulls[4] = lag < 0;
     usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
-    usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
+    usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USEC));
     usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
     usrfctx->nodeId += 1;
 
@@ -3058,6 +3063,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
     MtmGetGtid(pgxact->xid, &gtid);
     hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
     elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
+    if (!hasDeadlock) {
+        /* There is no deadlock loop in the graph, but a deadlock can still be caused by lack of apply workers: if all of them are busy,
+         * then some transactions can not be applied just because there are no vacant workers, and this creates an additional dependency
+         * between transactions which is not reflected in the lock graph
+         */
+        timestamp_t lastPeekTime = BgwGetLastPeekTime(&Mtm->pool);
+        if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
+            hasDeadlock = true;
+            elog(WARNING, "Apply workers were blocked more than %d msec",
+                 (int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
+        }
+    }
 }
 return hasDeadlock;
 }
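Note (illustration, not part of the commit): the new branch in MtmDetectGlobalDeadLock() treats prolonged apply-pool saturation as an extra, implicit wait-for edge. A minimal sketch of the decision logic, with assumed helper names (graph_has_cycle, pool_last_peak_time, get_system_time_usec standing in for MtmGraphFindLoop, BgwGetLastPeekTime and MtmGetSystemTime):

#include <stdbool.h>
#include <stdint.h>

typedef uint64_t timestamp_t;

#define MSEC_TO_USEC(t) ((t) * 1000)

/* hypothetical stand-ins for MtmGraphFindLoop(), BgwGetLastPeekTime(), MtmGetSystemTime() */
extern bool        graph_has_cycle(void);
extern timestamp_t pool_last_peak_time(void);
extern timestamp_t get_system_time_usec(void);

static bool
detect_global_deadlock(timestamp_t deadlock_timeout_msec)
{
    /* Explicit case: a cycle in the distributed wait-for graph. */
    if (graph_has_cycle())
        return true;

    /* Implicit case: every apply worker has been busy, with work still queued,
     * for at least deadlock_timeout, so a blocked transaction may be waiting
     * on one that cannot even start. */
    timestamp_t peak = pool_last_peak_time();
    if (peak != 0 && get_system_time_usec() - peak >= MSEC_TO_USEC(deadlock_timeout_msec))
        return true;

    return false;
}

The trade-off is that a pool which is merely overloaded for longer than deadlock_timeout may also be reported as deadlocked; after that long with no free worker, aborting one transaction is preferred to waiting indefinitely.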

multimaster.h

Lines changed: 9 additions & 7 deletions
@@ -48,6 +48,9 @@
 
 #define USEC 1000000
 
+#define USEC_TO_MSEC(t) ((t)/1000)
+#define MSEC_TO_USEC(t) ((t)*1000)
+
 #define Natts_mtm_ddl_log 2
 #define Anum_mtm_ddl_log_issued 1
 #define Anum_mtm_ddl_log_query  2
@@ -72,8 +75,6 @@ typedef uint64 csn_t; /* commit serial number */
 #define PGLOGICAL_CAUGHT_UP    0x04
 
 
-typedef uint64 timestamp_t;
-
 /* Identifier of global transaction */
 typedef struct
 {
@@ -122,9 +123,9 @@ typedef struct
 typedef struct
 {
     MtmConnectionInfo con;
-    time_t transDelay;
-    time_t lastStatusChangeTime;
-    XLogRecPtr flushPos;
+    timestamp_t transDelay;
+    timestamp_t lastStatusChangeTime;
+    XLogRecPtr flushPos;
     csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
 } MtmNodeInfo;
 
@@ -232,8 +233,9 @@ extern void MtmRecoverNode(int nodeId);
 extern void MtmOnNodeDisconnect(int nodeId);
 extern void MtmOnNodeConnect(int nodeId);
 extern void MtmWakeUpBackend(MtmTransState* ts);
-extern timestamp_t MtmGetCurrentTime(void);
-extern void MtmSleep(timestamp_t interval);
+extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
+extern timestamp_t MtmGetCurrentTime(void); /* adjusted current system time */
+extern void MtmSleep(timestamp_t interval);
 extern void MtmAbortTransaction(MtmTransState* ts);
 extern void MtmSetCurrentTransactionGID(char const* gid);
 extern csn_t MtmGetTransactionCSN(TransactionId xid);
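Note (illustration, not part of the commit): with transDelay and lastStatusChangeTime switched from time_t (seconds) to timestamp_t (microseconds), and multimaster.node_disable_delay now expressed in milliseconds, every unit boundary needs an explicit conversion; that is what the new USEC_TO_MSEC/MSEC_TO_USEC macros and the division by USEC in mtm_get_nodes_state are for. A small sketch of the conversions (helper function names are hypothetical):

#include <stdint.h>

typedef uint64_t timestamp_t;          /* microseconds, as in bgwpool.h      */

#define USEC            1000000        /* usec per second, as in multimaster.h */
#define USEC_TO_MSEC(t) ((t) / 1000)
#define MSEC_TO_USEC(t) ((t) * 1000)

/* A GUC such as multimaster.node_disable_delay is stored in milliseconds,
 * while timestamps are microseconds, so comparisons convert explicitly. */
static int
delay_elapsed(timestamp_t last_change_usec, timestamp_t now_usec, int delay_msec)
{
    return now_usec - last_change_usec >= (timestamp_t) MSEC_TO_USEC(delay_msec);
}

/* Converting a microsecond timestamp back to whole seconds, as done before
 * passing it to time_t_to_timestamptz() in mtm_get_nodes_state(). */
static long
usec_to_seconds(timestamp_t ts_usec)
{
    return (long) (ts_usec / USEC);
}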

0 commit comments

