Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8bdb133

Browse files
author
Amit Kapila
committed
Avoid repeated decoding of prepared transactions after a restart.
In commita271a1b, we allowed decoding at prepare time and the preparewas decoded again if there is a restart after decoding it. It was donethat way because we can't distinguish between the cases where we have notdecoded the prepare because it was prior to consistent snapshot or we havedecoded it earlier but restarted. To distinguish between these two cases,we have introduced an initial_consistent_point at the slot level which isan LSN at which we found a consistent point at the time of slot creation.This is also the point where we have exported a snapshot for the initialcopy. So, prepare transaction prior to this point are sent along withcommit prepared.This commit bumps SNAPBUILD_VERSION because of change in SnapBuild. Itwill break existing slots which is fine in a major release.Author: Ajin Cherian, based on idea by Andres FreundReviewed-by: Amit Kapila and Vignesh CDiscussion:https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
1 parent6230912 commit8bdb133

File tree

10 files changed

+61
-67
lines changed

10 files changed

+61
-67
lines changed

‎contrib/test_decoding/expected/twophase.out

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
3333

3434
COMMIT PREPARED 'test_prepared#1';
3535
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
36-
data
37-
----------------------------------------------------
38-
BEGIN
39-
table public.test_prepared1: INSERT: id[integer]:1
40-
table public.test_prepared1: INSERT: id[integer]:2
41-
PREPARE TRANSACTION 'test_prepared#1'
36+
data
37+
-----------------------------------
4238
COMMIT PREPARED 'test_prepared#1'
43-
(5 rows)
39+
(1 row)
4440

4541
-- Test that rollback of a prepared xact is decoded.
4642
BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
10399

104100
COMMIT PREPARED 'test_prepared#3';
105101
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
106-
data
107-
-------------------------------------------------------------------------
108-
BEGIN
109-
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
110-
PREPARE TRANSACTION 'test_prepared#3'
102+
data
103+
-----------------------------------
111104
COMMIT PREPARED 'test_prepared#3'
112-
(4 rows)
105+
(1 row)
113106

114107
-- make sure stuff still works
115108
INSERT INTO test_prepared1 VALUES (6);
@@ -158,14 +151,10 @@ RESET statement_timeout;
158151
COMMIT PREPARED 'test_prepared_lock';
159152
-- consume the commit
160153
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
161-
data
162-
---------------------------------------------------------------------------
163-
BEGIN
164-
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
165-
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
166-
PREPARE TRANSACTION 'test_prepared_lock'
154+
data
155+
--------------------------------------
167156
COMMIT PREPARED 'test_prepared_lock'
168-
(5 rows)
157+
(1 row)
169158

170159
-- Test savepoints and sub-xacts. Creating savepoints will create
171160
-- sub-xacts implicitly.
@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
188177
COMMIT PREPARED 'test_prepared_savepoint';
189178
-- consume the commit
190179
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
191-
data
192-
------------------------------------------------------------
193-
BEGIN
194-
table public.test_prepared_savepoint: INSERT: a[integer]:1
195-
PREPARE TRANSACTION 'test_prepared_savepoint'
180+
data
181+
-------------------------------------------
196182
COMMIT PREPARED 'test_prepared_savepoint'
197-
(4 rows)
183+
(1 row)
198184

199185
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
200186
BEGIN;

‎contrib/test_decoding/expected/twophase_stream.out

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
6060
COMMIT PREPARED 'test1';
6161
--should show the COMMIT PREPARED and the other changes in the transaction
6262
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
63-
data
64-
-------------------------------------------------------------
65-
BEGIN
66-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
67-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
68-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
69-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
70-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
71-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
72-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
73-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
74-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
75-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
76-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
77-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
78-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
79-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
80-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
81-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
82-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
83-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
84-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
85-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
86-
PREPARE TRANSACTION 'test1'
63+
data
64+
-------------------------
8765
COMMIT PREPARED 'test1'
88-
(23 rows)
66+
(1 row)
8967

9068
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
9169
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
191191
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
192192
lsn | xid | data
193193
-----------+-----+--------------------------------------------
194-
0/1689DC0 | 529 | BEGIN 529
195-
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
196-
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
197194
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
198195
(4 row)
199196

@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
822819
<parameter>gid</parameter> field, which is part of the
823820
<parameter>txn</parameter> parameter, can be used in this callback to
824821
check if the plugin has already received this <command>PREPARE</command>
825-
in which case it can skip the remaining changes of the transaction.
826-
This can only happen if the user restarts the decoding after receiving
827-
the <command>PREPARE</command> for a transaction but before receiving
828-
the <command>COMMIT PREPARED</command>, say because of some error.
822+
in which case it can either error out or skip the remaining changes of
823+
the transaction.
829824
<programlisting>
830825
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
831826
ReorderBufferTXN *txn);

‎src/backend/replication/logical/decode.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
730730
if (two_phase)
731731
{
732732
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
733+
SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
733734
commit_time,origin_id,origin_lsn,
734735
parsed->twophase_gid, true);
735736
}
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
868869
{
869870
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
870871
abort_time,origin_id,origin_lsn,
872+
InvalidXLogRecPtr,
871873
parsed->twophase_gid, false);
872874
}
873875
else

‎src/backend/replication/logical/logical.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
207207
ctx->reorder=ReorderBufferAllocate();
208208
ctx->snapshot_builder=
209209
AllocateSnapshotBuilder(ctx->reorder,xmin_horizon,start_lsn,
210-
need_full_snapshot);
210+
need_full_snapshot,slot->data.initial_consistent_point);
211211

212212
ctx->reorder->private_data=ctx;
213213

@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
590590

591591
SpinLockAcquire(&slot->mutex);
592592
slot->data.confirmed_flush=ctx->reader->EndRecPtr;
593+
slot->data.initial_consistent_point=ctx->reader->EndRecPtr;
593594
SpinLockRelease(&slot->mutex);
594595
}
595596

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
26722672
void
26732673
ReorderBufferFinishPrepared(ReorderBuffer*rb,TransactionIdxid,
26742674
XLogRecPtrcommit_lsn,XLogRecPtrend_lsn,
2675+
XLogRecPtrinitial_consistent_point,
26752676
TimestampTzcommit_time,RepOriginIdorigin_id,
26762677
XLogRecPtrorigin_lsn,char*gid,boolis_commit)
26772678
{
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
26982699
/*
26992700
* It is possible that this transaction is not decoded at prepare time
27002701
* either because by that time we didn't have a consistent snapshot or it
2701-
* was decoded earlier but we have restarted. We can't distinguish between
2702-
* those two cases so we send the prepare in both the cases and let
2703-
* downstream decide whether to process or skip it. We don't need to
2704-
* decode the xact for aborts if it is not done already.
2702+
* was decoded earlier but we have restarted. We only need to send the
2703+
* prepare if it was not decoded earlier. We don't need to decode the xact
2704+
* for aborts if it is not done already.
27052705
*/
2706-
if (!rbtxn_prepared(txn)&&is_commit)
2706+
if ((txn->final_lsn<initial_consistent_point)&&is_commit)
27072707
{
27082708
txn->txn_flags |=RBTXN_PREPARE;
27092709

‎src/backend/replication/logical/snapbuild.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ struct SnapBuild
164164
*/
165165
XLogRecPtrstart_decoding_at;
166166

167+
/*
168+
* LSN at which we found a consistent point at the time of slot creation.
169+
* This is also the point where we have exported a snapshot for the
170+
* initial copy.
171+
*
172+
* The prepared transactions that are not covered by initial snapshot
173+
* needs to be sent later along with commit prepared and they must be
174+
* before this point.
175+
*/
176+
XLogRecPtrinitial_consistent_point;
177+
167178
/*
168179
* Don't start decoding WAL until the "xl_running_xacts" information
169180
* indicates there are no running xids with an xid smaller than this.
@@ -269,7 +280,8 @@ SnapBuild *
269280
AllocateSnapshotBuilder(ReorderBuffer*reorder,
270281
TransactionIdxmin_horizon,
271282
XLogRecPtrstart_lsn,
272-
boolneed_full_snapshot)
283+
boolneed_full_snapshot,
284+
XLogRecPtrinitial_consistent_point)
273285
{
274286
MemoryContextcontext;
275287
MemoryContextoldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
297309
builder->initial_xmin_horizon=xmin_horizon;
298310
builder->start_decoding_at=start_lsn;
299311
builder->building_full_snapshot=need_full_snapshot;
312+
builder->initial_consistent_point=initial_consistent_point;
300313

301314
MemoryContextSwitchTo(oldcontext);
302315

@@ -356,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder)
356369
returnbuilder->state;
357370
}
358371

372+
/*
373+
* Return the LSN at which the snapshot was exported
374+
*/
375+
XLogRecPtr
376+
SnapBuildInitialConsistentPoint(SnapBuild*builder)
377+
{
378+
returnbuilder->initial_consistent_point;
379+
}
380+
359381
/*
360382
* Should the contents of transaction ending at 'ptr' be decoded?
361383
*/
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
14221444
offsetof(SnapBuildOnDisk, version)
14231445

14241446
#defineSNAPBUILD_MAGIC 0x51A1E001
1425-
#defineSNAPBUILD_VERSION3
1447+
#defineSNAPBUILD_VERSION4
14261448

14271449
/*
14281450
* Store/Load a snapshot from disk, depending on the snapshot builder's state.

‎src/include/replication/reorderbuffer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ voidReorderBufferCommit(ReorderBuffer *, TransactionId,
643643
TimestampTzcommit_time,RepOriginIdorigin_id,XLogRecPtrorigin_lsn);
644644
voidReorderBufferFinishPrepared(ReorderBuffer*rb,TransactionIdxid,
645645
XLogRecPtrcommit_lsn,XLogRecPtrend_lsn,
646+
XLogRecPtrinitial_consistent_point,
646647
TimestampTzcommit_time,
647648
RepOriginIdorigin_id,XLogRecPtrorigin_lsn,
648649
char*gid,boolis_commit);

‎src/include/replication/slot.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
9191
*/
9292
XLogRecPtrconfirmed_flush;
9393

94+
/*
95+
* LSN at which we found a consistent point at the time of slot creation.
96+
* This is also the point where we have exported a snapshot for the
97+
* initial copy.
98+
*/
99+
XLogRecPtrinitial_consistent_point;
100+
94101
/* plugin name */
95102
NameDataplugin;
96103
}ReplicationSlotPersistentData;

‎src/include/replication/snapbuild.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
6161

6262
externSnapBuild*AllocateSnapshotBuilder(structReorderBuffer*cache,
6363
TransactionIdxmin_horizon,XLogRecPtrstart_lsn,
64-
boolneed_full_snapshot);
64+
boolneed_full_snapshot,
65+
XLogRecPtrinitial_consistent_point);
6566
externvoidFreeSnapshotBuilder(SnapBuild*cache);
6667

6768
externvoidSnapBuildSnapDecRefcount(Snapshotsnap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
7576
TransactionIdxid);
7677

7778
externboolSnapBuildXactNeedsSkip(SnapBuild*snapstate,XLogRecPtrptr);
79+
externXLogRecPtrSnapBuildInitialConsistentPoint(SnapBuild*builder);
7880

7981
externvoidSnapBuildCommitTxn(SnapBuild*builder,XLogRecPtrlsn,
8082
TransactionIdxid,intnsubxacts,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp