Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0d62e0e

Browse files
author
v.shepard
committed
PBCKP-137 update node.py
1 parent1512afd commit0d62e0e

File tree

1 file changed

+67
-96
lines changed

1 file changed

+67
-96
lines changed

‎testgres/node.py

Lines changed: 67 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
importsubprocess
1313
importtime
1414

15+
1516
try:
1617
fromcollections.abcimportIterable
1718
exceptImportError:
@@ -104,6 +105,7 @@
104105
InternalError=pglib.InternalError
105106
ProgrammingError=pglib.ProgrammingError
106107
OperationalError=pglib.OperationalError
108+
DatabaseError=pglib.DatabaseError
107109

108110

109111
classProcessProxy(object):
@@ -651,13 +653,15 @@ def get_control_data(self):
651653

652654
returnout_dict
653655

654-
defslow_start(self,replica=False):
656+
defslow_start(self,replica=False,dbname='template1',username='dev'):
655657
"""
656658
Starts the PostgreSQL instance and then polls the instance
657659
until it reaches the expected state (primary or replica). The state is checked
658660
using the pg_is_in_recovery() function.
659661
660662
Args:
663+
dbname:
664+
username:
661665
replica: If True, waits for the instance to be in recovery (i.e., replica mode).
662666
If False, waits for the instance to be in primary mode. Default is False.
663667
"""
@@ -668,14 +672,15 @@ def slow_start(self, replica=False):
668672
else:
669673
query='SELECT not pg_is_in_recovery()'
670674
# Call poll_query_until until the expected value is returned
671-
self.poll_query_until(
672-
dbname="template1",
673-
query=query,
674-
suppress={pglib.InternalError,
675-
QueryException,
676-
pglib.ProgrammingError,
677-
pglib.OperationalError})
678-
675+
self.poll_query_until(query=query,
676+
expected=False,
677+
dbname=dbname,
678+
username=username,
679+
suppress={InternalError,
680+
QueryException,
681+
ProgrammingError,
682+
OperationalError,
683+
DatabaseError})
679684

680685
defstart(self,params=[],wait=True):
681686
"""
@@ -1432,96 +1437,66 @@ def connect(self,
14321437
autocommit=autocommit)# yapf: disable
14331438

14341439
deftable_checksum(self,table,dbname="postgres"):
1435-
"""
1436-
Calculate the checksum of a table by hashing its rows.
1437-
1438-
The function fetches rows from the table in chunks and calculates the checksum
1439-
by summing the hash values of each row. The function uses a separate thread
1440-
to fetch rows when there are more than 2000 rows in the table.
1441-
1442-
Args:
1443-
table (str): The name of the table for which the checksum should be calculated.
1444-
dbname (str, optional): The name of the database where the table is located. Defaults to "postgres".
1445-
1446-
Returns:
1447-
int: The calculated checksum of the table.
1448-
"""
1449-
1450-
deffetch_rows(con,cursor_name):
1451-
whileTrue:
1452-
rows=con.execute(f"FETCH FORWARD 2000 FROM{cursor_name}")
1453-
ifnotrows:
1454-
break
1455-
yieldrows
1456-
1457-
defprocess_rows(queue,con,cursor_name):
1458-
try:
1459-
forrowsinfetch_rows(con,cursor_name):
1460-
queue.put(rows)
1461-
exceptExceptionase:
1462-
queue.put(e)
1463-
else:
1464-
queue.put(None)
1465-
1466-
cursor_name=f"cur_{random.randint(0,2**48)}"
1467-
checksum=0
1468-
query_thread=None
1469-
1470-
withself.connect(dbname=dbname)ascon:
1471-
con.execute(f"""
1472-
DECLARE{cursor_name} NO SCROLL CURSOR FOR
1473-
SELECT t::text FROM{table} as t
1474-
""")
1475-
1476-
queue=Queue(maxsize=50)
1477-
initial_rows=con.execute(f"FETCH FORWARD 2000 FROM{cursor_name}")
1478-
1479-
ifnotinitial_rows:
1480-
return0
1481-
1482-
queue.put(initial_rows)
1483-
1484-
iflen(initial_rows)==2000:
1485-
query_thread=threading.Thread(target=process_rows,args=(queue,con,cursor_name))
1486-
query_thread.start()
1487-
else:
1488-
queue.put(None)
1440+
con=self.connect(dbname=dbname)
1441+
1442+
curname="cur_"+str(random.randint(0,2**48))
1443+
1444+
con.execute("""
1445+
DECLARE %s NO SCROLL CURSOR FOR
1446+
SELECT t::text FROM %s as t
1447+
"""% (curname,table))
1448+
1449+
que=Queue(maxsize=50)
1450+
sum=0
1451+
1452+
rows=con.execute("FETCH FORWARD 2000 FROM %s"%curname)
1453+
ifnotrows:
1454+
return0
1455+
que.put(rows)
1456+
1457+
th=None
1458+
iflen(rows)==2000:
1459+
defquerier():
1460+
try:
1461+
whileTrue:
1462+
rows=con.execute("FETCH FORWARD 2000 FROM %s"%curname)
1463+
ifnotrows:
1464+
break
1465+
que.put(rows)
1466+
exceptExceptionase:
1467+
que.put(e)
1468+
else:
1469+
que.put(None)
14891470

1490-
whileTrue:
1491-
rows=queue.get()
1492-
ifrowsisNone:
1493-
break
1494-
ifisinstance(rows,Exception):
1495-
raiserows
1471+
th=threading.Thread(target=querier)
1472+
th.start()
1473+
else:
1474+
que.put(None)
14961475

1497-
forrowinrows:
1498-
checksum+=hash(row[0])
1476+
whileTrue:
1477+
rows=que.get()
1478+
ifrowsisNone:
1479+
break
1480+
ifisinstance(rows,Exception):
1481+
raiserows
1482+
# hash uses SipHash since Python3.4, therefore it is good enough
1483+
forrowinrows:
1484+
sum+=hash(row[0])
14991485

1500-
ifquery_threadisnotNone:
1501-
query_thread.join()
1486+
ifthisnotNone:
1487+
th.join()
15021488

1503-
con.execute(f"CLOSE{cursor_name}; ROLLBACK;")
1489+
con.execute("CLOSE%s; ROLLBACK;"%curname)
15041490

1505-
returnchecksum
1491+
con.close()
1492+
returnsum
15061493

15071494
defpgbench_table_checksums(self,dbname="postgres",
1508-
pgbench_tables=('pgbench_branches',
1509-
'pgbench_tellers',
1510-
'pgbench_accounts',
1511-
'pgbench_history')
1495+
pgbench_tables=('pgbench_branches',
1496+
'pgbench_tellers',
1497+
'pgbench_accounts',
1498+
'pgbench_history')
15121499
):
1513-
"""
1514-
Calculate the checksums of the specified pgbench tables using table_checksum method.
1515-
1516-
Args:
1517-
dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres".
1518-
pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the
1519-
checksums should be calculated. Defaults to a tuple containing the
1520-
names of the default pgbench tables.
1521-
1522-
Returns:
1523-
set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum.
1524-
"""
15251500
return {(table,self.table_checksum(table,dbname))
15261501
fortableinpgbench_tables}
15271502

@@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
15891564

15901565

15911566
classNodeApp:
1592-
"""
1593-
Functions that can be moved to testgres.PostgresNode
1594-
We use these functions in ProbackupController and need tp move them in some visible place
1595-
"""
15961567

15971568
def__init__(self,test_path,nodes_to_cleanup):
15981569
self.test_path=test_path
@@ -1605,7 +1576,7 @@ def make_empty(
16051576
shutil.rmtree(real_base_dir,ignore_errors=True)
16061577
os.makedirs(real_base_dir)
16071578

1608-
node=PostgresNodeExtended(base_dir=real_base_dir)
1579+
node=PostgresNode(base_dir=real_base_dir)
16091580
node.should_rm_dirs=True
16101581
self.nodes_to_cleanup.append(node)
16111582

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp