@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8181ReorderBufferTXN * txn ,XLogRecPtr prepare_lsn );
8282
8383static bool publications_valid ;
84- static bool in_streaming ;
8584
8685static List * LoadPublications (List * pubnames );
8786static void publication_invalidation_cb (Datum arg ,int cacheid ,
@@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
480479(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
481480errmsg ("streaming requested, but not supported by output plugin" )));
482481
483- /* Also remember we're currently not streaming any transaction. */
484- in_streaming = false;
485-
486482/*
487483 * Here, we just check whether the two-phase option is passed by
488484 * plugin and decide whether to enable it at later point of time. It
@@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
680676ReorderBufferChange * change ,
681677Relation relation ,RelationSyncEntry * relentry )
682678{
679+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
683680bool schema_sent ;
684681TransactionId xid = InvalidTransactionId ;
685682TransactionId topxid = InvalidTransactionId ;
@@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
692689 * If we're not in a streaming block, just use InvalidTransactionId and
693690 * the write methods will not include it.
694691 */
695- if (in_streaming )
692+ if (data -> in_streaming )
696693xid = change -> txn -> xid ;
697694
698695if (rbtxn_is_subtxn (change -> txn ))
@@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
712709 * doing that we need to study its impact on the case where we have a mix
713710 * of streaming and non-streaming transactions.
714711 */
715- if (in_streaming )
712+ if (data -> in_streaming )
716713schema_sent = get_schema_sent_in_streamed_txn (relentry ,topxid );
717714else
718715schema_sent = relentry -> schema_sent ;
@@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
736733
737734send_relation_and_attrs (relation ,xid ,ctx ,relentry -> columns );
738735
739- if (in_streaming )
736+ if (data -> in_streaming )
740737set_schema_sent_in_streamed_txn (relentry ,topxid );
741738else
742739relentry -> schema_sent = true;
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14221419 * their association and on aborts, it can discard the corresponding
14231420 * changes.
14241421 */
1425- if (in_streaming )
1422+ if (data -> in_streaming )
14261423xid = change -> txn -> xid ;
14271424
14281425relentry = get_rel_sync_entry (data ,relation );
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15711568TransactionId xid = InvalidTransactionId ;
15721569
15731570/* Remember the xid for the change in streaming mode. See pgoutput_change. */
1574- if (in_streaming )
1571+ if (data -> in_streaming )
15751572xid = change -> txn -> xid ;
15761573
15771574old = MemoryContextSwitchTo (data -> context );
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16401637 * Remember the xid for the message in streaming mode. See
16411638 * pgoutput_change.
16421639 */
1643- if (in_streaming )
1640+ if (data -> in_streaming )
16441641xid = txn -> xid ;
16451642
16461643/*
@@ -1743,10 +1740,11 @@ static void
17431740pgoutput_stream_start (struct LogicalDecodingContext * ctx ,
17441741ReorderBufferTXN * txn )
17451742{
1743+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
17461744bool send_replication_origin = txn -> origin_id != InvalidRepOriginId ;
17471745
17481746/* we can't nest streaming of transactions */
1749- Assert (!in_streaming );
1747+ Assert (!data -> in_streaming );
17501748
17511749/*
17521750 * If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
17641762OutputPluginWrite (ctx , true);
17651763
17661764/* we're streaming a chunk of transaction now */
1767- in_streaming = true;
1765+ data -> in_streaming = true;
17681766}
17691767
17701768/*
@@ -1774,15 +1772,17 @@ static void
17741772pgoutput_stream_stop (struct LogicalDecodingContext * ctx ,
17751773ReorderBufferTXN * txn )
17761774{
1775+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
1776+
17771777/* we should be streaming a transaction */
1778- Assert (in_streaming );
1778+ Assert (data -> in_streaming );
17791779
17801780OutputPluginPrepareWrite (ctx , true);
17811781logicalrep_write_stream_stop (ctx -> out );
17821782OutputPluginWrite (ctx , true);
17831783
17841784/* we've stopped streaming a transaction */
1785- in_streaming = false;
1785+ data -> in_streaming = false;
17861786}
17871787
17881788/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
18021802 * The abort should happen outside streaming block, even for streamed
18031803 * transactions. The transaction has to be marked as streamed, though.
18041804 */
1805- Assert (!in_streaming );
1805+ Assert (!data -> in_streaming );
18061806
18071807/* determine the toplevel transaction */
18081808toptxn = rbtxn_get_toptxn (txn );
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
18271827ReorderBufferTXN * txn ,
18281828XLogRecPtr commit_lsn )
18291829{
1830+ PGOutputData * data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData * )ctx -> output_plugin_private ;
1831+
18301832/*
18311833 * The commit should happen outside streaming block, even for streamed
18321834 * transactions. The transaction has to be marked as streamed, though.
18331835 */
1834- Assert (!in_streaming );
1836+ Assert (!data -> in_streaming );
18351837Assert (rbtxn_is_streamed (txn ));
18361838
18371839OutputPluginUpdateProgress (ctx , false);