2323#include "pgut/pgut-port.h"
2424#include "storage/bufpage.h"
2525#include "datapagemap.h"
26+ #include "streamutil.h"
27+ #include "receivelog.h"
2628
2729/* wait 10 sec until WAL archive complete */
28- #define TIMEOUT_ARCHIVE 10
30+ #define TIMEOUT_ARCHIVE 10
2931
3032/* Server version */
3133static int server_version = 0 ;
3234
33- static bool in_backup = false;/* TODO: more robust logic */
35+ static bool in_backup = false;/* TODO: more robust logic */
36+ static int standby_message_timeout = 10 * 1000 ;/* 10 sec = default */
37+ static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr ;
38+ const char * progname = "pg_arman" ;
3439
3540/* list of files contained in backup */
3641parray * backup_files_list ;
@@ -71,6 +76,15 @@ static void create_file_list(parray *files,
7176bool is_append );
7277static void wait_for_archive (pgBackup * backup ,const char * sql );
7378static void make_pagemap_from_ptrack (parray * files );
79+ static void StreamLog (void * arg );
80+
81+
82+ #define disconnect_and_exit (code )\
83+ {\
84+ if (conn != NULL) PQfinish(conn);\
85+ exit(code);\
86+ }
87+
7488
7589/*
7690 * Take a backup of database and return the list of files backed up.
@@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
8296parray * prev_files = NULL ;/* file list of previous database backup */
8397FILE * fp ;
8498char path [MAXPGPATH ];
99+ char dst_backup_path [MAXPGPATH ];
85100char label [1024 ];
86101XLogRecPtr * lsn = NULL ;
87102char prev_file_txt [MAXPGPATH ];/* path of the previous backup
88103 * list file */
89104bool has_backup_label = true;/* flag if backup_label is there */
90105pthread_t backup_threads [num_threads ];
106+ pthread_t stream_thread ;
91107backup_files_args * backup_threads_args [num_threads ];
92108
93109/* repack the options */
@@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
129145"or validate existing one." );
130146}
131147
148+ /* clear ptrack files for FULL and DIFF backup */
132149if (current .backup_mode != BACKUP_MODE_DIFF_PTRACK )
133150pg_ptrack_clear ();
134151
152+ /* start stream replication */
153+ if (stream_wal )
154+ {
155+ pgBackupGetPath (& current ,path ,lengthof (path ),DATABASE_DIR );
156+ join_path_components (dst_backup_path ,path ,"pg_xlog" );
157+ dir_create_dir (dst_backup_path ,DIR_PERMISSION );
158+ pthread_create (& stream_thread ,NULL , (void * (* )(void * ))StreamLog ,dst_backup_path );
159+ }
160+
135161/* notify start of backup to PostgreSQL server */
136162time2iso (label ,lengthof (label ),current .start_time );
137163strncat (label ," with pg_arman" ,lengthof (label ));
@@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
322348/* Notify end of backup */
323349pg_stop_backup (& current );
324350
351+ if (stream_wal )
352+ {
353+ parray * list_file ;
354+ char pg_xlog_path [MAXPGPATH ];
355+
356+ /* We expect the completion of stream */
357+ pthread_join (stream_thread ,NULL );
358+
359+ /* Scan backup pg_xlog dir */
360+ list_file = parray_new ();
361+ join_path_components (pg_xlog_path ,path ,"pg_xlog" );
362+ dir_list_file (list_file ,pg_xlog_path ,NULL , true, false);
363+
364+ /* Remove file path root prefix and calc meta */
365+ for (i = 0 ;i < parray_num (list_file );i ++ )
366+ {
367+ pgFile * file = (pgFile * )parray_get (list_file ,i );
368+
369+ calc_file (file );
370+ if (strstr (file -> path ,path )== file -> path )
371+ {
372+ char * ptr = file -> path ;
373+ file -> path = pstrdup (JoinPathEnd (ptr ,path ));
374+ free (ptr );
375+ }
376+ }
377+ parray_concat (backup_files_list ,list_file );
378+ }
379+
325380/* Create file list */
326381create_file_list (backup_files_list ,pgdata ,DATABASE_FILE_LIST ,NULL , false);
327382
@@ -549,31 +604,31 @@ static void
549604pg_ptrack_clear (void )
550605{
551606PGresult * res_db ,* res ;
552- const char * old_dbname = dbname ;
607+ const char * old_dbname = pgut_dbname ;
553608int i ;
554609
555610reconnect ();
556611res_db = execute ("SELECT datname FROM pg_database" ,0 ,NULL );
557612disconnect ();
558613for (i = 0 ;i < PQntuples (res_db );i ++ )
559614{
560- dbname = PQgetvalue (res_db ,i ,0 );
561- if (!strcmp (dbname ,"template0" ))
615+ pgut_dbname = PQgetvalue (res_db ,i ,0 );
616+ if (!strcmp (pgut_dbname ,"template0" ))
562617continue ;
563618reconnect ();
564619res = execute ("SELECT pg_ptrack_clear()" ,0 ,NULL );
565620PQclear (res );
566621}
567622PQclear (res_db );
568623disconnect ();
569- dbname = old_dbname ;
624+ pgut_dbname = old_dbname ;
570625}
571626
572627static char *
573628pg_ptrack_get_and_clear (Oid tablespace_oid ,Oid db_oid ,Oid rel_oid ,size_t * result_size )
574629{
575630PGresult * res_db ,* res ;
576- const char * old_dbname = dbname ;
631+ const char * old_dbname = pgut_dbname ;
577632char * params [2 ];
578633char * result ;
579634
@@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
584639sprintf (params [1 ],"%i" ,rel_oid );
585640res_db = execute ("SELECT datname FROM pg_database WHERE oid=$1" ,1 , (const char * * )params );
586641disconnect ();
587- dbname = pstrdup (PQgetvalue (res_db ,0 ,0 ));
642+ pgut_dbname = pstrdup (PQgetvalue (res_db ,0 ,0 ));
588643PQclear (res_db );
589644
590645reconnect ();
@@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
595650pfree (params [0 ]);
596651pfree (params [1 ]);
597652
598- pfree ((char * )dbname );
599- dbname = old_dbname ;
653+ pfree ((char * )pgut_dbname );
654+ pgut_dbname = old_dbname ;
600655
601656return result ;
602657}
@@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql)
683738static void
684739pg_stop_backup (pgBackup * backup )
685740{
686- wait_for_archive (backup ,
741+ if (stream_wal )
742+ {
743+ PGresult * res ;
744+ TimeLineID tli ;
745+
746+ reconnect ();
747+
748+ /* Remove annoying NOTICE messages generated by backend */
749+ res = execute ("SET client_min_messages = warning;" ,0 ,NULL );
750+ PQclear (res );
751+
752+ /* And execute the query wanted */
753+ res = execute ("SELECT * FROM pg_stop_backup()" ,0 ,NULL );
754+
755+ /* Get LSN from execution result */
756+ get_lsn (res ,& stop_backup_lsn );
757+ PQclear (res );
758+
759+ /*
760+ * Enforce TLI obtention if backup is not present as this code
761+ * path can be taken as a callback at exit.
762+ */
763+ tli = get_current_timeline (false);
764+
765+ /* Fill in fields if backup exists */
766+ if (backup != NULL )
767+ {
768+ backup -> tli = tli ;
769+ backup -> stop_lsn = stop_backup_lsn ;
770+ elog (LOG ,"%s(): tli=%X lsn=%X/%08X" ,
771+ __FUNCTION__ ,backup -> tli ,
772+ (uint32 ) (backup -> stop_lsn >>32 ),
773+ (uint32 )backup -> stop_lsn );
774+ }
775+
776+ res = execute (TXID_CURRENT_SQL ,0 ,NULL );
777+ if (backup != NULL )
778+ {
779+ get_xid (res ,& backup -> recovery_xid );
780+ backup -> recovery_time = time (NULL );
781+ }
782+ PQclear (res );
783+ disconnect ();
784+ }
785+ else
786+ wait_for_archive (backup ,
687787"SELECT * FROM pg_stop_backup()" );
688788}
689789
@@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
719819 * Extract timeline and LSN from results of pg_stop_backup()
720820 * and friends.
721821 */
722- XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
723822
823+ XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
724824/* Calculate LSN */
725825* lsn = (XLogRecPtr ) ((uint64 )xlogid <<32 ) |xrecoff ;
726826}
@@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files)
11371237}
11381238}
11391239}
1240+
1241+
1242+ static bool
1243+ stop_streaming (XLogRecPtr xlogpos ,uint32 timeline ,bool segment_finished )
1244+ {
1245+ static uint32 prevtimeline = 0 ;
1246+ static XLogRecPtr prevpos = InvalidXLogRecPtr ;
1247+
1248+ /* we assume that we get called once at the end of each segment */
1249+ if (verbose && segment_finished )
1250+ fprintf (stderr ,_ ("%s: finished segment at %X/%X (timeline %u)\n" ),
1251+ progname , (uint32 ) (xlogpos >>32 ), (uint32 )xlogpos ,
1252+ timeline );
1253+
1254+ /*
1255+ * Note that we report the previous, not current, position here. After a
1256+ * timeline switch, xlogpos points to the beginning of the segment because
1257+ * that's where we always begin streaming. Reporting the end of previous
1258+ * timeline isn't totally accurate, because the next timeline can begin
1259+ * slightly before the end of the WAL that we received on the previous
1260+ * timeline, but it's close enough for reporting purposes.
1261+ */
1262+ if (prevtimeline != 0 && prevtimeline != timeline )
1263+ fprintf (stderr ,_ ("%s: switched to timeline %u at %X/%X\n" ),
1264+ progname ,timeline ,
1265+ (uint32 ) (prevpos >>32 ), (uint32 )prevpos );
1266+
1267+ if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn )
1268+ return true;
1269+
1270+ prevtimeline = timeline ;
1271+ prevpos = xlogpos ;
1272+
1273+ return false;
1274+ }
1275+
1276+ /*
1277+ * Start the log streaming
1278+ */
1279+ static void
1280+ StreamLog (void * arg )
1281+ {
1282+ XLogRecPtr startpos ;
1283+ TimeLineID starttli ;
1284+ char * basedir = (char * )arg ;
1285+
1286+ /*
1287+ * Connect in replication mode to the server
1288+ */
1289+ if (conn == NULL )
1290+ conn = GetConnection ();
1291+ if (!conn )
1292+ /* Error message already written in GetConnection() */
1293+ return ;
1294+
1295+ if (!CheckServerVersionForStreaming (conn ))
1296+ {
1297+ /*
1298+ * Error message already written in CheckServerVersionForStreaming().
1299+ * There's no hope of recovering from a version mismatch, so don't
1300+ * retry.
1301+ */
1302+ disconnect_and_exit (1 );
1303+ }
1304+
1305+ /*
1306+ * Identify server, obtaining start LSN position and current timeline ID
1307+ * at the same time, necessary if not valid data can be found in the
1308+ * existing output directory.
1309+ */
1310+ if (!RunIdentifySystem (conn ,NULL ,& starttli ,& startpos ,NULL ))
1311+ disconnect_and_exit (1 );
1312+
1313+
1314+ /*
1315+ * Always start streaming at the beginning of a segment
1316+ */
1317+ startpos -= startpos %XLOG_SEG_SIZE ;
1318+
1319+ /*
1320+ * Start the replication
1321+ */
1322+ if (verbose )
1323+ fprintf (stderr ,
1324+ _ ("%s: starting log streaming at %X/%X (timeline %u)\n" ),
1325+ progname , (uint32 ) (startpos >>32 ), (uint32 )startpos ,
1326+ starttli );
1327+
1328+ ReceiveXlogStream (conn ,startpos ,starttli ,NULL ,basedir ,
1329+ stop_streaming ,standby_message_timeout ,".partial" ,
1330+ false, false);
1331+
1332+ PQfinish (conn );
1333+ conn = NULL ;
1334+ }