Jaakko Pallari

Originally published at lepovirta.org

     

Reusable response collectors in Akka

One of the recommended practices in Akka is to use the response collector pattern instead of the ask pattern for request-response style actor communication. It simplifies actor communication by placing most of the response management under one specialised actor. Instead of adding timeout and error handling logic in several places in the actor system, the actors simply respond to queries using the standard tell mechanism, while the response collector actor decides when it has received enough responses.

It’s easy to create an ad hoc response collector, but in many cases it would be nice to be able to reuse existing actor code.

In this article, I’ll present a reusable response collector actor. First, I’ll go over some of the common patterns that can be seen in many response collectors, and form an API for those patterns. After that, I’ll present an implementation for the response collector actor that’s using the API. Finally, I’ll demonstrate a few examples of using the actor.

Common patterns in response collectors

Response collector behaviour can be summarised as follows: collect relevant responses until all the results have been collected or a timeout has been exceeded, and make the results available to a consumer. There are quite a few requirements in that sentence. Let’s see if we can extract individual patterns from it.

Collecting relevant responses

One caveat of Akka actors is that Akka doesn’t enforce type safety in the processing of incoming messages. Specifically, each message handler has to be able to handle any type of incoming message. There are a few attempts at bringing type safety to Akka actors (see Typed Actors and Akka Typed), but for the purpose of this exercise we’ll just collect the interesting messages, and log a warning for any unexpected message.

Scala’s PartialFunction is useful for matching against a partial set of inputs. The user can create a matcher of type PartialFunction[Any, T], where the type parameter T is the type of the expected output. For example, the following matcher can be used for extracting only incoming strings:

```scala
val matcher: PartialFunction[Any, String] = { case s: String => s }
```
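To make the matcher's behaviour concrete, here is a minimal sketch that applies it to a hypothetical list of mixed messages (the `incoming` list is made up for illustration). It uses only standard library methods: `collect` keeps the values the partial function is defined for, and `lift` turns the matcher into a function returning an `Option`.

```scala
// The matcher from the text: defined only for String messages.
val matcher: PartialFunction[Any, String] = { case s: String => s }

// Hypothetical mix of incoming messages, for illustration only.
val incoming: List[Any] = List("hello", 42, "world", 3.14)

// collect applies the matcher where it is defined and drops the rest.
val accepted: List[String] = incoming.collect(matcher)

// lift wraps the result in an Option instead of throwing a MatchError.
val firstAsOption: Option[String] = matcher.lift(incoming.head)
val numberAsOption: Option[String] = matcher.lift(42)
```

Anything the matcher is not defined for is simply skipped here; in the actor, such messages end up in the catch-all case.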

Determining completion

How can a response collector decide when it has collected enough responses? It depends on the use-case. Sometimes we’re interested in collecting a certain set of responses, and sometimes we’re only interested in a fixed number of responses. Let’s create a trait for tracking responses:

```scala
trait ResponseTracker[T] {
  def addResponse(response: T): ResponseTracker[T]
  def isDone: Boolean
}
```

The trait ResponseTracker models the state of collected responses. The addResponse method is used for advancing the tracking, while isDone is used to detect when enough responses have been collected. Let’s see this in action:

```scala
object Countdown {
  def apply(expectedMessagesCount: Int): Countdown =
    new Countdown(expectedMessagesCount)
}

class Countdown(expectedMessagesCount: Int) extends ResponseTracker[Any] {
  require(expectedMessagesCount >= 0)

  def isDone: Boolean = expectedMessagesCount == 0

  def addResponse(response: Any): Countdown =
    new Countdown((expectedMessagesCount - 1) max 0)
}
```

Here the Countdown is used for tracking the number of responses that have been received. The tracker is initialised with the number of expected messages. When that number of messages has been received, the tracker will report as being done. Tracking the number of responses could be used for getting a rough sample of responses. Here’s another example:

```scala
object MatchIds {
  def apply[Msg, Id](expectedIds: Set[Id], toId: Msg => Id): MatchIds[Msg, Id] =
    new MatchIds(expectedIds, toId)
}

class MatchIds[Msg, Id](expectedIds: Set[Id], toId: Msg => Id)
    extends ResponseTracker[Msg] {
  def isDone: Boolean = expectedIds.isEmpty

  def addResponse(response: Msg): MatchIds[Msg, Id] =
    new MatchIds(expectedIds - toId(response), toId)
}
```

Here the MatchIds is used for tracking IDs in certain responses. The tracker is initialised with a set of expected IDs and a function for extracting an ID from an incoming response. When a response is received, the ID of the response is removed from the expected IDs. When there are no more expected IDs, the tracker will report as being done.
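The following sketch steps both trackers through a few responses to show how `isDone` changes over time. The definitions are repeated so the snippet runs on its own; the `Reply` type and the `progress` helper are hypothetical names introduced only for this illustration.

```scala
trait ResponseTracker[T] {
  def addResponse(response: T): ResponseTracker[T]
  def isDone: Boolean
}

class Countdown(expectedMessagesCount: Int) extends ResponseTracker[Any] {
  require(expectedMessagesCount >= 0)
  def isDone: Boolean = expectedMessagesCount == 0
  def addResponse(response: Any): Countdown =
    new Countdown((expectedMessagesCount - 1) max 0)
}

class MatchIds[Msg, Id](expectedIds: Set[Id], toId: Msg => Id)
    extends ResponseTracker[Msg] {
  def isDone: Boolean = expectedIds.isEmpty
  def addResponse(response: Msg): MatchIds[Msg, Id] =
    new MatchIds(expectedIds - toId(response), toId)
}

// Hypothetical reply type, used only for this illustration.
final case class Reply(source: String, body: String)

// Hypothetical helper: feed a tracker one response at a time and
// record the value of isDone after each response.
def progress[T](tracker: ResponseTracker[T], responses: List[T]): List[Boolean] =
  responses.scanLeft(tracker)((t, r) => t.addResponse(r)).tail.map(_.isDone)

// Done after the second response; further responses keep it done.
val countdownProgress = progress[Any](new Countdown(2), List("a", "b", "c"))

// A duplicate ID does not advance the tracker; only the missing ID does.
val idProgress = progress[Reply](
  new MatchIds[Reply, String](Set("x", "y"), _.source),
  List(Reply("x", "1"), Reply("x", "duplicate"), Reply("y", "2"))
)
```

Because each `addResponse` call returns a new tracker, a single initial tracker can be shared safely between several collectors.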

Delivering results and timing out

The collected results can be provided to actors using actor messaging, but another delivery mechanism is needed for code that doesn’t read messages from an actor mailbox.

The collected results can be made available using Future values. A future value represents a placeholder for a value that is yet to be completed. In our case, the value is the sequence of all the collected results: Future[Iterable[T]], where T is the type of the expected results. Alternatively, if there’s an error in the system, the future can be completed with an exception.
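As a rough sketch of that hand-off, the snippet below uses the standard library `Promise` as the writable side of a `Future`. The values and the exception are placeholders; blocking with `Await` is done here only to keep the example short.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// The producer keeps the Promise; the consumer only sees the Future.
val promise = Promise[Iterable[String]]()
val future: Future[Iterable[String]] = promise.future
val completedBefore = future.isCompleted

// Completing the promise makes the value visible through the future.
promise.success(Vector("a", "b"))
val collected = Await.result(future, 1.second).toList

// A promise can instead be completed with an exception.
val failedPromise = Promise[Iterable[String]]()
failedPromise.failure(new IllegalStateException("collector failed"))
val failedOutcome = failedPromise.future.value
```

A promise can be completed only once, which matches the collector's behaviour of delivering a single batch of results.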

Sometimes not everything goes as planned, and some of the responses never reach the response collector. It’s good to have a backup plan: stop waiting for responses after a certain amount of time has passed. The timeout can be provided to the response collector using Scala’s FiniteDuration.
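For readers unfamiliar with the duration DSL, here is a small sketch of building and combining `FiniteDuration` values. The concrete numbers are arbitrary. An infinite duration is a `Duration` but not a `FiniteDuration`, so requiring the latter rules out a collector that waits forever.

```scala
import scala.concurrent.duration._

// The duration DSL produces FiniteDuration values.
val short: FiniteDuration = 500.millis
val long: FiniteDuration = 2.seconds

// Finite durations can be added and compared.
val total: FiniteDuration = short + long
val shortIsSmaller: Boolean = short < long

// Duration.Inf type-checks as a Duration, but not as a FiniteDuration.
val neverFires: Duration = Duration.Inf
```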

It’s also a good idea to convey the timeout event to the consumer of the results. This can be achieved by completing the results with a timeout error. However, it may be useful to be able to inspect what values were collected before the timeout was reached. Thus, it’s a good idea to represent the partial results at the type level:

```scala
sealed trait ResultState
case object Full extends ResultState
case object Partial extends ResultState

case class Result[T](values: Iterable[T], state: ResultState)
```

The type Result[T] contains the collection of all the received responses and a value that tells whether all of the responses were received or not. The type parameter T is the type of the expected results.
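One way a consumer might use this type is to branch on the state and treat partial results differently. The helper below, `fullOrPartial`, is a hypothetical name and not part of the original code; it is a sketch that maps a result to an `Either`, with partial values on the left.

```scala
sealed trait ResultState
case object Full extends ResultState
case object Partial extends ResultState

case class Result[T](values: Iterable[T], state: ResultState)

// Hypothetical helper: Right for complete results, Left for the values
// that were collected before a timeout.
def fullOrPartial[T](result: Result[T]): Either[Iterable[T], Iterable[T]] =
  result.state match {
    case Full    => Right(result.values)
    case Partial => Left(result.values)
  }

val complete = fullOrPartial(Result(List(1, 2, 3), Full))
val incomplete = fullOrPartial(Result(List(1), Partial))
```

Since ResultState is sealed, the compiler can check that the match covers every state.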

Response collector implementation

Using the APIs described above, we can create an implementation of the response collector. We’ll start by introducing a new actor, ResponseCollector:

```scala
import akka.actor._
import scala.concurrent.duration._
import scala.concurrent.Promise

class ResponseCollector[T](
    timeout: FiniteDuration,
    initialTracker: ResponseTracker[T],
    result: Promise[Result[T]],
    matcher: PartialFunction[Any, T]
) extends Actor with ActorLogging {
  // ...
}
```

The ResponseCollector has a type parameter T that represents the type of the collected responses. The actor is initialised with:

  • a timeout duration after which the actor stops listening for responses.
  • an initial tracker that is used for detecting when collection is finished.
  • a value that can be fulfilled once (Promise) with the collected results. The Promise can be provided to the consumer as a Future.
  • a matcher that is used for selecting only the interesting responses from the incoming messages.

Inside the class, a timeout event is set to be triggered after the given timeout duration has passed. This is done by scheduling a message delivery to the same actor. The ReceiveTimeout message from the package akka.actor is used as the timeout message.

```scala
import context.dispatcher

private val scheduledTimeout =
  context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout)
```

The actor has a custom message handler that has state. The state contains a collection of received responses, and the current state of the response tracker.

In this handler, only the messages matched by the matcher are selected. On match, the response is appended to the collection of already collected responses. The handler also advances the response tracker, and checks whether enough responses were collected. If the tracker reports as being done, the collected results are reported back to the consumer using the Promise passed in the actor constructor, and the actor stops itself. If there are still more responses to be collected, the actor replaces its message handler to include the updated responses and response tracker.

```scala
private def ready(responses: Vector[T], tracker: ResponseTracker[T]): Receive = {
  case m if matcher.isDefinedAt(m) =>
    val response = matcher(m)
    val nextResponses = responses :+ response
    val nextTracker = tracker.addResponse(response)

    if (nextTracker.isDone) {
      log.info("All responses received.")
      result.success(Result(nextResponses, Full))
      context.stop(self)
    } else {
      context.become(ready(nextResponses, nextTracker))
    }

  case ReceiveTimeout =>
    log.warning("Response collection timed out")
    result.success(Result(responses, Partial))
    context.stop(self)

  case m =>
    log.warning("Unknown message: {}", m)
}
```

Besides handling the incoming responses, the handler also handles the timeout event. When the actor receives ReceiveTimeout, it will report the collected results back to the consumer as a partial result, and shut down the actor.
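The state transitions in the handler can also be studied without Akka. The sketch below models them as a pure function over an immutable state. Everything in it (`Step`, `Collecting`, `Finished`, `step`, and the `Remaining` tracker) is a hypothetical stand-in for the actor's behaviour, and the timeout branch is left out.

```scala
trait ResponseTracker[T] {
  def addResponse(response: T): ResponseTracker[T]
  def isDone: Boolean
}

// Hypothetical tracker for this sketch: done after n matching responses.
final case class Remaining[T](n: Int) extends ResponseTracker[T] {
  def addResponse(response: T): Remaining[T] = Remaining[T]((n - 1) max 0)
  def isDone: Boolean = n == 0
}

// Either still collecting (the `ready` state) or finished (actor stopped).
sealed trait Step[T]
final case class Collecting[T](responses: Vector[T], tracker: ResponseTracker[T])
    extends Step[T]
final case class Finished[T](responses: Vector[T]) extends Step[T]

// One message in, next state out: mirrors the first and last cases of `ready`.
def step[T](matcher: PartialFunction[Any, T])(state: Step[T], message: Any): Step[T] =
  state match {
    case Collecting(responses, tracker) =>
      matcher.lift(message) match {
        case Some(response) =>
          val nextResponses = responses :+ response
          val nextTracker = tracker.addResponse(response)
          if (nextTracker.isDone) Finished(nextResponses)
          else Collecting(nextResponses, nextTracker)
        case None =>
          state // unknown message: the actor only logs it
      }
    case finished => finished // a stopped actor processes nothing further
  }

val matcher: PartialFunction[Any, String] = { case s: String => s }
val start: Step[String] = Collecting(Vector.empty[String], Remaining[String](2))

// Two strings arrive among other messages: collection finishes.
val finished =
  List[Any]("a", 42, "b", "c").foldLeft(start)((s, m) => step(matcher)(s, m))

// Only one string arrives: the collector would keep waiting until the timeout.
val stillCollecting =
  List[Any]("a", 42).foldLeft(start)((s, m) => step(matcher)(s, m))
```

In the actor, the `Collecting` state corresponds to the arguments of `ready`, and a timeout arriving in that state is what produces a `Partial` result.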

Finally, the actor’s first message handler is set to the custom handler with an empty initial state.

```scala
def receive: Receive = ready(Vector.empty, initialTracker)
```

In addition to the actor, we can also provide helpful functions for creating instances of the actor. The props function is used for creating the Props recipe for the actor. The apply function creates the response collector, and exposes only the collected results as a Future and the reference of the actor. In both cases, the timeout is passed as an implicit parameter in a similar style as it’s passed when using the ask pattern. For apply we also pass an ActorRefFactory implicitly, which can be, for example, an actor system or the context of an actor.

```scala
import akka.util.Timeout

object ResponseCollector {
  def props[T](
      tracker: ResponseTracker[T],
      result: Promise[Result[T]],
      matcher: PartialFunction[Any, T]
  )(implicit timeout: Timeout): Props =
    Props(new ResponseCollector(timeout.duration, tracker, result, matcher))

  def apply[T](tracker: ResponseTracker[T], matcher: PartialFunction[Any, T])(
      implicit timeout: Timeout,
      factory: ActorRefFactory
  ) = {
    val result = Promise[Result[T]]()
    val ref = factory.actorOf(props(tracker, result, matcher))
    (result.future, ref)
  }
}
```

Example

Let’s create a small example demonstrating the use of ResponseCollector. In this scenario, we have some data distributed among actors. To keep this example simple, our data set is represented by an in-memory hash map. Our DataStore actor serves data from a map to anyone who requests it by key:

```scala
case class Data(id: String, contents: String) {
  override def toString: String = s"[$id: $contents]"
}

object DataStore {
  def props(id: String, items: Map[Int, String]) =
    Props(new DataStore(id, items))
}

class DataStore(id: String, items: Map[Int, String]) extends Actor {
  def receive: Receive = {
    case i: Int =>
      items.get(i).foreach(v => sender() ! Data(id, v))
  }
}
```

The DataStore also sends an ID specific to the store along with the data from the map. The ID can be used for identifying the source of the data. The DataStore silently ignores any request whose key is not found in the map.

For this example, we’ll set up three different data stores. Each store contains an entry for keys 1 to 3, except the last store, which has no entry for key 3.

```scala
implicit val system = ActorSystem()

val allStores = List(
  system.actorOf(DataStore.props("name", Map(1 -> "Mike", 2 -> "Robert", 3 -> "Joe"))),
  system.actorOf(DataStore.props("location", Map(1 -> "UK", 2 -> "Sweden", 3 -> "Germany"))),
  system.actorOf(DataStore.props("lastPurchase", Map(1 -> "couch", 2 -> "laptop")))
)
```

For each key, we’ll set up a response collector that collects data from each data store. The collectors are only interested in Data objects, so we’ll adjust their matchers accordingly:

```scala
val matcher: PartialFunction[Any, Data] = { case d: Data => d }
```

In order to know when the collectors can stop collecting, we’ll use MatchIds and the IDs from the incoming data to track which data stores have responded.

```scala
def getId(e: Data): String = e.id

val tracker = MatchIds(Set("name", "location", "lastPurchase"), getId)
```

Using the matcher and the tracker, we can now set up the response collectors. As a result, we receive a Future value for the collector results and an actor reference to the collector. We’ll pass the timeout as an implicit parameter to the collectors.

```scala
implicit val timeout = Timeout(1.second)

val (result1, collector1) = ResponseCollector(tracker, matcher)
val (result2, collector2) = ResponseCollector(tracker, matcher)
val (result3, collector3) = ResponseCollector(tracker, matcher)
```

We can use the collector actor references as the sender when requesting data from the data stores, so that the responses will be sent to the collectors.

```scala
allStores.foreach { store =>
  store.tell(1, collector1)
  store.tell(2, collector2)
  store.tell(3, collector3)
}
```

Finally, we’ll serve the results to the user using the Future values we received earlier. For this example we’ll just print the results:

```scala
def printResult(r: Result[Data]): Unit = {
  val status = r.state match {
    case Full    => "All values received"
    case Partial => "Only some values received"
  }
  val values = r.values.mkString(", ")
  println(s"$status: $values")
}

implicit val ec = system.dispatcher

result1.foreach(printResult)
result2.foreach(printResult)
result3.foreach(printResult)
```

Running this example will print out the results for all three collectors. The collectors for keys 1 and 2 will print out full results. Since we didn’t specify the lastPurchase information for the third key, the third collector will time out after one second, and only partial results will be printed out.

```
All values received: [lastPurchase: laptop], [name: Robert], [location: Sweden]
All values received: [lastPurchase: couch], [name: Mike], [location: UK]
Only some values received: [name: Joe], [location: Germany]
```

Conclusions

In this article, I’ve demonstrated how to implement a reusable response collector actor. The solution allows customisation of how the actor determines when it has received enough responses, how it determines which responses are relevant, and how long it will wait for responses before timing out and delivering partial results. I also showed an example of how to use the actor and how it behaves.

There’s still room for customisation and improvements to the actor. For example, the timeout mechanism could be customised to reset after each collected response, error handling could be added, and the responses could be delivered as a stream instead of a batch.

I’ve made the code available in a GitHub Gist. Feel free to extend the code as you wish.

Thanks for reading, and happy hAkking!
