- Notifications
You must be signed in to change notification settings - Fork0
A type-safe Actor class and WiredActor for sending functions as messages
License
simerplaha/Actor
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Actor
- A small type-safe class that implements the most commonly used Actor APIsincluding ask (?
) which returns a typedFuture[R]
.
WiredActor
- Convert anyobject
to anActor
that allows invoking theobject
's function as messages.
libraryDependencies+="com.github.simerplaha"%%"actor"%"0.3"
Make sure you haveExecutionContext
in scope
importscala.concurrent.ExecutionContext.Implicits.global
valactor=Actor[Int]( (message, self)=> println(message) )actor!1
caseclassState(varcounter:Int)valactor=Actor[Int,State](State(0))( (message, self)=> self.state.counter+=1 )
A timer actor will process messages in batches after the set delays. Similar to above a stateful timer Actorcan also be created.
importscala.concurrent.duration._//stateless timer actorvalactor=Actor.timer[Int](delays=1.second) { (message, self)=>//do something }
self.schedule
returns a javaTimerTask
which is cancellable.
valactor=Actor[Int]( (message, self)=> self.schedule(message=1, delay=1.second) )
valactor=Actor[Int]( (message, self)=> println(message) )actor.terminate()//cannot send messages to a terminated actor.(actor!1) shouldBeLeft(Result.TerminatedActor)
caseclassCreateUser(name:String)(valreplyTo:ActorRef[Boolean])valactor=Actor[CreateUser] { (message:CreateUser, _)=> message.replyTo!true}valresponse:Future[Boolean]= (actor?CreateUser("Tony Stark")).right.getAwait.result(response,1.second)
By default actors are not terminated if there is a failure processing a message. Thefollowing actor enables termination if there is a failure processing a message.
valactor=Actor[Int]( (message, self)=>thrownewException("Kaboom!") ).terminateOnException()//enable terminate on exception(actor!1) shouldBeRight(Result.Sent)//first message sent is successfuleventually(actor.isTerminated() shouldBetrue)//actor is terminated(actor!2) shouldBeLeft(Result.TerminatedActor)//cannot sent messages to a terminated actor
Borrowing ideas from Akka theTestActor
implements APIs to test messages in an Actor's mailbox.
valactor=TestActor[Int]()actor.expectNoMessage(after=1.second)//expect a message after delay in the Actor's mailboxvalgot= actor.getMessage()//fetch the first message in the actor's mailboxactor.expectMessage[Int]()//expect a message of some type
importscala.concurrent.ExecutionContext.Implicits.globalimportscala.concurrent.duration._importcom.github.simerplaha.actor._caseclassPong(replyTo:ActorRef[Ping])caseclassPing(replyTo:ActorRef[Pong])caseclassState(varcount:Int)valping=Actor[Ping,State](State(0)) {case (message, self)=> self.state.count+=1 println(s"Ping:${self.state.count}") message.replyTo!Pong(self) }valpong=Actor[Pong,State](State(0)) {case (message, self)=> self.state.count+=1 println(s"Pong:${self.state.count}") message.replyTo!Ping(self) }pong!Pong(ping)//run this for 1 secondsThread.sleep(1.second.toMillis)
Functions can be sent, invoked & scheduled as messages toWiredActor
s similar to messages in anActor
.
WiredActor
s can be created on anyclass
instance orobject
.
//your class that contains Actor functionsobjectMyImpl {defhello(name:String):String=s"Hello$name"defhelloFuture(name:String):Future[String]=Future(s"Hello$name from the Future!")//some future operation}//create WiredActorvalactor=Actor.wire(MyImpl)
//invoke functionvalresponse:Future[String]= actor.ask(_.hello("World"))response.foreach(println)
valresponseFlatMap:Future[String]= actor.askFlatMap(_.helloFuture("World"))responseFlatMap.foreach(println)
valunitResponse:Unit= actor.send(_.hello("World again!"))
//schedule a function call on the actor. Returns Future response and TimerTask to cancel.valscheduleResponse: (Future[String],TimerTask)= actor.scheduleAsk(delay=1.second)(_.hello("World"))scheduleResponse._1.foreach(println)
importscala.concurrent.duration._importscala.concurrent.ExecutionContext.Implicits.globalobjectWiredPingPongDemoextendsApp {classWiredPingPong(varpingCount:Int,varpongCount:Int) {defping(replyTo:WiredActor[WiredPingPong]):Unit= { pingCount+=1 println(s"pingCount:$pingCount") replyTo.send(_.pong(replyTo)) }defpong(replyTo:WiredActor[WiredPingPong]):Unit= { pongCount+=1 println(s"pongCount:$pongCount") replyTo.send(_.ping(replyTo)) } }Actor .wire(newWiredPingPong(0,0)) .send { (impl, self)=> impl.ping(self) }Thread.sleep(1.seconds.toMillis)}
SeeWiredPingPongStateless for a statelessversion of the aboveWiredPingPong
WiredActor.
About
A type-safe Actor class and WiredActor for sending functions as messages