Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7e174fa

Browse files
committed
Only kill sync workers at commit time in subscription DDL
This allows a transaction abort to avoid killing those workers.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parentff98a5e commit7e174fa

File tree

5 files changed

+117
-6
lines changed

5 files changed

+117
-6
lines changed

‎src/backend/access/transam/xact.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
22772277
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
22782278
errmsg("cannot PREPARE a transaction that has exported snapshots")));
22792279

2280+
/*
2281+
* Don't allow PREPARE but for transaction that has/might kill logical
2282+
* replication workers.
2283+
*/
2284+
if (XactManipulatesLogicalReplicationWorkers())
2285+
ereport(ERROR,
2286+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2287+
errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
2288+
22802289
/* Prevent cancel/die interrupt while cleaning up */
22812290
HOLD_INTERRUPTS();
22822291

‎src/backend/commands/subscriptioncmds.c

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
597597

598598
RemoveSubscriptionRel(sub->oid,relid);
599599

600-
logicalrep_worker_stop(sub->oid,relid);
600+
logicalrep_worker_stop_at_commit(sub->oid,relid);
601601

602602
namespace=get_namespace_name(get_rel_namespace(relid));
603603
ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
819819
char*subname;
820820
char*conninfo;
821821
char*slotname;
822+
List*subworkers;
823+
ListCell*lc;
822824
charoriginname[NAMEDATALEN];
823825
char*err=NULL;
824826
RepOriginIdoriginid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
909911

910912
ReleaseSysCache(tup);
911913

914+
/*
915+
* If we are dropping the replication slot, stop all the subscription
916+
* workers immediately, so that the slot becomes accessible. Otherwise
917+
* just schedule the stopping for the end of the transaction.
918+
*
919+
* New workers won't be started because we hold an exclusive lock on the
920+
* subscription till the end of the transaction.
921+
*/
922+
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
923+
subworkers=logicalrep_workers_find(subid, false);
924+
LWLockRelease(LogicalRepWorkerLock);
925+
foreach (lc,subworkers)
926+
{
927+
LogicalRepWorker*w= (LogicalRepWorker*)lfirst(lc);
928+
if (slotname)
929+
logicalrep_worker_stop(w->subid,w->relid);
930+
else
931+
logicalrep_worker_stop_at_commit(w->subid,w->relid);
932+
}
933+
list_free(subworkers);
934+
912935
/* Clean up dependencies */
913936
deleteSharedDependencyRecordsFor(SubscriptionRelationId,subid,0);
914937

915938
/* Remove any associated relation synchronization states. */
916939
RemoveSubscriptionRel(subid,InvalidOid);
917940

918-
/* Kill the apply worker so that the slot becomes accessible. */
919-
logicalrep_worker_stop(subid,InvalidOid);
920-
921941
/* Remove the origin tracking if exists. */
922942
snprintf(originname,sizeof(originname),"pg_%u",subid);
923943
originid=replorigin_by_name(originname, true);

‎src/backend/replication/logical/launcher.c

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
7373

7474
LogicalRepCtxStruct*LogicalRepCtx;
7575

76+
typedefstructLogicalRepWorkerId
77+
{
78+
Oidsubid;
79+
Oidrelid;
80+
}LogicalRepWorkerId;
81+
82+
staticList*on_commit_stop_workers=NIL;
83+
7684
staticvoidApplyLauncherWakeup(void);
7785
staticvoidlogicalrep_launcher_onexit(intcode,Datumarg);
7886
staticvoidlogicalrep_worker_onexit(intcode,Datumarg);
@@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
249257
returnres;
250258
}
251259

260+
/*
261+
* Similar to logicalrep_worker_find(), but returns list of all workers for
262+
* the subscription, instead just one.
263+
*/
264+
List*
265+
logicalrep_workers_find(Oidsubid,boolonly_running)
266+
{
267+
inti;
268+
List*res=NIL;
269+
270+
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
271+
272+
/* Search for attached worker for a given subscription id. */
273+
for (i=0;i<max_logical_replication_workers;i++)
274+
{
275+
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
276+
277+
if (w->in_use&&w->subid==subid&& (!only_running||w->proc))
278+
res=lappend(res,w);
279+
}
280+
281+
returnres;
282+
}
283+
252284
/*
253285
* Start new apply background worker.
254286
*/
@@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
513545
LWLockRelease(LogicalRepWorkerLock);
514546
}
515547

548+
/*
549+
* Request worker for specified sub/rel to be stopped on commit.
550+
*/
551+
void
552+
logicalrep_worker_stop_at_commit(Oidsubid,Oidrelid)
553+
{
554+
LogicalRepWorkerId*wid;
555+
MemoryContextoldctx;
556+
557+
/* Make sure we store the info in context that survives until commit. */
558+
oldctx=MemoryContextSwitchTo(TopTransactionContext);
559+
560+
wid=palloc(sizeof(LogicalRepWorkerId));
561+
wid->subid=subid;
562+
wid->relid=relid;
563+
564+
on_commit_stop_workers=lappend(on_commit_stop_workers,wid);
565+
566+
MemoryContextSwitchTo(oldctx);
567+
}
568+
516569
/*
517570
* Wake up (using latch) any logical replication worker for specified sub/rel.
518571
*/
@@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void)
753806
}
754807
}
755808

809+
/*
810+
* Check whether current transaction has manipulated logical replication
811+
* workers.
812+
*/
813+
bool
814+
XactManipulatesLogicalReplicationWorkers(void)
815+
{
816+
return (on_commit_stop_workers!=NIL);
817+
}
818+
756819
/*
757820
* Wakeup the launcher on commit if requested.
758821
*/
759822
void
760823
AtEOXact_ApplyLauncher(boolisCommit)
761824
{
762-
if (isCommit&&on_commit_launcher_wakeup)
763-
ApplyLauncherWakeup();
825+
if (isCommit)
826+
{
827+
ListCell*lc;
764828

829+
foreach (lc,on_commit_stop_workers)
830+
{
831+
LogicalRepWorkerId*wid=lfirst(lc);
832+
logicalrep_worker_stop(wid->subid,wid->relid);
833+
}
834+
835+
if (on_commit_launcher_wakeup)
836+
ApplyLauncherWakeup();
837+
}
838+
839+
/*
840+
* No need to pfree on_commit_stop_workers. It was allocated in
841+
* transaction memory context, which is going to be cleaned soon.
842+
*/
843+
on_commit_stop_workers=NIL;
765844
on_commit_launcher_wakeup= false;
766845
}
767846

‎src/include/replication/logicallauncher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
2222
externvoidApplyLauncherShmemInit(void);
2323

2424
externvoidApplyLauncherWakeupAtCommit(void);
25+
externboolXactManipulatesLogicalReplicationWorkers(void);
2526
externvoidAtEOXact_ApplyLauncher(boolisCommit);
2627

2728
externboolIsLogicalLauncher(void);

‎src/include/replication/worker_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,11 @@ extern bool in_remote_transaction;
7171
externvoidlogicalrep_worker_attach(intslot);
7272
externLogicalRepWorker*logicalrep_worker_find(Oidsubid,Oidrelid,
7373
boolonly_running);
74+
externList*logicalrep_workers_find(Oidsubid,boolonly_running);
7475
externvoidlogicalrep_worker_launch(Oiddbid,Oidsubid,constchar*subname,
7576
Oiduserid,Oidrelid);
7677
externvoidlogicalrep_worker_stop(Oidsubid,Oidrelid);
78+
externvoidlogicalrep_worker_stop_at_commit(Oidsubid,Oidrelid);
7779
externvoidlogicalrep_worker_wakeup(Oidsubid,Oidrelid);
7880
externvoidlogicalrep_worker_wakeup_ptr(LogicalRepWorker*worker);
7981

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp