Dashboard Statistics API Endpoint.
Provides REST endpoint for dashboard stats and manages periodic WebSocket broadcasts.
broadcast_dashboard_stats_loop(interval_seconds=5)
async
Background task that periodically broadcasts dashboard stats via WebSocket.
Source code in app/api/v1/endpoints/dashboard.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def broadcast_dashboard_stats_loop(interval_seconds: int = 5) -> None:
    """
    Background task that periodically broadcasts dashboard stats via WebSocket.

    Each iteration opens a fresh ``SessionLocal`` session, collects the stats
    through ``DashboardStatsService.get_all_stats()`` and hands their JSON-mode
    dump to ``manager.emit_dashboard_stats()``. An exception raised inside an
    iteration is logged with its traceback and the loop carries on.

    The loop runs while the module-level ``_broadcast_running`` flag is true.
    The pause between broadcasts is taken in short slices so that clearing the
    flag ends the loop promptly instead of after a full interval. The flag is
    reset to ``False`` when the loop exits for any reason.

    Args:
        interval_seconds: Pause between two broadcasts, in seconds.

    Returns:
        None
    """
    global _broadcast_running
    # NOTE(review): this assignment re-arms the flag when the coroutine first
    # runs, so a stop requested between task creation and the first step is
    # overwritten; the stopper's timeout/cancel path is what ends the loop then.
    _broadcast_running = True
    logger.info("📈 START | dashboard_stats_broadcast | Interval: %ds", interval_seconds)

    # Late imports for clean dependency tree
    from app.core.websocket import manager
    from app.core.database import SessionLocal

    # Longest time a cleared flag can go unnoticed while waiting.
    poll_seconds = 0.5

    try:
        while _broadcast_running:
            try:
                # Create a fresh session per iteration to avoid pool/state issues
                async with SessionLocal() as db:
                    service = DashboardStatsService(db)
                    stats = await service.get_all_stats()
                    await manager.emit_dashboard_stats(stats.model_dump(mode="json"))
            except Exception as e:
                # Keep broadcasting after a failed iteration; the traceback is
                # logged so the failure can be diagnosed.
                logger.error(
                    "⚠️ WARN | dashboard_stats_broadcast | Iteration Error: %s",
                    e,
                    exc_info=True,
                )

            # Responsive sleep: wait in small increments and re-check the flag
            # after each one. At least one sleep always runs, so the event loop
            # gets control even when interval_seconds is zero or negative.
            remaining = float(interval_seconds)
            while True:
                await asyncio.sleep(min(poll_seconds, max(remaining, 0.0)))
                remaining -= poll_seconds
                if remaining <= 0 or not _broadcast_running:
                    break
    finally:
        _broadcast_running = False
        logger.info("📉 STOP | dashboard_stats_broadcast | Loop ended")
|
get_dashboard_stats(db)
async
Get current dashboard statistics (REST endpoint).
Source code in app/api/v1/endpoints/dashboard.py
26
27
28
29
30
31
32
33
34
35
36
@router.get("/stats", response_model=DashboardStats)
async def get_dashboard_stats(db: Annotated[AsyncSession, Depends(get_db)]) -> DashboardStats:
    """
    Return the current dashboard statistics (REST endpoint).

    Builds a ``DashboardStatsService`` on the injected session and returns the
    result of its ``get_all_stats()`` call.

    Args:
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        The statistics produced by ``DashboardStatsService.get_all_stats()``.

    Raises:
        Exception: Any error raised while building the service or collecting
            the stats is logged with its traceback and re-raised unchanged.
    """
    try:
        stats = await DashboardStatsService(db).get_all_stats()
    except Exception as exc:
        logger.error(f"❌ FAIL | get_dashboard_stats | Error: {exc!s}", exc_info=True)
        # Re-raise untouched: per the original note, the exception handler in
        # main.py converts it to the standard JSON response.
        raise
    return stats
|
start_broadcast_task(interval_seconds=5)
async
Start the periodic broadcast background task.
Parameters:
- interval_seconds (int, default: 5) – Broadcast interval in seconds.
Returns: None
Source code in app/api/v1/endpoints/dashboard.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async def start_broadcast_task(interval_seconds: int = 5) -> None:
    """
    Launch the periodic dashboard broadcast as a background asyncio task.

    When a previously created task exists and has not finished, a warning is
    logged and nothing else happens. Otherwise the running flag is set and a
    new task wrapping ``broadcast_dashboard_stats_loop`` is created and stored
    in the module-level ``_broadcast_task``.

    Args:
        interval_seconds: Broadcast interval in seconds.

    Returns:
        None
    """
    global _broadcast_task, _broadcast_running

    already_active = _broadcast_task is not None and not _broadcast_task.done()
    if already_active:
        logger.warning("Dashboard broadcast task already running")
    else:
        _broadcast_running = True
        loop_coro = broadcast_dashboard_stats_loop(interval_seconds)
        _broadcast_task = asyncio.create_task(loop_coro)
        logger.info("Dashboard broadcast task started")
|
stop_broadcast_task()
async
Stop the periodic broadcast background task.
Returns: None
Source code in app/api/v1/endpoints/dashboard.py
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
async def stop_broadcast_task() -> None:
    """
    Stop the periodic broadcast background task.

    Clears the module-level running flag and waits up to 10 seconds for the
    task to finish. If the wait fails for any reason (timeout, cancellation,
    or an error raised by the task itself), a warning is logged, the task is
    cancelled and awaited once more. The stored task handle is always cleared
    afterwards, so a later start can create a fresh task.

    Returns:
        None
    """
    global _broadcast_task, _broadcast_running

    task = _broadcast_task
    if task is None:
        return

    _broadcast_running = False

    # Wait for task to finish (should exit quickly after flag set)
    try:
        await asyncio.wait_for(task, timeout=10.0)
    except (asyncio.CancelledError, Exception) as e:
        logger.warning(f"Dashboard broadcast task did not stop gracefully ({e}), cancelling")
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The task had already failed on its own; awaiting it re-raises
            # that error. Log it instead of letting it escape, which would
            # abort the stop and leave the stale handle behind.
            logger.error("Dashboard broadcast task failed while stopping", exc_info=True)
    finally:
        # Only clear the handle we were asked to stop.
        if _broadcast_task is task:
            _broadcast_task = None

    logger.info("Dashboard broadcast task stopped")
|