@@ -12,6 +12,7 @@
 import subprocess
 import time
 try:
     from collections.abc import Iterable
 except ImportError:
@@ -104,6 +105,7 @@
 InternalError = pglib.InternalError
 ProgrammingError = pglib.ProgrammingError
 OperationalError = pglib.OperationalError
+DatabaseError = pglib.DatabaseError
 
 
 class ProcessProxy(object):
@@ -651,13 +653,15 @@ def get_control_data(self):
 
         return out_dict
 
-    def slow_start(self, replica=False):
+    def slow_start(self, replica=False, dbname='template1', username='dev'):
         """
         Starts the PostgreSQL instance and then polls the instance
         until it reaches the expected state (primary or replica). The state is checked
         using the pg_is_in_recovery() function.
 
         Args:
+            dbname:
+            username:
             replica: If True, waits for the instance to be in recovery (i.e., replica mode).
                      If False, waits for the instance to be in primary mode. Default is False.
         """
@@ -668,14 +672,15 @@ def slow_start(self, replica=False):
         else:
             query = 'SELECT not pg_is_in_recovery()'
 
-        # Call poll_query_until until the expected value is returned
-        self.poll_query_until(
-            dbname="template1",
-            query=query,
-            suppress={pglib.InternalError,
-                      QueryException,
-                      pglib.ProgrammingError,
-                      pglib.OperationalError})
+        self.poll_query_until(query=query,
+                              expected=False,
+                              dbname=dbname,
+                              username=username,
+                              suppress={InternalError,
+                                        QueryException,
+                                        ProgrammingError,
+                                        OperationalError,
+                                        DatabaseError})
 
     def start(self, params=[], wait=True):
         """
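Review note on the `slow_start` hunk above: the method now forwards `dbname` and `username` to `poll_query_until`, which keeps re-running the readiness query and swallows the exception types listed in `suppress` while the server is still coming up. A minimal, self-contained sketch of that retry pattern (illustrative only: `poll_until`, `run_query`, `max_attempts`, and `sleep_time` are hypothetical stand-ins, not the real `poll_query_until` signature):

```python
import time


class QueryException(Exception):
    """Stand-in for the exception types listed in `suppress`."""


def poll_until(run_query, expected=True, max_attempts=60,
               sleep_time=1, suppress=()):
    # Re-run the query until it returns `expected`; exception types in
    # `suppress` (e.g. "the database system is starting up") are treated
    # as "not ready yet" rather than as failures.
    for _ in range(max_attempts):
        try:
            if run_query() == expected:
                return
        except tuple(suppress):
            pass
        time.sleep(sleep_time)
    raise TimeoutError('server did not reach the expected state')


# Example: a server that raises twice before reporting ready.
attempts = iter([QueryException, QueryException, True])


def run_query():
    step = next(attempts)
    if isinstance(step, type) and issubclass(step, Exception):
        raise step('starting up')
    return step


poll_until(run_query, expected=True, sleep_time=0,
           suppress={QueryException})
```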
""" def fetch_rows(con, cursor_name): while True: rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") if not rows: break yield rows def process_rows(queue, con, cursor_name): try: for rows in fetch_rows(con, cursor_name): queue.put(rows) except Exception as e: queue.put(e) else: queue.put(None) cursor_name = f"cur_{random.randint(0, 2 ** 48)}" checksum = 0 query_thread = None with self.connect(dbname=dbname) as con: con.execute(f""" DECLARE {cursor_name} NO SCROLL CURSOR FOR SELECT t::text FROM {table} as t """) queue = Queue(maxsize=50) initial_rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") if not initial_rows: return 0 queue.put(initial_rows) if len(initial_rows) == 2000: query_thread = threading.Thread(target=process_rows, args=(queue, con, cursor_name)) query_thread.start() else: queue.put(None) con = self.connect(dbname=dbname) curname = "cur_" + str(random.randint(0, 2 ** 48)) con.execute(""" DECLARE %s NO SCROLL CURSOR FOR SELECT t::text FROM %s as t """ % (curname, table)) que = Queue(maxsize=50) sum = 0 rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname) if not rows: return 0 que.put(rows) th = None if len(rows) == 2000: def querier(): try: while True: rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname) if not rows: break que.put(rows) except Exception as e: que.put(e) else: que.put(None) while True: rows = queue.get() if rows is None: break if isinstance(rows, Exception): raise rows th = threading.Thread(target=querier) th.start() else: que.put(None) for row in rows: checksum += hash(row[0]) while True: rows = que.get() if rows is None: break if isinstance(rows, Exception): raise rows # hash uses SipHash since Python3.4, therefore it is good enough for row in rows: sum += hash(row[0]) ifquery_thread is not None: query_thread .join() ifth is not None: th .join() con.execute(f "CLOSE{cursor_name} ; ROLLBACK;") con.execute("CLOSE%s ; ROLLBACK;" % curname ) return checksum con.close() return sum def pgbench_table_checksums(self, dbname="postgres", pgbench_tables= ('pgbench_branches', 'pgbench_tellers', 'pgbench_accounts', 'pgbench_history') pgbench_tables = ('pgbench_branches', 'pgbench_tellers', 'pgbench_accounts', 'pgbench_history') ): """ Calculate the checksums of the specified pgbench tables using table_checksum method. Args: dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres". pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the checksums should be calculated. Defaults to a tuple containing the names of the default pgbench tables. Returns: set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum. """ return {(table, self.table_checksum(table, dbname)) for table in pgbench_tables} Expand Down Expand Up @@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}): class NodeApp: """ Functions that can be moved to testgres.PostgresNode We use these functions in ProbackupController and need tp move them in some visible place """ def __init__(self, test_path, nodes_to_cleanup): self.test_path = test_path Expand All @@ -1605,7 +1576,7 @@ def make_empty( shutil.rmtree(real_base_dir, ignore_errors=True) os.makedirs(real_base_dir) node =PostgresNodeExtended (base_dir=real_base_dir) node =PostgresNode (base_dir=real_base_dir) node.should_rm_dirs = True self.nodes_to_cleanup.append(node) Expand Down