Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7fa827b

Browse files
committed
Send deadlock graph ysing logical messages
1 parent1b8929d commit7fa827b

File tree

10 files changed

+187
-60
lines changed

10 files changed

+187
-60
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ static void MtmSendHeartbeat()
340340
timestamp_tnow=MtmGetSystemTime();
341341
msg.code=MSG_HEARTBEAT;
342342
msg.disabledNodeMask=Mtm->disabledNodeMask;
343+
msg.connectivityMask=Mtm->connectivityMask;
343344
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
344345
msg.node=MtmNodeId;
345346
msg.csn=now;
@@ -463,6 +464,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
463464
req.hdr.sxid=ShmemVariableCache->nextXid;
464465
req.hdr.csn=MtmGetCurrentTime();
465466
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
467+
req.hdr.connectivityMask=Mtm->connectivityMask;
466468
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
467469
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
468470
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
@@ -599,6 +601,7 @@ static void MtmAcceptOneConnection()
599601

600602
resp.code=MSG_STATUS;
601603
resp.disabledNodeMask=Mtm->disabledNodeMask;
604+
resp.connectivityMask=Mtm->connectivityMask;
602605
resp.dxid=HANDSHAKE_MAGIC;
603606
resp.sxid=ShmemVariableCache->nextXid;
604607
resp.csn=MtmGetCurrentTime();
@@ -881,6 +884,8 @@ static void MtmReceiver(Datum arg)
881884

882885
Assert(node>0&&node <=nNodes&&node!=MtmNodeId);
883886
Mtm->nodes[node-1].oldestSnapshot=msg->oldestSnapshot;
887+
Mtm->nodes[node-1].disabledNodeMask=msg->disabledNodeMask;
888+
Mtm->nodes[node-1].connectivityMask=msg->connectivityMask;
884889
Mtm->nodes[node-1].lastHeartbeat=MtmGetSystemTime();
885890

886891
switch (msg->code) {

‎contrib/mmts/bgwpool.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ static void BgwStartExtraWorker(BgwPool* pool)
130130
{
131131
if (pool->nWorkers<MtmMaxWorkers) {
132132
timestamp_tnow=MtmGetSystemTime();
133-
if (pool->lastDynamicWorkerStartTime+MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC<now) {
133+
/*if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now)*/
134+
{
134135
BackgroundWorkerworker;
135136
BackgroundWorkerHandle*handle;
136137
MemSet(&worker,0,sizeof(BackgroundWorker));

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
7070
AS'MODULE_PATHNAME','mtm_inject_2pc_error'
7171
LANGUAGE C;
7272

73+
CREATEFUNCTIONmtm.check_deadlock(xidinteger) RETURNSboolean
74+
AS'MODULE_PATHNAME','mtm_check_deadlock'
75+
LANGUAGE C;
76+
7377
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7478

7579
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

‎contrib/mmts/multimaster.c‎

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include"access/twophase.h"
3939
#include"utils/guc.h"
4040
#include"utils/hsearch.h"
41+
#include"utils/timeout.h"
4142
#include"utils/tqual.h"
4243
#include"utils/array.h"
4344
#include"utils/builtins.h"
@@ -94,7 +95,7 @@ typedef enum
9495
#defineMTM_MAP_SIZE MTM_HASH_SIZE
9596
#defineMIN_WAIT_TIMEOUT 1000
9697
#defineMAX_WAIT_TIMEOUT 100000
97-
#defineMAX_WAIT_LOOPS 100
98+
#defineMAX_WAIT_LOOPS 100 // 1000000
9899
#defineSTATUS_POLL_DELAY USECS_PER_SEC
99100

100101
void_PG_init(void);
@@ -117,6 +118,7 @@ PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
117118
PG_FUNCTION_INFO_V1(mtm_make_table_local);
118119
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
119120
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
121+
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
120122

121123
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
122124
staticvoidMtmInitialize(void);
@@ -274,15 +276,15 @@ void MtmUnlock(void)
274276
Mtm->lastLockHolder=0;
275277
}
276278

277-
voidMtmLockNode(intnodeId)
279+
voidMtmLockNode(intnodeId,LWLockModemode)
278280
{
279-
Assert(nodeId>0&&nodeId <=Mtm->nAllNodes);
280-
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],LW_EXCLUSIVE);
281+
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
282+
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],mode);
281283
}
282284

283285
voidMtmUnlockNode(intnodeId)
284286
{
285-
Assert(nodeId>0&&nodeId <=Mtm->nAllNodes);
287+
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
286288
LWLockRelease((LWLockId)&Mtm->locks[nodeId]);
287289
}
288290

@@ -437,6 +439,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
437439
statictimestamp_ttotalSleepTime;
438440
statictimestamp_tmaxSleepTime;
439441
#endif
442+
timestamp_tstart=MtmGetSystemTime();
440443
timestamp_tdelay=MIN_WAIT_TIMEOUT;
441444
inti;
442445
Assert(xid!=InvalidTransactionId);
@@ -460,7 +463,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
460463
if (ts->csn>MtmTx.snapshot) {
461464
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
462465
MyProcPid,xid,ts->csn,MtmTx.snapshot);
463-
MtmUnlock();
466+
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
467+
elog(WARNING,"Backend %d waits for transaction %x status %ld usecs",MyProcPid,xid,MtmGetSystemTime()-start);
468+
}
469+
MtmUnlock();
464470
return true;
465471
}
466472
if (ts->status==TRANSACTION_STATUS_UNKNOWN)
@@ -499,6 +505,9 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
499505
MTM_LOG4("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld",
500506
MyProcPid,xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
501507
MtmUnlock();
508+
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
509+
elog(WARNING,"Backend %d waits for %s transaction %x %ld usecs",MyProcPid,invisible ?"rollbacked" :"committed",xid,MtmGetSystemTime()-start);
510+
}
502511
returninvisible;
503512
}
504513
}
@@ -510,7 +519,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
510519
}
511520
}
512521
MtmUnlock();
513-
elog(ERROR,"Failed to get status of XID %d",xid);
522+
elog(ERROR,"Failed to get status of XID %d in %ld usec",xid,MtmGetSystemTime()-start);
514523
return true;
515524
}
516525

@@ -1091,6 +1100,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
10911100
msg.sxid=ts->xid;
10921101
msg.csn=ts->csn;
10931102
msg.disabledNodeMask=Mtm->disabledNodeMask;
1103+
msg.connectivityMask=Mtm->connectivityMask;
10941104
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
10951105
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
10961106

@@ -1118,6 +1128,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
11181128
MtmArbiterMessagemsg;
11191129
msg.code=MSG_POLL_REQUEST;
11201130
msg.disabledNodeMask=Mtm->disabledNodeMask;
1131+
msg.connectivityMask=Mtm->connectivityMask;
11211132
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
11221133
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
11231134

@@ -1681,8 +1692,6 @@ void MtmCheckQuorum(void)
16811692

16821693
voidMtmOnNodeDisconnect(intnodeId)
16831694
{
1684-
MtmTransState*ts;
1685-
16861695
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
16871696
{
16881697
/* Node is already disabled */
@@ -1711,11 +1720,13 @@ void MtmOnNodeDisconnect(int nodeId)
17111720
}
17121721

17131722
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
1714-
1723+
#if0
17151724
if (!MtmUseRaftable)
17161725
{
17171726
MtmLock(LW_EXCLUSIVE);
17181727
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1728+
MtmTransState*ts;
1729+
17191730
MtmDisableNode(nodeId);
17201731
MtmCheckQuorum();
17211732
/* Interrupt voting for active transaction and abort them */
@@ -1729,7 +1740,10 @@ void MtmOnNodeDisconnect(int nodeId)
17291740
}
17301741
}
17311742
MtmUnlock();
1732-
}else {
1743+
}
1744+
else
1745+
#endif
1746+
{
17331747
MtmRefreshClusterStatus(false,0);
17341748
}
17351749
}
@@ -1942,6 +1956,11 @@ static void MtmInitialize()
19421956
Mtm->freeQueue=NULL;
19431957
for (i=0;i<MtmNodes;i++) {
19441958
Mtm->nodes[i].oldestSnapshot=0;
1959+
Mtm->nodes[i].disabledNodeMask=0;
1960+
Mtm->nodes[i].connectivityMask=0;
1961+
Mtm->nodes[i].lockGraphUsed=0;
1962+
Mtm->nodes[i].lockGraphAllocated=0;
1963+
Mtm->nodes[i].lockGraphData=NULL;
19451964
Mtm->nodes[i].transDelay=0;
19461965
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
19471966
Mtm->nodes[i].con=MtmConnections[i];
@@ -2581,7 +2600,7 @@ _PG_init(void)
25812600
* resources in mtm_shmem_startup().
25822601
*/
25832602
RequestAddinShmemSpace(MTM_SHMEM_SIZE+MtmQueueSize);
2584-
RequestNamedLWLockTranche(MULTIMASTER_NAME,1+MtmMaxNodes);
2603+
RequestNamedLWLockTranche(MULTIMASTER_NAME,1+MtmMaxNodes*2);
25852604

25862605
BgwPoolStart(MtmWorkers,MtmPoolConstructor);
25872606

@@ -3238,7 +3257,7 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
32383257
for (i=0;i<Mtm->nAllNodes;i++)
32393258
{
32403259
size_tsize;
3241-
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
3260+
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
32423261
if (data) {
32433262
GlobalTransactionId*gtid= (GlobalTransactionId*)data;
32443263
GlobalTransactionId*last= (GlobalTransactionId*)(data+size);
@@ -3630,12 +3649,28 @@ static bool MtmProcessDDLCommand(char const* queryString)
36303649
}
36313650

36323651
MTM_LOG1("Sending utility: %s",queryWithContext);
3633-
LogLogicalMessage("MTM:GUC",queryWithContext,strlen(queryWithContext), true);
3652+
LogLogicalMessage("G",queryWithContext,strlen(queryWithContext)+1, true);
36343653

36353654
MtmTx.containsDML= true;
36363655
return false;
36373656
}
36383657

3658+
voidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize)
3659+
{
3660+
intallocated;
3661+
MtmLockNode(nodeId+MtmMaxNodes,LW_EXCLUSIVE);
3662+
allocated=Mtm->nodes[nodeId-1].lockGraphAllocated;
3663+
if (messageSize>allocated) {
3664+
allocated=Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE,allocated*2),messageSize);
3665+
Mtm->nodes[nodeId-1].lockGraphData=ShmemAlloc(allocated);
3666+
Mtm->nodes[nodeId-1].lockGraphAllocated=allocated;
3667+
}
3668+
memcpy(Mtm->nodes[nodeId-1].lockGraphData,messageBody,messageSize);
3669+
Mtm->nodes[nodeId-1].lockGraphUsed=messageSize;
3670+
MtmUnlockNode(nodeId+MtmMaxNodes);
3671+
MTM_LOG1("Update deadlock graph for node %d size %d",nodeId,messageSize);
3672+
}
3673+
36393674
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
36403675
ProcessUtilityContextcontext,ParamListInfoparams,
36413676
DestReceiver*dest,char*completionTag)
@@ -3953,20 +3988,19 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
39533988
}
39543989

39553990
staticbool
3956-
MtmDetectGlobalDeadLock(PGPROC*proc)
3991+
MtmDetectGlobalDeadLockFortXid(TransactionIdxid)
39573992
{
3958-
ByteBufferbuf;
3959-
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
39603993
boolhasDeadlock= false;
3961-
3962-
if (TransactionIdIsValid(pgxact->xid)) {
3994+
if (TransactionIdIsValid(xid)) {
3995+
ByteBufferbuf;
39633996
MtmGraphgraph;
39643997
GlobalTransactionIdgtid;
39653998
inti;
39663999

39674000
ByteBufferAlloc(&buf);
39684001
EnumerateLocks(MtmSerializeLock,&buf);
39694002
RaftableSet(psprintf("lock-graph-%d",MtmNodeId),buf.data,buf.used, false);
4003+
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
39704004
MtmGraphInit(&graph);
39714005
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data,buf.used/sizeof(GlobalTransactionId));
39724006
ByteBufferFree(&buf);
@@ -3981,9 +4015,9 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
39814015
}
39824016
}
39834017
}
3984-
MtmGetGtid(pgxact->xid,&gtid);
4018+
MtmGetGtid(xid,&gtid);
39854019
hasDeadlock=MtmGraphFindLoop(&graph,&gtid);
3986-
elog(WARNING,"Distributed deadlock check for %u:%u = %d",gtid.node,gtid.xid,hasDeadlock);
4020+
elog(WARNING,"Distributed deadlock checkby backend %dfor %u:%u = %d",MyProcPid,gtid.node,gtid.xid,hasDeadlock);
39874021
if (!hasDeadlock) {
39884022
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
39894023
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
@@ -3994,8 +4028,27 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
39944028
hasDeadlock= true;
39954029
elog(WARNING,"Apply workers were blocked more than %d msec",
39964030
(int)USEC_TO_MSEC(MtmGetSystemTime()-lastPeekTime));
4031+
}else {
4032+
MTM_LOG1("Enable deadlock timeout in backend %d for transaction %d",MyProcPid,xid);
4033+
enable_timeout_after(DEADLOCK_TIMEOUT,DeadlockTimeout);
39974034
}
39984035
}
39994036
}
40004037
returnhasDeadlock;
40014038
}
4039+
4040+
staticbool
4041+
MtmDetectGlobalDeadLock(PGPROC*proc)
4042+
{
4043+
PGXACT*pgxact=&ProcGlobal->allPgXact[proc->pgprocno];
4044+
4045+
MTM_LOG1("Detect global deadlock for %d by backend %d",pgxact->xid,MyProcPid);
4046+
4047+
returnMtmDetectGlobalDeadLockFortXid(pgxact->xid);
4048+
}
4049+
4050+
Datummtm_check_deadlock(PG_FUNCTION_ARGS)
4051+
{
4052+
TransactionIdxid=PG_GETARG_INT32(0);
4053+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4054+
}

‎contrib/mmts/multimaster.h‎

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
5555
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
56+
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE 4096
5657
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5758
#defineMULTIMASTER_ADMIN "mtm_admin"
5859

@@ -138,8 +139,9 @@ typedef struct
138139
TransactionIdsxid;/* Transaction ID at sender node */
139140
XidStatusstatus;/* Transaction status */
140141
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
141-
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
142142
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
143+
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
144+
nodemask_tconnectivityMask;/* Connectivity bittmask at the sender of message */
143145
pgid_tgid;/* Global transaction identifier */
144146
}MtmArbiterMessage;
145147

@@ -179,13 +181,18 @@ typedef struct
179181
timestamp_treceiverStartTime;
180182
timestamp_tsenderStartTime;
181183
timestamp_tlastHeartbeat;
184+
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes received from this node */
185+
nodemask_tconnectivityMask;/* Connectivity mask at this node */
182186
intsenderPid;
183187
intreceiverPid;
184188
XLogRecPtrflushPos;
185-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
189+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
186190
XLogRecPtrrestartLsn;
187191
RepOriginIdoriginId;
188192
inttimeline;
193+
void*lockGraphData;
194+
intlockGraphAllocated;
195+
intlockGraphUsed;
189196
}MtmNodeInfo;
190197

191198
typedefstructMtmTransState
@@ -223,7 +230,7 @@ typedef struct
223230
MtmNodeStatusstatus;/* Status of this node */
224231
intrecoverySlot;/* NodeId of recovery slot or 0 if none */
225232
volatileslock_tspinlock;/* spinlock used to protect access to hash table */
226-
PGSemaphoreDatasendSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
233+
PGSemaphoreDatasendSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
227234
LWLockPadded*locks;/* multimaster lock tranche */
228235
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
229236
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes */
@@ -310,7 +317,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
310317
externvoidMtmBroadcastPollMessage(MtmTransState*ts);
311318
externvoidMtmLock(LWLockModemode);
312319
externvoidMtmUnlock(void);
313-
externvoidMtmLockNode(intnodeId);
320+
externvoidMtmLockNode(intnodeId,LWLockModemode);
314321
externvoidMtmUnlockNode(intnodeId);
315322
externvoidMtmDropNode(intnodeId,booldropSlot);
316323
externvoidMtmRecoverNode(intnodeId);
@@ -340,6 +347,7 @@ extern XLogRecPtr MtmGetFlushPosition(int nodeId);
340347
externboolMtmWatchdog(timestamp_tnow);
341348
externvoidMtmCheckHeartbeat(void);
342349
externvoidMtmResetTransaction(void);
350+
externvoidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize);
343351
externPGconn*PQconnectdb_safe(constchar*conninfo);
344352

345353

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp