Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4b279ef

Browse files
committed
minor refactoring
1 parent08ed6ef commit4b279ef

File tree

5 files changed

+61
-39
lines changed

5 files changed

+61
-39
lines changed

‎testgres/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ class NodeConnection(object):
2727
Transaction wrapper returned by Node
2828
"""
2929

30-
def__init__(self,node,dbname=None,username=None,password=None):
30+
def__init__(self,
31+
node,
32+
dbname=None,
33+
username=None,
34+
password=None,
35+
autocommit=False):
3136

3237
# Set default arguments
3338
dbname=dbnameordefault_dbname()
@@ -42,6 +47,7 @@ def __init__(self, node, dbname=None, username=None, password=None):
4247
host=node.host,
4348
port=node.port)
4449

50+
self._connection.autocommit=autocommit
4551
self._cursor=self.connection.cursor()
4652

4753
@property

‎testgres/consts.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,6 @@
2929
MAX_REPLICATION_SLOTS=10
3030
MAX_WAL_SENDERS=10
3131
WAL_KEEP_SEGMENTS=20
32+
33+
# logical replication settings
34+
LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS=60

‎testgres/node.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -953,13 +953,11 @@ def execute(self,
953953

954954
withself.connect(dbname=dbname,
955955
username=username,
956-
password=password)asnode_con:# yapf: disable
956+
password=password,
957+
autocommit=commit)asnode_con:# yapf: disable
957958

958959
res=node_con.execute(query)
959960

960-
ifcommit:
961-
node_con.commit()
962-
963961
returnres
964962

965963
defbackup(self,**kwargs):
@@ -1152,14 +1150,21 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs):
11521150

11531151
returnexecute_utility(_params,self.utils_log_file)
11541152

1155-
defconnect(self,dbname=None,username=None,password=None):
1153+
defconnect(self,
1154+
dbname=None,
1155+
username=None,
1156+
password=None,
1157+
autocommit=False):
11561158
"""
11571159
Connect to a database.
11581160
11591161
Args:
11601162
dbname: database name to connect to.
11611163
username: database user name.
11621164
password: user's password.
1165+
autocommit: commit each statement automatically. Also it should be
1166+
set to `True` for statements requiring to be run outside
1167+
a transaction? such as `VACUUM` or `CREATE DATABASE`.
11631168
11641169
Returns:
11651170
An instance of :class:`.NodeConnection`.
@@ -1168,4 +1173,5 @@ def connect(self, dbname=None, username=None, password=None):
11681173
returnNodeConnection(node=self,
11691174
dbname=dbname,
11701175
username=username,
1171-
password=password)# yapf: disable
1176+
password=password,
1177+
autocommit=autocommit)# yapf: disable

‎testgres/pubsub.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()`
1313
methods may be used to setup replication. Example:
1414
15-
>>> from.api import get_new_node
15+
>>> fromtestgres import get_new_node
1616
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
1717
... nodeA.init(allow_logical=True).start()
1818
... nodeB.init().start()
@@ -44,6 +44,7 @@
4444

4545
fromsiximportraise_from
4646

47+
from .constsimportLOGICAL_REPL_MAX_CATCHUP_ATTEMPTS
4748
from .defaultsimportdefault_dbname,default_username
4849
from .exceptionsimportCatchUpException
4950
from .utilsimportoptions_string
@@ -56,11 +57,11 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
5657
constructing publication objects.
5758
5859
Args:
59-
name: publication name
60-
node: publisher's node
61-
tables: tables list or None for all tables
62-
dbname: database name used to connect and perform subscription
63-
username: username used to connect to the database
60+
name: publication name.
61+
node: publisher's node.
62+
tables: tables list or None for all tables.
63+
dbname: database name used to connect and perform subscription.
64+
username: username used to connect to the database.
6465
"""
6566
self.name=name
6667
self.node=node
@@ -70,7 +71,7 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
7071
# create publication in database
7172
t="table "+", ".join(tables)iftableselse"all tables"
7273
query="create publication {} for {}"
73-
node.safe_psql(query.format(name,t),dbname=dbname,username=username)
74+
node.execute(query.format(name,t),dbname=dbname,username=username)
7475

7576
defdrop(self,dbname=None,username=None):
7677
"""
@@ -87,13 +88,13 @@ def add_tables(self, tables, dbname=None, username=None):
8788
created with empty tables list.
8889
8990
Args:
90-
tables: a list of tables to be added to the publication
91+
tables: a list of tables to be added to the publication.
9192
"""
9293
ifnottables:
9394
raiseValueError("Tables list is empty")
9495

9596
query="alter publication {} add table {}"
96-
self.node.safe_psql(
97+
self.node.execute(
9798
query.format(self.name,", ".join(tables)),
9899
dbname=dbnameorself.dbname,
99100
username=usernameorself.username)
@@ -112,15 +113,15 @@ def __init__(self,
112113
constructing subscription objects.
113114
114115
Args:
115-
name: subscription name
116-
node: subscriber's node
116+
name: subscription name.
117+
node: subscriber's node.
117118
publication: :class:`.Publication` object we are subscribing to
118-
(see :meth:`.PostgresNode.publish()`)
119-
dbname: database name used to connect and perform subscription
120-
username: username used to connect to the database
119+
(see :meth:`.PostgresNode.publish()`).
120+
dbname: database name used to connect and perform subscription.
121+
username: username used to connect to the database.
121122
params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
122123
<https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
123-
for details)
124+
for details).
124125
"""
125126
self.name=name
126127
self.node=node
@@ -142,28 +143,29 @@ def __init__(self,
142143
ifparams:
143144
query+=" with ({})".format(options_string(**params))
144145

145-
node.safe_psql(query,dbname=dbname,username=username)
146+
# Note: cannot run 'create subscription' query in transaction mode
147+
node.execute(query,dbname=dbname,username=username)
146148

147149
defdisable(self,dbname=None,username=None):
148150
"""
149151
Disables the running subscription.
150152
"""
151153
query="alter subscription {} disable"
152-
self.node.safe_psql(query.format(self.name),dbname=None,username=None)
154+
self.node.execute(query.format(self.name),dbname=None,username=None)
153155

154156
defenable(self,dbname=None,username=None):
155157
"""
156158
Enables the previously disabled subscription.
157159
"""
158160
query="alter subscription {} enable"
159-
self.node.safe_psql(query.format(self.name),dbname=None,username=None)
161+
self.node.execute(query.format(self.name),dbname=None,username=None)
160162

161163
defrefresh(self,copy_data=True,dbname=None,username=None):
162164
"""
163165
Disables the running subscription.
164166
"""
165167
query="alter subscription {} refresh publication with (copy_data={})"
166-
self.node.safe_psql(
168+
self.node.execute(
167169
query.format(self.name,copy_data),
168170
dbname=dbname,
169171
username=username)
@@ -172,7 +174,7 @@ def drop(self, dbname=None, username=None):
172174
"""
173175
Drops subscription
174176
"""
175-
self.node.safe_psql(
177+
self.node.execute(
176178
"drop subscription {}".format(self.name),
177179
dbname=dbname,
178180
username=username)
@@ -182,19 +184,19 @@ def catchup(self, username=None):
182184
Wait until subscription catches up with publication.
183185
184186
Args:
185-
username: remote node's user name
187+
username: remote node's user name.
186188
"""
187-
query=(
188-
"select pg_current_wal_lsn() - replay_lsn = 0 "
189-
"from pg_stat_replication where application_name = '{}'").format(
190-
self.name)
189+
query="""
190+
select pg_current_wal_lsn() - replay_lsn = 0
191+
frompg_catalog.pg_stat_replication where application_name = '{}'
192+
""".format(self.name)
191193

192194
try:
193195
# wait until this LSN reaches subscriber
194196
self.pub.node.poll_query_until(
195197
query=query,
196198
dbname=self.pub.dbname,
197199
username=usernameorself.pub.username,
198-
max_attempts=60)
200+
max_attempts=LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS)
199201
exceptExceptionase:
200202
raise_from(CatchUpException("Failed to catch up",query),e)

‎tests/test_simple.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ def test_logical_replication(self):
432432
node1.safe_psql('insert into test2 values (\'a\'), (\'b\')')
433433
sub.catchup()
434434
res=node2.execute('select * from test2')
435-
self.assertListEqual(res, [('a',), ('b',)])
435+
self.assertListEqual(res, [('a',), ('b',)])
436436

437437
# drop subscription
438438
sub.drop()
@@ -450,12 +450,12 @@ def test_logical_replication(self):
450450

451451
# explicitely add table
452452
withself.assertRaises(ValueError):
453-
pub.add_tables([])# fail
453+
pub.add_tables([])# fail
454454
pub.add_tables(['test2'])
455455
node1.safe_psql('insert into test2 values (\'c\')')
456456
sub.catchup()
457457
res=node2.execute('select * from test2')
458-
self.assertListEqual(res, [('a',), ('b',)])
458+
self.assertListEqual(res, [('a',), ('b',)])
459459

460460
@unittest.skipUnless(pg_version_ge('10'),'requires 10+')
461461
deftest_logical_catchup(self):
@@ -477,7 +477,10 @@ def test_logical_catchup(self):
477477
node1.execute('insert into test values ({0}, {0})'.format(i))
478478
sub.catchup()
479479
res=node2.execute('select * from test')
480-
self.assertListEqual(res, [(i,i,)])
480+
self.assertListEqual(res, [(
481+
i,
482+
i,
483+
)])
481484
node1.execute('delete from test')
482485

483486
@unittest.skipIf(pg_version_ge('10'),'requires <10')
@@ -544,7 +547,8 @@ def test_poll_query_until(self):
544547

545548
# check 0 columns
546549
withself.assertRaises(QueryException):
547-
node.poll_query_until(query='select from pg_class limit 1')
550+
node.poll_query_until(
551+
query='select from pg_catalog.pg_class limit 1')
548552

549553
# check None, fail
550554
withself.assertRaises(QueryException):
@@ -556,7 +560,8 @@ def test_poll_query_until(self):
556560

557561
# check 0 rows equivalent to expected=None
558562
node.poll_query_until(
559-
query='select * from pg_class where true = false',expected=None)
563+
query='select * from pg_catalog.pg_class where true = false',
564+
expected=None)
560565

561566
# check arbitrary expected value, fail
562567
withself.assertRaises(TimeoutException):

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp