Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit77c26d0

Browse files
committed
kinda new callbacks interface
1 parentd5fe0f3 commit77c26d0

File tree

8 files changed

+261
-12
lines changed

8 files changed

+261
-12
lines changed

‎contrib/test_decoding/sql/prepared.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ PREPARE TRANSACTION 'test_prepared#3';
3232
-- with ddl exists.
3333

3434
-- separate table because of the lock from the ALTER
35-
-- this will come before the '5' row above, as this commits before it.
3635
INSERT INTO test_prepared2VALUES (7);
3736

38-
COMMIT PREPARED'test_prepared#3';
37+
ROLLBACK PREPARED'test_prepared#3';
3938

4039
-- make sure stuff still works
4140
INSERT INTO test_prepared1VALUES (8);

‎contrib/test_decoding/test_decoding.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
239239
appendStringInfoString(ctx->out,"COMMIT");
240240
break;
241241
caseXLOG_XACT_PREPARE:
242-
appendStringInfoString(ctx->out,"PREPARE");
242+
appendStringInfo(ctx->out,"PREPARE '%s'",txn->gid);
243243
break;
244244
caseXLOG_XACT_COMMIT_PREPARED:
245-
appendStringInfoString(ctx->out,"COMMIT PREPARED");
245+
appendStringInfo(ctx->out,"COMMIT PREPARED '%s'",txn->gid);
246246
break;
247247
caseXLOG_XACT_ABORT_PREPARED:
248-
appendStringInfoString(ctx->out,"ABORT PREPARED");
248+
appendStringInfo(ctx->out,"ABORT PREPARED '%s'",txn->gid);
249249
break;
250250
}
251251

‎src/backend/replication/logical/decode.c

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,31 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
282282
break;
283283
}
284284
caseXLOG_XACT_PREPARE:
285-
{
286-
xl_xact_parsed_prepareparsed;
287-
ParsePrepareRecord(XLogRecGetInfo(buf->record),
288-
XLogRecGetData(buf->record),&parsed);
289-
DecodePrepare(ctx,buf,&parsed);
290-
break;
285+
{
286+
xl_xact_parsed_prepareparsed;
287+
288+
/* check that output plugin capable of twophase decoding */
289+
if (!ctx->twophase_hadling)
290+
{
291+
ReorderBufferProcessXid(reorder,XLogRecGetXid(r),buf->origptr);
292+
break;
293+
}
294+
295+
/* ok, parse it */
296+
ParsePrepareRecord(XLogRecGetInfo(buf->record),
297+
XLogRecGetData(buf->record),&parsed);
298+
299+
/* does output plugin wants this particular transaction? */
300+
if (ReorderBufferPrepareNeedSkip(reorder,parsed.twophase_xid,
301+
parsed.twophase_gid))
302+
{
303+
ReorderBufferProcessXid(reorder,parsed.twophase_xid,
304+
buf->origptr);
305+
break;
306+
}
307+
308+
DecodePrepare(ctx,buf,&parsed);
309+
break;
291310
}
292311
default:
293312
elog(ERROR,"unexpected RM_XACT_ID record type: %u",info);

‎src/backend/replication/logical/logical.c

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
5858
boolis_init);
5959
staticvoidshutdown_cb_wrapper(LogicalDecodingContext*ctx);
6060
staticvoidbegin_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn);
61+
staticboolfilter_prepare_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
62+
char*gid);
63+
staticvoidprepare_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
64+
XLogRecPtrprepare_lsn);
65+
staticvoidcommit_prepared_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
66+
XLogRecPtrcommit_lsn);
67+
staticvoidabort_prepared_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
68+
XLogRecPtrabort_lsn);
6169
staticvoidcommit_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
6270
XLogRecPtrcommit_lsn);
6371
staticvoidchange_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
@@ -122,6 +130,7 @@ StartupDecodingContext(List *output_plugin_options,
122130
MemoryContextcontext,
123131
old_context;
124132
LogicalDecodingContext*ctx;
133+
inttwophase_callbacks;
125134

126135
/* shorter lines... */
127136
slot=MyReplicationSlot;
@@ -179,8 +188,25 @@ StartupDecodingContext(List *output_plugin_options,
179188
ctx->reorder->begin=begin_cb_wrapper;
180189
ctx->reorder->apply_change=change_cb_wrapper;
181190
ctx->reorder->commit=commit_cb_wrapper;
191+
ctx->reorder->filter_prepare=filter_prepare_cb_wrapper;
192+
ctx->reorder->prepare=prepare_cb_wrapper;
193+
ctx->reorder->commit_prepared=commit_prepared_cb_wrapper;
194+
ctx->reorder->abort_prepared=abort_prepared_cb_wrapper;
182195
ctx->reorder->message=message_cb_wrapper;
183196

197+
/* check that plugin implements all necessary callbacks to perform 2PC */
198+
twophase_callbacks= (ctx->callbacks.prepare_cb!=NULL)+
199+
(ctx->callbacks.commit_prepared_cb!=NULL)+
200+
(ctx->callbacks.abort_prepared_cb!=NULL);
201+
202+
ctx->twophase_hadling= (twophase_callbacks==3);
203+
204+
if (twophase_callbacks!=3&&twophase_callbacks!=0)
205+
ereport(WARNING,
206+
(errmsg("Output plugin registered only %d twophase callbacks out of 3. "
207+
"Twophase transactions will be decoded as ordinary ones.",
208+
twophase_callbacks)));
209+
184210
ctx->out=makeStringInfo();
185211
ctx->prepare_write=prepare_write;
186212
ctx->write=do_write;
@@ -649,6 +675,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
649675
error_context_stack=errcallback.previous;
650676
}
651677

678+
staticvoid
679+
prepare_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
680+
XLogRecPtrprepare_lsn)
681+
{
682+
LogicalDecodingContext*ctx=cache->private_data;
683+
LogicalErrorCallbackStatestate;
684+
ErrorContextCallbackerrcallback;
685+
686+
/* Push callback + info on the error context stack */
687+
state.ctx=ctx;
688+
state.callback_name="prepare";
689+
state.report_location=txn->final_lsn;/* beginning of commit record */
690+
errcallback.callback=output_plugin_error_callback;
691+
errcallback.arg= (void*)&state;
692+
errcallback.previous=error_context_stack;
693+
error_context_stack=&errcallback;
694+
695+
/* set output state */
696+
ctx->accept_writes= true;
697+
ctx->write_xid=txn->xid;
698+
ctx->write_location=txn->end_lsn;/* points to the end of the record */
699+
700+
/* do the actual work: call callback */
701+
ctx->callbacks.prepare_cb(ctx,txn,prepare_lsn);
702+
703+
/* Pop the error context stack */
704+
error_context_stack=errcallback.previous;
705+
}
706+
707+
staticvoid
708+
commit_prepared_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
709+
XLogRecPtrcommit_lsn)
710+
{
711+
LogicalDecodingContext*ctx=cache->private_data;
712+
LogicalErrorCallbackStatestate;
713+
ErrorContextCallbackerrcallback;
714+
715+
/* Push callback + info on the error context stack */
716+
state.ctx=ctx;
717+
state.callback_name="commit_prepared";
718+
state.report_location=txn->final_lsn;/* beginning of commit record */
719+
errcallback.callback=output_plugin_error_callback;
720+
errcallback.arg= (void*)&state;
721+
errcallback.previous=error_context_stack;
722+
error_context_stack=&errcallback;
723+
724+
/* set output state */
725+
ctx->accept_writes= true;
726+
ctx->write_xid=txn->xid;
727+
ctx->write_location=txn->end_lsn;/* points to the end of the record */
728+
729+
/* do the actual work: call callback */
730+
ctx->callbacks.commit_prepared_cb(ctx,txn,commit_lsn);
731+
732+
/* Pop the error context stack */
733+
error_context_stack=errcallback.previous;
734+
}
735+
736+
staticvoid
737+
abort_prepared_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
738+
XLogRecPtrabort_lsn)
739+
{
740+
LogicalDecodingContext*ctx=cache->private_data;
741+
LogicalErrorCallbackStatestate;
742+
ErrorContextCallbackerrcallback;
743+
744+
/* Push callback + info on the error context stack */
745+
state.ctx=ctx;
746+
state.callback_name="abort_prepared";
747+
state.report_location=txn->final_lsn;/* beginning of commit record */
748+
errcallback.callback=output_plugin_error_callback;
749+
errcallback.arg= (void*)&state;
750+
errcallback.previous=error_context_stack;
751+
error_context_stack=&errcallback;
752+
753+
/* set output state */
754+
ctx->accept_writes= true;
755+
ctx->write_xid=txn->xid;
756+
ctx->write_location=txn->end_lsn;/* points to the end of the record */
757+
758+
/* do the actual work: call callback */
759+
ctx->callbacks.abort_prepared_cb(ctx,txn,abort_lsn);
760+
761+
/* Pop the error context stack */
762+
error_context_stack=errcallback.previous;
763+
}
764+
652765
staticvoid
653766
change_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,
654767
Relationrelation,ReorderBufferChange*change)
@@ -684,6 +797,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
684797
error_context_stack=errcallback.previous;
685798
}
686799

800+
staticbool
801+
filter_prepare_cb_wrapper(ReorderBuffer*cache,ReorderBufferTXN*txn,char*gid)
802+
{
803+
LogicalDecodingContext*ctx=cache->private_data;
804+
LogicalErrorCallbackStatestate;
805+
ErrorContextCallbackerrcallback;
806+
boolret;
807+
808+
/* Push callback + info on the error context stack */
809+
state.ctx=ctx;
810+
state.callback_name="filter_prepare";
811+
state.report_location=InvalidXLogRecPtr;
812+
errcallback.callback=output_plugin_error_callback;
813+
errcallback.arg= (void*)&state;
814+
errcallback.previous=error_context_stack;
815+
error_context_stack=&errcallback;
816+
817+
// /* set output state */
818+
// ctx->accept_writes = false;
819+
820+
/* do the actual work: call callback */
821+
ret=ctx->callbacks.filter_prepare_cb(ctx,txn,gid);
822+
823+
/* Pop the error context stack */
824+
error_context_stack=errcallback.previous;
825+
returnret;
826+
}
827+
687828
bool
688829
filter_by_origin_cb_wrapper(LogicalDecodingContext*ctx,RepOriginIdorigin_id)
689830
{

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,6 +1669,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16691669
PG_END_TRY();
16701670
}
16711671

1672+
bool
1673+
ReorderBufferPrepareNeedSkip(ReorderBuffer*rb,TransactionIdxid,char*gid)
1674+
{
1675+
ReorderBufferTXN*txn;
1676+
1677+
txn=ReorderBufferTXNByXid(rb,xid, false,NULL,InvalidXLogRecPtr, false);
1678+
Assert(txn!=NULL);
1679+
returnrb->filter_prepare(rb,txn,gid);
1680+
}
1681+
1682+
void
1683+
ReorderBufferStartPrepare()
1684+
{
1685+
1686+
}
1687+
1688+
void
1689+
ReorderBufferFinishPrepare()
1690+
{
1691+
1692+
}
16721693

16731694
/*
16741695
* Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.

‎src/include/replication/logical.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ typedef struct LogicalDecodingContext
7373
boolprepared_write;
7474
XLogRecPtrwrite_location;
7575
TransactionIdwrite_xid;
76+
77+
/*
78+
* Capabilities of decoding plugin used.
79+
*/
80+
booltwophase_hadling;
7681
}LogicalDecodingContext;
7782

7883
externvoidCheckLogicalDecodingRequirements(void);
@@ -98,5 +103,4 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
98103
externvoidLogicalConfirmReceivedLocation(XLogRecPtrlsn);
99104

100105
externboolfilter_by_origin_cb_wrapper(LogicalDecodingContext*ctx,RepOriginIdorigin_id);
101-
102106
#endif

‎src/include/replication/output_plugin.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
6767
ReorderBufferTXN*txn,
6868
XLogRecPtrcommit_lsn);
6969

70+
/*
71+
* Called before decoding of PREPARE record to decide whether this
72+
* transaction should be decoded with separate calls to prepare
73+
* and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
74+
* and send as usual transaction.
75+
*/
76+
typedefbool (*LogicalDecodeFilterPrepareCB) (structLogicalDecodingContext*ctx,
77+
ReorderBufferTXN*txn,
78+
char*gid);
79+
80+
/*
81+
* Called for PREPARE record unless it was filtered by filter_prepare()
82+
* callback.
83+
*/
84+
typedefvoid (*LogicalDecodePrepareCB) (structLogicalDecodingContext*ctx,
85+
ReorderBufferTXN*txn,
86+
XLogRecPtrprepare_lsn);
87+
88+
/*
89+
* Called for COMMIT PREPARED.
90+
*/
91+
typedefvoid (*LogicalDecodeCommitPreparedCB) (structLogicalDecodingContext*ctx,
92+
ReorderBufferTXN*txn,
93+
XLogRecPtrcommit_lsn);
94+
95+
/*
96+
* Called for ROLLBACK PREPARED.
97+
*/
98+
typedefvoid (*LogicalDecodeAbortPreparedCB) (structLogicalDecodingContext*ctx,
99+
ReorderBufferTXN*txn,
100+
XLogRecPtrabort_lsn);
101+
70102
/*
71103
* Called for the generic logical decoding messages.
72104
*/
@@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks
98130
LogicalDecodeBeginCBbegin_cb;
99131
LogicalDecodeChangeCBchange_cb;
100132
LogicalDecodeCommitCBcommit_cb;
133+
LogicalDecodeFilterPrepareCBfilter_prepare_cb;
134+
LogicalDecodePrepareCBprepare_cb;
135+
LogicalDecodeCommitPreparedCBcommit_prepared_cb;
136+
LogicalDecodeAbortPreparedCBabort_prepared_cb;
101137
LogicalDecodeMessageCBmessage_cb;
102138
LogicalDecodeFilterByOriginCBfilter_by_origin_cb;
103139
LogicalDecodeShutdownCBshutdown_cb;

‎src/include/replication/reorderbuffer.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,29 @@ typedef void (*ReorderBufferCommitCB) (
292292
ReorderBufferTXN*txn,
293293
XLogRecPtrcommit_lsn);
294294

295+
typedefbool (*ReorderBufferFilterPrepareCB) (
296+
ReorderBuffer*rb,
297+
ReorderBufferTXN*txn,
298+
char*gid);
299+
300+
/* prepare callback signature */
301+
typedefvoid (*ReorderBufferPrepareCB) (
302+
ReorderBuffer*rb,
303+
ReorderBufferTXN*txn,
304+
XLogRecPtrprepare_lsn);
305+
306+
/* commit prepared callback signature */
307+
typedefvoid (*ReorderBufferCommitPreparedCB) (
308+
ReorderBuffer*rb,
309+
ReorderBufferTXN*txn,
310+
XLogRecPtrcommit_lsn);
311+
312+
/* abort prepared callback signature */
313+
typedefvoid (*ReorderBufferAbortPreparedCB) (
314+
ReorderBuffer*rb,
315+
ReorderBufferTXN*txn,
316+
XLogRecPtrabort_lsn);
317+
295318
/* message callback signature */
296319
typedefvoid (*ReorderBufferMessageCB) (
297320
ReorderBuffer*rb,
@@ -331,6 +354,10 @@ struct ReorderBuffer
331354
ReorderBufferBeginCBbegin;
332355
ReorderBufferApplyChangeCBapply_change;
333356
ReorderBufferCommitCBcommit;
357+
ReorderBufferFilterPrepareCBfilter_prepare;
358+
ReorderBufferPrepareCBprepare;
359+
ReorderBufferCommitPreparedCBcommit_prepared;
360+
ReorderBufferAbortPreparedCBabort_prepared;
334361
ReorderBufferMessageCBmessage;
335362

336363
/*
@@ -413,6 +440,8 @@ voidReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
413440
boolReorderBufferXidHasCatalogChanges(ReorderBuffer*,TransactionIdxid);
414441
boolReorderBufferXidHasBaseSnapshot(ReorderBuffer*,TransactionIdxid);
415442

443+
boolReorderBufferPrepareNeedSkip(ReorderBuffer*rb,TransactionIdxid,char*gid);
444+
416445
ReorderBufferTXN*ReorderBufferGetOldestTXN(ReorderBuffer*);
417446

418447
voidReorderBufferSetRestartPoint(ReorderBuffer*,XLogRecPtrptr);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp