diff --git a/docs/api/requirements.txt b/docs/api/requirements.txt index aa4721362..b90b48f11 100644 --- a/docs/api/requirements.txt +++ b/docs/api/requirements.txt @@ -23,6 +23,7 @@ pipecat-ai[gladia] pipecat-ai[google] pipecat-ai[grok] pipecat-ai[groq] +pipecat-ai[inworld] # pipecat-ai[krisp] # Mocked pipecat-ai[koala] # pipecat-ai[langchain] # Mocked diff --git a/dot-env.template b/dot-env.template index e1e68cf42..dbed59da8 100644 --- a/dot-env.template +++ b/dot-env.template @@ -76,6 +76,9 @@ GROQ_API_KEY=... # Grok GROK_API_KEY=... +# Inworld +INWORLD_API_KEY=... + # Together.ai TOGETHER_API_KEY=... diff --git a/examples/foundational/07ab-interruptible-inworld-http.py b/examples/foundational/07ab-interruptible-inworld-http.py new file mode 100644 index 000000000..5d559ba5a --- /dev/null +++ b/examples/foundational/07ab-interruptible-inworld-http.py @@ -0,0 +1,124 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +import aiohttp +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.inworld.tts import InworldTTSService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams + +load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting bot") + + # Create an HTTP session + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + # Inworld TTS Service - Unified streaming and non-streaming + # Set streaming=True for real-time audio, streaming=False for complete audio generation + streaming = True # Toggle this to switch between modes + + tts = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY", ""), + aiohttp_session=session, + voice_id="Ashley", + model="inworld-tts-1", + streaming=streaming, # True: real-time chunks, False: complete audio then playback + params=InworldTTSService.InputParams( + temperature=0.8, + ), + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are very knowledgable about dogs. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=handle_sigint) + + await runner.run(task) + + +if __name__ == "__main__": + from pipecat.examples.run import main + + main(run_example, transport_params=transport_params) diff --git a/pyproject.toml b/pyproject.toml index 0c5be3a27..5ef4ddeef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", " grok = [] groq = [ "groq~=0.23.0" ] gstreamer = [ "pygobject~=3.50.0" ] +inworld = [] krisp = [ "pipecat-ai-krisp~=0.4.0" ] koala = [ "pvkoala~=2.0.3" ] langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-openai~=0.3.9" ] diff --git a/src/pipecat/services/inworld/__init__.py b/src/pipecat/services/inworld/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/src/pipecat/services/inworld/__init__.py @@ -0,0 +1 @@ + diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py new file mode 100644 index 000000000..fdd0d1a5c --- /dev/null +++ b/src/pipecat/services/inworld/tts.py @@ -0,0 +1,592 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Inworld AI Text-to-Speech Service Implementation. + +This module provides integration with Inworld AI's HTTP-based TTS API, enabling +both streaming and non-streaming text-to-speech synthesis with high-quality, +natural-sounding voices. + +Key Features: + +- HTTP streaming and non-streaming API support for flexible audio generation +- Multiple voice options (Ashley, Hades, etc.) +- Automatic language detection from input text (no manual language setting required) +- Real-time audio chunk processing with proper buffering +- WAV header handling and audio format conversion +- Comprehensive error handling and metrics tracking + +Technical Implementation: + +- Uses aiohttp for HTTP connections +- Implements both JSON line-by-line parsing (streaming) and complete response (non-streaming) +- Handles base64-encoded audio data with proper decoding +- Manages audio continuity to prevent clicks and artifacts +- Integrates with Pipecat's frame-based pipeline architecture + +Examples:: + + async with aiohttp.ClientSession() as session: + # Streaming mode (default) - real-time audio generation + tts = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY"), + aiohttp_session=session, + voice_id="Ashley", + model="inworld-tts-1", + streaming=True, # Default + params=InworldTTSService.InputParams( + temperature=0.8, # Optional: control synthesis variability (range: [0, 2]) + ), + ) + + # Non-streaming mode - complete audio generation then playback + tts = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY"), + aiohttp_session=session, + voice_id="Ashley", + model="inworld-tts-1", + streaming=False, + params=InworldTTSService.InputParams( + temperature=0.8, + ), + ) +""" + +import base64 +import json +from typing import AsyncGenerator, Optional + +import aiohttp +from loguru import logger +from pydantic import BaseModel + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + StartFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.tts_service import TTSService +from pipecat.utils.tracing.service_decorators import traced_tts + + +class InworldTTSService(TTSService): + """Inworld AI HTTP-based Text-to-Speech Service. + + This unified service integrates Inworld AI's high-quality TTS API with Pipecat's pipeline + architecture. It supports both streaming and non-streaming modes, providing flexible + speech synthesis with natural-sounding voices. + + Key Features: + + - **Streaming Mode**: Real-time HTTP streaming for minimal latency + - **Non-Streaming Mode**: Complete audio synthesis then chunked playback + - Multiple voice options (Ashley, Hades, etc.) + - High-quality audio output (48kHz LINEAR16 PCM) + - Automatic audio format handling and header stripping + - Comprehensive error handling and recovery + - Built-in performance metrics and monitoring + - Unified interface for both modes + + Technical Architecture: + + - Uses aiohttp for non-blocking HTTP requests + - **Streaming**: Implements JSON line-by-line streaming protocol + - **Non-Streaming**: Single HTTP POST with complete response + - Processes base64-encoded audio chunks in real-time or batch + - Manages audio continuity to prevent artifacts + - Integrates with Pipecat's frame-based pipeline system + + Supported Configuration: + + - Voice Selection: Ashley, Hades, and other Inworld voices + - Models: inworld-tts-1 and other available models + - Audio Formats: LINEAR16 PCM at various sample rates + - Language Detection: Automatically inferred from input text (no explicit language setting required) + - Mode Selection: streaming=True for real-time, streaming=False for complete synthesis + + Examples:: + + async with aiohttp.ClientSession() as session: + # Streaming mode (default) - Real-time audio generation + tts_streaming = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY"), + aiohttp_session=session, + voice_id="Ashley", + model="inworld-tts-1", + streaming=True, # Default behavior + params=InworldTTSService.InputParams( + temperature=0.8, # Add variability to speech synthesis (range: [0, 2]) + ), + ) + + # Non-streaming mode - Complete audio then playback + tts_complete = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY"), + aiohttp_session=session, + voice_id="Hades", + model="inworld-tts-1-max", + streaming=False, + params=InworldTTSService.InputParams( + temperature=0.8, + ), + ) + """ + + class InputParams(BaseModel): + """Optional input parameters for Inworld TTS configuration. + + Parameters: + temperature: Voice temperature control for synthesis variability (e.g., 0.8). + Valid range: [0, 2]. Higher values increase variability. + + Note: + Language is automatically inferred from the input text by Inworld's TTS models, + so no explicit language parameter is required. + """ + + temperature: Optional[float] = None # optional temperature control (range: [0, 2]) + + def __init__( + self, + *, + api_key: str, + aiohttp_session: aiohttp.ClientSession, + voice_id: str = "Ashley", + model: str = "inworld-tts-1", + streaming: bool = True, + sample_rate: Optional[int] = None, + encoding: str = "LINEAR16", + params: Optional[InputParams] = None, + **kwargs, + ): + """Initialize the Inworld TTS service. + + Sets up the TTS service with Inworld AI's API configuration. + This constructor prepares all necessary parameters for speech synthesis. + + Args: + api_key: Inworld API key for authentication (base64-encoded from Inworld Portal). + Get this from: Inworld Portal > Settings > API Keys > Runtime API Key + aiohttp_session: Shared aiohttp session for HTTP requests. Must be provided + for proper connection pooling and resource management. + voice_id: Voice selection for speech synthesis. Common options include: + - "Ashley": Clear, professional female voice (default) + - "Hades": Deep, authoritative male voice + - And many more available in your Inworld account + model: TTS model to use for speech synthesis: + - "inworld-tts-1": Standard quality model (default) + - "inworld-tts-1-max": Higher quality model + - Other models as available in your Inworld account + streaming: Whether to use streaming mode (True) or non-streaming mode (False). + - True: Real-time audio chunks as they're generated (lower latency) + - False: Complete audio file generated first, then chunked for playback (simpler) + The base URL is automatically selected based on this mode: + - Streaming: "https://api.inworld.ai/tts/v1/voice:stream" + - Non-streaming: "https://api.inworld.ai/tts/v1/voice" + sample_rate: Audio sample rate in Hz. If None, uses default from StartFrame. + Common values: 48000 (high quality), 24000 (good quality), 16000 (basic) + encoding: Audio encoding format. Supported options: + - "LINEAR16" (default) - Uncompressed PCM, best quality + - Other formats as supported by Inworld API + params: Optional input parameters for additional configuration. Use this to specify: + - temperature: Voice temperature control for variability (range: [0, 2], e.g., 0.8, optional) + Language is automatically inferred from input text. + **kwargs: Additional arguments passed to the parent TTSService class. + + Note: + The aiohttp_session parameter is required because Inworld's HTTP API + benefits from connection reuse and proper async session management. + """ + # Initialize parent TTSService with audio configuration + super().__init__(sample_rate=sample_rate, **kwargs) + + # Use provided params or create default configuration + params = params or InworldTTSService.InputParams() + + # Store core configuration for API requests + self._api_key = api_key # Authentication credentials + self._session = aiohttp_session # HTTP session for requests + self._streaming = streaming # Streaming mode selection + + # Set base URL based on streaming mode + if streaming: + self._base_url = "https://api.inworld.ai/tts/v1/voice:stream" # Streaming endpoint + else: + self._base_url = "https://api.inworld.ai/tts/v1/voice" # Non-streaming endpoint + + # Build settings dictionary that matches Inworld's API expectations + # This will be sent as JSON payload in each TTS request + # Note: Language is automatically inferred from text by Inworld's models + self._settings = { + "voiceId": voice_id, # Voice selection from direct parameter + "modelId": model, # TTS model selection from direct parameter + "audio_config": { # Audio format configuration + "audio_encoding": encoding, # Format: LINEAR16, MP3, etc. + "sample_rate_hertz": 0, # Will be set in start() from parent service + }, + } + + # Add optional temperature parameter if provided (valid range: [0, 2]) + if params and params.temperature is not None: + self._settings["temperature"] = params.temperature + + # Register voice and model with parent service for metrics and tracking + self.set_voice(voice_id) # Used for logging and metrics + self.set_model_name(model) # Used for performance tracking + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Inworld TTS service supports metrics generation. + """ + return True + + async def start(self, frame: StartFrame): + """Start the Inworld TTS service. + + Args: + frame: The start frame containing initialization parameters. + """ + await super().start(frame) + self._settings["audio_config"]["sample_rate_hertz"] = self.sample_rate + + async def stop(self, frame: EndFrame): + """Stop the Inworld TTS service. + + Args: + frame: The end frame. + """ + await super().stop(frame) + + async def cancel(self, frame: CancelFrame): + """Cancel the Inworld TTS service. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + + @traced_tts + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using Inworld's HTTP API. + + This is the core TTS processing function that adapts its behavior based on the streaming mode: + + **Streaming Mode (streaming=True)**: + 1. Sends text to Inworld's streaming TTS endpoint + 2. Receives JSON-streamed audio chunks in real-time + 3. Processes and cleans audio data (removes WAV headers, validates content) + 4. Yields audio frames for immediate playback in the pipeline + + **Non-Streaming Mode (streaming=False)**: + 1. Sends text to Inworld's non-streaming TTS endpoint + 2. Receives complete audio file as base64-encoded response + 3. Processes entire audio and chunks for playback + 4. Yields audio frames in manageable pieces + + Technical Details: + + - **Streaming**: Uses HTTP streaming with JSON line-by-line responses + - **Non-Streaming**: Single HTTP POST with complete JSON response + - Each audio chunk contains base64-encoded audio data + - Implements buffering to handle partial data (streaming mode) + - Strips WAV headers to prevent audio artifacts/clicks + - Provides optimized audio delivery for each mode + + Args: + text: The text to synthesize into speech. + + Yields: + Frame: Audio frames containing the synthesized speech, plus control frames. + + Raises: + ErrorFrame: If API errors occur or audio processing fails. + """ + logger.debug(f"{self}: Generating TTS [{text}] (streaming={self._streaming})") + + # ================================================================================ + # STEP 1: PREPARE API REQUEST + # ================================================================================ + # Build the JSON payload according to Inworld's API specification + # This matches the format shown in their documentation examples + # Note: Language is automatically inferred from the input text by Inworld's models + payload = { + "text": text, # Text to synthesize + "voiceId": self._settings["voiceId"], # Voice selection (Ashley, Hades, etc.) + "modelId": self._settings["modelId"], # TTS model (inworld-tts-1) + "audio_config": self._settings[ + "audio_config" + ], # Audio format settings (LINEAR16, 48kHz) + } + + # Add optional temperature parameter if configured (valid range: [0, 2]) + if "temperature" in self._settings: + payload["temperature"] = self._settings["temperature"] + + # Set up HTTP headers for authentication and content type + # Inworld requires Basic auth with base64-encoded API key + headers = { + "Authorization": f"Basic {self._api_key}", # Base64 API key from Inworld Portal + "Content-Type": "application/json", # JSON request body + } + + try: + # ================================================================================ + # STEP 2: INITIALIZE METRICS AND PROCESSING + # ================================================================================ + # Start measuring Time To First Byte (TTFB) for performance tracking + await self.start_ttfb_metrics() + + # Signal to the pipeline that TTS generation has started + # This allows downstream processors to prepare for incoming audio + yield TTSStartedFrame() + + # ================================================================================ + # STEP 3: MAKE HTTP REQUEST (MODE-SPECIFIC) + # ================================================================================ + # Use aiohttp to make request to Inworld's endpoint + # Behavior differs based on streaming mode + async with self._session.post( + self._base_url, json=payload, headers=headers + ) as response: + # ================================================================================ + # STEP 4: HANDLE HTTP ERRORS + # ================================================================================ + # Check for API errors (expired keys, invalid requests, etc.) + if response.status != 200: + error_text = await response.text() + logger.error(f"Inworld API error: {error_text}") + await self.push_error(ErrorFrame(f"Inworld API error: {error_text}")) + return + + # ================================================================================ + # STEP 5: PROCESS RESPONSE (MODE-SPECIFIC) + # ================================================================================ + # Choose processing method based on streaming mode + if self._streaming: + # Stream processing: JSON line-by-line with real-time audio + async for frame in self._process_streaming_response(response): + yield frame + else: + # Non-stream processing: Complete JSON response with batch audio + async for frame in self._process_non_streaming_response(response): + yield frame + + # ================================================================================ + # STEP 6: FINALIZE METRICS AND CLEANUP + # ================================================================================ + # Start usage metrics tracking after successful completion + await self.start_tts_usage_metrics(text) + + except Exception as e: + # ================================================================================ + # STEP 7: ERROR HANDLING + # ================================================================================ + # Log any unexpected errors and notify the pipeline + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + finally: + # ================================================================================ + # STEP 8: CLEANUP AND COMPLETION + # ================================================================================ + # Always stop metrics tracking, even if errors occurred + await self.stop_all_metrics() + + # Signal to pipeline that TTS generation is complete + # This allows downstream processors to finalize audio processing + yield TTSStoppedFrame() + + async def _process_streaming_response( + self, response: aiohttp.ClientResponse + ) -> AsyncGenerator[Frame, None]: + """Process streaming JSON response with real-time audio chunks. + + This method handles Inworld's streaming endpoint response format: + - JSON lines containing base64-encoded audio chunks + - Real-time processing as data arrives + - Line buffering to handle partial JSON data + + Args: + response: The aiohttp response object from streaming endpoint. + + Yields: + Frame: Audio frames as they're processed from the stream. + """ + # ================================================================================ + # STREAMING: PROCESS JSON LINE-BY-LINE RESPONSE + # ================================================================================ + # Inworld streams JSON lines where each line contains audio data + # We need to buffer incoming data and process complete lines + + # Buffer to accumulate incoming text data + # This handles cases where JSON lines are split across HTTP chunks + buffer = "" + + # Read HTTP response in manageable chunks (1KB each) + # This prevents memory issues with large responses + async for chunk in response.content.iter_chunked(1024): + if not chunk: + continue + + # ============================================================================ + # BUFFER MANAGEMENT + # ============================================================================ + # Decode binary chunk to text and add to our line buffer + # Each chunk may contain partial JSON lines, so we need to accumulate + buffer += chunk.decode("utf-8") + + # ============================================================================ + # LINE-BY-LINE JSON PROCESSING + # ============================================================================ + # Process all complete lines in the buffer (lines ending with \n) + # Leave partial lines in buffer for next iteration + while "\n" in buffer: + # Split on first newline, keeping remainder in buffer + line, buffer = buffer.split("\n", 1) + line_str = line.strip() + + # Skip empty lines (common in streaming responses) + if not line_str: + continue + + try: + # ================================================================ + # PARSE JSON AND EXTRACT AUDIO + # ================================================================ + # Parse the JSON line - should contain audio data + chunk_data = json.loads(line_str) + + # Check if this line contains audio content + # Inworld's response format: {"result": {"audioContent": "base64data"}} + if "result" in chunk_data and "audioContent" in chunk_data["result"]: + # Process the audio chunk + await self.stop_ttfb_metrics() + async for frame in self._process_audio_chunk( + base64.b64decode(chunk_data["result"]["audioContent"]) + ): + yield frame + + except json.JSONDecodeError: + # Ignore malformed JSON lines - streaming can have partial data + # This is normal in HTTP streaming scenarios + continue + + async def _process_non_streaming_response( + self, response: aiohttp.ClientResponse + ) -> AsyncGenerator[Frame, None]: + """Process complete JSON response with full audio content. + + This method handles Inworld's non-streaming endpoint response format: + - Single JSON response with complete base64-encoded audio + - Full audio download then chunked playback + - Simpler processing without line buffering + + Args: + response: The aiohttp response object from non-streaming endpoint. + + Yields: + Frame: Audio frames chunked from the complete audio. + """ + # ================================================================================ + # NON-STREAMING: PARSE COMPLETE JSON RESPONSE + # ================================================================================ + # Parse the complete JSON response containing base64 audio data + response_data = await response.json() + + # ================================================================================ + # EXTRACT AND VALIDATE AUDIO CONTENT + # ================================================================================ + # Extract the base64-encoded audio content from response + if "audioContent" not in response_data: + logger.error("No audioContent in Inworld API response") + await self.push_error(ErrorFrame("No audioContent in response")) + return + + # ================================================================================ + # DECODE AND PROCESS COMPLETE AUDIO DATA + # ================================================================================ + # Decode the base64 audio data to binary + audio_data = base64.b64decode(response_data["audioContent"]) + + # Strip WAV header if present (Inworld may include WAV header) + # This prevents audio clicks and ensures clean audio playback + if len(audio_data) > 44 and audio_data.startswith(b"RIFF"): + audio_data = audio_data[44:] + + # ================================================================================ + # CHUNK AND YIELD COMPLETE AUDIO FOR PLAYBACK + # ================================================================================ + # Chunk the complete audio for streaming playback + # This allows the pipeline to process audio in manageable pieces + CHUNK_SIZE = self.chunk_size + + for i in range(0, len(audio_data), CHUNK_SIZE): + chunk = audio_data[i : i + CHUNK_SIZE] + if len(chunk) > 0: + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame( + audio=chunk, + sample_rate=self.sample_rate, + num_channels=1, + ) + + async def _process_audio_chunk(self, audio_chunk: bytes) -> AsyncGenerator[Frame, None]: + """Process a single audio chunk (common logic for both modes). + + This method handles audio chunk processing that's common to both streaming + and non-streaming modes: + - WAV header removal + - Audio validation + - Frame creation and yielding + + Args: + audio_chunk: Raw audio data bytes to process. + + Yields: + Frame: Audio frame if chunk contains valid audio data. + """ + # ======================================================== + # AUDIO DATA VALIDATION + # ======================================================== + # Skip empty audio chunks that could cause discontinuities + # Empty chunks can create gaps or clicks in audio playback + if not audio_chunk: + return + + # Start with the raw audio data + audio_data = audio_chunk + + # ======================================================== + # WAV HEADER REMOVAL (CRITICAL FOR AUDIO QUALITY) + # ======================================================== + # Each audio chunk may have its own WAV header (44 bytes) + # These headers contain metadata and will sound like clicks if played + # We must strip them from EVERY chunk, not just the first one + if ( + len(audio_chunk) > 44 # Ensure chunk is large enough + and audio_chunk.startswith(b"RIFF") # Check for WAV header magic bytes + ): + # Remove the 44-byte WAV header to get pure audio data + audio_data = audio_chunk[44:] + + # ======================================================== + # YIELD AUDIO FRAME TO PIPELINE + # ======================================================== + # Only yield frames with actual audio content + # Empty frames can cause pipeline issues + if len(audio_data) > 0: + # Create Pipecat audio frame with processed audio data + yield TTSAudioRawFrame( + audio=audio_data, # Clean audio without headers + sample_rate=self.sample_rate, # Configured sample rate (48kHz) + num_channels=1, # Mono audio + )