A Python 3.5+ library that integrates the multiprocessing module with asyncio
dano/aioprocessing
aioprocessing provides asynchronous, asyncio compatible, coroutine versions of many blocking instance methods on objects in the multiprocessing library. To use dill for universal pickling, install using pip install aioprocessing[dill]. Here's an example demonstrating the aioprocessing versions of Event, Queue, and Lock:
    import time
    import asyncio
    import aioprocessing


    def func(queue, event, lock, items):
        """ Demo worker function.

        This worker function runs in its own process, and uses
        normal blocking calls to aioprocessing objects, exactly
        the way you would use ordinary multiprocessing objects.

        """
        with lock:
            event.set()
            for item in items:
                time.sleep(3)
                queue.put(item + 5)
        queue.close()


    async def example(queue, event, lock):
        items = [1, 2, 3, 4, 5]
        p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, items))
        p.start()
        while True:
            # coro_get() waits for the next item without blocking the event loop.
            result = await queue.coro_get()
            if result is None:
                break
            print("Got result {}".format(result))
        await p.coro_join()


    async def example2(queue, event, lock):
        await event.coro_wait()
        async with lock:
            await queue.coro_put(78)
            await queue.coro_put(None)  # Shut down the worker


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        queue = aioprocessing.AioQueue()
        lock = aioprocessing.AioLock()
        event = aioprocessing.AioEvent()
        tasks = [
            asyncio.ensure_future(example(queue, event, lock)),
            asyncio.ensure_future(example2(queue, event, lock)),
        ]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
The aioprocessing objects can be used just like their multiprocessing equivalents - as they are in func above - but they can also be seamlessly used inside of asyncio coroutines, without ever blocking the event loop.
v2.0.1
- Fixed a bug that kept the AioBarrier and AioEvent proxies returned from AioManager instances from working. Thanks to Giorgos Apostolopoulos for the fix.
v2.0.0
- Add support for universal pickling using dill, installable with pip install aioprocessing[dill]. The library will now attempt to import multiprocess, falling back to stdlib multiprocessing. Force stdlib behaviour by setting the environment variable AIOPROCESSING_DILL_DISABLED to a non-empty value, for example AIOPROCESSING_DILL_DISABLED=1. This can be used to avoid errors when attempting to combine aioprocessing[dill] with stdlib multiprocessing based objects like concurrent.futures.ProcessPoolExecutor.
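The import fallback described in the v2.0.0 note can be pictured with the sketch below. It only mirrors the documented behaviour and is not the library's actual source; the helper name pick_multiprocessing_module is hypothetical.

```python
import importlib
import os


def pick_multiprocessing_module(environ=os.environ):
    """Return `multiprocess` if it is usable, else stdlib `multiprocessing`.

    Hypothetical helper that mirrors the documented fallback; it is not
    taken from aioprocessing itself.
    """
    # Any non-empty value of the variable forces the stdlib module.
    if environ.get("AIOPROCESSING_DILL_DISABLED"):
        return importlib.import_module("multiprocessing")
    try:
        # `multiprocess` is the dill-based fork of multiprocessing.
        return importlib.import_module("multiprocess")
    except ImportError:
        return importlib.import_module("multiprocessing")


if __name__ == "__main__":
    print(pick_multiprocessing_module().__name__)
```

With the variable set to 1 the helper always returns the stdlib module, whether or not multiprocess is installed.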
In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor, using the asyncio event loop's run_in_executor() method. It does not re-implement multiprocessing using asynchronous I/O. This means there is extra overhead added when you use aioprocessing objects instead of multiprocessing objects, because each one is generally introducing a ThreadPoolExecutor containing at least one threading.Thread. It also means that all the normal risks you get when you mix threads with fork apply here, too (see http://bugs.python.org/issue6721 for more info).
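The general technique of pushing a blocking call into a thread can be sketched with the standard library alone. This is a minimal illustration, assuming Python 3.7+ for asyncio.run() and asyncio.get_running_loop(); the helper coro_call is hypothetical and is not part of aioprocessing.

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


async def coro_call(blocking_func, *args, executor=None):
    """Run a blocking callable in a worker thread and await its result.

    Hypothetical helper illustrating the technique; executor=None uses the
    event loop's default ThreadPoolExecutor.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_func, *args)


async def demo():
    queue = multiprocessing.Queue()
    queue.put("hello")
    # Queue.get() blocks, so it runs in a thread while the loop stays free.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await coro_call(queue.get, executor=executor)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The dedicated single-thread executor in demo() reflects the overhead mentioned above: every wrapped object that owns its own executor also owns at least one thread.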
The one exception to this is aioprocessing.AioPool, which makes use of the existing callback and error_callback keyword arguments in the various Pool.*_async methods to run them as asyncio coroutines. Note that multiprocessing.Pool is actually using threads internally, so the thread/fork mixing caveat still applies.
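One way to bridge callback and error_callback to a coroutine is sketched below. It is an assumption about how such a bridge could look, not AioPool's actual code; coro_apply is a hypothetical name. The demo uses multiprocessing.pool.ThreadPool, which exposes the same apply_async() signature as multiprocessing.Pool, so that it runs without spawning processes.

```python
import asyncio
import math
from multiprocessing.pool import ThreadPool


async def coro_apply(pool, func, args=()):
    """Await the result of pool.apply_async() without blocking the loop.

    Hypothetical helper: the pool invokes the callbacks from one of its own
    threads, so results are handed to the loop with call_soon_threadsafe().
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def set_result(value):
        if not future.done():
            future.set_result(value)

    def set_exception(exc):
        if not future.done():
            future.set_exception(exc)

    pool.apply_async(
        func,
        args,
        callback=lambda value: loop.call_soon_threadsafe(set_result, value),
        error_callback=lambda exc: loop.call_soon_threadsafe(set_exception, exc),
    )
    return await future


async def demo():
    # ThreadPool stands in for multiprocessing.Pool in this sketch.
    with ThreadPool(2) as pool:
        return await coro_apply(pool, math.factorial, (5,))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No extra executor thread is needed here, because the pool's own result-handling thread triggers the callbacks.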
Each multiprocessing class is replaced by an equivalent aioprocessing class, distinguished by the Aio prefix. So, Pool becomes AioPool, etc. All methods that could block on I/O also have a coroutine version that can be used with asyncio. For example, multiprocessing.Lock.acquire() can be replaced with aioprocessing.AioLock.coro_acquire(). You can pass an asyncio EventLoop object to any coro_* method using the loop keyword argument. For example, lock.coro_acquire(loop=my_loop).
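The coro_ naming convention and the optional loop keyword can be illustrated with a small stand-in wrapper. The class CoroMethods below is hypothetical and far simpler than the real Aio classes; it only shows how a coro_<name> call could map onto the blocking <name> method.

```python
import asyncio
import functools
import multiprocessing


class CoroMethods:
    """Hypothetical wrapper: obj.coro_<name>() runs obj.<name>() in a thread."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find.
        if not name.startswith("coro_"):
            return getattr(self._wrapped, name)
        blocking = getattr(self._wrapped, name[len("coro_"):])

        async def coro(*args, loop=None, **kwargs):
            # Use the caller's loop if given, else the running one.
            loop = loop or asyncio.get_running_loop()
            call = functools.partial(blocking, *args, **kwargs)
            return await loop.run_in_executor(None, call)

        return coro


async def demo():
    lock = CoroMethods(multiprocessing.Lock())
    acquired = await lock.coro_acquire()  # coroutine counterpart of acquire()
    lock.release()                        # non-blocking methods pass through
    return acquired


if __name__ == "__main__":
    print(asyncio.run(demo()))
```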
Note that you can also use the aioprocessing synchronization primitives as replacements for their equivalent threading primitives, in single-process, multi-threaded programs that use asyncio.
Most parts of multiprocessing are supported. All methods that could do blocking I/O in the following objects have equivalent versions in aioprocessing that extend the multiprocessing versions by adding coroutine versions of all the blocking methods.
- Pool
- Process
- Pipe
- Lock
- RLock
- Semaphore
- BoundedSemaphore
- Event
- Condition
- Barrier
- connection.Connection
- connection.Listener
- connection.Client
- Queue
- JoinableQueue
- SimpleQueue
- All managers.SyncManager Proxy versions of the items above (SyncManager.Queue, SyncManager.Lock(), etc.).
aioprocessing will work out of the box on Python 3.5+.
Keep in mind that, while the API exposes coroutines for interacting with multiprocessing APIs, internally they are almost always being delegated to a ThreadPoolExecutor. This means the caveats that apply with using ThreadPoolExecutor with asyncio apply: namely, you won't be able to cancel any of the coroutines, because the work being done in the worker thread can't be interrupted.
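The cancellation caveat can be reproduced with plain asyncio and no aioprocessing involved. In the sketch below (an illustration of standard library behaviour, with hypothetical names), cancelling the awaited future only affects the asyncio side; the worker thread still runs to completion.

```python
import asyncio
import threading
import time


def blocking_work(done_flag, seconds=0.2):
    """Stand-in for a blocking call running in a worker thread."""
    time.sleep(seconds)
    done_flag.set()  # still reached after the asyncio future is cancelled


async def demo():
    loop = asyncio.get_running_loop()
    done_flag = threading.Event()
    future = loop.run_in_executor(None, blocking_work, done_flag)
    await asyncio.sleep(0.05)  # give the worker thread time to start
    future.cancel()            # cancels only the asyncio-side future
    try:
        await future
    except asyncio.CancelledError:
        pass
    # The thread was not interrupted: wait (in another thread) for its flag.
    return await loop.run_in_executor(None, done_flag.wait, 2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If a blocking call must be abandoned, a timeout argument on the blocking method itself, where one exists, is usually the safer tool.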