Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Surface fetch/pull/push kill_after_timeout and reset default to None#1340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Byron merged 17 commits intogitpython-developers:mainfromsroet:update_fetch_timeout
Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
17 commits
Select commitHold shift + click to select a range
aafb300
change default fetch timeout to 60 s
sroetSep 10, 2021
1d26515
allow for timeout propagation
sroetSep 10, 2021
febd4fe
add test timeout with the old 10 s timeout
sroetSep 10, 2021
4113d01
also test a call to 'push' with 10s timeout
sroetSep 10, 2021
c55a8e3
propagate kwargs in do_test_fetch
sroetSep 10, 2021
7df33f3
reset default timeout to None
sroetSep 10, 2021
b555764
update docstring
sroetSep 10, 2021
6b358f9
reuse kill_after_timeout kwarg
sroetSep 13, 2021
3a81850
update tests and add a comment about different behaviour of 'push' vs…
sroetSep 13, 2021
d41d537
go for pytest.raises and test that the functions run
sroetSep 13, 2021
5531323
make flake8 and mypy happy
sroetSep 14, 2021
da68bb1
fix typo's
sroetSep 14, 2021
9678226
make test timeout stricter
sroetSep 14, 2021
a9e43e5
fetch is also to quick on CI, only test pull
sroetSep 14, 2021
5081881
two spaces before comments
sroetSep 14, 2021
083039a
set timeout to a non-zero value
sroetSep 14, 2021
e058c4c
Add a way to force status codes inside AutoInterrupt._terminate, and …
sroetSep 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletionsgit/cmd.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -79,7 +79,7 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
finalizer: Union[None,
Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None,
decode_streams: bool = True,
timeout:float =10.0) -> None:
kill_after_timeout: Union[None,float] =None) -> None:
"""Registers for notifications to learn that process output is ready to read, and dispatches lines to
the respective line handlers.
This function returns once the finalizer returns
Expand All@@ -94,7 +94,10 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
their contents to handlers.
Set it to False if `universal_newline == True` (then streams are in text-mode)
or if decoding must happen later (i.e. for Diffs).
:param timeout: float, timeout to pass to t.join() in case it hangs. Default = 10.0 seconds
:param kill_after_timeout:
float or None, Default = None
To specify a timeout in seconds for the git command, after which the process
should be killed.
"""
# Use 2 "pump" threads and wait for both to finish.
def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool,
Expand All@@ -108,9 +111,12 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
handler(line_str)
else:
handler(line)

except Exception as ex:
log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
if "I/O operation on closed file" not in str(ex):
# Only reraise if the error was not due to the stream closing
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
finally:
stream.close()

Expand DownExpand Up@@ -146,9 +152,24 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
## FIXME: Why Join?? Will block if `stdin` needs feeding...
#
for t in threads:
t.join(timeout=timeout)
t.join(timeout=kill_after_timeout)
if t.is_alive():
raise RuntimeError(f"Thread join() timed out in cmd.handle_process_output(). Timeout={timeout} seconds")
if isinstance(process, Git.AutoInterrupt):
process._terminate()
else: # Don't want to deal with the other case
raise RuntimeError("Thread join() timed out in cmd.handle_process_output()."
f" kill_after_timeout={kill_after_timeout} seconds")
if stderr_handler:
error_str: Union[str, bytes] = (
"error: process killed because it timed out."
f" kill_after_timeout={kill_after_timeout} seconds")
if not decode_streams and isinstance(p_stderr, BinaryIO):
# Assume stderr_handler needs binary input
error_str = cast(str, error_str)
error_str = error_str.encode()
# We ignore typing on the next line because mypy does not like
# the way we inferred that stderr takes str or bytes
stderr_handler(error_str) # type: ignore

if finalizer:
return finalizer(process)
Expand DownExpand Up@@ -386,13 +407,19 @@ class AutoInterrupt(object):
The wait method was overridden to perform automatic status code checking
and possibly raise."""

__slots__ = ("proc", "args")
__slots__ = ("proc", "args", "status")

# If this is non-zero it will override any status code during
# _terminate, used to prevent race conditions in testing
_status_code_if_terminate: int = 0

def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
self.proc = proc
self.args = args
self.status: Union[int, None] = None

def __del__(self) -> None:
def _terminate(self) -> None:
"""Terminate the underlying process"""
if self.proc is None:
return

Expand All@@ -404,10 +431,10 @@ def __del__(self) -> None:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()

# did the process finish already so we have a return code ?
try:
if proc.poll() is not None:
self.status = self._status_code_if_terminate or proc.poll()
return None
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
Expand All@@ -419,7 +446,9 @@ def __del__(self) -> None:
# try to kill it
try:
proc.terminate()
proc.wait() # ensure process goes away
status = proc.wait() # ensure process goes away

self.status = self._status_code_if_terminate or status
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
except AttributeError:
Expand All@@ -431,6 +460,9 @@ def __del__(self) -> None:
call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True)
# END exception handling

def __del__(self) -> None:
self._terminate()

def __getattr__(self, attr: str) -> Any:
return getattr(self.proc, attr)

Expand All@@ -444,24 +476,29 @@ def wait(self, stderr: Union[None, str, bytes] = b'') -> int:
if stderr is None:
stderr_b = b''
stderr_b = force_bytes(data=stderr, encoding='utf-8')

status: Union[int, None]
if self.proc is not None:
status = self.proc.wait()
p_stderr = self.proc.stderr
else: # Assume the underlying proc was killed earlier or never existed
status = self.status
p_stderr = None

def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
return stderr_b or b''

if status != 0:
errstr = read_all_from_possibly_closed_stream(self.proc.stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
# END status handling

if status != 0:
errstr = read_all_from_possibly_closed_stream(p_stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
return status

# END auto interrupt
Expand DownExpand Up@@ -694,7 +731,7 @@ def execute(self,
as_process: bool = False,
output_stream: Union[None, BinaryIO] = None,
stdout_as_string: bool = True,
kill_after_timeout: Union[None,int] = None,
kill_after_timeout: Union[None,float] = None,
with_stdout: bool = True,
universal_newlines: bool = False,
shell: Union[None, bool] = None,
Expand DownExpand Up@@ -817,7 +854,7 @@ def execute(self,

if is_win:
cmd_not_found_exception = OSError
if kill_after_timeout:
if kill_after_timeout is not None:
raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.')
else:
cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
Expand DownExpand Up@@ -884,7 +921,7 @@ def _kill_process(pid: int) -> None:
return
# end

if kill_after_timeout:
if kill_after_timeout is not None:
kill_check = threading.Event()
watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))

Expand All@@ -895,10 +932,10 @@ def _kill_process(pid: int) -> None:
newline = "\n" if universal_newlines else b"\n"
try:
if output_stream is None:
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.start()
stdout_value, stderr_value = proc.communicate()
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.cancel()
if kill_check.is_set():
stderr_value = ('Timeout: the command "%s" did not complete in %d '
Expand Down
40 changes: 31 additions & 9 deletionsgit/remote.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -707,7 +707,8 @@ def update(self, **kwargs: Any) -> 'Remote':
return self

def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None,
) -> IterableList['FetchInfo']:

progress = to_progress_instance(progress)
Expand All@@ -724,7 +725,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
cmds = set(FetchInfo._flag_map.keys())

progress_handler = progress.new_message_handler()
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)

stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
proc.wait(stderr=stderr_text)
Expand DownExpand Up@@ -769,7 +771,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
return output

def _get_push_info(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList[PushInfo]:
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None) -> IterableList[PushInfo]:
progress = to_progress_instance(progress)

# read progress information from stderr
Expand All@@ -786,11 +789,14 @@ def stdout_handler(line: str) -> None:
# If an error happens, additional info is given which we parse below.
pass

handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)
stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
try:
proc.wait(stderr=stderr_text)
except Exception:
# This is different than fetch (which fails if there is any std_err
# even if there is an output)
if not output:
raise
elif stderr_text:
Expand All@@ -813,7 +819,9 @@ def _assert_refspec(self) -> None:

def fetch(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, None, 'UpdateProgress'] = None,
verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]:
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote

:param refspec:
Expand All@@ -833,6 +841,9 @@ def fetch(self, refspec: Union[str, List[str], None] = None,
for 'refspec' will make use of this facility.
:param progress: See 'push' method
:param verbose: Boolean for verbose output
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-fetch
:return:
IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed
Expand All@@ -853,19 +864,22 @@ def fetch(self, refspec: Union[str, List[str], None] = None,

proc = self.repo.git.fetch(self, *args, as_process=True, with_stdout=False,
universal_newlines=True, v=verbose, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def pull(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
by a merge of branch with your local branch.

:param refspec: see 'fetch' method
:param progress: see 'push' method
:param kill_after_timeout: see 'fetch' method
:param kwargs: Additional arguments to be passed to git-pull
:return: Please see 'fetch' method """
if refspec is None:
Expand All@@ -874,13 +888,15 @@ def pull(self, refspec: Union[str, List[str], None] = None,
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.pull(self, refspec, with_stdout=False, as_process=True,
universal_newlines=True, v=True, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def push(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.

Expand All@@ -897,6 +913,9 @@ def push(self, refspec: Union[str, List[str], None] = None,
overrides the ``update()`` function.

:note: No further progress information is returned after push returns.
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-push
:return:
list(PushInfo, ...) list of PushInfo instances, each
Expand All@@ -908,8 +927,11 @@ def push(self, refspec: Union[str, List[str], None] = None,
be 0."""
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.push(self, refspec, porcelain=True, as_process=True,
universal_newlines=True, **kwargs)
return self._get_push_info(proc, progress)
universal_newlines=True,
kill_after_timeout=kill_after_timeout,
**kwargs)
return self._get_push_info(proc, progress,
kill_after_timeout=kill_after_timeout)

@ property
def config_reader(self) -> SectionConstraint[GitConfigParser]:
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp