DEV Community
Praveen Chaudhary
Server-Sent Events in Rust

What are Server-Sent Events?

Server-Sent Events (SSE) is a standard that lets a server push a stream of events to subscribed clients over a long-lived HTTP connection.

Why do we need SSE?

SSE delivers real-time data from the server to the client. WebSockets can do the same, but they are bidirectional: the client can also send data to the server.

That can be overkill for applications that only need to push updates to clients. SSE fits that one-way case.

Some Use Cases

  • Twitter feeds
  • Score updates
  • Stock prices

How does SSE work?

The client creates an EventSource object with the URL of the server endpoint, which opens a regular HTTP request.

The server responds with an event stream and keeps the connection open until it has no more events to send, the connection goes stale, or the client explicitly closes it.
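To make the event stream concrete, here is a minimal sketch of how one event looks on the wire in the text/event-stream format: optional `event:` and one or more `data:` fields, terminated by a blank line, with lines starting with `:` treated as comments. The function names are hypothetical and only illustrate the format; a library such as actix-web-lab does this formatting for you.

```rust
/// Formats one event in the `text/event-stream` wire format.
/// `event` is the optional event name; `data` may span several lines.
fn format_sse_event(event: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    // Each line of the payload gets its own `data:` field.
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}

/// Formats a comment line, which clients ignore; useful as a keep-alive ping.
fn format_sse_comment(text: &str) -> String {
    format!(": {}\n\n", text)
}
```

Because comments are ignored by EventSource, they are a convenient way to probe whether a connection is still alive without triggering a message handler in the browser.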

[Diagram: how SSE works]

How to implement SSE with actix-web?

Let's follow these steps.

Libraries required:

  • actix-web: A powerful, pragmatic, and extremely fast web framework for Rust.
  • actix-web-lab: Experimental extractors, middleware, and other extras for possible inclusion in Actix Web.

1. Setting Up Actix-web

Initializing the Rust Project

Start a new project with the following command: cargo new <project-name>.

Implementing a basic Actix server

Let's take the sample code from the official Actix docs.

Add the dependencies to Cargo.toml

    ....
    [dependencies]
    actix-web = "4"
    #sse
    actix-web-lab = "0.18.5"
    parking_lot = "0.12.1"
    futures-util = { version = "0.3.25", default-features = false, features = ["std"] }
    ....

Let's write the code for the Actix server.

    use actix_web::{web, App, HttpServer};

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        HttpServer::new(|| {
            App::new()
                .route("/hello", web::get().to(|| async { "Hello World!" }))
        })
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
    }

Implementing SSE using actix-web-lab

Broadcaster

  • Initialise the client struct

    #[derive(Debug, Clone, Default)]
    struct BroadcasterInner {
        clients: Vec<sse::Sender>,
    }

    • The Debug trait is used for printing and debugging.
    • The Clone trait lets you clone the BroadcasterInner and its clients.
    • The Default trait lets you create the struct with default values on initialisation.
  • Initialise the Broadcaster struct

    pub struct Broadcaster {
        inner: Mutex<BroadcasterInner>,
    }

We wrap the inner state in a Mutex so that it can be mutated safely across threads.
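As an illustration of why the lock matters, here is a minimal sketch of several threads appending to one shared list. It assumes the standard library's std::sync::Mutex rather than the parking_lot::Mutex used in this post (the std version returns a Result from lock(), hence the unwrap), and the function name is hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `workers` threads that each push one id into a shared list,
/// then returns the list sorted.
fn register_from_threads(workers: usize) -> Vec<usize> {
    let clients: Arc<Mutex<Vec<usize>>> = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let clients = Arc::clone(&clients);
            thread::spawn(move || {
                // The lock guard is dropped at the end of this statement.
                clients.lock().unwrap().push(id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Threads may finish in any order, so sort before returning.
    let mut ids = clients.lock().unwrap().clone();
    ids.sort();
    ids
}
```

The Arc gives every thread a handle to the same list, and the Mutex makes sure only one of them mutates it at a time.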

  • Create methods to construct a new instance, ping clients, remove stale clients, register a new client, and broadcast a message.

    • Create a new broadcaster instance

      impl Broadcaster {
          ....
          pub fn create() -> Arc<Self> {
              let this = Arc::new(Broadcaster {
                  inner: Mutex::new(BroadcasterInner::default()),
              });
              Broadcaster::spawn_ping(Arc::clone(&this));
              this
          }
          ....
      }

      This method creates a broadcaster instance and starts the loop that pings the clients.

    • Ping clients

      fn spawn_ping(this: Arc<Self>) {
          actix_web::rt::spawn(async move {
              let mut interval = interval(Duration::from_secs(10));
              loop {
                  interval.tick().await;
                  this.remove_stale_clients().await;
              }
          });
      }

      This ping loop runs every 10 seconds to find out which clients are still active and to remove the stale ones. You can modify the logic as per your requirements.

    • Remove stale clients

      async fn remove_stale_clients(&self) {
          let clients = self.inner.lock().clients.clone();
          println!("active client {:?}", clients);
          let mut ok_clients = Vec::new();
          println!("okay active client {:?}", ok_clients);
          for client in clients {
              if client
                  .send(sse::Event::Comment("ping".into()))
                  .await
                  .is_ok()
              {
                  ok_clients.push(client.clone());
              }
          }
          self.inner.lock().clients = ok_clients;
      }

      This method revalidates the client list and removes the stale clients.
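The same idea can be sketched with only the standard library: ping every sender and keep the ones whose send succeeds. This is not the actix-web-lab API. It assumes std::sync::mpsc channels as a stand-in for sse::Sender, and `remove_stale` and `connect` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Keeps only the senders whose receiving end is still alive.
/// Returns how many clients were removed.
fn remove_stale(clients: &mut Vec<Sender<String>>) -> usize {
    let before = clients.len();
    // `send` fails once the matching receiver has been dropped.
    clients.retain(|tx| tx.send("ping".to_string()).is_ok());
    before - clients.len()
}

/// Hypothetical helper: registers a client and hands back its receiver.
fn connect(clients: &mut Vec<Sender<String>>) -> Receiver<String> {
    let (tx, rx) = channel();
    clients.push(tx);
    rx
}
```

Dropping a receiver plays the role of a browser closing its connection: the next ping fails and that sender is removed from the list.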

    • Create a new client

      pub async fn new_client(&self) -> Sse<ChannelStream> {
          println!("starting creation");
          let (tx, rx) = sse::channel(10);
          tx.send(sse::Data::new("connected")).await.unwrap();
          println!("creating new clients success {:?}", tx);
          self.inner.lock().clients.push(tx);
          rx
      }

      sse::channel creates a sender and receiver pair. It takes a buffer size, which is the number of unsent messages that can be queued before a send has to wait.

      The first part, the sender, can be cloned and shared across threads.

      The second part, the receiver, is the regular event stream that is returned as the SSE response body.
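To see what a buffer size means in practice, here is a minimal sketch using the standard library's bounded std::sync::mpsc::sync_channel as a stand-in for sse::channel (an assumption made for illustration; the two APIs are different). The function name is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to queue `attempts` messages on a channel with room for `buffer`
/// unreceived messages and returns how many were accepted without waiting.
fn accepted_without_waiting(buffer: usize, attempts: usize) -> usize {
    // `_rx` stays alive but never reads, like a slow client.
    let (tx, _rx) = sync_channel::<usize>(buffer);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a buffer of 10, the first ten messages are queued immediately and the eleventh has to wait until the receiver catches up.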

    • Broadcast new message

      pub async fn broadcast(&self, msg: &str) {
          let clients = self.inner.lock().clients.clone();
          let send_futures = clients
              .iter()
              .map(|client| client.send(sse::Data::new(msg)));
          // try to send to all clients, ignoring failures
          // disconnected clients will get swept up by `remove_stale_clients`
          let _ = future::join_all(send_futures).await;
      }

      This broadcasts a message to all active clients.
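One detail worth noting in broadcast is that the client list is cloned under the lock and the sends happen afterwards, so the lock is not held while messages go out. A minimal std-only sketch of that pattern follows; it assumes std::sync::Mutex and std::sync::mpsc instead of parking_lot and sse::Sender, and `Hub` is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Holds the client list behind a lock, similar in shape to `Broadcaster`.
struct Hub {
    clients: Mutex<Vec<Sender<String>>>,
}

impl Hub {
    fn new() -> Self {
        Hub { clients: Mutex::new(Vec::new()) }
    }

    /// Registers a client and returns its receiving end.
    fn subscribe(&self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.lock().unwrap().push(tx);
        rx
    }

    /// Sends `msg` to every client and returns how many sends succeeded.
    fn broadcast(&self, msg: &str) -> usize {
        // Snapshot the list; the lock is released at the end of this statement.
        let snapshot = self.clients.lock().unwrap().clone();
        // Failed sends (disconnected clients) are simply not counted.
        snapshot
            .iter()
            .filter(|tx| tx.send(msg.to_string()).is_ok())
            .count()
    }
}
```

Failures are ignored on purpose here as well: cleaning up disconnected clients is left to a separate sweep, which keeps the broadcast path simple.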

    • Complete Broadcaster

      use std::{sync::Arc, time::Duration};

      use actix_web::rt::time::interval;
      use actix_web_lab::sse::{self, ChannelStream, Sse};
      use futures_util::future;
      use parking_lot::Mutex;

      pub struct Broadcaster {
          inner: Mutex<BroadcasterInner>,
      }

      #[derive(Debug, Clone, Default)]
      struct BroadcasterInner {
          clients: Vec<sse::Sender>,
      }

      impl Broadcaster {
          /// Constructs new broadcaster and spawns ping loop.
          pub fn create() -> Arc<Self> {
              let this = Arc::new(Broadcaster {
                  inner: Mutex::new(BroadcasterInner::default()),
              });
              Broadcaster::spawn_ping(Arc::clone(&this));
              // println!("created success");
              this
          }

          /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast list if not.
          fn spawn_ping(this: Arc<Self>) {
              actix_web::rt::spawn(async move {
                  let mut interval = interval(Duration::from_secs(10));
                  loop {
                      interval.tick().await;
                      this.remove_stale_clients().await;
                  }
              });
          }

          /// Removes all non-responsive clients from broadcast list.
          async fn remove_stale_clients(&self) {
              let clients = self.inner.lock().clients.clone();
              println!("active client {:?}", clients);
              let mut ok_clients = Vec::new();
              println!("okay active client {:?}", ok_clients);
              for client in clients {
                  if client
                      .send(sse::Event::Comment("ping".into()))
                      .await
                      .is_ok()
                  {
                      ok_clients.push(client.clone());
                  }
              }
              self.inner.lock().clients = ok_clients;
          }

          /// Registers client with broadcaster, returning an SSE response body.
          pub async fn new_client(&self) -> Sse<ChannelStream> {
              println!("starting creation");
              let (tx, rx) = sse::channel(10);
              tx.send(sse::Data::new("connected")).await.unwrap();
              println!("creating new clients success {:?}", tx);
              self.inner.lock().clients.push(tx);
              rx
          }

          /// Broadcasts `msg` to all clients.
          pub async fn broadcast(&self, msg: &str) {
              let clients = self.inner.lock().clients.clone();
              let send_futures = clients
                  .iter()
                  .map(|client| client.send(sse::Data::new(msg)));
              // try to send to all clients, ignoring failures
              // disconnected clients will get swept up by `remove_stale_clients`
              let _ = future::join_all(send_futures).await;
          }
      }
  • Create the routes for the SSE stream and for broadcasting a message

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let broadcaster = Broadcaster::create();
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new(AppState {
                    broadcaster: Arc::clone(&broadcaster)
                }))
                // This route is used to listen to SSE events
                .route("/events{_:/?}", web::get().to(sse_client))
                // This route will create a notification
                .route("/events/{msg}", web::get().to(broadcast_msg))
        })
        .bind(format!("{}:{}", "127.0.0.1", "8000"))?
        .run()
        .await
    }
  • Let's implement the sse_client and broadcast_msg

    • sse_client

      pub async fn sse_client(state: web::Data<AppState>) -> impl Responder {
          state.broadcaster.new_client().await
      }
    • broadcast_msg

      pub async fn broadcast_msg(
          state: web::Data<AppState>,
          Path((msg,)): Path<(String,)>,
      ) -> impl Responder {
          state.broadcaster.broadcast(&msg).await;
          HttpResponse::Ok().body("msg sent")
      }
    • Complete main.rs

      use actix_web::HttpResponse;
      use actix_web::Responder;
      use actix_web::{web, App, HttpServer};
      mod broadcast;
      use self::broadcast::Broadcaster;
      use std::{io, sync::Arc};
      use actix_web_lab::extract::Path;

      pub struct AppState {
          broadcaster: Arc<Broadcaster>
      }

      // SSE
      pub async fn sse_client(state: web::Data<AppState>) -> impl Responder {
          state.broadcaster.new_client().await
      }

      pub async fn broadcast_msg(
          state: web::Data<AppState>,
          Path((msg,)): Path<(String,)>,
      ) -> impl Responder {
          state.broadcaster.broadcast(&msg).await;
          HttpResponse::Ok().body("msg sent")
      }

      #[actix_web::main]
      async fn main() -> std::io::Result<()> {
          let broadcaster = Broadcaster::create();
          HttpServer::new(move || {
              App::new()
                  .app_data(web::Data::new(AppState {
                      broadcaster: Arc::clone(&broadcaster)
                  }))
                  // This route is used to listen to events/ sse events
                  .route("/events{_:/?}", web::get().to(sse_client))
                  // This route will create a notification
                  .route("/events/{msg}", web::get().to(broadcast_msg))
          })
          .bind(format!("{}:{}", "127.0.0.1", "8000"))?
          .run()
          .await
      }
  • Testing the SSE

    • Open /events to subscribe to the event stream.
    • Then hit /events/{msg} to send a message to the clients.
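If you test with a tool other than the browser, you will see the raw text/event-stream body rather than parsed events. The following is a minimal sketch of a parser for such a body, assuming the whole text is already in memory. The type and function names are hypothetical, and the parser only handles the `data:` and `event:` fields plus comments, not the full specification.

```rust
/// One parsed event: optional name plus the joined `data:` lines.
#[derive(Debug, PartialEq)]
struct ParsedEvent {
    event: Option<String>,
    data: String,
}

/// Parses a complete `text/event-stream` body into events.
/// Comment lines (starting with `:`) and unknown fields are skipped.
fn parse_event_stream(body: &str) -> Vec<ParsedEvent> {
    let mut events = Vec::new();
    let mut name: Option<String> = None;
    let mut data: Vec<&str> = Vec::new();

    for line in body.lines() {
        if line.is_empty() {
            // Blank line: dispatch the event collected so far, if any.
            if !data.is_empty() {
                events.push(ParsedEvent {
                    event: name.take(),
                    data: data.join("\n"),
                });
            }
            name = None;
            data.clear();
        } else if let Some(value) = line.strip_prefix("data:") {
            // A single leading space after the colon is not part of the value.
            data.push(value.strip_prefix(' ').unwrap_or(value));
        } else if let Some(value) = line.strip_prefix("event:") {
            name = Some(value.strip_prefix(' ').unwrap_or(value).to_string());
        }
        // Anything else, including ':' comment lines such as a ping, is ignored.
    }
    events
}
```

Fed a body containing a "connected" message, a ping comment, and one broadcast message, the parser yields two events and silently drops the ping.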

[Screenshot: SSE testing]

Hurray! We have finally implemented Server-Sent Events.

Feel free to ask questions, or to open a pull request on GitHub with changes and suggestions.

Source code: GitHub

Happy Hacking, Rustaceans!

Top comments (8)

Dương

How do I send an event to a specific user? For example, user A is only interested in their own messages and doesn't care about user B's messages.
I have thought about 2 approaches:

  1. Having the endpoint /sse/:userId, so that each user has only 1 broadcaster.
  2. Having different events for user A and user B, so that each user only listens to their own events.

I think approach 1 is not scalable, since it creates more connections. But I am worried about data safety with the second approach, because it broadcasts every event to every user. Do you have a better approach? Please clarify.

Praveen Chaudhary

You don't need to handle the events of A and B separately (point 2). Your first approach works fine.

You can manage event updates for different users based on the :userId variable on the server, in the function that handles the broadcast message.

You have a single client and a single broadcaster. That's not a problem: when the client closes the connection, its updates are dropped, so memory usage goes down.

For performance, you can refer to timeplus.com/post/websocket-vs-sse
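As an illustration of routing by user id on the server, here is a minimal std-only sketch. It is not code from the article's repository: `UserRegistry` and its methods are hypothetical, and std::sync::mpsc channels stand in for the SSE senders.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Maps a user id to the senders of that user's open connections.
#[derive(Default)]
struct UserRegistry {
    clients: HashMap<String, Vec<Sender<String>>>,
}

impl UserRegistry {
    /// Registers a connection for `user_id` and returns its receiving end.
    fn subscribe(&mut self, user_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.entry(user_id.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `msg` only to `user_id`; returns the number of deliveries.
    fn send_to(&self, user_id: &str, msg: &str) -> usize {
        self.clients
            .get(user_id)
            .map(|senders| {
                senders
                    .iter()
                    .filter(|tx| tx.send(msg.to_string()).is_ok())
                    .count()
            })
            .unwrap_or(0)
    }
}
```

With this shape, a message for user A never reaches user B's connection, which addresses the data-safety concern raised about broadcasting every event to everyone.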

leefordjudes

The above example is nice. Can you please explain the following?
1. How to run the broadcaster and the SSE client separately
2. How to run multiple SSE clients and one server
3. How to run multiple broadcasters and one client
4. How to run multiple broadcasters and multiple SSE clients

Praveen Chaudhary

1. How to run the broadcaster and the SSE client separately

  • I think you may be looking at this the wrong way, though I might have misunderstood. SSE consists of a client and a server, but they are not two separate things; they are parts of one system. The client subscribes to the broadcaster, and the broadcaster sends messages to the client whenever there are updates. You can compare it to a newsletter subscription. In the example above we send the message to the broadcaster through a route, but the trigger can be anything, such as a DB change or an internal or external API request (it would do the same thing we do internally in the "/events/{msg}" route).

2. How to run multiple SSE clients and one server

  • The example given is already a single broadcaster with multiple clients. Once deployed, you can connect as many clients as the server configuration can handle. One thing to keep in mind is that one browser (one client) can handle up to 6-10 simultaneous connections: developer.mozilla.org/en-US/docs/W...

3. How to run multiple broadcasters and one client

  • That's not the ideal case for SSE. You can use a regular HTTP request instead. SSE is one-to-many: one broadcaster to many subscribers.

4. How to run multiple broadcasters and multiple SSE clients

  • You can run as many broadcasters as you want, which distributes the load/clients among different servers, but make sure that they all listen to a common source of updates. You will run into the problem of syncing data between the different broadcasters. This problem is solved by some players like Google Pub/Sub, but for the general use case SSE works fine.

Feel free to ask again.

Comment deleted
Praveen Chaudhary

Another alternative is WebSockets, but they rely on the client (browser), so they are not usable for your use case.

You can use pub-sub instead (altexsoft.com/blog/event-driven-ar...); it uses a service-to-service connection, or long polling with a keep-alive connection.

Give pub-sub a read to see if it solves your use case, or please elaborate on the use case and I will try to find you a solution.

crajcan

I'm more interested in how I would subscribe to your example server using a client written in Rust (or whatever) instead of the browser.

Praveen Chaudhary

You can easily use pub-sub to subscribe (from a Rust client) to the publisher. Read more here: docs.rs/pub-sub/latest/pub_sub/
