"""Session management for active calls.""" import uuid import json from typing import Optional, Dict, Any from loguru import logger from core.transports import BaseTransport from core.duplex_pipeline import DuplexPipeline from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand from app.config import settings class Session: """ Manages a single call session. Handles command routing, audio processing, and session lifecycle. Uses full duplex voice conversation pipeline. """ def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None): """ Initialize session. Args: session_id: Unique session identifier transport: Transport instance for communication use_duplex: Whether to use duplex pipeline (defaults to settings.duplex_enabled) """ self.id = session_id self.transport = transport self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled self.pipeline = DuplexPipeline( transport=transport, session_id=session_id, system_prompt=settings.duplex_system_prompt, greeting=settings.duplex_greeting ) # Session state self.created_at = None self.state = "created" # created, invited, accepted, ringing, hungup self._pipeline_started = False # Track IDs self.current_track_id: Optional[str] = str(uuid.uuid4()) logger.info(f"Session {self.id} created (duplex={self.use_duplex})") async def handle_text(self, text_data: str) -> None: """ Handle incoming text data (JSON commands). Args: text_data: JSON text data """ try: data = json.loads(text_data) command = parse_command(data) command_type = command.command logger.info(f"Session {self.id} received command: {command_type}") # Route command to appropriate handler if command_type == "invite": await self._handle_invite(data) elif command_type == "accept": await self._handle_accept(data) elif command_type == "reject": await self._handle_reject(data) elif command_type == "ringing": await self._handle_ringing(data) elif command_type == "tts": await self._handle_tts(command) elif command_type == "play": await self._handle_play(data) elif command_type == "interrupt": await self._handle_interrupt(command) elif command_type == "pause": await self._handle_pause() elif command_type == "resume": await self._handle_resume() elif command_type == "hangup": await self._handle_hangup(command) elif command_type == "history": await self._handle_history(data) elif command_type == "chat": await self._handle_chat(command) else: logger.warning(f"Session {self.id} unknown command: {command_type}") except json.JSONDecodeError as e: logger.error(f"Session {self.id} JSON decode error: {e}") await self._send_error("client", f"Invalid JSON: {e}") except ValueError as e: logger.error(f"Session {self.id} command parse error: {e}") await self._send_error("client", f"Invalid command: {e}") except Exception as e: logger.error(f"Session {self.id} handle_text error: {e}", exc_info=True) await self._send_error("server", f"Internal error: {e}") async def handle_audio(self, audio_bytes: bytes) -> None: """ Handle incoming audio data. Args: audio_bytes: PCM audio data """ try: await self.pipeline.process_audio(audio_bytes) except Exception as e: logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True) async def _handle_invite(self, data: Dict[str, Any]) -> None: """Handle invite command.""" self.state = "invited" option = data.get("option", {}) # Send answer event await self.transport.send_event({ "event": "answer", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms() }) # Start duplex pipeline if not self._pipeline_started: try: await self.pipeline.start() self._pipeline_started = True logger.info(f"Session {self.id} duplex pipeline started") except Exception as e: logger.error(f"Failed to start duplex pipeline: {e}") logger.info(f"Session {self.id} invited with codec: {option.get('codec', 'pcm')}") async def _handle_accept(self, data: Dict[str, Any]) -> None: """Handle accept command.""" self.state = "accepted" logger.info(f"Session {self.id} accepted") async def _handle_reject(self, data: Dict[str, Any]) -> None: """Handle reject command.""" self.state = "rejected" reason = data.get("reason", "Rejected") logger.info(f"Session {self.id} rejected: {reason}") async def _handle_ringing(self, data: Dict[str, Any]) -> None: """Handle ringing command.""" self.state = "ringing" logger.info(f"Session {self.id} ringing") async def _handle_tts(self, command: TTSCommand) -> None: """Handle TTS command.""" logger.info(f"Session {self.id} TTS: {command.text[:50]}...") # Send track start event await self.transport.send_event({ "event": "trackStart", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms(), "playId": command.play_id }) # TODO: Implement actual TTS synthesis # For now, just send track end event await self.transport.send_event({ "event": "trackEnd", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms(), "duration": 1000, "ssrc": 0, "playId": command.play_id }) async def _handle_play(self, data: Dict[str, Any]) -> None: """Handle play command.""" url = data.get("url", "") logger.info(f"Session {self.id} play: {url}") # Send track start event await self.transport.send_event({ "event": "trackStart", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms(), "playId": url }) # TODO: Implement actual audio playback # For now, just send track end event await self.transport.send_event({ "event": "trackEnd", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms(), "duration": 1000, "ssrc": 0, "playId": url }) async def _handle_interrupt(self, command: InterruptCommand) -> None: """Handle interrupt command.""" if command.graceful: logger.info(f"Session {self.id} graceful interrupt") else: logger.info(f"Session {self.id} immediate interrupt") await self.pipeline.interrupt() async def _handle_pause(self) -> None: """Handle pause command.""" logger.info(f"Session {self.id} paused") async def _handle_resume(self) -> None: """Handle resume command.""" logger.info(f"Session {self.id} resumed") async def _handle_hangup(self, command: HangupCommand) -> None: """Handle hangup command.""" self.state = "hungup" reason = command.reason or "User requested" logger.info(f"Session {self.id} hung up: {reason}") # Send hangup event await self.transport.send_event({ "event": "hangup", "timestamp": self._get_timestamp_ms(), "reason": reason, "initiator": command.initiator or "user" }) # Close transport await self.transport.close() async def _handle_history(self, data: Dict[str, Any]) -> None: """Handle history command.""" speaker = data.get("speaker", "unknown") text = data.get("text", "") logger.info(f"Session {self.id} history [{speaker}]: {text[:50]}...") async def _handle_chat(self, command: ChatCommand) -> None: """Handle chat command.""" logger.info(f"Session {self.id} chat: {command.text[:50]}...") await self.pipeline.process_text(command.text) async def _send_error(self, sender: str, error_message: str) -> None: """ Send error event to client. Args: sender: Component that generated the error error_message: Error message """ await self.transport.send_event({ "event": "error", "trackId": self.current_track_id, "timestamp": self._get_timestamp_ms(), "sender": sender, "error": error_message }) def _get_timestamp_ms(self) -> int: """Get current timestamp in milliseconds.""" import time return int(time.time() * 1000) async def cleanup(self) -> None: """Cleanup session resources.""" logger.info(f"Session {self.id} cleaning up") await self.pipeline.cleanup() await self.transport.close()