1212import subprocess
1313import time
1414
15+
1516try :
1617from collections .abc import Iterable
1718except ImportError :
104105InternalError = pglib .InternalError
105106ProgrammingError = pglib .ProgrammingError
106107OperationalError = pglib .OperationalError
108+ DatabaseError = pglib .DatabaseError
107109
108110
109111class ProcessProxy (object ):
@@ -651,13 +653,15 @@ def get_control_data(self):
651653
652654return out_dict
653655
654- def slow_start (self ,replica = False ):
656+ def slow_start (self ,replica = False , dbname = 'template1' , username = 'dev' ):
655657"""
656658 Starts the PostgreSQL instance and then polls the instance
657659 until it reaches the expected state (primary or replica). The state is checked
658660 using the pg_is_in_recovery() function.
659661
660662 Args:
663+ dbname:
664+ username:
661665 replica: If True, waits for the instance to be in recovery (i.e., replica mode).
662666 If False, waits for the instance to be in primary mode. Default is False.
663667 """
@@ -668,14 +672,15 @@ def slow_start(self, replica=False):
668672else :
669673query = 'SELECT not pg_is_in_recovery()'
670674# Call poll_query_until until the expected value is returned
671- self .poll_query_until (
672- dbname = "template1" ,
673- query = query ,
674- suppress = {pglib .InternalError ,
675- QueryException ,
676- pglib .ProgrammingError ,
677- pglib .OperationalError })
678-
675+ self .poll_query_until (query = query ,
676+ expected = False ,
677+ dbname = dbname ,
678+ username = username ,
679+ suppress = {InternalError ,
680+ QueryException ,
681+ ProgrammingError ,
682+ OperationalError ,
683+ DatabaseError })
679684
680685def start (self ,params = [],wait = True ):
681686"""
@@ -1432,96 +1437,66 @@ def connect(self,
14321437autocommit = autocommit )# yapf: disable
14331438
def table_checksum(self, table, dbname="postgres"):
    """
    Calculate a checksum of a table by summing the hashes of its rows.

    Every row is cast to text (``SELECT t::text``) and read through a
    server-side cursor in batches of 2000 rows. When the first batch is
    full (exactly 2000 rows), the remaining batches are fetched by a
    background thread and handed over through a bounded queue, so that
    fetching and hashing overlap. Because the per-row hashes are summed,
    the result does not depend on the order in which rows are returned.

    NOTE: the builtin ``hash()`` of a string is salted per interpreter
    process unless PYTHONHASHSEED is fixed, so checksums are presumably
    only meant to be compared within a single process.

    NOTE: ``table`` is interpolated into the SQL text as-is; pass only
    trusted identifiers.

    Args:
        table: Name of the table to checksum.
        dbname: Database in which the table lives. Defaults to "postgres".

    Returns:
        int: The sum of the row hashes, or 0 when the table has no rows.

    Raises:
        Exception: Any error raised while fetching rows, including one
            raised in the background thread, is re-raised to the caller.
    """
    cursor_name = "cur_" + str(random.randint(0, 2 ** 48))
    fetch_query = "FETCH FORWARD 2000 FROM %s" % cursor_name

    con = self.connect(dbname=dbname)
    try:
        con.execute("""
            DECLARE %s NO SCROLL CURSOR FOR
            SELECT t::text FROM %s as t
        """ % (cursor_name, table))

        first_batch = con.execute(fetch_query)
        if not first_batch:
            # Empty table: the connection is released by the finally
            # clause below instead of being leaked.
            return 0

        # Bounded queue so the fetcher cannot run far ahead of hashing.
        batches = Queue(maxsize=50)
        batches.put(first_batch)

        fetcher = None
        if len(first_batch) == 2000:
            # A full first batch means there may be more rows; fetch
            # them in the background while this thread hashes.
            def querier():
                try:
                    while True:
                        batch = con.execute(fetch_query)
                        if not batch:
                            break
                        batches.put(batch)
                except Exception as e:
                    # Hand the failure to the consumer, which re-raises.
                    batches.put(e)
                else:
                    # None marks the end of the stream.
                    batches.put(None)

            fetcher = threading.Thread(target=querier)
            fetcher.start()
        else:
            batches.put(None)

        checksum = 0
        while True:
            batch = batches.get()
            if batch is None:
                break
            if isinstance(batch, Exception):
                raise batch
            # hash uses SipHash since Python3.4, therefore it is good enough
            for row in batch:
                checksum += hash(row[0])

        if fetcher is not None:
            fetcher.join()

        con.execute("CLOSE %s; ROLLBACK;" % cursor_name)
        return checksum
    finally:
        # Always release the connection: on success, on the early
        # return for an empty table, and when an error propagates.
        con.close()
15061493
def pgbench_table_checksums(self, dbname="postgres",
                            pgbench_tables=('pgbench_branches',
                                            'pgbench_tellers',
                                            'pgbench_accounts',
                                            'pgbench_history')
                            ):
    """
    Compute checksums for a group of pgbench tables.

    Each table name is passed, together with ``dbname``, to
    ``self.table_checksum``; tables are processed in the order given.

    Args:
        dbname: Database that holds the tables. Defaults to "postgres".
        pgbench_tables: Iterable of table names to checksum. Defaults to
            the four standard pgbench tables.

    Returns:
        set: A set of ``(table_name, checksum)`` tuples.
    """
    checksums = set()
    for table_name in pgbench_tables:
        table_sum = self.table_checksum(table_name, dbname)
        checksums.add((table_name, table_sum))
    return checksums
15271502
@@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
15891564
15901565
15911566class NodeApp :
1592- """
1593- Functions that can be moved to testgres.PostgresNode
1594- We use these functions in ProbackupController and need tp move them in some visible place
1595- """
15961567
15971568def __init__ (self ,test_path ,nodes_to_cleanup ):
15981569self .test_path = test_path
@@ -1605,7 +1576,7 @@ def make_empty(
16051576shutil .rmtree (real_base_dir ,ignore_errors = True )
16061577os .makedirs (real_base_dir )
16071578
1608- node = PostgresNodeExtended (base_dir = real_base_dir )
1579+ node = PostgresNode (base_dir = real_base_dir )
16091580node .should_rm_dirs = True
16101581self .nodes_to_cleanup .append (node )
16111582