Commit e3cf708

Wait between tablesync worker restarts
Before restarting a tablesync worker for the same relation, wait wal_retrieve_retry_interval (currently 5s by default). This avoids restarting failing workers in a tight loop.

We keep the last start times in a hash table last_start_times that is separate from the table_states list, because that list is cleared out on syscache invalidation, which happens whenever a table finishes syncing. The hash table is kept until all tables have finished syncing.

A future project might be to unify these two and keep everything in one data structure, but for now this is a less invasive change to accomplish the original purpose.

For the test suite, set wal_retrieve_retry_interval to its minimum value, to not increase the test suite run time.

Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parent d981074 commit e3cf708
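The rule in the commit message (launch at once the first time, afterwards only once the retry interval has passed) can be sketched outside PostgreSQL as below. This is an illustration only: `ts_usec`, `interval_elapsed`, and `may_start_worker` are hypothetical names, and the microsecond timestamp is an assumption standing in for `TimestampTz`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a microsecond timestamp such as TimestampTz. */
typedef int64_t ts_usec;

/* True when at least interval_ms milliseconds lie between last_start and now. */
static bool
interval_elapsed(ts_usec last_start, ts_usec now, int interval_ms)
{
    assert(interval_ms >= 0);
    return (now - last_start) >= (int64_t) interval_ms * 1000;
}

/*
 * A worker may be started if the relation has no recorded start yet,
 * or if the retry interval has passed since the recorded start.
 */
static bool
may_start_worker(bool seen_before, ts_usec last_start, ts_usec now,
                 int interval_ms)
{
    return !seen_before || interval_elapsed(last_start, now, interval_ms);
}
```

With a 5000 ms interval, a relation last started at 1 s is refused at 2 s and allowed again at 6 s. With the 1 ms setting used in the test suite, the wait becomes negligible.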

2 files changed: +53 -6 lines changed


‎src/backend/replication/logical/tablesync.c

Lines changed: 52 additions & 6 deletions
@@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers). To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info. We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,7 +266,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
+    struct tablesync_start_time_mapping
+    {
+        Oid         relid;
+        TimestampTz last_start_time;
+    };
     static List *table_states = NIL;
+    static HTAB *last_start_times = NULL;
     ListCell   *lc;
 
     Assert(!IsTransactionState());
@@ -300,6 +309,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
         table_states_valid = true;
     }
 
+    /*
+     * Prepare hash table for tracking last start times of workers, to avoid
+     * immediate restarts. We don't need it if there are no tables that need
+     * syncing.
+     */
+    if (table_states && !last_start_times)
+    {
+        HASHCTL     ctl;
+
+        memset(&ctl, 0, sizeof(ctl));
+        ctl.keysize = sizeof(Oid);
+        ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
+        last_start_times = hash_create("Logical replication table sync worker start times",
+                                       256, &ctl, HASH_ELEM | HASH_BLOBS);
+    }
+    /*
+     * Clean up the hash table when we're done with all tables (just to
+     * release the bit of memory).
+     */
+    else if (!table_states && last_start_times)
+    {
+        hash_destroy(last_start_times);
+        last_start_times = NULL;
+    }
+
     /* Process all tables that are being synchronized. */
     foreach(lc, table_states)
     {
@@ -403,11 +437,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                  */
                 else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
                 {
-                    logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-                                             MySubscription->oid,
-                                             MySubscription->name,
-                                             MyLogicalRepWorker->userid,
-                                             rstate->relid);
+                    TimestampTz now = GetCurrentTimestamp();
+                    struct tablesync_start_time_mapping *hentry;
+                    bool        found;
+
+                    hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
+
+                    if (!found ||
+                        TimestampDifferenceExceeds(hentry->last_start_time, now,
+                                                   wal_retrieve_retry_interval))
+                    {
+                        logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+                                                 MySubscription->oid,
+                                                 MySubscription->name,
+                                                 MyLogicalRepWorker->userid,
+                                                 rstate->relid);
+                        hentry->last_start_time = now;
+                    }
                 }
             }
         }
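The change above combines two ideas: a lookup table that exists only while tables are still syncing, and a find-or-insert lookup that reports whether the relation was already known. The standalone sketch below imitates that shape with a small fixed-size array in place of PostgreSQL's dynahash. Every name in it (`start_table`, `sync_table_lifecycle`, `find_or_insert`, `maybe_launch`, `MAX_TRACKED`) is hypothetical, and the linear scan is a simplification for illustration, not how the real hash table works.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_TRACKED 256     /* hypothetical capacity for this sketch */

typedef struct
{
    uint32_t relid;             /* relation identifier (key) */
    int64_t  last_start_usec;   /* last worker start, in microseconds */
} start_entry;

typedef struct
{
    start_entry entries[MAX_TRACKED];
    int         used;
} start_table;

static start_table *tracked = NULL;

/* Create the table while work is pending; free it once nothing is left. */
static void
sync_table_lifecycle(bool have_pending_tables)
{
    if (have_pending_tables && tracked == NULL)
    {
        tracked = calloc(1, sizeof(start_table));
        assert(tracked != NULL);
    }
    else if (!have_pending_tables && tracked != NULL)
    {
        free(tracked);
        tracked = NULL;
    }
}

/* Find the entry for relid, inserting a fresh one if absent. */
static start_entry *
find_or_insert(uint32_t relid, bool *found)
{
    assert(tracked != NULL);
    for (int i = 0; i < tracked->used; i++)
    {
        if (tracked->entries[i].relid == relid)
        {
            *found = true;
            return &tracked->entries[i];
        }
    }
    assert(tracked->used < MAX_TRACKED);
    *found = false;
    start_entry *e = &tracked->entries[tracked->used++];
    e->relid = relid;
    e->last_start_usec = 0;
    return e;
}

/* Returns true if a launch is allowed now, recording the start time. */
static bool
maybe_launch(uint32_t relid, int64_t now_usec, int interval_ms)
{
    bool         found;
    start_entry *e = find_or_insert(relid, &found);

    if (!found || now_usec - e->last_start_usec >= (int64_t) interval_ms * 1000)
    {
        e->last_start_usec = now_usec;
        return true;
    }
    return false;
}
```

As in the diff, the start time is recorded only when a launch actually happens, so a refused attempt does not push the next permitted start further into the future.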

‎src/test/subscription/t/004_sync.pl

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 # Create subscriber node
 my $node_subscriber = get_new_node('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', "wal_retrieve_retry_interval = 1ms");
 $node_subscriber->start;
 
 # Create some preexisting content on publisher
