Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9653f24

Browse files
author
Amit Kapila
committed
Fix 'skip-empty-xacts' option in test_decoding for streaming mode.
In streaming mode, the transaction can be decoded in multiple streams andthose streams can be interleaved with streams of other transactions. So,we can't remember the transaction's write status in the logical decodingcontext because that might get changed due to some other transactions andlead to wrong answers for 'skip-empty-xacts' option. We decided to keepeach transaction's write status in the ReorderBufferTxn to avoidinterleaved streams changing the status of some unrelated transactions.Diagnosed-by: Amit KapilaAuthor: Dilip KumarReviewed-by: Amit KapilaDiscussion:https://postgr.es/m/CAA4eK1LR7=XNM_TLmpZMFuV8ZQpoxkem--NZJYf8YXmesbvwLA@mail.gmail.com
1 parent2bd49b4 commit9653f24

File tree

6 files changed

+96
-19
lines changed

6 files changed

+96
-19
lines changed

‎contrib/test_decoding/expected/concurrent_stream.out‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
Parsed test spec with2 sessions
1+
Parsed test spec with3 sessions
22

3-
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
3+
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_inserts2_ddls1_commit s1_get_stream_changes
44
step s0_begin: BEGIN;
55
step s0_ddl: CREATE TABLE stream_test1(data text);
66
step s1_ddl: CREATE TABLE stream_test(data text);
77
step s1_begin: BEGIN;
88
step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
9+
step s2_ddl: CREATE TABLE stream_test2(data text);
910
step s1_commit: COMMIT;
1011
step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
1112
data

‎contrib/test_decoding/specs/concurrent_stream.spec‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ setup { SET synchronous_commit=on; }
2323
step"s0_begin" {BEGIN; }
2424
step"s0_ddl" {CREATETABLEstream_test1(datatext);}
2525

26+
session"s2"
27+
setup {SETsynchronous_commit=on; }
28+
step"s2_ddl" {CREATETABLEstream_test2(datatext);}
29+
2630
# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
2731
# the currently running s0_ddl and we want to test that s0_ddl should not get
28-
# streamed when user asked to skip-empty-xacts.
32+
# streamed when user asked to skip-empty-xacts. Similarly, the
33+
# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for
34+
# what gets streamed.
2935
session"s1"
3036
setup {SETsynchronous_commit=on; }
3137
step"s1_ddl" {CREATETABLEstream_test(datatext); }
@@ -34,4 +40,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
3440
step"s1_commit" {COMMIT; }
3541
step"s1_get_stream_changes" {SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1','stream-changes','1');}
3642

37-
permutation"s0_begin""s0_ddl""s1_ddl""s1_begin""s1_toast_insert""s1_commit""s1_get_stream_changes"
43+
permutation"s0_begin""s0_ddl""s1_ddl""s1_begin""s1_toast_insert""s2_ddl""s1_commit""s1_get_stream_changes"

‎contrib/test_decoding/test_decoding.c‎

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,24 @@ typedef struct
3434
boolinclude_xids;
3535
boolinclude_timestamp;
3636
boolskip_empty_xacts;
37-
boolxact_wrote_changes;
3837
boolonly_local;
3938
}TestDecodingData;
4039

40+
/*
41+
* Maintain the per-transaction level variables to track whether the
42+
* transaction and or streams have written any changes. In streaming mode the
43+
* transaction can be decoded in streams so along with maintaining whether the
44+
* transaction has written any changes, we also need to track whether the
45+
* current stream has written any changes. This is required so that if user
46+
* has requested to skip the empty transactions we can skip the empty streams
47+
* even though the transaction has written some changes.
48+
*/
49+
typedefstruct
50+
{
51+
boolxact_wrote_changes;
52+
boolstream_wrote_changes;
53+
}TestDecodingTxnData;
54+
4155
staticvoidpg_decode_startup(LogicalDecodingContext*ctx,OutputPluginOptions*opt,
4256
boolis_init);
4357
staticvoidpg_decode_shutdown(LogicalDecodingContext*ctx);
@@ -255,8 +269,12 @@ static void
255269
pg_decode_begin_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn)
256270
{
257271
TestDecodingData*data=ctx->output_plugin_private;
272+
TestDecodingTxnData*txndata=
273+
MemoryContextAllocZero(ctx->context,sizeof(TestDecodingTxnData));
274+
275+
txndata->xact_wrote_changes= false;
276+
txn->output_plugin_private=txndata;
258277

259-
data->xact_wrote_changes= false;
260278
if (data->skip_empty_xacts)
261279
return;
262280

@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
280298
XLogRecPtrcommit_lsn)
281299
{
282300
TestDecodingData*data=ctx->output_plugin_private;
301+
TestDecodingTxnData*txndata=txn->output_plugin_private;
302+
boolxact_wrote_changes=txndata->xact_wrote_changes;
303+
304+
pfree(txndata);
305+
txn->output_plugin_private=NULL;
283306

284-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
307+
if (data->skip_empty_xacts&& !xact_wrote_changes)
285308
return;
286309

287310
OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
442465
Relationrelation,ReorderBufferChange*change)
443466
{
444467
TestDecodingData*data;
468+
TestDecodingTxnData*txndata;
445469
Form_pg_classclass_form;
446470
TupleDesctupdesc;
447471
MemoryContextold;
448472

449473
data=ctx->output_plugin_private;
474+
txndata=txn->output_plugin_private;
450475

451476
/* output BEGIN if we haven't yet */
452-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
477+
if (data->skip_empty_xacts&& !txndata->xact_wrote_changes)
453478
{
454479
pg_output_begin(ctx,data,txn, false);
455480
}
456-
data->xact_wrote_changes= true;
481+
txndata->xact_wrote_changes= true;
457482

458483
class_form=RelationGetForm(relation);
459484
tupdesc=RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
527552
intnrelations,Relationrelations[],ReorderBufferChange*change)
528553
{
529554
TestDecodingData*data;
555+
TestDecodingTxnData*txndata;
530556
MemoryContextold;
531557
inti;
532558

533559
data=ctx->output_plugin_private;
560+
txndata=txn->output_plugin_private;
534561

535562
/* output BEGIN if we haven't yet */
536-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
563+
if (data->skip_empty_xacts&& !txndata->xact_wrote_changes)
537564
{
538565
pg_output_begin(ctx,data,txn, false);
539566
}
540-
data->xact_wrote_changes= true;
567+
txndata->xact_wrote_changes= true;
541568

542569
/* Avoid leaking memory by using and resetting our own context */
543570
old=MemoryContextSwitchTo(data->context);
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
592619
ReorderBufferTXN*txn)
593620
{
594621
TestDecodingData*data=ctx->output_plugin_private;
622+
TestDecodingTxnData*txndata=txn->output_plugin_private;
595623

596-
data->xact_wrote_changes= false;
624+
/*
625+
* Allocate the txn plugin data for the first stream in the transaction.
626+
*/
627+
if (txndata==NULL)
628+
{
629+
txndata=
630+
MemoryContextAllocZero(ctx->context,sizeof(TestDecodingTxnData));
631+
txndata->xact_wrote_changes= false;
632+
txn->output_plugin_private=txndata;
633+
}
634+
635+
txndata->stream_wrote_changes= false;
597636
if (data->skip_empty_xacts)
598637
return;
599638
pg_output_stream_start(ctx,data,txn, true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
615654
ReorderBufferTXN*txn)
616655
{
617656
TestDecodingData*data=ctx->output_plugin_private;
657+
TestDecodingTxnData*txndata=txn->output_plugin_private;
618658

619-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
659+
if (data->skip_empty_xacts&& !txndata->stream_wrote_changes)
620660
return;
621661

622662
OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
634674
{
635675
TestDecodingData*data=ctx->output_plugin_private;
636676

637-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
677+
/*
678+
* stream abort can be sent for an individual subtransaction but we
679+
* maintain the output_plugin_private only under the toptxn so if this is
680+
* not the toptxn then fetch the toptxn.
681+
*/
682+
ReorderBufferTXN*toptxn=txn->toptxn ?txn->toptxn :txn;
683+
TestDecodingTxnData*txndata=toptxn->output_plugin_private;
684+
boolxact_wrote_changes=txndata->xact_wrote_changes;
685+
686+
if (txn->toptxn==NULL)
687+
{
688+
Assert(txn->output_plugin_private!=NULL);
689+
pfree(txndata);
690+
txn->output_plugin_private=NULL;
691+
}
692+
693+
if (data->skip_empty_xacts&& !xact_wrote_changes)
638694
return;
639695

640696
OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
651707
XLogRecPtrcommit_lsn)
652708
{
653709
TestDecodingData*data=ctx->output_plugin_private;
710+
TestDecodingTxnData*txndata=txn->output_plugin_private;
711+
boolxact_wrote_changes=txndata->xact_wrote_changes;
712+
713+
pfree(txndata);
714+
txn->output_plugin_private=NULL;
654715

655-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
716+
if (data->skip_empty_xacts&& !xact_wrote_changes)
656717
return;
657718

658719
OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
681742
ReorderBufferChange*change)
682743
{
683744
TestDecodingData*data=ctx->output_plugin_private;
745+
TestDecodingTxnData*txndata=txn->output_plugin_private;
684746

685747
/* output stream start if we haven't yet */
686-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
748+
if (data->skip_empty_xacts&& !txndata->stream_wrote_changes)
687749
{
688750
pg_output_stream_start(ctx,data,txn, false);
689751
}
690-
data->xact_wrote_changes= true;
752+
txndata->xact_wrote_changes=txndata->stream_wrote_changes= true;
691753

692754
OutputPluginPrepareWrite(ctx, true);
693755
if (data->include_xids)
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
734796
ReorderBufferChange*change)
735797
{
736798
TestDecodingData*data=ctx->output_plugin_private;
799+
TestDecodingTxnData*txndata=txn->output_plugin_private;
737800

738-
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
801+
if (data->skip_empty_xacts&& !txndata->stream_wrote_changes)
739802
{
740803
pg_output_stream_start(ctx,data,txn, false);
741804
}
742-
data->xact_wrote_changes= true;
805+
txndata->xact_wrote_changes=txndata->stream_wrote_changes= true;
743806

744807
OutputPluginPrepareWrite(ctx, true);
745808
if (data->include_xids)

‎src/backend/replication/logical/reorderbuffer.c‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
402402

403403
/* InvalidCommandId is not zero, so set it explicitly */
404404
txn->command_id=InvalidCommandId;
405+
txn->output_plugin_private=NULL;
405406

406407
returntxn;
407408
}

‎src/include/replication/reorderbuffer.h‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
378378

379379
/* If we have detected concurrent abort then ignore future changes. */
380380
boolconcurrent_abort;
381+
382+
/*
383+
* Private data pointer of the output plugin.
384+
*/
385+
void*output_plugin_private;
381386
}ReorderBufferTXN;
382387

383388
/* so we can define the callbacks used inside struct ReorderBuffer itself */

‎src/tools/pgindent/typedefs.list‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2505,6 +2505,7 @@ Tcl_Obj
25052505
Tcl_Time
25062506
TempNamespaceStatus
25072507
TestDecodingData
2508+
TestDecodingTxnData
25082509
TestSpec
25092510
TextFreq
25102511
TextPositionState

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp