A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.
Supported and tested on Python >= 3.7
As seen at PyCon IL 2021
pip install fastapi_websocket_pubsub
The classic pub/sub pattern, made easily accessible and scalable over the web and across your cloud in realtime, while enjoying the benefits of FastAPI (e.g. dependency injection).
FastAPI + WebSockets + PubSub == ⚡💪 ❤️
    # Callback to be called upon event being published on server
    async def on_event(data):
        print("We got an event! with data- ", data)

    # Subscribe for the event
    client.subscribe("my event", on_event)
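    from fastapi import FastAPI
    from fastapi_websocket_pubsub import PubSubEndpoint

    app = FastAPI()
    endpoint = PubSubEndpoint()
    endpoint.register_route(app, "/pubsub")

    # publish() is a coroutine: await it from async code, such as a route handler
    await endpoint.publish(["my_event_topic"], data=["my", "data", 1])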
    # Run inside a coroutine; the client publishes through the server it is connected to
    async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
        await client.publish(["my_event_topic"], data=["my", "data", 1])
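To make the subscribe/publish flow above concrete, here is a minimal in-memory sketch of topic-based dispatch using only the standard library. It illustrates the pattern and is not how this library is implemented: `InMemoryPubSub` and `demo` are hypothetical names, and there is no websocket or network layer involved.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Callback = Callable[[Any], Awaitable[None]]


class InMemoryPubSub:
    """Hypothetical stand-in for a pub/sub hub; not part of fastapi_websocket_pubsub."""

    def __init__(self) -> None:
        # topic name -> callbacks registered for that topic
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers[topic].append(callback)

    async def publish(self, topics: List[str], data: Any = None) -> int:
        # Collect every callback subscribed to any of the given topics
        callbacks = [cb for topic in topics for cb in self._subscribers.get(topic, [])]
        await asyncio.gather(*(cb(data) for cb in callbacks))
        return len(callbacks)


received: List[Any] = []


async def on_event(data: Any) -> None:
    received.append(data)


async def demo() -> int:
    hub = InMemoryPubSub()
    hub.subscribe("my_event_topic", on_event)
    # Nobody subscribed to "other_topic", so only one callback runs
    return await hub.publish(["my_event_topic", "other_topic"], data=["my", "data", 1])


notified = asyncio.run(demo())
```

Publishing takes a list of topics, mirroring the call shape used in the snippets above, and each subscriber only sees events for the topics it registered.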
No matter which server a client connects to, it receives the messages for the topics it subscribed to.
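    from fastapi import FastAPI, WebSocket
    from fastapi_websocket_pubsub import PubSubEndpoint

    app = FastAPI()
    endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")

    # Route path is illustrative; it matches the "/pubsub" path used elsewhere in this document
    @app.websocket("/pubsub")
    async def websocket_rpc_endpoint(websocket: WebSocket):
        async with endpoint.broadcaster:
            await endpoint.main_loop(websocket)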
See examples/pubsub_broadcaster_server_example.py for a full usage example.
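The idea behind broadcasting can be sketched without any infrastructure: each server instance only knows its own clients' subscriptions, and a shared backbone relays every published event to all instances. The sketch below is an in-memory illustration under that assumption. `Backbone`, `Server`, and `run_broadcast_demo` are hypothetical names, and the real library delegates this role to an external backbone such as the Postgres URL shown above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Callback = Callable[[Any], Awaitable[None]]


class Backbone:
    """Hypothetical shared channel (the role an external backbone service plays)."""

    def __init__(self) -> None:
        self.servers: List["Server"] = []

    async def broadcast(self, topic: str, data: Any) -> None:
        # Relay the event to every server instance attached to the backbone
        for server in self.servers:
            await server.deliver_local(topic, data)


class Server:
    """Hypothetical server instance holding only its own clients' subscriptions."""

    def __init__(self, name: str, backbone: Backbone) -> None:
        self.name = name
        self.backbone = backbone
        self.subscriptions: Dict[str, List[Callback]] = {}
        backbone.servers.append(self)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self.subscriptions.setdefault(topic, []).append(callback)

    async def deliver_local(self, topic: str, data: Any) -> None:
        for callback in self.subscriptions.get(topic, []):
            await callback(data)

    async def publish(self, topic: str, data: Any) -> None:
        # Publishing goes through the backbone, so other instances see it too
        await self.backbone.broadcast(topic, data)


async def run_broadcast_demo() -> List[str]:
    log: List[str] = []
    backbone = Backbone()
    server_a = Server("a", backbone)
    server_b = Server("b", backbone)

    async def client_on_b(data: Any) -> None:
        log.append(f"client on b got {data}")

    server_b.subscribe("my_event_topic", client_on_b)
    # The event is published on server a, but the subscriber lives on server b
    await server_a.publish("my_event_topic", "hello")
    return log


delivered = asyncio.run(run_broadcast_demo())
```

Routing every publish through the backbone, rather than delivering locally first, keeps delivery uniform: a subscriber gets each event exactly once regardless of which instance it is connected to.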
In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from the PubSub websocket, the server also exposes a regular HTTP route, which triggers publication of the event.
    from fastapi import FastAPI
    from fastapi_websocket_pubsub import PubSubEndpoint

    app = FastAPI()
    # Init endpoint
    endpoint = PubSubEndpoint()
    # Register the endpoint on the app
    endpoint.register_route(app, "/pubsub")

    # Register a regular HTTP route (the "/trigger" path is illustrative)
    @app.get("/trigger")
    async def trigger_events():
        # Upon request, trigger an event; publish() is a coroutine, so await it
        await endpoint.publish(["triggered"])
    from fastapi_websocket_pubsub import PubSubClient

    # Callback to be called upon event being published on server
    async def on_trigger(data):
        print("Trigger URL was accessed")

    # Run inside a coroutine
    async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
        # Subscribe for the event
        client.subscribe("triggered", on_trigger)
The combination of Websockets and bi-directional Pub/Sub is ideal for building realtime data propagation solutions at scale over the web.
Based on fastapi-websocket-rpc for a robust realtime bidirectional channel
Based on broadcaster for syncing server instances
Based on FastAPI: enjoy all the benefits of a full ASGI platform, including asyncio and dependency injection (for example, to authenticate connections)
Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply pass Pydantic data models as PubSub published data to have them available as part of an event.
fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config.
It provides a helper logging module to control how it produces logs for you.
Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer.
Or override it completely via the default logging config (e.g. 'logging.config.dictConfig'); all logger names start with 'fastapi.ws_rpc.pubsub'.
    # Set RPC to log like uvicorn
    from fastapi_websocket_rpc.logger import logging_config, LoggingModes

    logging_config.set_mode(LoggingModes.UVICORN)
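For the full-override route via 'logging.config.dictConfig', a minimal sketch with the standard library could look like the following. It assumes the library's loggers are dot-separated children of the 'fastapi.ws_rpc.pubsub' prefix, so configuring that one name covers them all. The formatter, handler, level, and the child logger name are illustrative choices, not something the library prescribes.

```python
import logging
import logging.config

LOGGER_PREFIX = "fastapi.ws_rpc.pubsub"  # prefix stated in the text above

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "brief": {"format": "%(levelname)s %(name)s: %(message)s"},
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "brief"},
        },
        "loggers": {
            # Dotted child loggers under this prefix inherit the level and handler
            LOGGER_PREFIX: {
                "level": "WARNING",
                "handlers": ["console"],
                "propagate": False,
            },
        },
    }
)

# Hypothetical child logger name, used only to show inheritance
child = logging.getLogger(LOGGER_PREFIX + ".example_child")
```

With this configuration, debug and info records from loggers under the prefix are dropped, while warnings and errors go to the console handler.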