Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit57891c2

Browse files
author
Amit Kapila
committed
Add STREAM_START/STREAM_STOP for transactional messages during decoding.
In test_decoding module, when skip_empty_xacts option was specified, addstream_start/stop for streaming transactional messages. This makes thehandling of transactional messages stream consistent irrespective ofwhether skip_empty_xacts option was specified.Commit26dd028 made a similar change for non-streaming messages butforgot to update the streaming cases.Author: Peter SmithReviewed-by: Amit KapilaDiscussion:http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent675fed4 commit57891c2

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

‎contrib/test_decoding/expected/stream.out

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ COMMIT;
2929
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
3030
data
3131
----------------------------------------------------------
32+
opening a streamed block for transaction
3233
streaming message: transactional: 1 prefix: test, sz: 50
34+
closing a streamed block for transaction
35+
aborting streamed (sub)transaction
3336
opening a streamed block for transaction
3437
streaming change for transaction
3538
streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
5356
streaming change for transaction
5457
closing a streamed block for transaction
5558
committing streamed transaction
56-
(24 rows)
59+
(27 rows)
5760

5861
-- streaming test for toast changes
5962
ALTER TABLE stream_test ALTER COLUMN data set storage external;

‎contrib/test_decoding/expected/twophase_stream.out

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
3131
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
3232
data
3333
----------------------------------------------------------
34+
opening a streamed block for transaction
3435
streaming message: transactional: 1 prefix: test, sz: 50
36+
closing a streamed block for transaction
37+
aborting streamed (sub)transaction
3538
opening a streamed block for transaction
3639
streaming change for transaction
3740
streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
5558
streaming change for transaction
5659
closing a streamed block for transaction
5760
preparing streamed transaction 'test1'
58-
(24 rows)
61+
(27 rows)
5962

6063
COMMIT PREPARED 'test1';
6164
--should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
8487
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
8588
data
8689
----------------------------------------------------------
90+
opening a streamed block for transaction
8791
streaming message: transactional: 1 prefix: test, sz: 50
88-
(1 row)
92+
closing a streamed block for transaction
93+
aborting streamed (sub)transaction
94+
(4 rows)
8995

9096
COMMIT PREPARED 'test1_nodecode';
9197
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT

‎contrib/test_decoding/test_decoding.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
944944
ReorderBufferTXN*txn,XLogRecPtrlsn,booltransactional,
945945
constchar*prefix,Sizesz,constchar*message)
946946
{
947+
/* Output stream start if we haven't yet for transactional messages. */
948+
if (transactional)
949+
{
950+
TestDecodingData*data=ctx->output_plugin_private;
951+
TestDecodingTxnData*txndata=txn->output_plugin_private;
952+
953+
if (data->skip_empty_xacts&& !txndata->stream_wrote_changes)
954+
{
955+
pg_output_stream_start(ctx,data,txn, false);
956+
}
957+
txndata->xact_wrote_changes=txndata->stream_wrote_changes= true;
958+
}
959+
947960
OutputPluginPrepareWrite(ctx, true);
948961

949962
if (transactional)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp