Originally published at lepovirta.org
Reusable response collectors in Akka
One of the recommended practices in Akka is to use the response collector pattern instead of the ask pattern for request-response style actor communication. It simplifies actor communication by placing most of the response management under one specialised actor. Instead of adding timeout and error handling logic to several places in the actor system, the actors simply respond to any queries using the standard tell mechanism while the response collector actor decides when it has received enough responses.
It’s easy to create an ad hoc response collector, but in many cases it would be nice to be able to reuse existing actor code.
In this article, I’ll present a reusable response collector actor. First, I’ll go over some of the common patterns that can be seen in many response collectors, and form an API for those patterns. After that, I’ll present an implementation of the response collector actor that uses the API. Finally, I’ll demonstrate a few examples of using the actor.
Common patterns in response collectors
Response collector behaviour can be summarised as follows: collect relevant responses until all the results have been collected or a timeout has been exceeded, and make the results available to a consumer. There are quite a few requirements in that sentence. Let’s see if we can extract individual patterns from it.
Collecting relevant responses
One caveat of Akka actors is that Akka doesn’t enforce type safety in the processing of incoming messages. Specifically, each message handler has to be able to handle any type of incoming message. There are a few attempts at bringing type safety to Akka actors (see Typed Actors and Akka Typed), but for the purpose of this exercise we’ll just collect the interesting messages, and log an error for all the unexpected messages.
Scala’s PartialFunction is useful for matching against a partial set of inputs. A user can create a matcher of type PartialFunction[Any, T], where the type parameter T is the type of the expected output. For example, the following matcher can be used for extracting only incoming strings:
    val matcher: PartialFunction[Any, String] = {
      case s: String => s
    }
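To see how such a matcher behaves outside an actor, here is a minimal sketch in plain Scala. The MatcherSketch object and the sample incoming list are made up for illustration; isDefinedAt, lift and collect are standard library methods.

```scala
object MatcherSketch {
  // The matcher from the article: defined only for String messages.
  val matcher: PartialFunction[Any, String] = {
    case s: String => s
  }

  // A hypothetical mix of incoming messages.
  val incoming: List[Any] = List("hello", 42, "world", 3.14)

  // isDefinedAt tells us whether a message is interesting to the matcher.
  val definedForString: Boolean = matcher.isDefinedAt("hello")
  val definedForInt: Boolean = matcher.isDefinedAt(42)

  // lift turns the partial function into a total one that returns an Option.
  val lifted: Option[String] = matcher.lift(42)

  // collect keeps only the messages the matcher is defined for.
  val strings: List[String] = incoming.collect(matcher)
}
```

The response collector relies on the same isDefinedAt check to decide whether an incoming message should be collected or logged as unexpected.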
Determining completion
How can a response collector decide when it has collected enough responses? It depends on the use case. Sometimes we’re interested in collecting a certain set of responses, and sometimes we’re only interested in a fixed number of responses. Let’s create a trait for tracking responses:
    trait ResponseTracker[T] {
      def addResponse(response: T): ResponseTracker[T]
      def isDone: Boolean
    }
The trait ResponseTracker models the state of collected responses. The addResponse method is used for advancing the tracking, while isDone is used to detect when enough responses have been collected. Let’s see this in action:
    object Countdown {
      def apply(expectedMessagesCount: Int): Countdown =
        new Countdown(expectedMessagesCount)
    }

    class Countdown(expectedMessagesCount: Int) extends ResponseTracker[Any] {
      require(expectedMessagesCount >= 0)

      def isDone: Boolean = expectedMessagesCount == 0

      def addResponse(response: Any): Countdown =
        new Countdown((expectedMessagesCount - 1) max 0)
    }
Here the Countdown is used for tracking the number of responses that have been received. The tracker is initialised with the number of expected messages. When the expected number of messages has been received, the tracker will report as being done. Tracking the number of responses could be used for getting a rough sample of responses. Here’s another example:
    object MatchIds {
      def apply[Msg, Id](expectedIds: Set[Id], toId: Msg => Id): MatchIds[Msg, Id] =
        new MatchIds(expectedIds, toId)
    }

    class MatchIds[Msg, Id](expectedIds: Set[Id], toId: Msg => Id)
        extends ResponseTracker[Msg] {

      def isDone: Boolean = expectedIds.isEmpty

      def addResponse(response: Msg): MatchIds[Msg, Id] =
        new MatchIds(expectedIds - toId(response), toId)
    }
Here the MatchIds is used for tracking IDs in certain responses. The tracker is initialised with a set of expected IDs and a function for extracting an ID from an incoming response. When a response is received, the ID of the response is removed from the expected IDs. When there are no more expected IDs, the tracker will report as being done.
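Because the trackers are immutable, they can be stepped through by hand without any actors. The following is a small sketch of that, repeating the trait and both trackers so that it is self-contained. The TrackerSketch object, the Reply message type and the sample values are assumptions made for illustration only.

```scala
object TrackerSketch {
  trait ResponseTracker[T] {
    def addResponse(response: T): ResponseTracker[T]
    def isDone: Boolean
  }

  class Countdown(expectedMessagesCount: Int) extends ResponseTracker[Any] {
    require(expectedMessagesCount >= 0)
    def isDone: Boolean = expectedMessagesCount == 0
    def addResponse(response: Any): Countdown =
      new Countdown((expectedMessagesCount - 1) max 0)
  }

  class MatchIds[Msg, Id](expectedIds: Set[Id], toId: Msg => Id)
      extends ResponseTracker[Msg] {
    def isDone: Boolean = expectedIds.isEmpty
    def addResponse(response: Msg): MatchIds[Msg, Id] =
      new MatchIds(expectedIds - toId(response), toId)
  }

  // Hypothetical message type used only in this sketch.
  final case class Reply(source: String, body: String)

  // Countdown: done after any two responses, whatever their content.
  val countdown: ResponseTracker[Any] = new Countdown(2)
  val countdownAfterOne = countdown.addResponse("first")
  val countdownAfterTwo = countdownAfterOne.addResponse(42)

  // MatchIds: done once both "a" and "b" have replied.
  // A second reply from "a" does not bring the tracker any closer to done.
  val ids: ResponseTracker[Reply] =
    new MatchIds[Reply, String](Set("a", "b"), _.source)
  val idsAfterA = ids.addResponse(Reply("a", "first"))
  val idsAfterDuplicateA = idsAfterA.addResponse(Reply("a", "again"))
  val idsAfterB = idsAfterDuplicateA.addResponse(Reply("b", "second"))
}
```

Each call to addResponse returns a new tracker, so the earlier values stay valid and can still be inspected afterwards.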
Delivering results and timing out
The collected results can be provided to actors using actor messaging, but another delivery mechanism is needed for code that doesn’t read messages from an actor mailbox.
The collected results can be made available using Future values. A future value represents a placeholder for a value that is yet to be completed. In our case, the value is the sequence of all the collected results: Future[Iterable[T]], where T is the type of the expected results. Alternatively, if there’s an error in the system, the future can be completed with an exception.
Sometimes things don’t go as planned, and some of the responses never arrive at the response collector. It’s good to have a backup plan: stop waiting for responses after a certain amount of time has passed. The timeout can be provided to the response collector using Scala’s FiniteDuration.
It’s also a good idea to convey the timeout event to the consumer of the results. This can be achieved by completing the results with a timeout error. However, it may be useful to be able to inspect what values were collected before the timeout was reached. Thus, it’s a good idea to represent the partial results at the type level:
    sealed trait ResultState
    case object Full extends ResultState
    case object Partial extends ResultState

    case class Result[T](values: Iterable[T], state: ResultState)
The type Result[T] contains the collection of all the received responses and a value that tells whether all of the responses were received or not. The type parameter T is the type of the expected results.
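As a rough sketch of how a consumer might use this type, the following plain Scala example completes a Promise with a partial Result and reads it back through the Future. The ResultSketch object and the describe helper are hypothetical names, not part of the collector itself. Await is used here only to keep the sketch deterministic.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object ResultSketch {
  sealed trait ResultState
  case object Full extends ResultState
  case object Partial extends ResultState

  case class Result[T](values: Iterable[T], state: ResultState)

  // Hypothetical helper: turn a result into a human-readable summary.
  def describe[T](r: Result[T]): String = r.state match {
    case Full    => s"complete: ${r.values.mkString(", ")}"
    case Partial => s"timed out with: ${r.values.mkString(", ")}"
  }

  // The producer side holds the Promise, the consumer side sees only the Future.
  val promise: Promise[Result[String]] = Promise[Result[String]]()
  val future: Future[Result[String]] = promise.future

  // Simulate a timeout after two responses have been collected.
  promise.success(Result(Vector("a", "b"), Partial))

  // The future is already completed, so this returns immediately.
  val summary: String = describe(Await.result(future, 1.second))
}
```

Because ResultState is sealed, the compiler can warn when a match on it misses one of the two cases.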
Response collector implementation
Using the APIs described above, we can create an implementation of the response collector. We’ll start by introducing a new actor, ResponseCollector:
    import akka.actor._
    import scala.concurrent.duration._
    import scala.concurrent.Promise

    class ResponseCollector[T](
      timeout: FiniteDuration,
      initialTracker: ResponseTracker[T],
      result: Promise[Result[T]],
      matcher: PartialFunction[Any, T]
    ) extends Actor with ActorLogging {
      // ...
    }
The ResponseCollector has a type parameter T that represents the type of the collected responses. The actor is initialised with:
- a timeout duration after which the actor stops listening for responses.
- an initial tracker that is used for detecting when collection is finished.
- a value that can be fulfilled once (Promise) with the collected results. The Promise can be provided to the consumer as a Future.
- a matcher that is used for selecting only the interesting responses from the incoming messages.
Inside the class a timeout event is set to be triggered after the given timeout duration has passed. This is done by scheduling a message delivery to the same actor. The ReceiveTimeout message from package akka.actor is used as the timeout message.
    import context.dispatcher

    private val scheduledTimeout =
      context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout)
The actor has a custom message handler that has state. The state contains a collection of received responses, and the current state of the response tracker.
In this handler, only the messages matched by the matcher are selected. On match, the response is appended to the collection of already collected responses. The handler also advances the response tracker, and checks whether enough responses were collected. If the tracker reports as being done, the collected results are reported back to the consumer using the Promise passed to the actor’s constructor, and the actor stops itself. If there are still more responses to be collected, the actor replaces its message handler with one that holds the updated responses and response tracker.
    private def ready(responses: Vector[T], tracker: ResponseTracker[T]): Receive = {
      case m if matcher.isDefinedAt(m) =>
        val response = matcher(m)
        val nextResponses = responses :+ response
        val nextTracker = tracker.addResponse(response)
        if (nextTracker.isDone) {
          log.info("All responses received.")
          result.success(Result(nextResponses, Full))
          context.stop(self)
        } else {
          context.become(ready(nextResponses, nextTracker))
        }
      case ReceiveTimeout =>
        log.warning("Response collection timed out")
        result.success(Result(responses, Partial))
        context.stop(self)
      case m =>
        log.warning("Unknown message: {}", m)
    }
Besides handling the incoming responses, the handler also handles the timeout event. When the actor receives ReceiveTimeout, it reports the collected results back to the consumer as a partial result, and stops itself.
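The state transitions of the handler can also be modelled without actors, which may help when reasoning about them or testing them. The sketch below is not the actor implementation. It is a pure function that feeds a list of messages through a matcher and a tracker in the same order of steps as the handler, and treats the end of the list as a stand-in for the timeout. The names CollectSketch, FirstN and collectUntilDone are hypothetical, and FirstN is an assumed generic variant of Countdown.

```scala
import scala.annotation.tailrec

object CollectSketch {
  trait ResponseTracker[T] {
    def addResponse(response: T): ResponseTracker[T]
    def isDone: Boolean
  }

  // Hypothetical tracker: done after n matching responses.
  final class FirstN[T](remaining: Int) extends ResponseTracker[T] {
    def isDone: Boolean = remaining <= 0
    def addResponse(response: T): FirstN[T] = new FirstN[T](remaining - 1)
  }

  // Returns the collected values and whether the tracker reported done.
  // Running out of messages plays the role of the timeout here.
  def collectUntilDone[T](
    messages: List[Any],
    matcher: PartialFunction[Any, T],
    tracker: ResponseTracker[T]
  ): (Vector[T], Boolean) = {
    @tailrec
    def loop(rest: List[Any], acc: Vector[T], t: ResponseTracker[T]): (Vector[T], Boolean) =
      rest match {
        case Nil => (acc, false)
        case m :: tail if matcher.isDefinedAt(m) =>
          val response = matcher(m)
          val nextAcc = acc :+ response
          val nextTracker = t.addResponse(response)
          if (nextTracker.isDone) (nextAcc, true)
          else loop(tail, nextAcc, nextTracker)
        case _ :: tail =>
          // Unknown message: skipped (the actor logs a warning instead).
          loop(tail, acc, t)
      }
    loop(messages, Vector.empty, tracker)
  }

  val matcher: PartialFunction[Any, String] = { case s: String => s }

  // Enough matching messages: collection completes after "a" and "b".
  val full: (Vector[String], Boolean) =
    collectUntilDone[String](List("a", 1, "b", "c"), matcher, new FirstN[String](2))

  // Too few matching messages: only a partial result is available.
  val partial: (Vector[String], Boolean) =
    collectUntilDone[String](List("a", 1), matcher, new FirstN[String](2))
}
```

As in the actor, the tracker is consulted only after a matching response has been added, so a tracker that is already done at the start still waits for one more matching message.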
Finally, the actor is set up with the custom message handler with an initial state as the actor’s first message handler.
    def receive: Receive = ready(Vector.empty, initialTracker)
In addition to the actor, we can also provide helpful functions for creating instances of the actor. The props function is used for creating the Props recipe for the actor. The apply function creates the response collector, and exposes only the collected results as a Future and the reference of the actor. In both cases, the timeout is passed as an implicit parameter in a similar style as it’s passed when using the ask pattern. For apply we also pass an ActorRefFactory implicitly, which can be, for example, an actor system or the context of an actor.
    import akka.util.Timeout

    object ResponseCollector {
      def props[T](tracker: ResponseTracker[T],
                   result: Promise[Result[T]],
                   matcher: PartialFunction[Any, T])
                  (implicit timeout: Timeout): Props =
        Props(new ResponseCollector(timeout.duration, tracker, result, matcher))

      def apply[T](tracker: ResponseTracker[T],
                   matcher: PartialFunction[Any, T])
                  (implicit timeout: Timeout, factory: ActorRefFactory) = {
        val result = Promise[Result[T]]()
        val ref = factory.actorOf(props(tracker, result, matcher))
        (result.future, ref)
      }
    }
Example
Let’s create a small example demonstrating the use of ResponseCollector. In this scenario, we have some data distributed among actors. To keep this example simple, our data set is represented by an in-memory hash map. Our DataStore actor serves data from a map to anyone who requests it by key:
    case class Data(id: String, contents: String) {
      override def toString: String = s"[$id: $contents]"
    }

    object DataStore {
      def props(id: String, items: Map[Int, String]) =
        Props(new DataStore(id, items))
    }

    class DataStore(id: String, items: Map[Int, String]) extends Actor {
      def receive: Receive = {
        case i: Int =>
          items.get(i).foreach(v => sender() ! Data(id, v))
      }
    }
The DataStore also sends an ID specific to the store along with the data from the map. The ID can be used for identifying the source of the data. The DataStore will also ignore any requests where the key is not found in the map.
For this example, we’ll set up three different data stores. All of the stores contain data for keys 1 to 3, except the last one, which has no entry for key 3.
    implicit val system = ActorSystem()

    val allStores = List(
      system.actorOf(DataStore.props("name", Map(1 -> "Mike", 2 -> "Robert", 3 -> "Joe"))),
      system.actorOf(DataStore.props("location", Map(1 -> "UK", 2 -> "Sweden", 3 -> "Germany"))),
      system.actorOf(DataStore.props("lastPurchase", Map(1 -> "couch", 2 -> "laptop")))
    )
For each key, we’ll set up a response collector that collects data from each data store. The collectors are only interested in Data objects, so we’ll adjust their matchers accordingly:
    val matcher: PartialFunction[Any, Data] = {
      case d: Data => d
    }
In order to know when the collectors can stop collecting, we’ll use MatchIds and the IDs from the incoming data to track which data stores have responded.
    def getId(e: Data): String = e.id

    val tracker = MatchIds(Set("name", "location", "lastPurchase"), getId)
Using the matcher and the tracker we can now set up the response collectors. As a result, we receive a Future value for the collector results and an actor reference to the collector. We’ll pass the timeout as an implicit parameter to the collectors.
    implicit val timeout = Timeout(1.second)

    val (result1, collector1) = ResponseCollector(tracker, matcher)
    val (result2, collector2) = ResponseCollector(tracker, matcher)
    val (result3, collector3) = ResponseCollector(tracker, matcher)
We can use the collector actor references as the sender when requesting data from the data stores, so that the responses will be sent to the collectors.
    allStores.foreach { store =>
      store.tell(1, collector1)
      store.tell(2, collector2)
      store.tell(3, collector3)
    }
Finally, we’ll serve the results to the user using the Future values we received earlier. For this example we’ll just print the results:
    def printResult(r: Result[Data]): Unit = {
      val status = r.state match {
        case Full => "All values received"
        case Partial => "Only some values received"
      }
      val values = r.values.mkString(", ")
      println(s"$status: $values")
    }

    implicit val ec = system.dispatcher

    result1.foreach(printResult)
    result2.foreach(printResult)
    result3.foreach(printResult)
Running this example will print out the results for all three collectors. The collectors for keys 1 and 2 will print out the full results. Since we didn’t specify the lastPurchase information for the third key, the third collector will time out after one second, and only partial results will be printed out. The order of the lines and of the values within each line may vary between runs.
    All values received: [lastPurchase: laptop], [name: Robert], [location: Sweden]
    All values received: [lastPurchase: couch], [name: Mike], [location: UK]
    Only some values received: [name: Joe], [location: Germany]
Conclusions
In this article, I’ve demonstrated how to implement a reusable response collector actor. The solution allows customisation of how the actor determines when it has received enough responses, how it determines which responses are relevant, and how long it will wait for responses before timing out and delivering partial results. I also showed an example of how to use the actor and how it behaves.
There’s still room for customisation and improvements to the actor. For example, the timeout mechanism could be customised to reset after each collected response, error handling could be added, and the responses could be delivered as a stream instead of a batch.
I’ve made the code available in a GitHub Gist. Feel free to extend the code as you wish.
Thanks for reading, and happy hAkking!