Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0583873

Browse files
authored
Merge pull request#42 from zilder/logical
Logical replication support
2 parents221df4f +50e02ff commit0583873

File tree

7 files changed

+413
-29
lines changed

7 files changed

+413
-29
lines changed

‎docs/source/testgres.rst

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,19 @@ testgres.node
5959
..automethod::__init__
6060

6161
..autoclass::testgres.node.ProcessProxy
62-
:members:
62+
:members:
63+
64+
testgres.pubsub
65+
---------------
66+
67+
..automodule::testgres.pubsub
68+
69+
..autoclass::testgres.node.Publication
70+
:members:
71+
72+
..automethod::__init__
73+
74+
..autoclass::testgres.node.Subscription
75+
:members:
76+
77+
..automethod::__init__

‎testgres/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ class NodeConnection(object):
2727
Transaction wrapper returned by Node
2828
"""
2929

30-
def__init__(self,node,dbname=None,username=None,password=None):
30+
def__init__(self,
31+
node,
32+
dbname=None,
33+
username=None,
34+
password=None,
35+
autocommit=False):
3136

3237
# Set default arguments
3338
dbname=dbnameordefault_dbname()
@@ -42,6 +47,7 @@ def __init__(self, node, dbname=None, username=None, password=None):
4247
host=node.host,
4348
port=node.port)
4449

50+
self._connection.autocommit=autocommit
4551
self._cursor=self.connection.cursor()
4652

4753
@property

‎testgres/consts.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,6 @@
2929
MAX_REPLICATION_SLOTS=10
3030
MAX_WAL_SENDERS=10
3131
WAL_KEEP_SEGMENTS=20
32+
33+
# logical replication settings
34+
LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS=60

‎testgres/node.py

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@
5757
QueryException, \
5858
StartNodeException, \
5959
TimeoutException, \
60+
InitNodeException, \
6061
TestgresException, \
6162
BackupException
6263

6364
from .loggerimportTestgresLogger
6465

66+
from .pubsubimportPublication,Subscription
67+
6568
from .utilsimport \
6669
eprint, \
6770
get_bin_path, \
@@ -70,6 +73,7 @@
7073
reserve_port, \
7174
release_port, \
7275
execute_utility, \
76+
options_string, \
7377
clean_on_error
7478

7579
from .backupimportNodeBackup
@@ -300,24 +304,24 @@ def _create_recovery_conf(self, username, slot=None):
300304
master=self.master
301305
assertmasterisnotNone
302306

303-
conninfo=(
304-
u"application_name={} "
305-
u"port={} "
306-
u"user={} "
307-
).format(self.name,master.port,username)# yapf: disable
307+
conninfo={
308+
"application_name":self.name,
309+
"port":master.port,
310+
"user":username
311+
}# yapf: disable
308312

309313
# host is tricky
310314
try:
311315
importipaddress
312316
ipaddress.ip_address(master.host)
313-
conninfo+=u"hostaddr={}".format(master.host)
317+
conninfo["hostaddr"]=master.host
314318
exceptValueError:
315-
conninfo+=u"host={}".format(master.host)
319+
conninfo["host"]=master.host
316320

317321
line= (
318322
"primary_conninfo='{}'\n"
319323
"standby_mode=on\n"
320-
).format(conninfo)# yapf: disable
324+
).format(options_string(**conninfo))# yapf: disable
321325

322326
ifslot:
323327
# Connect to master for some additional actions
@@ -413,6 +417,7 @@ def default_conf(self,
413417
fsync=False,
414418
unix_sockets=True,
415419
allow_streaming=True,
420+
allow_logical=False,
416421
log_statement='all'):
417422
"""
418423
Apply default settings to this node.
@@ -421,6 +426,7 @@ def default_conf(self,
421426
fsync: should this node use fsync to keep data safe?
422427
unix_sockets: should we enable UNIX sockets?
423428
allow_streaming: should this node add a hba entry for replication?
429+
allow_logical: can this node be used as a logical replication publisher?
424430
log_statement: one of ('all', 'off', 'mod', 'ddl').
425431
426432
Returns:
@@ -497,6 +503,13 @@ def get_auth_method(t):
497503
WAL_KEEP_SEGMENTS,
498504
wal_level))# yapf: disable
499505

506+
ifallow_logical:
507+
ifnotpg_version_ge('10'):
508+
raiseInitNodeException(
509+
"Logical replication is only available for Postgres 10 "
510+
"and newer")
511+
conf.write(u"wal_level = logical\n")
512+
500513
# disable UNIX sockets if asked to
501514
ifnotunix_sockets:
502515
conf.write(u"unix_socket_directories = ''\n")
@@ -937,13 +950,14 @@ def poll_query_until(self,
937950
ifresisNone:
938951
raiseQueryException('Query returned None',query)
939952

940-
iflen(res)==0:
941-
raiseQueryException('Query returned 0 rows',query)
942-
943-
iflen(res[0])==0:
944-
raiseQueryException('Query returned 0 columns',query)
945-
946-
ifres[0][0]==expected:
953+
# result set is not empty
954+
iflen(res):
955+
iflen(res[0])==0:
956+
raiseQueryException('Query returned 0 columns',query)
957+
ifres[0][0]==expected:
958+
return# done
959+
# empty result set is considered as None
960+
elifexpectedisNone:
947961
return# done
948962

949963
exceptProgrammingErrorase:
@@ -982,13 +996,11 @@ def execute(self,
982996

983997
withself.connect(dbname=dbname,
984998
username=username,
985-
password=password)asnode_con:# yapf: disable
999+
password=password,
1000+
autocommit=commit)asnode_con:# yapf: disable
9861001

9871002
res=node_con.execute(query)
9881003

989-
ifcommit:
990-
node_con.commit()
991-
9921004
returnres
9931005

9941006
defbackup(self,**kwargs):
@@ -1052,6 +1064,37 @@ def catchup(self, dbname=None, username=None):
10521064
exceptExceptionase:
10531065
raise_from(CatchUpException("Failed to catch up",poll_lsn),e)
10541066

1067+
defpublish(self,name,**kwargs):
1068+
"""
1069+
Create publication for logical replication
1070+
1071+
Args:
1072+
pubname: publication name
1073+
tables: tables names list
1074+
dbname: database name where objects or interest are located
1075+
username: replication username
1076+
"""
1077+
returnPublication(name=name,node=self,**kwargs)
1078+
1079+
defsubscribe(self,publication,name,dbname=None,username=None,
1080+
**params):
1081+
"""
1082+
Create subscription for logical replication
1083+
1084+
Args:
1085+
name: subscription name
1086+
publication: publication object obtained from publish()
1087+
dbname: database name
1088+
username: replication username
1089+
params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
1090+
<https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
1091+
for details)
1092+
"""
1093+
# yapf: disable
1094+
returnSubscription(name=name,node=self,publication=publication,
1095+
dbname=dbname,username=username,**params)
1096+
# yapf: enable
1097+
10551098
defpgbench(self,
10561099
dbname=None,
10571100
username=None,
@@ -1150,14 +1193,21 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs):
11501193

11511194
returnexecute_utility(_params,self.utils_log_file)
11521195

1153-
defconnect(self,dbname=None,username=None,password=None):
1196+
defconnect(self,
1197+
dbname=None,
1198+
username=None,
1199+
password=None,
1200+
autocommit=False):
11541201
"""
11551202
Connect to a database.
11561203
11571204
Args:
11581205
dbname: database name to connect to.
11591206
username: database user name.
11601207
password: user's password.
1208+
autocommit: commit each statement automatically. Also it should be
1209+
set to `True` for statements requiring to be run outside
1210+
a transaction? such as `VACUUM` or `CREATE DATABASE`.
11611211
11621212
Returns:
11631213
An instance of :class:`.NodeConnection`.
@@ -1166,4 +1216,5 @@ def connect(self, dbname=None, username=None, password=None):
11661216
returnNodeConnection(node=self,
11671217
dbname=dbname,
11681218
username=username,
1169-
password=password)# yapf: disable
1219+
password=password,
1220+
autocommit=autocommit)# yapf: disable

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp