Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9de77b5

Browse files
committed
Allow logical replication to transfer data in binary format.
This patch adds a "binary" option to CREATE/ALTER SUBSCRIPTION.When that's set, the publisher will send data using the data type'stypsend function if any, rather than typoutput. This is generallyfaster, if slightly less robust.As committed, we won't try to transfer user-defined array or compositetypes in binary, for fear that type OIDs won't match at the subscriber.This might be changed later, but it seems like fit material for afollow-on patch.Dave Cramer, reviewed by Daniel Gustafsson, Petr Jelinek, and others;adjusted some by meDiscussion:https://postgr.es/m/CADK3HH+R3xMn=8t3Ct+uD+qJ1KD=Hbif5NFMJ+d5DkoCzp6Vgw@mail.gmail.com
1 parent9add405 commit9de77b5

File tree

21 files changed

+610
-212
lines changed

21 files changed

+610
-212
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7472,7 +7472,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
74727472
(references <link linkend="catalog-pg-database"><structname>pg_database</structname></link>.<structfield>oid</structfield>)
74737473
</para>
74747474
<para>
7475-
OID of the databasewhich the subscription resides in
7475+
OID of the databasethat the subscription resides in
74767476
</para></entry>
74777477
</row>
74787478

@@ -7500,7 +7500,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
75007500
<structfield>subenabled</structfield> <type>bool</type>
75017501
</para>
75027502
<para>
7503-
If true, the subscription is enabled and should be replicating.
7503+
If true, the subscription is enabled and should be replicating
7504+
</para></entry>
7505+
</row>
7506+
7507+
<row>
7508+
<entry role="catalog_table_entry"><para role="column_definition">
7509+
<structfield>subbinary</structfield> <type>bool</type>
7510+
</para>
7511+
<para>
7512+
If true, the subscription will request that the publisher send data
7513+
in binary format
75047514
</para></entry>
75057515
</row>
75067516

@@ -7518,8 +7528,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
75187528
<structfield>subslotname</structfield> <type>name</type>
75197529
</para>
75207530
<para>
7521-
Name of the replication slot in the upstream database. Also used
7522-
for local replication origin name.
7531+
Name of the replication slot in the upstream database (also used
7532+
forthelocal replication origin name)
75237533
</para></entry>
75247534
</row>
75257535

@@ -7528,8 +7538,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
75287538
<structfield>subsynccommit</structfield> <type>text</type>
75297539
</para>
75307540
<para>
7531-
Contains the value of the <varname>synchronous_commit</varname>
7532-
setting for the subscription workers.
7541+
The <varname>synchronous_commit</varname>
7542+
setting for the subscription's workers to use
75337543
</para></entry>
75347544
</row>
75357545

@@ -7538,8 +7548,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
75387548
<structfield>subpublications</structfield> <type>text[]</type>
75397549
</para>
75407550
<para>
7541-
Array of subscribed publication names. These reference the
7542-
publicationsonthepublisher server. For more on publications
7551+
Array of subscribed publication names. These reference
7552+
publicationsdefined intheupstream database. For more on publications
75437553
see <xref linkend="logical-replication-publication"/>.
75447554
</para></entry>
75457555
</row>

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
163163
<para>
164164
This clause alters parameters originally set by
165165
<xref linkend="sql-createsubscription"/>. See there for more
166-
information. The allowed options are <literal>slot_name</literal> and
167-
<literal>synchronous_commit</literal>
166+
information. The parameters that can be altered
167+
are <literal>slot_name</literal>,
168+
<literal>synchronous_commit</literal>, and
169+
<literal>binary</literal>.
168170
</para>
169171
</listitem>
170172
</varlistentry>

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
152152
<listitem>
153153
<para>
154154
The value of this parameter overrides the
155-
<xref linkend="guc-synchronous-commit"/> setting. The default
156-
value is <literal>off</literal>.
155+
<xref linkend="guc-synchronous-commit"/> setting within this
156+
subscription's apply worker processes. The default value
157+
is <literal>off</literal>.
157158
</para>
158159

159160
<para>
@@ -178,6 +179,27 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
178179
</listitem>
179180
</varlistentry>
180181

182+
<varlistentry>
183+
<term><literal>binary</literal> (<type>boolean</type>)</term>
184+
<listitem>
185+
<para>
186+
Specifies whether the subscription will request the publisher to
187+
send the data in binary format (as opposed to text).
188+
The default is <literal>false</literal>.
189+
Even when this option is enabled, only data types that have
190+
binary send and receive functions will be transferred in binary.
191+
</para>
192+
193+
<para>
194+
When doing cross-version replication, it could happen that the
195+
publisher has a binary send function for some data type, but the
196+
subscriber lacks a binary receive function for the type. In
197+
such a case, data transfer will fail, and
198+
the <literal>binary</literal> option cannot be used.
199+
</para>
200+
</listitem>
201+
</varlistentry>
202+
181203
<varlistentry>
182204
<term><literal>connect</literal> (<type>boolean</type>)</term>
183205
<listitem>

‎src/backend/catalog/pg_subscription.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok)
6565
sub->name=pstrdup(NameStr(subform->subname));
6666
sub->owner=subform->subowner;
6767
sub->enabled=subform->subenabled;
68+
sub->binary=subform->subbinary;
6869

6970
/* Get conninfo */
7071
datum=SysCacheGetAttr(SUBSCRIPTIONOID,

‎src/backend/catalog/system_views.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
11221122

11231123
-- All columns of pg_subscription except subconninfo are readable.
11241124
REVOKE ALLON pg_subscriptionFROM public;
1125-
GRANTSELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
1125+
GRANTSELECT (subdbid, subname, subowner, subenabled,subbinary,subslotname, subpublications)
11261126
ON pg_subscription TO public;
11271127

11281128

‎src/backend/commands/subscriptioncmds.c

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
5555
* accommodate that.
5656
*/
5757
staticvoid
58-
parse_subscription_options(List*options,bool*connect,bool*enabled_given,
59-
bool*enabled,bool*create_slot,
58+
parse_subscription_options(List*options,
59+
bool*connect,
60+
bool*enabled_given,bool*enabled,
61+
bool*create_slot,
6062
bool*slot_name_given,char**slot_name,
61-
bool*copy_data,char**synchronous_commit,
62-
bool*refresh)
63+
bool*copy_data,
64+
char**synchronous_commit,
65+
bool*refresh,
66+
bool*binary_given,bool*binary)
6367
{
6468
ListCell*lc;
6569
boolconnect_given= false;
@@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
9094
*synchronous_commit=NULL;
9195
if (refresh)
9296
*refresh= true;
97+
if (binary)
98+
{
99+
*binary_given= false;
100+
*binary= false;
101+
}
93102

94103
/* Parse options */
95104
foreach(lc,options)
@@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
175184
refresh_given= true;
176185
*refresh=defGetBoolean(defel);
177186
}
187+
elseif (strcmp(defel->defname,"binary")==0&&binary)
188+
{
189+
if (*binary_given)
190+
ereport(ERROR,
191+
(errcode(ERRCODE_SYNTAX_ERROR),
192+
errmsg("conflicting or redundant options")));
193+
194+
*binary_given= true;
195+
*binary=defGetBoolean(defel);
196+
}
178197
else
179198
ereport(ERROR,
180199
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
322341
char*conninfo;
323342
char*slotname;
324343
boolslotname_given;
344+
boolbinary;
345+
boolbinary_given;
325346
charoriginname[NAMEDATALEN];
326347
boolcreate_slot;
327348
List*publications;
@@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
331352
*
332353
* Connection and publication should not be specified here.
333354
*/
334-
parse_subscription_options(stmt->options,&connect,&enabled_given,
335-
&enabled,&create_slot,&slotname_given,
336-
&slotname,&copy_data,&synchronous_commit,
337-
NULL);
355+
parse_subscription_options(stmt->options,
356+
&connect,
357+
&enabled_given,&enabled,
358+
&create_slot,
359+
&slotname_given,&slotname,
360+
&copy_data,
361+
&synchronous_commit,
362+
NULL,/* no "refresh" */
363+
&binary_given,&binary);
338364

339365
/*
340366
* Since creating a replication slot is not transactional, rolling back
@@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
400426
DirectFunctionCall1(namein,CStringGetDatum(stmt->subname));
401427
values[Anum_pg_subscription_subowner-1]=ObjectIdGetDatum(owner);
402428
values[Anum_pg_subscription_subenabled-1]=BoolGetDatum(enabled);
429+
values[Anum_pg_subscription_subbinary-1]=BoolGetDatum(binary);
403430
values[Anum_pg_subscription_subconninfo-1]=
404431
CStringGetTextDatum(conninfo);
405432
if (slotname)
@@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
669696
char*slotname;
670697
boolslotname_given;
671698
char*synchronous_commit;
672-
673-
parse_subscription_options(stmt->options,NULL,NULL,NULL,
674-
NULL,&slotname_given,&slotname,
675-
NULL,&synchronous_commit,NULL);
699+
boolbinary_given;
700+
boolbinary;
701+
702+
parse_subscription_options(stmt->options,
703+
NULL,/* no "connect" */
704+
NULL,NULL,/* no "enabled" */
705+
NULL,/* no "create_slot" */
706+
&slotname_given,&slotname,
707+
NULL,/* no "copy_data" */
708+
&synchronous_commit,
709+
NULL,/* no "refresh" */
710+
&binary_given,&binary);
676711

677712
if (slotname_given)
678713
{
@@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
697732
replaces[Anum_pg_subscription_subsynccommit-1]= true;
698733
}
699734

735+
if (binary_given)
736+
{
737+
values[Anum_pg_subscription_subbinary-1]=
738+
BoolGetDatum(binary);
739+
replaces[Anum_pg_subscription_subbinary-1]= true;
740+
}
741+
700742
update_tuple= true;
701743
break;
702744
}
@@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
706748
boolenabled,
707749
enabled_given;
708750

709-
parse_subscription_options(stmt->options,NULL,
710-
&enabled_given,&enabled,NULL,
711-
NULL,NULL,NULL,NULL,NULL);
751+
parse_subscription_options(stmt->options,
752+
NULL,/* no "connect" */
753+
&enabled_given,&enabled,
754+
NULL,/* no "create_slot" */
755+
NULL,NULL,/* no "slot_name" */
756+
NULL,/* no "copy_data" */
757+
NULL,/* no "synchronous_commit" */
758+
NULL,/* no "refresh" */
759+
NULL,NULL);/* no "binary" */
712760
Assert(enabled_given);
713761

714762
if (!sub->slotname&&enabled)
@@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
744792
boolcopy_data;
745793
boolrefresh;
746794

747-
parse_subscription_options(stmt->options,NULL,NULL,NULL,
748-
NULL,NULL,NULL,&copy_data,
749-
NULL,&refresh);
795+
parse_subscription_options(stmt->options,
796+
NULL,/* no "connect" */
797+
NULL,NULL,/* no "enabled" */
798+
NULL,/* no "create_slot" */
799+
NULL,NULL,/* no "slot_name" */
800+
&copy_data,
801+
NULL,/* no "synchronous_commit" */
802+
&refresh,
803+
NULL,NULL);/* no "binary" */
750804

751805
values[Anum_pg_subscription_subpublications-1]=
752806
publicationListToArray(stmt->publication);
@@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
781835
(errcode(ERRCODE_SYNTAX_ERROR),
782836
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
783837

784-
parse_subscription_options(stmt->options,NULL,NULL,NULL,
785-
NULL,NULL,NULL,&copy_data,
786-
NULL,NULL);
838+
parse_subscription_options(stmt->options,
839+
NULL,/* no "connect" */
840+
NULL,NULL,/* no "enabled" */
841+
NULL,/* no "create_slot" */
842+
NULL,NULL,/* no "slot_name" */
843+
&copy_data,
844+
NULL,/* no "synchronous_commit" */
845+
NULL,/* no "refresh" */
846+
NULL,NULL);/* no "binary" */
787847

788848
AlterSubscription_refresh(sub,copy_data);
789849

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
424424
PQfreemem(pubnames_literal);
425425
pfree(pubnames_str);
426426

427+
if (options->proto.logical.binary&&
428+
PQserverVersion(conn->streamConn) >=140000)
429+
appendStringInfoString(&cmd,", binary 'true'");
430+
427431
appendStringInfoChar(&cmd,')');
428432
}
429433
else

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp