@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81
81
ReorderBufferTXN * txn ,XLogRecPtr prepare_lsn );
82
82
83
83
static bool publications_valid ;
84
- static bool in_streaming ;
85
84
86
85
static List * LoadPublications (List * pubnames );
87
86
static void publication_invalidation_cb (Datum arg ,int cacheid ,
@@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
480
479
(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
481
480
errmsg ("streaming requested, but not supported by output plugin" )));
482
481
483
- /* Also remember we're currently not streaming any transaction. */
484
- in_streaming = false;
485
-
486
482
/*
487
483
* Here, we just check whether the two-phase option is passed by
488
484
* plugin and decide whether to enable it at later point of time. It
@@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
680
676
ReorderBufferChange * change ,
681
677
Relation relation ,RelationSyncEntry * relentry )
682
678
{
679
+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
683
680
bool schema_sent ;
684
681
TransactionId xid = InvalidTransactionId ;
685
682
TransactionId topxid = InvalidTransactionId ;
@@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
692
689
* If we're not in a streaming block, just use InvalidTransactionId and
693
690
* the write methods will not include it.
694
691
*/
695
- if (in_streaming )
692
+ if (data -> in_streaming )
696
693
xid = change -> txn -> xid ;
697
694
698
695
if (rbtxn_is_subtxn (change -> txn ))
@@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
712
709
* doing that we need to study its impact on the case where we have a mix
713
710
* of streaming and non-streaming transactions.
714
711
*/
715
- if (in_streaming )
712
+ if (data -> in_streaming )
716
713
schema_sent = get_schema_sent_in_streamed_txn (relentry ,topxid );
717
714
else
718
715
schema_sent = relentry -> schema_sent ;
@@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
736
733
737
734
send_relation_and_attrs (relation ,xid ,ctx ,relentry -> columns );
738
735
739
- if (in_streaming )
736
+ if (data -> in_streaming )
740
737
set_schema_sent_in_streamed_txn (relentry ,topxid );
741
738
else
742
739
relentry -> schema_sent = true;
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1422
1419
* their association and on aborts, it can discard the corresponding
1423
1420
* changes.
1424
1421
*/
1425
- if (in_streaming )
1422
+ if (data -> in_streaming )
1426
1423
xid = change -> txn -> xid ;
1427
1424
1428
1425
relentry = get_rel_sync_entry (data ,relation );
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1571
1568
TransactionId xid = InvalidTransactionId ;
1572
1569
1573
1570
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
1574
- if (in_streaming )
1571
+ if (data -> in_streaming )
1575
1572
xid = change -> txn -> xid ;
1576
1573
1577
1574
old = MemoryContextSwitchTo (data -> context );
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1640
1637
* Remember the xid for the message in streaming mode. See
1641
1638
* pgoutput_change.
1642
1639
*/
1643
- if (in_streaming )
1640
+ if (data -> in_streaming )
1644
1641
xid = txn -> xid ;
1645
1642
1646
1643
/*
@@ -1743,10 +1740,11 @@ static void
1743
1740
pgoutput_stream_start (struct LogicalDecodingContext * ctx ,
1744
1741
ReorderBufferTXN * txn )
1745
1742
{
1743
+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
1746
1744
bool send_replication_origin = txn -> origin_id != InvalidRepOriginId ;
1747
1745
1748
1746
/* we can't nest streaming of transactions */
1749
- Assert (!in_streaming );
1747
+ Assert (!data -> in_streaming );
1750
1748
1751
1749
/*
1752
1750
* If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1764
1762
OutputPluginWrite (ctx , true);
1765
1763
1766
1764
/* we're streaming a chunk of transaction now */
1767
- in_streaming = true;
1765
+ data -> in_streaming = true;
1768
1766
}
1769
1767
1770
1768
/*
@@ -1774,15 +1772,17 @@ static void
1774
1772
pgoutput_stream_stop (struct LogicalDecodingContext * ctx ,
1775
1773
ReorderBufferTXN * txn )
1776
1774
{
1775
+ PGOutputData * data = (PGOutputData * )ctx -> output_plugin_private ;
1776
+
1777
1777
/* we should be streaming a transaction */
1778
- Assert (in_streaming );
1778
+ Assert (data -> in_streaming );
1779
1779
1780
1780
OutputPluginPrepareWrite (ctx , true);
1781
1781
logicalrep_write_stream_stop (ctx -> out );
1782
1782
OutputPluginWrite (ctx , true);
1783
1783
1784
1784
/* we've stopped streaming a transaction */
1785
- in_streaming = false;
1785
+ data -> in_streaming = false;
1786
1786
}
1787
1787
1788
1788
/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1802
1802
* The abort should happen outside streaming block, even for streamed
1803
1803
* transactions. The transaction has to be marked as streamed, though.
1804
1804
*/
1805
- Assert (!in_streaming );
1805
+ Assert (!data -> in_streaming );
1806
1806
1807
1807
/* determine the toplevel transaction */
1808
1808
toptxn = rbtxn_get_toptxn (txn );
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1827
1827
ReorderBufferTXN * txn ,
1828
1828
XLogRecPtr commit_lsn )
1829
1829
{
1830
+ PGOutputData * data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData * )ctx -> output_plugin_private ;
1831
+
1830
1832
/*
1831
1833
* The commit should happen outside streaming block, even for streamed
1832
1834
* transactions. The transaction has to be marked as streamed, though.
1833
1835
*/
1834
- Assert (!in_streaming );
1836
+ Assert (!data -> in_streaming );
1835
1837
Assert (rbtxn_is_streamed (txn ));
1836
1838
1837
1839
OutputPluginUpdateProgress (ctx , false);