Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d5a5f87

Browse files
committed
buggy
1 parent e681d3f commit d5a5f87

File tree

10 files changed

+211
-219
lines changed

10 files changed

+211
-219
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -991,7 +991,7 @@ static void MtmReceiver(Datum arg)
991991
default:
992992
break;
993993
}
994-
if (BIT_CHECK(msg->disabledNodeMask,node-1)) {
994+
if (BIT_CHECK(msg->disabledNodeMask,node-1)||BIT_CHECK(Mtm->disabledNodeMask,node-1)) {
995995
MTM_ELOG(WARNING,"Ignore message from dead node %d\n",node);
996996
continue;
997997
}

‎contrib/mmts/multimaster.c

Lines changed: 81 additions & 98 deletions
Original file line number | Diff line number | Diff line change
@@ -221,6 +221,7 @@ char const* const MtmNodeStatusMnem[] =
221221
"Recovery",
222222
"Recovered",
223223
"InMinor",
224+
"OutOfClique",
224225
"OutOfService"
225226
};
226227

@@ -366,6 +367,7 @@ void MtmLock(LWLockMode mode)
366367
if (mode==LW_EXCLUSIVE) {
367368
Assert(MtmLockCount==0);
368369
Mtm->lastLockHolder=MyProcPid;
370+
Assert(MyProcPid);
369371
MtmLockCount=1;
370372
}
371373
}
@@ -1145,7 +1147,6 @@ bool MtmWatchdog(timestamp_t now)
11451147
MTM_LOG1("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)",
11461148
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
11471149
MtmOnNodeDisconnect(i+1);
1148-
MtmDisableNode(i+1);
11491150
allAlive= false;
11501151
}
11511152
}
@@ -1166,8 +1167,11 @@ void MtmPrecommitTransaction(char const* gid)
11661167
MTM_ELOG(WARNING,"MtmPrecommitTransaction: transaction '%s' is not found",gid);
11671168
}else {
11681169
MtmTransState*ts=tm->state;
1169-
Assert(ts!=NULL);
1170-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
1170+
// Assert(ts != NULL);
1171+
if (ts==NULL) {
1172+
MTM_ELOG(WARNING,"MtmPrecommitTransaction: transaction '%s' is not yet prepared, status %s",gid,MtmTxnStatusMnem[tm->status]);
1173+
MtmUnlock();
1174+
} else if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
11711175
ts->status=TRANSACTION_STATUS_UNKNOWN;
11721176
ts->csn=MtmAssignCSN();
11731177
MtmAdjustSubtransactions(ts);
@@ -1489,6 +1493,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
14891493
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
14901494
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
14911495
{
1496+
MtmUnlock();
14921497
MTM_ELOG(ERROR,"Attempt to commit %s transaction %s (%llu)",
14931498
MtmTxnStatusMnem[ts->status],ts->gid, (long64)ts->xid);
14941499
}
@@ -2014,15 +2019,19 @@ static int64 MtmGetSlotLag(int nodeId)
20142019
*/
20152020
bool MtmIsRecoveredNode(int nodeId)
20162021
{
2017-
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
2018-
if (!MtmIsRecoverySession) {
2019-
MTM_ELOG(ERROR,"Node %d is marked as disabled but is not in recovery mode",nodeId);
2020-
}
2021-
return true;
2022-
}else {
2023-
MtmIsRecoverySession= false;/* recovery is completed */
2024-
return false;
2025-
}
2022+
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
2023+
Assert(!MtmIsRecoverySession);
2024+
2025+
return BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)&&MtmIsRecoverySession;
2026+
// if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2027+
// if (!MtmIsRecoverySession) {
2028+
// MTM_ELOG(WARNING, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2029+
// }
2030+
// return true;
2031+
// } else {
2032+
// MtmIsRecoverySession = false; /* recovery is completed */
2033+
// return false;
2034+
// }
20262035
}
20272036

20282037
/*
@@ -2048,7 +2057,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20482057
*/
20492058
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d",
20502059
nodeId,slotLSN,walLSN,Mtm->nActiveTransactions);
2051-
BIT_SET(Mtm->originLockNodeMask,nodeId-1);
2060+
BIT_SET(Mtm->originLockNodeMask,nodeId-1);// XXXX: log that
20522061
}else {
20532062
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
20542063
" WAL sender position %llx, lockers %llx, active transactions %d",nodeId,slotLSN,
@@ -2070,6 +2079,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20702079
if (MtmIsRecoveredNode(nodeId)&&Mtm->nActiveTransactions==0) {
20712080
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20722081
caughtUp= true;
2082+
MtmIsRecoverySession= false;
20732083
}
20742084
MtmUnlock();
20752085
return caughtUp;
@@ -2087,6 +2097,7 @@ MtmLockCluster(void)
20872097
}
20882098
MtmLock(LW_EXCLUSIVE);
20892099
if (BIT_CHECK(Mtm->originLockNodeMask,MtmNodeId-1)) {
2100+
MtmUnlock();
20902101
elog(ERROR,"There is already pending exclusive lock");
20912102
}
20922103
BIT_SET(Mtm->originLockNodeMask,MtmNodeId-1);
@@ -2339,6 +2350,7 @@ static void MtmInitialize()
23392350
Mtm->nLiveNodes=0;//MtmNodes;
23402351
Mtm->nAllNodes=MtmNodes;
23412352
Mtm->disabledNodeMask=7;//XXXX
2353+
Mtm->clique=7;// XXXX
23422354
Mtm->stalledNodeMask=0;
23432355
Mtm->stoppedNodeMask=0;
23442356
Mtm->deadNodeMask=0;
@@ -2371,7 +2383,7 @@ static void MtmInitialize()
23712383
for (i=0;i<MtmNodes;i++) {
23722384
Mtm->nodes[i].oldestSnapshot=0;
23732385
Mtm->nodes[i].disabledNodeMask=0;
2374-
Mtm->nodes[i].connectivityMask=7;
2386+
Mtm->nodes[i].connectivityMask=7;// XXXX
23752387
Mtm->nodes[i].lockGraphUsed=0;
23762388
Mtm->nodes[i].lockGraphAllocated=0;
23772389
Mtm->nodes[i].lockGraphData=NULL;
@@ -3214,17 +3226,56 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32143226
Mtm->preparedTransactionsLoaded= true;
32153227
}
32163228

3217-
while (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1)||
3218-
BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
3229+
// while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3230+
// BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3231+
// !BIT_CHECK(Mtm->clique, nodeId - 1) ||
3232+
// !BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3233+
// {
3234+
// if (*shutdown)
3235+
// {
3236+
// MtmUnlock();
3237+
// return REPLMODE_EXIT;
3238+
// }
3239+
3240+
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3241+
// (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3242+
// {
3243+
// /* Lock on us */
3244+
// Mtm->recoverySlot = nodeId;
3245+
// MtmPollStatusOfPreparedTransactions();
3246+
// MtmUnlock();
3247+
// return REPLMODE_RECOVERY;
3248+
// }
3249+
3250+
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3251+
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3252+
3253+
// MtmUnlock();
3254+
// /* delay opening of other slots until recovery is completed */
3255+
// MtmSleep(STATUS_POLL_DELAY);
3256+
// MtmLock(LW_EXCLUSIVE);
3257+
// }
3258+
3259+
// MtmUnlock();
3260+
3261+
// return REPLMODE_RECOVERED;
3262+
3263+
/* Await until node is connected and both receiver and sender are in clique */
3264+
while (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)||
3265+
!BIT_CHECK(Mtm->clique,nodeId-1)||
3266+
!BIT_CHECK(Mtm->clique,MtmNodeId-1) )
32193267
{
3268+
MtmUnlock();
32203269
if (*shutdown)
3221-
{
3222-
MtmUnlock();
32233270
return REPLMODE_EXIT;
3224-
}
3271+
MtmSleep(STATUS_POLL_DELAY);
3272+
MtmLock(LW_EXCLUSIVE);
3273+
}
32253274

3226-
if ((Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)&&
3227-
(!BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)))
3275+
if (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
3276+
{
3277+
/* Ok, then start recovery by luckiest walreceiver */
3278+
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)
32283279
{
32293280
/* Lock on us */
32303281
Mtm->recoverySlot=nodeId;
@@ -3233,87 +3284,19 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32333284
return REPLMODE_RECOVERY;
32343285
}
32353286

3236-
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3237-
nodeId,Mtm->recoverySlot,Mtm->donorNodeId,SELF_CONNECTIVITY_MASK,Mtm->disabledNodeMask);
3238-
3239-
MtmUnlock();
3240-
/* delay opening of other slots until recovery is completed */
3241-
MtmSleep(STATUS_POLL_DELAY);
3242-
MtmLock(LW_EXCLUSIVE);
3287+
/* And force less lucky walreceivers wait until recovery is completed */
3288+
while (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
3289+
{
3290+
MtmUnlock();
3291+
if (*shutdown)
3292+
return REPLMODE_EXIT;
3293+
MtmSleep(STATUS_POLL_DELAY);
3294+
MtmLock(LW_EXCLUSIVE);
3295+
}
32433296
}
32443297

32453298
MtmUnlock();
3246-
32473299
return REPLMODE_RECOVERED;
3248-
3249-
3250-
3251-
3252-
3253-
// while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3254-
// || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3255-
// // while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3256-
// {
3257-
// if (*shutdown)
3258-
// {
3259-
// MtmUnlock();
3260-
// return REPLMODE_EXIT;
3261-
// }
3262-
// // /* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
3263-
// // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
3264-
// // mode = REPLMODE_CREATE_NEW;
3265-
// // }
3266-
// // MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
3267-
3268-
// if (Mtm->status == MTM_RECOVERY) {
3269-
// mode = REPLMODE_RECOVERED;
3270-
// /* Choose node for recovery if
3271-
// * 1. It is not chosen yet or the same node was chosen before
3272-
// * 2. It is donor node or there is no donor node
3273-
// * 3. Connections with all other live nodes were established
3274-
// */
3275-
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3276-
// && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3277-
// && (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
3278-
// {
3279-
// /* Choose for recovery first available slot or slot of donor node (if any) */
3280-
// if (Mtm->nAllNodes >= 3) {
3281-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d restartLSNs={%llx, %llx, %llx}",
3282-
// MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3283-
// } else {
3284-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3285-
// }
3286-
// Mtm->recoverySlot = nodeId;
3287-
// // Mtm->nReceivers = 0;
3288-
// // Mtm->nSenders = 0;
3289-
// // Mtm->recoveryCount += 1;
3290-
// // Mtm->pglogicalReceiverMask = 0;
3291-
// // Mtm->pglogicalSenderMask = 0;
3292-
// MtmPollStatusOfPreparedTransactions();
3293-
// MtmUnlock();
3294-
// return REPLMODE_RECOVERY;
3295-
// }
3296-
// }
3297-
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3298-
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3299-
// MtmUnlock();
3300-
// /* delay opening of other slots until recovery is completed */
3301-
// MtmSleep(STATUS_POLL_DELAY);
3302-
// MtmLock(LW_EXCLUSIVE);
3303-
// }
3304-
// if (Mtm->status == MTM_RECOVERED) {
3305-
// mode = REPLMODE_RECOVERED;
3306-
// }
3307-
// // if (mode == REPLMODE_RECOVERED) {
3308-
// // MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
3309-
// // } else if (mode == REPLMODE_CREATE_NEW) {
3310-
// // MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
3311-
// // } else {
3312-
// // MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
3313-
// // }
3314-
// BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestablish connection with this node */
3315-
// MtmUnlock();
3316-
// return mode;
33173300
}
33183301

33193302
static bool MtmIsBroadcast()

‎contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -108,7 +108,7 @@ typedef enum
108108
/* Identifier of global transaction */
109109
typedef struct
110110
{
111-
int node; /* Zero based index of node initiating transaction */
111+
int node; /* One based id of node initiating transaction */
112112
TransactionId xid; /* Transaction ID at this node */
113113
}GlobalTransactionId;
114114

@@ -137,6 +137,7 @@ typedef enum
137137
MTM_RECOVERY,/* Node is in recovery process */
138138
MTM_RECOVERED,/* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
139139
MTM_IN_MINORITY,/* Node is out of quorum */
140+
MTM_OUT_OF_CLIQUE,/* Node is out of cluster by clique detector */
140141
MTM_OUT_OF_SERVICE/* Node is not available to to critical, non-recoverable error */
141142
}MtmNodeStatus;
142143

@@ -288,6 +289,7 @@ typedef struct
288289
LWLockPadded *locks; /* multimaster lock tranche */
289290
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
290291
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
292+
nodemask_t clique; /* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
291293
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by referee */
292294
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by referee */
293295
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -415,7 +417,6 @@ extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* co
415417
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
416418
extern bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr);
417419
extern void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN);
418-
extern void MtmRecoveryCompleted(void);
419420
extern void MtmMakeTableLocal(char const* schema, char const* name);
420421
extern void MtmHandleApplyError(void);
421422
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);

‎contrib/mmts/pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -658,6 +658,7 @@ process_remote_commit(StringInfo in)
658658
csn_t csn;
659659
lsn_t end_lsn;
660660
lsn_t origin_lsn;
661+
lsn_t commit_lsn;
661662
int origin_node;
662663
char gid[MULTIMASTER_MAX_GID_SIZE];
663664

@@ -668,7 +669,7 @@ process_remote_commit(StringInfo in)
668669
MtmReplicationNodeId=pq_getmsgbyte(in);
669670

670671
/* read fields */
671-
pq_getmsgint64(in);/* commit_lsn */
672+
commit_lsn=pq_getmsgint64(in);/* commit_lsn */
672673
end_lsn=pq_getmsgint64(in);/* end_lsn */
673674
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
674675

@@ -692,7 +693,7 @@ process_remote_commit(StringInfo in)
692693
}
693694
case PGLOGICAL_COMMIT:
694695
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696+
MTM_LOG1("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696697
if (IsTransactionState()) {
697698
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698699
MtmBeginSession(origin_node);
@@ -969,10 +970,10 @@ process_remote_update(StringInfo s, Relation rel)
969970
{
970971
StringInfoData o;
971972
initStringInfo(&o);
972-
tuple_to_stringinfo(&o,RelationGetDescr(rel),oldslot->tts_tuple);
973+
tuple_to_stringinfo(&o,RelationGetDescr(rel),oldslot->tts_tuple, false);
973974
appendStringInfo(&o," to");
974-
tuple_to_stringinfo(&o,RelationGetDescr(rel),remote_tuple);
975-
MTM_LOG1(DEBUG1,"UPDATE:%s",o.data);
975+
tuple_to_stringinfo(&o,RelationGetDescr(rel),remote_tuple, false);
976+
MTM_LOG1("%lu:UPDATE:%s",GetCurrentTransactionId(),o.data);
976977
resetStringInfo(&o);
977978
}
978979
#endif
@@ -1191,7 +1192,6 @@ void MtmExecutor(void* work, size_t size)
11911192
}
11921193
case 'Z':
11931194
{
1194-
// MtmRecoveryCompleted();
11951195
MtmStateProcessEvent(MTM_RECOVERY_FINISH2);
11961196
inside_transaction= false;
11971197
break;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp