Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbc9dc44

Browse files
committed
Set proper restartLsn for all origins on start
1 parentbe41202 commitbc9dc44

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

‎pglogical_receiver.c

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,14 @@ pglogical_receiver_main(Datum main_arg)
217217
MtmReplicationModemode;
218218

219219
ByteBufferbuf;
220-
RepOriginIdoriginId;
221-
char*originName;
222220
/* Buffer for COPY data */
223221
char*copybuf=NULL;
224222
intspill_file=-1;
225223
StringInfoDataspill_info;
226224
char*slotName;
227225
char*connString=psprintf("replication=database %s",Mtm->nodes[nodeId-1].con.connStr);
228226
staticPortalDatafakePortal;
227+
inti;
229228

230229
MtmBackgroundWorker= true;
231230

@@ -258,16 +257,27 @@ pglogical_receiver_main(Datum main_arg)
258257
ActivePortal->status=PORTAL_ACTIVE;
259258
ActivePortal->sourceText="";
260259

261-
/* Create originid */
262-
StartTransactionCommand();
263-
originName=psprintf(MULTIMASTER_SLOT_PATTERN,nodeId);
264-
originId=replorigin_by_name(originName, true);
265-
if (originId==InvalidRepOriginId) {
266-
originId=replorigin_create(originName);
260+
/*
261+
* Set proper restartLsn for all origins
262+
*/
263+
MtmLock(LW_EXCLUSIVE);
264+
for (i=0;i<Mtm->nAllNodes;i++)
265+
{
266+
char*originName;
267+
RepOriginIdoriginId;
268+
269+
StartTransactionCommand();
270+
originName=psprintf(MULTIMASTER_SLOT_PATTERN,i+1);
271+
originId=replorigin_by_name(originName, true);
272+
if (originId==InvalidRepOriginId) {
273+
originId=replorigin_create(originName);
274+
}
275+
CommitTransactionCommand();
276+
if (Mtm->nodes[i].restartLSN==INVALID_LSN)
277+
Mtm->nodes[i].restartLSN=replorigin_get_progress(originId, true);
278+
Mtm->nodes[i].originId=originId;
267279
}
268-
CommitTransactionCommand();
269-
Mtm->nodes[nodeId-1].originId=originId;
270-
Mtm->nodes[nodeId-1].restartLSN=INVALID_LSN;
280+
MtmUnlock();
271281

272282
/* This is main loop of logical replication.
273283
* In case of errors we will try to reestablish connection.
@@ -277,7 +287,7 @@ pglogical_receiver_main(Datum main_arg)
277287
{
278288
intcount;
279289
ConnStatusTypestatus;
280-
lsn_toriginStartPos=Mtm->nodes[nodeId-1].restartLSN;
290+
lsn_toriginStartPos;
281291
inttimeline;
282292

283293
/*
@@ -308,7 +318,7 @@ pglogical_receiver_main(Datum main_arg)
308318
query=createPQExpBuffer();
309319

310320
/* Start logical replication at specified position */
311-
originStartPos=replorigin_get_progress(originId, false);
321+
originStartPos=replorigin_get_progress(Mtm->nodes[nodeId-1].originId, false);
312322
if (originStartPos==INVALID_LSN||Mtm->nodes[nodeId-1].manualRecovery) {
313323
/*
314324
* We are just creating new replication slot.
@@ -337,7 +347,7 @@ pglogical_receiver_main(Datum main_arg)
337347
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
338348
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
339349
}
340-
MTM_LOG1("Restart logical receiver at position %llxwith origin=%dfrom node %d",originStartPos,originId,nodeId);
350+
MTM_LOG1("Restart logical receiver at position %llx from node %d",originStartPos,nodeId);
341351
}
342352

343353
MTM_LOG1("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp