Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Pbckp 152 multihost#78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
demonolock merged 27 commits intomasterfromPBCKP-152-multihost
Aug 4, 2023
Merged
Changes from1 commit
Commits
Show all changes
27 commits
Select commitHold shift + click to select a range
02c3375
PBCKP-137 update node.py
Apr 10, 2023
1512afd
PBCKP-137 up version 1.8.6
Apr 11, 2023
0d62e0e
PBCKP-137 update node.py
Apr 11, 2023
8be1b3a
PBCKP-137 update node
Apr 17, 2023
51f05de
PBCKP-152 change local function on execution by ssh
May 2, 2023
f131088
PBCKP-152 merge master
May 2, 2023
4f38bd5
PBCKP-152 multihost
May 3, 2023
0da2ee2
merge master
Jun 6, 2023
2bc17f0
testgres from PBCKP-152-multihost
Jun 6, 2023
f9b6bdb
PBCKP-152
Jun 10, 2023
ac77ef7
PBCKP-152 use black for formatting
Jun 11, 2023
b048041
PBCKP-152 fix failed tests
Jun 12, 2023
e098b97
PBCKP-152 fix failed tests
Jun 13, 2023
1c405ef
PBCKP-152 add tests for remote_ops.py
Jun 14, 2023
8c373e6
PBCKP-152 add testgres tests for remote node
Jun 14, 2023
72e6d5d
PBCKP-152 fixed test_simple and test_remote
Jun 17, 2023
2c2d2c5
PBCKP-588 test fix test_restore_after_failover
Jun 22, 2023
1b4f74a
PBCKP-588 test partially fixed test_simple_remote.py 41/43
Jun 22, 2023
2e916df
PBCKP-588 fixes after review
Jun 25, 2023
0528541
PBCKP-588 fixes after review - add ConnectionParams
Jun 26, 2023
089ab9b
PBCKP-588 fixes after review - remove f-strings
Jun 26, 2023
190d084
PBCKP-588 fixes after review - replace subprocess.run on subprocess.P…
Jun 27, 2023
0c26f77
PBCKP-588 fix failed tests - psql, set_auto_conf
Jun 28, 2023
0796bc4
PBCKP-152 - test_restore_target_time cut
Jul 26, 2023
0f14034
PBCKP-152 - node set listen address
Jul 28, 2023
12aa7ba
Add info about remote mode in README.md
Aug 1, 2023
4e7f4b0
merge master
Aug 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
PBCKP-137 update node.py
  • Loading branch information
v.shepard committedApr 15, 2023
commit0d62e0e6881a8cd18e9acd58507fcae74ce71ad9
163 changes: 67 additions & 96 deletionstestgres/node.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -12,6 +12,7 @@
import subprocess
import time


try:
from collections.abc import Iterable
except ImportError:
Expand DownExpand Up@@ -104,6 +105,7 @@
InternalError = pglib.InternalError
ProgrammingError = pglib.ProgrammingError
OperationalError = pglib.OperationalError
DatabaseError = pglib.DatabaseError


class ProcessProxy(object):
Expand DownExpand Up@@ -651,13 +653,15 @@ def get_control_data(self):

return out_dict

def slow_start(self, replica=False):
def slow_start(self, replica=False, dbname='template1', username='dev'):
"""
Starts the PostgreSQL instance and then polls the instance
until it reaches the expected state (primary or replica). The state is checked
using the pg_is_in_recovery() function.

Args:
dbname:
username:
replica: If True, waits for the instance to be in recovery (i.e., replica mode).
If False, waits for the instance to be in primary mode. Default is False.
"""
Expand All@@ -668,14 +672,15 @@ def slow_start(self, replica=False):
else:
query = 'SELECT not pg_is_in_recovery()'
# Call poll_query_until until the expected value is returned
self.poll_query_until(
dbname="template1",
query=query,
suppress={pglib.InternalError,
QueryException,
pglib.ProgrammingError,
pglib.OperationalError})

self.poll_query_until(query=query,
expected=False,
dbname=dbname,
username=username,
suppress={InternalError,
QueryException,
ProgrammingError,
OperationalError,
DatabaseError})

def start(self, params=[], wait=True):
"""
Expand DownExpand Up@@ -1432,96 +1437,66 @@ def connect(self,
autocommit=autocommit) # yapf: disable

def table_checksum(self, table, dbname="postgres"):
"""
Calculate the checksum of a table by hashing its rows.

The function fetches rows from the table in chunks and calculates the checksum
by summing the hash values of each row. The function uses a separate thread
to fetch rows when there are more than 2000 rows in the table.

Args:
table (str): The name of the table for which the checksum should be calculated.
dbname (str, optional): The name of the database where the table is located. Defaults to "postgres".

Returns:
int: The calculated checksum of the table.
"""

def fetch_rows(con, cursor_name):
while True:
rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}")
if not rows:
break
yield rows

def process_rows(queue, con, cursor_name):
try:
for rows in fetch_rows(con, cursor_name):
queue.put(rows)
except Exception as e:
queue.put(e)
else:
queue.put(None)

cursor_name = f"cur_{random.randint(0, 2 ** 48)}"
checksum = 0
query_thread = None

with self.connect(dbname=dbname) as con:
con.execute(f"""
DECLARE {cursor_name} NO SCROLL CURSOR FOR
SELECT t::text FROM {table} as t
""")

queue = Queue(maxsize=50)
initial_rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}")

if not initial_rows:
return 0

queue.put(initial_rows)

if len(initial_rows) == 2000:
query_thread = threading.Thread(target=process_rows, args=(queue, con, cursor_name))
query_thread.start()
else:
queue.put(None)
con = self.connect(dbname=dbname)

curname = "cur_" + str(random.randint(0, 2 ** 48))

con.execute("""
DECLARE %s NO SCROLL CURSOR FOR
SELECT t::text FROM %s as t
""" % (curname, table))

que = Queue(maxsize=50)
sum = 0

rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
if not rows:
return 0
que.put(rows)

th = None
if len(rows) == 2000:
def querier():
try:
while True:
rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
if not rows:
break
que.put(rows)
except Exception as e:
que.put(e)
else:
que.put(None)

while True:
rows = queue.get()
if rows is None:
break
if isinstance(rows, Exception):
raise rows
th = threading.Thread(target=querier)
th.start()
else:
que.put(None)

for row in rows:
checksum += hash(row[0])
while True:
rows = que.get()
if rows is None:
break
if isinstance(rows, Exception):
raise rows
# hash uses SipHash since Python3.4, therefore it is good enough
for row in rows:
sum += hash(row[0])

ifquery_thread is not None:
query_thread.join()
ifth is not None:
th.join()

con.execute(f"CLOSE{cursor_name}; ROLLBACK;")
con.execute("CLOSE%s; ROLLBACK;" % curname)

return checksum
con.close()
return sum

def pgbench_table_checksums(self, dbname="postgres",
pgbench_tables=('pgbench_branches',
'pgbench_tellers',
'pgbench_accounts',
'pgbench_history')
pgbench_tables =('pgbench_branches',
'pgbench_tellers',
'pgbench_accounts',
'pgbench_history')
):
"""
Calculate the checksums of the specified pgbench tables using table_checksum method.

Args:
dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres".
pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the
checksums should be calculated. Defaults to a tuple containing the
names of the default pgbench tables.

Returns:
set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum.
"""
return {(table, self.table_checksum(table, dbname))
for table in pgbench_tables}

Expand DownExpand Up@@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):


class NodeApp:
"""
Functions that can be moved to testgres.PostgresNode
We use these functions in ProbackupController and need tp move them in some visible place
"""

def __init__(self, test_path, nodes_to_cleanup):
self.test_path = test_path
Expand All@@ -1605,7 +1576,7 @@ def make_empty(
shutil.rmtree(real_base_dir, ignore_errors=True)
os.makedirs(real_base_dir)

node =PostgresNodeExtended(base_dir=real_base_dir)
node =PostgresNode(base_dir=real_base_dir)
node.should_rm_dirs = True
self.nodes_to_cleanup.append(node)

Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp