3232RECOVERY_CONF_FILE , \
3333PG_LOG_FILE , \
3434UTILS_LOG_FILE , \
35- PG_PID_FILE
35+ PG_PID_FILE , \
36+ REPLICATION_SLOTS
3637
3738from .decorators import \
3839method_decorator , \
@@ -277,7 +278,7 @@ def _assign_master(self, master):
277278# now this node has a master
278279self ._master = master
279280
280- def _create_recovery_conf (self ,username ):
281+ def _create_recovery_conf (self ,username , slot_name = None ):
281282"""NOTE: this is a private method!"""
282283
283284# fetch master of this node
@@ -305,6 +306,9 @@ def _create_recovery_conf(self, username):
305306"standby_mode=on\n "
306307 ).format (conninfo )
307308
309+ if slot_name :
310+ line += "primary_slot_name={}\n " .format (slot_name )
311+
308312self .append_conf (RECOVERY_CONF_FILE ,line )
309313
310314def _maybe_start_logger (self ):
@@ -348,6 +352,28 @@ def _collect_special_files(self):
348352
349353return result
350354
355+ def _create_replication_slot (self ,slot_name ,dbname = None ,username = None ):
356+ """
357+ Create a physical replication slot.
358+
359+ Args:
360+ slot_name: slot name
361+ dbname: database name
362+ username: database user name
363+ """
364+ rs = self .execute ("select exists (select * from pg_replication_slots "
365+ "where slot_name = '{}')" .format (slot_name ),
366+ dbname = dbname ,username = username )
367+
368+ if rs [0 ][0 ]:
369+ raise TestgresException ("Slot '{}' already exists" .format (slot_name ))
370+
371+ query = (
372+ "select pg_create_physical_replication_slot('{}')"
373+ ).format (slot_name )
374+
375+ self .execute (query = query ,dbname = dbname ,username = username )
376+
351377def init (self ,initdb_params = None ,** kwargs ):
352378"""
353379 Perform initdb for this node.
@@ -458,8 +484,10 @@ def get_auth_method(t):
458484wal_keep_segments = 20 # for convenience
459485conf .write (u"hot_standby = on\n "
460486u"max_wal_senders = {}\n "
487+ u"max_replication_slots = {}\n "
461488u"wal_keep_segments = {}\n "
462489u"wal_level = {}\n " .format (max_wal_senders ,
490+ REPLICATION_SLOTS ,
463491wal_keep_segments ,
464492wal_level ))
465493
@@ -941,7 +969,7 @@ def backup(self, **kwargs):
941969
942970return NodeBackup (node = self ,** kwargs )
943971
944- def replicate (self ,name = None ,** kwargs ):
972+ def replicate (self ,name = None ,slot_name = None , ** kwargs ):
945973"""
946974 Create a binary replica of this node.
947975
@@ -952,10 +980,15 @@ def replicate(self, name=None, **kwargs):
952980 base_dir: the base directory for data files and logs
953981 """
954982
983+ if slot_name :
984+ self ._create_replication_slot (slot_name ,** kwargs )
985+
955986backup = self .backup (** kwargs )
956987
957988# transform backup into a replica
958- return backup .spawn_replica (name = name ,destroy = True )
989+ return backup .spawn_replica (name = name ,
990+ destroy = True ,
991+ slot_name = slot_name )
959992
960993def catchup (self ,dbname = None ,username = None ):
961994"""