I am trying to pipe multiple Linux commands and abort if there is an error. WithPopen thecommunicate() method waits for all commands to finish. This is why I am tryingasyncio now.
I have the following MWE working as expected if there is no error:
async def forward(src, dest): """Read data from src and write it to dest.""" while True: chunk = await src.read(4096) if not chunk: dest.write_eof() await dest.drain() break dest.write(chunk) await dest.drain()async def stderr_watch(stream): err = await stream.read() if err.strip(): raise RuntimeError(f"stderr: {err.decode()}")async def main(): p1 = await asyncio.create_subprocess_exec( "find","/", "-name", "*.py", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) p2 = await asyncio.create_subprocess_exec( "wc", "-l", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) output = [] async def stream_reader(stream): while True: line = await stream.readline() if not line: break output.append(line.decode()) try: async with asyncio.TaskGroup() as tg: t1 = tg.create_task(stderr_watch(p1.stderr)) t2 = tg.create_task(stderr_watch(p2.stderr)) t3 = tg.create_task(forward(p1.stdout, p2.stdin)) t4 = tg.create_task(stream_reader(p2.stdout)) except* Exception as eg: for e in eg.exceptions: print(e) pass else: return "".join(output)if __name__ == '__main__': output = asyncio.run(main()) print(output)However, I am getting aRuntimeError: Event loop is closed if I create an exeption in p2, eg. by piping to"wc", "-l", "abc",. Where is my mistake?
1 Answer1
The error occurs because the finalizer of the internal object runs after the event loop is finished. (It's theasyncio.base_subprocess.BaseSubprocessTransport.)You need to terminate the child processes on error, and await on them, like this.
async def main(): p1 = await asyncio.create_subprocess_exec( ... ) p2 = await asyncio.create_subprocess_exec( ... ) ... try: async with asyncio.TaskGroup() as tg: ... except* Exception as eg: ... for p in (p1, p2): try: p.terminate() except Exception: pass for p in (p1, p2): await p.communicate()Comments
Explore related questions
See similar questions with these tags.