diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 1a10d9e..93ca614 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -230,6 +230,7 @@ class DuplexPipeline: self._last_llm_delta_emit_ms: float = 0.0 self._runtime_tool_executor = self._resolved_tool_executor_map() + self._initial_greeting_emitted = False if self._server_tool_executor is None: if self._tool_resource_resolver: @@ -727,34 +728,52 @@ class DuplexPipeline: if not self._outbound_task or self._outbound_task.done(): self._outbound_task = asyncio.create_task(self._outbound_loop()) - # Resolve greeting once per session start. - # Always emit text opener event so text-only sessions can display it. - if self._bot_starts_first(): - greeting_to_speak = self.conversation.greeting - if self._generated_opener_enabled(): - generated_greeting = await self._generate_runtime_greeting() - if generated_greeting: - greeting_to_speak = generated_greeting - self.conversation.greeting = generated_greeting - if greeting_to_speak: - self._start_turn() - self._start_response() - await self._send_event( - ev( - "assistant.response.final", - text=greeting_to_speak, - trackId=self.track_audio_out, - ), - priority=20, - ) - await self.conversation.add_assistant_turn(greeting_to_speak) - if tts_output_enabled: - await self._speak(greeting_to_speak) - except Exception as e: logger.error(f"Failed to start pipeline: {e}") raise + async def emit_initial_greeting(self) -> None: + """ + Emit opener after session activation. + + Ordering target: + 1) frontend receives `session.started` (shows connected/ready) + 2) frontend receives opener text event + 3) frontend receives opener audio events/chunks + """ + if self._initial_greeting_emitted: + return + + self._initial_greeting_emitted = True + if not self._bot_starts_first(): + return + + greeting_to_speak = self.conversation.greeting + if self._generated_opener_enabled(): + generated_greeting = await self._generate_runtime_greeting() + if generated_greeting: + greeting_to_speak = generated_greeting + self.conversation.greeting = generated_greeting + + if not greeting_to_speak: + return + + self._start_turn() + self._start_response() + await self._send_event( + ev( + "assistant.response.final", + text=greeting_to_speak, + trackId=self.track_audio_out, + ), + priority=20, + ) + await self.conversation.add_assistant_turn(greeting_to_speak) + + if self._tts_output_enabled(): + # Keep opener text ahead of opener voice start. + await self._speak(greeting_to_speak, audio_event_priority=30) + async def _enqueue_outbound(self, kind: str, payload: Any, priority: int) -> None: """Queue outbound message with priority ordering.""" self._outbound_seq += 1 @@ -1538,7 +1557,7 @@ class DuplexPipeline: trackId=self.track_audio_out, ) }, - priority=10, + priority=30, ) first_audio_sent = True @@ -1568,7 +1587,7 @@ class DuplexPipeline: trackId=self.track_audio_out, ) }, - priority=10, + priority=30, ) first_audio_sent = True await self._speak_sentence( @@ -1762,12 +1781,13 @@ class DuplexPipeline: # Fallback: never block audio delivery on smoothing failure. return pcm_bytes - async def _speak(self, text: str) -> None: + async def _speak(self, text: str, audio_event_priority: int = 10) -> None: """ Synthesize and send speech. Args: text: Text to speak + audio_event_priority: Priority for output.audio.start/end events """ if not self._tts_output_enabled(): return @@ -1788,7 +1808,7 @@ class DuplexPipeline: "output.audio.start", trackId=self.track_audio_out, ) - }, priority=10) + }, priority=audio_event_priority) self._is_bot_speaking = True @@ -1826,7 +1846,7 @@ class DuplexPipeline: "output.audio.end", trackId=self.track_audio_out, ) - }, priority=10) + }, priority=audio_event_priority) except asyncio.CancelledError: logger.info("TTS cancelled") diff --git a/engine/core/session.py b/engine/core/session.py index 4f78545..3b51c48 100644 --- a/engine/core/session.py +++ b/engine/core/session.py @@ -405,6 +405,9 @@ class Session: ) ) + # Emit opener only after frontend has received session.started/config events. + await self.pipeline.emit_initial_greeting() + async def _handle_session_stop(self, reason: Optional[str]) -> None: """Handle session stop.""" if self.ws_state == WsSessionState.STOPPED: