- Notifications
You must be signed in to change notification settings - Fork6
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
License
NotificationsYou must be signed in to change notification settings
igumnoff/gabriel2
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
- Async for sending messages
- Async for messages processing in actor
- Support messaging like send and forget
- Support messaging like send and wait response
- Mutable state of actor
- Self reference in actor from context
- Actor lifecycle (pre_start, pre_stop)
- Sink to actor
- Stream from actor
- Remote Actor
- Event Bus
- Load Balancer
Cargo.toml
[dependencies]gabriel2 = {version ="1.5.0",features = ["remote","sink-stream","broadcast","balancer"] }
echo.rs
use std::sync::Arc;use gabriel2::*;use bincode::{Decode,Encode};use derive_more::{Display,Error};#[derive(Debug)]pubstructEchoActor;#[derive(Debug)]pubenumEchoMessage{Ping,}#[derive(Debug)]pubenumEchoResponse{Pong{counter:u32},}#[derive(Debug,Clone)]pubstructEchoState{pubcounter:u32,}#[derive(Debug,Display,Error)]pubenumEchoError{#[display(fmt ="Unknown error")]Unknown,}implFrom<std::io::Error>forEchoError{fnfrom(_err: std::io::Error) ->Self{EchoError::Unknown}}implHandlerforEchoActor{typeActor =EchoActor;typeMessage =EchoMessage;typeState =EchoState;typeResponse =EchoResponse;typeError =EchoError;asyncfnreceive(&self,ctx:Arc<Context<Self::Actor,Self::Message,Self::State,Self::Response,Self::Error>>) ->Result<EchoResponse,EchoError>{match ctx.mgs{EchoMessage::Ping =>{println!("Received Ping");letmut state_lock = ctx.state.lock().await; state_lock.counter +=1;if state_lock.counter >10{Err(EchoError::Unknown)}else{Ok(EchoResponse::Pong{counter: state_lock.counter})}}}}}
main.rs
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",EchoActor{}, state,100000).await?;println!("Sent Ping"); echo_ref.send(EchoMessage::Ping).await?;println!("Sent Ping and ask response");let pong = echo_ref.ask(EchoMessage::Ping).await?;println!("Got {:?}", pong); _ = echo_ref.stop().await;Ok(())}
Example output:
Sent PingSent Ping and ask responseReceived PingReceived PingGot Pong { counter: 2 }
Example sources:https://github.com/igumnoff/gabriel2/tree/main/test
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let echo_sink =ActorSink::sink(echo_ref.clone());let message_stream = futures::stream::iter(vec![EchoMessage::Ping,EchoMessage::Ping,EchoMessage::Ping]).map(Ok); _ = message_stream.forward(echo_sink).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;Ok(())}
Example output:
Received PingReceived PingReceived Ping
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let(echo_sink, echo_stream) =ActorSink::sink_stream(echo_ref.clone());let message_stream = futures::stream::iter(vec![EchoMessage::Ping,EchoMessage::Ping,EchoMessage::Ping]).map(Ok); _ = message_stream.forward(echo_sink).await; echo_stream.for_each(|message|asyncmove{println!("Got {:?}", message.unwrap());}).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;Ok(())}
Example output:
Received PingReceived PingReceived PingGot Pong { counter: 1 }Got Pong { counter: 2 }Got Pong { counter: 3 }
Preparations for remote:
Add Encode, Decode from "bincode" to derive(..) for EchoActor, EchoMessage, EchoResponse, EchoState and EchoError
Remote version:
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let echo_server =ActorServer::new("echo_server","127.0.0.1",9001, echo_ref).await?;let echo_client:Arc<ActorClient<EchoActor,EchoMessage,EchoState,EchoResponse,EchoError>> =ActorClient::new("echo_client","127.0.0.1",9001).await?;println!("Sent Ping"); echo_client.send(EchoMessage::Ping).await?;println!("Sent Ping and ask response");let pong = echo_client.ask(EchoMessage::Ping).await?;println!("Got {:?}", pong); _ = echo_client.stop().await; _ = echo_server.stop().await;Ok(())}
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};#[derive(Debug,Copy,Clone)]enumEventElement{Fire,Water}let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let event_bus:Arc<EventBus<EventElement>> =Arc::new(EventBus::new());let subscriber_id = event_bus.subscribe(move |event:EventElement|{asyncmove{match event{EventElement::Fire =>{let _ = echo_ref.send(EchoMessage::Ping).await;()}, _ =>()}}}).await; event_bus.publish(EventElement::Fire).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; event_bus.unsubscribe(subscriber_id).await;Ok(())}
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let echo_load_balancer:Arc<LoadBalancer<EchoActor,EchoMessage,EchoState,EchoResponse,EchoError>> =LoadBalancer::new("echo_load_balancer",10, |id:usize|{Box::pin(asyncmove{let user:Arc<ActorRef<EchoActor,EchoMessage,EchoState,EchoResponse,EchoError>,> =ActorRef::new(format!("echo-{}", id),EchoActor{},EchoState{counter:0},10000,).await?;Ok(user)})}).await.unwrap();for _in0..30{ echo_load_balancer.send(EchoMessage::Ping).await?;}Ok(())}
I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow the steps:
- Read "Contributor License Agreement (CLA)"
- Contact with me via telegram @ievkz or discord @igumnovnsk
- Confirm e-mail invitation in repository
- Do "git clone" (You don't need to fork!)
- Create branch with your assigned issue
- Create pull request to main branch