Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3d144c6

Browse files
author
Amit Kapila
committed
Fix invalid memory access during the shutdown of the parallel apply worker.
The callback function pa_shutdown() accesses MyLogicalRepWorker which maynot be initialized if there is an error during the initialization of theparallel apply worker. The other problem is that by the time it is invokedeven after the initialization of the worker, the MyLogicalRepWorker willbe reset by another callback logicalrep_worker_onexit. So, it won't havethe required information.To fix this, register the shutdown callback after we are attached to theworker slot.After this fix, we observed another issue which is that sometimes theleader apply worker tries to receive the message from the error queue thatmight already be detached by the parallel apply worker leading to anerror. To prevent such an error, we ensure that the leader apply workerdetaches from the parallel apply worker's error queue before stopping it.Reported-by: Sawada MasahikoAuthor: Hou ZhijieReviewed-by: Sawada Masahiko, Amit KapilaDiscussion:https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com
1 parent455f948 commit3d144c6

File tree

3 files changed

+38
-18
lines changed

3 files changed

+38
-18
lines changed

‎src/backend/replication/logical/applyparallelworker.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
577577
list_length(ParallelApplyWorkerPool)>
578578
(max_parallel_apply_workers_per_subscription /2))
579579
{
580-
intslot_no;
581-
uint16generation;
582-
583-
SpinLockAcquire(&winfo->shared->mutex);
584-
generation=winfo->shared->logicalrep_worker_generation;
585-
slot_no=winfo->shared->logicalrep_worker_slot_no;
586-
SpinLockRelease(&winfo->shared->mutex);
587-
588-
logicalrep_pa_worker_stop(slot_no,generation);
589-
580+
logicalrep_pa_worker_stop(winfo);
590581
pa_free_worker_info(winfo);
591582

592583
return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
636627
{
637628
ParallelApplyWorkerInfo*winfo= (ParallelApplyWorkerInfo*)lfirst(lc);
638629

639-
shm_mq_detach(winfo->error_mq_handle);
640-
winfo->error_mq_handle=NULL;
630+
if (winfo->error_mq_handle)
631+
{
632+
shm_mq_detach(winfo->error_mq_handle);
633+
winfo->error_mq_handle=NULL;
634+
}
641635
}
642636
}
643637

@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
845839
* Make sure the leader apply worker tries to read from our error queue one more
846840
* time. This guards against the case where we exit uncleanly without sending
847841
* an ErrorResponse, for example because some code calls proc_exit directly.
842+
*
843+
* Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
844+
* if any. See ParallelWorkerShutdown for details.
848845
*/
849846
staticvoid
850847
pa_shutdown(intcode,Datumarg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
901898
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
902899
errmsg("bad magic number in dynamic shared memory segment")));
903900

904-
before_shmem_exit(pa_shutdown,PointerGetDatum(seg));
905-
906901
/* Look up the shared information. */
907902
shared=shm_toc_lookup(toc,PARALLEL_APPLY_KEY_SHARED, false);
908903
MyParallelShared=shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
921916
*/
922917
logicalrep_worker_attach(worker_slot);
923918

919+
/*
920+
* Register the shutdown callback after we are attached to the worker
921+
* slot. This is to ensure that MyLogicalRepWorker remains valid when this
922+
* callback is invoked.
923+
*/
924+
before_shmem_exit(pa_shutdown,PointerGetDatum(seg));
925+
924926
SpinLockAcquire(&MyParallelShared->mutex);
925927
MyParallelShared->logicalrep_worker_generation=MyLogicalRepWorker->generation;
926928
MyParallelShared->logicalrep_worker_slot_no=worker_slot;

‎src/backend/replication/logical/launcher.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
609609
}
610610

611611
/*
612-
* Stop the logical replication parallel apply worker corresponding to the
613-
* input slot number.
612+
* Stop the given logical replication parallel apply worker.
614613
*
615614
* Node that the function sends SIGINT instead of SIGTERM to the parallel apply
616615
* worker so that the worker exits cleanly.
617616
*/
618617
void
619-
logicalrep_pa_worker_stop(intslot_no,uint16generation)
618+
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo*winfo)
620619
{
620+
intslot_no;
621+
uint16generation;
621622
LogicalRepWorker*worker;
622623

624+
SpinLockAcquire(&winfo->shared->mutex);
625+
generation=winfo->shared->logicalrep_worker_generation;
626+
slot_no=winfo->shared->logicalrep_worker_slot_no;
627+
SpinLockRelease(&winfo->shared->mutex);
628+
623629
Assert(slot_no >=0&&slot_no<max_logical_replication_workers);
624630

631+
/*
632+
* Detach from the error_mq_handle for the parallel apply worker before
633+
* stopping it. This prevents the leader apply worker from trying to
634+
* receive the message from the error queue that might already be detached
635+
* by the parallel apply worker.
636+
*/
637+
if (winfo->error_mq_handle)
638+
{
639+
shm_mq_detach(winfo->error_mq_handle);
640+
winfo->error_mq_handle=NULL;
641+
}
642+
625643
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
626644

627645
worker=&LogicalRepCtx->workers[slot_no];

‎src/include/replication/worker_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
235235
Oiduserid,Oidrelid,
236236
dsm_handlesubworker_dsm);
237237
externvoidlogicalrep_worker_stop(Oidsubid,Oidrelid);
238-
externvoidlogicalrep_pa_worker_stop(intslot_no,uint16generation);
238+
externvoidlogicalrep_pa_worker_stop(ParallelApplyWorkerInfo*winfo);
239239
externvoidlogicalrep_worker_wakeup(Oidsubid,Oidrelid);
240240
externvoidlogicalrep_worker_wakeup_ptr(LogicalRepWorker*worker);
241241

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp