286 lines
9.5 KiB
Python
286 lines
9.5 KiB
Python
"""Session management for active calls."""
|
|
|
|
import uuid
|
|
import json
|
|
from typing import Optional, Dict, Any
|
|
from loguru import logger
|
|
|
|
from core.transports import BaseTransport
|
|
from core.duplex_pipeline import DuplexPipeline
|
|
from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand
|
|
from app.config import settings
|
|
|
|
|
|
class Session:
    """
    Manages a single call session.

    Routes incoming JSON text commands to per-command handlers, forwards
    incoming audio to the duplex pipeline, and tracks the session state
    ("created", "invited", "accepted", "rejected", "ringing", "hungup").
    """

    def __init__(
        self,
        session_id: str,
        transport: BaseTransport,
        use_duplex: Optional[bool] = None,
    ):
        """
        Initialize session.

        Args:
            session_id: Unique session identifier
            transport: Transport instance for communication
            use_duplex: Whether to use duplex pipeline
                (defaults to settings.duplex_enabled when None)
        """
        self.id = session_id
        self.transport = transport
        self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled

        # NOTE(review): the pipeline is constructed regardless of use_duplex;
        # within this class the flag is only stored and logged.
        self.pipeline = DuplexPipeline(
            transport=transport,
            session_id=session_id,
            system_prompt=settings.duplex_system_prompt,
            greeting=settings.duplex_greeting,
        )

        # Session state
        self.created_at = None  # not assigned anywhere else in this class
        self.state = "created"  # created, invited, accepted, rejected, ringing, hungup
        self._pipeline_started = False

        # Track IDs
        self.current_track_id: Optional[str] = str(uuid.uuid4())

        logger.info(f"Session {self.id} created (duplex={self.use_duplex})")

    async def handle_text(self, text_data: str) -> None:
        """
        Handle incoming text data (JSON commands).

        The payload is decoded, parsed into a command object and dispatched
        on the command name. Decode, parse and handler failures are logged
        and reported to the client through an "error" event.

        Args:
            text_data: JSON text data
        """
        try:
            data = json.loads(text_data)
            command = parse_command(data)
            command_type = command.command

            logger.info(f"Session {self.id} received command: {command_type}")

            # Route command to appropriate handler. Some handlers take the
            # raw decoded dict, others the parsed command object.
            if command_type == "invite":
                await self._handle_invite(data)

            elif command_type == "accept":
                await self._handle_accept(data)

            elif command_type == "reject":
                await self._handle_reject(data)

            elif command_type == "ringing":
                await self._handle_ringing(data)

            elif command_type == "tts":
                await self._handle_tts(command)

            elif command_type == "play":
                await self._handle_play(data)

            elif command_type == "interrupt":
                await self._handle_interrupt(command)

            elif command_type == "pause":
                await self._handle_pause()

            elif command_type == "resume":
                await self._handle_resume()

            elif command_type == "hangup":
                await self._handle_hangup(command)

            elif command_type == "history":
                await self._handle_history(data)

            elif command_type == "chat":
                await self._handle_chat(command)

            else:
                logger.warning(f"Session {self.id} unknown command: {command_type}")

        # JSONDecodeError subclasses ValueError, so it must be caught first.
        except json.JSONDecodeError as e:
            logger.error(f"Session {self.id} JSON decode error: {e}")
            await self._send_error("client", f"Invalid JSON: {e}")

        # Presumably raised by parse_command for malformed commands; a
        # ValueError escaping a handler is reported the same way.
        except ValueError as e:
            logger.error(f"Session {self.id} command parse error: {e}")
            await self._send_error("client", f"Invalid command: {e}")

        except Exception as e:
            # logger.exception attaches the active traceback. Passing extra
            # keyword arguments to a loguru call would instead run
            # str.format() on the already-interpolated message, which fails
            # when the exception text contains braces.
            logger.exception(f"Session {self.id} handle_text error: {e}")
            await self._send_error("server", f"Internal error: {e}")

    async def handle_audio(self, audio_bytes: bytes) -> None:
        """
        Handle incoming audio data.

        The bytes are handed to the pipeline; any exception is logged with
        its traceback and suppressed so that it does not reach the caller.

        Args:
            audio_bytes: PCM audio data
        """
        try:
            await self.pipeline.process_audio(audio_bytes)
        except Exception as e:
            logger.exception(f"Session {self.id} handle_audio error: {e}")

    async def _handle_invite(self, data: Dict[str, Any]) -> None:
        """
        Handle invite command.

        Marks the session as invited, sends an "answer" event and starts the
        pipeline if it has not been started yet.

        Args:
            data: Decoded command payload; the optional "option" mapping is
                only read for its "codec" entry, which is logged.
        """
        self.state = "invited"
        # "option" may be missing or explicitly null; treat both as empty so
        # the final log line cannot fail after the invite has been answered.
        option = data.get("option") or {}

        # Send answer event
        await self.transport.send_event({
            "event": "answer",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
        })

        # Start duplex pipeline (once per session)
        if not self._pipeline_started:
            try:
                await self.pipeline.start()
                self._pipeline_started = True
                logger.info(f"Session {self.id} duplex pipeline started")
            except Exception as e:
                # Start failure is logged only; the session stays "invited"
                # and a later invite will retry the start.
                logger.error(f"Failed to start duplex pipeline: {e}")

        logger.info(f"Session {self.id} invited with codec: {option.get('codec', 'pcm')}")

    async def _handle_accept(self, data: Dict[str, Any]) -> None:
        """Handle accept command by marking the session as accepted."""
        self.state = "accepted"
        logger.info(f"Session {self.id} accepted")

    async def _handle_reject(self, data: Dict[str, Any]) -> None:
        """Handle reject command by marking the session as rejected."""
        self.state = "rejected"
        reason = data.get("reason", "Rejected")
        logger.info(f"Session {self.id} rejected: {reason}")

    async def _handle_ringing(self, data: Dict[str, Any]) -> None:
        """Handle ringing command by marking the session as ringing."""
        self.state = "ringing"
        logger.info(f"Session {self.id} ringing")

    async def _handle_tts(self, command: TTSCommand) -> None:
        """
        Handle TTS command.

        Synthesis is not implemented yet: a "trackStart" event is followed
        immediately by a "trackEnd" event carrying placeholder values.
        """
        logger.info(f"Session {self.id} TTS: {command.text[:50]}...")

        # Send track start event
        await self.transport.send_event({
            "event": "trackStart",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
            "playId": command.play_id,
        })

        # TODO: Implement actual TTS synthesis
        # For now, just send track end event (duration and ssrc are fixed
        # placeholder values)
        await self.transport.send_event({
            "event": "trackEnd",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
            "duration": 1000,
            "ssrc": 0,
            "playId": command.play_id,
        })

    async def _handle_play(self, data: Dict[str, Any]) -> None:
        """
        Handle play command.

        Playback is not implemented yet: a "trackStart" event is followed
        immediately by a "trackEnd" event. The requested URL doubles as the
        play id in both events.
        """
        url = data.get("url", "")
        logger.info(f"Session {self.id} play: {url}")

        # Send track start event
        await self.transport.send_event({
            "event": "trackStart",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
            "playId": url,
        })

        # TODO: Implement actual audio playback
        # For now, just send track end event (duration and ssrc are fixed
        # placeholder values)
        await self.transport.send_event({
            "event": "trackEnd",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
            "duration": 1000,
            "ssrc": 0,
            "playId": url,
        })

    async def _handle_interrupt(self, command: InterruptCommand) -> None:
        """Handle interrupt command."""
        if command.graceful:
            logger.info(f"Session {self.id} graceful interrupt")
        else:
            logger.info(f"Session {self.id} immediate interrupt")
            # NOTE(review): the pipeline is interrupted only on the immediate
            # path; confirm a graceful interrupt is meant to leave it running.
            await self.pipeline.interrupt()

    async def _handle_pause(self) -> None:
        """Handle pause command (currently only logged)."""
        logger.info(f"Session {self.id} paused")

    async def _handle_resume(self) -> None:
        """Handle resume command (currently only logged)."""
        logger.info(f"Session {self.id} resumed")

    async def _handle_hangup(self, command: HangupCommand) -> None:
        """
        Handle hangup command.

        Marks the session as hung up, notifies the peer with a "hangup"
        event and closes the transport.
        """
        self.state = "hungup"
        reason = command.reason or "User requested"
        logger.info(f"Session {self.id} hung up: {reason}")

        # Send hangup event
        await self.transport.send_event({
            "event": "hangup",
            "timestamp": self._get_timestamp_ms(),
            "reason": reason,
            "initiator": command.initiator or "user",
        })

        # Close transport
        # NOTE(review): cleanup() closes the transport as well; confirm that
        # close() tolerates being called twice.
        await self.transport.close()

    async def _handle_history(self, data: Dict[str, Any]) -> None:
        """Handle history command (currently only logged)."""
        speaker = data.get("speaker", "unknown")
        text = data.get("text", "")
        logger.info(f"Session {self.id} history [{speaker}]: {text[:50]}...")

    async def _handle_chat(self, command: ChatCommand) -> None:
        """Handle chat command by passing its text to the pipeline."""
        logger.info(f"Session {self.id} chat: {command.text[:50]}...")
        await self.pipeline.process_text(command.text)

    async def _send_error(self, sender: str, error_message: str) -> None:
        """
        Send error event to client.

        Args:
            sender: Component that generated the error
            error_message: Error message
        """
        await self.transport.send_event({
            "event": "error",
            "trackId": self.current_track_id,
            "timestamp": self._get_timestamp_ms(),
            "sender": sender,
            "error": error_message,
        })

    def _get_timestamp_ms(self) -> int:
        """Get current timestamp in milliseconds."""
        # Imported locally, as in the original, so the method does not rely
        # on a module-level import.
        import time

        return int(time.time() * 1000)

    async def cleanup(self) -> None:
        """Cleanup session resources (pipeline first, then transport)."""
        logger.info(f"Session {self.id} cleaning up")
        await self.pipeline.cleanup()
        await self.transport.close()
|