Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7d2f872

Browse files
committed
allow to skip prepared tx
1 parent77c26d0 commit7d2f872

File tree

7 files changed

+217
-77
lines changed

7 files changed

+217
-77
lines changed

‎contrib/test_decoding/expected/prepared.out

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ BEGIN;
2525
INSERT INTO test_prepared1 VALUES (5);
2626
ALTER TABLE test_prepared1 ADD COLUMN data text;
2727
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
28-
LOCK test_prepared1;
2928
PREPARE TRANSACTION 'test_prepared#3';
3029
-- test that we decode correctly while an uncommitted prepared xact
3130
-- with ddl exists.
@@ -45,33 +44,27 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
4544
-------------------------------------------------------------------------
4645
BEGIN
4746
table public.test_prepared1: INSERT: id[integer]:1
48-
PREPARE
49-
COMMIT PREPARED
47+
COMMIT
5048
BEGIN
5149
table public.test_prepared1: INSERT: id[integer]:2
5250
COMMIT
5351
BEGIN
54-
table public.test_prepared1: INSERT: id[integer]:3
55-
PREPARE
56-
ABORT PREPARED
57-
BEGIN
5852
table public.test_prepared1: INSERT: id[integer]:4
5953
COMMIT
6054
BEGIN
55+
table public.test_prepared2: INSERT: id[integer]:7
56+
COMMIT
57+
BEGIN
6158
table public.test_prepared1: INSERT: id[integer]:5
6259
table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
63-
PREPARE
64-
BEGIN
65-
table public.test_prepared2: INSERT: id[integer]:7
6660
COMMIT
67-
COMMIT PREPARED
6861
BEGIN
6962
table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
7063
COMMIT
7164
BEGIN
7265
table public.test_prepared2: INSERT: id[integer]:9
7366
COMMIT
74-
(28 rows)
67+
(22 rows)
7568

7669
SELECT pg_drop_replication_slot('regression_slot');
7770
pg_drop_replication_slot

‎contrib/test_decoding/sql/prepared.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ BEGIN;
2525
INSERT INTO test_prepared1VALUES (5);
2626
ALTERTABLE test_prepared1 ADD COLUMN datatext;
2727
INSERT INTO test_prepared1VALUES (6,'frakbar');
28-
LOCK test_prepared1;
2928
PREPARE TRANSACTION'test_prepared#3';
3029

3130
-- test that we decode correctly while an uncommitted prepared xact
3231
-- with ddl exists.
3332

3433
-- separate table because of the lock from the ALTER
34+
-- this will come before the '5' row above, as this commits before it.
3535
INSERT INTO test_prepared2VALUES (7);
3636

37-
ROLLBACK PREPARED'test_prepared#3';
37+
COMMIT PREPARED'test_prepared#3';
3838

3939
-- make sure stuff still works
4040
INSERT INTO test_prepared1VALUES (8);
@@ -47,4 +47,4 @@ DROP TABLE test_prepared2;
4747
-- show results
4848
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
4949

50-
SELECT pg_drop_replication_slot('regression_slot');
50+
SELECT pg_drop_replication_slot('regression_slot');

‎contrib/test_decoding/test_decoding.c

Lines changed: 123 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct
4646
boolskip_empty_xacts;
4747
boolxact_wrote_changes;
4848
boolonly_local;
49+
booltwophase_decoding;
4950
}TestDecodingData;
5051

5152
staticvoidpg_decode_startup(LogicalDecodingContext*ctx,OutputPluginOptions*opt,
@@ -68,6 +69,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6869
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
6970
booltransactional,constchar*prefix,
7071
Sizesz,constchar*message);
72+
staticboolpg_filter_prepare(LogicalDecodingContext*ctx,
73+
ReorderBufferTXN*txn,
74+
char*gid);
75+
staticvoidpg_decode_prepare_txn(LogicalDecodingContext*ctx,
76+
ReorderBufferTXN*txn,
77+
XLogRecPtrprepare_lsn);
78+
staticvoidpg_decode_commit_prepared_txn(LogicalDecodingContext*ctx,
79+
ReorderBufferTXN*txn,
80+
XLogRecPtrcommit_lsn);
81+
staticvoidpg_decode_abort_prepared_txn(LogicalDecodingContext*ctx,
82+
ReorderBufferTXN*txn,
83+
XLogRecPtrabort_lsn);
84+
7185

7286
void
7387
_PG_init(void)
@@ -85,9 +99,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8599
cb->begin_cb=pg_decode_begin_txn;
86100
cb->change_cb=pg_decode_change;
87101
cb->commit_cb=pg_decode_commit_txn;
102+
88103
cb->filter_by_origin_cb=pg_decode_filter;
89104
cb->shutdown_cb=pg_decode_shutdown;
90105
cb->message_cb=pg_decode_message;
106+
107+
cb->filter_prepare_cb=pg_filter_prepare;
108+
cb->prepare_cb=pg_decode_prepare_txn;
109+
cb->commit_prepared_cb=pg_decode_commit_prepared_txn;
110+
cb->abort_prepared_cb=pg_decode_abort_prepared_txn;
91111
}
92112

93113

@@ -107,6 +127,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
107127
data->include_timestamp= false;
108128
data->skip_empty_xacts= false;
109129
data->only_local= false;
130+
data->twophase_decoding= false;
110131

111132
ctx->output_plugin_private=data;
112133

@@ -176,6 +197,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
176197
errmsg("could not parse value \"%s\" for parameter \"%s\"",
177198
strVal(elem->arg),elem->defname)));
178199
}
200+
elseif (strcmp(elem->defname,"twophase-decoding")==0)
201+
{
202+
203+
if (elem->arg==NULL)
204+
data->twophase_decoding= true;
205+
elseif (!parse_bool(strVal(elem->arg),&data->twophase_decoding))
206+
ereport(ERROR,
207+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
208+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
209+
strVal(elem->arg),elem->defname)));
210+
}
179211
else
180212
{
181213
ereport(ERROR,
@@ -233,21 +265,97 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
233265

234266
OutputPluginPrepareWrite(ctx, true);
235267

236-
switch(txn->xact_action)
237-
{
238-
caseXLOG_XACT_COMMIT:
239-
appendStringInfoString(ctx->out,"COMMIT");
240-
break;
241-
caseXLOG_XACT_PREPARE:
242-
appendStringInfo(ctx->out,"PREPARE '%s'",txn->gid);
243-
break;
244-
caseXLOG_XACT_COMMIT_PREPARED:
245-
appendStringInfo(ctx->out,"COMMIT PREPARED '%s'",txn->gid);
246-
break;
247-
caseXLOG_XACT_ABORT_PREPARED:
248-
appendStringInfo(ctx->out,"ABORT PREPARED '%s'",txn->gid);
249-
break;
250-
}
268+
appendStringInfoString(ctx->out,"COMMIT");
269+
270+
if (data->include_xids)
271+
appendStringInfo(ctx->out," %u",txn->xid);
272+
273+
if (data->include_timestamp)
274+
appendStringInfo(ctx->out," (at %s)",
275+
timestamptz_to_str(txn->commit_time));
276+
277+
OutputPluginWrite(ctx, true);
278+
}
279+
280+
staticbool
281+
pg_filter_prepare(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
282+
char*gid)
283+
{
284+
TestDecodingData*data=ctx->output_plugin_private;
285+
286+
// has_catalog_changes?
287+
// LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288+
289+
// OutputPluginPrepareWrite(ctx, true);
290+
291+
// appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
292+
293+
// OutputPluginWrite(ctx, true);
294+
return true;
295+
}
296+
297+
298+
/* PREPARE callback */
299+
staticvoid
300+
pg_decode_prepare_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
301+
XLogRecPtrprepare_lsn)
302+
{
303+
TestDecodingData*data=ctx->output_plugin_private;
304+
305+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
306+
return;
307+
308+
OutputPluginPrepareWrite(ctx, true);
309+
310+
appendStringInfo(ctx->out,"PREPARE! '%s'",txn->gid);
311+
312+
if (data->include_xids)
313+
appendStringInfo(ctx->out," %u",txn->xid);
314+
315+
if (data->include_timestamp)
316+
appendStringInfo(ctx->out," (at %s)",
317+
timestamptz_to_str(txn->commit_time));
318+
319+
OutputPluginWrite(ctx, true);
320+
}
321+
322+
/* COMMIT PREPARED callback */
323+
staticvoid
324+
pg_decode_commit_prepared_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
325+
XLogRecPtrcommit_lsn)
326+
{
327+
TestDecodingData*data=ctx->output_plugin_private;
328+
329+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
330+
return;
331+
332+
OutputPluginPrepareWrite(ctx, true);
333+
334+
appendStringInfo(ctx->out,"COMMIT PREPARED '%s'",txn->gid);
335+
336+
if (data->include_xids)
337+
appendStringInfo(ctx->out," %u",txn->xid);
338+
339+
if (data->include_timestamp)
340+
appendStringInfo(ctx->out," (at %s)",
341+
timestamptz_to_str(txn->commit_time));
342+
343+
OutputPluginWrite(ctx, true);
344+
}
345+
346+
/* ABORT PREPARED callback */
347+
staticvoid
348+
pg_decode_abort_prepared_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
349+
XLogRecPtrabort_lsn)
350+
{
351+
TestDecodingData*data=ctx->output_plugin_private;
352+
353+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
354+
return;
355+
356+
OutputPluginPrepareWrite(ctx, true);
357+
358+
appendStringInfo(ctx->out,"ABORT PREPARED '%s'",txn->gid);
251359

252360
if (data->include_xids)
253361
appendStringInfo(ctx->out," %u",txn->xid);

‎src/backend/replication/logical/decode.c

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,6 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
224224
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
225225
return;
226226

227-
reorder->xact_action=info;
228-
229227
switch (info)
230228
{
231229
caseXLOG_XACT_COMMIT:
@@ -627,14 +625,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
627625
buf->origptr,buf->endptr);
628626
}
629627

630-
if (TransactionIdIsValid(parsed->twophase_xid)) {
628+
if (TransactionIdIsValid(parsed->twophase_xid)&&
629+
ReorderBufferTxnIsPrepared(ctx->reorder,xid))
630+
{
631631
/*
632632
* We are processing COMMIT PREPARED and know that reorder buffer is
633633
* empty. So we can skip use shortcut for coomiting bare xact.
634634
*/
635-
strcpy(ctx->reorder->gid,parsed->twophase_gid);
636-
ReorderBufferCommitBareXact(ctx->reorder,xid,buf->origptr,buf->endptr,
637-
commit_time,origin_id,origin_lsn);
635+
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
636+
commit_time,origin_id,origin_lsn,parsed->twophase_gid, true);
638637
}else {
639638
/* replay actions of all transaction + subtransactions in order */
640639
ReorderBufferCommit(ctx->reorder,xid,buf->origptr,buf->endptr,
@@ -651,7 +650,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
651650
XLogRecPtrorigin_id=XLogRecGetOrigin(buf->record);
652651
inti;
653652
TransactionIdxid=parsed->twophase_xid;
654-
strcpy(ctx->reorder->gid,parsed->twophase_gid);
655653

656654
/*
657655
* Process invalidation messages, even if we're not interested in the
@@ -689,8 +687,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
689687
}
690688

691689
/* replay actions of all transaction + subtransactions in order */
692-
ReorderBufferCommit(ctx->reorder,xid,buf->origptr,buf->endptr,
693-
commit_time,origin_id,origin_lsn);
690+
ReorderBufferPrepare(ctx->reorder,xid,buf->origptr,buf->endptr,
691+
commit_time,origin_id,origin_lsn,parsed->twophase_gid);
694692
}
695693

696694
/*
@@ -709,13 +707,13 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
709707
/*
710708
* If that is ROLLBACK PREPARED than send that to callbacks.
711709
*/
712-
if (TransactionIdIsValid(parsed->twophase_xid)
713-
&& (parsed->dbId==ctx->slot->data.database)) {
714-
715-
strcpy(ctx->reorder->gid,parsed->twophase_gid);
716-
717-
ReorderBufferCommitBareXact(ctx->reorder,xid,buf->origptr,buf->endptr,
718-
commit_time,origin_id,origin_lsn);
710+
if (TransactionIdIsValid(xid)&&
711+
parsed->dbId==ctx->slot->data.database&&
712+
ReorderBufferTxnIsPrepared(ctx->reorder,xid))
713+
{
714+
ReorderBufferFinishPrepared(ctx->reorder,xid,buf->origptr,buf->endptr,
715+
commit_time,origin_id,origin_lsn,
716+
parsed->twophase_gid, false);
719717
return;
720718
}
721719

‎src/backend/replication/logical/logical.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,8 @@ filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid
814814
errcallback.previous=error_context_stack;
815815
error_context_stack=&errcallback;
816816

817-
// /* set output state */
818-
//ctx->accept_writes =false;
817+
/* set output state */
818+
ctx->accept_writes=true;// ->false
819819

820820
/* do the actual work: call callback */
821821
ret=ctx->callbacks.filter_prepare_cb(ctx,txn,gid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp