Expiration Timers#

Created On: May 04, 2021 | Last Updated On: Apr 25, 2024

Expiration timers are set up in the same process as the agent and used from your script to deal with stuck workers. When you enter a code block that could get stuck, you can acquire an expiration timer, which instructs the timer server to kill the process if it does not release the timer by the self-imposed expiration deadline.

Usage

import multiprocessing as mp

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    # The server and the workers must share the same queue.
    server = timer.LocalTimerServer(message_queue, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    # Named elastic_agent so the imported agent module is not shadowed.
    elastic_agent = agent.LocalElasticAgent(spec, start_method)
    elastic_agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work
        pass

In the example above, if trainer_func takes more than 60 seconds to complete, the worker process is killed and the agent retries the worker group.

Client Methods#

torch.distributed.elastic.timer.configure(timer_client)[source]#

Configures a timer client. Must be called before using expires.

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source]#

Acquires a countdown timer that expires in after seconds from now, unless the code block it wraps finishes within that time. When the timer expires, the worker becomes eligible to be reaped. The exact meaning of “reaped” depends on the client implementation; in most cases, it means terminating the worker process. Note that the worker is NOT guaranteed to be reaped at exactly time.now() + after. It only becomes eligible for reaping, and the TimerServer that the client talks to ultimately decides when and how to reap workers with expired timers.

Usage

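import torch
from torch.distributed.elastic import timer
timer.configure(timer.LocalTimerClient(mp_queue))  # mp_queue: the queue shared with the LocalTimerServer
with timer.expires(after=10):
    torch.distributed.all_reduce(...)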
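The acquire-then-release behavior described for expires can be sketched with a plain context manager. This is a minimal illustration using only the standard library, not the library's implementation: RecordingClient, expires_sketch, and the "demo-scope" default are hypothetical names introduced here, and the client only records calls instead of talking to a timer server.

```python
import time
from contextlib import contextmanager


class RecordingClient:
    """Hypothetical stand-in for a timer client; it only records calls."""

    def __init__(self):
        self.calls = []

    def acquire(self, scope_id, expiration_time):
        self.calls.append(("acquire", scope_id, expiration_time))

    def release(self, scope_id):
        self.calls.append(("release", scope_id))


@contextmanager
def expires_sketch(client, after, scope="demo-scope"):
    # Turn the relative timeout into an absolute deadline and register it.
    deadline = time.time() + after
    client.acquire(scope, deadline)
    try:
        yield
    finally:
        # Release on the way out, even if the wrapped block raised.
        client.release(scope)


client = RecordingClient()
with expires_sketch(client, after=10):
    pass  # the guarded work would go here

print([call[0] for call in client.calls])  # ['acquire', 'release']
```

Releasing in a finally clause means a block that fails quickly does not leave a timer behind that could later get the worker reaped.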

Server/Client Implementations#

Below are the timer server and client pairs that are provided by torchelastic.

Note

A timer server and its clients must always be implemented and used in pairs, since there is a messaging protocol between the server and client.

Below is a pair of timer server and client that is implemented based on a multiprocessing.Queue.

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source]#

Server that works with LocalTimerClient. Clients are expected to be subprocesses of the parent process that is running this server. Each host in the job is expected to start its own timer server locally, and each server instance manages timers for local workers (running in processes on the same host).

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source]#

Client side of LocalTimerServer. This client is meant to be used on the same host that the LocalTimerServer is running on and uses pid to uniquely identify a worker. This is particularly useful in situations where one spawns a subprocess (trainer) per GPU on a host with multiple GPU devices.

Below is another pair of timer server and client that is implemented based on a named pipe.

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source]#

Server that works with FileTimerClient. Clients are expected to be running on the same host as the process that is running this server. Each host in the job is expected to start its own timer server locally and each server instance manages timers for local workers (running on processes on the same host).

Parameters
  • file_path (str) – the path of a FIFO special file to be created.

  • max_interval (float) – max interval in seconds for each watchdog loop.

  • daemon (bool) – whether to run the watchdog thread in daemon mode. A daemon thread does not prevent the process from exiting.

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – an optional callback for logging events in JSON format.

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source]#

Client side of FileTimerServer. This client is meant to be used on the same host that the FileTimerServer is running on and uses pid to uniquely identify a worker. This client uses a named pipe to send timer requests to the FileTimerServer. The client is the producer and the FileTimerServer is the consumer. Multiple clients can work with the same FileTimerServer.

Parameters
  • file_path (str) – the path of a FIFO special file. FileTimerServer must have created it by calling os.mkfifo().

  • signal – the signal used to kill the process. A negative or zero signal will not kill the process.
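The producer/consumer arrangement over a FIFO special file can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: the JSON line format and the field names pid, scope_id, and expiration_time are invented for the example, both ends are opened in one process for brevity, and os.mkfifo() is only available on POSIX systems.

```python
import json
import os
import tempfile
import time

fifo_dir = tempfile.mkdtemp()
fifo_path = os.path.join(fifo_dir, "timer_fifo")
os.mkfifo(fifo_path)  # POSIX only

# Consumer side: open the read end non-blocking so open() returns immediately.
read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
# Producer side: opening for write succeeds because a reader already exists.
write_fd = os.open(fifo_path, os.O_WRONLY)

# Hypothetical message layout; the worker is identified by its pid.
message = {
    "pid": os.getpid(),
    "scope_id": "demo",
    "expiration_time": time.time() + 30,
}
os.write(write_fd, (json.dumps(message) + "\n").encode())

# The message is small, so a single read returns the whole line.
received = json.loads(os.read(read_fd, 4096).decode())
print(received["scope_id"])

# Clean up the descriptors and the FIFO.
os.close(write_fd)
os.close(read_fd)
os.remove(fifo_path)
os.rmdir(fifo_dir)
```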

Writing a custom timer server/client#

To write your own timer server and client, extend torch.distributed.elastic.timer.TimerServer for the server and torch.distributed.elastic.timer.TimerClient for the client. The TimerRequest object is used to pass messages between the server and client.

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source]#

Data object representing a countdown timer acquisition and release that is used between the TimerClient and TimerServer. A negative expiration_time should be interpreted as a “release” request.

Note

The type of worker_id is implementation specific. It is whatever the TimerServer and TimerClient implementations use to uniquely identify a worker.

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source]#

Entity that monitors active timers and expires them in a timely fashion. This server is responsible for reaping workers that have expired timers.

abstract clear_timers(worker_ids)[source]#

Clears all timers for the given worker_ids.

abstract get_expired_timers(deadline)[source]#

Returns all expired timers for each worker_id. An expired timer is a timer for which the expiration_time is less than or equal to the provided deadline.

Return type

dict[str, list[torch.distributed.elastic.timer.api.TimerRequest]]

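abstract register_timers(timer_requests)[source]#

Processes the incoming timer requests and registers them with the server. A timer request can be either an acquire-timer or a release-timer request. Timer requests with a negative expiration_time should be interpreted as release-timer requests.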
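The three abstract server methods can be read as operations on one table of active timers. The sketch below shows that bookkeeping with the standard library only. It is an assumption-laden illustration rather than a TimerServer subclass: Request and InMemoryTimers are hypothetical names, and keying timers by (worker_id, scope_id) is a design choice made for this example.

```python
from collections import namedtuple

# Hypothetical stand-in for TimerRequest, with the same three fields.
Request = namedtuple("Request", ["worker_id", "scope_id", "expiration_time"])


class InMemoryTimers:
    """Sketch of the bookkeeping a custom timer server might do."""

    def __init__(self):
        # (worker_id, scope_id) -> active acquire request
        self._timers = {}

    def register_timers(self, timer_requests):
        for request in timer_requests:
            key = (request.worker_id, request.scope_id)
            if request.expiration_time < 0:
                # A negative expiration_time is a release request.
                self._timers.pop(key, None)
            else:
                self._timers[key] = request

    def get_expired_timers(self, deadline):
        expired = {}
        for request in self._timers.values():
            if request.expiration_time <= deadline:
                expired.setdefault(request.worker_id, []).append(request)
        return expired

    def clear_timers(self, worker_ids):
        self._timers = {
            key: request
            for key, request in self._timers.items()
            if key[0] not in worker_ids
        }


timers = InMemoryTimers()
now = 1000.0  # fixed timestamp so the example is deterministic
timers.register_timers([
    Request("w1", "load", now - 5),    # already past its deadline
    Request("w1", "train", now + 60),  # still within its deadline
    Request("w2", "load", now - 1),    # past its deadline...
    Request("w2", "load", -1),         # ...but released before the check
])
expired = timers.get_expired_timers(deadline=now)
print(sorted(expired))  # ['w1']
```

A real server would call get_expired_timers periodically, reap the returned workers, and then call clear_timers for the workers it reaped.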

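class torch.distributed.elastic.timer.TimerClient[source]#

Client library to acquire and release countdown timers by communicating with the TimerServer.

abstract acquire(scope_id, expiration_time)[source]#

Acquires a timer for the worker that holds this client object, given the scope_id and expiration_time. Typically registers the timer with the TimerServer.

abstract release(scope_id)[source]#

Releases the timer for the scope_id on the worker this client represents. After this method is called, the countdown timer on the scope is no longer in effect.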
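On the client side, acquire and release can both be expressed as messages placed on a shared queue, with release encoded as a negative expiration_time. The sketch below assumes an in-process queue.Queue in place of a cross-process queue. Request and QueueClientSketch are hypothetical names, and the class does not subclass TimerClient, so it only illustrates the message shape.

```python
import os
import queue
import time
from collections import namedtuple

# Hypothetical stand-in for TimerRequest, with the same three fields.
Request = namedtuple("Request", ["worker_id", "scope_id", "expiration_time"])


class QueueClientSketch:
    """Sketch of a client that sends timer requests through a queue."""

    def __init__(self, request_queue):
        self._queue = request_queue
        # Use the process id to identify this worker.
        self._worker_id = os.getpid()

    def acquire(self, scope_id, expiration_time):
        self._queue.put(Request(self._worker_id, scope_id, expiration_time))

    def release(self, scope_id):
        # A negative expiration_time marks the request as a release.
        self._queue.put(Request(self._worker_id, scope_id, -1))


request_queue = queue.Queue()
client = QueueClientSketch(request_queue)
client.acquire("checkpoint", time.time() + 30)
client.release("checkpoint")

first = request_queue.get_nowait()
second = request_queue.get_nowait()
print(first.expiration_time > 0, second.expiration_time < 0)  # True True
```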

Debug info logging#

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source]#