Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
cb6e86e69f docs: add a demo showing how to track usage 2025-10-16 13:45:42 +08:00
43 changed files with 4736 additions and 6025 deletions

View File

@@ -9,78 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
-
- Added support for trickle ICE to the `SmallWebRTCTransport`.
- Added support for updating `OpenAITTSService` settings (`instructions` and
`speed`) at runtime via `TTSUpdateSettingsFrame`.
- Added `--whatsapp` flag to runner to better surface WhatsApp transport logs.
- Added `on_connected` and `on_disconnected` events to TTS and STT
websocket-based services.
- Added an `aggregate_sentences` arg in `ElevenLabsHttpTTSService`, where the
default value is True.
- Added a `room_properties` arg to the Daily runner's `configure()` method,
allowing `DailyRoomProperties` to be provided.
- The runner `--folder` argument now supports downloading files from
subdirectories.
### Changed
- `CartesiaSTTService` now inherits from `WebsocketSTTService`.
- Package upgrades:
- `openai` upgraded to support up to 2.x.x.
- `openpipe` upgraded to support up to 5.x.x.
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
### Fixed
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
to a mismatch in the _handle_transcription method's signature.
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
now handled properly in `PipelineTask` making it possible to cancel an asyncio
task that it's executing a `PipelineRunner` cleanly. Also,
`PipelineTask.cancel()` does not block anymore waiting for the `CancelFrame`
to reach the end of the pipeline (going back to the behavior in < 0.0.83).
- Fixed an issue in `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` where
the Flash models would split words, resulting in a space being inserted
between words.
- Fixed an issue where audio filters' `stop()` would not be called when using
`CancelFrame`.
- Fixed an issue in `ElevenLabsHttpTTSService`, where
`apply_text_normalization` was incorrectly set as a query parameter. It's now
being added as a request parameter.
- Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate
incorrectly 16-bit aligned audio frames, potentially leading to internal
errors or static audio.
- Fixed an issue in `SpeechmaticsSTTService` where `AdditionalVocabEntry` items
needed to have `sounds_like` for the session to start.
### Other
- Added foundational example `47-sentry-metrics.py`, demonstrating how to use the
`SentryMetrics` processor.
- Added foundational example `14x-function-calling-openpipe.py`.
## [0.0.90] - 2025-10-10
### Added

View File

@@ -63,24 +63,24 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12-describe-video.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/assets/moondream.png" width="400" /></a>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/moondream-chatbot/image.png" width="400" /></a>
</p>
## 🧩 Available services
| Category | Services |
| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)

View File

@@ -1,250 +0,0 @@
#!/usr/bin/env -S uv run
"""Utilities for creating Daily.co rooms with retry logic.
This module provides functions to create Daily rooms via REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
import time
from typing import Dict, Optional
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def periodic_progress_logger(
progress_dict: Dict[str, int],
total: int,
interval_seconds: float = 5.0,
stop_event: Optional[asyncio.Event] = None,
):
"""Log progress periodically in the background.
Args:
progress_dict: Shared dict with 'completed' and 'failed' counts
total: Total number of items being processed
interval_seconds: How often to log progress (default 5 seconds)
stop_event: Event to signal when to stop logging
"""
if stop_event is None:
stop_event = asyncio.Event()
while not stop_event.is_set():
await asyncio.sleep(interval_seconds)
if stop_event.is_set():
break
total_processed = progress_dict["completed"] + progress_dict["failed"]
if total_processed > 0:
percentage = (total_processed / total) * 100
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
logger.info(
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
f"{progress_dict['completed']} succeeded, "
f"{progress_dict['failed']} failed"
)
async def create_daily_room(
name: Optional[str] = None,
privacy: str = "public",
exp_minutes: int = 10,
max_retries: int = 5,
) -> Optional[Dict]:
"""Create a Daily room with automatic retry on rate limit errors.
Uses tenacity library to handle rate limiting (429 errors) with
exponential backoff and automatic retries.
Args:
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
privacy: Room privacy setting ("public" or "private")
exp_minutes: Minutes until room expires (default 10)
max_retries: Maximum number of retry attempts on rate limit (default 5)
Returns:
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
"""
# Calculate expiration timestamp (unix timestamp in seconds)
exp_timestamp = int(time.time()) + (exp_minutes * 60)
# Build request body
body = {
"privacy": privacy,
"properties": {
"exp": exp_timestamp,
},
}
if name:
body["name"] = name
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type(HTTPStatusError),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
async with AsyncClient(timeout=30) as client:
response = await client.post(
url="https://api.daily.co/v1/rooms",
headers=headers,
json=body,
)
response.raise_for_status()
return response.json()
# This line should never be reached due to reraise=True, but satisfies type checker
return None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
return None
except Exception as e:
logger.exception(f"Unexpected error creating room: {e}")
return None
async def create_room_with_progress(
index: int, total: int, progress_dict: Dict[str, int], **kwargs
) -> Optional[Dict]:
"""Wrapper for create_daily_room that tracks progress.
Args:
index: Index of this room creation (0-based)
total: Total number of rooms being created
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
**kwargs: Arguments passed to create_daily_room
Returns:
Room object dict or None
"""
result = await create_daily_room(**kwargs)
# Update progress
if result is not None:
progress_dict["completed"] += 1
else:
progress_dict["failed"] += 1
return result
async def test_create_rooms(
num_rooms: int = 1000,
progress_interval: float = 5.0,
) -> Dict[str, int | float]:
"""Attempt to create multiple Daily rooms concurrently.
This function demonstrates concurrent room creation and tracks
success/failure statistics. Rate limiting will likely occur when
creating many rooms quickly.
Args:
num_rooms: Number of rooms to attempt to create (default 1000)
progress_interval: How often to log progress in seconds (default 5.0)
Returns:
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
"""
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
start_time = time.time()
# Shared progress tracking dictionary
progress_dict = {"completed": 0, "failed": 0}
# Start background progress logger
stop_event = asyncio.Event()
progress_task = asyncio.create_task(
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
)
# Create and execute all tasks concurrently
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
tasks = [
create_room_with_progress(
index=i,
total=num_rooms,
progress_dict=progress_dict,
name=None, # Auto-generate names
privacy="public",
exp_minutes=10,
max_retries=5,
)
for i in range(num_rooms)
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
finally:
# Stop the progress logger
stop_event.set()
await progress_task
# Count successes and failures
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
failed_count = num_rooms - success_count
elapsed_time = time.time() - start_time
# Log statistics
logger.info("=" * 60)
logger.info("BULK ROOM CREATION SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms attempted: {num_rooms}")
logger.info(f"Successfully created: {success_count}")
logger.info(f"Failed to create: {failed_count}")
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
logger.info(f"Total time: {elapsed_time:.2f} seconds")
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
logger.info("=" * 60)
return {
"success": success_count,
"failed": failed_count,
"total": num_rooms,
"elapsed_seconds": elapsed_time,
}
# Example usage
async def main():
"""Example usage of the room creation functions."""
# Test creating a single room
logger.info("Testing single room creation...")
room = await create_daily_room(exp_minutes=10)
if room:
logger.info(f"Created room: {room['name']} at {room['url']}")
else:
logger.error("Failed to create room")
# Uncomment to test bulk creation (warning: may hit rate limits!)
logger.info("\nTesting bulk room creation...")
stats = await test_create_rooms(num_rooms=1000)
logger.info(f"Final stats: {stats}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -21,8 +21,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -58,7 +58,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -48,7 +48,10 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
stt = CartesiaSTTService(
api_key=os.getenv("CARTESIA_API_KEY"),
base_url=os.getenv("CARTESIA_BASE_URL"),
)
tl = TranscriptionLogger()

View File

@@ -1,182 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import time
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openpipe.llm import OpenPipeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
timestamp = int(time.time())
llm = OpenPipeLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
tags={"conversation_id": f"pipecat-{timestamp}"},
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: Print OpenAI Realtime API Token Usage Statistics
This example demonstrates how to access and print token usage statistics
from the OpenAI Realtime API, including detailed breakdowns of input/output
tokens, cached tokens, and audio/text token usage.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects don't get instantiated until the desired
# transport gets selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Main function demonstrating usage statistics tracking."""
logger.info(f"Starting bot")
# Initialize the OpenAI Realtime service
llm = OpenAIRealtimeLLMService(
api_key=os.getenv("OPENAI_API_KEY") or "",
model="gpt-4o-realtime-preview-2024-12-17",
)
# To access usage statistics, we wrap the internal response handler
# This is the cleanest way to intercept usage data from the realtime API
original_handler = llm._handle_evt_response_done
async def custom_response_done_handler(evt):
"""Custom handler that prints usage stats before calling original handler."""
# Print usage statistics if available
if evt.response.usage:
usage = evt.response.usage
logger.info("\n" + "=" * 50)
logger.info("📊 TOKEN USAGE STATISTICS")
logger.info("=" * 50)
logger.info(f"Total tokens: {usage.total_tokens}")
logger.info(f"Input tokens: {usage.input_tokens}")
logger.info(f"Output tokens: {usage.output_tokens}")
# Input token details
if usage.input_token_details:
logger.info(f"\n📥 Input token breakdown:")
logger.info(f" • Cached tokens: {usage.input_token_details.cached_tokens}")
logger.info(f" • Text tokens: {usage.input_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.input_token_details.audio_tokens}")
# Cached token details if available
if usage.input_token_details.cached_tokens_details:
logger.info(
f" • Cached text tokens: {usage.input_token_details.cached_tokens_details.text_tokens}"
)
logger.info(
f" • Cached audio tokens: {usage.input_token_details.cached_tokens_details.audio_tokens}"
)
# Output token details
if usage.output_token_details:
logger.info(f"\n📤 Output token breakdown:")
logger.info(f" • Text tokens: {usage.output_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.output_token_details.audio_tokens}")
logger.info("=" * 50 + "\n")
# Call the original handler to maintain normal functionality
await original_handler(evt)
# Replace the handler with our custom one
llm._handle_evt_response_done = custom_response_done_handler
# Create pipeline
pipeline = Pipeline(
[
transport.input(),
llm,
transport.output(),
]
)
# Create task
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info("🎤 Speak into your microphone to interact with the assistant")
logger.info("📊 Usage statistics will be printed after each response")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,142 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sentry_sdk
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Initialize Sentry
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN"),
traces_sample_rate=1.0,
)
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
metrics=SentryMetrics(),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
metrics=SentryMetrics(),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
metrics=SentryMetrics(),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 MiB

View File

@@ -1,267 +0,0 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_recordings(limit: int = 100) -> list[str]:
"""Get list of recent recording IDs from Daily API.
Args:
limit: Maximum number of recordings to retrieve (default 100)
Returns:
List of recording IDs (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url="https://api.daily.co/v1/recordings",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
recordings = data.get("data", [])
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
return recording_ids
except Exception as e:
logger.exception(f"Failed to get recordings from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent recordings."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 recordings
logger.info("Fetching recent recordings...")
recording_ids = await get_recent_recordings(limit=100)
if not recording_ids:
logger.error("No recordings found. Cannot proceed with test.")
return
logger.info(f"Found {len(recording_ids)} recordings to fetch")
# Fetch access links for each recording concurrently
logger.info(
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
)
# Create tasks for all recordings
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get download link for a specific recording ID."""
try:
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None)
return (recording_url, recording_url)
except Exception as e:
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
return (None, None)
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
)
logger.debug(f" URL: {recording_url}")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(recording_ids)}] No link for {recording_id}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total recordings checked: {len(recording_ids)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,238 +0,0 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_rooms(limit: int = 100) -> list[str]:
"""Get list of recent room names from Daily API.
Args:
limit: Maximum number of rooms to retrieve (default 100, max 100)
Returns:
List of room names (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
rooms = data.get("data", [])
room_names = [room.get("name") for room in rooms if room.get("name")]
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
return room_names
except Exception as e:
logger.exception(f"Failed to get rooms from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent rooms."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 rooms
logger.info("Fetching recent rooms...")
room_names = await get_recent_rooms(limit=100)
if not room_names:
logger.error("No rooms found. Cannot proceed with test.")
return
logger.info(f"Found {len(room_names)} rooms to check for recordings")
# Call get_recording_s3_url_with_retry on each room concurrently
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
# Create tasks for all rooms
tasks = [
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
for room_name in room_names
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
logger.debug(f" URL: {recording_url[:80]}...")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(room_names)}] No recording for {room_name}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms checked: {len(room_names)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -34,7 +34,7 @@ dependencies = [
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
"openai>=1.74.0,<3",
"openai>=1.74.0,<=1.99.1",
# Pinning numba to resolve package dependencies
"numba==0.61.2",
"wait_for2>=0.4.1; python_version<'3.12'",
@@ -84,7 +84,7 @@ nim = []
neuphonic = [ "pipecat-ai[websockets-base]" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "pipecat-ai[websockets-base]" ]
openpipe = [ "openpipe>=4.50.0,<6" ]
openpipe = [ "openpipe~=4.50.0" ]
openrouter = []
perplexity = []
playht = [ "pipecat-ai[websockets-base]" ]
@@ -102,7 +102,7 @@ silero = [ "onnxruntime>=1.20.1,<2" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "pipecat-ai[websockets-base]" ]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.5.0" ]
speechmatics = [ "speechmatics-rt>=0.4.0" ]
strands = [ "strands-agents>=1.9.1,<2" ]
tavus=[]
together = []

View File

@@ -136,7 +136,6 @@ TESTS_14 = [
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14x-function-calling-openpipe.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),

View File

@@ -70,15 +70,11 @@ class PipelineRunner(BaseObject):
"""
logger.debug(f"Runner {self} started running {task}")
self._tasks[task.name] = task
# PipelineTask handles asyncio.CancelledError to shutdown the pipeline
# properly and re-raises it in case there's more cleanup to do.
params = PipelineTaskParams(loop=self._loop)
try:
params = PipelineTaskParams(loop=self._loop)
await task.run(params)
except asyncio.CancelledError:
pass
await self._cancel()
del self._tasks[task.name]
# Cleanup base object.

View File

@@ -269,9 +269,6 @@ class PipelineTask(BasePipelineTask):
# StopFrame) has been received at the end of the pipeline.
self._pipeline_end_event = asyncio.Event()
# This event is set when the pipeline truly finishes.
self._pipeline_finished_event = asyncio.Event()
# This is the final pipeline. It is composed of a source processor,
# followed by the user pipeline, and ending with a sink processor. The
# source allows us to receive and react to upstream frames, and the sink
@@ -404,7 +401,11 @@ class PipelineTask(BasePipelineTask):
await self.queue_frame(EndFrame())
async def cancel(self):
"""Request the running pipeline to cancel."""
"""Immediately stop the running pipeline.
Cancels all running tasks and stops frame processing without
waiting for completion.
"""
if not self._finished:
await self._cancel()
@@ -416,38 +417,51 @@ class PipelineTask(BasePipelineTask):
"""
if self.has_finished():
return
# Setup processors.
await self._setup(params)
# Create all main tasks and wait for the main push task. This is the
# task that pushes frames to the very beginning of our pipeline (i.e. to
# our controlled source processor).
await self._create_tasks()
cleanup_pipeline = True
try:
# Wait for pipeline to finish.
await self._wait_for_pipeline_finished()
# Setup processors.
await self._setup(params)
# Create all main tasks and wait of the main push task. This is the
# task that pushes frames to the very beginning of our pipeline (our
# controlled source processor).
push_task = await self._create_tasks()
await push_task
# We have already cleaned up the pipeline inside the task.
cleanup_pipeline = False
# Pipeline has finished nicely.
self._finished = True
except asyncio.CancelledError:
logger.debug(f"Pipeline task {self} got cancelled from outside...")
# We have been cancelled from outside, let's just cancel everything.
await self._cancel()
# Wait again for pipeline to finish. This time we have really
# cancelled, so it should really finish.
await self._wait_for_pipeline_finished()
# Re-raise in case there's more cleanup to do.
# Raise exception back to the pipeline runner so it can cancel this
# task properly.
raise
finally:
# We can reach this point for different reasons:
#
# 1. The pipeline task has finished (try case).
# 2. By an asyncio task cancellation (except case).
logger.debug(f"Pipeline task {self} is finishing...")
await self._cancel_tasks()
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
logger.debug(f"Pipeline task {self} has finished")
# 1. The task has finished properly (e.g. `EndFrame`).
# 2. By calling `PipelineTask.cancel()`.
# 3. By asyncio task cancellation.
#
# Case (1) will execute the code below without issues because
# `self._finished` is true.
#
# Case (2) will execute the code below without issues because
# `self._cancelled` is true.
#
# Case (3) will raise the exception above (because we are cancelling
# the asyncio task). This will be then captured by the
# `PipelineRunner` which will call `PipelineTask.cancel()` and
# therefore becoming case (2).
if self._finished or self._cancelled:
logger.debug(f"Pipeline task {self} is finishing cleanup...")
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
logger.debug(f"Pipeline task {self} has finished")
async def queue_frame(self, frame: Frame):
"""Queue a single frame to be pushed down the pipeline.
@@ -475,7 +489,19 @@ class PipelineTask(BasePipelineTask):
if not self._cancelled:
logger.debug(f"Cancelling pipeline task {self}")
self._cancelled = True
await self.queue_frame(CancelFrame())
cancel_frame = CancelFrame()
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._pipeline.queue_frame(cancel_frame)
# Wait for CancelFrame to make it through the pipeline.
await self._wait_for_pipeline_end(cancel_frame)
# Only cancel the push task, we don't want to be able to process any
# other frame after cancel. Everything else will be cancelled in
# run().
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
@@ -577,17 +603,6 @@ class PipelineTask(BasePipelineTask):
self._pipeline_end_event.clear()
# We are really done.
self._pipeline_finished_event.set()
async def _wait_for_pipeline_finished(self):
await self._pipeline_finished_event.wait()
self._pipeline_finished_event.clear()
# Make sure we wait for the main task to complete.
if self._process_push_task:
await self._process_push_task
self._process_push_task = None
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
mgr_params = TaskManagerParams(loop=params.loop)

View File

@@ -82,7 +82,6 @@ async def configure(
sip_enable_video: Optional[bool] = False,
sip_num_endpoints: Optional[int] = 1,
sip_codecs: Optional[Dict[str, List[str]]] = None,
room_properties: Optional[DailyRoomProperties] = None,
) -> DailyRoomConfig:
"""Configure Daily room URL and token with optional SIP capabilities.
@@ -100,10 +99,6 @@ async def configure(
sip_num_endpoints: Number of allowed SIP endpoints.
sip_codecs: Codecs to support for audio and video. If None, uses Daily defaults.
Example: {"audio": ["OPUS"], "video": ["H264"]}
room_properties: Optional DailyRoomProperties to use instead of building from
individual parameters. When provided, this overrides room_exp_duration and
SIP-related parameters. If not provided, properties are built from the
individual parameters as before.
Returns:
DailyRoomConfig: Object with room_url, token, and optional sip_endpoint.
@@ -120,13 +115,6 @@ async def configure(
# SIP-enabled room
sip_config = await configure(session, sip_caller_phone="+15551234567")
print(f"SIP endpoint: {sip_config.sip_endpoint}")
# Custom room properties with recording enabled
custom_props = DailyRoomProperties(
enable_recording="cloud",
max_participants=2,
)
config = await configure(session, room_properties=custom_props)
"""
# Check for required API key
api_key = os.getenv("DAILY_API_KEY")
@@ -136,32 +124,9 @@ async def configure(
"Get your API key from https://dashboard.daily.co/developers"
)
# Warn if both room_properties and individual parameters are provided
if room_properties is not None:
individual_params_provided = any(
[
room_exp_duration != 2.0,
token_exp_duration != 2.0,
sip_caller_phone is not None,
sip_enable_video is not False,
sip_num_endpoints != 1,
sip_codecs is not None,
]
)
if individual_params_provided:
logger.warning(
"Both room_properties and individual parameters (room_exp_duration, token_exp_duration, "
"sip_*) were provided. The room_properties will be used and individual parameters "
"will be ignored."
)
# Determine if SIP mode is enabled
sip_enabled = sip_caller_phone is not None
# If room_properties is provided, check if it has SIP configuration
if room_properties and room_properties.sip:
sip_enabled = True
daily_rest_helper = DailyRESTHelper(
daily_api_key=api_key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
@@ -185,29 +150,27 @@ async def configure(
room_name = f"{room_prefix}-{uuid.uuid4().hex[:8]}"
logger.info(f"Creating new Daily room: {room_name}")
# Use provided room_properties or build from parameters
if room_properties is None:
# Calculate expiration time
expiration_time = time.time() + (room_exp_duration * 60 * 60)
# Calculate expiration time
expiration_time = time.time() + (room_exp_duration * 60 * 60)
# Create room properties
room_properties = DailyRoomProperties(
exp=expiration_time,
eject_at_room_exp=True,
# Create room properties
room_properties = DailyRoomProperties(
exp=expiration_time,
eject_at_room_exp=True,
)
# Add SIP configuration if enabled
if sip_enabled:
sip_params = DailyRoomSipParams(
display_name=sip_caller_phone,
video=sip_enable_video,
sip_mode="dial-in",
num_endpoints=sip_num_endpoints,
codecs=sip_codecs,
)
# Add SIP configuration if enabled
if sip_enabled:
sip_params = DailyRoomSipParams(
display_name=sip_caller_phone,
video=sip_enable_video,
sip_mode="dial-in",
num_endpoints=sip_num_endpoints,
codecs=sip_codecs,
)
room_properties.sip = sip_params
room_properties.enable_dialout = True # Enable outbound calls if needed
room_properties.start_video_off = not sip_enable_video # Voice-only by default
room_properties.sip = sip_params
room_properties.enable_dialout = True # Enable outbound calls if needed
room_properties.start_video_off = not sip_enable_video # Voice-only by default
# Create room parameters
room_params = DailyRoomParams(name=room_name, properties=room_properties)

View File

@@ -70,14 +70,12 @@ import asyncio
import mimetypes
import os
import sys
import uuid
from contextlib import asynccontextmanager
from http import HTTPMethod
from pathlib import Path
from typing import Any, Dict, List, Optional, TypedDict
from typing import Optional
import aiohttp
from fastapi.responses import FileResponse, Response
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.runner.types import (
@@ -168,7 +166,6 @@ def _create_server_app(
host: str = "localhost",
proxy: str,
esp32_mode: bool = False,
whatsapp_enabled: bool = False,
folder: Optional[str] = None,
):
"""Create FastAPI app with transport-specific routes."""
@@ -185,8 +182,7 @@ def _create_server_app(
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder)
if whatsapp_enabled:
_setup_whatsapp_routes(app)
_setup_whatsapp_routes(app)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in TELEPHONY_TRANSPORTS:
@@ -204,10 +200,8 @@ def _setup_webrtc_routes(
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
IceCandidate,
SmallWebRTCPatchRequest,
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
@@ -215,16 +209,6 @@ def _setup_webrtc_routes(
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
class IceConfig(TypedDict):
iceServers: List[IceServer]
class StartBotResult(TypedDict, total=False):
sessionId: str
iceConfig: Optional[IceConfig]
# In-memory store of active sessions: session_id -> session info
active_sessions: Dict[str, Dict[str, Any]] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@@ -270,74 +254,6 @@ def _setup_webrtc_routes(
)
return answer
@app.patch("/api/offer")
async def ice_candidate(request: SmallWebRTCPatchRequest):
"""Handle WebRTC new ice candidate requests."""
logger.debug(f"Received patch request: {request}")
await small_webrtc_handler.handle_patch_request(request)
return {"status": "success"}
@app.post("/start")
async def rtvi_start(request: Request):
"""Mimic Pipecat Cloud's /start endpoint."""
# Parse the request body
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
session_id = str(uuid.uuid4())
active_sessions[session_id] = request_data
result: StartBotResult = {"sessionId": session_id}
if request_data.get("enableDefaultIceServers"):
result["iceConfig"] = IceConfig(
iceServers=[IceServer(urls="stun:stun.l.google.com:19302")]
)
return result
@app.api_route(
"/sessions/{session_id}/{path:path}",
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_request(
session_id: str, path: str, request: Request, background_tasks: BackgroundTasks
):
"""Mimic Pipecat Cloud's proxy."""
active_session = active_sessions.get(session_id)
if not active_session:
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
if path.endswith("api/offer"):
# Parse the request body and convert to SmallWebRTCRequest
try:
request_data = await request.json()
if request.method == HTTPMethod.POST.value:
webrtc_request = SmallWebRTCRequest(
sdp=request_data["sdp"],
type=request_data["type"],
pc_id=request_data.get("pc_id"),
restart_pc=request_data.get("restart_pc"),
request_data=request_data,
)
return await offer(webrtc_request, background_tasks)
elif request.method == HTTPMethod.PATCH.value:
patch_request = SmallWebRTCPatchRequest(
pc_id=request_data["pc_id"],
candidates=[IceCandidate(**c) for c in request_data.get("candidates", [])],
)
return await ice_candidate(patch_request)
except Exception as e:
logger.error(f"Failed to parse WebRTC request: {e}")
return Response(content="Invalid WebRTC request", status_code=400)
logger.info(f"Received request for path: {path}")
return Response(status_code=200)
@asynccontextmanager
async def smallwebrtc_lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
@@ -373,29 +289,6 @@ def _add_lifespan_to_app(app: FastAPI, new_lifespan):
def _setup_whatsapp_routes(app: FastAPI):
"""Set up WebRTC-specific routes."""
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
if not all(
[
WHATSAPP_APP_SECRET,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_TOKEN,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.error(
"""Missing required environment variables for WhatsApp transport:
WHATSAPP_APP_SECRET
WHATSAPP_PHONE_NUMBER_ID
WHATSAPP_TOKEN
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
"""
)
return
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
@@ -407,7 +300,24 @@ def _setup_whatsapp_routes(app: FastAPI):
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient
except ImportError as e:
logger.error(f"WhatsApp transport dependencies not installed: {e}")
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
if not all(
[
WHATSAPP_TOKEN,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.debug(
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
)
return
# Global WhatsApp client instance
@@ -577,6 +487,8 @@ def _setup_daily_routes(app: FastAPI):
else:
logger.debug("No body data provided in request")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
@@ -664,6 +576,8 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
async def _run_daily_direct():
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
import aiohttp
from pipecat.runner.daily import configure
except ImportError as e:
logger.error("Daily transport dependencies not installed.")
@@ -775,12 +689,6 @@ def main():
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--whatsapp",
action="store_true",
default=False,
help="Ensure requried WhatsApp environment variables are present",
)
args = parser.parse_args()
@@ -823,11 +731,10 @@ def main():
print()
if args.esp32:
print(f"🚀 Bot ready! (ESP32 mode)")
elif args.whatsapp:
print(f"🚀 Bot ready! (WhatsApp)")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
else:
print(f"🚀 Bot ready!")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
print()
elif args.transport == "daily":
print()
@@ -845,7 +752,6 @@ def main():
host=args.host,
proxy=args.proxy,
esp32_mode=args.esp32,
whatsapp_enabled=args.whatsapp,
folder=args.folder,
)

View File

@@ -108,8 +108,6 @@ class AssemblyAIConnectionParams(BaseModel):
end_of_turn_confidence_threshold: Confidence threshold for end-of-turn detection.
min_end_of_turn_silence_when_confident: Minimum silence duration when confident about end-of-turn.
max_turn_silence: Maximum silence duration before forcing end-of-turn.
keyterms_prompt: List of key terms to guide transcription. Will be JSON serialized before sending.
speech_model: Select between English and multilingual models. Defaults to "universal-streaming-english".
"""
sample_rate: int = 16000
@@ -119,7 +117,3 @@ class AssemblyAIConnectionParams(BaseModel):
end_of_turn_confidence_threshold: Optional[float] = None
min_end_of_turn_silence_when_confident: Optional[int] = None
max_turn_silence: Optional[int] = None
keyterms_prompt: Optional[List[str]] = None
speech_model: Literal["universal-streaming-english", "universal-streaming-multilingual"] = (
"universal-streaming-english"
)

View File

@@ -174,16 +174,11 @@ class AssemblyAISTTService(STTService):
def _build_ws_url(self) -> str:
"""Build WebSocket URL with query parameters using urllib.parse.urlencode."""
params = {}
for k, v in self._connection_params.model_dump().items():
if v is not None:
if k == "keyterms_prompt":
params[k] = json.dumps(v)
elif isinstance(v, bool):
params[k] = str(v).lower()
else:
params[k] = v
params = {
k: str(v).lower() if isinstance(v, bool) else v
for k, v in self._connection_params.model_dump().items()
if v is not None
}
if params:
query_string = urlencode(params)
return f"{self._api_endpoint_base_url}?{query_string}"
@@ -202,8 +197,6 @@ class AssemblyAISTTService(STTService):
)
self._connected = True
self._receive_task = self.create_task(self._receive_task_handler())
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to connect to AssemblyAI: {e}")
self._connected = False
@@ -245,7 +238,6 @@ class AssemblyAISTTService(STTService):
self._websocket = None
self._connected = False
self._receive_task = None
await self._call_event_handler("on_disconnected")
async def _receive_task_handler(self):
"""Handle incoming WebSocket messages."""

View File

@@ -235,8 +235,6 @@ class AsyncAITTSService(InterruptibleTTSService):
}
await self._get_websocket().send(json.dumps(init_msg))
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -254,7 +252,6 @@ class AsyncAITTSService(InterruptibleTTSService):
finally:
self._websocket = None
self._started = False
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -286,7 +286,6 @@ class AWSTranscribeSTTService(STTService):
logger.info(f"{self} Successfully connected to AWS Transcribe")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Failed to connect to AWS Transcribe: {e}")
await self._disconnect()
@@ -311,7 +310,6 @@ class AWSTranscribeSTTService(STTService):
logger.warning(f"{self} Error closing WebSocket connection: {e}")
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
def language_to_service_language(self, language: Language) -> str | None:
"""Convert internal language enum to AWS Transcribe language code.

View File

@@ -28,12 +28,13 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
@@ -123,7 +124,7 @@ class CartesiaLiveOptions:
return cls(**json.loads(json_str))
class CartesiaSTTService(WebsocketSTTService):
class CartesiaSTTService(STTService):
"""Speech-to-text service using Cartesia Live API.
Provides real-time speech transcription through WebSocket connection
@@ -175,7 +176,8 @@ class CartesiaSTTService(WebsocketSTTService):
self.set_model_name(merged_options.model)
self._api_key = api_key
self._base_url = base_url or "api.cartesia.ai"
self._receive_task = None
self._connection = None
self._receiver_task = None
def can_generate_metrics(self) -> bool:
"""Check if the service can generate processing metrics.
@@ -212,27 +214,6 @@ class CartesiaSTTService(WebsocketSTTService):
await super().cancel(frame)
await self._disconnect()
async def start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send("finalize")
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process audio data for speech-to-text transcription.
@@ -243,71 +224,45 @@ class CartesiaSTTService(WebsocketSTTService):
None - transcription results are handled via WebSocket responses.
"""
# If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again
if not self._websocket or self._websocket.state is State.CLOSED:
if not self._connection or self._connection.state is State.CLOSED:
await self._connect()
await self._websocket.send(audio)
await self._connection.send(audio)
yield None
async def _connect(self):
await self._connect_websocket()
params = self._settings.to_dict()
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
logger.debug(f"Connecting to Cartesia: {ws_url}")
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
if self._websocket and not self._receive_task:
self._receive_task = asyncio.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia STT")
params = self._settings.to_dict()
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
self._connection = await websocket_connect(ws_url, additional_headers=headers)
# Setup the receiver task to handle the incoming messages from the Cartesia server
if self._receiver_task is None or self._receiver_task.done():
self._receiver_task = asyncio.create_task(self._receive_messages())
logger.debug(f"Connected to Cartesia")
except Exception as e:
logger.error(f"{self}: unable to connect to Cartesia: {e}")
async def _disconnect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
logger.debug("Disconnecting from Cartesia STT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _process_messages(self):
async for message in self._get_websocket():
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
async def _receive_messages(self):
while True:
await self._process_messages()
# Cartesia times out after 5 minutes of innactivity (no keepalive
# mechanism is available). So, we try to reconnect.
logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
await self._connect_websocket()
try:
while True:
if not self._connection or self._connection.state is State.CLOSED:
break
message = await self._connection.recv()
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
except asyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosed as e:
logger.debug(f"WebSocket connection closed: {e}")
except Exception as e:
logger.error(f"Error in message receiver: {e}")
async def _process_response(self, data):
if "type" in data:
@@ -361,3 +316,41 @@ class CartesiaSTTService(WebsocketSTTService):
language,
)
)
async def _disconnect(self):
if self._receiver_task:
self._receiver_task.cancel()
try:
await self._receiver_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"Unexpected exception while cancelling task: {e}")
self._receiver_task = None
if self._connection and self._connection.state is State.OPEN:
logger.debug("Disconnecting from Cartesia")
await self._connection.close()
self._connection = None
async def start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._connection and self._connection.state is State.OPEN:
await self._connection.send("finalize")

View File

@@ -344,11 +344,10 @@ class CartesiaTTSService(AudioContextWordTTSService):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia TTS")
logger.debug("Connecting to Cartesia")
self._websocket = await websocket_connect(
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -366,7 +365,6 @@ class CartesiaTTSService(AudioContextWordTTSService):
finally:
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -205,7 +205,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
additional_headers={"Authorization": f"Token {self._api_key}"},
)
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -226,9 +225,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _send_close_stream(self) -> None:
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.

View File

@@ -168,24 +168,16 @@ def build_elevenlabs_voice_settings(
def calculate_word_times(
alignment_info: Mapping[str, Any],
cumulative_time: float,
partial_word: str = "",
partial_word_start_time: float = 0.0,
) -> tuple[List[Tuple[str, float]], str, float]:
alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
"""Calculate word timestamps from character alignment information.
Args:
alignment_info: Character alignment data from ElevenLabs API.
cumulative_time: Base time offset for this chunk.
partial_word: Partial word carried over from previous chunk.
partial_word_start_time: Start time of the partial word.
Returns:
Tuple of (word_times, new_partial_word, new_partial_word_start_time):
- word_times: List of (word, timestamp) tuples for complete words
- new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
- new_partial_word_start_time: Start time of the incomplete word
List of (word, timestamp) tuples.
"""
chars = alignment_info["chars"]
char_start_times_ms = alignment_info["charStartTimesMs"]
@@ -194,37 +186,41 @@ def calculate_word_times(
logger.error(
f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(char_start_times_ms)}"
)
return ([], partial_word, partial_word_start_time)
return []
# Build words and track their start positions
words = []
word_start_times = []
current_word = partial_word # Start with any partial word from previous chunk
word_start_time = partial_word_start_time if partial_word else None
word_start_indices = []
current_word = ""
word_start_index = None
for i, char in enumerate(chars):
if char == " ":
# End of current word
if current_word: # Only add non-empty words
words.append(current_word)
word_start_times.append(word_start_time)
word_start_indices.append(word_start_index)
current_word = ""
word_start_time = None
word_start_index = None
else:
# Building a word
if word_start_time is None: # First character of new word
# Convert from milliseconds to seconds and add cumulative offset
word_start_time = cumulative_time + (char_start_times_ms[i] / 1000.0)
if word_start_index is None: # First character of new word
word_start_index = i
current_word += char
# Build result for complete words
word_times = list(zip(words, word_start_times))
# Handle the last word if there's no trailing space
if current_word and word_start_index is not None:
words.append(current_word)
word_start_indices.append(word_start_index)
# Return any incomplete word at the end of this chunk
new_partial_word = current_word if current_word else ""
new_partial_word_start_time = word_start_time if word_start_time is not None else 0.0
# Calculate timestamps for each word
word_times = []
for word, start_idx in zip(words, word_start_indices):
# Convert from milliseconds to seconds and add cumulative offset
start_time_seconds = cumulative_time + (char_start_times_ms[start_idx] / 1000.0)
word_times.append((word, start_time_seconds))
return (word_times, new_partial_word, new_partial_word_start_time)
return word_times
class ElevenLabsTTSService(AudioContextWordTTSService):
@@ -336,9 +332,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
# there's an interruption or TTSStoppedFrame.
self._started = False
self._cumulative_time = 0
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
# Context management for v1 multi API
self._context_id = None
@@ -528,7 +521,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -551,7 +543,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self._started = False
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
@@ -579,8 +570,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
self._started = False
self._partial_word = ""
self._partial_word_start_time = 0.0
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
@@ -620,14 +609,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
if msg.get("alignment"):
alignment = msg["alignment"]
word_times, self._partial_word, self._partial_word_start_time = (
calculate_word_times(
alignment,
self._cumulative_time,
self._partial_word,
self._partial_word_start_time,
)
)
word_times = calculate_word_times(alignment, self._cumulative_time)
if word_times:
await self.add_word_timestamps(word_times)
@@ -701,8 +683,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
yield TTSStartedFrame()
self._started = True
self._cumulative_time = 0
self._partial_word = ""
self._partial_word_start_time = 0.0
# If a context ID does not exist, create a new one and
# register it. If an ID exists, that means the Pipeline is
# configured for allow_interruptions=False, so continue
@@ -776,7 +756,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
base_url: str = "https://api.elevenlabs.io",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
aggregate_sentences: Optional[bool] = True,
**kwargs,
):
"""Initialize the ElevenLabs HTTP TTS service.
@@ -789,11 +768,10 @@ class ElevenLabsHttpTTSService(WordTTSService):
base_url: Base URL for ElevenLabs HTTP API.
sample_rate: Audio sample rate. If None, uses default.
params: Additional input parameters for voice customization.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
**kwargs: Additional arguments passed to the parent service.
"""
super().__init__(
aggregate_sentences=aggregate_sentences,
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
sample_rate=sample_rate,
@@ -831,10 +809,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
# Store previous text for context within a turn
self._previous_text = ""
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert pipecat Language to ElevenLabs language code.
@@ -862,8 +836,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._cumulative_time = 0
self._started = False
self._previous_text = ""
self._partial_word = ""
self._partial_word_start_time = 0.0
logger.debug(f"{self}: Reset internal state")
async def start(self, frame: StartFrame):
@@ -898,13 +870,11 @@ class ElevenLabsHttpTTSService(WordTTSService):
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
"""Calculate word timing from character alignment data.
This method handles partial words that may span across multiple alignment chunks.
Args:
alignment_info: Character timing data from ElevenLabs.
Returns:
List of (word, timestamp) pairs for complete words in this chunk.
List of (word, timestamp) pairs.
Example input data::
@@ -930,28 +900,30 @@ class ElevenLabsHttpTTSService(WordTTSService):
# Build the words and find their start times
words = []
word_start_times = []
# Start with any partial word from previous chunk
current_word = self._partial_word
word_start_time = self._partial_word_start_time if self._partial_word else None
current_word = ""
first_char_idx = -1
for i, char in enumerate(chars):
if char == " ":
if current_word: # Only add non-empty words
words.append(current_word)
word_start_times.append(word_start_time)
current_word = ""
word_start_time = None
else:
if word_start_time is None: # First character of a new word
# Use time of the first character of the word, offset by cumulative time
word_start_time = self._cumulative_time + char_start_times[i]
word_start_times.append(
self._cumulative_time + char_start_times[first_char_idx]
)
current_word = ""
first_char_idx = -1
else:
if not current_word: # This is the first character of a new word
first_char_idx = i
current_word += char
# Store any incomplete word at the end of this chunk
self._partial_word = current_word if current_word else ""
self._partial_word_start_time = word_start_time if word_start_time is not None else 0.0
# Don't forget the last word if there's no trailing space
if current_word and first_char_idx >= 0:
words.append(current_word)
word_start_times.append(self._cumulative_time + char_start_times[first_char_idx])
# Create word-time pairs for complete words only
# Create word-time pairs
word_times = list(zip(words, word_start_times))
return word_times
@@ -987,9 +959,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
if self._voice_settings:
payload["voice_settings"] = self._voice_settings
if self._settings["apply_text_normalization"] is not None:
payload["apply_text_normalization"] = self._settings["apply_text_normalization"]
language = self._settings["language"]
if self._model_name in ELEVENLABS_MULTILINGUAL_MODELS and language:
payload["language_code"] = language
@@ -1010,6 +979,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
}
if self._settings["optimize_streaming_latency"] is not None:
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
if self._settings["apply_text_normalization"] is not None:
params["apply_text_normalization"] = self._settings["apply_text_normalization"]
try:
await self.start_ttfb_metrics()
@@ -1070,14 +1041,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.error(f"Error processing response: {e}", exc_info=True)
continue
# After processing all chunks, emit any remaining partial word
# since this is the end of the utterance
if self._partial_word:
final_word_time = [(self._partial_word, self._partial_word_start_time)]
await self.add_word_timestamps(final_word_time)
self._partial_word = ""
self._partial_word_start_time = 0.0
# After processing all chunks, add the total utterance duration
# to the cumulative time to ensure next utterance starts after this one
if utterance_duration > 0:

View File

@@ -225,8 +225,6 @@ class FishAudioTTSService(InterruptibleTTSService):
start_message = {"event": "start", "request": {"text": "", **self._settings}}
await self._websocket.send(ormsgpack.packb(start_message))
logger.debug("Sent start message to Fish Audio")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Fish Audio initialization error: {e}")
self._websocket = None
@@ -247,7 +245,6 @@ class FishAudioTTSService(InterruptibleTTSService):
self._request_id = None
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
async def flush_audio(self):
"""Flush any buffered audio by sending a flush event to Fish Audio."""

View File

@@ -730,8 +730,6 @@ class GoogleSTTService(STTService):
self._request_queue = asyncio.Queue()
self._streaming_task = self.create_task(self._stream_audio())
await self._call_event_handler("on_connected")
async def _disconnect(self):
"""Clean up streaming recognition resources."""
if self._streaming_task:
@@ -739,8 +737,6 @@ class GoogleSTTService(STTService):
await self.cancel_task(self._streaming_task)
self._streaming_task = None
await self._call_event_handler("on_disconnected")
async def _request_generator(self):
"""Generates requests for the streaming recognize method."""
recognizer_path = f"projects/{self._project_id}/locations/{self._location}/recognizers/_"

View File

@@ -222,7 +222,6 @@ class LmntTTSService(InterruptibleTTSService):
# Send initialization message
await self._websocket.send(json.dumps(init_msg))
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -244,7 +243,6 @@ class LmntTTSService(InterruptibleTTSService):
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get the WebSocket connection if available."""

View File

@@ -293,8 +293,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
headers = {"x-api-key": self._api_key}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -313,7 +311,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _receive_messages(self):
"""Receive and process messages from Neuphonic WebSocket."""

View File

@@ -14,7 +14,6 @@ from typing import AsyncGenerator, Dict, Literal, Optional
from loguru import logger
from openai import AsyncOpenAI, BadRequestError
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
@@ -56,17 +55,6 @@ class OpenAITTSService(TTSService):
OPENAI_SAMPLE_RATE = 24000 # OpenAI TTS always outputs at 24kHz
class InputParams(BaseModel):
"""Input parameters for OpenAI TTS configuration.
Parameters:
instructions: Instructions to guide voice synthesis behavior.
speed: Voice speed control (0.25 to 4.0, default 1.0).
"""
instructions: Optional[str] = None
speed: Optional[float] = None
def __init__(
self,
*,
@@ -77,7 +65,6 @@ class OpenAITTSService(TTSService):
sample_rate: Optional[int] = None,
instructions: Optional[str] = None,
speed: Optional[float] = None,
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize OpenAI TTS service.
@@ -90,11 +77,7 @@ class OpenAITTSService(TTSService):
sample_rate: Output audio sample rate in Hz. If None, uses OpenAI's default 24kHz.
instructions: Optional instructions to guide voice synthesis behavior.
speed: Voice speed control (0.25 to 4.0, default 1.0).
params: Optional synthesis controls (acting instructions, speed, ...).
**kwargs: Additional keyword arguments passed to TTSService.
.. deprecated:: 0.0.91
The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.
"""
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
@@ -103,26 +86,12 @@ class OpenAITTSService(TTSService):
)
super().__init__(sample_rate=sample_rate, **kwargs)
self._speed = speed
self.set_model_name(model)
self.set_voice(voice)
self._instructions = instructions
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)
if instructions or speed:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.",
DeprecationWarning,
stacklevel=2,
)
self._settings = {
"instructions": params.instructions if params else instructions,
"speed": params.speed if params else speed,
}
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -175,11 +144,11 @@ class OpenAITTSService(TTSService):
"response_format": "pcm",
}
if self._settings["instructions"]:
create_params["instructions"] = self._settings["instructions"]
if self._instructions:
create_params["instructions"] = self._instructions
if self._settings["speed"]:
create_params["speed"] = self._settings["speed"]
if self._speed:
create_params["speed"] = self._speed
async with self._client.audio.speech.with_streaming_response.create(
**create_params

View File

@@ -269,8 +269,6 @@ class PlayHTTTSService(InterruptibleTTSService):
raise ValueError("WebSocket URL is not a string")
self._websocket = await websocket_connect(self._websocket_url)
await self._call_event_handler("on_connected")
except ValueError as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -293,7 +291,6 @@ class PlayHTTTSService(InterruptibleTTSService):
finally:
self._request_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _get_websocket_url(self):
"""Retrieve WebSocket URL from PlayHT API."""

View File

@@ -255,8 +255,6 @@ class RimeTTSService(AudioContextWordTTSService):
url = f"{self._url}?{params}"
headers = {"Authorization": f"Bearer {self._api_key}"}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -274,7 +272,6 @@ class RimeTTSService(AudioContextWordTTSService):
finally:
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get active websocket connection or raise exception."""

View File

@@ -583,9 +583,7 @@ class RivaSegmentedSTTService(SegmentedSTTService):
self._config.language_code = self._language
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
async def _handle_transcription(self, transcript: str, language: Optional[Language] = None):
"""Handle a transcription result with tracing."""
pass

View File

@@ -76,29 +76,17 @@ class SarvamHttpTTSService(TTSService):
Example::
tts = SarvamHttpTTSService(
tts = SarvamTTSService(
api_key="your-api-key",
voice_id="anushka",
model="bulbul:v2",
aiohttp_session=session,
params=SarvamHttpTTSService.InputParams(
params=SarvamTTSService.InputParams(
language=Language.HI,
pitch=0.1,
pace=1.2
)
)
# For bulbul v3 beta with any speaker:
tts_v3 = SarvamHttpTTSService(
api_key="your-api-key",
voice_id="speaker_name",
model="bulbul:v3,
aiohttp_session=session,
params=SarvamHttpTTSService.InputParams(
language=Language.HI,
temperature=0.8
)
)
"""
class InputParams(BaseModel):
@@ -117,14 +105,6 @@ class SarvamHttpTTSService(TTSService):
pace: Optional[float] = Field(default=1.0, ge=0.3, le=3.0)
loudness: Optional[float] = Field(default=1.0, ge=0.1, le=3.0)
enable_preprocessing: Optional[bool] = False
temperature: Optional[float] = Field(
default=0.6,
ge=0.01,
le=1.0,
description="Controls the randomness of the output for bulbul v3 beta. "
"Lower values make the output more focused and deterministic, while "
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
)
def __init__(
self,
@@ -144,7 +124,7 @@ class SarvamHttpTTSService(TTSService):
api_key: Sarvam AI API subscription key.
aiohttp_session: Shared aiohttp session for making requests.
voice_id: Speaker voice ID (e.g., "anushka", "meera"). Defaults to "anushka".
model: TTS model to use ("bulbul:v2" or "bulbul:v3-beta" or "bulbul:v3"). Defaults to "bulbul:v2".
model: TTS model to use ("bulbul:v1" or "bulbul:v2"). Defaults to "bulbul:v2".
base_url: Sarvam AI API base URL. Defaults to "https://api.sarvam.ai".
sample_rate: Audio sample rate in Hz (8000, 16000, 22050, 24000). If None, uses default.
params: Additional voice and preprocessing parameters. If None, uses defaults.
@@ -158,32 +138,16 @@ class SarvamHttpTTSService(TTSService):
self._base_url = base_url
self._session = aiohttp_session
# Build base settings common to all models
self._settings = {
"language": (
self.language_to_service_language(params.language) if params.language else "en-IN"
),
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"enable_preprocessing": params.enable_preprocessing,
}
# Add model-specific parameters
if model in ("bulbul:v3-beta", "bulbul:v3"):
self._settings.update(
{
"temperature": getattr(params, "temperature", 0.6),
"model": model,
}
)
else:
self._settings.update(
{
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"model": model,
}
)
self.set_model_name(model)
self.set_voice(voice_id)
@@ -311,18 +275,6 @@ class SarvamTTSService(InterruptibleTTSService):
pace=1.2
)
)
# For bulbul v3 beta with any speaker and temperature:
# Note: pace and loudness are not supported for bulbul v3 and bulbul v3 beta
tts_v3 = SarvamTTSService(
api_key="your-api-key",
voice_id="speaker_name",
model="bulbul:v3",
params=SarvamTTSService.InputParams(
language=Language.HI,
temperature=0.8
)
)
"""
class InputParams(BaseModel):
@@ -358,14 +310,6 @@ class SarvamTTSService(InterruptibleTTSService):
output_audio_codec: Optional[str] = "linear16"
output_audio_bitrate: Optional[str] = "128k"
language: Optional[Language] = Language.EN
temperature: Optional[float] = Field(
default=0.6,
ge=0.01,
le=1.0,
description="Controls the randomness of the output for bulbul v3 beta. "
"Lower values make the output more focused and deterministic, while "
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
)
def __init__(
self,
@@ -385,7 +329,6 @@ class SarvamTTSService(InterruptibleTTSService):
Args:
api_key: Sarvam API key for authenticating TTS requests.
model: Identifier of the Sarvam speech model (default "bulbul:v2").
Supports "bulbul:v2", "bulbul:v3-beta" and "bulbul:v3".
voice_id: Voice identifier for synthesis (default "anushka").
url: WebSocket URL for connecting to the TTS backend (default production URL).
aiohttp_session: Optional shared aiohttp session. To maintain backward compatibility.
@@ -428,12 +371,15 @@ class SarvamTTSService(InterruptibleTTSService):
self._api_key = api_key
self.set_model_name(model)
self.set_voice(voice_id)
# Build base settings common to all models
# Configuration parameters
self._settings = {
"target_language_code": (
self.language_to_service_language(params.language) if params.language else "en-IN"
),
"pitch": params.pitch,
"pace": params.pace,
"speaker": voice_id,
"loudness": params.loudness,
"speech_sample_rate": 0,
"enable_preprocessing": params.enable_preprocessing,
"min_buffer_size": params.min_buffer_size,
@@ -441,24 +387,6 @@ class SarvamTTSService(InterruptibleTTSService):
"output_audio_codec": params.output_audio_codec,
"output_audio_bitrate": params.output_audio_bitrate,
}
# Add model-specific parameters
if model in ("bulbul:v3-beta", "bulbul:v3"):
self._settings.update(
{
"temperature": getattr(params, "temperature", 0.6),
"model": model,
}
)
else:
self._settings.update(
{
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"model": model,
}
)
self._started = False
self._receive_task = None
@@ -597,7 +525,6 @@ class SarvamTTSService(InterruptibleTTSService):
logger.debug("Connected to Sarvam TTS Websocket")
await self._send_config()
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -629,10 +556,6 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -577,7 +577,6 @@ class SpeechmaticsSTTService(STTService):
),
)
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Error connecting to Speechmatics: {e}")
self._client = None
@@ -596,7 +595,6 @@ class SpeechmaticsSTTService(STTService):
logger.error(f"{self} Error closing Speechmatics client: {e}")
finally:
self._client = None
await self._call_event_handler("on_disconnected")
def _process_config(self) -> None:
"""Create a formatted STT transcription config.
@@ -620,7 +618,7 @@ class SpeechmaticsSTTService(STTService):
transcription_config.additional_vocab = [
{
"content": e.content,
**({"sounds_like": e.sounds_like} if e.sounds_like else {}),
"sounds_like": e.sounds_like,
}
for e in self._params.additional_vocab
]

View File

@@ -35,25 +35,6 @@ class STTService(AIService):
Provides common functionality for STT services including audio passthrough,
muting, settings management, and audio processing. Subclasses must implement
the run_stt method to provide actual speech recognition.
Event handlers:
on_connected: Called when connected to the STT service.
on_connected: Called when disconnected from the STT service.
on_connection_error: Called when a connection to the STT service error occurs.
Example::
@stt.event_handler("on_connected")
async def on_connected(stt: STTService):
logger.debug(f"STT connected")
@stt.event_handler("on_disconnected")
async def on_disconnected(stt: STTService):
logger.debug(f"STT disconnected")
@stt.event_handler("on_connection_error")
async def on_connection_error(stt: STTService, error: str):
logger.error(f"STT connection error: {error}")
"""
def __init__(
@@ -81,10 +62,6 @@ class STTService(AIService):
self._muted: bool = False
self._user_id: str = ""
self._register_event_handler("on_connected")
self._register_event_handler("on_disconnected")
self._register_event_handler("on_connection_error")
@property
def is_muted(self) -> bool:
"""Check if the STT service is currently muted.
@@ -315,6 +292,15 @@ class WebsocketSTTService(STTService, WebsocketService):
Combines STT functionality with websocket connectivity, providing automatic
error handling and reconnection capabilities.
Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example::
@stt.event_handler("on_connection_error")
async def on_connection_error(stt: STTService, error: str):
logger.error(f"STT connection error: {error}")
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
@@ -326,6 +312,7 @@ class WebsocketSTTService(STTService, WebsocketService):
"""
STTService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)

View File

@@ -59,25 +59,6 @@ class TTSService(AIService):
Provides common functionality for TTS services including text aggregation,
filtering, audio generation, and frame management. Supports configurable
sentence aggregation, silence insertion, and frame processing control.
Event handlers:
on_connected: Called when connected to the STT service.
on_connected: Called when disconnected from the STT service.
on_connection_error: Called when a connection to the STT service error occurs.
Example::
@tts.event_handler("on_connected")
async def on_connected(tts: TTSService):
logger.debug(f"TTS connected")
@tts.event_handler("on_disconnected")
async def on_disconnected(tts: TTSService):
logger.debug(f"TTS disconnected")
@tts.event_handler("on_connection_error")
async def on_connection_error(stt: TTSService, error: str):
logger.error(f"TTS connection error: {error}")
"""
def __init__(
@@ -162,10 +143,6 @@ class TTSService(AIService):
self._processing_text: bool = False
self._register_event_handler("on_connected")
self._register_event_handler("on_disconnected")
self._register_event_handler("on_connection_error")
@property
def sample_rate(self) -> int:
"""Get the current sample rate for audio output.
@@ -649,6 +626,7 @@ class WebsocketTTSService(TTSService, WebsocketService):
"""
TTSService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
@@ -700,6 +678,15 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
"""Base class for websocket-based TTS services that support word timestamps.
Combines word timestamp functionality with websocket connectivity.
Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example::
@tts.event_handler("on_connection_error")
async def on_connection_error(tts: TTSService, error: str):
logger.error(f"TTS connection error: {error}")
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
@@ -711,6 +698,7 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
"""
WordTTSService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)

View File

@@ -232,9 +232,6 @@ class BaseInputTransport(FrameProcessor):
"""
# Cancel and wait for the audio input task to finish.
await self._cancel_audio_task()
# Stop audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.stop()
async def set_transport_ready(self, frame: StartFrame):
"""Called when the transport is ready to stream.

View File

@@ -293,15 +293,15 @@ class BaseOutputTransport(FrameProcessor):
"""
await super().process_frame(frame, direction)
#
# System frames (like InterruptionFrame) are pushed immediately. Other
# frames require order so they are put in the sink queue.
#
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, EndFrame):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
@@ -314,6 +314,21 @@ class BaseOutputTransport(FrameProcessor):
await self.write_dtmf(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames.
elif isinstance(frame, EndFrame):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, MixerControlFrame):
await self._handle_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await self._handle_frame(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._handle_frame(frame)
elif direction == FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
else:
@@ -395,13 +410,6 @@ class BaseOutputTransport(FrameProcessor):
# Indicates if the bot is currently speaking.
self._bot_speaking = False
# Last time a BotSpeakingFrame was pushed.
self._bot_speaking_frame_time = 0
# How often a BotSpeakingFrame should be pushed (value should be
# lower than the audio chunks).
self._bot_speaking_frame_period = 0.2
# Last time the bot actually spoke.
self._bot_speech_last_time = 0
self._audio_task: Optional[asyncio.Task] = None
self._video_task: Optional[asyncio.Task] = None
@@ -593,71 +601,39 @@ class BaseOutputTransport(FrameProcessor):
async def _bot_started_speaking(self):
"""Handle bot started speaking event."""
if self._bot_speaking:
return
if not self._bot_speaking:
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
)
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
)
downstream_frame = BotStartedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
downstream_frame = BotStartedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = True
self._bot_speaking = True
async def _bot_stopped_speaking(self):
"""Handle bot stopped speaking event."""
if not self._bot_speaking:
return
if self._bot_speaking:
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
)
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
)
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = False
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def _bot_currently_speaking(self):
"""Handle bot speaking event."""
await self._bot_started_speaking()
diff_time = time.time() - self._bot_speaking_frame_time
if diff_time >= self._bot_speaking_frame_period:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking_frame_time = time.time()
self._bot_speech_last_time = time.time()
async def _maybe_bot_currently_speaking(self, frame: SpeechOutputAudioRawFrame):
if not is_silence(frame.audio):
await self._bot_currently_speaking()
else:
silence_duration = time.time() - self._bot_speech_last_time
if silence_duration > BOT_VAD_STOP_SECS:
await self._bot_stopped_speaking()
async def _handle_bot_speech(self, frame: Frame):
# TTS case.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_currently_speaking()
# Speech stream case.
elif isinstance(frame, SpeechOutputAudioRawFrame):
await self._maybe_bot_currently_speaking(frame)
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def _handle_frame(self, frame: Frame):
"""Handle various frame types with appropriate processing.
@@ -665,9 +641,7 @@ class BaseOutputTransport(FrameProcessor):
Args:
frame: The frame to handle.
"""
if isinstance(frame, OutputAudioRawFrame):
await self._handle_bot_speech(frame)
elif isinstance(frame, OutputImageRawFrame):
if isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
@@ -731,7 +705,39 @@ class BaseOutputTransport(FrameProcessor):
async def _audio_task_handler(self):
"""Main audio processing task handler."""
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
speech_last_speaking_time = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
is_speaking = False
if isinstance(frame, TTSAudioRawFrame):
is_speaking = True
elif isinstance(frame, SpeechOutputAudioRawFrame):
if not is_silence(frame.audio):
is_speaking = True
speech_last_speaking_time = time.time()
else:
silence_duration = time.time() - speech_last_speaking_time
if silence_duration > BOT_VAD_STOP_SECS:
await self._bot_stopped_speaking()
if is_speaking:
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(
BotSpeakingFrame(), FrameDirection.UPSTREAM
)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break

View File

@@ -689,8 +689,3 @@ class SmallWebRTCConnection(BaseObject):
)()
if track:
track.set_enabled(signalling_message.enabled)
async def add_ice_candidate(self, candidate):
"""Handle incoming ICE candidates."""
logger.debug(f"Adding remote candidate: {candidate}")
await self.pc.addIceCandidate(candidate)

View File

@@ -14,7 +14,6 @@ from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional
from aiortc.sdp import candidate_from_sdp
from fastapi import HTTPException
from loguru import logger
@@ -40,34 +39,6 @@ class SmallWebRTCRequest:
request_data: Optional[Any] = None
@dataclass
class IceCandidate:
"""The remote ice candidate object received from the peer connection.
Parameters:
candidate: The ice candidate patch SDP string (Session Description Protocol).
sdp_mid: The SDP mid for the candidate patch.
sdp_mline_index: The SDP mline index for the candidate patch.
"""
candidate: str
sdp_mid: str
sdp_mline_index: int
@dataclass
class SmallWebRTCPatchRequest:
"""Small WebRTC transport session arguments for the runner.
Parameters:
pc_id: Identifier for the peer connection.
candidates: A list of ICE candidate patches.
"""
pc_id: str
candidates: List[IceCandidate]
class ConnectionMode(Enum):
"""Enum defining the connection handling modes."""
@@ -226,19 +197,6 @@ class SmallWebRTCRequestHandler:
logger.debug(f"SmallWebRTC request details: {request}")
raise
async def handle_patch_request(self, request: SmallWebRTCPatchRequest):
"""Handle a SmallWebRTC patch candidate request."""
peer_connection = self._pcs_map.get(request.pc_id)
if not peer_connection:
raise HTTPException(status_code=404, detail="Peer connection not found")
for c in request.candidates:
candidate = candidate_from_sdp(c.candidate)
candidate.sdpMid = c.sdp_mid
candidate.sdpMLineIndex = c.sdp_mline_index
await peer_connection.add_ice_candidate(candidate)
async def close(self):
"""Clear the connection map."""
coros = [pc.disconnect() for pc in self._pcs_map.values()]

View File

@@ -254,7 +254,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
try:
await asyncio.wait_for(
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=1.0,
)
except asyncio.TimeoutError:
@@ -290,7 +290,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello!"))
try:
await asyncio.wait_for(
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=1.0,
)
except asyncio.TimeoutError:
@@ -301,8 +301,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2)
# This shouldn't freeze, so nothing to check really.
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert True
async def test_no_idle_task(self):
identity = IdentityFilter()
@@ -310,7 +313,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
try:
await asyncio.wait_for(
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=0.3,
)
except asyncio.TimeoutError:
@@ -329,7 +332,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
),
idle_timeout_secs=0.3,
)
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert True
async def test_idle_task_event_handler_no_frames(self):
identity = IdentityFilter()
@@ -344,8 +351,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
idle_timeout = True
await task.cancel()
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert idle_timeout
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert idle_timeout
async def test_idle_task_event_handler_quiet_user(self):
identity = IdentityFilter()
@@ -406,15 +416,12 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
asyncio.create_task(delayed_frames()),
]
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
diff_time = time.time() - start_time
self.assertGreater(diff_time, sleep_time_secs * 3)
# Wait for the pending tasks to complete.
await asyncio.gather(*pending)
async def test_task_cancel_timeout(self):
class CancelFilter(FrameProcessor):
def __init__(self, **kwargs):

8320
uv.lock generated

File diff suppressed because it is too large Load Diff