Skip to content

Analytics

Advanced Analytics API Endpoints.

broadcast_analytics_loop(interval_seconds=10) async

Background task that periodically broadcasts advanced analytics stats via WebSocket.

Parameters:

  • interval_seconds (int, default: 10 ) –

    Broadcast interval in seconds (default: 10). Higher than dashboard stats due to query complexity.

Source code in app/api/v1/endpoints/analytics.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def broadcast_analytics_loop(interval_seconds: int = 10) -> None:
    """
    Background task that periodically broadcasts advanced analytics stats via WebSocket.

    Args:
        interval_seconds: Broadcast interval in seconds (default: 10).
            Higher than dashboard stats due to query complexity.
    """
    global _broadcast_running
    _broadcast_running = True

    logger.info("Starting advanced analytics broadcast loop (interval: %ds)", interval_seconds)

    from app.core.connection_manager import manager

    while _broadcast_running:
        try:
            # P0 Fix: Pass the factory, not a session, to support parallel execution inside loop
            factory = get_session_factory()
            # Initialize SettingsService (no DB session needed for advanced analytics as they use defaults/env)
            settings_service = SettingsService(db=None)
            service = AnalyticsService(session_factory=factory, settings_service=settings_service)

            # Use standard defaults for the live view
            stats = await service.get_all_advanced_analytics(
                ttft_hours=24, step_days=7, cache_hours=24, cost_hours=24, trending_limit=10
            )

            # Broadcast via WebSocket
            await manager.emit_advanced_analytics_stats(stats.model_dump(mode="json"))

        except Exception as e:
            logger.error("Error in analytics broadcast: %s", e, exc_info=True)

        # Sleep in short slices so a stop request (clearing _broadcast_running)
        # takes effect within ~1s rather than up to a full interval. Previously a
        # single sleep(interval_seconds) could make stop_broadcast_task()'s 10s
        # wait_for time out (forcing cancellation) whenever interval >= 10s.
        remaining = float(interval_seconds)
        while _broadcast_running and remaining > 0.0:
            slice_s = min(1.0, remaining)
            await asyncio.sleep(slice_s)
            remaining -= slice_s

    logger.info("Advanced analytics broadcast loop stopped")

get_advanced_analytics(service, assistant_id=Query(None, description='Filter by assistant ID'), ttft_hours=Query(24, ge=1, le=168, description='TTFT analysis period in hours'), step_days=Query(7, ge=1, le=90, description='Step breakdown period in days'), cache_hours=Query(24, ge=1, le=168, description='Cache metrics period in hours'), cost_hours=Query(24, ge=1, le=168, description='Cost analysis period in hours'), trending_limit=Query(10, ge=1, le=50, description='Number of trending topics')) async

Get comprehensive advanced analytics for the admin dashboard.

Parameters:

  • service (Annotated[AnalyticsService, Depends(get_analytics_service)]) –

    The analytics service instance.

  • assistant_id (Optional[UUID], default: Query(None, description='Filter by assistant ID') ) –

    Optional filter by assistant ID.

  • ttft_hours (int, default: Query(24, ge=1, le=168, description='TTFT analysis period in hours') ) –

    TTFT analysis period in hours.

  • step_days (int, default: Query(7, ge=1, le=90, description='Step breakdown period in days') ) –

    Step breakdown period in days.

  • cache_hours (int, default: Query(24, ge=1, le=168, description='Cache metrics period in hours') ) –

    Cache metrics period in hours.

  • cost_hours (int, default: Query(24, ge=1, le=168, description='Cost analysis period in hours') ) –

    Cost analysis period in hours.

  • trending_limit (int, default: Query(10, ge=1, le=50, description='Number of trending topics') ) –

    Number of trending topics.

Returns:

  • AdvancedAnalyticsResponse ( AdvancedAnalyticsResponse ) –

    A comprehensive analytics response including TTFT percentiles, pipeline step breakdown, cache hit rate, trending topics, topic diversity score, assistant token costs, and document freshness.

Source code in app/api/v1/endpoints/analytics.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@router.get("/advanced", response_model=AdvancedAnalyticsResponse)
async def get_advanced_analytics(
    service: Annotated[AnalyticsService, Depends(get_analytics_service)],
    assistant_id: Optional[UUID] = Query(None, description="Filter by assistant ID"),
    ttft_hours: int = Query(24, ge=1, le=168, description="TTFT analysis period in hours"),
    step_days: int = Query(7, ge=1, le=90, description="Step breakdown period in days"),
    cache_hours: int = Query(24, ge=1, le=168, description="Cache metrics period in hours"),
    cost_hours: int = Query(24, ge=1, le=168, description="Cost analysis period in hours"),
    trending_limit: int = Query(10, ge=1, le=50, description="Number of trending topics"),
) -> AdvancedAnalyticsResponse:
    """
    Return the full advanced-analytics payload for the admin dashboard.

    Args:
        service: Injected analytics service instance.
        assistant_id: Restrict results to a single assistant when given.
        ttft_hours: Window (hours) for the TTFT percentile analysis.
        step_days: Window (days) for the pipeline step breakdown.
        cache_hours: Window (hours) for cache hit-rate metrics.
        cost_hours: Window (hours) for the token-cost analysis.
        trending_limit: Maximum number of trending topics to include.

    Returns:
        AdvancedAnalyticsResponse: TTFT percentiles, pipeline step breakdown,
            cache hit rate, trending topics, topic diversity score, assistant
            token costs, and document freshness.
    """
    # All windowing/aggregation happens in the service layer; this endpoint
    # only forwards the validated query parameters.
    analytics = await service.get_all_advanced_analytics(
        assistant_id=assistant_id,
        ttft_hours=ttft_hours,
        step_days=step_days,
        cache_hours=cache_hours,
        cost_hours=cost_hours,
        trending_limit=trending_limit,
    )
    return analytics

get_assistant_costs(service, hours=Query(24, ge=1, le=168, description='Analysis period in hours')) async

Get token costs by assistant.

Parameters:

  • service (Annotated[AnalyticsService, Depends(get_analytics_service)]) –

    The analytics service instance.

  • hours (int, default: Query(24, ge=1, le=168, description='Analysis period in hours') ) –

    Analysis period in hours.

Returns:

  • list[AssistantCost]

    list[AssistantCost]: A list of costs per assistant.

Source code in app/api/v1/endpoints/analytics.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@router.get("/costs", response_model=list[AssistantCost])
async def get_assistant_costs(
    service: Annotated[AnalyticsService, Depends(get_analytics_service)],
    hours: int = Query(24, ge=1, le=168, description="Analysis period in hours"),
) -> list[AssistantCost]:
    """
    Return per-assistant token costs over the requested window.

    Args:
        service: Injected analytics service instance.
        hours: Analysis period in hours.

    Returns:
        list[AssistantCost]: One cost entry per assistant.
    """
    costs = await service.get_assistant_costs(hours)
    return costs

get_document_freshness(service) async

Get knowledge base document freshness distribution.

Parameters:

  • service (Annotated[AnalyticsService, Depends(get_analytics_service)]) –

    The analytics service instance.

Returns:

  • list[DocumentFreshness]

    list[DocumentFreshness]: A list representing document freshness distribution.

Source code in app/api/v1/endpoints/analytics.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@router.get("/freshness", response_model=list[DocumentFreshness])
async def get_document_freshness(
    service: Annotated[AnalyticsService, Depends(get_analytics_service)],
) -> list[DocumentFreshness]:
    """
    Return the knowledge-base document freshness distribution.

    Args:
        service: Injected analytics service instance.

    Returns:
        list[DocumentFreshness]: Freshness buckets for the document corpus.
    """
    freshness = await service.get_document_freshness()
    return freshness

get_trending_topics(service, assistant_id=Query(None, description='Filter by assistant ID'), limit=Query(10, ge=1, le=50, description='Number of topics to return')) async

Get top trending questions/topics.

Parameters:

  • service (Annotated[AnalyticsService, Depends(get_analytics_service)]) –

    The analytics service instance.

  • assistant_id (Optional[UUID], default: Query(None, description='Filter by assistant ID') ) –

    Optional filter by assistant ID.

  • limit (int, default: Query(10, ge=1, le=50, description='Number of topics to return') ) –

    Number of topics to return.

Returns:

  • list[TrendingTopic]

    list[TrendingTopic]: A list of trending topics.

Source code in app/api/v1/endpoints/analytics.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@router.get("/trending", response_model=list[TrendingTopic])
async def get_trending_topics(
    service: Annotated[AnalyticsService, Depends(get_analytics_service)],
    assistant_id: Optional[UUID] = Query(None, description="Filter by assistant ID"),
    limit: int = Query(10, ge=1, le=50, description="Number of topics to return"),
) -> list[TrendingTopic]:
    """
    Return the top trending questions/topics.

    Args:
        service: Injected analytics service instance.
        assistant_id: Restrict results to a single assistant when given.
        limit: Maximum number of topics to return.

    Returns:
        list[TrendingTopic]: Trending topics, most popular first.
    """
    topics = await service.get_trending_topics(assistant_id, limit)
    return topics

get_ttft_percentiles(service, hours=Query(24, ge=1, le=168, description='Analysis period in hours')) async

Get Time-to-First-Token percentiles.

Parameters:

  • service (Annotated[AnalyticsService, Depends(get_analytics_service)]) –

    The analytics service instance.

  • hours (int, default: Query(24, ge=1, le=168, description='Analysis period in hours') ) –

    Analysis period in hours.

Returns:

  • TTFTPercentiles ( TTFTPercentiles ) –

    The calculated TTFT percentiles (p50, p95, p99).

Source code in app/api/v1/endpoints/analytics.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@router.get("/ttft", response_model=TTFTPercentiles)
async def get_ttft_percentiles(
    service: Annotated[AnalyticsService, Depends(get_analytics_service)],
    hours: int = Query(24, ge=1, le=168, description="Analysis period in hours"),
) -> TTFTPercentiles:
    """
    Return Time-to-First-Token percentiles for the requested window.

    Args:
        service: Injected analytics service instance.
        hours: Analysis period in hours.

    Returns:
        TTFTPercentiles: The calculated p50/p95/p99 percentiles; all-zero
            percentiles when the service reports no data for the window.
    """
    result = await service.get_ttft_percentiles(hours)
    # Empty window: respond with explicit zeros instead of a null body.
    return result or TTFTPercentiles(p50=0.0, p95=0.0, p99=0.0, period_hours=hours)

start_broadcast_task(interval_seconds=10) async

Start the periodic broadcast background task.

Parameters:

  • interval_seconds (int, default: 10 ) –

    Broadcast interval in seconds.

Source code in app/api/v1/endpoints/analytics.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def start_broadcast_task(interval_seconds: int = 10) -> None:
    """
    Launch the periodic analytics broadcast loop as a background asyncio task.

    Args:
        interval_seconds: Broadcast interval in seconds.
    """
    global _broadcast_task, _broadcast_running

    # Idempotence guard: never spawn a second loop while a live task exists.
    existing = _broadcast_task
    if existing is not None and not existing.done():
        logger.warning("Analytics broadcast task already running")
        return

    _broadcast_running = True
    _broadcast_task = asyncio.create_task(broadcast_analytics_loop(interval_seconds))
    logger.info("Analytics broadcast task started")

stop_broadcast_task() async

Stop the periodic broadcast background task.

Source code in app/api/v1/endpoints/analytics.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
async def stop_broadcast_task() -> None:
    """
    Stop the periodic analytics broadcast background task and wait for it to exit.
    """
    global _broadcast_task, _broadcast_running

    task = _broadcast_task
    if task is None:
        return

    # Ask the loop to exit at its next flag check.
    _broadcast_running = False

    # Give the loop a bounded grace period to finish on its own.
    try:
        await asyncio.wait_for(task, timeout=10.0)
    except asyncio.TimeoutError:
        logger.warning("Analytics broadcast task did not stop gracefully, cancelling")
        task.cancel()
    except Exception as e:
        logger.error("Error during stop_broadcast_task: %s", e)

    _broadcast_task = None
    logger.info("Analytics broadcast task stopped")