Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7dce5ca

Browse files
Nikhil Sontakkearssher
Nikhil Sontakke
authored andcommitted
Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside.
When a transaction aborts, it's changes are considered unnecessary forother transactions. That means the changes may be either cleaned up byvacuum or removed from HOT chains (thus made inaccessible throughindexes), and there may be other such consequences.When decoding committed transactions this is not an issue, and wenever decode transactions that abort before the decoding starts.But for in-progress transactions - for example when decoding preparedtransactions on PREPARE (and not COMMIT PREPARED as before), thismay cause failures when the output plugin consults catalogs (bothsystem and user-defined).We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACKsqlerrcode from system table scan APIs to the backend decoding aspecific uncommitted transaction. The decoding logic on the receiptof such an sqlerrcode aborts the ongoing decoding and returnsgracefully.
1 parent6334bec commit7dce5ca

File tree

6 files changed

+143
-9
lines changed

6 files changed

+143
-9
lines changed

‎doc/src/sgml/logicaldecoding.sgml‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
421421
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
422422
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
423423
</programlisting>
424-
Any actions leading to transaction ID assignment are prohibited. That, among others,
424+
Note that access to user catalog tables or regular system catalog tables
425+
in the output plugins has to be done via the <literal>systable_*</literal> scan APIs only.
426+
Access via the <literal>heap_*</literal> scan APIs will error out.
427+
Additionally, any actions leading to transaction ID assignment are prohibited. That, among others,
425428
includes writing to tables, performing DDL changes, and
426429
calling <literal>txid_current()</literal>.
427430
</para>

‎src/backend/access/heap/heapam.c‎

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,6 +1834,17 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
18341834
HeapTuple
18351835
heap_getnext(HeapScanDescscan,ScanDirectiondirection)
18361836
{
1837+
/*
1838+
* We don't expect direct calls to heap_getnext with valid
1839+
* CheckXidAlive for regular tables. Track that below.
1840+
*/
1841+
if (unlikely(TransactionIdIsValid(CheckXidAlive)&&
1842+
!(IsCatalogRelation(scan->rs_rd)||
1843+
RelationIsUsedAsCatalogTable(scan->rs_rd))))
1844+
ereport(ERROR,
1845+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
1846+
errmsg("improper heap_getnext call")));
1847+
18371848
/* Note: no locking manipulations needed */
18381849

18391850
HEAPDEBUG_1;/* heap_getnext( info ) */
@@ -1914,6 +1925,16 @@ heap_fetch(Relation relation,
19141925
OffsetNumberoffnum;
19151926
boolvalid;
19161927

1928+
/*
1929+
* We don't expect direct calls to heap_fetch with valid
1930+
* CheckXidAlive for regular tables. Track that below.
1931+
*/
1932+
if (unlikely(TransactionIdIsValid(CheckXidAlive)&&
1933+
!(IsCatalogRelation(relation)||RelationIsUsedAsCatalogTable(relation))))
1934+
ereport(ERROR,
1935+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
1936+
errmsg("improper heap_fetch call")));
1937+
19171938
/*
19181939
* Fetch and pin the appropriate page of the relation.
19191940
*/
@@ -2046,6 +2067,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
20462067
boolvalid;
20472068
boolskip;
20482069

2070+
/*
2071+
* We don't expect direct calls to heap_hot_search_buffer with
2072+
* valid CheckXidAlive for regular tables. Track that below.
2073+
*/
2074+
if (unlikely(TransactionIdIsValid(CheckXidAlive)&&
2075+
!(IsCatalogRelation(relation)||RelationIsUsedAsCatalogTable(relation))))
2076+
ereport(ERROR,
2077+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2078+
errmsg("improper heap_hot_search_buffer call")));
2079+
20492080
/* If this is not the first call, previous call returned a (live!) tuple */
20502081
if (all_dead)
20512082
*all_dead=first_call;
@@ -2187,6 +2218,16 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
21872218
Bufferbuffer;
21882219
HeapTupleDataheapTuple;
21892220

2221+
/*
2222+
* We don't expect direct calls to heap_hot_search with
2223+
* valid CheckXidAlive for regular tables. Track that below.
2224+
*/
2225+
if (unlikely(TransactionIdIsValid(CheckXidAlive)&&
2226+
!(IsCatalogRelation(relation)||RelationIsUsedAsCatalogTable(relation))))
2227+
ereport(ERROR,
2228+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2229+
errmsg("improper heap_hot_search call")));
2230+
21902231
buffer=ReadBuffer(relation,ItemPointerGetBlockNumber(tid));
21912232
LockBuffer(buffer,BUFFER_LOCK_SHARE);
21922233
result=heap_hot_search_buffer(tid,relation,buffer,snapshot,
@@ -2216,6 +2257,16 @@ heap_get_latest_tid(Relation relation,
22162257
ItemPointerDatactid;
22172258
TransactionIdpriorXmax;
22182259

2260+
/*
2261+
* We don't expect direct calls to heap_get_latest_tid with valid
2262+
* CheckXidAlive for regular tables. Track that below.
2263+
*/
2264+
if (unlikely(TransactionIdIsValid(CheckXidAlive)&&
2265+
!(IsCatalogRelation(relation)||RelationIsUsedAsCatalogTable(relation))))
2266+
ereport(ERROR,
2267+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2268+
errmsg("improper heap_get_latest_tid call")));
2269+
22192270
/* this is to avoid Assert failures on bad input */
22202271
if (!ItemPointerIsValid(tid))
22212272
return;

‎src/backend/access/index/genam.c‎

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include"lib/stringinfo.h"
2626
#include"miscadmin.h"
2727
#include"storage/bufmgr.h"
28+
#include"storage/procarray.h"
2829
#include"utils/acl.h"
2930
#include"utils/builtins.h"
3031
#include"utils/lsyscache.h"
@@ -437,6 +438,17 @@ systable_getnext(SysScanDesc sysscan)
437438
else
438439
htup=heap_getnext(sysscan->scan,ForwardScanDirection);
439440

441+
/*
442+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
443+
* error out
444+
*/
445+
if (TransactionIdIsValid(CheckXidAlive)&&
446+
!TransactionIdIsInProgress(CheckXidAlive)&&
447+
!TransactionIdDidCommit(CheckXidAlive))
448+
ereport(ERROR,
449+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
450+
errmsg("transaction aborted during system catalog scan")));
451+
440452
returnhtup;
441453
}
442454

@@ -490,6 +502,18 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
490502
result=HeapTupleSatisfiesVisibility(tup,freshsnap,scan->rs_cbuf);
491503
LockBuffer(scan->rs_cbuf,BUFFER_LOCK_UNLOCK);
492504
}
505+
506+
/*
507+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
508+
* error out
509+
*/
510+
if (TransactionIdIsValid(CheckXidAlive)&&
511+
!TransactionIdIsInProgress(CheckXidAlive)&&
512+
!TransactionIdDidCommit(CheckXidAlive))
513+
ereport(ERROR,
514+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
515+
errmsg("transaction aborted during system catalog scan")));
516+
493517
returnresult;
494518
}
495519

@@ -607,6 +631,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
607631
if (htup&&sysscan->iscan->xs_recheck)
608632
elog(ERROR,"system catalog scans with lossy index conditions are not implemented");
609633

634+
/*
635+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
636+
* error out
637+
*/
638+
if (TransactionIdIsValid(CheckXidAlive)&&
639+
!TransactionIdIsInProgress(CheckXidAlive)&&
640+
!TransactionIdDidCommit(CheckXidAlive))
641+
ereport(ERROR,
642+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
643+
errmsg("transaction aborted during system catalog scan")));
644+
610645
returnhtup;
611646
}
612647

‎src/backend/replication/logical/reorderbuffer.c‎

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
636636
txn=ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
637637

638638
/* setup snapshot to allow catalog access */
639-
SetupHistoricSnapshot(snapshot_now,NULL);
639+
SetupHistoricSnapshot(snapshot_now,NULL,xid);
640640
PG_TRY();
641641
{
642642
rb->message(rb,txn,lsn, false,prefix,message_size,message);
@@ -1442,6 +1442,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
14421442
volatileCommandIdcommand_id=FirstCommandId;
14431443
boolusing_subtxn;
14441444
ReorderBufferIterTXNState*volatileiterstate=NULL;
1445+
MemoryContextccxt=CurrentMemoryContext;
14451446

14461447
txn->final_lsn=commit_lsn;
14471448
txn->end_lsn=end_lsn;
@@ -1468,7 +1469,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
14681469
ReorderBufferBuildTupleCidHash(rb,txn);
14691470

14701471
/* setup the initial snapshot */
1471-
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash);
1472+
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash,xid);
14721473

14731474
/*
14741475
* Decoding needs access to syscaches et al., which in turn use
@@ -1709,7 +1710,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
17091710

17101711

17111712
/* and continue with the new one */
1712-
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash);
1713+
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash,xid);
17131714
break;
17141715

17151716
caseREORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1729,7 +1730,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
17291730
snapshot_now->curcid=command_id;
17301731

17311732
TeardownHistoricSnapshot(false);
1732-
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash);
1733+
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash,xid);
17331734

17341735
/*
17351736
* Every time the CommandId is incremented, we could
@@ -1814,6 +1815,20 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
18141815
PG_CATCH();
18151816
{
18161817
/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1818+
MemoryContextecxt=MemoryContextSwitchTo(ccxt);
1819+
ErrorData*errdata=CopyErrorData();
1820+
1821+
/*
1822+
* if the catalog scan access returned an error of
1823+
* rollback, then abort on the other side as well
1824+
*/
1825+
if (errdata->sqlerrcode==ERRCODE_TRANSACTION_ROLLBACK)
1826+
{
1827+
elog(LOG,"stopping decoding of %s (%u)",
1828+
txn->gid[0]!='\0'?txn->gid:"",txn->xid);
1829+
rb->abort(rb,txn,commit_lsn);
1830+
}
1831+
18171832
if (iterstate)
18181833
ReorderBufferIterTXNFinish(rb,iterstate);
18191834

@@ -1837,7 +1852,14 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
18371852
/* remove potential on-disk data, and deallocate */
18381853
ReorderBufferCleanupTXN(rb,txn);
18391854

1840-
PG_RE_THROW();
1855+
/* re-throw only if it's not an abort */
1856+
if (errdata->sqlerrcode!=ERRCODE_TRANSACTION_ROLLBACK)
1857+
{
1858+
MemoryContextSwitchTo(ecxt);
1859+
PG_RE_THROW();
1860+
}
1861+
else
1862+
FlushErrorState();
18411863
}
18421864
PG_END_TRY();
18431865
}

‎src/backend/utils/time/snapmgr.c‎

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ static Snapshot SecondarySnapshot = NULL;
152152
staticSnapshotCatalogSnapshot=NULL;
153153
staticSnapshotHistoricSnapshot=NULL;
154154

155+
/*
156+
* An xid value pointing to a possibly ongoing or a prepared transaction.
157+
* Currently used in logical decoding. It's possible that such transactions
158+
* can get aborted while the decoding is ongoing.
159+
*/
160+
TransactionIdCheckXidAlive=InvalidTransactionId;
161+
155162
/*
156163
* These are updated by GetSnapshotData. We initialize them this way
157164
* for the convenience of TransactionIdIsInProgress: even in bootstrap
@@ -2000,10 +2007,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
20002007
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
20012008
* access to behave just like it did at a certain point in the past.
20022009
*
2010+
* If a valid xid is passed in, we check if it is uncommitted and track it in
2011+
* CheckXidAlive. This is to re-check XID status while accessing catalog.
2012+
*
20032013
* Needed for logical decoding.
20042014
*/
20052015
void
2006-
SetupHistoricSnapshot(Snapshothistoric_snapshot,HTAB*tuplecids)
2016+
SetupHistoricSnapshot(Snapshothistoric_snapshot,HTAB*tuplecids,
2017+
TransactionIdsnapshot_xid)
20072018
{
20082019
Assert(historic_snapshot!=NULL);
20092020

@@ -2012,8 +2023,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
20122023

20132024
/* setup (cmin, cmax) lookup hash */
20142025
tuplecid_data=tuplecids;
2015-
}
20162026

2027+
/*
2028+
* setup CheckXidAlive if it's not committed yet. We don't check
2029+
* if the xid aborted. That will happen during catalog access.
2030+
*/
2031+
if (TransactionIdIsValid(snapshot_xid)&&
2032+
!TransactionIdDidCommit(snapshot_xid))
2033+
CheckXidAlive=snapshot_xid;
2034+
else
2035+
CheckXidAlive=InvalidTransactionId;
2036+
}
20172037

20182038
/*
20192039
* Make catalog snapshots behave normally again.
@@ -2023,6 +2043,7 @@ TeardownHistoricSnapshot(bool is_error)
20232043
{
20242044
HistoricSnapshot=NULL;
20252045
tuplecid_data=NULL;
2046+
CheckXidAlive=InvalidTransactionId;
20262047
}
20272048

20282049
bool

‎src/include/utils/snapmgr.h‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ extern char *ExportSnapshot(Snapshot snapshot);
103103

104104
/* Support for catalog timetravel for logical decoding */
105105
structHTAB;
106+
externTransactionIdCheckXidAlive;
106107
externstructHTAB*HistoricSnapshotGetTupleCids(void);
107-
externvoidSetupHistoricSnapshot(Snapshotsnapshot_now,structHTAB*tuplecids);
108+
externvoidSetupHistoricSnapshot(Snapshotsnapshot_now,structHTAB*tuplecids,
109+
TransactionIdsnapshot_xid);
108110
externvoidTeardownHistoricSnapshot(boolis_error);
109111
externboolHistoricSnapshotActive(void);
110112

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp