23
23
#include "pgut/pgut-port.h"
24
24
#include "storage/bufpage.h"
25
25
#include "datapagemap.h"
26
+ #include "streamutil.h"
27
+ #include "receivelog.h"
26
28
27
29
/* wait 10 sec until WAL archive complete */
28
- #define TIMEOUT_ARCHIVE 10
30
+ #define TIMEOUT_ARCHIVE 10
29
31
30
32
/* Server version */
31
33
static int server_version = 0 ;
32
34
33
- static bool in_backup = false;/* TODO: more robust logic */
35
+ static bool in_backup = false;/* TODO: more robust logic */
36
+ static int standby_message_timeout = 10 * 1000 ;/* 10 sec = default */
37
+ static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr ;
38
+ const char * progname = "pg_arman" ;
34
39
35
40
/* list of files contained in backup */
36
41
parray * backup_files_list ;
@@ -71,6 +76,15 @@ static void create_file_list(parray *files,
71
76
bool is_append );
72
77
static void wait_for_archive (pgBackup * backup ,const char * sql );
73
78
static void make_pagemap_from_ptrack (parray * files );
79
+ static void StreamLog (void * arg );
80
+
81
+
82
+ #define disconnect_and_exit (code )\
83
+ {\
84
+ if (conn != NULL) PQfinish(conn);\
85
+ exit(code);\
86
+ }
87
+
74
88
75
89
/*
76
90
* Take a backup of database and return the list of files backed up.
@@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
82
96
parray * prev_files = NULL ;/* file list of previous database backup */
83
97
FILE * fp ;
84
98
char path [MAXPGPATH ];
99
+ char dst_backup_path [MAXPGPATH ];
85
100
char label [1024 ];
86
101
XLogRecPtr * lsn = NULL ;
87
102
char prev_file_txt [MAXPGPATH ];/* path of the previous backup
88
103
* list file */
89
104
bool has_backup_label = true;/* flag if backup_label is there */
90
105
pthread_t backup_threads [num_threads ];
106
+ pthread_t stream_thread ;
91
107
backup_files_args * backup_threads_args [num_threads ];
92
108
93
109
/* repack the options */
@@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
129
145
"or validate existing one." );
130
146
}
131
147
148
+ /* clear ptrack files for FULL and DIFF backup */
132
149
if (current .backup_mode != BACKUP_MODE_DIFF_PTRACK )
133
150
pg_ptrack_clear ();
134
151
152
+ /* start stream replication */
153
+ if (stream_wal )
154
+ {
155
+ pgBackupGetPath (& current ,path ,lengthof (path ),DATABASE_DIR );
156
+ join_path_components (dst_backup_path ,path ,"pg_xlog" );
157
+ dir_create_dir (dst_backup_path ,DIR_PERMISSION );
158
+ pthread_create (& stream_thread ,NULL , (void * (* )(void * ))StreamLog ,dst_backup_path );
159
+ }
160
+
135
161
/* notify start of backup to PostgreSQL server */
136
162
time2iso (label ,lengthof (label ),current .start_time );
137
163
strncat (label ," with pg_arman" ,lengthof (label ));
@@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
322
348
/* Notify end of backup */
323
349
pg_stop_backup (& current );
324
350
351
+ if (stream_wal )
352
+ {
353
+ parray * list_file ;
354
+ char pg_xlog_path [MAXPGPATH ];
355
+
356
+ /* We expect the completion of stream */
357
+ pthread_join (stream_thread ,NULL );
358
+
359
+ /* Scan backup pg_xlog dir */
360
+ list_file = parray_new ();
361
+ join_path_components (pg_xlog_path ,path ,"pg_xlog" );
362
+ dir_list_file (list_file ,pg_xlog_path ,NULL , true, false);
363
+
364
+ /* Remove file path root prefix and calc meta */
365
+ for (i = 0 ;i < parray_num (list_file );i ++ )
366
+ {
367
+ pgFile * file = (pgFile * )parray_get (list_file ,i );
368
+
369
+ calc_file (file );
370
+ if (strstr (file -> path ,path )== file -> path )
371
+ {
372
+ char * ptr = file -> path ;
373
+ file -> path = pstrdup (JoinPathEnd (ptr ,path ));
374
+ free (ptr );
375
+ }
376
+ }
377
+ parray_concat (backup_files_list ,list_file );
378
+ }
379
+
325
380
/* Create file list */
326
381
create_file_list (backup_files_list ,pgdata ,DATABASE_FILE_LIST ,NULL , false);
327
382
@@ -549,31 +604,31 @@ static void
549
604
pg_ptrack_clear (void )
550
605
{
551
606
PGresult * res_db ,* res ;
552
- const char * old_dbname = dbname ;
607
+ const char * old_dbname = pgut_dbname ;
553
608
int i ;
554
609
555
610
reconnect ();
556
611
res_db = execute ("SELECT datname FROM pg_database" ,0 ,NULL );
557
612
disconnect ();
558
613
for (i = 0 ;i < PQntuples (res_db );i ++ )
559
614
{
560
- dbname = PQgetvalue (res_db ,i ,0 );
561
- if (!strcmp (dbname ,"template0" ))
615
+ pgut_dbname = PQgetvalue (res_db ,i ,0 );
616
+ if (!strcmp (pgut_dbname ,"template0" ))
562
617
continue ;
563
618
reconnect ();
564
619
res = execute ("SELECT pg_ptrack_clear()" ,0 ,NULL );
565
620
PQclear (res );
566
621
}
567
622
PQclear (res_db );
568
623
disconnect ();
569
- dbname = old_dbname ;
624
+ pgut_dbname = old_dbname ;
570
625
}
571
626
572
627
static char *
573
628
pg_ptrack_get_and_clear (Oid tablespace_oid ,Oid db_oid ,Oid rel_oid ,size_t * result_size )
574
629
{
575
630
PGresult * res_db ,* res ;
576
- const char * old_dbname = dbname ;
631
+ const char * old_dbname = pgut_dbname ;
577
632
char * params [2 ];
578
633
char * result ;
579
634
@@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
584
639
sprintf (params [1 ],"%i" ,rel_oid );
585
640
res_db = execute ("SELECT datname FROM pg_database WHERE oid=$1" ,1 , (const char * * )params );
586
641
disconnect ();
587
- dbname = pstrdup (PQgetvalue (res_db ,0 ,0 ));
642
+ pgut_dbname = pstrdup (PQgetvalue (res_db ,0 ,0 ));
588
643
PQclear (res_db );
589
644
590
645
reconnect ();
@@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
595
650
pfree (params [0 ]);
596
651
pfree (params [1 ]);
597
652
598
- pfree ((char * )dbname );
599
- dbname = old_dbname ;
653
+ pfree ((char * )pgut_dbname );
654
+ pgut_dbname = old_dbname ;
600
655
601
656
return result ;
602
657
}
@@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql)
683
738
static void
684
739
pg_stop_backup (pgBackup * backup )
685
740
{
686
- wait_for_archive (backup ,
741
+ if (stream_wal )
742
+ {
743
+ PGresult * res ;
744
+ TimeLineID tli ;
745
+
746
+ reconnect ();
747
+
748
+ /* Remove annoying NOTICE messages generated by backend */
749
+ res = execute ("SET client_min_messages = warning;" ,0 ,NULL );
750
+ PQclear (res );
751
+
752
+ /* And execute the query wanted */
753
+ res = execute ("SELECT * FROM pg_stop_backup()" ,0 ,NULL );
754
+
755
+ /* Get LSN from execution result */
756
+ get_lsn (res ,& stop_backup_lsn );
757
+ PQclear (res );
758
+
759
+ /*
760
+ * Enforce TLI obtention if backup is not present as this code
761
+ * path can be taken as a callback at exit.
762
+ */
763
+ tli = get_current_timeline (false);
764
+
765
+ /* Fill in fields if backup exists */
766
+ if (backup != NULL )
767
+ {
768
+ backup -> tli = tli ;
769
+ backup -> stop_lsn = stop_backup_lsn ;
770
+ elog (LOG ,"%s(): tli=%X lsn=%X/%08X" ,
771
+ __FUNCTION__ ,backup -> tli ,
772
+ (uint32 ) (backup -> stop_lsn >>32 ),
773
+ (uint32 )backup -> stop_lsn );
774
+ }
775
+
776
+ res = execute (TXID_CURRENT_SQL ,0 ,NULL );
777
+ if (backup != NULL )
778
+ {
779
+ get_xid (res ,& backup -> recovery_xid );
780
+ backup -> recovery_time = time (NULL );
781
+ }
782
+ PQclear (res );
783
+ disconnect ();
784
+ }
785
+ else
786
+ wait_for_archive (backup ,
687
787
"SELECT * FROM pg_stop_backup()" );
688
788
}
689
789
@@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
719
819
* Extract timeline and LSN from results of pg_stop_backup()
720
820
* and friends.
721
821
*/
722
- XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
723
822
823
+ XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
724
824
/* Calculate LSN */
725
825
* lsn = (XLogRecPtr ) ((uint64 )xlogid <<32 ) |xrecoff ;
726
826
}
@@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files)
1137
1237
}
1138
1238
}
1139
1239
}
1240
+
1241
+
1242
+ static bool
1243
+ stop_streaming (XLogRecPtr xlogpos ,uint32 timeline ,bool segment_finished )
1244
+ {
1245
+ static uint32 prevtimeline = 0 ;
1246
+ static XLogRecPtr prevpos = InvalidXLogRecPtr ;
1247
+
1248
+ /* we assume that we get called once at the end of each segment */
1249
+ if (verbose && segment_finished )
1250
+ fprintf (stderr ,_ ("%s: finished segment at %X/%X (timeline %u)\n" ),
1251
+ progname , (uint32 ) (xlogpos >>32 ), (uint32 )xlogpos ,
1252
+ timeline );
1253
+
1254
+ /*
1255
+ * Note that we report the previous, not current, position here. After a
1256
+ * timeline switch, xlogpos points to the beginning of the segment because
1257
+ * that's where we always begin streaming. Reporting the end of previous
1258
+ * timeline isn't totally accurate, because the next timeline can begin
1259
+ * slightly before the end of the WAL that we received on the previous
1260
+ * timeline, but it's close enough for reporting purposes.
1261
+ */
1262
+ if (prevtimeline != 0 && prevtimeline != timeline )
1263
+ fprintf (stderr ,_ ("%s: switched to timeline %u at %X/%X\n" ),
1264
+ progname ,timeline ,
1265
+ (uint32 ) (prevpos >>32 ), (uint32 )prevpos );
1266
+
1267
+ if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn )
1268
+ return true;
1269
+
1270
+ prevtimeline = timeline ;
1271
+ prevpos = xlogpos ;
1272
+
1273
+ return false;
1274
+ }
1275
+
1276
+ /*
1277
+ * Start the log streaming
1278
+ */
1279
+ static void
1280
+ StreamLog (void * arg )
1281
+ {
1282
+ XLogRecPtr startpos ;
1283
+ TimeLineID starttli ;
1284
+ char * basedir = (char * )arg ;
1285
+
1286
+ /*
1287
+ * Connect in replication mode to the server
1288
+ */
1289
+ if (conn == NULL )
1290
+ conn = GetConnection ();
1291
+ if (!conn )
1292
+ /* Error message already written in GetConnection() */
1293
+ return ;
1294
+
1295
+ if (!CheckServerVersionForStreaming (conn ))
1296
+ {
1297
+ /*
1298
+ * Error message already written in CheckServerVersionForStreaming().
1299
+ * There's no hope of recovering from a version mismatch, so don't
1300
+ * retry.
1301
+ */
1302
+ disconnect_and_exit (1 );
1303
+ }
1304
+
1305
+ /*
1306
+ * Identify server, obtaining start LSN position and current timeline ID
1307
+ * at the same time, necessary if not valid data can be found in the
1308
+ * existing output directory.
1309
+ */
1310
+ if (!RunIdentifySystem (conn ,NULL ,& starttli ,& startpos ,NULL ))
1311
+ disconnect_and_exit (1 );
1312
+
1313
+
1314
+ /*
1315
+ * Always start streaming at the beginning of a segment
1316
+ */
1317
+ startpos -= startpos %XLOG_SEG_SIZE ;
1318
+
1319
+ /*
1320
+ * Start the replication
1321
+ */
1322
+ if (verbose )
1323
+ fprintf (stderr ,
1324
+ _ ("%s: starting log streaming at %X/%X (timeline %u)\n" ),
1325
+ progname , (uint32 ) (startpos >>32 ), (uint32 )startpos ,
1326
+ starttli );
1327
+
1328
+ ReceiveXlogStream (conn ,startpos ,starttli ,NULL ,basedir ,
1329
+ stop_streaming ,standby_message_timeout ,".partial" ,
1330
+ false, false);
1331
+
1332
+ PQfinish (conn );
1333
+ conn = NULL ;
1334
+ }