Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0c013e0

Browse files
committed
Refactor replication connection code of various pg_basebackup utilities.
Move some more code to manage replication connection command tostreamutil.c. A later patch will introduce replication slot viapg_receivexlog and this avoid duplicating relevant code betweenpg_receivexlog and pg_recvlogical.Author: Michael Paquier, with some editing by me.
1 parentfdf81c9 commit0c013e0

File tree

5 files changed

+222
-141
lines changed

5 files changed

+222
-141
lines changed

‎src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,8 +1569,8 @@ BaseBackup(void)
15691569
{
15701570
PGresult*res;
15711571
char*sysidentifier;
1572-
uint32latesttli;
1573-
uint32starttli;
1572+
TimeLineIDlatesttli;
1573+
TimeLineIDstarttli;
15741574
char*basebkp;
15751575
charescaped_label[MAXPGPATH];
15761576
char*maxrate_clause=NULL;
@@ -1624,23 +1624,8 @@ BaseBackup(void)
16241624
/*
16251625
* Run IDENTIFY_SYSTEM so we can get the timeline
16261626
*/
1627-
res=PQexec(conn,"IDENTIFY_SYSTEM");
1628-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
1629-
{
1630-
fprintf(stderr,_("%s: could not send replication command \"%s\": %s"),
1631-
progname,"IDENTIFY_SYSTEM",PQerrorMessage(conn));
1627+
if (!RunIdentifySystem(conn,&sysidentifier,&latesttli,NULL,NULL))
16321628
disconnect_and_exit(1);
1633-
}
1634-
if (PQntuples(res)!=1||PQnfields(res)<3)
1635-
{
1636-
fprintf(stderr,
1637-
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
1638-
progname,PQntuples(res),PQnfields(res),1,3);
1639-
disconnect_and_exit(1);
1640-
}
1641-
sysidentifier=pg_strdup(PQgetvalue(res,0,0));
1642-
latesttli=atoi(PQgetvalue(res,0,1));
1643-
PQclear(res);
16441629

16451630
/*
16461631
* Start the actual backup

‎src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli)
253253
staticvoid
254254
StreamLog(void)
255255
{
256-
PGresult*res;
257-
XLogRecPtrstartpos;
258-
uint32starttli;
259-
XLogRecPtrserverpos;
260-
uint32servertli;
261-
uint32hi,
262-
lo;
256+
XLogRecPtrstartpos,serverpos;
257+
TimeLineIDstarttli,servertli;
263258

264259
/*
265260
* Connect in replication mode to the server
@@ -280,33 +275,12 @@ StreamLog(void)
280275
}
281276

282277
/*
283-
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
284-
* position.
278+
* Identify server, obtaining start LSN position and current timeline ID
279+
* at the same time, necessary if not valid data can be found in the
280+
* existing output directory.
285281
*/
286-
res=PQexec(conn,"IDENTIFY_SYSTEM");
287-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
288-
{
289-
fprintf(stderr,_("%s: could not send replication command \"%s\": %s"),
290-
progname,"IDENTIFY_SYSTEM",PQerrorMessage(conn));
291-
disconnect_and_exit(1);
292-
}
293-
if (PQntuples(res)!=1||PQnfields(res)<3)
294-
{
295-
fprintf(stderr,
296-
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
297-
progname,PQntuples(res),PQnfields(res),1,3);
282+
if (!RunIdentifySystem(conn,NULL,&servertli,&serverpos,NULL))
298283
disconnect_and_exit(1);
299-
}
300-
servertli=atoi(PQgetvalue(res,0,1));
301-
if (sscanf(PQgetvalue(res,0,2),"%X/%X",&hi,&lo)!=2)
302-
{
303-
fprintf(stderr,
304-
_("%s: could not parse transaction log location \"%s\"\n"),
305-
progname,PQgetvalue(res,0,2));
306-
disconnect_and_exit(1);
307-
}
308-
serverpos= ((uint64)hi) <<32 |lo;
309-
PQclear(res);
310284

311285
/*
312286
* Figure out where to start streaming.

‎src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 27 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,6 @@ sighup_handler(int signum)
596596
int
597597
main(intargc,char**argv)
598598
{
599-
PGresult*res;
600599
staticstructoptionlong_options[]= {
601600
/* general options */
602601
{"file",required_argument,NULL,'f'},
@@ -628,6 +627,7 @@ main(int argc, char **argv)
628627
intoption_index;
629628
uint32hi,
630629
lo;
630+
char*db_name;
631631

632632
progname=get_progname(argv[0]);
633633
set_pglocale_pgservice(argv[0],PG_TEXTDOMAIN("pg_recvlogical"));
@@ -834,124 +834,62 @@ main(int argc, char **argv)
834834
#endif
835835

836836
/*
837-
*don't really need this but it actually helps to get more precise error
838-
*messages about authentication, required GUCs and such without starting
839-
*to loop around connection attempts lateron.
837+
*Obtain a connection to server. This is not really necessary but it
838+
*helps to get more precise error messages about authentification,
839+
*required GUC parameters and such.
840840
*/
841-
{
842-
conn=GetConnection();
843-
if (!conn)
844-
/* Error message already written in GetConnection() */
845-
exit(1);
841+
conn=GetConnection();
842+
if (!conn)
843+
/* Error message already written in GetConnection() */
844+
exit(1);
846845

847-
/*
848-
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
849-
* position.
850-
*/
851-
res=PQexec(conn,"IDENTIFY_SYSTEM");
852-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
853-
{
854-
fprintf(stderr,_("%s: could not send replication command \"%s\": %s"),
855-
progname,"IDENTIFY_SYSTEM",PQerrorMessage(conn));
856-
disconnect_and_exit(1);
857-
}
846+
/*
847+
* Run IDENTIFY_SYSTEM to make sure we connected using a database specific
848+
* replication connection.
849+
*/
850+
if (!RunIdentifySystem(conn,NULL,NULL,NULL,&db_name))
851+
disconnect_and_exit(1);
858852

859-
if (PQntuples(res)!=1||PQnfields(res)<4)
860-
{
861-
fprintf(stderr,
862-
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
863-
progname,PQntuples(res),PQnfields(res),1,4);
864-
disconnect_and_exit(1);
865-
}
866-
PQclear(res);
853+
if (db_name==NULL)
854+
{
855+
fprintf(stderr,
856+
_("%s: failed to establish database specific replication connection\n"),
857+
progname);
858+
disconnect_and_exit(1);
867859
}
868860

869-
870-
/*
871-
* drop a replication slot
872-
*/
861+
/* Drop a replication slot. */
873862
if (do_drop_slot)
874863
{
875-
charquery[256];
876-
877864
if (verbose)
878865
fprintf(stderr,
879866
_("%s: dropping replication slot \"%s\"\n"),
880867
progname,replication_slot);
881868

882-
snprintf(query,sizeof(query),"DROP_REPLICATION_SLOT \"%s\"",
883-
replication_slot);
884-
res=PQexec(conn,query);
885-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
886-
{
887-
fprintf(stderr,_("%s: could not send replication command \"%s\": %s"),
888-
progname,query,PQerrorMessage(conn));
869+
if (!DropReplicationSlot(conn,replication_slot))
889870
disconnect_and_exit(1);
890-
}
891-
892-
if (PQntuples(res)!=0||PQnfields(res)!=0)
893-
{
894-
fprintf(stderr,
895-
_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
896-
progname,replication_slot,PQntuples(res),PQnfields(res),0,0);
897-
disconnect_and_exit(1);
898-
}
899-
900-
PQclear(res);
901-
disconnect_and_exit(0);
902871
}
903872

904-
/*
905-
* create a replication slot
906-
*/
873+
/* Create a replication slot. */
907874
if (do_create_slot)
908875
{
909-
charquery[256];
910-
911876
if (verbose)
912877
fprintf(stderr,
913878
_("%s: creating replication slot \"%s\"\n"),
914879
progname,replication_slot);
915880

916-
snprintf(query,sizeof(query),"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
917-
replication_slot,plugin);
918-
919-
res=PQexec(conn,query);
920-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
921-
{
922-
fprintf(stderr,_("%s: could not send replication command \"%s\": %s"),
923-
progname,query,PQerrorMessage(conn));
881+
if (!CreateReplicationSlot(conn,replication_slot,plugin,
882+
&startpos, false))
924883
disconnect_and_exit(1);
925-
}
926-
927-
if (PQntuples(res)!=1||PQnfields(res)!=4)
928-
{
929-
fprintf(stderr,
930-
_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
931-
progname,replication_slot,PQntuples(res),PQnfields(res),1,4);
932-
disconnect_and_exit(1);
933-
}
934-
935-
if (sscanf(PQgetvalue(res,0,1),"%X/%X",&hi,&lo)!=2)
936-
{
937-
fprintf(stderr,
938-
_("%s: could not parse transaction log location \"%s\"\n"),
939-
progname,PQgetvalue(res,0,1));
940-
disconnect_and_exit(1);
941-
}
942-
startpos= ((uint64)hi) <<32 |lo;
943-
944-
replication_slot=strdup(PQgetvalue(res,0,0));
945-
PQclear(res);
946884
}
947885

948-
949886
if (!do_start_slot)
950887
disconnect_and_exit(0);
951888

889+
/* Stream loop */
952890
while (true)
953891
{
954-
StreamLog();
892+
StreamLogicalLog();
955893
if (time_to_abort)
956894
{
957895
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp