Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc83f738

Browse files
knizhnikkelvich
authored andcommitted
Switch from recovery to normal mode
1 parent31594e5 commitc83f738

File tree

5 files changed

+86
-15
lines changed

5 files changed

+86
-15
lines changed

‎multimaster.c

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,19 @@ typedef struct {
6060
boolisReplicated;/* transaction on replica */
6161
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6262
boolcontainsDML;/* transaction contains DML statements */
63+
boolisPrepared;/* transaction was prepared for commit */
6364
csn_tsnapshot;/* transaction snaphsot */
6465
}MtmCurrentTrans;
6566

6667
/* #define USE_SPINLOCK 1 */
6768

69+
/*
 * Indexes of the LWLocks inside the named LWLock tranche requested for
 * multimaster (dtm->locks[]).
 */
typedef enum
{
    HASH_LOCK_ID,   /* lock taken by MtmLock()/MtmUnlock() */
    COMMIT_LOCK_ID, /* lock held exclusively while new commits are disabled */
    N_LOCKS         /* not a lock: number of locks requested for the tranche */
} MtmLockIds;
75+
6876
#defineMTM_SHMEM_SIZE (64*1024*1024)
6977
#defineMTM_HASH_SIZE 100003
7078
#defineUSEC 1000000
@@ -150,7 +158,7 @@ void MtmLock(LWLockMode mode)
150158
#ifdefUSE_SPINLOCK
151159
SpinLockAcquire(&dtm->spinlock);
152160
#else
153-
LWLockAcquire(dtm->hashLock,mode);
161+
LWLockAcquire(dtm->locks[HASH_LOCK_ID],mode);
154162
#endif
155163
}
156164

@@ -159,7 +167,7 @@ void MtmUnlock(void)
159167
#ifdefUSE_SPINLOCK
160168
SpinLockRelease(&dtm->spinlock);
161169
#else
162-
LWLockRelease(dtm->hashLock);
170+
LWLockRelease(dtm->locks[HASH_LOCK_ID]);
163171
#endif
164172
}
165173

@@ -412,7 +420,7 @@ static void MtmInitialize()
412420
{
413421
dtm->status=MTM_INITIALIZATION;
414422
dtm->recoverySlot=0;
415-
dtm->hashLock=(LWLock*)GetNamedLWLockTranche(MULTIMASTER_NAME);
423+
dtm->locks=GetNamedLWLockTranche(MULTIMASTER_NAME);
416424
dtm->csn=MtmGetCurrentTime();
417425
dtm->oldestXid=FirstNormalTransactionId;
418426
dtm->nNodes=MtmNodes;
@@ -423,6 +431,7 @@ static void MtmInitialize()
423431
dtm->transListTail=&dtm->transListHead;
424432
dtm->nReceivers=0;
425433
dtm->timeShift=0;
434+
pg_atomic_write_u32(&dtm->nCommittingTrans,0);
426435
PGSemaphoreCreate(&dtm->votingSemaphore);
427436
PGSemaphoreReset(&dtm->votingSemaphore);
428437
SpinLockInit(&dtm->spinlock);
@@ -467,6 +476,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
467476
x->xid=GetCurrentTransactionIdIfAny();
468477
x->isReplicated= false;
469478
x->isDistributed=IsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
479+
x->isPrepared= false;
470480
x->containsDML= false;
471481
x->snapshot=MtmAssignCSN();
472482
x->gtid.xid=InvalidTransactionId;
@@ -476,6 +486,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476486
}
477487
}
478488

489+
479490
/*
480491
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
481492
* before commit
@@ -488,8 +499,14 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
488499
if (!x->isDistributed) {
489500
return;
490501
}
491-
x->xid=GetCurrentTransactionId();
502+
/* Check that commits are not disabled */
503+
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID],LW_SHARED);
504+
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]);
492505

506+
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions,1);
507+
x->isPrepared= true;
508+
x->xid=GetCurrentTransactionId();
509+
493510
MtmLock(LW_EXCLUSIVE);
494511
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
495512
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
@@ -500,6 +517,7 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
500517
ts->procno=MyProc->pgprocno;
501518
ts->nVotes=0;
502519
ts->done= false;
520+
503521
if (TransactionIdIsValid(x->gtid.xid)) {
504522
ts->gtid=x->gtid;
505523
}else {
@@ -528,6 +546,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
528546
MtmAdjustSubtransactions(ts);
529547
MtmUnlock();
530548
}
549+
if (x->isPrepared) {
550+
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions,-1);
551+
}
531552
x->snapshot=INVALID_CSN;
532553
x->xid=InvalidTransactionId;
533554
x->gtid.xid=InvalidTransactionId;
@@ -547,6 +568,39 @@ void MtmSendNotificationMessage(MtmTransState* ts)
547568
}
548569
}
549570

571+
voidMtmUpdateStatus(boolrecovered)
572+
{
573+
if (dtm->status==MTM_RECOVERY) {
574+
MtmLock(LW_EXCLUSIVE);
575+
dtm->status=MTM_ONLINE;/* Is it all we shoudl do t switch to nortmal state */
576+
MtmUnlock();
577+
}
578+
}
579+
580+
voidMtmRecoveryCompleted(intnodeId)
581+
{
582+
if (BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
583+
if (MyWalSnd->sentPtr==GetXLogInsertRecPtr()) {
584+
/* Ok, now we done with recovery of node */
585+
MtmLock(LW_EXCLUSIVE);
586+
dtm->pglogicalNodeMask &= (int64)1 << (nodeId-1);/* now node is assumed as recovered */
587+
dtm->nNodes+=1;
588+
MtmUnlock();
589+
590+
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]);/* enable commits */
591+
592+
return true;
593+
}elseif (MyWalSnd->sentPtr+MtmSlotDelayThreashold>GetXLogInsertRecPtr()) {
594+
/* we almost done with recovery of node.. */
595+
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID],LW_EXCLUSIVE);/* disable new commits */
596+
}
597+
return false;
598+
}else {
599+
return true;
600+
}
601+
}
602+
603+
550604
staticbool
551605
MtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids)
552606
{
@@ -803,7 +857,7 @@ _PG_init(void)
803857
* resources in mtm_shmem_startup().
804858
*/
805859
RequestAddinShmemSpace(MTM_SHMEM_SIZE+MtmQueueSize);
806-
RequestNamedLWLockTranche(MULTIMASTER_NAME,1);
860+
RequestNamedLWLockTranche(MULTIMASTER_NAME,N_LOCKS);
807861

808862
MtmNodes=MtmStartReceivers(MtmConnStrs,MtmNodeId);
809863
if (MtmNodes<2) {

‎multimaster.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414

1515
#defineBIT_CHECK(mask,bit) ((mask) & ((int64)1 << (bit)))
1616

17-
#defineMULTIMASTER_NAME "mtm"
18-
#defineMULTIMASTER_SCHEMA_NAME "mtm"
19-
#defineMULTIMASTER_DDL_TABLE "ddl_log"
20-
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
17+
#defineMULTIMASTER_NAME "mtm"
18+
#defineMULTIMASTER_SCHEMA_NAME "mtm"
19+
#defineMULTIMASTER_DDL_TABLE "ddl_log"
20+
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
21+
#defineMULTIMASTER_MIN_PROTO_VERSION 1
22+
#defineMULTIMASTER_MAX_PROTO_VERSION 1
2123

2224
#defineNatts_mtm_ddl_log 2
2325
#defineAnum_mtm_ddl_log_issued1
@@ -91,14 +93,15 @@ typedef struct
9193
intrecoverySlot;/* NodeId of recovery slot or 0 if none */
9294
volatileslock_tspinlock;/* spinlock used to protect access to hash table */
9395
PGSemaphoreDatavotingSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
94-
LWLockIdhashLock;/* lockto synchronize access to hash table */
96+
LWLockPadded*locks;/*multimasterlocktranche */
9597
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
9698
int64disabledNodeMask;/* bitmask of disabled nodes (so no more than 64 nodes in multimaster:) */
9799
int64pglogicalNodeMask;/* bitmask of started pglogic receviers */
98100
intnNodes;/* number of active nodes */
99101
intnReceivers;/* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
100102
longtimeShift;/* local time correction */
101103
csn_tcsn;/* last obtained CSN: used to provide unique acending CSNs based on system time */
104+
pg_atomic_uint32nCommittingTrans;/* nubmer of transactions i process of commit */
102105
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
103106
This list is used to pass information to mtm-sender BGW */
104107
MtmTransState*transListHead;/* L1 list of all finished transactions present in xid2state hash.
@@ -137,6 +140,7 @@ extern void MtmUnlock(void);
137140
externvoidMtmDropNode(intnodeId,booldropSlot);
138141
externMtmState*MtmGetState(void);
139142
externtimestamp_tMtmGetCurrentTime(void);
140-
externvoidMtmSleep(timestamp_tinterval);
141-
143+
externvoidMtmSleep(timestamp_tinterval);
144+
externvoidMtmRecoveryCompleted(intnodeId);
145+
externvoidMtmUpdateStatus(boolrecovered);
142146
#endif

‎pglogical_apply.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,9 @@ read_rel(StringInfo s, LOCKMODE mode)
466466
staticvoid
467467
process_remote_commit(StringInfos)
468468
{
469+
boolrecovered=pq_getmsgbyte(s);
469470
CommitTransactionCommand();
471+
MtmUpdateStatus(recovered);
470472
}
471473

472474
staticvoid

‎pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
129129
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
130130
if (!mm->isLocal) {
131131
pq_sendbyte(out,'C');/* sending COMMIT */
132+
pq_sendbyte(out,MtmRecoveryCompleted(mm->nodeId));
132133
}
133134
}
134135

‎pglogical_receiver.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include"utils/guc.h"
3232
#include"utils/snapmgr.h"
3333
#include"executor/spi.h"
34+
#include"replication/origin.h"
3435

3536
#include"multimaster.h"
3637

@@ -203,10 +204,12 @@ pglogical_receiver_main(Datum main_arg)
203204
PGconn*conn;
204205
PGresult*res;
205206
MtmSlotModemode;
207+
MtmState*ds;
206208
#ifndefUSE_PGLOGICAL_OUTPUT
207209
boolinsideTrans= false;
208210
#endif
209211
ByteBufferbuf;
212+
XLogRecPtroriginStartPos;
210213

211214
/* Register functions for SIGTERM/SIGHUP management */
212215
pqsignal(SIGHUP,receiver_raw_sighup);
@@ -258,8 +261,14 @@ pglogical_receiver_main(Datum main_arg)
258261
resetPQExpBuffer(query);
259262
}
260263
/* Start logical replication at specified position */
261-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL 0/0 (\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1')",
262-
args->receiver_slot);
264+
originStartPos=replorigin_session_get_progress(false);
265+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
266+
args->receiver_slot,
267+
(uint32) (originStartPos >>32),
268+
(uint32)originStartPos,
269+
MULTIMASTER_MAX_PROTO_VERSION,
270+
MULTIMASTER_MIN_PROTO_VERSION
271+
);
263272
res=PQexec(conn,query->data);
264273
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
265274
{
@@ -273,6 +282,7 @@ pglogical_receiver_main(Datum main_arg)
273282

274283
MtmReceiverStarted(args->receiver_node);
275284
ByteBufferAlloc(&buf);
285+
ds=MtmGetState();
276286

277287
while (!got_sigterm)
278288
{
@@ -366,7 +376,7 @@ pglogical_receiver_main(Datum main_arg)
366376
* If sync mode is sent reply in all cases to ensure that
367377
* server knows how far replay has been done.
368378
*/
369-
if (replyRequested||receiver_sync_mode)
379+
if (replyRequested||receiver_sync_mode||ds->status==MTM_RECOVERY)
370380
{
371381
int64now=feGetCurrentTimestamp();
372382

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp