Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4c3e877

Browse files
zilderildus
authored andcommitted
Synchronous standbys (#46)
Add set_synchronous_standbys() method
1 parent95d37e9 commit4c3e877

File tree

5 files changed

+148
-1
lines changed

5 files changed

+148
-1
lines changed

‎docs/source/testgres.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ testgres.node
6161
..autoclass::testgres.node.ProcessProxy
6262
:members:
6363

64+
testgres.standby
65+
----------------
66+
67+
..automodule::testgres.standby
68+
:members:
69+
:undoc-members:
70+
:show-inheritance:
71+
6472
testgres.pubsub
6573
---------------
6674

‎testgres/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@
2626
get_bin_path, \
2727
get_pg_config, \
2828
get_pg_version
29+
30+
from .standbyimport \
31+
First, \
32+
Any

‎testgres/node.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
importsubprocess
77
importtime
88

9+
fromcollectionsimportIterable
910
fromshutilimportrmtree
1011
fromsiximportraise_from,iteritems,text_type
1112
fromtempfileimportmkstemp,mkdtemp
@@ -64,6 +65,8 @@
6465

6566
from .pubsubimportPublication,Subscription
6667

68+
from .standbyimportFirst
69+
6770
from .utilsimport \
6871
PgVer, \
6972
eprint, \
@@ -699,7 +702,7 @@ def restart(self, params=[]):
699702

700703
defreload(self,params=[]):
701704
"""
702-
Reload config files using pg_ctl.
705+
Asynchronously reload config files using pg_ctl.
703706
704707
Args:
705708
params: additional arguments for pg_ctl.
@@ -1117,6 +1120,45 @@ def replicate(self, name=None, slot=None, **kwargs):
11171120
withclean_on_error(self.backup(**kwargs))asbackup:
11181121
returnbackup.spawn_replica(name=name,destroy=True,slot=slot)
11191122

1123+
defset_synchronous_standbys(self,standbys):
1124+
"""
1125+
Set standby synchronization options. This corresponds to
1126+
`synchronous_standby_names <https://www.postgresql.org/docs/current/static/runtime-config-replication.html#GUC-SYNCHRONOUS-STANDBY-NAMES>`_
1127+
option. Note that :meth:`~.PostgresNode.reload` or
1128+
:meth:`~.PostgresNode.restart` is needed for changes to take place.
1129+
1130+
Args:
1131+
standbys: either :class:`.First` or :class:`.Any` object specifying
1132+
sychronization parameters or just a plain list of
1133+
:class:`.PostgresNode`s replicas which would be equivalent
1134+
to passing ``First(1, <list>)``. For PostgreSQL 9.5 and below
1135+
it is only possible to specify a plain list of standbys as
1136+
`FIRST` and `ANY` keywords aren't supported.
1137+
1138+
Example::
1139+
1140+
from testgres import get_new_node, First
1141+
1142+
master = get_new_node().init().start()
1143+
with master.replicate().start() as standby:
1144+
master.append_conf("synchronous_commit = remote_apply")
1145+
master.set_synchronous_standbys(First(1, [standby]))
1146+
master.restart()
1147+
1148+
"""
1149+
ifself._pg_version>='9.6':
1150+
ifisinstance(standbys,Iterable):
1151+
standbys=First(1,standbys)
1152+
else:
1153+
ifisinstance(standbys,Iterable):
1154+
standbys=u", ".join(
1155+
u"\"{}\"".format(r.name)forrinstandbys)
1156+
else:
1157+
raiseTestgresException("Feature isn't supported in "
1158+
"Postgres 9.5 and below")
1159+
1160+
self.append_conf("synchronous_standby_names = '{}'".format(standbys))
1161+
11201162
defcatchup(self,dbname=None,username=None):
11211163
"""
11221164
Wait until async replica catches up with its master.

‎testgres/standby.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# coding: utf-8
2+
3+
importsix
4+
5+
6+
@six.python_2_unicode_compatible
7+
classFirst:
8+
"""
9+
Specifies a priority-based synchronous replication and makes transaction
10+
commits wait until their WAL records are replicated to ``num_sync``
11+
synchronous standbys chosen based on their priorities.
12+
13+
Args:
14+
sync_num (int): the number of standbys that transaction need to wait
15+
for replies from
16+
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
17+
nodes
18+
"""
19+
20+
def__init__(self,sync_num,standbys):
21+
self.sync_num=sync_num
22+
self.standbys=standbys
23+
24+
def__str__(self):
25+
returnu"{} ({})".format(self.sync_num,u", ".join(
26+
u"\"{}\"".format(r.name)forrinself.standbys))
27+
28+
29+
@six.python_2_unicode_compatible
30+
classAny:
31+
"""
32+
Specifies a quorum-based synchronous replication and makes transaction
33+
commits wait until their WAL records are replicated to at least ``num_sync``
34+
listed standbys. Only available for Postgres 10 and newer.
35+
36+
Args:
37+
sync_num (int): the number of standbys that transaction need to wait
38+
for replies from
39+
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
40+
nodes
41+
"""
42+
43+
def__init__(self,sync_num,standbys):
44+
self.sync_num=sync_num
45+
self.standbys=standbys
46+
47+
def__str__(self):
48+
returnu"ANY {} ({})".format(self.sync_num,u", ".join(
49+
u"\"{}\"".format(r.name)forrinself.standbys))

‎tests/test_simple.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
get_pg_config, \
4242
get_pg_version
4343

44+
fromtestgresimport \
45+
First, \
46+
Any
47+
4448
# NOTE: those are ugly imports
4549
fromtestgresimportbound_ports
4650
fromtestgres.utilsimportPgVer
@@ -410,6 +414,46 @@ def test_replicate(self):
410414
res=node.execute('select * from test')
411415
self.assertListEqual(res, [])
412416

417+
deftest_synchronous_replication(self):
418+
withget_new_node()asmaster:
419+
old_version=notpg_version_ge('9.6')
420+
421+
master.init(allow_streaming=True).start()
422+
423+
ifnotold_version:
424+
master.append_conf('synchronous_commit = remote_apply')
425+
426+
# create standby
427+
withmaster.replicate()asstandby1,master.replicate()asstandby2:
428+
standby1.start()
429+
standby2.start()
430+
431+
# check formatting
432+
self.assertEqual(
433+
'1 ("{}", "{}")'.format(standby1.name,standby2.name),
434+
str(First(1, (standby1,standby2))))# yapf: disable
435+
self.assertEqual(
436+
'ANY 1 ("{}", "{}")'.format(standby1.name,standby2.name),
437+
str(Any(1, (standby1,standby2))))# yapf: disable
438+
439+
# set synchronous_standby_names
440+
master.set_synchronous_standbys([standby1,standby2])
441+
master.restart()
442+
443+
# the following part of the test is only applicable to newer
444+
# versions of PostgresQL
445+
ifnotold_version:
446+
master.safe_psql('create table abc(a int)')
447+
448+
# Create a large transaction that will take some time to apply
449+
# on standby to check that it applies synchronously
450+
# (If set synchronous_commit to 'on' or other lower level then
451+
# standby most likely won't catchup so fast and test will fail)
452+
master.safe_psql(
453+
'insert into abc select generate_series(1, 1000000)')
454+
res=standby1.safe_psql('select count(*) from abc')
455+
self.assertEqual(res,b'1000000\n')
456+
413457
@unittest.skipUnless(pg_version_ge('10'),'requires 10+')
414458
deftest_logical_replication(self):
415459
withget_new_node()asnode1,get_new_node()asnode2:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp