diff --git a/changelog/3804.added.md b/changelog/3804.added.md new file mode 100644 index 000000000..0ad7676c9 --- /dev/null +++ b/changelog/3804.added.md @@ -0,0 +1 @@ +- Added concurrent audio context support: `CartesiaTTSService` can now synthesize the next sentence while the previous one is still playing, by setting `pause_frame_processing=False` and routing each sentence through its own audio context queue. diff --git a/changelog/3804.changed.md b/changelog/3804.changed.md new file mode 100644 index 000000000..9caae491f --- /dev/null +++ b/changelog/3804.changed.md @@ -0,0 +1 @@ +- Audio context management (previously in `AudioContextTTSService`) is now built into `TTSService`. All WebSocket providers (`cartesia`, `elevenlabs`, `asyncai`, `inworld`, `rime`, `gradium`, `resembleai`) now inherit from `WebsocketTTSService` directly. Word-timestamp baseline is set automatically on the first audio chunk of each context instead of requiring each provider to call `start_word_timestamps()` in their receive loop. diff --git a/changelog/3804.deprecated.md b/changelog/3804.deprecated.md new file mode 100644 index 000000000..0babfe7d4 --- /dev/null +++ b/changelog/3804.deprecated.md @@ -0,0 +1,2 @@ +- Deprecated `AudioContextTTSService` and `AudioContextWordTTSService`. Subclass `WebsocketTTSService` directly instead; audio context management is now part of the base `TTSService`. +- Deprecated `WordTTSService`, `WebsocketWordTTSService`, and `InterruptibleWordTTSService`. Word timestamp logic is now always active in `TTSService` and no longer needs to be opted into via a subclass. diff --git a/changelog/3804.removed.md b/changelog/3804.removed.md new file mode 100644 index 000000000..1813b5999 --- /dev/null +++ b/changelog/3804.removed.md @@ -0,0 +1 @@ +- ⚠️ Removed `supports_word_timestamps` parameter from `TTSService.__init__()`. Word timestamp logic is now always active. Remove this argument from any custom subclass `super().__init__()` calls. diff --git a/examples/foundational/07zb-interruptible-inworld.py b/examples/foundational/07zb-interruptible-inworld.py index 0865a4e92..6e2cd7aed 100644 --- a/examples/foundational/07zb-interruptible-inworld.py +++ b/examples/foundational/07zb-interruptible-inworld.py @@ -10,8 +10,7 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMRunFrame, TTSTextFrame -from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -25,7 +24,6 @@ from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.inworld.tts import InworldTTSService, InworldTTSSettings from pipecat.services.openai.llm import OpenAILLMService, OpenAILLMSettings -from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -94,13 +92,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_metrics=True, enable_usage_metrics=True, ), - observers=[ - DebugLogObserver( - frame_types={ - TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE), - } - ), - ], idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) diff --git a/src/pipecat/services/asyncai/tts.py b/src/pipecat/services/asyncai/tts.py index 3b1296752..56a7d0e82 100644 --- a/src/pipecat/services/asyncai/tts.py +++ b/src/pipecat/services/asyncai/tts.py @@ -23,12 +23,11 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.settings import TTSSettings, _warn_deprecated_param -from pipecat.services.tts_service import AudioContextTTSService, TextAggregationMode, TTSService +from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService from pipecat.transcriptions.language import Language, resolve_language from pipecat.utils.tracing.service_decorators import traced_tts @@ -80,7 +79,7 @@ class AsyncAITTSSettings(TTSSettings): pass -class AsyncAITTSService(AudioContextTTSService): +class AsyncAITTSService(WebsocketTTSService): """Async TTS service with WebSocket streaming. Provides text-to-speech using Async's streaming WebSocket API. @@ -183,8 +182,9 @@ class AsyncAITTSService(AudioContextTTSService): aggregate_sentences=aggregate_sentences, text_aggregation_mode=text_aggregation_mode, pause_frame_processing=True, - push_stop_frames=True, sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -340,13 +340,18 @@ class AsyncAITTSService(AudioContextTTSService): return self._websocket raise Exception("Websocket not connected") - async def flush_audio(self): - """Flush any pending audio.""" - context_id = self.get_active_audio_context_id() - if not context_id or not self._websocket: + async def flush_audio(self, context_id: Optional[str] = None): + """Flush any pending audio. + + Args: + context_id: The specific context to flush. If None, falls back to the + currently active context. + """ + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: return logger.trace(f"{self}: flushing audio") - msg = self._build_msg(text=" ", context_id=context_id, force=True) + msg = self._build_msg(text=" ", context_id=flush_id, force=True) await self._websocket.send(msg) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): @@ -459,12 +464,6 @@ class AsyncAITTSService(AudioContextTTSService): await self._connect() try: - if not self.has_active_audio_context(): - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - if not self.audio_context_available(context_id): - await self.create_audio_context(context_id) - msg = self._build_msg(text=text, force=True, context_id=context_id) await self._get_websocket().send(msg) await self.start_tts_usage_metrics(text) @@ -574,6 +573,8 @@ class AsyncAIHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -632,7 +633,7 @@ class AsyncAIHttpTTSService(TTSService): try: voice_config = {"mode": "id", "id": self._settings.voice} - await self.start_ttfb_metrics() + payload = { "model_id": self._settings.model, "transcript": text, @@ -644,7 +645,7 @@ class AsyncAIHttpTTSService(TTSService): }, "language": self._settings.language, } - yield TTSStartedFrame(context_id=context_id) + headers = { "version": self._api_version, "x-api-key": self._api_key, @@ -682,4 +683,3 @@ class AsyncAIHttpTTSService(TTSService): await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/aws/tts.py b/src/pipecat/services/aws/tts.py index 043fc264e..285026bca 100644 --- a/src/pipecat/services/aws/tts.py +++ b/src/pipecat/services/aws/tts.py @@ -22,8 +22,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -247,6 +245,8 @@ class AWSPollyTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -329,8 +329,6 @@ class AWSPollyTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Construct the parameters dictionary ssml = self._construct_ssml(text) @@ -362,8 +360,6 @@ class AWSPollyTTSService(TTSService): await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - CHUNK_SIZE = self.chunk_size for i in range(0, len(audio_data), CHUNK_SIZE): @@ -373,14 +369,10 @@ class AWSPollyTTSService(TTSService): frame = TTSAudioRawFrame(chunk, self.sample_rate, 1, context_id=context_id) yield frame - yield TTSStoppedFrame(context_id=context_id) except (BotoCoreError, ClientError) as error: error_message = f"AWS Polly TTS error: {str(error)}" yield ErrorFrame(error=error_message) - finally: - yield TTSStoppedFrame(context_id=context_id) - class PollyTTSService(AWSPollyTTSService): """Deprecated alias for AWSPollyTTSService. diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 112459c5a..f710482e9 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -21,7 +21,6 @@ from pipecat.frames.frames import ( InterruptionFrame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection @@ -331,8 +330,8 @@ class AzureTTSService(TTSService, AzureBaseTTSService): text_aggregation_mode=text_aggregation_mode, push_text_frames=False, # We'll push text frames based on word timestamps push_stop_frames=True, + push_start_frame=True, pause_frame_processing=True, - supports_word_timestamps=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -346,7 +345,6 @@ class AzureTTSService(TTSService, AzureBaseTTSService): self._audio_queue = asyncio.Queue() self._word_boundary_queue = asyncio.Queue() self._word_processor_task = None - self._first_chunk = True self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds self._current_sentence_base_offset: float = 0.0 # Base offset for current sentence self._current_sentence_duration: float = 0.0 # Duration from Azure callback @@ -619,7 +617,6 @@ class AzureTTSService(TTSService, AzureBaseTTSService): def _reset_state(self): """Reset TTS state between turns.""" - self._first_chunk = True self._cumulative_audio_offset = 0.0 self._current_sentence_base_offset = 0.0 self._current_sentence_duration = 0.0 @@ -628,7 +625,7 @@ class AzureTTSService(TTSService, AzureBaseTTSService): self._last_timestamp = None self._current_context_id = None - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio data.""" logger.trace(f"{self}: flushing audio") @@ -694,9 +691,6 @@ class AzureTTSService(TTSService, AzureBaseTTSService): return try: - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - self._first_chunk = True self._current_context_id = context_id # Capture base offset BEFORE starting synthesis to avoid race conditions @@ -719,11 +713,6 @@ class AzureTTSService(TTSService, AzureBaseTTSService): yield ErrorFrame(error=str(chunk)) break - if self._first_chunk: - await self.stop_ttfb_metrics() - await self.start_word_timestamps() - self._first_chunk = False - frame = TTSAudioRawFrame( audio=chunk, sample_rate=self.sample_rate, @@ -833,6 +822,8 @@ class AzureHttpTTSService(TTSService, AzureBaseTTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -887,8 +878,6 @@ class AzureHttpTTSService(TTSService, AzureBaseTTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") - await self.start_ttfb_metrics() - ssml = self._construct_ssml(text) result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml) @@ -896,7 +885,6 @@ class AzureHttpTTSService(TTSService, AzureBaseTTSService): if result.reason == ResultReason.SynthesizingAudioCompleted: await self.start_tts_usage_metrics(text) await self.stop_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) # Azure always sends a 44-byte header. Strip it off. yield TTSAudioRawFrame( audio=result.audio_data[44:], @@ -904,7 +892,6 @@ class AzureHttpTTSService(TTSService, AzureBaseTTSService): num_channels=1, context_id=context_id, ) - yield TTSStoppedFrame(context_id=context_id) elif result.reason == ResultReason.Canceled: cancellation_details = result.cancellation_details logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}") diff --git a/src/pipecat/services/camb/tts.py b/src/pipecat/services/camb/tts.py index ef007181e..b30726fda 100644 --- a/src/pipecat/services/camb/tts.py +++ b/src/pipecat/services/camb/tts.py @@ -29,8 +29,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -271,6 +269,8 @@ class CambTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -332,8 +332,6 @@ class CambTTSService(TTSService): text = text[:3000] try: - await self.start_ttfb_metrics() - # Build SDK parameters tts_kwargs: Dict[str, Any] = { "text": text, @@ -348,7 +346,6 @@ class CambTTSService(TTSService): tts_kwargs["user_instructions"] = self._settings.user_instructions await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) assert self._client is not None, "Camb.ai TTS service not initialized" @@ -384,5 +381,3 @@ class CambTTSService(TTSService): except Exception as e: yield ErrorFrame(error=f"Camb.ai TTS error: {e}") - finally: - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 166aa70af..3f708d06f 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -27,7 +27,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param -from pipecat.services.tts_service import AudioContextTTSService, TextAggregationMode, TTSService +from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService from pipecat.transcriptions.language import Language, resolve_language from pipecat.utils.text.base_text_aggregator import BaseTextAggregator from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator @@ -203,7 +203,7 @@ class CartesiaTTSSettings(TTSSettings): pronunciation_dict_id: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) -class CartesiaTTSService(AudioContextTTSService): +class CartesiaTTSService(WebsocketTTSService): """Cartesia TTS service with WebSocket streaming and word timestamps. Provides text-to-speech using Cartesia's streaming WebSocket API. @@ -334,9 +334,9 @@ class CartesiaTTSService(AudioContextTTSService): text_aggregation_mode=text_aggregation_mode, aggregate_sentences=aggregate_sentences, push_text_frames=False, - pause_frame_processing=True, - supports_word_timestamps=True, + pause_frame_processing=False, sample_rate=sample_rate, + push_start_frame=True, text_aggregator=text_aggregator, settings=default_settings, **kwargs, @@ -452,7 +452,11 @@ class CartesiaTTSService(AudioContextTTSService): return list(zip(words, starts)) def _build_msg( - self, text: str = "", continue_transcript: bool = True, add_timestamps: bool = True + self, + text: str = "", + continue_transcript: bool = True, + add_timestamps: bool = True, + context_id: str = "", ): voice_config = {} voice_config["mode"] = "id" @@ -461,7 +465,7 @@ class CartesiaTTSService(AudioContextTTSService): msg = { "transcript": text, "continue": continue_transcript, - "context_id": self.get_active_audio_context_id(), + "context_id": context_id, "model_id": self._settings.model, "voice": voice_config, "output_format": { @@ -580,15 +584,19 @@ class CartesiaTTSService(AudioContextTTSService): """ pass - async def flush_audio(self): - """Flush any pending audio and finalize the current context.""" - context_id = self.get_active_audio_context_id() - if not context_id or not self._websocket: + async def flush_audio(self, context_id: Optional[str] = None): + """Flush any pending audio and finalize the current context. + + Args: + context_id: The specific context to flush. If None, falls back to the + currently active context. + """ + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: return logger.trace(f"{self}: flushing audio") - msg = self._build_msg(text="", continue_transcript=False) + msg = self._build_msg(text="", continue_transcript=False, context_id=flush_id) await self._websocket.send(msg) - self.reset_active_audio_context() async def _process_messages(self): async for message in self._get_websocket(): @@ -607,8 +615,6 @@ class CartesiaTTSService(AudioContextTTSService): ) await self.add_word_timestamps(processed_timestamps, ctx_id) elif msg["type"] == "chunk": - await self.stop_ttfb_metrics() - await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self.sample_rate, @@ -652,12 +658,7 @@ class CartesiaTTSService(AudioContextTTSService): if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() - if not self.has_active_audio_context(): - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - await self.create_audio_context(context_id) - - msg = self._build_msg(text=text) + msg = self._build_msg(text=text, context_id=context_id) try: await self._get_websocket().send(msg) @@ -777,6 +778,8 @@ class CartesiaHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -863,8 +866,6 @@ class CartesiaHttpTTSService(TTSService): try: voice_config = {"mode": "id", "id": self._settings.voice} - await self.start_ttfb_metrics() - output_format = { "container": self._output_container, "encoding": self._output_encoding, @@ -889,8 +890,6 @@ class CartesiaHttpTTSService(TTSService): if self._settings.pronunciation_dict_id: payload["pronunciation_dict_id"] = self._settings.pronunciation_dict_id - yield TTSStartedFrame(context_id=context_id) - headers = { "Cartesia-Version": self._cartesia_version, "X-API-Key": self._api_key, @@ -922,4 +921,3 @@ class CartesiaHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/deepgram/sagemaker/tts.py b/src/pipecat/services/deepgram/sagemaker/tts.py index 3693178a1..9e8c30ad7 100644 --- a/src/pipecat/services/deepgram/sagemaker/tts.py +++ b/src/pipecat/services/deepgram/sagemaker/tts.py @@ -328,7 +328,7 @@ class DeepgramSageMakerTTSService(TTSService): except Exception as e: logger.error(f"{self} error sending Clear message: {e}") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis by sending Flush command. This should be called when the LLM finishes a complete response to force @@ -355,12 +355,12 @@ class DeepgramSageMakerTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - if not self._ttfb_started: - await self.start_ttfb_metrics() - self._ttfb_started = True - await self.start_tts_usage_metrics(text) - - yield TTSStartedFrame(context_id=context_id) + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) + if not self._ttfb_started: + await self.start_ttfb_metrics() + self._ttfb_started = True + yield TTSStartedFrame(context_id=context_id) self._context_id = context_id await self._client.send_json({"type": "Speak", "text": text}) diff --git a/src/pipecat/services/deepgram/tts.py b/src/pipecat/services/deepgram/tts.py index 95db998e2..6c8685bee 100644 --- a/src/pipecat/services/deepgram/tts.py +++ b/src/pipecat/services/deepgram/tts.py @@ -26,8 +26,6 @@ from pipecat.frames.frames import ( LLMFullResponseEndFrame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.settings import TTSSettings, _warn_deprecated_param @@ -120,6 +118,7 @@ class DeepgramTTSService(WebsocketTTSService): sample_rate=sample_rate, pause_frame_processing=True, push_stop_frames=True, + push_start_frame=True, append_trailing_space=True, settings=default_settings, **kwargs, @@ -130,7 +129,6 @@ class DeepgramTTSService(WebsocketTTSService): self._encoding = encoding self._receive_task = None - self._context_id: Optional[str] = None def can_generate_metrics(self) -> bool: """Check if the service can generate metrics. @@ -267,7 +265,6 @@ class DeepgramTTSService(WebsocketTTSService): logger.error(f"{self} exception: {e}") await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: - self._context_id = None self._websocket = None await self._call_event_handler("on_disconnected") @@ -299,7 +296,9 @@ class DeepgramTTSService(WebsocketTTSService): if isinstance(message, bytes): # Binary message contains audio data await self.stop_ttfb_metrics() - frame = TTSAudioRawFrame(message, self.sample_rate, 1, context_id=self._context_id) + frame = TTSAudioRawFrame( + message, self.sample_rate, 1, context_id=self.get_active_audio_context_id() + ) await self.push_frame(frame) elif isinstance(message, str): # Text message contains metadata or control messages @@ -326,7 +325,7 @@ class DeepgramTTSService(WebsocketTTSService): except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis by sending Flush command. This should be called when the LLM finishes a complete response to force @@ -357,13 +356,8 @@ class DeepgramTTSService(WebsocketTTSService): if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() - await self.start_ttfb_metrics() await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - # Store context_id for use in _receive_messages - self._context_id = context_id - # Send text message to Deepgram # Note: We don't send Flush here - that should only be sent when the # LLM finishes a complete response via flush_audio() @@ -435,6 +429,8 @@ class DeepgramHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -492,7 +488,6 @@ class DeepgramHttpTTSService(TTSService): raise Exception(f"HTTP {response.status}: {error_text}") await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) CHUNK_SIZE = self.chunk_size @@ -510,7 +505,5 @@ class DeepgramHttpTTSService(TTSService): context_id=context_id, ) - yield TTSStoppedFrame(context_id=context_id) - except Exception as e: yield ErrorFrame(f"Error getting audio: {str(e)}") diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index cfb08eb6d..de930d1f2 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -46,9 +46,9 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import ( - AudioContextTTSService, TextAggregationMode, TTSService, + WebsocketTTSService, ) from pipecat.transcriptions.language import Language, resolve_language from pipecat.utils.tracing.service_decorators import traced_tts @@ -308,7 +308,7 @@ def calculate_word_times( return (word_times, new_partial_word, new_partial_word_start_time) -class ElevenLabsTTSService(AudioContextTTSService): +class ElevenLabsTTSService(WebsocketTTSService): """ElevenLabs WebSocket-based TTS service with word timestamps. Provides real-time text-to-speech using ElevenLabs' WebSocket streaming API. @@ -479,7 +479,6 @@ class ElevenLabsTTSService(AudioContextTTSService): push_text_frames=False, push_stop_frames=True, pause_frame_processing=True, - supports_word_timestamps=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -559,20 +558,15 @@ class ElevenLabsTTSService(AudioContextTTSService): ) await self._disconnect() await self._connect() - elif voice_settings_changed and self.has_active_audio_context(): + elif voice_settings_changed: logger.debug( f"Voice settings changed ({changed.keys() & ElevenLabsTTSSettings.VOICE_SETTINGS_FIELDS}), " f"closing current context to apply changes" ) - context_id = self.get_active_audio_context_id() - try: - if self._websocket: - await self._websocket.send( - json.dumps({"context_id": context_id, "close_context": True}) - ) - except Exception as e: - await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) - self.reset_active_audio_context() + audio_contexts = self.get_audio_contexts() + if audio_contexts: + for ctx_id in audio_contexts: + await self._close_context(ctx_id) if not url_changed: # Reconnect applies all settings; only warn about fields not handled @@ -610,13 +604,18 @@ class ElevenLabsTTSService(AudioContextTTSService): await super().cancel(frame) await self._disconnect() - async def flush_audio(self): - """Flush any pending audio and finalize the current context.""" - context_id = self.get_active_audio_context_id() - if not context_id or not self._websocket: + async def flush_audio(self, context_id: Optional[str] = None): + """Flush any pending audio and finalize the current context. + + Args: + context_id: The specific context to flush. If None, falls back to the + currently active context. + """ + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: return logger.trace(f"{self}: flushing audio") - msg = {"context_id": context_id, "flush": True} + msg = {"context_id": flush_id, "flush": True} await self._websocket.send(json.dumps(msg)) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): @@ -703,9 +702,7 @@ class ElevenLabsTTSService(AudioContextTTSService): if self._websocket: logger.debug("Disconnecting from ElevenLabs") - # Close all contexts and the socket - if self.has_active_audio_context(): - await self._websocket.send(json.dumps({"close_socket": True})) + await self._websocket.send(json.dumps({"close_socket": True})) await self._websocket.close() logger.debug("Disconnected from ElevenLabs") except Exception as e: @@ -737,6 +734,7 @@ class ElevenLabsTTSService(AudioContextTTSService): ) except Exception as e: await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) + self._cumulative_time = 0.0 self._partial_word = "" self._partial_word_start_time = 0.0 @@ -782,9 +780,6 @@ class ElevenLabsTTSService(AudioContextTTSService): continue if msg.get("audio"): - await self.stop_ttfb_metrics() - await self.start_word_timestamps() - audio = base64.b64decode(msg["audio"]) frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=received_ctx_id) await self.append_to_audio_context(received_ctx_id, frame) @@ -845,9 +840,8 @@ class ElevenLabsTTSService(AudioContextTTSService): logger.warning(f"{self} keepalive error: {e}") break - async def _send_text(self, text: str): + async def _send_text(self, text: str, context_id: str): """Send text to the WebSocket for synthesis.""" - context_id = self.get_active_audio_context_id() if self._websocket and context_id: msg = {"text": text, "context_id": context_id} await self._websocket.send(json.dumps(msg)) @@ -870,16 +864,14 @@ class ElevenLabsTTSService(AudioContextTTSService): await self._connect() try: - if not self.has_active_audio_context(): + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) await self.start_ttfb_metrics() yield TTSStartedFrame(context_id=context_id) self._cumulative_time = 0 self._partial_word = "" self._partial_word_start_time = 0.0 - if not self.audio_context_available(context_id): - await self.create_audio_context(context_id) - # Initialize context with voice settings and pronunciation dictionaries msg = {"text": " ", "context_id": context_id} if self._voice_settings: @@ -892,7 +884,7 @@ class ElevenLabsTTSService(AudioContextTTSService): await self._websocket.send(json.dumps(msg)) logger.trace(f"Created new context {context_id}") - await self._send_text(text) + await self._send_text(text, context_id) await self.start_tts_usage_metrics(text) except Exception as e: yield TTSStoppedFrame(context_id=context_id) @@ -1046,7 +1038,7 @@ class ElevenLabsHttpTTSService(TTSService): aggregate_sentences=aggregate_sentences, push_text_frames=False, push_stop_frames=True, - supports_word_timestamps=True, + push_start_frame=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -1266,8 +1258,6 @@ class ElevenLabsHttpTTSService(TTSService): params["optimize_streaming_latency"] = self._settings.optimize_streaming_latency try: - await self.start_ttfb_metrics() - async with self._session.post( url, json=payload, headers=headers, params=params ) as response: @@ -1278,10 +1268,6 @@ class ElevenLabsHttpTTSService(TTSService): await self.start_tts_usage_metrics(text) - # Start TTS sequence - await self.start_word_timestamps() - yield TTSStartedFrame(context_id=context_id) - # Track the duration of this utterance based on the last character's end time utterance_duration = 0 async for line in response.content: @@ -1347,4 +1333,3 @@ class ElevenLabsHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - # Let the parent class handle TTSStoppedFrame diff --git a/src/pipecat/services/fish/tts.py b/src/pipecat/services/fish/tts.py index 9ea749546..64c3bccd9 100644 --- a/src/pipecat/services/fish/tts.py +++ b/src/pipecat/services/fish/tts.py @@ -209,6 +209,7 @@ class FishAudioTTSService(InterruptibleTTSService): super().__init__( push_stop_frames=True, + push_start_frame=True, pause_frame_processing=True, sample_rate=sample_rate, settings=default_settings, @@ -219,7 +220,6 @@ class FishAudioTTSService(InterruptibleTTSService): self._base_url = "wss://api.fish.audio/v1/tts/live" self._websocket = None self._receive_task = None - self._request_id = None # Init-only audio format config (not runtime-updatable). self._fish_sample_rate = 0 # Set in start() @@ -341,11 +341,10 @@ class FishAudioTTSService(InterruptibleTTSService): except Exception as e: await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) finally: - self._request_id = None self._websocket = None await self._call_event_handler("on_disconnected") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any buffered audio by sending a flush event to Fish Audio.""" logger.trace(f"{self}: Flushing audio buffers") if not self._websocket or self._websocket.state is State.CLOSED: @@ -361,7 +360,6 @@ class FishAudioTTSService(InterruptibleTTSService): async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): await super()._handle_interruption(frame, direction) await self.stop_all_metrics() - self._request_id = None async def _receive_messages(self): async for message in self._get_websocket(): @@ -398,12 +396,6 @@ class FishAudioTTSService(InterruptibleTTSService): if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() - if not self._request_id: - await self.start_ttfb_metrics() - await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - self._request_id = str(uuid.uuid4()) - # Send the text text_message = { "event": "text", diff --git a/src/pipecat/services/google/tts.py b/src/pipecat/services/google/tts.py index 7cf2ee996..071d731b1 100644 --- a/src/pipecat/services/google/tts.py +++ b/src/pipecat/services/google/tts.py @@ -34,8 +34,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import ( NOT_GIVEN, @@ -655,6 +653,8 @@ class GoogleHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -803,8 +803,6 @@ class GoogleHttpTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Check if the voice is a Chirp voice (including Chirp 3) or Journey voice is_chirp_voice = "chirp" in self._settings.voice.lower() is_journey_voice = "journey" in self._settings.voice.lower() @@ -840,8 +838,6 @@ class GoogleHttpTTSService(TTSService): await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - # Skip the first 44 bytes to remove the WAV header audio_content = response.audio_content[44:] @@ -855,8 +851,6 @@ class GoogleHttpTTSService(TTSService): frame = TTSAudioRawFrame(chunk, self.sample_rate, 1, context_id=context_id) yield frame - yield TTSStoppedFrame(context_id=context_id) - except Exception as e: error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) @@ -967,8 +961,6 @@ class GoogleBaseTTSService(TTSService): streaming_responses = await self._client.streaming_synthesize(request_generator()) await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - audio_buffer = b"" first_chunk_for_ttfb = False @@ -992,8 +984,6 @@ class GoogleBaseTTSService(TTSService): if audio_buffer: yield TTSAudioRawFrame(audio_buffer, self.sample_rate, 1, context_id=context_id) - yield TTSStoppedFrame(context_id=context_id) - class GoogleTTSService(GoogleBaseTTSService): """Google Cloud Text-to-Speech streaming service. @@ -1096,6 +1086,8 @@ class GoogleTTSService(GoogleBaseTTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -1135,8 +1127,6 @@ class GoogleTTSService(GoogleBaseTTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Build voice selection params if self._voice_cloning_key: voice_clone_params = texttospeech_v1.VoiceCloneParams( @@ -1352,6 +1342,8 @@ class GeminiTTSService(GoogleBaseTTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -1414,8 +1406,6 @@ class GeminiTTSService(GoogleBaseTTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Build voice selection params if self._settings.multi_speaker and self._settings.speaker_configs: # Multi-speaker mode diff --git a/src/pipecat/services/gradium/tts.py b/src/pipecat/services/gradium/tts.py index 2ade14367..745a77f56 100644 --- a/src/pipecat/services/gradium/tts.py +++ b/src/pipecat/services/gradium/tts.py @@ -19,11 +19,10 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, TTSStoppedFrame, ) from pipecat.services.settings import TTSSettings, _warn_deprecated_param -from pipecat.services.tts_service import AudioContextTTSService +from pipecat.services.tts_service import WebsocketTTSService from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -45,7 +44,7 @@ class GradiumTTSSettings(TTSSettings): pass -class GradiumTTSService(AudioContextTTSService): +class GradiumTTSService(WebsocketTTSService): """Text-to-Speech service using Gradium's websocket API.""" _settings: GradiumTTSSettings @@ -125,9 +124,9 @@ class GradiumTTSService(AudioContextTTSService): super().__init__( push_stop_frames=True, + push_start_frame=True, push_text_frames=False, pause_frame_processing=True, - supports_word_timestamps=True, sample_rate=SAMPLE_RATE, settings=default_settings, **kwargs, @@ -166,12 +165,9 @@ class GradiumTTSService(AudioContextTTSService): self._warn_unhandled_updated_settings(changed) return changed - def _build_msg(self, text: str = "") -> dict: + def _build_msg(self, text: str = "", context_id: str = "") -> dict: """Build JSON message for Gradium API.""" - msg = {"text": text, "type": "text"} - context_id = self.get_active_audio_context_id() - if context_id: - msg["client_req_id"] = context_id + msg = {"text": text, "type": "text", "client_req_id": context_id} return msg async def start(self, frame: StartFrame): @@ -280,15 +276,14 @@ class GradiumTTSService(AudioContextTTSService): return self._websocket raise Exception("Websocket not connected") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis.""" - context_id = self.get_active_audio_context_id() - if not context_id or not self._websocket: + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: return try: - msg = {"type": "end_of_stream", "client_req_id": context_id} + msg = {"type": "end_of_stream", "client_req_id": flush_id} await self._websocket.send(json.dumps(msg)) - self.reset_active_audio_context() except ConnectionClosedOK: logger.debug(f"{self}: connection closed normally during flush") except Exception as e: @@ -326,8 +321,6 @@ class GradiumTTSService(AudioContextTTSService): if msg["type"] == "audio": if not ctx_id or not self.audio_context_available(ctx_id): continue - await self.stop_ttfb_metrics() - await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["audio"]), sample_rate=self.sample_rate, @@ -369,12 +362,7 @@ class GradiumTTSService(AudioContextTTSService): await self._connect() try: - if not self.has_active_audio_context(): - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - await self.create_audio_context(context_id) - - msg = self._build_msg(text=text) + msg = self._build_msg(text=text, context_id=context_id) await self._get_websocket().send(json.dumps(msg)) await self.start_tts_usage_metrics(text) except Exception as e: diff --git a/src/pipecat/services/groq/tts.py b/src/pipecat/services/groq/tts.py index 18b623fc8..139816834 100644 --- a/src/pipecat/services/groq/tts.py +++ b/src/pipecat/services/groq/tts.py @@ -18,8 +18,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -140,6 +138,8 @@ class GroqTTSService(TTSService): super().__init__( pause_frame_processing=True, + push_start_frame=True, + push_stop_frames=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -171,9 +171,6 @@ class GroqTTSService(TTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") measuring_ttfb = True - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - try: response = await self._client.audio.speech.create( model=self._settings.model, @@ -198,5 +195,3 @@ class GroqTTSService(TTSService): yield TTSAudioRawFrame(bytes, frame_rate, channels, context_id=context_id) except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}") - - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/hume/tts.py b/src/pipecat/services/hume/tts.py index 052d1cd0a..ff5eb7522 100644 --- a/src/pipecat/services/hume/tts.py +++ b/src/pipecat/services/hume/tts.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( InterruptionFrame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection @@ -166,7 +165,7 @@ class HumeTTSService(TTSService): sample_rate=sample_rate, push_text_frames=False, push_stop_frames=True, - supports_word_timestamps=True, + push_start_frame=True, settings=default_settings, **kwargs, ) @@ -181,7 +180,6 @@ class HumeTTSService(TTSService): # Track cumulative time for word timestamps across utterances self._cumulative_time = 0.0 - self._started = False def can_generate_metrics(self) -> bool: """Can generate metrics. @@ -203,7 +201,6 @@ class HumeTTSService(TTSService): def _reset_state(self): """Reset internal state variables.""" self._cumulative_time = 0.0 - self._started = False async def stop(self, frame: EndFrame) -> None: """Stop the service and cleanup resources. @@ -310,15 +307,8 @@ class HumeTTSService(TTSService): # Request raw PCM chunks in the streaming JSON pcm_fmt = FormatPcm(type="pcm") - await self.start_ttfb_metrics() await self.start_tts_usage_metrics(text) - # Start TTS sequence if not already started - if not self._started: - await self.start_word_timestamps() - yield TTSStartedFrame(context_id=context_id) - self._started = True - try: # Instant mode is always enabled here (not user-configurable) # Hume emits mono PCM at 48 kHz; downstream can resample if needed. @@ -395,4 +385,3 @@ class HumeTTSService(TTSService): finally: # Ensure TTFB timer is stopped even on early failures await self.stop_ttfb_metrics() - # Let the parent class handle TTSStoppedFrame via push_stop_frames diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index a1421b2ab..d602efb52 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -62,7 +62,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.tts_service import AudioContextTTSService, TextAggregationMode, TTSService +from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService from pipecat.utils.tracing.service_decorators import traced_tts @@ -212,7 +212,7 @@ class InworldHttpTTSService(TTSService): super().__init__( push_text_frames=False, push_stop_frames=True, - supports_word_timestamps=True, + push_start_frame=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -359,11 +359,6 @@ class InworldHttpTTSService(TTSService): } try: - await self.start_ttfb_metrics() - - await self.start_word_timestamps() - yield TTSStartedFrame(context_id=context_id) - async with self._session.post( self._base_url, json=payload, headers=headers ) as response: @@ -514,7 +509,7 @@ class InworldHttpTTSService(TTSService): ) -class InworldTTSService(AudioContextTTSService): +class InworldTTSService(WebsocketTTSService): """Inworld AI WebSocket-based TTS service. Uses bidirectional WebSocket for lower latency streaming. Supports multiple @@ -650,7 +645,6 @@ class InworldTTSService(AudioContextTTSService): push_text_frames=False, push_stop_frames=True, pause_frame_processing=True, - supports_word_timestamps=True, sample_rate=sample_rate, aggregate_sentences=aggregate_sentences, text_aggregation_mode=text_aggregation_mode, @@ -719,17 +713,17 @@ class InworldTTSService(AudioContextTTSService): await super().cancel(frame) await self._disconnect() - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio without closing the context. This triggers synthesis of all accumulated text in the buffer while keeping the context open for subsequent text. The context is only closed on interruption, disconnect, or end of session. """ - context_id = self.get_active_audio_context_id() - if context_id and self._websocket: - logger.trace(f"Flushing audio for context {context_id}") - await self._send_flush(context_id) + flush_id = context_id or self.get_active_audio_context_id() + if flush_id and self._websocket: + logger.trace(f"Flushing audio for context {flush_id}") + await self._send_flush(flush_id) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): """Push a frame and handle state changes. @@ -899,12 +893,10 @@ class InworldTTSService(AudioContextTTSService): if self._websocket: logger.debug("Disconnecting from Inworld WebSocket TTS") - context_id = self.get_active_audio_context_id() - if context_id: - try: - await self._send_close_context(context_id) - except Exception: - pass + audio_contexts = self.get_audio_contexts() + if audio_contexts: + for ctx_id in audio_contexts: + await self._send_close_context(ctx_id) await self._websocket.close() logger.debug("Disconnected from Inworld WebSocket TTS") except Exception as e: @@ -934,10 +926,7 @@ class InworldTTSService(AudioContextTTSService): for k in ["contextCreated", "audioChunk", "flushCompleted", "contextClosed"] if k in result ] - logger.debug( - f"{self}: Received message types={msg_types}, ctx_id={ctx_id}, " - f"current_ctx={self.get_active_audio_context_id()}, available={self.audio_context_available(ctx_id) if ctx_id else 'N/A'}" - ) + logger.debug(f"{self}: Received message types={msg_types}, ctx_id={ctx_id}") # Check for errors status = result.get("status", {}) @@ -948,9 +937,7 @@ class InworldTTSService(AudioContextTTSService): # Handle "Context not found" error (code 5) # This can happen when a keepalive message is sent but no context is available. if error_code == 5 and "not found" in error_msg.lower(): - logger.debug( - f"{self}: Context {ctx_id or self.get_active_audio_context_id()} not found." - ) + logger.debug(f"{self}: Context {ctx_id} not found.") continue # For other errors, push error frame @@ -961,17 +948,10 @@ class InworldTTSService(AudioContextTTSService): await self.push_error(error_msg=str(msg["error"])) continue - # Check if this message belongs to an available context. - # If the context isn't available but matches our current context ID, - # recreate it (handles race conditions during interruption recovery). + # If the context isn't available recreate it (handles race conditions during interruption recovery). if ctx_id and not self.audio_context_available(ctx_id): - if self.get_active_audio_context_id() == ctx_id: - logger.trace(f"{self}: Recreating audio context for current context: {ctx_id}") - await self.create_audio_context(ctx_id) - else: - # This is a message from an old/closed context - skip it - logger.trace(f"{self}: Skipping message from unavailable context: {ctx_id}") - continue + logger.trace(f"{self}: Recreating audio context for current context: {ctx_id}") + await self.create_audio_context(ctx_id) # Process audio chunk audio_chunk = result.get("audioChunk", {}) @@ -979,8 +959,6 @@ class InworldTTSService(AudioContextTTSService): if audio_b64: logger.trace(f"{self}: Processing audio chunk for context {ctx_id}") - await self.stop_ttfb_metrics() - await self.start_word_timestamps() audio = base64.b64decode(audio_b64) if len(audio) > 44 and audio.startswith(b"RIFF"): audio = audio[44:] @@ -1012,12 +990,8 @@ class InworldTTSService(AudioContextTTSService): if "contextClosed" in result: logger.trace(f"{self}: Context closed on server: {ctx_id}") await self.stop_ttfb_metrics() - # Only reset if this is our current context - if ctx_id == self.get_active_audio_context_id(): - self.reset_active_audio_context() - if ctx_id and self.audio_context_available(ctx_id): - await self.remove_audio_context(ctx_id) await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)], ctx_id) + await self.remove_audio_context(ctx_id) async def _keepalive_task_handler(self): """Send periodic keepalive messages to maintain WebSocket connection.""" @@ -1128,10 +1102,10 @@ class InworldTTSService(AudioContextTTSService): await self._connect() try: - if not self.has_active_audio_context(): + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) await self.start_ttfb_metrics() yield TTSStartedFrame(context_id=context_id) - await self.create_audio_context(context_id) await self._send_context(context_id) await self._send_text(context_id, text) diff --git a/src/pipecat/services/kokoro/tts.py b/src/pipecat/services/kokoro/tts.py index 0646923f3..e69ef7a67 100644 --- a/src/pipecat/services/kokoro/tts.py +++ b/src/pipecat/services/kokoro/tts.py @@ -20,8 +20,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import TTSSettings, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -170,6 +168,8 @@ class KokoroTTSService(TTSService): default_settings.apply_update(settings) super().__init__( + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -212,9 +212,7 @@ class KokoroTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) stream = self._kokoro.create_stream( text, voice=self._settings.voice, lang=self._settings.language, speed=1.0 @@ -238,4 +236,3 @@ class KokoroTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/lmnt/tts.py b/src/pipecat/services/lmnt/tts.py index 9ee6d8d60..c8bfcaf55 100644 --- a/src/pipecat/services/lmnt/tts.py +++ b/src/pipecat/services/lmnt/tts.py @@ -143,6 +143,7 @@ class LmntTTSService(InterruptibleTTSService): super().__init__( push_stop_frames=True, + push_start_frame=True, pause_frame_processing=True, sample_rate=sample_rate, settings=default_settings, @@ -152,7 +153,6 @@ class LmntTTSService(InterruptibleTTSService): self._api_key = api_key self._output_format = "raw" self._receive_task = None - self._context_id: Optional[str] = None def can_generate_metrics(self) -> bool: """Check if this service can generate processing metrics. @@ -289,7 +289,6 @@ class LmntTTSService(InterruptibleTTSService): except Exception as e: await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e) finally: - self._context_id = None self._websocket = None await self._call_event_handler("on_disconnected") @@ -299,7 +298,7 @@ class LmntTTSService(InterruptibleTTSService): return self._websocket raise Exception("Websocket not connected") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis.""" if not self._websocket or self._websocket.state is State.CLOSED: return @@ -315,7 +314,7 @@ class LmntTTSService(InterruptibleTTSService): audio=message, sample_rate=self.sample_rate, num_channels=1, - context_id=self._context_id, + context_id=self.get_active_audio_context_id(), ) await self.push_frame(frame) else: @@ -347,11 +346,6 @@ class LmntTTSService(InterruptibleTTSService): await self._connect() try: - await self.start_ttfb_metrics() - # Store context_id for use in _receive_messages - self._context_id = context_id - yield TTSStartedFrame(context_id=context_id) - # Send text to LMNT await self._get_websocket().send(json.dumps({"text": text})) # Force synthesis diff --git a/src/pipecat/services/minimax/tts.py b/src/pipecat/services/minimax/tts.py index efe8c1fd9..33e0669e1 100644 --- a/src/pipecat/services/minimax/tts.py +++ b/src/pipecat/services/minimax/tts.py @@ -23,8 +23,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -305,6 +303,8 @@ class MiniMaxHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -402,8 +402,6 @@ class MiniMaxHttpTTSService(TTSService): payload["language_boost"] = self._settings.language_boost try: - await self.start_ttfb_metrics() - async with self._session.post( self._base_url, headers=headers, json=payload ) as response: @@ -413,7 +411,6 @@ class MiniMaxHttpTTSService(TTSService): return await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) # Process the streaming response buffer = bytearray() @@ -490,4 +487,3 @@ class MiniMaxHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}", exception=e) finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index da14ec2e5..c1916414e 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -180,6 +180,7 @@ class NeuphonicTTSService(InterruptibleTTSService): aggregate_sentences=aggregate_sentences, text_aggregation_mode=text_aggregation_mode, push_stop_frames=True, + push_start_frame=True, stop_frame_timeout_s=2.0, sample_rate=sample_rate, settings=default_settings, @@ -188,12 +189,8 @@ class NeuphonicTTSService(InterruptibleTTSService): self._api_key = api_key self._url = url - - self._cumulative_time = 0 - self._receive_task = None self._keepalive_task = None - self._context_id: Optional[str] = None self._encoding = encoding self._sampling_rate = sample_rate @@ -252,7 +249,7 @@ class NeuphonicTTSService(InterruptibleTTSService): await super().cancel(frame) await self._disconnect() - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis by sending stop command.""" if self._websocket: msg = {"text": ""} @@ -358,7 +355,6 @@ class NeuphonicTTSService(InterruptibleTTSService): except Exception as e: await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) finally: - self._context_id = None self._websocket = None await self._call_event_handler("on_disconnected") @@ -372,7 +368,7 @@ class NeuphonicTTSService(InterruptibleTTSService): audio = base64.b64decode(msg["data"]["audio"]) frame = TTSAudioRawFrame( - audio, self.sample_rate, 1, context_id=self._context_id + audio, self.sample_rate, 1, context_id=self.get_active_audio_context_id() ) await self.push_frame(frame) @@ -415,12 +411,6 @@ class NeuphonicTTSService(InterruptibleTTSService): await self._connect() try: - await self.start_ttfb_metrics() - # Store context_id for use in _receive_messages - self._context_id = context_id - yield TTSStartedFrame(context_id=context_id) - self._cumulative_time = 0 - await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: @@ -523,6 +513,8 @@ class NeuphonicHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_stop_frames=True, + push_start_frame=True, settings=default_settings, **kwargs, ) @@ -559,7 +551,7 @@ class NeuphonicHttpTTSService(TTSService): """ await super().start(frame) - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis. Note: @@ -633,8 +625,6 @@ class NeuphonicHttpTTSService(TTSService): payload["voice_id"] = self._settings.voice try: - await self.start_ttfb_metrics() - async with self._session.post(url, json=payload, headers=headers) as response: if response.status != 200: error_text = await response.text() @@ -643,7 +633,6 @@ class NeuphonicHttpTTSService(TTSService): return await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) # Process SSE stream line by line async for line in response.content: @@ -681,4 +670,3 @@ class NeuphonicHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/nvidia/tts.py b/src/pipecat/services/nvidia/tts.py index 6e3298a75..7f7638f5f 100644 --- a/src/pipecat/services/nvidia/tts.py +++ b/src/pipecat/services/nvidia/tts.py @@ -28,8 +28,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -145,6 +143,8 @@ class NvidiaTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -271,9 +271,6 @@ class NvidiaTTSService(TTSService): assert self._service is not None, "TTS service not initialized" assert self._config is not None, "Synthesis configuration not created" - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) - logger.debug(f"{self}: Generating TTS [{text}]") responses = await asyncio.to_thread(read_audio_responses) @@ -289,7 +286,6 @@ class NvidiaTTSService(TTSService): yield frame await self.start_tts_usage_metrics(text) - yield TTSStoppedFrame(context_id=context_id) except asyncio.TimeoutError as e: logger.error(f"{self} timeout waiting for audio response") yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/openai/tts.py b/src/pipecat/services/openai/tts.py index b4933ae9f..a50129349 100644 --- a/src/pipecat/services/openai/tts.py +++ b/src/pipecat/services/openai/tts.py @@ -22,8 +22,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -194,6 +192,8 @@ class OpenAITTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -234,8 +234,6 @@ class OpenAITTSService(TTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Setup API parameters create_params = { "input": text, @@ -267,12 +265,10 @@ class OpenAITTSService(TTSService): CHUNK_SIZE = self.chunk_size - yield TTSStartedFrame(context_id=context_id) async for chunk in r.iter_bytes(CHUNK_SIZE): if len(chunk) > 0: await self.stop_ttfb_metrics() frame = TTSAudioRawFrame(chunk, self.sample_rate, 1, context_id=context_id) yield frame - yield TTSStoppedFrame(context_id=context_id) except BadRequestError as e: yield ErrorFrame(error=f"Unknown error occurred: {e}") diff --git a/src/pipecat/services/piper/tts.py b/src/pipecat/services/piper/tts.py index 46bf98cd1..f0343947b 100644 --- a/src/pipecat/services/piper/tts.py +++ b/src/pipecat/services/piper/tts.py @@ -17,8 +17,6 @@ from loguru import logger from pipecat.frames.frames import ( ErrorFrame, Frame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import TTSSettings, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -91,6 +89,8 @@ class PiperTTSService(TTSService): default_settings.apply_update(settings) super().__init__( + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -159,12 +159,8 @@ class PiperTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - async for frame in self._stream_audio_frames_from_iterator( async_iterator(self._voice.synthesize(text)), in_sample_rate=self._voice.config.sample_rate, @@ -178,7 +174,6 @@ class PiperTTSService(TTSService): finally: logger.debug(f"{self}: Finished TTS [{text}]") await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) # This assumes a running TTS service running: @@ -244,6 +239,8 @@ class PiperHttpTTSService(TTSService): default_settings.apply_update(settings) super().__init__( + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -279,8 +276,6 @@ class PiperHttpTTSService(TTSService): "Content-Type": "application/json", } try: - await self.start_ttfb_metrics() - data = { "text": text, "voice": self._settings.voice, @@ -296,8 +291,6 @@ class PiperHttpTTSService(TTSService): await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - CHUNK_SIZE = self.chunk_size async for frame in self._stream_audio_frames_from_iterator( @@ -311,4 +304,3 @@ class PiperHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/services/resembleai/tts.py b/src/pipecat/services/resembleai/tts.py index e1c1cf68a..45f8fc229 100644 --- a/src/pipecat/services/resembleai/tts.py +++ b/src/pipecat/services/resembleai/tts.py @@ -24,7 +24,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.services.settings import TTSSettings, _warn_deprecated_param -from pipecat.services.tts_service import AudioContextTTSService +from pipecat.services.tts_service import WebsocketTTSService from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -43,7 +43,7 @@ class ResembleAITTSSettings(TTSSettings): pass -class ResembleAITTSService(AudioContextTTSService): +class ResembleAITTSService(WebsocketTTSService): """Resemble AI TTS service with WebSocket streaming and word timestamps. Provides text-to-speech using Resemble AI's streaming WebSocket API. @@ -103,7 +103,6 @@ class ResembleAITTSService(AudioContextTTSService): super().__init__( sample_rate=sample_rate, reuse_context_id_within_turn=False, - supports_word_timestamps=True, settings=default_settings, **kwargs, ) @@ -268,7 +267,7 @@ class ResembleAITTSService(AudioContextTTSService): """ pass - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio and finalize the current context.""" logger.trace(f"{self}: flushing audio") # For Resemble AI, we just wait for the audio_end message @@ -297,9 +296,6 @@ class ResembleAITTSService(AudioContextTTSService): continue if msg_type == "audio": - await self.stop_ttfb_metrics() - await self.start_word_timestamps() - # Decode base64 audio content audio_content = msg.get("audio_content", "") if not audio_content: @@ -447,14 +443,14 @@ class ResembleAITTSService(AudioContextTTSService): if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() - await self.start_ttfb_metrics() - yield TTSStartedFrame(context_id=context_id) + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) + await self.start_ttfb_metrics() + yield TTSStartedFrame(context_id=context_id) # Map request_id to context_id for tracking self._request_id_to_context[self._request_id_counter] = context_id - await self.create_audio_context(context_id) - msg = self._build_msg(text=text) try: diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index 04ebcd5fc..27580504b 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -33,10 +33,10 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import ( - AudioContextTTSService, InterruptibleTTSService, TextAggregationMode, TTSService, + WebsocketTTSService, ) from pipecat.transcriptions.language import Language, resolve_language from pipecat.utils.text.base_text_aggregator import BaseTextAggregator @@ -123,7 +123,7 @@ class RimeNonJsonTTSSettings(TTSSettings): _aliases: ClassVar[Dict[str, str]] = {"speaker": "voice"} -class RimeTTSService(AudioContextTTSService): +class RimeTTSService(WebsocketTTSService): """Text-to-Speech service using Rime's websocket API. Uses Rime's websocket JSON API to convert text to speech with word-level timing @@ -276,7 +276,6 @@ class RimeTTSService(AudioContextTTSService): push_text_frames=False, push_stop_frames=True, pause_frame_processing=True, - supports_word_timestamps=True, append_trailing_space=True, sample_rate=sample_rate, settings=default_settings, @@ -408,9 +407,9 @@ class RimeTTSService(AudioContextTTSService): return changed - def _build_msg(self, text: str = "") -> dict: + def _build_msg(self, text: str = "", context_id: str = "") -> dict: """Build JSON message for Rime API.""" - msg = {"text": text, "contextId": self.get_active_audio_context_id()} + msg = {"text": text, "contextId": context_id} if self._extra_msg_fields: msg |= self._extra_msg_fields self._extra_msg_fields = {} @@ -557,15 +556,14 @@ class RimeTTSService(AudioContextTTSService): return word_pairs - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis.""" - context_id = self.get_active_audio_context_id() - if not context_id or not self._websocket: + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: return logger.trace(f"{self}: flushing audio") await self._get_websocket().send(json.dumps({"operation": "flush"})) - self.reset_active_audio_context() async def _receive_messages(self): """Process incoming websocket messages.""" @@ -578,8 +576,6 @@ class RimeTTSService(AudioContextTTSService): context_id = msg["contextId"] if msg["type"] == "chunk": # Process audio chunk - await self.stop_ttfb_metrics() - await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self.sample_rate, @@ -638,13 +634,13 @@ class RimeTTSService(AudioContextTTSService): await self._connect() try: - if not self.has_active_audio_context(): + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) await self.start_ttfb_metrics() yield TTSStartedFrame(context_id=context_id) self._cumulative_time = 0 - await self.create_audio_context(context_id) - msg = self._build_msg(text=text) + msg = self._build_msg(text=text, context_id=context_id) await self._get_websocket().send(json.dumps(msg)) await self.start_tts_usage_metrics(text) except Exception as e: @@ -773,6 +769,8 @@ class RimeHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_stop_frames=True, + push_start_frame=True, settings=default_settings, **kwargs, ) @@ -844,8 +842,6 @@ class RimeHttpTTSService(TTSService): need_to_strip_wav_header = False try: - await self.start_ttfb_metrics() - async with self._session.post( self._base_url, json=payload, headers=headers ) as response: @@ -856,8 +852,6 @@ class RimeHttpTTSService(TTSService): await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - CHUNK_SIZE = self.chunk_size async for frame in self._stream_audio_frames_from_iterator( @@ -872,7 +866,6 @@ class RimeHttpTTSService(TTSService): yield ErrorFrame(error=f"Unknown error occurred: {e}") finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) class RimeNonJsonTTSService(InterruptibleTTSService): @@ -1005,6 +998,7 @@ class RimeNonJsonTTSService(InterruptibleTTSService): aggregate_sentences=aggregate_sentences, text_aggregation_mode=text_aggregation_mode, push_stop_frames=True, + push_start_frame=True, pause_frame_processing=True, append_trailing_space=True, settings=default_settings, @@ -1022,7 +1016,6 @@ class RimeNonJsonTTSService(InterruptibleTTSService): self._settings.extra.update(params.extra) self._receive_task = None - self._context_id: Optional[str] = None def can_generate_metrics(self) -> bool: """Check if this service can generate processing metrics. @@ -1138,7 +1131,6 @@ class RimeNonJsonTTSService(InterruptibleTTSService): except Exception as e: await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) finally: - self._context_id = None self._websocket = None await self._call_event_handler("on_disconnected") @@ -1148,7 +1140,7 @@ class RimeNonJsonTTSService(InterruptibleTTSService): return self._websocket raise Exception("Websocket not connected") - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis.""" if not self._websocket: return @@ -1168,7 +1160,7 @@ class RimeNonJsonTTSService(InterruptibleTTSService): audio=message, sample_rate=self.sample_rate, num_channels=1, - context_id=self._context_id, + context_id=self.get_active_audio_context_id(), ) await self.push_frame(frame) except Exception as e: @@ -1190,10 +1182,6 @@ class RimeNonJsonTTSService(InterruptibleTTSService): if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() try: - await self.start_ttfb_metrics() - # Store context_id for use in _receive_messages - self._context_id = context_id - yield TTSStartedFrame(context_id=context_id) # Send bare text (not JSON) await self._get_websocket().send(text) await self.start_tts_usage_metrics(text) diff --git a/src/pipecat/services/sarvam/tts.py b/src/pipecat/services/sarvam/tts.py index 0d605fc6b..6bf38bb24 100644 --- a/src/pipecat/services/sarvam/tts.py +++ b/src/pipecat/services/sarvam/tts.py @@ -524,6 +524,8 @@ class SarvamHttpTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_stop_frames=True, + push_start_frame=True, settings=default_settings, **kwargs, ) @@ -573,8 +575,6 @@ class SarvamHttpTTSService(TTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - await self.start_ttfb_metrics() - # Build payload with common parameters payload = { "text": text, @@ -606,8 +606,6 @@ class SarvamHttpTTSService(TTSService): url = f"{self._base_url}/text-to-speech" - yield TTSStartedFrame(context_id=context_id) - async with self._session.post(url, json=payload, headers=headers) as response: if response.status != 200: error_text = await response.text() @@ -645,7 +643,6 @@ class SarvamHttpTTSService(TTSService): yield ErrorFrame(error=f"Error generating TTS: {e}", exception=e) finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame(context_id=context_id) class SarvamTTSService(InterruptibleTTSService): @@ -951,6 +948,7 @@ class SarvamTTSService(InterruptibleTTSService): push_text_frames=True, pause_frame_processing=True, push_stop_frames=True, + push_start_frame=True, sample_rate=sample_rate, settings=default_settings, **kwargs, @@ -967,7 +965,6 @@ class SarvamTTSService(InterruptibleTTSService): self._receive_task = None self._keepalive_task = None - self._context_id: Optional[str] = None def can_generate_metrics(self) -> bool: """Check if this service can generate processing metrics. @@ -1018,7 +1015,7 @@ class SarvamTTSService(InterruptibleTTSService): await super().cancel(frame) await self._disconnect() - async def flush_audio(self): + async def flush_audio(self, context_id: Optional[str] = None): """Flush any pending audio synthesis by sending flush command.""" try: if self._websocket: @@ -1151,7 +1148,6 @@ class SarvamTTSService(InterruptibleTTSService): except Exception as e: await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e) finally: - self._context_id = None self._websocket = None await self._call_event_handler("on_disconnected") @@ -1170,7 +1166,7 @@ class SarvamTTSService(InterruptibleTTSService): await self.stop_ttfb_metrics() audio = base64.b64decode(msg["data"]["audio"]) frame = TTSAudioRawFrame( - audio, self.sample_rate, 1, context_id=self._context_id + audio, self.sample_rate, 1, context_id=self.get_active_audio_context_id() ) await self.push_frame(frame) elif msg.get("type") == "error": @@ -1224,10 +1220,6 @@ class SarvamTTSService(InterruptibleTTSService): await self._connect() try: - await self.start_ttfb_metrics() - # Store context_id for use in _receive_messages - self._context_id = context_id - yield TTSStartedFrame(context_id=context_id) await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 55ded437e..22b47f3fc 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -19,8 +19,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -135,6 +133,8 @@ class SpeechmaticsTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -185,9 +185,6 @@ class SpeechmaticsTTSService(TTSService): url = _get_endpoint_url(self._base_url, self._settings.voice, self.sample_rate) try: - # Start TTS TTFB metrics - await self.start_ttfb_metrics() - # Track attempt attempt = 0 @@ -238,9 +235,6 @@ class SpeechmaticsTTSService(TTSService): # Update Pipecat metrics await self.start_tts_usage_metrics(text) - # Emit the TTS started frame - yield TTSStartedFrame(context_id=context_id) - # Process the response in streaming chunks first_chunk = True buffer = b"" @@ -277,8 +271,7 @@ class SpeechmaticsTTSService(TTSService): except Exception as e: yield ErrorFrame(error=f"Error generating TTS: {e}") finally: - # Emit the TTS stopped frame - yield TTSStoppedFrame(context_id=context_id) + await self.stop_ttfb_metrics() def _get_endpoint_url(base_url: str, voice: str, sample_rate: int) -> str: diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 4285e14f9..db695fd52 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -97,6 +97,15 @@ class TextAggregationMode(str, Enum): return self.value +@dataclass +class _WordTimestampEntry: + """Internal: word timestamp routed through an audio context queue.""" + + word: str + timestamp: float + context_id: str + + class TTSService(AIService): """Base class for text-to-speech services. @@ -131,6 +140,8 @@ class TTSService(AIService): _settings: TTSSettings + _CONTEXT_KEEPALIVE = object() + def __init__( self, *, @@ -141,6 +152,8 @@ class TTSService(AIService): push_text_frames: bool = True, # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it push_stop_frames: bool = False, + # if True, TTSService will push TTSStartedFrames and create audio contexts automatically + push_start_frame: bool = False, # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame stop_frame_timeout_s: float = 2.0, # if True, TTSService will push silence audio frames after TTSStoppedFrame @@ -154,8 +167,6 @@ class TTSService(AIService): append_trailing_space: bool = False, # TTS output sample rate sample_rate: Optional[int] = None, - # if True, enables word-level timestamp tracking and synchronization - supports_word_timestamps: bool = False, # Text aggregator to aggregate incoming tokens and decide when to push to the TTS. text_aggregator: Optional[BaseTextAggregator] = None, # Types of text aggregations that should not be spoken. @@ -174,6 +185,8 @@ class TTSService(AIService): # Audio transport destination of the generated frames. transport_destination: Optional[str] = None, settings: Optional[TTSSettings] = None, + # if True, the context ID is reused within an LLM turn + reuse_context_id_within_turn: bool = True, **kwargs, ): """Initialize the TTS service. @@ -191,6 +204,9 @@ class TTSService(AIService): push_text_frames: Whether to push TextFrames and LLMFullResponseEndFrames. push_stop_frames: Whether to automatically push TTSStoppedFrames. + push_start_frame: Whether to automatically create audio contexts and push TTSStartedFrames. + When True, the base class handles ``create_audio_context`` and yields ``TTSStartedFrame`` + before each synthesis call, so ``run_tts`` implementations do not need to. stop_frame_timeout_s: Idle time before pushing TTSStoppedFrame when push_stop_frames is True. push_silence_after_stop: Whether to push silence audio after TTSStoppedFrame. silence_time_s: Duration of silence to push when push_silence_after_stop is True. @@ -198,9 +214,6 @@ class TTSService(AIService): append_trailing_space: Whether to append a trailing space to text before sending to TTS. This helps prevent some TTS services from vocalizing trailing punctuation (e.g., "dot"). sample_rate: Output sample rate for generated audio. - supports_word_timestamps: Whether this service supports word-level timestamp tracking. - When True, enables synchronization of audio with spoken words so only spoken words - are added to the conversation context. text_aggregator: Custom text aggregator for processing incoming text. .. deprecated:: 0.0.95 @@ -220,6 +233,8 @@ class TTSService(AIService): transport_destination: Destination for generated audio frames. settings: The runtime-updatable settings for the TTS service. + reuse_context_id_within_turn: Whether the service should reuse context IDs within the + same turn. **kwargs: Additional arguments passed to the parent AIService. """ super().__init__( @@ -256,6 +271,7 @@ class TTSService(AIService): self._text_aggregation_mode: TextAggregationMode = text_aggregation_mode self._push_text_frames: bool = push_text_frames self._push_stop_frames: bool = push_stop_frames + self._push_start_frame: bool = push_start_frame self._stop_frame_timeout_s: float = stop_frame_timeout_s self._push_silence_after_stop: bool = push_silence_after_stop self._silence_time_s: float = silence_time_s @@ -304,18 +320,43 @@ class TTSService(AIService): self._streamed_text: str = "" self._text_aggregation_metrics_started: bool = False - # Word timestamp state (active when supports_word_timestamps=True) - self._supports_word_timestamps: bool = supports_word_timestamps + # Word timestamp state self._initial_word_timestamp: int = -1 self._initial_word_times: List[Tuple[str, float, Optional[str]]] = [] - self._words_task: Optional[asyncio.Task] = None + # PTS of the last word frame pushed via _add_word_timestamps, used to assign + # correct PTS to sentinel frames ("TTSStoppedFrame", "Reset") that follow. + self._word_last_pts: int = 0 self._llm_response_started: bool = False + self._reuse_context_id_within_turn: bool = reuse_context_id_within_turn + + # _turn_context_id: + # Set on LLMFullResponseStartFrame and cleared after LLMFullResponseEndFrame + # is processed (i.e. after flush). All sentences within one LLM turn share + # this ID so the TTS service groups them into a single audio context. + # Temporarily set to None for TTSSpeakFrame utterances, which are standalone. + # + # _playing_context_id (playback-side cursor): + # Set by _audio_context_task_handler as it dequeues contexts for playback. + # Cleared by reset_active_audio_context() on interruption. Used by + # has_active_audio_context() and get_active_audio_context_id(). + # + # Both fields may hold the same value during a turn, but + # they clear at different times: _turn_context_id is cleared when the LLM turn + # ends (synthesis done) while _playing_context_id remains set until the audio + # finishes playing. Merging them would null out the playback cursor prematurely. + self._playing_context_id: Optional[str] = None + self._turn_context_id: Optional[str] = None + self._audio_contexts: Dict[str, asyncio.Queue] = {} + self._audio_context_task: Optional[asyncio.Task] = None self._register_event_handler("on_connected") self._register_event_handler("on_disconnected") self._register_event_handler("on_connection_error") self._register_event_handler("on_tts_request") + # Whether the TTS process is currently yielding audio frames synchronously. + self._is_yielding_frames_synchronously = False + @property def _is_streaming_tokens(self) -> bool: """Whether the service is streaming tokens directly without sentence aggregation.""" @@ -417,13 +458,14 @@ class TTSService(AIService): await self._update_settings(settings_cls(voice=voice)) def create_context_id(self) -> str: - """Generate a unique context ID for a TTS request. - - This method can be overridden by subclasses to provide custom context ID generation. + """Generate or reuse a context ID based on concurrent TTS support. Returns: - A unique string identifier for the TTS context. + A context ID string for the TTS request. """ + if self._reuse_context_id_within_turn and self._turn_context_id: + self._refresh_audio_context(self._turn_context_id) + return self._turn_context_id return str(uuid.uuid4()) # Converts the text to audio. @@ -466,8 +508,13 @@ class TTSService(AIService): return text + " " return text - async def flush_audio(self): - """Flush any buffered audio data.""" + async def flush_audio(self, context_id: Optional[str] = None): + """Flush any buffered audio data. + + Args: + context_id: The specific context to flush. If None, falls back to the + currently active context (for non-concurrent services). + """ pass async def start(self, frame: StartFrame): @@ -480,8 +527,7 @@ class TTSService(AIService): self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate if self._push_stop_frames and not self._stop_frame_task: self._stop_frame_task = self.create_task(self._stop_frame_handler()) - if self._supports_word_timestamps: - self._create_words_task() + self._create_audio_context_task() async def stop(self, frame: EndFrame): """Stop the TTS service. @@ -493,8 +539,12 @@ class TTSService(AIService): if self._stop_frame_task: await self.cancel_task(self._stop_frame_task) self._stop_frame_task = None - if self._words_task: - await self._stop_words_task() + if self._audio_context_task: + # Indicate no more audio contexts are available; this will end the + # task cleanly after all contexts have been processed. + await self._contexts_queue.put(None) + await self._audio_context_task + self._audio_context_task = None async def cancel(self, frame: CancelFrame): """Cancel the TTS service. @@ -506,8 +556,7 @@ class TTSService(AIService): if self._stop_frame_task: await self.cancel_task(self._stop_frame_task) self._stop_frame_task = None - if self._words_task: - await self._stop_words_task() + await self._stop_audio_context_task() def add_text_transformer( self, @@ -584,6 +633,25 @@ class TTSService(AIService): await self.queue_frame(TTSSpeakFrame(text)) + async def on_turn_context_completed(self): + """Handle the completion of a turn.""" + # For HTTP services they emit the frames synchronously, so close the audio context here + # once all frames (including TTSTextFrame above) have been enqueued. + if self._is_yielding_frames_synchronously and self.audio_context_available( + self._turn_context_id + ): + if self._push_stop_frames: + await self.append_to_audio_context( + self._turn_context_id, TTSStoppedFrame(context_id=self._turn_context_id) + ) + await self.remove_audio_context(self._turn_context_id) + + # Flush any pending audio so the TTS service closes the current context. + await self.flush_audio(context_id=self._turn_context_id) + + # Reset the turn context ID + self._turn_context_id = None + async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames for text-to-speech conversion. @@ -615,6 +683,8 @@ class TTSService(AIService): await self.push_frame(frame, direction) elif isinstance(frame, LLMFullResponseStartFrame): self._llm_response_started = True + # New LLM turn → assign a fresh context ID shared by all sentences + self._turn_context_id = self.create_context_id() await self.push_frame(frame, direction) elif isinstance(frame, (LLMFullResponseEndFrame, EndFrame)): # We pause processing incoming frames if the LLM response included @@ -642,12 +712,17 @@ class TTSService(AIService): await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) - # Flush any pending audio so the TTS service closes the current context. - if self._supports_word_timestamps: - await self.flush_audio() + + await self.on_turn_context_completed() elif isinstance(frame, TTSSpeakFrame): # Store if we were processing text or not so we can set it back. processing_text = self._processing_text + # TTSSpeakFrame is independent — temporarily clear the turn context + # so create_context_id() generates a fresh UUID for this utterance. + saved_turn_context_id = self._turn_context_id + self._turn_context_id = None + # Creating a new context_id for the TTS request. + self._turn_context_id = self.create_context_id() # If we are not receiving text from the LLM, we can assume that the SpeakFrame should be automatically added to the context push_assistant_aggregation = frame.append_to_context and not self._llm_response_started # Assumption: text in TTSSpeakFrame does not include inter-frame spaces @@ -656,10 +731,11 @@ class TTSService(AIService): append_tts_text_to_context=frame.append_to_context, push_assistant_aggregation=push_assistant_aggregation, ) + await self.on_turn_context_completed() # We pause processing incoming frames because we are sending data to # the TTS. We pause to avoid audio overlapping. await self._maybe_pause_frame_processing() - await self.flush_audio() + self._turn_context_id = saved_turn_context_id self._processing_text = processing_text elif isinstance(frame, TTSUpdateSettingsFrame): if frame.delta is not None: @@ -789,8 +865,17 @@ class TTSService(AIService): self._llm_response_started = False self._streamed_text = "" self._text_aggregation_metrics_started = False - if self._supports_word_timestamps: - await self.reset_word_timestamps() + await self.reset_word_timestamps() + + await self._stop_audio_context_task() + audio_contexts = self.get_audio_contexts() + if audio_contexts: + for ctx_id in audio_contexts: + await self.on_audio_context_interrupted(context_id=ctx_id) + self.reset_active_audio_context() + self._turn_context_id = None + self._word_last_pts = 0 + self._create_audio_context_task() async def _maybe_pause_frame_processing(self): if self._processing_text and self._pause_frame_processing: @@ -897,7 +982,12 @@ class TTSService(AIService): # Trigger event before starting TTS await self._call_event_handler("on_tts_request", context_id, prepared_text) - await self.process_generator(self.run_tts(prepared_text, context_id)) + if self._push_start_frame and not self.audio_context_available(context_id): + await self.create_audio_context(context_id) + await self.start_ttfb_metrics() + await self.append_to_audio_context(context_id, TTSStartedFrame(context_id=context_id)) + + await self.tts_process_generator(context_id, self.run_tts(prepared_text, context_id)) if not self._is_streaming_tokens: await self.stop_processing_metrics() @@ -916,9 +1006,44 @@ class TTSService(AIService): # Only override append_to_context if explicitly set if append_tts_text_to_context is not None: frame.append_to_context = append_tts_text_to_context - await self.push_frame(frame) - if push_assistant_aggregation: - await self.push_frame(LLMAssistantPushAggregationFrame()) + # For services using the audio context we are appending to the context, so it preserves the ordering. + if self.audio_context_available(context_id): + await self.append_to_audio_context(context_id, frame) + if push_assistant_aggregation: + await self.append_to_audio_context( + context_id, LLMAssistantPushAggregationFrame() + ) + else: + await self.push_frame(frame) + if push_assistant_aggregation: + await self.push_frame(LLMAssistantPushAggregationFrame()) + + async def tts_process_generator( + self, context_id: str, generator: AsyncGenerator[Frame | None, None] + ) -> bool: + """Process frames from an async generator, routing them through the audio context. + + All non-None frames yielded by the generator are appended to the audio context + identified by context_id. The audio context must be created by run_tts (via + create_audio_context) before the first frame is yielded. + + WebSocket services yield None to signal that audio will arrive via a separate + receive loop; those services manage context lifetime themselves (via remove_audio_context + in the receive loop on "done"). HTTP services never yield None and do NOT call + remove_audio_context in run_tts — the caller (_synthesize_text) closes the context + after appending any remaining frames (e.g. TTSTextFrame). + + Args: + context_id: The audio context to route frames to. + generator: An async generator yielding Frame objects or None. + + """ + is_yielding_frames = False + async for frame in generator: + if frame: + await self.append_to_audio_context(context_id, frame) + is_yielding_frames = True + self._is_yielding_frames_synchronously = is_yielding_frames async def _stop_frame_handler(self): has_started = False @@ -937,18 +1062,25 @@ class TTSService(AIService): has_started = False # - # Word timestamp methods (active when supports_word_timestamps=True) + # Word timestamp methods # async def start_word_timestamps(self): """Start tracking word timestamps from the current time.""" if self._initial_word_timestamp == -1: - self._initial_word_timestamp = self.get_clock().get_time() + current_time = self.get_clock().get_time() + # Initialize word timestamp tracking. Use the last emitted timestamp if it's ahead + # of current time to maintain continuity across overlapping audio contexts. + self._initial_word_timestamp = ( + self._word_last_pts if self._word_last_pts > current_time else current_time + ) # If we cached some initial word times (because we didn't receive # audio), let's add them now. if self._initial_word_times: - await self._add_word_timestamps(self._initial_word_times) + cached = self._initial_word_times.copy() self._initial_word_times = [] + for word, timestamp_seconds, ctx_id in cached: + await self._add_word_timestamps([(word, timestamp_seconds)], ctx_id) async def reset_word_timestamps(self): """Reset word timestamp tracking.""" @@ -957,75 +1089,281 @@ class TTSService(AIService): async def add_word_timestamps( self, word_times: List[Tuple[str, float]], context_id: Optional[str] = None ): - """Add word timestamps to the processing queue. + """Add word timestamps for processing. + + When an audio context exists for this context_id, timestamps are routed into the + per-context audio queue alongside audio frames so they are processed in strict + playback order by _handle_audio_context. Otherwise they are processed immediately + via _add_word_timestamps. Args: word_times: List of (word, timestamp) tuples where timestamp is in seconds. context_id: Unique identifier for the TTS context. """ - # Transform to include context_id in each tuple - word_times_with_context = [(word, timestamp, context_id) for word, timestamp in word_times] - - if self._initial_word_timestamp == -1: - # Cache word timestamps and don't add them until we have started - # (i.e. we have some audio). - self._initial_word_times.extend(word_times_with_context) + if context_id and self.audio_context_available(context_id): + for word, timestamp in word_times: + await self._audio_contexts[context_id].put( + _WordTimestampEntry( + word=word, + timestamp=timestamp, + context_id=context_id, + ) + ) else: - await self._add_word_timestamps(word_times_with_context) + await self._add_word_timestamps(word_times=word_times, context_id=context_id) - def _create_words_task(self): - if not self._words_task: - self._words_queue: asyncio.Queue = asyncio.Queue() - self._words_task = self.create_task(self._words_task_handler()) + async def _add_word_timestamps( + self, word_times: List[Tuple[str, float]], context_id: Optional[str] = None + ): + """Process word timestamps directly, building and pushing frames inline. - async def _stop_words_task(self): - if self._words_task: - await self.cancel_task(self._words_task) - self._words_task = None + This is the single processing path for all word timestamp events, used both + from _handle_audio_context (via _WordTimestampEntry) and from services that + do not use audio contexts. Sentinel entries drive control-frame emission: - async def _add_word_timestamps(self, word_times_with_context: List[Tuple[str, float, str]]): - for word, timestamp, context_id in word_times_with_context: - await self._words_queue.put((word, seconds_to_nanoseconds(timestamp), context_id)) + - ("Reset", 0): reset timestamp baseline; emit LLMFullResponseEndFrame if needed. + - ("TTSStoppedFrame", 0): emit TTSStoppedFrame. + - Any other entry: emit TTSTextFrame with a PTS relative to the baseline. - async def _words_task_handler(self): - last_pts = 0 - while True: - frame = None - (word, timestamp, context_id) = await self._words_queue.get() + When the baseline (_initial_word_timestamp) is not yet set, regular word entries + are cached in _initial_word_times and flushed once start_word_timestamps() is + called (i.e. when the first audio chunk is received). + """ + for word, timestamp in word_times: if word == "Reset" and timestamp == 0: await self.reset_word_timestamps() if self._llm_response_started: self._llm_response_started = False frame = LLMFullResponseEndFrame() - frame.pts = last_pts + frame.pts = self._word_last_pts + await self.push_frame(frame) elif word == "TTSStoppedFrame" and timestamp == 0: - frame = TTSStoppedFrame() - frame.pts = last_pts + frame = TTSStoppedFrame(context_id=context_id) + frame.pts = self._word_last_pts frame.context_id = context_id + await self.push_frame(frame) if context_id in self._tts_contexts: if self._tts_contexts[context_id].push_assistant_aggregation: await self.push_frame(LLMAssistantPushAggregationFrame()) else: - # Assumption: word-by-word text frames don't include spaces, so - # we can rely on the default includes_inter_frame_spaces=False - frame = TTSTextFrame(word, aggregated_by=AggregationType.WORD) - frame.pts = self._initial_word_timestamp + timestamp - frame.context_id = context_id - # Look up append_to_context from context metadata - if context_id in self._tts_contexts: - frame.append_to_context = self._tts_contexts[context_id].append_to_context - if frame: - last_pts = frame.pts - await self.push_frame(frame) - self._words_queue.task_done() + ts_ns = seconds_to_nanoseconds(timestamp) + if self._initial_word_timestamp == -1: + # Cache until we have audio and can compute PTS. + self._initial_word_times.append((word, timestamp, context_id)) + else: + # Assumption: word-by-word text frames don't include spaces, so + # we can rely on the default includes_inter_frame_spaces=False + frame = TTSTextFrame(word, aggregated_by=AggregationType.WORD) + frame.pts = self._initial_word_timestamp + ts_ns + frame.context_id = context_id + if context_id in self._tts_contexts: + frame.append_to_context = self._tts_contexts[context_id].append_to_context + self._word_last_pts = frame.pts + await self.push_frame(frame) + + # + # Audio context methods (active when using websocket-based TTS with context management) + # + + async def create_audio_context(self, context_id: str): + """Create a new audio context for grouping related audio. + + Args: + context_id: Unique identifier for the audio context. + """ + await self._contexts_queue.put(context_id) + self._audio_contexts[context_id] = asyncio.Queue() + logger.trace(f"{self} created audio context {context_id}") + + async def append_to_audio_context(self, context_id: str, frame: Frame): + """Append audio or control frame to an existing context. + + Args: + context_id: The context to append audio to. + frame: The audio or control frame to append. + """ + if self.audio_context_available(context_id): + logger.trace(f"{self} appending audio {frame} to audio context {context_id}") + await self._audio_contexts[context_id].put(frame) + elif context_id == self._turn_context_id: + # Sometimes the HTTP service can take more than 3 seconds without sending any audio + # So we are now recreating the context id while we are in the same turn + logger.debug(f"{self} recreating audio context {context_id}") + await self.create_audio_context(context_id) + logger.trace(f"{self} appending audio {frame} to audio context {context_id}") + await self._audio_contexts[context_id].put(frame) + else: + logger.warning(f"{self} unable to append audio to context {context_id}") + + async def remove_audio_context(self, context_id: str): + """Remove an existing audio context. + + Args: + context_id: The context to remove. + """ + if self.audio_context_available(context_id): + # We just mark the audio context for deletion by appending + # None. Once we reach None while handling audio we know we can + # safely remove the context. + logger.trace(f"{self} marking audio context {context_id} for deletion") + await self._audio_contexts[context_id].put(None) + else: + logger.warning(f"{self} unable to remove context {context_id}") + + def has_active_audio_context(self) -> bool: + """Check if there is an active audio context. + + Returns: + True if an active audio context exists, False otherwise. + """ + return self._playing_context_id is not None and self.audio_context_available( + self._playing_context_id + ) + + def get_audio_contexts(self) -> List[str]: + """Get a list of all available audio contexts.""" + return list(self._audio_contexts.keys()) + + def get_active_audio_context_id(self) -> Optional[str]: + """Get the active audio context ID. + + Returns: + The active context ID, or None if no context is active. + """ + return self._playing_context_id + + async def remove_active_audio_context(self): + """Remove the active audio context.""" + if self._playing_context_id: + await self.remove_audio_context(self._playing_context_id) + self.reset_active_audio_context() + + def reset_active_audio_context(self): + """Reset the active audio context.""" + self._playing_context_id = None + + def audio_context_available(self, context_id: str) -> bool: + """Check whether the given audio context is registered. + + Args: + context_id: The context ID to check. + + Returns: + True if the context exists and is available. + """ + return context_id in self._audio_contexts + + def _refresh_audio_context(self, context_id: str): + """Signal that the audio context is still in use, resetting the timeout.""" + if self.audio_context_available(context_id): + self._audio_contexts[context_id].put_nowait(TTSService._CONTEXT_KEEPALIVE) + + def _create_audio_context_task(self): + if not self._audio_context_task: + self._contexts_queue: asyncio.Queue = asyncio.Queue() + self._audio_contexts: Dict[str, asyncio.Queue] = {} + self._audio_context_task = self.create_task(self._audio_context_task_handler()) + + async def _stop_audio_context_task(self): + if self._audio_context_task: + await self.cancel_task(self._audio_context_task) + self._audio_context_task = None + + async def _audio_context_task_handler(self): + """In this task we process audio contexts in order.""" + running = True + while running: + context_id = await self._contexts_queue.get() + self._playing_context_id = context_id + + if context_id: + # Process the audio context until the context doesn't have more + # audio available (i.e. we find None). + await self._handle_audio_context(context_id) + + # We just finished processing the context, so we can safely remove it. + del self._audio_contexts[context_id] + await self.on_audio_context_completed(context_id=context_id) + self.reset_active_audio_context() + else: + running = False + + self._contexts_queue.task_done() + + async def _handle_audio_context(self, context_id: str): + """Process items from an audio context queue until it is exhausted.""" + AUDIO_CONTEXT_TIMEOUT = 3.0 + queue = self._audio_contexts[context_id] + running = True + timestamps_started = False + while running: + try: + frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) + if frame is TTSService._CONTEXT_KEEPALIVE: + # Context is still in use, reset the timeout. + continue + elif frame is None: + running = False + elif isinstance(frame, _WordTimestampEntry): + # _add_word_timestamps is the single processing path: it handles + # sentinel entries ("Reset", "TTSStoppedFrame") and regular words + # inline, keeping all word-frame logic in one place. + await self._add_word_timestamps( + [(frame.word, frame.timestamp)], frame.context_id + ) + continue + elif isinstance(frame, TTSAudioRawFrame): + # Set the word-timestamp baseline once, on the first audio chunk. + if not timestamps_started: + await self.stop_ttfb_metrics() + await self.start_word_timestamps() + timestamps_started = True + + if frame: + if isinstance(frame, ErrorFrame): + await self.push_error_frame(frame) + else: + await self.push_frame(frame) + except asyncio.TimeoutError: + # We didn't get audio, so let's consider this context finished. + logger.trace(f"{self} time out on audio context {context_id}") + break + + async def on_audio_context_interrupted(self, context_id: str): + """Called when an audio context is cancelled due to an interruption. + + Override this in a subclass to perform provider-specific cleanup (e.g. + sending a cancel/close message over the WebSocket) when the bot is + interrupted mid-speech. The audio context task has already been stopped + and the active context has **not** yet been reset when this is called, + so ``context_id`` reflects the context that was cut short. + + Args: + context_id: The ID of the audio context that was interrupted, or + ``None`` if no context was active at the time. + """ + pass + + async def on_audio_context_completed(self, context_id: str): + """Called after an audio context has finished playing all of its audio. + + Override this in a subclass to perform provider-specific cleanup (e.g. + sending a close-context message to free server-side resources) once an + audio context has been fully processed. The context entry has already + been removed from the internal context map, and the active context has + **not** yet been reset when this is called. + + Args: + context_id: The ID of the audio context that finished processing. + """ + pass class WordTTSService(TTSService): - """Deprecated. Use TTSService with supports_word_timestamps=True instead. + """Deprecated. Use TTSService directly instead. - .. deprecated:: 0.0.104 - Word timestamp functionality has been moved to TTSService. Pass - ``supports_word_timestamps=True`` to TTSService (or any subclass) instead. + .. deprecated:: 0.0.105 + Word timestamp functionality is now always active in TTSService. """ def __init__(self, **kwargs): @@ -1034,7 +1372,7 @@ class WordTTSService(TTSService): Args: **kwargs: Additional arguments passed to the parent TTSService. """ - super().__init__(supports_word_timestamps=True, **kwargs) + super().__init__(**kwargs) class WebsocketTTSService(TTSService, WebsocketService): @@ -1110,11 +1448,10 @@ class InterruptibleTTSService(WebsocketTTSService): class WebsocketWordTTSService(WebsocketTTSService): - """Deprecated. Use WebsocketTTSService with supports_word_timestamps=True instead. + """Deprecated. Use WebsocketTTSService directly instead. - .. deprecated:: 0.0.104 - Word timestamp functionality has been moved to TTSService. Pass - ``supports_word_timestamps=True`` to WebsocketTTSService instead. + .. deprecated:: 0.0.105 + Word timestamp functionality is now always active in TTSService. """ def __init__(self, *, reconnect_on_error: bool = True, **kwargs): @@ -1124,17 +1461,14 @@ class WebsocketWordTTSService(WebsocketTTSService): reconnect_on_error: Whether to automatically reconnect on websocket errors. **kwargs: Additional arguments passed to parent classes. """ - super().__init__( - supports_word_timestamps=True, reconnect_on_error=reconnect_on_error, **kwargs - ) + super().__init__(reconnect_on_error=reconnect_on_error, **kwargs) class InterruptibleWordTTSService(InterruptibleTTSService): - """Deprecated. Use InterruptibleTTSService with supports_word_timestamps=True instead. + """Deprecated. Use InterruptibleTTSService directly instead. - .. deprecated:: 0.0.104 - Word timestamp functionality has been moved to TTSService. Pass - ``supports_word_timestamps=True`` to InterruptibleTTSService instead. + .. deprecated:: 0.0.105 + Word timestamp functionality is now always active in TTSService. """ def __init__(self, **kwargs): @@ -1143,26 +1477,21 @@ class InterruptibleWordTTSService(InterruptibleTTSService): Args: **kwargs: Additional arguments passed to the parent InterruptibleTTSService. """ - super().__init__(supports_word_timestamps=True, **kwargs) + super().__init__(**kwargs) class AudioContextTTSService(WebsocketTTSService): - """Base class for websocket-based TTS services with audio context management. + """Deprecated. Inherit from WebsocketTTSService directly instead. - This is a base class for websocket-based TTS services that allow correlating - the generated audio with the requested text through audio contexts. + Audio context management (previously the main purpose of this class) is now + built into TTSService. This class is kept only for backwards compatibility. - Each request could be multiple sentences long which are grouped by - context. For this to work, the TTS service needs to support handling - multiple requests at once (i.e. multiple simultaneous contexts). - - The audio received from the TTS will be played in context order. That is, if - we requested audio for a context "A" and then audio for context "B", the - audio from context ID "A" will be played first. + .. deprecated:: 0.0.105 + Subclass :class:`WebsocketTTSService` directly and pass + ``reuse_context_id_within_turn`` as + keyword arguments to its ``__init__``. """ - _CONTEXT_KEEPALIVE = object() - def __init__( self, *, @@ -1177,248 +1506,26 @@ class AudioContextTTSService(WebsocketTTSService): reconnect_on_error: Whether to automatically reconnect on websocket errors. **kwargs: Additional arguments passed to the parent WebsocketTTSService. """ - super().__init__(reconnect_on_error=reconnect_on_error, **kwargs) - self._reuse_context_id_within_turn = reuse_context_id_within_turn - self._context_id = None - self._contexts: Dict[str, asyncio.Queue] = {} - self._audio_context_task = None + import warnings - async def create_audio_context(self, context_id: str): - """Create a new audio context for grouping related audio. - - Args: - context_id: Unique identifier for the audio context. - """ - # Set the context ID if not already set - if not self._context_id: - self._context_id = context_id - - await self._contexts_queue.put(context_id) - self._contexts[context_id] = asyncio.Queue() - logger.trace(f"{self} created audio context {context_id}") - - async def append_to_audio_context(self, context_id: str, frame: TTSAudioRawFrame): - """Append audio to an existing context. - - Args: - context_id: The context to append audio to. - frame: The audio frame to append. - """ - if self.audio_context_available(context_id): - logger.trace(f"{self} appending audio {frame} to audio context {context_id}") - await self._contexts[context_id].put(frame) - else: - logger.warning(f"{self} unable to append audio to context {context_id}") - - async def remove_audio_context(self, context_id: str): - """Remove an existing audio context. - - Args: - context_id: The context to remove. - """ - if self.audio_context_available(context_id): - # We just mark the audio context for deletion by appending - # None. Once we reach None while handling audio we know we can - # safely remove the context. - logger.trace(f"{self} marking audio context {context_id} for deletion") - await self._contexts[context_id].put(None) - else: - logger.warning(f"{self} unable to remove context {context_id}") - - def has_active_audio_context(self) -> bool: - """Check if there is an active audio context. - - Returns: - True if an active audio context exists, False otherwise. - """ - return self._context_id is not None and self.audio_context_available(self._context_id) - - def get_active_audio_context_id(self) -> Optional[str]: - """Get the active audio context ID. - - Returns: - The active context ID, or None if no context is active. - """ - return self._context_id - - async def remove_active_audio_context(self): - """Remove the active audio context.""" - if self._context_id: - await self.remove_audio_context(self._context_id) - self.reset_active_audio_context() - - def reset_active_audio_context(self): - """Reset the active audio context.""" - self._context_id = None - - def audio_context_available(self, context_id: str) -> bool: - """Check whether the given audio context is registered. - - Args: - context_id: The context ID to check. - - Returns: - True if the context exists and is available. - """ - return context_id in self._contexts - - def create_context_id(self) -> str: - """Generate or reuse a context ID based on concurrent TTS support. - - If _reuse_context_id_within_turn is False and a context already exists, - the existing context ID is returned. Otherwise, a new unique context - ID is generated. - - Returns: - A context ID string for the TTS request. - """ - if self._reuse_context_id_within_turn and self._context_id: - self._refresh_active_audio_context() - return self._context_id - return super().create_context_id() - - def _refresh_active_audio_context(self): - """Signal that the audio context is still in use, resetting the timeout.""" - if self.has_active_audio_context(): - self._contexts[self._context_id].put_nowait(AudioContextTTSService._CONTEXT_KEEPALIVE) - - async def start(self, frame: StartFrame): - """Start the audio context TTS service. - - Args: - frame: The start frame containing initialization parameters. - """ - await super().start(frame) - self._create_audio_context_task() - - async def stop(self, frame: EndFrame): - """Stop the audio context TTS service. - - Args: - frame: The end frame. - """ - await super().stop(frame) - if self._audio_context_task: - # Indicate no more audio contexts are available. this will end the - # task cleanly after all contexts have been processed. - await self._contexts_queue.put(None) - await self._audio_context_task - self._audio_context_task = None - - async def cancel(self, frame: CancelFrame): - """Cancel the audio context TTS service. - - Args: - frame: The cancel frame. - """ - await super().cancel(frame) - await self._stop_audio_context_task() - - async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): - await super()._handle_interruption(frame, direction) - await self._stop_audio_context_task() - await self.on_audio_context_interrupted(context_id=self._context_id) - self.reset_active_audio_context() - self._create_audio_context_task() - - def _create_audio_context_task(self): - if not self._audio_context_task: - self._contexts_queue = asyncio.Queue() - self._contexts: Dict[str, asyncio.Queue] = {} - self._audio_context_task = self.create_task(self._audio_context_task_handler()) - - async def _stop_audio_context_task(self): - if self._audio_context_task: - await self.cancel_task(self._audio_context_task) - self._audio_context_task = None - - async def _audio_context_task_handler(self): - """In this task we process audio contexts in order.""" - running = True - while running: - context_id = await self._contexts_queue.get() - self._context_id = context_id - - if context_id: - # Process the audio context until the context doesn't have more - # audio available (i.e. we find None). - await self._handle_audio_context(context_id) - - # We just finished processing the context, so we can safely remove it. - del self._contexts[context_id] - await self.on_audio_context_completed(context_id=context_id) - self.reset_active_audio_context() - - # Append some silence between sentences. - silence = b"\x00" * self.sample_rate - frame = TTSAudioRawFrame( - audio=silence, - sample_rate=self.sample_rate, - num_channels=1, - context_id=context_id, - ) - await self.push_frame(frame) - else: - running = False - - self._contexts_queue.task_done() - - async def _handle_audio_context(self, context_id: str): - # If we don't receive any audio during this time, we consider the context finished. - AUDIO_CONTEXT_TIMEOUT = 3.0 - queue = self._contexts[context_id] - running = True - while running: - try: - frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) - if frame is AudioContextTTSService._CONTEXT_KEEPALIVE: - # Context is still in use, reset the timeout. - continue - - if frame: - await self.push_frame(frame) - running = frame is not None - except asyncio.TimeoutError: - # We didn't get audio, so let's consider this context finished. - logger.trace(f"{self} time out on audio context {context_id}") - break - - async def on_audio_context_interrupted(self, context_id: str): - """Called when an audio context is cancelled due to an interruption. - - Override this in a subclass to perform provider-specific cleanup (e.g. - sending a cancel/close message over the WebSocket) when the bot is - interrupted mid-speech. The audio context task has already been stopped - and the active context has **not** yet been reset when this is called, - so ``context_id`` reflects the context that was cut short. - - Args: - context_id: The ID of the audio context that was interrupted, or - ``None`` if no context was active at the time. - """ - pass - - async def on_audio_context_completed(self, context_id: str): - """Called after an audio context has finished playing all of its audio. - - Override this in a subclass to perform provider-specific cleanup (e.g. - sending a close-context message to free server-side resources) once an - audio context has been fully processed. The context entry has already - been removed from the internal context map, and the active context has - **not** yet been reset when this is called. - - Args: - context_id: The ID of the audio context that finished processing. - """ - pass + warnings.warn( + "AudioContextTTSService is deprecated. Inherit from WebsocketTTSService directly " + "and pass reuse_context_id_within_turn as kwargs.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__( + reuse_context_id_within_turn=reuse_context_id_within_turn, + reconnect_on_error=reconnect_on_error, + **kwargs, + ) class AudioContextWordTTSService(AudioContextTTSService): - """Deprecated. Use AudioContextTTSService with supports_word_timestamps=True instead. + """Deprecated. Use WebsocketTTSService directly instead. - .. deprecated:: 0.0.104 - Word timestamp functionality has been moved to TTSService. Pass - ``supports_word_timestamps=True`` to AudioContextTTSService instead. + .. deprecated:: 0.0.105 + Subclass :class:`WebsocketTTSService` directly. """ def __init__(self, *, reconnect_on_error: bool = True, **kwargs): @@ -1428,6 +1535,11 @@ class AudioContextWordTTSService(AudioContextTTSService): reconnect_on_error: Whether to automatically reconnect on websocket errors. **kwargs: Additional arguments passed to parent classes. """ - super().__init__( - supports_word_timestamps=True, reconnect_on_error=reconnect_on_error, **kwargs + import warnings + + warnings.warn( + "AudioContextWordTTSService is deprecated. Inherit from WebsocketTTSService directly.", + DeprecationWarning, + stacklevel=2, ) + super().__init__(reconnect_on_error=reconnect_on_error, **kwargs) diff --git a/src/pipecat/services/xtts/tts.py b/src/pipecat/services/xtts/tts.py index ea7cb0b8b..539ddc88c 100644 --- a/src/pipecat/services/xtts/tts.py +++ b/src/pipecat/services/xtts/tts.py @@ -11,7 +11,7 @@ text-to-speech synthesis using local Docker deployment. """ from dataclasses import dataclass -from typing import AsyncGenerator, Dict, Optional +from typing import Any, AsyncGenerator, Dict, Optional import aiohttp from loguru import logger @@ -22,8 +22,6 @@ from pipecat.frames.frames import ( Frame, StartFrame, TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, ) from pipecat.services.settings import TTSSettings, _warn_deprecated_param from pipecat.services.tts_service import TTSService @@ -132,6 +130,8 @@ class XTTSService(TTSService): super().__init__( sample_rate=sample_rate, + push_start_frame=True, + push_stop_frames=True, settings=default_settings, **kwargs, ) @@ -213,8 +213,6 @@ class XTTSService(TTSService): "stream_chunk_size": 20, } - await self.start_ttfb_metrics() - async with self._aiohttp_session.post(url, json=payload) as r: if r.status != 200: text = await r.text() @@ -223,8 +221,6 @@ class XTTSService(TTSService): await self.start_tts_usage_metrics(text) - yield TTSStartedFrame(context_id=context_id) - CHUNK_SIZE = self.chunk_size buffer = bytearray() @@ -262,5 +258,3 @@ class XTTSService(TTSService): resampled_audio, self.sample_rate, 1, context_id=context_id ) yield frame - - yield TTSStoppedFrame(context_id=context_id) diff --git a/src/pipecat/tests/utils.py b/src/pipecat/tests/utils.py index 356fb8545..ca18c4c4d 100644 --- a/src/pipecat/tests/utils.py +++ b/src/pipecat/tests/utils.py @@ -199,13 +199,13 @@ async def run_test( # # Down frames # - received_down_frames: Sequence[Frame] = [] - if expected_down_frames is not None: - while not received_down.empty(): - frame = await received_down.get() - if not isinstance(frame, EndFrame) or not send_end_frame: - received_down_frames.append(frame) + received_down_frames: list[Frame] = [] + while not received_down.empty(): + frame = await received_down.get() + if not isinstance(frame, EndFrame) or not send_end_frame: + received_down_frames.append(frame) + if expected_down_frames is not None: down_frames_printed = "[" for frame in received_down_frames: down_frames_printed += f"{frame.__class__.__name__}, " @@ -225,12 +225,12 @@ async def run_test( # # Up frames # - received_up_frames: Sequence[Frame] = [] - if expected_up_frames is not None: - while not received_up.empty(): - frame = await received_up.get() - received_up_frames.append(frame) + received_up_frames: list[Frame] = [] + while not received_up.empty(): + frame = await received_up.get() + received_up_frames.append(frame) + if expected_up_frames is not None: print("received UP frames =", received_up_frames) print("expected UP frames =", expected_up_frames) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 15eb99f3a..e14ae3828 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -44,12 +44,15 @@ from pipecat.frames.frames import ( StartFrame, SystemFrame, TTSAudioRawFrame, + TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams from pipecat.utils.time import nanoseconds_to_seconds BOT_VAD_STOP_SECS = 0.35 +# Only used as a fallback +BOT_VAD_STOP_FALLBACK_SECS = 3 class BaseOutputTransport(FrameProcessor): @@ -354,6 +357,8 @@ class BaseOutputTransport(FrameProcessor): await sender.handle_sync_frame(frame) elif isinstance(frame, MixerControlFrame): await sender.handle_mixer_control_frame(frame) + elif isinstance(frame, TTSStoppedFrame): + await sender.handle_sync_frame(frame) elif frame.pts: await sender.handle_timed_frame(frame) else: @@ -412,6 +417,8 @@ class BaseOutputTransport(FrameProcessor): # Indicates if the bot is currently speaking. self._bot_speaking = False + # Indicates if TTS audio has been received since the last stop. + self._tts_audio_received = False # Last time a BotSpeakingFrame was pushed. self._bot_speaking_frame_time = 0 # How often a BotSpeakingFrame should be pushed (value should be @@ -639,6 +646,7 @@ class BaseOutputTransport(FrameProcessor): return self._bot_speaking = False + self._tts_audio_received = False # Clean audio buffer (there could be tiny left overs if not multiple # to our output chunk size). @@ -682,6 +690,9 @@ class BaseOutputTransport(FrameProcessor): async def _handle_bot_speech(self, frame: Frame): # TTS case. if isinstance(frame, TTSAudioRawFrame): + # We will only trigger bot stopped speaking based on the TTSStoppedFrame, + # if we have received audio from TTS + self._tts_audio_received = True await self._bot_currently_speaking() # Speech stream case. elif isinstance(frame, SpeechOutputAudioRawFrame): @@ -703,6 +714,12 @@ class BaseOutputTransport(FrameProcessor): await self._transport.send_message(frame) elif isinstance(frame, OutputDTMFFrame): await self._transport.write_dtmf(frame) + elif isinstance(frame, TTSStoppedFrame): + # We will only trigger bot stopped speaking based on the TTSStoppedFrame, + # if we have received audio from TTS + if self._tts_audio_received: + logger.debug("Bot stopped speaking based on TTSStoppedFrame") + await self._bot_stopped_speaking() else: await self._transport.write_transport_frame(frame) @@ -722,7 +739,7 @@ class BaseOutputTransport(FrameProcessor): yield frame self._audio_queue.task_done() except asyncio.TimeoutError: - # Notify the bot stopped speaking upstream if necessary. + # Fallback: notify the bot stopped speaking upstream if necessary based on timeout. await self._bot_stopped_speaking() async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: @@ -737,7 +754,7 @@ class BaseOutputTransport(FrameProcessor): yield frame self._audio_queue.task_done() except asyncio.QueueEmpty: - # Notify the bot stopped speaking upstream if necessary. + # Fallback: notify the bot stopped speaking upstream if necessary based on timeout. diff_time = time.time() - last_frame_time if diff_time > vad_stop_secs: await self._bot_stopped_speaking() @@ -755,9 +772,9 @@ class BaseOutputTransport(FrameProcessor): await asyncio.sleep(0) if self._mixer: - return with_mixer(BOT_VAD_STOP_SECS) + return with_mixer(BOT_VAD_STOP_FALLBACK_SECS) else: - return without_mixer(BOT_VAD_STOP_SECS) + return without_mixer(BOT_VAD_STOP_FALLBACK_SECS) async def _send_silence(self, secs: int): if secs <= 0: diff --git a/tests/test_piper_tts.py b/tests/test_piper_tts.py index 662b9a40c..2b1dae577 100644 --- a/tests/test_piper_tts.py +++ b/tests/test_piper_tts.py @@ -77,28 +77,36 @@ async def test_run_piper_tts_success(aiohttp_client): TTSSpeakFrame(text="Hello world."), ] - expected_returned_frames = [ - AggregatedTextFrame, - TTSStartedFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSAudioRawFrame, - TTSStoppedFrame, - TTSTextFrame, - ] - frames_received = await run_test( tts_service, frames_to_send=frames_to_send, - expected_down_frames=expected_returned_frames, ) down_frames = frames_received[0] + frame_types = [type(f) for f in down_frames] + + # Verify key frames are present + assert AggregatedTextFrame in frame_types + assert TTSStartedFrame in frame_types + assert TTSStoppedFrame in frame_types + assert TTSTextFrame in frame_types + + # Verify ordering: Started → audio → Stopped → Text + started_idx = frame_types.index(TTSStartedFrame) + stopped_idx = frame_types.index(TTSStoppedFrame) + text_idx = frame_types.index(TTSTextFrame) + assert started_idx < text_idx < stopped_idx, ( + "Expected: TTSStartedFrame < TTSTextFrame < TTSStoppedFrame" + ) + + # Frames between Started and Stopped must all be audio or text + for i in range(started_idx + 1, stopped_idx): + assert frame_types[i] in (TTSAudioRawFrame, TTSTextFrame), ( + f"Unexpected frame type between Started and Stopped: {frame_types[i]}" + ) + + # All audio frames have correct sample rate audio_frames = [f for f in down_frames if isinstance(f, TTSAudioRawFrame)] + assert len(audio_frames) >= 1, "Expected at least one audio frame" for a_frame in audio_frames: assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" @@ -128,7 +136,7 @@ async def test_run_piper_tts_error(aiohttp_client): TTSSpeakFrame(text="Error case.", append_to_context=False), ] - expected_down_frames = [AggregatedTextFrame, TTSStoppedFrame, TTSTextFrame] + expected_down_frames = [AggregatedTextFrame, TTSStartedFrame, TTSTextFrame, TTSStoppedFrame] expected_up_frames = [ErrorFrame]