@@ -784,28 +784,6 @@ def _collect_special_files(self):
784784
785785return result
786786
787- def _collect_log_files (self ):
788- # dictionary of log files + size in bytes
789-
790- files = [
791- self .pg_log_file
792- ]# yapf: disable
793-
794- result = {}
795-
796- for f in files :
797- # skip missing files
798- if not self .os_ops .path_exists (f ):
799- continue
800-
801- file_size = self .os_ops .get_file_size (f )
802- assert type (file_size )== int # noqa: E721
803- assert file_size >= 0
804-
805- result [f ]= file_size
806-
807- return result
808-
809787def init (self ,initdb_params = None ,cached = True ,** kwargs ):
810788"""
811789 Perform initdb for this node.
@@ -1062,22 +1040,6 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
10621040OperationalError },
10631041max_attempts = max_attempts )
10641042
1065- def _detect_port_conflict (self ,log_files0 ,log_files1 ):
1066- assert type (log_files0 )== dict # noqa: E721
1067- assert type (log_files1 )== dict # noqa: E721
1068-
1069- for file in log_files1 .keys ():
1070- read_pos = 0
1071-
1072- if file in log_files0 .keys ():
1073- read_pos = log_files0 [file ]# the previous size
1074-
1075- file_content = self .os_ops .read_binary (file ,read_pos )
1076- file_content_s = file_content .decode ()
1077- if 'Is another postmaster already running on port' in file_content_s :
1078- return True
1079- return False
1080-
10811043def start (self ,params = [],wait = True ,exec_env = None ):
10821044"""
10831045 Starts the PostgreSQL node using pg_ctl if node has not been started.
@@ -1137,8 +1099,7 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
11371099assert isinstance (self ._port_manager ,PortManager )
11381100assert __class__ ._C_MAX_START_ATEMPTS > 1
11391101
1140- log_files0 = self ._collect_log_files ()
1141- assert type (log_files0 )== dict # noqa: E721
1102+ log_reader = PostgresNodeLogReader (self ,from_beginnig = False )
11421103
11431104nAttempt = 0
11441105timeout = 1
@@ -1154,11 +1115,11 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
11541115if nAttempt == __class__ ._C_MAX_START_ATEMPTS :
11551116LOCAL__raise_cannot_start_node (e ,"Cannot start node after multiple attempts." )
11561117
1157- log_files1 = self ._collect_log_files ()
1158- if not self ._detect_port_conflict (log_files0 ,log_files1 ):
1118+ is_it_port_conflict = PostgresNodeUtils .delect_port_conflict (log_reader )
1119+
1120+ if not is_it_port_conflict :
11591121LOCAL__raise_cannot_start_node__std (e )
11601122
1161- log_files0 = log_files1
11621123logging .warning (
11631124"Detected a conflict with using the port {0}. Trying another port after a {1}-second sleep..." .format (self ._port ,timeout )
11641125 )
@@ -2192,6 +2153,167 @@ def _escape_config_value(value):
21922153return result
21932154
21942155
2156+ class PostgresNodeLogReader :
2157+ class LogInfo :
2158+ position :int
2159+
2160+ def __init__ (self ,position :int ):
2161+ self .position = position
2162+
2163+ # --------------------------------------------------------------------
2164+ class LogDataBlock :
2165+ _file_name :str
2166+ _position :int
2167+ _data :str
2168+
2169+ def __init__ (
2170+ self ,
2171+ file_name :str ,
2172+ position :int ,
2173+ data :str
2174+ ):
2175+ assert type (file_name )== str # noqa: E721
2176+ assert type (position )== int # noqa: E721
2177+ assert type (data )== str # noqa: E721
2178+ assert file_name != ""
2179+ assert position >= 0
2180+ self ._file_name = file_name
2181+ self ._position = position
2182+ self ._data = data
2183+
2184+ @property
2185+ def file_name (self )-> str :
2186+ assert type (self ._file_name )== str # noqa: E721
2187+ assert self ._file_name != ""
2188+ return self ._file_name
2189+
2190+ @property
2191+ def position (self )-> int :
2192+ assert type (self ._position )== int # noqa: E721
2193+ assert self ._position >= 0
2194+ return self ._position
2195+
2196+ @property
2197+ def data (self )-> str :
2198+ assert type (self ._data )== str # noqa: E721
2199+ return self ._data
2200+
2201+ # --------------------------------------------------------------------
2202+ _node :PostgresNode
2203+ _logs :typing .Dict [str ,LogInfo ]
2204+
2205+ # --------------------------------------------------------------------
2206+ def __init__ (self ,node :PostgresNode ,from_beginnig :bool ):
2207+ assert node is not None
2208+ assert isinstance (node ,PostgresNode )
2209+ assert type (from_beginnig )== bool # noqa: E721
2210+
2211+ self ._node = node
2212+
2213+ if from_beginnig :
2214+ self ._logs = dict ()
2215+ else :
2216+ self ._logs = self ._collect_logs ()
2217+
2218+ assert type (self ._logs )== dict # noqa: E721
2219+ return
2220+
2221+ def read (self )-> typing .List [LogDataBlock ]:
2222+ assert self ._node is not None
2223+ assert isinstance (self ._node ,PostgresNode )
2224+
2225+ cur_logs :typing .Dict [__class__ .LogInfo ]= self ._collect_logs ()
2226+ assert cur_logs is not None
2227+ assert type (cur_logs )== dict # noqa: E721
2228+
2229+ assert type (self ._logs )== dict # noqa: E721
2230+
2231+ result = list ()
2232+
2233+ for file_name ,cur_log_info in cur_logs .items ():
2234+ assert type (file_name )== str # noqa: E721
2235+ assert type (cur_log_info )== __class__ .LogInfo # noqa: E721
2236+
2237+ read_pos = 0
2238+
2239+ if file_name in self ._logs .keys ():
2240+ prev_log_info = self ._logs [file_name ]
2241+ assert type (prev_log_info )== __class__ .LogInfo # noqa: E721
2242+ read_pos = prev_log_info .position # the previous size
2243+
2244+ file_content_b = self ._node .os_ops .read_binary (file_name ,read_pos )
2245+ assert type (file_content_b )== bytes # noqa: E721
2246+
2247+ #
2248+ # A POTENTIAL PROBLEM: file_content_b may contain an incompleted UTF-8 symbol.
2249+ #
2250+ file_content_s = file_content_b .decode ()
2251+ assert type (file_content_s )== str # noqa: E721
2252+
2253+ next_read_pos = read_pos + len (file_content_b )
2254+
2255+ # It is a research/paranoja check.
2256+ # When we will process partial UTF-8 symbol, it must be adjusted.
2257+ assert cur_log_info .position <= next_read_pos
2258+
2259+ cur_log_info .position = next_read_pos
2260+
2261+ block = __class__ .LogDataBlock (
2262+ file_name ,
2263+ read_pos ,
2264+ file_content_s
2265+ )
2266+
2267+ result .append (block )
2268+
2269+ # A new check point
2270+ self ._logs = cur_logs
2271+
2272+ return result
2273+
2274+ def _collect_logs (self )-> typing .Dict [LogInfo ]:
2275+ assert self ._node is not None
2276+ assert isinstance (self ._node ,PostgresNode )
2277+
2278+ files = [
2279+ self ._node .pg_log_file
2280+ ]# yapf: disable
2281+
2282+ result = dict ()
2283+
2284+ for f in files :
2285+ assert type (f )== str # noqa: E721
2286+
2287+ # skip missing files
2288+ if not self ._node .os_ops .path_exists (f ):
2289+ continue
2290+
2291+ file_size = self ._node .os_ops .get_file_size (f )
2292+ assert type (file_size )== int # noqa: E721
2293+ assert file_size >= 0
2294+
2295+ result [f ]= __class__ .LogInfo (file_size )
2296+
2297+ return result
2298+
2299+
2300+ class PostgresNodeUtils :
2301+ @staticmethod
2302+ def delect_port_conflict (log_reader :PostgresNodeLogReader )-> bool :
2303+ assert type (log_reader )== PostgresNodeLogReader # noqa: E721
2304+
2305+ blocks = log_reader .read ()
2306+ assert type (blocks )== list # noqa: E721
2307+
2308+ for block in blocks :
2309+ assert type (block )== PostgresNodeLogReader .LogDataBlock # noqa: E721
2310+
2311+ if 'Is another postmaster already running on port' in block .data :
2312+ return True
2313+
2314+ return False
2315+
2316+
21952317class NodeApp :
21962318
21972319def __init__ (self ,test_path = None ,nodes_to_cleanup = None ,os_ops = None ):