import asyncio
import logging
from typing import List

from fastapi import WebSocket

logger = logging.getLogger(__name__)


class ConnectionManager:
    """Track active WebSocket connections and broadcast text messages to them.

    ``connect``/``disconnect`` are meant to be called from code running on the
    asyncio event loop.  ``broadcast`` may be called from any thread: it hands
    the actual sending over to the loop registered through ``set_loop``.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.loop: asyncio.AbstractEventLoop | None = None
        # Strong references to in-flight broadcast tasks.  The event loop only
        # keeps weak references to tasks, so an otherwise unreferenced task
        # could be garbage-collected before it finishes.
        self._pending_tasks: set = set()

    def set_loop(self, loop: asyncio.AbstractEventLoop):
        """Register the asyncio event loop on which broadcasts are executed."""
        self.loop = loop

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Calling this for a connection that is not (or no longer) tracked is a
        no-op, so a handler may safely call it after a failed broadcast has
        already pruned the connection.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (double disconnect, or pruned after a failed send).
            pass

    def broadcast(self, message: str):
        """
        Thread-safely broadcasts a message to all active WebSocket connections.

        This method is designed to be called from a different thread
        (e.g., a ROS2 callback). It only schedules the send on the registered
        event loop and returns immediately; it does not wait for delivery.
        If no loop is registered, or the loop is already closed, the message
        is dropped and the problem is logged.
        """
        if self.loop is None:
            logger.error("Event loop not set in ConnectionManager. Cannot broadcast.")
            return

        try:
            # Schedule the coroutine to be executed in the event loop.
            self.loop.call_soon_threadsafe(self._broadcast_in_loop, message)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError once the loop is closed
            # (e.g. during shutdown); do not crash the calling thread.
            logger.warning("Event loop is closed. Dropping broadcast message.")

    def _broadcast_in_loop(self, message: str):
        """
        Helper to run the broadcast coroutine in the correct event loop.

        Runs as a plain callback on the event loop thread and starts the
        broadcast as a task, keeping a reference to it until it completes.
        """
        task = asyncio.ensure_future(self._broadcast_async(message), loop=self.loop)
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_broadcast_done)

    def _on_broadcast_done(self, task):
        """Release a finished broadcast task and log any unexpected failure."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Broadcast task failed: %r", error)

    async def _broadcast_async(self, message: str):
        """
        The actual async method that sends messages.

        Sends ``message`` to every tracked connection concurrently.  A
        connection whose send fails is logged and removed from the active
        list so later broadcasts do not keep retrying it.
        """
        # Snapshot the list: connections may be added/removed while we await.
        connections = list(self.active_connections)
        if not connections:
            return

        results = await asyncio.gather(
            *(connection.send_text(message) for connection in connections),
            return_exceptions=True,
        )

        for connection, result in zip(connections, results):
            if isinstance(result, BaseException):
                logger.warning(
                    "Failed to send to WebSocket, removing connection: %r", result
                )
                self.disconnect(connection)


# Create a single instance of the manager to be used across the application
websocket_manager = ConnectionManager()