Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 1f201a8

Browse files
committed
Fix race conditions and missed wakeups in syncrep worker signaling.
When a sync worker is waiting for the associated apply worker to notice that it's in SYNCWAIT state, wait_for_worker_state_change() would just patiently wait for that to happen. This generally required waiting for the 1-second timeout in LogicalRepApplyLoop to elapse. Kicking the worker via its latch makes things significantly snappier.

While at it, fix race conditions that could potentially result in crashes: we can *not* call logicalrep_worker_wakeup_ptr() once we've released the LogicalRepWorkerLock, because worker->proc might've been reset to NULL after we do that (indeed, there's no really solid reason to believe that the LogicalRepWorker slot even belongs to the same worker anymore). In logicalrep_worker_wakeup(), we can just move the wakeup inside the lock scope. In process_syncing_tables_for_apply(), a bit more code rearrangement is needed.

Also improve some nearby comments.
1 parent 1db49c3 · commit 1f201a8

File tree

2 files changed

+100
-68
lines changed

2 files changed

+100
-68
lines changed

‎src/backend/replication/logical/launcher.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,27 +515,33 @@ logicalrep_worker_stop(Oid subid, Oid relid)
515515
}
516516

517517
/*
518-
* Wake up (using latch)the logical replication worker.
518+
* Wake up (using latch)any logical replication worker for specified sub/rel.
519519
*/
520520
void
521521
logicalrep_worker_wakeup(Oidsubid,Oidrelid)
522522
{
523523
LogicalRepWorker*worker;
524524

525525
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
526+
526527
worker=logicalrep_worker_find(subid,relid, true);
527-
LWLockRelease(LogicalRepWorkerLock);
528528

529529
if (worker)
530530
logicalrep_worker_wakeup_ptr(worker);
531+
532+
LWLockRelease(LogicalRepWorkerLock);
531533
}
532534

533535
/*
534-
* Wake up (using latch) the logical replication worker.
536+
* Wake up (using latch) the specified logical replication worker.
537+
*
538+
* Caller must hold lock, else worker->proc could change under us.
535539
*/
536540
void
537541
logicalrep_worker_wakeup_ptr(LogicalRepWorker*worker)
538542
{
543+
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
544+
539545
SetLatch(&worker->proc->procLatch);
540546
}
541547

‎src/backend/replication/logical/tablesync.c

Lines changed: 91 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
212212
*
213213
* Used when transitioning from SYNCWAIT state to CATCHUP.
214214
*
215-
* Returns false if the apply worker has disappeared or the table state has been
216-
* reset.
215+
* Returns false if the apply worker has disappeared.
217216
*/
218217
staticbool
219218
wait_for_worker_state_change(charexpected_state)
@@ -226,17 +225,30 @@ wait_for_worker_state_change(char expected_state)
226225

227226
CHECK_FOR_INTERRUPTS();
228227

229-
/* Bail if the apply has died. */
228+
/*
229+
* Done if already in correct state. (We assume this fetch is atomic
230+
* enough to not give a misleading answer if we do it with no lock.)
231+
*/
232+
if (MyLogicalRepWorker->relstate==expected_state)
233+
return true;
234+
235+
/*
236+
* Bail out if the apply worker has died, else signal it we're
237+
* waiting.
238+
*/
230239
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
231240
worker=logicalrep_worker_find(MyLogicalRepWorker->subid,
232241
InvalidOid, false);
242+
if (worker&&worker->proc)
243+
logicalrep_worker_wakeup_ptr(worker);
233244
LWLockRelease(LogicalRepWorkerLock);
234245
if (!worker)
235-
return false;
236-
237-
if (MyLogicalRepWorker->relstate==expected_state)
238-
return true;
246+
break;
239247

248+
/*
249+
* Wait. We expect to get a latch signal back from the apply worker,
250+
* but use a timeout in case it dies without sending one.
251+
*/
240252
rc=WaitLatch(MyLatch,
241253
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,
242254
1000L,WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
@@ -245,7 +257,8 @@ wait_for_worker_state_change(char expected_state)
245257
if (rc&WL_POSTMASTER_DEATH)
246258
proc_exit(1);
247259

248-
ResetLatch(MyLatch);
260+
if (rc&WL_LATCH_SET)
261+
ResetLatch(MyLatch);
249262
}
250263

251264
return false;
@@ -422,83 +435,96 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
422435
else
423436
{
424437
LogicalRepWorker*syncworker;
425-
intnsyncworkers=0;
426438

439+
/*
440+
* Look for a sync worker for this relation.
441+
*/
427442
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
443+
428444
syncworker=logicalrep_worker_find(MyLogicalRepWorker->subid,
429445
rstate->relid, false);
446+
430447
if (syncworker)
431448
{
449+
/* Found one, update our copy of its state */
432450
SpinLockAcquire(&syncworker->relmutex);
433451
rstate->state=syncworker->relstate;
434452
rstate->lsn=syncworker->relstate_lsn;
453+
if (rstate->state==SUBREL_STATE_SYNCWAIT)
454+
{
455+
/*
456+
* Sync worker is waiting for apply. Tell sync worker it
457+
* can catchup now.
458+
*/
459+
syncworker->relstate=SUBREL_STATE_CATCHUP;
460+
syncworker->relstate_lsn=
461+
Max(syncworker->relstate_lsn,current_lsn);
462+
}
435463
SpinLockRelease(&syncworker->relmutex);
464+
465+
/* If we told worker to catch up, wait for it. */
466+
if (rstate->state==SUBREL_STATE_SYNCWAIT)
467+
{
468+
/* Signal the sync worker, as it may be waiting for us. */
469+
if (syncworker->proc)
470+
logicalrep_worker_wakeup_ptr(syncworker);
471+
472+
/* Now safe to release the LWLock */
473+
LWLockRelease(LogicalRepWorkerLock);
474+
475+
/*
476+
* Enter busy loop and wait for synchronization worker to
477+
* reach expected state (or die trying).
478+
*/
479+
if (!started_tx)
480+
{
481+
StartTransactionCommand();
482+
started_tx= true;
483+
}
484+
485+
wait_for_relation_state_change(rstate->relid,
486+
SUBREL_STATE_SYNCDONE);
487+
}
488+
else
489+
LWLockRelease(LogicalRepWorkerLock);
436490
}
437491
else
438-
492+
{
439493
/*
440494
* If there is no sync worker for this table yet, count
441495
* running sync workers for this subscription, while we have
442-
* the lock, for later.
496+
* the lock.
443497
*/
444-
nsyncworkers=logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
445-
LWLockRelease(LogicalRepWorkerLock);
446-
447-
/*
448-
* There is a worker synchronizing the relation and waiting for
449-
* apply to do something.
450-
*/
451-
if (syncworker&&rstate->state==SUBREL_STATE_SYNCWAIT)
452-
{
453-
/*
454-
* Tell sync worker it can catchup now. We'll wait for it so
455-
* it does not get lost.
456-
*/
457-
SpinLockAcquire(&syncworker->relmutex);
458-
syncworker->relstate=SUBREL_STATE_CATCHUP;
459-
syncworker->relstate_lsn=
460-
Max(syncworker->relstate_lsn,current_lsn);
461-
SpinLockRelease(&syncworker->relmutex);
498+
intnsyncworkers=
499+
logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
462500

463-
/*Signal the sync worker, as it may be waiting for us. */
464-
logicalrep_worker_wakeup_ptr(syncworker);
501+
/*Now safe to release the LWLock */
502+
LWLockRelease(LogicalRepWorkerLock);
465503

466504
/*
467-
*Enter busy loop and wait for synchronization worker to
468-
*reach expected state (or die trying).
505+
*If there are free sync worker slot(s), start a new sync
506+
*worker for the table.
469507
*/
470-
if (!started_tx)
471-
{
472-
StartTransactionCommand();
473-
started_tx= true;
474-
}
475-
wait_for_relation_state_change(rstate->relid,
476-
SUBREL_STATE_SYNCDONE);
477-
}
478-
479-
/*
480-
* If there is no sync worker registered for the table and there
481-
* is some free sync worker slot, start a new sync worker for the
482-
* table.
483-
*/
484-
elseif (!syncworker&&nsyncworkers<max_sync_workers_per_subscription)
485-
{
486-
TimestampTznow=GetCurrentTimestamp();
487-
structtablesync_start_time_mapping*hentry;
488-
boolfound;
489-
490-
hentry=hash_search(last_start_times,&rstate->relid,HASH_ENTER,&found);
491-
492-
if (!found||
493-
TimestampDifferenceExceeds(hentry->last_start_time,now,
494-
wal_retrieve_retry_interval))
508+
if (nsyncworkers<max_sync_workers_per_subscription)
495509
{
496-
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
497-
MySubscription->oid,
498-
MySubscription->name,
499-
MyLogicalRepWorker->userid,
500-
rstate->relid);
501-
hentry->last_start_time=now;
510+
TimestampTznow=GetCurrentTimestamp();
511+
structtablesync_start_time_mapping*hentry;
512+
boolfound;
513+
514+
hentry=hash_search(last_start_times,&rstate->relid,
515+
HASH_ENTER,&found);
516+
517+
if (!found||
518+
TimestampDifferenceExceeds(hentry->last_start_time,now,
519+
wal_retrieve_retry_interval))
520+
{
521+
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
522+
MySubscription->oid,
523+
MySubscription->name,
524+
MyLogicalRepWorker->userid,
525+
rstate->relid);
526+
hentry->last_start_time=now;
527+
}
502528
}
503529
}
504530
}
@@ -512,7 +538,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
512538
}
513539

514540
/*
515-
* Processstatepossible change(s) of tables that are being synchronized.
541+
* Process possible state change(s) of tables that are being synchronized.
516542
*/
517543
void
518544
process_syncing_tables(XLogRecPtrcurrent_lsn)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp