Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb9226d0

Browse files
committed
buggy
1 parentbc37ba2 commitb9226d0

File tree

10 files changed

+207
-217
lines changed

10 files changed

+207
-217
lines changed

‎arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ static void MtmReceiver(Datum arg)
999999
default:
10001000
break;
10011001
}
1002-
if (BIT_CHECK(msg->disabledNodeMask,node-1)) {
1002+
if (BIT_CHECK(msg->disabledNodeMask,node-1)||BIT_CHECK(Mtm->disabledNodeMask,node-1)) {
10031003
MTM_ELOG(WARNING,"Ignore message from dead node %d\n",node);
10041004
continue;
10051005
}

‎multimaster.c

Lines changed: 77 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ char const* const MtmNodeStatusMnem[] =
225225
"Recovery",
226226
"Recovered",
227227
"InMinor",
228+
"OutOfClique",
228229
"OutOfService"
229230
};
230231

@@ -373,6 +374,7 @@ void MtmLock(LWLockMode mode)
373374
if (mode==LW_EXCLUSIVE) {
374375
Assert(MtmLockCount==0);
375376
Mtm->lastLockHolder=MyProcPid;
377+
Assert(MyProcPid);
376378
MtmLockCount=1;
377379
}
378380
}
@@ -1155,7 +1157,6 @@ bool MtmWatchdog(timestamp_t now)
11551157
MTM_LOG1("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)",
11561158
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
11571159
MtmOnNodeDisconnect(i+1);
1158-
MtmDisableNode(i+1);
11591160
allAlive= false;
11601161
}
11611162
}
@@ -1176,6 +1177,7 @@ void MtmPrecommitTransaction(char const* gid)
11761177
MTM_ELOG(WARNING,"MtmPrecommitTransaction: transaction '%s' is not found",gid);
11771178
}else {
11781179
MtmTransState*ts=tm->state;
1180+
// Assert(ts != NULL);
11791181
if (ts==NULL) {
11801182
MTM_ELOG(WARNING,"MtmPrecommitTransaction: transaction '%s' is not yet prepared, status %s",gid,MtmTxnStatusMnem[tm->status]);
11811183
MtmUnlock();
@@ -1501,6 +1503,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15011503
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
15021504
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
15031505
{
1506+
MtmUnlock();
15041507
MTM_ELOG(ERROR,"Attempt to commit %s transaction %s (%llu)",
15051508
MtmTxnStatusMnem[ts->status],ts->gid, (long64)ts->xid);
15061509
}
@@ -2026,15 +2029,19 @@ static int64 MtmGetSlotLag(int nodeId)
20262029
*/
20272030
boolMtmIsRecoveredNode(intnodeId)
20282031
{
2029-
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
2030-
if (!MtmIsRecoverySession) {
2031-
MTM_ELOG(ERROR,"Node %d is marked as disabled but is not in recovery mode",nodeId);
2032-
}
2033-
return true;
2034-
}else {
2035-
MtmIsRecoverySession= false;/* recovery is completed */
2036-
return false;
2037-
}
2032+
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
2033+
Assert(!MtmIsRecoverySession);
2034+
2035+
returnBIT_CHECK(Mtm->disabledNodeMask,nodeId-1)&&MtmIsRecoverySession;
2036+
// if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2037+
// if (!MtmIsRecoverySession) {
2038+
// MTM_ELOG(WARNING, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2039+
// }
2040+
// return true;
2041+
// } else {
2042+
// MtmIsRecoverySession = false; /* recovery is completed */
2043+
// return false;
2044+
// }
20382045
}
20392046

20402047
/*
@@ -2060,7 +2067,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20602067
*/
20612068
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d",
20622069
nodeId,slotLSN,walLSN,Mtm->nActiveTransactions);
2063-
BIT_SET(Mtm->originLockNodeMask,nodeId-1);
2070+
BIT_SET(Mtm->originLockNodeMask,nodeId-1);// XXXX: log that
20642071
}else {
20652072
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
20662073
" WAL sender position %llx, lockers %llx, active transactions %d",nodeId,slotLSN,
@@ -2082,6 +2089,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20822089
if (MtmIsRecoveredNode(nodeId)&&Mtm->nActiveTransactions==0) {
20832090
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20842091
caughtUp= true;
2092+
MtmIsRecoverySession= false;
20852093
}
20862094
MtmUnlock();
20872095
returncaughtUp;
@@ -2099,6 +2107,7 @@ MtmLockCluster(void)
20992107
}
21002108
MtmLock(LW_EXCLUSIVE);
21012109
if (BIT_CHECK(Mtm->originLockNodeMask,MtmNodeId-1)) {
2110+
MtmUnlock();
21022111
elog(ERROR,"There is already pending exclusive lock");
21032112
}
21042113
BIT_SET(Mtm->originLockNodeMask,MtmNodeId-1);
@@ -2351,6 +2360,7 @@ static void MtmInitialize()
23512360
Mtm->nLiveNodes=0;//MtmNodes;
23522361
Mtm->nAllNodes=MtmNodes;
23532362
Mtm->disabledNodeMask=7;//XXXX
2363+
Mtm->clique=7;// XXXX
23542364
Mtm->stalledNodeMask=0;
23552365
Mtm->stoppedNodeMask=0;
23562366
Mtm->deadNodeMask=0;
@@ -2383,7 +2393,7 @@ static void MtmInitialize()
23832393
for (i=0;i<MtmNodes;i++) {
23842394
Mtm->nodes[i].oldestSnapshot=0;
23852395
Mtm->nodes[i].disabledNodeMask=0;
2386-
Mtm->nodes[i].connectivityMask=7;
2396+
Mtm->nodes[i].connectivityMask=7;// XXXX
23872397
Mtm->nodes[i].lockGraphUsed=0;
23882398
Mtm->nodes[i].lockGraphAllocated=0;
23892399
Mtm->nodes[i].lockGraphData=NULL;
@@ -3308,17 +3318,56 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33083318
Mtm->preparedTransactionsLoaded= true;
33093319
}
33103320

3311-
while (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1)||
3312-
BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
3321+
// while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3322+
// BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3323+
// !BIT_CHECK(Mtm->clique, nodeId - 1) ||
3324+
// !BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3325+
// {
3326+
// if (*shutdown)
3327+
// {
3328+
// MtmUnlock();
3329+
// return REPLMODE_EXIT;
3330+
// }
3331+
3332+
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3333+
// (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3334+
// {
3335+
// /* Lock on us */
3336+
// Mtm->recoverySlot = nodeId;
3337+
// MtmPollStatusOfPreparedTransactions();
3338+
// MtmUnlock();
3339+
// return REPLMODE_RECOVERY;
3340+
// }
3341+
3342+
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3343+
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3344+
3345+
// MtmUnlock();
3346+
// /* delay opening of other slots until recovery is completed */
3347+
// MtmSleep(STATUS_POLL_DELAY);
3348+
// MtmLock(LW_EXCLUSIVE);
3349+
// }
3350+
3351+
// MtmUnlock();
3352+
3353+
// return REPLMODE_RECOVERED;
3354+
3355+
/* Await until node is connected and both receiver and sender are in clique */
3356+
while (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)||
3357+
!BIT_CHECK(Mtm->clique,nodeId-1)||
3358+
!BIT_CHECK(Mtm->clique,MtmNodeId-1) )
33133359
{
3360+
MtmUnlock();
33143361
if (*shutdown)
3315-
{
3316-
MtmUnlock();
33173362
returnREPLMODE_EXIT;
3318-
}
3363+
MtmSleep(STATUS_POLL_DELAY);
3364+
MtmLock(LW_EXCLUSIVE);
3365+
}
33193366

3320-
if ((Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)&&
3321-
(!BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)))
3367+
if (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
3368+
{
3369+
/* Ok, then start recovery by luckiest walreceiver */
3370+
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)
33223371
{
33233372
/* Lock on us */
33243373
Mtm->recoverySlot=nodeId;
@@ -3327,87 +3376,19 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33273376
returnREPLMODE_RECOVERY;
33283377
}
33293378

3330-
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3331-
nodeId,Mtm->recoverySlot,Mtm->donorNodeId,SELF_CONNECTIVITY_MASK,Mtm->disabledNodeMask);
3332-
3333-
MtmUnlock();
3334-
/* delay opening of other slots until recovery is completed */
3335-
MtmSleep(STATUS_POLL_DELAY);
3336-
MtmLock(LW_EXCLUSIVE);
3379+
/* And force less lucky walreceivers wait until recovery is completed */
3380+
while (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
3381+
{
3382+
MtmUnlock();
3383+
if (*shutdown)
3384+
returnREPLMODE_EXIT;
3385+
MtmSleep(STATUS_POLL_DELAY);
3386+
MtmLock(LW_EXCLUSIVE);
3387+
}
33373388
}
33383389

33393390
MtmUnlock();
3340-
33413391
returnREPLMODE_RECOVERED;
3342-
3343-
3344-
3345-
3346-
3347-
// while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3348-
// || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3349-
// // while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3350-
// {
3351-
// if (*shutdown)
3352-
// {
3353-
// MtmUnlock();
3354-
// return REPLMODE_EXIT;
3355-
// }
3356-
// // /* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
3357-
// // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
3358-
// // mode = REPLMODE_CREATE_NEW;
3359-
// // }
3360-
// // MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
3361-
3362-
// if (Mtm->status == MTM_RECOVERY) {
3363-
// mode = REPLMODE_RECOVERED;
3364-
// /* Choose node for recovery if
3365-
// * 1. It is not chosen yet or the same node was chosen before
3366-
// * 2. It is donor node or there is no donor node
3367-
// * 3. Connections with all other live nodes were established
3368-
// */
3369-
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3370-
// && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3371-
// && (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
3372-
// {
3373-
// /* Choose for recovery first available slot or slot of donor node (if any) */
3374-
// if (Mtm->nAllNodes >= 3) {
3375-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d restartLSNs={%llx, %llx, %llx}",
3376-
// MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3377-
// } else {
3378-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3379-
// }
3380-
// Mtm->recoverySlot = nodeId;
3381-
// // Mtm->nReceivers = 0;
3382-
// // Mtm->nSenders = 0;
3383-
// // Mtm->recoveryCount += 1;
3384-
// // Mtm->pglogicalReceiverMask = 0;
3385-
// // Mtm->pglogicalSenderMask = 0;
3386-
// MtmPollStatusOfPreparedTransactions();
3387-
// MtmUnlock();
3388-
// return REPLMODE_RECOVERY;
3389-
// }
3390-
// }
3391-
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3392-
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3393-
// MtmUnlock();
3394-
// /* delay opening of other slots until recovery is completed */
3395-
// MtmSleep(STATUS_POLL_DELAY);
3396-
// MtmLock(LW_EXCLUSIVE);
3397-
// }
3398-
// if (Mtm->status == MTM_RECOVERED) {
3399-
// mode = REPLMODE_RECOVERED;
3400-
// }
3401-
// // if (mode == REPLMODE_RECOVERED) {
3402-
// // MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
3403-
// // } else if (mode == REPLMODE_CREATE_NEW) {
3404-
// // MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
3405-
// // } else {
3406-
// // MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
3407-
// // }
3408-
// BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestablish connection with this node */
3409-
// MtmUnlock();
3410-
// return mode;
34113392
}
34123393

34133394
staticboolMtmIsBroadcast()

‎multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ typedef enum
108108
/* Identifier of global transaction */
109109
typedefstruct
110110
{
111-
intnode;/*Zero basedindex of node initiating transaction */
111+
intnode;/*One basedid of node initiating transaction */
112112
TransactionIdxid;/* Transaction ID at this node */
113113
}GlobalTransactionId;
114114

@@ -137,6 +137,7 @@ typedef enum
137137
MTM_RECOVERY,/* Node is in recovery process */
138138
MTM_RECOVERED,/* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
139139
MTM_IN_MINORITY,/* Node is out of quorum */
140+
MTM_OUT_OF_CLIQUE,/* Node is out of cluster by clique detector */
140141
MTM_OUT_OF_SERVICE/* Node is not available to to critical, non-recoverable error */
141142
}MtmNodeStatus;
142143

@@ -288,6 +289,7 @@ typedef struct
288289
LWLockPadded*locks;/* multimaster lock tranche */
289290
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
290291
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes */
292+
nodemask_tclique;/* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
291293
nodemask_tdeadNodeMask;/* Bitmask of nodes considered as dead by referee */
292294
nodemask_trecoveredNodeMask;/* Bitmask of nodes recoverd after been reported as dead by referee */
293295
nodemask_tstalledNodeMask;/* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -417,7 +419,6 @@ extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* co
417419
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
418420
externboolMtmRecoveryCaughtUp(intnodeId,lsn_twalEndPtr);
419421
externvoidMtmCheckRecoveryCaughtUp(intnodeId,lsn_tslotLSN);
420-
externvoidMtmRecoveryCompleted(void);
421422
externvoidMtmMakeTableLocal(charconst*schema,charconst*name);
422423
externvoidMtmHandleApplyError(void);
423424
externvoidMtmUpdateLsnMapping(intnodeId,lsn_tendLsn);

‎pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ process_remote_commit(StringInfo in)
658658
csn_tcsn;
659659
lsn_tend_lsn;
660660
lsn_torigin_lsn;
661+
lsn_tcommit_lsn;
661662
intorigin_node;
662663
chargid[MULTIMASTER_MAX_GID_SIZE];
663664

@@ -668,7 +669,7 @@ process_remote_commit(StringInfo in)
668669
MtmReplicationNodeId=pq_getmsgbyte(in);
669670

670671
/* read fields */
671-
pq_getmsgint64(in);/* commit_lsn */
672+
commit_lsn=pq_getmsgint64(in);/* commit_lsn */
672673
end_lsn=pq_getmsgint64(in);/* end_lsn */
673674
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
674675

@@ -692,7 +693,7 @@ process_remote_commit(StringInfo in)
692693
}
693694
casePGLOGICAL_COMMIT:
694695
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696+
MTM_LOG1("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696697
if (IsTransactionState()) {
697698
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698699
MtmBeginSession(origin_node);
@@ -969,10 +970,10 @@ process_remote_update(StringInfo s, Relation rel)
969970
{
970971
StringInfoDatao;
971972
initStringInfo(&o);
972-
tuple_to_stringinfo(&o,RelationGetDescr(rel),oldslot->tts_tuple);
973+
tuple_to_stringinfo(&o,RelationGetDescr(rel),oldslot->tts_tuple, false);
973974
appendStringInfo(&o," to");
974-
tuple_to_stringinfo(&o,RelationGetDescr(rel),remote_tuple);
975-
MTM_LOG1(DEBUG1,"UPDATE:%s",o.data);
975+
tuple_to_stringinfo(&o,RelationGetDescr(rel),remote_tuple, false);
976+
MTM_LOG1("%lu:UPDATE:%s",GetCurrentTransactionId(),o.data);
976977
resetStringInfo(&o);
977978
}
978979
#endif
@@ -1191,7 +1192,6 @@ void MtmExecutor(void* work, size_t size)
11911192
}
11921193
case'Z':
11931194
{
1194-
// MtmRecoveryCompleted();
11951195
MtmStateProcessEvent(MTM_RECOVERY_FINISH2);
11961196
inside_transaction= false;
11971197
break;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp