I am trying to pipe multiple Linux commands and abort if there is an error. With `Popen`, the `communicate()` method waits for all commands to finish. This is why I am trying `asyncio` now.

I have the following MWE working as expected if there is no error:

```python
import asyncio


async def forward(src, dest):
    """Read data from src and write it to dest."""
    while True:
        chunk = await src.read(4096)
        if not chunk:
            dest.write_eof()
            await dest.drain()
            break
        dest.write(chunk)
        await dest.drain()


async def stderr_watch(stream):
    err = await stream.read()
    if err.strip():
        raise RuntimeError(f"stderr: {err.decode()}")


async def main():
    p1 = await asyncio.create_subprocess_exec(
        "find", "/", "-name", "*.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    p2 = await asyncio.create_subprocess_exec(
        "wc", "-l",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    output = []

    async def stream_reader(stream):
        while True:
            line = await stream.readline()
            if not line:
                break
            output.append(line.decode())

    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(stderr_watch(p1.stderr))
            t2 = tg.create_task(stderr_watch(p2.stderr))
            t3 = tg.create_task(forward(p1.stdout, p2.stdin))
            t4 = tg.create_task(stream_reader(p2.stdout))
    except* Exception as eg:
        for e in eg.exceptions:
            print(e)
    else:
        return "".join(output)


if __name__ == '__main__':
    output = asyncio.run(main())
    print(output)
```

However, I am getting a `RuntimeError: Event loop is closed` if I provoke an exception in p2, e.g. by piping to `"wc", "-l", "abc"`. Where is my mistake?

asked Nov 24 at 7:48 by Manuel Schmidt

1 Answer

The error occurs because the finalizer of an internal object (the `asyncio.base_subprocess.BaseSubprocessTransport`) runs after the event loop has been closed. You need to terminate the child processes on error and then await them while the loop is still running, like this:

```python
async def main():
    p1 = await asyncio.create_subprocess_exec(
        ...
    )
    p2 = await asyncio.create_subprocess_exec(
        ...
    )
    ...
    try:
        async with asyncio.TaskGroup() as tg:
            ...
    except* Exception as eg:
        ...
        # Stop children that are still running after a failure.
        for p in (p1, p2):
            try:
                p.terminate()
            except Exception:
                pass
    # Reap both children before the event loop closes.
    for p in (p1, p2):
        await p.communicate()
```
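The terminate-then-await step can also be pulled out into a small helper and exercised on its own. The following is only a minimal sketch: `shutdown` and `demo` are hypothetical names that do not appear in the question, and a sleeping Python child (started via `sys.executable`) stands in for the real `find` / `wc` stages so the example runs anywhere.

```python
import asyncio
import sys


async def shutdown(procs):
    """Terminate children that are still running, then reap all of them."""
    for p in procs:
        if p.returncode is None:
            try:
                p.terminate()
            except ProcessLookupError:
                pass  # the child exited between the check and the signal
    for p in procs:
        # Drains the remaining pipes and waits for exit inside the running loop.
        await p.communicate()


async def demo():
    # Hypothetical stand-in for a long-running pipeline stage.
    p = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # Simulate one of the pipeline tasks failing.
        raise RuntimeError("simulated pipeline failure")
    except RuntimeError as e:
        print(e)
    finally:
        await shutdown([p])
    return p.returncode


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the call in `finally` means the children are reaped on both the success and the failure path, so no subprocess transport should be left for the garbage collector to finalize after `asyncio.run()` has closed the loop.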
answered Nov 25 at 1:31 by relent95