
Expiration Timers

Created On: May 04, 2021 | Last Updated On: Apr 25, 2024

Expiration timers are set up in the same process as the agent and used from your script to deal with stuck workers. When you enter a code block that has the potential to get stuck, you can acquire an expiration timer, which instructs the timer server to kill the process if it does not release the timer by the self-imposed expiration deadline.

Usage:

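```python
import multiprocessing as mp

import torch.distributed.elastic.timer as timer
from torch.distributed.elastic.agent.server import WorkerSpec
from torch.distributed.elastic.agent.server.local_elastic_agent import LocalElasticAgent


def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message_queue, max_interval=0.01)
    server.start()  # non-blocking

    spec = WorkerSpec(
        fn=trainer_func,
        args=(message_queue,),
        # ...<OTHER_PARAMS...>
    )
    agent = LocalElasticAgent(spec, start_method=start_method)  # other constructor arguments omitted
    agent.run()


def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60):  # 60 second expiry
        pass  # do some work
```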

In the example above, if `trainer_func` takes more than 60 seconds to complete, the worker process is killed and the agent retries the worker group.

Client Methods

torch.distributed.elastic.timer.configure(timer_client)

Configures a timer client. Must be called before using `expires`.

torch.distributed.elastic.timer.expires(after, scope=None, client=None)

Acquires a countdown timer that expires in `after` seconds from now, unless the code block that it wraps finishes within that time frame. When the timer expires, this worker becomes eligible to be reaped. The exact meaning of “reaped” depends on the client implementation; in most cases, reaping means terminating the worker process. Note that the worker is NOT guaranteed to be reaped at exactly `time.time() + after`. Rather, the worker becomes “eligible” for reaping, and the `TimerServer` that the client talks to ultimately decides when and how to reap workers with expired timers.

Usage:

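```python
import torch.distributed as dist
import torch.distributed.elastic.timer as timer

# mp_queue: the queue shared with the LocalTimerServer running on this host
timer.configure(timer.LocalTimerClient(mp_queue))
with timer.expires(after=10):
    dist.all_reduce(...)
```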
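To see exactly what `expires` asks of a client, you can pass an explicit `client=` that only records its calls. The sketch below uses a hypothetical `RecordingTimerClient`, which is not part of torchelastic, that subclasses `TimerClient`. It shows that the deadline handed to `acquire` is the current wall-clock time plus `after`, and that the timer is released when the block exits. The scope name `all_reduce` is illustrative.

```python
import time
from typing import List, Tuple

from torch.distributed.elastic.timer import TimerClient, expires


class RecordingTimerClient(TimerClient):
    """Hypothetical client that records calls instead of contacting a TimerServer."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, float]] = []

    def acquire(self, scope_id: str, expiration_time: float) -> None:
        self.calls.append(("acquire", scope_id, expiration_time))

    def release(self, scope_id: str) -> None:
        # A release carries no deadline; record a placeholder value.
        self.calls.append(("release", scope_id, 0.0))


client = RecordingTimerClient()
start = time.time()
with expires(after=10, scope="all_reduce", client=client):
    pass  # guarded work would go here
end = time.time()

print([name for name, _, _ in client.calls])  # ['acquire', 'release']
```

Passing `client=` explicitly bypasses the globally configured client, which can be handy in tests.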

Server/Client Implementations

Below are the timer server and client pairs that are provided by torchelastic.

Note

Timer servers and clients always have to be implemented and used in pairs, since there is a messaging protocol between the server and client.

Below is a pair of timer server and client that is implemented based on a `multiprocessing.Queue`.

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)

Server that works with `LocalTimerClient`. Clients are expected to be subprocesses of the parent process that is running this server. Each host in the job is expected to start its own timer server locally, and each server instance manages timers for local workers (running on processes on the same host).

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)

Client side of `LocalTimerServer`. This client is meant to be used on the same host that the `LocalTimerServer` is running on, and it uses the pid to uniquely identify a worker. This is particularly useful when one spawns a subprocess (trainer) per GPU on a host with multiple GPU devices.
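`LocalTimerClient` only puts `TimerRequest` objects on the shared queue, so you can inspect the protocol without a running server. In this sketch, only the demo code reads the queue, so no process is ever reaped. The scope name `train_step` is illustrative.

```python
import multiprocessing as mp
import os

import torch.distributed.elastic.timer as timer

mp_queue = mp.Queue()
client = timer.LocalTimerClient(mp_queue)

# No LocalTimerServer is running here; the demo drains the queue itself.
with timer.expires(after=60, scope="train_step", client=client):
    pass  # finishes well within the 60 s budget

acquire_req = mp_queue.get(timeout=5)
release_req = mp_queue.get(timeout=5)

# Workers are identified by pid; a negative expiration_time marks a release.
print(acquire_req.worker_id == os.getpid(), acquire_req.expiration_time > 0)  # True True
print(release_req.scope_id, release_req.expiration_time < 0)  # train_step True
```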

Below is another pair of timer server and client that is implemented based on a named pipe.

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)

Server that works with `FileTimerClient`. Clients are expected to be running on the same host as the process that is running this server. Each host in the job is expected to start its own timer server locally, and each server instance manages timers for local workers (running on processes on the same host).

Parameters
  • file_path (str) – the path of a FIFO special file to be created.

  • max_interval (float) – the maximum interval, in seconds, of each watchdog loop.

  • daemon (bool) – whether to run the watchdog thread in daemon mode. A daemon thread does not prevent the process from exiting.

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – an optional callback for logging events in JSON format.

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)

Client side of `FileTimerServer`. This client is meant to be used on the same host that the `FileTimerServer` is running on, and it uses the pid to uniquely identify a worker. This client uses a named pipe to send timer requests to the `FileTimerServer`. The client is a producer and the `FileTimerServer` is a consumer. Multiple clients can work with the same `FileTimerServer`.

Parameters
  • file_path (str) – the path of a FIFO special file. `FileTimerServer` must have created it by calling `os.mkfifo()`.

  • signal – the signal used to kill the process. A negative or zero signal will not kill the process.
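You can illustrate the producer/consumer relationship over a named pipe with the standard library alone. This sketch is not `FileTimerClient`'s implementation. The newline-delimited JSON message and its fields (`pid`, `scope_id`, `deadline`, `signal`) are a hypothetical wire format chosen for illustration. The helpers `read_requests` and `send_request` are hypothetical too, and `os.mkfifo` requires a POSIX system.

```python
import json
import os
import signal
import tempfile
import threading
from typing import Any, Dict, List


def read_requests(fifo_path: str, out: List[Dict[str, Any]]) -> None:
    """Consumer (server) side: read newline-delimited JSON until all writers close."""
    with open(fifo_path, "r") as fifo:  # blocks until a writer opens the pipe
        for line in fifo:
            out.append(json.loads(line))


def send_request(fifo_path: str, request: Dict[str, Any]) -> None:
    """Producer (client) side: write one request as a single line."""
    with open(fifo_path, "w") as fifo:  # blocks until a reader has opened the pipe
        fifo.write(json.dumps(request) + "\n")


tmpdir = tempfile.mkdtemp()
fifo_path = os.path.join(tmpdir, "timer_fifo")
os.mkfifo(fifo_path)  # in the real pair, the server side creates the FIFO

received: List[Dict[str, Any]] = []
reader = threading.Thread(target=read_requests, args=(fifo_path, received))
reader.start()

# Hypothetical message layout; not the format FileTimerClient actually sends.
send_request(fifo_path, {
    "pid": os.getpid(),
    "scope_id": "train_step",
    "deadline": 123.0,
    "signal": int(signal.SIGKILL),
})
reader.join(timeout=5)

os.remove(fifo_path)
os.rmdir(tmpdir)
print(received[0]["scope_id"])  # train_step
```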

Writing a custom timer server/client

To write your own timer server and client, extend `torch.distributed.elastic.timer.TimerServer` for the server and `torch.distributed.elastic.timer.TimerClient` for the client. The `TimerRequest` object is used to pass messages between the server and client.

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)

Data object representing a countdown timer acquisition and release that is used between the `TimerClient` and `TimerServer`. A negative `expiration_time` should be interpreted as a “release” request.

Note

The type of `worker_id` is implementation-specific. It is whatever the `TimerServer` and `TimerClient` implementations use to uniquely identify a worker.
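Here is a minimal sketch of acquire and release requests. The worker ids are arbitrary assumptions: an integer pid, as the local implementations use, and a string that a hypothetical custom implementation might prefer. The helper `is_release` is also illustrative.

```python
from torch.distributed.elastic.timer import TimerRequest


def is_release(request: TimerRequest) -> bool:
    """Hypothetical helper: a negative expiration_time marks a release request."""
    return request.expiration_time < 0


deadline = 1_700_000_000.0  # an arbitrary absolute wall-clock time, in seconds
acquire = TimerRequest(worker_id=4242, scope_id="train_step", expiration_time=deadline)
release = TimerRequest(worker_id=4242, scope_id="train_step", expiration_time=-1)
# worker_id is implementation specific; a custom pair might use a string instead of a pid.
named = TimerRequest(worker_id="host0/rank3", scope_id="train_step", expiration_time=deadline)

print(is_release(acquire), is_release(release), is_release(named))  # False True False
```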

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)

Entity that monitors active timers and expires them in a timely fashion. This server is responsible for reaping workers that have expired timers.

abstract clear_timers(worker_ids)

Clears all timers for the given `worker_ids`.

abstract get_expired_timers(deadline)

Returns all expired timers for each `worker_id`. An expired timer is a timer whose `expiration_time` is less than or equal to the provided deadline.

Return type

dict[str, list[torch.distributed.elastic.timer.api.TimerRequest]]

abstract register_timers(timer_requests)

Processes the incoming timer requests and registers them with the server. A timer request can be either an acquire-timer or a release-timer request. Timer requests with a negative `expiration_time` should be interpreted as release-timer requests.
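You can exercise the acquire, release, and deadline semantics directly on the concrete `LocalTimerServer` without starting its watchdog thread, so nothing is killed. The pids and scope names below are made up. Only the method names come from the API listed above.

```python
import multiprocessing as mp
import time

from torch.distributed.elastic.timer import LocalTimerServer, TimerRequest

# The watchdog thread is never started; the server methods are called directly.
server = LocalTimerServer(mp.Queue(), max_interval=0.01)

now = time.time()
server.register_timers([
    TimerRequest(worker_id=101, scope_id="load_data", expiration_time=now - 1),  # already past
    TimerRequest(worker_id=202, scope_id="forward", expiration_time=now + 60),
    TimerRequest(worker_id=303, scope_id="backward", expiration_time=now - 1),
])
# Worker 303 releases its timer: a negative expiration_time means "release".
server.register_timers([TimerRequest(worker_id=303, scope_id="backward", expiration_time=-1)])

expired = server.get_expired_timers(deadline=now)
print(sorted(expired))  # [101]

server.clear_timers({101})  # what the watchdog does after reaping worker 101
later = server.get_expired_timers(deadline=now + 120)
print(sorted(later))  # [202]
```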

class torch.distributed.elastic.timer.TimerClient

Client library to acquire and release countdown timers by communicating with the `TimerServer`.

abstract acquire(scope_id, expiration_time)

Acquires a timer for the worker that holds this client object, given the `scope_id` and `expiration_time`. Typically registers the timer with the `TimerServer`.

abstract release(scope_id)

Releases the timer for the `scope_id` on the worker this client represents. After this method is called, the countdown timer on the scope is no longer in effect.
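The sketch below puts the pieces together into a custom pair that runs inside a single process, with threads standing in for workers. It makes several assumptions. It subclasses `RequestQueue` from `torch.distributed.elastic.timer.api`, whose abstract `size()` and `get(size, timeout)` feed the watchdog. It also implements the server's abstract `_reap_worker(worker_id)` hook. `ThreadRequestQueue`, `RecordingTimerServer`, `QueueTimerClient`, and `demo` are hypothetical names, and the server records reaped worker ids instead of killing anything.

```python
import queue
import time
from typing import Any, Dict, List, Set, Tuple

from torch.distributed.elastic.timer import TimerClient, TimerRequest, TimerServer, expires
from torch.distributed.elastic.timer.api import RequestQueue


class ThreadRequestQueue(RequestQueue):
    """Hypothetical request queue backed by a thread-safe queue.Queue."""

    def __init__(self) -> None:
        self._q: "queue.Queue[TimerRequest]" = queue.Queue()

    def put(self, request: TimerRequest) -> None:
        self._q.put(request)

    def size(self) -> int:
        return self._q.qsize()

    def get(self, size: int, timeout: float) -> List[TimerRequest]:
        # Collect up to `size` requests, waiting at most `timeout` seconds in total.
        requests: List[TimerRequest] = []
        deadline = time.monotonic() + timeout
        while len(requests) < size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                requests.append(self._q.get(timeout=remaining))
            except queue.Empty:
                break
        return requests


class RecordingTimerServer(TimerServer):
    """Hypothetical server that records reaped worker ids instead of killing processes."""

    def __init__(self, request_queue: RequestQueue, max_interval: float) -> None:
        super().__init__(request_queue, max_interval)
        self._timers: Dict[Tuple[Any, str], TimerRequest] = {}
        self.reaped: List[Any] = []

    def register_timers(self, timer_requests: List[TimerRequest]) -> None:
        for r in timer_requests:
            key = (r.worker_id, r.scope_id)
            if r.expiration_time < 0:  # negative expiration_time means "release"
                self._timers.pop(key, None)
            else:
                self._timers[key] = r

    def clear_timers(self, worker_ids: Set[Any]) -> None:
        for key in [k for k in self._timers if k[0] in worker_ids]:
            del self._timers[key]

    def get_expired_timers(self, deadline: float) -> Dict[Any, List[TimerRequest]]:
        expired: Dict[Any, List[TimerRequest]] = {}
        for r in self._timers.values():
            if r.expiration_time <= deadline:
                expired.setdefault(r.worker_id, []).append(r)
        return expired

    def _reap_worker(self, worker_id: Any) -> bool:
        self.reaped.append(worker_id)  # a real server would terminate the worker here
        return True


class QueueTimerClient(TimerClient):
    """Hypothetical client that identifies its worker by a caller-supplied string."""

    def __init__(self, request_queue: ThreadRequestQueue, worker_id: str) -> None:
        self._queue = request_queue
        self._worker_id = worker_id

    def acquire(self, scope_id: str, expiration_time: float) -> None:
        self._queue.put(TimerRequest(self._worker_id, scope_id, expiration_time))

    def release(self, scope_id: str) -> None:
        self._queue.put(TimerRequest(self._worker_id, scope_id, -1))


def demo() -> List[Any]:
    rq = ThreadRequestQueue()
    server = RecordingTimerServer(rq, max_interval=0.05)
    server.start()  # starts the watchdog thread
    try:
        with expires(after=60, scope="step", client=QueueTimerClient(rq, "worker-ok")):
            pass  # finishes in time, so its timer is released
        QueueTimerClient(rq, "worker-stuck").acquire("step", time.time() + 0.1)  # never released
        for _ in range(100):  # poll for up to ~5 s
            if server.reaped:
                break
            time.sleep(0.05)
    finally:
        server.stop()
    return server.reaped
```

Calling `demo()` should return `["worker-stuck"]`. After a worker is reaped, the base class's watchdog clears its timers, so the same worker is not reaped again for the same expired timer.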

Debug info logging

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)