Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit27cf106

Browse files
committed
add catchup() method to PostgresNode
1 parent76729ec commit27cf106

File tree

2 files changed

+68
-21
lines changed

2 files changed

+68
-21
lines changed

‎testgres/testgres.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ class BackupException(Exception):
109109
pass
110110

111111

112+
classCatchUpException(Exception):
113+
pass
114+
115+
112116
classTestgresLogger(threading.Thread):
113117
"""
114118
Helper class to implement reading from postgresql.log
@@ -318,12 +322,14 @@ def spawn_primary(self, name, destroy=True):
318322
# Copy backup to new data dir
319323
shutil.copytree(data1,data2)
320324
exceptExceptionase:
321-
raiseBackupException(e.message)
325+
raiseBackupException(str(e))
322326
else:
323327
base_dir=self.base_dir
324328

325329
# build a new PostgresNode
326-
node=get_new_node(name,base_dir)
330+
node=PostgresNode(name=name,
331+
base_dir=base_dir,
332+
master=self.original_node)
327333
node.append_conf("postgresql.conf","port = {}".format(node.port))
328334

329335
# record new status
@@ -803,13 +809,21 @@ def restore(self, dbname, filename, username=None):
803809

804810
self.psql(dbname=dbname,filename=filename,username=username)
805811

806-
defpoll_query_until(self,dbname,query,username=None,max_attempts=60,sleep_time=1):
812+
defpoll_query_until(self,
813+
dbname,
814+
query,
815+
username=None,
816+
max_attempts=60,
817+
sleep_time=1):
807818
"""
808819
Run a query once a second until it returs True.
809820
810821
Args:
811822
dbname: database name to connect to (str).
812823
query: query to be executed (str).
824+
username: database user name (str).
825+
max_attempts: how many times should we try?
826+
sleep_time: how long should we sleep after a failure?
813827
"""
814828

815829
attemps=0
@@ -844,7 +858,7 @@ def execute(self, dbname, query, username=None, commit=False):
844858
dbname: database name to connect to (str).
845859
query: query to be executed (str).
846860
username: database user name (str).
847-
commit: should we commit this query?.
861+
commit: should we commit this query?
848862
849863
Returns:
850864
A list of tuples representing rows.
@@ -885,6 +899,32 @@ def replicate(self, name, username=None, xlog_method=DEFAULT_XLOG_METHOD):
885899
returnself.backup(username=username,
886900
xlog_method=xlog_method).spawn_replica(name)
887901

902+
defcatchup(self):
903+
"""
904+
Wait until async replica catches up with its master.
905+
"""
906+
907+
master=self.master
908+
909+
cur_ver=LooseVersion(get_pg_config()["VERSION_NUM"])
910+
min_ver=LooseVersion('10.0')
911+
912+
ifcur_ver>=min_ver:
913+
poll_lsn="select pg_current_wal_lsn()::text"
914+
wait_lsn="select pg_last_wal_replay_lsn() >= '{}'::pg_lsn"
915+
else:
916+
poll_lsn="select pg_current_xlog_location()::text"
917+
wait_lsn="select pg_last_xlog_replay_location() >= '{}'::pg_lsn"
918+
919+
ifnotmaster:
920+
raiseCatchUpException("Master node is not specified")
921+
922+
try:
923+
lsn=master.execute('postgres',poll_lsn)[0][0]
924+
self.poll_query_until('postgres',wait_lsn.format(lsn))
925+
exceptExceptionase:
926+
raiseCatchUpException(str(e))
927+
888928
defpgbench_init(self,dbname='postgres',scale=1,options=[]):
889929
"""
890930
Prepare database for pgbench (create tables etc).
@@ -963,7 +1003,7 @@ def call_initdb(_data_dir):
9631003
_params= [_data_dir,"-N"]+initdb_params
9641004
_execute_utility("initdb",_params,initdb_logfile)
9651005
exceptExceptionase:
966-
raiseInitNodeException(e.message)
1006+
raiseInitNodeException(str(e))
9671007

9681008
# Call initdb if we have custom params
9691009
ifinitdb_params:
@@ -981,7 +1021,7 @@ def call_initdb(_data_dir):
9811021
# Copy cached initdb to current data dir
9821022
shutil.copytree(cached_data_dir,data_dir)
9831023
exceptExceptionase:
984-
raiseInitNodeException(e.message)
1024+
raiseInitNodeException(str(e))
9851025

9861026

9871027
def_execute_utility(util,args,logfile,write_to_pipe=True):

‎tests/test_simple.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,24 +224,12 @@ def test_backup_and_replication(self):
224224
res=replica.execute('postgres','select * from abc')
225225
self.assertListEqual(res, [(1,2)])
226226

227-
cur_ver=LooseVersion(get_pg_config()['VERSION_NUM'])
228-
min_ver=LooseVersion('10')
229-
230-
# Prepare the query which would check whether record reached replica
231-
# (It is slightly different for Postgres 9.6 and Postgres 10+)
232-
ifcur_ver>=min_ver:
233-
wait_lsn='SELECT pg_current_wal_lsn() <= replay_lsn ' \
234-
'FROM pg_stat_replication WHERE application_name =\'%s\'' \
235-
%replica.name
236-
else:
237-
wait_lsn='SELECT pg_current_xlog_location() <= replay_location '\
238-
'FROM pg_stat_replication WHERE application_name =\'%s\'' \
239-
%replica.name
240-
241227
# Insert into master node
242228
node.psql('postgres','insert into abc values (3, 4)')
229+
243230
# Wait until data syncronizes
244-
node.poll_query_until('postgres',wait_lsn)
231+
replica.catchup()
232+
245233
# Check that this record was exported to replica
246234
res=replica.execute('postgres','select * from abc')
247235
self.assertListEqual(res, [(1,2), (3,4)])
@@ -254,6 +242,25 @@ def test_replicate(self):
254242
res=replica.start().execute('postgres','select 1')
255243
self.assertListEqual(res, [(1, )])
256244

245+
node.execute(
246+
'postgres','create table test (val int)',commit=True)
247+
248+
replica.catchup()
249+
250+
res=node.execute('postgres','select * from test')
251+
self.assertListEqual(res, [])
252+
253+
deftest_incorrect_catchup(self):
254+
withget_new_node('node')asnode:
255+
node.init(allow_streaming=True).start()
256+
257+
got_exception=False
258+
try:
259+
node.catchup()
260+
exceptExceptionase:
261+
pass
262+
self.assertTrue(got_exception)
263+
257264
deftest_dump(self):
258265
withget_new_node('node1')asnode1:
259266
node1.init().start()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp