Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit63cf61c

Browse files
author
Amit Kapila
committed
Add prepare API support for streaming transactions in logical replication.
Commita8fd13c added support for prepared transactions to built-inlogical replication via a new option "two_phase" for a subscription. The"two_phase" option was not allowed with the existing streaming option.This commit permits the combination of "streaming" and "two_phase"subscription options. It extends the pgoutput plugin and the subscriberside code to add the prepare API for streaming transactions which willapply the changes accumulated in the spool-file at prepare time.Author: Peter Smith and Ajin CherianReviewed-by: Vignesh C, Amit Kapila, Greg NancarrowTested-By: Haiying TangDiscussion:https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ruDiscussion:https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
1 parent6424337 commit63cf61c

File tree

12 files changed

+667
-79
lines changed

12 files changed

+667
-79
lines changed

‎doc/src/sgml/logicaldecoding.sgml‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,9 @@ OutputPluginWrite(ctx, true);
11991199
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
12001200
and <function>stream_change_cb</function>) and two optional callbacks
12011201
(<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
1202+
Also, if streaming of two-phase commands is to be supported, then additional
1203+
callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
1204+
for details).
12021205
</para>
12031206

12041207
<para>
@@ -1237,7 +1240,13 @@ stream_start_cb(...); &lt;-- start of second block of changes
12371240
stream_change_cb(...);
12381241
stream_stop_cb(...); &lt;-- end of second block of changes
12391242

1240-
stream_commit_cb(...); &lt;-- commit of the streamed transaction
1243+
1244+
[a. when using normal commit]
1245+
stream_commit_cb(...); &lt;-- commit of the streamed transaction
1246+
1247+
[b. when using two-phase commit]
1248+
stream_prepare_cb(...); &lt;-- prepare the streamed transaction
1249+
commit_prepared_cb(...); &lt;-- commit of the prepared transaction
12411250
</programlisting>
12421251
</para>
12431252

‎doc/src/sgml/protocol.sgml‎

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7411,7 +7411,7 @@ Stream Abort
74117411
</variablelist>
74127412

74137413
<para>
7414-
The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
7414+
The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared, Stream Prepare)
74157415
are available since protocol version 3.
74167416
</para>
74177417

@@ -7714,6 +7714,80 @@ are available since protocol version 3.
77147714
</listitem>
77157715
</varlistentry>
77167716

7717+
<varlistentry>
7718+
7719+
<term>Stream Prepare</term>
7720+
<listitem>
7721+
<para>
7722+
7723+
<variablelist>
7724+
7725+
<varlistentry>
7726+
<term>Byte1('p')</term>
7727+
<listitem><para>
7728+
Identifies the message as a two-phase stream prepare message.
7729+
</para></listitem>
7730+
</varlistentry>
7731+
7732+
<varlistentry>
7733+
<term>
7734+
Int8(0)
7735+
</term>
7736+
<listitem><para>
7737+
Flags; currently unused.
7738+
</para></listitem>
7739+
</varlistentry>
7740+
7741+
<varlistentry>
7742+
<term>
7743+
Int64 (XLogRecPtr)
7744+
</term>
7745+
<listitem><para>
7746+
The LSN of the prepare.
7747+
</para></listitem>
7748+
</varlistentry>
7749+
7750+
<varlistentry>
7751+
<term>
7752+
Int64 (XLogRecPtr)
7753+
</term>
7754+
<listitem><para>
7755+
The end LSN of the prepare transaction.
7756+
</para></listitem>
7757+
</varlistentry>
7758+
7759+
<varlistentry>
7760+
<term>
7761+
Int64 (TimestampTz)
7762+
</term>
7763+
<listitem><para>
7764+
Prepare timestamp of the transaction. The value is in number
7765+
of microseconds since PostgreSQL epoch (2000-01-01).
7766+
</para></listitem>
7767+
</varlistentry>
7768+
7769+
<varlistentry>
7770+
<term>
7771+
Int32 (TransactionId)
7772+
</term>
7773+
<listitem><para>
7774+
Xid of the transaction.
7775+
</para></listitem>
7776+
</varlistentry>
7777+
7778+
<varlistentry>
7779+
<term>String</term>
7780+
<listitem><para>
7781+
The user defined GID of the two-phase transaction.
7782+
</para></listitem>
7783+
</varlistentry>
7784+
7785+
</variablelist>
7786+
7787+
</para>
7788+
</listitem>
7789+
</varlistentry>
7790+
77177791
</variablelist>
77187792

77197793
<para>

‎doc/src/sgml/ref/create_subscription.sgml‎

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
238238
subscriber as a whole.
239239
</para>
240240

241-
<para>
242-
The <literal>streaming</literal> option cannot be used with the
243-
<literal>two_phase</literal> option.
244-
</para>
245-
246241
</listitem>
247242
</varlistentry>
248243
<varlistentry>
@@ -269,11 +264,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
269264
to know the actual two-phase state.
270265
</para>
271266

272-
<para>
273-
The <literal>two_phase</literal> option cannot be used with the
274-
<literal>streaming</literal> option.
275-
</para>
276-
277267
</listitem>
278268
</varlistentry>
279269
</variablelist></para>

‎src/backend/commands/subscriptioncmds.c‎

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -335,25 +335,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
335335
errmsg("subscription with %s must also set %s",
336336
"slot_name = NONE","create_slot = false")));
337337
}
338-
339-
/*
340-
* Do additional checking for the disallowed combination of two_phase and
341-
* streaming. While streaming and two_phase can theoretically be
342-
* supported, it needs more analysis to allow them together.
343-
*/
344-
if (opts->twophase&&
345-
IsSet(supported_opts,SUBOPT_TWOPHASE_COMMIT)&&
346-
IsSet(opts->specified_opts,SUBOPT_TWOPHASE_COMMIT))
347-
{
348-
if (opts->streaming&&
349-
IsSet(supported_opts,SUBOPT_STREAMING)&&
350-
IsSet(opts->specified_opts,SUBOPT_STREAMING))
351-
ereport(ERROR,
352-
(errcode(ERRCODE_SYNTAX_ERROR),
353-
/*- translator: both %s are strings of the form "option = value" */
354-
errmsg("%s and %s are mutually exclusive options",
355-
"two_phase = true","streaming = true")));
356-
}
357338
}
358339

359340
/*
@@ -933,12 +914,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
933914

934915
if (IsSet(opts.specified_opts,SUBOPT_STREAMING))
935916
{
936-
if ((sub->twophasestate!=LOGICALREP_TWOPHASE_STATE_DISABLED)&&opts.streaming)
937-
ereport(ERROR,
938-
(errcode(ERRCODE_SYNTAX_ERROR),
939-
errmsg("cannot set %s for two-phase enabled subscription",
940-
"streaming = true")));
941-
942917
values[Anum_pg_subscription_substream-1]=
943918
BoolGetDatum(opts.streaming);
944919
replaces[Anum_pg_subscription_substream-1]= true;

‎src/backend/replication/logical/proto.c‎

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
145145
}
146146

147147
/*
148-
* The core functionality for logicalrep_write_prepare.
148+
* The core functionality for logicalrep_write_prepare and
149+
* logicalrep_write_stream_prepare.
149150
*/
150151
staticvoid
151152
logicalrep_write_prepare_common(StringInfoout,LogicalRepMsgTypetype,
@@ -188,7 +189,8 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
188189
}
189190

190191
/*
191-
* The core functionality for logicalrep_read_prepare.
192+
* The core functionality for logicalrep_read_prepare and
193+
* logicalrep_read_stream_prepare.
192194
*/
193195
staticvoid
194196
logicalrep_read_prepare_common(StringInfoin,char*msgtype,
@@ -209,6 +211,8 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype,
209211
elog(ERROR,"end_lsn is not set in %s message",msgtype);
210212
prepare_data->prepare_time=pq_getmsgint64(in);
211213
prepare_data->xid=pq_getmsgint(in,4);
214+
if (prepare_data->xid==InvalidTransactionId)
215+
elog(ERROR,"invalid two-phase transaction ID in %s message",msgtype);
212216

213217
/* read gid (copy it into a pre-allocated buffer) */
214218
strlcpy(prepare_data->gid,pq_getmsgstring(in),sizeof(prepare_data->gid));
@@ -339,6 +343,27 @@ logicalrep_read_rollback_prepared(StringInfo in,
339343
strlcpy(rollback_data->gid,pq_getmsgstring(in),sizeof(rollback_data->gid));
340344
}
341345

346+
/*
347+
* Write STREAM PREPARE to the output stream.
348+
*/
349+
void
350+
logicalrep_write_stream_prepare(StringInfoout,
351+
ReorderBufferTXN*txn,
352+
XLogRecPtrprepare_lsn)
353+
{
354+
logicalrep_write_prepare_common(out,LOGICAL_REP_MSG_STREAM_PREPARE,
355+
txn,prepare_lsn);
356+
}
357+
358+
/*
359+
* Read STREAM PREPARE from the stream.
360+
*/
361+
void
362+
logicalrep_read_stream_prepare(StringInfoin,LogicalRepPreparedTxnData*prepare_data)
363+
{
364+
logicalrep_read_prepare_common(in,"stream prepare",prepare_data);
365+
}
366+
342367
/*
343368
* Write ORIGIN to the output stream.
344369
*/

‎src/backend/replication/logical/worker.c‎

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,56 @@ apply_handle_rollback_prepared(StringInfo s)
10521052
pgstat_report_activity(STATE_IDLE,NULL);
10531053
}
10541054

1055+
/*
1056+
* Handle STREAM PREPARE.
1057+
*
1058+
* Logic is in two parts:
1059+
* 1. Replay all the spooled operations
1060+
* 2. Mark the transaction as prepared
1061+
*/
1062+
staticvoid
1063+
apply_handle_stream_prepare(StringInfos)
1064+
{
1065+
LogicalRepPreparedTxnDataprepare_data;
1066+
1067+
if (in_streamed_transaction)
1068+
ereport(ERROR,
1069+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1070+
errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1071+
1072+
/* Tablesync should never receive prepare. */
1073+
if (am_tablesync_worker())
1074+
ereport(ERROR,
1075+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1076+
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1077+
1078+
logicalrep_read_stream_prepare(s,&prepare_data);
1079+
1080+
elog(DEBUG1,"received prepare for streamed transaction %u",prepare_data.xid);
1081+
1082+
/* Replay all the spooled operations. */
1083+
apply_spooled_messages(prepare_data.xid,prepare_data.prepare_lsn);
1084+
1085+
/* Mark the transaction as prepared. */
1086+
apply_handle_prepare_internal(&prepare_data);
1087+
1088+
CommitTransactionCommand();
1089+
1090+
pgstat_report_stat(false);
1091+
1092+
store_flush_position(prepare_data.end_lsn);
1093+
1094+
in_remote_transaction= false;
1095+
1096+
/* unlink the files with serialized changes and subxact info. */
1097+
stream_cleanup_files(MyLogicalRepWorker->subid,prepare_data.xid);
1098+
1099+
/* Process any tables that are being synchronized in parallel. */
1100+
process_syncing_tables(prepare_data.end_lsn);
1101+
1102+
pgstat_report_activity(STATE_IDLE,NULL);
1103+
}
1104+
10551105
/*
10561106
* Handle ORIGIN message.
10571107
*
@@ -1291,7 +1341,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
12911341
*/
12921342
oldcxt=MemoryContextSwitchTo(TopTransactionContext);
12931343

1294-
/*open the spool file for the committed transaction */
1344+
/*Open the spool file for the committed/prepared transaction */
12951345
changes_filename(path,MyLogicalRepWorker->subid,xid);
12961346
elog(DEBUG1,"replaying changes from file \"%s\"",path);
12971347

@@ -2357,6 +2407,10 @@ apply_dispatch(StringInfo s)
23572407
caseLOGICAL_REP_MSG_ROLLBACK_PREPARED:
23582408
apply_handle_rollback_prepared(s);
23592409
return;
2410+
2411+
caseLOGICAL_REP_MSG_STREAM_PREPARE:
2412+
apply_handle_stream_prepare(s);
2413+
return;
23602414
}
23612415

23622416
ereport(ERROR,

‎src/backend/replication/pgoutput/pgoutput.c‎

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
7171
staticvoidpgoutput_stream_commit(structLogicalDecodingContext*ctx,
7272
ReorderBufferTXN*txn,
7373
XLogRecPtrcommit_lsn);
74+
staticvoidpgoutput_stream_prepare_txn(LogicalDecodingContext*ctx,
75+
ReorderBufferTXN*txn,XLogRecPtrprepare_lsn);
7476

7577
staticboolpublications_valid;
7678
staticboolin_streaming;
@@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
175177
cb->stream_message_cb=pgoutput_message;
176178
cb->stream_truncate_cb=pgoutput_truncate;
177179
/* transaction streaming - two-phase commit */
178-
cb->stream_prepare_cb=NULL;
180+
cb->stream_prepare_cb=pgoutput_stream_prepare_txn;
179181
}
180182

181183
staticvoid
@@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data)
280282
}
281283
else
282284
elog(ERROR,"unrecognized pgoutput option: %s",defel->defname);
283-
284-
/*
285-
* Do additional checking for the disallowed combination of two_phase
286-
* and streaming. While streaming and two_phase can theoretically be
287-
* supported, it needs more analysis to allow them together.
288-
*/
289-
if (data->two_phase&&data->streaming)
290-
ereport(ERROR,
291-
(errcode(ERRCODE_SYNTAX_ERROR),
292-
errmsg("%s and %s are mutually exclusive options",
293-
"two_phase","streaming")));
294285
}
295286
}
296287

@@ -1029,6 +1020,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
10291020
cleanup_rel_sync_cache(txn->xid, true);
10301021
}
10311022

1023+
/*
1024+
* PREPARE callback (for streaming two-phase commit).
1025+
*
1026+
* Notify the downstream to prepare the transaction.
1027+
*/
1028+
staticvoid
1029+
pgoutput_stream_prepare_txn(LogicalDecodingContext*ctx,
1030+
ReorderBufferTXN*txn,
1031+
XLogRecPtrprepare_lsn)
1032+
{
1033+
Assert(rbtxn_is_streamed(txn));
1034+
1035+
OutputPluginUpdateProgress(ctx);
1036+
OutputPluginPrepareWrite(ctx, true);
1037+
logicalrep_write_stream_prepare(ctx->out,txn,prepare_lsn);
1038+
OutputPluginWrite(ctx, true);
1039+
}
1040+
10321041
/*
10331042
* Initialize the relation schema sync cache for a decoding session.
10341043
*

‎src/include/replication/logicalproto.h‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ typedef enum LogicalRepMsgType
6767
LOGICAL_REP_MSG_STREAM_START='S',
6868
LOGICAL_REP_MSG_STREAM_END='E',
6969
LOGICAL_REP_MSG_STREAM_COMMIT='c',
70-
LOGICAL_REP_MSG_STREAM_ABORT='A'
70+
LOGICAL_REP_MSG_STREAM_ABORT='A',
71+
LOGICAL_REP_MSG_STREAM_PREPARE='p'
7172
}LogicalRepMsgType;
7273

7374
/*
@@ -196,7 +197,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN
196197
TimestampTzprepare_time);
197198
externvoidlogicalrep_read_rollback_prepared(StringInfoin,
198199
LogicalRepRollbackPreparedTxnData*rollback_data);
199-
200+
externvoidlogicalrep_write_stream_prepare(StringInfoout,ReorderBufferTXN*txn,
201+
XLogRecPtrprepare_lsn);
202+
externvoidlogicalrep_read_stream_prepare(StringInfoin,
203+
LogicalRepPreparedTxnData*prepare_data);
200204

201205
externvoidlogicalrep_write_origin(StringInfoout,constchar*origin,
202206
XLogRecPtrorigin_lsn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp