Files
AI-VideoAssistant/engine/core/events.py
2026-02-06 14:01:34 +08:00

135 lines
4.3 KiB
Python

"""Event bus for pub/sub communication between components."""
import asyncio
import inspect
from collections import defaultdict
from typing import Callable, Dict, List, Any, Optional

from loguru import logger
class EventBus:
    """
    Async event bus for pub/sub communication.

    Similar to the original Rust implementation's broadcast channel.
    Components can subscribe to specific event types and receive events asynchronously.

    Callbacks are stored per event type in subscription order. Publishing
    runs every subscriber of that type concurrently and waits for all of
    them; an exception raised by one subscriber is logged and does not
    prevent the others from running.
    """

    def __init__(self):
        """Initialize the event bus."""
        # Maps event type -> list of callbacks, in subscription order.
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
        # No method of this class currently acquires this lock.
        self._lock = asyncio.Lock()
        self._running = True

    def subscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], Any]) -> None:
        """
        Subscribe to an event type.

        Subscribing the same callback more than once registers it more than
        once. Subscriptions made after ``close()`` are ignored with a warning.

        Args:
            event_type: Type of event to subscribe to (e.g., "speaking", "silence")
            callback: Callable that receives the event data dict. It may be
                a plain function, an async function, or any callable that
                returns an awaitable.
        """
        if not self._running:
            logger.warning(f"Event bus is shut down, ignoring subscription to {event_type}")
            return
        self._subscribers[event_type].append(callback)
        logger.debug(f"Subscribed to event type: {event_type}")

    def unsubscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], Any]) -> None:
        """
        Unsubscribe from an event type.

        Removes the first registration of ``callback`` for ``event_type``.
        Does nothing if the callback (or the event type) is not registered.

        Args:
            event_type: Type of event to unsubscribe from
            callback: Callback function to remove
        """
        # .get() rather than indexing: indexing a defaultdict would insert an
        # empty list for every unknown event type passed in here.
        callbacks = self._subscribers.get(event_type)
        if callbacks and callback in callbacks:
            callbacks.remove(callback)
            logger.debug(f"Unsubscribed from event type: {event_type}")

    async def publish(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """
        Publish an event to all subscribers.

        Returns once every subscriber of ``event_type`` has finished.
        Events published after ``close()`` are dropped with a warning.

        Args:
            event_type: Type of event to publish
            event_data: Event data to send to subscribers
        """
        if not self._running:
            logger.warning(f"Event bus is shut down, ignoring event: {event_type}")
            return

        # Snapshot the list so callbacks that subscribe or unsubscribe while
        # the event is being delivered cannot affect this delivery.
        subscribers = list(self._subscribers.get(event_type, ()))
        if not subscribers:
            logger.debug(f"No subscribers for event type: {event_type}")
            return

        # Notify all subscribers concurrently, one task per subscriber.
        tasks = [
            asyncio.create_task(self._call_subscriber(callback, event_data))
            for callback in subscribers
        ]

        # return_exceptions=True keeps one failing task from propagating out
        # of gather() before the remaining subscribers have completed.
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.debug(f"Published event '{event_type}' to {len(tasks)} subscribers")

    async def _call_subscriber(self, callback: Callable[[Dict[str, Any]], Any], event_data: Dict[str, Any]) -> None:
        """
        Call a subscriber callback with error handling.

        Any ``Exception`` raised by the callback is logged with its traceback
        and swallowed.

        Args:
            callback: Subscriber callback function
            event_data: Event data to pass to callback
        """
        try:
            result = callback(event_data)
            # Inspect the return value instead of the callable: this also
            # awaits sync wrappers and callable objects that hand back a
            # coroutine, which a coroutine-function check would miss.
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            # logger.exception attaches the traceback. The error text is
            # passed as a format argument so braces in it are not parsed as
            # format fields.
            logger.exception("Error in subscriber callback: {}", e)

    async def close(self) -> None:
        """Close the event bus and stop processing events."""
        self._running = False
        self._subscribers.clear()
        logger.info("Event bus closed")

    @property
    def is_running(self) -> bool:
        """Check if the event bus is running."""
        return self._running
# Process-wide EventBus singleton; stays None until first requested.
_event_bus: Optional[EventBus] = None


def get_event_bus() -> EventBus:
    """
    Return the shared event bus, creating it on first use.

    Returns:
        The module-level EventBus instance
    """
    global _event_bus
    bus = _event_bus
    if bus is None:
        # First call (or first call since a reset): build and remember it.
        bus = EventBus()
        _event_bus = bus
    return bus
def reset_event_bus() -> None:
    """
    Drop the global event bus reference (mainly for testing).

    The previous instance, if any, is not closed; it is only forgotten.
    """
    global _event_bus
    _event_bus = None