1212After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()`
1313methods may be used to setup replication. Example:
1414
15- >>> from.api import get_new_node
15+ >>> fromtestgres import get_new_node
1616>>> with get_new_node() as nodeA, get_new_node() as nodeB:
1717... nodeA.init(allow_logical=True).start()
1818... nodeB.init().start()
4444
4545from six import raise_from
4646
47+ from .consts import LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS
4748from .defaults import default_dbname ,default_username
4849from .exceptions import CatchUpException
4950from .utils import options_string
@@ -56,11 +57,11 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
5657 constructing publication objects.
5758
5859 Args:
59- name: publication name
60- node: publisher's node
61- tables: tables list or None for all tables
62- dbname: database name used to connect and perform subscription
63- username: username used to connect to the database
60+ name: publication name.
61+ node: publisher's node.
62+ tables: tables list or None for all tables.
63+ dbname: database name used to connect and perform subscription.
64+ username: username used to connect to the database.
6465 """
6566self .name = name
6667self .node = node
@@ -70,7 +71,7 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
7071# create publication in database
7172t = "table " + ", " .join (tables )if tables else "all tables"
7273query = "create publication {} for {}"
73- node .safe_psql (query .format (name ,t ),dbname = dbname ,username = username )
74+ node .execute (query .format (name ,t ),dbname = dbname ,username = username )
7475
7576def drop (self ,dbname = None ,username = None ):
7677"""
@@ -87,13 +88,13 @@ def add_tables(self, tables, dbname=None, username=None):
8788 created with empty tables list.
8889
8990 Args:
90- tables: a list of tables to be added to the publication
91+ tables: a list of tables to be added to the publication.
9192 """
9293if not tables :
9394raise ValueError ("Tables list is empty" )
9495
9596query = "alter publication {} add table {}"
96- self .node .safe_psql (
97+ self .node .execute (
9798query .format (self .name ,", " .join (tables )),
9899dbname = dbname or self .dbname ,
99100username = username or self .username )
@@ -112,15 +113,15 @@ def __init__(self,
112113 constructing subscription objects.
113114
114115 Args:
115- name: subscription name
116- node: subscriber's node
116+ name: subscription name.
117+ node: subscriber's node.
117118 publication: :class:`.Publication` object we are subscribing to
118- (see :meth:`.PostgresNode.publish()`)
119- dbname: database name used to connect and perform subscription
120- username: username used to connect to the database
119+ (see :meth:`.PostgresNode.publish()`).
120+ dbname: database name used to connect and perform subscription.
121+ username: username used to connect to the database.
121122 params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
122123 <https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
123- for details)
124+ for details).
124125 """
125126self .name = name
126127self .node = node
@@ -142,28 +143,29 @@ def __init__(self,
142143if params :
143144query += " with ({})" .format (options_string (** params ))
144145
145- node .safe_psql (query ,dbname = dbname ,username = username )
146+ # Note: cannot run 'create subscription' query in transaction mode
147+ node .execute (query ,dbname = dbname ,username = username )
146148
147149def disable (self ,dbname = None ,username = None ):
148150"""
149151 Disables the running subscription.
150152 """
151153query = "alter subscription {} disable"
152- self .node .safe_psql (query .format (self .name ),dbname = None ,username = None )
154+ self .node .execute (query .format (self .name ),dbname = None ,username = None )
153155
154156def enable (self ,dbname = None ,username = None ):
155157"""
156158 Enables the previously disabled subscription.
157159 """
158160query = "alter subscription {} enable"
159- self .node .safe_psql (query .format (self .name ),dbname = None ,username = None )
161+ self .node .execute (query .format (self .name ),dbname = None ,username = None )
160162
161163def refresh (self ,copy_data = True ,dbname = None ,username = None ):
162164"""
163165 Disables the running subscription.
164166 """
165167query = "alter subscription {} refresh publication with (copy_data={})"
166- self .node .safe_psql (
168+ self .node .execute (
167169query .format (self .name ,copy_data ),
168170dbname = dbname ,
169171username = username )
@@ -172,7 +174,7 @@ def drop(self, dbname=None, username=None):
172174"""
173175 Drops subscription
174176 """
175- self .node .safe_psql (
177+ self .node .execute (
176178"drop subscription {}" .format (self .name ),
177179dbname = dbname ,
178180username = username )
@@ -182,19 +184,19 @@ def catchup(self, username=None):
182184 Wait until subscription catches up with publication.
183185
184186 Args:
185- username: remote node's user name
187+ username: remote node's user name.
186188 """
187- query = (
188- " select pg_current_wal_lsn() - replay_lsn = 0 "
189- " from pg_stat_replication where application_name = '{}'" ). format (
190- self .name )
189+ query = """
190+ select pg_current_wal_lsn() - replay_lsn = 0
191+ frompg_catalog. pg_stat_replication where application_name = '{}'
192+ """ . format ( self .name )
191193
192194try :
193195# wait until this LSN reaches subscriber
194196self .pub .node .poll_query_until (
195197query = query ,
196198dbname = self .pub .dbname ,
197199username = username or self .pub .username ,
198- max_attempts = 60 )
200+ max_attempts = LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS )
199201except Exception as e :
200202raise_from (CatchUpException ("Failed to catch up" ,query ),e )