gistart/asyncio-pool

Pool for asyncio with multiprocessing, threading and gevent-like interface.
Pool of asyncio coroutines with a familiar interface. Supports Python 3.5+ (including PyPy 6+, which is also 3.5 at the moment).

AioPool makes sure no more and (if possible) no less than `size` spawned coroutines are active at the same time. *Spawned* means created and scheduled with one of the pool interface methods; *active* means the coroutine function has started executing its code, as opposed to *waiting*, which holds off entering the coroutine function until pool space is available.
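For illustration, a minimal sketch of this distinction (assuming `asyncio_pool` is installed and Python 3.7+ for `asyncio.run`): with `size=2`, the third `spawn` call waits until one of the first two workers finishes.

```python
import asyncio as aio
from asyncio_pool import AioPool

async def tell(i):
    # This line runs only once the pool has space, i.e. the task is "active".
    print('worker %d is now active' % i)
    await aio.sleep(0.2)
    return i

async def main():
    async with AioPool(size=2) as pool:
        for i in range(3):
            # Workers 0 and 1 become active right away; the loop then
            # blocks on the third spawn until one of them finishes.
            await pool.spawn(tell(i))

aio.run(main())
```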

Interface

Read the code docstrings for details.

`AioPool(size=4, *, loop=None)`

Creates a pool of `size` concurrent tasks. Supports the async context manager interface.
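Both styles, side by side (a sketch; `worker` is a stand-in coroutine). The context manager calls `join` at exit, which the usage examples below rely on:

```python
import asyncio as aio
from asyncio_pool import AioPool

async def worker(n):
    await aio.sleep(0.1)
    return n

async def with_context_manager():
    async with AioPool(size=4) as pool:
        fut = await pool.spawn(worker(1))
    # `join` was called at context exit, so all workers are done here.
    assert fut.result() == 1

async def with_manual_join():
    pool = AioPool(size=4)
    fut = await pool.spawn(worker(1))
    await pool.join()  # wait for all spawned tasks explicitly
    assert fut.result() == 1
```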

`spawn(coro, cb=None, ctx=None)`

Waits for pool space, then creates a task for the `coro` coroutine and returns a future for its result. Can also spawn a callback coroutine created by `cb`, with the result of `coro` as its first argument and the `ctx` context as its third positional argument (the second receives the error, if any; see `callbacks_usage` below).
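A compact sketch of the callback flow, using the `cb(result, error, context)` signature from the `callbacks_usage` example below; the returned future resolves to the callback's result:

```python
import asyncio as aio
from asyncio_pool import AioPool

async def double(n):
    return n * 2

async def on_done(res, err, ctx):
    if err:              # worker raised: err is an (exception, traceback) pair
        exc, tb = err
        return exc       # surface the exception as the result
    return res + ctx     # ctx is whatever was passed to spawn, here a number

async def main():
    async with AioPool(size=2) as pool:
        fut = await pool.spawn(double(3), cb=on_done, ctx=10)
    assert fut.result() == 16  # (3 * 2) + 10

aio.run(main())
```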

`exec(coro, cb=None, ctx=None)`

Waits for pool space, then creates a task for `coro` and waits for it to finish. Returns the result of `coro` if no callback is provided; otherwise creates a task for the callback, waits for it, and returns the result of the callback.

`spawn_n(coro, cb=None, ctx=None)`

Creates a waiting task for `coro` and returns a future without waiting for pool space. The task is executed "in the pool" once pool space is available.

`join()`

Waits for all spawned tasks (active and waiting) to finish. Joining the pool from a coroutine spawned by the same pool leads to deadlock.
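To make the deadlock concrete, here is the anti-pattern (a sketch, never to be run as-is):

```python
from asyncio_pool import AioPool

async def inner(pool):
    # `join` waits for every spawned task, including this one, and this
    # task cannot finish until `join` returns -- so it never returns.
    await pool.join()

async def main():
    pool = AioPool(size=2)
    await pool.spawn(inner(pool))
    await pool.join()  # deadlocks: inner() is itself waiting on join()
```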

`cancel(*futures)`

Cancels spawned tasks (active and waiting), finding them by the provided `futures`. If no futures are provided, cancels all spawned tasks.

`map(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)`

Spawns a coroutine created by the `fn` function for each item in `iterable` using `spawn`, waits for all of them to finish (including callbacks), and returns the results, preserving the order of `iterable`.

`map_n(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)`

Spawns a coroutine created by the `fn` function for each item in `iterable` using `spawn_n`, and returns futures for the task results, preserving the order of `iterable`.

`itermap(fn, iterable, cb=None, ctx=None, *, flat=True, get_result=getres.flat, timeout=None, yield_when=asyncio.ALL_COMPLETED)`

Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with the `asyncio.wait` function, yielding ready results one by one if `flat == True`, otherwise yielding lists of ready results.
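Beyond the defaults shown in `itermap_usage` below: since `yield_when` is forwarded to `asyncio.wait`, a sketch like this one (with `asyncio.FIRST_COMPLETED`, an assumption based on the signature above) should stream results as soon as each worker finishes:

```python
import asyncio as aio
from asyncio_pool import AioPool

async def worker(n):
    await aio.sleep(n / 10)
    return n

async def stream_results(todo=range(1, 6)):
    done = []
    async with AioPool(size=5) as pool:
        # Yields each result as soon as its worker completes, instead of
        # waiting for the whole batch (the ALL_COMPLETED default).
        async for res in pool.itermap(worker, todo,
                                      yield_when=aio.FIRST_COMPLETED):
            done.append(res)
    assert sorted(done) == list(todo)
```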

Usage

The `spawn` and `map` methods are probably what you should use in 99% of cases. Their overhead is minimal (~3% of execution time), and even in the worst cases memory usage is insignificant.

The `spawn_n`, `map_n` and `itermap` methods give you more control and flexibility, but they come at the price of higher overhead. They spawn all the tasks you ask for at once, and most of those tasks wait their turn "in the background". If you spawn too many (10**6+ tasks), you'll use most of the memory available on the system, and you'll also lose a lot of time on "concurrency management" of all the spawned tasks.
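If memory is a concern, `spawn` gives you backpressure for free, since it awaits pool space before creating each task. A sketch of the bounded pattern:

```python
import asyncio as aio
from asyncio_pool import AioPool

async def worker(n):
    await aio.sleep(0.01)
    return n

async def bounded(todo=range(10**6)):
    async with AioPool(size=100) as pool:
        for i in todo:
            # `spawn` awaits pool space, so at most `size` task objects
            # exist at any moment; `spawn_n` would create a background
            # task per item immediately.
            await pool.spawn(worker(i))
```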

Play with `python tests/loadtest.py -h` to understand which approach you want to use.

Usage examples (more in `tests/` and `examples/`):

```python
import itertools
import asyncio as aio
from asyncio_pool import AioPool


async def worker(n):  # dummy worker
    await aio.sleep(1 / n)
    return n


async def spawn_n_usage(todo=[range(1, 51), range(51, 101), range(101, 200)]):
    futures = []
    async with AioPool(size=20) as pool:
        for tasks in todo:
            for i in tasks:  # too many tasks
                # Returns quickly for all tasks, does not wait for pool space.
                # Workers are not spawned, they wait for pool space in their
                # own background tasks.
                fut = pool.spawn_n(worker(i))
                futures.append(fut)
        # At this point not a single worker should have started.

    # Context manager calls `join` at exit, so this will finish when all
    # workers return, crash or are cancelled.
    assert sum(itertools.chain.from_iterable(todo)) == \
        sum(f.result() for f in futures)


async def spawn_usage(todo=range(1, 4)):
    futures = []
    async with AioPool(size=2) as pool:
        for i in todo:  # 1, 2, 3
            # Returns quickly for 1 and 2, then waits for empty space for 3,
            # spawns 3 and returns. Can save some resources I guess.
            fut = await pool.spawn(worker(i))
            futures.append(fut)
        # At this point some of the workers have already started.

    # Context manager calls `join` at exit, so this will finish when all
    # workers return, crash or are cancelled.
    assert sum(todo) == sum(fut.result() for fut in futures)  # all done


async def map_usage(todo=range(100)):
    pool = AioPool(size=10)
    # Waits for and collects results from all spawned workers, returns them
    # in the same order as `todo`. If a worker crashes or is cancelled, the
    # exception object is returned as its result.
    # Basically, it wraps the `spawn_usage` code into one call.
    results = await pool.map(worker, todo)
    # await pool.join()  # not needed here, because no other tasks were spawned

    assert isinstance(results[0], ZeroDivisionError) \
        and sum(results[1:]) == sum(todo)


async def itermap_usage(todo=range(1, 11)):
    result = 0
    async with AioPool(size=10) as pool:
        # Combines spawn_n and iterwait, which is a wrapper for asyncio.wait
        # that yields results of finished workers according to the `timeout`
        # and `yield_when` params passed to asyncio.wait (see its docs).
        async for res in pool.itermap(worker, todo, timeout=0.5):
            result += res
        # technically, you can skip the join call

    assert result == sum(todo)


async def callbacks_usage():

    async def wrk(n):  # custom dummy worker
        await aio.sleep(1 / n)
        return n

    async def cb(res, err, ctx):  # callback
        if err:  # error handling
            exc, tb = err
            assert tb  # the only purpose of this is logging
            return exc

        pool, n = ctx  # context can be anything you like
        await aio.sleep(1 / (n - 1))
        return res + n

    todo = range(5)
    futures = []

    async with AioPool(size=2) as pool:
        for i in todo:
            fut = pool.spawn_n(wrk(i), cb, (pool, i))
            futures.append(fut)

    results = []
    for fut in futures:
        # There are helpers for result extraction. The `flat` one does
        # exactly what's written below:
        #   from asyncio_pool import getres
        #   results.append(getres.flat(fut))
        try:
            results.append(fut.result())
        except Exception as e:
            results.append(e)

    # The first error happens for n == 0 in wrk; its exception is passed to
    # the callback, and the callback returns it to us. The second one happens
    # in the callback itself and is passed to us by the pool.
    assert all(isinstance(e, ZeroDivisionError) for e in results[:2])

    # All n's in `todo` are passed through `wrk` and `cb` (cb adds the wrk
    # result and n, passed inside the context), except for n == 0 and n == 1.
    assert sum(results[2:]) == 2 * (sum(todo) - 0 - 1)


async def exec_usage(todo=range(1, 11)):
    async with AioPool(size=4) as pool:
        futures = pool.map_n(worker, todo)

        # While other workers are waiting or active, you can "synchronously"
        # execute one task. It does not interrupt the others, just waits for
        # pool space, then waits for the task to finish and returns its result.
        important_res = await pool.exec(worker(2))
        assert 2 == important_res

        # You can continue working as usual:
        moar = await pool.spawn(worker(10))

    assert sum(todo) == sum(f.result() for f in futures)


async def cancel_usage():

    async def wrk(*arg, **kw):
        await aio.sleep(0.5)
        return 1

    pool = AioPool(size=2)

    f_quick = pool.spawn_n(aio.sleep(0.1))
    f12 = await pool.spawn(wrk()), pool.spawn_n(wrk())
    f35 = pool.map_n(wrk, range(3))

    # At this point, if you cancel the futures returned by pool methods,
    # you just won't be able to retrieve spawned task results; the tasks
    # themselves will continue working. Don't do this:
    #   f_quick.cancel()
    # use `pool.cancel` instead:

    # cancel some
    await aio.sleep(0.1)
    cancelled, results = await pool.cancel(f12[0], f35[2])  # running and waiting
    assert 2 == cancelled  # none of them had time to finish
    assert 2 == len(results) and \
        all(isinstance(res, aio.CancelledError) for res in results)

    # cancel all others
    await aio.sleep(0.1)

    # not interrupted and finished successfully
    assert f_quick.done() and f_quick.result() is None

    cancelled, results = await pool.cancel()  # all
    assert 3 == cancelled
    assert len(results) == 3 and \
        all(isinstance(res, aio.CancelledError) for res in results)

    assert await pool.join()  # joins successfully


async def details(todo=range(1, 11)):
    pool = AioPool(size=5)

    # This code:
    f1 = []
    for i in todo:
        f1.append(pool.spawn_n(worker(i)))

    # is equivalent to one call of `map_n`:
    f2 = pool.map_n(worker, todo)

    # Afterwards you can await any given future:
    try:
        assert 3 == await f1[2]  # result of spawn_n(worker(3))
    except Exception:
        # An exception raised in the worker (or CancelledError) is re-raised.
        pass

    # Or use `asyncio.wait` to handle results in batches (see `iterwait` too):
    important_res = 0
    more_important = [f1[1], f2[1], f2[2]]
    while more_important:
        done, more_important = await aio.wait(more_important, timeout=0.5)
        # handle results; note that `f.result()` re-raises exceptions
        important_res += sum(f.result() for f in done)

    assert important_res == 2 + 2 + 3

    # But you need to join to allow all spawned workers to finish
    # (of course you can `asyncio.wait` all of the futures instead if you want).
    await pool.join()

    assert all(f.done() for f in itertools.chain(f1, f2))  # this is guaranteed
    assert 2 * sum(todo) == sum(f.result() for f in itertools.chain(f1, f2))
```
