Commit d3d4146

Allow bidirectional copy messages in streaming replication mode.
Fujii Masao. Review by Alvaro Herrera, Tom Lane, and myself.
1 parent 20f3964, commit d3d4146

11 files changed, +172 -21 lines changed


‎doc/src/sgml/libpq.sgml

Lines changed: 10 additions & 0 deletions
@@ -2194,6 +2194,16 @@ ExecStatusType PQresultStatus(const PGresult *res);
           </listitem>
          </varlistentry>
 
+         <varlistentry id="libpq-pgres-copy-both">
+          <term><literal>PGRES_COPY_BOTH</literal></term>
+          <listitem>
+           <para>
+            Copy In/Out (to and from server) data transfer started. This is
+            currently used only for streaming replication.
+           </para>
+          </listitem>
+         </varlistentry>
+
          <varlistentry id="libpq-pgres-bad-response">
           <term><literal>PGRES_BAD_RESPONSE</literal></term>
           <listitem>

‎doc/src/sgml/protocol.sgml

Lines changed: 92 additions & 6 deletions
@@ -1033,12 +1033,25 @@
    </para>
 
    <para>
-    The CopyInResponse and CopyOutResponse messages include fields that
-    inform the frontend of the number of columns per row and the format
-    codes being used for each column. (As of the present implementation,
-    all columns in a given <command>COPY</> operation will use the same
-    format, but the message design does not assume this.)
+    There is another Copy-related mode called Copy-both, which allows
+    high-speed bulk data transfer to <emphasis>and</> from the server.
+    Copy-both mode is initiated when a backend in walsender mode
+    executes a <command>START_REPLICATION</command> statement. The
+    backend sends a CopyBothResponse message to the frontend. Both
+    the backend and the frontend may then send CopyData messages
+    until the connection is terminated. See <xref
+    linkend="protocol-replication">.
    </para>
+
+   <para>
+    The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+    include fields that inform the frontend of the number of columns
+    per row and the format codes being used for each column. (As of
+    the present implementation, all columns in a given <command>COPY</>
+    operation will use the same format, but the message design does not
+    assume this.)
+   </para>
+
   </sect2>
 
   <sect2 id="protocol-async">
@@ -1344,7 +1357,7 @@ The commands accepted in walsender mode are:
       WAL position <replaceable>XXX</>/<replaceable>XXX</>.
       The server can reply with an error, e.g. if the requested section of WAL
       has already been recycled. On success, server responds with a
-      CopyOutResponse message, and then starts to stream WAL to the frontend.
+      CopyBothResponse message, and then starts to stream WAL to the frontend.
       WAL will continue to be streamed until the connection is broken;
       no further commands will be accepted.
      </para>
@@ -2694,6 +2707,79 @@ CopyOutResponse (B)
 </varlistentry>
 
 
+<varlistentry>
+<term>
+CopyBothResponse (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('W')
+</term>
+<listitem>
+<para>
+                Identifies the message as a Start Copy Both response.
+                This message is used only for Streaming Replication.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                0 indicates the overall <command>COPY</command> format
+                is textual (rows separated by newlines, columns
+                separated by separator characters, etc). 1 indicates
+                the overall copy format is binary (similar to DataRow
+                format). See <xref linkend="sql-copy"> for more information.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16
+</term>
+<listitem>
+<para>
+                The number of columns in the data to be copied
+                (denoted <replaceable>N</> below).
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16[<replaceable>N</>]
+</term>
+<listitem>
+<para>
+                The format codes to be used for each column.
+                Each must presently be zero (text) or one (binary).
+                All must be zero if the overall copy format is textual.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 <varlistentry>
 <term>
 DataRow (B)
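
The CopyBothResponse layout documented in this hunk (Byte1('W'), Int32 length, Int8 overall format, Int16 column count, then one Int16 format code per column) can be decoded with a few bounds checks. The following is only a minimal sketch: `CopyBothHeader`, `read_be16`, `read_be32` and `parse_copy_both_response` are hypothetical names made up for illustration and are not part of libpq, and the sketch assumes the whole message, including the type byte, is already in one buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the decoded fields; not a libpq type. */
typedef struct CopyBothHeader
{
    int            overall_format;  /* Int8: 0 = text, 1 = binary */
    int            ncolumns;        /* Int16: N */
    const uint8_t *column_formats;  /* N big-endian Int16 codes, not copied */
} CopyBothHeader;

static uint32_t
read_be32(const uint8_t *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

static uint16_t
read_be16(const uint8_t *p)
{
    return (uint16_t) (((unsigned) p[0] << 8) | p[1]);
}

/* Returns 0 on success, -1 if the buffer is not a valid CopyBothResponse. */
int
parse_copy_both_response(const uint8_t *msg, size_t len, CopyBothHeader *out)
{
    uint32_t body_len;
    uint16_t ncols;
    uint16_t i;

    assert(msg != NULL && out != NULL);

    /* Smallest message: type byte + Int32 length + Int8 format + Int16 count. */
    if (len < 8 || msg[0] != 'W')
        return -1;

    /* The length field counts itself but not the leading type byte. */
    body_len = read_be32(msg + 1);
    if ((size_t) body_len + 1 != len)
        return -1;

    /* Overall format must be 0 (text) or 1 (binary). */
    if (msg[5] > 1)
        return -1;

    ncols = read_be16(msg + 6);
    if (body_len != 7u + 2u * ncols)
        return -1;

    for (i = 0; i < ncols; i++)
    {
        uint16_t code = read_be16(msg + 8 + 2 * (size_t) i);

        /* Each code is 0 or 1; all must be 0 when the overall format is text. */
        if (code > 1 || (msg[5] == 0 && code != 0))
            return -1;
    }

    out->overall_format = msg[5];
    out->ncolumns = ncols;
    out->column_formats = msg + 8;
    return 0;
}
```

For the message that walsender sends in this commit (text format, zero columns), such a parser would see exactly eight bytes and report `ncolumns == 0`.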

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 20 additions & 2 deletions
@@ -50,6 +50,7 @@ static char *recvBuf = NULL;
 static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
 static bool libpqrcv_receive(int timeout, unsigned char *type,
                  char **buffer, int *len);
+static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
@@ -64,10 +65,11 @@ _PG_init(void)
 {
     /* Tell walreceiver how to reach us */
     if (walrcv_connect != NULL || walrcv_receive != NULL ||
-        walrcv_disconnect != NULL)
+        walrcv_send != NULL || walrcv_disconnect != NULL)
         elog(ERROR, "libpqwalreceiver already loaded");
     walrcv_connect = libpqrcv_connect;
     walrcv_receive = libpqrcv_receive;
+    walrcv_send = libpqrcv_send;
     walrcv_disconnect = libpqrcv_disconnect;
 }
 
@@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
     snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
              startpoint.xlogid, startpoint.xrecoff);
     res = libpqrcv_PQexec(cmd);
-    if (PQresultStatus(res) != PGRES_COPY_OUT)
+    if (PQresultStatus(res) != PGRES_COPY_BOTH)
     {
         PQclear(res);
         ereport(ERROR,
@@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query)
 
         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+            PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
             PQstatus(streamConn) == CONNECTION_BAD)
             break;
     }
@@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
 
     return true;
 }
+
+/*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+static void
+libpqrcv_send(const char *buffer, int nbytes)
+{
+    if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+        PQflush(streamConn))
+        ereport(ERROR,
+                (errmsg("could not send data to WAL stream: %s",
+                        PQerrorMessage(streamConn))));
+}

‎src/backend/replication/walreceiver.c

Lines changed: 2 additions & 1 deletion
@@ -57,6 +57,7 @@ bool am_walreceiver;
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
 walrcv_receive_type walrcv_receive = NULL;
+walrcv_send_type walrcv_send = NULL;
 walrcv_disconnect_type walrcv_disconnect = NULL;
 
 #define NAPTIME_PER_CYCLE 100   /* max sleep time between cycles (100ms) */
@@ -247,7 +248,7 @@ WalReceiverMain(void)
     /* Load the libpq-specific functions */
     load_file("libpqwalreceiver", false);
     if (walrcv_connect == NULL || walrcv_receive == NULL ||
-        walrcv_disconnect == NULL)
+        walrcv_send == NULL || walrcv_disconnect == NULL)
         elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
     /*
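
The two walreceiver files cooperate through function-pointer hooks: the core declares them as NULL, the loadable module refuses to register twice and then fills in every hook, and the core checks that none is still NULL before using them. The sketch below reproduces only that pattern in isolation. All `demo_*` names are hypothetical, and the module "load" is reduced to a plain function call rather than PostgreSQL's `load_file`.

```c
#include <assert.h>
#include <stddef.h>

typedef void (*demo_send_type) (const char *buffer, int nbytes);
typedef void (*demo_disconnect_type) (void);

/* The core side owns the hook variables; they start out unset. */
demo_send_type demo_send_hook = NULL;
demo_disconnect_type demo_disconnect_hook = NULL;

/* Counter used only to observe that the hook was really called. */
int demo_bytes_sent = 0;

static void
demo_send(const char *buffer, int nbytes)
{
    (void) buffer;
    demo_bytes_sent += nbytes;
}

static void
demo_disconnect(void)
{
}

/* Module side: refuse to register twice, then fill in every hook. */
int
demo_module_init(void)
{
    if (demo_send_hook != NULL || demo_disconnect_hook != NULL)
        return -1;
    demo_send_hook = demo_send;
    demo_disconnect_hook = demo_disconnect;
    return 0;
}

/* Core side: after loading, every hook must be present. */
int
demo_hooks_ready(void)
{
    return demo_send_hook != NULL && demo_disconnect_hook != NULL;
}

/* Core side: calling through a hook is only valid once loading succeeded. */
void
demo_core_send(const char *buffer, int nbytes)
{
    assert(demo_hooks_ready());
    demo_send_hook(buffer, nbytes);
}
```

Adding a new hook, as this commit does with `walrcv_send`, means touching all three places: the variable, the registration, and both NULL checks.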

‎src/backend/replication/walsender.c

Lines changed: 2 additions & 2 deletions
@@ -287,8 +287,8 @@ WalSndHandshake(void)
                             (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                              errmsg("standby connections not allowed because wal_level=minimal")));
 
-                /* Send a CopyOutResponse message, and start streaming */
-                pq_beginmessage(&buf, 'H');
+                /* Send a CopyBothResponse message, and start streaming */
+                pq_beginmessage(&buf, 'W');
                 pq_sendbyte(&buf, 0);
                 pq_sendint(&buf, 0, 2);
                 pq_endmessage(&buf);
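
The body built here is one zero byte (overall format: text) followed by a two-byte zero (no columns). Assuming the message is framed as the protocol documentation in this commit describes, with a type byte and a self-inclusive Int32 length, the bytes on the wire would be `'W' 00 00 00 07 00 00 00`. The sketch below writes those bytes by hand; `build_empty_copy_both_response` is a hypothetical helper for illustration and does not use the backend's `pq_*` routines.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Writes the message into out and returns the number of bytes written,
 * or 0 if the buffer is too small.
 */
size_t
build_empty_copy_both_response(uint8_t *out, size_t cap)
{
    /* Length covers the Int32 itself, the Int8 format and the Int16 count. */
    const uint32_t length = 4 + 1 + 2;

    assert(out != NULL);
    if (cap < 1 + (size_t) length)
        return 0;

    out[0] = 'W';                       /* message type: CopyBothResponse */
    out[1] = (uint8_t) (length >> 24);  /* Int32 length, big-endian */
    out[2] = (uint8_t) (length >> 16);
    out[3] = (uint8_t) (length >> 8);
    out[4] = (uint8_t) length;
    out[5] = 0;                         /* Int8: overall format is text */
    out[6] = 0;                         /* Int16: zero columns */
    out[7] = 0;
    return 1 + (size_t) length;
}
```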

‎src/include/replication/walreceiver.h

Lines changed: 3 additions & 0 deletions
@@ -84,6 +84,9 @@ typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
                                      char **buffer, int *len);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
+typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
 typedef void (*walrcv_disconnect_type) (void);
 extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 

‎src/interfaces/libpq/fe-exec.c

Lines changed: 23 additions & 4 deletions
@@ -35,6 +35,7 @@ char *const pgresStatus[] = {
     "PGRES_TUPLES_OK",
     "PGRES_COPY_OUT",
     "PGRES_COPY_IN",
+    "PGRES_COPY_BOTH",
     "PGRES_BAD_RESPONSE",
     "PGRES_NONFATAL_ERROR",
     "PGRES_FATAL_ERROR"
@@ -174,6 +175,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             case PGRES_TUPLES_OK:
             case PGRES_COPY_OUT:
             case PGRES_COPY_IN:
+            case PGRES_COPY_BOTH:
                 /* non-error cases */
                 break;
             default:
@@ -1591,6 +1593,12 @@ PQgetResult(PGconn *conn)
             else
                 res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
             break;
+        case PGASYNC_COPY_BOTH:
+            if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+                res = pqPrepareAsyncResult(conn);
+            else
+                res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+            break;
         default:
             printfPQExpBuffer(&conn->errorMessage,
                               libpq_gettext("unexpected asyncStatus: %d\n"),
@@ -1775,6 +1783,13 @@ PQexecStart(PGconn *conn)
                 return false;
             }
         }
+        else if (resultStatus == PGRES_COPY_BOTH)
+        {
+            /* We don't allow PQexec during COPY BOTH */
+            printfPQExpBuffer(&conn->errorMessage,
+                 libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+            return false;
+        }
         /* check for loss of connection, too */
         if (conn->status == CONNECTION_BAD)
             return false;
@@ -1798,7 +1813,7 @@ PQexecFinish(PGconn *conn)
      * than one --- but merge error messages if we get more than one error
      * result.
      *
-     * We have to stop if we see copy in/out, however. We will resume parsing
+     * We have to stop if we see copy in/out/both, however. We will resume parsing
      * after application performs the data transfer.
      *
      * Also stop if the connection is lost (else we'll loop infinitely).
@@ -1827,6 +1842,7 @@ PQexecFinish(PGconn *conn)
         lastResult = result;
         if (result->resultStatus == PGRES_COPY_IN ||
             result->resultStatus == PGRES_COPY_OUT ||
+            result->resultStatus == PGRES_COPY_BOTH ||
             conn->status == CONNECTION_BAD)
             break;
     }
@@ -2000,7 +2016,7 @@ PQnotifies(PGconn *conn)
 }
 
 /*
- * PQputCopyData - send some data to the backend during COPY IN
+ * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
  *
  * Returns 1 if successful, 0 if data could not be sent (only possible
  * in nonblock mode), or -1 if an error occurs.
@@ -2010,7 +2026,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
 {
     if (!conn)
         return -1;
-    if (conn->asyncStatus != PGASYNC_COPY_IN)
+    if (conn->asyncStatus != PGASYNC_COPY_IN &&
+        conn->asyncStatus != PGASYNC_COPY_BOTH)
     {
         printfPQExpBuffer(&conn->errorMessage,
                           libpq_gettext("no COPY in progress\n"));
@@ -2148,6 +2165,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -2161,7 +2179,8 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
     *buffer = NULL;             /* for all failure cases */
     if (!conn)
         return -2;
-    if (conn->asyncStatus != PGASYNC_COPY_OUT)
+    if (conn->asyncStatus != PGASYNC_COPY_OUT &&
+        conn->asyncStatus != PGASYNC_COPY_BOTH)
     {
         printfPQExpBuffer(&conn->errorMessage,
                           libpq_gettext("no COPY in progress\n"));
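
Taken together, the fe-exec.c hunks widen two state checks: sending copy data is now accepted in COPY IN or COPY BOTH, and receiving copy data in COPY OUT or COPY BOTH. A minimal sketch of that gating follows. `DemoAsyncStatus` and the `demo_*` functions are hypothetical stand-ins for illustration, not libpq's internal `PGASYNC_*` values or its real functions.

```c
#include <assert.h>

/* Hypothetical stand-in for libpq's internal async status values. */
typedef enum
{
    DEMO_ASYNC_IDLE,
    DEMO_ASYNC_BUSY,
    DEMO_ASYNC_COPY_IN,
    DEMO_ASYNC_COPY_OUT,
    DEMO_ASYNC_COPY_BOTH
} DemoAsyncStatus;

/* Sending copy data is allowed in COPY IN and in COPY BOTH. */
int
demo_can_put_copy_data(DemoAsyncStatus status)
{
    return status == DEMO_ASYNC_COPY_IN || status == DEMO_ASYNC_COPY_BOTH;
}

/* Receiving copy data is allowed in COPY OUT and in COPY BOTH. */
int
demo_can_get_copy_data(DemoAsyncStatus status)
{
    return status == DEMO_ASYNC_COPY_OUT || status == DEMO_ASYNC_COPY_BOTH;
}

/*
 * Follows the return convention in the PQputCopyData comment above:
 * 1 on success, -1 on error.  No data is actually sent in this sketch.
 */
int
demo_put_copy_data(DemoAsyncStatus status, int nbytes)
{
    assert(nbytes >= 0);
    if (!demo_can_put_copy_data(status))
        return -1;              /* corresponds to "no COPY in progress" */
    return 1;
}
```

COPY BOTH is the only state in this sketch for which both predicates are true, which is what lets a streaming replication client both read WAL data and send messages back on the same connection.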

‎src/interfaces/libpq/fe-protocol2.c

Lines changed: 4 additions & 0 deletions
@@ -541,6 +541,10 @@ pqParseInput2(PGconn *conn)
                 case 'H':       /* Start Copy Out */
                     conn->asyncStatus = PGASYNC_COPY_OUT;
                     break;
+                    /*
+                     * Don't need to process CopyBothResponse here because
+                     * it never arrives from the server during protocol 2.0.
+                     */
                 default:
                     printfPQExpBuffer(&conn->errorMessage,
                                       libpq_gettext(

‎src/interfaces/libpq/fe-protocol3.c

Lines changed: 13 additions & 5 deletions
@@ -358,6 +358,12 @@ pqParseInput3(PGconn *conn)
                     conn->asyncStatus = PGASYNC_COPY_OUT;
                     conn->copy_already_done = 0;
                     break;
+                case 'W':       /* Start Copy Both */
+                    if (getCopyStart(conn, PGRES_COPY_BOTH))
+                        return;
+                    conn->asyncStatus = PGASYNC_COPY_BOTH;
+                    conn->copy_already_done = 0;
+                    break;
                 case 'd':       /* Copy Data */
 
                     /*
@@ -1196,7 +1202,8 @@ getNotify(PGconn *conn)
 }
 
 /*
- * getCopyStart - process CopyInResponse or CopyOutResponse message
+ * getCopyStart - process CopyInResponse, CopyOutResponse or
+ * CopyBothResponse message
  *
  * parseInput already read the message type and length.
  */
@@ -1367,6 +1374,7 @@ getCopyDataMessage(PGconn *conn)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -1390,10 +1398,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
         if (msgLength < 0)
         {
             /*
-             * On end-of-copy, exit COPY_OUT mode and let caller read status
-             * with PQgetResult(). The normal case is that it's Copy Done,
-             * but we let parseInput read that. If error, we expect the state
-             * was already changed.
+             * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
+             * read status with PQgetResult(). The normal case is that it's
+             * Copy Done, but we let parseInput read that. If error, we expect
+             * the state was already changed.
              */
             if (msgLength == -1)
                 conn->asyncStatus = PGASYNC_BUSY;

‎src/interfaces/libpq/libpq-fe.h

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ typedef enum
                                  * contains the result tuples */
     PGRES_COPY_OUT,             /* Copy Out data transfer in progress */
     PGRES_COPY_IN,              /* Copy In data transfer in progress */
+    PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
     PGRES_BAD_RESPONSE,         /* an unexpected response was recv'd from the
                                  * backend */
     PGRES_NONFATAL_ERROR,       /* notice or warning message */
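
This enum addition pairs with the `"PGRES_COPY_BOTH"` string added to `pgresStatus[]` in fe-exec.c at the same relative position, between the COPY_IN and BAD_RESPONSE entries. When a name table is indexed by enum value, the two lists have to be extended together. The sketch below shows that invariant on a reduced, hypothetical enum: the `DEMO_*` names and `demo_status_name` are illustrative only and are not libpq identifiers.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum
{
    DEMO_COPY_OUT,
    DEMO_COPY_IN,
    DEMO_COPY_BOTH,             /* new value inserted before DEMO_BAD_RESPONSE */
    DEMO_BAD_RESPONSE,
    DEMO_STATUS_COUNT           /* not a status; number of entries above */
} DemoStatus;

/* Must list one name per enum value, in the same order. */
static const char *const demo_status_names[] = {
    "DEMO_COPY_OUT",
    "DEMO_COPY_IN",
    "DEMO_COPY_BOTH",
    "DEMO_BAD_RESPONSE"
};

const char *
demo_status_name(DemoStatus status)
{
    /* Catches a table that was not extended along with the enum. */
    assert(sizeof(demo_status_names) / sizeof(demo_status_names[0]) ==
           (size_t) DEMO_STATUS_COUNT);

    if ((unsigned) status >= (unsigned) DEMO_STATUS_COUNT)
        return "invalid status";
    return demo_status_names[status];
}
```

The length check only detects a missing entry; keeping the order identical still depends on editing both lists at the same position, as this commit does.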
