Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[#247] PortManager__Generic uses lock-dirs for reserved ports#255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Draft
dmitry-lipetsk wants to merge24 commits intopostgrespro:master
base:master
Choose a base branch
Loading
fromdmitry-lipetsk:master-fix247--v001
Draft
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
24 commits
Select commitHold shift + click to select a range
99e645e
[#247] PortManager__Generic uses lock-dirs for reserved ports
dmitry-lipetskMay 6, 2025
c6f4b4d
PortManager__Generic is refactored
dmitry-lipetskMay 7, 2025
f085b70
[#256] A used port range is [1024 ... 65535]
dmitry-lipetskMay 7, 2025
c9b4bbf
PortManager__Generic is refactored [consts, asserts]
dmitry-lipetskMay 7, 2025
862c79f
Merge branch 'master' into master-fix247--v001
dmitry-lipetskMay 7, 2025
b5e6f25
Merge branch 'master' into master-fix247--v001
dmitry-lipetskMay 7, 2025
be3cc11
Merge branch 'master' into master-fix247--v001
dmitry-lipetskMay 12, 2025
cc8333c
Merge branch 'master' into master-fix247--v001
dmitry-lipetskMay 12, 2025
d15ecdb
PortManager__Generic sends debug messages about its operations.
dmitry-lipetskMay 29, 2025
8e0869d
[attention] OsOperations::create_lock_fs_obj is added
dmitry-lipetskMay 31, 2025
82b46b3
Code style is fixed
dmitry-lipetskJun 1, 2025
a188a63
Merge branch 'master' into master-fix247--v001
dmitry-lipetskJun 22, 2025
597f699
PortManager__Generic is synchronized with master
dmitry-lipetskJun 24, 2025
b34837e
[FIX] PortManager__Generic::release_port sends a debug message under …
dmitry-lipetskJun 24, 2025
0648926
PortManager__Generic::reserve_port is updated (reordered)
dmitry-lipetskJun 24, 2025
8c5e340
PortManager__Generic::helper__send_debug_msg is corrected (assert)
dmitry-lipetskJun 24, 2025
ea25215
PortManager__Generic::reserve_port is updated (comment)
dmitry-lipetskJun 24, 2025
aa677d2
PortManager__Generic::__init__ is updated
dmitry-lipetskJun 24, 2025
1dfac90
Merge remote-tracking branch 'pgpro/master' into master-fix247--v001
dmitry-lipetskJun 24, 2025
8aa790a
Merge branch 'master' into master-fix247--v001
dmitry-lipetskJun 26, 2025
e63c603
Merge branch 'master' into master-fix247--v001
dmitry-lipetskJun 27, 2025
0773f14
Merge remote-tracking branch 'pgpro/master' into master-fix247--v001
dmitry-lipetskJul 6, 2025
6c605d8
Merge commit '3dad0b8f86fc905d47bfc285f02757216320f874' into master-f…
dmitry-lipetskJul 8, 2025
9735fe7
Merge commit '9b0c0a4a10285f6aaa7ed9f987327f026e96489e' into master-f…
dmitry-lipetskJul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletionstestgres/__init__.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -40,7 +40,6 @@
from .utils import \
reserve_port, \
release_port, \
bound_ports, \
get_bin_path, \
get_pg_config, \
get_pg_version
Expand All@@ -52,6 +51,7 @@
from .config import testgres_config

from .operations.os_ops import OsOperations, ConnectionParams
from .operations.os_ops import OsLockObj
from .operations.local_ops import LocalOperations
from .operations.remote_ops import RemoteOperations

Expand All@@ -66,7 +66,8 @@
NodeApp.__name__,
PostgresNode.__name__,
PortManager.__name__,
"reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version",
"reserve_port", "release_port", "get_bin_path", "get_pg_config", "get_pg_version",
"First", "Any",
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams"
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams",
"OsLockObj",
]
4 changes: 4 additions & 0 deletionstestgres/consts.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -10,6 +10,10 @@
TMP_CACHE = 'tgsc_'
TMP_BACKUP = 'tgsb_'

TMP_TESTGRES = "testgres"

TMP_TESTGRES_PORTS = TMP_TESTGRES + "/ports"

# path to control file
XLOG_CONTROL_FILE = "global/pg_control"

Expand Down
58 changes: 51 additions & 7 deletionstestgres/impl/port_manager__generic.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
from ..operations.os_ops import OsOperations
from ..operations.os_ops import OsLockObj

from ..port_manager import PortManager
from ..exceptions import PortForException
from .. import consts

import os
import threading
import random
import typing
Expand All@@ -17,7 +20,9 @@ class PortManager__Generic(PortManager):
_guard: object
# TODO: is there better to use bitmap fot _available_ports?
_available_ports: typing.Set[int]
_reserved_ports: typing.Set[int]
_reserved_ports: typing.Dict[int, OsLockObj]

_lock_dir: str

def __init__(self, os_ops: OsOperations):
assert __class__._C_MIN_PORT_NUMBER <= __class__._C_MAX_PORT_NUMBER
Expand All@@ -33,15 +38,28 @@ def __init__(self, os_ops: OsOperations):
assert len(self._available_ports) == (
(__class__._C_MAX_PORT_NUMBER - __class__._C_MIN_PORT_NUMBER) + 1
)
self._reserved_ports = set()
self._reserved_ports = dict()
self._lock_dir = None
return

def reserve_port(self) -> int:
assert self._guard is not None
assert type(self._available_ports) == set # noqa: E721t
assert type(self._reserved_ports) == set # noqa: E721
assert type(self._available_ports) == set # noqa: E721
assert type(self._reserved_ports) == dict # noqa: E721
assert isinstance(self._os_ops, OsOperations)

with self._guard:
if self._lock_dir is None:
temp_dir = self._os_ops.get_tempdir()
assert type(temp_dir) == str # noqa: E721
lock_dir = os.path.join(temp_dir, consts.TMP_TESTGRES_PORTS)
assert type(lock_dir) == str # noqa: E721
self._os_ops.makedirs(lock_dir)
self._lock_dir = lock_dir

assert self._lock_dir is not None
assert type(self._lock_dir) == str # noqa: E721

t = tuple(self._available_ports)
assert len(t) == len(self._available_ports)
sampled_ports = random.sample(t, min(len(t), 100))
Expand All@@ -58,7 +76,22 @@ def reserve_port(self) -> int:
if not self._os_ops.is_port_free(port):
continue

self._reserved_ports.add(port)
try:
lock_path = self.helper__make_lock_path(port)
lock_obj = self._os_ops.create_lock_fs_obj(lock_path) # raise
except: # noqa: 722
continue

assert isinstance(lock_obj, OsLockObj)
assert self._os_ops.path_exists(lock_path)

try:
self._reserved_ports[port] = lock_obj
except: # noqa: 722
assert not (port in self._reserved_ports)
lock_obj.release()
raise

self._available_ports.discard(port)
assert port in self._reserved_ports
assert not (port in self._available_ports)
Expand All@@ -73,15 +106,17 @@ def release_port(self, number: int) -> None:
assert number <= __class__._C_MAX_PORT_NUMBER

assert self._guard is not None
assert type(self._reserved_ports) ==set # noqa: E721
assert type(self._reserved_ports) ==dict # noqa: E721

with self._guard:
assert number in self._reserved_ports
assert not (number in self._available_ports)
self._available_ports.add(number)
self._reserved_ports.discard(number)
lock_obj =self._reserved_ports.pop(number)
assert not (number in self._reserved_ports)
assert number in self._available_ports
assert isinstance(lock_obj, OsLockObj)
lock_obj.release()
__class__.helper__send_debug_msg("Port {} is released.", number)
return

Expand All@@ -95,3 +130,12 @@ def helper__send_debug_msg(msg_template: str, *args) -> None:
s = "[port manager] "
s += msg_template.format(*args)
logging.debug(s)

def helper__make_lock_path(self, port_number: int) -> str:
assert type(port_number) == int # noqa: E721
# You have to call the reserve_port at first!
assert type(self._lock_dir) == str # noqa: E721

result = os.path.join(self._lock_dir, str(port_number) + ".lock")
assert type(result) == str # noqa: E721
return result
22 changes: 22 additions & 0 deletionstestgres/operations/local_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -18,6 +18,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import ConnectionParams, OsOperations, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand All@@ -31,6 +32,23 @@
CMD_TIMEOUT_SEC = 60


class LocalOsLockFsObj(OsLockObj):
_path: str

def __init__(self, path: str):
assert type(path) == str # noqa: str
self._path = path
os.mkdir(path) # throw
assert os.path.exists(path)
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert os.path.exists(self._path)
os.rmdir(self._path)
self._path = None


class LocalOperations(OsOperations):
sm_dummy_conn_params = ConnectionParams()
sm_single_instance: OsOperations = None
Expand DownExpand Up@@ -597,3 +615,7 @@ def get_tempdir(self) -> str:
assert type(r) == str # noqa: E721
assert os.path.exists(r)
return r

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return LocalOsLockFsObj(path)
9 changes: 9 additions & 0 deletionstestgres/operations/os_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -17,6 +17,11 @@ def get_default_encoding():
return locale.getencoding() or 'UTF-8'


class OsLockObj:
def release(self) -> None:
raise NotImplementedError()


class OsOperations:
def __init__(self):
pass
Expand DownExpand Up@@ -143,3 +148,7 @@ def is_port_free(self, number: int):

def get_tempdir(self) -> str:
raise NotImplementedError()

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
raise NotImplementedError()
30 changes: 30 additions & 0 deletionstestgres/operations/remote_ops.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -14,6 +14,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import OsOperations, ConnectionParams, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand DownExpand Up@@ -44,6 +45,31 @@ def cmdline(self):
return cmdline.split()


class RemoteOsLockFsObj(OsLockObj):
_os_ops: RemoteOperations
_path: str

def __init__(self, os_ops: RemoteOperations, path: str):
assert isinstance(os_ops, RemoteOperations)
assert type(path) == str # noqa: str

os_ops.makedir(path) # throw
assert os_ops.path_exists(path)

self._os_ops = os_ops
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert isinstance(self._os_ops, RemoteOperations)
assert self._os_ops.path_exists(self._path)

self._os_ops.rmdir(self._path) # throw

self._path = None
self._os_ops = None


class RemoteOperations(OsOperations):
sm_dummy_conn_params = ConnectionParams()

Expand DownExpand Up@@ -732,6 +758,10 @@ def get_tempdir(self) -> str:
assert type(temp_dir) == str # noqa: E721
return temp_dir

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return RemoteOsLockFsObj(self, path)

@staticmethod
def _is_port_free__process_0(error: str) -> bool:
assert type(error) == str # noqa: E721
Expand Down
5 changes: 1 addition & 4 deletionstestgres/utils.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -30,9 +30,6 @@
#
_old_port_manager = PortManager__Generic(LocalOperations.get_single_instance())

# ports used by nodes
bound_ports = _old_port_manager._reserved_ports


# re-export version type
class PgVer(Version):
Expand All@@ -46,7 +43,7 @@ def __init__(self, version: str) -> None:

def internal__reserve_port():
"""
Generate anewport and add it to 'bound_ports'.
Reserve a port.
"""
return _old_port_manager.reserve_port()

Expand Down
35 changes: 35 additions & 0 deletionstests/test_os_ops_common.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -17,6 +17,7 @@

from testgres import InvalidOperationException
from testgres import ExecUtilException
from testgres.operations.os_ops import OsLockObj

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future as ThreadFuture
Expand DownExpand Up@@ -1137,3 +1138,37 @@ class tadWorkerData:

logging.info("Test is finished! Total error count is {}.".format(nErrors))
return

def test_create_lock_fs_obj(self, os_ops: OsOperations):
assert isinstance(os_ops, OsOperations)

tmp = os_ops.mkdtemp()
assert type(tmp) == str # noqa: E721
assert os_ops.path_exists(tmp)
logging.info("tmp dir is [{}]".format(tmp))

p1 = os.path.join(tmp, "a.lock")
obj1 = os_ops.create_lock_fs_obj(p1)
assert obj1 is not None
assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

while True:
try:
os_ops.create_lock_fs_obj(p1)
except Exception as e:
logging.info("OK. We got the error ({}): {}".format(type(e).__name__, e))
break
raise Exception("We wait the exception!")

assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

obj1.release()
assert not os_ops.path_exists(p1)

assert os_ops.path_exists(tmp)
os_ops.rmdir(tmp)
assert not os_ops.path_exists(tmp)
35 changes: 0 additions & 35 deletionstests/test_testgres_local.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,7 +19,6 @@
from testgres import get_pg_version

# NOTE: those are ugly imports
from testgres.utils import bound_ports
from testgres.utils import PgVer
from testgres.node import ProcessProxy

Expand DownExpand Up@@ -90,40 +89,6 @@ def test_pg_config(self):
b = get_pg_config()
assert (id(a) != id(b))

def test_ports_management(self):
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721

if len(bound_ports) != 0:
logging.warning("bound_ports is not empty: {0}".format(bound_ports))

stage0__bound_ports = bound_ports.copy()

with get_new_node() as node:
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721

assert node.port is not None
assert type(node.port) == int # noqa: E721

logging.info("node port is {0}".format(node.port))

assert node.port in bound_ports
assert node.port not in stage0__bound_ports

assert stage0__bound_ports <= bound_ports
assert len(stage0__bound_ports) + 1 == len(bound_ports)

stage1__bound_ports = stage0__bound_ports.copy()
stage1__bound_ports.add(node.port)

assert stage1__bound_ports == bound_ports

# check that port has been freed successfully
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721
assert bound_ports == stage0__bound_ports

def test_child_process_dies(self):
# test for FileNotFound exception during child_processes() function
cmd = ["timeout", "60"] if os.name == 'nt' else ["sleep", "60"]
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp