Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-47798: Add a subprocess.run_pipeline() API #142080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Draft
gpshead wants to merge 12 commits into python:main
base:main
Choose a base branch
Loading
from gpshead:claude/subprocess-pipe-chaining-01R27VPueru4RfRXYDsV5TmW
Draft
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
12 commits
Select commitHold shift + click to select a range
e3a2fbe
Add subprocess.run_pipeline() for command pipe chaining
gpsheadNov 27, 2025
4feb2a8
Add documentation for subprocess.run_pipeline()
gpsheadNov 27, 2025
2a11d4b
Refactor run_pipeline() to use multiplexed I/O
gpsheadNov 28, 2025
2470e14
Add deadlock prevention tests for run_pipeline()
gpsheadNov 28, 2025
e22d1da
Simplify _communicate_streams() to only accept file objects
gpsheadNov 28, 2025
a3e98a7
Improve test_pipeline_large_data_with_stderr to use large stderr
gpsheadNov 28, 2025
3c28ed6
Remove obsolete XXX comment about non-blocking I/O
gpsheadNov 29, 2025
9f53a8e
Refactor POSIX communicate I/O into shared _communicate_io_posix()
gpsheadNov 29, 2025
d420f29
Fix _communicate_streams_windows to avoid blocking with large input
gpsheadNov 29, 2025
df8f082
Fix memoryview and closed stdin handling in _communicate_streams_posix
gpsheadNov 29, 2025
978cd76
Factor out _flush_stdin() and _make_input_view() helpers
gpsheadNov 29, 2025
15f8a93
Support universal_newlines and use _translate_newlines in run_pipeline
gpsheadNov 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
Fix _communicate_streams_windows to avoid blocking with large input
Move stdin writing to a background thread in _communicate_streams_windows to avoid blocking indefinitely when writing large input to a pipeline where the subprocess doesn't consume stdin quickly. This mirrors the fix made to Popen._communicate() for Windows in commit 5b1862b (gh-87512). Add test_pipeline_timeout_large_input to verify that TimeoutExpired is raised promptly when run_pipeline() is called with large input and a timeout, even when the first process is slow to consume stdin. Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
  • Loading branch information
@gpshead@claude
gpshead and claude committed Nov 29, 2025
commitd420f29e2bc04b97f82c20e3822cd3b6e68cf4f4
71 changes: 53 additions & 18 deletionsLib/subprocess.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -443,11 +443,48 @@ def _reader_thread_func(fh, buffer):
except OSError:
buffer.append(b'')

def _writer_thread_func(fh, data, result):
    """Background-thread target: feed *data* into *fh*, then close it.

    A BrokenPipeError (the reading end went away) or an OSError with
    errno EINVAL (pipe already invalid, seen on Windows) is silently
    ignored.  Any other OSError from the write is recorded in *result*;
    a close error is recorded only if no error was stored yet, so the
    spawning thread re-raises the first failure it finds.
    """
    def _record(err, only_first=False):
        # EINVAL means the pipe is already gone -- treat like a broken pipe.
        if err.errno == errno.EINVAL:
            return
        if only_first and result:
            return
        result.append(err)

    try:
        if data:
            fh.write(data)
    except BrokenPipeError:
        pass
    except OSError as err:
        _record(err)

    try:
        fh.close()
    except BrokenPipeError:
        pass
    except OSError as err:
        _record(err, only_first=True)

def _communicate_streams_windows(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""Windows implementation using threads."""
threads = []
buffers = {}
writer_thread = None
writer_result = []

# Start writer thread to send input to stdin
if stdin and input_data:
writer_thread = threading.Thread(
target=_writer_thread_func,
args=(stdin, input_data, writer_result))
writer_thread.daemon = True
writer_thread.start()
elif stdin:
# No input data, just close stdin
try:
stdin.close()
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL:
raise

# Start reader threads for each stream
for stream in read_streams:
Expand All@@ -458,25 +495,23 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
t.start()
threads.append((stream, t))

# Write stdin
if stdin and input_data:
try:
stdin.write(input_data)
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL:
raise
if stdin:
try:
stdin.close()
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL:
raise
# Join writer thread with timeout first
if writer_thread is not None:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
remaining = 0
writer_thread.join(remaining)
if writer_thread.is_alive():
# Timed out during write - collect partial results
results = {s: (b[0] if b else b'') for s, b in buffers.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Check for write errors
if writer_result:
raise writer_result[0]

# Join threads with timeout
# Join reader threads with timeout
for stream, t in threads:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
Expand Down
33 changes: 33 additions & 0 deletionsLib/test/test_subprocess.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2298,6 +2298,39 @@ def test_pipeline_large_data_with_stderr(self):
self.assertGreater(len(result.stderr), stderr_size)
self.assertEqual(result.returncodes, [0, 0])

def test_pipeline_timeout_large_input(self):
    """Large pipeline input must not defeat the timeout.

    run_pipeline() is handed more input than a typical pipe buffer can
    absorb while the first stage delays reading stdin.  TimeoutExpired
    must still arrive close to the requested timeout instead of after
    the stage finishes sleeping -- on Windows in particular, this needs
    the stdin writer to run on a background thread.
    """
    # More data than a typical pipe buffer (64KB) can hold.
    input_data = 'x' * (128 * 1024)

    # First stage sleeps before reading -- simulates a slow consumer.
    slow_consumer = [
        sys.executable, '-c',
        'import sys, time; time.sleep(30); print(sys.stdin.read())']
    # Second stage just reports how much it received.
    length_reporter = [
        sys.executable, '-c',
        'import sys; print(len(sys.stdin.read()))']

    start = time.monotonic()
    with self.assertRaises(subprocess.TimeoutExpired):
        subprocess.run_pipeline(
            slow_consumer,
            length_reporter,
            input=input_data, capture_output=True, text=True, timeout=0.5
        )
    elapsed = time.monotonic() - start

    # The timeout should fire near 0.5s -- allow generous slack for
    # slow CI, but stay well under the 30s subprocess sleep.
    self.assertLess(elapsed, 5.0,
        f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. "
        "Input writing may have blocked without checking timeout.")


def _get_test_grp_name():
for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp