Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork0
A type-safe Actor class and WiredActor for sending functions as messages
License
simerplaha/Actor
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Actor
- A small type-safe class that implements the most commonly used Actor APIsincluding ask (?
) which returns a typedFuture[R]
.
WiredActor
- Convert anyobject
to anActor
that allows invoking theobject
's function as messages.
libraryDependencies+="com.github.simerplaha"%%"actor"%"0.3"
Make sure you haveExecutionContext
in scope
importscala.concurrent.ExecutionContext.Implicits.global
valactor=Actor[Int]( (message, self)=> println(message) )actor!1
caseclassState(varcounter:Int)valactor=Actor[Int,State](State(0))( (message, self)=> self.state.counter+=1 )
A timer actor will process messages in batches after the set delays. Similar to above a stateful timer Actorcan also be created.
importscala.concurrent.duration._//stateless timer actorvalactor=Actor.timer[Int](delays=1.second) { (message, self)=>//do something }
self.schedule
returns a javaTimerTask
which is cancellable.
valactor=Actor[Int]( (message, self)=> self.schedule(message=1, delay=1.second) )
valactor=Actor[Int]( (message, self)=> println(message) )actor.terminate()//cannot send messages to a terminated actor.(actor!1) shouldBeLeft(Result.TerminatedActor)
caseclassCreateUser(name:String)(valreplyTo:ActorRef[Boolean])valactor=Actor[CreateUser] { (message:CreateUser, _)=> message.replyTo!true}valresponse:Future[Boolean]= (actor?CreateUser("Tony Stark")).right.getAwait.result(response,1.second)
By default actors are not terminated if there is a failure processing a message. Thefollowing actor enables termination if there is a failure processing a message.
valactor=Actor[Int]( (message, self)=>thrownewException("Kaboom!") ).terminateOnException()//enable terminate on exception(actor!1) shouldBeRight(Result.Sent)//first message sent is successfuleventually(actor.isTerminated() shouldBetrue)//actor is terminated(actor!2) shouldBeLeft(Result.TerminatedActor)//cannot sent messages to a terminated actor
Borrowing ideas from Akka theTestActor
implements APIs to test messages in an Actor's mailbox.
valactor=TestActor[Int]()actor.expectNoMessage(after=1.second)//expect a message after delay in the Actor's mailboxvalgot= actor.getMessage()//fetch the first message in the actor's mailboxactor.expectMessage[Int]()//expect a message of some type
importscala.concurrent.ExecutionContext.Implicits.globalimportscala.concurrent.duration._importcom.github.simerplaha.actor._caseclassPong(replyTo:ActorRef[Ping])caseclassPing(replyTo:ActorRef[Pong])caseclassState(varcount:Int)valping=Actor[Ping,State](State(0)) {case (message, self)=> self.state.count+=1 println(s"Ping:${self.state.count}") message.replyTo!Pong(self) }valpong=Actor[Pong,State](State(0)) {case (message, self)=> self.state.count+=1 println(s"Pong:${self.state.count}") message.replyTo!Ping(self) }pong!Pong(ping)//run this for 1 secondsThread.sleep(1.second.toMillis)
Functions can be sent, invoked & scheduled as messages toWiredActor
s similar to messages in anActor
.
WiredActor
s can be created on anyclass
instance orobject
.
//your class that contains Actor functionsobjectMyImpl {defhello(name:String):String=s"Hello$name"defhelloFuture(name:String):Future[String]=Future(s"Hello$name from the Future!")//some future operation}//create WiredActorvalactor=Actor.wire(MyImpl)
//invoke functionvalresponse:Future[String]= actor.ask(_.hello("World"))response.foreach(println)
valresponseFlatMap:Future[String]= actor.askFlatMap(_.helloFuture("World"))responseFlatMap.foreach(println)
valunitResponse:Unit= actor.send(_.hello("World again!"))
//schedule a function call on the actor. Returns Future response and TimerTask to cancel.valscheduleResponse: (Future[String],TimerTask)= actor.scheduleAsk(delay=1.second)(_.hello("World"))scheduleResponse._1.foreach(println)
importscala.concurrent.duration._importscala.concurrent.ExecutionContext.Implicits.globalobjectWiredPingPongDemoextendsApp {classWiredPingPong(varpingCount:Int,varpongCount:Int) {defping(replyTo:WiredActor[WiredPingPong]):Unit= { pingCount+=1 println(s"pingCount:$pingCount") replyTo.send(_.pong(replyTo)) }defpong(replyTo:WiredActor[WiredPingPong]):Unit= { pongCount+=1 println(s"pongCount:$pongCount") replyTo.send(_.ping(replyTo)) } }Actor .wire(newWiredPingPong(0,0)) .send { (impl, self)=> impl.ping(self) }Thread.sleep(1.seconds.toMillis)}
SeeWiredPingPongStateless for a statelessversion of the aboveWiredPingPong
WiredActor.
About
A type-safe Actor class and WiredActor for sending functions as messages
Topics
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Sponsor this project
Uh oh!
There was an error while loading.Please reload this page.
Packages0
Uh oh!
There was an error while loading.Please reload this page.