Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd9e903f

Browse files
committed
logical decoding: Tell reorderbuffer about all xids.
Logical decoding's reorderbuffer keeps transactions in an LSN orderedlist for efficiency. To make that's efficiently possible upper-levelxids are forced to be logged before nested subtransaction xids. Thatonly works though if these records are all looked at: Unfortunately wedidn't do so for e.g. row level locks, which are otherwise uninterestingfor logical decoding.This could lead to errors like:"ERROR: subxact logged without previous toplevel record".It's not sufficient to just look at row locking records, the xid couldappear first due to a lot of other types of records (which will triggerthe transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny).So invent infrastructure to tell reorderbuffer about xids seen, whenthey'd otherwise not pass through reorderbuffer.c.Reported-By: Jarred WardBug: #13844Discussion: 20160105033249.1087.66040@wrigleys.postgresql.orgBackpatch: 9.4, where logical decoding was added
1 parentdc7d70e commitd9e903f

File tree

7 files changed

+108
-17
lines changed

7 files changed

+108
-17
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ submake-isolation:
3737
submake-test_decoding:
3838
$(MAKE) -C$(top_builddir)/contrib/test_decoding
3939

40-
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel\
41-
binary prepared replorigin time
40+
REGRESSCHECKS=ddlxactrewrite toast permissions decoding_in_xact\
41+
decoding_into_relbinary prepared replorigin time
4242

4343
regresscheck: | submake-regress submake-test_decoding temp-install
4444
$(MKDIR_P) regression_output
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
4+
?column?
5+
----------
6+
init
7+
(1 row)
8+
9+
-- bug #13844, xids in non-decoded records need to be inspected
10+
CREATE TABLE xact_test(data text);
11+
INSERT INTO xact_test VALUES ('before-test');
12+
BEGIN;
13+
-- perform operation in xact that creates and logs xid, but isn't decoded
14+
SELECT * FROM xact_test FOR UPDATE;
15+
data
16+
-------------
17+
before-test
18+
(1 row)
19+
20+
SAVEPOINT foo;
21+
-- and now actually insert in subxact, xid is expected to be known
22+
INSERT INTO xact_test VALUES ('after-assignment');
23+
COMMIT;
24+
-- and now show those changes
25+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
26+
data
27+
---------------------------------------------------------------
28+
BEGIN
29+
table public.xact_test: INSERT: data[text]:'before-test'
30+
COMMIT
31+
BEGIN
32+
table public.xact_test: INSERT: data[text]:'after-assignment'
33+
COMMIT
34+
(6 rows)
35+
36+
DROP TABLE xact_test;
37+
SELECT pg_drop_replication_slot('regression_slot');
38+
pg_drop_replication_slot
39+
--------------------------
40+
41+
(1 row)
42+

‎contrib/test_decoding/sql/xact.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- predictability
2+
SET synchronous_commit=on;
3+
4+
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','test_decoding');
5+
6+
-- bug #13844, xids in non-decoded records need to be inspected
7+
CREATETABLExact_test(datatext);
8+
INSERT INTO xact_testVALUES ('before-test');
9+
10+
BEGIN;
11+
-- perform operation in xact that creates and logs xid, but isn't decoded
12+
SELECT*FROM xact_test FORUPDATE;
13+
SAVEPOINT foo;
14+
-- and now actually insert in subxact, xid is expected to be known
15+
INSERT INTO xact_testVALUES ('after-assignment');
16+
COMMIT;
17+
-- and now show those changes
18+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
19+
20+
DROPTABLE xact_test;
21+
22+
SELECT pg_drop_replication_slot('regression_slot');

‎src/backend/replication/logical/decode.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
7878
* Take every XLogReadRecord()ed record and perform the actions required to
7979
* decode it using the output plugin already setup in the logical decoding
8080
* context.
81+
*
82+
* NB: Note that every record's xid needs to be processed by reorderbuffer
83+
* (xids contained in the content of records are not relevant for this rule).
84+
* That means that for records which'd otherwise not go through the
85+
* reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
86+
* call ReorderBufferProcessXid for each record type by default, because
87+
* e.g. empty xacts can be handled more efficiently if there's no previous
88+
* state for them.
8189
*/
8290
void
8391
LogicalDecodingProcessRecord(LogicalDecodingContext*ctx,XLogReaderState*record)
@@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
135143
caseRM_BRIN_ID:
136144
caseRM_COMMIT_TS_ID:
137145
caseRM_REPLORIGIN_ID:
146+
/* just deal with xid, and done */
147+
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(record),
148+
buf.origptr);
138149
break;
139150
caseRM_NEXT_ID:
140151
elog(ERROR,"unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds)XLogRecGetRmid(buf.record));
@@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
150161
SnapBuild*builder=ctx->snapshot_builder;
151162
uint8info=XLogRecGetInfo(buf->record)& ~XLR_INFO_MASK;
152163

164+
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(buf->record),
165+
buf->origptr);
166+
153167
switch (info)
154168
{
155169
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
191205
XLogReaderState*r=buf->record;
192206
uint8info=XLogRecGetInfo(r)&XLOG_XACT_OPMASK;
193207

194-
/* no point in doing anything yet, data could not be decoded anyway */
208+
/*
209+
* No point in doing anything yet, data could not be decoded anyway. It's
210+
* ok not to call ReorderBufferProcessXid() in that case, except in the
211+
* assignment case there'll not be any later records with the same xid;
212+
* and in the assignment case we'll not decode those xacts.
213+
*/
195214
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
196215
return;
197216

@@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
260279
* transactions in the changestream allowing for a kind of
261280
* distributed 2PC.
262281
*/
282+
ReorderBufferProcessXid(reorder,XLogRecGetXid(r),buf->origptr);
263283
break;
264284
default:
265285
elog(ERROR,"unexpected RM_XACT_ID record type: %u",info);
@@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
276296
XLogReaderState*r=buf->record;
277297
uint8info=XLogRecGetInfo(r)& ~XLR_INFO_MASK;
278298

299+
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(r),buf->origptr);
300+
279301
switch (info)
280302
{
281303
caseXLOG_RUNNING_XACTS:
@@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
313335
TransactionIdxid=XLogRecGetXid(buf->record);
314336
SnapBuild*builder=ctx->snapshot_builder;
315337

338+
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
339+
316340
/* no point in doing anything yet */
317341
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
318342
return;
@@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
366390
TransactionIdxid=XLogRecGetXid(buf->record);
367391
SnapBuild*builder=ctx->snapshot_builder;
368392

393+
ReorderBufferProcessXid(ctx->reorder,xid,buf->origptr);
394+
369395
/* no point in doing anything yet */
370396
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
371397
return;

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
17411741

17421742

17431743
/*
1744-
* Check whether a transaction is already known in this module.xs
1744+
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1745+
* least once for every xid in XLogRecord->xl_xid (other places in records
1746+
* may, but do not have to be passed through here).
1747+
*
1748+
* Reorderbuffer keeps some datastructures about transactions in LSN order,
1749+
* for efficiency. To do that it has to know about when transactions are seen
1750+
* first in the WAL. As many types of records are not actually interesting for
1751+
* logical decoding, they do not necessarily pass though here.
17451752
*/
1746-
bool
1747-
ReorderBufferIsXidKnown(ReorderBuffer*rb,TransactionIdxid)
1753+
void
1754+
ReorderBufferProcessXid(ReorderBuffer*rb,TransactionIdxid,XLogRecPtrlsn)
17481755
{
1749-
ReorderBufferTXN*txn;
1750-
1751-
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr,
1752-
false);
1753-
returntxn!=NULL;
1756+
/* many records won't have an xid assigned, centralize check here */
1757+
if (xid!=InvalidTransactionId)
1758+
ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
17541759
}
17551760

17561761
/*

‎src/backend/replication/logical/snapbuild.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot(void)
635635
bool
636636
SnapBuildProcessChange(SnapBuild*builder,TransactionIdxid,XLogRecPtrlsn)
637637
{
638-
boolis_old_tx;
639-
640638
/*
641639
* We can't handle data in transactions if we haven't built a snapshot
642640
* yet, so don't store them.
@@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
657655
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
658656
* be needed to decode the change we're currently processing.
659657
*/
660-
is_old_tx=ReorderBufferIsXidKnown(builder->reorder,xid);
661-
662-
if (!is_old_tx|| !ReorderBufferXidHasBaseSnapshot(builder->reorder,xid))
658+
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder,xid))
663659
{
664660
/* only build a new snapshot if we don't have a prebuilt one */
665661
if (builder->snapshot==NULL)

‎src/include/replication/reorderbuffer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
366366
CommandIdcmin,CommandIdcmax,CommandIdcombocid);
367367
voidReorderBufferAddInvalidations(ReorderBuffer*,TransactionId,XLogRecPtrlsn,
368368
Sizenmsgs,SharedInvalidationMessage*msgs);
369-
boolReorderBufferIsXidKnown(ReorderBuffer*,TransactionIdxid);
369+
voidReorderBufferProcessXid(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
370370
voidReorderBufferXidSetCatalogChanges(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
371371
boolReorderBufferXidHasCatalogChanges(ReorderBuffer*,TransactionIdxid);
372372
boolReorderBufferXidHasBaseSnapshot(ReorderBuffer*,TransactionIdxid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp