torch.futures
Created On: Jun 12, 2025 | Last Updated On: Jun 12, 2025
This package provides a Future type that encapsulates an asynchronous execution and a set of utility functions to simplify operations on Future objects. Currently, the Future type is primarily used by the Distributed RPC Framework.
- class torch.futures.Future(*, devices=None)
Wrapper around a torch._C.Future which encapsulates an asynchronous execution of a callable, e.g. rpc_async(). It also exposes a set of APIs to add callback functions and set results.
Warning
GPU support is a beta feature, subject to changes.
- add_done_callback(callback)
Append the given callback function to this Future, which will be run when the Future is completed. Multiple callbacks can be added to the same Future, but the order in which they will be executed cannot be guaranteed. The callback must take one argument, which is the reference to this Future. The callback function can use the value() method to get the value. Note that if this Future is already completed, the given callback will be run inline.
We recommend that you use the then() method as it provides a way to synchronize after your callback has completed. add_done_callback can be cheaper if your callback does not return anything. But both then() and add_done_callback use the same callback registration API under the hood.
With respect to GPU tensors, this method behaves in the same way as then().
- Parameters:
callback (Callable) – a Callable that takes in one argument, which is the reference to this Future.
Note
If the callback function throws, either through the original future being completed with an exception and calling fut.wait(), or through other code in the callback, the error must be handled carefully. For example, if this callback later completes additional futures, those futures are not marked as completed with an error and the user is responsible for handling completion/waiting on those futures independently.
Example:
>>> import torch
>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
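The note above leaves error propagation to the user. A minimal sketch of one way to handle it, assuming a hypothetical second future named `downstream` that the callback is responsible for completing: wrap the callback body in try/except and forward any failure with set_exception().

```python
import torch

upstream = torch.futures.Future()
downstream = torch.futures.Future()  # hypothetical future completed by the callback


def forward(fut):
    # fut.wait() re-raises the exception if `upstream` was completed with one.
    try:
        downstream.set_result(fut.wait() * 2)
    except Exception as exc:
        # Without this branch `downstream` would never be completed.
        downstream.set_exception(exc)


upstream.add_done_callback(forward)
upstream.set_exception(ValueError("upstream failed"))

try:
    downstream.wait()
    outcome = "ok"
except ValueError as exc:
    outcome = f"error: {exc}"

print(outcome)
```

With this pattern, anything waiting on `downstream` sees the original failure instead of blocking indefinitely.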
- done()
Return True if this Future is done. A Future is done if it has a result or an exception.
If the value contains tensors that reside on GPUs, Future.done() will return True even if the asynchronous kernels that are populating those tensors haven’t yet completed running on the device, because at that stage the result is already usable, provided one performs the appropriate synchronizations (see wait()).
- Return type:
bool
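As a small CPU-only illustration of the rule that a Future is done once it has either a result or an exception (the variable names are chosen for this sketch only):

```python
import torch

fut = torch.futures.Future()
before = fut.done()  # nothing has been set yet

fut.set_result(torch.zeros(2))
after_result = fut.done()

failed = torch.futures.Future()
failed.set_exception(RuntimeError("failed"))
after_exception = failed.done()  # an exception also counts as done

print(before, after_result, after_exception)
```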
- set_exception(result)
Set an exception for this Future, which will mark this Future as completed with an error and trigger all attached callbacks. Note that when calling wait()/value() on this Future, the exception set here will be raised inline.
- Parameters:
result (BaseException) – the exception for this Future.
Example:
>>> import torch
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
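A sketch of how a producer thread might report a failure through set_exception() so that the consumer can handle it around wait(). The `worker` function and its `denominator` argument are hypothetical and exist only for this illustration.

```python
import threading

import torch


def worker(fut, denominator):
    # Hypothetical producer: report either the result or the failure.
    try:
        fut.set_result(1 / denominator)
    except ZeroDivisionError as exc:
        fut.set_exception(exc)


fut = torch.futures.Future()
t = threading.Thread(target=worker, args=(fut, 0))
t.start()

try:
    fut.wait()
    message = "no error"
except ZeroDivisionError as exc:
    # The exception set by the worker is raised inline by wait().
    message = f"caught: {exc}"
t.join()

print(message)
```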
- set_result(result)
Set the result for this Future, which will mark this Future as completed and trigger all attached callbacks. Note that a Future cannot be marked completed twice.
If the result contains tensors that reside on GPUs, this method can be called even if the asynchronous kernels that are populating those tensors haven’t yet completed running on the device, provided that the streams on which those kernels were enqueued are set as the current ones when this method is called. Put simply, it’s safe to call this method immediately after launching those kernels, without any additional synchronization, as long as one doesn’t change streams in between. This method will record events on all the relevant current streams and will use them to ensure proper scheduling for all the consumers of this Future.
- Parameters:
result (object) – the result object of this Future.
Example:
>>> import threading
>>> import time
>>> import torch
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
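The restriction that a Future cannot be marked completed twice can be observed with a sketch like the following. The exact exception type raised by the second call is not stated above, so the example catches Exception broadly; treat that as an assumption.

```python
import torch

fut = torch.futures.Future()
fut.set_result(1)

try:
    fut.set_result(2)  # second attempt to complete the same Future
    second_call_failed = False
except Exception:  # exception type is an assumption; catch broadly
    second_call_failed = True

# The first result is the one the Future keeps.
print(second_call_failed, fut.wait())
```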
- then(callback)
Append the given callback function to this Future, which will be run when the Future is completed. Multiple callbacks can be added to the same Future, but the order in which they will be executed cannot be guaranteed (to enforce a certain order consider chaining: fut.then(cb1).then(cb2)). The callback must take one argument, which is the reference to this Future. The callback function can use the value() method to get the value. Note that if this Future is already completed, the given callback will be run immediately inline.
If the Future’s value contains tensors that reside on GPUs, the callback might be invoked while the async kernels that are populating those tensors haven’t yet finished executing on the device. However, the callback will be invoked with some dedicated streams set as current (fetched from a global pool) which will be synchronized with those kernels. Hence any operation performed by the callback on these tensors will be scheduled on the device after the kernels complete. In other words, as long as the callback doesn’t switch streams, it can safely manipulate the result without any additional synchronization. This is similar to the non-blocking behavior of wait().
Similarly, if the callback returns a value that contains tensors that reside on a GPU, it can do so even if the kernels that are producing these tensors are still running on the device, as long as the callback didn’t change streams during its execution. If one wants to change streams, one must be careful to re-synchronize them with the original streams, that is, those that were current when the callback was invoked.
- Parameters:
callback (Callable) – a Callable that takes this Future as the only argument.
- Returns:
A new Future object that holds the return value of the callback and will be marked as completed when the given callback finishes.
- Return type:
Future[S]
Note
If the callback function throws, either through the original future being completed with an exception and calling fut.wait(), or through other code in the callback, the future returned by then will be marked appropriately with the encountered error. However, if this callback later completes additional futures, those futures are not marked as completed with an error and the user is responsible for handling completion/waiting on those futures independently.
Example:
>>> import torch
>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x: print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
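A further sketch of two behaviors described for then(): chaining to enforce callback order while passing return values along, and a throwing callback marking the returned Future with an error. The callback names are hypothetical, and the exception type surfaced by wait() on the failed Future is not specified above, so the sketch catches Exception broadly.

```python
import torch

fut = torch.futures.Future()

# Each then() returns a new Future holding the callback's return value,
# so chaining fixes the order: add_one completes before double runs.
add_one = fut.then(lambda f: f.value() + 1)
double = add_one.then(lambda f: f.value() * 2)


def fail(f):
    # Hypothetical callback that always throws.
    raise ValueError("callback failed")


failing = fut.then(fail)

fut.set_result(5)
print(add_one.wait(), double.wait())

try:
    failing.wait()
    failure = None
except Exception as exc:  # exception type is an assumption; catch broadly
    failure = str(exc)

print(failure is not None)
```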
- value()
Obtain the value of an already-completed future.
This method should only be called after a call to wait() has completed, or inside a callback function passed to then(). In other cases this Future may not yet hold a value and calling value() could fail.
If the value contains tensors that reside on GPUs, then this method will not perform any additional synchronization. This should be done beforehand, separately, through a call to wait() (except within callbacks, for which it’s already being taken care of by then()).
- Returns:
The value held by this Future. If the function (callback or RPC) creating the value has thrown an error, this value() method will also throw an error.
- Return type:
T
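The two situations in which value() is safe to call can be sketched as follows, on the CPU only; the variable names are illustrative.

```python
import torch

fut = torch.futures.Future()

# Inside a then() callback the Future is already completed,
# so value() can be called directly.
squared = fut.then(lambda f: f.value() ** 2)

fut.set_result(torch.tensor([1.0, 2.0, 3.0]))

# Outside a callback, call wait() first and value() afterwards.
fut.wait()
original = fut.value()
result = squared.wait()

print(original, result)
```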
- wait()
Block until the value of this Future is ready.
If the value contains tensors that reside on GPUs, then an additional synchronization is performed with the kernels (executing on the device) which may be asynchronously populating those tensors. Such sync is non-blocking, which means that wait() will insert the necessary instructions in the current streams to ensure that further operations enqueued on those streams will be properly scheduled after the async kernels but, once that is done, wait() will return, even if those kernels are still running. No further synchronization is required when accessing and using the values, as long as one doesn’t change streams.
- Returns:
The value held by this Future. If the function (callback or RPC) creating the value has thrown an error, this wait() method will also throw an error.
- Return type:
T
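A hedged sketch of the GPU case described above. It assumes that a Future holding GPU tensors is constructed with the `devices` argument from the class signature, and it falls back to the CPU when no GPU is available, so the GPU path should be read as an illustration rather than a verified recipe.

```python
import torch

# Use a GPU if one is present; otherwise the same code runs on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")

# Assumption: the devices the value may live on are declared up front.
fut = torch.futures.Future(devices=[device] if use_cuda else None)

# The kernels are enqueued on the current stream; set_result() is called
# right away, without waiting for them to finish.
fut.set_result(torch.ones(4, device=device) * 3)

# wait() orders the current stream after those kernels and returns the value.
value = fut.wait()

# .item() copies the scalar to the host, which waits for the device work.
total = value.sum().item()
print(total)
```

Because the streams are never switched between set_result(), wait(), and the reduction, no explicit stream synchronization appears in the sketch.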
- torch.futures.collect_all(futures)
Collects the provided Future objects into a single combined Future that is completed when all of the sub-futures are completed.
- Parameters:
futures (list) – a list of Future objects.
- Returns:
A Future object that resolves to a list of the passed-in Futures.
- Return type:
Future[list[Future]]
Example:
>>> import torch
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
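A short sketch showing that the combined Future stays pending until every sub-future is completed, regardless of the order in which they finish. The variable names are illustrative only.

```python
import torch

futs = [torch.futures.Future() for _ in range(3)]
combined = torch.futures.collect_all(futs)

# Complete the sub-futures out of order, leaving one pending.
futs[2].set_result(20)
futs[0].set_result(0)
pending = combined.done()  # futs[1] has not been completed yet

futs[1].set_result(10)

# The list follows the order in which the futures were passed in.
results = [sub.wait() for sub in combined.wait()]
print(pending, results)
```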