tractor: distributed structured concurrency
tractor is a structured concurrency (SC), multi-processing runtime built on trio.

Fundamentally, tractor provides parallelism via trio-"actors": independent Python processes (i.e. non-shared-memory threads) which can schedule trio tasks whilst maintaining end-to-end SC inside a distributed supervision tree.
Cross-process (and thus cross-host) SC is accomplished through the combined use of our,

- "actor nurseries" which provide for spawning multiple, and possibly nested, Python processes each running a trio scheduled runtime - a call to trio.run(),
- an "SC-transitive supervision protocol" enforced as an IPC-message-spec encapsulating all RPC dialogs.
We believe the system adheres to the 3 axioms of an "actor model" but likely does not look like what you probably think an "actor model" looks like, and that's intentional.
The first step to grok tractor is to get an intermediate knowledge of trio and structured concurrency B)

Some great places to start are,

- the seminal blog post
- obviously the trio docs
- wikipedia's nascent SC page
- the fancy diagrams @ libdill-docs
- It's just a trio API!
- Infinitely nestable process trees running embedded trio tasks.
- Swappable, OS-specific process spawning via multiple backends.
- Modular IPC stack, allowing for custom interchange formats (eg. as offered from msgspec), varied transport protocols (TCP, RUDP, QUIC, wireguard), and OS-env specific higher-perf primitives (UDS, shm-ring-buffers).
- Optionally distributed: all IPC and RPC APIs work over multi-host transports the same as local.
- Builtin high-level streaming API that enables your app to easily leverage the benefits of a "cheap or nasty" (un)protocol.
- A "native UX" around a multi-process safe debugger REPL using pdbp (a fork & fix of pdb++).
- "Infected asyncio" mode: support for starting an actor's runtime as a guest on the asyncio loop, allowing us to provide stringent SC-style trio.Task-supervision around any asyncio.Task spawned via our tractor.to_asyncio APIs.
- A very naive and still very much work-in-progress inter-actor discovery sys with plans to support multiple modern protocol approaches.
- Various trio extension APIs via tractor.trionics such as,
  - task fan-out broadcasting (see the conceptual sketch just below this list),
  - multi-task-single-resource-caching and fan-out-to-multi __aenter__() APIs for @acm functions,
  - (WIP) a TaskMngr: one-cancels-one style nursery supervisor.
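To make the "task fan-out broadcasting" idea concrete, here's a conceptual sketch in plain trio (not tractor.trionics' actual API): one producer copies every value to each subscriber, so every consumer task sees the full stream rather than having values load-balanced between them.

    import trio

    async def producer(subscribers: list):
        # fan-out: copy each value to every subscriber (not load-balancing)
        for i in range(3):
            for tx in subscribers:
                await tx.send(i)
        for tx in subscribers:
            await tx.aclose()

    async def consumer(name: str, rx: trio.MemoryReceiveChannel):
        async for msg in rx:
            print(f'{name} got {msg}')

    async def main():
        subscribers = []
        async with trio.open_nursery() as n:
            for i in range(2):
                tx, rx = trio.open_memory_channel(1)
                subscribers.append(tx)
                n.start_soon(consumer, f'sub_{i}', rx)
            n.start_soon(producer, subscribers)

    if __name__ == '__main__':
        trio.run(main)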
tractor is still in an alpha-near-beta stage for many of its subsystems, however we are very close to having a stable low-level runtime and API.
As such, it's currently recommended that you clone and install the repo from source:

    pip install git+git://github.com/goodboy/tractor.git
We use the very hip uv for project mgmt:
    git clone https://github.com/goodboy/tractor.git
    cd tractor
    uv sync --dev
    uv run python examples/rpc_bidir_streaming.py
Consider activating a virtual/project-env before starting to hack on the code base:
    # you could use plain ol' venvs
    # https://docs.astral.sh/uv/pip/environments/
    uv venv tractor_py313 --python 3.13

    # but @goodboy prefers the more explicit (and shell agnostic)
    # https://docs.astral.sh/uv/configuration/environment/#uv_project_environment
    UV_PROJECT_ENVIRONMENT="tractor_py313"

    # hint hint, enter @goodboy's fave shell B)
    uv run --dev xonsh
Alongside all this we ofc offer "releases" on PyPi:
    pip install tractor
Just note that YMMV since the main git branch is often much further ahead than any latest release.
In tractor's (very lacking) documentation we prefer to point to example scripts in the repo over duplicating them in docs, but with that in mind here are some definitive snippets to try and hook you into digging deeper.
Use trio's style of focussing on tasks as functions:
"""Run with a process monitor from a terminal using:: $TERM -e watch -n 0.1 "pstree -a $$"\ & python examples/parallelism/single_func.py\ && kill $!"""importosimporttractorimporttrioasyncdefburn_cpu():pid=os.getpid()# burn a core @ ~ 50kHzfor_inrange(50000):awaittrio.sleep(1/50000/50)returnos.getpid()asyncdefmain():asyncwithtractor.open_nursery()asn:portal=awaitn.run_in_actor(burn_cpu)# burn rubber in the parent tooawaitburn_cpu()# wait on result from target functionpid=awaitportal.result()# end of nursery blockprint(f"Collected subproc{pid}")if__name__=='__main__':trio.run(main)
This runs burn_cpu() in a new process and reaps it on completion of the nursery block.
If you only need to run a sync function and retrieve a single result, you might want to check out trio-parallel.
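As a taste of that alternative, here's a minimal sketch assuming trio-parallel's run_sync API (the fib helper is purely illustrative):

    import trio
    import trio_parallel


    def fib(n: int) -> int:
        # sync, CPU-bound work (illustrative only)
        return n if n < 2 else fib(n - 1) + fib(n - 2)


    async def main():
        # offload to a worker process and await the single result
        result = await trio_parallel.run_sync(fib, 30)
        print(result)


    if __name__ == '__main__':
        trio.run(main)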
tractor tries to protect you from zombies, no matter what.
"""Run with a process monitor from a terminal using:: $TERM -e watch -n 0.1 "pstree -a $$"\ & python examples/parallelism/we_are_processes.py\ && kill $!"""frommultiprocessingimportcpu_countimportosimporttractorimporttrioasyncdeftarget():print(f"Yo, i'm '{tractor.current_actor().name}' "f"running in pid{os.getpid()}" )awaittrio.sleep_forever()asyncdefmain():asyncwithtractor.open_nursery()asn:foriinrange(cpu_count()):awaitn.run_in_actor(target,name=f'worker_{i}')print('This process tree will self-destruct in 1 sec...')awaittrio.sleep(1)# raise an error in root actor/process and trigger# reaping of all minionsraiseException('Self Destructed')if__name__=='__main__':try:trio.run(main)exceptException:print('Zombies Contained')
If you can create zombie child processes (without using a system signal) it is a bug.
Using the magic of pdbp and our internal IPC, we've been able to create a native feeling debugging experience for any (sub-)process in your tractor tree.
    from os import getpid

    import tractor
    import trio


    async def breakpoint_forever():
        "Indefinitely re-enter debugger in child actor."
        while True:
            yield 'yo'
            await tractor.breakpoint()


    async def name_error():
        "Raise a ``NameError``"
        getattr(doggypants)


    async def main():
        """Test breakpoint in a streaming actor.
        """
        async with tractor.open_nursery(
            debug_mode=True,
            loglevel='error',
        ) as n:

            p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
            p1 = await n.start_actor('name_error', enable_modules=[__name__])

            # retrieve results
            stream = await p0.run(breakpoint_forever)
            await p1.run(name_error)


    if __name__ == '__main__':
        trio.run(main)
You can run this with:
    >>> python examples/debugging/multi_daemon_subactors.py
And, yes, there's a built-in crash handling mode B)
We're hoping to add a respawn-from-repl system soon!
Yes, you saw it here first; we provide 2-way streams with reliable, transitive setup/teardown semantics.

Our nascent api is reminiscent of trio.Nursery.start() style invocation:
    import trio
    import tractor


    @tractor.context
    async def simple_rpc(
        ctx: tractor.Context,
        data: int,
    ) -> None:
        '''Test a small ping-pong 2-way streaming server.

        '''
        # signal to parent that we're up much like
        # ``trio_typing.TaskStatus.started()``
        await ctx.started(data + 1)

        async with ctx.open_stream() as stream:

            count = 0
            async for msg in stream:

                assert msg == 'ping'
                await stream.send('pong')
                count += 1

            else:
                assert count == 10


    async def main() -> None:

        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'rpc_server',
                enable_modules=[__name__],
            )

            # XXX: this syntax requires py3.9
            async with (

                portal.open_context(
                    simple_rpc,
                    data=10,
                ) as (ctx, sent),

                ctx.open_stream() as stream,
            ):

                assert sent == 11

                count = 0
                # receive msgs using async for style
                await stream.send('ping')

                async for msg in stream:
                    assert msg == 'pong'
                    await stream.send('ping')
                    count += 1

                    if count >= 9:
                        break

            # explicitly teardown the daemon-actor
            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)
See original proposal and discussion in #53 as well as follow up improvements in #223 that we'd love to hear your thoughts on!
The initial ask from most new users is "how do I make a worker pool thing?".
tractor is built to handle any SC (structured concurrent) process tree you can imagine; a "worker pool" pattern is a trivial special case.
We have a full worker pool re-implementation of the std-lib's concurrent.futures.ProcessPoolExecutor example for reference.
You can run it like so (from this dir) to see the process tree in real time:
    $TERM -e watch -n 0.1 "pstree -a $$" \
        & python examples/parallelism/concurrent_actors_primes.py \
        && kill $!
This uses no extra threads, fancy semaphores or futures; all we need is tractor's IPC!
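For a quick taste of the pattern, here's a hedged sketch using only the APIs shown above; it assumes run_in_actor forwards extra keyword arguments on to the target function (the square helper is hypothetical):

    import tractor
    import trio


    async def square(x: int) -> int:
        # illustrative stand-in for real CPU-bound work
        return x * x


    async def main():
        async with tractor.open_nursery() as n:
            # one actor per input; a real pool would cap this at cpu_count()
            portals = [
                await n.run_in_actor(square, x=i)
                for i in range(4)
            ]
            # collect each subprocess's single result
            print([await p.result() for p in portals])


    if __name__ == '__main__':
        trio.run(main)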
Have a bunch of asyncio code you want to force to be SC at the process level?

Check out our experimental system for guest-mode controlled asyncio actors:
    import asyncio
    from statistics import mean
    import time

    import trio
    import tractor


    async def aio_echo_server(
        to_trio: trio.MemorySendChannel,
        from_trio: asyncio.Queue,
    ) -> None:

        # a first message must be sent **from** this ``asyncio``
        # task or the ``trio`` side will never unblock from
        # ``tractor.to_asyncio.open_channel_from():``
        to_trio.send_nowait('start')

        # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
        # should probably offer something better.
        while True:
            # echo the msg back
            to_trio.send_nowait(await from_trio.get())
            await asyncio.sleep(0)


    @tractor.context
    async def trio_to_aio_echo_server(
        ctx: tractor.Context,
    ):
        # this will block until the ``asyncio`` task sends a "first"
        # message.
        async with tractor.to_asyncio.open_channel_from(
            aio_echo_server,
        ) as (first, chan):

            assert first == 'start'
            await ctx.started(first)

            async with ctx.open_stream() as stream:

                async for msg in stream:
                    await chan.send(msg)

                    out = await chan.receive()
                    # echo back to parent actor-task
                    await stream.send(out)


    async def main():

        async with tractor.open_nursery() as n:
            p = await n.start_actor(
                'aio_server',
                enable_modules=[__name__],
                infect_asyncio=True,
            )
            async with p.open_context(
                trio_to_aio_echo_server,
            ) as (ctx, first):

                assert first == 'start'

                count = 0
                async with ctx.open_stream() as stream:

                    delays = []
                    send = time.time()

                    await stream.send(count)
                    async for msg in stream:
                        recv = time.time()
                        delays.append(recv - send)
                        assert msg == count
                        count += 1
                        send = time.time()
                        await stream.send(count)

                        if count >= 1e3:
                            break

                print(f'mean round trip rate (Hz): {1/mean(delays)}')
                await p.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)
Yes, we spawn a python process, run asyncio, start trio on the asyncio loop, then send commands to the trio scheduled tasks to tell asyncio tasks what to do XD
We need help refining the asyncio-side channel API to be more trio-like. Feel free to sling your opinion in #273!
To be extra terse the tractor devs have started hacking some "higher level" APIs for managing actor trees/clusters. These interfaces should generally be considered provisional for now but we encourage you to try them and provide feedback. Here's a new API that lets you quickly spawn a flat cluster:
    import trio
    import tractor


    async def sleepy_jane():
        uid = tractor.current_actor().uid
        print(f'Yo i am actor {uid}')
        await trio.sleep_forever()


    async def main():
        '''
        Spawn a flat actor cluster, with one process per
        detected core.

        '''
        portal_map: dict[str, tractor.Portal]
        results: dict[str, str]

        # look at this hip new syntax!
        async with (

            tractor.open_actor_cluster(
                modules=[__name__]
            ) as portal_map,

            trio.open_nursery() as n,
        ):

            for (name, portal) in portal_map.items():
                n.start_soon(portal.run, sleepy_jane)

            await trio.sleep(0.5)

            # kill the cluster with a cancel
            raise KeyboardInterrupt


    if __name__ == '__main__':
        try:
            trio.run(main)
        except KeyboardInterrupt:
            pass
tractor is an attempt to pair trionic structured concurrency with distributed Python. You can think of it as a trio-across-processes or simply as an opinionated replacement for the stdlib's multiprocessing but built on async programming primitives from the ground up.
Don't be scared off by this description. tractor is just trio but with nurseries for process management and cancel-able streaming IPC. If you understand how to work with trio, tractor will give you the parallelism you may have been needing.
Let's stop and ask: how many canon actor model papers have you actually read? ;)
From our experience many "actor systems" aren't really "actor models" since they don't adhere to the 3 axioms and pay even less attention to the problem of unbounded non-determinism (which was the whole point for creation of the model in the first place).
From the author's mouth, the only thing required is adherence to the 3 axioms, and that's it.
tractor adheres to said base requirements of an "actor model":

In response to a message, an actor may:

- send a finite number of new messages
- create a finite number of new actors
- designate a new behavior to process subsequent messages

and requires no further api changes to accomplish this.
If you want to debate this further please feel free to chime in on our chat or discuss on one of the following issues after you've read everything in them:
Whether or not tractor has "actors" underneath should be mostly irrelevant to users other than for referring to the interactions of our primary runtime primitives: each Python process + trio.run() + surrounding IPC machinery. These are our high level, base runtime-units-of-abstraction which both are (as much as they can be in Python) and will be referred to as our "actors".
The main goal of tractor is to allow for highly distributed software that, through the adherence to structured concurrency, results in systems which fail in predictable, recoverable and maybe even understandable ways; being an "actor model" is just one way to describe properties of the system.
Help us push toward the future of distributed Python.
- Erlang-style supervisors via composed context managers (see #22)
- Typed messaging protocols (ex. via msgspec.Struct, see #36)
- Typed capability-based (dialog) protocols (see #196 with draft work started in #311)
- We recently disabled CI-testing on windows and need help getting it running again! (see #327). We do have windows support (and have for quite a while) but since no active hacker exists in the user-base to help test on that OS, for now we're not actively maintaining testing due to the added hassle and general latency.
This project is very much coupled to the ongoing development of trio (i.e. tractor gets most of its ideas from that brilliant community). If you want to help, have suggestions or just want to say hi, please feel free to reach us in our matrix channel. If matrix seems too hip, we're also mostly all in the trio gitter channel!