135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
"""Event bus for pub/sub communication between components."""
|
|
|
|
import asyncio
import inspect
from collections import defaultdict
from typing import Callable, Dict, List, Any, Optional

from loguru import logger
|
|
|
|
|
|
class EventBus:
    """
    Async event bus for pub/sub communication.

    Similar to the original Rust implementation's broadcast channel.
    Components can subscribe to specific event types and receive events
    asynchronously. Callbacks may be plain functions, coroutine functions,
    or any callable that returns an awaitable.
    """

    def __init__(self):
        """Initialize the event bus."""
        # Maps event type -> callbacks, kept in subscription order.
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
        # Not acquired by any method in this class; retained as an attribute.
        self._lock = asyncio.Lock()
        self._running = True

    def subscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], None]) -> None:
        """
        Subscribe to an event type.

        Subscriptions made after ``close()`` are ignored with a warning.

        Args:
            event_type: Type of event to subscribe to (e.g., "speaking", "silence")
            callback: Sync or async callable that receives the event data dict
        """
        if not self._running:
            logger.warning("Event bus is shut down, ignoring subscription to {}", event_type)
            return

        self._subscribers[event_type].append(callback)
        logger.debug("Subscribed to event type: {}", event_type)

    def unsubscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], None]) -> None:
        """
        Unsubscribe from an event type.

        Does nothing if the callback is not subscribed to ``event_type``.
        Only the first matching registration is removed.

        Args:
            event_type: Type of event to unsubscribe from
            callback: Callback function to remove
        """
        # Use .get() rather than indexing: indexing the defaultdict would
        # create an empty entry for every unknown event type.
        callbacks = self._subscribers.get(event_type)
        if not callbacks or callback not in callbacks:
            return

        callbacks.remove(callback)
        if not callbacks:
            # Drop the empty bucket so dead event types do not accumulate.
            del self._subscribers[event_type]
        logger.debug("Unsubscribed from event type: {}", event_type)

    async def publish(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """
        Publish an event to all subscribers.

        Every subscriber runs in its own task; this coroutine returns once
        all of them have finished. Errors raised by subscribers are logged
        and never propagate to the publisher.

        Args:
            event_type: Type of event to publish
            event_data: Event data to send to subscribers
        """
        if not self._running:
            logger.warning("Event bus is shut down, ignoring event: {}", event_type)
            return

        # Snapshot the list so that callbacks which subscribe/unsubscribe
        # while being notified cannot affect this dispatch.
        subscribers = list(self._subscribers.get(event_type, ()))

        if not subscribers:
            logger.debug("No subscribers for event type: {}", event_type)
            return

        # Notify all subscribers concurrently.
        tasks = []
        for callback in subscribers:
            try:
                tasks.append(asyncio.create_task(self._call_subscriber(callback, event_data)))
            except Exception as e:
                logger.error("Error creating task for subscriber: {}", e)

        # Wait for all subscribers to complete.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        logger.debug("Published event '{}' to {} subscribers", event_type, len(tasks))

    async def _call_subscriber(self, callback: Callable[[Dict[str, Any]], None], event_data: Dict[str, Any]) -> None:
        """
        Call a subscriber callback with error handling.

        Any ``Exception`` raised by the callback is logged with its traceback
        and suppressed.

        Args:
            callback: Subscriber callback function
            event_data: Event data to pass to callback
        """
        try:
            result = callback(event_data)
            # Inspect the return value instead of the callable: this also
            # covers lambdas/partials/callable objects that return a coroutine.
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            # Loguru has no ``exc_info`` keyword; opt(exception=True) attaches
            # the traceback. Passing ``e`` as a format argument keeps braces
            # in the exception text from being parsed as format fields.
            logger.opt(exception=True).error("Error in subscriber callback: {}", e)

    async def close(self) -> None:
        """
        Close the event bus and stop processing events.

        Marks the bus as not running and drops all subscriptions; later
        ``subscribe``/``publish`` calls are ignored with a warning.
        """
        self._running = False
        self._subscribers.clear()
        logger.info("Event bus closed")

    @property
    def is_running(self) -> bool:
        """Check if the event bus is running."""
        return self._running
|
|
|
|
|
|
# Global event bus instance
|
|
# Stays None until get_event_bus() creates the instance; reset_event_bus()
# sets it back to None.
_event_bus: Optional[EventBus] = None
|
|
|
|
|
|
def get_event_bus() -> EventBus:
    """
    Return the shared module-level event bus, creating it on first use.

    Returns:
        The EventBus stored in the module global; the same object is
        returned on every call until the global is reset to None.
    """
    global _event_bus
    bus = _event_bus
    if bus is None:
        # First request (or first after a reset): build and remember it.
        bus = EventBus()
        _event_bus = bus
    return bus
|
|
|
|
|
|
def reset_event_bus() -> None:
    """
    Forget the global event bus instance (mainly for testing).

    Only the module-level reference is set back to None; the previous
    instance, if any, is not closed here.
    """
    global _event_bus
    _event_bus = None
|