Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit37df65e

Browse files
knizhnikkelvich
authored andcommitted
Add switch from recovery to normal mode
1 parentc83f738 commit37df65e

File tree

5 files changed

+110
-54
lines changed

5 files changed

+110
-54
lines changed

‎arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ typedef struct
8383
TransactionIddxid;/* Transaction ID at destination node */
8484
TransactionIdsxid;/* Transaction IO at sender node */
8585
csn_tcsn;/* local CSN in case of sending data from replica to master, global CSN master->replica */
86-
int64disabledNodeMask;/* bitmask of disabled nodes at the sender of message */
86+
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes at the sender of message */
8787
}MtmArbiterMessage;
8888

8989
typedefstruct

‎multimaster.c

Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include"storage/proc.h"
4646
#include"utils/syscache.h"
4747
#include"replication/walsender.h"
48+
#include"replication/walsender_private.h"
4849
#include"replication/slot.h"
4950
#include"port/atomics.h"
5051
#include"tcop/utility.h"
@@ -60,16 +61,14 @@ typedef struct {
6061
boolisReplicated;/* transaction on replica */
6162
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6263
boolcontainsDML;/* transaction contains DML statements */
63-
boolisPrepared;/* transaction was prepared for commit */
6464
csn_tsnapshot;/* transaction snaphsot */
6565
}MtmCurrentTrans;
6666

6767
/* #define USE_SPINLOCK 1 */
6868

6969
typedefenum
7070
{
71-
HASH_LOCK_ID,
72-
COMMIT_LOCK_ID,
71+
MTM_STATE_LOCK_ID,
7372
N_LOCKS
7473
}MtmLockIds;
7574

@@ -142,6 +141,7 @@ int MtmReconnectAttempts;
142141
staticintMtmQueueSize;
143142
staticintMtmWorkers;
144143
staticintMtmVacuumDelay;
144+
staticintMtmMinRecoveryLag;
145145

146146
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
147147
staticProcessUtility_hook_typePreviousProcessUtilityHook;
@@ -158,7 +158,7 @@ void MtmLock(LWLockMode mode)
158158
#ifdefUSE_SPINLOCK
159159
SpinLockAcquire(&dtm->spinlock);
160160
#else
161-
LWLockAcquire(dtm->locks[HASH_LOCK_ID],mode);
161+
LWLockAcquire((LWLockId)&dtm->locks[MTM_STATE_LOCK_ID],mode);
162162
#endif
163163
}
164164

@@ -167,7 +167,7 @@ void MtmUnlock(void)
167167
#ifdefUSE_SPINLOCK
168168
SpinLockRelease(&dtm->spinlock);
169169
#else
170-
LWLockRelease(dtm->locks[HASH_LOCK_ID]);
170+
LWLockRelease((LWLockId)&dtm->locks[MTM_STATE_LOCK_ID]);
171171
#endif
172172
}
173173

@@ -426,12 +426,14 @@ static void MtmInitialize()
426426
dtm->nNodes=MtmNodes;
427427
dtm->disabledNodeMask=0;
428428
dtm->pglogicalNodeMask=0;
429+
dtm->walSenderLockerMask=0;
430+
dtm->nodeLockerMask=0;
431+
dtm->nLockers=0;
429432
dtm->votingTransactions=NULL;
430433
dtm->transListHead=NULL;
431434
dtm->transListTail=&dtm->transListHead;
432435
dtm->nReceivers=0;
433436
dtm->timeShift=0;
434-
pg_atomic_write_u32(&dtm->nCommittingTrans,0);
435437
PGSemaphoreCreate(&dtm->votingSemaphore);
436438
PGSemaphoreReset(&dtm->votingSemaphore);
437439
SpinLockInit(&dtm->spinlock);
@@ -476,7 +478,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476478
x->xid=GetCurrentTransactionIdIfAny();
477479
x->isReplicated= false;
478480
x->isDistributed=IsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
479-
x->isPrepared= false;
480481
x->containsDML= false;
481482
x->snapshot=MtmAssignCSN();
482483
x->gtid.xid=InvalidTransactionId;
@@ -487,6 +488,53 @@ MtmBeginTransaction(MtmCurrentTrans* x)
487488
}
488489

489490

491+
/* This function is called at transaction start with multimaster ock set */
492+
staticvoid
493+
MtmCheckClusterLock()
494+
{
495+
while (true)
496+
{
497+
nodemask_tmask=dtm->walSenderLockerMask;
498+
if (mask!=0) {
499+
XLogRecPtrcurrLogPos=GetXLogInsertRecPtr();
500+
inti;
501+
timestamp_tdelay=MIN_WAIT_TIMEOUT;
502+
for (i=0;mask!=0;i++,mask >>=1) {
503+
if (mask&1) {
504+
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
505+
/* recovery is in progress */
506+
break;
507+
}else {
508+
/* recovered replica catched up with master */
509+
dtm->walSenderLockerMask &= ~((nodemask_t)1 <<i);
510+
}
511+
}
512+
}
513+
if (mask!=0) {
514+
/* some "almost catch-up" wal-senders are still working */
515+
/* Do not start new transactions until them complete */
516+
MtmUnlock();
517+
MtmSleep(delay);
518+
if (delay*2 <=MAX_WAIT_TIMEOUT) {
519+
delay *=2;
520+
}
521+
MtmLock(LW_EXCLUSIVE);
522+
continue;
523+
}else {
524+
/* All lockers are synchronized their logs */
525+
/* Remove lock and mark them as receovered */
526+
Assert(dtm->walSenderLockerMask==0);
527+
Assert((dtm->nodeLockerMask&dtm->disabledNodeMask)==dtm->nodeLockerMask);
528+
dtm->disabledNodeMask &= ~dtm->nodeLockerMask;
529+
dtm->nNodes+=dtm->nLockers;
530+
dtm->nLockers=0;
531+
dtm->nodeLockerMask=0;
532+
}
533+
}
534+
break;
535+
}
536+
}
537+
490538
/*
491539
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
492540
* before commit
@@ -499,15 +547,12 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
499547
if (!x->isDistributed) {
500548
return;
501549
}
502-
/* Check that commits are not disabled */
503-
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID],LW_SHARED);
504-
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]);
505550

506-
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions,1);
507-
x->isPrepared= true;
508551
x->xid=GetCurrentTransactionId();
509552

510553
MtmLock(LW_EXCLUSIVE);
554+
MtmCheckClusterLock();
555+
511556
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
512557
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
513558
ts->snapshot=x->isReplicated|| !x->containsDML ?INVALID_CSN :x->snapshot;
@@ -546,9 +591,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
546591
MtmAdjustSubtransactions(ts);
547592
MtmUnlock();
548593
}
549-
if (x->isPrepared) {
550-
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions,-1);
551-
}
552594
x->snapshot=INVALID_CSN;
553595
x->xid=InvalidTransactionId;
554596
x->gtid.xid=InvalidTransactionId;
@@ -568,39 +610,29 @@ void MtmSendNotificationMessage(MtmTransState* ts)
568610
}
569611
}
570612

571-
voidMtmUpdateStatus(boolrecovered)
572-
{
573-
if (dtm->status==MTM_RECOVERY) {
574-
MtmLock(LW_EXCLUSIVE);
575-
dtm->status=MTM_ONLINE;/* Is it all we shoudl do t switch to nortmal state */
576-
MtmUnlock();
577-
}
578-
}
579-
580-
voidMtmRecoveryCompleted(intnodeId)
613+
/*
614+
* This function is called by WAL sender when start sending new transaction
615+
*/
616+
boolMtmIsRecoveredNode(intnodeId)
581617
{
582-
if (BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
583-
if (MyWalSnd->sentPtr==GetXLogInsertRecPtr()) {
584-
/* Ok, now we done with recovery of node */
618+
if (BIT_CHECK(dtm->disabledNodeMask,nodeId-1)) {
619+
Assert(MyWalSnd!=NULL);
620+
if (!BIT_CHECK(dtm->nodeLockerMask,nodeId-1)
621+
&&MyWalSnd->sentPtr+MtmMinRecoveryLag>GetXLogInsertRecPtr())
622+
{
623+
/* Wal sender almost catched up */
624+
/* Lock cluster preventing new transaction to start until wal is completely replayed */
585625
MtmLock(LW_EXCLUSIVE);
586-
dtm->pglogicalNodeMask &= (int64)1 << (nodeId-1);/* now node is assumed as recovered */
587-
dtm->nNodes+=1;
626+
dtm->nodeLockerMask |= (nodemask_t)1 << (nodeId-1);
627+
dtm->walSenderLockerMask |= (nodemask_t)1 << (MyWalSnd-WalSndCtl->walsnds);
628+
dtm->nLockers+=1;
588629
MtmUnlock();
589-
590-
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]);/* enable commits */
591-
592-
return true;
593-
}elseif (MyWalSnd->sentPtr+MtmSlotDelayThreashold>GetXLogInsertRecPtr()) {
594-
/* we almost done with recovery of node.. */
595-
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID],LW_EXCLUSIVE);/* disable new commits */
596630
}
597-
return false;
598-
}else {
599631
return true;
600632
}
633+
return false;
601634
}
602635

603-
604636
staticbool
605637
MtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids)
606638
{
@@ -717,6 +749,21 @@ _PG_init(void)
717749
if (!process_shared_preload_libraries_in_progress)
718750
return;
719751

752+
DefineCustomIntVariable(
753+
"multimaster.min_recovery_lag",
754+
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed",
755+
NULL,
756+
&MtmMinRecoveryLag,
757+
100000,
758+
1,
759+
INT_MAX,
760+
PGC_BACKEND,
761+
0,
762+
NULL,
763+
NULL,
764+
NULL
765+
);
766+
720767
DefineCustomIntVariable(
721768
"multimaster.vacuum_delay",
722769
"Minimal age of records which can be vacuumed (seconds)",
@@ -897,6 +944,12 @@ _PG_fini(void)
897944
* ***************************************************************************
898945
*/
899946

947+
staticvoidMtmSwitchFromRecoveryToNormalMode()
948+
{
949+
dtm->status=MTM_ONLINE;
950+
/* ??? Something else to do here? */
951+
}
952+
900953
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
901954
{
902955
csn_tlocalSnapshot;
@@ -910,6 +963,11 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
910963
elog(ERROR,"Too old snapshot: requested %ld, current %ld",globalSnapshot,localSnapshot);
911964
}
912965

966+
if (!TransactionIdIsValid(gtid->xid)) {
967+
Assert(dtm->status==MTM_RECOVERY);
968+
}elseif (dtm->status==MTM_RECOVERY) {
969+
MtmSwitchFromRecoveryToNormalMode();
970+
}
913971
dtmTx.gtid=*gtid;
914972
dtmTx.xid=GetCurrentTransactionId();
915973
dtmTx.snapshot=globalSnapshot;

‎multimaster.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#define MTM_TUPLE_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
*/
1414

15-
#defineBIT_CHECK(mask,bit) ((mask) & ((int64)1 << (bit)))
15+
#defineBIT_CHECK(mask,bit) (((mask) & ((nodemask_t)1 << (bit))) != 0)
1616

1717
#defineMULTIMASTER_NAME "mtm"
1818
#defineMULTIMASTER_SCHEMA_NAME "mtm"
@@ -30,6 +30,7 @@ typedef uint64 csn_t; /* commit serial number */
3030
#defineINVALID_CSN ((csn_t)-1)
3131

3232
typedefuint64timestamp_t;
33+
typedefuint64_tnodemask_t;
3334

3435
/* Identifier of global transaction */
3536
typedefstruct
@@ -95,13 +96,15 @@ typedef struct
9596
PGSemaphoreDatavotingSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
9697
LWLockPadded*locks;/* multimaster lock tranche */
9798
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
98-
int64disabledNodeMask;/* bitmask of disabled nodes (so no more than 64 nodes in multimaster:) */
99-
int64pglogicalNodeMask;/* bitmask of started pglogic receviers */
99+
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes (so no more than 64 nodes in multimaster:) */
100+
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
101+
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
102+
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
100103
intnNodes;/* number of active nodes */
101104
intnReceivers;/* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
105+
intnLockers;/* number of lockers */
102106
longtimeShift;/* local time correction */
103107
csn_tcsn;/* last obtained CSN: used to provide unique acending CSNs based on system time */
104-
pg_atomic_uint32nCommittingTrans;/* nubmer of transactions i process of commit */
105108
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
106109
This list is used to pass information to mtm-sender BGW */
107110
MtmTransState*transListHead;/* L1 list of all finished transactions present in xid2state hash.
@@ -141,6 +144,5 @@ extern void MtmDropNode(int nodeId, bool dropSlot);
141144
externMtmState*MtmGetState(void);
142145
externtimestamp_tMtmGetCurrentTime(void);
143146
externvoidMtmSleep(timestamp_tinterval);
144-
externvoidMtmRecoveryCompleted(intnodeId);
145-
externvoidMtmUpdateStatus(boolrecovered);
147+
externboolMtmIsRecoveredNode(intnodeId);
146148
#endif

‎pglogical_apply.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,9 +466,7 @@ read_rel(StringInfo s, LOCKMODE mode)
466466
staticvoid
467467
process_remote_commit(StringInfos)
468468
{
469-
boolrecovered=pq_getmsgbyte(s);
470469
CommitTransactionCommand();
471-
MtmUpdateStatus(recovered);
472470
}
473471

474472
staticvoid

‎pglogical_proto.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
typedefstructPGLogicalProtoMM
4040
{
4141
PGLogicalProtoAPIapi;
42-
MtmState*state;
4342
intnodeId;
4443
boolisLocal;
4544
}PGLogicalProtoMM;
@@ -107,14 +106,15 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107106
{
108107
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
109108
csn_tcsn=MtmTransactionSnapshot(txn->xid);
109+
boolisRecovery=MtmIsRecoveredNode(mm->nodeId);
110110
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
111-
if (csn==INVALID_CSN||BIT_CHECK(mm->state->disabledNodeMask,mm->nodeId-1)) {
111+
if (csn==INVALID_CSN&& !isRecovery) {
112112
mm->isLocal= true;
113113
}else {
114114
mm->isLocal= false;
115115
pq_sendbyte(out,'B');/* BEGIN */
116116
pq_sendint(out,MtmNodeId,4);
117-
pq_sendint(out,txn->xid,4);
117+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
118118
pq_sendint64(out,csn);
119119
}
120120
}
@@ -129,7 +129,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
129129
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
130130
if (!mm->isLocal) {
131131
pq_sendbyte(out,'C');/* sending COMMIT */
132-
pq_sendbyte(out,MtmRecoveryCompleted(mm->nodeId));
133132
}
134133
}
135134

@@ -380,7 +379,6 @@ pglogical_init_api(PGLogicalProtoType typ)
380379
PGLogicalProtoMM*pmm=palloc0(sizeof(PGLogicalProtoMM));
381380
PGLogicalProtoAPI*res=&pmm->api;
382381
pmm->isLocal= false;
383-
pmm->state=MtmGetState();
384382
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&pmm->nodeId);
385383
res->write_rel=pglogical_write_rel;
386384
res->write_begin=pglogical_write_begin;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp