Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit69f6a0b

Browse files
committed
Distributed deadlock detection
1 parent3c5b2fb commit69f6a0b

File tree

5 files changed

+54
-34
lines changed

5 files changed

+54
-34
lines changed

‎contrib/pg_dtm/dtmd/src/ddd.c‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ void initGraph(Graph* graph)
1313
graph->freeEdges=NULL;
1414
graph->freeVertexes=NULL;
1515
graph->marker=0;
16-
graph->min_deadlock_duration=3;
1716
}
1817

1918
staticinlineEdge*newEdge(Graph*graph)
@@ -54,7 +53,6 @@ static inline Vertex* newVertex(Graph* graph)
5453
}else {
5554
graph->freeVertexes=v->next;
5655
}
57-
v->deadlock_duration=0;
5856
returnv;
5957
}
6058

@@ -148,9 +146,8 @@ bool detectDeadLock(Graph* graph, xid_t root)
148146
for (v=graph->hashtable[root %MAX_TRANSACTIONS];v!=NULL;v=v->next) {
149147
if (v->xid==root) {
150148
if (recursiveTraverseGraph(v,v,++graph->marker)) {
151-
return++v->deadlock_duration >=graph->min_deadlock_duration;
149+
returntrue;
152150
}
153-
v->deadlock_duration=0;
154151
break;
155152
}
156153
}

‎contrib/pg_dtm/dtmd/src/main.c‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ static void onmessage(client_t client, size_t len, char *data) {
599599

600600
staticvoidusage(char*prog) {
601601
printf(
602-
"Usage: %s [-d DATADIR] [-k] [-a HOST] [-p PORT] [-l LOGFILE] [-m MIN_DEADLOCK_DURATION]\n"
602+
"Usage: %s [-d DATADIR] [-k] [-a HOST] [-p PORT] [-l LOGFILE]\n"
603603
" arbiter will try to kill the other one running at\n"
604604
" the same DATADIR.\n"
605605
" -l : Run as a daemon and write output to LOGFILE.\n"
@@ -716,9 +716,6 @@ int main(int argc, char **argv) {
716716
case'k':
717717
assassin= true;
718718
break;
719-
case'm':
720-
graph.min_deadlock_duration=atoi(optarg);
721-
break;
722719
default:
723720
usage(argv[0]);
724721
returnEXIT_FAILURE;

‎contrib/pg_dtm/pg_dtm.c‎

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void _PG_fini(void);
7171
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
7272
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
7373
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
74-
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
74+
staticboolDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
7575
staticvoidDtmUpdateRecentXmin(Snapshotsnapshot);
7676
staticvoidDtmInitialize(void);
7777
staticvoidDtmXactCallback(XactEventevent,void*arg);
@@ -627,8 +627,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
627627
returnstatus;
628628
}
629629

630-
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
630+
staticboolDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
631631
{
632+
boolacknowledged= true;
632633
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
633634
if (!RecoveryInProgress())
634635
{
@@ -640,7 +641,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
640641
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
641642
DtmGlobalSetTransStatus(xid,status, false);
642643
XTM_INFO("Abort transaction %d\n",xid);
643-
return;
644+
return true;
644645
}
645646
else
646647
{
@@ -649,8 +650,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
649650
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
650651
hash_search(xid_in_doubt,&DtmNextXid,HASH_ENTER,NULL);
651652
LWLockRelease(dtm->hashLock);
652-
DtmGlobalSetTransStatus(xid,status, true);
653-
XTM_INFO("Commit transaction %d\n",xid);
653+
if (!DtmGlobalSetTransStatus(xid,status, true)) {
654+
acknowledged= false;
655+
XTM_INFO("Commit of transaction %d in rejected by DTM\n",xid);
656+
status=TRANSACTION_STATUS_ABORTED;
657+
}else {
658+
XTM_INFO("Commit transaction %d\n",xid);
659+
}
654660
}
655661
}
656662
else
@@ -661,11 +667,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
661667
else
662668
{
663669
XidStatusgs;
664-
gs=DtmGlobalGetTransStatus(xid, false);
665-
if (gs!=TRANSACTION_STATUS_UNKNOWN)
670+
gs=DtmGlobalGetTransStatus(xid, false);
671+
if (gs!=TRANSACTION_STATUS_UNKNOWN) {
672+
acknowledged=status==gs;
666673
status=gs;
674+
}
667675
}
668-
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
676+
returnPgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn)&&acknowledged;
669677
}
670678

671679
staticuint32dtm_xid_hash_fn(constvoid*key,Sizekeysize)
@@ -991,27 +999,35 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
991999
{
9921000
ByteBuffer*buf= (ByteBuffer*)arg;
9931001
LOCK*lock=proclock->tag.myLock;
994-
PGPROC*proc=proclock->tag.myProc;
1002+
PGPROC*proc=proclock->tag.myProc;
1003+
9951004
if (lock!=NULL) {
996-
if (proc->waitLock==lock) {
1005+
PGXACT*srcPgXact=&ProcGlobal->allPgXact[proc->pgprocno];
1006+
1007+
if (TransactionIdIsValid(srcPgXact->xid)&&proc->waitLock==lock) {
9971008
LockMethodlockMethodTable=GetLocksMethodTable(lock);
9981009
intnumLockModes=lockMethodTable->numLockModes;
9991010
intconflictMask=lockMethodTable->conflictTab[proc->waitLockMode];
10001011
SHM_QUEUE*procLocks=&(lock->procLocks);
10011012
intlm;
10021013

1003-
ByteBufferAppendInt32(buf,proc->lxid);/* waiting transaction */
1014+
ByteBufferAppendInt32(buf,srcPgXact->xid);/* waiting transaction */
10041015
proclock= (PROCLOCK*)SHMQueueNext(procLocks,procLocks,
10051016
offsetof(PROCLOCK,lockLink));
10061017
while (proclock)
10071018
{
10081019
if (proc!=proclock->tag.myProc) {
1009-
for (lm=1;lm <=numLockModes;lm++)
1010-
{
1011-
if ((proclock->holdMask&LOCKBIT_ON(lm))&& (conflictMask&LOCKBIT_ON(lm)))
1020+
PGXACT*dstPgXact=&ProcGlobal->allPgXact[proclock->tag.myProc->pgprocno];
1021+
if (TransactionIdIsValid(dstPgXact->xid)) {
1022+
Assert(srcPgXact->xid!=dstPgXact->xid);
1023+
for (lm=1;lm <=numLockModes;lm++)
10121024
{
1013-
ByteBufferAppendInt32(buf,proclock->tag.myProc->lxid);/* transaction holding lock */
1014-
break;
1025+
if ((proclock->holdMask&LOCKBIT_ON(lm))&& (conflictMask&LOCKBIT_ON(lm)))
1026+
{
1027+
XTM_INFO("%d: %u(%u) waits for %u(%u)\n",getpid(),srcPgXact->xid,proc->pid,dstPgXact->xid,proclock->tag.myProc->pid);
1028+
ByteBufferAppendInt32(buf,dstPgXact->xid);/* transaction holding lock */
1029+
break;
1030+
}
10151031
}
10161032
}
10171033
}
@@ -1025,12 +1041,19 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
10251041

10261042
boolDtmDetectGlobalDeadLock(PGPROC*proc)
10271043
{
1028-
boolhasDeadlock;
1044+
boolhasDeadlock= false;
10291045
ByteBufferbuf;
1030-
ByteBufferAlloc(&buf);
1031-
EnumerateLocks(DtmSerializeLock,&buf);
1032-
hasDeadlock=DtmGlobalDetectDeadLock(PostPortNumber,proc->lxid,buf.data,buf.used);
1033-
ByteBufferFree(&buf);
1034-
elog(NOTICE,"Deadlock detected for transaction %u",proc->lxid);
1046+
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
1047+
1048+
if (TransactionIdIsValid(pgxact->xid)) {
1049+
ByteBufferAlloc(&buf);
1050+
XTM_INFO("%d: wait graph begin\n",getpid());
1051+
EnumerateLocks(DtmSerializeLock,&buf);
1052+
XTM_INFO("%d: wait graph end\n",getpid());
1053+
hasDeadlock=DtmGlobalDetectDeadLock(PostPortNumber,pgxact->xid,buf.data,buf.used);
1054+
ByteBufferFree(&buf);
1055+
XTM_INFO("%d: deadlock detected for %u\n",getpid(),pgxact->xid);
1056+
elog(WARNING,"Deadlock detected for transaction %u",pgxact->xid);
1057+
}
10351058
returnhasDeadlock;
10361059
}

‎contrib/pg_dtm/tests/dtmbench.cpp‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,10 @@ void* writer(void* arg)
161161
for (int i =0; i < cfg.nIterations; i++)
162162
{
163163
int srcCon, dstCon;
164-
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165-
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
164+
//int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165+
//int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
166+
int srcAcc =random() % cfg.nAccounts;
167+
int dstAcc =random() % cfg.nAccounts;
166168

167169
do {
168170
srcCon =random() % cfg.connections.size();
@@ -183,7 +185,7 @@ void* writer(void* arg)
183185
exec(dstTx,"update t set v = v + 1 where u=%d", dstAcc);
184186
}catch (pqxx_exceptionconst& x) {
185187
exec(srcTx,"rollback");
186-
exec(srcTx,"rollback");
188+
exec(dstTx,"rollback");
187189
t.aborts +=1;
188190
i -=1;
189191
continue;

‎src/backend/access/transam/xact.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,7 @@ RecordTransactionCommit(void)
13121312
MyPgXact->delayChkpt= false;
13131313
END_CRIT_SECTION();
13141314
if (!committed) {
1315+
CurrentTransactionState->state=TRANS_ABORT;
13151316
elog(ERROR,"Transaction commit rejected by XTM");
13161317
}
13171318
}
@@ -5405,7 +5406,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
54055406
StandbyReleaseLockTree(xid,0,NULL);
54065407
}
54075408
if (!committed) {
5408-
elog(NOTICE,"XTM rejectedrecovert oftran saction %u",xid);
5409+
elog(WARNING,"XTM rejectedrecovery oftransaction %u",xid);
54095410
}
54105411
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
54115412
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp