gh-47798: Add a subprocess.run_pipeline() API #142080


Draft
gpshead wants to merge 12 commits into python:main
base: main
from gpshead:claude/subprocess-pipe-chaining-01R27VPueru4RfRXYDsV5TmW
Changes from 1 commit
Commits
12 commits
e3a2fbe
Add subprocess.run_pipeline() for command pipe chaining
gpshead Nov 27, 2025
4feb2a8
Add documentation for subprocess.run_pipeline()
gpshead Nov 27, 2025
2a11d4b
Refactor run_pipeline() to use multiplexed I/O
gpshead Nov 28, 2025
2470e14
Add deadlock prevention tests for run_pipeline()
gpshead Nov 28, 2025
e22d1da
Simplify _communicate_streams() to only accept file objects
gpshead Nov 28, 2025
a3e98a7
Improve test_pipeline_large_data_with_stderr to use large stderr
gpshead Nov 28, 2025
3c28ed6
Remove obsolete XXX comment about non-blocking I/O
gpshead Nov 29, 2025
9f53a8e
Refactor POSIX communicate I/O into shared _communicate_io_posix()
gpshead Nov 29, 2025
d420f29
Fix _communicate_streams_windows to avoid blocking with large input
gpshead Nov 29, 2025
df8f082
Fix memoryview and closed stdin handling in _communicate_streams_posix
gpshead Nov 29, 2025
978cd76
Factor out _flush_stdin() and _make_input_view() helpers
gpshead Nov 29, 2025
15f8a93
Support universal_newlines and use _translate_newlines in run_pipeline
gpshead Nov 29, 2025
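
For orientation, a minimal usage sketch of the proposed API, assuming only the call signature visible in the diff below (run_pipeline(*commands, input=None, capture_output=False, timeout=None, ...)). The example commands are illustrative, and the sketch makes no claim about what the function returns, since that is not shown in this excerpt.

import subprocess

# Hypothetical call, roughly equivalent to the shell pipeline `dmesg | grep usb`,
# but without invoking a shell. Keyword arguments mirror subprocess.run().
subprocess.run_pipeline(
    ["dmesg"],
    ["grep", "usb"],
    capture_output=True,
    timeout=30,
)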
Simplify _communicate_streams() to only accept file objects
Remove support for raw file descriptors in _communicate_streams(), requiring all streams to be file objects. This simplifies both the Windows and POSIX implementations by removing isinstance() checks and fd-wrapping logic.

The run_pipeline() function now wraps the stderr pipe's read end with os.fdopen() immediately after creation.

This change makes _communicate_streams() more compatible with Popen.communicate(), which already uses file objects, enabling potential future refactoring to share the multiplexed I/O logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
@gpshead @claude
gpshead and claude committed Nov 29, 2025
commit e22d1da9bccb0c1cb40ec6e0d21b6ce16316cd1c
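
As a rough illustration of the contract change described above (a sketch only, under the assumption that run_pipeline() follows the simplified flow shown in the diff; the commented call is not the PR's exact code):

import os

# Parent-side stderr pipe shared by the pipeline's children.
stderr_read_fd, stderr_write_fd = os.pipe()
# Wrap the read end in a binary file object immediately after creation;
# _communicate_streams() now accepts only file objects and will close
# every object passed in read_streams.
stderr_reader = os.fdopen(stderr_read_fd, 'rb')

# Sketch of the later call (argument names follow the docstring in the diff;
# first_proc/last_proc/data are hypothetical placeholders):
#   results = _communicate_streams(stdin=first_proc.stdin, input_data=data,
#                                  read_streams=[last_proc.stdout, stderr_reader],
#                                  timeout=None, cmd_for_timeout=commands)
#   stderr_bytes = results.get(stderr_reader)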
80 changes: 31 additions & 49 deletions Lib/subprocess.py
@@ -333,18 +333,19 @@ def _communicate_streams(stdin=None, input_data=None, read_streams=None,
"""
Multiplex I/O: write input_data to stdin, read from read_streams.

Works with both file objects and raw file descriptors.
All streams must be file objects (not raw file descriptors).
All I/O is done in binary mode; caller handles text encoding.

Args:
stdin: Writable file object for input, or None
stdin: Writable binary file object for input, or None
input_data: Bytes to write to stdin, or None
read_streams: List of readable file objects or raw fds to read from
read_streams: List of readable binary file objects to read from
timeout: Timeout in seconds, or None for no timeout
cmd_for_timeout: Value to use for TimeoutExpired.cmd

Returns:
Dict mapping each item in read_streams to its bytes data
Dict mapping each file object in read_streams to its bytes data.
All file objects in read_streams will be closed.

Raises:
TimeoutExpired: If timeout expires (with partial data)
@@ -377,22 +378,15 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
"""Windows implementation using threads."""
threads = []
buffers = {}
fds_to_close = []

# Start reader threads
# Start reader threads for each stream
for stream in read_streams:
buf = []
buffers[stream] = buf
# Wrap raw fds in file objects
if isinstance(stream, int):
fobj = os.fdopen(os.dup(stream), 'rb')
fds_to_close.append(stream)
else:
fobj = stream
t = threading.Thread(target=_reader_thread_func, args=(fobj, buf))
t = threading.Thread(target=_reader_thread_func, args=(stream, buf))
t.daemon = True
t.start()
threads.append((stream, t, fobj))
threads.append((stream, t))

# Write stdin
if stdin and input_data:
@@ -413,7 +407,7 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
raise

# Join threads with timeout
for stream, t, fobj in threads:
for stream, t in threads:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
remaining = 0
@@ -425,28 +419,17 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)

# Close any raw fds we duped
for fd in fds_to_close:
try:
os.close(fd)
except OSError:
pass

# Collect results
return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()}

else:
def _communicate_streams_posix(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""POSIX implementation using selectors."""
# Normalize read_streams: build mapping of fd -> (original_key, chunks)
fd_info = {} # fd -> (original_stream, chunks_list)
# Build mapping of fd -> (file_object, chunks_list)
fd_info = {}
for stream in read_streams:
if isinstance(stream, int):
fd = stream
else:
fd = stream.fileno()
fd_info[fd] = (stream, [])
fd_info[stream.fileno()] = (stream, [])

# Prepare stdin
stdin_fd = None
@@ -477,8 +460,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
# Timed out - collect partial results
results = {orig: b''.join(chunks)
for fd, (orig, chunks) in fd_info.items()}
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
@@ -487,8 +470,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams,

# Check timeout after select
if endtime is not None and _time() > endtime:
results = {orig: b''.join(chunks)
for fd, (orig, chunks) in fd_info.items()}
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
@@ -520,16 +503,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
else:
fd_info[key.fd][1].append(data)

# Build results: map original stream keys to joined data
# Build results and close all file objects
results = {}
for fd, (orig_stream, chunks) in fd_info.items():
results[orig_stream] = b''.join(chunks)
# Close file objects (but not raw fds - caller manages those)
if not isinstance(orig_stream, int):
try:
orig_stream.close()
except OSError:
pass
for fd, (stream, chunks) in fd_info.items():
results[stream] = b''.join(chunks)
try:
stream.close()
except OSError:
pass

return results

@@ -942,13 +923,14 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
stdout_arg = kwargs.pop('stdout', None)

processes = []
stderr_read_fd = None  # Read end of shared stderr pipe (for parent)
stderr_reader = None  # File object for reading shared stderr (for parent)
stderr_write_fd = None # Write end of shared stderr pipe (for children)

try:
# Create a single stderr pipe that all processes will share
if capture_stderr:
stderr_read_fd, stderr_write_fd = os.pipe()
stderr_reader = os.fdopen(stderr_read_fd, 'rb')

for i, cmd in enumerate(commands):
is_first = (i == 0)
@@ -1017,8 +999,8 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
read_streams = []
if last_proc.stdout is not None:
read_streams.append(last_proc.stdout)
if stderr_read_fd is not None:
read_streams.append(stderr_read_fd)
if stderr_reader is not None:
read_streams.append(stderr_reader)

# Use multiplexed I/O to handle stdin/stdout/stderr concurrently
# This avoids deadlocks from pipe buffer limits
@@ -1043,7 +1025,7 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,

# Extract results
stdout = results.get(last_proc.stdout)
stderr = results.get(stderr_read_fd)
stderr = results.get(stderr_reader)

# Decode stdout if in text mode (Popen text mode only applies to
# streams it creates, but we read via _communicate_streams which
@@ -1087,10 +1069,10 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
proc.stdin.close()
if proc.stdout and not proc.stdout.closed:
proc.stdout.close()
# Close stderr pipe file descriptor
if stderr_read_fd is not None:
# Close stderr pipe (reader is a file object, writer is a raw fd)
if stderr_reader is not None and not stderr_reader.closed:
try:
os.close(stderr_read_fd)
stderr_reader.close()
except OSError:
pass
if stderr_write_fd is not None:
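
To make the deadlock-avoidance idea behind the multiplexed I/O concrete, here is a standalone sketch of the same selector-based pattern used on POSIX: the parent interleaves the stdin write with the stdout/stderr reads so it never blocks on one full pipe buffer while the child blocks on another. This is an illustration of the technique only, not the code added by this PR.

import os
import selectors
import subprocess

proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = memoryview(b"x" * (1 << 20))   # larger than a typical 64 KiB pipe buffer
chunks = {proc.stdout: [], proc.stderr: []}

sel = selectors.DefaultSelector()
os.set_blocking(proc.stdin.fileno(), False)
sel.register(proc.stdin, selectors.EVENT_WRITE)
sel.register(proc.stdout, selectors.EVENT_READ)
sel.register(proc.stderr, selectors.EVENT_READ)

offset = 0
while sel.get_map():
    for key, _ in sel.select():
        if key.fileobj is proc.stdin:
            # Partial writes are expected; write whatever currently fits.
            offset += os.write(key.fd, data[offset:])
            if offset >= len(data):
                sel.unregister(proc.stdin)
                proc.stdin.close()          # EOF for the child's stdin
        else:
            block = os.read(key.fd, 32768)
            if block:
                chunks[key.fileobj].append(block)
            else:                           # EOF: child closed this pipe
                sel.unregister(key.fileobj)
                key.fileobj.close()

stdout_data = b"".join(chunks[proc.stdout])
proc.wait()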
