Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5509055

Browse files
author
Amit Kapila
committed
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are syncedwill have 2 states: - INIT (needs [re]synchronizing) - READY (is already synchronized)A new sequencesync worker is launched as needed to synchronize sequences.A single sequencesync worker is responsible for synchronizing allsequences. It begins by retrieving the list of sequences that are flaggedfor synchronization, i.e., those in the INIT state. These sequences arethen processed in batches, allowing multiple entries to be synchronizedwithin a single transaction. The worker fetches the current sequencevalues and page LSNs from the remote publisher, updates the correspondingsequences on the local subscriber, and finally marks each sequence asREADY upon successful synchronization.Sequence synchronization occurs in 3 places:1) CREATE SUBSCRIPTION - The command syntax remains unchanged. - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize all sequences.2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - The command syntax remains unchanged. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize only newly added sequences.3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - A new command introduced for PG19 byf0b3573. - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case.Author: Vignesh C <vignesh21@gmail.com>Reviewed-by: shveta malik <shveta.malik@gmail.com>Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>Reviewed-by: Peter Smith <smithpb2250@gmail.com>Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Reviewed-by: Chao Li <li.evan.chao@gmail.com>Discussion:https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
1 parent1fd981f commit5509055

File tree

19 files changed

+1229
-126
lines changed

19 files changed

+1229
-126
lines changed

‎src/backend/catalog/pg_subscription.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
354354
ObjectIdGetDatum(relid),
355355
ObjectIdGetDatum(subid));
356356
if (!HeapTupleIsValid(tup))
357-
elog(ERROR,"subscriptiontable %u in subscription %u does not exist",
357+
elog(ERROR,"subscriptionrelation %u in subscription %u does not exist",
358358
relid,subid);
359359

360360
/* Update the tuple. */

‎src/backend/commands/sequence.c‎

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
112112
bool*is_called,
113113
bool*need_seq_rewrite,
114114
List**owned_by);
115-
staticvoiddo_setval(Oidrelid,int64next,booliscalled);
116115
staticvoidprocess_owned_by(Relationseqrel,List*owned_by,boolfor_identity);
117116

118117

@@ -954,8 +953,8 @@ lastval(PG_FUNCTION_ARGS)
954953
* it is the only way to clear the is_called flag in an existing
955954
* sequence.
956955
*/
957-
staticvoid
958-
do_setval(Oidrelid,int64next,booliscalled)
956+
void
957+
SetSequence(Oidrelid,int64next,booliscalled)
959958
{
960959
SeqTableelm;
961960
Relationseqrel;
@@ -1056,22 +1055,22 @@ do_setval(Oid relid, int64 next, bool iscalled)
10561055

10571056
/*
10581057
* Implement the 2 arg setval procedure.
1059-
* Seedo_setval for discussion.
1058+
* SeeSetSequence for discussion.
10601059
*/
10611060
Datum
10621061
setval_oid(PG_FUNCTION_ARGS)
10631062
{
10641063
Oidrelid=PG_GETARG_OID(0);
10651064
int64next=PG_GETARG_INT64(1);
10661065

1067-
do_setval(relid,next, true);
1066+
SetSequence(relid,next, true);
10681067

10691068
PG_RETURN_INT64(next);
10701069
}
10711070

10721071
/*
10731072
* Implement the 3 arg setval procedure.
1074-
* Seedo_setval for discussion.
1073+
* SeeSetSequence for discussion.
10751074
*/
10761075
Datum
10771076
setval3_oid(PG_FUNCTION_ARGS)
@@ -1080,7 +1079,7 @@ setval3_oid(PG_FUNCTION_ARGS)
10801079
int64next=PG_GETARG_INT64(1);
10811080
booliscalled=PG_GETARG_BOOL(2);
10821081

1083-
do_setval(relid,next,iscalled);
1082+
SetSequence(relid,next,iscalled);
10841083

10851084
PG_RETURN_INT64(next);
10861085
}
@@ -1797,8 +1796,9 @@ pg_sequence_parameters(PG_FUNCTION_ARGS)
17971796
/*
17981797
* Return the sequence tuple along with its page LSN.
17991798
*
1800-
* This is primarily intended for use by pg_dump to gather sequence data
1801-
* without needing to individually query each sequence relation.
1799+
* This is primarily used by pg_dump to efficiently collect sequence data
1800+
* without querying each sequence individually, and is also leveraged by
1801+
* logical replication while synchronizing sequences.
18021802
*/
18031803
Datum
18041804
pg_get_sequence_data(PG_FUNCTION_ARGS)

‎src/backend/postmaster/bgworker.c‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ static const struct
131131
"ParallelApplyWorkerMain",ParallelApplyWorkerMain
132132
},
133133
{
134-
"TablesyncWorkerMain",TablesyncWorkerMain
134+
"TableSyncWorkerMain",TableSyncWorkerMain
135+
},
136+
{
137+
"SequenceSyncWorkerMain",SequenceSyncWorkerMain
135138
}
136139
};
137140

‎src/backend/replication/logical/Makefile‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ OBJS = \
2626
proto.o\
2727
relation.o\
2828
reorderbuffer.o\
29+
sequencesync.o\
2930
slotsync.o\
3031
snapbuild.o\
3132
syncutils.o\

‎src/backend/replication/logical/launcher.c‎

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
248248
* Walks the workers array and searches for one that matches given worker type,
249249
* subscription id, and relation id.
250250
*
251-
* For apply workers, the relid should be set to InvalidOid, as they manage
252-
* changes across all tables. For table sync workers, the relid should be set
253-
* to the OID of the relation being synchronized.
251+
* For both apply workers and sequencesync workers, the relid should be set to
252+
* InvalidOid, as these workers handle changes across all tables and sequences
253+
* respectively, rather than targeting a specific relation. For tablesync
254+
* workers, the relid should be set to the OID of the relation being
255+
* synchronized.
254256
*/
255257
LogicalRepWorker*
256258
logicalrep_worker_find(LogicalRepWorkerTypewtype,Oidsubid,Oidrelid,
@@ -334,6 +336,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
334336
intnparallelapplyworkers;
335337
TimestampTznow;
336338
boolis_tablesync_worker= (wtype==WORKERTYPE_TABLESYNC);
339+
boolis_sequencesync_worker= (wtype==WORKERTYPE_SEQUENCESYNC);
337340
boolis_parallel_apply_worker= (wtype==WORKERTYPE_PARALLEL_APPLY);
338341

339342
/*----------
@@ -422,7 +425,8 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
422425
* sync worker limit per subscription. So, just return silently as we
423426
* might get here because of an otherwise harmless race condition.
424427
*/
425-
if (is_tablesync_worker&&nsyncworkers >=max_sync_workers_per_subscription)
428+
if ((is_tablesync_worker||is_sequencesync_worker)&&
429+
nsyncworkers >=max_sync_workers_per_subscription)
426430
{
427431
LWLockRelease(LogicalRepWorkerLock);
428432
return false;
@@ -478,6 +482,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
478482
TIMESTAMP_NOBEGIN(worker->last_recv_time);
479483
worker->reply_lsn=InvalidXLogRecPtr;
480484
TIMESTAMP_NOBEGIN(worker->reply_time);
485+
worker->last_seqsync_start_time=0;
481486

482487
/* Before releasing lock, remember generation for future identification. */
483488
generation=worker->generation;
@@ -511,8 +516,16 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
511516
memcpy(bgw.bgw_extra,&subworker_dsm,sizeof(dsm_handle));
512517
break;
513518

519+
caseWORKERTYPE_SEQUENCESYNC:
520+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"SequenceSyncWorkerMain");
521+
snprintf(bgw.bgw_name,BGW_MAXLEN,
522+
"logical replication sequencesync worker for subscription %u",
523+
subid);
524+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication sequencesync worker");
525+
break;
526+
514527
caseWORKERTYPE_TABLESYNC:
515-
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TablesyncWorkerMain");
528+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TableSyncWorkerMain");
516529
snprintf(bgw.bgw_name,BGW_MAXLEN,
517530
"logical replication tablesync worker for subscription %u sync %u",
518531
subid,
@@ -848,6 +861,33 @@ logicalrep_launcher_onexit(int code, Datum arg)
848861
LogicalRepCtx->launcher_pid=0;
849862
}
850863

864+
/*
865+
* Reset the last_seqsync_start_time of the sequencesync worker in the
866+
* subscription's apply worker.
867+
*
868+
* Note that this value is not stored in the sequencesync worker, because that
869+
* has finished already and is about to exit.
870+
*/
871+
void
872+
logicalrep_reset_seqsync_start_time(void)
873+
{
874+
LogicalRepWorker*worker;
875+
876+
/*
877+
* The apply worker can't access last_seqsync_start_time concurrently, so
878+
* it is okay to use SHARED lock here. See ProcessSequencesForSync().
879+
*/
880+
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
881+
882+
worker=logicalrep_worker_find(WORKERTYPE_APPLY,
883+
MyLogicalRepWorker->subid,InvalidOid,
884+
true);
885+
if (worker)
886+
worker->last_seqsync_start_time=0;
887+
888+
LWLockRelease(LogicalRepWorkerLock);
889+
}
890+
851891
/*
852892
* Cleanup function.
853893
*
@@ -896,7 +936,7 @@ logicalrep_sync_worker_count(Oid subid)
896936
{
897937
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
898938

899-
if (isTablesyncWorker(w)&&w->subid==subid)
939+
if (w->subid==subid&& (isTableSyncWorker(w)||isSequenceSyncWorker(w)))
900940
res++;
901941
}
902942

@@ -1610,7 +1650,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
16101650
worker_pid=worker.proc->pid;
16111651

16121652
values[0]=ObjectIdGetDatum(worker.subid);
1613-
if (isTablesyncWorker(&worker))
1653+
if (isTableSyncWorker(&worker))
16141654
values[1]=ObjectIdGetDatum(worker.relid);
16151655
else
16161656
nulls[1]= true;
@@ -1650,6 +1690,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
16501690
caseWORKERTYPE_PARALLEL_APPLY:
16511691
values[9]=CStringGetTextDatum("parallel apply");
16521692
break;
1693+
caseWORKERTYPE_SEQUENCESYNC:
1694+
values[9]=CStringGetTextDatum("sequence synchronization");
1695+
break;
16531696
caseWORKERTYPE_TABLESYNC:
16541697
values[9]=CStringGetTextDatum("table synchronization");
16551698
break;

‎src/backend/replication/logical/meson.build‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ backend_sources += files(
1212
'proto.c',
1313
'relation.c',
1414
'reorderbuffer.c',
15+
'sequencesync.c',
1516
'slotsync.c',
1617
'snapbuild.c',
1718
'syncutils.c',

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp