Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit45fc4c3

Browse files
author
Arthur Zakirov
committed
Use wait_replica_wal_lsn() in pg_start_backup() and pg_stop_backup()
1 parenta0c7fcd commit45fc4c3

File tree

1 file changed

+53
-88
lines changed

1 file changed

+53
-88
lines changed

‎src/backup.c‎

Lines changed: 53 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,18 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
640640

641641
PQclear(res);
642642

643+
/*
644+
* Switch to a new WAL segment. It is necessary to get archived WAL
645+
* segment, which includes start LSN of current backup.
646+
*
647+
* Do not switch for standby node and if backup is stream.
648+
*/
649+
if (!stream_wal)
650+
pg_switch_wal(conn);
651+
/* Wait for start_lsn to be received by replica */
652+
if (from_replica)
653+
wait_replica_wal_lsn(backup->start_lsn, true);
654+
643655
if (!stream_wal)
644656
/*
645657
* Do not wait start_lsn for stream backup.
@@ -658,16 +670,15 @@ pg_switch_wal(PGconn *conn)
658670
PGresult*res;
659671

660672
/* Remove annoying NOTICE messages generated by backend */
661-
res=pgut_execute(conn,"SET client_min_messages = warning;",0,
662-
NULL);
673+
res=pgut_execute(conn,"SET client_min_messages = warning;",0,NULL);
663674
PQclear(res);
664675

665676
if (server_version >=100000)
666677
res=pgut_execute(conn,"SELECT * FROM pg_switch_wal()",0,
667-
NULL);
678+
NULL);
668679
else
669680
res=pgut_execute(conn,"SELECT * FROM pg_switch_xlog()",0,
670-
NULL);
681+
NULL);
671682

672683
PQclear(res);
673684
}
@@ -908,7 +919,7 @@ wait_wal_lsn(XLogRecPtr lsn)
908919
}
909920

910921
/*
911-
* Wait for target 'lsn' on replica instance.
922+
* Wait for target 'lsn' on replica instance from master.
912923
*/
913924
staticvoid
914925
wait_replica_wal_lsn(XLogRecPtrlsn,boolis_start_backup)
@@ -973,6 +984,7 @@ wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
973984
staticvoid
974985
pg_stop_backup(pgBackup*backup)
975986
{
987+
PGconn*conn;
976988
PGresult*res;
977989
uint32xlogid;
978990
uint32xrecoff;
@@ -990,8 +1002,11 @@ pg_stop_backup(pgBackup *backup)
9901002
if (!backup_in_progress)
9911003
elog(FATAL,"backup is not in progress");
9921004

1005+
/* For replica we call pg_stop_backup() on master */
1006+
conn= (from_replica) ?master_conn :backup_conn;
1007+
9931008
/* Remove annoying NOTICE messages generated by backend */
994-
res=pgut_execute(backup_conn,"SET client_min_messages = warning;",
1009+
res=pgut_execute(conn,"SET client_min_messages = warning;",
9951010
0,NULL);
9961011
PQclear(res);
9971012

@@ -1005,69 +1020,16 @@ pg_stop_backup(pgBackup *backup)
10051020
backup_id=base36enc(backup->start_time);
10061021

10071022
if (!from_replica)
1008-
{
10091023
snprintf(name,lengthof(name),"pg_probackup, backup_id %s",
10101024
backup_id);
1011-
params[0]=name;
1012-
1013-
res=pgut_execute(backup_conn,"SELECT pg_create_restore_point($1)",
1014-
1,params);
1015-
PQclear(res);
1016-
}
10171025
else
1018-
{
1019-
uint32try_count=0;
1020-
10211026
snprintf(name,lengthof(name),"pg_probackup, backup_id %s. Replica Backup",
10221027
backup_id);
1023-
params[0]=name;
1028+
params[0]=name;
10241029

1025-
res=pgut_execute(master_conn,"SELECT pg_create_restore_point($1)",
1026-
1,params);
1027-
/* Extract timeline and LSN from result */
1028-
XLogDataFromLSN(PQgetvalue(res,0,0),&xlogid,&xrecoff);
1029-
/* Calculate LSN */
1030-
restore_lsn= (XLogRecPtr) ((uint64)xlogid <<32) |xrecoff;
1031-
PQclear(res);
1032-
1033-
/* Switch WAL on master to retreive restore_lsn */
1034-
pg_switch_wal(master_conn);
1035-
1036-
/* Wait for restore_lsn from master */
1037-
while (true)
1038-
{
1039-
XLogRecPtrmin_recovery_lsn;
1040-
1041-
res=pgut_execute(backup_conn,"SELECT min_recovery_end_location from pg_control_recovery()",
1042-
0,NULL);
1043-
/* Extract timeline and LSN from result */
1044-
XLogDataFromLSN(PQgetvalue(res,0,0),&xlogid,&xrecoff);
1045-
/* Calculate LSN */
1046-
min_recovery_lsn= (XLogRecPtr) ((uint64)xlogid <<32) |xrecoff;
1047-
PQclear(res);
1048-
1049-
/* restore_lsn was streamed and applied to the replica */
1050-
if (min_recovery_lsn >=restore_lsn)
1051-
break;
1052-
1053-
sleep(1);
1054-
if (interrupted)
1055-
elog(ERROR,"Interrupted during waiting for restore point LSN");
1056-
try_count++;
1057-
1058-
/* Inform user if restore_lsn is absent in first attempt */
1059-
if (try_count==1)
1060-
elog(INFO,"Wait for restore point LSN %X/%X to be streamed "
1061-
"to replica",
1062-
(uint32) (restore_lsn >>32), (uint32)restore_lsn);
1063-
1064-
if (replica_timeout>0&&try_count>replica_timeout)
1065-
elog(ERROR,"Restore point LSN %X/%X could not be "
1066-
"streamed to replica in %d seconds",
1067-
(uint32) (restore_lsn >>32), (uint32)restore_lsn,
1068-
replica_timeout);
1069-
}
1070-
}
1030+
res=pgut_execute(conn,"SELECT pg_create_restore_point($1)",
1031+
1,params);
1032+
PQclear(res);
10711033

10721034
pfree(backup_id);
10731035
}
@@ -1084,13 +1046,13 @@ pg_stop_backup(pgBackup *backup)
10841046
* pg_stop_backup(false) copy of the backup label and tablespace map
10851047
* so they can be written to disk by the caller.
10861048
*/
1087-
sent=pgut_send(backup_conn,
1049+
sent=pgut_send(conn,
10881050
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10891051
" current_timestamp(0)::timestamp"
10901052
" FROM pg_stop_backup(false)",
10911053
0,NULL,WARNING);
10921054
else
1093-
sent=pgut_send(backup_conn,
1055+
sent=pgut_send(conn,
10941056
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10951057
" current_timestamp(0)::timestamp"
10961058
" FROM pg_stop_backup()",
@@ -1108,30 +1070,30 @@ pg_stop_backup(pgBackup *backup)
11081070

11091071
while (1)
11101072
{
1111-
if (!PQconsumeInput(backup_conn)||PQisBusy(backup_conn))
1073+
if (!PQconsumeInput(conn)||PQisBusy(conn))
11121074
{
1113-
pg_stop_backup_timeout++;
1114-
sleep(1);
1075+
pg_stop_backup_timeout++;
1076+
sleep(1);
11151077

1116-
if (interrupted)
1117-
{
1118-
pgut_cancel(backup_conn);
1119-
elog(ERROR,"interrupted during waiting for pg_stop_backup");
1120-
}
1121-
/*
1122-
* If postgres haven't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1123-
* send an interrupt.
1124-
*/
1125-
if (pg_stop_backup_timeout>PG_STOP_BACKUP_TIMEOUT)
1126-
{
1127-
pgut_cancel(backup_conn);
1128-
elog(ERROR,"pg_stop_backup doesn't answer in %d seconds, cancel it",
1129-
PG_STOP_BACKUP_TIMEOUT);
1130-
}
1078+
if (interrupted)
1079+
{
1080+
pgut_cancel(conn);
1081+
elog(ERROR,"interrupted during waiting for pg_stop_backup");
1082+
}
1083+
/*
1084+
* If postgres haven't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1085+
* send an interrupt.
1086+
*/
1087+
if (pg_stop_backup_timeout>PG_STOP_BACKUP_TIMEOUT)
1088+
{
1089+
pgut_cancel(conn);
1090+
elog(ERROR,"pg_stop_backup doesn't answer in %d seconds, cancel it",
1091+
PG_STOP_BACKUP_TIMEOUT);
1092+
}
11311093
}
11321094
else
11331095
{
1134-
res=PQgetResult(backup_conn);
1096+
res=PQgetResult(conn);
11351097
break;
11361098
}
11371099
}
@@ -1228,22 +1190,22 @@ pg_stop_backup(pgBackup *backup)
12281190
if (sscanf(PQgetvalue(res,0,3),XID_FMT,&recovery_xid)!=1)
12291191
elog(ERROR,
12301192
"result of txid_snapshot_xmax() is invalid: %s",
1231-
PQerrorMessage(backup_conn));
1193+
PQerrorMessage(conn));
12321194
if (!parse_time(PQgetvalue(res,0,4),&recovery_time))
12331195
elog(ERROR,
12341196
"result of current_timestamp is invalid: %s",
1235-
PQerrorMessage(backup_conn));
1197+
PQerrorMessage(conn));
12361198
}
12371199
else
12381200
{
12391201
if (sscanf(PQgetvalue(res,0,1),XID_FMT,&recovery_xid)!=1)
12401202
elog(ERROR,
12411203
"result of txid_snapshot_xmax() is invalid: %s",
1242-
PQerrorMessage(backup_conn));
1204+
PQerrorMessage(conn));
12431205
if (!parse_time(PQgetvalue(res,0,2),&recovery_time))
12441206
elog(ERROR,
12451207
"result of current_timestamp is invalid: %s",
1246-
PQerrorMessage(backup_conn));
1208+
PQerrorMessage(conn));
12471209
}
12481210

12491211
PQclear(res);
@@ -1258,6 +1220,9 @@ pg_stop_backup(pgBackup *backup)
12581220
char*xlog_path,
12591221
stream_xlog_path[MAXPGPATH];
12601222

1223+
/* Wait for stop_lsn to be received by replica */
1224+
if (from_replica)
1225+
wait_replica_wal_lsn(stop_backup_lsn, false);
12611226
/*
12621227
* Wait for stop_lsn to be archived or streamed.
12631228
* We wait for stop_lsn in stream mode just in case.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp