58
58
QueryException , \
59
59
StartNodeException , \
60
60
TimeoutException , \
61
+ InitNodeException , \
61
62
TestgresException , \
62
63
BackupException
63
64
64
65
from .logger import TestgresLogger
65
66
67
+ from .pubsub import Publication ,Subscription
68
+
66
69
from .standby import First
67
70
71
+
68
72
from .utils import \
69
73
eprint , \
70
74
get_bin_path , \
73
77
reserve_port , \
74
78
release_port , \
75
79
execute_utility , \
80
+ options_string , \
76
81
clean_on_error
77
82
78
83
from .backup import NodeBackup
@@ -303,24 +308,24 @@ def _create_recovery_conf(self, username, slot=None):
303
308
master = self .master
304
309
assert master is not None
305
310
306
- conninfo = (
307
- u "application_name={} "
308
- u "port={} "
309
- u "user={} "
310
- ). format ( self . name , master . port , username ) # yapf: disable
311
+ conninfo = {
312
+ "application_name" : self . name ,
313
+ "port" : master . port ,
314
+ "user" : username
315
+ } # yapf: disable
311
316
312
317
# host is tricky
313
318
try :
314
319
import ipaddress
315
320
ipaddress .ip_address (master .host )
316
- conninfo += u "hostaddr={}" . format ( master .host )
321
+ conninfo [ "hostaddr" ] = master .host
317
322
except ValueError :
318
- conninfo += u "host={}" . format ( master .host )
323
+ conninfo [ "host" ] = master .host
319
324
320
325
line = (
321
326
"primary_conninfo='{}'\n "
322
327
"standby_mode=on\n "
323
- ).format (conninfo )# yapf: disable
328
+ ).format (options_string ( ** conninfo ) )# yapf: disable
324
329
325
330
if slot :
326
331
# Connect to master for some additional actions
@@ -416,6 +421,7 @@ def default_conf(self,
416
421
fsync = False ,
417
422
unix_sockets = True ,
418
423
allow_streaming = True ,
424
+ allow_logical = False ,
419
425
log_statement = 'all' ):
420
426
"""
421
427
Apply default settings to this node.
@@ -424,6 +430,7 @@ def default_conf(self,
424
430
fsync: should this node use fsync to keep data safe?
425
431
unix_sockets: should we enable UNIX sockets?
426
432
allow_streaming: should this node add a hba entry for replication?
433
+ allow_logical: can this node be used as a logical replication publisher?
427
434
log_statement: one of ('all', 'off', 'mod', 'ddl').
428
435
429
436
Returns:
@@ -500,6 +507,13 @@ def get_auth_method(t):
500
507
WAL_KEEP_SEGMENTS ,
501
508
wal_level ))# yapf: disable
502
509
510
+ if allow_logical :
511
+ if not pg_version_ge ('10' ):
512
+ raise InitNodeException (
513
+ "Logical replication is only available for Postgres 10 "
514
+ "and newer" )
515
+ conf .write (u"wal_level = logical\n " )
516
+
503
517
# disable UNIX sockets if asked to
504
518
if not unix_sockets :
505
519
conf .write (u"unix_socket_directories = ''\n " )
@@ -940,13 +954,14 @@ def poll_query_until(self,
940
954
if res is None :
941
955
raise QueryException ('Query returned None' ,query )
942
956
943
- if len (res )== 0 :
944
- raise QueryException ('Query returned 0 rows' ,query )
945
-
946
- if len (res [0 ])== 0 :
947
- raise QueryException ('Query returned 0 columns' ,query )
948
-
949
- if res [0 ][0 ]== expected :
957
+ # result set is not empty
958
+ if len (res ):
959
+ if len (res [0 ])== 0 :
960
+ raise QueryException ('Query returned 0 columns' ,query )
961
+ if res [0 ][0 ]== expected :
962
+ return # done
963
+ # empty result set is considered as None
964
+ elif expected is None :
950
965
return # done
951
966
952
967
except ProgrammingError as e :
@@ -985,13 +1000,11 @@ def execute(self,
985
1000
986
1001
with self .connect (dbname = dbname ,
987
1002
username = username ,
988
- password = password )as node_con :# yapf: disable
1003
+ password = password ,
1004
+ autocommit = commit )as node_con :# yapf: disable
989
1005
990
1006
res = node_con .execute (query )
991
1007
992
- if commit :
993
- node_con .commit ()
994
-
995
1008
return res
996
1009
997
1010
def backup (self ,** kwargs ):
@@ -1094,6 +1107,37 @@ def catchup(self, dbname=None, username=None):
1094
1107
except Exception as e :
1095
1108
raise_from (CatchUpException ("Failed to catch up" ,poll_lsn ),e )
1096
1109
1110
+ def publish (self ,name ,** kwargs ):
1111
+ """
1112
+ Create publication for logical replication
1113
+
1114
+ Args:
1115
+ pubname: publication name
1116
+ tables: tables names list
1117
+ dbname: database name where objects or interest are located
1118
+ username: replication username
1119
+ """
1120
+ return Publication (name = name ,node = self ,** kwargs )
1121
+
1122
+ def subscribe (self ,publication ,name ,dbname = None ,username = None ,
1123
+ ** params ):
1124
+ """
1125
+ Create subscription for logical replication
1126
+
1127
+ Args:
1128
+ name: subscription name
1129
+ publication: publication object obtained from publish()
1130
+ dbname: database name
1131
+ username: replication username
1132
+ params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
1133
+ <https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
1134
+ for details)
1135
+ """
1136
+ # yapf: disable
1137
+ return Subscription (name = name ,node = self ,publication = publication ,
1138
+ dbname = dbname ,username = username ,** params )
1139
+ # yapf: enable
1140
+
1097
1141
def pgbench (self ,
1098
1142
dbname = None ,
1099
1143
username = None ,
@@ -1192,14 +1236,21 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs):
1192
1236
1193
1237
return execute_utility (_params ,self .utils_log_file )
1194
1238
1195
- def connect (self ,dbname = None ,username = None ,password = None ):
1239
+ def connect (self ,
1240
+ dbname = None ,
1241
+ username = None ,
1242
+ password = None ,
1243
+ autocommit = False ):
1196
1244
"""
1197
1245
Connect to a database.
1198
1246
1199
1247
Args:
1200
1248
dbname: database name to connect to.
1201
1249
username: database user name.
1202
1250
password: user's password.
1251
+ autocommit: commit each statement automatically. Also it should be
1252
+ set to `True` for statements requiring to be run outside
1253
+ a transaction? such as `VACUUM` or `CREATE DATABASE`.
1203
1254
1204
1255
Returns:
1205
1256
An instance of :class:`.NodeConnection`.
@@ -1208,4 +1259,5 @@ def connect(self, dbname=None, username=None, password=None):
1208
1259
return NodeConnection (node = self ,
1209
1260
dbname = dbname ,
1210
1261
username = username ,
1211
- password = password )# yapf: disable
1262
+ password = password ,
1263
+ autocommit = autocommit )# yapf: disable