gh-47798: Add a subprocess.run_pipeline() API #142080


Draft
gpshead wants to merge 12 commits into python:main
from gpshead:claude/subprocess-pipe-chaining-01R27VPueru4RfRXYDsV5TmW

Conversation

@gpshead (Member) commented Nov 29, 2025 (edited)

This was a feature request from 2008. I'm undecided if we should add this feature, but it proved a good exercise and had me review a lot of existing subprocess.py code and notice a variety of other lingering subprocess issues while doing so (thus my recent spate of smaller subprocess PRs merged today).

Summary

Read the docs in the PR for details, but it basically mirrors the run() API, just with multiple commands:

>>> from subprocess import run_pipeline
>>> run_pipeline(
...     ["gh", "issue", "list", "--state", "open", "--search", "subprocess in:title",
...      "--json", "number", "--limit", "500"],
...     ["jq", "length"],
... )
97
PipelineResult(commands=[['gh', 'issue', 'list', '--state', 'open', '--search', 'subprocess in:title', '--json', 'number', '--limit', '500'], ['jq', 'length']], returncodes=[0, 0])

Open question

This matches run()'s signaling behavior upon timeout, namely SIGKILL, but applied to each process. Q: Is that rude? Should we kill the processes in a particular order so that their own SIGPIPEs propagate, and wait a bit before we signal each?
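For reference, here is the existing run() timeout behavior being mirrored, as a minimal sketch (assumes a POSIX sleep binary): the child is killed on timeout and TimeoutExpired is raised; the proposal does the same to every process in the pipeline.

```python
import subprocess

# run() kills the child (SIGKILL on POSIX) when the timeout elapses,
# then raises TimeoutExpired.
try:
    subprocess.run(["sleep", "5"], timeout=0.5)
    outcome = "completed"
except subprocess.TimeoutExpired as exc:
    outcome = f"timed out: {exc.cmd} after {exc.timeout}s"

print(outcome)
```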

The PipelineResult and PipelineError types are somewhat similar to CompletedProcess and CalledProcessError, but also different. Should they be the same, or be subclasses? Claude preferred not, as the obvious difference is the confusion around the singular attributes and what those even mean, particularly for the error when a pipeline failed its check=True saving throw. I can still imagine subclassing CalledProcessError is handy to avoid needing to modify code though, as I doubt people catch the generic SubprocessError base often vs. just `except CalledProcessError:`. I do not think it makes sense for the result.
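To illustrate the call-site pattern at stake, a sketch using only today's API: CalledProcessError already subclasses SubprocessError, and whether existing `except CalledProcessError:` handlers would also catch a PipelineError is exactly the subclassing decision being discussed.

```python
import subprocess

# Existing hierarchy: CalledProcessError is a SubprocessError subclass.
assert issubclass(subprocess.CalledProcessError, subprocess.SubprocessError)

# The common pattern in existing code; it would only catch a pipeline
# failure if PipelineError were made a CalledProcessError subclass.
try:
    subprocess.run(["false"], check=True)
except subprocess.CalledProcessError as exc:
    failed_returncode = exc.returncode

print(failed_returncode)
```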

Naming things - CompletedPipeline would be more consistent than PipelineResult?

Alternative ideas

I was pondering the use of the | pipe operator itself between objects. But this is unnatural and undesirable for Popen instances themselves, as those start upon creation, even though I could imagine allowing it to rewire their file descriptors. It'd get gross and raise questions around unclear management of the mess. You want processes started sequentially with the actual stdout->stdin chain of connections made from the start, so a run()-like API makes sense to me.

This lets people avoid using a shell.
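For context, the shell-free status quo is the manual Popen chain long shown in the subprocess docs; a sketch of the pattern run_pipeline() would package up (equivalent to the shell pipeline `echo hello | tr a-z A-Z`, assuming POSIX tools):

```python
import subprocess

# Manual two-stage pipeline: echo hello | tr a-z A-Z
p1 = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["tr", "a-z", "A-Z"], stdin=p1.stdout,
                      stdout=subprocess.PIPE, text=True)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits early
out, _ = p2.communicate()
p1.wait()
print(out)
```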

It does not offer the same flexibility a raw Popen instance does, though, for people who need to do their own I/O multiplexing. But given you can provide whatever file object you want for input and output, that could still be done by having your own threads feed or consume those instead of relying on capture_output.
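A sketch of that escape hatch using existing APIs (shown with run() since run_pipeline() is only proposed; the same pattern would apply to its stdin= argument): a producer thread streams chunks into a pipe while the main thread waits on the process.

```python
import os
import subprocess
import threading

r, w = os.pipe()

def feed():
    # Producer thread: stream chunks into the pipe, then close it
    # so the consumer sees EOF.
    with os.fdopen(w, "wb") as f:
        for chunk in (b"one\n", b"two\n", b"three\n"):
            f.write(chunk)

t = threading.Thread(target=feed)
t.start()
result = subprocess.run(["wc", "-l"], stdin=r, capture_output=True, text=True)
t.join()
os.close(r)
print(result.stdout.strip())
```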

What PyPI subprocess pipe options exist?

I found two PyPI packages offering something resembling assembly of subprocess pipelines:

  • subprocess_pipe - super trivial, what anyone could easily do already, not robust.
  • pipesubprocess - complicated, uses threads on all platforms, Popen-like API style.

Written entirely by alternating between my looking at subprocess sources and driving a remote Claude Code for the web session, telling it what to do next. With the aid of gpshead/cpython-skills.

Co-authored-by: Claude Opus 4.5


📚 Documentation preview 📚: https://cpython-previews--142080.org.readthedocs.build/

gpshead and others added 12 commits November 29, 2025 08:04
Add a new run_pipeline() function to the subprocess module that enables
running multiple commands connected via pipes, similar to shell pipelines.

New API:
- run_pipeline(*commands, ...) - Run a pipeline of commands
- PipelineResult - Return type with commands, returncodes, stdout, stderr
- PipelineError - Raised when check=True and any command fails

Features:
- Supports arbitrary number of commands (minimum 2)
- capture_output, input, timeout, and check parameters like run()
- stdin= connects to first process, stdout= connects to last process
- Text mode support via text=True, encoding, errors
- All processes share a single stderr pipe for simplicity
- "pipefail" semantics: check=True fails if any command fails

Unlike run(), this function does not accept universal_newlines.
Use text=True instead.

Example:

    result = subprocess.run_pipeline(
        ['cat', 'file.txt'],
        ['grep', 'pattern'],
        ['wc', '-l'],
        capture_output=True, text=True
    )

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Document the new run_pipeline() function, PipelineResult class, and
PipelineError exception in the subprocess module documentation.

Includes:
- Function signature with stdin, stdout, stderr, capture_output, etc.
- Note about shared stderr pipe and text mode caveat for interleaved
  multi-byte character sequences
- Note that universal_newlines is not supported (use text=True)
- Explanation that stdin connects to first process, stdout to last
- Usage examples showing basic pipelines, multi-command pipelines,
  input handling, and error handling with check=True
- PipelineResult attributes: commands, returncodes, returncode,
  stdout, stderr, and check_returncodes() method
- PipelineError attributes: commands, returncodes, stdout, stderr,
  and failed list

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Add _communicate_streams() helper function that properly multiplexes
read/write operations to prevent pipe buffer deadlocks. The helper
uses selectors on POSIX and threads on Windows, similar to
Popen.communicate().

This fixes potential deadlocks when large amounts of data flow through
the pipeline and significantly improves performance.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Add three tests that verify the multiplexed I/O implementation
properly handles large data volumes that would otherwise cause
pipe buffer deadlocks:

- test_pipeline_large_data_no_deadlock: 256KB through 2-stage pipeline
- test_pipeline_large_data_three_stages: 128KB through 3-stage pipeline
- test_pipeline_large_data_with_stderr: 64KB with concurrent stderr

These tests would timeout or deadlock without proper multiplexing.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Remove support for raw file descriptors in _communicate_streams(),
requiring all streams to be file objects. This simplifies both the
Windows and POSIX implementations by removing isinstance() checks
and fd-wrapping logic.

The run_pipeline() function now wraps the stderr pipe's read end
with os.fdopen() immediately after creation.

This change makes _communicate_streams() more compatible with
Popen.communicate() which already uses file objects, enabling
potential future refactoring to share the multiplexed I/O logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Update the test to write 64KB to stderr from each process (128KB total)
instead of just small status messages. This better tests that the
multiplexed I/O handles concurrent large data on both stdout and stderr
without deadlocking.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
The comment suggested rewriting Popen._communicate() to use
non-blocking I/O on file objects now that Python 3's io module
is used instead of C stdio.

This is unnecessary - the current approach using select() to
detect ready fds followed by os.read()/os.write() is correct
and efficient. The selector already solves "when is data ready?"
so non-blocking mode would add complexity with no benefit.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
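The approach this commit defends, sketched minimally (assumes POSIX): block in the selector, then issue a blocking os.read() that will not actually block because readiness was just reported.

```python
import os
import selectors
import subprocess

proc = subprocess.Popen(["printf", "abc"], stdout=subprocess.PIPE)
sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)

chunks = []
while sel.get_map():
    for key, _events in sel.select():
        # Blocking fd is fine here: select() just said data (or EOF) is ready.
        data = os.read(key.fd, 32768)
        if data:
            chunks.append(data)
        else:  # EOF: stop watching and close our end
            sel.unregister(key.fileobj)
            key.fileobj.close()
proc.wait()
output = b"".join(chunks)
print(output)
```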
Extract the core selector-based I/O loop into a new _communicate_io_posix()
function that is shared by both _communicate_streams_posix() (used by
run_pipeline) and Popen._communicate() (used by Popen.communicate).

The new function:
- Takes a pre-configured selector and output buffers
- Supports resume via input_offset parameter (for Popen timeout retry)
- Returns (new_offset, completed) instead of raising TimeoutExpired
- Does not close streams (caller decides based on use case)

This reduces code duplication and ensures both APIs use the same
well-tested I/O multiplexing logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Move stdin writing to a background thread in _communicate_streams_windows
to avoid blocking indefinitely when writing large input to a pipeline
where the subprocess doesn't consume stdin quickly.

This mirrors the fix made to Popen._communicate() for Windows in
commit 5b1862b (pythongh-87512).

Add test_pipeline_timeout_large_input to verify that TimeoutExpired
is raised promptly when run_pipeline() is called with large input
and a timeout, even when the first process is slow to consume stdin.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Apply the same fixes from Popen._communicate() to _communicate_streams_posix
for run_pipeline():

1. Handle non-byte memoryview input by casting to byte view (pythongh-134453):
   Non-byte memoryviews (e.g., int32 arrays) had incorrect length tracking
   because len() returns element count, not byte count. Now cast to "b"
   view for correct progress tracking.

2. Handle ValueError on stdin.flush() when stdin is closed (pythongh-74389):
   Ignore ValueError from flush() if stdin is already closed, matching
   the BrokenPipeError handling.

Add tests for memoryview input to run_pipeline:
- test_pipeline_memoryview_input: basic byte memoryview
- test_pipeline_memoryview_input_nonbyte: int32 array memoryview

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
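The memoryview pitfall described in this commit, illustrated: len() on a non-byte view counts elements, so write-progress tracking must operate on a cast("b") byte view.

```python
import array

data = array.array("i", [1, 2, 3, 4])  # C int elements, itemsize bytes each
view = memoryview(data)
assert len(view) == 4                  # element count, NOT byte count

byte_view = view.cast("b")             # flat byte view: correct for tracking
assert len(byte_view) == 4 * view.itemsize
```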
Extract common stdin preparation logic into shared helper functions
used by both _communicate_streams_posix() and Popen._communicate():

- _flush_stdin(stdin): Flush stdin, ignoring BrokenPipeError and
  ValueError (for closed files)
- _make_input_view(input_data): Convert input data to a byte memoryview,
  handling non-byte memoryviews by casting to "b" view

This ensures consistent behavior and makes the fixes for pythongh-134453
(memoryview) and pythongh-74389 (closed stdin) shared in one place.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
- Factor out _translate_newlines() as a module-level function, have
  Popen's method delegate to it for code sharing
- Remove rejection of universal_newlines kwarg in run_pipeline(), treat
  it the same as text=True (consistent with Popen behavior)
- Use _translate_newlines() for text mode decoding in run_pipeline()
  to properly handle \r\n and \r newline sequences
- Update documentation to remove mention of universal_newlines rejection
- Update test to verify universal_newlines=True works like text=True

Co-authored-by: Claude <noreply@anthropic.com>
@gpshead added the type-feature (A feature request or enhancement), stdlib (Standard Library Python modules in the Lib/ directory), and topic-subprocess (Subprocess issues.) labels Nov 29, 2025
@gpshead self-assigned this Nov 29, 2025

.. attribute:: commands

   List of commands that were used in the pipeline.

@merwok (Member) commented:

The caller has the list of commands that they supplied, I think they don't need to be kept in the returned objects as well.

@merwok (Member) commented Nov 29, 2025 (edited)

This raises the issue of using chatbots to contribute PRs, and the unclear copyright/licensing status of their training data. (Not here of course, but a not fun discussion for python-dev)



Reviewers

@merwok (left review comments)

Assignees

@gpshead

Labels

stdlib (Standard Library Python modules in the Lib/ directory), topic-subprocess (Subprocess issues.), type-feature (A feature request or enhancement)

2 participants

@gpshead @merwok
