Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1d135e6

Browse files
knizhnikkelvich
authored andcommitted
Node recovery
1 parent78af328 commit1d135e6

File tree

4 files changed

+32
-12
lines changed

4 files changed

+32
-12
lines changed

‎arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
348348
/* Some node considered that I am dead, so switch to recovery mode */
349349
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
350350
elog(WARNING,"Node %d think that I am dead",resp.node);
351+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
351352
MtmSwitchClusterMode(MTM_RECOVERY);
352353
}
353-
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
354-
Mtm->disabledNodeMask |=resp.disabledNodeMask;
355354
returnsd;
356355
}
357356
}
@@ -377,7 +376,7 @@ static void MtmOpenConnections()
377376
}
378377
if (Mtm->nNodes<MtmNodes/2+1) {/* no quorum */
379378
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",Mtm->nNodes,MtmNodes);
380-
Mtm->status=MTM_OFFLINE;
379+
Mtm->status=MTM_IN_MINORITY;
381380
}elseif (Mtm->status==MTM_INITIALIZATION) {
382381
MtmSwitchClusterMode(MTM_CONNECTED);
383382
}
@@ -431,6 +430,7 @@ static void MtmAcceptOneConnection()
431430
resp.dxid=HANDSHAKE_MAGIC;
432431
resp.sxid=ShmemVariableCache->nextXid;
433432
resp.csn=MtmGetCurrentTime();
433+
resp.node=MtmNodeId;
434434
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
435435
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
436436
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);

‎multimaster.c

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ char const* const MtmNodeStatusMnem[] =
155155
"Offline",
156156
"Connected",
157157
"Online",
158-
"Recovery"
158+
"Recovery",
159+
"InMinor"
159160
};
160161

161162
boolMtmDoReplication;
@@ -631,10 +632,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
631632
x->isReplicated= false;
632633
x->isDistributed=MtmIsUserTransaction();
633634
x->isPrepared= false;
634-
if (x->isDistributed&&Mtm->status!=MTM_ONLINE) {
635+
x->isTransactionBlock=IsTransactionBlock();
636+
/* Application name can be cahnged usnig PGAPPNAME environment variable */
637+
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0) {
635638
/* reject all user's transactions at offline cluster */
636639
MtmUnlock();
637-
Assert(Mtm->status==MTM_ONLINE);
638640
elog(ERROR,"Multimaster node is not online: current status %s",MtmNodeStatusMnem[Mtm->status]);
639641
}
640642
x->containsDML= false;
@@ -981,11 +983,14 @@ bool MtmIsRecoveredNode(int nodeId)
981983
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
982984
* Is there some better way to establish mapping between nodes ad WAL-seconder?
983985
*/
986+
elog(WARNING,"Node %d is catching up",nodeId);
984987
MtmLock(LW_EXCLUSIVE);
985988
BIT_SET(Mtm->nodeLockerMask,nodeId-1);
986989
BIT_SET(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
987990
Mtm->nLockers+=1;
988991
MtmUnlock();
992+
}else {
993+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n",nodeId,MyWalSnd->sentPtr,GetXLogInsertRecPtr(),Mtm->nLockers);
989994
}
990995
return true;
991996
}
@@ -1022,7 +1027,7 @@ MtmCheckClusterLock()
10221027
break;
10231028
}else {
10241029
/* recovered replica catched up with master */
1025-
elog(WARNING,"WAL-sender %d completereceovery",i);
1030+
elog(WARNING,"WAL-sender %d completerecovery",i);
10261031
BIT_CLEAR(Mtm->walSenderLockerMask,i);
10271032
}
10281033
}
@@ -1608,8 +1613,9 @@ void MtmReceiverStarted(int nodeId)
16081613
if (!BIT_CHECK(Mtm->pglogicalNodeMask,nodeId-1)) {
16091614
BIT_SET(Mtm->pglogicalNodeMask,nodeId-1);
16101615
if (++Mtm->nReceivers==Mtm->nNodes-1) {
1611-
Assert(Mtm->status==MTM_CONNECTED);
1612-
MtmSwitchClusterMode(MTM_ONLINE);
1616+
if (Mtm->status==MTM_CONNECTED) {
1617+
MtmSwitchClusterMode(MTM_ONLINE);
1618+
}
16131619
}
16141620
}
16151621
SpinLockRelease(&Mtm->spinlock);
@@ -1622,19 +1628,28 @@ void MtmReceiverStarted(int nodeId)
16221628
*/
16231629
MtmSlotModeMtmReceiverSlotMode(intnodeId)
16241630
{
1631+
boolrecovery= false;
16251632
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE) {
1633+
MTM_INFO("%d: receiver slot mode %s\n",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
16261634
if (Mtm->status==MTM_RECOVERY) {
1635+
recovery= true;
16271636
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId) {
16281637
/* Choose for recovery first available slot */
1638+
elog(WARNING,"Start recovery from node %d",nodeId);
16291639
Mtm->recoverySlot=nodeId;
16301640
returnSLOT_OPEN_EXISTED;
16311641
}
16321642
}
16331643
/* delay opening of other slots until recovery is completed */
16341644
MtmSleep(STATUS_POLL_DELAY);
16351645
}
1646+
if (recovery) {
1647+
elog(WARNING,"Recreate replication slot for node %d after end of recovery",nodeId);
1648+
}else {
1649+
MTM_INFO("%d: Reuse replication slot for node %d\n",MyProcPid,nodeId);
1650+
}
16361651
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1637-
returnMtm->recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
1652+
returnrecovery ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
16381653
}
16391654

16401655
staticboolMtmIsBroadcast()
@@ -1690,7 +1705,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16901705
staticbool
16911706
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
16921707
{
1693-
returnargs->origin_id==InvalidRepOriginId||MtmIsRecoveredNode(MtmReplicationNodeId);
1708+
boolres=Mtm->status!=MTM_RECOVERY
1709+
&& (args->origin_id==InvalidRepOriginId
1710+
||MtmIsRecoveredNode(MtmReplicationNodeId));
1711+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
1712+
returnres;
16941713
}
16951714

16961715
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
2929
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
30+
#defineMULTIMASTER_ADMIN "mtm_admin"
3031

3132
#defineUSEC 1000000
3233

‎pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_tcsn=MtmTransactionSnapshot(txn->xid);
106-
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
106+
MTM_INFO("%d:pglogical_write_begin %d CSN=%ld\n",MyProcPid,txn->xid,csn);
107107

108108
if (csn==INVALID_CSN&& !isRecovery) {
109109
MtmIsFilteredTxn= true;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp