DontPanicO/fastapi-distributed-websocket

Python 3.11 | License: MIT | PyPI 0.2.0

FastAPI Distributed Websocket

A library to implement websocket for distributed systems based on FastAPI.

N.B.: This library is still at an early stage; use it in production at your own risk.

What it does

The main features of this library are:

  • Easy implementation of broadcasting, pub/sub, chat rooms, etc.
  • Proxying websocket connections to other servers (e.g. from an API gateway)
  • Authentication
  • Clean exception handling
  • An in-memory broker for fast development

Problems of scaling websocket among multiple servers in production

Websocket is a relatively new protocol for real-time communication that starts from an HTTP handshake. It establishes a durable, stateful, full-duplex connection between clients and the server. It can be used to implement chats, real-time notifications, broadcasting and pub/sub models.

Connections from clients

The HTTP request/response mechanism scales very well across multiple server instances in production. Any time a client makes a request, it can connect to any server instance and it is going to receive the same response. After the response has been returned, the client is disconnected and can make another request without needing to hit the same instance as before. This is thanks to the stateless nature of HTTP.

However, Websocket establishes a stateful connection between the client and the server. If an error occurs and the connection is lost, we have to ensure that clients reconnect to the same server instance they were connected to before, since that instance was managing the connection state.

Stateful means that there is a state that can be manipulated. In particular, a stateful connection is a connection that heavily relies on its state in order to work.

Broadcasting and group messages

Another problem of scaling Websocket occurs when we need to send messages to multiple connected clients (i.e. broadcasting a message or sending a message to all clients subscribed to a specific topic).

Imagine that we have a chat server and that, when a user sends a message in a specific chat room, we broadcast it to all users subscribed to that room. If we have a single server instance, all connections are managed by this instance, so we can safely trust that the message will be delivered to all recipients. On the other hand, with multiple server instances, users subscribing to a chat room will probably connect to different instances. This way, if a user sends a message to the chat room 'xyz' at serverA, users subscribed to the same chat room at serverB will not receive it.
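The scenario above can be sketched with plain Python objects. This is a toy model and not the library's code: ToyBroker, ServerInstance and their methods are hypothetical names chosen for illustration. Each instance only knows its own local clients, and routing every message through a shared broker is what lets clients on serverB see a message that arrived at serverA.

```python
from __future__ import annotations

from collections import defaultdict


class ToyBroker:
    """Hypothetical shared broker: forwards each message to every instance."""

    def __init__(self) -> None:
        self.instances: list[ServerInstance] = []

    def publish(self, room: str, text: str) -> None:
        for instance in self.instances:
            instance.deliver_locally(room, text)


class ServerInstance:
    """Hypothetical server instance that only knows its own local clients."""

    def __init__(self, name: str, broker: ToyBroker | None = None) -> None:
        self.name = name
        self.broker = broker
        # room name -> inboxes (plain lists) of clients connected to THIS instance
        self.rooms: dict[str, list[list[str]]] = defaultdict(list)
        if broker is not None:
            broker.instances.append(self)

    def join(self, room: str) -> list[str]:
        inbox: list[str] = []
        self.rooms[room].append(inbox)
        return inbox

    def deliver_locally(self, room: str, text: str) -> None:
        for inbox in self.rooms[room]:
            inbox.append(text)

    def on_client_message(self, room: str, text: str) -> None:
        if self.broker is None:
            # Without a broker, only clients of this instance get the message.
            self.deliver_locally(room, text)
        else:
            # With a broker, every instance (this one included) delivers it.
            self.broker.publish(room, text)


# No broker: the message sent at serverA never reaches serverB's client.
server_a, server_b = ServerInstance("serverA"), ServerInstance("serverB")
alice, bob = server_a.join("xyz"), server_b.join("xyz")
server_a.on_client_message("xyz", "hello")

# Shared broker: both clients receive the message.
broker = ToyBroker()
shared_a = ServerInstance("serverA", broker)
shared_b = ServerInstance("serverB", broker)
carol, dave = shared_a.join("xyz"), shared_b.join("xyz")
shared_a.on_client_message("xyz", "hello")
```

In the first half bob's inbox stays empty, which is exactly the delivery gap described above; in the second half both inboxes contain the message.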

Documenting Websocket interfaces

Another common problem with Websocket, not even related to scaling, is documentation. Due to the event-driven nature of the Websocket protocol, it is not well suited to being documented with openapi. However, a new specification for asynchronous, event-driven interfaces has been defined recently. The spec is named asyncapi and I'm currently studying it. I don't know whether this should be implemented here or whether a separate library would be better, but it is surely something we have to look at.

Other problems

When I first came to think about this library, I started doing a lot of research into common problems related to Websocket on stackoverflow, reddit, github issues and so on. I found some interesting resources, which are however related to the implementation itself. I picked the best solutions and elaborated my own, converging all of that in this library.

Examples

Installation

$ pip install fastapi-distributed-websocket

Basic usage

This is a basic example using an in-memory broker with a single server instance.

    from typing import Any, Optional

    from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
    from distributed_websocket import Connection, WebSocketManager

    app = FastAPI()
    manager = WebSocketManager('channel:1', broker_url='memory://')
    ...

    @app.on_event('startup')
    async def startup() -> None:
        ...
        await manager.startup()

    @app.on_event('shutdown')
    async def shutdown() -> None:
        ...
        await manager.shutdown()

    @app.websocket('/ws/{conn_id}')
    async def websocket_endpoint(
        ws: WebSocket,
        conn_id: str,
        *,
        topic: Optional[Any] = None,
    ) -> None:
        connection: Connection = await manager.new_connection(ws, conn_id)
        try:
            while True:
                msg = await connection.receive_json()
                await manager.broadcast(msg)
        except WebSocketDisconnect:
            await manager.remove_connection(connection)

The manager.new_connection method creates a new Connection object and adds it to the manager.active_connections list. Note that after a WebSocketDisconnect is raised, we call remove_connection: this method only removes the connection object from the manager.active_connections list, without calling connection.close, since the connection is already closed. If you need to close a connection at any other time, you can use manager.close_connection. If you use connection.iter_json, it already handles the WebSocketDisconnect exception, so you can simply call manager.remove_connection just after the loop (see next code block).

Note that here we are using manager.broadcast to send the message to all connections managed by the WebSocketManager instance. However, this method only works if we have a single server instance. If we have multiple server instances, we have to use manager.receive to properly send the message to the broker.

    @app.websocket('/ws/{conn_id}')
    async def websocket_endpoint(
        ws: WebSocket,
        conn_id: str,
        *,
        topic: Optional[Any] = None,
    ) -> None:
        connection: Connection = await manager.new_connection(ws, conn_id)
        # This is the preferred way of handling WebSocketDisconnect
        async for msg in connection.iter_json():
            await manager.receive(connection, msg)
        await manager.remove_connection(connection)

Proxy from an API gateway

Let's say we are developing a chat service and that all our services are behind an API gateway. If we want to keep our websocket service behind it too, then fastapi-distributed-websocket provides us with WebSocketProxy.

    from distributed_websocket import WebSocketProxy
    # skipped other imports for brevity

    app = FastAPI()
    WS_TARGET_ENDPOINT = 'ws://websocket_service:8000/wshandler'

    @app.websocket('/ws')
    async def websocket_proxy(websocket: WebSocket):
        await websocket.accept()
        ws_proxy = WebSocketProxy(websocket, WS_TARGET_ENDPOINT)
        await ws_proxy()

This will forward all messages from the client to the target endpoint andall messages from the target endpoint to the client.

Now let's assume that our websocket service code is the code of our previous example. Our API Gateway code will be:

    from distributed_websocket import WebSocketProxy
    # skipped other imports for brevity

    app = FastAPI()
    WS_TARGET_ENDPOINT = 'ws://websocket_service:8000/ws/{}'

    @app.websocket('/ws/{conn_id}')
    async def websocket_endpoint(
        ws: WebSocket,
        conn_id: str,
    ) -> None:
        # Use the same name as the parameter (ws) throughout the handler
        await ws.accept()
        ws_proxy = WebSocketProxy(ws, WS_TARGET_ENDPOINT.format(conn_id))
        await ws_proxy()

API Reference

Connection

Connection objects wrap the websocket connection and provide a simple interface to send and receive messages. They have a topics attribute to store subscription patterns and implement pub/sub models.

  • async accept(self) -> None
    Accept the connection.
  • async close(self, code: int = 1000) -> None
    Close the connection with the specified status.
  • async receive_json(self) -> Any
    Receive a JSON message.
  • async send_json(self, data: Any) -> None
    Send a JSON message over the connection.
  • async iter_json(self) -> AsyncIterator[Any]
    Iterate over the messages received over the connection.
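Why iterating with iter_json removes the need for a try/except in the endpoint can be sketched with a fake socket. This is not the library's implementation: FakeDisconnect stands in for WebSocketDisconnect, and FakeSocket and the module-level iter_json function are hypothetical helpers written only for this illustration.

```python
import asyncio
from typing import Any, AsyncIterator


class FakeDisconnect(Exception):
    """Stand-in for WebSocketDisconnect (hypothetical, illustration only)."""


class FakeSocket:
    """Hypothetical socket that returns queued messages, then disconnects."""

    def __init__(self, messages: list) -> None:
        self._messages = list(messages)

    async def receive_json(self) -> Any:
        if not self._messages:
            raise FakeDisconnect()
        return self._messages.pop(0)


async def iter_json(socket: FakeSocket) -> AsyncIterator[Any]:
    """Yield messages until the peer disconnects, then stop quietly."""
    try:
        while True:
            yield await socket.receive_json()
    except FakeDisconnect:
        return


async def collect(messages: list) -> list:
    # The caller needs no try/except: the loop simply ends on disconnect.
    return [msg async for msg in iter_json(FakeSocket(messages))]


received = asyncio.run(collect([{"n": 1}, {"n": 2}]))
```

The disconnect is absorbed inside the generator, so cleanup code such as removing the connection can follow the loop directly.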

Messages

Message objects store the message type, the topic and the data. They provide an easy serialization/deserialization mechanism. Remember that messages returned by connection.iter_json are already deserialized into dict objects, so here deserialization means the process of converting a dict object into a Message object.

  • type: str
    The message type.

  • topic: str
    The message topic.

  • conn_id: str | list[str]
    The connection id or list of connection ids that the message should be sent to.

  • data: Any
    The message data.

  • classmethod from_client_message(cls, *, data: Any) -> Message
    Create a message from a client message.

  • __serialize__(self) -> dict
    Serialize the message into a dict object.
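The round trip between a dict and a message object can be sketched with a dataclass. SketchMessage is a hypothetical stand-in, not the library's Message class; the field names follow the list above, and the shape of the client payload (a dict whose keys match the field names) is an assumption made for this example.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class SketchMessage:
    """Illustrative stand-in for Message; not the library's class."""

    type: str
    topic: str | None = None
    conn_id: str | list[str] | None = None
    data: Any = None

    @classmethod
    def from_client_message(cls, *, data: Any) -> SketchMessage:
        # Assumption: the client sends a dict whose keys match the field names.
        return cls(
            type=data["type"],
            topic=data.get("topic"),
            conn_id=data.get("conn_id"),
            data=data.get("data"),
        )

    def __serialize__(self) -> dict:
        # Convert the message back into a plain dict.
        return asdict(self)


raw = {"type": "subscribe", "topic": "chat/xyz"}
message = SketchMessage.from_client_message(data=raw)
payload = message.__serialize__()
```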

Subscriptions

You can bind topics to connection objects to implement pub/sub models, notifications and so on. The topics attribute is a set of strings that follow the pattern matching syntax of MQTT. This library shares the state of connection objects between server instances, so you may find references to terms like channel, publish, subscribe and unsubscribe referring to the same concepts but applied to the underlying server/broker communication.
This may be confusing, but remember to keep separate the communication between the server and the clients, which you are developing, and the communication between the server and the broker, which you usually don't deal with.

  • subscribe(connection: Connection, message: Message) -> None
    Subscribe a connection to message.topic.
  • unsubscribe(connection: Connection, message: Message) -> None
    Unsubscribe a connection from message.topic.
  • handle_subscription_message(connection: Connection, message: Message) -> None
    Calls subscribe or unsubscribe depending on the message type.
  • matches(topic: str, patterns: set[str]) -> bool
    Check if topic matches any of the patterns in patterns.
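As a rough illustration of MQTT-style matching, the sketch below treats '+' as a wildcard for exactly one topic level and a trailing '#' as a wildcard for any remaining levels. It is an independent sketch of the MQTT rules, not the library's matches implementation, and the function names topic_matches_pattern and matches_any are hypothetical.

```python
def topic_matches_pattern(topic: str, pattern: str) -> bool:
    """Return True if topic matches a single MQTT-style pattern."""
    topic_levels = topic.split("/")
    pattern_levels = pattern.split("/")
    for index, level in enumerate(pattern_levels):
        if level == "#":
            # '#' matches everything from here on, but only as the last level.
            return index == len(pattern_levels) - 1
        if index >= len(topic_levels):
            # The pattern has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without '#', both must have the same number of levels.
    return len(topic_levels) == len(pattern_levels)


def matches_any(topic: str, patterns: set) -> bool:
    """Return True if topic matches at least one pattern in the set."""
    return any(topic_matches_pattern(topic, pattern) for pattern in patterns)


subscriptions = {"chat/+", "alerts/#"}
room_match = matches_any("chat/xyz", subscriptions)
nested_match = matches_any("alerts/eu/west", subscriptions)
no_match = matches_any("chat/xyz/private", subscriptions)
```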

Authentication

Authentication is provided by the WebSocketOAuth2PasswordBearer class. It inherits from FastAPI's OAuth2PasswordBearer and overrides the __call__ method to accept a WebSocket object.

  • async __call__(self, websocket: WebSocket) -> str | None
    Authenticate the websocket connection and return the Authorization header value.
    If the authentication fails, return None if the object has been initialized with auto_error=False,
    or close the connection with the WS_1008_POLICY_VIOLATION code.

Exceptions and Exception Handling

fastapi-distributed-websocket provides exception handling via decorators. You can use the appropriate decorator, passing an exception class and a handler callable. Exception handlers should accept only the exception object as argument.
Why is this useful?
Because sometimes the same type of exception can be raised by different parts of the application. This way you can decorate the higher-level function in the call stack to handle the exception at any level.
A base WebSocketException class is provided to bind connection objects to the exception, so your handler function can easily access them. If you need to access connection objects from the exception handler, your custom exceptions should inherit from WebSocketException, whether or not they are really network related.

  • WebSocketException(self, message: str, *, connection: Connection) -> None

  • InvalidSubscription(self, message: str, *, connection: Connection) -> None
    Raised when a subscription pattern uses an invalid syntax. Inherits from WebSocketException.

  • InvalidSubscriptionMessage(self, message: str, *, connection: Connection) -> None
    Like InvalidSubscription, it can be raised for bad syntax, but it can also be raised
    when the message type is not subscribe or unsubscribe. Inherits from WebSocketException.

  • handle(exc: BaseException, handler: Callable[..., Any]) -> Callable[..., Any]
    Decorator to handle exceptions. If you decorate a function with this decorator, any time
    an exception of type exc is raised in or propagated to the function, it will be handled by handler.
    Use this decorator only if both your handler and the function are not async.

  • async ahandle(exc: BaseException, handler: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]
    Decorator to handle exceptions, same as handle, but the handler is a coroutine function.
    Use this if your handler is a coroutine function, while the decorated function could be
    either a sync or an async function.
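The decorator pattern described above can be sketched in a few lines. This is a minimal sketch of the general technique, not the library's code: the handle and ahandle functions below are local re-implementations, and returning the handler's result from the wrapped call is an assumption.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def handle(exc: type, handler: Callable[[BaseException], Any]):
    """Sync sketch: call handler(error) when the wrapped function raises exc."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exc as error:
                return handler(error)

        return wrapper

    return decorator


def ahandle(exc: type, handler: Callable[[BaseException], Any]):
    """Async sketch: handler is a coroutine function; func may be sync or async."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result
                return result
            except exc as error:
                return await handler(error)

        return wrapper

    return decorator


def on_value_error(error: BaseException) -> str:
    return f"handled: {error}"


@handle(ValueError, on_value_error)
def parse_port(raw: str) -> int:
    return int(raw)


async def on_key_error(error: BaseException) -> str:
    return "missing"


@ahandle(KeyError, on_key_error)
async def lookup(table: dict, key: str) -> Any:
    return table[key]


handled = parse_port("abc")
missing = asyncio.run(lookup({}, "x"))
```

Because the wrapper catches anything that propagates up to the decorated function, an exception raised several calls deeper is handled in the same place.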

Broker Interfaces

Connection state is shared between server instances using a pub/sub broker. By default, the broker is a redis.asyncio.Redis instance (formerly aioredis.Redis), but you can use any other implementation. fastapi-distributed-websocket provides an InMemoryBroker class for development purposes. You can inherit from BrokerInterface and override the methods to implement your own broker.

  • async connect(self) -> Coroutine[Any, Any, None]
    Connect to the broker.
  • async disconnect(self) -> Coroutine[Any, Any, None]
    Disconnect from the broker.
  • async subscribe(self, channel: str) -> Coroutine[Any, Any, None]
    Subscribe to a channel.
  • async unsubscribe(self, channel: str) -> Coroutine[Any, Any, None]
    Unsubscribe from a channel.
  • async publish(self, channel: str, message: Any) -> Coroutine[Any, Any, None]
    Publish a message to a channel.
  • async get_message(self, **kwargs) -> Coroutine[Any, Any, Message | None]
    Get a message from the broker.
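To give a feel for what a custom broker has to provide, here is a standalone, single-process sketch that mirrors the method names listed above. In real use you would inherit from BrokerInterface; QueueBroker is a hypothetical class that does not do so, which lets the example run without the library installed. Returning a plain dict (rather than a Message) and returning None when nothing is waiting are assumptions.

```python
import asyncio
from typing import Any, Optional


class QueueBroker:
    """Hypothetical single-process broker mirroring the method names above."""

    def __init__(self) -> None:
        self._channels: set = set()
        self._queue: Optional[asyncio.Queue] = None

    async def connect(self) -> None:
        # Create the queue inside the running event loop.
        self._queue = asyncio.Queue()

    async def disconnect(self) -> None:
        self._channels.clear()
        self._queue = None

    async def subscribe(self, channel: str) -> None:
        self._channels.add(channel)

    async def unsubscribe(self, channel: str) -> None:
        self._channels.discard(channel)

    async def publish(self, channel: str, message: Any) -> None:
        # Messages for channels nobody subscribed to are dropped.
        if self._queue is not None and channel in self._channels:
            await self._queue.put({"channel": channel, "data": message})

    async def get_message(self, **kwargs: Any) -> Optional[dict]:
        # Assumption: return None when nothing is waiting instead of blocking.
        if self._queue is None or self._queue.empty():
            return None
        return self._queue.get_nowait()


async def demo() -> list:
    broker = QueueBroker()
    await broker.connect()
    await broker.subscribe("channel:1")
    await broker.publish("channel:1", {"text": "hi"})
    await broker.publish("other", {"text": "dropped"})
    first = await broker.get_message()
    second = await broker.get_message()
    await broker.disconnect()
    return [first, second]


results = asyncio.run(demo())
```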

WebSocketManager

The WebSocketManager class is where the main logic of the library is implemented.
It keeps track of the connection objects and starts the broker connection. It spawns a main task, a listener that waits (non-blocking) for messages from the broker and sends them to the connection objects (broadcasting or checking for subscriptions), spawning a new task for each send.
The broker initialisation is done in the constructor, while calls to broker.connect and broker.disconnect are handled in the startup and shutdown methods.

  • async new_connection(self, websocket: WebSocket, conn_id: str, topic: str | None = None) -> Coroutine[Any, Any, Connection]
    Create a new connection object, add it to self.active_connections and return it.
  • async close_connection(self, connection: Connection, code: int = status.WS_1000_NORMAL_CLOSURE) -> Coroutine[Any, Any, None]
    Close a connection object and remove it from self.active_connections.
  • remove_connection(self, connection: Connection) -> None
    Remove a connection object from self.active_connections.
  • set_conn_id(self, connection: Connection, conn_id: str) -> None
    Set the connection id and notify the client.
  • send(self, message: Message) -> None
    Send a message to all the connection objects subscribed to message.topic.
    It spawns a new task wrapping the coroutine resulting from self._send.
  • broadcast(self, message: Message) -> None
    Send a message to all the connection objects.
    It spawns a new task wrapping the coroutine resulting from self._broadcast.
  • send_by_conn_id(self, message: Message) -> None
    Send a message to all the connection objects whose id equals message.conn_id.
    It spawns a new task wrapping the coroutine resulting from self._send_by_conn_id
    if conn_id is a string, or from _send_multi_by_conn_id if it is a list.
  • send_msg(self, message: Message) -> None
    Based on the message type, it calls send, send_by_conn_id or broadcast.
  • async receive(self, connection: Connection, message: Any) -> Coroutine[Any, Any, None]
    Receive a message from a connection object. It passes the message down to
    a private method that handles any subscriptions and then publishes the message
    to the broker.
  • async startup(self) -> Coroutine[Any, Any, None]
    Start the broker connection and the listener task.
  • async shutdown(self) -> Coroutine[Any, Any, None]
    Close the broker connection and the listener task.
    Before that, it also takes care of cancelling all the tasks spawned by send_msg and
    closing all the connection objects.
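The "one task per send, cancel what is left on shutdown" pattern mentioned above can be sketched with plain asyncio. TaskTracker and slow_send are hypothetical names used only here; this shows the general technique and is not how WebSocketManager is necessarily written.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper: spawn one task per send, cancel leftovers on shutdown."""

    def __init__(self) -> None:
        self._tasks: set = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task finishes, so the set does not grow forever.
        task.add_done_callback(self._tasks.discard)
        return task

    async def shutdown(self) -> None:
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        # return_exceptions=True collects the CancelledError of each cancelled task.
        await asyncio.gather(*pending, return_exceptions=True)


async def slow_send(outbox: list, text: str, delay: float) -> None:
    # Stand-in for sending a message over a websocket.
    await asyncio.sleep(delay)
    outbox.append(text)


async def demo() -> list:
    outbox: list = []
    tracker = TaskTracker()
    tracker.spawn(slow_send(outbox, "fast", 0))
    tracker.spawn(slow_send(outbox, "never sent", 60))
    await asyncio.sleep(0.05)  # let the fast send finish
    await tracker.shutdown()  # cancels the slow one
    return outbox


sent = asyncio.run(demo())
```

Keeping a reference to every spawned task also prevents it from being garbage collected before it completes.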

WebSocketProxy

The WebSocketProxy class initialises callable objects that can be used to start proxying websocket messages from a client to a server and vice versa. It is initialised with two parameters:

  • client: a WebSocket object
  • server_endpoint: a str containing the endpoint of the server

Notice that the target server could be a remote server or the same server that starts the proxy.

  • async __call__(self) -> Coroutine[Any, Any, None]
    Start a websocket connection to server_endpoint and spawn two tasks:
    one that forwards the messages from the client to the target and the other that
    forwards the messages from the target to the client.
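The two-task forwarding idea can be sketched with asyncio queues standing in for the two websocket connections. Everything here is hypothetical (the CLOSED sentinel, forward, proxy and the queue-based endpoints); it only illustrates running one task per direction and stopping both when either side closes, and it is not the WebSocketProxy implementation.

```python
import asyncio

CLOSED = object()  # sentinel meaning "this side closed the connection"


async def forward(source: asyncio.Queue, target: asyncio.Queue) -> None:
    """Copy messages from source to target until source closes."""
    while True:
        message = await source.get()
        if message is CLOSED:
            return
        await target.put(message)


async def proxy(client_in, client_out, server_in, server_out) -> None:
    tasks = [
        asyncio.create_task(forward(client_in, server_out)),  # client -> target
        asyncio.create_task(forward(server_in, client_out)),  # target -> client
    ]
    # Stop both directions as soon as either side closes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> tuple:
    client_in, client_out = asyncio.Queue(), asyncio.Queue()
    server_in, server_out = asyncio.Queue(), asyncio.Queue()
    proxy_task = asyncio.create_task(
        proxy(client_in, client_out, server_in, server_out)
    )
    await client_in.put("ping")  # message from the client
    await server_in.put("pong")  # message from the target
    await asyncio.sleep(0.01)  # give both directions time to forward
    await client_in.put(CLOSED)  # the client disconnects
    await proxy_task
    return server_out.get_nowait(), client_out.get_nowait()


forwarded = asyncio.run(demo())
```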
