RabbitMQ tutorial - Remote procedure call (RPC)
Remote procedure call (RPC)
(using the Java client)
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). In case you use a different host, port or credentials, connection settings would require adjusting.
Where to get help
If you're having trouble going through this tutorial you can contact us through GitHub Discussions or RabbitMQ community Discord.
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
Client interface
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);
A note on RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and add unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
- Make sure it's obvious which function call is local and which is remote.
- Document your system. Make the dependencies between components clear.
- Handle error cases. How should the client react when the RPC server is down for a long time?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
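As a rough illustration of the asynchronous pipeline idea, the sketch below chains two stages with the JDK's CompletableFuture so that the first result is pushed to the next stage instead of being waited on. It uses only the standard library and does not involve RabbitMQ; the class name AsyncPipelineSketch and the method slowSquare are hypothetical stand-ins, not part of the tutorial code.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncPipelineSketch {
    // Hypothetical first stage: stands in for a slow computation.
    static int slowSquare(int n) {
        return n * n;
    }

    // Builds a non-blocking pipeline: the second stage runs
    // once the first stage has produced its result.
    static CompletableFuture<String> pipeline(int input) {
        return CompletableFuture
                .supplyAsync(() -> slowSquare(input))
                .thenApply(squared -> "result=" + squared);
    }

    public static void main(String[] args) {
        // join() is used here only so the demo prints before the JVM exits.
        System.out.println(pipeline(7).join()); // prints "result=49"
    }
}
```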
Callback queue
The request-reply pattern in RabbitMQ involves a straightforward interaction between the server and the client.
A client sends a request message and a server replies with a response message.
In order to receive a response we need to send a 'callback' queue name with the request. Such a queue is often server-named but can also have a well-known name (be client-named).
The server will then use that name to respond using the default exchange.
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
        .Builder()
        .replyTo(callbackQueueName)
        .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
We need this new import:
import com.rabbitmq.client.AMQP.BasicProperties;
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
- replyTo: Commonly used to name a callback queue.
- correlationId: Useful to correlate RPC responses with requests.
Correlation Id
Creating a callback queue for every RPC request is inefficient. A better way is creating a single callback queue per client.
That raises a new issue: having received a response in that queue, it's not clear to which request the response belongs. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message - it doesn't belong to our requests.
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
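The matching and discarding logic described above can be sketched without a broker. The example below is a minimal illustration using only the JDK: it keeps a map from correlationId to a pending CompletableFuture, and silently drops responses whose id is unknown or has already been answered. The class CorrelationRegistry and its method names are hypothetical and are not part of the RabbitMQ client library.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CorrelationRegistry {
    // Outstanding requests, keyed by their correlationId.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers an outstanding request and returns its unique correlationId.
    public String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Would be called for every message arriving on the callback queue.
    // Returns true if the response matched a pending request, false if it was discarded.
    public boolean onResponse(String correlationId, String body) {
        if (correlationId == null) {
            return false; // no id at all: cannot belong to our requests
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown or duplicate response: discard gracefully
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationRegistry registry = new CorrelationRegistry();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String id = registry.register(reply);

        System.out.println(registry.onResponse("not-ours", "?")); // false
        System.out.println(registry.onResponse(id, "3"));         // true
        System.out.println(registry.onResponse(id, "3"));         // false (duplicate)
        System.out.println(reply.join());                         // 3
    }
}
```

Removing the entry on the first match is what makes a duplicate response (for example, after a server restart) harmless: the second delivery simply finds nothing to complete.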
Summary
Our RPC will work like this:
- When the Client starts up, it creates an exclusive callback queue.
- For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue, and correlationId, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
- The client waits for data on the reply queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.
Putting it all together
The Fibonacci task:
private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n - 1) + fib(n - 2);
}
We declare our fibonacci function. It assumes only valid non-negative integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).
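For comparison, here is a sketch of an iterative variant that runs in linear time and rejects input it cannot handle. It is not the tutorial's implementation; the class name FastFib, the use of long, and the 0..92 bound are choices made for this example. The bound reflects that fib(93) no longer fits in a signed 64-bit long (with the int return type used above, the result already overflows at fib(47)).

```java
public class FastFib {
    // Iterative Fibonacci in O(n) time.
    // Rejects input outside 0..92, since fib(93) overflows a signed 64-bit long.
    static long fib(int n) {
        if (n < 0 || n > 92) {
            throw new IllegalArgumentException("n must be between 0 and 92, got " + n);
        }
        if (n < 2) {
            return n;
        }
        long previous = 0; // fib(i - 2)
        long current = 1;  // fib(i - 1)
        for (int i = 2; i <= n; i++) {
            long next = previous + current;
            previous = current;
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(fib(30)); // 832040
    }
}
```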
The code for our RPC server can be found here: RPCServer.java.
The server code is rather straightforward:
- As usual we start by establishing the connection, channel and declaring the queue.
- We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetchCount setting in channel.basicQos.
- We use basicConsume to access the queue, where we provide a callback in the form of an object (DeliverCallback) that will do the work and send the response back.
The code for our RPC client can be found here: RPCClient.java.
The client code is slightly more involved:
- We establish a connection and channel.
- Our call method makes the actual RPC request.
- Here, we first generate a unique correlationId number and save it - our consumer callback will use this value to match the appropriate response.
- Then, we create a dedicated exclusive queue for the reply and subscribe to it.
- Next, we publish the request message, with two properties: replyTo and correlationId.
- At this point we can sit back and wait until the proper response arrives.
- Since our consumer delivery handling is happening in a separate thread, we're going to need something to suspend the main thread before the response arrives. Usage of CompletableFuture is one possible solution to do so.
- The consumer is doing a very simple job: for every consumed response message it checks if the correlationId is the one we're looking for. If so, it completes the CompletableFuture.
- At the same time the main thread is waiting for the CompletableFuture to complete.
- Finally, we return the response back to the user.
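The thread hand-off in the steps above can be tried out without a broker. The following sketch is an in-memory simulation, not the real RPCClient.java: JDK BlockingQueue instances stand in for rpc_queue and the reply queue, and the Message class, startServer and the class name InMemoryRpcSketch are all hypothetical. What it keeps from the tutorial is the shape of call: generate a correlationId, subscribe to a reply queue on a separate thread, publish the request, and block on a CompletableFuture until the matching response arrives.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class InMemoryRpcSketch {
    // Hypothetical message: a body plus the two properties the tutorial relies on.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // In-memory stand-in for the rpc_queue queue.
    static final BlockingQueue<Message> rpcQueue = new LinkedBlockingQueue<>();

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Starts a worker that answers each request on the queue named in replyTo,
    // copying the request's correlationId into the response.
    static void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = rpcQueue.take();
                    String result = String.valueOf(fib(Integer.parseInt(request.body)));
                    request.replyTo.add(new Message(result, request.correlationId, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Sends one request and blocks the calling thread until the matching reply arrives.
    static String call(String message) {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        CompletableFuture<String> response = new CompletableFuture<>();

        // Consumer thread: completes the future only for the expected correlationId.
        Thread consumer = new Thread(() -> {
            try {
                while (!response.isDone()) {
                    Message reply = replyQueue.take();
                    if (correlationId.equals(reply.correlationId)) {
                        response.complete(reply.body);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        rpcQueue.add(new Message(message, correlationId, replyQueue));
        return response.join(); // the calling thread waits here
    }

    public static void main(String[] args) {
        startServer();
        System.out.println("fib(10) is " + call("10")); // fib(10) is 55
    }
}
```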
Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.java and RPCServer.java.
Compile and set up the classpath as usual (see tutorial one):
javac -cp $CP RPCClient.java RPCServer.java
Our RPC service is now ready. We can start the server:
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
To request a fibonacci number run the client:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
The design presented here is not the only possible implementation of an RPC service, but it has some important advantages:
- If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.
- On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
- How should the client react if there are no servers running?
- Should a client have some kind of timeout for the RPC?
- If the server malfunctions and raises an exception, should it be forwarded to the client?
- Protecting against invalid incoming messages (e.g. checking bounds, type) before processing.
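One possible answer to the timeout question is to bound the wait on the client's CompletableFuture. The sketch below assumes the client already holds a future that its consumer callback will complete; the class RpcTimeoutSketch, the method awaitReply and the choice to return an empty Optional on timeout are illustrative decisions, not part of the tutorial's code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RpcTimeoutSketch {
    // Waits for the reply for at most timeoutMillis.
    // Returns an empty Optional if no reply arrived in time (or the reply was null).
    static Optional<String> awaitReply(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return Optional.ofNullable(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            reply.cancel(true); // a late response will then find the future already done
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.empty();
        } catch (ExecutionException e) {
            // The future was completed exceptionally, e.g. by an error forwarded to the client.
            throw new IllegalStateException("RPC failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // No server ever answers this request, so the wait times out.
        CompletableFuture<String> unanswered = new CompletableFuture<>();
        System.out.println(awaitReply(unanswered, 50).isPresent()); // false

        // A reply that has already arrived is returned immediately.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("832040");
        System.out.println(awaitReply(answered, 50).get()); // 832040
    }
}
```

Whether a timed-out request should be retried depends on the operation being idempotent, since the server may still process the original message.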
If you want to experiment, you may find the management UI useful for viewing the queues.