- Notifications
You must be signed in to change notification settings - Fork6
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
License
igumnoff/gabriel2
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
- Async for sending messages
- Async for messages processing in actor
- Support messaging like send and forget
- Support messaging like send and wait response
- Mutable state of actor
- Self reference in actor from context
- Actor lifecycle (pre_start, pre_stop)
- Sink to actor
- Stream from actor
- Remote Actor
- Event Bus
- Load Balancer
Cargo.toml
[dependencies]gabriel2 = {version ="1.4.1",features = ["remote","sink-stream","broadcast"] }
echo.rs
use std::sync::Arc;use gabriel2::*;use bincode::{Decode,Encode};use derive_more::{Display,Error};#[derive(Debug)]pubstructEchoActor;#[derive(Debug)]pubenumEchoMessage{Ping,}#[derive(Debug)]pubenumEchoResponse{Pong{counter:u32},}#[derive(Debug,Clone)]pubstructEchoState{pubcounter:u32,}#[derive(Debug,Display,Error)]pubenumEchoError{#[display(fmt ="Unknown error")]Unknown,}implFrom<std::io::Error>forEchoError{fnfrom(_err: std::io::Error) ->Self{EchoError::Unknown}}implHandlerforEchoActor{typeActor =EchoActor;typeMessage =EchoMessage;typeState =EchoState;typeResponse =EchoResponse;typeError =EchoError;asyncfnreceive(&self,ctx:Arc<Context<Self::Actor,Self::Message,Self::State,Self::Response,Self::Error>>) ->Result<EchoResponse,EchoError>{match ctx.mgs{EchoMessage::Ping =>{println!("Received Ping");letmut state_lock = ctx.state.lock().await; state_lock.counter +=1;if state_lock.counter >10{Err(EchoError::Unknown)}else{Ok(EchoResponse::Pong{counter: state_lock.counter})}}}}}
main.rs
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",EchoActor{}, state,100000).await?;println!("Sent Ping"); echo_ref.send(EchoMessage::Ping).await?;println!("Sent Ping and ask response");let pong = echo_ref.ask(EchoMessage::Ping).await?;println!("Got {:?}", pong); _ = echo_ref.stop().await;Ok(())}
Example output:
Sent PingSent Ping and ask responseReceived PingReceived PingGot Pong { counter: 2 }
Example sources:https://github.com/igumnoff/gabriel2/tree/main/test
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let echo_sink =ActorSink::sink(echo_ref.clone());let message_stream = futures::stream::iter(vec![EchoMessage::Ping,EchoMessage::Ping,EchoMessage::Ping]).map(Ok); _ = message_stream.forward(echo_sink).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;Ok(())}
Example output:
Received PingReceived PingReceived Ping
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let(echo_sink, echo_stream) =ActorSink::sink_stream(echo_ref.clone());let message_stream = futures::stream::iter(vec![EchoMessage::Ping,EchoMessage::Ping,EchoMessage::Ping]).map(Ok); _ = message_stream.forward(echo_sink).await; echo_stream.for_each(|message|asyncmove{println!("Got {:?}", message.unwrap());}).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;Ok(())}
Example output:
Received PingReceived PingReceived PingGot Pong { counter: 1 }Got Pong { counter: 2 }Got Pong { counter: 3 }
Preparations for remote:
Add Encode, Decode from "bincode" to derive(..) for EchoActor, EchoMessage, EchoResponse, EchoState and EchoError
Remote version:
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let echo_server =ActorServer::new("echo_server","127.0.0.1",9001, echo_ref).await?;let echo_client:Arc<ActorClient<EchoActor,EchoMessage,EchoState,EchoResponse,EchoError>> =ActorClient::new("echo_client","127.0.0.1",9001).await?;println!("Sent Ping"); echo_client.send(EchoMessage::Ping).await?;println!("Sent Ping and ask response");let pong = echo_client.ask(EchoMessage::Ping).await?;println!("Got {:?}", pong); _ = echo_client.stop().await; _ = echo_server.stop().await;Ok(())}
#[tokio::main]asyncfnmain() ->Result<(),EchoError>{let state =EchoState{counter:0,};#[derive(Debug,Copy,Clone)]enumEventElement{Fire,Water}let echo_ref =ActorRef::new("echo",crate::echo::EchoActor{}, state,100000).await?;let event_bus:Arc<EventBus<EventElement>> =Arc::new(EventBus::new());let subscriber_id = event_bus.subscribe(move |event:EventElement|{asyncmove{match event{EventElement::Fire =>{let _ = echo_ref.send(EchoMessage::Ping).await;()}, _ =>()}}}).await; event_bus.publish(EventElement::Fire).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; event_bus.unsubscribe(subscriber_id).await;Ok(())}
I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow the steps:
- Read "Contributor License Agreement (CLA)"
- Contact with me via telegram @ievkz or discord @igumnovnsk
- Confirm e-mail invitation in repository
- Do "git clone" (You don't need to fork!)
- Create branch with your assigned issue
- Create pull request to main branch
About
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors2
Uh oh!
There was an error while loading.Please reload this page.