From 6b589a1b7c3b0ab3146d75dfdf959eea3dd625b1 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Thu, 5 Mar 2026 21:44:23 +0800 Subject: [PATCH] Enhance session management and logging configuration - Updated .env.example to clarify audio frame size validation and default codec settings. - Refactored logging setup in main.py to support JSON serialization based on log format configuration. - Improved session.py to dynamically compute audio frame bytes and include protocol version in session events. - Added tests to validate session start events and audio frame handling based on chunk size settings. --- engine/.env.example | 7 +++- engine/app/main.py | 41 +++++++++++++------ engine/core/session.py | 36 ++++++++++++++-- .../tests/test_ws_protocol_session_start.py | 38 +++++++++++++++++ 4 files changed, 105 insertions(+), 17 deletions(-) diff --git a/engine/.env.example b/engine/.env.example index 8a87354..7f09de7 100644 --- a/engine/.env.example +++ b/engine/.env.example @@ -26,22 +26,27 @@ HISTORY_FINALIZE_DRAIN_TIMEOUT_SEC=1.5 SAMPLE_RATE=16000 # 20ms is recommended for VAD stability and latency. # 100ms works but usually worsens start-of-speech accuracy. +# WS binary audio frame size validation is derived from SAMPLE_RATE + CHUNK_SIZE_MS. +# Client frame payloads must be a multiple of: SAMPLE_RATE * 2 * (CHUNK_SIZE_MS / 1000). CHUNK_SIZE_MS=20 +# Public default output codec exposed in config.resolved (overridable by runtime metadata). DEFAULT_CODEC=pcm MAX_AUDIO_BUFFER_SECONDS=30 # Local assistant/agent YAML directory. In local mode the runtime resolves: # ASSISTANT_LOCAL_CONFIG_DIR/.yaml -ASSISTANT_LOCAL_CONFIG_DIR=engine/config/agents +ASSISTANT_LOCAL_CONFIG_DIR=config/agents # Logging LOG_LEVEL=INFO # json is better for production/observability; text is easier locally. +# Controls both console and file log serialization/format. LOG_FORMAT=json # WebSocket behavior INACTIVITY_TIMEOUT_SEC=60 HEARTBEAT_INTERVAL_SEC=50 +# Public protocol label emitted in session.started/config.resolved payloads. WS_PROTOCOL_VERSION=v1 # CORS / ICE (JSON strings) diff --git a/engine/app/main.py b/engine/app/main.py index b4c5c05..bd513f6 100644 --- a/engine/app/main.py +++ b/engine/app/main.py @@ -80,18 +80,35 @@ backend_gateway = build_backend_adapter_from_settings() # Configure logging logger.remove() -logger.add( - "./logs/active_call_{time}.log", - rotation="1 day", - retention="7 days", - level=settings.log_level, - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}" -) -logger.add( - lambda msg: print(msg, end=""), - level=settings.log_level, - format="{time:HH:mm:ss} | {level: <8} | {message}" -) +_log_format = str(settings.log_format or "text").strip().lower() +if _log_format == "json": + logger.add( + "./logs/active_call_{time}.log", + rotation="1 day", + retention="7 days", + level=settings.log_level, + serialize=True, + format="{message}", + ) + logger.add( + lambda msg: print(msg, end=""), + level=settings.log_level, + serialize=True, + format="{message}", + ) +else: + logger.add( + "./logs/active_call_{time}.log", + rotation="1 day", + retention="7 days", + level=settings.log_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + ) + logger.add( + lambda msg: print(msg, end=""), + level=settings.log_level, + format="{time:HH:mm:ss} | {level: <8} | {message}", + ) @app.get("/health") diff --git a/engine/core/session.py b/engine/core/session.py index de00855..bc8bab8 100644 --- a/engine/core/session.py +++ b/engine/core/session.py @@ -54,7 +54,7 @@ class Session: TRACK_AUDIO_IN = "audio_in" TRACK_AUDIO_OUT = "audio_out" TRACK_CONTROL = "control" - AUDIO_FRAME_BYTES = 640 # 16k mono pcm_s16le, 20ms + AUDIO_FRAME_BYTES = 640 # Legacy fallback: 16k mono pcm_s16le, 20ms _METADATA_ALLOWED_TOP_LEVEL_KEYS = { "overrides", "dynamicVariables", @@ -111,6 +111,7 @@ class Session: self.id = session_id self.transport = transport self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled + self.audio_frame_bytes = self._compute_audio_frame_bytes() self._assistant_id = str(assistant_id or "").strip() or None self._backend_gateway = backend_gateway or build_backend_adapter_from_settings() self._history_bridge = SessionHistoryBridge( @@ -210,11 +211,14 @@ class Session: ) return - frame_bytes = self.AUDIO_FRAME_BYTES + frame_bytes = getattr(self, "audio_frame_bytes", self._compute_audio_frame_bytes()) if len(audio_bytes) % frame_bytes != 0: await self._send_error( "client", - f"Audio frame size must be a multiple of {frame_bytes} bytes (20ms PCM)", + ( + f"Audio frame size must be a multiple of {frame_bytes} bytes " + f"({settings.chunk_size_ms}ms PCM @ {settings.sample_rate}Hz)" + ), "audio.frame_size_mismatch", stage="audio", retryable=False, @@ -384,6 +388,7 @@ class Session: ev( "session.started", trackId=self.current_track_id, + protocolVersion=self._public_ws_protocol_version(), tracks={ "audio_in": self.TRACK_AUDIO_IN, "audio_out": self.TRACK_AUDIO_OUT, @@ -1137,6 +1142,7 @@ class Session: output_mode = str(runtime_output.get("mode") or "").strip().lower() if isinstance(runtime_output, dict) else "" if output_mode not in {"audio", "text"}: output_mode = "audio" + output_codec = str(runtime_output.get("codec") or settings.default_codec or "pcm").strip().lower() or "pcm" tools_allowlist: List[str] = [] runtime_tools = runtime.get("tools", {}) if isinstance(runtime, dict) else {} @@ -1146,7 +1152,11 @@ class Session: tools_allowlist = [str(item) for item in allowlist if item is not None and str(item).strip()] resolved: Dict[str, Any] = { - "output": {"mode": output_mode}, + "protocolVersion": self._public_ws_protocol_version(), + "output": { + "mode": output_mode, + "codec": output_codec, + }, "tools": { "enabled": bool(tools_allowlist), "count": len(tools_allowlist), @@ -1162,6 +1172,24 @@ class Session: return resolved + @staticmethod + def _compute_audio_frame_bytes() -> int: + """Compute expected PCM frame bytes from SAMPLE_RATE and CHUNK_SIZE_MS.""" + sample_rate = max(1, int(getattr(settings, "sample_rate", 16000))) + chunk_ms = max(1, int(getattr(settings, "chunk_size_ms", 20))) + bytes_per_frame = int(round(sample_rate * 2 * (chunk_ms / 1000.0))) + if bytes_per_frame < 2: + bytes_per_frame = 2 + if bytes_per_frame % 2 != 0: + bytes_per_frame += 1 + return bytes_per_frame + + @staticmethod + def _public_ws_protocol_version() -> str: + """Return public protocol version label announced to clients.""" + version = str(getattr(settings, "ws_protocol_version", "v1") or "v1").strip() + return version or "v1" + def _extract_json_obj(self, text: str) -> Optional[Dict[str, Any]]: """Best-effort extraction of a JSON object from freeform text.""" try: diff --git a/engine/tests/test_ws_protocol_session_start.py b/engine/tests/test_ws_protocol_session_start.py index 90ac179..ac15b16 100644 --- a/engine/tests/test_ws_protocol_session_start.py +++ b/engine/tests/test_ws_protocol_session_start.py @@ -290,6 +290,8 @@ async def test_handle_session_start_applies_whitelisted_overrides_and_ignores_wo @pytest.mark.asyncio async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypatch): monkeypatch.setattr("core.session.settings.ws_emit_config_resolved", True) + monkeypatch.setattr("core.session.settings.ws_protocol_version", "v1-custom") + monkeypatch.setattr("core.session.settings.default_codec", "pcmu") session = Session.__new__(Session) session.id = "sess_start_emit_config" @@ -368,10 +370,46 @@ async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypat ) config_event = next(item for item in events if item.get("type") == "config.resolved") + session_started_event = next(item for item in events if item.get("type") == "session.started") + assert session_started_event["protocolVersion"] == "v1-custom" assert "appId" not in config_event["config"] assert "configVersionId" not in config_event["config"] assert "services" not in config_event["config"] + assert config_event["config"]["protocolVersion"] == "v1-custom" assert config_event["config"]["channel"] == "web_debug" assert config_event["config"]["output"]["mode"] == "text" + assert config_event["config"]["output"]["codec"] == "pcmu" assert config_event["config"]["tools"]["enabled"] is True assert config_event["config"]["tools"]["count"] == 1 + + +@pytest.mark.asyncio +async def test_handle_audio_uses_chunk_size_for_frame_validation(monkeypatch): + monkeypatch.setattr("core.session.settings.sample_rate", 16000) + monkeypatch.setattr("core.session.settings.chunk_size_ms", 10) + + session = Session.__new__(Session) + session.id = "sess_chunk_frame" + session.ws_state = WsSessionState.ACTIVE + + class _Pipeline: + def __init__(self): + self.frames = [] + + async def process_audio(self, frame: bytes): + self.frames.append(frame) + + session.pipeline = _Pipeline() + errors = [] + + async def _send_error(sender, message, code, **kwargs): + _ = (sender, kwargs) + errors.append((code, message)) + + session._send_error = _send_error + payload = b"\x00\x01" * 320 # 640 bytes = 2 frames when chunk_size_ms=10 + await session.handle_audio(payload) + + assert errors == [] + assert len(session.pipeline.frames) == 2 + assert all(len(frame) == 320 for frame in session.pipeline.frames)