Integrating WebSockets with FastAPI
FastAPI offers excellent built-in support for WebSockets, allowing you to build real-time features easily alongside your REST APIs.
This guide covers the basics of setting up WebSocket endpoints, handling connections, sending/receiving messages, and best practices.
π 1. Setting Up a WebSocket Endpoint
Use FastAPIβs WebSocket
class and the websocket
route decorator.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
except WebSocketDisconnect:
print("Client disconnected")
π 2. Receiving and Sending Messages
- Use
await websocket.receive_text()
orawait websocket.receive_json()
to receive. - Use
await websocket.send_text()
orawait websocket.send_json()
to send.
β 3. Handling Disconnects
Catch WebSocketDisconnect
exception to clean up or notify.
from fastapi import WebSocketDisconnect
try:
while True:
message = await websocket.receive_text()
# process message
except WebSocketDisconnect:
print("Client disconnected")
π 4. Authentication
You can authenticate WebSocket connections by:
- Passing tokens as query parameters.
- Sending an auth message after connection.
Example
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = None):
await websocket.accept()
if token != "expected_token":
await websocket.close(code=1008)
return
# Continue handling connection
π¦ 5. Managing Multiple Connections
Use a connection manager to keep track of active clients, broadcast messages, or send to specific users.
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Client says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
β οΈ 6. Best Practices
- Always accept connections before receiving.
- Handle disconnects cleanly.
- Authenticate early and close unauthorized connections.
- Use background tasks or queues for heavy processing.
- Use
try/except
to prevent unhandled exceptions.
π Next Steps
- Message Format: Use a consistent message envelope.
- Authentication: Secure your WebSocket sessions.
- Error Handling: Gracefully handle errors.