# Remote Reference Protocol
Created On: Nov 20, 2019 | Last Updated On: Apr 27, 2025
This note describes the design details of the Remote Reference protocol and walks through message flows in different scenarios. Make sure you're familiar with the Distributed RPC Framework before proceeding.
## Background
RRef stands for Remote REFerence. It is a reference to an object which is located on the local or remote worker, and transparently handles reference counting under the hood. Conceptually, it can be considered a distributed shared pointer. Applications can create an RRef by calling `remote()`. Each RRef is owned by the callee worker of the `remote()` call (i.e., the owner) and can be used by multiple users. The owner stores the real data and keeps track of the global reference count. Every RRef can be uniquely identified by a global `RRefId`, which is assigned at the time of creation on the caller of the `remote()` call.
On the owner worker, there is only one `OwnerRRef` instance, which contains the real data, while on user workers there can be as many `UserRRef`s as necessary, and a `UserRRef` does not hold the data. All usage on the owner will retrieve the unique `OwnerRRef` instance using the globally unique `RRefId`. A `UserRRef` will be created when it is used as an argument or return value in an `rpc_sync()`, `rpc_async()` or `remote()` invocation, and the owner will be notified accordingly to update the reference count. An `OwnerRRef` and its data will be deleted when there are no `UserRRef` instances globally and there are no references to the `OwnerRRef` on the owner either.
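The `OwnerRRef`/`UserRRef` split can be sketched as a toy Python model. This is illustrative only; the class and field names below are hypothetical and are not the actual `torch.distributed.rpc` internals:

```python
# Illustrative model of the OwnerRRef / UserRRef split: the owner holds the
# real data plus the set of forks it knows about; users hold only the
# (owner, RRefId, ForkId) handle and never the data itself.

class OwnerRRef:
    def __init__(self, rref_id, value):
        self.rref_id = rref_id      # globally unique id, assigned by the creator
        self.value = value          # the real data lives only on the owner
        self.known_forks = set()    # ForkIds of UserRRefs the owner knows about

class UserRRef:
    def __init__(self, owner, rref_id, fork_id):
        self.owner = owner          # name of the owner worker
        self.rref_id = rref_id      # same RRefId as the OwnerRRef
        self.fork_id = fork_id      # unique per fork; no data stored here

owner = OwnerRRef(rref_id=100, value="data")
user = UserRRef(owner="B", rref_id=100, fork_id=1)
owner.known_forks.add(user.fork_id)     # owner tracks the fork for refcounting

assert user.rref_id == owner.rref_id    # both handles name the same object
```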
## Assumptions
The RRef protocol is designed with the following assumptions.
- **Transient Network Failures:** The RRef design handles transient network failures by retrying messages. It cannot handle node crashes or permanent network partitions. When those incidents occur, the application should take down all workers, revert to the previous checkpoint, and resume training.
- **Non-idempotent UDFs:** We assume the user functions (UDFs) provided to `rpc_sync()`, `rpc_async()` or `remote()` are not idempotent and therefore cannot be retried. However, internal RRef control messages are idempotent and retried upon message failure.
- **Out of Order Message Delivery:** We do not assume any message delivery order between any pair of nodes, because both sender and receiver are using multiple threads. There is no guarantee on which message will be processed first.
## RRef Lifetime
The goal of the protocol is to delete an `OwnerRRef` at an appropriate time. The right time to delete an `OwnerRRef` is when there are no living `UserRRef` instances and user code is not holding references to the `OwnerRRef` either. The tricky part is to determine if there are any living `UserRRef` instances.
### Design Reasoning
A user can get a `UserRRef` in three situations:

1. Receiving a `UserRRef` from the owner.
2. Receiving a `UserRRef` from another user.
3. Creating a new `UserRRef` owned by another worker.
Case 1 is the simplest, where the owner passes its RRef to a user: the owner calls `rpc_sync()`, `rpc_async()`, or `remote()` and uses its RRef as an argument. In this case a new `UserRRef` will be created on the user. As the owner is the caller, it can easily update its local reference count on the `OwnerRRef`.
The only requirement is that any `UserRRef` must notify the owner upon destruction. Hence, we need the first guarantee:

**G1. The owner will be notified when any UserRRef is deleted.**
As messages might arrive delayed or out of order, we need one more guarantee to make sure the delete message is not processed too soon. If A sends a message to B that involves an RRef, we call the RRef on A the parent RRef and the RRef on B the child RRef.
**G2. A parent RRef will NOT be deleted until the child RRef is confirmed by the owner.**
In cases 2 and 3, it is possible that the owner has only partial or no knowledge at all about the RRef fork graph. For example, an RRef could be constructed on a user, and before the owner receives any RPC call, the creator user might have already shared the RRef with other users, and those users could further share the RRef. One invariant is that the fork graph of any RRef is always a tree, because forking an RRef always creates a new `UserRRef` instance on the callee (unless the callee is the owner), and hence every RRef has a single parent.
The owner's view on any `UserRRef` in the tree has three stages:

1) unknown -> 2) known -> 3) deleted.

The owner's view of the entire tree keeps changing. The owner deletes its `OwnerRRef` instance when it thinks there are no living `UserRRef` instances, i.e., when the `OwnerRRef` is deleted, all `UserRRef` instances could be either indeed deleted or unknown. The dangerous case is when some forks are unknown and others are deleted.
G2 trivially guarantees that no parent `UserRRef` can be deleted before the owner knows all of its children `UserRRef` instances. However, it is possible that a child `UserRRef` is deleted before the owner knows its parent `UserRRef`.
Consider the following example, where the `OwnerRRef` forks to A, then A forks to Y, and Y forks to Z:

```
OwnerRRef -> A -> Y -> Z
```
If all of Z's messages, including the delete message, are processed by the owner before Y's messages, the owner will learn of Z's deletion before knowing Y exists. Nevertheless, this does not cause any problem, because at least one of Y's ancestors (A) will be alive, and it will prevent the owner from deleting the `OwnerRRef`. More specifically, if the owner does not know Y, A cannot be deleted due to G2, and the owner knows A since it is A's parent.
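This ordering argument can be checked with a small simulation. The `Owner` class below is hypothetical and deliberately simplified: it only tracks which forks are known and which are deleted, and it omits the local references on the owner that the real protocol also counts:

```python
# Simulate the OwnerRRef -> A -> Y -> Z fork chain, with all of Z's messages
# processed before any of Y's. The OwnerRRef must survive because A is alive.

class Owner:
    def __init__(self):
        self.known = set()      # forks the owner has learned about
        self.deleted = set()    # forks the owner has seen a delete for

    def on_fork(self, fork):
        self.known.add(fork)

    def on_delete(self, fork):
        self.deleted.add(fork)

    def can_delete_owner_rref(self):
        # Simplified rule: delete only when no known fork is still pending
        # (assumes at least one fork has been registered).
        return self.known <= self.deleted

owner = Owner()
owner.on_fork("A")              # A was forked by the owner, so A is known
owner.on_delete("Z")            # Z's delete arrives before Y's fork message

assert not owner.can_delete_owner_rref()   # A is alive: OwnerRRef survives

owner.on_fork("Y"); owner.on_fork("Z")     # Y's messages finally arrive
owner.on_delete("Y"); owner.on_delete("A")

assert owner.can_delete_owner_rref()       # now every known fork is deleted
```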
Things get a little trickier if the RRef is created on a user:

```
OwnerRRef
    ^
    |
    A -> Y -> Z
```
If Z calls `to_here()` on the `UserRRef`, the owner at least knows A when Z is deleted, because otherwise `to_here()` wouldn't finish. If Z does not call `to_here()`, it is possible that the owner receives all messages from Z before any message from A and Y. In this case, as the real data of the `OwnerRRef` has not been created yet, there is nothing to be deleted either. It is the same as if Z did not exist at all. Hence, it's still OK.
### Implementation
G1 is implemented by sending out a delete message in the `UserRRef` destructor. To provide G2, the parent `UserRRef` is put into a context whenever it is forked, indexed by the new `ForkId`. The parent `UserRRef` is only removed from the context when it receives an acknowledgement message (ACK) from the child, and the child will only send out the ACK when it is confirmed by the owner.
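A minimal sketch of this context, assuming a plain dict keyed by the child `ForkId`. The names `pending_forks`, `on_fork`, and `on_child_ack` are illustrative, not the actual internals:

```python
# G2 bookkeeping sketch: pin the parent UserRRef whenever it is forked, and
# release it only when the child's ACK (itself gated on the owner's
# confirmation) arrives.

pending_forks = {}      # child ForkId -> parent UserRRef kept alive

def on_fork(child_fork_id, parent_rref):
    # Called when a UserRRef is forked: hold the parent until the child is
    # confirmed by the owner (G2).
    pending_forks[child_fork_id] = parent_rref

def on_child_ack(child_fork_id):
    # The child sends this ACK only after the owner confirmed it. The parent
    # may now be released and, once unreferenced, send its delete message (G1).
    pending_forks.pop(child_fork_id, None)

on_fork(child_fork_id=2, parent_rref="UserRRef{100,1}")
assert 2 in pending_forks       # parent pinned while child is unconfirmed

on_child_ack(2)
assert 2 not in pending_forks   # parent is now free to be deleted
```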
## Protocol Scenarios
Let's now discuss how the above designs translate to the protocol in four scenarios.
### User Share RRef with Owner as Return Value
```python
import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()
```
In this case, the `UserRRef` is created on the user worker A, then it is passed to the owner worker B together with the remote message, and then B creates the `OwnerRRef`. The method `remote()` returns immediately, meaning that the `UserRRef` can be forked/used before the owner knows about it.
On the owner, when receiving the `remote()` call, it will create the `OwnerRRef` and return an ACK to acknowledge `{100, 1}` (`RRefId`, `ForkId`). Only after receiving this ACK can A delete its `UserRRef`. This involves both G1 and G2. G1 is obvious. For G2, the `OwnerRRef` is a child of the `UserRRef`, and the `UserRRef` is not deleted until it receives the ACK from the owner.

The diagram above shows the message flow, where the solid arrow carries the user function and dashed arrows are builtin messages. Note that the first two messages from A to B (`remote()` and `to_here()`) may arrive at B in any order, but the final delete message will only be sent out when:

1. B acknowledges `UserRRef {100, 1}` (G2), and
2. Python GC agrees to delete the local `UserRRef` instance. This occurs when the RRef is no longer in scope and is eligible for garbage collection.
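The ACK gating in this scenario can be illustrated with a toy handshake. The helper names below are hypothetical, and only the ordering constraint is modeled: A may not send the delete for `UserRRef {100, 1}` until B's ACK for `(100, 1)` has arrived:

```python
# Toy walk-through of the handshake, using the ids from the example above
# (RRefId 100, ForkId 1).

acked = set()   # (RRefId, ForkId) pairs confirmed by the owner

def owner_receives_remote(rref_id, fork_id):
    # B creates the OwnerRRef and ACKs the fork back to A.
    acked.add((rref_id, fork_id))

def user_may_delete(rref_id, fork_id):
    # A's delete message is gated on the owner's ACK (G2); once it may be
    # sent, the deletion itself notifies the owner (G1).
    return (rref_id, fork_id) in acked

assert not user_may_delete(100, 1)   # remote() returned, but no ACK yet
owner_receives_remote(100, 1)
assert user_may_delete(100, 1)       # the delete message may now be sent
```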
### User Share RRef with Owner as Argument
```python
import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
    pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref,))
```
In this case, after creating the `UserRRef` on A, A uses it as an argument in a follow-up RPC call to B. A will keep `UserRRef {100, 1}` alive until it receives the acknowledgement from B (G2, not the return value of the RPC call). This is necessary because A should not send out the delete message until all previous messages have been received; otherwise, the `OwnerRRef` could be deleted before usage, as we do not guarantee message delivery order. This is done by creating a child `ForkId` of the RRef and holding it in a map until the owner confirms the child `ForkId`. The figure below shows the message flow.

Note that the `UserRRef` could be deleted on B before `func` finishes or even starts. However, this is OK, as at the time B sends out the ACK for the child `ForkId`, it has already acquired the `OwnerRRef` instance, which prevents it from being deleted too soon.
### Owner Share RRef with User
Owner to user is the simplest case, where the owner can update the reference count locally and does not need any additional control message to notify others. Regarding G2, it is the same as if the parent received the ACK from the owner immediately, as the parent is the owner.
```python
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

# on worker B and worker C
def func(rref):
    pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
rpc.rpc_async('C', func, args=(rref,))
```

The figure above shows the message flow. Note that when the `OwnerRRef` exits scope after the `rpc_async` call, it will not be deleted, because internally there is a map that keeps it alive as long as there are any known forks, which in this case is `UserRRef {100, 1}` (G2).
### User Share RRef with User
This is the most complicated case, where the caller user (parent `UserRRef`), callee user (child `UserRRef`), and the owner all need to get involved.
```python
import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
    pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref,))
```

When C receives the child `UserRRef` from A, it sends out a fork request to the owner B. Later, when B confirms the `UserRRef` on C, C will perform two actions in parallel: 1) send out the child ACK to A, and 2) run the user-provided function. During this time, the parent (A) will hold its `UserRRef {100, 1}` alive to achieve G2.