Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9210afd

Browse files
committed
Move tracking of in_streaming to PGOutputData
"in_streaming" is a flag used to track if an instance of pgoutput isstreaming changes. When pgoutput is started, the flag was always reset,switched it back and forth in the stream start/stop callbacks.Before this commit, it was a global variable, which is confusing as itis actually attached to a state of PGOutputData. Per my analysis, usinga global variable did not lead to an active bug like in54ccfd6,but it makes the code more consistent. Note that we cannot backpatchthis change anyway as it requires the addition of a new field toPGOutputData, exposed in pgoutput.h.Author: Hou ZhijieReviewed-by: Amit Kapila, Michael Paquier, Peter SmithDiscussion:https://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parentebf76f2 commit9210afd

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8181
ReorderBufferTXN*txn,XLogRecPtrprepare_lsn);
8282

8383
staticboolpublications_valid;
84-
staticboolin_streaming;
8584

8685
staticList*LoadPublications(List*pubnames);
8786
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
@@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
480479
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
481480
errmsg("streaming requested, but not supported by output plugin")));
482481

483-
/* Also remember we're currently not streaming any transaction. */
484-
in_streaming= false;
485-
486482
/*
487483
* Here, we just check whether the two-phase option is passed by
488484
* plugin and decide whether to enable it at later point of time. It
@@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
680676
ReorderBufferChange*change,
681677
Relationrelation,RelationSyncEntry*relentry)
682678
{
679+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
683680
boolschema_sent;
684681
TransactionIdxid=InvalidTransactionId;
685682
TransactionIdtopxid=InvalidTransactionId;
@@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
692689
* If we're not in a streaming block, just use InvalidTransactionId and
693690
* the write methods will not include it.
694691
*/
695-
if (in_streaming)
692+
if (data->in_streaming)
696693
xid=change->txn->xid;
697694

698695
if (rbtxn_is_subtxn(change->txn))
@@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
712709
* doing that we need to study its impact on the case where we have a mix
713710
* of streaming and non-streaming transactions.
714711
*/
715-
if (in_streaming)
712+
if (data->in_streaming)
716713
schema_sent=get_schema_sent_in_streamed_txn(relentry,topxid);
717714
else
718715
schema_sent=relentry->schema_sent;
@@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
736733

737734
send_relation_and_attrs(relation,xid,ctx,relentry->columns);
738735

739-
if (in_streaming)
736+
if (data->in_streaming)
740737
set_schema_sent_in_streamed_txn(relentry,topxid);
741738
else
742739
relentry->schema_sent= true;
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14221419
* their association and on aborts, it can discard the corresponding
14231420
* changes.
14241421
*/
1425-
if (in_streaming)
1422+
if (data->in_streaming)
14261423
xid=change->txn->xid;
14271424

14281425
relentry=get_rel_sync_entry(data,relation);
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15711568
TransactionIdxid=InvalidTransactionId;
15721569

15731570
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
1574-
if (in_streaming)
1571+
if (data->in_streaming)
15751572
xid=change->txn->xid;
15761573

15771574
old=MemoryContextSwitchTo(data->context);
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16401637
* Remember the xid for the message in streaming mode. See
16411638
* pgoutput_change.
16421639
*/
1643-
if (in_streaming)
1640+
if (data->in_streaming)
16441641
xid=txn->xid;
16451642

16461643
/*
@@ -1743,10 +1740,11 @@ static void
17431740
pgoutput_stream_start(structLogicalDecodingContext*ctx,
17441741
ReorderBufferTXN*txn)
17451742
{
1743+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
17461744
boolsend_replication_origin=txn->origin_id!=InvalidRepOriginId;
17471745

17481746
/* we can't nest streaming of transactions */
1749-
Assert(!in_streaming);
1747+
Assert(!data->in_streaming);
17501748

17511749
/*
17521750
* If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
17641762
OutputPluginWrite(ctx, true);
17651763

17661764
/* we're streaming a chunk of transaction now */
1767-
in_streaming= true;
1765+
data->in_streaming= true;
17681766
}
17691767

17701768
/*
@@ -1774,15 +1772,17 @@ static void
17741772
pgoutput_stream_stop(structLogicalDecodingContext*ctx,
17751773
ReorderBufferTXN*txn)
17761774
{
1775+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
1776+
17771777
/* we should be streaming a transaction */
1778-
Assert(in_streaming);
1778+
Assert(data->in_streaming);
17791779

17801780
OutputPluginPrepareWrite(ctx, true);
17811781
logicalrep_write_stream_stop(ctx->out);
17821782
OutputPluginWrite(ctx, true);
17831783

17841784
/* we've stopped streaming a transaction */
1785-
in_streaming= false;
1785+
data->in_streaming= false;
17861786
}
17871787

17881788
/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
18021802
* The abort should happen outside streaming block, even for streamed
18031803
* transactions. The transaction has to be marked as streamed, though.
18041804
*/
1805-
Assert(!in_streaming);
1805+
Assert(!data->in_streaming);
18061806

18071807
/* determine the toplevel transaction */
18081808
toptxn=rbtxn_get_toptxn(txn);
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
18271827
ReorderBufferTXN*txn,
18281828
XLogRecPtrcommit_lsn)
18291829
{
1830+
PGOutputData*dataPG_USED_FOR_ASSERTS_ONLY= (PGOutputData*)ctx->output_plugin_private;
1831+
18301832
/*
18311833
* The commit should happen outside streaming block, even for streamed
18321834
* transactions. The transaction has to be marked as streamed, though.
18331835
*/
1834-
Assert(!in_streaming);
1836+
Assert(!data->in_streaming);
18351837
Assert(rbtxn_is_streamed(txn));
18361838

18371839
OutputPluginUpdateProgress(ctx, false);

‎src/include/replication/pgoutput.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ typedef struct PGOutputData
2121
* allocations */
2222
MemoryContextcachectx;/* private memory context for cache data */
2323

24+
boolin_streaming;/* true if we are streaming a chunk of
25+
* transaction */
26+
2427
/* client-supplied info: */
2528
uint32protocol_version;
2629
List*publication_names;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp