Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf64ea6d

Browse files
author
Amit Kapila
committed
Add a xid argument to the filter_prepare callback for output plugins.
Along with gid, this provides a different way to identify the transaction.The users that use xid in some way to prepare the transactions can use itto filter prepare transactions. The later commands COMMIT PREPARED orROLLBACK PREPARED carries both identifiers, providing an output plugin thechoice of what to use.Author: Markus WannerReviewed-by: Vignesh C, Amit KapilaDiscussion:https://postgr.es/m/ee280000-7355-c4dc-e47b-2436e7be959c@enterprisedb.com
1 parentbc2797e commitf64ea6d

File tree

6 files changed

+38
-21
lines changed

6 files changed

+38
-21
lines changed

‎contrib/test_decoding/test_decoding.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
7777
booltransactional,constchar*prefix,
7878
Sizesz,constchar*message);
7979
staticboolpg_decode_filter_prepare(LogicalDecodingContext*ctx,
80+
TransactionIdxid,
8081
constchar*gid);
8182
staticvoidpg_decode_begin_prepare_txn(LogicalDecodingContext*ctx,
8283
ReorderBufferTXN*txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
440441
* substring, then we filter it out.
441442
*/
442443
staticbool
443-
pg_decode_filter_prepare(LogicalDecodingContext*ctx,constchar*gid)
444+
pg_decode_filter_prepare(LogicalDecodingContext*ctx,TransactionIdxid,
445+
constchar*gid)
444446
{
445447
if (strstr(gid,"_nodecode")!=NULL)
446448
return true;

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
794794
<command>COMMIT PREPARED</command> time. To signal that
795795
decoding should be skipped, return <literal>true</literal>;
796796
<literal>false</literal> otherwise. When the callback is not
797-
defined, <literal>false</literal> is assumed (i.e.nothing is
798-
filtered).
797+
defined, <literal>false</literal> is assumed (i.e.no filtering, all
798+
transactions using two-phase commit are decoded in two phases as well).
799799
<programlisting>
800800
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
801+
TransactionId xid,
801802
const char *gid);
802803
</programlisting>
803-
The <parameter>ctx</parameter> parameter has the same contents as for the
804-
other callbacks. The <parameter>gid</parameter> is the identifier that later
805-
identifies this transaction for <command>COMMIT PREPARED</command> or
806-
<command>ROLLBACK PREPARED</command>.
804+
The <parameter>ctx</parameter> parameter has the same contents as for
805+
the other callbacks. The parameters <parameter>xid</parameter>
806+
and <parameter>gid</parameter> provide two different ways to identify
807+
the transaction. The later <command>COMMIT PREPARED</command> or
808+
<command>ROLLBACK PREPARED</command> carries both identifiers,
809+
providing an output plugin the choice of what to use.
807810
</para>
808811
<para>
809-
The callback has to provide the same static answer for a given
810-
<parameter>gid</parameter> every time it is called.
812+
The callback may be invoked multiple times per transaction to decode
813+
and must provide the same static answer for a given pair of
814+
<parameter>xid</parameter> and <parameter>gid</parameter> every time
815+
it is called.
811816
</para>
812817
</sect3>
813818

@@ -1219,9 +1224,11 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction
12191224
</para>
12201225

12211226
<para>
1222-
Optionally the output plugin can specify a name pattern in the
1223-
<function>filter_prepare_cb</function> and transactions with gid containing
1224-
that name pattern will not be decoded as a two-phase commit transaction.
1227+
Optionally the output plugin can define filtering rules via
1228+
<function>filter_prepare_cb</function> to decode only specific transaction
1229+
in two phases. This can be achieved by pattern matching on the
1230+
<parameter>gid</parameter> or via lookups using the
1231+
<parameter>xid</parameter>.
12251232
</para>
12261233

12271234
<para>

‎src/backend/replication/logical/decode.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
8080
staticvoidDecodeXLogTuple(char*data,Sizelen,ReorderBufferTupleBuf*tup);
8181

8282
/* helper functions for decoding transactions */
83-
staticinlineboolFilterPrepare(LogicalDecodingContext*ctx,constchar*gid);
83+
staticinlineboolFilterPrepare(LogicalDecodingContext*ctx,
84+
TransactionIdxid,constchar*gid);
8485
staticboolDecodeTXNNeedSkip(LogicalDecodingContext*ctx,
8586
XLogRecordBuffer*buf,OiddbId,
8687
RepOriginIdorigin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
271272
* doesn't filter the transaction at prepare time.
272273
*/
273274
if (info==XLOG_XACT_COMMIT_PREPARED)
274-
two_phase= !(FilterPrepare(ctx,parsed.twophase_gid));
275+
two_phase= !(FilterPrepare(ctx,xid,
276+
parsed.twophase_gid));
275277

276278
DecodeCommit(ctx,buf,&parsed,xid,two_phase);
277279
break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
298300
* doesn't filter the transaction at prepare time.
299301
*/
300302
if (info==XLOG_XACT_ABORT_PREPARED)
301-
two_phase= !(FilterPrepare(ctx,parsed.twophase_gid));
303+
two_phase= !(FilterPrepare(ctx,xid,
304+
parsed.twophase_gid));
302305

303306
DecodeAbort(ctx,buf,&parsed,xid,two_phase);
304307
break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
355358
* manner iff output plugin supports two-phase commits and
356359
* doesn't filter the transaction at prepare time.
357360
*/
358-
if (FilterPrepare(ctx,parsed.twophase_gid))
361+
if (FilterPrepare(ctx,parsed.twophase_xid,
362+
parsed.twophase_gid))
359363
{
360364
ReorderBufferProcessXid(reorder,parsed.twophase_xid,
361365
buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
581585
* this transaction as a regular commit later.
582586
*/
583587
staticinlinebool
584-
FilterPrepare(LogicalDecodingContext*ctx,constchar*gid)
588+
FilterPrepare(LogicalDecodingContext*ctx,TransactionIdxid,
589+
constchar*gid)
585590
{
586591
/*
587592
* Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
599604
if (ctx->callbacks.filter_prepare_cb==NULL)
600605
return false;
601606

602-
returnfilter_prepare_cb_wrapper(ctx,gid);
607+
returnfilter_prepare_cb_wrapper(ctx,xid,gid);
603608
}
604609

605610
staticinlinebool

‎src/backend/replication/logical/logical.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10831083
}
10841084

10851085
bool
1086-
filter_prepare_cb_wrapper(LogicalDecodingContext*ctx,constchar*gid)
1086+
filter_prepare_cb_wrapper(LogicalDecodingContext*ctx,TransactionIdxid,
1087+
constchar*gid)
10871088
{
10881089
LogicalErrorCallbackStatestate;
10891090
ErrorContextCallbackerrcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
11041105
ctx->accept_writes= false;
11051106

11061107
/* do the actual work: call callback */
1107-
ret=ctx->callbacks.filter_prepare_cb(ctx,gid);
1108+
ret=ctx->callbacks.filter_prepare_cb(ctx,xid,gid);
11081109

11091110
/* Pop the error context stack */
11101111
error_context_stack=errcallback.previous;

‎src/include/replication/logical.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
125125
XLogRecPtrrestart_lsn);
126126
externvoidLogicalConfirmReceivedLocation(XLogRecPtrlsn);
127127

128-
externboolfilter_prepare_cb_wrapper(LogicalDecodingContext*ctx,constchar*gid);
128+
externboolfilter_prepare_cb_wrapper(LogicalDecodingContext*ctx,
129+
TransactionIdxid,constchar*gid);
129130
externboolfilter_by_origin_cb_wrapper(LogicalDecodingContext*ctx,RepOriginIdorigin_id);
130131
externvoidResetLogicalStreamingState(void);
131132
externvoidUpdateDecodingStats(LogicalDecodingContext*ctx);

‎src/include/replication/output_plugin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
106106
* and sent as usual transaction.
107107
*/
108108
typedefbool (*LogicalDecodeFilterPrepareCB) (structLogicalDecodingContext*ctx,
109+
TransactionIdxid,
109110
constchar*gid);
110111

111112
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp