Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit269d8dd

Browse files
committed
Node recovery
1 parent5f43df1 commit269d8dd

File tree

5 files changed

+33
-13
lines changed

5 files changed

+33
-13
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
348348
/* Some node considered that I am dead, so switch to recovery mode */
349349
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
350350
elog(WARNING,"Node %d think that I am dead",resp.node);
351+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
351352
MtmSwitchClusterMode(MTM_RECOVERY);
352353
}
353-
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
354-
Mtm->disabledNodeMask |=resp.disabledNodeMask;
355354
returnsd;
356355
}
357356
}
@@ -377,7 +376,7 @@ static void MtmOpenConnections()
377376
}
378377
if (Mtm->nNodes<MtmNodes/2+1) {/* no quorum */
379378
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",Mtm->nNodes,MtmNodes);
380-
Mtm->status=MTM_OFFLINE;
379+
Mtm->status=MTM_IN_MINORITY;
381380
}elseif (Mtm->status==MTM_INITIALIZATION) {
382381
MtmSwitchClusterMode(MTM_CONNECTED);
383382
}
@@ -431,6 +430,7 @@ static void MtmAcceptOneConnection()
431430
resp.dxid=HANDSHAKE_MAGIC;
432431
resp.sxid=ShmemVariableCache->nextXid;
433432
resp.csn=MtmGetCurrentTime();
433+
resp.node=MtmNodeId;
434434
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
435435
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
436436
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);

‎contrib/mmts/multimaster.c‎

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ char const* const MtmNodeStatusMnem[] =
156156
"Offline",
157157
"Connected",
158158
"Online",
159-
"Recovery"
159+
"Recovery",
160+
"InMinor"
160161
};
161162

162163
boolMtmDoReplication;
@@ -633,10 +634,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
633634
x->isDistributed=MtmIsUserTransaction();
634635
x->isPrepared= false;
635636
x->isTransactionBlock=IsTransactionBlock();
636-
if (x->isDistributed&&Mtm->status!=MTM_ONLINE) {
637+
/* Application name can be cahnged usnig PGAPPNAME environment variable */
638+
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0) {
637639
/* reject all user's transactions at offline cluster */
638640
MtmUnlock();
639-
Assert(Mtm->status==MTM_ONLINE);
640641
elog(ERROR,"Multimaster node is not online: current status %s",MtmNodeStatusMnem[Mtm->status]);
641642
}
642643
x->containsDML= false;
@@ -983,11 +984,14 @@ bool MtmIsRecoveredNode(int nodeId)
983984
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
984985
* Is there some better way to establish mapping between nodes ad WAL-seconder?
985986
*/
987+
elog(WARNING,"Node %d is catching up",nodeId);
986988
MtmLock(LW_EXCLUSIVE);
987989
BIT_SET(Mtm->nodeLockerMask,nodeId-1);
988990
BIT_SET(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
989991
Mtm->nLockers+=1;
990992
MtmUnlock();
993+
}else {
994+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n",nodeId,MyWalSnd->sentPtr,GetXLogInsertRecPtr(),Mtm->nLockers);
991995
}
992996
return true;
993997
}
@@ -1024,7 +1028,7 @@ MtmCheckClusterLock()
10241028
break;
10251029
}else {
10261030
/* recovered replica catched up with master */
1027-
elog(WARNING,"WAL-sender %d completereceovery",i);
1031+
elog(WARNING,"WAL-sender %d completerecovery",i);
10281032
BIT_CLEAR(Mtm->walSenderLockerMask,i);
10291033
}
10301034
}
@@ -1610,8 +1614,9 @@ void MtmReceiverStarted(int nodeId)
16101614
if (!BIT_CHECK(Mtm->pglogicalNodeMask,nodeId-1)) {
16111615
BIT_SET(Mtm->pglogicalNodeMask,nodeId-1);
16121616
if (++Mtm->nReceivers==Mtm->nNodes-1) {
1613-
Assert(Mtm->status==MTM_CONNECTED);
1614-
MtmSwitchClusterMode(MTM_ONLINE);
1617+
if (Mtm->status==MTM_CONNECTED) {
1618+
MtmSwitchClusterMode(MTM_ONLINE);
1619+
}
16151620
}
16161621
}
16171622
SpinLockRelease(&Mtm->spinlock);
@@ -1624,19 +1629,28 @@ void MtmReceiverStarted(int nodeId)
16241629
*/
16251630
MtmSlotModeMtmReceiverSlotMode(intnodeId)
16261631
{
1632+
boolrecovery= false;
16271633
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE) {
1634+
MTM_INFO("%d: receiver slot mode %s\n",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
16281635
if (Mtm->status==MTM_RECOVERY) {
1636+
recovery= true;
16291637
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId) {
16301638
/* Choose for recovery first available slot */
1639+
elog(WARNING,"Start recovery from node %d",nodeId);
16311640
Mtm->recoverySlot=nodeId;
16321641
returnSLOT_OPEN_EXISTED;
16331642
}
16341643
}
16351644
/* delay opening of other slots until recovery is completed */
16361645
MtmSleep(STATUS_POLL_DELAY);
16371646
}
1647+
if (recovery) {
1648+
elog(WARNING,"Recreate replication slot for node %d after end of recovery",nodeId);
1649+
}else {
1650+
MTM_INFO("%d: Reuse replication slot for node %d\n",MyProcPid,nodeId);
1651+
}
16381652
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1639-
returnMtm->recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
1653+
returnrecovery ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
16401654
}
16411655

16421656
staticboolMtmIsBroadcast()
@@ -1692,7 +1706,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16921706
staticbool
16931707
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
16941708
{
1695-
returnargs->origin_id==InvalidRepOriginId||MtmIsRecoveredNode(MtmReplicationNodeId);
1709+
boolres=Mtm->status!=MTM_RECOVERY
1710+
&& (args->origin_id==InvalidRepOriginId
1711+
||MtmIsRecoveredNode(MtmReplicationNodeId));
1712+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
1713+
returnres;
16961714
}
16971715

16981716
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)

‎contrib/mmts/multimaster.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
2929
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
30+
#defineMULTIMASTER_ADMIN "mtm_admin"
3031

3132
#defineUSEC 1000000
3233

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_tcsn=MtmTransactionSnapshot(txn->xid);
106-
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
106+
MTM_INFO("%d:pglogical_write_begin %d CSN=%ld\n",MyProcPid,txn->xid,csn);
107107

108108
if (csn==INVALID_CSN&& !isRecovery) {
109109
MtmIsFilteredTxn= true;

‎src/backend/replication/slot.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,11 @@ ReplicationSlotCreate(const char *name, bool db_specific,
248248
{
249249
ReplicationSlot*s=&ReplicationSlotCtl->replication_slots[i];
250250

251-
if (s->in_use&&strcmp(name,NameStr(s->data.name))==0)
251+
if (s->in_use&&strcmp(name,NameStr(s->data.name))==0) {
252252
ereport(ERROR,
253253
(errcode(ERRCODE_DUPLICATE_OBJECT),
254254
errmsg("replication slot \"%s\" already exists",name)));
255+
}
255256
if (!s->in_use&&slot==NULL)
256257
slot=s;
257258
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp