Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit dd1634f

Browse files
knizhnikkelvich
authored and committed
Send deadlock graph using logical messages
1 parent 496a138 commit dd1634f

File tree

9 files changed

+186
-65
lines changed

9 files changed

+186
-65
lines changed

‎arbiter.c

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -341,6 +341,7 @@ static void MtmSendHeartbeat()
341341
timestamp_tnow=MtmGetSystemTime();
342342
msg.code=MSG_HEARTBEAT;
343343
msg.disabledNodeMask=Mtm->disabledNodeMask;
344+
msg.connectivityMask=Mtm->connectivityMask;
344345
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
345346
msg.node=MtmNodeId;
346347
msg.csn=now;
@@ -464,6 +465,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
464465
req.hdr.sxid=ShmemVariableCache->nextXid;
465466
req.hdr.csn=MtmGetCurrentTime();
466467
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
468+
req.hdr.connectivityMask=Mtm->connectivityMask;
467469
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
468470
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
469471
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
@@ -600,6 +602,7 @@ static void MtmAcceptOneConnection()
600602

601603
resp.code=MSG_STATUS;
602604
resp.disabledNodeMask=Mtm->disabledNodeMask;
605+
resp.connectivityMask=Mtm->connectivityMask;
603606
resp.dxid=HANDSHAKE_MAGIC;
604607
resp.sxid=ShmemVariableCache->nextXid;
605608
resp.csn=MtmGetCurrentTime();
@@ -882,6 +885,8 @@ static void MtmReceiver(Datum arg)
882885

883886
Assert(node>0&&node <=nNodes&&node!=MtmNodeId);
884887
Mtm->nodes[node-1].oldestSnapshot=msg->oldestSnapshot;
888+
Mtm->nodes[node-1].disabledNodeMask=msg->disabledNodeMask;
889+
Mtm->nodes[node-1].connectivityMask=msg->connectivityMask;
885890
Mtm->nodes[node-1].lastHeartbeat=MtmGetSystemTime();
886891

887892
switch (msg->code) {

‎bgwpool.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ static void BgwStartExtraWorker(BgwPool* pool)
130130
{
131131
if (pool->nWorkers<MtmMaxWorkers) {
132132
timestamp_tnow=MtmGetSystemTime();
133-
if (pool->lastDynamicWorkerStartTime+MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC<now) {
133+
/*if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now)*/
134+
{
134135
BackgroundWorkerworker;
135136
BackgroundWorkerHandle*handle;
136137
MemSet(&worker,0,sizeof(BackgroundWorker));

‎multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
7070
AS'MODULE_PATHNAME','mtm_inject_2pc_error'
7171
LANGUAGE C;
7272

73+
CREATEFUNCTIONmtm.check_deadlock(xidinteger) RETURNSboolean
74+
AS'MODULE_PATHNAME','mtm_check_deadlock'
75+
LANGUAGE C;
76+
7377
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7478

7579
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

‎multimaster.c

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include"access/twophase.h"
3939
#include"utils/guc.h"
4040
#include"utils/hsearch.h"
41+
#include"utils/timeout.h"
4142
#include"utils/tqual.h"
4243
#include"utils/array.h"
4344
#include"utils/builtins.h"
@@ -93,7 +94,7 @@ typedef enum
9394
#defineMTM_MAP_SIZE MTM_HASH_SIZE
9495
#defineMIN_WAIT_TIMEOUT 1000
9596
#defineMAX_WAIT_TIMEOUT 100000
96-
#defineMAX_WAIT_LOOPS 100
97+
#defineMAX_WAIT_LOOPS 100 // 1000000
9798
#defineSTATUS_POLL_DELAY USECS_PER_SEC
9899

99100
void_PG_init(void);
@@ -116,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
116117
PG_FUNCTION_INFO_V1(mtm_make_table_local);
117118
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
118119
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
120+
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
119121

120122
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
121123
staticvoidMtmInitialize(void);
@@ -274,15 +276,15 @@ void MtmUnlock(void)
274276
Mtm->lastLockHolder=0;
275277
}
276278

277-
voidMtmLockNode(intnodeId)
279+
voidMtmLockNode(intnodeId,LWLockModemode)
278280
{
279-
Assert(nodeId>0&&nodeId <=Mtm->nAllNodes);
280-
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],LW_EXCLUSIVE);
281+
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
282+
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],mode);
281283
}
282284

283285
voidMtmUnlockNode(intnodeId)
284286
{
285-
Assert(nodeId>0&&nodeId <=Mtm->nAllNodes);
287+
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
286288
LWLockRelease((LWLockId)&Mtm->locks[nodeId]);
287289
}
288290

@@ -437,6 +439,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
437439
statictimestamp_ttotalSleepTime;
438440
statictimestamp_tmaxSleepTime;
439441
#endif
442+
timestamp_tstart=MtmGetSystemTime();
440443
timestamp_tdelay=MIN_WAIT_TIMEOUT;
441444
inti;
442445
Assert(xid!=InvalidTransactionId);
@@ -460,7 +463,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
460463
if (ts->csn>MtmTx.snapshot) {
461464
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
462465
MyProcPid,xid,ts->csn,MtmTx.snapshot);
463-
MtmUnlock();
466+
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
467+
elog(WARNING,"Backend %d waits for transaction %x status %ld usecs",MyProcPid,xid,MtmGetSystemTime()-start);
468+
}
469+
MtmUnlock();
464470
return true;
465471
}
466472
if (ts->status==TRANSACTION_STATUS_UNKNOWN)
@@ -499,6 +505,9 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
499505
MTM_LOG4("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld",
500506
MyProcPid,xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
501507
MtmUnlock();
508+
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
509+
elog(WARNING,"Backend %d waits for %s transaction %x %ld usecs",MyProcPid,invisible ?"rollbacked" :"committed",xid,MtmGetSystemTime()-start);
510+
}
502511
returninvisible;
503512
}
504513
}
@@ -510,7 +519,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
510519
}
511520
}
512521
MtmUnlock();
513-
elog(ERROR,"Failed to get status of XID %d",xid);
522+
elog(ERROR,"Failed to get status of XID %d in %ld usec",xid,MtmGetSystemTime()-start);
514523
return true;
515524
}
516525

@@ -1091,6 +1100,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
10911100
msg.sxid=ts->xid;
10921101
msg.csn=ts->csn;
10931102
msg.disabledNodeMask=Mtm->disabledNodeMask;
1103+
msg.connectivityMask=Mtm->connectivityMask;
10941104
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
10951105
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
10961106

@@ -1118,6 +1128,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
11181128
MtmArbiterMessagemsg;
11191129
msg.code=MSG_POLL_REQUEST;
11201130
msg.disabledNodeMask=Mtm->disabledNodeMask;
1131+
msg.connectivityMask=Mtm->connectivityMask;
11211132
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
11221133
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
11231134

@@ -1703,8 +1714,6 @@ void MtmCheckQuorum(void)
17031714

17041715
voidMtmOnNodeDisconnect(intnodeId)
17051716
{
1706-
MtmTransState*ts;
1707-
17081717
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
17091718
{
17101719
/* Node is already disabled */
@@ -1733,11 +1742,13 @@ void MtmOnNodeDisconnect(int nodeId)
17331742
}
17341743

17351744
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
1736-
1745+
#if0
17371746
if (!MtmUseRaftable)
17381747
{
17391748
MtmLock(LW_EXCLUSIVE);
17401749
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1750+
MtmTransState*ts;
1751+
17411752
MtmDisableNode(nodeId);
17421753
MtmCheckQuorum();
17431754
/* Interrupt voting for active transaction and abort them */
@@ -1751,7 +1762,10 @@ void MtmOnNodeDisconnect(int nodeId)
17511762
}
17521763
}
17531764
MtmUnlock();
1754-
}else {
1765+
}
1766+
else
1767+
#endif
1768+
{
17551769
MtmRefreshClusterStatus(false,0);
17561770
}
17571771
}
@@ -1964,6 +1978,11 @@ static void MtmInitialize()
19641978
Mtm->freeQueue=NULL;
19651979
for (i=0;i<MtmNodes;i++) {
19661980
Mtm->nodes[i].oldestSnapshot=0;
1981+
Mtm->nodes[i].disabledNodeMask=0;
1982+
Mtm->nodes[i].connectivityMask=0;
1983+
Mtm->nodes[i].lockGraphUsed=0;
1984+
Mtm->nodes[i].lockGraphAllocated=0;
1985+
Mtm->nodes[i].lockGraphData=NULL;
19671986
Mtm->nodes[i].transDelay=0;
19681987
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
19691988
Mtm->nodes[i].con=MtmConnections[i];
@@ -2603,7 +2622,7 @@ _PG_init(void)
26032622
* resources in mtm_shmem_startup().
26042623
*/
26052624
RequestAddinShmemSpace(MTM_SHMEM_SIZE+MtmQueueSize);
2606-
RequestNamedLWLockTranche(MULTIMASTER_NAME,1+MtmMaxNodes);
2625+
RequestNamedLWLockTranche(MULTIMASTER_NAME,1+MtmMaxNodes*2);
26072626

26082627
BgwPoolStart(MtmWorkers,MtmPoolConstructor);
26092628

@@ -3260,7 +3279,7 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
32603279
for (i=0;i<Mtm->nAllNodes;i++)
32613280
{
32623281
size_tsize;
3263-
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
3282+
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
32643283
if (data) {
32653284
GlobalTransactionId*gtid= (GlobalTransactionId*)data;
32663285
GlobalTransactionId*last= (GlobalTransactionId*)(data+size);
@@ -3672,12 +3691,28 @@ static bool MtmProcessDDLCommand(char const* queryString)
36723691
}
36733692

36743693
MTM_LOG1("Sending utility: %s",queryWithContext);
3675-
LogLogicalMessage("MTM:GUC",queryWithContext,strlen(queryWithContext), true);
3694+
LogLogicalMessage("G",queryWithContext,strlen(queryWithContext)+1, true);
36763695

36773696
MtmTx.containsDML= true;
36783697
return false;
36793698
}
36803699

3700+
voidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize)
3701+
{
3702+
intallocated;
3703+
MtmLockNode(nodeId+MtmMaxNodes,LW_EXCLUSIVE);
3704+
allocated=Mtm->nodes[nodeId-1].lockGraphAllocated;
3705+
if (messageSize>allocated) {
3706+
allocated=Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE,allocated*2),messageSize);
3707+
Mtm->nodes[nodeId-1].lockGraphData=ShmemAlloc(allocated);
3708+
Mtm->nodes[nodeId-1].lockGraphAllocated=allocated;
3709+
}
3710+
memcpy(Mtm->nodes[nodeId-1].lockGraphData,messageBody,messageSize);
3711+
Mtm->nodes[nodeId-1].lockGraphUsed=messageSize;
3712+
MtmUnlockNode(nodeId+MtmMaxNodes);
3713+
MTM_LOG1("Update deadlock graph for node %d size %d",nodeId,messageSize);
3714+
}
3715+
36813716
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
36823717
ProcessUtilityContextcontext,ParamListInfoparams,
36833718
DestReceiver*dest,char*completionTag)
@@ -3996,20 +4031,19 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
39964031
}
39974032

39984033
staticbool
3999-
MtmDetectGlobalDeadLock(PGPROC*proc)
4034+
MtmDetectGlobalDeadLockFortXid(TransactionIdxid)
40004035
{
4001-
ByteBufferbuf;
4002-
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
40034036
boolhasDeadlock= false;
4004-
4005-
if (TransactionIdIsValid(pgxact->xid)) {
4037+
if (TransactionIdIsValid(xid)) {
4038+
ByteBufferbuf;
40064039
MtmGraphgraph;
40074040
GlobalTransactionIdgtid;
40084041
inti;
40094042

40104043
ByteBufferAlloc(&buf);
40114044
EnumerateLocks(MtmSerializeLock,&buf);
40124045
RaftableSet(psprintf("lock-graph-%d",MtmNodeId),buf.data,buf.used, false);
4046+
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
40134047
MtmGraphInit(&graph);
40144048
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data,buf.used/sizeof(GlobalTransactionId));
40154049
ByteBufferFree(&buf);
@@ -4024,9 +4058,9 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
40244058
}
40254059
}
40264060
}
4027-
MtmGetGtid(pgxact->xid,&gtid);
4061+
MtmGetGtid(xid,&gtid);
40284062
hasDeadlock=MtmGraphFindLoop(&graph,&gtid);
4029-
elog(WARNING,"Distributed deadlock check for %u:%u = %d",gtid.node,gtid.xid,hasDeadlock);
4063+
elog(WARNING,"Distributed deadlock checkby backend %dfor %u:%u = %d",MyProcPid,gtid.node,gtid.xid,hasDeadlock);
40304064
if (!hasDeadlock) {
40314065
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
40324066
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
@@ -4037,8 +4071,27 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
40374071
hasDeadlock= true;
40384072
elog(WARNING,"Apply workers were blocked more than %d msec",
40394073
(int)USEC_TO_MSEC(MtmGetSystemTime()-lastPeekTime));
4074+
}else {
4075+
MTM_LOG1("Enable deadlock timeout in backend %d for transaction %d",MyProcPid,xid);
4076+
enable_timeout_after(DEADLOCK_TIMEOUT,DeadlockTimeout);
40404077
}
40414078
}
40424079
}
40434080
returnhasDeadlock;
40444081
}
4082+
4083+
staticbool
4084+
MtmDetectGlobalDeadLock(PGPROC*proc)
4085+
{
4086+
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
4087+
4088+
MTM_LOG1("Detect global deadlock for %d by backend %d",pgxact->xid,MyProcPid);
4089+
4090+
returnMtmDetectGlobalDeadLockFortXid(pgxact->xid);
4091+
}
4092+
4093+
Datummtm_check_deadlock(PG_FUNCTION_ARGS)
4094+
{
4095+
TransactionIdxid=PG_GETARG_INT32(0);
4096+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4097+
}

‎multimaster.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
5555
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
56+
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE 4096
5657
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5758
#defineMULTIMASTER_ADMIN "mtm_admin"
5859

@@ -138,8 +139,9 @@ typedef struct
138139
TransactionIdsxid;/* Transaction ID at sender node */
139140
XidStatusstatus;/* Transaction status */
140141
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
141-
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
142142
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
143+
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
144+
nodemask_tconnectivityMask;/* Connectivity bittmask at the sender of message */
143145
pgid_tgid;/* Global transaction identifier */
144146
}MtmArbiterMessage;
145147

@@ -179,13 +181,18 @@ typedef struct
179181
timestamp_treceiverStartTime;
180182
timestamp_tsenderStartTime;
181183
timestamp_tlastHeartbeat;
184+
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes received from this node */
185+
nodemask_tconnectivityMask;/* Connectivity mask at this node */
182186
intsenderPid;
183187
intreceiverPid;
184188
XLogRecPtrflushPos;
185-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
189+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
186190
XLogRecPtrrestartLsn;
187191
RepOriginIdoriginId;
188192
inttimeline;
193+
void*lockGraphData;
194+
intlockGraphAllocated;
195+
intlockGraphUsed;
189196
}MtmNodeInfo;
190197

191198
typedefstructMtmTransState
@@ -223,7 +230,7 @@ typedef struct
223230
MtmNodeStatusstatus;/* Status of this node */
224231
intrecoverySlot;/* NodeId of recovery slot or 0 if none */
225232
volatileslock_tspinlock;/* spinlock used to protect access to hash table */
226-
PGSemaphoreDatasendSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
233+
PGSemaphoreDatasendSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
227234
LWLockPadded*locks;/* multimaster lock tranche */
228235
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
229236
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes */
@@ -310,7 +317,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
310317
externvoidMtmBroadcastPollMessage(MtmTransState*ts);
311318
externvoidMtmLock(LWLockModemode);
312319
externvoidMtmUnlock(void);
313-
externvoidMtmLockNode(intnodeId);
320+
externvoidMtmLockNode(intnodeId,LWLockModemode);
314321
externvoidMtmUnlockNode(intnodeId);
315322
externvoidMtmDropNode(intnodeId,booldropSlot);
316323
externvoidMtmRecoverNode(intnodeId);
@@ -340,6 +347,7 @@ extern XLogRecPtr MtmGetFlushPosition(int nodeId);
340347
externboolMtmWatchdog(timestamp_tnow);
341348
externvoidMtmCheckHeartbeat(void);
342349
externvoidMtmResetTransaction(void);
350+
externvoidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize);
343351
externPGconn*PQconnectdb_safe(constchar*conninfo);
344352

345353

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp