Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5a991ef

Browse files
committed
Allow logical decoding via the walsender interface.
In order for this to work, walsenders need the optional ability toconnect to a database, so the "replication" keyword now allows trueor false, for backward-compatibility, and the new value "database"(which causes the "dbname" parameter to be respected).walsender needs to loop not only when idle but also when sendingdecoded data to the user and when waiting for more xlog data to decode.This means that there are now three separate loops inside walsender.c;although some refactoring has been done here, this is still a bit ugly.Andres Freund, with contributions from Álvaro Herrera, and furtherreview by me.
1 parentcb9a0c7 commit5a991ef

File tree

12 files changed

+915
-167
lines changed

12 files changed

+915
-167
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,10 +1302,13 @@
13021302

13031303
<para>
13041304
To initiate streaming replication, the frontend sends the
1305-
<literal>replication</> parameter in the startup message. This tells the
1306-
backend to go into walsender mode, wherein a small set of replication commands
1307-
can be issued instead of SQL statements. Only the simple query protocol can be
1308-
used in walsender mode.
1305+
<literal>replication</> parameter in the startup message. A boolean value
1306+
of <literal>true</> tells the backend to go into walsender mode, wherein a
1307+
small set of replication commands can be issued instead of SQL statements. Only
1308+
the simple query protocol can be used in walsender mode.
1309+
Passing <literal>database</> as the value instructs walsender to connect to
1310+
the database specified in the <literal>dbname</> parameter, which will allow
1311+
the connection to be used for logical replication from that database.
13091312

13101313
The commands accepted in walsender mode are:
13111314

@@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are:
13151318
<listitem>
13161319
<para>
13171320
Requests the server to identify itself. Server replies with a result
1318-
set of a single row, containingthree fields:
1321+
set of a single row, containingfour fields:
13191322
</para>
13201323

13211324
<para>
@@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are:
13571360
</listitem>
13581361
</varlistentry>
13591362

1363+
<varlistentry>
1364+
<term>
1365+
dbname
1366+
</term>
1367+
<listitem>
1368+
<para>
1369+
Database connected to or NULL.
1370+
</para>
1371+
</listitem>
1372+
</varlistentry>
1373+
13601374
</variablelist>
13611375
</para>
13621376
</listitem>

‎src/backend/postmaster/postmaster.c

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1884,10 +1884,23 @@ ProcessStartupPacket(Port *port, bool SSLdone)
18841884
port->cmdline_options=pstrdup(valptr);
18851885
elseif (strcmp(nameptr,"replication")==0)
18861886
{
1887-
if (!parse_bool(valptr,&am_walsender))
1887+
/*
1888+
* Due to backward compatibility concerns the replication
1889+
* parameter is a hybrid beast which allows the value to be
1890+
* either boolean or the string 'database'. The latter
1891+
* connects to a specific database which is e.g. required for
1892+
* logical decoding while.
1893+
*/
1894+
if (strcmp(valptr,"database")==0)
1895+
{
1896+
am_walsender= true;
1897+
am_db_walsender= true;
1898+
}
1899+
elseif (!parse_bool(valptr,&am_walsender))
18881900
ereport(FATAL,
18891901
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1890-
errmsg("invalid value for boolean option \"replication\"")));
1902+
errmsg("invalid value for parameter \"replication\""),
1903+
errhint("Valid values are: false, 0, true, 1, database.")));
18911904
}
18921905
else
18931906
{
@@ -1968,8 +1981,15 @@ ProcessStartupPacket(Port *port, bool SSLdone)
19681981
if (strlen(port->user_name) >=NAMEDATALEN)
19691982
port->user_name[NAMEDATALEN-1]='\0';
19701983

1971-
/* Walsender is not related to a particular database */
1972-
if (am_walsender)
1984+
/*
1985+
* Normal walsender backends, e.g. for streaming replication, are not
1986+
* connected to a particular database. But walsenders used for logical
1987+
* replication need to connect to a specific database. We allow streaming
1988+
* replication commands to be issued even if connected to a database as it
1989+
* can make sense to first make a basebackup and then stream changes
1990+
* starting from that.
1991+
*/
1992+
if (am_walsender&& !am_db_walsender)
19731993
port->database_name[0]='\0';
19741994

19751995
/*

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,16 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
131131
"the primary server: %s",
132132
PQerrorMessage(streamConn))));
133133
}
134-
if (PQnfields(res)!=3||PQntuples(res)!=1)
134+
if (PQnfields(res)<3||PQntuples(res)!=1)
135135
{
136136
intntuples=PQntuples(res);
137137
intnfields=PQnfields(res);
138138

139139
PQclear(res);
140140
ereport(ERROR,
141141
(errmsg("invalid response from primary server"),
142-
errdetail("Expected 1 tuple with 3fields,got %dtuples with %d fields.",
143-
ntuples,nfields)));
142+
errdetail("Could not identify system: Got %d rows and %dfields,expected %drows and %d or more fields.",
143+
ntuples,nfields,3,1)));
144144
}
145145
primary_sysid=PQgetvalue(res,0,0);
146146
*primary_tli=pg_atoi(PQgetvalue(res,0,1),4,0);

‎src/backend/replication/repl_gram.y

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,17 @@ Node *replication_parse_result;
7373
%tokenK_WAL
7474
%tokenK_TIMELINE
7575
%tokenK_PHYSICAL
76+
%tokenK_LOGICAL
7677
%tokenK_SLOT
7778

7879
%type<node>command
79-
%type<node>base_backupstart_replicationcreate_replication_slotdrop_replication_slotidentify_systemtimeline_history
80+
%type<node>base_backupstart_replicationstart_logical_replicationcreate_replication_slotdrop_replication_slotidentify_systemtimeline_history
8081
%type<list>base_backup_opt_list
8182
%type<defelt>base_backup_opt
8283
%type<uintval>opt_timeline
84+
%type<list>plugin_optionsplugin_opt_list
85+
%type<defelt>plugin_opt_elem
86+
%type<node>plugin_opt_arg
8387
%type<str>opt_slot
8488

8589
%%
@@ -98,6 +102,7 @@ command:
98102
identify_system
99103
|base_backup
100104
|start_replication
105+
|start_logical_replication
101106
|create_replication_slot
102107
|drop_replication_slot
103108
|timeline_history
@@ -165,8 +170,8 @@ base_backup_opt:
165170
}
166171
;
167172

168-
/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL*/
169173
create_replication_slot:
174+
/* CREATE_REPLICATION_SLOT slot PHYSICAL*/
170175
K_CREATE_REPLICATION_SLOTIDENTK_PHYSICAL
171176
{
172177
CreateReplicationSlotCmd *cmd;
@@ -175,9 +180,19 @@ create_replication_slot:
175180
cmd->slotname =$2;
176181
$$ = (Node *) cmd;
177182
}
183+
/* CREATE_REPLICATION_SLOT slot LOGICAL plugin*/
184+
|K_CREATE_REPLICATION_SLOTIDENTK_LOGICALIDENT
185+
{
186+
CreateReplicationSlotCmd *cmd;
187+
cmd = makeNode(CreateReplicationSlotCmd);
188+
cmd->kind = REPLICATION_KIND_LOGICAL;
189+
cmd->slotname =$2;
190+
cmd->plugin =$4;
191+
$$ = (Node *) cmd;
192+
}
178193
;
179194

180-
/* DROP_REPLICATION_SLOTSLOTslot*/
195+
/* DROP_REPLICATION_SLOT slot*/
181196
drop_replication_slot:
182197
K_DROP_REPLICATION_SLOTIDENT
183198
{
@@ -205,19 +220,19 @@ start_replication:
205220
}
206221
;
207222

208-
opt_timeline:
209-
K_TIMELINEUCONST
223+
/* START_REPLICATION SLOT slot LOGICAL %X/%X options*/
224+
start_logical_replication:
225+
K_START_REPLICATIONK_SLOTIDENTK_LOGICALRECPTRplugin_options
210226
{
211-
if ($2 <=0)
212-
ereport(ERROR,
213-
(errcode(ERRCODE_SYNTAX_ERROR),
214-
(errmsg("invalid timeline %u", $2))));
215-
$$ =$2;
227+
StartReplicationCmd *cmd;
228+
cmd = makeNode(StartReplicationCmd);
229+
cmd->kind = REPLICATION_KIND_LOGICAL;;
230+
cmd->slotname =$3;
231+
cmd->startpoint =$5;
232+
cmd->options =$6;
233+
$$ = (Node *) cmd;
216234
}
217-
|/* EMPTY*/
218-
{$$ =0; }
219235
;
220-
221236
/*
222237
* TIMELINE_HISTORY %d
223238
*/
@@ -250,6 +265,46 @@ opt_slot:
250265
{$$ =NULL; }
251266
;
252267

268+
opt_timeline:
269+
K_TIMELINEUCONST
270+
{
271+
if ($2 <=0)
272+
ereport(ERROR,
273+
(errcode(ERRCODE_SYNTAX_ERROR),
274+
(errmsg("invalid timeline %u", $2))));
275+
$$ =$2;
276+
}
277+
|/* EMPTY*/{$$ =0; }
278+
;
279+
280+
281+
plugin_options:
282+
'('plugin_opt_list')'{$$ =$2; }
283+
|/* EMPTY*/{$$ = NIL; }
284+
;
285+
286+
plugin_opt_list:
287+
plugin_opt_elem
288+
{
289+
$$ = list_make1($1);
290+
}
291+
|plugin_opt_list','plugin_opt_elem
292+
{
293+
$$ = lappend($1,$3);
294+
}
295+
;
296+
297+
plugin_opt_elem:
298+
IDENTplugin_opt_arg
299+
{
300+
$$ = makeDefElem($1,$2);
301+
}
302+
;
303+
304+
plugin_opt_arg:
305+
SCONST{$$ = (Node *) makeString($1); }
306+
|/* EMPTY*/{$$ =NULL; }
307+
;
253308
%%
254309

255310
#include"repl_scanner.c"

‎src/backend/replication/repl_scanner.l

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT{ return K_CREATE_REPLICATION_SLOT; }
9494
DROP_REPLICATION_SLOT{return K_DROP_REPLICATION_SLOT; }
9595
TIMELINE_HISTORY{return K_TIMELINE_HISTORY; }
9696
PHYSICAL{return K_PHYSICAL; }
97+
LOGICAL{return K_LOGICAL; }
9798
SLOT{return K_SLOT; }
9899

99100
","{return','; }

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp