Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit76666c3

Browse files
committed
some comments
1 parent1801ced commit76666c3

File tree

7 files changed

+76
-21
lines changed

7 files changed

+76
-21
lines changed

‎contrib/test_decoding/test_decoding.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,32 +279,48 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
279279
OutputPluginWrite(ctx, true);
280280
}
281281

282+
283+
/* Filter out unnecessary two-phase transactions */
282284
staticbool
283285
pg_filter_prepare(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
284286
char*gid)
285287
{
286288
TestDecodingData*data=ctx->output_plugin_private;
287289

290+
/* treat all transaction as one-phase */
288291
if (!data->twophase_decoding)
289292
return true;
290293

294+
/*
295+
* Two-phase transactions that accessed catalog require special treatment.
296+
*
297+
* Right now we don't have a save way to decode catalog changes made in
298+
* prepared transaction that was already aborted by the time of decoding.
299+
*
300+
* That kind of problem arises only when we are trying to retrospectively
301+
* decode aborted transactions. If one wants to code distributed commit
302+
* based on prepare decoding then commits/aborts will happend strictly after
303+
* decoding will be completed, so it is safe to skip any checks/locks here.
304+
*/
291305
if (txn->has_catalog_changes)
292306
{
293307
LWLockAcquire(TwoPhaseStateLock,LW_SHARED);
294308

295309
if (TransactionIdIsInProgress(txn->xid))
296310
{
297311
/*
298-
* XXX
312+
* For the sake of simplicity we just ignore in-progess transaction
313+
* in this extension, as they may abort during deconing.
299314
*/
300315
LWLockRelease(TwoPhaseStateLock);
301316
return true;
302317
}
303318
elseif (TransactionIdDidAbort(txn->xid))
304319
{
305320
/*
306-
* Here we know that it is already aborted and should humble
307-
* ourselves.
321+
* Here we know that it is already aborted and there is no
322+
* mush sence in doing something with this transaction.
323+
* Consequent ABORT PREPARED will be suppressed.
308324
*/
309325
LWLockRelease(TwoPhaseStateLock);
310326
return true;

‎src/backend/replication/logical/decode.c

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
295295
XLogRecGetData(buf->record),&parsed);
296296

297297
/* does output plugin wants this particular transaction? */
298-
if (ReorderBufferPrepareNeedSkip(reorder,parsed.twophase_xid,
298+
if (ctx->callbacks.filter_prepare_cb&&
299+
ReorderBufferPrepareNeedSkip(reorder,parsed.twophase_xid,
299300
parsed.twophase_gid))
300301
{
301302
ReorderBufferProcessXid(reorder,parsed.twophase_xid,
@@ -641,22 +642,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
641642
}
642643
}
643644

645+
646+
/*
647+
* Decode PREPARE record. Same logic as in COMMIT, but diffent calls
648+
* to SnapshotBuilder as we need to mark this transaction as commited
649+
* instead of running to properly decode it. When prepared transation
650+
* is decoded we mark it in snapshot as running again.
651+
*/
644652
staticvoid
645653
DecodePrepare(LogicalDecodingContext*ctx,XLogRecordBuffer*buf,
646654
xl_xact_parsed_prepare*parsed)
647655
{
648-
XLogRecPtrorigin_lsn=InvalidXLogRecPtr;
649-
TimestampTzcommit_time=0;
656+
XLogRecPtrorigin_lsn=parsed->origin_lsn;
657+
TimestampTzcommit_time=parsed->origin_timestamp;
650658
XLogRecPtrorigin_id=XLogRecGetOrigin(buf->record);
651659
inti;
652660
TransactionIdxid=parsed->twophase_xid;
653661

654-
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
655-
{
656-
origin_lsn=parsed->origin_lsn;
657-
commit_time=parsed->origin_timestamp;
658-
}
659-
660662
/*
661663
* Process invalidation messages, even if we're not interested in the
662664
* transaction's contents, since the various caches need to always be

‎src/backend/replication/logical/logical.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid
815815
error_context_stack=&errcallback;
816816

817817
/* set output state */
818-
ctx->accept_writes=true;// ->false
818+
ctx->accept_writes= false;
819819

820820
/* do the actual work: call callback */
821821
ret=ctx->callbacks.filter_prepare_cb(ctx,txn,gid);

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,17 +1667,24 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
16671667
PG_END_TRY();
16681668
}
16691669

1670+
/*
1671+
* Ask output plugin whether we want to skip this PREPARE and send
1672+
* this transaction as one-phase later on commit.
1673+
*/
16701674
bool
16711675
ReorderBufferPrepareNeedSkip(ReorderBuffer*rb,TransactionIdxid,char*gid)
16721676
{
16731677
ReorderBufferTXN*txn;
16741678

16751679
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr, false);
1676-
Assert(txn!=NULL);
1677-
// ctx->callbacks.filter_prepare_cb
1680+
16781681
returnrb->filter_prepare(rb,txn,gid);
16791682
}
16801683

1684+
1685+
/*
1686+
* Commit non-twophase transaction. See comments to ReorderBufferCommitInternal()
1687+
*/
16811688
void
16821689
ReorderBufferCommit(ReorderBuffer*rb,TransactionIdxid,
16831690
XLogRecPtrcommit_lsn,XLogRecPtrend_lsn,
@@ -1693,6 +1700,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16931700
commit_time,origin_id,origin_lsn);
16941701
}
16951702

1703+
/*
1704+
* Prepare twophase transaction. It calls ReorderBufferCommitInternal()
1705+
* since all transaction changes should be decoded on PREPARE.
1706+
*/
16961707
void
16971708
ReorderBufferPrepare(ReorderBuffer*rb,TransactionIdxid,
16981709
XLogRecPtrcommit_lsn,XLogRecPtrend_lsn,
@@ -1712,6 +1723,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
17121723
commit_time,origin_id,origin_lsn);
17131724
}
17141725

1726+
/*
1727+
* Check whether this transaction was sent as prepared to receiver.
1728+
* Called upon commit/abort prepared.
1729+
*/
17151730
bool
17161731
ReorderBufferTxnIsPrepared(ReorderBuffer*rb,TransactionIdxid)
17171732
{
@@ -1720,7 +1735,6 @@ ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
17201735
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr,
17211736
false);
17221737

1723-
/* TXN not found mean that it was send already, isn't it? */
17241738
returntxn==NULL ? true :txn->prepared;
17251739
}
17261740

‎src/backend/replication/logical/snapbuild.c

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1101,18 +1101,39 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
11011101
}
11021102
}
11031103

1104+
/*
1105+
* Just a wrapper to clarify DecodePrepare().
1106+
* Right now we can't extract correct historic catalog data that
1107+
* was produced by aborted prepared transaction, so it work of
1108+
* decoding plugin to avoid such situation and here we just construct usual
1109+
* snapshot to able to decode prepare.
1110+
*/
11041111
void
11051112
SnapBuildPrepareTxnStart(SnapBuild*builder,XLogRecPtrlsn,TransactionIdxid,
11061113
intnsubxacts,TransactionId*subxacts)
11071114
{
11081115
SnapBuildCommitTxn(builder,lsn,xid,nsubxacts,subxacts);
11091116
}
11101117

1118+
1119+
/*
1120+
* When decoding of preppare is finished we want should exclude our xid
1121+
* from list of committed xids to have correct snapshot between prepare
1122+
* and commit.
1123+
*
1124+
* However, this is not sctrictly needed. Prepared transaction holds locks
1125+
* between prepare and commit so nodody can produce new version of our
1126+
* catalog tuples. In case of abort we will have this xid in array of
1127+
* commited xids, but it also will not cause a problem since checks of
1128+
* HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC()
1129+
* have higher priority then checks for xip array. Anyway let's be consistent
1130+
* about definitions and delete this xid from xip array.
1131+
*/
11111132
void
11121133
SnapBuildPrepareTxnFinish(SnapBuild*builder,TransactionIdxid)
11131134
{
11141135
TransactionId*search=bsearch(&xid,builder->running.xip,
1115-
builder->running.xcnt_space,sizeof(TransactionId),xidComparator);
1136+
builder->running.xcnt,sizeof(TransactionId),xidComparator);
11161137

11171138
if (search==NULL)
11181139
return;
@@ -1121,6 +1142,8 @@ SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
11211142
memmove(search,search+1,
11221143
((builder->running.xip+builder->running.xcnt-1)-search)*sizeof(TransactionId));
11231144
builder->running.xcnt--;
1145+
1146+
/* update min/max */
11241147
builder->running.xmin=builder->running.xip[0];
11251148
builder->running.xmax=builder->running.xip[builder->running.xcnt-1];
11261149
}

‎src/include/replication/output_plugin.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
7171
* Called before decoding of PREPARE record to decide whether this
7272
* transaction should be decoded with separate calls to prepare
7373
* and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
74-
* andsend as usual transaction.
74+
* andsent as usual transaction.
7575
*/
7676
typedefbool (*LogicalDecodeFilterPrepareCB) (structLogicalDecodingContext*ctx,
7777
ReorderBufferTXN*txn,

‎src/include/replication/reorderbuffer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ typedef struct ReorderBufferTXN
149149
chargid[GIDSIZE];
150150

151151
/*
152-
*We have ability to treat twophase transaction as ordinary one
153-
*with the help of filter_prepare callback.
154-
*XXX: try to reword that comment
152+
*By using filter_prepare() callback we can force decoding to treat
153+
*two-phase transaction as on ordinary one. This flag is set if we are
154+
*actually called prepape() callback in output plugin.
155155
*/
156156
boolprepared;
157157

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp