Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0aa8a01

Browse files
author
Amit Kapila
committed
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support forstreaming changes of two-phase transactions at prepare time.* begin_prepare* filter_prepare* prepare* commit_prepared* rollback_prepared* stream_prepareMost of this is a simple extension of the existing methods, with thesemantic difference that the transaction is not yet committed and maybeaborted later.Until now two-phase transactions were translated into regular transactionson the subscriber, and the GID was not forwarded to it. None of thetwo-phase commands were communicated to the subscriber.This patch provides the infrastructure for logical decoding plugins to beinformed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPAREDand ROLLBACK PREPARED commands with the corresponding GID.This also extends the 'test_decoding' plugin, implementing these newmethods.This commit simply adds these new APIs and the upcoming patch to "allowthe decoding at prepare time in ReorderBuffer" will use these APIs.Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas KelvichReviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip KumarDiscussion:https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ruhttps://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
1 parentfa74469 commit0aa8a01

File tree

7 files changed

+744
-7
lines changed

7 files changed

+744
-7
lines changed

‎contrib/test_decoding/test_decoding.c

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
7676
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
7777
booltransactional,constchar*prefix,
7878
Sizesz,constchar*message);
79+
staticboolpg_decode_filter_prepare(LogicalDecodingContext*ctx,
80+
constchar*gid);
81+
staticvoidpg_decode_begin_prepare_txn(LogicalDecodingContext*ctx,
82+
ReorderBufferTXN*txn);
83+
staticvoidpg_decode_prepare_txn(LogicalDecodingContext*ctx,
84+
ReorderBufferTXN*txn,
85+
XLogRecPtrprepare_lsn);
86+
staticvoidpg_decode_commit_prepared_txn(LogicalDecodingContext*ctx,
87+
ReorderBufferTXN*txn,
88+
XLogRecPtrcommit_lsn);
89+
staticvoidpg_decode_rollback_prepared_txn(LogicalDecodingContext*ctx,
90+
ReorderBufferTXN*txn,
91+
XLogRecPtrprepare_end_lsn,
92+
TimestampTzprepare_time);
7993
staticvoidpg_decode_stream_start(LogicalDecodingContext*ctx,
8094
ReorderBufferTXN*txn);
8195
staticvoidpg_output_stream_start(LogicalDecodingContext*ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
87101
staticvoidpg_decode_stream_abort(LogicalDecodingContext*ctx,
88102
ReorderBufferTXN*txn,
89103
XLogRecPtrabort_lsn);
104+
staticvoidpg_decode_stream_prepare(LogicalDecodingContext*ctx,
105+
ReorderBufferTXN*txn,
106+
XLogRecPtrprepare_lsn);
90107
staticvoidpg_decode_stream_commit(LogicalDecodingContext*ctx,
91108
ReorderBufferTXN*txn,
92109
XLogRecPtrcommit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
123140
cb->filter_by_origin_cb=pg_decode_filter;
124141
cb->shutdown_cb=pg_decode_shutdown;
125142
cb->message_cb=pg_decode_message;
143+
cb->filter_prepare_cb=pg_decode_filter_prepare;
144+
cb->begin_prepare_cb=pg_decode_begin_prepare_txn;
145+
cb->prepare_cb=pg_decode_prepare_txn;
146+
cb->commit_prepared_cb=pg_decode_commit_prepared_txn;
147+
cb->rollback_prepared_cb=pg_decode_rollback_prepared_txn;
126148
cb->stream_start_cb=pg_decode_stream_start;
127149
cb->stream_stop_cb=pg_decode_stream_stop;
128150
cb->stream_abort_cb=pg_decode_stream_abort;
151+
cb->stream_prepare_cb=pg_decode_stream_prepare;
129152
cb->stream_commit_cb=pg_decode_stream_commit;
130153
cb->stream_change_cb=pg_decode_stream_change;
131154
cb->stream_message_cb=pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
141164
ListCell*option;
142165
TestDecodingData*data;
143166
boolenable_streaming= false;
167+
boolenable_twophase= false;
144168

145169
data=palloc0(sizeof(TestDecodingData));
146170
data->context=AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
241265
errmsg("could not parse value \"%s\" for parameter \"%s\"",
242266
strVal(elem->arg),elem->defname)));
243267
}
268+
elseif (strcmp(elem->defname,"two-phase-commit")==0)
269+
{
270+
if (elem->arg==NULL)
271+
continue;
272+
elseif (!parse_bool(strVal(elem->arg),&enable_twophase))
273+
ereport(ERROR,
274+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
275+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
276+
strVal(elem->arg),elem->defname)));
277+
}
244278
else
245279
{
246280
ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
252286
}
253287

254288
ctx->streaming &=enable_streaming;
289+
ctx->twophase &=enable_twophase;
255290
}
256291

257292
/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320355
OutputPluginWrite(ctx, true);
321356
}
322357

358+
/* BEGIN PREPARE callback */
359+
staticvoid
360+
pg_decode_begin_prepare_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn)
361+
{
362+
TestDecodingData*data=ctx->output_plugin_private;
363+
TestDecodingTxnData*txndata=
364+
MemoryContextAllocZero(ctx->context,sizeof(TestDecodingTxnData));
365+
366+
txndata->xact_wrote_changes= false;
367+
txn->output_plugin_private=txndata;
368+
369+
if (data->skip_empty_xacts)
370+
return;
371+
372+
pg_output_begin(ctx,data,txn, true);
373+
}
374+
375+
/* PREPARE callback */
376+
staticvoid
377+
pg_decode_prepare_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
378+
XLogRecPtrprepare_lsn)
379+
{
380+
TestDecodingData*data=ctx->output_plugin_private;
381+
TestDecodingTxnData*txndata=txn->output_plugin_private;
382+
383+
if (data->skip_empty_xacts&& !txndata->xact_wrote_changes)
384+
return;
385+
386+
OutputPluginPrepareWrite(ctx, true);
387+
388+
appendStringInfo(ctx->out,"PREPARE TRANSACTION %s",
389+
quote_literal_cstr(txn->gid));
390+
391+
if (data->include_xids)
392+
appendStringInfo(ctx->out,", txid %u",txn->xid);
393+
394+
if (data->include_timestamp)
395+
appendStringInfo(ctx->out," (at %s)",
396+
timestamptz_to_str(txn->commit_time));
397+
398+
OutputPluginWrite(ctx, true);
399+
}
400+
401+
/* COMMIT PREPARED callback */
402+
staticvoid
403+
pg_decode_commit_prepared_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
404+
XLogRecPtrcommit_lsn)
405+
{
406+
TestDecodingData*data=ctx->output_plugin_private;
407+
408+
OutputPluginPrepareWrite(ctx, true);
409+
410+
appendStringInfo(ctx->out,"COMMIT PREPARED %s",
411+
quote_literal_cstr(txn->gid));
412+
413+
if (data->include_xids)
414+
appendStringInfo(ctx->out,", txid %u",txn->xid);
415+
416+
if (data->include_timestamp)
417+
appendStringInfo(ctx->out," (at %s)",
418+
timestamptz_to_str(txn->commit_time));
419+
420+
OutputPluginWrite(ctx, true);
421+
}
422+
423+
/* ROLLBACK PREPARED callback */
424+
staticvoid
425+
pg_decode_rollback_prepared_txn(LogicalDecodingContext*ctx,
426+
ReorderBufferTXN*txn,
427+
XLogRecPtrprepare_end_lsn,
428+
TimestampTzprepare_time)
429+
{
430+
TestDecodingData*data=ctx->output_plugin_private;
431+
432+
OutputPluginPrepareWrite(ctx, true);
433+
434+
appendStringInfo(ctx->out,"ROLLBACK PREPARED %s",
435+
quote_literal_cstr(txn->gid));
436+
437+
if (data->include_xids)
438+
appendStringInfo(ctx->out,", txid %u",txn->xid);
439+
440+
if (data->include_timestamp)
441+
appendStringInfo(ctx->out," (at %s)",
442+
timestamptz_to_str(txn->commit_time));
443+
444+
OutputPluginWrite(ctx, true);
445+
}
446+
447+
/*
448+
* Filter out two-phase transactions.
449+
*
450+
* Each plugin can implement its own filtering logic. Here we demonstrate a
451+
* simple logic by checking the GID. If the GID contains the "_nodecode"
452+
* substring, then we filter it out.
453+
*/
454+
staticbool
455+
pg_decode_filter_prepare(LogicalDecodingContext*ctx,constchar*gid)
456+
{
457+
if (strstr(gid,"_nodecode")!=NULL)
458+
return true;
459+
460+
return false;
461+
}
462+
323463
staticbool
324464
pg_decode_filter(LogicalDecodingContext*ctx,
325465
RepOriginIdorigin_id)
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
701841
OutputPluginWrite(ctx, true);
702842
}
703843

844+
staticvoid
845+
pg_decode_stream_prepare(LogicalDecodingContext*ctx,
846+
ReorderBufferTXN*txn,
847+
XLogRecPtrprepare_lsn)
848+
{
849+
TestDecodingData*data=ctx->output_plugin_private;
850+
TestDecodingTxnData*txndata=txn->output_plugin_private;
851+
852+
if (data->skip_empty_xacts&& !txndata->xact_wrote_changes)
853+
return;
854+
855+
OutputPluginPrepareWrite(ctx, true);
856+
857+
if (data->include_xids)
858+
appendStringInfo(ctx->out,"preparing streamed transaction TXN %s, txid %u",
859+
quote_literal_cstr(txn->gid),txn->xid);
860+
else
861+
appendStringInfo(ctx->out,"preparing streamed transaction %s",
862+
quote_literal_cstr(txn->gid));
863+
864+
if (data->include_timestamp)
865+
appendStringInfo(ctx->out," (at %s)",
866+
timestamptz_to_str(txn->commit_time));
867+
868+
OutputPluginWrite(ctx, true);
869+
}
870+
704871
staticvoid
705872
pg_decode_stream_commit(LogicalDecodingContext*ctx,
706873
ReorderBufferTXN*txn,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp