Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcda03cf

Browse files
author
Amit Kapila
committed
Allow enabling two-phase option via replication protocol.
Extend the replication command CREATE_REPLICATION_SLOT to support theTWO_PHASE option. This will allow decoding commands like PREPARETRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created withthis option. The decoding of the transaction happens at prepare command.This patch also adds support of two-phase in pg_recvlogical via a newoption --two-phase.This option will also be used by future patches that allow streaming oftransactions at prepare time for built-in logical replication. With this,the out-of-core logical replication solutions can enable replication oftwo-phase transactions via replication protocol.Author: Ajin CherianReviewed-By: Jeff Davis, Vignesh C, Amit KapilaDiscussion:https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ruhttps://postgr.es/m/64b9f783c6e125f18f88fbc0c0234e34e71d8639.camel@j-davis.com
1 parent17707c0 commitcda03cf

File tree

12 files changed

+143
-13
lines changed

12 files changed

+143
-13
lines changed

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,19 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot');
144144
</programlisting>
145145

146146
<para>
147-
The followingexample shows how logical decoding is controlled over the
147+
The followingexamples shows how logical decoding is controlled over the
148148
streaming replication protocol, using the
149149
program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
150150
distribution. This requires that client authentication is set up to allow
151151
replication connections
152152
(see <xref linkend="streaming-replication-authentication"/>) and
153153
that <varname>max_wal_senders</varname> is set sufficiently high to allow
154-
an additional connection.
154+
an additional connection. The second example shows how to stream two-phase
155+
transactions. Before you use two-phase commands, you must set
156+
<xref linkend="guc-max-prepared-transactions"/> to atleast 1.
155157
</para>
156158
<programlisting>
159+
Example 1:
157160
$ pg_recvlogical -d postgres --slot=test --create-slot
158161
$ pg_recvlogical -d postgres --slot=test --start -f -
159162
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
@@ -164,6 +167,22 @@ table public.data: INSERT: id[integer]:4 data[text]:'4'
164167
COMMIT 693
165168
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
166169
$ pg_recvlogical -d postgres --slot=test --drop-slot
170+
171+
Example 2:
172+
$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
173+
$ pg_recvlogical -d postgres --slot=test --start -f -
174+
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
175+
$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
176+
$ fg
177+
BEGIN 694
178+
table public.data: INSERT: id[integer]:5 data[text]:'5'
179+
PREPARE TRANSACTION 'test', txid 694
180+
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
181+
$ psql -d postgres -c "COMMIT PREPARED 'test';"
182+
$ fg
183+
COMMIT PREPARED 'test', txid 694
184+
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
185+
$ pg_recvlogical -d postgres --slot=test --drop-slot
167186
</programlisting>
168187

169188
<para>

‎doc/src/sgml/protocol.sgml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
19141914
</varlistentry>
19151915

19161916
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
1917-
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] }
1917+
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal>| <literal>TWO_PHASE</literal>] }
19181918
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
19191919
</term>
19201920
<listitem>
@@ -1955,6 +1955,20 @@ The commands accepted in replication mode are:
19551955
</listitem>
19561956
</varlistentry>
19571957

1958+
<varlistentry>
1959+
<term><literal>TWO_PHASE</literal></term>
1960+
<listitem>
1961+
<para>
1962+
Specify that this logical replication slot supports decoding of two-phase
1963+
transactions. With this option, two-phase commands like
1964+
<literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
1965+
and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
1966+
The transaction will be decoded and transmitted at
1967+
<literal>PREPARE TRANSACTION</literal> time.
1968+
</para>
1969+
</listitem>
1970+
</varlistentry>
1971+
19581972
<varlistentry>
19591973
<term><literal>RESERVE_WAL</literal></term>
19601974
<listitem>

‎doc/src/sgml/ref/pg_recvlogical.sgml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ PostgreSQL documentation
6565
<option>--plugin</option>, for the database specified
6666
by <option>--dbname</option>.
6767
</para>
68+
69+
<para>
70+
The <option>--two-phase</option> can be specified with
71+
<option>--create-slot</option> to enable two-phase decoding.
72+
</para>
6873
</listitem>
6974
</varlistentry>
7075

@@ -256,6 +261,17 @@ PostgreSQL documentation
256261
</listitem>
257262
</varlistentry>
258263

264+
<varlistentry>
265+
<term><option>-t</option></term>
266+
<term><option>--two-phase</option></term>
267+
<listitem>
268+
<para>
269+
Enables two-phase decoding. This option should only be specified with
270+
<option>--create-slot</option>
271+
</para>
272+
</listitem>
273+
</varlistentry>
274+
259275
<varlistentry>
260276
<term><option>-v</option></term>
261277
<term><option>--verbose</option></term>

‎src/backend/replication/repl_gram.y

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
8484
%tokenK_SLOT
8585
%tokenK_RESERVE_WAL
8686
%tokenK_TEMPORARY
87+
%tokenK_TWO_PHASE
8788
%tokenK_EXPORT_SNAPSHOT
8889
%tokenK_NOEXPORT_SNAPSHOT
8990
%tokenK_USE_SNAPSHOT
@@ -283,6 +284,11 @@ create_slot_opt:
283284
$$ = makeDefElem("reserve_wal",
284285
(Node *)makeInteger(true), -1);
285286
}
287+
|K_TWO_PHASE
288+
{
289+
$$ = makeDefElem("two_phase",
290+
(Node *)makeInteger(true), -1);
291+
}
286292
;
287293

288294
/* DROP_REPLICATION_SLOT slot*/

‎src/backend/replication/repl_scanner.l

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ RESERVE_WAL{ return K_RESERVE_WAL; }
103103
LOGICAL{return K_LOGICAL; }
104104
SLOT{return K_SLOT; }
105105
TEMPORARY{return K_TEMPORARY; }
106+
TWO_PHASE{return K_TWO_PHASE; }
106107
EXPORT_SNAPSHOT{return K_EXPORT_SNAPSHOT; }
107108
NOEXPORT_SNAPSHOT{return K_NOEXPORT_SNAPSHOT; }
108109
USE_SNAPSHOT{return K_USE_SNAPSHOT; }

‎src/backend/replication/walsender.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
863863
staticvoid
864864
parseCreateReplSlotOptions(CreateReplicationSlotCmd*cmd,
865865
bool*reserve_wal,
866-
CRSSnapshotAction*snapshot_action)
866+
CRSSnapshotAction*snapshot_action,
867+
bool*two_phase)
867868
{
868869
ListCell*lc;
869870
boolsnapshot_action_given= false;
870871
boolreserve_wal_given= false;
872+
booltwo_phase_given= false;
871873

872874
/* Parse options */
873875
foreach(lc,cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
905907
reserve_wal_given= true;
906908
*reserve_wal= true;
907909
}
910+
elseif (strcmp(defel->defname,"two_phase")==0)
911+
{
912+
if (two_phase_given||cmd->kind!=REPLICATION_KIND_LOGICAL)
913+
ereport(ERROR,
914+
(errcode(ERRCODE_SYNTAX_ERROR),
915+
errmsg("conflicting or redundant options")));
916+
two_phase_given= true;
917+
*two_phase= true;
918+
}
908919
else
909920
elog(ERROR,"unrecognized option: %s",defel->defname);
910921
}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
920931
charxloc[MAXFNAMELEN];
921932
char*slot_name;
922933
boolreserve_wal= false;
934+
booltwo_phase= false;
923935
CRSSnapshotActionsnapshot_action=CRS_EXPORT_SNAPSHOT;
924936
DestReceiver*dest;
925937
TupOutputState*tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
929941

930942
Assert(!MyReplicationSlot);
931943

932-
parseCreateReplSlotOptions(cmd,&reserve_wal,&snapshot_action);
944+
parseCreateReplSlotOptions(cmd,&reserve_wal,&snapshot_action,&two_phase);
933945

934946
/* setup state for WalSndSegmentOpen */
935947
sendTimeLineIsHistoric= false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
954966
*/
955967
ReplicationSlotCreate(cmd->slotname, true,
956968
cmd->temporary ?RS_TEMPORARY :RS_EPHEMERAL,
957-
false);
969+
two_phase);
958970
}
959971

960972
if (cmd->kind==REPLICATION_KIND_LOGICAL)

‎src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
646646
if (temp_replication_slot||create_slot)
647647
{
648648
if (!CreateReplicationSlot(param->bgconn,replication_slot,NULL,
649-
temp_replication_slot, true, true, false))
649+
temp_replication_slot, true, true, false, false))
650650
exit(1);
651651

652652
if (verbose)

‎src/bin/pg_basebackup/pg_receivewal.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ main(int argc, char **argv)
741741
pg_log_info("creating replication slot \"%s\"",replication_slot);
742742

743743
if (!CreateReplicationSlot(conn,replication_slot,NULL, false, true, false,
744-
slot_exists_ok))
744+
slot_exists_ok, false))
745745
exit(1);
746746
exit(0);
747747
}

‎src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
/* Global Options */
3636
staticchar*outfile=NULL;
3737
staticintverbose=0;
38+
staticbooltwo_phase= false;
3839
staticintnoloop=0;
3940
staticintstandby_message_timeout=10*1000;/* 10 sec = default */
4041
staticintfsync_interval=10*1000;/* 10 sec = default */
@@ -93,6 +94,7 @@ usage(void)
9394
printf(_(" -s, --status-interval=SECS\n"
9495
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout /1000));
9596
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
97+
printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
9698
printf(_(" -v, --verbose output verbose messages\n"));
9799
printf(_(" -V, --version output version information, then exit\n"));
98100
printf(_(" -?, --help show this help, then exit\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
678680
{"fsync-interval",required_argument,NULL,'F'},
679681
{"no-loop",no_argument,NULL,'n'},
680682
{"verbose",no_argument,NULL,'v'},
683+
{"two-phase",no_argument,NULL,'t'},
681684
{"version",no_argument,NULL,'V'},
682685
{"help",no_argument,NULL,'?'},
683686
/* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
726729
}
727730
}
728731

729-
while ((c=getopt_long(argc,argv,"E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
732+
while ((c=getopt_long(argc,argv,"E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
730733
long_options,&option_index))!=-1)
731734
{
732735
switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
749752
case'v':
750753
verbose++;
751754
break;
755+
case't':
756+
two_phase= true;
757+
break;
752758
/* connection options */
753759
case'd':
754760
dbname=pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
920926
exit(1);
921927
}
922928

929+
if (two_phase&& !do_create_slot)
930+
{
931+
pg_log_error("--two-phase may only be specified with --create-slot");
932+
fprintf(stderr,_("Try \"%s --help\" for more information.\n"),
933+
progname);
934+
exit(1);
935+
}
936+
937+
923938
#ifndefWIN32
924939
pqsignal(SIGINT,sigint_handler);
925940
pqsignal(SIGHUP,sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
976991
pg_log_info("creating replication slot \"%s\"",replication_slot);
977992

978993
if (!CreateReplicationSlot(conn,replication_slot,plugin, false,
979-
false, false,slot_exists_ok))
994+
false, false,slot_exists_ok,two_phase))
980995
exit(1);
981996
startpos=InvalidXLogRecPtr;
982997
}

‎src/bin/pg_basebackup/streamutil.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
486486
bool
487487
CreateReplicationSlot(PGconn*conn,constchar*slot_name,constchar*plugin,
488488
boolis_temporary,boolis_physical,boolreserve_wal,
489-
boolslot_exists_ok)
489+
boolslot_exists_ok,booltwo_phase)
490490
{
491491
PQExpBufferquery;
492492
PGresult*res;
@@ -495,6 +495,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
495495

496496
Assert((is_physical&&plugin==NULL)||
497497
(!is_physical&&plugin!=NULL));
498+
Assert(!(two_phase&&is_physical));
498499
Assert(slot_name!=NULL);
499500

500501
/* Build query */
@@ -510,6 +511,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
510511
else
511512
{
512513
appendPQExpBuffer(query," LOGICAL \"%s\"",plugin);
514+
if (two_phase&&PQserverVersion(conn) >=150000)
515+
appendPQExpBufferStr(query," TWO_PHASE");
516+
513517
if (PQserverVersion(conn) >=100000)
514518
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
515519
appendPQExpBufferStr(query," NOEXPORT_SNAPSHOT");

‎src/bin/pg_basebackup/streamutil.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
3434
externboolCreateReplicationSlot(PGconn*conn,constchar*slot_name,
3535
constchar*plugin,boolis_temporary,
3636
boolis_physical,boolreserve_wal,
37-
boolslot_exists_ok);
37+
boolslot_exists_ok,booltwo_phase);
3838
externboolDropReplicationSlot(PGconn*conn,constchar*slot_name);
3939
externboolRunIdentifySystem(PGconn*conn,char**sysid,
4040
TimeLineID*starttli,

‎src/bin/pg_basebackup/t/030_pg_recvlogical.pl

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use warnings;
66
use TestLib;
77
use PostgresNode;
8-
use Test::Moretests=>15;
8+
use Test::Moretests=>20;
99

1010
program_help_ok('pg_recvlogical');
1111
program_version_ok('pg_recvlogical');
@@ -22,6 +22,7 @@
2222
max_wal_senders = 4
2323
log_min_messages = 'debug1'
2424
log_error_verbosity = verbose
25+
max_prepared_transactions = 10
2526
});
2627
$node->dump_info;
2728
$node->start;
@@ -63,3 +64,45 @@
6364
'--start','--endpos',"$nextlsn",'--no-loop','-f','-'
6465
],
6566
'replayed a transaction');
67+
68+
$node->command_ok(
69+
[
70+
'pg_recvlogical','-S',
71+
'test','-d',
72+
$node->connstr('postgres'),'--drop-slot'
73+
],
74+
'slot dropped');
75+
76+
#test with two-phase option enabled
77+
$node->command_ok(
78+
[
79+
'pg_recvlogical','-S',
80+
'test','-d',
81+
$node->connstr('postgres'),'--create-slot','--two-phase'
82+
],
83+
'slot with two-phase created');
84+
85+
$slot =$node->slot('test');
86+
isnt($slot->{'restart_lsn'},'','restart lsn is defined for new slot');
87+
88+
$node->safe_psql('postgres',
89+
"BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
90+
$node->safe_psql('postgres',
91+
"COMMIT PREPARED 'test'");
92+
$nextlsn =
93+
$node->safe_psql('postgres','SELECT pg_current_wal_insert_lsn()');
94+
chomp($nextlsn);
95+
96+
$node->command_fails(
97+
[
98+
'pg_recvlogical','-S','test','-d',$node->connstr('postgres'),
99+
'--start','--endpos',"$nextlsn",'--two-phase','--no-loop','-f','-'
100+
],
101+
'incorrect usage');
102+
103+
$node->command_ok(
104+
[
105+
'pg_recvlogical','-S','test','-d',$node->connstr('postgres'),
106+
'--start','--endpos',"$nextlsn",'--no-loop','-f','-'
107+
],
108+
'replayed a two-phase transaction');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp