Distributed — Module
Tools for distributed parallel processing.
Distributed.addprocs — Function
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
Launches worker processes via the specified cluster manager.
For example, Beowulf clusters are supported via a custom cluster manager implemented in the package ClusterManagers.jl.
The number of seconds a newly launched worker waits for connection establishment from the master can be specified via the variable `JULIA_WORKER_TIMEOUT` in the worker process's environment. Relevant only when using TCP/IP as transport.
To launch workers without blocking the REPL, or the containing function if launching workers programmatically, execute `addprocs` in its own task.
Examples
    # On busy clusters, call `addprocs` asynchronously
    t = @async addprocs(...)

    # Utilize workers as and when they come online
    if nprocs() > 1   # Ensure at least one new worker is available
       ....   # perform distributed execution
    end

    # Retrieve newly launched worker IDs, or any error messages
    if istaskdone(t)   # Check if `addprocs` has completed to ensure `fetch` doesn't block
        if nworkers() == N
            new_pids = fetch(t)
        else
            fetch(t)
        end
    end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
Add worker processes on remote machines via SSH. Configuration is done with keyword arguments (see below). In particular, the `exename` keyword can be used to specify the path to the `julia` binary on the remote machine(s).
`machines` is a vector of "machine specifications" which are given as strings of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to the current user and `port` to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect to this worker at the specified `bind_addr` and `port`.
It is possible to launch multiple processes on a remote host by using a tuple in the `machines` vector of the form `(machine_spec, count)`, where `count` is the number of workers to be launched on the specified host. Passing `:auto` as the worker count will launch as many workers as the number of CPU threads on the remote host.
Examples:
    addprocs([
        "remote1",               # one worker on 'remote1' logging in with the current username
        "user@remote2",          # one worker on 'remote2' logging in with the 'user' username
        "user@remote3:2222",     # specifying SSH port to '2222' for 'remote3'
        ("user@remote4", 4),     # launch 4 workers on 'remote4'
        ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
    ])
Keyword arguments:
- `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the master process. Default is `false`.
- `multiplex`: if `true` then SSH multiplexing is used for SSH tunneling. Default is `false`.
- `ssh`: the name or path of the SSH client executable used to start the workers. Default is `"ssh"`.
- `sshflags`: specifies additional ssh options, e.g. sshflags=`-i /home/foo/bar.pem`
- `max_parallel`: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10.
- `shell`: specifies the type of shell to which ssh connects on the workers.
  - `shell=:posix`: a POSIX-compatible Unix/Linux shell (sh, ksh, bash, dash, zsh, etc.). The default.
  - `shell=:csh`: a Unix C shell (csh, tcsh).
  - `shell=:wincmd`: Microsoft Windows `cmd.exe`.
- `dir`: specifies the working directory on the workers. Defaults to the host's current directory (as found by `pwd()`).
- `enable_threaded_blas`: if `true` then BLAS will run on multiple threads in added processes. Default is `false`.
- `exename`: name of the `julia` executable. Defaults to `"$(Sys.BINDIR)/julia"` or `"$(Sys.BINDIR)/julia-debug"` as the case may be. It is recommended that a common Julia version is used on all remote machines because serialization and code distribution might fail otherwise.
- `exeflags`: additional flags passed to the worker processes.
- `topology`: specifies how the workers connect to each other. Sending a message between unconnected workers results in an error.
  - `topology=:all_to_all`: All processes are connected to each other. The default.
  - `topology=:master_worker`: Only the driver process, i.e. `pid` 1, connects to the workers. The workers do not connect to each other.
  - `topology=:custom`: The `launch` method of the cluster manager specifies the connection topology via fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster manager identity `ident` will connect to all workers specified in `connect_idents`.
- `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections are set up lazily, i.e. they are set up at the first instance of a remote call between workers. Default is `true`.
- `env`: provide an array of string pairs such as `env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables are set on the remote machine. By default only the environment variable `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote environment.
- `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline option. The (more secure) default behaviour of passing the cookie via ssh stdio may hang with Windows workers that use older (pre-ConPTY) Julia or Windows versions, in which case `cmdline_cookie=true` offers a work-around.
The keyword arguments `ssh`, `shell`, `env` and `cmdline_cookie` were added in Julia 1.6.
Environment variables:
If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via the environment variable `JULIA_WORKER_TIMEOUT`. The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a newly launched worker waits for connection establishment.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
Launch `np` workers on the local host using the in-built `LocalManager`.
Local workers inherit the current package environment (i.e., active project, `LOAD_PATH`, and `DEPOT_PATH`) from the main process.
Note that workers do not run a `~/.julia/config/startup.jl` startup script, nor do they synchronize their global state (such as command-line switches, global variables, new method definitions, and loaded modules) with any of the other running processes.
Keyword arguments:
- `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
- `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect as for `SSHManager`, see documentation for `addprocs(machines::AbstractVector)`.
The inheriting of the package environment and the `env` keyword argument were added in Julia 1.9.
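As a rough illustration of the local form described above, the following sketch launches two workers in a separate task and then removes them again. The worker count and the variable names are arbitrary choices for this example, and it assumes a session that starts with only the master process.

```julia
using Distributed

# Start the launch in its own task so the caller is not blocked.
launch_task = @async addprocs(2)

# `fetch` waits for the task and returns the new process identifiers.
new_pids = fetch(launch_task)

# Process 1 is the master, so there is one more process than there are workers.
n_procs, n_workers = nprocs(), nworkers()
println("processes: ", n_procs, ", workers: ", n_workers)

# Remove the workers again; by default `rmprocs` waits until they have exited.
rmprocs(new_pids)
```

Launching in a task only pays off when there is other work to do before the workers are needed; otherwise a plain `addprocs(2)` call is simpler.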
Distributed.nprocs — Function
nprocs()
Get the number of available processes.
Examples
    julia> nprocs()
    3

    julia> workers()
    2-element Array{Int64,1}:
     2
     3
Distributed.nworkers — Function
nworkers()
Get the number of available worker processes. This is one less than `nprocs()`. Equal to `nprocs()` if `nprocs() == 1`.
Examples
    $ julia -p 2

    julia> nprocs()
    3

    julia> nworkers()
    2
Distributed.procs — Method
procs()
Return a list of all process identifiers, including pid 1 (which is not included by `workers()`).
Examples
    $ julia -p 2

    julia> procs()
    3-element Array{Int64,1}:
     1
     2
     3
Distributed.procs — Method
procs(pid::Integer)
Return a list of all process identifiers on the same physical node. Specifically, all workers bound to the same IP address as `pid` are returned.
Distributed.workers — Function
workers()
Return a list of all worker process identifiers.
Examples
    $ julia -p 2

    julia> workers()
    2-element Array{Int64,1}:
     2
     3
Distributed.rmprocs — Function
rmprocs(pids...; waitfor=typemax(Int))
Remove the specified workers. Note that only process 1 can add or remove workers.
Argument `waitfor` specifies how long to wait for the workers to shut down:
- If unspecified, `rmprocs` will wait until all requested `pids` are removed.
- An `ErrorException` is raised if all workers cannot be terminated before the requested `waitfor` seconds.
- With a `waitfor` value of 0, the call returns immediately with the workers scheduled for removal in a different task. The scheduled `Task` object is returned. The user should call `wait` on the task before invoking any other parallel calls.
Examples
    $ julia -p 5

    julia> t = rmprocs(2, 3, waitfor=0)
    Task (runnable) @0x0000000107c718d0

    julia> wait(t)

    julia> workers()
    3-element Array{Int64,1}:
     4
     5
     6
Distributed.interrupt — Function
interrupt(pids::Integer...)
Interrupt the currently executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
interrupt(pids::AbstractVector=workers())
Interrupt the currently executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
Distributed.myid — Function
myid()
Get the id of the current process.
Examples
    julia> myid()
    1

    julia> remotecall_fetch(() -> myid(), 4)
    4
Distributed.pmap — Function
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Transform collection `c` by applying `f` to each element using available workers and tasks.
For multiple collection arguments, apply `f` elementwise.
Note that `f` must be made available to all worker processes; see Code Availability and Loading Packages for details.
If a worker pool is not specified, all available workers will be used via a `CachingPool`.
By default, `pmap` distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify `distributed=false`. This is equivalent to using `asyncmap`. For example, `pmap(f, c; distributed=false)` is equivalent to `asyncmap(f,c; ntasks=()->nworkers())`.
`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes greater than 1, the collection is processed in multiple batches, each of length `batch_size` or less. A batch is sent as a single request to a free worker, where a local `asyncmap` processes elements from the batch using multiple concurrent tasks.
Any error stops `pmap` from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller.
Consider the following two examples. The first one returns the exception object inline, the second a 0 in place of any exception:
    julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
    4-element Array{Any,1}:
     1
      ErrorException("foo")
     3
      ErrorException("foo")

    julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
    4-element Array{Int64,1}:
     1
     0
     3
     0
Errors can also be handled by retrying failed computations. Keyword arguments `retry_delays` and `retry_check` are passed through to `retry` as keyword arguments `delays` and `check` respectively. If batching is specified, and an entire batch fails, all items in the batch are retried.
Note that if both `on_error` and `retry_delays` are specified, the `on_error` hook is called before retrying. If `on_error` does not throw (or rethrow) an exception, the element will not be retried.
Example: On errors, retry `f` on an element a maximum of 3 times without any delay between retries.
    pmap(f, c; retry_delays = zeros(3))
Example: Retry `f` only if the exception is not of type `InexactError`, with exponentially increasing delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences.
    pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
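To make the `on_error` behaviour concrete, here is a minimal sketch that maps `sqrt` over a small vector on two local workers and substitutes `NaN` for the element that fails. The input values and the choice of `NaN` as a placeholder are assumptions made for this illustration only.

```julia
using Distributed

addprocs(2)

inputs = [4.0, -1.0, 9.0]

# `sqrt(-1.0)` throws; the handler replaces that one failure with NaN
# so the remaining elements are still processed.
results = pmap(sqrt, inputs; on_error = ex -> NaN)
println(results)

rmprocs(workers())
```

Because the handler returns a value instead of rethrowing, the result has the same length and order as the input.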
Distributed.RemoteException — Type
RemoteException(captured)
Exceptions on remote computations are captured and rethrown locally. A `RemoteException` wraps the `pid` of the worker and a captured exception. A `CapturedException` captures the remote exception and a serializable form of the call stack when the exception was raised.
Distributed.ProcessExitedException — Type
ProcessExitedException(worker_id::Int)
After a client Julia process has exited, further attempts to reference the dead child will throw this exception.
Distributed.Future — Type
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
A `Future` is a placeholder for a single computation of unknown termination status and time. For multiple potential computations, see `RemoteChannel`. See `remoteref_id` for identifying an `AbstractRemoteRef`.
Distributed.RemoteChannel — Type
RemoteChannel(pid::Integer=myid())
Make a reference to a `Channel{Any}(1)` on process `pid`. The default `pid` is the current process.
RemoteChannel(f::Function, pid::Integer=myid())
Create references to remote channels of a specific size and type. `f` is a function that when executed on `pid` must return an implementation of an `AbstractChannel`.
For example, `RemoteChannel(()->Channel{Int}(10), pid)` will return a reference to a channel of type `Int` and size 10 on `pid`.
The default `pid` is the current process.
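The following sketch shows one way a typed `RemoteChannel` might be used to pass values from a worker back to the master. The channel size, the values, and the variable names are illustrative assumptions rather than anything prescribed by the API.

```julia
using Distributed

addprocs(1)
w = first(workers())

# A typed, buffered channel that lives on the master (process 1)
# and holds up to 10 Ints.
results = RemoteChannel(() -> Channel{Int}(10), 1)

# The worker pushes squares into the channel; `remotecall_wait`
# blocks until the remote function has finished.
remotecall_wait(w, results) do ch
    for i in 1:3
        put!(ch, i^2)
    end
end

# Drain the channel on the master.
received = [take!(results) for _ in 1:3]
println(received)

rmprocs(w)
```

Placing the channel on the master keeps the data local to the consumer; placing it on the worker instead would make each `take!` a remote operation.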
Base.fetch — Method
fetch(x::Future)
Wait for and get the value of a `Future`. The fetched value is cached locally. Further calls to `fetch` on the same reference return the cached value. If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
Base.fetch — Method
fetch(c::RemoteChannel)
Wait for and get a value from a `RemoteChannel`. Exceptions raised are the same as for a `Future`. Does not remove the item fetched.
fetch(x::Any)
Return `x`.
Distributed.remotecall — Method
remotecall(f, id::Integer, args...; kwargs...) -> Future
Call a function `f` asynchronously on the given arguments on the specified process. Return a `Future`. Keyword arguments, if any, are passed through to `f`.
Distributed.remotecall_wait — Method
remotecall_wait(f, id::Integer, args...; kwargs...)
Perform a faster `wait(remotecall(...))` in one message on the `Worker` specified by worker id `id`. Keyword arguments, if any, are passed through to `f`.
See also `wait` and `remotecall`.
Distributed.remotecall_fetch — Method
remotecall_fetch(f, id::Integer, args...; kwargs...)
Perform `fetch(remotecall(...))` in one message. Keyword arguments, if any, are passed through to `f`. Any remote exceptions are captured in a `RemoteException` and thrown.
See also `fetch` and `remotecall`.
Examples
    $ julia -p 2

    julia> remotecall_fetch(sqrt, 2, 4)
    2.0

    julia> remotecall_fetch(sqrt, 2, -4)
    ERROR: On worker 2:
    DomainError with -4.0:
    sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
    ...
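The three call styles can be compared side by side. The sketch below assumes a single local worker and uses `+` and `sqrt` purely as stand-in functions; the variable names are hypothetical.

```julia
using Distributed

addprocs(1)
w = first(workers())

# `remotecall` returns immediately with a Future; `fetch` blocks for the value.
fut = remotecall(+, w, 1, 2)
sum_async = fetch(fut)

# `remotecall_wait` returns a Future whose computation has already finished.
finished = remotecall_wait(+, w, 1, 2)
sum_waited = fetch(finished)

# `remotecall_fetch` returns the value directly.
sum_direct = remotecall_fetch(+, w, 1, 2)

# A failure on the worker surfaces locally as a RemoteException.
caught = try
    remotecall_fetch(sqrt, w, -4.0)
catch err
    err
end
println(typeof(caught))

rmprocs(w)
```

`remotecall_fetch` is usually the most convenient form when the caller needs the value right away, while `remotecall` lets several requests be issued before any result is awaited.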
Distributed.remote_do — Method
remote_do(f, id::Integer, args...; kwargs...) -> nothing
Executes `f` on worker `id` asynchronously. Unlike `remotecall`, it does not store the result of computation, nor is there a way to wait for its completion.
A successful invocation indicates that the request has been accepted for execution on the remote node.
While consecutive `remotecall`s to the same worker are serialized in the order they are invoked, the order of executions on the remote worker is undetermined. For example, `remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)` will serialize the call to `f1`, followed by `f2` and `f3` in that order. However, it is not guaranteed that `f1` is executed before `f3` on worker 2.
Any exceptions thrown by `f` are printed to `stderr` on the remote worker.
Keyword arguments, if any, are passed through to `f`.
Base.put! — Method
put!(rr::RemoteChannel, args...)
Store a set of values to the `RemoteChannel`. If the channel is full, blocks until space is available. Return the first argument.
Base.put! — Method
put!(rr::Future, v)
Store a value to a `Future` `rr`. `Future`s are write-once remote references. A `put!` on an already set `Future` throws an `Exception`. All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion.
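The write-once rule can be observed with a `Future` owned by the current process. This is only a sketch: the symbols stored are arbitrary, and the exact exception type and message are deliberately not relied on.

```julia
using Distributed

# A Future owned by the current process; no extra workers are needed.
f = Future()

put!(f, :first)
value = fetch(f)

# A second `put!` on the same Future is rejected.
second_put_failed = try
    put!(f, :second)
    false
catch err
    true
end
println("value = ", value, ", second put! failed = ", second_put_failed)
```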
Base.take! — Method
take!(rr::RemoteChannel, args...)
Fetch value(s) from a `RemoteChannel` `rr`, removing the value(s) in the process.
Base.isready — Method
isready(rr::RemoteChannel, args...)
Determine whether a `RemoteChannel` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. However, it can be safely used on a `Future` since they are assigned only once.
Base.isready — Method
isready(rr::Future)
Determine whether a `Future` has a value stored to it.
If the argument `Future` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `rr` in a separate task instead or to use a local `Channel` as a proxy:
    p = 1
    f = Future(p)
    errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
    isready(f)  # will not block
Distributed.AbstractWorkerPool — Type
AbstractWorkerPool
Supertype for worker pools such as `WorkerPool` and `CachingPool`. An `AbstractWorkerPool` should implement:
- `push!` - add a new worker to the overall pool (available + busy)
- `put!` - put back a worker to the available pool
- `take!` - take a worker from the available pool (to be used for remote function execution)
- `length` - number of workers available in the overall pool
- `isready` - return false if a `take!` on the pool would block, else true
The default implementations of the above (on an `AbstractWorkerPool`) require fields
- `channel::Channel{Int}`
- `workers::Set{Int}`
where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool.
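The pool operations listed above can be exercised directly on a `WorkerPool`. The sketch below assumes two local workers and borrows one of them by hand; in everyday code the pool-based `remotecall` variants do this bookkeeping themselves.

```julia
using Distributed

addprocs(2)
pool = WorkerPool(workers())

total = length(pool)                  # workers in the overall pool (available + busy)

w = take!(pool)                       # borrow a free worker; blocks if none is available
result = remotecall_fetch(myid, w)    # run something on the borrowed worker
put!(pool, w)                         # hand the worker back to the available pool

ready = isready(pool)                 # true when a `take!` would not block
println("worker ", w, " answered with id ", result)

rmprocs(workers())
```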
Distributed.WorkerPool — Type
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Create a `WorkerPool` from a vector or range of worker ids.
Examples
    $ julia -p 3

    julia> WorkerPool([2, 3])
    WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

    julia> WorkerPool(2:4)
    WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool — Type
CachingPool(workers::Vector{Int})
An implementation of an `AbstractWorkerPool`. `remote`, `remotecall_fetch`, `pmap` (and other remote calls which execute functions remotely) benefit from caching the serialized/deserialized functions on the worker nodes, especially closures (which may capture large amounts of data).
The remote cache is maintained for the lifetime of the returned `CachingPool` object. To clear the cache earlier, use `clear!(pool)`.
For global variables, only the bindings are captured in a closure, not the data. `let` blocks can be used to capture global data.
Examples
    const foo = rand(10^8);
    wp = CachingPool(workers())
    let foo = foo
        pmap(i -> sum(foo) + i, wp, 1:100);
    end
The above would transfer `foo` only once to each worker.
Distributed.default_worker_pool — Function
default_worker_pool()
`AbstractWorkerPool` containing idle `workers` - used by `remote(f)` and `pmap` (by default). Unless one is explicitly set via `default_worker_pool!(pool)`, the default worker pool is initialized to a `WorkerPool`.
Examples
    $ julia -p 3

    julia> default_worker_pool()
    WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear! — Function
clear!(syms, pids=workers(); mod=Main)
Clears global bindings in modules by initializing them to `nothing`. `syms` should be of type `Symbol` or a collection of `Symbol`s. `pids` and `mod` identify the processes and the module in which global variables are to be reinitialized. Only those names found to be defined under `mod` are cleared.
An exception is raised if a global constant is requested to be cleared.
clear!(pool::CachingPool) -> pool
Removes all cached functions from all participating workers.
Distributed.remote — Function
remote([p::AbstractWorkerPool], f) -> Function
Return an anonymous function that executes function `f` on an available worker (drawn from `WorkerPool` `p` if provided) using `remotecall_fetch`.
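As a small illustration, `remote` can wrap `myid` so that each call reports which worker served it. The wrapper name `remote_id` is hypothetical, and the example relies on the default worker pool since no pool is passed.

```julia
using Distributed

addprocs(2)
ws = workers()

# Wrap `myid` so that every call runs on some free worker.
remote_id = remote(myid)

# The call blocks until the chosen worker has returned its result.
pid = remote_id()
println("served by worker ", pid)

rmprocs(workers())
```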
Distributed.remotecall — Method
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
`WorkerPool` variant of `remotecall(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remotecall` on it.
Examples
    $ julia -p 3

    julia> wp = WorkerPool([2, 3]);

    julia> A = rand(3000);

    julia> f = remotecall(maximum, wp, A)
    Future(2, 1, 6, nothing)
In this example, the task ran on pid 2, called from pid 1.
Distributed.remotecall_wait — Method
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
`WorkerPool` variant of `remotecall_wait(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remotecall_wait` on it.
Examples
    $ julia -p 3

    julia> wp = WorkerPool([2, 3]);

    julia> A = rand(3000);

    julia> f = remotecall_wait(maximum, wp, A)
    Future(3, 1, 9, nothing)

    julia> fetch(f)
    0.9995177101692958
Distributed.remotecall_fetch — Method
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
`WorkerPool` variant of `remotecall_fetch(f, pid, ....)`. Waits for and takes a free worker from `pool` and performs a `remotecall_fetch` on it.
Examples
    $ julia -p 3

    julia> wp = WorkerPool([2, 3]);

    julia> A = rand(3000);

    julia> remotecall_fetch(maximum, wp, A)
    0.9995177101692958
Distributed.remote_do — Method
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
`WorkerPool` variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remote_do` on it.
Distributed.@spawn — Macro
@spawn expr
Create a closure around an expression and run it on an automatically-chosen process, returning a `Future` to the result. This macro is deprecated; `@spawnat :any expr` should be used instead.
Examples
    julia> addprocs(3);

    julia> f = @spawn myid()
    Future(2, 1, 5, nothing)

    julia> fetch(f)
    2

    julia> f = @spawn myid()
    Future(3, 1, 7, nothing)

    julia> fetch(f)
    3
As of Julia 1.3 this macro is deprecated. Use `@spawnat :any` instead.
Distributed.@spawnat — Macro
@spawnat p expr
Create a closure around an expression and run the closure asynchronously on process `p`. Return a `Future` to the result. If `p` is the quoted literal symbol `:any`, then the system will pick a processor to use automatically.
Examples
    julia> addprocs(3);

    julia> f = @spawnat 2 myid()
    Future(2, 1, 3, nothing)

    julia> fetch(f)
    2

    julia> f = @spawnat :any myid()
    Future(3, 1, 7, nothing)

    julia> fetch(f)
    3
The `:any` argument is available as of Julia 1.3.
Distributed.@fetch — Macro
Distributed.@fetchfrom — Macro
Distributed.@distributed — Macro
@distributed
A distributed memory, parallel for loop of the form:
    @distributed [reducer] for var = range
        body
    end
The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, `@distributed` performs local reductions on each worker with a final reduction on the calling process.
Note that without a reducer function, `@distributed` executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with `@sync`, like:
    @sync @distributed for var = range
        body
    end
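Both forms of the loop can be tried on a couple of local workers. In this sketch the ranges, the `(+)` reducer, and the use of a `RemoteChannel` to gather results from the reducer-less loop are illustrative choices, not requirements of the macro.

```julia
using Distributed

addprocs(2)

# With a reducer, the loop blocks and returns the reduced value: each worker
# sums its share of the range, then the partial sums are combined.
total = @distributed (+) for i in 1:100
    i^2
end

# Without a reducer the loop returns immediately, so `@sync` is used to wait.
# Workers do not share memory, so a RemoteChannel gathers the values here.
collected = let out = RemoteChannel(() -> Channel{Int}(10))
    @sync @distributed for i in 1:10
        put!(out, i)
    end
    sort([take!(out) for _ in 1:10])
end

println(total)
rmprocs(workers())
```

The `let` block makes `out` a local variable, so it is captured by the loop body and shipped to the workers along with it.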
Distributed.@everywhere — Macro
@everywhere [procs()] expr
Execute an expression under `Main` on all `procs`. Errors on any of the processes are collected into a `CompositeException` and thrown. For example:
    @everywhere bar = 1
will define `Main.bar` on all current processes. Any processes added later (say with `addprocs()`) will not have the expression defined.
Unlike `@spawnat`, `@everywhere` does not capture any local variables. Instead, local variables can be broadcast using interpolation:
    foo = 1
    @everywhere bar = $foo
The optional argument `procs` allows specifying a subset of all processes to execute the expression.
Similar to calling `remotecall_eval(Main, procs, expr)`, but with two extra features:
- `using` and `import` statements run on the calling process first, to ensure packages are precompiled.
- The current source file path used by `include` is propagated to other processes.
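The interpolation rule can be checked by reading the global back from each worker. The names `scale` and `factor` below are made up for this sketch.

```julia
using Distributed

addprocs(2)

scale = 10    # exists only on the calling process

# `$scale` is replaced by its value before the expression is sent out,
# so every process defines `Main.factor` with that value.
@everywhere factor = $scale

# Ask each worker for its own copy of the global.
factors = [remotecall_fetch(() -> Main.factor, w) for w in workers()]
println(factors)

rmprocs(workers())
```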
Distributed.remoteref_id — Function
remoteref_id(r::AbstractRemoteRef) -> RRID
`Future`s and `RemoteChannel`s are identified by fields:
- `where` - refers to the node where the underlying object/storage referred to by the reference actually exists.
- `whence` - refers to the node the remote reference was created from. Note that this is different from the node where the underlying object referred to actually exists. For example, calling `RemoteChannel(2)` from the master process would result in a `where` value of 2 and a `whence` value of 1.
- `id` is unique across all references created from the worker specified by `whence`.
Taken together, `whence` and `id` uniquely identify a reference across all workers.
`remoteref_id` is a low-level API which returns an `RRID` object that wraps the `whence` and `id` values of a remote reference.
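The relationship between the three fields can be inspected on a reference created from the master but stored on a worker. This sketch reads the fields directly, which is assumed here to be acceptable for illustration even though they are low-level details.

```julia
using Distributed

addprocs(1)
w = first(workers())

# Created from the current process, but backed by a channel that lives on worker `w`.
rc = RemoteChannel(w)

# The RRID wraps the `whence` and `id` values of the reference.
rid = Distributed.remoteref_id(rc)

println("where  = ", rc.where)    # node holding the underlying channel
println("whence = ", rc.whence)   # node that created the reference
println("id     = ", rc.id)

rmprocs(w)
```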
Distributed.channel_from_id — Function
channel_from_id(id) -> c
A low-level API which returns the backing `AbstractChannel` for an `id` returned by `remoteref_id`. The call is valid only on the node where the backing channel exists.
Distributed.worker_id_from_socket — Function
worker_id_from_socket(s) -> pid
A low-level API which, given an `IO` connection or a `Worker`, returns the `pid` of the worker it is connected to. This is useful when writing custom `serialize` methods for a type, which optimize the data written out depending on the receiving process id.
Distributed.cluster_cookie — Method
cluster_cookie() -> cookie
Return the cluster cookie.
Distributed.cluster_cookie — Method
cluster_cookie(cookie) -> cookie
Set the passed cookie as the cluster cookie, then return it.
This interface provides a mechanism to launch and manage Julia workers on different cluster environments. There are two types of managers present in Base: `LocalManager`, for launching additional workers on the same host, and `SSHManager`, for launching on remote hosts via `ssh`. TCP/IP sockets are used to connect and transport messages between processes. It is possible for Cluster Managers to provide a different transport.
Distributed.ClusterManager — Type
ClusterManager
Supertype for cluster managers, which control worker processes as a cluster. Cluster managers implement how workers can be added, removed and communicated with. `SSHManager` and `LocalManager` are subtypes of this.
Distributed.WorkerConfig — Type
WorkerConfig
Type used by `ClusterManager`s to control workers added to their clusters. Some fields are used by all cluster managers to access a host:
- `io` – the connection used to access the worker (a subtype of `IO` or `Nothing`)
- `host` – the host address (either a `String` or `Nothing`)
- `port` – the port on the host used to connect to the worker (either an `Int` or `Nothing`)
Some are used by the cluster manager to add workers to an already-initialized host:
- `count` – the number of workers to be launched on the host
- `exename` – the path to the Julia executable on the host, defaults to `"$(Sys.BINDIR)/julia"` or `"$(Sys.BINDIR)/julia-debug"`
- `exeflags` – flags to use when launching Julia remotely
The `userdata` field is used to store information for each worker by external managers.
Some fields are used by `SSHManager` and similar managers:
- `tunnel` – `true` (use tunneling), `false` (do not use tunneling), or `nothing` (use default for the manager)
- `multiplex` – `true` (use SSH multiplexing for tunneling) or `false`
- `forward` – the forwarding option used for the `-L` option of ssh
- `bind_addr` – the address on the remote host to bind to
- `sshflags` – flags to use in establishing the SSH connection
- `max_parallel` – the maximum number of workers to connect to in parallel on the host
Some fields are used by both `LocalManager`s and `SSHManager`s:
- `connect_at` – determines whether this is a worker-to-worker or driver-to-worker setup call
- `process` – the process which will be connected (usually the manager will assign this during `addprocs`)
- `ospid` – the process ID according to the host OS, used to interrupt worker processes
- `environ` – private dictionary used to store temporary information by Local/SSH managers
- `ident` – worker as identified by the `ClusterManager`
- `connect_idents` – list of worker ids the worker must connect to if using a custom topology
- `enable_threaded_blas` – `true`, `false`, or `nothing`, whether to use threaded BLAS or not on the workers
Distributed.launch — Function
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit once all workers requested by `manager` have been launched. `params` is a dictionary of all keyword arguments `addprocs` was called with.
Distributed.manage — Function
manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
Implemented by cluster managers. It is called on the master process, during a worker's lifetime, with appropriate `op` values:
- `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
- `:interrupt` when `interrupt(workers)` is called. The `ClusterManager` should signal the appropriate worker with an interrupt signal.
- `:finalize` for cleanup purposes.
Base.kill — Method
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Implemented by cluster managers. It is called on the master process, by `rmprocs`. It should cause the remote worker specified by `pid` to exit. `kill(manager::ClusterManager.....)` executes a remote `exit()` on `pid`.
Sockets.connect — Method
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Implemented by cluster managers using custom transports. It should establish a logical connection to the worker with id `pid`, specified by `config`, and return a pair of `IO` objects. Messages from `pid` to the current process will be read off `instrm`, while messages to be sent to `pid` will be written to `outstrm`. The custom transport implementation must ensure that messages are delivered and received completely and in order. `connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between workers.
Distributed.init_worker — Function
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. Command line argument `--worker[=<cookie>]` has the effect of initializing a process as a worker using TCP/IP sockets for transport. `cookie` is a `cluster_cookie`.
Distributed.start_worker — Function
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
`start_worker` is an internal function which is the default entry point for worker processes connecting via TCP/IP. It sets up the process as a Julia cluster worker.
host:port information is written to stream `out` (defaults to stdout).
The function reads the cookie from stdin if required, and listens on a free port (or if specified, the port in the `--bind-to` command line option) and schedules tasks to process incoming TCP connections and requests. It also (optionally) closes stdin and redirects stderr to stdout.
It does not return.
Distributed.process_messages — Function
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Called by cluster managers using custom transports. It should be called when the custom transport implementation receives the first message from a remote worker. The custom transport must manage a logical connection to the remote worker and provide two `IO` objects, one for incoming messages and the other for messages addressed to the remote worker. If `incoming` is `true`, the remote peer initiated the connection. Whichever of the pair initiates the connection sends the cluster cookie and its Julia version number to perform the authentication handshake.
See also `cluster_cookie`.
Distributed.default_addprocs_params — Function
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Implemented by cluster managers. The default keyword parameters passed when calling `addprocs(mgr)`. The minimal set of options is available by calling `default_addprocs_params()`.
This document was generated with Documenter.jl version 1.8.0 on Wednesday 9 July 2025. Using Julia version 1.11.6.