Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9471875

Browse files
committed
Add stream mode for save WAL during backup process.
1 parentccd4f48 commit9471875

File tree

14 files changed

+353
-23
lines changed

14 files changed

+353
-23
lines changed

‎Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ OBJS = backup.o \
1616
datapagemap.o\
1717
parsexlog.o\
1818
xlogreader.o\
19+
streamutil.o\
20+
receivelog.o\
1921
pgut/pgut.o\
2022
pgut/pgut-port.o
2123

@@ -39,7 +41,7 @@ PG_LIBS = $(libpq_pgport) ${PTHREAD_LIBS} ${PTHREAD_CFLAGS}
3941

4042
REGRESS = init option show delete backup restore
4143

42-
all: checksrcdir docs datapagemap.h pg_arman
44+
all: checksrcdir docs datapagemap.hreceivelog.h streamutil.hpg_arman
4345

4446
# This rule's only purpose is to give the user instructions on how to pass
4547
# the path to PostgreSQL source tree to the makefile.

‎backup.c

Lines changed: 207 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,19 @@
2323
#include"pgut/pgut-port.h"
2424
#include"storage/bufpage.h"
2525
#include"datapagemap.h"
26+
#include"streamutil.h"
27+
#include"receivelog.h"
2628

2729
/* wait 10 sec until WAL archive complete */
28-
#defineTIMEOUT_ARCHIVE10
30+
#defineTIMEOUT_ARCHIVE10
2931

3032
/* Server version */
3133
staticintserver_version=0;
3234

33-
staticboolin_backup= false;/* TODO: more robust logic */
35+
staticboolin_backup= false;/* TODO: more robust logic */
36+
staticintstandby_message_timeout=10*1000;/* 10 sec = default */
37+
staticXLogRecPtrstop_backup_lsn=InvalidXLogRecPtr;
38+
constchar*progname="pg_arman";
3439

3540
/* list of files contained in backup */
3641
parray*backup_files_list;
@@ -71,6 +76,15 @@ static void create_file_list(parray *files,
7176
boolis_append);
7277
staticvoidwait_for_archive(pgBackup*backup,constchar*sql);
7378
staticvoidmake_pagemap_from_ptrack(parray*files);
79+
staticvoidStreamLog(void*arg);
80+
81+
82+
#definedisconnect_and_exit(code)\
83+
{\
84+
if (conn != NULL) PQfinish(conn);\
85+
exit(code);\
86+
}
87+
7488

7589
/*
7690
* Take a backup of database and return the list of files backed up.
@@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
8296
parray*prev_files=NULL;/* file list of previous database backup */
8397
FILE*fp;
8498
charpath[MAXPGPATH];
99+
chardst_backup_path[MAXPGPATH];
85100
charlabel[1024];
86101
XLogRecPtr*lsn=NULL;
87102
charprev_file_txt[MAXPGPATH];/* path of the previous backup
88103
* list file */
89104
boolhas_backup_label= true;/* flag if backup_label is there */
90105
pthread_tbackup_threads[num_threads];
106+
pthread_tstream_thread;
91107
backup_files_args*backup_threads_args[num_threads];
92108

93109
/* repack the options */
@@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
129145
"or validate existing one.");
130146
}
131147

148+
/* clear ptrack files for FULL and DIFF backup */
132149
if (current.backup_mode!=BACKUP_MODE_DIFF_PTRACK)
133150
pg_ptrack_clear();
134151

152+
/* start stream replication */
153+
if (stream_wal)
154+
{
155+
pgBackupGetPath(&current,path,lengthof(path),DATABASE_DIR);
156+
join_path_components(dst_backup_path,path,"pg_xlog");
157+
dir_create_dir(dst_backup_path,DIR_PERMISSION);
158+
pthread_create(&stream_thread,NULL, (void*(*)(void*))StreamLog,dst_backup_path);
159+
}
160+
135161
/* notify start of backup to PostgreSQL server */
136162
time2iso(label,lengthof(label),current.start_time);
137163
strncat(label," with pg_arman",lengthof(label));
@@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
322348
/* Notify end of backup */
323349
pg_stop_backup(&current);
324350

351+
if (stream_wal)
352+
{
353+
parray*list_file;
354+
charpg_xlog_path[MAXPGPATH];
355+
356+
/* We expect the completion of stream */
357+
pthread_join(stream_thread,NULL);
358+
359+
/* Scan backup pg_xlog dir */
360+
list_file=parray_new();
361+
join_path_components(pg_xlog_path,path,"pg_xlog");
362+
dir_list_file(list_file,pg_xlog_path,NULL, true, false);
363+
364+
/* Remove file path root prefix and calc meta */
365+
for (i=0;i<parray_num(list_file);i++)
366+
{
367+
pgFile*file= (pgFile*)parray_get(list_file,i);
368+
369+
calc_file(file);
370+
if (strstr(file->path,path)==file->path)
371+
{
372+
char*ptr=file->path;
373+
file->path=pstrdup(JoinPathEnd(ptr,path));
374+
free(ptr);
375+
}
376+
}
377+
parray_concat(backup_files_list,list_file);
378+
}
379+
325380
/* Create file list */
326381
create_file_list(backup_files_list,pgdata,DATABASE_FILE_LIST,NULL, false);
327382

@@ -549,31 +604,31 @@ static void
549604
pg_ptrack_clear(void)
550605
{
551606
PGresult*res_db,*res;
552-
constchar*old_dbname=dbname;
607+
constchar*old_dbname=pgut_dbname;
553608
inti;
554609

555610
reconnect();
556611
res_db=execute("SELECT datname FROM pg_database",0,NULL);
557612
disconnect();
558613
for(i=0;i<PQntuples(res_db);i++)
559614
{
560-
dbname=PQgetvalue(res_db,i,0);
561-
if (!strcmp(dbname,"template0"))
615+
pgut_dbname=PQgetvalue(res_db,i,0);
616+
if (!strcmp(pgut_dbname,"template0"))
562617
continue;
563618
reconnect();
564619
res=execute("SELECT pg_ptrack_clear()",0,NULL);
565620
PQclear(res);
566621
}
567622
PQclear(res_db);
568623
disconnect();
569-
dbname=old_dbname;
624+
pgut_dbname=old_dbname;
570625
}
571626

572627
staticchar*
573628
pg_ptrack_get_and_clear(Oidtablespace_oid,Oiddb_oid,Oidrel_oid,size_t*result_size)
574629
{
575630
PGresult*res_db,*res;
576-
constchar*old_dbname=dbname;
631+
constchar*old_dbname=pgut_dbname;
577632
char*params[2];
578633
char*result;
579634

@@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
584639
sprintf(params[1],"%i",rel_oid);
585640
res_db=execute("SELECT datname FROM pg_database WHERE oid=$1",1, (constchar**)params);
586641
disconnect();
587-
dbname=pstrdup(PQgetvalue(res_db,0,0));
642+
pgut_dbname=pstrdup(PQgetvalue(res_db,0,0));
588643
PQclear(res_db);
589644

590645
reconnect();
@@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
595650
pfree(params[0]);
596651
pfree(params[1]);
597652

598-
pfree((char*)dbname);
599-
dbname=old_dbname;
653+
pfree((char*)pgut_dbname);
654+
pgut_dbname=old_dbname;
600655

601656
returnresult;
602657
}
@@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql)
683738
staticvoid
684739
pg_stop_backup(pgBackup*backup)
685740
{
686-
wait_for_archive(backup,
741+
if (stream_wal)
742+
{
743+
PGresult*res;
744+
TimeLineIDtli;
745+
746+
reconnect();
747+
748+
/* Remove annoying NOTICE messages generated by backend */
749+
res=execute("SET client_min_messages = warning;",0,NULL);
750+
PQclear(res);
751+
752+
/* And execute the query wanted */
753+
res=execute("SELECT * FROM pg_stop_backup()",0,NULL);
754+
755+
/* Get LSN from execution result */
756+
get_lsn(res,&stop_backup_lsn);
757+
PQclear(res);
758+
759+
/*
760+
* Enforce TLI obtention if backup is not present as this code
761+
* path can be taken as a callback at exit.
762+
*/
763+
tli=get_current_timeline(false);
764+
765+
/* Fill in fields if backup exists */
766+
if (backup!=NULL)
767+
{
768+
backup->tli=tli;
769+
backup->stop_lsn=stop_backup_lsn;
770+
elog(LOG,"%s(): tli=%X lsn=%X/%08X",
771+
__FUNCTION__,backup->tli,
772+
(uint32) (backup->stop_lsn >>32),
773+
(uint32)backup->stop_lsn);
774+
}
775+
776+
res=execute(TXID_CURRENT_SQL,0,NULL);
777+
if (backup!=NULL)
778+
{
779+
get_xid(res,&backup->recovery_xid);
780+
backup->recovery_time=time(NULL);
781+
}
782+
PQclear(res);
783+
disconnect();
784+
}
785+
else
786+
wait_for_archive(backup,
687787
"SELECT * FROM pg_stop_backup()");
688788
}
689789

@@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
719819
* Extract timeline and LSN from results of pg_stop_backup()
720820
* and friends.
721821
*/
722-
XLogDataFromLSN(PQgetvalue(res,0,0),&xlogid,&xrecoff);
723822

823+
XLogDataFromLSN(PQgetvalue(res,0,0),&xlogid,&xrecoff);
724824
/* Calculate LSN */
725825
*lsn= (XLogRecPtr) ((uint64)xlogid <<32) |xrecoff;
726826
}
@@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files)
11371237
}
11381238
}
11391239
}
1240+
1241+
1242+
staticbool
1243+
stop_streaming(XLogRecPtrxlogpos,uint32timeline,boolsegment_finished)
1244+
{
1245+
staticuint32prevtimeline=0;
1246+
staticXLogRecPtrprevpos=InvalidXLogRecPtr;
1247+
1248+
/* we assume that we get called once at the end of each segment */
1249+
if (verbose&&segment_finished)
1250+
fprintf(stderr,_("%s: finished segment at %X/%X (timeline %u)\n"),
1251+
progname, (uint32) (xlogpos >>32), (uint32)xlogpos,
1252+
timeline);
1253+
1254+
/*
1255+
* Note that we report the previous, not current, position here. After a
1256+
* timeline switch, xlogpos points to the beginning of the segment because
1257+
* that's where we always begin streaming. Reporting the end of previous
1258+
* timeline isn't totally accurate, because the next timeline can begin
1259+
* slightly before the end of the WAL that we received on the previous
1260+
* timeline, but it's close enough for reporting purposes.
1261+
*/
1262+
if (prevtimeline!=0&&prevtimeline!=timeline)
1263+
fprintf(stderr,_("%s: switched to timeline %u at %X/%X\n"),
1264+
progname,timeline,
1265+
(uint32) (prevpos >>32), (uint32)prevpos);
1266+
1267+
if (stop_backup_lsn!=InvalidXLogRecPtr&&xlogpos>stop_backup_lsn)
1268+
return true;
1269+
1270+
prevtimeline=timeline;
1271+
prevpos=xlogpos;
1272+
1273+
return false;
1274+
}
1275+
1276+
/*
1277+
* Start the log streaming
1278+
*/
1279+
staticvoid
1280+
StreamLog(void*arg)
1281+
{
1282+
XLogRecPtrstartpos;
1283+
TimeLineIDstarttli;
1284+
char*basedir= (char*)arg;
1285+
1286+
/*
1287+
* Connect in replication mode to the server
1288+
*/
1289+
if (conn==NULL)
1290+
conn=GetConnection();
1291+
if (!conn)
1292+
/* Error message already written in GetConnection() */
1293+
return;
1294+
1295+
if (!CheckServerVersionForStreaming(conn))
1296+
{
1297+
/*
1298+
* Error message already written in CheckServerVersionForStreaming().
1299+
* There's no hope of recovering from a version mismatch, so don't
1300+
* retry.
1301+
*/
1302+
disconnect_and_exit(1);
1303+
}
1304+
1305+
/*
1306+
* Identify server, obtaining start LSN position and current timeline ID
1307+
* at the same time, necessary if not valid data can be found in the
1308+
* existing output directory.
1309+
*/
1310+
if (!RunIdentifySystem(conn,NULL,&starttli,&startpos,NULL))
1311+
disconnect_and_exit(1);
1312+
1313+
1314+
/*
1315+
* Always start streaming at the beginning of a segment
1316+
*/
1317+
startpos-=startpos %XLOG_SEG_SIZE;
1318+
1319+
/*
1320+
* Start the replication
1321+
*/
1322+
if (verbose)
1323+
fprintf(stderr,
1324+
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
1325+
progname, (uint32) (startpos >>32), (uint32)startpos,
1326+
starttli);
1327+
1328+
ReceiveXlogStream(conn,startpos,starttli,NULL,basedir,
1329+
stop_streaming,standby_message_timeout,".partial",
1330+
false, false);
1331+
1332+
PQfinish(conn);
1333+
conn=NULL;
1334+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp