Crate gabriel2

§Gabriel2

Gabriel2: Indeed, an actor library based on Tokio, written in Rust

§Features

  • Asynchronous message sending
  • Asynchronous message processing inside the actor
  • Fire-and-forget messaging (send)
  • Request-response messaging (send and wait for the response)
  • Mutable state of actor
  • Self reference in actor from context
  • Actor lifecycle (pre_start, pre_stop)
  • Sink to actor
  • Stream from actor
  • Remote Actor
  • Event Bus
  • Load Balancer

§Usage

Cargo.toml

    [dependencies]
    gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }

echo.rs

    use std::sync::Arc;
    use gabriel2::*;
    use bincode::{Decode, Encode};
    use derive_more::{Display, Error};

    #[derive(Debug)]
    pub struct EchoActor;

    #[derive(Debug)]
    pub enum EchoMessage {
        Ping,
    }

    #[derive(Debug)]
    pub enum EchoResponse {
        Pong { counter: u32 },
    }

    #[derive(Debug, Clone)]
    pub struct EchoState {
        pub counter: u32,
    }

    #[derive(Debug, Display, Error)]
    pub enum EchoError {
        #[display(fmt = "Unknown error")]
        Unknown,
    }

    impl From<std::io::Error> for EchoError {
        fn from(_err: std::io::Error) -> Self {
            EchoError::Unknown
        }
    }

    impl Handler for EchoActor {
        type Actor = EchoActor;
        type Message = EchoMessage;
        type State = EchoState;
        type Response = EchoResponse;
        type Error = EchoError;

        async fn receive(&self, ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>) -> Result<EchoResponse, EchoError> {
            match ctx.mgs {
                EchoMessage::Ping => {
                    println!("Received Ping");
                    let mut state_lock = ctx.state.lock().await;
                    state_lock.counter += 1;
                    if state_lock.counter > 10 {
                        Err(EchoError::Unknown)
                    } else {
                        Ok(EchoResponse::Pong { counter: state_lock.counter })
                    }
                }
            }
        }
    }

main.rs

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState {
            counter: 0,
        };

        let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

        println!("Sent Ping");
        echo_ref.send(EchoMessage::Ping).await?;

        println!("Sent Ping and ask response");
        let pong = echo_ref.ask(EchoMessage::Ping).await?;
        println!("Got {:?}", pong);

        _ = echo_ref.stop().await;
        Ok(())
    }

Example output:

    Sent Ping
    Sent Ping and ask response
    Received Ping
    Received Ping
    Got Pong { counter: 2 }
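The final counter is 2 because the fire-and-forget Ping and the ask Ping are both handled, in order, by the same actor state. The sketch below illustrates that idea with the standard library only. It is not how gabriel2 is implemented: Msg, spawn_echo, send and ask are hypothetical names for this example, and a thread with an mpsc channel stands in for the Tokio-based mailbox.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical message type for this sketch: an optional reply channel
// distinguishes "ask" (Some) from "send and forget" (None).
enum Msg {
    Ping(Option<Sender<u32>>),
}

// Spawns a thread that owns the counter and handles messages one at a time.
fn spawn_echo() -> Sender<Msg> {
    let (tx, rx) = channel::<Msg>();
    thread::spawn(move || {
        let mut counter = 0u32;
        for msg in rx {
            match msg {
                Msg::Ping(reply) => {
                    counter += 1;
                    if let Some(reply) = reply {
                        // The asker may have gone away; ignore that case.
                        let _ = reply.send(counter);
                    }
                }
            }
        }
    });
    tx
}

// Fire-and-forget: enqueue the message and return immediately.
fn send(actor: &Sender<Msg>) {
    actor.send(Msg::Ping(None)).expect("actor stopped");
}

// Request-response: enqueue with a reply channel, then block on the reply.
fn ask(actor: &Sender<Msg>) -> u32 {
    let (reply_tx, reply_rx) = channel();
    actor.send(Msg::Ping(Some(reply_tx))).expect("actor stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}

fn main() {
    let actor = spawn_echo();
    send(&actor);
    let counter = ask(&actor);
    println!("Got Pong {{ counter: {} }}", counter);
}
```

Because a single sender's messages arrive in FIFO order, the reply to ask already reflects the earlier send.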

Example sources: https://github.com/igumnoff/gabriel2/tree/main/test

§Sink

    use futures::StreamExt; // provides map and forward on streams

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState {
            counter: 0,
        };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let echo_sink = ActorSink::sink(echo_ref.clone());
        let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
        _ = message_stream.forward(echo_sink).await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

Example output:

    Received Ping
    Received Ping
    Received Ping

§Stream

    use futures::StreamExt; // provides map, forward and for_each on streams

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState {
            counter: 0,
        };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
        let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
        _ = message_stream.forward(echo_sink).await;
        echo_stream.for_each(|message| async move {
            println!("Got {:?}", message.unwrap());
        }).await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

Example output:

    Received Ping
    Received Ping
    Received Ping
    Got Pong { counter: 1 }
    Got Pong { counter: 2 }
    Got Pong { counter: 3 }

§Remote

Preparations for remote:

Add Encode and Decode from “bincode” to the derive(..) attributes of EchoActor, EchoMessage, EchoResponse, EchoState and EchoError.

Remote version:

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState {
            counter: 0,
        };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
        let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
            ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

        println!("Sent Ping");
        echo_client.send(EchoMessage::Ping).await?;

        println!("Sent Ping and ask response");
        let pong = echo_client.ask(EchoMessage::Ping).await?;
        println!("Got {:?}", pong);

        _ = echo_client.stop().await;
        _ = echo_server.stop().await;
        Ok(())
    }

§Event Bus

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState {
            counter: 0,
        };

        #[derive(Debug, Copy, Clone)]
        enum EventElement {
            Fire,
            Water
        }

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

        let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

        let subscriber_id = event_bus.subscribe(move |event: EventElement| {
            async move {
                match event {
                    EventElement::Fire => {
                        let _ = echo_ref.send(EchoMessage::Ping).await;
                        ()
                    },
                    _ => ()
                }
            }
        }).await;

        event_bus.publish(EventElement::Fire).await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        event_bus.unsubscribe(subscriber_id).await;
        Ok(())
    }
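The subscribe, publish and unsubscribe cycle above can be pictured with a small synchronous model. This is only a sketch using the standard library: MiniBus and its methods are hypothetical names, and gabriel2's EventBus is asynchronous and may be structured quite differently.

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::rc::Rc;

#[derive(Debug, Copy, Clone, PartialEq)]
enum EventElement {
    Fire,
    Water,
}

// Hypothetical, synchronous stand-in for an event bus.
struct MiniBus<E> {
    next_id: u64,
    subscribers: HashMap<u64, Box<dyn FnMut(E)>>,
}

impl<E: Copy> MiniBus<E> {
    fn new() -> Self {
        MiniBus { next_id: 0, subscribers: HashMap::new() }
    }

    // Registers a callback and returns an id for later unsubscription.
    fn subscribe(&mut self, callback: impl FnMut(E) + 'static) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.insert(id, Box::new(callback));
        id
    }

    // Returns true if a subscriber with this id existed and was removed.
    fn unsubscribe(&mut self, id: u64) -> bool {
        self.subscribers.remove(&id).is_some()
    }

    // Delivers a copy of the event to every current subscriber.
    fn publish(&mut self, event: E) {
        for callback in self.subscribers.values_mut() {
            callback(event);
        }
    }
}

fn main() {
    let pings = Rc::new(Cell::new(0u32));
    let seen = Rc::clone(&pings);
    let mut bus = MiniBus::new();
    // Only Fire events count as a "Ping", mirroring the match above.
    let id = bus.subscribe(move |event: EventElement| {
        if event == EventElement::Fire {
            seen.set(seen.get() + 1);
        }
    });
    bus.publish(EventElement::Fire);
    bus.publish(EventElement::Water);
    bus.unsubscribe(id);
    bus.publish(EventElement::Fire); // no subscriber left, so nothing happens
    println!("pings sent: {}", pings.get());
}
```

Returning an id from subscribe is what lets a caller remove exactly one subscription later without holding on to the closure itself.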

§Load Balancer

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let echo_load_balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
            LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
                Box::pin(async move {
                    let user: Arc<
                        ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
                    > = ActorRef::new(
                        format!("echo-{}", id),
                        EchoActor {},
                        EchoState { counter: 0 },
                        10000,
                    )
                        .await?;
                    Ok(user)
                })
            })
                .await
                .unwrap();

        for _ in 0..30 {
            echo_load_balancer.send(EchoMessage::Ping).await?;
        }
        Ok(())
    }
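The example sends 30 messages to a pool of 10 actors. The source does not say which distribution strategy LoadBalancer uses, so the following is purely an illustration under the assumption of round-robin selection. RoundRobin and distribute are hypothetical names, not part of gabriel2.

```rust
// Hypothetical round-robin picker; the source does not say which strategy
// gabriel2's LoadBalancer actually uses.
struct RoundRobin {
    workers: usize,
    next: usize,
}

impl RoundRobin {
    fn new(workers: usize) -> Self {
        assert!(workers > 0, "need at least one worker");
        RoundRobin { workers, next: 0 }
    }

    // Returns the index of the worker that should get the next message.
    fn pick(&mut self) -> usize {
        let chosen = self.next;
        self.next = (self.next + 1) % self.workers;
        chosen
    }
}

// Counts how many of `messages` each of `workers` workers would receive.
fn distribute(workers: usize, messages: usize) -> Vec<usize> {
    let mut balancer = RoundRobin::new(workers);
    let mut handled = vec![0usize; workers];
    for _ in 0..messages {
        handled[balancer.pick()] += 1;
    }
    handled
}

fn main() {
    // 10 workers and 30 messages, matching the numbers in the example above.
    println!("{:?}", distribute(10, 30));
}
```

Under this assumption each of the 10 actors would handle exactly 3 of the 30 Pings. A different strategy (random, least-loaded) would give a different spread.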

§Contributing

I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow these steps:

  1. Read the “Contributor License Agreement (CLA)”
  2. Contact me via Telegram @ievkz or Discord @igumnovnsk
  3. Confirm the e-mail invitation to the repository
  4. Run “git clone” (you don’t need to fork!)
  5. Create a branch for your assigned issue
  6. Create a pull request to the main branch

Modules§

balancer
broadcast
remote
sink_stream

Structs§

ActorRef
ActorRef is a structure that represents a reference to an actor in an actor system. It contains the necessary components to interact with the actor and manage its state.
Context
Context is a structure that represents the context in which an actor operates in an actor system. It contains the necessary components for an actor to process a message and manage its state.

Traits§

ActorRefTrait
The ActorRefTrait trait defines the behavior of an actor reference in an actor system. It provides methods for creating a new actor reference and getting the state of the actor.
ActorTrait
The ActorTrait trait defines the behavior of an actor in an actor system. It provides methods for sending messages to the actor, asking the actor for a response, and stopping the actor.
Handler
The Handler trait defines the behavior of an actor in an actor system. It provides methods for handling incoming messages and lifecycle events.
SSSD
SSSD is a trait that represents a type that is Send, Sync, Debug, and 'static.
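
A marker trait of this kind is commonly written as a supertrait bundle with a blanket implementation. The sketch below shows that pattern with a hypothetical trait named Sssd. It is an assumption about the general shape, not a copy of gabriel2's definition, which may differ.

```rust
use std::fmt::Debug;

// Assumed shape of a marker trait like SSSD; gabriel2's actual definition
// may differ.
trait Sssd: Send + Sync + Debug + 'static {}

// Blanket impl: any type meeting the four bounds gets the marker automatically.
impl<T: Send + Sync + Debug + 'static> Sssd for T {}

// Generic code can then state one bound instead of four.
fn describe<T: Sssd>(value: &T) -> String {
    format!("{:?}", value)
}

#[derive(Debug)]
struct EchoState {
    counter: u32,
}

fn main() {
    println!("{}", describe(&EchoState { counter: 0 }));
}
```

A type holding something that is not Send or Sync, such as an Rc, would be rejected at compile time by the describe bound.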
