Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Port numbers management is improved (#164)#165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 82 additions & 17 deletionstestgres/node.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -83,13 +83,13 @@

from .standby import First

from . import utils

from .utils import \
PgVer, \
eprint, \
get_bin_path, \
get_pg_version, \
reserve_port, \
release_port, \
execute_utility, \
options_string, \
clean_on_error
Expand DownExpand Up@@ -128,6 +128,9 @@ def __repr__(self):


class PostgresNode(object):
# a max number of node start attempts
_C_MAX_START_ATEMPTS = 5

def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionParams = ConnectionParams(), bin_dir=None, prefix=None):
"""
PostgresNode constructor.
Expand DownExpand Up@@ -158,7 +161,7 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP
self.os_ops = LocalOperations(conn_params)

self.host = self.os_ops.host
self.port = port or reserve_port()
self.port = port orutils.reserve_port()

self.ssh_key = self.os_ops.ssh_key

Expand DownExpand Up@@ -471,6 +474,28 @@ def _collect_special_files(self):

return result

def _collect_log_files(self):
# dictionary of log files + size in bytes

files = [
self.pg_log_file
] # yapf: disable

result = {}

for f in files:
# skip missing files
if not self.os_ops.path_exists(f):
continue

file_size = self.os_ops.get_file_size(f)
assert type(file_size) == int # noqa: E721
assert file_size >= 0

result[f] = file_size

return result

def init(self, initdb_params=None, cached=True, **kwargs):
"""
Perform initdb for this node.
Expand DownExpand Up@@ -722,6 +747,22 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
OperationalError},
max_attempts=max_attempts)

def _detect_port_conflict(self, log_files0, log_files1):
assert type(log_files0) == dict # noqa: E721
assert type(log_files1) == dict # noqa: E721

for file in log_files1.keys():
read_pos = 0

if file in log_files0.keys():
read_pos = log_files0[file] # the previous size

file_content = self.os_ops.read_binary(file, read_pos)
file_content_s = file_content.decode()
if 'Is another postmaster already running on port' in file_content_s:
return True
return False

def start(self, params=[], wait=True):
"""
Starts the PostgreSQL node using pg_ctl if node has not been started.
Expand All@@ -736,6 +777,9 @@ def start(self, params=[], wait=True):
Returns:
This instance of :class:`.PostgresNode`.
"""

assert __class__._C_MAX_START_ATEMPTS > 1

if self.is_started:
return self

Expand All@@ -745,27 +789,46 @@ def start(self, params=[], wait=True):
"-w" if wait else '-W', # --wait or --no-wait
"start"] + params # yapf: disable

startup_retries = 5
log_files0 = self._collect_log_files()
assert type(log_files0) == dict # noqa: E721

nAttempt = 0
timeout = 1
while True:
assert nAttempt >= 0
assert nAttempt < __class__._C_MAX_START_ATEMPTS
nAttempt += 1
try:
exit_status, out, error = execute_utility(_params, self.utils_log_file, verbose=True)
if error and 'does not exist' in error:
raise Exception
except Exception as e:
files = self._collect_special_files()
if any(len(file) > 1 and 'Is another postmaster already '
'running on port' in file[1].decode() for
file in files):
logging.warning("Detected an issue with connecting to port {0}. "
"Trying another port after a 5-second sleep...".format(self.port))
self.port = reserve_port()
options = {'port': str(self.port)}
self.set_auto_conf(options)
startup_retries -= 1
time.sleep(5)
continue
assert nAttempt > 0
assert nAttempt <= __class__._C_MAX_START_ATEMPTS
if self._should_free_port and nAttempt < __class__._C_MAX_START_ATEMPTS:
log_files1 = self._collect_log_files()
if self._detect_port_conflict(log_files0, log_files1):
log_files0 = log_files1
logging.warning(
"Detected an issue with connecting to port {0}. "
"Trying another port after a {1}-second sleep...".format(self.port, timeout)
)
time.sleep(timeout)
timeout = min(2 * timeout, 5)
cur_port = self.port
new_port = utils.reserve_port() # can raise
try:
options = {'port': str(new_port)}
self.set_auto_conf(options)
except: # noqa: E722
utils.release_port(new_port)
raise
self.port = new_port
utils.release_port(cur_port)
continue

msg = 'Cannot start node'
files = self._collect_special_files()
raise_from(StartNodeException(msg, files), e)
break
self._maybe_start_logger()
Expand DownExpand Up@@ -930,8 +993,10 @@ def free_port(self):
"""

if self._should_free_port:
port = self.port
self._should_free_port = False
release_port(self.port)
self.port = None
utils.release_port(port)

def cleanup(self, max_attempts=3, full=False):
"""
Expand Down
16 changes: 16 additions & 0 deletionstestgres/operations/local_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -308,12 +308,28 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):
buffers * max(2, int(num_lines / max(cur_lines, 1)))
) # Adjust buffer size

def read_binary(self, filename, start_pos):
assert type(filename) == str # noqa: E721
assert type(start_pos) == int # noqa: E721
assert start_pos >= 0

with open(filename, 'rb') as file: # open in a binary mode
file.seek(start_pos, os.SEEK_SET)
r = file.read()
assert type(r) == bytes # noqa: E721
return r

def isfile(self, remote_file):
return os.path.isfile(remote_file)

def isdir(self, dirname):
return os.path.isdir(dirname)

def get_file_size(self, filename):
assert filename is not None
assert type(filename) == str # noqa: E721
return os.path.getsize(filename)

def remove_file(self, filename):
return os.remove(filename)

Expand Down
9 changes: 9 additions & 0 deletionstestgres/operations/os_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -98,9 +98,18 @@ def read(self, filename, encoding, binary):
def readlines(self, filename):
raise NotImplementedError()

def read_binary(self, filename, start_pos):
assert type(filename) == str # noqa: E721
assert type(start_pos) == int # noqa: E721
assert start_pos >= 0
raise NotImplementedError()

def isfile(self, remote_file):
raise NotImplementedError()

def get_file_size(self, filename):
raise NotImplementedError()

# Processes control
def kill(self, pid, signal):
# Kill the process
Expand Down
83 changes: 83 additions & 0 deletionstestgres/operations/remote_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -340,6 +340,16 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):

return lines

def read_binary(self, filename, start_pos):
assert type(filename) == str # noqa: E721
assert type(start_pos) == int # noqa: E721
assert start_pos >= 0

cmd = "tail -c +{} {}".format(start_pos + 1, __class__._escape_path(filename))
r = self.exec_command(cmd)
assert type(r) == bytes # noqa: E721
return r

def isfile(self, remote_file):
stdout = self.exec_command("test -f {}; echo $?".format(remote_file))
result = int(stdout.strip())
Expand All@@ -350,6 +360,70 @@ def isdir(self, dirname):
response = self.exec_command(cmd)
return response.strip() == b"True"

def get_file_size(self, filename):
C_ERR_SRC = "RemoteOpertions::get_file_size"

assert filename is not None
assert type(filename) == str # noqa: E721
cmd = "du -b " + __class__._escape_path(filename)

s = self.exec_command(cmd, encoding=get_default_encoding())
assert type(s) == str # noqa: E721

if len(s) == 0:
raise Exception(
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned an empty string. Check point [{0}][{1}].".format(
C_ERR_SRC,
"#001",
filename
)
)

i = 0

while i < len(s) and s[i].isdigit():
assert s[i] >= '0'
assert s[i] <= '9'
i += 1

if i == 0:
raise Exception(
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
C_ERR_SRC,
"#002",
filename
)
)

if i == len(s):
raise Exception(
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
C_ERR_SRC,
"#003",
filename
)
)

if not s[i].isspace():
raise Exception(
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
C_ERR_SRC,
"#004",
filename
)
)

r = 0

for i2 in range(0, i):
ch = s[i2]
assert ch >= '0'
assert ch <= '9'
# Here is needed to check overflow or that it is a human-valid result?
r = (r * 10) + ord(ch) - ord('0')

return r

def remove_file(self, filename):
cmd = "rm {}".format(filename)
return self.exec_command(cmd)
Expand DownExpand Up@@ -386,6 +460,15 @@ def db_connect(self, dbname, user, password=None, host="localhost", port=5432):
)
return conn

def _escape_path(path):
assert type(path) == str # noqa: E721
assert path != "" # Ok?

r = "'"
r += path
r += "'"
return r


def normalize_error(error):
if isinstance(error, bytes):
Expand Down
8 changes: 6 additions & 2 deletionstestgres/utils.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -34,7 +34,7 @@ def __init__(self, version: str) -> None:
super().__init__(version)


defreserve_port():
definternal__reserve_port():
"""
Generate a new port and add it to 'bound_ports'.
"""
Expand All@@ -45,14 +45,18 @@ def reserve_port():
return port


defrelease_port(port):
definternal__release_port(port):
"""
Free port provided by reserve_port().
"""

bound_ports.discard(port)


reserve_port = internal__reserve_port
release_port = internal__release_port


def execute_utility(args, logfile=None, verbose=False):
"""
Execute utility (pg_ctl, pg_dump etc).
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp