Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6334bec

Browse files
Nikhil Sontakkearssher
Nikhil Sontakke
authored andcommitted
Support decoding of two-phase transactions at PREPARE
Until now two-phase transactions were decoded at COMMIT, just likeregular transaction. During replay, two-phase transactions weretranslated into regular transactions on the subscriber, and the GIDwas not forwarded to it.This patch allows PREPARE-time decoding two-phase transactions (ifthe output plugin supports this capability), in which case thetransactions are replayed at PREPARE and then committed later whenCOMMIT PREPARED arrives.On the subscriber, the transactions will be executed as two-phasetransactions, with the same GID. This is important for variousexternal transaction managers, that often encode information intothe GID itself.Includes documentation changes.
1 parent7a5e3d7 commit6334bec

File tree

7 files changed

+746
-32
lines changed

7 files changed

+746
-32
lines changed

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,12 @@ typedef struct OutputPluginCallbacks
385385
LogicalDecodeChangeCB change_cb;
386386
LogicalDecodeTruncateCB truncate_cb;
387387
LogicalDecodeCommitCB commit_cb;
388+
LogicalDecodeAbortCB abort_cb;
388389
LogicalDecodeMessageCB message_cb;
390+
LogicalDecodeFilterPrepareCB filter_prepare_cb;
391+
LogicalDecodePrepareCB prepare_cb;
392+
LogicalDecodeCommitPreparedCB commit_prepared_cb;
393+
LogicalDecodeAbortPreparedCB abort_prepared_cb;
389394
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
390395
LogicalDecodeShutdownCB shutdown_cb;
391396
} OutputPluginCallbacks;
@@ -457,7 +462,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
457462
never get
458463
decoded. Successful savepoints are
459464
folded into the transaction containing them in the order they were
460-
executed within that transaction.
465+
executed within that transaction. A transaction that is prepared for
466+
a two-phase commit using <command>PREPARE TRANSACTION</command> will
467+
also be decoded if the output plugin callbacks needed for decoding
468+
them are provided. It is possible that the current transaction which
469+
is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
470+
command. In that case, the logical decoding of this transaction will
471+
be aborted too.
461472
</para>
462473

463474
<note>
@@ -558,6 +569,71 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
558569
</para>
559570
</sect3>
560571

572+
<sect3 id="logicaldecoding-output-plugin-prepare">
573+
<title>Transaction Prepare Callback</title>
574+
575+
<para>
576+
The optional <function>prepare_cb</function> callback is called whenever
577+
a transaction which is prepared for two-phase commit has been
578+
decoded. The <function>change_cb</function> callbacks for all modified
579+
rows will have been called before this, if there have been any modified
580+
rows.
581+
<programlisting>
582+
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
583+
ReorderBufferTXN *txn,
584+
XLogRecPtr prepare_lsn);
585+
</programlisting>
586+
</para>
587+
</sect3>
588+
589+
<sect3 id="logicaldecoding-output-plugin-commit-prepared">
590+
<title>Commit Prepared Transaction Callback</title>
591+
592+
<para>
593+
The optional <function>commit_prepared_cb</function> callback is called whenever
594+
a commit prepared transaction has been decoded. The <parameter>gid</parameter> field,
595+
which is part of the <parameter>txn</parameter> parameter can be used in this
596+
callback.
597+
<programlisting>
598+
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
599+
ReorderBufferTXN *txn,
600+
XLogRecPtr commit_lsn);
601+
</programlisting>
602+
</para>
603+
</sect3>
604+
605+
<sect3 id="logicaldecoding-output-plugin-abort-prepared">
606+
<title>Rollback Prepared Transaction Callback</title>
607+
608+
<para>
609+
The optional <function>abort_prepared_cb</function> callback is called whenever
610+
a rollback prepared transaction has been decoded. The <parameter>gid</parameter> field,
611+
which is part of the <parameter>txn</parameter> parameter can be used in this
612+
callback.
613+
<programlisting>
614+
typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
615+
ReorderBufferTXN *txn,
616+
XLogRecPtr abort_lsn);
617+
</programlisting>
618+
</para>
619+
</sect3>
620+
621+
<sect3 id="logicaldecoding-output-plugin-abort">
622+
<title>Transaction Abort Callback</title>
623+
624+
<para>
625+
The required <function>abort_cb</function> callback is called whenever
626+
a transaction abort has to be initiated. This can happen if we are
627+
decoding a transaction that has been prepared for two-phase commit and
628+
a concurrent rollback happens while we are decoding it.
629+
<programlisting>
630+
typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
631+
ReorderBufferTXN *txn,
632+
XLogRecPtr abort_lsn);
633+
</programlisting>
634+
</para>
635+
</sect3>
636+
561637
<sect3 id="logicaldecoding-output-plugin-change">
562638
<title>Change Callback</title>
563639

@@ -567,7 +643,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
567643
an <command>INSERT</command>, <command>UPDATE</command>,
568644
or <command>DELETE</command>. Even if the original command modified
569645
several rows at once the callback will be called individually for each
570-
row.
646+
row. The <function>change_cb</function> callback may access system or
647+
user catalog tables to aid in the process of outputting the row
648+
modification details. In case of decoding a prepared (but yet
649+
uncommitted) transaction or decoding of an uncommitted transaction, this
650+
change callback might also error out due to simultaneous rollback of
651+
this very same transaction. In that case, the logical decoding of this
652+
aborted transaction is stopped gracefully.
571653
<programlisting>
572654
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
573655
ReorderBufferTXN *txn,
@@ -644,6 +726,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
644726
</para>
645727
</sect3>
646728

729+
<sect3 id="logicaldecoding-output-plugin-filter-prepare">
730+
<title>Prepare Filter Callback</title>
731+
732+
<para>
733+
The optional <function>filter_prepare_cb</function> callback
734+
is called to determine whether data that is part of the current
735+
two-phase commit transaction should be considered for decode
736+
at this prepare stage or as a regular one-phase transaction at
737+
<command>COMMIT PREPARED</command> time later. To signal that
738+
decoding should be skipped, return <literal>true</literal>;
739+
<literal>false</literal> otherwise. When the callback is not
740+
defined, <literal>false</literal> is assumed (i.e. nothing is
741+
filtered).
742+
<programlisting>
743+
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
744+
ReorderBufferTXN *txn,
745+
TransactionId xid,
746+
const char *gid);
747+
</programlisting>
748+
The <parameter>ctx</parameter> parameter has the same contents
749+
as for the other callbacks. The <parameter>txn</parameter> parameter
750+
contains meta information about the transaction. The <parameter>xid</parameter>
751+
contains the XID because <parameter>txn</parameter> can be NULL in some cases.
752+
The <parameter>gid</parameter> is the identifier that later identifies this
753+
transaction for <command>COMMIT PREPARED</command> or <command>ROLLBACK PREPARED</command>.
754+
</para>
755+
<para>
756+
The callback has to provide the same static answer for a given combination of
757+
<parameter>xid</parameter> and <parameter>gid</parameter> every time it is
758+
called.
759+
</para>
760+
</sect3>
761+
647762
<sect3 id="logicaldecoding-output-plugin-message">
648763
<title>Generic Message Callback</title>
649764

@@ -665,7 +780,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
665780
non-transactional and the XID was not assigned yet in the transaction
666781
which logged the message. The <parameter>lsn</parameter> has WAL
667782
location of the message. The <parameter>transactional</parameter> says
668-
if the message was sent as transactional or not.
783+
if the message was sent as transactional or not. Similar to the change
784+
callback, in case of decoding a prepared (but yet uncommitted)
785+
transaction or decoding of an uncommitted transaction, this message
786+
callback might also error out due to simultaneous rollback of
787+
this very same transaction. In that case, the logical decoding of this
788+
aborted transaction is stopped gracefully.
789+
669790
The <parameter>prefix</parameter> is arbitrary null-terminated prefix
670791
which can be used for identifying interesting messages for the current
671792
plugin. And finally the <parameter>message</parameter> parameter holds

‎src/backend/replication/logical/decode.c

Lines changed: 136 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include"access/xlogutils.h"
3535
#include"access/xlogreader.h"
3636
#include"access/xlogrecord.h"
37+
#include"access/twophase.h"
3738

3839
#include"catalog/pg_control.h"
3940

@@ -73,6 +74,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
7374
xl_xact_parsed_commit*parsed,TransactionIdxid);
7475
staticvoidDecodeAbort(LogicalDecodingContext*ctx,XLogRecordBuffer*buf,
7576
xl_xact_parsed_abort*parsed,TransactionIdxid);
77+
staticvoidDecodePrepare(LogicalDecodingContext*ctx,XLogRecordBuffer*buf,
78+
xl_xact_parsed_prepare*parsed);
7679

7780
/* common function to decode tuples */
7881
staticvoidDecodeXLogTuple(char*data,Sizelen,ReorderBufferTupleBuf*tup);
@@ -281,16 +284,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
281284
break;
282285
}
283286
caseXLOG_XACT_PREPARE:
287+
{
288+
xl_xact_parsed_prepareparsed;
284289

285-
/*
286-
* Currently decoding ignores PREPARE TRANSACTION and will just
287-
* decode the transaction when the COMMIT PREPARED is sent or
288-
* throw away the transaction's contents when a ROLLBACK PREPARED
289-
* is received. In the future we could add code to expose prepared
290-
* transactions in the changestream allowing for a kind of
291-
* distributed 2PC.
292-
*/
293-
ReorderBufferProcessXid(reorder,XLogRecGetXid(r),buf->origptr);
290+
/* check that output plugin is capable of twophase decoding */
291+
if (!ctx->options.enable_twophase)
292+
{
293+
ReorderBufferProcessXid(reorder,XLogRecGetXid(r),buf->origptr);
294+
break;
295+
}
296+
297+
/* ok, parse it */
298+
ParsePrepareRecord(XLogRecGetInfo(buf->record),
299+
XLogRecGetData(buf->record),&parsed);
300+
301+
/* does output plugin want this particular transaction? */
302+
if (ctx->callbacks.filter_prepare_cb&&
303+
ReorderBufferPrepareNeedSkip(reorder,parsed.twophase_xid,
304+
parsed.twophase_gid))
305+
{
306+
ReorderBufferProcessXid(reorder,parsed.twophase_xid,
307+
buf->origptr);
308+
break;
309+
}
310+
311+
DecodePrepare(ctx,buf,&parsed);
312+
break;
313+
}
294314
break;
295315
default:
296316
elog(ERROR,"unexpected RM_XACT_ID record type: %u",info);
@@ -633,9 +653,90 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
633653
buf->origptr,buf->endptr);
634654
}
635655

656+
/*
657+
* Decide if we're processing COMMIT PREPARED, or a regular COMMIT.
658+
* Regular commit simply triggers a replay of transaction changes from the
659+
* reorder buffer. For COMMIT PREPARED that however already happened at
660+
* PREPARE time, and so we only need to notify the subscriber that the GID
661+
* finally committed.
662+
*
663+
* For output plugins that do not support PREPARE-time decoding of
664+
* two-phase transactions, we never even see the PREPARE and all two-phase
665+
* transactions simply fall through to the second branch.
666+
*/
667+
if (TransactionIdIsValid(parsed->twophase_xid)&&
668+
ReorderBufferTxnIsPrepared(ctx->reorder,
669+
parsed->twophase_xid,parsed->twophase_gid))
670+
{
671+
Assert(xid==parsed->twophase_xid);
672+
/* we are processing COMMIT PREPARED */
673+
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
674+
commit_time,origin_id,origin_lsn,
675+
parsed->twophase_gid, true);
676+
}
677+
else
678+
{
679+
/* replay actions of all transaction + subtransactions in order */
680+
ReorderBufferCommit(ctx->reorder,xid,buf->origptr,buf->endptr,
681+
commit_time,origin_id,origin_lsn);
682+
}
683+
}
684+
685+
/*
686+
* Decode PREPARE record. Similar logic as in COMMIT
687+
*/
688+
staticvoid
689+
DecodePrepare(LogicalDecodingContext*ctx,XLogRecordBuffer*buf,
690+
xl_xact_parsed_prepare*parsed)
691+
{
692+
XLogRecPtrorigin_lsn=parsed->origin_lsn;
693+
TimestampTzcommit_time=parsed->origin_timestamp;
694+
XLogRecPtrorigin_id=XLogRecGetOrigin(buf->record);
695+
inti;
696+
TransactionIdxid=parsed->twophase_xid;
697+
698+
/*
699+
* Process invalidation messages, even if we're not interested in the
700+
* transaction's contents, since the various caches need to always be
701+
* consistent.
702+
*/
703+
if (parsed->nmsgs>0)
704+
{
705+
if (!ctx->fast_forward)
706+
ReorderBufferAddInvalidations(ctx->reorder,xid,buf->origptr,
707+
parsed->nmsgs,parsed->msgs);
708+
ReorderBufferXidSetCatalogChanges(ctx->reorder,xid,buf->origptr);
709+
}
710+
711+
/*
712+
* Tell the reorderbuffer about the surviving subtransactions. We need to
713+
* do this because the main transaction itself has not committed since we
714+
* are in the prepare phase right now. So we need to be sure the snapshot
715+
* is setup correctly for the main transaction in case all changes
716+
* happened in subtransanctions
717+
*/
718+
for (i=0;i<parsed->nsubxacts;i++)
719+
{
720+
ReorderBufferCommitChild(ctx->reorder,xid,parsed->subxacts[i],
721+
buf->origptr,buf->endptr);
722+
}
723+
724+
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr)||
725+
(parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database)||
726+
ctx->fast_forward||FilterByOrigin(ctx,origin_id))
727+
{
728+
for (i=0;i<parsed->nsubxacts;i++)
729+
{
730+
ReorderBufferForget(ctx->reorder,parsed->subxacts[i],buf->origptr);
731+
}
732+
ReorderBufferForget(ctx->reorder,xid,buf->origptr);
733+
734+
return;
735+
}
736+
636737
/* replay actions of all transaction + subtransactions in order */
637-
ReorderBufferCommit(ctx->reorder,xid,buf->origptr,buf->endptr,
638-
commit_time,origin_id,origin_lsn);
738+
ReorderBufferPrepare(ctx->reorder,xid,buf->origptr,buf->endptr,
739+
commit_time,origin_id,origin_lsn,parsed->twophase_gid);
639740
}
640741

641742
/*
@@ -647,6 +748,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
647748
xl_xact_parsed_abort*parsed,TransactionIdxid)
648749
{
649750
inti;
751+
XLogRecPtrorigin_lsn=InvalidXLogRecPtr;
752+
TimestampTzcommit_time=0;
753+
XLogRecPtrorigin_id=XLogRecGetOrigin(buf->record);
754+
755+
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
756+
{
757+
origin_lsn=parsed->origin_lsn;
758+
commit_time=parsed->origin_timestamp;
759+
}
760+
761+
/*
762+
* If it's ROLLBACK PREPARED then handle it via callbacks.
763+
*/
764+
if (TransactionIdIsValid(xid)&&
765+
!SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr)&&
766+
parsed->dbId==ctx->slot->data.database&&
767+
!FilterByOrigin(ctx,origin_id)&&
768+
ReorderBufferTxnIsPrepared(ctx->reorder,xid,parsed->twophase_gid))
769+
{
770+
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
771+
commit_time,origin_id,origin_lsn,
772+
parsed->twophase_gid, false);
773+
return;
774+
}
650775

651776
for (i=0;i<parsed->nsubxacts;i++)
652777
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp