Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

A type-safe Actor class and WiredActor for sending functions as messages

License

NotificationsYou must be signed in to change notification settings

simerplaha/Actor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

51 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Actor - A small type-safe class that implements the most commonly used Actor APIsincluding ask (?) which returns a typedFuture[R].

WiredActor - Convert anyobject to anActor that allows invoking theobject's function as messages.

Setup

libraryDependencies+="com.github.simerplaha"%%"actor"%"0.3"

Make sure you haveExecutionContext in scope

importscala.concurrent.ExecutionContext.Implicits.global

Actor

Stateless Actor

valactor=Actor[Int](    (message, self)=>      println(message)  )actor!1

Stateful Actor

caseclassState(varcounter:Int)valactor=Actor[Int,State](State(0))(    (message, self)=>      self.state.counter+=1  )

Timer actor

A timer actor will process messages in batches after the set delays. Similar to above a stateful timer Actorcan also be created.

importscala.concurrent.duration._//stateless timer actorvalactor=Actor.timer[Int](delays=1.second) {    (message, self)=>//do something  }

Scheduling messages to self

self.schedule returns a javaTimerTask which is cancellable.

valactor=Actor[Int](    (message, self)=>      self.schedule(message=1, delay=1.second)    )

Terminating an Actor

valactor=Actor[Int](    (message, self)=>      println(message)  )actor.terminate()//cannot send messages to a terminated actor.(actor!1) shouldBeLeft(Result.TerminatedActor)

Ask - Get a Future response

caseclassCreateUser(name:String)(valreplyTo:ActorRef[Boolean])valactor=Actor[CreateUser] {  (message:CreateUser, _)=>    message.replyTo!true}valresponse:Future[Boolean]= (actor?CreateUser("Tony Stark")).right.getAwait.result(response,1.second)

Terminating an Actor on message failure

By default actors are not terminated if there is a failure processing a message. Thefollowing actor enables termination if there is a failure processing a message.

valactor=Actor[Int](    (message, self)=>thrownewException("Kaboom!")  ).terminateOnException()//enable terminate on exception(actor!1) shouldBeRight(Result.Sent)//first message sent is successfuleventually(actor.isTerminated() shouldBetrue)//actor is terminated(actor!2) shouldBeLeft(Result.TerminatedActor)//cannot sent messages to a terminated actor

Testing

Borrowing ideas from Akka theTestActor implements APIs to test messages in an Actor's mailbox.

valactor=TestActor[Int]()actor.expectNoMessage(after=1.second)//expect a message after delay in the Actor's mailboxvalgot= actor.getMessage()//fetch the first message in the actor's mailboxactor.expectMessage[Int]()//expect a message of some type

PingPong example

importscala.concurrent.ExecutionContext.Implicits.globalimportscala.concurrent.duration._importcom.github.simerplaha.actor._caseclassPong(replyTo:ActorRef[Ping])caseclassPing(replyTo:ActorRef[Pong])caseclassState(varcount:Int)valping=Actor[Ping,State](State(0)) {case (message, self)=>      self.state.count+=1      println(s"Ping:${self.state.count}")      message.replyTo!Pong(self)  }valpong=Actor[Pong,State](State(0)) {case (message, self)=>      self.state.count+=1      println(s"Pong:${self.state.count}")      message.replyTo!Ping(self)  }pong!Pong(ping)//run this for 1 secondsThread.sleep(1.second.toMillis)

WiredActor

Functions can be sent, invoked & scheduled as messages toWiredActors similar to messages in anActor.

WiredActors can be created on anyclass instance orobject.

Create aWiredActor

//your class that contains Actor functionsobjectMyImpl {defhello(name:String):String=s"Hello$name"defhelloFuture(name:String):Future[String]=Future(s"Hello$name from the Future!")//some future operation}//create WiredActorvalactor=Actor.wire(MyImpl)

ask

//invoke functionvalresponse:Future[String]= actor.ask(_.hello("World"))response.foreach(println)

askFlatMap

valresponseFlatMap:Future[String]= actor.askFlatMap(_.helloFuture("World"))responseFlatMap.foreach(println)

send

valunitResponse:Unit= actor.send(_.hello("World again!"))

schedule

//schedule a function call on the actor. Returns Future response and TimerTask to cancel.valscheduleResponse: (Future[String],TimerTask)= actor.scheduleAsk(delay=1.second)(_.hello("World"))scheduleResponse._1.foreach(println)

PingPong example usingWiredActor

importscala.concurrent.duration._importscala.concurrent.ExecutionContext.Implicits.globalobjectWiredPingPongDemoextendsApp {classWiredPingPong(varpingCount:Int,varpongCount:Int) {defping(replyTo:WiredActor[WiredPingPong]):Unit= {      pingCount+=1      println(s"pingCount:$pingCount")      replyTo.send(_.pong(replyTo))    }defpong(replyTo:WiredActor[WiredPingPong]):Unit= {      pongCount+=1      println(s"pongCount:$pongCount")      replyTo.send(_.ping(replyTo))    }  }Actor    .wire(newWiredPingPong(0,0))    .send {      (impl, self)=>        impl.ping(self)    }Thread.sleep(1.seconds.toMillis)}

SeeWiredPingPongStateless for a statelessversion of the aboveWiredPingPong WiredActor.

About

A type-safe Actor class and WiredActor for sending functions as messages

Topics

Resources

License

Stars

Watchers

Forks

Sponsor this project

 

Packages

No packages published

Languages


[8]ページ先頭

©2009-2025 Movatter.jp