
Elastic Agent#


Server#

The elastic agent is the control plane of torchelastic.

It is a process that launches and manages underlying worker processes. The agent is responsible for:

  1. Working with distributed torch: the workers are started with all the necessary information to successfully and trivially call torch.distributed.init_process_group().

  2. Fault tolerance: monitors workers and, upon detecting worker failures or unhealthiness, tears down all workers and restarts everyone.

  3. Elasticity: reacts to membership changes and restarts workers with the new members.

The simplest agents are deployed per node and work with local processes. A more advanced agent can launch and manage workers remotely. Agents can be completely decentralized, making decisions based on the workers they manage, or they can be coordinated, communicating with other agents (that manage workers in the same job) to make a collective decision.

Below is a diagram of an agent that manages a local group of workers.

[Diagram: agent_diagram.jpg]

Concepts#

This section describes the high-level classes and concepts that are relevant to understanding the role of the agent in torchelastic.

class torch.distributed.elastic.agent.server.ElasticAgent[source]#

An agent process responsible for managing one or more worker processes.

The worker processes are assumed to be regular distributed PyTorch scripts. When the worker process is created by the agent, the agent provides the necessary information for the worker processes to properly initialize a torch process group.

The exact deployment topology and ratio of agent-to-worker is dependent on the specific implementation of the agent and the user's job placement preferences. For instance, to run a distributed training job on GPU with 8 trainers (one per GPU) one can:

  1. Use 8 x single GPU instances, place an agent per instance, managing 1 worker per agent.

  2. Use 4 x double GPU instances, place an agent per instance, managing 2 workers per agent.

  3. Use 2 x quad GPU instances, place an agent per instance, managing 4 workers per agent.

  4. Use 1 x 8 GPU instance, place an agent per instance, managing 8 workers per agent.

Usage

    group_result = agent.run()
    if group_result.is_failed():
        # workers failed
        failure = group_result.failures[0]
        logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
    else:
        return group_result.return_values[0]  # return rank 0's results
abstract get_worker_group(role='default')[source]#

Return the WorkerGroup for the given role.

Note that the worker group is a mutable object and hence in a multi-threaded/process environment it may change state. Implementers are encouraged (but not required) to return a defensive read-only copy.

Return type:

WorkerGroup
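
A hedged sketch of inspecting the group returned by this call; the attribute names spec and workers follow the WorkerGroup description later on this page, and agent is assumed to be an already-constructed ElasticAgent:

    def describe(agent):
        # role defaults to "default"; the returned WorkerGroup is mutable,
        # so treat it as a point-in-time snapshot.
        wg = agent.get_worker_group()
        return wg.spec.role, len(wg.workers)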

abstract run(role='default')[source]#

Run the agent.

Supports retrying the worker group on failures up to max_restarts.

Returns:

The result of the execution, containing the return values or failure details for each worker mapped by the worker's global rank.

Raises:

Exception - any other failures NOT related to worker process

Return type:

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None, event_log_handler='null', numa_options=None, duplicate_stdout_filters=None, duplicate_stderr_filters=None, virtual_local_rank=False)[source]#

Blueprint information about a particular type of worker.

For a given role, there must only exist a single worker spec. The worker spec is expected to be homogeneous across all nodes (machines), that is, each node runs the same number of workers for a particular spec.

Parameters:
  • role (str) – user-defined role for the workers with this spec

  • local_world_size (int) – number of local workers to run

  • fn (Callable | None) – (deprecated, use entrypoint instead)

  • entrypoint (Callable | str | None) – worker function or command

  • args (tuple) – arguments to pass to entrypoint

  • rdzv_handler (RendezvousHandler) – handles rdzv for this set of workers

  • max_restarts (int) – number of max retries for the workers

  • monitor_interval (float) – monitor status of workers every n seconds

  • master_port (int | None) – fixed port to run the c10d store on rank 0; if not specified, a random free port will be chosen

  • master_addr (str | None) – fixed master_addr to run the c10d store on rank 0; if not specified, the hostname of the agent with rank 0 will be used

  • redirects – redirect std streams to a file, selectively redirect for a particular local rank by passing a map

  • tee – tees the specified std stream(s) to console + file, selectively tee for a particular local rank by passing a map, takes precedence over redirects settings.

  • event_log_handler (str) – name of the event logging handler as registered in elastic/events/handlers.py.

  • duplicate_stdout_filters (list[str] | None) – If non-empty, duplicates stdout to a file containing only lines that match _any_ of the filter strings.

  • duplicate_stderr_filters (list[str] | None) – If non-empty, duplicates stderr to a file containing only lines that match _any_ of the filter strings.

  • virtual_local_rank (bool) – Enable virtual local rank mode for workers (defaults to False).When enabled, LOCAL_RANK is set to 0 for all workers andCUDA_VISIBLE_DEVICES is adjusted so each worker accesses itsassigned GPU at device index 0.
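
A minimal construction sketch, assuming the default c10d rendezvous backend is registered (as torchrun does); the endpoint, run id, and trainer function below are hypothetical:

    from torch.distributed.elastic.agent.server import WorkerSpec
    from torch.distributed.elastic.rendezvous import RendezvousParameters
    from torch.distributed.elastic.rendezvous.registry import get_rendezvous_handler

    def trainer(arg):
        return f"trained {arg}"

    rdzv_params = RendezvousParameters(
        backend="c10d",
        endpoint="localhost:29400",  # hypothetical endpoint
        run_id="example-run",        # hypothetical run id
        min_nodes=1,
        max_nodes=1,
    )

    spec = WorkerSpec(
        role="trainer",
        local_world_size=2,  # run two workers on this node
        rdzv_handler=get_rendezvous_handler(rdzv_params),
        entrypoint=trainer,
        args=("foobar",),
        max_restarts=3,
        monitor_interval=0.1,
    )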

get_entrypoint_name()[source]#

Get the entry point name.

If the entrypoint is a function (e.g. Callable), returns its __qualname__; else if the entrypoint is a binary (e.g. str), returns the binary name.

class torch.distributed.elastic.agent.server.WorkerState(value)[source]#

A state of the WorkerGroup.

Workers in a worker group change state as a unit. If a single worker in a worker group fails, the entire set is considered failed:

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

A worker group starts from an initial INIT state, then progresses to HEALTHY or UNHEALTHY states, and finally reaches a terminal SUCCEEDED or FAILED state.

Worker groups can be interrupted and temporarily put into STOPPED state by the agent. Workers in STOPPED state are scheduled to be restarted in the near future by the agent. Some examples of workers being put into STOPPED state are:

  1. Worker group failure or unhealthiness observed

  2. Membership change detected

When an action (start, stop, rdzv, retry, etc.) on the worker group fails and results in the action being partially applied to the worker group, the state will be UNKNOWN. Typically this happens on uncaught/unhandled exceptions during state change events on the agent. The agent is not expected to recover worker groups in the UNKNOWN state and is better off self-terminating and allowing the job manager to retry the node.

static is_running(state)[source]#

Return whether the given worker state represents workers that are still running.

Returns:

True if the worker state represents workers still running (e.g. that the process exists but is not necessarily healthy).

Return type:

bool
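
A hedged usage sketch; agent is assumed to be an ElasticAgent, and per the description above the running states are the ones where the worker processes exist (healthy or not):

    from torch.distributed.elastic.agent.server import WorkerState

    def still_running(agent):
        # True while the group's processes exist, healthy or not.
        state = agent.get_worker_group().state
        return WorkerState.is_running(state)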

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]#

A worker instance.

Contrast this with WorkerSpec, which represents the specifications of a worker. A Worker is created from a WorkerSpec. A Worker is to a WorkerSpec as an object is to a class.

The id of the worker is interpreted by the specific implementation of ElasticAgent. For a local agent, it could be the pid (int) of the worker; for a remote agent it could be encoded as host:port (string).

Parameters:
  • id (Any) – uniquely identifies a worker (interpreted by the agent)

  • local_rank (int) – local rank of the worker

  • global_rank (int) – global rank of the worker

  • role_rank (int) – rank of the worker across all workers that have the same role

  • world_size (int) – number of workers (globally)

  • role_world_size (int) – number of workers that have the same role
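
A hedged illustration with made-up ranks: worker 0 on the third agent of a two-workers-per-agent, four-node job would carry these values (the agent normally constructs and fills in Worker instances itself):

    from torch.distributed.elastic.agent.server import Worker

    w = Worker(local_rank=0, global_rank=4, role_rank=4, world_size=8, role_world_size=8)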

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source]#

A set of Worker instances.

The class defines a set of Worker instances for the given WorkerSpec managed by ElasticAgent. Whether the worker group contains cross-instance workers or not depends on the implementation of the agent.

Implementations#

Below are the agent implementations provided by torchelastic.

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]#

An implementation of torchelastic.agent.server.ElasticAgent that handles host-local workers.

This agent is deployed per host and is configured to spawn n workers. When using GPUs, n maps to the number of GPUs available on the host.

The local agent does not communicate with other local agents deployed on other hosts, even if the workers may communicate inter-host. The worker id is interpreted to be a local process. The agent starts and stops all worker processes as a single unit.

The worker function and arguments passed to the worker function must be Python multiprocessing compatible. To pass multiprocessing data structures to the workers, you may create the data structure in the same multiprocessing context as the specified start_method and pass it as a function argument.

The exit_barrier_timeout specifies the amount of time (in seconds) to wait for other agents to finish. This acts as a safety net to handle cases where workers finish at different times, to prevent agents from viewing workers that finished early as a scale-down event. It is strongly advised that user code ensure workers are terminated in a synchronous manner rather than relying on the exit_barrier_timeout.

A named pipe based watchdog can be enabled in LocalElasticAgent if an environment variable TORCHELASTIC_ENABLE_FILE_TIMER with value 1 has been defined in the LocalElasticAgent process. Optionally, another environment variable TORCHELASTIC_TIMER_FILE can be set with a unique file name for the named pipe. If the environment variable TORCHELASTIC_TIMER_FILE is not set, LocalElasticAgent will internally create a unique file name and set it to the environment variable TORCHELASTIC_TIMER_FILE, and this environment variable will be propagated to the worker processes to allow them to connect to the same named pipe that LocalElasticAgent uses.

Logs are written to the specified log directory. By default, each log line is prefixed by [${role_name}${local_rank}]: (e.g. [trainer0]: foobar). Log prefixes can be customized by passing a template string as the log_line_prefix_template argument. The following macros (identifiers) are substituted at runtime: ${role_name}, ${local_rank}, ${rank}. For example, to prefix each log line with the global rank instead of the local rank, set log_line_prefix_template="[${rank}]:".

Example launching function

    def trainer(args) -> str:
        return "do train"

    def main():
        start_method = "spawn"
        shared_queue = multiprocessing.get_context(start_method).Queue()
        spec = WorkerSpec(
            role="trainer",
            local_world_size=nproc_per_process,
            entrypoint=trainer,
            args=("foobar",),
            ...<OTHER_PARAMS...>
        )
        agent = LocalElasticAgent(spec, start_method)
        results = agent.run()

        if results.is_failed():
            print("trainer failed")
        else:
            print(f"rank 0 return value: {results.return_values[0]}")
            # prints -> rank 0 return value: do train

Example launching binary

    def main():
        spec = WorkerSpec(
            role="trainer",
            local_world_size=nproc_per_process,
            entrypoint="/usr/local/bin/trainer",
            args=("--trainer-args", "foobar"),
            ...<OTHER_PARAMS...>
        )
        agent = LocalElasticAgent(spec)
        results = agent.run()

        if not results.is_failed():
            print("binary launches do not have return values")
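
A hedged variant showing the log-related arguments described above; the DefaultLogsSpecs class, its import path, and the log directory are assumptions not confirmed by this page, and spec is a WorkerSpec built as in the examples above:

    from torch.distributed.elastic.agent.server.local_elastic_agent import LocalElasticAgent
    from torch.distributed.elastic.multiprocessing import DefaultLogsSpecs

    def run_with_logs(spec):
        agent = LocalElasticAgent(
            spec,
            logs_specs=DefaultLogsSpecs(log_dir="/tmp/elastic_logs"),  # hypothetical directory
            log_line_prefix_template="[${rank}]:",  # prefix each line with the global rank
        )
        return agent.run()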

Extending the Agent#

To extend the agent you can implement ElasticAgent directly; however, we recommend you extend SimpleElasticAgent instead, which provides most of the scaffolding and leaves you with a few specific abstract methods to implement.

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]#

An ElasticAgent that manages one particular type of worker role.

An ElasticAgent that manages workers (WorkerGroup) for a single WorkerSpec, such as one particular type of worker role.

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source]#

Determine proper ranks for worker processes.

Fast Path: when all workers have the same role and world size. We calculate the global rank to be group_rank * group_world_size + local_rank, and the role_world_size is the same as global_world_size. No TCP store is used in this case. This is only enabled when users set the environment variable TORCH_ELASTIC_WORKER_IDENTICAL to 1.

Time complexity: each worker O(1), overall O(1)

Slow Path: when workers have different roles and world sizes. We use the following algorithm:

  1. Each agent writes its configuration (group_rank, group_world_size, num_workers) to the common store.

  2. The rank 0 agent reads all the role_info from the store and determines each agent's worker ranks.

  3. Determine the global rank: the global rank of a worker is computed as the cumulative sum of the local_world_size of all workers in front of it. For efficiency reasons each agent is assigned a base global rank such that its workers are in the range [base_global_rank, base_global_rank + local_world_size). (A small arithmetic sketch follows this method entry.)

  4. Determine the role rank: the role rank is determined using the algorithm in point 3, with the exception that the ranks are calculated with respect to the role name.

  5. The rank 0 agent writes the assigned ranks to the store.

  6. Each agent reads the assigned ranks from the store.

Time complexity: each worker O(1), rank0 O(n), overall O(n)

Return type:

list[Worker]
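
The slow-path base-rank arithmetic from step 3 above, illustrated as a hedged standalone sketch (not the library's internal code) with three hypothetical agents running 2, 4, and 2 workers:

    # Cumulative sum of preceding local_world_size values gives each agent's
    # base global rank; agent i's workers then occupy
    # [base[i], base[i] + local_world_sizes[i]).
    local_world_sizes = [2, 4, 2]  # one entry per agent, in group-rank order
    base = []
    total = 0
    for n in local_world_sizes:
        base.append(total)
        total += n
    assert base == [0, 2, 6] and total == 8  # total is the global world size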

_exit_barrier()[source]#

Define a barrier that keeps the agent process alive until all workers finish.

Wait for exit_barrier_timeout seconds for all agents to finish executing their local workers (either successfully or not). This acts as a safety guard against user scripts that terminate at different times.

_initialize_workers(worker_group)[source]#

Start a fresh set of workers for the worker_group.

Essentially, a rendezvous followed by a start_workers. The caller should first call _stop_workers() to stop running workers prior to calling this method.

Optimistically sets the state of the worker group that just started as HEALTHY and delegates the actual monitoring of state to the _monitor_workers() method.

abstract _monitor_workers(worker_group)[source]#

Check on the workers for the worker_group.

This function also returns the new state of the worker group.

Return type:

RunResult

_rendezvous(worker_group)[source]#

Run rendezvous for the workers specified by the worker spec.

Assigns workers a new global rank and world size.Updates the rendezvous store for the worker group.

_restart_workers(worker_group)[source]#

Restart (stops, rendezvous, starts) all local workers in the group.

abstract _shutdown(death_sig=Signals.SIGTERM)[source]#

Clean up any resources that were allocated during the agent’s work.

Parameters:

death_sig (Signals) – Signal to send to the child process, SIGTERM is default

abstract _start_workers(worker_group)[source]#

Start worker_group.spec.local_world_size number of workers.

This is done according to the worker spec for the worker group. Returns a map of local_rank to worker id.

Return type:

dict[int,Any]

abstract _stop_workers(worker_group)[source]#

Stop all workers in the given worker group.

Implementers must deal with workers in all states defined by WorkerState. That is, they must gracefully handle stopping non-existent workers, unhealthy (stuck) workers, etc.
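
Putting the abstract methods above together, a hedged skeleton of a custom agent; the class name is hypothetical, the method set follows this page, and the bodies are left to the implementation:

    from signal import Signals
    from typing import Any

    from torch.distributed.elastic.agent.server import SimpleElasticAgent, WorkerGroup
    from torch.distributed.elastic.agent.server.api import RunResult

    class MyRemoteAgent(SimpleElasticAgent):
        def _start_workers(self, worker_group: WorkerGroup) -> dict[int, Any]:
            # Launch worker_group.spec.local_world_size workers and
            # return a map of local_rank to worker id.
            raise NotImplementedError

        def _stop_workers(self, worker_group: WorkerGroup) -> None:
            # Must tolerate non-existent or stuck workers.
            raise NotImplementedError

        def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
            # Report the group's current state (and results once it exits).
            raise NotImplementedError

        def _shutdown(self, death_sig: Signals = Signals.SIGTERM) -> None:
            # Release any resources allocated while the agent was running.
            raise NotImplementedError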

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]#

Return results of the worker executions.

Run results follow an “all-or-nothing” policy where the run is successful if andonly if ALL local workers managed by this agent complete successfully.

If the result is successful (e.g. is_failed() = False) then the return_values field contains the outputs (return values) of the workers managed by THIS agent mapped by their GLOBAL ranks. That is, result.return_values[0] is the return value of global rank 0.

Note

return_values are only meaningful when the worker entrypoint is a function. Workers specified as a binary entrypoint do not canonically have a return value, and the return_values field is meaningless and may be empty.

If is_failed() returns True then the failures field contains the failure information, again mapped by the GLOBAL rank of the worker that failed.

The keys in return_values and failures are mutually exclusive; that is, a worker's final state can only be one of: succeeded, failed. Workers intentionally terminated by the agent according to the agent's restart policy are not represented in either return_values or failures.
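
A hedged sketch of consuming a RunResult along the lines described above; the exit_code attribute on the failure record is an assumption based on the Usage example earlier on this page:

    def summarize(result):
        # Keys in both mappings are GLOBAL ranks; a rank appears in exactly
        # one of return_values or failures.
        if result.is_failed():
            for global_rank, failure in result.failures.items():
                print(f"rank {global_rank} failed with exit code {failure.exit_code}")
        else:
            for global_rank, value in result.return_values.items():
                print(f"rank {global_rank} returned {value!r}")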

Watchdog in the Agent#

A named pipe based watchdog can be enabled in LocalElasticAgent if an environment variable TORCHELASTIC_ENABLE_FILE_TIMER with value 1 has been defined in the LocalElasticAgent process. Optionally, another environment variable TORCHELASTIC_TIMER_FILE can be set with a unique file name for the named pipe. If the environment variable TORCHELASTIC_TIMER_FILE is not set, LocalElasticAgent will internally create a unique file name and set it to the environment variable TORCHELASTIC_TIMER_FILE, and this environment variable will be propagated to the worker processes to allow them to connect to the same named pipe that LocalElasticAgent uses.
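
A minimal sketch of opting in, using the environment variables named above; the pipe path is a hypothetical value and, if omitted, the agent generates one and exports it to workers through the same variable:

    import os

    os.environ["TORCHELASTIC_ENABLE_FILE_TIMER"] = "1"
    os.environ["TORCHELASTIC_TIMER_FILE"] = "/tmp/elastic_watchdog_pipe"  # optional, hypothetical path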

Health Check Server#

A health check monitoring server can be enabled in LocalElasticAgent if an environment variable TORCHELASTIC_HEALTH_CHECK_PORT has been defined in the LocalElasticAgent process. This adds an interface for a health check server, which can be extended by starting a tcp/http server on the specified port number. Additionally, the health check server has a callback to check that the watchdog is alive.
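
A minimal sketch of enabling the server via the environment variable named above; the port value is hypothetical and is set in the LocalElasticAgent process:

    import os

    os.environ["TORCHELASTIC_HEALTH_CHECK_PORT"] = "8080"  # hypothetical port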

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]#

Interface for a health check monitoring server, which can be extended by starting a tcp/http server on the specified port.

Parameters:
  • alive_callback (Callable[[], int]) – callback returning the last progress time of the agent

  • port (int) – port number on which to start the tcp/http server

  • timeout (int) – timeout in seconds to decide whether the agent is alive or dead

start()[source]#

Unsupported functionality for PyTorch; does not start any health check server.

stop()[source]#

Stop the health check server.

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source]#

Create a health check server object.

Return type:

HealthCheckServer
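
A hedged usage sketch of the factory above; the argument names follow the signature shown, while the callback, port, and timeout values are hypothetical:

    import time

    from torch.distributed.elastic.agent.server.health_check_server import (
        create_healthcheck_server,
    )

    server = create_healthcheck_server(
        alive_callback=lambda: int(time.monotonic()),  # last progress time of the agent
        port=8080,
        timeout=60,
    )
    server.start()  # the base class start() is a no-op, as noted above
    server.stop()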