Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == ⚡ 💪 ❤️

License

NotificationsYou must be signed in to change notification settings

permitio/fastapi_websocket_pubsub

Repository files navigation

pubsub

⚡🗞️ FastAPI Websocket Pub/Sub

TestsPackageDownloads

A fast and durable Pub/Sub channel over Websockets.The easiest way to create a live publish / subscribe multi-cast over the web.

Supports and tested on Python >= 3.7

As seen atPyCon IL 2021 andEuroPython 2021

Installation 🛠️

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == ⚡💪 ❤️

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on serverasyncdefon_event(data):print("We got an event! with data- ",data)# Subscribe for the eventclient.subscribe("my event",on_event)
  • Publish

    • Directly from server code to connected clients.
      app=FastAPI()endpoint=PubSubEndpoint()endpoint.register_route(app,path="/pubsub")endpoint.publish(["my_event_topic"],data=["my","data",1])
    • From client to client (through the servers)
      asyncwithPubSubClient(server_uri="ws://localhost/pubsub")asclient:endpoint.publish(["my_event_topic"],data=["my","data",1])
    • Across server instances (usingbroadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app=FastAPI()endpoint=PubSubEndpoint(broadcaster="postgres://localhost:5432/")@app.websocket("/pubsub")asyncdefwebsocket_rpc_endpoint(websocket:WebSocket):awaitendpoint.main_loop(websocket)
      seeexamples/pubsub_broadcaster_server_example.py for full usage exampleNote: Requires install with dependencies e.g.pip install fastapi_websocket_pubsub[redis],pip install fastapi_websocket_pubsub[postgres],pip install fastapi_websocket_pubsub[kafka] orpip install fastapi_websocket_pubsub[all] to get support for all three backends

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered".Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.

Server:

importasyncioimportuvicornfromfastapiimportFastAPIfromfastapi.routingimportAPIRouterfromfastapi_websocket_pubsubimportPubSubEndpointapp=FastAPI()# Init endpointendpoint=PubSubEndpoint()# register the endpoint on the appendpoint.register_route(app,"/pubsub")# Register a regular HTTP route@app.get("/trigger")asyncdeftrigger_events():# Upon request trigger an eventendpoint.publish(["triggered"])

Client:

fromfastapi_websocket_pubsubimportPubSubClient# Callback to be called upon event being published on serverasyncdefon_trigger(data):print("Trigger URL was accessed")asyncwithPubSubClient(server_uri="ws://localhost/pubsub")asclient:# Subscribe for the eventclient.subscribe("triggered",on_trigger)

More Examples

What can I do with this?

The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based onfastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based onbroadcaster for syncing server instances

  • Server Endpoint:

    • Based onFastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based onPydantic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.

  • Client :

    • Based onTenacity: allowing configurable retries to keep to connection alive

      • see WebSocketRpcClient.init's retry_config
    • Based on pythonwebsockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config.It provides a helper logging module to control how it produces logs for you.Seefastapi_websocket_rpc/logger.py.Uselogging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer.Or override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'

example:

# set RPC to log like UVICORNfromfastapi_websocket_rpc.loggerimportlogging_config,LoggingModeslogging_config.set_mode(LoggingModes.UVICORN)

Pull requests - welcome!

  • Please include tests for new features

About

A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == ⚡ 💪 ❤️

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors12


[8]ページ先頭

©2009-2026 Movatter.jp