Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit465dd92

Browse files
committed
logical decoding: Tell reorderbuffer about all xids.
Logical decoding's reorderbuffer keeps transactions in an LSN orderedlist for efficiency. To make that's efficiently possible upper-levelxids are forced to be logged before nested subtransaction xids. Thatonly works though if these records are all looked at: Unfortunately wedidn't do so for e.g. row level locks, which are otherwise uninterestingfor logical decoding.This could lead to errors like:"ERROR: subxact logged without previous toplevel record".It's not sufficient to just look at row locking records, the xid couldappear first due to a lot of other types of records (which will triggerthe transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny).So invent infrastructure to tell reorderbuffer about xids seen, whenthey'd otherwise not pass through reorderbuffer.c.Reported-By: Jarred WardBug: #13844Discussion: 20160105033249.1087.66040@wrigleys.postgresql.orgBackpatch: 9.4, where logical decoding was added
1 parenta9613ee commit465dd92

File tree

7 files changed

+108
-16
lines changed

7 files changed

+108
-16
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ submake-isolation:
3737
submake-test_decoding:
3838
$(MAKE) -C$(top_builddir)/contrib/test_decoding
3939

40-
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
40+
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact\
41+
decoding_into_rel binary prepared
4142

4243
regresscheck: all | submake-regress submake-test_decoding
4344
$(MKDIR_P) regression_output
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
4+
?column?
5+
----------
6+
init
7+
(1 row)
8+
9+
-- bug #13844, xids in non-decoded records need to be inspected
10+
CREATE TABLE xact_test(data text);
11+
INSERT INTO xact_test VALUES ('before-test');
12+
BEGIN;
13+
-- perform operation in xact that creates and logs xid, but isn't decoded
14+
SELECT * FROM xact_test FOR UPDATE;
15+
data
16+
-------------
17+
before-test
18+
(1 row)
19+
20+
SAVEPOINT foo;
21+
-- and now actually insert in subxact, xid is expected to be known
22+
INSERT INTO xact_test VALUES ('after-assignment');
23+
COMMIT;
24+
-- and now show those changes
25+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
26+
data
27+
---------------------------------------------------------------
28+
BEGIN
29+
table public.xact_test: INSERT: data[text]:'before-test'
30+
COMMIT
31+
BEGIN
32+
table public.xact_test: INSERT: data[text]:'after-assignment'
33+
COMMIT
34+
(6 rows)
35+
36+
DROP TABLE xact_test;
37+
SELECT pg_drop_replication_slot('regression_slot');
38+
pg_drop_replication_slot
39+
--------------------------
40+
41+
(1 row)
42+

‎contrib/test_decoding/sql/xact.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- predictability
2+
SET synchronous_commit=on;
3+
4+
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','test_decoding');
5+
6+
-- bug #13844, xids in non-decoded records need to be inspected
7+
CREATETABLExact_test(datatext);
8+
INSERT INTO xact_testVALUES ('before-test');
9+
10+
BEGIN;
11+
-- perform operation in xact that creates and logs xid, but isn't decoded
12+
SELECT*FROM xact_test FORUPDATE;
13+
SAVEPOINT foo;
14+
-- and now actually insert in subxact, xid is expected to be known
15+
INSERT INTO xact_testVALUES ('after-assignment');
16+
COMMIT;
17+
-- and now show those changes
18+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
19+
20+
DROPTABLE xact_test;
21+
22+
SELECT pg_drop_replication_slot('regression_slot');

‎src/backend/replication/logical/decode.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
7777
* Take every XLogReadRecord()ed record and perform the actions required to
7878
* decode it using the output plugin already setup in the logical decoding
7979
* context.
80+
*
81+
* NB: Note that every record's xid needs to be processed by reorderbuffer
82+
* (xids contained in the content of records are not relevant for this rule).
83+
* That means that for records which'd otherwise not go through the
84+
* reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
85+
* call ReorderBufferProcessXid for each record type by default, because
86+
* e.g. empty xacts can be handled more efficiently if there's no previous
87+
* state for them.
8088
*/
8189
void
8290
LogicalDecodingProcessRecord(LogicalDecodingContext*ctx,XLogRecord*record)
@@ -132,6 +140,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
132140
caseRM_GIST_ID:
133141
caseRM_SEQ_ID:
134142
caseRM_SPGIST_ID:
143+
/* just deal with xid, and done */
144+
ReorderBufferProcessXid(ctx->reorder,record->xl_xid,
145+
buf.origptr);
135146
break;
136147
caseRM_NEXT_ID:
137148
elog(ERROR,"unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds)buf.record.xl_rmid);
@@ -147,6 +158,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
147158
SnapBuild*builder=ctx->snapshot_builder;
148159
uint8info=buf->record.xl_info& ~XLR_INFO_MASK;
149160

161+
ReorderBufferProcessXid(ctx->reorder,buf->record.xl_xid,
162+
buf->origptr);
163+
150164
switch (info)
151165
{
152166
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -187,7 +201,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
187201
XLogRecord*r=&buf->record;
188202
uint8info=r->xl_info& ~XLR_INFO_MASK;
189203

190-
/* no point in doing anything yet, data could not be decoded anyway */
204+
/*
205+
* No point in doing anything yet, data could not be decoded anyway. It's
206+
* ok not to call ReorderBufferProcessXid() in that case, except in the
207+
* assignment case there'll not be any later records with the same xid;
208+
* and in the assignment case we'll not decode those xacts.
209+
*/
191210
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
192211
return;
193212

@@ -302,6 +321,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
302321
* transactions in the changestream allowing for a kind of
303322
* distributed 2PC.
304323
*/
324+
ReorderBufferProcessXid(reorder,r->xl_xid,buf->origptr);
305325
break;
306326
default:
307327
elog(ERROR,"unexpected RM_XACT_ID record type: %u",info);
@@ -318,6 +338,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
318338
XLogRecord*r=&buf->record;
319339
uint8info=r->xl_info& ~XLR_INFO_MASK;
320340

341+
ReorderBufferProcessXid(ctx->reorder,r->xl_xid,buf->origptr);
342+
321343
switch (info)
322344
{
323345
caseXLOG_RUNNING_XACTS:
@@ -355,6 +377,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
355377
TransactionIdxid=buf->record.xl_xid;
356378
SnapBuild*builder=ctx->snapshot_builder;
357379

380+
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
381+
358382
/* no point in doing anything yet */
359383
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
360384
return;
@@ -408,6 +432,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
408432
TransactionIdxid=buf->record.xl_xid;
409433
SnapBuild*builder=ctx->snapshot_builder;
410434

435+
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
436+
411437
/* no point in doing anything yet */
412438
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
413439
return;

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,16 +1669,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
16691669

16701670

16711671
/*
1672-
* Check whether a transaction is already known in this module.xs
1672+
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1673+
* least once for every xid in XLogRecord->xl_xid (other places in records
1674+
* may, but do not have to be passed through here).
1675+
*
1676+
* Reorderbuffer keeps some datastructures about transactions in LSN order,
1677+
* for efficiency. To do that it has to know about when transactions are seen
1678+
* first in the WAL. As many types of records are not actually interesting for
1679+
* logical decoding, they do not necessarily pass though here.
16731680
*/
1674-
bool
1675-
ReorderBufferIsXidKnown(ReorderBuffer*rb,TransactionIdxid)
1681+
void
1682+
ReorderBufferProcessXid(ReorderBuffer*rb,TransactionIdxid,XLogRecPtrlsn)
16761683
{
1677-
ReorderBufferTXN*txn;
1678-
1679-
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr,
1680-
false);
1681-
returntxn!=NULL;
1684+
/* many records won't have an xid assigned, centralize check here */
1685+
if (xid!=InvalidTransactionId)
1686+
ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
16821687
}
16831688

16841689
/*

‎src/backend/replication/logical/snapbuild.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,6 @@ SnapBuildClearExportedSnapshot()
636636
bool
637637
SnapBuildProcessChange(SnapBuild*builder,TransactionIdxid,XLogRecPtrlsn)
638638
{
639-
boolis_old_tx;
640-
641639
/*
642640
* We can't handle data in transactions if we haven't built a snapshot
643641
* yet, so don't store them.
@@ -658,9 +656,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
658656
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
659657
* be needed to decode the change we're currently processing.
660658
*/
661-
is_old_tx=ReorderBufferIsXidKnown(builder->reorder,xid);
662-
663-
if (!is_old_tx|| !ReorderBufferXidHasBaseSnapshot(builder->reorder,xid))
659+
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder,xid))
664660
{
665661
/* only build a new snapshot if we don't have a prebuilt one */
666662
if (builder->snapshot==NULL)

‎src/include/replication/reorderbuffer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
352352
CommandIdcmin,CommandIdcmax,CommandIdcombocid);
353353
voidReorderBufferAddInvalidations(ReorderBuffer*,TransactionId,XLogRecPtrlsn,
354354
Sizenmsgs,SharedInvalidationMessage*msgs);
355-
boolReorderBufferIsXidKnown(ReorderBuffer*,TransactionIdxid);
355+
voidReorderBufferProcessXid(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
356356
voidReorderBufferXidSetCatalogChanges(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
357357
boolReorderBufferXidHasCatalogChanges(ReorderBuffer*,TransactionIdxid);
358358
boolReorderBufferXidHasBaseSnapshot(ReorderBuffer*,TransactionIdxid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp