Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfb67c0b

Browse files
knizhnikkelvich
authored andcommitted
Do not drop slot on end of recovery and try to explicitly specify restart position for it
1 parent4b71181 commitfb67c0b

File tree

5 files changed

+25
-5
lines changed

5 files changed

+25
-5
lines changed

‎multimaster.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,8 @@ static void MtmInitialize()
18011801
Mtm->nodes[i].con=MtmConnections[i];
18021802
Mtm->nodes[i].flushPos=0;
18031803
Mtm->nodes[i].lastHeartbeat=0;
1804+
Mtm->nodes[i].restartLsn=0;
1805+
Mtm->nodes[i].originId=InvalidRepOriginId;
18041806
}
18051807
PGSemaphoreCreate(&Mtm->votingSemaphore);
18061808
PGSemaphoreReset(&Mtm->votingSemaphore);

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ typedef struct
146146
XLogRecPtrflushPos;
147147
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
148148
XLogRecPtrrestartLsn;
149+
RepOriginIdoriginId;
149150
}MtmNodeInfo;
150151

151152
typedefstructMtmTransState

‎pglogical_apply.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,11 +539,13 @@ MtmEndSession(void)
539539
staticvoid
540540
process_remote_commit(StringInfoin)
541541
{
542+
inti;
542543
uint8flags;
543544
csn_tcsn;
544545
constchar*gid=NULL;
545546
XLogRecPtrend_lsn;
546547
XLogRecPtrorigin_lsn;
548+
RepOriginIdoriginId;
547549
intn_records;
548550
/* read flags */
549551
flags=pq_getmsgbyte(in);
@@ -558,9 +560,21 @@ process_remote_commit(StringInfo in)
558560
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
559561
end_lsn=pq_getmsgint64(in);/* end_lsn */
560562
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
561-
origin_lsn=pq_getmsgint64(in);
562-
Mtm->nodes[MtmReplicationNodeId-1].restartLsn=origin_lsn;
563-
563+
564+
originId= (RepOriginId)pq_getmsgint(in,2);
565+
origin_lsn=pq_getmsgint64(in);
566+
567+
if (originId!=InvalidRepOriginId) {
568+
for (i=0;i<Mtm->nAllNodes;i++) {
569+
if (Mtm->nodes[i].originId==originId) {
570+
Mtm->nodes[i].restartLsn=origin_lsn;
571+
break;
572+
}
573+
}
574+
if (i==Mtm->nAllNodes) {
575+
elog(WARNING,"Failed to map origin %d",originId);
576+
}
577+
}
564578
Assert(replorigin_session_origin==InvalidRepOriginId);
565579

566580
switch(PGLOGICAL_XACT_EVENT(flags))

‎pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
192192
pq_sendint64(out,commit_lsn);
193193
pq_sendint64(out,txn->end_lsn);
194194
pq_sendint64(out,txn->commit_time);
195+
196+
pq_sendint(out,txn->origin_id,2);
195197
pq_sendint64(out,txn->origin_lsn);
196198

197199
if (txn->xact_action==XLOG_XACT_COMMIT_PREPARED) {

‎pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
272272
}
273273

274274
query=createPQExpBuffer();
275-
#if1/* Do we need to recreate slot ? */
275+
#if0/* Do we need to recreate slot ? */
276276
if (mode==REPLMODE_RECOVERED) {/* recreate slot */
277277
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
278278
res=PQexec(conn,query->data);
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305

306306
/* Start logical replication at specified position */
307307
if (mode==REPLMODE_RECOVERED) {
308-
originStartPos=Mtm->nodes[nodeId].restartLsn;
308+
originStartPos=Mtm->nodes[nodeId-1].restartLsn;
309309
}
310310
if (originStartPos==0) {
311311
StartTransactionCommand();
@@ -325,6 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
originStartPos=replorigin_get_progress(originId, false);
326326
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
327327
}
328+
Mtm->nodes[nodeId-1].originId=originId;
328329
CommitTransactionCommand();
329330
}
330331

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp