Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4648243

Browse files
author
Amit Kapila
committed
Add support for streaming to built-in logical replication.
To add support for streaming of in-progress transactions into thebuilt-in logical replication, we need to do three things:* Extend the logical replication protocol, so identify in-progresstransactions, and allow adding additional bits of information (e.g.XID of subtransactions).* Modify the output plugin (pgoutput) to implement the new streamAPI callbacks, by leveraging the extended replication protocol.* Modify the replication apply worker, to properly handle streamedin-progress transaction by spilling the data to disk and thenreplaying them on commit.We however must explicitly disable streaming replication duringreplication slot creation, even if the plugin supports it. Wedon't need to replicate the changes accumulated during this phase,and moreover we don't have a replication connection open so wedon't have where to send the data anyway.Author: Tomas Vondra, Dilip Kumar and Amit KapilaReviewed-by: Amit Kapila, Kuntal Ghosh and Ajin CherianTested-by: Neha Sharma, Mahendra Singh Thalor and Ajin CherianDiscussion:https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent66f1630 commit4648243

File tree

23 files changed

+1766
-74
lines changed

23 files changed

+1766
-74
lines changed

‎doc/src/sgml/monitoring.sgml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,22 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
15091509
<entry><literal>WALWrite</literal></entry>
15101510
<entry>Waiting for a write to a WAL file.</entry>
15111511
</row>
1512+
<row>
1513+
<entry><literal>LogicalChangesRead</literal></entry>
1514+
<entry>Waiting for a read from a logical changes file.</entry>
1515+
</row>
1516+
<row>
1517+
<entry><literal>LogicalChangesWrite</literal></entry>
1518+
<entry>Waiting for a write to a logical changes file.</entry>
1519+
</row>
1520+
<row>
1521+
<entry><literal>LogicalSubxactRead</literal></entry>
1522+
<entry>Waiting for a read from a logical subxact file.</entry>
1523+
</row>
1524+
<row>
1525+
<entry><literal>LogicalSubxactWrite</literal></entry>
1526+
<entry>Waiting for a write to a logical subxact file.</entry>
1527+
</row>
15121528
</tbody>
15131529
</tgroup>
15141530
</table>

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
165165
<xref linkend="sql-createsubscription"/>. See there for more
166166
information. The parameters that can be altered
167167
are <literal>slot_name</literal>,
168-
<literal>synchronous_commit</literal>, and
169-
<literal>binary</literal>.
168+
<literal>synchronous_commit</literal>,
169+
<literal>binary</literal>, and
170+
<literal>streaming</literal>.
170171
</para>
171172
</listitem>
172173
</varlistentry>

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
228228
</para>
229229
</listitem>
230230
</varlistentry>
231+
<varlistentry>
232+
<term><literal>streaming</literal> (<type>boolean</type>)</term>
233+
<listitem>
234+
<para>
235+
Specifies whether streaming of in-progress transactions should
236+
be enabled for this subscription. By default, all transactions
237+
are fully decoded on the publisher, and only then sent to the
238+
subscriber as a whole.
239+
</para>
240+
</listitem>
241+
</varlistentry>
231242
</variablelist></para>
232243
</listitem>
233244
</varlistentry>

‎src/backend/catalog/pg_subscription.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
6666
sub->owner=subform->subowner;
6767
sub->enabled=subform->subenabled;
6868
sub->binary=subform->subbinary;
69+
sub->stream=subform->substream;
6970

7071
/* Get conninfo */
7172
datum=SysCacheGetAttr(SUBSCRIPTIONOID,

‎src/backend/catalog/system_views.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
11281128

11291129
-- All columns of pg_subscription except subconninfo are readable.
11301130
REVOKE ALLON pg_subscriptionFROM public;
1131-
GRANTSELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
1131+
GRANTSELECT (subdbid, subname, subowner, subenabled, subbinary,substream,subslotname, subpublications)
11321132
ON pg_subscription TO public;
11331133

11341134

‎src/backend/commands/subscriptioncmds.c

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ parse_subscription_options(List *options,
6363
bool*copy_data,
6464
char**synchronous_commit,
6565
bool*refresh,
66-
bool*binary_given,bool*binary)
66+
bool*binary_given,bool*binary,
67+
bool*streaming_given,bool*streaming)
6768
{
6869
ListCell*lc;
6970
boolconnect_given= false;
@@ -99,6 +100,11 @@ parse_subscription_options(List *options,
99100
*binary_given= false;
100101
*binary= false;
101102
}
103+
if (streaming)
104+
{
105+
*streaming_given= false;
106+
*streaming= false;
107+
}
102108

103109
/* Parse options */
104110
foreach(lc,options)
@@ -194,6 +200,16 @@ parse_subscription_options(List *options,
194200
*binary_given= true;
195201
*binary=defGetBoolean(defel);
196202
}
203+
elseif (strcmp(defel->defname,"streaming")==0&&streaming)
204+
{
205+
if (*streaming_given)
206+
ereport(ERROR,
207+
(errcode(ERRCODE_SYNTAX_ERROR),
208+
errmsg("conflicting or redundant options")));
209+
210+
*streaming_given= true;
211+
*streaming=defGetBoolean(defel);
212+
}
197213
else
198214
ereport(ERROR,
199215
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
337353
boolenabled_given;
338354
boolenabled;
339355
boolcopy_data;
356+
boolstreaming;
357+
boolstreaming_given;
340358
char*synchronous_commit;
341359
char*conninfo;
342360
char*slotname;
@@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
360378
&copy_data,
361379
&synchronous_commit,
362380
NULL,/* no "refresh" */
363-
&binary_given,&binary);
381+
&binary_given,&binary,
382+
&streaming_given,&streaming);
364383

365384
/*
366385
* Since creating a replication slot is not transactional, rolling back
@@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
427446
values[Anum_pg_subscription_subowner-1]=ObjectIdGetDatum(owner);
428447
values[Anum_pg_subscription_subenabled-1]=BoolGetDatum(enabled);
429448
values[Anum_pg_subscription_subbinary-1]=BoolGetDatum(binary);
449+
values[Anum_pg_subscription_substream-1]=BoolGetDatum(streaming);
430450
values[Anum_pg_subscription_subconninfo-1]=
431451
CStringGetTextDatum(conninfo);
432452
if (slotname)
@@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
698718
char*synchronous_commit;
699719
boolbinary_given;
700720
boolbinary;
721+
boolstreaming_given;
722+
boolstreaming;
701723

702724
parse_subscription_options(stmt->options,
703725
NULL,/* no "connect" */
@@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
707729
NULL,/* no "copy_data" */
708730
&synchronous_commit,
709731
NULL,/* no "refresh" */
710-
&binary_given,&binary);
732+
&binary_given,&binary,
733+
&streaming_given,&streaming);
711734

712735
if (slotname_given)
713736
{
@@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
739762
replaces[Anum_pg_subscription_subbinary-1]= true;
740763
}
741764

765+
if (streaming_given)
766+
{
767+
values[Anum_pg_subscription_substream-1]=
768+
BoolGetDatum(streaming);
769+
replaces[Anum_pg_subscription_substream-1]= true;
770+
}
771+
742772
update_tuple= true;
743773
break;
744774
}
@@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
756786
NULL,/* no "copy_data" */
757787
NULL,/* no "synchronous_commit" */
758788
NULL,/* no "refresh" */
759-
NULL,NULL);/* no "binary" */
789+
NULL,NULL,/* no "binary" */
790+
NULL,NULL);/* no streaming */
760791
Assert(enabled_given);
761792

762793
if (!sub->slotname&&enabled)
@@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
800831
&copy_data,
801832
NULL,/* no "synchronous_commit" */
802833
&refresh,
803-
NULL,NULL);/* no "binary" */
804-
834+
NULL,NULL,/* no "binary" */
835+
NULL,NULL);/* no "streaming" */
805836
values[Anum_pg_subscription_subpublications-1]=
806837
publicationListToArray(stmt->publication);
807838
replaces[Anum_pg_subscription_subpublications-1]= true;
@@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
843874
&copy_data,
844875
NULL,/* no "synchronous_commit" */
845876
NULL,/* no "refresh" */
846-
NULL,NULL);/* no "binary" */
877+
NULL,NULL,/* no "binary" */
878+
NULL,NULL);/* no "streaming" */
847879

848880
AlterSubscription_refresh(sub,copy_data);
849881

‎src/backend/postmaster/pgstat.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w)
41414141
caseWAIT_EVENT_WAL_WRITE:
41424142
event_name="WALWrite";
41434143
break;
4144+
caseWAIT_EVENT_LOGICAL_CHANGES_READ:
4145+
event_name="LogicalChangesRead";
4146+
break;
4147+
caseWAIT_EVENT_LOGICAL_CHANGES_WRITE:
4148+
event_name="LogicalChangesWrite";
4149+
break;
4150+
caseWAIT_EVENT_LOGICAL_SUBXACT_READ:
4151+
event_name="LogicalSubxactRead";
4152+
break;
4153+
caseWAIT_EVENT_LOGICAL_SUBXACT_WRITE:
4154+
event_name="LogicalSubxactWrite";
4155+
break;
41444156

41454157
/* no default case, so that compiler will warn */
41464158
}

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
425425
appendStringInfo(&cmd,"proto_version '%u'",
426426
options->proto.logical.proto_version);
427427

428+
if (options->proto.logical.streaming&&
429+
PQserverVersion(conn->streamConn) >=140000)
430+
appendStringInfo(&cmd,", streaming 'on'");
431+
428432
pubnames=options->proto.logical.publication_names;
429433
pubnames_str=stringlist_to_identifierstr(conn->streamConn,pubnames);
430434
if (!pubnames_str)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp