Compare commits

..

16 Commits

Author SHA1 Message Date
filipi87
7c938102ad Allowing to buffer audio inside the base output transport. 2026-05-21 11:54:01 -03:00
filipi87
359c9394d0 Caching 500 ms of audio 2026-05-21 10:58:52 -03:00
filipi87
144a1ece7b Fixing ruff format. 2026-05-21 10:05:33 -03:00
filipi87
6ef7f6446a Saving the audio inside the Tavus video so we can test. 2026-05-21 09:01:12 -03:00
filipi87
7c61c36825 Recording the audios that we are receiving. 2026-05-20 19:03:01 -03:00
filipi87
1338da6831 Don't inject silence in the proxy. 2026-05-20 18:26:11 -03:00
filipi87
0dc1337a3c Not sending silence by default. 2026-05-20 17:48:11 -03:00
filipi87
6a238e0d62 Refactoring how we are handling the silence. 2026-05-20 17:41:11 -03:00
filipi87
e7bad7a007 Buffering the audio before sending back. 2026-05-20 17:23:15 -03:00
filipi87
b360fbf7fc Handling interruption. 2026-05-20 16:26:47 -03:00
filipi87
f568b1d8df Fixing ruff format. 2026-05-20 12:48:25 -03:00
filipi87
fd7af7ba9f Changing the silence threshold to 10. 2026-05-20 12:45:31 -03:00
filipi87
40851696b7 Not injecting silence in the TavusTransport. 2026-05-20 12:37:49 -03:00
filipi87
b7d272a5be Skipping webrtc injected silence. 2026-05-20 12:18:43 -03:00
filipi87
996aa461ac Sending audio faster than realtime. 2026-05-20 12:03:24 -03:00
filipi87
aef6226a1c Describing the TAVUS_SAMPLE_ROOM_URL environment variable. 2026-05-20 11:22:38 -03:00
45 changed files with 459 additions and 1211 deletions

View File

@@ -1,91 +0,0 @@
---
name: squash-commits
description: Reorganize messy branch commits into a small set of logical, meaningful commits without changing any content. Drops merge-from-main commits. Safe: creates a backup branch first.
---
Reorganize the commits on the current branch into a small number of logical commits. Do NOT change any file content — only the commit structure changes.
## Instructions
### 1. Safety check
```bash
git status --short
```
If there are uncommitted changes, stop and tell the user to commit or stash them first.
### 2. Inspect the branch
```bash
git log main..HEAD --oneline
git diff main..HEAD --name-only
```
List every file changed vs `main` and every commit on the branch (excluding merge commits from main).
### 3. Create a backup branch
```bash
git branch backup/<current-branch-name>
```
Tell the user the backup exists so they can recover if needed.
### 4. Soft-reset to main and unstage everything
```bash
git reset --soft main
git restore --staged .
```
All branch changes are now in the working tree, unstaged. No content has changed.
### 5. Plan the logical groups
Read the changed files and the original commit messages to understand what the work covers. Group related files into logical commits. Typical groups:
- Core feature or fix (new source files + modified core files)
- Secondary features or fixes (each as its own commit if distinct)
- Refactoring or renames
- Tests
- Changelogs / docs
Use the changelog files (if any) as a strong hint — each changelog entry often maps to one commit.
Present the proposed grouping to the user and ask for confirmation before committing.
### 6. Commit in logical groups
For each group, stage only the relevant files and commit with a clear message following the project's conventions:
```bash
git add <file1> <file2> ...
git commit -m "..."
```
Use conventional commit prefixes if the project uses them (`feat:`, `fix:`, `refactor:`, `test:`, `chore:`).
### 7. Verify
```bash
git log main..HEAD --oneline
git diff main..HEAD --name-only
git status --short
```
Confirm:
- Commit count is small and each message is meaningful
- The set of changed files vs `main` is identical to before
- Working tree is clean
### 8. Remind about force-push
The branch history has been rewritten. Tell the user they will need to `git push --force-with-lease` when they are ready to update the remote. Do NOT push automatically.
## Rules
- Never change file contents. If you find yourself editing a file, stop.
- Never skip the backup branch step.
- Never force-push without explicit user instruction.
- If any step fails or the result looks wrong, tell the user and suggest restoring from the backup: `git reset --hard backup/<branch-name>`.

View File

@@ -92,7 +92,7 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| Category | Services |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/api-reference/server/services/stt/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/api-reference/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/api-reference/server/services/stt/gladia), [Google](https://docs.pipecat.ai/api-reference/server/services/stt/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/stt/mistral), [NVIDIA](https://docs.pipecat.ai/api-reference/server/services/stt/nvidia), [OpenAI (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/openai), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/api-reference/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/api-reference/server/services/stt/whisper), [xAI](https://docs.pipecat.ai/api-reference/server/services/stt/xai) |
| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Inception](https://docs.pipecat.ai/api-reference/server/services/llm/inception), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) |
| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/api-reference/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/api-reference/server/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/api-reference/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/api-reference/server/services/tts/fish), [Google](https://docs.pipecat.ai/api-reference/server/services/tts/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/api-reference/server/services/tts/groq), [Hume](https://docs.pipecat.ai/api-reference/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/api-reference/server/services/tts/inworld), [Kokoro](https://docs.pipecat.ai/api-reference/server/services/tts/kokoro), [LMNT](https://docs.pipecat.ai/api-reference/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/api-reference/server/services/tts/minimax), [Mistral](https://docs.pipecat.ai/api-reference/server/services/tts/mistral), [Neuphonic](https://docs.pipecat.ai/api-reference/server/services/tts/neuphonic), [NVIDIA](https://docs.pipecat.ai/api-reference/server/services/tts/nvidia), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/tts/openai), [Piper](https://docs.pipecat.ai/api-reference/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/api-reference/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/api-reference/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/api-reference/server/services/tts/smallest), [Soniox](https://docs.pipecat.ai/api-reference/server/services/tts/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/tts/speechmatics), [xAI](https://docs.pipecat.ai/api-reference/server/services/tts/xai), [XTTS](https://docs.pipecat.ai/api-reference/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/api-reference/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/api-reference/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/api-reference/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/api-reference/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/api-reference/server/services/s2s/ultravox), |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [Vonage (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/vonage), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local |

View File

@@ -1 +0,0 @@
- Added `InceptionLLMService` for Inception's Mercury 2 diffusion reasoning model, with support for `reasoning_effort` and `realtime` settings.

View File

@@ -1 +0,0 @@
- Fixed websocket STT connection setup failures so services clear stale websocket state and emit non-fatal error frames, allowing `ServiceSwitcher` failover to keep agents running.

View File

@@ -1 +0,0 @@
- Added `max_endpoint_delay_ms` to `SonioxSTTService.Settings`, controlling the maximum delay (500-3000 ms) before endpoint detection finalizes a turn.

View File

@@ -1 +0,0 @@
- `SonioxSTTService` now applies settings updates (e.g. via `STTUpdateSettingsFrame`) using a graceful reconnect instead of a hard disconnect/reconnect, preserving the service's reconnect retry behavior.

View File

@@ -1 +0,0 @@
- Removed the unsupported Georgian (`Language.KA`) language mapping from `SonioxSTTService`.

View File

@@ -1 +0,0 @@
- Updated the default p99 TTFS latency values for Smallest AI, Mistral, and XAI STT so turn stop timing uses measured values instead of the conservative fallback.

View File

@@ -1 +0,0 @@
- Updated the development runner startup banner to show the prebuilt client URL once and list enabled or disabled transports with install hints.

View File

@@ -1 +0,0 @@
- Fixed the development runner so missing optional transport dependencies disable only their related routes instead of failing startup in all-transport mode.

View File

@@ -1 +0,0 @@
- Fixed a race in `ElevenLabsTTSService` where the periodic keepalive could be sent for a new turn's context before that context's `voice_settings` initialization message, causing ElevenLabs to close the WebSocket with a 1008 policy violation (`voice_settings field must be provided in the first message ...`). The keepalive now only targets a context once its context-init has been sent.

View File

@@ -1 +0,0 @@
- Bumped `pipecat-ai-prebuilt` to 1.0.1 in the `runner` extra, updating the prebuilt client UI served by the development runner.

View File

@@ -91,9 +91,6 @@ HEYGEN_LIVE_AVATAR_API_KEY=...
HUME_API_KEY=...
HUME_VOICE_ID=...
# Inception
INCEPTION_API_KEY=...
# Inworld
INWORLD_API_KEY=...
@@ -199,6 +196,10 @@ SPEECHMATICS_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
# Used by scripts/daily/test_tavus_transport.py, which mimics Tavus behavior
# inside a Daily room for local testing. Set this to the Daily room URL where
# the Pipecat pipeline is running.
TAVUS_SAMPLE_ROOM_URL=https://...
# Telnyx
TELNYX_API_KEY=...

View File

@@ -1,177 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.inception.llm import InceptionLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = InceptionLLMService(
api_key=os.environ["INCEPTION_API_KEY"],
settings=InceptionLLMService.Settings(
reasoning_effort="instant",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -22,9 +22,9 @@ from pipecat.processors.aggregators.llm_response_universal import (
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.services.soniox.tts import SonioxTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -53,7 +53,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt = SonioxSTTService(api_key=os.environ["SONIOX_API_KEY"])
tts = SonioxTTSService(api_key=os.environ["SONIOX_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
@@ -98,9 +103,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.queue_frames([LLMRunFrame()])
await asyncio.sleep(10)
logger.info("Updating Soniox STT settings: language_hints=[es]")
logger.info("Updating Soniox STT settings: language=es")
await task.queue_frame(
STTUpdateSettingsFrame(delta=SonioxSTTService.Settings(language_hints=[Language.ES]))
STTUpdateSettingsFrame(delta=SonioxSTTService.Settings(language=Language.ES))
)
@transport.event_handler("on_client_disconnected")

View File

@@ -5,8 +5,12 @@
#
import datetime
import io
import os
import wave
import aiofiles
import aiohttp
from dotenv import load_dotenv
from loguru import logger
@@ -21,6 +25,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -32,6 +37,21 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
"""Save audio data to a WAV file."""
if len(audio) > 0:
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Audio saved to {filename}")
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -59,7 +79,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"], audio_passthrough=True)
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
@@ -87,6 +107,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
@@ -96,6 +118,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
tts, # TTS
tavus, # Tavus output layer
transport.output(), # Transport bot output
audiobuffer, # Audio recording
assistant_aggregator, # Assistant spoken responses
]
)
@@ -114,6 +137,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
await audiobuffer.start_recording()
# Kick off the conversation.
context.add_message(
{
@@ -128,6 +152,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recordings/merged_{timestamp}.wav"
os.makedirs("recordings", exist_ok=True)
await save_audio_file(audio, filename, sample_rate, num_channels)
@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("recordings", exist_ok=True)
await save_audio_file(user_audio, f"recordings/user_{timestamp}.wav", sample_rate, 1)
await save_audio_file(bot_audio, f"recordings/bot_{timestamp}.wav", sample_rate, 1)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -77,7 +77,6 @@ groq = [ "groq>=0.23.0,<2" ]
gstreamer = [ "pygobject~=3.50.0" ]
heygen = [ "livekit>=1.0.13,<2", "pipecat-ai[websockets-base]" ]
hume = [ "hume>=0.11.2,<1" ]
inception = []
inworld = [ "pipecat-ai[websockets-base]" ]
koala = [ "pvkoala~=2.0.3" ]
kokoro = [ "kokoro-onnx>=0.5.0,<1", "requests>=2.32.5,<3" ]
@@ -104,7 +103,7 @@ piper = [ "piper-tts>=1.3.0,<2", "requests>=2.32.5,<3" ]
qwen = []
resembleai = [ "pipecat-ai[websockets-base]" ]
rime = [ "pipecat-ai[websockets-base]" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-prebuilt>=1.0.1"]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-prebuilt>=1.0.0"]
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
sambanova = []
sarvam = [ "sarvamai==0.1.28", "pipecat-ai[websockets-base]" ]

View File

@@ -1,6 +1,9 @@
import array
import asyncio
import datetime
import os
import signal
import wave
from daily import (
AudioData,
@@ -15,6 +18,16 @@ from loguru import logger
load_dotenv(override=True)
# Pipecat sends audio at this true content rate but declares it as
# DECLARED_SAMPLE_RATE to write_frames(), which makes delivery faster than
# real-time. We receive at the declared rate (no resampling) and play back at
# the true rate so the avatar consumes audio at normal speed.
TRUE_SAMPLE_RATE = 24000
DECLARED_SAMPLE_RATE = 48000
SPEEDUP = DECLARED_SAMPLE_RATE // TRUE_SAMPLE_RATE
CHUNK_BYTES = int(TRUE_SAMPLE_RATE * 20 / 1000) * 2 # 20 ms, 16-bit mono
MIN_AUDIO_BUFFER = CHUNK_BYTES * 5 # 100 ms pre-buffer
def completion_callback(future):
def _callback(*args):
@@ -37,19 +50,21 @@ class DailyProxyApp(EventHandler):
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, sample_rate: int):
def __init__(self):
super().__init__()
self._sample_rate = sample_rate
self._loop = asyncio.new_event_loop()
self._audio_queue: asyncio.Queue = asyncio.Queue()
# Raw PCM buffer — filled at DECLARED_SAMPLE_RATE speed, drained at TRUE_SAMPLE_RATE speed.
self._buffer = bytearray()
self._audio_task: asyncio.Task | None = None
self._wav_file: wave.Wave_write | None = None
self._client: CallClient = CallClient(event_handler=self)
self._client.update_subscription_profiles(
{"base": {"camera": "unsubscribed", "microphone": "subscribed"}}
)
self._audio_source = CustomAudioSource(self._sample_rate, 1)
# Playback source declared at TRUE_SAMPLE_RATE — consumes audio at real-time speed.
self._audio_source = CustomAudioSource(TRUE_SAMPLE_RATE, 1, False)
self._audio_track = CustomAudioTrack(self._audio_source)
def on_joined(self, data, error):
@@ -58,8 +73,27 @@ class DailyProxyApp(EventHandler):
print(f"Unable to join meeting: {error}")
self._loop.call_soon_threadsafe(self._loop.stop)
def _open_wav(self):
os.makedirs("recordings", exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
path = f"recordings/received_pos_speed_{timestamp}.wav"
self._wav_file = wave.open(path, "wb")
self._wav_file.setnchannels(1)
self._wav_file.setsampwidth(2)
# Declare TRUE_SAMPLE_RATE so timestamps match bot_*.wav for comparison.
# Bytes arrive at DECLARED_SAMPLE_RATE speed (2x real-time) but each byte
# is 24kHz content, so the WAV plays back at normal speed.
self._wav_file.setframerate(TRUE_SAMPLE_RATE)
logger.info(f"Recording received audio to {path}")
def _close_wav(self):
if self._wav_file:
self._wav_file.close()
self._wav_file = None
def run(self, meeting_url: str):
asyncio.set_event_loop(self._loop)
self._open_wav()
self._create_audio_task()
def handle_exit():
@@ -92,6 +126,7 @@ class DailyProxyApp(EventHandler):
if self._audio_task:
self._loop.run_until_complete(self._cancel_audio_task())
self._close_wav()
self._client.leave()
self._client.release()
@@ -113,7 +148,6 @@ class DailyProxyApp(EventHandler):
if self._audio_task:
self._audio_task.cancel()
try:
# Waits for it to finish
await self._audio_task
except asyncio.CancelledError:
pass
@@ -121,46 +155,120 @@ class DailyProxyApp(EventHandler):
async def capture_participant_audio(self, participant_id: str):
logger.info(f"Capturing participant audio: {participant_id}")
# Receiving from this custom track
# audio_source: str = "microphone"
audio_source: str = "stream"
media = {"media": {"customAudio": {audio_source: "subscribed"}}}
await self.update_subscriptions(participant_settings={participant_id: media})
# Must match the declared rate Pipecat used so WebRTC skips resampling —
# every original byte arrives intact.
self._client.set_audio_renderer(
participant_id,
self._audio_data_received,
audio_source=audio_source,
sample_rate=self._sample_rate,
sample_rate=DECLARED_SAMPLE_RATE,
callback_interval_ms=20,
)
logger.info(
f"Receiving at declared_rate={DECLARED_SAMPLE_RATE} Hz "
f"(true content: {TRUE_SAMPLE_RATE} Hz, ~{SPEEDUP}x faster than real-time)"
)
async def send_audio(self, audio: AudioData):
future = asyncio.get_running_loop().create_future()
self._audio_source.write_frames(audio.audio_frames, completion=completion_callback(future))
await future
@staticmethod
def _is_silence(data: bytes, threshold: int = 5) -> bool:
# Interpret as 16-bit signed PCM samples and check peak amplitude.
# WebRTC-injected silence is all zeros; real TTS audio has non-trivial
# amplitude. This lets us skip buffering frames that Pipecat never wrote,
# so the buffer only grows when actual speech arrives (via our trick).
samples = array.array("h", data)
return max(abs(s) for s in samples) < threshold
async def queue_audio(self, audio: AudioData):
await self._audio_queue.put(audio)
async def _buffer_audio(self, audio_data: AudioData):
"""Append received bytes to the buffer, skipping WebRTC-injected silence.
Speech frames arrive at DECLARED_SAMPLE_RATE speed (~2x real-time) so the
buffer grows ahead of the drain. WebRTC-injected silence (all-zero PCM) is
handled differently based on buffer level: below MIN_AUDIO_BUFFER we keep it
so the pre-buffer can fill; above that threshold we discard it so the buffer
drains back down between utterances.
"""
new_bytes = audio_data.audio_frames
if self._is_silence(new_bytes):
if len(self._buffer) < MIN_AUDIO_BUFFER:
# Below pre-buffer threshold: add silence so the buffer fills up.
self._buffer.extend(new_bytes)
# else: buffer is healthy, discard silence so it can drain.
return
self._buffer.extend(new_bytes)
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
# logger.info(f"Received audio data for {participant_id}, audio_source: {audio_source}")
asyncio.run_coroutine_threadsafe(self.queue_audio(audio_data), self._loop)
if self._wav_file:
self._wav_file.writeframes(audio_data.audio_frames)
asyncio.run_coroutine_threadsafe(self._buffer_audio(audio_data), self._loop)
async def _audio_task_handler(self):
while True:
audio = await self._audio_queue.get()
await self.send_audio(audio)
async def _handle_interrupt(self):
"""Clear the audio buffer, mimicking the avatar stopping mid-speech."""
dropped = len(self._buffer)
self._buffer.clear()
logger.info(
f"Interrupt received — dropped {dropped}B ({dropped / (TRUE_SAMPLE_RATE * 2):.3f}s) from buffer"
)
#
# Daily (EventHandler)
#
def on_app_message(self, message, sender):
if not isinstance(message, dict):
return
if message.get("event_type") == "conversation.interrupt":
asyncio.run_coroutine_threadsafe(self._handle_interrupt(), self._loop)
async def _audio_task_handler(self):
"""Drain the buffer at TRUE_SAMPLE_RATE speed (real-time playback).
Waits until min_audio_buffer bytes are accumulated before starting
playback, then drains freely in chunk_bytes steps. If the buffer runs
dry it re-enters the waiting state so the next burst also gets the
pre-buffer delay.
"""
buffering = True
last_log_time = self._loop.time()
while True:
if buffering:
if len(self._buffer) >= MIN_AUDIO_BUFFER:
buffering = False
logger.debug(f"Pre-buffer reached ({MIN_AUDIO_BUFFER}B) — starting playback")
else:
await asyncio.sleep(0.001)
continue
if len(self._buffer) >= CHUNK_BYTES:
chunk = bytes(self._buffer[:CHUNK_BYTES])
del self._buffer[:CHUNK_BYTES]
future = asyncio.get_running_loop().create_future()
self._audio_source.write_frames(chunk, completion=completion_callback(future))
await future
else:
buffering = True
await asyncio.sleep(0.001)
now = self._loop.time()
if now - last_log_time >= 1.0:
buffer_seconds = len(self._buffer) / (TRUE_SAMPLE_RATE * 2)
if buffer_seconds > 0:
logger.info(
f"Buffer status: {len(self._buffer)}B ({buffer_seconds:.3f}s buffered)"
)
last_log_time = now
def on_participant_joined(self, participant):
participant_name = participant["info"]["userName"]
logger.info(f"Participant {participant_name} joined")
if participant_name != "Pipecat":
# We are only subscribing for audios from Pipecat.
# We are only subscribing for audio from Pipecat.
return
asyncio.run_coroutine_threadsafe(
self.capture_participant_audio(participant_id=participant["id"]), self._loop
@@ -173,7 +281,7 @@ class DailyProxyApp(EventHandler):
def main():
Daily.init()
room_url = os.environ["TAVUS_SAMPLE_ROOM_URL"]
app = DailyProxyApp(sample_rate=24000)
app = DailyProxyApp()
app.run(room_url)

View File

@@ -198,7 +198,6 @@ TESTS_FUNCTION_CALLING = [
("function-calling/function-calling-sarvam.py", EVAL_WEATHER),
("function-calling/function-calling-novita.py", EVAL_WEATHER),
("function-calling/function-calling-deepseek.py", EVAL_WEATHER),
("function-calling/function-calling-inception.py", EVAL_WEATHER),
# Video
("function-calling/function-calling-anthropic-video.py", EVAL_VISION_CAMERA),
("function-calling/function-calling-aws-video.py", EVAL_VISION_CAMERA),

View File

@@ -529,7 +529,7 @@ class RTVIObserver(BaseObserver):
isTTS = isinstance(frame, TTSTextFrame)
if agg_type is not AggregationType.WORD:
logger.trace(f"{self} Aggregated LLM text: {text}, {agg_type} spoken:{isTTS}")
logger.debug(f"{self} Aggregated LLM text: {text}, {agg_type} spoken:{isTTS}")
if self._params.bot_output_enabled:
message = RTVI.BotOutputMessage(

View File

@@ -90,7 +90,6 @@ To run locally:
import argparse
import asyncio
import importlib.util
import mimetypes
import os
import sys
@@ -132,18 +131,6 @@ load_dotenv(override=True)
os.environ["ENV"] = "local"
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
TRANSPORT_ROUTE_DEPENDENCIES = {
"daily": ("daily",),
"webrtc": ("aiortc",),
"telephony": ("fastapi", "websockets"),
"websocket": ("fastapi", "websockets"),
}
TRANSPORT_INSTALL_HINTS = {
"daily": "install pipecat-ai[daily]",
"webrtc": "install pipecat-ai[webrtc]",
"telephony": "install pipecat-ai[websocket]",
"websocket": "install pipecat-ai[websocket]",
}
# Mirror Pipecat Cloud's 4-hour max session limit so dev rooms get cleaned up.
PIPECAT_ROOM_EXP_HOURS = 4.0
@@ -169,120 +156,6 @@ Import this to add custom routes from other packages before calling
"""
def _is_module_available(module: str) -> bool:
"""Check whether a module can be imported without importing it.
Args:
module: Fully-qualified module name to check.
Returns:
``True`` if Python can resolve the module, ``False`` otherwise.
"""
try:
return importlib.util.find_spec(module) is not None
except (ImportError, ModuleNotFoundError, ValueError):
return False
def _transport_route_dependencies(transport: str) -> tuple[str, ...]:
"""Return module dependencies required for a transport route.
Args:
transport: Transport name from the runner request or CLI.
Returns:
Module names required to enable the transport route.
"""
if transport in TELEPHONY_TRANSPORTS:
return TRANSPORT_ROUTE_DEPENDENCIES["telephony"]
return TRANSPORT_ROUTE_DEPENDENCIES.get(transport, ())
def _transport_routes_enabled(transport: str) -> bool:
"""Return whether a transport route can run in this environment.
Args:
transport: Transport name from the runner request or CLI.
Returns:
``True`` if the requested transport is enabled.
"""
return all(_is_module_available(module) for module in _transport_route_dependencies(transport))
def _runner_url(args: argparse.Namespace) -> str:
"""Return the browser URL for the runner prebuilt client."""
return f"http://{args.host}:{args.port}"
def _transport_status_lists() -> tuple[list[str], list[str]]:
"""Return enabled and disabled transport labels for the startup banner."""
transports = ["daily", "webrtc", "telephony", "websocket"]
enabled = []
disabled = []
for label in transports:
if _transport_routes_enabled(label):
enabled.append(label)
else:
disabled.append(f"{label} ({TRANSPORT_INSTALL_HINTS[label]})")
return enabled, disabled
def _format_transport_status(labels: list[str]) -> str:
"""Format a startup banner transport status list."""
return ", ".join(labels) if labels else "none"
def _print_startup_message(args: argparse.Namespace):
"""Print connection information for the development runner."""
print()
if args.transport is None:
enabled, disabled = _transport_status_lists()
print("🚀 Bot ready!")
print(f" → Open: {_runner_url(args)}")
print(f" → Enabled transports: {_format_transport_status(enabled)}")
if disabled:
print(f" → Disabled transports: {_format_transport_status(disabled)}")
elif args.transport == "webrtc":
if args.esp32:
print("🚀 Bot ready! (ESP32 mode)")
elif args.whatsapp:
print("🚀 Bot ready! (WhatsApp)")
else:
print("🚀 Bot ready! (WebRTC)")
if _transport_routes_enabled("webrtc"):
print(f" → Open: {_runner_url(args)}")
else:
print(f" → WebRTC disabled ({TRANSPORT_INSTALL_HINTS['webrtc']})")
elif args.transport == "daily":
print("🚀 Bot ready! (Daily)")
if not _transport_routes_enabled("daily"):
print(f" → Daily disabled ({TRANSPORT_INSTALL_HINTS['daily']})")
else:
print(f" → Open: {_runner_url(args)}")
if args.dialin:
print(
f" → Daily dial-in webhook: "
f"http://{args.host}:{args.port}/daily-dialin-webhook"
)
print(" → Configure this URL in your Daily phone number settings")
elif args.transport in TELEPHONY_TRANSPORTS:
print(f"🚀 Bot ready! ({args.transport.capitalize()})")
if not _transport_routes_enabled(args.transport):
print(f" → Telephony disabled ({TRANSPORT_INSTALL_HINTS['telephony']})")
else:
print(f" → Open: {_runner_url(args)}")
if args.proxy:
print(f" → XML webhook: http://{args.host}:{args.port}/")
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
elif args.transport == "vonage":
print()
print("🚀 Bot ready!")
print()
def _get_bot_module():
"""Get the bot module from the calling script."""
import importlib.util
@@ -354,8 +227,6 @@ async def _run_websocket_bot(websocket: WebSocket, args: argparse.Namespace):
def _setup_websocket_routes(app: FastAPI, args: argparse.Namespace):
"""Set up the plain WebSocket route at ``/ws-client``."""
if not _transport_routes_enabled("websocket"):
return
@app.websocket("/ws-client")
async def websocket_client_endpoint(websocket: WebSocket):
@@ -467,15 +338,6 @@ def _setup_unified_start_route(
),
)
if not _transport_routes_enabled(transport):
raise HTTPException(
status_code=400,
detail=(
f"Transport '{transport}' is disabled in this runner environment. "
"Check the startup banner for enabled transports."
),
)
if transport == "webrtc":
# WebRTC: register the session; the bot starts when the WebRTC offer arrives.
session_id = str(uuid.uuid4())
@@ -609,9 +471,6 @@ def _setup_webrtc_routes(
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
):
"""Set up WebRTC-specific routes."""
if not _transport_routes_enabled("webrtc"):
return
try:
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
@@ -621,7 +480,7 @@ def _setup_webrtc_routes(
SmallWebRTCRequestHandler,
)
except ImportError as e:
logger.warning(f"WebRTC routes disabled after dependency check passed: {e}")
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
@app.get("/files/{filename:path}")
@@ -906,8 +765,6 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace):
def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
"""Set up Daily-specific routes."""
if not _transport_routes_enabled("daily"):
return
@app.get("/daily")
async def create_room_and_start_agent():
@@ -1051,9 +908,6 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
specific telephony transport is chosen via ``-t`` because the XML template
is provider-specific and requires a proxy hostname (``--proxy``).
"""
if not _transport_routes_enabled("telephony"):
return
if args.transport in TELEPHONY_TRANSPORTS:
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
@@ -1306,11 +1160,43 @@ def main(parser: argparse.ArgumentParser | None = None):
return
# Print startup message
_print_startup_message(args)
if args.transport == "vonage":
print()
if args.transport is None:
print("🚀 Bot ready!")
print(f" → WebRTC: http://{args.host}:{args.port}/client")
print(f" → Daily: http://{args.host}:{args.port}/daily")
print(f" → Telephony: ws://{args.host}:{args.port}/ws")
elif args.transport == "webrtc":
if args.esp32:
print("🚀 Bot ready! (ESP32 mode)")
elif args.whatsapp:
print("🚀 Bot ready! (WhatsApp)")
else:
print("🚀 Bot ready! (WebRTC)")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
elif args.transport == "daily":
print("🚀 Bot ready! (Daily)")
if args.dialin:
print(
f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook"
)
print(f" → Configure this URL in your Daily phone number settings")
else:
print(
f" → Open http://{args.host}:{args.port}/daily in your browser to start a session"
)
elif args.transport in TELEPHONY_TRANSPORTS:
print(f"🚀 Bot ready! ({args.transport.capitalize()})")
if args.proxy:
print(f" → XML webhook: http://{args.host}:{args.port}/")
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
elif args.transport == "vonage":
print()
print(f"🚀 Bot ready!")
asyncio.run(_run_vonage())
print()
return
print()
RUNNER_DOWNLOADS_FOLDER = args.folder
RUNNER_HOST = args.host

View File

@@ -586,9 +586,9 @@ class AssemblyAISTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug(f"{self} Connected to AssemblyAI WebSocket")
except Exception as e:
self._websocket = None
self._connected = False
await self.push_error(error_msg=f"Unable to connect to AssemblyAI: {e}", exception=e)
raise
async def _disconnect_websocket(self):
"""Close the websocket connection to AssemblyAI."""

View File

@@ -339,10 +339,10 @@ class AWSTranscribeSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.info(f"{self} Successfully connected to AWS Transcribe")
except Exception as e:
self._websocket = None
await self.push_error(
error_msg=f"Unable to connect to AWS Transcribe: {e}", exception=e
)
raise
async def _disconnect_websocket(self):
"""Close the websocket connection to AWS Transcribe."""

View File

@@ -354,8 +354,7 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
self._websocket = None
await self.push_error(error_msg=f"Unable to connect to Cartesia: {e}", exception=e)
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _disconnect_websocket(self):
ws = self._websocket

View File

@@ -823,7 +823,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug("Connected to ElevenLabs Realtime STT")
except Exception as e:
self._websocket = None
await self.push_error(
error_msg=f"Unable to connect to ElevenLabs Realtime STT: {e}", exception=e
)

View File

@@ -594,10 +594,6 @@ class ElevenLabsTTSService(WebsocketTTSService):
self._partial_word_start_time = 0.0
self._alignment_started_context_ids: set[str | None] = set()
# Context IDs whose context-init has been sent, so the keepalive knows
# which contexts are safe to target.
self._context_init_sent: set[str] = set()
# Context management for v1 multi API
self._receive_task = None
self._keepalive_task = None
@@ -796,7 +792,6 @@ class ElevenLabsTTSService(WebsocketTTSService):
finally:
await self.remove_active_audio_context()
self._websocket = None
self._context_init_sent.clear()
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
@@ -827,7 +822,6 @@ class ElevenLabsTTSService(WebsocketTTSService):
self._partial_word = ""
self._partial_word_start_time = 0.0
self._alignment_started_context_ids.discard(context_id)
self._context_init_sent.discard(context_id)
async def on_audio_context_interrupted(self, context_id: str):
"""Close the ElevenLabs context when the bot is interrupted."""
@@ -920,35 +914,26 @@ class ElevenLabsTTSService(WebsocketTTSService):
while True:
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
await self._send_keepalive()
if self._websocket and self._websocket.state is State.OPEN:
context_id = self.get_active_audio_context_id()
if context_id:
# Send keepalive with context ID to keep the connection alive
keepalive_message = {
"text": "",
"context_id": context_id,
}
logger.trace(f"Sending keepalive for context {context_id}")
else:
# It's possible to have a user interruption which clears the context
# without generating a new TTS response. In this case, we'll just send
# an empty message to keep the connection alive.
keepalive_message = {"text": ""}
logger.trace("Sending keepalive without context")
await self._websocket.send(json.dumps(keepalive_message))
except websockets.ConnectionClosed as e:
logger.warning(f"{self} keepalive error: {e}")
break
async def _send_keepalive(self):
"""Send a single keepalive message to keep the WebSocket connection alive.
Only stamps a ``context_id`` once its context-init (carrying
``voice_settings``) has been sent. Otherwise the keepalive would be the
context's first message, with no ``voice_settings``, and ElevenLabs would
reject the later context-init with a 1008 policy violation. A context-less
keepalive is sufficient until the context-init is sent.
"""
if not self._websocket or self._websocket.state is not State.OPEN:
return
context_id = self.get_active_audio_context_id()
if context_id and context_id in self._context_init_sent:
# The context's voice_settings context-init has been sent, so it's
# safe to keep that context alive.
keepalive_message = {"text": "", "context_id": context_id}
else:
# No active context, or the active context's context-init hasn't been
# sent yet. A context-less keepalive keeps the connection alive without
# opening the context prematurely.
keepalive_message = {"text": ""}
await self._websocket.send(json.dumps(keepalive_message))
async def _send_text(self, text: str, context_id: str):
"""Send text to the WebSocket for synthesis."""
if self._websocket and context_id:
@@ -995,9 +980,6 @@ class ElevenLabsTTSService(WebsocketTTSService):
locator.model_dump()
for locator in self._pronunciation_dictionary_locators
]
# Mark the context-init as sent so the keepalive may now
# target this context_id.
self._context_init_sent.add(context_id)
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {context_id}")

View File

@@ -558,9 +558,8 @@ class GladiaSTTService(WebsocketSTTService):
logger.debug(f"{self} Connected to Gladia WebSocket")
except Exception as e:
self._websocket = None
self._connection_active = False
await self.push_error(error_msg=f"Unable to connect to Gladia: {e}", exception=e)
raise
async def _disconnect_websocket(self):
"""Close the websocket connection to Gladia."""

View File

@@ -423,8 +423,8 @@ class GradiumSTTService(WebsocketSTTService):
logger.debug("Connected to Gradium STT")
except Exception as e:
self._websocket = None
await self.push_error(error_msg=f"Unable to connect to Gradium: {e}", exception=e)
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise
async def _disconnect(self):
await super()._disconnect()

View File

@@ -1,124 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Inception LLM service implementation using OpenAI-compatible interface."""
from dataclasses import dataclass, field
from typing import Literal
from loguru import logger
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import _NotGiven, is_given
@dataclass
class InceptionLLMSettings(BaseOpenAILLMService.Settings):
"""Settings for InceptionLLMService.
Parameters:
reasoning_effort: Controls how much reasoning the model applies.
One of "instant", "low", "medium", or "high". When unset, the
parameter is omitted and Inception's server-side default applies.
realtime: When True, reduces time to first diffusion block (TTFT).
"""
reasoning_effort: Literal["instant", "low", "medium", "high"] | None | _NotGiven = field(
default_factory=lambda: _NOT_GIVEN
)
realtime: bool | None | _NotGiven = field(default_factory=lambda: _NOT_GIVEN)
class InceptionLLMService(OpenAILLMService):
"""A service for interacting with Inception's API using the OpenAI-compatible interface.
This service extends OpenAILLMService to connect to Inception's API endpoint while
maintaining full compatibility with OpenAI's interface and functionality.
Supports Mercury-2, Inception's diffusion-based reasoning model.
"""
# Inception doesn't support the "developer" message role.
supports_developer_role = False
Settings = InceptionLLMSettings
_settings: Settings
def __init__(
self,
*,
api_key: str,
base_url: str = "https://api.inceptionlabs.ai/v1",
settings: Settings | None = None,
**kwargs,
):
"""Initialize the Inception LLM service.
Args:
api_key: The API key for accessing Inception's API.
base_url: The base URL for Inception API. Defaults to "https://api.inceptionlabs.ai/v1".
settings: Runtime-updatable settings.
**kwargs: Additional keyword arguments passed to OpenAILLMService.
"""
default_settings = self.Settings(
model="mercury-2",
reasoning_effort=None,
realtime=None,
)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(api_key=api_key, base_url=base_url, settings=default_settings, **kwargs)
def create_client(self, api_key=None, base_url=None, **kwargs):
"""Create OpenAI-compatible client for Inception API endpoint.
Args:
api_key: The API key for authentication. If None, uses instance default.
base_url: The base URL for the API. If None, uses instance default.
**kwargs: Additional keyword arguments for client configuration.
Returns:
An OpenAI-compatible client configured for Inception's API.
"""
logger.debug(f"Creating Inception client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationParams) -> dict:
"""Build parameters for Inception chat completion request.
Extends the base OpenAI parameters with Inception-specific options
such as reasoning_effort and realtime.
Args:
params_from_context: Parameters, derived from the LLM context, to
use for the chat completion. Contains messages, tools, and tool
choice.
Returns:
Dictionary of parameters for the chat completion request.
"""
params = super().build_chat_completion_params(params_from_context)
if (
is_given(self._settings.reasoning_effort)
and self._settings.reasoning_effort is not None
):
params["reasoning_effort"] = self._settings.reasoning_effort
# realtime is Inception-specific and unknown to the OpenAI SDK,
# so it must be passed via extra_body to avoid validation errors.
extra_body = {}
if is_given(self._settings.realtime) and self._settings.realtime is not None:
extra_body["realtime"] = self._settings.realtime
if extra_body:
params["extra_body"] = extra_body
return params

View File

@@ -155,6 +155,7 @@ def language_to_soniox_language(language: Language) -> str:
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KA: "ka",
Language.KK: "kk",
Language.KN: "kn",
Language.KO: "ko",
@@ -231,7 +232,6 @@ class SonioxSTTSettings(STTSettings):
context_version 2.
enable_speaker_diarization: Whether to enable speaker diarization.
enable_language_identification: Whether to enable language identification.
max_endpoint_delay_ms: Max ms before endpoint detection finalizes the turn (500-3000).
client_reference_id: Client reference ID to use for transcription.
"""
@@ -242,7 +242,6 @@ class SonioxSTTSettings(STTSettings):
enable_language_identification: bool | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
max_endpoint_delay_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
client_reference_id: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
@@ -310,7 +309,6 @@ class SonioxSTTService(WebsocketSTTService):
context=None,
enable_speaker_diarization=False,
enable_language_identification=False,
max_endpoint_delay_ms=None,
client_reference_id=None,
)
@@ -392,7 +390,8 @@ class SonioxSTTService(WebsocketSTTService):
changed = await super()._update_settings(delta)
if changed:
await self._request_reconnect()
await self._disconnect()
await self._connect()
return changed
@@ -523,7 +522,6 @@ class SonioxSTTService(WebsocketSTTService):
"audio_format": self._audio_format,
"num_channels": self._num_channels,
"enable_endpoint_detection": enable_endpoint_detection,
"max_endpoint_delay_ms": s.max_endpoint_delay_ms,
"sample_rate": self.sample_rate,
"language_hints": _prepare_language_hints(assert_given(s.language_hints)),
"language_hints_strict": s.language_hints_strict,
@@ -539,8 +537,8 @@ class SonioxSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug("Connected to Soniox STT")
except Exception as e:
self._websocket = None
await self.push_error(error_msg=f"Unable to connect to Soniox: {e}", exception=e)
raise
async def _disconnect_websocket(self):
"""Close the websocket connection to Soniox."""

View File

@@ -44,15 +44,17 @@ GLADIA_TTFS_P99: float = 1.49
GOOGLE_TTFS_P99: float = 1.57
GRADIUM_TTFS_P99: float = 1.61
GROQ_TTFS_P99: float = 1.54
MISTRAL_TTFS_P99: float = 1.89
OPENAI_TTFS_P99: float = 2.01
OPENAI_REALTIME_TTFS_P99: float = 1.66
SARVAM_TTFS_P99: float = 1.17
SMALLEST_TTFS_P99: float = 1.59
SONIOX_TTFS_P99: float = 0.35
SPEECHMATICS_TTFS_P99: float = 0.74
XAI_TTFS_P99: float = 2.14
# These services run locally and should be replaced with measured values
NVIDIA_TTFS_P99: float = DEFAULT_TTFS_P99
WHISPER_TTFS_P99: float = DEFAULT_TTFS_P99
# No benchmark available yet; using conservative default
MISTRAL_TTFS_P99: float = DEFAULT_TTFS_P99
SMALLEST_TTFS_P99: float = DEFAULT_TTFS_P99
XAI_TTFS_P99: float = DEFAULT_TTFS_P99

View File

@@ -11,6 +11,9 @@ avatar functionality through Tavus's streaming API.
"""
import asyncio
import datetime
import os
import wave
from dataclasses import dataclass
import aiohttp
@@ -101,8 +104,9 @@ class TavusVideoService(AIService):
self._audio_buffer = bytearray()
self._send_task: asyncio.Task | None = None
# This is the custom track destination expected by Tavus
self._transport_destination: str | None = "stream"
self._transport_destination: str = "stream"
self._transport_ready = False
self._wav_file: wave.Wave_write | None = None
async def setup(self, setup: FrameProcessorSetup):
"""Set up the Tavus video service.
@@ -204,6 +208,21 @@ class TavusVideoService(AIService):
"""
return await self._client.get_persona_name()
def _open_wav(self, sample_rate: int):
os.makedirs("recordings", exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
path = f"recordings/bot_pre_speed_{timestamp}.wav"
self._wav_file = wave.open(path, "wb")
self._wav_file.setnchannels(1)
self._wav_file.setsampwidth(2)
self._wav_file.setframerate(sample_rate)
logger.info(f"Recording outgoing audio to {path}")
def _close_wav(self):
if self._wav_file:
self._wav_file.close()
self._wav_file = None
async def start(self, frame: StartFrame):
"""Start the Tavus video service.
@@ -212,10 +231,10 @@ class TavusVideoService(AIService):
"""
await super().start(frame)
await self._client.start(frame)
if self._transport_destination:
await self._client.register_audio_destination(
self._transport_destination, auto_silence=False
)
await self._client.register_audio_destination(
self._transport_destination, auto_silence=False
)
self._open_wav(self._client.out_sample_rate)
await self._create_send_task()
async def stop(self, frame: EndFrame):
@@ -227,6 +246,7 @@ class TavusVideoService(AIService):
await super().stop(frame)
await self._end_conversation()
await self._cancel_send_task()
self._close_wav()
async def cancel(self, frame: CancelFrame):
"""Cancel the Tavus video service.
@@ -237,6 +257,7 @@ class TavusVideoService(AIService):
await super().cancel(frame)
await self._end_conversation()
await self._cancel_send_task()
self._close_wav()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames through the service.
@@ -308,9 +329,30 @@ class TavusVideoService(AIService):
self._audio_buffer = self._audio_buffer[chunk_size:]
async def _send_task_handler(self):
"""Handle sending audio frames to the Tavus client."""
"""Handle sending audio frames to the Tavus client.
Accumulates 500 ms of audio before sending anything to WebRTC. This
pre-buffer absorbs TTS jitter so the WebRTC jitter buffer sees a steady
stream rather than bursts separated by silence, which prevents the drift
and silence-injection observed without it. On interruption the task is
replaced, so the next utterance gets a fresh 500 ms pre-buffer.
"""
min_prebuffer_bytes = int(self._client.out_sample_rate * 0.5) * 2
prebuffer: list[OutputAudioRawFrame] | None = []
while True:
frame = await self._queue.get()
if isinstance(frame, OutputAudioRawFrame) and self._client:
await self._client.write_audio_frame(frame)
if prebuffer is not None:
prebuffer.append(frame)
if sum(len(f.audio) for f in prebuffer) >= min_prebuffer_bytes:
for f in prebuffer:
if self._wav_file:
self._wav_file.writeframes(f.audio)
await self._client.write_audio_frame(f)
prebuffer = None
else:
if self._wav_file:
self._wav_file.writeframes(frame.audio)
await self._client.write_audio_frame(frame)
self._queue.task_done()

View File

@@ -76,9 +76,7 @@ class WebsocketService(ABC):
logger.warning(f"{self} reconnecting (attempt: {attempt_number})")
await self._disconnect_websocket()
await self._connect_websocket()
if not await self._verify_connection():
raise ConnectionError(f"{self} websocket reconnection failed verification")
return True
return await self._verify_connection()
async def _try_reconnect(
self,

View File

@@ -293,9 +293,8 @@ class XAISTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug(f"{self} connected to xAI STT WebSocket")
except Exception as e:
self._websocket = None
self._session_ready.clear()
await self.push_error(error_msg=f"Unable to connect to xAI STT: {e}", exception=e)
raise
async def _disconnect_websocket(self):
"""Close the WebSocket connection."""

View File

@@ -823,6 +823,23 @@ class BaseOutputTransport(FrameProcessor):
async def _audio_task_handler(self):
"""Main audio processing task handler."""
# Pre-buffer: accumulate audio before sending anything to the transport.
#
# prebuffer is a list while we are still accumulating, and None once the
# threshold has been reached and all held frames have been flushed. Using
# None as the sentinel avoids a boolean flag and makes the steady-state
# branch a simple identity check.
#
# The pre-buffer resets automatically on each interruption because the
# audio task is cancelled and recreated, giving the next utterance a fresh
# local variable.
min_prebuffer_bytes = (
int(self._sample_rate * self._params.audio_out_prebuffer_secs)
* 2
* self._params.audio_out_channels
)
prebuffer: list[OutputAudioRawFrame] | None = [] if min_prebuffer_bytes > 0 else None
async for frame in self._next_frame():
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
@@ -840,7 +857,20 @@ class BaseOutputTransport(FrameProcessor):
# Try to send audio to the transport.
try:
if isinstance(frame, OutputAudioRawFrame):
push_downstream = await self._transport.write_audio_frame(frame)
if prebuffer is not None:
# Accumulation phase: hold frames until we have enough audio.
prebuffer.append(frame)
if sum(len(f.audio) for f in prebuffer) >= min_prebuffer_bytes:
# Threshold reached: flush all held frames at once, then
# switch to direct-write mode for the rest of the utterance.
for f in prebuffer:
await self._transport.write_audio_frame(f)
prebuffer = None
# push_downstream stays True so frames flow through the
# pipeline even while we are still accumulating.
else:
# Steady-state: write directly to the transport.
push_downstream = await self._transport.write_audio_frame(frame)
except Exception as e:
logger.error(f"{self} Error writing {frame} to transport: {e}")
push_downstream = False

View File

@@ -34,6 +34,8 @@ class TransportParams(BaseModel):
audio_out_mixer: Audio mixer instance or destination mapping.
audio_out_destinations: List of audio output destination identifiers.
audio_out_end_silence_secs: How much silence to send after an EndFrame (0 for no silence).
audio_out_prebuffer_secs: Seconds of audio to accumulate before sending anything to the
transport. Resets automatically on each interruption. Defaults to 0.0 (disabled).
audio_out_auto_silence: Insert silence frames when the audio output queue is empty.
When False, the transport will wait for audio data instead of inserting silence.
audio_in_enabled: Enable audio input streaming.
@@ -70,6 +72,7 @@ class TransportParams(BaseModel):
audio_out_mixer: BaseAudioMixer | Mapping[str | None, BaseAudioMixer] | None = None
audio_out_destinations: list[str] = Field(default_factory=list)
audio_out_end_silence_secs: int = 2
audio_out_prebuffer_secs: float = 0.0
audio_out_auto_silence: bool = True
audio_in_enabled: bool = False
audio_in_sample_rate: int | None = None

View File

@@ -40,10 +40,16 @@ from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import (
DailyCallbacks,
DailyCustomAudioTrackParams,
DailyParams,
DailyTransportClient,
)
# Opus codec maximum. When the Tavus server supports fast audio delivery it
# returns this as stream_declared_sample_rate so that write_frames() blocks for
# n/48000 s instead of n/true_rate s, delivering audio faster than real-time.
_STREAM_DECLARED_SAMPLE_RATE = 48000
class TavusApi:
"""Helper class for interacting with the Tavus API (v2).
@@ -69,20 +75,28 @@ class TavusApi:
# Only for development
self._dev_room_url = os.getenv("TAVUS_SAMPLE_ROOM_URL")
async def create_conversation(self, replica_id: str, persona_id: str) -> dict:
async def create_conversation(self, replica_id: str, persona_id: str, sample_rate: int) -> dict:
"""Create a new conversation with the specified replica and persona.
Args:
replica_id: ID of the replica to use in the conversation.
persona_id: ID of the persona to use in the conversation.
sample_rate: True audio sample rate of the pipeline's output. Sent
to Tavus so the server can negotiate fast audio delivery. When
the server supports it, the response includes
``stream_declared_sample_rate`` — the rate Pipecat should
declare to the ``CustomAudioSource`` for faster-than-realtime
delivery.
Returns:
Dictionary containing conversation_id and conversation_url.
Dictionary containing conversation_id, conversation_url, and
optionally stream_declared_sample_rate.
"""
if self._dev_room_url:
return {
"conversation_id": self.MOCK_CONVERSATION_ID,
"conversation_url": self._dev_room_url,
"stream_declared_sample_rate": _STREAM_DECLARED_SAMPLE_RATE,
}
logger.debug(f"Creating Tavus conversation: replica={replica_id}, persona={persona_id}")
@@ -90,6 +104,8 @@ class TavusApi:
payload = {
"replica_id": replica_id,
"persona_id": persona_id,
# TODO: start to send it when Tavus start to support it.
# "sample_rate": sample_rate,
}
async with self._session.post(url, headers=self._headers, json=payload) as r:
r.raise_for_status()
@@ -152,11 +168,15 @@ class TavusParams(DailyParams):
audio_in_enabled: Whether to enable audio input from participants.
audio_out_enabled: Whether to enable audio output to participants.
microphone_out_enabled: Whether to enable microphone output track.
audio_out_prebuffer_secs: Seconds of audio to accumulate before sending to WebRTC.
Absorbs TTS jitter to prevent the WebRTC jitter buffer from injecting silence.
Defaults to 0.5.
"""
audio_in_enabled: bool = True
audio_out_enabled: bool = True
microphone_out_enabled: bool = False
audio_out_prebuffer_secs: float = 0.5
class TavusTransportClient:
@@ -202,76 +222,60 @@ class TavusTransportClient:
self._client: DailyTransportClient | None = None
self._callbacks = callbacks
self._params = params
self._setup: FrameProcessorSetup | None = None
self._initialized = False
async def _initialize(self) -> str:
"""Initialize the conversation and return the room URL."""
response = await self._api.create_conversation(self._replica_id, self._persona_id)
self._conversation_id = response["conversation_id"]
return response["conversation_url"]
def _build_daily_callbacks(self) -> DailyCallbacks:
"""Build the DailyCallbacks object."""
return DailyCallbacks(
on_active_speaker_changed=partial(
self._on_handle_callback, "on_active_speaker_changed"
),
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=partial(self._on_handle_callback, "on_before_leave"),
on_error=partial(self._on_handle_callback, "on_error"),
on_app_message=partial(self._on_handle_callback, "on_app_message"),
on_call_state_updated=partial(self._on_handle_callback, "on_call_state_updated"),
on_client_connected=partial(self._on_handle_callback, "on_client_connected"),
on_client_disconnected=partial(self._on_handle_callback, "on_client_disconnected"),
on_dialin_connected=partial(self._on_handle_callback, "on_dialin_connected"),
on_dialin_ready=partial(self._on_handle_callback, "on_dialin_ready"),
on_dialin_stopped=partial(self._on_handle_callback, "on_dialin_stopped"),
on_dialin_error=partial(self._on_handle_callback, "on_dialin_error"),
on_dialin_warning=partial(self._on_handle_callback, "on_dialin_warning"),
on_dialout_answered=partial(self._on_handle_callback, "on_dialout_answered"),
on_dialout_connected=partial(self._on_handle_callback, "on_dialout_connected"),
on_dialout_stopped=partial(self._on_handle_callback, "on_dialout_stopped"),
on_dialout_error=partial(self._on_handle_callback, "on_dialout_error"),
on_dialout_warning=partial(self._on_handle_callback, "on_dialout_warning"),
on_dtmf_event=partial(self._on_handle_callback, "on_dtmf_event"),
on_participant_joined=self._callbacks.on_participant_joined,
on_participant_left=self._callbacks.on_participant_left,
on_participant_updated=partial(self._on_handle_callback, "on_participant_updated"),
on_transcription_message=partial(self._on_handle_callback, "on_transcription_message"),
on_recording_started=partial(self._on_handle_callback, "on_recording_started"),
on_recording_stopped=partial(self._on_handle_callback, "on_recording_stopped"),
on_recording_error=partial(self._on_handle_callback, "on_recording_error"),
on_transcription_stopped=partial(self._on_handle_callback, "on_transcription_stopped"),
on_transcription_error=partial(self._on_handle_callback, "on_transcription_error"),
)
async def setup(self, setup: FrameProcessorSetup):
"""Setup the client and initialize the conversation.
"""Save setup context for later use in start().
Args:
setup: The frame processor setup configuration.
"""
if self._conversation_id is not None:
logger.debug(f"Conversation ID already defined: {self._conversation_id}")
return
try:
room_url = await self._initialize()
daily_callbacks = DailyCallbacks(
on_active_speaker_changed=partial(
self._on_handle_callback, "on_active_speaker_changed"
),
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=partial(self._on_handle_callback, "on_before_leave"),
on_error=partial(self._on_handle_callback, "on_error"),
on_app_message=partial(self._on_handle_callback, "on_app_message"),
on_call_state_updated=partial(self._on_handle_callback, "on_call_state_updated"),
on_client_connected=partial(self._on_handle_callback, "on_client_connected"),
on_client_disconnected=partial(self._on_handle_callback, "on_client_disconnected"),
on_dialin_connected=partial(self._on_handle_callback, "on_dialin_connected"),
on_dialin_ready=partial(self._on_handle_callback, "on_dialin_ready"),
on_dialin_stopped=partial(self._on_handle_callback, "on_dialin_stopped"),
on_dialin_error=partial(self._on_handle_callback, "on_dialin_error"),
on_dialin_warning=partial(self._on_handle_callback, "on_dialin_warning"),
on_dialout_answered=partial(self._on_handle_callback, "on_dialout_answered"),
on_dialout_connected=partial(self._on_handle_callback, "on_dialout_connected"),
on_dialout_stopped=partial(self._on_handle_callback, "on_dialout_stopped"),
on_dialout_error=partial(self._on_handle_callback, "on_dialout_error"),
on_dialout_warning=partial(self._on_handle_callback, "on_dialout_warning"),
on_dtmf_event=partial(self._on_handle_callback, "on_dtmf_event"),
on_participant_joined=self._callbacks.on_participant_joined,
on_participant_left=self._callbacks.on_participant_left,
on_participant_updated=partial(self._on_handle_callback, "on_participant_updated"),
on_transcription_message=partial(
self._on_handle_callback, "on_transcription_message"
),
on_recording_started=partial(self._on_handle_callback, "on_recording_started"),
on_recording_stopped=partial(self._on_handle_callback, "on_recording_stopped"),
on_recording_error=partial(self._on_handle_callback, "on_recording_error"),
on_transcription_stopped=partial(
self._on_handle_callback, "on_transcription_stopped"
),
on_transcription_error=partial(self._on_handle_callback, "on_transcription_error"),
)
self._client = DailyTransportClient(
room_url, None, "Pipecat", self._params, daily_callbacks, self._bot_name
)
await self._client.setup(setup)
except Exception as e:
logger.error(f"Failed to setup TavusTransportClient: {e}")
await self._api.end_conversation(self._conversation_id)
self._conversation_id = None
self._setup = setup
async def cleanup(self):
"""Cleanup client resources."""
try:
await self._client.cleanup()
except Exception as e:
logger.error(f"Exception during cleanup: {e}")
if self._client:
try:
await self._client.cleanup()
except Exception as e:
logger.error(f"Exception during cleanup: {e}")
async def _on_joined(self, data):
"""Handle joined event."""
@@ -295,12 +299,56 @@ class TavusTransportClient:
return await self._api.get_persona_name(self._persona_id)
async def start(self, frame: StartFrame):
"""Start the client and join the room.
"""Create the conversation, build the Daily client, and join the room.
Args:
frame: The start frame containing initialization parameters.
"""
if self._initialized:
return
self._initialized = True
logger.debug("TavusTransportClient start invoked!")
try:
sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
response = await self._api.create_conversation(
self._replica_id, self._persona_id, sample_rate
)
self._conversation_id = response["conversation_id"]
room_url = response["conversation_url"]
params = self._params
stream_declared_sample_rate = response.get("stream_declared_sample_rate")
if stream_declared_sample_rate:
# Tavus supports fast audio delivery: we write true-rate PCM bytes into a
# CustomAudioSource declared at stream_declared_sample_rate (e.g. 48 kHz).
# write_frames() blocks for n/declared_rate seconds instead of n/true_rate
# seconds, so audio is delivered faster than real-time. The receiver must
# also request the same declared rate so WebRTC skips resampling and every
# original byte arrives intact.
# We always override sample_rate here even if the user already provided
# "stream" params, because the declared rate must match what the server
# negotiated — other fields (channels, send_settings) are preserved.
logger.debug(
f"Tavus fast audio: true_rate={sample_rate} declared_rate={stream_declared_sample_rate}"
)
existing = dict(params.custom_audio_track_params or {})
existing["stream"] = (
existing.get("stream") or DailyCustomAudioTrackParams()
).model_copy(update={"sample_rate": stream_declared_sample_rate})
params = params.model_copy(update={"custom_audio_track_params": existing})
self._client = DailyTransportClient(
room_url, None, "Pipecat", params, self._build_daily_callbacks(), self._bot_name
)
await self._client.setup(self._setup)
except Exception as e:
logger.error(f"Failed to start TavusTransportClient: {e}")
await self._api.end_conversation(self._conversation_id)
self._conversation_id = None
self._initialized = False
return
await self._client.start(frame)
await self._client.join()
@@ -598,7 +646,11 @@ class TavusOutputTransport(BaseOutputTransport):
await self._client.start(frame)
if self._transport_destination:
await self._client.register_audio_destination(self._transport_destination)
# auto_silence=False so the CustomAudioSource only writes frames when
# there is real TTS audio.
await self._client.register_audio_destination(
self._transport_destination, auto_silence=False
)
await self.set_transport_ready(frame)

View File

@@ -86,6 +86,7 @@ class WordCompletionTracker:
self._overflow_word: str | None = None
self._llm_consumed: str | None = None
self._frame_word: str | None = None
logger.debug(f"WordCompletionTracker: {self._tts_normalized}")
@staticmethod
def _normalize(text: str) -> str:

View File

@@ -1,45 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from unittest.mock import AsyncMock
import pytest
from websockets.protocol import State
from pipecat.services.cartesia.stt import CartesiaSTTService
class _FakeWebsocket:
def __init__(self, *, state=State.OPEN, send_side_effect=None):
self.state = state
self.send = AsyncMock(side_effect=send_side_effect)
@pytest.mark.asyncio
async def test_cartesia_connect_failure_clears_stale_websocket(monkeypatch):
async def fake_websocket_connect(*args, **kwargs):
raise RuntimeError("connection failed")
monkeypatch.setattr("pipecat.services.cartesia.stt.websocket_connect", fake_websocket_connect)
service = CartesiaSTTService(api_key="test-key", sample_rate=16000)
service._websocket = _FakeWebsocket(state=State.CLOSED)
await service._connect_websocket()
assert service._websocket is None
@pytest.mark.asyncio
async def test_cartesia_run_stt_logs_send_failure_without_clearing_websocket():
service = CartesiaSTTService(api_key="test-key", sample_rate=16000)
websocket = _FakeWebsocket(send_side_effect=RuntimeError("websocket closed"))
service._websocket = websocket
async for _ in service.run_stt(b"\x00" * 160):
pass
assert service._websocket is websocket

View File

@@ -6,14 +6,9 @@
"""Tests for ElevenLabs TTS alignment handling."""
import json
from typing import Any
import pytest
from websockets.protocol import State
from pipecat.services.elevenlabs.tts import (
ElevenLabsTTSService,
_select_alignment,
_strip_utterance_leading_spaces,
calculate_word_times,
@@ -205,87 +200,3 @@ def test_select_alignment_works_with_http_field_names():
)
assert selected is not None
assert selected["characters"] == list(" Hi")
# ---------------------------------------------------------------------------
# Keepalive vs context-init race
#
# The keepalive must only stamp a context_id once its context-init (carrying
# voice_settings) has been sent. Stamping it earlier makes the keepalive the
# context's first message, with no voice_settings, and ElevenLabs rejects the
# later context-init with a 1008 policy violation.
# ---------------------------------------------------------------------------
class _FakeWebSocket:
"""Minimal stand-in for the ElevenLabs websocket that records sends."""
def __init__(self):
self.state = State.OPEN
self.sent: list[dict] = []
async def send(self, data: str):
self.sent.append(json.loads(data))
def _make_service() -> ElevenLabsTTSService:
return ElevenLabsTTSService(
api_key="test-key",
settings=ElevenLabsTTSService.Settings(
voice="test-voice",
stability=0.55,
similarity_boost=0.85,
use_speaker_boost=True,
speed=0.81,
),
)
@pytest.mark.asyncio
async def test_keepalive_does_not_stamp_context_before_init():
"""During the pre-init window the keepalive must not stamp the new context_id."""
service = _make_service()
ws = _FakeWebSocket()
service._websocket = ws
# Simulate the start of an LLM turn: TTSService sets the turn context id on
# LLMFullResponseStartFrame, before run_tts sends the voice_settings init.
service._turn_context_id = "ctx-1"
service._playing_context_id = None
assert "ctx-1" not in service._context_init_sent
await service._send_keepalive()
# Context-less keepalive: the real context-init stays the context's first
# message, so ElevenLabs won't reject it with 1008.
assert ws.sent == [{"text": ""}]
@pytest.mark.asyncio
async def test_keepalive_stamps_context_after_init():
"""Once the context-init has been sent, the keepalive targets that context."""
service = _make_service()
ws = _FakeWebSocket()
service._websocket = ws
service._turn_context_id = "ctx-1"
service._playing_context_id = None
# run_tts records the context once its voice_settings init has gone out.
service._context_init_sent.add("ctx-1")
await service._send_keepalive()
assert ws.sent == [{"text": "", "context_id": "ctx-1"}]
@pytest.mark.asyncio
async def test_keepalive_without_active_context_sends_empty():
"""With no active context, the keepalive sends a plain empty message."""
service = _make_service()
ws = _FakeWebSocket()
service._websocket = ws
service._turn_context_id = None
service._playing_context_id = None
await service._send_keepalive()
assert ws.sent == [{"text": ""}]

View File

@@ -1,324 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import io
import sys
import types
import unittest
from contextlib import redirect_stdout
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
from pipecat.runner.run import (
_print_startup_message,
_setup_daily_routes,
_setup_telephony_routes,
_setup_unified_start_route,
_setup_webrtc_routes,
_setup_websocket_routes,
_transport_route_dependencies,
_transport_routes_enabled,
)
class TestRunnerRun(unittest.TestCase):
def _capture_startup_message(self, args: argparse.Namespace) -> str:
buffer = io.StringIO()
with redirect_stdout(buffer):
_print_startup_message(args)
return buffer.getvalue()
def test_transport_route_dependencies_maps_transports_to_modules(self):
self.assertEqual(_transport_route_dependencies("daily"), ("daily",))
self.assertEqual(_transport_route_dependencies("webrtc"), ("aiortc",))
self.assertEqual(_transport_route_dependencies("websocket"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("telephony"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("twilio"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("telnyx"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("plivo"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("exotel"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("vonage"), ())
def test_transport_routes_enabled_maps_transports_to_dependency_checks(self):
def module_available(module: str) -> bool:
return module in {"fastapi", "websockets"}
with patch("pipecat.runner.run._is_module_available", side_effect=module_available):
self.assertFalse(_transport_routes_enabled("daily"))
self.assertFalse(_transport_routes_enabled("webrtc"))
self.assertTrue(_transport_routes_enabled("websocket"))
self.assertTrue(_transport_routes_enabled("telephony"))
self.assertTrue(_transport_routes_enabled("twilio"))
self.assertTrue(_transport_routes_enabled("vonage"))
def test_setup_webrtc_routes_skips_when_aiortc_is_missing(self):
"""WebRTC routes should be optional when the webrtc extra is not installed."""
app = FastAPI()
args = argparse.Namespace(folder=None, esp32=False, host="localhost")
with (
patch("pipecat.runner.run._transport_routes_enabled", return_value=False),
patch("pipecat.runner.run.logger") as logger,
):
_setup_webrtc_routes(app, args, {})
paths = {route.path for route in app.routes}
self.assertNotIn("/api/offer", paths)
logger.info.assert_not_called()
def test_setup_webrtc_routes_registers_routes_when_webrtc_is_available(self):
"""WebRTC routes should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(folder=None, esp32=False, host="localhost")
connection_module = types.ModuleType("pipecat.transports.smallwebrtc.connection")
connection_module.SmallWebRTCConnection = MagicMock()
request_handler_module = types.ModuleType("pipecat.transports.smallwebrtc.request_handler")
class IceCandidate(BaseModel):
candidate: str
sdp_mid: str
sdp_mline_index: int
class SmallWebRTCPatchRequest(BaseModel):
pc_id: str
candidates: list[IceCandidate] = []
class SmallWebRTCRequest(BaseModel):
sdp: str
type: str
pc_id: str | None = None
restart_pc: bool | None = None
request_data: dict | None = None
request_handler_module.IceCandidate = IceCandidate
request_handler_module.SmallWebRTCPatchRequest = SmallWebRTCPatchRequest
request_handler_module.SmallWebRTCRequest = SmallWebRTCRequest
class MockSmallWebRTCRequestHandler:
def __init__(self, *args, **kwargs):
pass
async def close(self):
pass
request_handler_module.SmallWebRTCRequestHandler = MockSmallWebRTCRequestHandler
with (
patch("pipecat.runner.run._transport_routes_enabled", return_value=True),
patch.dict(
sys.modules,
{
"pipecat.transports.smallwebrtc.connection": connection_module,
"pipecat.transports.smallwebrtc.request_handler": request_handler_module,
},
),
):
_setup_webrtc_routes(app, args, {})
paths = {route.path for route in app.routes}
self.assertIn("/api/offer", paths)
self.assertIn("/files/{filename:path}", paths)
def test_setup_websocket_routes_skips_when_websocket_is_missing(self):
"""Plain WebSocket routes should be optional."""
app = FastAPI()
args = argparse.Namespace()
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_websocket_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/ws-client", paths)
def test_setup_websocket_routes_registers_when_websocket_is_available(self):
"""Plain WebSocket route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace()
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_websocket_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/ws-client", paths)
def test_setup_telephony_routes_skips_when_websocket_is_missing(self):
"""Telephony WebSocket routes should be optional."""
app = FastAPI()
args = argparse.Namespace(transport=None)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_telephony_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/ws", paths)
def test_setup_telephony_routes_registers_when_websocket_is_available(self):
"""Telephony WebSocket route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(transport=None)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_telephony_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/ws", paths)
def test_setup_telephony_routes_registers_provider_webhook_for_selected_transport(self):
"""Provider webhook route should be registered for selected telephony transports."""
app = FastAPI()
args = argparse.Namespace(transport="twilio", proxy="example.ngrok.io")
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_telephony_routes(app, args)
post_root_routes = [
route for route in app.routes if route.path == "/" and "POST" in route.methods
]
self.assertEqual(len(post_root_routes), 1)
def test_setup_daily_routes_skips_when_daily_is_missing(self):
"""Daily routes should be optional."""
app = FastAPI()
args = argparse.Namespace(dialin=False)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/daily", paths)
def test_setup_daily_routes_registers_when_daily_is_available(self):
"""Daily route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(dialin=False)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/daily", paths)
def test_setup_daily_routes_registers_dialin_route_when_enabled(self):
"""Daily dial-in route should be registered when requested and available."""
app = FastAPI()
args = argparse.Namespace(dialin=True)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/daily", paths)
self.assertIn("/daily-dialin-webhook", paths)
def test_websocket_routes_require_fastapi_and_websockets(self):
with patch(
"pipecat.runner.run._is_module_available",
side_effect=lambda module: module == "fastapi",
) as is_module_available:
self.assertFalse(_transport_routes_enabled("websocket"))
self.assertEqual(
[call.args[0] for call in is_module_available.call_args_list],
["fastapi", "websockets"],
)
def test_start_rejects_disabled_transport_before_running_bot(self):
app = FastAPI()
args = argparse.Namespace(transport=None)
_setup_unified_start_route(app, args, {})
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
response = TestClient(app).post("/start", json={"transport": "daily"})
self.assertEqual(response.status_code, 400)
self.assertEqual(
response.json()["detail"],
(
"Transport 'daily' is disabled in this runner environment. "
"Check the startup banner for enabled transports."
),
)
def test_startup_message_all_transports_shows_open_url_and_transport_status(self):
args = argparse.Namespace(transport=None, host="localhost", port=7860)
def routes_enabled(transport: str) -> bool:
return transport in {"telephony", "websocket"}
with patch("pipecat.runner.run._transport_routes_enabled", side_effect=routes_enabled):
output = self._capture_startup_message(args)
self.assertEqual(
output,
(
"\n"
"🚀 Bot ready!\n"
" → Open: http://localhost:7860\n"
" → Enabled transports: telephony, websocket\n"
" → Disabled transports: daily (install pipecat-ai[daily]), "
"webrtc (install pipecat-ai[webrtc])\n"
"\n"
),
)
def test_startup_message_all_transports_omits_disabled_status_when_all_enabled(self):
args = argparse.Namespace(transport=None, host="localhost", port=7860)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
output = self._capture_startup_message(args)
self.assertEqual(
output,
(
"\n"
"🚀 Bot ready!\n"
" → Open: http://localhost:7860\n"
" → Enabled transports: daily, webrtc, telephony, websocket\n"
"\n"
),
)
def test_startup_message_webrtc_uses_root_open_url(self):
args = argparse.Namespace(
transport="webrtc", host="localhost", port=7860, esp32=False, whatsapp=False
)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
output = self._capture_startup_message(args)
self.assertIn(" → Open: http://localhost:7860\n", output)
self.assertNotIn("/client", output)
def test_startup_message_daily_uses_root_open_url(self):
args = argparse.Namespace(transport="daily", host="localhost", port=7860, dialin=False)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
output = self._capture_startup_message(args)
self.assertIn(" → Open: http://localhost:7860\n", output)
self.assertNotIn("/daily in your browser", output)
def test_startup_message_telephony_keeps_provider_endpoint_details(self):
args = argparse.Namespace(
transport="twilio", host="localhost", port=7860, proxy="example.ngrok.io"
)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
output = self._capture_startup_message(args)
self.assertIn(" → Open: http://localhost:7860\n", output)
self.assertIn(" → XML webhook: http://localhost:7860/\n", output)
self.assertIn(" → WebSocket: ws://localhost:7860/ws\n", output)
if __name__ == "__main__":
unittest.main()

View File

@@ -5,10 +5,8 @@
#
import json
from unittest.mock import AsyncMock
import pytest
from websockets.protocol import State
from pipecat.frames.frames import TranscriptionFrame
from pipecat.services.soniox.stt import END_TOKEN, SonioxSTTService, _language_from_tokens
@@ -16,10 +14,8 @@ from pipecat.transcriptions.language import Language
class _FakeWebsocket:
def __init__(self, messages, *, state=State.OPEN, send_side_effect=None):
def __init__(self, messages):
self._messages = messages
self.state = state
self.send = AsyncMock(side_effect=send_side_effect)
def __aiter__(self):
return self._iter_messages()
@@ -29,21 +25,6 @@ class _FakeWebsocket:
yield message
@pytest.mark.asyncio
async def test_connect_failure_clears_stale_websocket_without_raising(monkeypatch):
async def fake_websocket_connect(*args, **kwargs):
raise RuntimeError("connection failed")
monkeypatch.setattr("pipecat.services.soniox.stt.websocket_connect", fake_websocket_connect)
service = SonioxSTTService(api_key="test-key")
service._websocket = _FakeWebsocket([], state=State.CLOSED)
await service._connect_websocket()
assert service._websocket is None
def test_language_from_tokens_uses_single_recognized_language():
tokens = [
{"text": "Hello", "language": "en"},

View File

@@ -165,19 +165,6 @@ async def test_reconnect_exhausted_emits_non_fatal_error(service, report_error):
assert "Connection refused" in final_error.error
@pytest.mark.asyncio
async def test_reconnect_exhausted_when_connect_does_not_raise(service, report_error):
"""A non-raising failed connect is treated as a failed reconnect attempt."""
result = await service._try_reconnect(report_error=report_error)
assert result is False
assert report_error.call_count == 4
final_error = report_error.call_args_list[-1][0][0]
assert isinstance(final_error, ErrorFrame)
assert final_error.fatal is False
assert "websocket reconnection failed verification" in final_error.error
# ---------------------------------------------------------------------------
# Quick failure detection — accept then immediately close
# ---------------------------------------------------------------------------

10
uv.lock generated
View File

@@ -4539,7 +4539,7 @@ requires-dist = [
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'ultravox'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'websocket'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'xai'" },
{ name = "pipecat-ai-prebuilt", marker = "extra == 'runner'", specifier = ">=1.0.1" },
{ name = "pipecat-ai-prebuilt", marker = "extra == 'runner'", specifier = ">=1.0.0" },
{ name = "piper-tts", marker = "extra == 'piper'", specifier = ">=1.3.0,<2" },
{ name = "protobuf", specifier = ">=5.29.6,<7" },
{ name = "protobuf", marker = "extra == 'nvidia'", specifier = ">=6.31.1,<7" },
@@ -4574,7 +4574,7 @@ requires-dist = [
{ name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1,<1" },
{ name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" },
]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inception", "inworld", "koala", "kokoro", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nebius", "neuphonic", "novita", "nvidia", "openai", "rnnoise", "openrouter", "perplexity", "piper", "qwen", "resembleai", "rime", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "smallest", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "vonage-video-connector", "webrtc", "websocket", "websockets-base", "whisper", "xai"]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "kokoro", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nebius", "neuphonic", "novita", "nvidia", "openai", "rnnoise", "openrouter", "perplexity", "piper", "qwen", "resembleai", "rime", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "smallest", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "vonage-video-connector", "webrtc", "websocket", "websockets-base", "whisper", "xai"]
[package.metadata.requires-dev]
dev = [
@@ -4603,14 +4603,14 @@ docs = [
[[package]]
name = "pipecat-ai-prebuilt"
version = "1.0.1"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "fastapi", extra = ["all"] },
]
sdist = { url = "https://files.pythonhosted.org/packages/fa/27/91857cd93661922687e51f4141583dbeb71f9a6c8d0d6379bae1aa467522/pipecat_ai_prebuilt-1.0.1.tar.gz", hash = "sha256:9453136fcb994802f9b650b5175f3ce1d0476849a9e609fefe52ecc1c3299680", size = 601771, upload-time = "2026-05-20T16:08:14.485Z" }
sdist = { url = "https://files.pythonhosted.org/packages/d0/86/7527474a324e3da787468133a1dba877e06576edc502e1bc7dd84ba7c9f7/pipecat_ai_prebuilt-1.0.0.tar.gz", hash = "sha256:dc66df541f17620eef5dedb2fd44737eb97232899779afb66dcca5aaa9317512", size = 601709, upload-time = "2026-05-14T21:15:26.575Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/4f/a636e47967c3aa885ae912502d73a46d1e824a67992e405ea1e94b78bd94/pipecat_ai_prebuilt-1.0.1-py3-none-any.whl", hash = "sha256:45d78d3fd2ac8193626a5dabb5f45d0ff2d35bfc92098b4bcea308ae612196aa", size = 601994, upload-time = "2026-05-20T16:08:12.4Z" },
{ url = "https://files.pythonhosted.org/packages/89/b1/648122d5e418d3e0c8f797028bc53a22229ffc07a2406712b13b76735f38/pipecat_ai_prebuilt-1.0.0-py3-none-any.whl", hash = "sha256:6b7057920d3d00e5687adb26e032634ba1f6d924eb9079b1804d031620a1e854", size = 601949, upload-time = "2026-05-14T21:15:24.666Z" },
]
[[package]]