Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit de43897

Browse files
committed
Fix various concurrency issues in logical replication worker launching
The code was originally written with the assumption that the launcher is the only process starting the worker. However that hasn't been true since commit 7c4f524, which failed to modify the worker management code adequately.

This patch adds an in_use field to the LogicalRepWorker struct to indicate whether the worker slot is being used and uses proper locking everywhere this flag is set or read.

However if the parent process dies while the new worker is starting and the new worker fails to attach to shared memory, this flag would never get cleared. We solve this rare corner case by adding a sort of garbage collector for in_use slots. This uses another field in the LogicalRepWorker struct named launch_time that contains the time when the worker was started. If any request to start a new worker does not find a free slot, we'll check for workers that were supposed to start but took too long to actually do so, and reuse their slot.

In passing also fix possible race conditions when stopping a worker that hasn't finished starting yet.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Fujii Masao <masao.fujii@gmail.com>
1 parent 309191f, commit de43897

File tree

2 files changed

+155
-35
lines changed

2 files changed

+155
-35
lines changed

‎src/backend/replication/logical/launcher.c

Lines changed: 146 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include"replication/logicallauncher.h"
3939
#include"replication/logicalworker.h"
4040
#include"replication/slot.h"
41+
#include"replication/walreceiver.h"
4142
#include"replication/worker_internal.h"
4243

4344
#include"storage/ipc.h"
@@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void);
7677
staticvoidlogicalrep_launcher_onexit(intcode,Datumarg);
7778
staticvoidlogicalrep_worker_onexit(intcode,Datumarg);
7879
staticvoidlogicalrep_worker_detach(void);
80+
staticvoidlogicalrep_worker_cleanup(LogicalRepWorker*worker);
7981

8082
/* Flags set by signal handlers */
8183
volatilesig_atomic_tgot_SIGHUP= false;
@@ -154,34 +156,49 @@ get_subscription_list(void)
154156
/*
155157
* Wait for a background worker to start up and attach to the shmem context.
156158
*
157-
* This islike WaitForBackgroundWorkerStartup(), except that we wait for
158-
*attaching, not just start and we also just exit if postmaster died.
159+
* This isonly needed for cleaning up the shared memory in case the worker
160+
*fails to attach.
159161
*/
160-
staticbool
162+
staticvoid
161163
WaitForReplicationWorkerAttach(LogicalRepWorker*worker,
162164
BackgroundWorkerHandle*handle)
163165
{
164166
BgwHandleStatusstatus;
165167
intrc;
168+
uint16generation;
169+
170+
/* Remember generation for future identification. */
171+
generation=worker->generation;
166172

167173
for (;;)
168174
{
169175
pid_tpid;
170176

171177
CHECK_FOR_INTERRUPTS();
172178

179+
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
180+
181+
/* Worker either died or has started; no need to do anything. */
182+
if (!worker->in_use||worker->proc)
183+
{
184+
LWLockRelease(LogicalRepWorkerLock);
185+
return;
186+
}
187+
188+
LWLockRelease(LogicalRepWorkerLock);
189+
190+
/* Check if worker has died before attaching, and clean up after it. */
173191
status=GetBackgroundWorkerPid(handle,&pid);
174192

175-
/*
176-
* Worker started and attached to our shmem. This check is safe
177-
* because only launcher ever starts the workers, so nobody can steal
178-
* the worker slot.
179-
*/
180-
if (status==BGWH_STARTED&&worker->proc)
181-
return true;
182-
/* Worker didn't start or died before attaching to our shmem. */
183193
if (status==BGWH_STOPPED)
184-
return false;
194+
{
195+
LWLockAcquire(LogicalRepWorkerLock,LW_EXCLUSIVE);
196+
/* Ensure that this was indeed the worker we waited for. */
197+
if (generation==worker->generation)
198+
logicalrep_worker_cleanup(worker);
199+
LWLockRelease(LogicalRepWorkerLock);
200+
return;
201+
}
185202

186203
/*
187204
* We need timeout because we generally don't get notified via latch
@@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
197214
ResetLatch(MyLatch);
198215
}
199216

200-
return false;
217+
return;
201218
}
202219

203220
/*
@@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
216233
for (i=0;i<max_logical_replication_workers;i++)
217234
{
218235
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
219-
if (w->subid==subid&&w->relid==relid&&
220-
(!only_running||(w->proc&&IsBackendPid(w->proc->pid))))
236+
if (w->in_use&&w->subid==subid&&w->relid==relid&&
237+
(!only_running||w->proc))
221238
{
222239
res=w;
223240
break;
@@ -236,8 +253,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
236253
{
237254
BackgroundWorkerbgw;
238255
BackgroundWorkerHandle*bgw_handle;
256+
inti;
239257
intslot;
240258
LogicalRepWorker*worker=NULL;
259+
intnsyncworkers;
260+
TimestampTznow;
241261

242262
ereport(LOG,
243263
(errmsg("starting logical replication worker for subscription \"%s\"",
@@ -255,17 +275,73 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
255275
*/
256276
LWLockAcquire(LogicalRepWorkerLock,LW_EXCLUSIVE);
257277

278+
retry:
258279
/* Find unused worker slot. */
259-
for (slot=0;slot<max_logical_replication_workers;slot++)
280+
for (i=0;i<max_logical_replication_workers;i++)
260281
{
261-
if (!LogicalRepCtx->workers[slot].proc)
282+
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
283+
284+
if (!w->in_use)
262285
{
263-
worker=&LogicalRepCtx->workers[slot];
286+
worker=w;
287+
slot=i;
264288
break;
265289
}
266290
}
267291

268-
/* Bail if not found */
292+
nsyncworkers=logicalrep_sync_worker_count(subid);
293+
294+
now=GetCurrentTimestamp();
295+
296+
/*
297+
* If we didn't find a free slot, try to do garbage collection. The
298+
* reason we do this is because if some worker failed to start up and its
299+
* parent has crashed while waiting, the in_use state was never cleared.
300+
*/
301+
if (worker==NULL||nsyncworkers >=max_sync_workers_per_subscription)
302+
{
303+
booldid_cleanup= false;
304+
305+
for (i=0;i<max_logical_replication_workers;i++)
306+
{
307+
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
308+
309+
/*
310+
* If the worker was marked in use but didn't manage to attach in
311+
* time, clean it up.
312+
*/
313+
if (w->in_use&& !w->proc&&
314+
TimestampDifferenceExceeds(w->launch_time,now,
315+
wal_receiver_timeout))
316+
{
317+
elog(WARNING,
318+
"logical replication worker for subscription \"%d\" took too long to start; canceled",
319+
worker->subid);
320+
321+
logicalrep_worker_cleanup(w);
322+
did_cleanup= true;
323+
}
324+
}
325+
326+
if (did_cleanup)
327+
gotoretry;
328+
}
329+
330+
/*
331+
* If we reached the sync worker limit per subscription, just exit
332+
* silently as we might get here because of an otherwise harmless race
333+
* condition.
334+
*/
335+
if (nsyncworkers >=max_sync_workers_per_subscription)
336+
{
337+
LWLockRelease(LogicalRepWorkerLock);
338+
return;
339+
}
340+
341+
/*
342+
* However if there are no more free worker slots, inform user about it
343+
* before exiting.
344+
*/
269345
if (worker==NULL)
270346
{
271347
LWLockRelease(LogicalRepWorkerLock);
@@ -276,7 +352,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
276352
return;
277353
}
278354

279-
/* Prepare the worker info. */
355+
/* Prepare the worker slot. */
356+
worker->launch_time=now;
357+
worker->in_use= true;
358+
worker->generation++;
280359
worker->proc=NULL;
281360
worker->dbid=dbid;
282361
worker->userid=userid;
@@ -331,6 +410,7 @@ void
331410
logicalrep_worker_stop(Oidsubid,Oidrelid)
332411
{
333412
LogicalRepWorker*worker;
413+
uint16generation;
334414

335415
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
336416

@@ -343,11 +423,17 @@ logicalrep_worker_stop(Oid subid, Oid relid)
343423
return;
344424
}
345425

426+
/*
427+
* Remember which generation was our worker so we can check if what we see
428+
* is still the same one.
429+
*/
430+
generation=worker->generation;
431+
346432
/*
347433
* If we found worker but it does not have proc set it is starting up,
348434
* wait for it to finish and then kill it.
349435
*/
350-
while (worker&& !worker->proc)
436+
while (worker->in_use&& !worker->proc)
351437
{
352438
intrc;
353439

@@ -370,10 +456,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
370456
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
371457

372458
/*
373-
* Worker is no longer associated with subscription. It must have
374-
* exited, nothing more for us to do.
459+
* Check whether the worker slot is no longer used, which would mean
460+
* that the worker has exited, or whether the worker generation is
461+
* different, meaning that a different worker has taken the slot.
375462
*/
376-
if (worker->subid==InvalidOid)
463+
if (!worker->in_use||worker->generation!=generation)
377464
{
378465
LWLockRelease(LogicalRepWorkerLock);
379466
return;
@@ -394,7 +481,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
394481
intrc;
395482

396483
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
397-
if (!worker->proc)
484+
if (!worker->proc||worker->generation!=generation)
398485
{
399486
LWLockRelease(LogicalRepWorkerLock);
400487
break;
@@ -453,11 +540,23 @@ logicalrep_worker_attach(int slot)
453540
Assert(slot >=0&&slot<max_logical_replication_workers);
454541
MyLogicalRepWorker=&LogicalRepCtx->workers[slot];
455542

543+
if (!MyLogicalRepWorker->in_use)
544+
{
545+
LWLockRelease(LogicalRepWorkerLock);
546+
ereport(ERROR,
547+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548+
errmsg("logical replication worker slot %d is empty, cannot attach",
549+
slot)));
550+
}
551+
456552
if (MyLogicalRepWorker->proc)
553+
{
554+
LWLockRelease(LogicalRepWorkerLock);
457555
ereport(ERROR,
458556
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459-
errmsg("logical replication worker slot %d already used by "
460-
"another worker",slot)));
557+
errmsg("logical replication worker slot %d is already used by "
558+
"another worker, cannot attach",slot)));
559+
}
461560

462561
MyLogicalRepWorker->proc=MyProc;
463562
before_shmem_exit(logicalrep_worker_onexit, (Datum)0);
@@ -474,14 +573,27 @@ logicalrep_worker_detach(void)
474573
/* Block concurrent access. */
475574
LWLockAcquire(LogicalRepWorkerLock,LW_EXCLUSIVE);
476575

477-
MyLogicalRepWorker->dbid=InvalidOid;
478-
MyLogicalRepWorker->userid=InvalidOid;
479-
MyLogicalRepWorker->subid=InvalidOid;
480-
MyLogicalRepWorker->proc=NULL;
576+
logicalrep_worker_cleanup(MyLogicalRepWorker);
481577

482578
LWLockRelease(LogicalRepWorkerLock);
483579
}
484580

581+
/*
582+
* Clean up worker info.
583+
*/
584+
staticvoid
585+
logicalrep_worker_cleanup(LogicalRepWorker*worker)
586+
{
587+
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock,LW_EXCLUSIVE));
588+
589+
worker->in_use= false;
590+
worker->proc=NULL;
591+
worker->dbid=InvalidOid;
592+
worker->userid=InvalidOid;
593+
worker->subid=InvalidOid;
594+
worker->relid=InvalidOid;
595+
}
596+
485597
/*
486598
* Cleanup function for logical replication launcher.
487599
*
@@ -732,12 +844,11 @@ ApplyLauncherMain(Datum main_arg)
732844

733845
if (sub->enabled&&w==NULL)
734846
{
735-
logicalrep_worker_launch(sub->dbid,sub->oid,sub->name,
736-
sub->owner,InvalidOid);
737847
last_start_time=now;
738848
wait_time=wal_retrieve_retry_interval;
739-
/* Limit to one worker per mainloop cycle. */
740-
break;
849+
850+
logicalrep_worker_launch(sub->dbid,sub->oid,sub->name,
851+
sub->owner,InvalidOid);
741852
}
742853
}
743854

‎src/include/replication/worker_internal.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@
2121

2222
typedefstructLogicalRepWorker
2323
{
24+
/* Time at which this worker was launched. */
25+
TimestampTzlaunch_time;
26+
27+
/* Indicates if this slot is used or free. */
28+
boolin_use;
29+
30+
/* Increased everytime the slot is taken by new worker. */
31+
uint16generation;
32+
2433
/* Pointer to proc array. NULL if not running. */
2534
PGPROC*proc;
2635

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp