Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3a62970

Browse files
knizhnikkelvich
authored andcommitted
Distributed deadlock detections
1 parente8cf78f commit3a62970

File tree

5 files changed

+166
-12
lines changed

5 files changed

+166
-12
lines changed

‎arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ static void MtmOpenConnections()
335335
if (i+1!=MtmNodeId) {
336336
sockets[i]=MtmConnectSocket(host,MtmArbiterPort+i+1,MtmConnectAttempts);
337337
if (sockets[i]<0) {
338-
MtmDropNode(i+1, false);
338+
MtmOnLostConnection(i+1);
339339
}
340340
}else {
341341
sockets[i]=-1;
@@ -358,7 +358,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
358358
close(sockets[node]);
359359
sockets[node]=MtmConnectSocket(hosts[node],MtmArbiterPort+node+1,MtmReconnectAttempts);
360360
if (sockets[node]<0) {
361-
MtmDropNode(node+1, false);
361+
MtmOnLostConnection(node+1);
362362
return false;
363363
}
364364
}

‎ddd.c

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include<stddef.h>
2+
#include<stdlib.h>
3+
#include<string.h>
4+
#include"ddd.h"
5+
6+
7+
voidMtmGraphInit(MtmGraph*graph)
8+
{
9+
memset(graph->hashtable,0,sizeof(graph->hashtable));
10+
}
11+
12+
staticinlineMtmVertex*findVertex(MtmGraph*graph,GlobalTransactionId*gtid)
13+
{
14+
xid_th=gtid->xid %MAX_TRANSACTIONS;
15+
MtmVertex*v;
16+
for (v=graph->hashtable[h];v!=NULL;v=v->next) {
17+
if (v->gtid==*gtid) {
18+
returnv;
19+
}
20+
}
21+
v= (MtmVertex*)palloc(sizeof(MtmVertex));
22+
v->gtid=*gtid;
23+
v->outgoingEdges=NULL;
24+
v->collision=graph->hashtable[h];
25+
graph->hashtable[h]=v;
26+
returnv;
27+
}
28+
29+
voidMtmGraphAdd(MtmGraph*graph,GlobalTransactionId*gtid,intsize)
30+
{
31+
GlobalTransactionId*last=gtid+size;
32+
MtmEdge*e,*next,*edges=NULL;
33+
while (gtid!=last) {
34+
Vertex*src=findVertex(graph,gtid++);
35+
while (gtid->node!=0) {
36+
Vertex*dst=findVertex(graph,gtid++);
37+
e= (MtmEdge*)palloc(sizeof(MtmEdge));
38+
dst->nIncomingEdges+=1;
39+
e->dst=dst;
40+
e->src=src;
41+
e->next=v->outgoingEdges;
42+
v->outgoingEdges=e;
43+
}
44+
gtid+=1;
45+
}
46+
}
47+
48+
staticboolrecursiveTraverseGraph(MtmVertex*root,MtmVertex*v)
49+
{
50+
Edge*e;
51+
v->node=VISITED_NODE_MARK;
52+
for (e=v->outgoingEdges;e!=NULL;e=e->next) {
53+
if (e->dst==root) {
54+
return true;
55+
}elseif (e->dst->node!=VISITED_NODE_MAR&&recursiveTraverseGraph(root,e->dst)) {/* loop */
56+
return true;
57+
}
58+
}
59+
return false;
60+
}
61+
62+
boolMtmGraphFindLoop(MtmGraph*graph,GlobalTransactionId*root)
63+
{
64+
Vertex*v;
65+
for (v=graph->hashtable[root->xid %MAX_TRANSACTIONS];v!=NULL;v=v->next) {
66+
if (v->gtid==*root) {
67+
if (recursiveTraverseGraph(v,v)) {
68+
return true;
69+
}
70+
break;
71+
}
72+
}
73+
return false;
74+
}

‎ddd.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef__DDD_H__
2+
3+
#include"multimaster.h"
4+
5+
#defineMAX_TRANSACTIONS 1024
6+
#defineVISITED_NODE_MARK 0
7+
8+
typedefstructMtmEdge {
9+
structMtmEdge*next;/* list of outgoing edges */
10+
structMtmVertex*dst;
11+
structMtmVertex*src;
12+
}MtmEdge;
13+
14+
typedefstructMtmVertex
15+
{
16+
structMtmEdge*outgoingEdges;
17+
structMtmVertex*collision;
18+
GlobalTransactionIdgtid;
19+
}MtmVertex;
20+
21+
typedefstructMtmGraph
22+
{
23+
MtmVertex*hashtable[MTM_MAX_TRANSACTIONS];
24+
}Graph;
25+
26+
externvoidMtmGraphInit(MtmGraph*graph);
27+
externvoidMtmGraphAdd(MtmGraph*graph,GlobalTransactionId*subgraph,intsize);
28+
externboolMtmGraphFindLoop(MtmGraph*graph,GlobalTransactionId*root);
29+
30+
#endif

‎multimaster.c

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ static BgwPool* MtmPoolConstructor(void);
112112
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql);
113113
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
114114
staticvoidMtmVoteForTransaction(MtmTransState*ts);
115+
staticvoidMtmSerializeLockGraph(ByteBuffer*buf)
115116

116117
staticHTAB*xid2state;
117118
staticMtmCurrentTransdtmTx;
@@ -805,13 +806,6 @@ MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
805806
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
806807
}
807808

808-
staticbool
809-
MtmDetectGlobalDeadLock(PGPROC*proc)
810-
{
811-
elog(WARNING,"Global deadlock?");
812-
return true;
813-
}
814-
815809
staticvoid
816810
MtmShmemStartup(void)
817811
{
@@ -1582,7 +1576,63 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
15821576
}
15831577
}
15841578

1585-
voidMtmSerializeLockGraph(ByteBuffer*buf)
1579+
/* Stubs */
1580+
typedefPaxosTimestamp {
1581+
time_ttime;/* local time at master */
1582+
uint32naster;/* master node for this operation */
1583+
uint32psn;/* PAXOS serial number */
1584+
}PaxosTimestamp;
1585+
1586+
void*PaxosGet(charconst*key,int*size,PaxosTimestamp*ts);
1587+
voidPaxosSet(charconst*key,void*value,intsize);
1588+
1589+
1590+
staticbool
1591+
MtmDetectGlobalDeadLock(PGPROC*proc)
1592+
{
1593+
boolhasDeadlock= false;
1594+
ByteBufferbuf;
1595+
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
1596+
boolhasDeadlock= false;
1597+
if (TransactionIdIsValid(pgxact->xid)) {
1598+
MtmGraphgraph;
1599+
GlobalTransactionIdgtid;
1600+
inti;
1601+
1602+
ByteBufferAlloc(&buf);
1603+
EnumerateLocks(DtmSerializeLock,&buf);
1604+
ByteBufferFree(&buf);
1605+
PaxosPut(dsprintf("lock-graph-%d",MMNodeId),buf.data,buf.size);
1606+
MtmGraphInit(&graph);
1607+
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data,buf.size/sizeof(GlobalTransactionId));
1608+
for (i=0;i<MtmNodes;i++) {
1609+
if (i+1MtmNodeId&& !BIT_CHECK(dtm->disabledNodeMask,i)) {
1610+
intsize;
1611+
void*data=PaxosGet(dsprintf("lock-graph-%d",i+1),&size,NULL);
1612+
MtmGraphAdd(&graph, (GlobalTransactionId*)data,size/sizeof(GlobalTransactoinId));
1613+
}
1614+
}
1615+
MtmGetGtid(pgxact->xid,&gtid);
1616+
hasDeadlock=MtmGraphFindLoop(&graph,&gtid);
1617+
elog(WARNING,"Distributed deadlock check for %u:%u = %d",gtid.node,gtid.xid,hasDeadlock);
1618+
}
1619+
returnhasDeadlock;
1620+
}
1621+
1622+
voidMtmOnLostConnection(intnodeId)
15861623
{
1587-
EnumerateLocks(MtmSerializeLock,buf);
1624+
inti;
1625+
nodemask_tmask=dtm->disabledNodeMask;
1626+
nodemask_tmatrix= (nodemask_t*)palloc0(sizeof(nodemask_t)*MtmNodes);
1627+
1628+
mask |= (nodemask_t)1 << (nodeId-1);
1629+
PaxosPut(dsprintf("node-mask-%d",MMNodeId),&mask,sizeofmask);
1630+
matrix[MtmNodeId-1]=mask;
1631+
MtmSleep(MtmConnectTimeout);
1632+
for (i=0;i<MtnNodes;i++) {
1633+
if (i+1!=MtmNodeId&& !BIT_CHECK(dtm->disabledNodeMask,i)) {
1634+
void*data=PaxosGet(dsprintf("node-mask-%d",i+1),NULL,NULL);
1635+
matrix[i]=*(nodemask_t*)data;
1636+
}
1637+
}
15881638
}

‎multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
141141
externvoidMtmLock(LWLockModemode);
142142
externvoidMtmUnlock(void);
143143
externvoidMtmDropNode(intnodeId,booldropSlot);
144+
externvoidMtmOnLostConnection(intnodeId);
144145
externMtmState*MtmGetState(void);
145146
externtimestamp_tMtmGetCurrentTime(void);
146147
externvoidMtmSleep(timestamp_tinterval);
147148
externboolMtmIsRecoveredNode(intnodeId);
148-
externvoidMtmSerializeLockGraph(ByteBuffer*buf);
149149

150150
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp