Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9c7d06d

Browse files
Ability to advance replication slots
Ability to advance both physical and logical replication slots using anew user function pg_replication_slot_advance().For logical advance that means records are consumed as fast as possibleand changes are not given to output plugin for sending. Makes 2nd phase(after we reached SNAPBUILD_FULL_SNAPSHOT) of replication slot creationfaster, especially when there are big transactions as the reorder bufferdoes not have to deal with data changes and does not have to spill todisk.Author: Petr JelinekReviewed-by: Simon Riggs
1 parent585e166 commit9c7d06d

File tree

10 files changed

+333
-17
lines changed

10 files changed

+333
-17
lines changed

‎contrib/test_decoding/expected/slot.out

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,36 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'in
9292
COMMIT
9393
(3 rows)
9494

95+
INSERT INTO replication_example(somedata, text) VALUES (1, 4);
96+
INSERT INTO replication_example(somedata, text) VALUES (1, 5);
97+
SELECT pg_current_wal_lsn() AS wal_lsn \gset
98+
INSERT INTO replication_example(somedata, text) VALUES (1, 6);
99+
SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
100+
SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
101+
slot_name
102+
------------------
103+
regression_slot2
104+
(1 row)
105+
106+
SELECT :'wal_lsn' = :'end_lsn';
107+
?column?
108+
----------
109+
t
110+
(1 row)
111+
112+
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
113+
data
114+
---------------------------------------------------------------------------------------------------------
115+
BEGIN
116+
table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6'
117+
COMMIT
118+
(3 rows)
119+
120+
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
121+
data
122+
------
123+
(0 rows)
124+
95125
DROP TABLE replication_example;
96126
-- error
97127
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);

‎contrib/test_decoding/sql/slot.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ INSERT INTO replication_example(somedata, text) VALUES (1, 3);
4545
SELECT dataFROM pg_logical_slot_get_changes('regression_slot1',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
4646
SELECT dataFROM pg_logical_slot_get_changes('regression_slot2',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
4747

48+
INSERT INTO replication_example(somedata,text)VALUES (1,4);
49+
INSERT INTO replication_example(somedata,text)VALUES (1,5);
50+
51+
SELECT pg_current_wal_lsn()AS wal_lsn \gset
52+
53+
INSERT INTO replication_example(somedata,text)VALUES (1,6);
54+
55+
SELECT end_lsnFROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
56+
SELECT slot_nameFROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
57+
58+
SELECT :'wal_lsn'= :'end_lsn';
59+
60+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot1',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
61+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot2',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
62+
4863
DROPTABLE replication_example;
4964

5065
-- error

‎doc/src/sgml/func.sgml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19155,6 +19155,25 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
1915519155
</entry>
1915619156
</row>
1915719157

19158+
<row>
19159+
<entry>
19160+
<indexterm>
19161+
<primary>pg_replication_slot_advance</primary>
19162+
</indexterm>
19163+
<literal><function>pg_replication_slot_advance(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>)</function></literal>
19164+
</entry>
19165+
<entry>
19166+
(<parameter>slot_name</parameter> <type>name</type>, <parameter>end_lsn</parameter> <type>pg_lsn</type>)
19167+
<type>bool</type>
19168+
</entry>
19169+
<entry>
19170+
Advances the current confirmed position of a replication slot named
19171+
<parameter>slot_name</parameter>. The slot will not be moved backwards,
19172+
and it will not be moved beyond the current insert location. Returns
19173+
name of the slot and real position to which it was advanced to.
19174+
</entry>
19175+
</row>
19176+
1915819177
<row>
1915919178
<entry id="pg-replication-origin-create">
1916019179
<indexterm>

‎src/backend/replication/logical/decode.c

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
8888
* call ReorderBufferProcessXid for each record type by default, because
8989
* e.g. empty xacts can be handled more efficiently if there's no previous
9090
* state for them.
91+
*
92+
* We also support the ability to fast forward thru records, skipping some
93+
* record types completely - see individual record types for details.
9194
*/
9295
void
9396
LogicalDecodingProcessRecord(LogicalDecodingContext*ctx,XLogReaderState*record)
@@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
332335
xl_invalidations*invalidations=
333336
(xl_invalidations*)XLogRecGetData(r);
334337

335-
ReorderBufferImmediateInvalidation(
336-
ctx->reorder,invalidations->nmsgs,invalidations->msgs);
338+
if (!ctx->fast_forward)
339+
ReorderBufferImmediateInvalidation(ctx->reorder,
340+
invalidations->nmsgs,
341+
invalidations->msgs);
337342
}
338343
break;
339344
default:
@@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
353358

354359
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
355360

356-
/* no point in doing anything yet */
357-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
361+
/*
362+
* If we don't have snapshot or we are just fast-forwarding, there is no
363+
* point in decoding changes.
364+
*/
365+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT||
366+
ctx->fast_forward)
358367
return;
359368

360369
switch (info)
361370
{
362371
caseXLOG_HEAP2_MULTI_INSERT:
363-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
372+
if (!ctx->fast_forward&&
373+
SnapBuildProcessChange(builder,xid,buf->origptr))
364374
DecodeMultiInsert(ctx,buf);
365375
break;
366376
caseXLOG_HEAP2_NEW_CID:
@@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
408418

409419
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
410420

411-
/* no point in doing anything yet */
412-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
421+
/*
422+
* If we don't have snapshot or we are just fast-forwarding, there is no
423+
* point in decoding data changes.
424+
*/
425+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT||
426+
ctx->fast_forward)
413427
return;
414428

415429
switch (info)
@@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
501515

502516
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(r),buf->origptr);
503517

504-
/* No point in doing anything yet. */
505-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
518+
/*
519+
* If we don't have snapshot or we are just fast-forwarding, there is no
520+
* point in decoding messages.
521+
*/
522+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT||
523+
ctx->fast_forward)
506524
return;
507525

508526
message= (xl_logical_message*)XLogRecGetData(r);
@@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
554572
*/
555573
if (parsed->nmsgs>0)
556574
{
557-
ReorderBufferAddInvalidations(ctx->reorder,xid,buf->origptr,
558-
parsed->nmsgs,parsed->msgs);
575+
if (!ctx->fast_forward)
576+
ReorderBufferAddInvalidations(ctx->reorder,xid,buf->origptr,
577+
parsed->nmsgs,parsed->msgs);
559578
ReorderBufferXidSetCatalogChanges(ctx->reorder,xid,buf->origptr);
560579
}
561580

@@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
574593
* are restarting or if we haven't assembled a consistent snapshot yet.
575594
* 2) The transaction happened in another database.
576595
* 3) The output plugin is not interested in the origin.
596+
* 4) We are doing fast-forwarding
577597
*
578598
* We can't just use ReorderBufferAbort() here, because we need to execute
579599
* the transaction's invalidations. This currently won't be needed if
@@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
589609
*/
590610
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr)||
591611
(parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database)||
592-
FilterByOrigin(ctx,origin_id))
612+
ctx->fast_forward||FilterByOrigin(ctx,origin_id))
593613
{
594614
for (i=0;i<parsed->nsubxacts;i++)
595615
{

‎src/backend/replication/logical/logical.c

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
115115
XLogRecPtrstart_lsn,
116116
TransactionIdxmin_horizon,
117117
boolneed_full_snapshot,
118+
boolfast_forward,
118119
XLogPageReadCBread_page,
119120
LogicalOutputPluginWriterPrepareWriteprepare_write,
120121
LogicalOutputPluginWriterWritedo_write,
@@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options,
140141
* (re-)load output plugins, so we detect a bad (removed) output plugin
141142
* now.
142143
*/
143-
LoadOutputPlugin(&ctx->callbacks,NameStr(slot->data.plugin));
144+
if (!fast_forward)
145+
LoadOutputPlugin(&ctx->callbacks,NameStr(slot->data.plugin));
144146

145147
/*
146148
* Now that the slot's xmin has been set, we can announce ourselves as a
@@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
191193

192194
ctx->output_plugin_options=output_plugin_options;
193195

196+
ctx->fast_forward=fast_forward;
197+
194198
MemoryContextSwitchTo(old_context);
195199

196200
returnctx;
@@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
303307
ReplicationSlotSave();
304308

305309
ctx=StartupDecodingContext(NIL,InvalidXLogRecPtr,xmin_horizon,
306-
need_full_snapshot,read_page,prepare_write,
307-
do_write,update_progress);
310+
need_full_snapshot, true,
311+
read_page,prepare_write,do_write,
312+
update_progress);
308313

309314
/* call output plugin initialization callback */
310315
old_context=MemoryContextSwitchTo(ctx->context);
@@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
342347
LogicalDecodingContext*
343348
CreateDecodingContext(XLogRecPtrstart_lsn,
344349
List*output_plugin_options,
350+
boolfast_forward,
345351
XLogPageReadCBread_page,
346352
LogicalOutputPluginWriterPrepareWriteprepare_write,
347353
LogicalOutputPluginWriterWritedo_write,
@@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
395401

396402
ctx=StartupDecodingContext(output_plugin_options,
397403
start_lsn,InvalidTransactionId, false,
398-
read_page,prepare_write,do_write,
399-
update_progress);
404+
fast_forward,read_page,prepare_write,
405+
do_write,update_progress);
400406

401407
/* call output plugin initialization callback */
402408
old_context=MemoryContextSwitchTo(ctx->context);
@@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
573579
LogicalErrorCallbackStatestate;
574580
ErrorContextCallbackerrcallback;
575581

582+
Assert(!ctx->fast_forward);
583+
576584
/* Push callback + info on the error context stack */
577585
state.ctx=ctx;
578586
state.callback_name="startup";
@@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
598606
LogicalErrorCallbackStatestate;
599607
ErrorContextCallbackerrcallback;
600608

609+
Assert(!ctx->fast_forward);
610+
601611
/* Push callback + info on the error context stack */
602612
state.ctx=ctx;
603613
state.callback_name="shutdown";
@@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
629639
LogicalErrorCallbackStatestate;
630640
ErrorContextCallbackerrcallback;
631641

642+
Assert(!ctx->fast_forward);
643+
632644
/* Push callback + info on the error context stack */
633645
state.ctx=ctx;
634646
state.callback_name="begin";
@@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
658670
LogicalErrorCallbackStatestate;
659671
ErrorContextCallbackerrcallback;
660672

673+
Assert(!ctx->fast_forward);
674+
661675
/* Push callback + info on the error context stack */
662676
state.ctx=ctx;
663677
state.callback_name="commit";
@@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
687701
LogicalErrorCallbackStatestate;
688702
ErrorContextCallbackerrcallback;
689703

704+
Assert(!ctx->fast_forward);
705+
690706
/* Push callback + info on the error context stack */
691707
state.ctx=ctx;
692708
state.callback_name="change";
@@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
721737
ErrorContextCallbackerrcallback;
722738
boolret;
723739

740+
Assert(!ctx->fast_forward);
741+
724742
/* Push callback + info on the error context stack */
725743
state.ctx=ctx;
726744
state.callback_name="filter_by_origin";
@@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
751769
LogicalErrorCallbackStatestate;
752770
ErrorContextCallbackerrcallback;
753771

772+
Assert(!ctx->fast_forward);
773+
754774
if (ctx->callbacks.message_cb==NULL)
755775
return;
756776

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
251251
/* restart at slot's confirmed_flush */
252252
ctx=CreateDecodingContext(InvalidXLogRecPtr,
253253
options,
254+
false,
254255
logical_read_local_xlog_page,
255256
LogicalOutputPrepareWrite,
256257
LogicalOutputWrite,NULL);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp