"""Event bus for pub/sub communication between components.""" import asyncio from typing import Callable, Dict, List, Any, Optional from collections import defaultdict from loguru import logger class EventBus: """ Async event bus for pub/sub communication. Similar to the original Rust implementation's broadcast channel. Components can subscribe to specific event types and receive events asynchronously. """ def __init__(self): """Initialize the event bus.""" self._subscribers: Dict[str, List[Callable]] = defaultdict(list) self._lock = asyncio.Lock() self._running = True def subscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], None]) -> None: """ Subscribe to an event type. Args: event_type: Type of event to subscribe to (e.g., "speaking", "silence") callback: Async callback function that receives event data """ if not self._running: logger.warning(f"Event bus is shut down, ignoring subscription to {event_type}") return self._subscribers[event_type].append(callback) logger.debug(f"Subscribed to event type: {event_type}") def unsubscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], None]) -> None: """ Unsubscribe from an event type. Args: event_type: Type of event to unsubscribe from callback: Callback function to remove """ if callback in self._subscribers[event_type]: self._subscribers[event_type].remove(callback) logger.debug(f"Unsubscribed from event type: {event_type}") async def publish(self, event_type: str, event_data: Dict[str, Any]) -> None: """ Publish an event to all subscribers. Args: event_type: Type of event to publish event_data: Event data to send to subscribers """ if not self._running: logger.warning(f"Event bus is shut down, ignoring event: {event_type}") return # Get subscribers for this event type subscribers = self._subscribers.get(event_type, []) if not subscribers: logger.debug(f"No subscribers for event type: {event_type}") return # Notify all subscribers concurrently tasks = [] for callback in subscribers: try: # Create task for each subscriber task = asyncio.create_task(self._call_subscriber(callback, event_data)) tasks.append(task) except Exception as e: logger.error(f"Error creating task for subscriber: {e}") # Wait for all subscribers to complete if tasks: await asyncio.gather(*tasks, return_exceptions=True) logger.debug(f"Published event '{event_type}' to {len(tasks)} subscribers") async def _call_subscriber(self, callback: Callable[[Dict[str, Any]], None], event_data: Dict[str, Any]) -> None: """ Call a subscriber callback with error handling. Args: callback: Subscriber callback function event_data: Event data to pass to callback """ try: # Check if callback is a coroutine function if asyncio.iscoroutinefunction(callback): await callback(event_data) else: callback(event_data) except Exception as e: logger.error(f"Error in subscriber callback: {e}", exc_info=True) async def close(self) -> None: """Close the event bus and stop processing events.""" self._running = False self._subscribers.clear() logger.info("Event bus closed") @property def is_running(self) -> bool: """Check if the event bus is running.""" return self._running # Global event bus instance _event_bus: Optional[EventBus] = None def get_event_bus() -> EventBus: """ Get the global event bus instance. Returns: EventBus instance """ global _event_bus if _event_bus is None: _event_bus = EventBus() return _event_bus def reset_event_bus() -> None: """Reset the global event bus (mainly for testing).""" global _event_bus _event_bus = None