Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5a3a953

Browse files
committed
Track logrep apply workers' last start times to avoid useless waits.
Enforce wal_retrieve_retry_interval on a per-subscription basis,rather than globally, and arrange to skip that delay in case ofan intentional worker exit. This probably makes little differencein the field, where apply workers wouldn't be restarted often;but it has a significant impact on the runtime of our logicalreplication regression tests (even though those tests useartificially-small wal_retrieve_retry_interval settings already).Nathan Bossart, with mostly-cosmetic editorialization by meDiscussion:https://postgr.es/m/20221122004119.GA132961@nathanxps13
1 parentc9f7f92 commit5a3a953

File tree

9 files changed

+243
-49
lines changed

9 files changed

+243
-49
lines changed

‎doc/src/sgml/config.sgml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
48774877
environments where the number of times an infrastructure is accessed
48784878
is taken into account.
48794879
</para>
4880+
<para>
4881+
In logical replication, this parameter also limits how often a failing
4882+
replication apply worker will be respawned.
4883+
</para>
48804884
</listitem>
48814885
</varlistentry>
48824886

‎doc/src/sgml/monitoring.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,6 +2008,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
20082008
<entry>Waiting to read or update information
20092009
about <quote>heavyweight</quote> locks.</entry>
20102010
</row>
2011+
<row>
2012+
<entry><literal>LogicalRepLauncherDSA</literal></entry>
2013+
<entry>Waiting to access logical replication launcher's dynamic shared
2014+
memory allocator.</entry>
2015+
</row>
2016+
<row>
2017+
<entry><literal>LogicalRepLauncherHash</literal></entry>
2018+
<entry>Waiting to access logical replication launcher's shared
2019+
hash table.</entry>
2020+
</row>
20112021
<row>
20122022
<entry><literal>LogicalRepWorker</literal></entry>
20132023
<entry>Waiting to read or update the state of logical replication

‎src/backend/commands/subscriptioncmds.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15041504
}
15051505
list_free(subworkers);
15061506

1507+
/*
1508+
* Remove the no-longer-useful entry in the launcher's table of apply
1509+
* worker start times.
1510+
*
1511+
* If this transaction rolls back, the launcher might restart a failed
1512+
* apply worker before wal_retrieve_retry_interval milliseconds have
1513+
* elapsed, but that's pretty harmless.
1514+
*/
1515+
ApplyLauncherForgetWorkerStartTime(subid);
1516+
15071517
/*
15081518
* Cleanup of tablesync replication origins.
15091519
*

‎src/backend/replication/logical/launcher.c

Lines changed: 183 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include"catalog/pg_subscription.h"
2626
#include"catalog/pg_subscription_rel.h"
2727
#include"funcapi.h"
28+
#include"lib/dshash.h"
2829
#include"libpq/pqsignal.h"
2930
#include"miscadmin.h"
3031
#include"pgstat.h"
@@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
6465
/* Supervisor process. */
6566
pid_tlauncher_pid;
6667

68+
/* Hash table holding last start times of subscriptions' apply workers. */
69+
dsa_handlelast_start_dsa;
70+
dshash_table_handlelast_start_dsh;
71+
6772
/* Background workers. */
6873
LogicalRepWorkerworkers[FLEXIBLE_ARRAY_MEMBER];
6974
}LogicalRepCtxStruct;
7075

7176
staticLogicalRepCtxStruct*LogicalRepCtx;
7277

78+
/* an entry in the last-start-times shared hash table */
79+
typedefstructLauncherLastStartTimesEntry
80+
{
81+
Oidsubid;/* OID of logrep subscription (hash key) */
82+
TimestampTzlast_start_time;/* last time its apply worker was started */
83+
}LauncherLastStartTimesEntry;
84+
85+
/* parameters for the last-start-times shared hash table */
86+
staticconstdshash_parametersdsh_params= {
87+
sizeof(Oid),
88+
sizeof(LauncherLastStartTimesEntry),
89+
dshash_memcmp,
90+
dshash_memhash,
91+
LWTRANCHE_LAUNCHER_HASH
92+
};
93+
94+
staticdsa_area*last_start_times_dsa=NULL;
95+
staticdshash_table*last_start_times=NULL;
96+
97+
staticboolon_commit_launcher_wakeup= false;
98+
99+
73100
staticvoidApplyLauncherWakeup(void);
74101
staticvoidlogicalrep_launcher_onexit(intcode,Datumarg);
75102
staticvoidlogicalrep_worker_onexit(intcode,Datumarg);
76103
staticvoidlogicalrep_worker_detach(void);
77104
staticvoidlogicalrep_worker_cleanup(LogicalRepWorker*worker);
78105
staticintlogicalrep_pa_worker_count(Oidsubid);
79-
80-
staticboolon_commit_launcher_wakeup= false;
106+
staticvoidlogicalrep_launcher_attach_dshmem(void);
107+
staticvoidApplyLauncherSetWorkerStartTime(Oidsubid,TimestampTzstart_time);
108+
staticTimestampTzApplyLauncherGetWorkerStartTime(Oidsubid);
81109

82110

83111
/*
@@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
894922

895923
memset(LogicalRepCtx,0,ApplyLauncherShmemSize());
896924

925+
LogicalRepCtx->last_start_dsa=DSM_HANDLE_INVALID;
926+
LogicalRepCtx->last_start_dsh=DSM_HANDLE_INVALID;
927+
897928
/* Initialize memory and spin locks for each worker slot. */
898929
for (slot=0;slot<max_logical_replication_workers;slot++)
899930
{
@@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
905936
}
906937
}
907938

939+
/*
940+
* Initialize or attach to the dynamic shared hash table that stores the
941+
* last-start times, if not already done.
942+
* This must be called before accessing the table.
943+
*/
944+
staticvoid
945+
logicalrep_launcher_attach_dshmem(void)
946+
{
947+
MemoryContextoldcontext;
948+
949+
/* Quick exit if we already did this. */
950+
if (LogicalRepCtx->last_start_dsh!=DSM_HANDLE_INVALID&&
951+
last_start_times!=NULL)
952+
return;
953+
954+
/* Otherwise, use a lock to ensure only one process creates the table. */
955+
LWLockAcquire(LogicalRepWorkerLock,LW_EXCLUSIVE);
956+
957+
/* Be sure any local memory allocated by DSA routines is persistent. */
958+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
959+
960+
if (LogicalRepCtx->last_start_dsh==DSM_HANDLE_INVALID)
961+
{
962+
/* Initialize dynamic shared hash table for last-start times. */
963+
last_start_times_dsa=dsa_create(LWTRANCHE_LAUNCHER_DSA);
964+
dsa_pin(last_start_times_dsa);
965+
dsa_pin_mapping(last_start_times_dsa);
966+
last_start_times=dshash_create(last_start_times_dsa,&dsh_params,0);
967+
968+
/* Store handles in shared memory for other backends to use. */
969+
LogicalRepCtx->last_start_dsa=dsa_get_handle(last_start_times_dsa);
970+
LogicalRepCtx->last_start_dsh=dshash_get_hash_table_handle(last_start_times);
971+
}
972+
elseif (!last_start_times)
973+
{
974+
/* Attach to existing dynamic shared hash table. */
975+
last_start_times_dsa=dsa_attach(LogicalRepCtx->last_start_dsa);
976+
dsa_pin_mapping(last_start_times_dsa);
977+
last_start_times=dshash_attach(last_start_times_dsa,&dsh_params,
978+
LogicalRepCtx->last_start_dsh,0);
979+
}
980+
981+
MemoryContextSwitchTo(oldcontext);
982+
LWLockRelease(LogicalRepWorkerLock);
983+
}
984+
985+
/*
986+
* Set the last-start time for the subscription.
987+
*/
988+
staticvoid
989+
ApplyLauncherSetWorkerStartTime(Oidsubid,TimestampTzstart_time)
990+
{
991+
LauncherLastStartTimesEntry*entry;
992+
boolfound;
993+
994+
logicalrep_launcher_attach_dshmem();
995+
996+
entry=dshash_find_or_insert(last_start_times,&subid,&found);
997+
entry->last_start_time=start_time;
998+
dshash_release_lock(last_start_times,entry);
999+
}
1000+
1001+
/*
1002+
* Return the last-start time for the subscription, or 0 if there isn't one.
1003+
*/
1004+
staticTimestampTz
1005+
ApplyLauncherGetWorkerStartTime(Oidsubid)
1006+
{
1007+
LauncherLastStartTimesEntry*entry;
1008+
TimestampTzret;
1009+
1010+
logicalrep_launcher_attach_dshmem();
1011+
1012+
entry=dshash_find(last_start_times,&subid, false);
1013+
if (entry==NULL)
1014+
return0;
1015+
1016+
ret=entry->last_start_time;
1017+
dshash_release_lock(last_start_times,entry);
1018+
1019+
returnret;
1020+
}
1021+
1022+
/*
1023+
* Remove the last-start-time entry for the subscription, if one exists.
1024+
*
1025+
* This has two use-cases: to remove the entry related to a subscription
1026+
* that's been deleted or disabled (just to avoid leaking shared memory),
1027+
* and to allow immediate restart of an apply worker that has exited
1028+
* due to subscription parameter changes.
1029+
*/
1030+
void
1031+
ApplyLauncherForgetWorkerStartTime(Oidsubid)
1032+
{
1033+
logicalrep_launcher_attach_dshmem();
1034+
1035+
(void)dshash_delete_key(last_start_times,&subid);
1036+
}
1037+
9081038
/*
9091039
* Wakeup the launcher on commit if requested.
9101040
*/
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
9471077
void
9481078
ApplyLauncherMain(Datummain_arg)
9491079
{
950-
TimestampTzlast_start_time=0;
951-
9521080
ereport(DEBUG1,
9531081
(errmsg_internal("logical replication launcher started")));
9541082

@@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg)
9761104
ListCell*lc;
9771105
MemoryContextsubctx;
9781106
MemoryContextoldctx;
979-
TimestampTznow;
9801107
longwait_time=DEFAULT_NAPTIME_PER_CYCLE;
9811108

9821109
CHECK_FOR_INTERRUPTS();
9831110

984-
now=GetCurrentTimestamp();
1111+
/* Use temporary context to avoid leaking memory across cycles. */
1112+
subctx=AllocSetContextCreate(TopMemoryContext,
1113+
"Logical Replication Launcher sublist",
1114+
ALLOCSET_DEFAULT_SIZES);
1115+
oldctx=MemoryContextSwitchTo(subctx);
9851116

986-
/*Limit the start retry to once a wal_retrieve_retry_interval */
987-
if (TimestampDifferenceExceeds(last_start_time,now,
988-
wal_retrieve_retry_interval))
1117+
/*Start any missing workers for enabled subscriptions. */
1118+
sublist=get_subscription_list();
1119+
foreach(lc,sublist)
9891120
{
990-
/* Use temporary context for the database list and worker info. */
991-
subctx=AllocSetContextCreate(TopMemoryContext,
992-
"Logical Replication Launcher sublist",
993-
ALLOCSET_DEFAULT_SIZES);
994-
oldctx=MemoryContextSwitchTo(subctx);
1121+
Subscription*sub= (Subscription*)lfirst(lc);
1122+
LogicalRepWorker*w;
1123+
TimestampTzlast_start;
1124+
TimestampTznow;
1125+
longelapsed;
9951126

996-
/* search for subscriptions to start or stop. */
997-
sublist=get_subscription_list();
998-
999-
/* Start the missing workers for enabled subscriptions. */
1000-
foreach(lc,sublist)
1001-
{
1002-
Subscription*sub= (Subscription*)lfirst(lc);
1003-
LogicalRepWorker*w;
1127+
if (!sub->enabled)
1128+
continue;
10041129

1005-
if (!sub->enabled)
1006-
continue;
1007-
1008-
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
1009-
w=logicalrep_worker_find(sub->oid,InvalidOid, false);
1010-
LWLockRelease(LogicalRepWorkerLock);
1011-
1012-
if (w==NULL)
1013-
{
1014-
last_start_time=now;
1015-
wait_time=wal_retrieve_retry_interval;
1130+
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
1131+
w=logicalrep_worker_find(sub->oid,InvalidOid, false);
1132+
LWLockRelease(LogicalRepWorkerLock);
10161133

1017-
logicalrep_worker_launch(sub->dbid,sub->oid,sub->name,
1018-
sub->owner,InvalidOid,DSM_HANDLE_INVALID);
1019-
}
1020-
}
1134+
if (w!=NULL)
1135+
continue;/* worker is running already */
10211136

1022-
/* Switch back to original memory context. */
1023-
MemoryContextSwitchTo(oldctx);
1024-
/* Clean the temporary memory. */
1025-
MemoryContextDelete(subctx);
1026-
}
1027-
else
1028-
{
10291137
/*
1030-
* The wait in previous cycle was interrupted in less than
1031-
* wal_retrieve_retry_interval since last worker was started, this
1032-
* usually means crash of the worker, so we should retry in
1033-
* wal_retrieve_retry_interval again.
1138+
* If the worker is eligible to start now, launch it. Otherwise,
1139+
* adjust wait_time so that we'll wake up as soon as it can be
1140+
* started.
1141+
*
1142+
* Each subscription's apply worker can only be restarted once per
1143+
* wal_retrieve_retry_interval, so that errors do not cause us to
1144+
* repeatedly restart the worker as fast as possible. In cases
1145+
* where a restart is expected (e.g., subscription parameter
1146+
* changes), another process should remove the last-start entry
1147+
* for the subscription so that the worker can be restarted
1148+
* without waiting for wal_retrieve_retry_interval to elapse.
10341149
*/
1035-
wait_time=wal_retrieve_retry_interval;
1150+
last_start=ApplyLauncherGetWorkerStartTime(sub->oid);
1151+
now=GetCurrentTimestamp();
1152+
if (last_start==0||
1153+
(elapsed=TimestampDifferenceMilliseconds(last_start,now)) >=wal_retrieve_retry_interval)
1154+
{
1155+
ApplyLauncherSetWorkerStartTime(sub->oid,now);
1156+
logicalrep_worker_launch(sub->dbid,sub->oid,sub->name,
1157+
sub->owner,InvalidOid,
1158+
DSM_HANDLE_INVALID);
1159+
}
1160+
else
1161+
{
1162+
wait_time=Min(wait_time,
1163+
wal_retrieve_retry_interval-elapsed);
1164+
}
10361165
}
10371166

1167+
/* Switch back to original memory context. */
1168+
MemoryContextSwitchTo(oldctx);
1169+
/* Clean the temporary memory. */
1170+
MemoryContextDelete(subctx);
1171+
10381172
/* Wait for more work. */
10391173
rc=WaitLatch(MyLatch,
10401174
WL_LATCH_SET |WL_TIMEOUT |WL_EXIT_ON_PM_DEATH,

‎src/backend/replication/logical/tablesync.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
628628
}
629629

630630
if (should_exit)
631+
{
632+
/*
633+
* Reset the last-start time for this worker so that the launcher will
634+
* restart it without waiting for wal_retrieve_retry_interval.
635+
*/
636+
ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
637+
631638
proc_exit(0);
639+
}
632640
}
633641

634642
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp