Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0ba5181

Browse files
author
Amit Kapila
committed
Skip empty transaction stream in test_decoding.
We were decoding empty transactions via streaming APIs added in commit45fdc97 even when the user used the option 'skip-empty-xacts'. The APIsmakes no effort to skip empty xacts under the assumption that we willnever try to stream such transactions. However, that is not true becausewe can pick to stream a transaction that has change messages forREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such messages todownstream rather they are just to update the internal state. So, we needto skip such xacts when plugin uses the option 'skip-empty-xacts'.Diagnosed-By: Amit KapilaAuthor: Dilip KumarReviewed-by: Amit KapilaDiscussion:https://postgr.es/m/CAA4eK1+OqgFNZkf7=ETe_y5ntjgDk3T0wcdkd4Sot_u1hySGfw@mail.gmail.com
1 parent9f1cf97 commit0ba5181

File tree

5 files changed

+95
-23
lines changed

5 files changed

+95
-23
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
77
decoding_into_rel binary prepared replorigin time messages\
88
spill slot truncate stream
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml\
10-
oldest_xmin snapshot_transfer subxact_without_top
10+
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
1111

1212
REGRESS_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
1313
ISOLATION_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
4+
step s0_begin: BEGIN;
5+
step s0_ddl: CREATE TABLE stream_test1(data text);
6+
step s1_ddl: CREATE TABLE stream_test(data text);
7+
step s1_begin: BEGIN;
8+
step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
9+
step s1_commit: COMMIT;
10+
step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
11+
data
12+
13+
opening a streamed block for transaction
14+
streaming change for transaction
15+
closing a streamed block for transaction
16+
committing streamed transaction
17+
?column?
18+
19+
stop

‎contrib/test_decoding/expected/stream.out

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ COMMIT;
2929
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
3030
data
3131
----------------------------------------------------------
32-
opening a streamed block for transaction
3332
streaming message: transactional: 1 prefix: test, sz: 50
34-
closing a streamed block for transaction
35-
aborting streamed (sub)transaction
3633
opening a streamed block for transaction
3734
streaming change for transaction
3835
streaming change for transaction
@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
5653
streaming change for transaction
5754
closing a streamed block for transaction
5855
committing streamed transaction
59-
(27 rows)
56+
(24 rows)
6057

6158
-- streaming test for toast changes
6259
ALTER TABLE stream_test ALTER COLUMN data set storage external;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Test decoding of in-progress transaction containing dml and a concurrent
2+
# transaction with ddl operation. The transaction containing ddl operation
3+
# should not get streamed as it doesn't have any changes.
4+
5+
setup
6+
{
7+
SELECT'init'FROMpg_create_logical_replication_slot('isolation_slot','test_decoding');
8+
9+
--consumeDDL
10+
SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
11+
CREATEORREPLACEFUNCTIONlarge_val()RETURNSTEXTLANGUAGESQLAS'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
12+
}
13+
14+
teardown
15+
{
16+
DROPTABLEIFEXISTSstream_test;
17+
DROPTABLEIFEXISTSstream_test1;
18+
SELECT'stop'FROMpg_drop_replication_slot('isolation_slot');
19+
}
20+
21+
session"s0"
22+
setup {SETsynchronous_commit=on; }
23+
step"s0_begin" {BEGIN; }
24+
step"s0_ddl" {CREATETABLEstream_test1(datatext);}
25+
26+
# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
27+
# the currently running s0_ddl and we want to test that s0_ddl should not get
28+
# streamed when user asked to skip-empty-xacts.
29+
session"s1"
30+
setup {SETsynchronous_commit=on; }
31+
step"s1_ddl" {CREATETABLEstream_test(datatext); }
32+
step"s1_begin" {BEGIN; }
33+
step"s1_toast_insert" {INSERTINTOstream_testSELECTlarge_val();}
34+
step"s1_commit" {COMMIT; }
35+
step"s1_get_stream_changes" {SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1','stream-changes','1');}
36+
37+
permutation"s0_begin""s0_ddl""s1_ddl""s1_begin""s1_toast_insert""s1_commit""s1_get_stream_changes"

‎contrib/test_decoding/test_decoding.c

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6464
Sizesz,constchar*message);
6565
staticvoidpg_decode_stream_start(LogicalDecodingContext*ctx,
6666
ReorderBufferTXN*txn);
67+
staticvoidpg_output_stream_start(LogicalDecodingContext*ctx,
68+
TestDecodingData*data,
69+
ReorderBufferTXN*txn,
70+
boollast_write);
6771
staticvoidpg_decode_stream_stop(LogicalDecodingContext*ctx,
6872
ReorderBufferTXN*txn);
6973
staticvoidpg_decode_stream_abort(LogicalDecodingContext*ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
583587
OutputPluginWrite(ctx, true);
584588
}
585589

586-
/*
587-
* We never try to stream any empty xact so we don't need any special handling
588-
* for skip_empty_xacts in streaming mode APIs.
589-
*/
590590
staticvoid
591591
pg_decode_stream_start(LogicalDecodingContext*ctx,
592592
ReorderBufferTXN*txn)
593593
{
594594
TestDecodingData*data=ctx->output_plugin_private;
595595

596-
OutputPluginPrepareWrite(ctx, true);
596+
data->xact_wrote_changes= false;
597+
if (data->skip_empty_xacts)
598+
return;
599+
pg_output_stream_start(ctx,data,txn, true);
600+
}
601+
602+
staticvoid
603+
pg_output_stream_start(LogicalDecodingContext*ctx,TestDecodingData*data,ReorderBufferTXN*txn,boollast_write)
604+
{
605+
OutputPluginPrepareWrite(ctx,last_write);
597606
if (data->include_xids)
598607
appendStringInfo(ctx->out,"opening a streamed block for transaction TXN %u",txn->xid);
599608
else
600609
appendStringInfo(ctx->out,"opening a streamed block for transaction");
601-
OutputPluginWrite(ctx,true);
610+
OutputPluginWrite(ctx,last_write);
602611
}
603612

604-
/*
605-
* We never try to stream any empty xact so we don't need any special handling
606-
* for skip_empty_xacts in streaming mode APIs.
607-
*/
608613
staticvoid
609614
pg_decode_stream_stop(LogicalDecodingContext*ctx,
610615
ReorderBufferTXN*txn)
611616
{
612617
TestDecodingData*data=ctx->output_plugin_private;
613618

619+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
620+
return;
621+
614622
OutputPluginPrepareWrite(ctx, true);
615623
if (data->include_xids)
616624
appendStringInfo(ctx->out,"closing a streamed block for transaction TXN %u",txn->xid);
@@ -619,17 +627,16 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
619627
OutputPluginWrite(ctx, true);
620628
}
621629

622-
/*
623-
* We never try to stream any empty xact so we don't need any special handling
624-
* for skip_empty_xacts in streaming mode APIs.
625-
*/
626630
staticvoid
627631
pg_decode_stream_abort(LogicalDecodingContext*ctx,
628632
ReorderBufferTXN*txn,
629633
XLogRecPtrabort_lsn)
630634
{
631635
TestDecodingData*data=ctx->output_plugin_private;
632636

637+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
638+
return;
639+
633640
OutputPluginPrepareWrite(ctx, true);
634641
if (data->include_xids)
635642
appendStringInfo(ctx->out,"aborting streamed (sub)transaction TXN %u",txn->xid);
@@ -638,17 +645,16 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
638645
OutputPluginWrite(ctx, true);
639646
}
640647

641-
/*
642-
* We never try to stream any empty xact so we don't need any special handling
643-
* for skip_empty_xacts in streaming mode APIs.
644-
*/
645648
staticvoid
646649
pg_decode_stream_commit(LogicalDecodingContext*ctx,
647650
ReorderBufferTXN*txn,
648651
XLogRecPtrcommit_lsn)
649652
{
650653
TestDecodingData*data=ctx->output_plugin_private;
651654

655+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
656+
return;
657+
652658
OutputPluginPrepareWrite(ctx, true);
653659

654660
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
676682
{
677683
TestDecodingData*data=ctx->output_plugin_private;
678684

685+
/* output stream start if we haven't yet */
686+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
687+
{
688+
pg_output_stream_start(ctx,data,txn, false);
689+
}
690+
data->xact_wrote_changes= true;
691+
679692
OutputPluginPrepareWrite(ctx, true);
680693
if (data->include_xids)
681694
appendStringInfo(ctx->out,"streaming change for TXN %u",txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
722735
{
723736
TestDecodingData*data=ctx->output_plugin_private;
724737

738+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
739+
{
740+
pg_output_stream_start(ctx,data,txn, false);
741+
}
742+
data->xact_wrote_changes= true;
743+
725744
OutputPluginPrepareWrite(ctx, true);
726745
if (data->include_xids)
727746
appendStringInfo(ctx->out,"streaming truncate for TXN %u",txn->xid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp