Skip to content

Integrating WebSockets with FastAPI

FastAPI offers excellent built-in support for WebSockets, allowing you to build real-time features easily alongside your REST APIs.

This guide covers the basics of setting up WebSocket endpoints, handling connections, sending/receiving messages, and best practices.


πŸš€ 1. Setting Up a WebSocket Endpoint

Use FastAPI’s WebSocket class and the websocket route decorator.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

πŸ”„ 2. Receiving and Sending Messages

  • Use await websocket.receive_text() or await websocket.receive_json() to receive.
  • Use await websocket.send_text() or await websocket.send_json() to send.
data = await websocket.receive_json()
await websocket.send_json({"type": "response", "data": data})

❌ 3. Handling Disconnects

Catch WebSocketDisconnect exception to clean up or notify.

from fastapi import WebSocketDisconnect

try:
    while True:
        message = await websocket.receive_text()
        # process message
except WebSocketDisconnect:
    print("Client disconnected")

πŸ” 4. Authentication

You can authenticate WebSocket connections by:

  • Passing tokens as query parameters.
  • Sending an auth message after connection.
Example
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = None):
    await websocket.accept()
    if token != "expected_token":
        await websocket.close(code=1008)
        return
    # Continue handling connection

πŸ“¦ 5. Managing Multiple Connections

Use a connection manager to keep track of active clients, broadcast messages, or send to specific users.

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)

⚠️ 6. Best Practices

  • Always accept connections before receiving.
  • Handle disconnects cleanly.
  • Authenticate early and close unauthorized connections.
  • Use background tasks or queues for heavy processing.
  • Use try/except to prevent unhandled exceptions.

πŸ”š Next Steps