Rendezvous#
Created On: May 04, 2021 | Last Updated On: May 22, 2024
In the context of Torch Distributed Elastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.
It is used by Torch Distributed Elastic to gather participants of a training job (i.e. nodes) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume.
Torch Distributed Elastic rendezvous provides the following critical functionalities:
Barrier:
Nodes performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min total number of nodes have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.
There’s an additional small waiting time after reaching min number of nodes - this is used to ensure the rendezvous is not completed “too quickly” (which could potentially exclude additional nodes attempting to join at approximately the same time).
If max number of nodes is gathered at the barrier, the rendezvous is completed immediately.
There’s also an overall timeout which causes the rendezvous to fail if min number of nodes is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources, in case there’s a problem with the resource manager, and is meant to be interpreted as non-retryable.
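The completion rule described above can be sketched as a small decision function. This is a simplified illustration with hypothetical names, not the actual handler logic:

```python
def barrier_state(num_joined, min_nodes, max_nodes,
                  elapsed, join_timeout=600,
                  since_min_reached=None, last_call_timeout=30):
    """Decide the barrier outcome: 'completed', 'waiting', or 'failed'.

    Times are in seconds; `since_min_reached` is None until `min_nodes`
    nodes have joined.
    """
    if num_joined >= max_nodes:
        # Reaching max nodes completes the rendezvous immediately.
        return "completed"
    if num_joined >= min_nodes:
        # Min reached: wait out the short "last call" grace period so that
        # nodes joining at roughly the same time are not excluded.
        if since_min_reached is not None and since_min_reached >= last_call_timeout:
            return "completed"
        return "waiting"
    if elapsed >= join_timeout:
        # Overall timeout with min never reached: non-retryable failure.
        return "failed"
    return "waiting"
```

For example, with min_nodes=2 and max_nodes=4, four joined nodes complete the rendezvous immediately, while two joined nodes complete it only after the last-call grace period elapses.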
Exclusivity:
A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of nodes exists at any given time (for a given job). In other words, new nodes (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.
Torch Distributed Elastic rendezvous ensures that if a group of nodes has already completed a rendezvous (and hence might already be training), then additional “late” nodes attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.
Consistency:
When a rendezvous is completed, all its members will agree on the job membership and everyone’s role in it. This role is represented using an integer, called rank, that is between 0 and world size.
Note that ranks are not stable, in the sense that the same node can be assigned a different rank in the next (re-)rendezvous.
Fault-tolerance:
Torch Distributed Elastic rendezvous is designed to tolerate node failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and its completion, a re-rendezvous with the remaining healthy nodes will happen automatically.
A node can also fail after it has completed (or has been observed by other nodes to have completed) the rendezvous - this scenario will be handled by the Torch Distributed Elastic train_loop instead (where it will also trigger a re-rendezvous).
Shared key-value store:
When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).
This store is only shared by the members of the completed rendezvous. It is intended to be used by Torch Distributed Elastic to exchange information necessary to initialize job control and data-planes.
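As a rough illustration of how members might use such a store, here is a minimal in-memory stand-in for the set/get part of the Store API. This is hypothetical illustration code only; real jobs receive an actual torch.distributed.Store (such as a TCPStore) back from the rendezvous:

```python
class InMemoryStore:
    """Toy stand-in for the set/get part of torch.distributed.Store."""

    def __init__(self):
        self._data = {}

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = value

    def get(self, key: str) -> bytes:
        # A real Store blocks until the key is published or a timeout expires;
        # this toy version assumes the key is already present.
        return self._data[key]


# One member (e.g. rank 0) publishes bootstrap information; others read it.
store = InMemoryStore()
store.set("MASTER_ADDR", b"10.0.0.1")
store.set("MASTER_PORT", b"29500")
assert store.get("MASTER_ADDR") == b"10.0.0.1"
```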
Waiting workers and rendezvous closing:
Torch Distributed Elastic rendezvous handler object provides additional functionalities, which are technically not part of the rendezvous process:
- Querying how many workers arrived late at the barrier, which can participate in the next rendezvous.
- Setting the rendezvous closed to signal all nodes not to participate in the next rendezvous.
DynamicRendezvousHandler:
Torch Distributed Elastic comes with the DynamicRendezvousHandler class that implements the rendezvous mechanism described above. It is a backend-agnostic type that expects a particular RendezvousBackend instance to be specified during construction.
Torch distributed users can either implement their own backend type or use one of the following implementations that come with PyTorch:
- C10dRendezvousBackend: Uses a C10d store (by default TCPStore) as the rendezvous backend. The main advantage of using a C10d store is that it requires no 3rd-party dependency (such as etcd) to establish a rendezvous.
- EtcdRendezvousBackend: Supersedes the legacy EtcdRendezvousHandler class. Passing an EtcdRendezvousBackend instance to DynamicRendezvousHandler is functionally equivalent to instantiating an EtcdRendezvousHandler.

```python
store = TCPStore("localhost")
backend = C10dRendezvousBackend(store, "my_run_id")
rdzv_handler = DynamicRendezvousHandler.from_backend(
    run_id="my_run_id",
    store=store,
    backend=backend,
    min_nodes=2,
    max_nodes=4,
)
```
Below is a state diagram describing how rendezvous works.

Registry#
- classtorch.distributed.elastic.rendezvous.RendezvousParameters(backend,endpoint,run_id,min_nodes,max_nodes,local_addr=None,**kwargs)[source]#
Hold the parameters to construct a RendezvousHandler.
- Parameters:
backend (str) – The name of the backend to use to handle the rendezvous.
endpoint (str) – The endpoint of the rendezvous, usually in form <hostname>[:<port>].
run_id (str) – The id of the rendezvous.
min_nodes (int) – The minimum number of nodes to admit to the rendezvous.
max_nodes (int) – The maximum number of nodes to admit to the rendezvous.
local_addr (str | None) – The address of the local node.
**kwargs – Additional parameters for the specified backend.
- classtorch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]#
Represent a registry of RendezvousHandler backends.
Handler#
- classtorch.distributed.elastic.rendezvous.RendezvousHandler[source]#
Main rendezvous interface.
Note
Distributed Torch users normally do not need to implement their own RendezvousHandler. An implementation based on C10d Store is already provided, and is recommended for most users.
- abstract get_run_id()[source]#
Return the run id of the rendezvous.
The run id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow nodes to join the correct distributed application.
- Return type: str
- abstract is_closed()[source]#
Check whether the rendezvous has been closed.
A closed rendezvous means all future attempts to re-rendezvous within the same job will fail.
is_closed() and set_closed() have semantics of eventual propagation and should not be used for synchronization. The intention is that if at least one node decides the job is finished, it will close the rendezvous, and other nodes will soon observe this and stop running as well.
- Return type: bool
- abstract next_rendezvous()[source]#
Main entry-point into the rendezvous barrier.
Blocks until the rendezvous is complete and the current process is included in the formed worker group, or a timeout occurs, or the rendezvous was marked closed.
- Returns:
Instance of RendezvousInfo.
- Raises:
RendezvousClosedError – The rendezvous is closed.
RendezvousConnectionError – The connection to the rendezvous backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
RendezvousTimeoutError – The rendezvous did not complete on time.
- Return type: RendezvousInfo
- abstract num_nodes_waiting()[source]#
Return the number of nodes who arrived late at the rendezvous barrier, hence were not included in the current worker group.
Callers should periodically call this method to check whether new nodes are waiting to join the job and if so admit them by calling next_rendezvous() (re-rendezvous).
- Return type: int
- abstract shutdown()[source]#
Close all resources that were open for the rendezvous.
Example:
```python
rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
```
- Return type: bool
- property use_agent_store: bool#
Indicates that the store reference returned by next_rendezvous() can be shared with user applications and will be available during the application lifecycle. The rendezvous handler implementation will share store details as an instance of RendezvousStoreInfo. Applications, by convention, use the MASTER_ADDR/MASTER_PORT env variables to look up the store.
Dataclasses#
- classtorch.distributed.elastic.rendezvous.RendezvousInfo(store,rank,world_size,bootstrap_store_info)[source]#
Holds the information about the rendezvous.
- classtorch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr,master_port)[source]#
Store address and port that can be used to bootstrap trainer distributed comms
Exceptions#
- classtorch.distributed.elastic.rendezvous.api.RendezvousError[source]#
Represents the base type for rendezvous errors.
- classtorch.distributed.elastic.rendezvous.api.RendezvousClosedError[source]#
Raised when a rendezvous is closed.
- classtorch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]#
Raised when a rendezvous did not complete on time.
- classtorch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source]#
Raised when the connection to a rendezvous backend has failed.
Implementations#
Dynamic Rendezvous#
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store,backend,params)[source]#
Create a new DynamicRendezvousHandler from the specified parameters.
- Parameters:
store (Store) – The C10d store to return as part of the rendezvous.
backend (RendezvousBackend) – The backend to use to hold the rendezvous state.
- Return type: DynamicRendezvousHandler
| Parameter | Description |
| --- | --- |
| join_timeout | The total time, in seconds, within which the rendezvous is expected to complete. Defaults to 600 seconds. |
| last_call_timeout | An additional wait amount, in seconds, before completing the rendezvous once the minimum number of nodes has been reached. Defaults to 30 seconds. |
| close_timeout | The time, in seconds, within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown(). Defaults to 30 seconds. |
| heartbeat | The time, in seconds, within which a keep-alive heartbeat is expected to complete. |
- classtorch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]#
Represent a handler that sets up a rendezvous among a set of nodes.
- classmethodfrom_backend(run_id,store,backend,min_nodes,max_nodes,local_addr=None,timeout=None,keep_alive_interval=5,keep_alive_max_attempt=3)[source]#
Create a new DynamicRendezvousHandler.
- Parameters:
run_id (str) – The run id of the rendezvous.
store (Store) – The C10d store to return as part of the rendezvous.
backend (RendezvousBackend) – The backend to use to hold the rendezvous state.
min_nodes (int) – The minimum number of nodes to admit to the rendezvous.
max_nodes (int) – The maximum number of nodes to admit to the rendezvous.
local_addr (str | None) – The local node address.
timeout (RendezvousTimeout | None) – The timeout configuration of the rendezvous.
keep_alive_interval (int) – The amount of time a node waits before sending a heartbeat to keep it alive in the rendezvous.
keep_alive_max_attempt (int) – The maximum number of failed heartbeat attempts after which a node is considered dead.
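Taken together, the two keep-alive parameters imply a simple liveness rule: a node is presumed dead once roughly keep_alive_interval × keep_alive_max_attempt seconds pass without a successful heartbeat. A sketch of that rule (a hypothetical helper, not part of the API):

```python
def node_presumed_alive(last_heartbeat: float, now: float,
                        keep_alive_interval: float = 5.0,
                        keep_alive_max_attempt: int = 3) -> bool:
    # Presume the node dead once it has missed keep_alive_max_attempt
    # consecutive heartbeats' worth of time (times are in seconds).
    return (now - last_heartbeat) < keep_alive_interval * keep_alive_max_attempt
```

With the defaults, a node that has not heartbeated for 15 seconds or more would be considered dead.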
- classtorch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]#
Represent a backend that holds the rendezvous state.
- abstract get_state()[source]#
Get the rendezvous state.
- Returns:
A tuple of the encoded rendezvous state and its fencing token or None if no state is found in the backend.
- Raises:
RendezvousConnectionError – The connection to the backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
- Return type:
- abstract set_state(state,token=None)[source]#
Set the rendezvous state.
The new rendezvous state is set conditionally:
- If the specified token matches the fencing token stored in the backend, the state will be updated. The new state will be returned to the caller along with its fencing token.
- If the specified token does not match the fencing token stored in the backend, the state won’t be updated; instead the existing state along with its fencing token will be returned to the caller.
- If the specified token is None, the new state will be set only if there is no existing state in the backend. Either the new state or the existing state along with its fencing token will be returned to the caller.
- Parameters:
state (bytes) – The encoded rendezvous state.
token (Any | None) – An optional fencing token that was retrieved by a previous call to get_state() or set_state().
- Returns:
A tuple of the serialized rendezvous state, its fencing token, anda boolean value indicating whether our set attempt succeeded.
- Raises:
RendezvousConnectionError – The connection to the backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
- Return type:
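The conditional-update rules above amount to a compare-and-set guarded by the fencing token. Here is a minimal in-memory sketch of a backend with these semantics (an illustration only; the real backends persist state in a C10d store or in etcd):

```python
class InMemoryRendezvousBackend:
    """Toy backend demonstrating the fencing-token semantics of set_state()."""

    def __init__(self):
        self._state = None      # encoded rendezvous state (bytes)
        self._token = None      # fencing token of the stored state
        self._next_token = 0

    def get_state(self):
        # Returns None if no state has been set yet.
        if self._state is None:
            return None
        return self._state, self._token

    def set_state(self, state, token=None):
        if self._state is not None and token != self._token:
            # Missing or stale token: keep the existing state, report failure.
            return self._state, self._token, False
        # No existing state, or the caller holds the current token: update.
        self._next_token += 1
        self._state, self._token = state, self._next_token
        return self._state, self._token, True


# First writer succeeds and receives a fencing token.
backend = InMemoryRendezvousBackend()
state, token, ok = backend.set_state(b"state-v1")
assert ok
```

A writer that later retries with a stale token gets back the newer state instead of overwriting it, which is what keeps concurrent nodes from clobbering each other’s updates.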
- classtorch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None,last_call=None,close=None,heartbeat=None)[source]#
Hold the timeout configuration of a rendezvous.
- Parameters:
join (timedelta | None) – The time within which the rendezvous is expected to complete.
last_call (timedelta | None) – An additional wait amount before completing the rendezvous once the rendezvous has the minimum number of required participants.
close (timedelta | None) – The time within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown().
heartbeat (timedelta | None) – The time within which a keep-alive heartbeat is expected to complete.
C10d Backend#
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]#
Create a new C10dRendezvousBackend from the specified parameters.

| Parameter | Description |
| --- | --- |
| store_type | The type of the C10d store. The currently supported types are “tcp” and “file” which correspond to torch.distributed.TCPStore and torch.distributed.FileStore, respectively. Defaults to “tcp”. |
| read_timeout | The read timeout, in seconds, for store operations. Defaults to 60 seconds. Note this only applies to torch.distributed.TCPStore. It is not relevant to torch.distributed.FileStore, which does not take a timeout as a parameter. |
| is_host | A boolean value indicating whether this backend instance will host the C10d store. If not specified it will be inferred heuristically by matching the hostname or the IP address of this machine against the specified rendezvous endpoint. Defaults to None. Note that this configuration option only applies to torch.distributed.TCPStore. In normal circumstances you can safely skip it; the only time when it is needed is if its value cannot be correctly determined (e.g. the rendezvous endpoint has a CNAME as the hostname or does not match the FQDN of the machine). |

- Return type:
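The is_host inference described above boils down to comparing the local machine’s identity against the host part of the endpoint. A simplified sketch of such a heuristic (hypothetical; the real check also resolves and compares IP addresses):

```python
def infer_is_host(endpoint: str, local_hostnames) -> bool:
    """Guess whether this machine should host the store for `endpoint`.

    `local_hostnames` is the set of names this machine answers to
    (hostname, FQDN, local IP addresses, ...).
    """
    # Split off an optional ":port" suffix and compare the host part.
    host = endpoint.rsplit(":", 1)[0] if ":" in endpoint else endpoint
    return host in local_hostnames
```

This is exactly the kind of heuristic that fails when the endpoint uses a CNAME that does not match the machine’s FQDN, which is why the is_host option exists as an explicit override.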
- classtorch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store,run_id)[source]#
Represents a C10d-backed rendezvous backend.
- Parameters:
store (Store) – The torch.distributed.Store instance to use to communicate with the C10d store.
run_id (str) – The run id of the rendezvous.
Etcd Backend#
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]#
Create a new EtcdRendezvousBackend from the specified parameters.

| Parameter | Description |
| --- | --- |
| read_timeout | The read timeout, in seconds, for etcd operations. Defaults to 60 seconds. |
| protocol | The protocol to use to communicate with etcd. Valid values are “http” and “https”. Defaults to “http”. |
| ssl_cert | The path to the SSL client certificate to use along with HTTPS. Defaults to None. |
| ssl_cert_key | The path to the private key of the SSL client certificate to use along with HTTPS. Defaults to None. |
| ca_cert | The path to the root SSL authority certificate. Defaults to None. |

- Return type:
Etcd Rendezvous (Legacy)#
Warning
The DynamicRendezvousHandler class supersedes the EtcdRendezvousHandler class, and is recommended for most users. EtcdRendezvousHandler is in maintenance mode and will be deprecated in the future.
- classtorch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl,local_addr)[source]#
Implements a torch.distributed.elastic.rendezvous.RendezvousHandler interface backed by torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous.

EtcdRendezvousHandler uses a URL to configure the type of rendezvous to use and to pass implementation specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:

```
etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>

-- example --

etcd://localhost:2379/1234?min_workers=1&max_workers=3
```
The URL above is interpreted as follows:
- Use the rendezvous handler that is registered with the etcd scheme
- The etcd endpoint to use is localhost:2379
- job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server for multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. does not need to be a number) as long as it is unique.
- min_workers=1 and max_workers=3 specify a range for membership size - Torch Distributed Elastic starts running the job as long as the cluster size is greater than or equal to min_workers and admits up to max_workers into the cluster.
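The interpretation above can be demonstrated with standard URL parsing. This is an illustrative sketch using the Python standard library, not the handler’s actual parsing code:

```python
from urllib.parse import urlparse, parse_qs

url = "etcd://localhost:2379/1234?min_workers=1&max_workers=3"
parsed = urlparse(url)

scheme = parsed.scheme            # selects the handler registered as "etcd"
endpoint = parsed.netloc          # the etcd endpoint to connect to
job_id = parsed.path.lstrip("/")  # used as the etcd key prefix for this job
params = {k: v[0] for k, v in parse_qs(parsed.query).items()}

assert scheme == "etcd"
assert endpoint == "localhost:2379"
assert job_id == "1234"
assert params == {"min_workers": "1", "max_workers": "3"}
```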
Below is a full list of the parameters that can be passed to etcd rendezvous:
| Parameter | Description |
| --- | --- |
| min_workers | minimum number of workers for the rendezvous to be valid |
| max_workers | maximum number of workers to admit |
| timeout | total timeout within which next_rendezvous is expected to succeed (default 600s) |
| last_call_timeout | additional wait amount (“last call”) after min number of workers has been reached (defaults to 30s) |
| etcd_prefix | path prefix (from etcd root), inside which all etcd nodes will be created (defaults to /torchelastic/p2p) |
Etcd Store#
The EtcdStore is the C10d Store instance type returned by next_rendezvous() when etcd is used as the rendezvous backend.
- classtorch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client,etcd_store_prefix,timeout=None)[source]#
Implement a c10 Store interface by piggybacking on the rendezvous etcd instance.
This is the store object returned by EtcdRendezvous.
- add(key,num)[source]#
Atomically increment a value by an integer amount.
The integer is represented as a string using base 10. If key is not present, a default value of 0 will be assumed.
- Returns:
the new (incremented) value
- Return type: int
- get(key)[source]#
Get a value by key, possibly doing a blocking wait.
If key is not immediately present, will do a blocking wait for at most timeout duration or until the key is published.
- Returns:
value (bytes)
- Raises:
LookupError – If key still not published after timeout
- Return type: bytes
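The add() semantics described above (a counter stored as a base-10 string, with a missing key treated as 0) can be sketched against a plain dict. This is an illustration of the contract only, not the etcd-backed implementation:

```python
def store_add(data: dict, key: str, num: int) -> int:
    """Increment a base-10 string counter in `data`, returning the new value."""
    current = int(data.get(key, b"0"))   # missing key defaults to 0
    new_value = current + num
    data[key] = str(new_value).encode()  # stored as a base-10 string
    return new_value


data = {}
assert store_add(data, "counter", 5) == 5
assert store_add(data, "counter", 2) == 7
assert data["counter"] == b"7"
```

The real EtcdStore performs this read-modify-write atomically via etcd’s compare-and-swap, which the dict-based sketch does not attempt to model.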
Etcd Server#
The EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server on a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.
Warning
For production and multi-node deployments please consider properly deploying a highly available etcd server as this is the single point of failure for your distributed jobs.
- classtorch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]#
Note
tested on etcd server v3.4.3.
Starts and stops a local standalone etcd server on a random free port. Useful for single node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.
This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.

The following fallback mechanism is used to find the etcd binary:

1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH
2. Uses <this file root>/bin/etcd if one exists
3. Uses etcd from PATH
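The fallback order above can be sketched as follows. This is an illustrative reimplementation (the helper name and module_root parameter are hypothetical), not the code EtcdServer actually runs:

```python
import os
import shutil
from typing import Optional


def find_etcd_binary(module_root: str) -> Optional[str]:
    """Resolve the etcd binary using the documented fallback order."""
    # 1. Explicit override via environment variable.
    env_path = os.environ.get("TORCHELASTIC_ETCD_BINARY_PATH")
    if env_path:
        return env_path
    # 2. A binary bundled next to the module, if one exists.
    bundled = os.path.join(module_root, "bin", "etcd")
    if os.path.exists(bundled):
        return bundled
    # 3. Whatever `etcd` is found on PATH (None if nothing is found).
    return shutil.which("etcd")
```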
Usage
```python
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
```
- Parameters:
etcd_binary_path – path of etcd server binary (see above for fallback path)