@@ -109,6 +109,10 @@ class BackupException(Exception):
109
109
pass
110
110
111
111
112
+ class CatchUpException (Exception ):
113
+ pass
114
+
115
+
112
116
class TestgresLogger (threading .Thread ):
113
117
"""
114
118
Helper class to implement reading from postgresql.log
@@ -318,12 +322,14 @@ def spawn_primary(self, name, destroy=True):
318
322
# Copy backup to new data dir
319
323
shutil .copytree (data1 ,data2 )
320
324
except Exception as e :
321
- raise BackupException (e . message )
325
+ raise BackupException (str ( e ) )
322
326
else :
323
327
base_dir = self .base_dir
324
328
325
329
# build a new PostgresNode
326
- node = get_new_node (name ,base_dir )
330
+ node = PostgresNode (name = name ,
331
+ base_dir = base_dir ,
332
+ master = self .original_node )
327
333
node .append_conf ("postgresql.conf" ,"port = {}" .format (node .port ))
328
334
329
335
# record new status
@@ -803,13 +809,21 @@ def restore(self, dbname, filename, username=None):
803
809
804
810
self .psql (dbname = dbname ,filename = filename ,username = username )
805
811
806
- def poll_query_until (self ,dbname ,query ,username = None ,max_attempts = 60 ,sleep_time = 1 ):
812
+ def poll_query_until (self ,
813
+ dbname ,
814
+ query ,
815
+ username = None ,
816
+ max_attempts = 60 ,
817
+ sleep_time = 1 ):
807
818
"""
808
819
Run a query once a second until it returs True.
809
820
810
821
Args:
811
822
dbname: database name to connect to (str).
812
823
query: query to be executed (str).
824
+ username: database user name (str).
825
+ max_attempts: how many times should we try?
826
+ sleep_time: how long should we sleep after a failure?
813
827
"""
814
828
815
829
attemps = 0
@@ -844,7 +858,7 @@ def execute(self, dbname, query, username=None, commit=False):
844
858
dbname: database name to connect to (str).
845
859
query: query to be executed (str).
846
860
username: database user name (str).
847
- commit: should we commit this query?.
861
+ commit: should we commit this query?
848
862
849
863
Returns:
850
864
A list of tuples representing rows.
@@ -885,6 +899,32 @@ def replicate(self, name, username=None, xlog_method=DEFAULT_XLOG_METHOD):
885
899
return self .backup (username = username ,
886
900
xlog_method = xlog_method ).spawn_replica (name )
887
901
902
+ def catchup (self ):
903
+ """
904
+ Wait until async replica catches up with its master.
905
+ """
906
+
907
+ master = self .master
908
+
909
+ cur_ver = LooseVersion (get_pg_config ()["VERSION_NUM" ])
910
+ min_ver = LooseVersion ('10.0' )
911
+
912
+ if cur_ver >= min_ver :
913
+ poll_lsn = "select pg_current_wal_lsn()::text"
914
+ wait_lsn = "select pg_last_wal_replay_lsn() >= '{}'::pg_lsn"
915
+ else :
916
+ poll_lsn = "select pg_current_xlog_location()::text"
917
+ wait_lsn = "select pg_last_xlog_replay_location() >= '{}'::pg_lsn"
918
+
919
+ if not master :
920
+ raise CatchUpException ("Master node is not specified" )
921
+
922
+ try :
923
+ lsn = master .execute ('postgres' ,poll_lsn )[0 ][0 ]
924
+ self .poll_query_until ('postgres' ,wait_lsn .format (lsn ))
925
+ except Exception as e :
926
+ raise CatchUpException (str (e ))
927
+
888
928
def pgbench_init (self ,dbname = 'postgres' ,scale = 1 ,options = []):
889
929
"""
890
930
Prepare database for pgbench (create tables etc).
@@ -963,7 +1003,7 @@ def call_initdb(_data_dir):
963
1003
_params = [_data_dir ,"-N" ]+ initdb_params
964
1004
_execute_utility ("initdb" ,_params ,initdb_logfile )
965
1005
except Exception as e :
966
- raise InitNodeException (e . message )
1006
+ raise InitNodeException (str ( e ) )
967
1007
968
1008
# Call initdb if we have custom params
969
1009
if initdb_params :
@@ -981,7 +1021,7 @@ def call_initdb(_data_dir):
981
1021
# Copy cached initdb to current data dir
982
1022
shutil .copytree (cached_data_dir ,data_dir )
983
1023
except Exception as e :
984
- raise InitNodeException (e . message )
1024
+ raise InitNodeException (str ( e ) )
985
1025
986
1026
987
1027
def _execute_utility (util ,args ,logfile ,write_to_pipe = True ):