@@ -50,8 +50,9 @@ static pthread_t stream_thread;
5050
5151static int is_ptrack_enable = false;
5252
53- /* Backupconnection */
53+ /* Backupconnections */
5454static PGconn * backup_conn = NULL ;
55+ static PGconn * master_conn = NULL ;
5556
5657/* PostgreSQL server version from "backup_conn" */
5758static int server_version = 0 ;
@@ -434,6 +435,17 @@ do_backup(void)
434435if (!current .stream && !pg_archive_enabled ())
435436elog (ERROR ,"Archiving must be enabled for archive backup" );
436437
438+ if (from_replica )
439+ {
440+ /* Check master connection options */
441+ if (master_host == NULL )
442+ elog (ERROR ,"Options for connection to master must be provided to perform backup from replica" );
443+
444+ /* Create connection to master server */
445+ master_conn = pgut_connect_extended (master_host ,master_port ,
446+ master_db ,master_user ,password );
447+ }
448+
437449/* Get exclusive lock of backup catalog */
438450catalog_lock ();
439451
@@ -896,6 +908,7 @@ pg_stop_backup(pgBackup *backup)
896908PGresult * res ;
897909uint32 xlogid ;
898910uint32 xrecoff ;
911+ XLogRecPtr restore_lsn ;
899912
900913/*
901914 * We will use this values if there are no transactions between start_lsn
@@ -912,6 +925,80 @@ pg_stop_backup(pgBackup *backup)
9129250 ,NULL );
913926PQclear (res );
914927
928+ /* Create restore point */
929+ if (backup != NULL )
930+ {
931+ const char * params [1 ];
932+ char name [1024 ];
933+ char * backup_id ;
934+
935+ backup_id = base36enc (backup -> start_time );
936+
937+ if (!from_replica )
938+ {
939+ snprintf (name ,lengthof (name ),"pg_probackup, backup_id %s" ,
940+ backup_id );
941+ params [0 ]= name ;
942+
943+ res = pgut_execute (backup_conn ,"SELECT pg_create_restore_point($1)" ,
944+ 1 ,params );
945+ PQclear (res );
946+ }
947+ else
948+ {
949+ uint32 try_count = 0 ;
950+
951+ snprintf (name ,lengthof (name ),"pg_probackup, backup_id %s. Replica Backup" ,
952+ backup_id );
953+ params [0 ]= name ;
954+
955+ res = pgut_execute (master_conn ,"SELECT pg_create_restore_point($1)" ,
956+ 1 ,params );
957+ /* Extract timeline and LSN from result */
958+ XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
959+ /* Calculate LSN */
960+ restore_lsn = (XLogRecPtr ) ((uint64 )xlogid <<32 ) |xrecoff ;
961+ PQclear (res );
962+
963+ /* Wait for restore_lsn from master */
964+ while (true)
965+ {
966+ XLogRecPtr min_recovery_lsn ;
967+
968+ res = pgut_execute (backup_conn ,"SELECT min_recovery_end_location from pg_control_recovery()" ,
969+ 0 ,NULL );
970+ /* Extract timeline and LSN from result */
971+ XLogDataFromLSN (PQgetvalue (res ,0 ,0 ),& xlogid ,& xrecoff );
972+ /* Calculate LSN */
973+ min_recovery_lsn = (XLogRecPtr ) ((uint64 )xlogid <<32 ) |xrecoff ;
974+ PQclear (res );
975+
976+ /* restore_lsn was streamed and applied to the replica */
977+ if (min_recovery_lsn >=restore_lsn )
978+ break ;
979+
980+ sleep (1 );
981+ if (interrupted )
982+ elog (ERROR ,"Interrupted during waiting for restore point LSN" );
983+ try_count ++ ;
984+
985+ /* Inform user if restore_lsn is absent in first attempt */
986+ if (try_count == 1 )
987+ elog (INFO ,"Wait for restore point LSN %X/%X to be streamed "
988+ "to replica" ,
989+ (uint32 ) (restore_lsn >>32 ), (uint32 )restore_lsn );
990+
991+ if (replica_timeout > 0 && try_count > replica_timeout )
992+ elog (ERROR ,"Restore point LSN %X/%X could not be "
993+ "streamed to replica in %d seconds" ,
994+ (uint32 ) (restore_lsn >>32 ), (uint32 )restore_lsn ,
995+ replica_timeout );
996+ }
997+ }
998+
999+ pfree (backup_id );
1000+ }
1001+
9151002if (!exclusive_backup )
9161003/*
9171004 * Stop the non-exclusive backup. Besides stop_lsn it returns from
@@ -937,6 +1024,15 @@ pg_stop_backup(pgBackup *backup)
9371024/* Calculate LSN */
9381025stop_backup_lsn = (XLogRecPtr ) ((uint64 )xlogid <<32 ) |xrecoff ;
9391026
1027+ if (!XRecOffIsValid (stop_backup_lsn ))
1028+ {
1029+ stop_backup_lsn = restore_lsn ;
1030+ }
1031+
1032+ if (!XRecOffIsValid (stop_backup_lsn ))
1033+ elog (ERROR ,"Invalid stop_backup_lsn value %X/%X" ,
1034+ (uint32 ) (stop_backup_lsn >>32 ), (uint32 ) (stop_backup_lsn ));
1035+
9401036/* Write backup_label and tablespace_map for backup from replica */
9411037if (!exclusive_backup )
9421038{
@@ -1019,18 +1115,19 @@ pg_stop_backup(pgBackup *backup)
10191115if (stream_wal )
10201116/* Wait for the completion of stream */
10211117pthread_join (stream_thread ,NULL );
1022- /*
1023- * Wait for stop_lsn to be archived or streamed.
1024- * We wait for stop_lsn in stream mode just in case.
1025- */
1026- wait_wal_lsn (stop_backup_lsn );
10271118
10281119/* Fill in fields if that is the correct end of backup. */
10291120if (backup != NULL )
10301121{
10311122char * xlog_path ,
10321123stream_xlog_path [MAXPGPATH ];
10331124
1125+ /*
1126+ * Wait for stop_lsn to be archived or streamed.
1127+ * We wait for stop_lsn in stream mode just in case.
1128+ */
1129+ wait_wal_lsn (stop_backup_lsn );
1130+
10341131if (stream_wal )
10351132{
10361133pgBackupGetPath2 (backup ,stream_xlog_path ,
@@ -1134,6 +1231,8 @@ static void
11341231backup_disconnect (bool fatal ,void * userdata )
11351232{
11361233pgut_disconnect (backup_conn );
1234+ if (master_conn )
1235+ pgut_disconnect (master_conn );
11371236}
11381237
11391238/* Count bytes in file */