§Gabriel2
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
§Features
- Async message sending
- Async message processing inside the actor
- Send-and-forget messaging
- Send-and-wait-for-response messaging
- Mutable actor state
- Self-reference to the actor from the context
- Actor lifecycle (pre_start, pre_stop)
- Sink to actor
- Stream from actor
- Remote Actor
- Event Bus
- Load Balancer
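To make the difference between the two messaging styles in the list above concrete, here is a minimal sketch that uses only the standard library (threads and `std::sync::mpsc`), not gabriel2 or Tokio. The names `Msg`, `spawn_counter_actor`, `send` and `ask` are hypothetical and chosen for illustration; gabriel2's own implementation may differ.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical message type for this sketch (not part of gabriel2).
enum Msg {
    // Send-and-forget: no reply channel attached.
    Ping,
    // Send-and-wait: the caller supplies a channel for the reply.
    PingAsk(Sender<u32>),
}

// Spawns a thread that owns the mutable counter and handles messages in order.
fn spawn_counter_actor() -> Sender<Msg> {
    let (tx, rx) = channel::<Msg>();
    thread::spawn(move || {
        let mut counter: u32 = 0; // state private to the actor thread
        for msg in rx {
            counter += 1;
            if let Msg::PingAsk(reply) = msg {
                // Ignore the error if the caller has already gone away.
                let _ = reply.send(counter);
            }
        }
    });
    tx
}

// Send-and-forget: enqueue the message and return immediately.
fn send(actor: &Sender<Msg>) {
    actor.send(Msg::Ping).expect("actor stopped");
}

// Send-and-wait: enqueue the message, then block until the reply arrives.
fn ask(actor: &Sender<Msg>) -> u32 {
    let (reply_tx, reply_rx) = channel();
    actor.send(Msg::PingAsk(reply_tx)).expect("actor stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}
```

Because the mailbox is processed in order, a `send` followed by an `ask` on a fresh actor yields a counter of 2: the first message has already updated the state by the time the second is handled.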
§Usage
Cargo.toml
[dependencies]
gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }
echo.rs
use std::sync::Arc;
use gabriel2::*;
use bincode::{Decode, Encode};
use derive_more::{Display, Error};

#[derive(Debug)]
pub struct EchoActor;

#[derive(Debug)]
pub enum EchoMessage {
    Ping,
}

#[derive(Debug)]
pub enum EchoResponse {
    Pong { counter: u32 },
}

#[derive(Debug, Clone)]
pub struct EchoState {
    pub counter: u32,
}

#[derive(Debug, Display, Error)]
pub enum EchoError {
    #[display(fmt = "Unknown error")]
    Unknown,
}

impl From<std::io::Error> for EchoError {
    fn from(_err: std::io::Error) -> Self {
        EchoError::Unknown
    }
}

impl Handler for EchoActor {
    type Actor = EchoActor;
    type Message = EchoMessage;
    type State = EchoState;
    type Response = EchoResponse;
    type Error = EchoError;

    async fn receive(&self, ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>) -> Result<EchoResponse, EchoError> {
        match ctx.mgs {
            EchoMessage::Ping => {
                println!("Received Ping");
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(EchoResponse::Pong { counter: state_lock.counter })
                }
            }
        }
    }
}
main.rs
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

    println!("Sent Ping");
    echo_ref.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo_ref.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_ref.stop().await;
    Ok(())
}
Example output:
Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
Example sources: https://github.com/igumnoff/gabriel2/tree/main/test
§Sink
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let echo_sink = ActorSink::sink(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}
Example output:
Received Ping
Received Ping
Received Ping
§Stream
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;

    echo_stream.for_each(|message| async move {
        println!("Got {:?}", message.unwrap());
    }).await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}
Example output:
Received Ping
Received Ping
Received Ping
Got Pong { counter: 1 }
Got Pong { counter: 2 }
Got Pong { counter: 3 }
§Remote
Preparations for remote:
Add Encode and Decode from “bincode” to the derive(..) attributes of EchoActor, EchoMessage, EchoResponse, EchoState and EchoError.
Remote version:
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
    let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> = ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

    println!("Sent Ping");
    echo_client.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo_client.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_client.stop().await;
    _ = echo_server.stop().await;
    Ok(())
}
§Event Bus
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water,
    }

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

    let subscriber_id = event_bus.subscribe(move |event: EventElement| {
        async move {
            match event {
                EventElement::Fire => {
                    let _ = echo_ref.send(EchoMessage::Ping).await;
                    ()
                },
                _ => ()
            }
        }
    }).await;

    event_bus.publish(EventElement::Fire).await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    event_bus.unsubscribe(subscriber_id).await;

    Ok(())
}
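The subscribe, publish, unsubscribe cycle used in the example above can be sketched with the standard library alone. This is a synchronous, single-threaded stand-in and not gabriel2's `EventBus`: the `MiniBus` type and its methods are hypothetical names introduced for illustration, and the real bus is async and shared through `Arc`.

```rust
use std::collections::HashMap;

// Hypothetical event type, mirroring the EventElement enum in the example above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum EventElement {
    Fire,
    Water,
}

// A synchronous, single-threaded stand-in for an event bus (not gabriel2's type).
struct MiniBus {
    next_id: usize,
    subscribers: HashMap<usize, Box<dyn FnMut(EventElement)>>,
}

impl MiniBus {
    fn new() -> Self {
        MiniBus { next_id: 0, subscribers: HashMap::new() }
    }

    // Registers a callback and returns an id that can later be used to unsubscribe.
    fn subscribe(&mut self, callback: impl FnMut(EventElement) + 'static) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.insert(id, Box::new(callback));
        id
    }

    // Delivers the event to every currently registered subscriber.
    fn publish(&mut self, event: EventElement) {
        for callback in self.subscribers.values_mut() {
            callback(event);
        }
    }

    // Removes the callback; returns true if the id was registered.
    fn unsubscribe(&mut self, id: usize) -> bool {
        self.subscribers.remove(&id).is_some()
    }
}
```

Returning an id from `subscribe` is what lets a caller remove exactly one subscription later without holding a reference to the callback itself; events published after `unsubscribe` no longer reach that callback.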
§Load Balancer
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let echo_load_balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
            Box::pin(async move {
                let user: Arc<
                    ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
                > = ActorRef::new(
                    format!("echo-{}", id),
                    EchoActor {},
                    EchoState { counter: 0 },
                    10000,
                )
                .await?;
                Ok(user)
            })
        })
        .await
        .unwrap();

    for _ in 0..30 {
        echo_load_balancer.send(EchoMessage::Ping).await?;
    }

    Ok(())
}
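This document does not say how the load balancer chooses an actor for each message. As one plausible illustration, the sketch below distributes messages round-robin over a fixed number of workers. Round-robin is an assumption here, and `RoundRobin` and `distribute` are hypothetical names, not part of gabriel2.

```rust
// Hypothetical round-robin selector; gabriel2's actual strategy is not stated
// in this document, so treat this purely as an illustration.
struct RoundRobin {
    workers: usize,
    next: usize,
}

impl RoundRobin {
    fn new(workers: usize) -> Self {
        assert!(workers > 0, "at least one worker is required");
        RoundRobin { workers, next: 0 }
    }

    // Returns the index of the worker that should receive the next message.
    fn pick(&mut self) -> usize {
        let id = self.next;
        self.next = (self.next + 1) % self.workers;
        id
    }
}

// Counts how many messages each worker would receive.
fn distribute(workers: usize, messages: usize) -> Vec<usize> {
    let mut selector = RoundRobin::new(workers);
    let mut counts = vec![0; workers];
    for _ in 0..messages {
        counts[selector.pick()] += 1;
    }
    counts
}
```

Under this assumption, the 30 messages sent to 10 actors in the example above would land as 3 messages per actor, i.e. `distribute(10, 30)` returns a vector of ten 3s.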
§Contributing
I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow the steps:
- Read the “Contributor License Agreement (CLA)”
- Contact me via Telegram @ievkz or Discord @igumnovnsk
- Confirm the e-mail invitation to the repository
- Run “git clone” (you don’t need to fork!)
- Create a branch for your assigned issue
- Create a pull request to the main branch
Modules§
Structs§
- ActorRef: ActorRef is a structure that represents a reference to an actor in an actor system. It contains the necessary components to interact with the actor and manage its state.
- Context: Context is a structure that represents the context in which an actor operates in an actor system. It contains the necessary components for an actor to process a message and manage its state.
Traits§
- ActorRefTrait: The ActorRefTrait trait defines the behavior of an actor reference in an actor system. It provides methods for creating a new actor reference and getting the state of the actor.
- ActorTrait: The ActorTrait trait defines the behavior of an actor in an actor system. It provides methods for sending messages to the actor, asking the actor for a response, and stopping the actor.
- Handler: The Handler trait defines the behavior of an actor in an actor system. It provides methods for handling incoming messages and lifecycle events.
- SSSD: SSSD is a trait that represents a type that is Send, Sync, Debug, and 'static.
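A trait that bundles several bounds, as SSSD is described above, is commonly written as a marker trait with a blanket implementation. The sketch below shows that pattern under the assumption that nothing beyond the four listed bounds is required. `SendSyncDebugStatic`, `describe` and `DemoMessage` are hypothetical names, and this is not the crate's actual source.

```rust
use std::fmt::Debug;

// Marker trait bundling the four bounds (an assumed shape, not gabriel2's code).
trait SendSyncDebugStatic: Send + Sync + Debug + 'static {}

// Blanket implementation: every type that satisfies the bounds gets the trait.
impl<T> SendSyncDebugStatic for T where T: Send + Sync + Debug + 'static {}

// Hypothetical message type used only to exercise the bound.
#[derive(Debug)]
enum DemoMessage {
    Ping,
}

// A generic function can now name one bound instead of repeating four.
fn describe<T: SendSyncDebugStatic>(value: T) -> String {
    format!("{:?}", value)
}
```

With this in place, `describe(DemoMessage::Ping)` compiles because the enum is `Send`, `Sync`, `Debug` and `'static`, whereas passing a `std::rc::Rc<u32>` would be rejected at compile time because `Rc` is neither `Send` nor `Sync`.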