

igumnoff/gabriel2


Gabriel2

Gabriel2: Indeed, an actor library based on Tokio, written in Rust

Features

  • Asynchronous message sending
  • Asynchronous message processing inside the actor
  • Fire-and-forget messaging (send)
  • Request-response messaging (ask)
  • Mutable actor state
  • Self-reference to the actor from the context
  • Actor lifecycle hooks (pre_start, pre_stop)
  • Sink to actor
  • Stream from actor
  • Remote actors
  • Event bus
  • Load balancer

Usage

Cargo.toml

    [dependencies]
    gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }

echo.rs

    use std::sync::Arc;
    use gabriel2::*;
    use bincode::{Decode, Encode};
    use derive_more::{Display, Error};

    #[derive(Debug)]
    pub struct EchoActor;

    #[derive(Debug)]
    pub enum EchoMessage {
        Ping,
    }

    #[derive(Debug)]
    pub enum EchoResponse {
        Pong { counter: u32 },
    }

    #[derive(Debug, Clone)]
    pub struct EchoState {
        pub counter: u32,
    }

    #[derive(Debug, Display, Error)]
    pub enum EchoError {
        #[display(fmt = "Unknown error")]
        Unknown,
    }

    impl From<std::io::Error> for EchoError {
        fn from(_err: std::io::Error) -> Self {
            EchoError::Unknown
        }
    }

    impl Handler for EchoActor {
        type Actor = EchoActor;
        type Message = EchoMessage;
        type State = EchoState;
        type Response = EchoResponse;
        type Error = EchoError;

        async fn receive(
            &self,
            ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>,
        ) -> Result<EchoResponse, EchoError> {
            match ctx.mgs {
                EchoMessage::Ping => {
                    println!("Received Ping");
                    let mut state_lock = ctx.state.lock().await;
                    state_lock.counter += 1;
                    if state_lock.counter > 10 {
                        Err(EchoError::Unknown)
                    } else {
                        Ok(EchoResponse::Pong { counter: state_lock.counter })
                    }
                }
            }
        }
    }

main.rs

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

        println!("Sent Ping");
        echo_ref.send(EchoMessage::Ping).await?;

        println!("Sent Ping and ask response");
        let pong = echo_ref.ask(EchoMessage::Ping).await?;
        println!("Got {:?}", pong);

        _ = echo_ref.stop().await;
        Ok(())
    }

Example output:

    Sent Ping
    Sent Ping and ask response
    Received Ping
    Received Ping
    Got Pong { counter: 2 }
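The counter in the reply is 2 because the earlier fire-and-forget send also reached the actor and incremented the state before the ask was processed. The following is a minimal std-only sketch of that send/ask distinction, using a thread and channels in place of a Tokio actor. It is not how gabriel2 is implemented; `Envelope`, `spawn_echo`, `send` and `ask` are hypothetical names introduced only for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical envelope: an optional reply channel separates ask from send.
enum Envelope {
    Ping { reply_to: Option<Sender<u32>> },
    Stop,
}

// Spawns a worker thread that owns the counter, standing in for an actor.
// The join handle yields the final counter value once the worker stops.
fn spawn_echo() -> (Sender<Envelope>, thread::JoinHandle<u32>) {
    let (tx, rx) = channel::<Envelope>();
    let handle = thread::spawn(move || {
        let mut counter = 0u32;
        for envelope in rx {
            match envelope {
                Envelope::Ping { reply_to } => {
                    counter += 1;
                    if let Some(reply) = reply_to {
                        // Ignore the error if the asker has gone away.
                        let _ = reply.send(counter);
                    }
                }
                Envelope::Stop => break,
            }
        }
        counter
    });
    (tx, handle)
}

// Fire-and-forget: enqueue the message and return immediately.
fn send(actor: &Sender<Envelope>) {
    actor
        .send(Envelope::Ping { reply_to: None })
        .expect("actor stopped");
}

// Request-response: enqueue the message, then block until the reply arrives.
fn ask(actor: &Sender<Envelope>) -> u32 {
    let (reply_tx, reply_rx) = channel();
    actor
        .send(Envelope::Ping { reply_to: Some(reply_tx) })
        .expect("actor stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}

fn main() {
    let (actor, handle) = spawn_echo();
    send(&actor); // counter becomes 1, nobody waits for it
    let pong = ask(&actor); // counter becomes 2 and is returned
    println!("Got Pong {{ counter: {} }}", pong); // prints "Got Pong { counter: 2 }"
    actor.send(Envelope::Stop).unwrap();
    assert_eq!(handle.join().unwrap(), 2);
}
```

Because the channel is first-in, first-out, the send is always handled before the ask that follows it, which is why the reply carries 2 rather than 1.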

Example sources: https://github.com/igumnoff/gabriel2/tree/main/test

Sink

    use futures::StreamExt; // brings map and forward into scope

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

        let echo_sink = ActorSink::sink(echo_ref.clone());
        let message_stream =
            futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping])
                .map(Ok);
        _ = message_stream.forward(echo_sink).await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

Example output:

    Received Ping
    Received Ping
    Received Ping

Stream

    use futures::StreamExt; // brings map, forward and for_each into scope

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

        let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
        let message_stream =
            futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping])
                .map(Ok);
        _ = message_stream.forward(echo_sink).await;

        echo_stream
            .for_each(|message| async move {
                println!("Got {:?}", message.unwrap());
            })
            .await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

Example output:

    Received Ping
    Received Ping
    Received Ping
    Got Pong { counter: 1 }
    Got Pong { counter: 2 }
    Got Pong { counter: 3 }
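The sink/stream pairing can be pictured as two queues around the actor: messages go in through one end and responses come out of the other. Below is a rough std-only sketch of that shape, assuming a plain thread and `std::sync::mpsc` channels instead of Tokio and the futures traits. `Ping` and `sink_stream` here are hypothetical stand-ins and do not reflect gabriel2's internals.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical message type for the sketch.
struct Ping;

// Returns an input end (the "sink") and an output end (the "stream").
fn sink_stream() -> (Sender<Ping>, Receiver<u32>) {
    let (sink, inbox) = channel::<Ping>();
    let (outbox, stream) = channel::<u32>();
    thread::spawn(move || {
        let mut counter = 0u32;
        // The loop ends once every sender for the inbox has been dropped.
        for _ping in inbox {
            counter += 1;
            if outbox.send(counter).is_err() {
                break; // nobody is reading responses any more
            }
        }
    });
    (sink, stream)
}

fn main() {
    let (sink, stream) = sink_stream();
    for _ in 0..3 {
        sink.send(Ping).unwrap();
    }
    drop(sink); // closing the sink lets the response stream finish
    let pongs: Vec<u32> = stream.iter().collect();
    println!("{:?}", pongs); // prints [1, 2, 3]
}
```

Dropping the sink is what terminates the response iterator in this sketch; without it, `stream.iter()` would block waiting for more responses.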

Remote

Preparations for remote:

Add Encode and Decode from "bincode" to the derive(..) attribute of EchoActor, EchoMessage, EchoResponse, EchoState and EchoError, for example #[derive(Debug, Encode, Decode)].

Remote version:

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
        let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
            ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

        println!("Sent Ping");
        echo_client.send(EchoMessage::Ping).await?;

        println!("Sent Ping and ask response");
        let pong = echo_client.ask(EchoMessage::Ping).await?;
        println!("Got {:?}", pong);

        _ = echo_client.stop().await;
        _ = echo_server.stop().await;
        Ok(())
    }

Event Bus

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let state = EchoState { counter: 0 };

        #[derive(Debug, Copy, Clone)]
        enum EventElement {
            Fire,
            Water,
        }

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

        let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

        let subscriber_id = event_bus
            .subscribe(move |event: EventElement| {
                async move {
                    match event {
                        EventElement::Fire => {
                            let _ = echo_ref.send(EchoMessage::Ping).await;
                        }
                        _ => (),
                    }
                }
            })
            .await;

        event_bus.publish(EventElement::Fire).await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        event_bus.unsubscribe(subscriber_id).await;

        Ok(())
    }
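The subscribe/publish/unsubscribe cycle above can be illustrated without any async machinery. The sketch below is a synchronous, single-threaded stand-in and makes no claim about how gabriel2's EventBus works internally. `MiniBus` and its methods are hypothetical names, and unlike the real example it takes `&mut self` rather than living behind an `Arc`.

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::rc::Rc;

// Hypothetical, synchronous stand-in for an event bus.
struct MiniBus<E> {
    next_id: usize,
    subscribers: HashMap<usize, Box<dyn FnMut(E)>>,
}

impl<E: Copy> MiniBus<E> {
    fn new() -> Self {
        MiniBus { next_id: 0, subscribers: HashMap::new() }
    }

    // Registers a callback and returns the id needed to unsubscribe later.
    fn subscribe(&mut self, callback: impl FnMut(E) + 'static) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.insert(id, Box::new(callback));
        id
    }

    // Delivers a copy of the event to every current subscriber.
    fn publish(&mut self, event: E) {
        for callback in self.subscribers.values_mut() {
            callback(event);
        }
    }

    // Returns true if a subscriber with that id existed.
    fn unsubscribe(&mut self, id: usize) -> bool {
        self.subscribers.remove(&id).is_some()
    }
}

#[derive(Debug, Copy, Clone, PartialEq)]
enum EventElement {
    Fire,
    Water,
}

fn main() {
    // Counts how many Fire events the subscriber reacted to.
    let pings = Rc::new(Cell::new(0u32));
    let seen = Rc::clone(&pings);

    let mut bus: MiniBus<EventElement> = MiniBus::new();
    let id = bus.subscribe(move |event: EventElement| {
        if event == EventElement::Fire {
            seen.set(seen.get() + 1);
        }
    });

    bus.publish(EventElement::Fire); // counted
    bus.publish(EventElement::Water); // ignored by the subscriber
    bus.unsubscribe(id);
    bus.publish(EventElement::Fire); // no subscriber left

    println!("pings = {}", pings.get()); // prints "pings = 1"
}
```

Returning an id from `subscribe` mirrors the `subscriber_id` in the example above and lets a caller remove exactly one callback without affecting the others.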

Load Balancer

    #[tokio::main]
    async fn main() -> Result<(), EchoError> {
        let echo_load_balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
            LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
                Box::pin(async move {
                    let user: Arc<ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
                        ActorRef::new(
                            format!("echo-{}", id),
                            EchoActor {},
                            EchoState { counter: 0 },
                            10000,
                        )
                        .await?;
                    Ok(user)
                })
            })
            .await
            .unwrap();

        for _ in 0..30 {
            echo_load_balancer.send(EchoMessage::Ping).await?;
        }

        Ok(())
    }
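This README does not say how LoadBalancer chooses which of the 10 actors receives each message. As an illustration only, the sketch below assumes a round-robin strategy; `RoundRobin` and `pick` are hypothetical names, and the actual library may distribute messages differently.

```rust
// Hypothetical round-robin picker; the strategy is an assumption, not gabriel2's documented behavior.
struct RoundRobin {
    workers: usize,
    next: usize,
}

impl RoundRobin {
    fn new(workers: usize) -> Self {
        assert!(workers > 0, "need at least one worker");
        RoundRobin { workers, next: 0 }
    }

    // Returns the index of the worker that should get the next message.
    fn pick(&mut self) -> usize {
        let chosen = self.next;
        self.next = (self.next + 1) % self.workers;
        chosen
    }
}

fn main() {
    let mut balancer = RoundRobin::new(10);
    let mut handled = vec![0u32; 10];
    for _ in 0..30 {
        handled[balancer.pick()] += 1;
    }
    // 30 messages over 10 workers: each worker handles exactly 3.
    println!("{:?}", handled); // prints [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
}
```

Under this assumption, the 30 pings in the example above would leave every actor with a counter of 3, comfortably below the limit of 10 at which EchoActor starts returning an error.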

Contributing

I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow these steps:

  1. Read the "Contributor License Agreement (CLA)"
  2. Contact me via Telegram @ievkz or Discord @igumnovnsk
  3. Accept the e-mail invitation to the repository
  4. Run "git clone" (you don't need to fork!)
  5. Create a branch for your assigned issue
  6. Open a pull request against the main branch
