Skip to content

WebSockets

WebSocket API endpoints for the Vectra backend.

This module provides the WebSocket endpoint for both frontend clients and the Python worker, handling real-time communication and status updates.

ClientType

Bases: StrEnum

Enumeration for different types of WebSocket clients.

Attributes:

  • CLIENT

    Represents a frontend client (e.g., React Dashboard).

  • WORKER

    Represents the backend Python worker.

Source code in app/api/v1/ws.py
25
26
27
28
29
30
31
32
33
34
35
class ClientType(StrEnum):
    """
    Enumeration for different types of WebSocket clients.

    Attributes:
        CLIENT: Represents a frontend client (e.g., React Dashboard).
        WORKER: Represents the backend Python worker.
    """

    CLIENT = "client"
    WORKER = "worker"

websocket_endpoint(websocket, manager, client_type=Query(ClientType.CLIENT), token=Query(default=None)) async

WebSocket Endpoint handling both Frontend clients and the Python Worker.

This endpoint manages the lifecycle of WebSocket connections, including handshake, security checks, message processing, and disconnection. Workers must provide valid authentication using a secret token.

Protocol: frontend clients connect with client_type=client; workers connect with client_type=worker and must provide either the x-worker-secret header or a token query parameter matching settings.WORKER_SECRET.

Parameters:

  • websocket (WebSocket) –

    The WebSocket connection instance.

  • manager (Annotated[ConnectionManager, Depends(get_connection_manager)]) –

    Singleton manager for handling active connections.

  • client_type (ClientType, default: Query(CLIENT) ) –

    Type of the connecting client (client or worker). Defaults to ClientType.CLIENT.

  • token (Optional[str], default: Query(default=None) ) –

    Optional authentication token (used if x-worker-secret header is missing).

Returns:

  • None

    None

Raises:

  • WebSocketDisconnect

    Handled internally: a normal client disconnect is caught and logged by the endpoint rather than propagated to the caller.

Source code in app/api/v1/ws.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@router.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    manager: Annotated[ConnectionManager, Depends(get_connection_manager)],
    client_type: ClientType = Query(ClientType.CLIENT),
    token: Optional[str] = Query(default=None),
) -> None:
    """
    WebSocket Endpoint handling both Frontend clients and the Python Worker.

    This endpoint manages the lifecycle of WebSocket connections, including
    handshake, security checks, message processing, and disconnection.
    Workers must provide valid authentication using a secret token.

    Protocol:
    - Frontend clients connect with `client_type=client`.
    - Workers connect with `client_type=worker` and must provide `x-worker-secret`
      header or a `token` query parameter matching `settings.WORKER_SECRET`.
    - The text message `ping` is answered with `pong`.
    - The text message `get_worker_status` triggers `manager.emit_worker_status`.
    - Any other text message from a worker is parsed as JSON and, if it is a
      JSON object, passed to `manager.broadcast`. Other messages from frontend
      clients are ignored.

    Args:
        websocket: The WebSocket connection instance.
        manager: Singleton manager for handling active connections.
        client_type: Type of the connecting client (client or worker).
            Defaults to ClientType.CLIENT.
        token: Optional authentication token (used if x-worker-secret header is missing).

    Returns:
        None

    Note:
        No exception is propagated to the caller: `WebSocketDisconnect` and any
        other error are caught and logged, and `manager.disconnect` is always
        attempted once the connection has been handed to the manager.
    """
    func_name: str = "websocket_endpoint"

    logger.debug(f"CONNECT | {func_name} | Incoming connection [Type: {client_type}]")

    # 1. Connection Phase & Authentication
    # Tracks whether manager.connect() completed, so a later setup failure can
    # still release the connection instead of leaving it in the manager.
    connected: bool = False
    try:
        if client_type == ClientType.WORKER:
            # Extract secret from headers, falling back to the query param
            secret: Optional[str] = websocket.headers.get("x-worker-secret") or token

            # An unset secret must never authenticate anyone: str(None) would
            # otherwise yield the guessable literal "None".
            configured_secret: Any = settings.WORKER_SECRET
            expected_secret: str = "" if configured_secret is None else str(configured_secret)

            # Compare as bytes: compare_digest() raises TypeError on str inputs
            # containing non-ASCII characters, which attacker-controlled input
            # could trigger. The comparison stays constant-time.
            authorized: bool = (
                bool(secret)
                and bool(expected_secret)
                and secrets.compare_digest(
                    secret.encode("utf-8"),
                    expected_secret.encode("utf-8"),
                )
            )

            if not authorized:
                # Never log the presented secret: it may be a near-miss of the real one.
                received_label: str = "[REDACTED]" if secret else "[MISSING]"
                logger.warning(
                    f"SECURITY | {func_name} | Unauthorized Worker Attempt. "
                    f"Received: {received_label} | Expected: [REDACTED]"
                )
                await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
                return

        # Accept connection and add to manager
        await manager.connect(websocket)
        connected = True

        conn_id: str = str(getattr(websocket, "conn_id", "unknown"))
        if client_type == ClientType.WORKER:
            logger.info(f"START | {func_name} | Secure Worker Connected [ID: {conn_id}]")
            await manager.register_worker(websocket)
        else:
            logger.debug(f"START | {func_name} | Client Connected [ID: {conn_id}]")

    except Exception as e:
        logger.error(f"FAIL | {func_name} | Connection refused or failed during setup | Error: {e}", exc_info=True)
        if connected:
            # Setup failed after the manager took the connection; release it.
            try:
                await manager.disconnect(websocket)
            except Exception as cleanup_error:
                logger.debug(
                    f"Cleanup | {func_name} | Disconnect failed (likely already closed) | Error: {cleanup_error}"
                )
        return

    # 2. Communication Loop
    try:
        while True:
            # Wait for text messages
            data: str = await websocket.receive_text()

            # Heartbeat check
            if data == "ping":
                await websocket.send_text("pong")
                continue

            # Status request
            if data == "get_worker_status":
                await manager.emit_worker_status(manager.is_worker_online)
                continue

            # Worker Broadcast: Re-broadcast worker messages to all frontend clients
            if client_type == ClientType.WORKER:
                try:
                    payload: Any = json.loads(data)
                    if isinstance(payload, dict):
                        await manager.broadcast(payload)
                    else:
                        logger.warning(f"Context | {func_name} | Msg: Worker sent non-dictionary JSON: {type(payload)}")
                except json.JSONDecodeError as e:
                    logger.warning(f"Context | {func_name} | Msg: Invalid JSON from worker | Error: {e!r}")
                except Exception as e:
                    # A failed broadcast must not tear down the worker connection.
                    logger.error(
                        f"FAIL | {func_name} | Worker broadcast error | Error: {e}",
                        exc_info=True,
                    )

    except WebSocketDisconnect as e:
        logger.info(f"FINISH | {func_name} | {client_type} Disconnected [Code: {e.code}]")
    except Exception as e:
        logger.error(f"FAIL | {func_name} | Unexpected error in WS loop | Error: {e!r}", exc_info=True)
    finally:
        # 3. Cleanup Phase
        try:
            await manager.disconnect(websocket)
        except Exception as e:
            logger.debug(f"Cleanup | {func_name} | Disconnect failed (likely already closed) | Error: {e}")