Skip to content

Notifications

MessageResponse

Bases: BaseModel

Generic message response.

Source code in app/api/v1/endpoints/notifications.py
21
22
23
24
25
class MessageResponse(BaseModel):
    """Generic message response."""

    # Response body for endpoints that report an outcome instead of returning
    # a resource; in this module it is the response_model of the
    # "clear all notifications" and "mark all as read" endpoints.

    # Human-readable description of the outcome (required field).
    message: str
    # Outcome flag; defaults to True when not supplied.
    success: bool = True

NotificationCreateRequest

Bases: NotificationBase

Schema for creating a notification through the API.

Source code in app/api/v1/endpoints/notifications.py
28
29
30
31
32
33
34
35
class NotificationCreateRequest(NotificationBase):
    """
    Schema for creating a notification through the API.
    """

    # Adds an optional recipient on top of whatever fields are inherited from
    # NotificationBase (declared elsewhere).

    # Optional recipient. When omitted (None), create_notification falls back
    # to the id of the authenticated admin who sent the request.
    target_user_id: Optional[UUID] = Field(
        default=None, description="The UUID of the user to notify (defaults to self if not provided)"
    )

clear_all_notifications(service, current_user) async

Clear all notifications (Admin only).

Source code in app/api/v1/endpoints/notifications.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@router.delete("/", response_model=MessageResponse)
async def clear_all_notifications(
    service: Annotated[NotificationService, Depends(get_notification_service)],
    current_user: Annotated[User, Depends(get_current_admin)],
) -> MessageResponse:
    """
    Clear all notifications (Admin only).

    Args:
        service: The notification service instance.
        current_user: The authenticated admin (resolved via ``get_current_admin``).

    Returns:
        MessageResponse: Confirmation message once the service call has completed.

    Raises:
        TechnicalError: If the service call raises any exception. The original
            exception is attached as ``__cause__``.
    """
    try:
        # Pass the caller's id as user context together with an explicit
        # confirmation flag, both of which the service call expects.
        await service.clear_all_notifications(user_id=current_user.id, user_confirmation=True)
        return MessageResponse(message="All notifications cleared")
    except Exception as e:
        # NOTE(review): every exception, including domain errors raised by the
        # service, is wrapped in TechnicalError here, whereas
        # create_notification lets FunctionalError/TechnicalError propagate
        # unchanged — confirm this difference is intended.
        logger.error(f"❌ FAIL | clear_all_notifications | Error: {str(e)}", exc_info=True)
        # Chain explicitly so the root cause survives in tracebacks.
        raise TechnicalError(f"Failed to clear all notifications: {e}") from e

create_notification(notification_in, service, current_user) async

Create a notification (Admin only).

If target_user_id is provided, the notification will be created for that user. Otherwise, it will be created for the admin currently authenticated.

Source code in app/api/v1/endpoints/notifications.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@router.post("/", response_model=NotificationResponse)
async def create_notification(
    notification_in: NotificationCreateRequest,
    service: Annotated[NotificationService, Depends(get_notification_service)],
    current_user: Annotated[User, Depends(get_current_admin)],
) -> NotificationResponse:
    """
    Create a notification (Admin only).

    If target_user_id is provided, the notification will be created for that user.
    Otherwise, it will be created for the admin currently authenticated.

    Args:
        notification_in: The request payload, optionally naming a target user.
        service: The notification service instance.
        current_user: The authenticated admin (resolved via ``get_current_admin``).

    Returns:
        NotificationResponse: Whatever the service returns for the created notification.

    Raises:
        FunctionalError: Propagated unchanged (after logging) when raised while
            building or creating the notification.
        TechnicalError: Propagated unchanged when raised by the service, or
            raised here to wrap any other exception (original attached as
            ``__cause__``).
    """
    try:
        # Admins may notify another user; fall back to the admin's own id
        # when no target is given.
        target_id = notification_in.target_user_id or current_user.id

        # target_user_id is an API-level field only: strip it and pass the
        # resolved recipient as user_id instead.
        notification = NotificationCreate(**notification_in.model_dump(exclude={"target_user_id"}), user_id=target_id)
        return await service.create_notification(notification)
    except Exception as e:
        logger.error(f"❌ FAIL | create_notification | Error: {str(e)}", exc_info=True)
        # Domain errors already carry their own meaning; do not re-wrap them.
        if isinstance(e, (FunctionalError, TechnicalError)):
            raise
        # Chain explicitly so the root cause survives in tracebacks.
        raise TechnicalError(f"Failed to create notification: {e}") from e

get_notifications(service, current_user) async

List all notifications for the current user.

Parameters:

  • service (Annotated[NotificationService, Depends(get_notification_service)]) –

    The notification service instance.

  • current_user (Annotated[User, Depends(get_current_user)]) –

    The currently authenticated user.

Returns:

  • List[NotificationResponse]

    A list of notifications for the user.

Raises:

  • TechnicalError

    If there's an error fetching notifications.

Source code in app/api/v1/endpoints/notifications.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@router.get("/", response_model=List[NotificationResponse])
async def get_notifications(
    service: Annotated[NotificationService, Depends(get_notification_service)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> List[NotificationResponse]:
    """
    List all notifications for the current user.

    Args:
        service: The notification service instance.
        current_user: The currently authenticated user.

    Returns:
        List[NotificationResponse]: A list of notifications for the user.

    Raises:
        TechnicalError: If there's an error fetching notifications. The
            original exception is attached as ``__cause__``.
    """
    try:
        return await service.get_notifications(user_id=current_user.id)
    except Exception as e:
        # NOTE(review): every exception, including domain errors raised by the
        # service, is wrapped in TechnicalError here, whereas
        # create_notification lets FunctionalError/TechnicalError propagate
        # unchanged — confirm this difference is intended.
        logger.error(f"❌ FAIL | get_notifications | Error: {str(e)}", exc_info=True)
        # Chain explicitly so the root cause survives in tracebacks.
        raise TechnicalError(f"Failed to fetch notifications: {e}") from e

mark_all_notifications_read(service, current_user) async

Mark all notifications as read for the current user.

Source code in app/api/v1/endpoints/notifications.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@router.put("/read", response_model=MessageResponse)
async def mark_all_notifications_read(
    service: Annotated[NotificationService, Depends(get_notification_service)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> MessageResponse:
    """
    Mark all notifications as read for the current user.

    Args:
        service: The notification service instance.
        current_user: The currently authenticated user.

    Returns:
        MessageResponse: Confirmation message once the service call has completed.

    Raises:
        TechnicalError: If the service call raises any exception. The original
            exception is attached as ``__cause__``.
    """
    try:
        # Pass the caller's id as user context together with an explicit
        # confirmation flag, both of which the service call expects.
        await service.mark_all_as_read(user_id=current_user.id, user_confirmation=True)
        return MessageResponse(message="All notifications marked as read")
    except Exception as e:
        # NOTE(review): every exception, including domain errors raised by the
        # service, is wrapped in TechnicalError here, whereas
        # create_notification lets FunctionalError/TechnicalError propagate
        # unchanged — confirm this difference is intended.
        logger.error(f"❌ FAIL | mark_all_notifications_read | Error: {str(e)}", exc_info=True)
        # Chain explicitly so the root cause survives in tracebacks.
        raise TechnicalError(f"Failed to mark all notifications as read: {e}") from e

mark_notification_read(notification_id, service, current_user) async

Mark a specific notification as read.

Source code in app/api/v1/endpoints/notifications.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@router.put("/{notification_id}/read", response_model=NotificationResponse)
async def mark_notification_read(
    notification_id: UUID,
    service: Annotated[NotificationService, Depends(get_notification_service)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> NotificationResponse:
    """
    Mark a specific notification as read.

    Args:
        notification_id: The UUID of the notification, taken from the URL path.
        service: The notification service instance.
        current_user: The currently authenticated user.

    Returns:
        NotificationResponse: Whatever the service returns for the updated notification.

    Raises:
        EntityNotFound: If the service raises it, or returns a falsy result.
            Propagated without being logged here.
        TechnicalError: If any other exception occurs. The original exception
            is attached as ``__cause__``.
    """
    try:
        notification = await service.mark_notification_as_read(user_id=current_user.id, notification_id=notification_id)
        if not notification:
            # Defensive fallback: the service is expected to raise
            # EntityNotFound itself rather than return a falsy value.
            raise EntityNotFound("Notification not found")
        return notification
    except EntityNotFound:
        # A missing notification is an expected outcome; pass it through as-is.
        raise
    except Exception as e:
        logger.error(
            f"❌ FAIL | mark_notification_read | ID: {notification_id} | Error: {str(e)}",
            exc_info=True,
        )
        # Chain explicitly so the root cause survives in tracebacks.
        raise TechnicalError(f"Failed to mark notification as read: {e}") from e

stream_notifications(request, current_user, manager=Depends(get_websocket)) async

SSE Stream for real-time notifications.

Provides a Server-Sent Events (SSE) stream that delivers real-time notifications to the authenticated user.

Parameters:

  • request (Request) –

    The FastAPI request object.

  • current_user (Annotated[User, Depends(get_current_user)]) –

    The currently authenticated user.

  • manager (Websocket, default: Depends(get_websocket)) –

    The websocket manager instance.

Returns:

  • StreamingResponse –

    A response delivering SSE events.

Source code in app/api/v1/endpoints/notifications.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@router.get("/stream")
async def stream_notifications(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    manager: Websocket = Depends(get_websocket),
) -> StreamingResponse:
    """
    SSE Stream for real-time notifications.

    Returns a Server-Sent Events (SSE) response whose body is fed by the
    connection manager's event stream. The endpoint requires an
    authenticated user.

    Args:
        request: The FastAPI request object, polled for client disconnects.
        current_user: The currently authenticated user.
        manager: The websocket manager instance that supplies the events.

    Returns:
        StreamingResponse: A ``text/event-stream`` response delivering SSE events.
    """

    # NOTE(review): current_user is only used to enforce authentication; it is
    # not passed to manager.stream_events(), so any per-user filtering would
    # have to happen inside the manager — confirm.

    async def sse_frames() -> AsyncGenerator[str, None]:
        """
        Relay the manager's events as SSE ``data:`` frames.

        Yields:
            str: One formatted SSE frame per event received from the manager.
        """
        # stream_events() is the manager's managed generator; per the original
        # author it guarantees cleanup and capped queues.
        async for payload in manager.stream_events():
            # Stop relaying as soon as the client is known to have gone away.
            client_gone = await request.is_disconnected()
            if client_gone:
                break
            yield f"data: {payload}\n\n"

    frames = sse_frames()
    return StreamingResponse(frames, media_type="text/event-stream")