Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd29666d

Browse files
knizhnikkelvich
authored andcommitted
some fixes in recovery
1 parentaface56 commitd29666d

File tree

4 files changed

+49
-32
lines changed

4 files changed

+49
-32
lines changed

‎multimaster.c

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -635,9 +635,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
635635
x->isDistributed=MtmIsUserTransaction();
636636
x->isPrepared= false;
637637
x->isTransactionBlock=IsTransactionBlock();
638-
/* Application name can becahnged usnig PGAPPNAME environment variable */
638+
/* Application name can bechanged usnig PGAPPNAME environment variable */
639639
if (!IsBackgroundWorker&&x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0) {
640-
/* reject all user's transactions at offline cluster */
640+
/* Reject all user's transactions at offline cluster.
641+
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
642+
*/
641643
MtmUnlock();
642644
elog(ERROR,"Multimaster node is not online: current status %s",MtmNodeStatusMnem[Mtm->status]);
643645
}
@@ -673,14 +675,17 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673675
if (Mtm->disabledNodeMask!=0) {
674676
MtmRefreshClusterStatus(true);
675677
if (!IsBackgroundWorker&&Mtm->status!=MTM_ONLINE) {
676-
elog(ERROR,"Abort current transaction because this cluster node is not online");
678+
/* Do not take in accoutn bg-workers which are performing recovery */
679+
elog(ERROR,"Abort current transaction because this cluster node is in %s status",MtmNodeStatusMnem[Mtm->status]);
677680
}
678681
}
679682

680683
MtmLock(LW_EXCLUSIVE);
681684

682685
/*
683-
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
686+
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
687+
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
688+
* and should not cause cluster status change.
684689
*/
685690
if (!x->isReplicated) {
686691
MtmCheckClusterLock();
@@ -716,7 +721,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716721
}
717722
MtmTransactionListAppend(ts);
718723
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
719-
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d CSN=%ld\n",MyProcPid,x->xid,ts->csn);
724+
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n",
725+
MyProcPid,x->xid,ts->gtid.xid,ts->gtid.node,ts->csn);
720726
MtmUnlock();
721727

722728
}
@@ -842,14 +848,6 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
842848
}
843849
}
844850

845-
voidMtmRecoveryCompleted(void)
846-
{
847-
elog(WARNING,"Recovery of node %d is completed",MtmNodeId);
848-
Mtm->recoverySlot=0;
849-
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
850-
MtmSwitchClusterMode(MTM_ONLINE);
851-
}
852-
853851
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
854852
{
855853
MtmLock(LW_EXCLUSIVE);
@@ -933,6 +931,18 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
933931
* -------------------------------------------
934932
*/
935933

934+
voidMtmRecoveryCompleted(void)
935+
{
936+
elog(WARNING,"Recovery of node %d is completed",MtmNodeId);
937+
MtmLock(LW_EXCLUSIVE);
938+
Mtm->recoverySlot=0;
939+
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
940+
/* Mode will be changed to online once all locagical reciever are connected */
941+
MtmSwitchClusterMode(MTM_CONNECTED);
942+
MtmUnlock();
943+
}
944+
945+
936946

937947
/**
938948
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
@@ -993,10 +1003,10 @@ bool MtmIsRecoveredNode(int nodeId)
9931003
boolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN)
9941004
{
9951005
boolcaughtUp= false;
1006+
MtmLock(LW_EXCLUSIVE);
9961007
if (MtmIsRecoveredNode(nodeId)) {
9971008
XLogRecPtrwalLSN=GetXLogInsertRecPtr();
998-
MtmLock(LW_EXCLUSIVE);
999-
if (slotLSN==walLSN) {
1009+
if (slotLSN==walLSN&&Mtm->nActiveTransactions==0) {
10001010
if (BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)) {
10011011
elog(WARNING,"Node %d is caught-up",nodeId);
10021012
BIT_CLEAR(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
@@ -1018,18 +1028,17 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10181028
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
10191029
* Is there some better way to establish mapping between nodes ad WAL-seconder?
10201030
*/
1021-
elog(WARNING,"Node %d is almost caught-up: lock cluster",nodeId);
1031+
elog(WARNING,"Node %d is almost caught-up: slot position %lx, WAL position %lx, active transactions %d",
1032+
nodeId,slotLSN,walLSN,Mtm->nActiveTransactions);
10221033
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
10231034
BIT_SET(Mtm->nodeLockerMask,nodeId-1);
10241035
BIT_SET(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
10251036
Mtm->nLockers+=1;
10261037
}else {
10271038
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n",nodeId,slotLSN,walLSN,MyWalSnd->sentPtr,Mtm->nLockers,Mtm->nActiveTransactions);
10281039
}
1029-
MtmUnlock();
1030-
}else {
1031-
MTM_INFO("Node %d is not in recovery mode\n",nodeId);
10321040
}
1041+
MtmUnlock();
10331042
returncaughtUp;
10341043
}
10351044

@@ -1044,7 +1053,7 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10441053
/*
10451054
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
10461055
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
1047-
* This function is calledat transactionstart with multimaster lock set
1056+
* This function is calledbefore transactionprepare with multimaster lock set.
10481057
*/
10491058
staticvoid
10501059
MtmCheckClusterLock()
@@ -1071,8 +1080,8 @@ MtmCheckClusterLock()
10711080
}
10721081
}
10731082
if (mask!=0) {
1074-
/* some "almost catch-up" wal-senders are still working */
1075-
/* Do not start new transactions until themcomplete */
1083+
/* some "almost catch-up" wal-senders are still working. */
1084+
/* Do not start new transactions until themare completed. */
10761085
MtmUnlock();
10771086
MtmSleep(delay);
10781087
if (delay*2 <=MAX_WAIT_TIMEOUT) {
@@ -1215,6 +1224,7 @@ void MtmOnNodeDisconnect(int nodeId)
12151224
voidMtmOnNodeConnect(intnodeId)
12161225
{
12171226
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
1227+
elog(NOTICE,"Reconnect node %d",nodeId);
12181228
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
12191229
}
12201230

@@ -1645,19 +1655,23 @@ _PG_fini(void)
16451655
}
16461656

16471657

1648-
1658+
/*
1659+
* This functions is called by pglogical receiver main function when receiver background worker is started.
1660+
* We switch to ONLINE mode when all receviers are connected.
1661+
* As far as background worker can be restarted multiple times, use node bitmask.
1662+
*/
16491663
voidMtmReceiverStarted(intnodeId)
16501664
{
1651-
SpinLockAcquire(&Mtm->spinlock);
1665+
MtmLock(LW_EXCLUSIVE);
16521666
if (!BIT_CHECK(Mtm->pglogicalNodeMask,nodeId-1)) {
16531667
BIT_SET(Mtm->pglogicalNodeMask,nodeId-1);
16541668
if (++Mtm->nReceivers==Mtm->nNodes-1) {
16551669
if (Mtm->status==MTM_CONNECTED) {
16561670
MtmSwitchClusterMode(MTM_ONLINE);
16571671
}
16581672
}
1659-
}
1660-
SpinLockRelease(&Mtm->spinlock);
1673+
}
1674+
MtmUnlock();
16611675
}
16621676

16631677
/*

‎multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ typedef uint64 csn_t; /* commit serial number */
4545

4646
#definePGLOGICAL_XACT_EVENT(flags)(flags & 0x03)
4747

48+
#definePGLOGICAL_CAUGHT_UP 0x04
49+
50+
4851
typedefuint64timestamp_t;
4952

5053
/* Identifier of global transaction */

‎pglogical_apply.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,10 @@ process_remote_commit(StringInfo in)
497497
uint8flags;
498498
csn_tcsn;
499499
constchar*gid=NULL;
500-
boolcaughtUp;
501500

502501
/* read flags */
503502
flags=pq_getmsgbyte(in);
504503
MtmReplicationNode=pq_getmsgbyte(in);
505-
caughtUp=pq_getmsgbyte(in)!=0;
506504

507505
/* read fields */
508506
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
@@ -571,7 +569,7 @@ process_remote_commit(StringInfo in)
571569
Assert(false);
572570
}
573571
MtmEndSession(true);
574-
if (caughtUp) {
572+
if (flags&PGLOGICAL_CAUGHT_UP) {
575573
MtmRecoveryCompleted();
576574
}
577575
}

‎pglogical_proto.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_tcsn=MtmTransactionSnapshot(txn->xid);
106-
MTM_INFO("%d: pglogical_write_begin %d CSN=%ld\n",MyProcPid,txn->xid,csn);
106+
MTM_INFO("%d: pglogical_write_beginXID=%dnode=%dCSN=%ld recovery=%d\n",MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery);
107107

108108
if (csn==INVALID_CSN&& !isRecovery) {
109109
MtmIsFilteredTxn= true;
@@ -124,7 +124,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
124124
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn)
125125
{
126126
uint8flags=0;
127-
127+
128128
if (txn->xact_action==XLOG_XACT_COMMIT)
129129
flags=PGLOGICAL_COMMIT;
130130
elseif (txn->xact_action==XLOG_XACT_PREPARE)
@@ -146,6 +146,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
if (csn==INVALID_CSN&& !isRecovery) {
147147
return;
148148
}
149+
if (MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn)) {
150+
flags |=PGLOGICAL_CAUGHT_UP;
151+
}
149152
}
150153
pq_sendbyte(out,'C');/* sending COMMIT */
151154

@@ -154,7 +157,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
154157
/* send the flags field */
155158
pq_sendbyte(out,flags);
156159
pq_sendbyte(out,MtmNodeId);
157-
pq_sendbyte(out,MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn));
158160

159161
/* send fixed fields */
160162
pq_sendint64(out,commit_lsn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp