Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf46f9d8

Browse files
committed
Offer to do 2PC only if snap is consistent.
1 parentf1bb569 commitf46f9d8

File tree

3 files changed

+37
-28
lines changed

3 files changed

+37
-28
lines changed

‎src/backend/replication/logical/decode.c

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
287287
{
288288
xl_xact_parsed_prepareparsed;
289289

290-
/* check that output plugin is capable of twophase decoding */
291-
if (!ctx->options.enable_twophase)
290+
/*
291+
* Check that output plugin is capable of twophase decoding.
292+
* We also don't offer to do 2PC if snap is not yet consistent
293+
* as of reading PREPARE.
294+
*/
295+
if (!ctx->options.enable_twophase||
296+
SnapBuildCurrentState(builder)<SNAPBUILD_CONSISTENT)
292297
{
293298
ReorderBufferProcessXid(reorder,XLogRecGetXid(r),buf->origptr);
294299
break;
@@ -584,13 +589,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
584589
TimestampTzcommit_time=parsed->xact_time;
585590
RepOriginIdorigin_id=XLogRecGetOrigin(buf->record);
586591
inti;
592+
boolreorderbuffer_has_xid;
587593

588594
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
589595
{
590596
origin_lsn=parsed->origin_lsn;
591597
commit_time=parsed->origin_timestamp;
592598
}
593599

600+
/*
601+
* If this is COMMIT PREPARED and ReorderBuffer doesn't have this xid,
602+
* either the plugin refused to do 2PC on this xact or we didn't have
603+
* consistent snapshot yet during PREPARE processing. Anyway, in this case
604+
* we don't do 2PC and replay xact fully now. We must check this early
605+
* since invalidation addition below might add the record to the RB.
606+
*/
607+
reorderbuffer_has_xid=ReorderBufferHasXid(ctx->reorder,xid);
608+
594609
/*
595610
* Process invalidation messages, even if we're not interested in the
596611
* transaction's contents, since the various caches need to always be
@@ -663,10 +678,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
663678
* For output plugins that do not support PREPARE-time decoding of
664679
* two-phase transactions, we never even see the PREPARE and all two-phase
665680
* transactions simply fall through to the second branch.
681+
*
682+
* We rely on existence of xid in reorderbuffer to determine was 2PC done
683+
* or not. This is correct because we always see PREPARE before COMMIT
684+
* PREPARED if the latter was after consistent point.
685+
*
666686
*/
667687
if (TransactionIdIsValid(parsed->twophase_xid)&&
668-
ReorderBufferTxnIsPrepared(ctx->reorder,
669-
parsed->twophase_xid,parsed->twophase_gid))
688+
!reorderbuffer_has_xid)
670689
{
671690
Assert(xid==parsed->twophase_xid);
672691
/* we are processing COMMIT PREPARED */
@@ -765,7 +784,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
765784
!SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr)&&
766785
parsed->dbId==ctx->slot->data.database&&
767786
!FilterByOrigin(ctx,origin_id)&&
768-
ReorderBufferTxnIsPrepared(ctx->reorder,xid,parsed->twophase_gid))
787+
!ReorderBufferHasXid(ctx->reorder,xid))
769788
{
770789
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
771790
commit_time,origin_id,origin_lsn,

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1932,27 +1932,6 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
19321932
commit_time,origin_id,origin_lsn);
19331933
}
19341934

1935-
/*
1936-
* Check whether this transaction was sent as prepared to subscribers.
1937-
* Called while handling commit|abort prepared.
1938-
*/
1939-
bool
1940-
ReorderBufferTxnIsPrepared(ReorderBuffer*rb,TransactionIdxid,
1941-
constchar*gid)
1942-
{
1943-
ReorderBufferTXN*txn;
1944-
1945-
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr,
1946-
false);
1947-
1948-
/*
1949-
* Always call the prepare filter. It's the job of the prepare filter to
1950-
* give us the *same* response for a given xid across multiple calls
1951-
* (including ones on restart)
1952-
*/
1953-
return !(rb->filter_prepare(rb,txn,xid,gid));
1954-
}
1955-
19561935
/*
19571936
* Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
19581937
*/
@@ -2175,6 +2154,18 @@ ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
21752154
ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
21762155
}
21772156

2157+
/*
2158+
* Does reorderbuffer currently holds this xact?
2159+
*/
2160+
bool
2161+
ReorderBufferHasXid(ReorderBuffer*rb,TransactionIdxid)
2162+
{
2163+
ReorderBufferTXN*txn;
2164+
2165+
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr, true);
2166+
returntxn!=NULL;
2167+
}
2168+
21782169
/*
21792170
* Add a new snapshot to this transaction that may only used after lsn 'lsn'
21802171
* because the previous snapshot doesn't describe the catalog correctly for

‎src/include/replication/reorderbuffer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,14 +497,13 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr ls
497497
voidReorderBufferImmediateInvalidation(ReorderBuffer*,uint32ninvalidations,
498498
SharedInvalidationMessage*invalidations);
499499
voidReorderBufferProcessXid(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
500+
boolReorderBufferHasXid(ReorderBuffer*,TransactionIdxid);
500501
voidReorderBufferXidSetCatalogChanges(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
501502
boolReorderBufferXidHasCatalogChanges(ReorderBuffer*,TransactionIdxid);
502503
boolReorderBufferXidHasBaseSnapshot(ReorderBuffer*,TransactionIdxid);
503504

504505
boolReorderBufferPrepareNeedSkip(ReorderBuffer*rb,TransactionIdxid,
505506
constchar*gid);
506-
boolReorderBufferTxnIsPrepared(ReorderBuffer*rb,TransactionIdxid,
507-
constchar*gid);
508507
voidReorderBufferPrepare(ReorderBuffer*rb,TransactionIdxid,
509508
XLogRecPtrcommit_lsn,XLogRecPtrend_lsn,
510509
TimestampTzcommit_time,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp