Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6c06fdd

Browse files
Amit Kapilapull[bot]
Amit Kapila
authored andcommitted
Make the tablesync worker's replication origin drop logic robust.
In commitf6c5edb, we started to drop the replication origin slotsbefore tablesync worker exits to avoid consuming more slots than required.We were dropping the replication origin in the same transaction where wewere marking the tablesync state as SYNCDONE. Now, if there is any errorafter we have dropped the origin but before we commit the containingtransaction, the in-memory state of replication progress won't be rolledback. Due to this, after the restart, tablesync worker can start streamingfrom the wrong location and can apply the already processed transaction.To fix this, we need to opportunistically drop the origin after markingthe tablesync state as SYNCDONE. Even, if the tablesync worker fails toremove the replication origin before exit, the apply worker ensures toclean it up afterward.Reported by Tom Lane as per buildfarm.Diagnosed-by: Masahiko SawadaAuthor: Hou ZhijieReviewed-By: Masahiko Sawada, Amit KapilaDiscussion:https://postgr.es/m/20220714115155.GA5439@depesz.comDiscussion:https://postgr.es/m/CAD21AoAw0Oofi4kiDpJBOwpYyBBBkJj=sLUOn4Gd2GjUAKG-fw@mail.gmail.com
1 parentca62947 commit6c06fdd

File tree

2 files changed

+75
-41
lines changed

2 files changed

+75
-41
lines changed

‎src/backend/commands/subscriptioncmds.c

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -931,19 +931,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
931931
logicalrep_worker_stop(sub->oid,relid);
932932

933933
/*
934-
* For READY state and SYNCDONE state, we would have already
935-
*dropped thetablesync origin.
934+
* For READY state, we would have already dropped the
935+
* tablesync origin.
936936
*/
937-
if (state!=SUBREL_STATE_READY&&state!=SUBREL_STATE_SYNCDONE)
937+
if (state!=SUBREL_STATE_READY)
938938
{
939939
charoriginname[NAMEDATALEN];
940940

941941
/*
942942
* Drop the tablesync's origin tracking if exists.
943943
*
944944
* It is possible that the origin is not yet created for
945-
* tablesync worker so passing missing_ok = true. This can
946-
* happen for the states before SUBREL_STATE_FINISHEDCOPY.
945+
* tablesync worker, this can happen for the states before
946+
* SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
947+
* apply worker can also concurrently try to drop the
948+
* origin and by this time the origin might be already
949+
* removed. For these reasons, passing missing_ok = true.
947950
*/
948951
ReplicationOriginNameForTablesync(sub->oid,relid,originname,
949952
sizeof(originname));
@@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15161519
/*
15171520
* Drop the tablesync's origin tracking if exists.
15181521
*
1519-
* For SYNCDONE/READY states, the tablesync origin tracking is known
1520-
* to have already been dropped by the tablesync worker.
1521-
*
15221522
* It is possible that the origin is not yet created for tablesync
15231523
* worker so passing missing_ok = true. This can happen for the states
15241524
* before SUBREL_STATE_FINISHEDCOPY.
15251525
*/
1526-
if (rstate->state!=SUBREL_STATE_SYNCDONE)
1527-
{
1528-
ReplicationOriginNameForTablesync(subid,relid,originname,
1529-
sizeof(originname));
1530-
replorigin_drop_by_name(originname, true, false);
1531-
}
1526+
ReplicationOriginNameForTablesync(subid,relid,originname,
1527+
sizeof(originname));
1528+
replorigin_drop_by_name(originname, true, false);
15321529
}
15331530

15341531
/* Clean up dependencies */

‎src/backend/replication/logical/tablesync.c

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
300300

301301
/*
302302
* UpdateSubscriptionRelState must be called within a transaction.
303-
* That transaction will be ended within the finish_sync_worker().
304303
*/
305304
if (!IsTransactionState())
306305
StartTransactionCommand();
@@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
310309
MyLogicalRepWorker->relstate,
311310
MyLogicalRepWorker->relstate_lsn);
312311

313-
/*
314-
* Cleanup the tablesync origin tracking.
315-
*
316-
* Resetting the origin session removes the ownership of the slot.
317-
* This is needed to allow the origin to be dropped.
318-
*/
319-
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
320-
MyLogicalRepWorker->relid,
321-
originname,
322-
sizeof(originname));
323-
replorigin_session_reset();
324-
replorigin_session_origin=InvalidRepOriginId;
325-
replorigin_session_origin_lsn=InvalidXLogRecPtr;
326-
replorigin_session_origin_timestamp=0;
327-
328-
/*
329-
* We expect that origin must be present. The concurrent operations
330-
* that remove origin like a refresh for the subscription take an
331-
* access exclusive lock on pg_subscription which prevent the previous
332-
* operation to update the rel state to SUBREL_STATE_SYNCDONE to
333-
* succeed.
334-
*/
335-
replorigin_drop_by_name(originname, false, false);
336-
337312
/*
338313
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
339314
* the slot.
@@ -343,7 +318,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
343318
/*
344319
* Cleanup the tablesync slot.
345320
*
346-
* This has to be done after thedata changes because otherwise if
321+
* This has to be done afterupdatingthestate because otherwise if
347322
* there is an error while doing the database operations we won't be
348323
* able to rollback dropped slot.
349324
*/
@@ -359,6 +334,49 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
359334
*/
360335
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn,syncslotname, false);
361336

337+
CommitTransactionCommand();
338+
pgstat_report_stat(false);
339+
340+
/*
341+
* Start a new transaction to clean up the tablesync origin tracking.
342+
* This transaction will be ended within the finish_sync_worker().
343+
* Now, even, if we fail to remove this here, the apply worker will
344+
* ensure to clean it up afterward.
345+
*
346+
* We need to do this after the table state is set to SYNCDONE.
347+
* Otherwise, if an error occurs while performing the database
348+
* operation, the worker will be restarted and the in-memory state of
349+
* replication progress (remote_lsn) won't be rolled-back which would
350+
* have been cleared before restart. So, the restarted worker will use
351+
* invalid replication progress state resulting in replay of
352+
* transactions that have already been applied.
353+
*/
354+
StartTransactionCommand();
355+
356+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
357+
MyLogicalRepWorker->relid,
358+
originname,
359+
sizeof(originname));
360+
361+
/*
362+
* Resetting the origin session removes the ownership of the slot.
363+
* This is needed to allow the origin to be dropped.
364+
*/
365+
replorigin_session_reset();
366+
replorigin_session_origin=InvalidRepOriginId;
367+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
368+
replorigin_session_origin_timestamp=0;
369+
370+
/*
371+
* Drop the tablesync's origin tracking if exists.
372+
*
373+
* There is a chance that the user is concurrently performing refresh
374+
* for the subscription where we remove the table state and its origin
375+
* or the apply worker would have removed this origin. So passing
376+
* missing_ok = true.
377+
*/
378+
replorigin_drop_by_name(originname, true, false);
379+
362380
finish_sync_worker();
363381
}
364382
else
@@ -466,6 +484,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
466484
*/
467485
if (current_lsn >=rstate->lsn)
468486
{
487+
charoriginname[NAMEDATALEN];
488+
469489
rstate->state=SUBREL_STATE_READY;
470490
rstate->lsn=current_lsn;
471491
if (!started_tx)
@@ -475,7 +495,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
475495
}
476496

477497
/*
478-
* Update the state to READY.
498+
* Remove the tablesync origin tracking if exists.
499+
*
500+
* There is a chance that the user is concurrently performing
501+
* refresh for the subscription where we remove the table
502+
* state and its origin or the tablesync worker would have
503+
* already removed this origin. We can't rely on tablesync
504+
* worker to remove the origin tracking as if there is any
505+
* error while dropping we won't restart it to drop the
506+
* origin. So passing missing_ok = true.
507+
*/
508+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
509+
rstate->relid,
510+
originname,
511+
sizeof(originname));
512+
replorigin_drop_by_name(originname, true, false);
513+
514+
/*
515+
* Update the state to READY only after the origin cleanup.
479516
*/
480517
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
481518
rstate->relid,rstate->state,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp