Compare commits
69 Commits
hush/usage
...
hush/backo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d41ee5319e | ||
|
|
45256903b5 | ||
|
|
d37e63d972 | ||
|
|
013f869259 | ||
|
|
11594003e2 | ||
|
|
2dd2a17b19 | ||
|
|
7aa01c1ca8 | ||
|
|
4d6356748f | ||
|
|
5b1a182421 | ||
|
|
6ac0c34413 | ||
|
|
c115422dbf | ||
|
|
a2a973be27 | ||
|
|
0407744950 | ||
|
|
7ce370ccc6 | ||
|
|
a4867f61aa | ||
|
|
a67a765783 | ||
|
|
81221668b1 | ||
|
|
cc9c264940 | ||
|
|
f2c61ac9fd | ||
|
|
88f8c10f63 | ||
|
|
855f4842dd | ||
|
|
2bf44fe2af | ||
|
|
3e8a7cc254 | ||
|
|
a600c05570 | ||
|
|
3ba6b55659 | ||
|
|
d5f2dcfac0 | ||
|
|
d12134038b | ||
|
|
a22af3a7e0 | ||
|
|
76e07c6c48 | ||
|
|
8d8503bca7 | ||
|
|
a444097060 | ||
|
|
1b9e96c016 | ||
|
|
7967bc53c3 | ||
|
|
6381335346 | ||
|
|
0fd5d26104 | ||
|
|
41f817bf04 | ||
|
|
27115e6565 | ||
|
|
3c4807d7d4 | ||
|
|
8902f1dc94 | ||
|
|
a25333ee51 | ||
|
|
82c7d7ad83 | ||
|
|
ba2ab51ef7 | ||
|
|
22557fa668 | ||
|
|
3fbf59e7c6 | ||
|
|
129ab5ea0e | ||
|
|
dc917523d0 | ||
|
|
5ea7cc9d32 | ||
|
|
e11ede475b | ||
|
|
90d29e04af | ||
|
|
4c67136a8d | ||
|
|
9d78402a33 | ||
|
|
73877218e9 | ||
|
|
6a1be90cbb | ||
|
|
fbac959ecb | ||
|
|
18dd85431c | ||
|
|
abc569b3d2 | ||
|
|
fa5d4ecf86 | ||
|
|
83b0dc39f7 | ||
|
|
0c31b5ef19 | ||
|
|
d16c36c56d | ||
|
|
8fe3bcd484 | ||
|
|
be2858bfbb | ||
|
|
b6b0997553 | ||
|
|
3b751322d3 | ||
|
|
cc66ac14f1 | ||
|
|
9ddec0f8b4 | ||
|
|
9babfe9fd9 | ||
|
|
21d8d148b8 | ||
|
|
7c1e2793c5 |
63
CHANGELOG.md
63
CHANGELOG.md
@@ -9,15 +9,78 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
|
||||
|
||||
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
|
||||
|
||||
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
|
||||
-
|
||||
- Added support for trickle ICE to the `SmallWebRTCTransport`.
|
||||
|
||||
- Added support for updating `OpenAITTSService` settings (`instructions` and
|
||||
`speed`) at runtime via `TTSUpdateSettingsFrame`.
|
||||
|
||||
- Added `--whatsapp` flag to runner to better surface WhatsApp transport logs.
|
||||
|
||||
- Added `on_connected` and `on_disconnected` events to TTS and STT
|
||||
websocket-based services.
|
||||
|
||||
- Added an `aggregate_sentences` arg in `ElevenLabsHttpTTSService`, where the
|
||||
default value is True.
|
||||
|
||||
- Added a `room_properties` arg to the Daily runner's `configure()` method,
|
||||
allowing `DailyRoomProperties` to be provided.
|
||||
|
||||
- The runner `--folder` argument now supports downloading files from
|
||||
subdirectories.
|
||||
|
||||
### Changed
|
||||
|
||||
- `CartesiaSTTService` now inherits from `WebsocketSTTService`.
|
||||
|
||||
- Package upgrades:
|
||||
|
||||
- `openai` upgraded to support up to 2.x.x.
|
||||
- `openpipe` upgraded to support up to 5.x.x.
|
||||
|
||||
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
|
||||
to a mismatch in the `_handle_transcription` method's signature.
|
||||
|
||||
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
|
||||
now handled properly in `PipelineTask` making it possible to cancel an asyncio
|
||||
task that is executing a `PipelineRunner` cleanly. Also,
|
||||
`PipelineTask.cancel()` does not block anymore waiting for the `CancelFrame`
|
||||
to reach the end of the pipeline (going back to the behavior in < 0.0.83).
|
||||
|
||||
- Fixed an issue in `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` where
|
||||
the Flash models would split words, resulting in a space being inserted
|
||||
between words.
|
||||
|
||||
- Fixed an issue where audio filters' `stop()` would not be called when using
|
||||
`CancelFrame`.
|
||||
|
||||
- Fixed an issue in `ElevenLabsHttpTTSService`, where
|
||||
`apply_text_normalization` was incorrectly set as a query parameter. It's now
|
||||
being added as a request parameter.
|
||||
|
||||
- Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate
|
||||
audio frames that were not 16-bit aligned, potentially leading to internal
|
||||
errors or static audio.
|
||||
|
||||
- Fixed an issue in `SpeechmaticsSTTService` where `AdditionalVocabEntry` items
|
||||
needed to have `sounds_like` for the session to start.
|
||||
|
||||
### Other
|
||||
|
||||
- Added foundational example `47-sentry-metrics.py`, demonstrating how to use the
|
||||
`SentryMetrics` processor.
|
||||
|
||||
- Added foundational example `14x-function-calling-openpipe.py`.
|
||||
|
||||
## [0.0.90] - 2025-10-10
|
||||
|
||||
### Added
|
||||
|
||||
26
README.md
26
README.md
@@ -63,24 +63,24 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
|
||||
<br/>
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/moondream-chatbot/image.png" width="400" /></a>
|
||||
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12-describe-video.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/assets/moondream.png" width="400" /></a>
|
||||
</p>
|
||||
|
||||
## 🧩 Available services
|
||||
|
||||
| Category | Services |
|
||||
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Category | Services |
|
||||
| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
|
||||
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
|
||||
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
|
||||
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
|
||||
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
|
||||
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
|
||||
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
|
||||
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
|
||||
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
|
||||
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
|
||||
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
|
||||
|
||||
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
|
||||
|
||||
|
||||
250
create_daily_room.py
Executable file
250
create_daily_room.py
Executable file
@@ -0,0 +1,250 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for creating Daily.co rooms with retry logic.
|
||||
|
||||
This module provides functions to create Daily rooms via REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def periodic_progress_logger(
    progress_dict: Dict[str, int],
    total: int,
    interval_seconds: float = 5.0,
    stop_event: Optional[asyncio.Event] = None,
):
    """Log progress periodically in the background.

    Wakes up every ``interval_seconds`` and, once at least one item has been
    processed, logs the running completed/failed counts. The coroutine returns
    as soon as ``stop_event`` is set rather than sleeping out the remainder of
    the current interval, so callers awaiting it are not delayed.

    Args:
        progress_dict: Shared dict with 'completed' and 'failed' counts
        total: Total number of items being processed
        interval_seconds: How often to log progress (default 5 seconds)
        stop_event: Event to signal when to stop logging. When omitted, a
            private event is created that nothing else can set, so the
            coroutine runs until it is cancelled.
    """
    if stop_event is None:
        stop_event = asyncio.Event()

    while not stop_event.is_set():
        # Wait for either the stop signal or the end of the interval,
        # whichever comes first.
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            # Interval elapsed without a stop request: fall through and log.
            pass
        else:
            break

        total_processed = progress_dict["completed"] + progress_dict["failed"]
        if total_processed > 0:
            # Guard against total == 0 so a misconfigured caller cannot crash
            # the background logger with a ZeroDivisionError.
            percentage = (total_processed / total) * 100 if total > 0 else 0.0

            logger.info(
                f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
                f"✅ {progress_dict['completed']} succeeded, "
                f"❌ {progress_dict['failed']} failed"
            )
|
||||
|
||||
|
||||
async def create_daily_room(
    name: Optional[str] = None,
    privacy: str = "public",
    exp_minutes: int = 10,
    max_retries: int = 5,
) -> Optional[Dict]:
    """Create a Daily room with automatic retry on rate limit errors.

    Uses the tenacity library to retry with exponential backoff when the API
    answers with 429 (rate limited) or a 5xx server error. Other HTTP error
    statuses are treated as permanent and are not retried.

    Args:
        name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
        privacy: Room privacy setting ("public" or "private")
        exp_minutes: Minutes until room expires (default 10)
        max_retries: Maximum number of attempts before giving up (default 5)

    Returns:
        Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
    """
    api_key = os.getenv("DAILY_API_KEY")
    if not api_key:
        # Without a key the request would be sent as "Bearer None" and then
        # retried with backoff for no benefit.
        logger.error("DAILY_API_KEY is not set; cannot create Daily room")
        return None

    # Calculate expiration timestamp (unix timestamp in seconds)
    exp_timestamp = int(time.time()) + (exp_minutes * 60)

    # Build request body
    body = {
        "privacy": privacy,
        "properties": {
            "exp": exp_timestamp,
        },
    }

    if name:
        body["name"] = name

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    try:
        # One client is shared by all attempts so connections can be reused.
        async with AsyncClient(timeout=30) as client:
            # reraise is left at its default (False) so that exhausting the
            # attempts raises RetryError, which the handler below expects.
            async for attempt in AsyncRetrying(
                retry=retry_if_exception_type(HTTPStatusError),
                stop=stop_after_attempt(max_retries),
                wait=wait_exponential(multiplier=1, min=1, max=60),
            ):
                with attempt:
                    response = await client.post(
                        url="https://api.daily.co/v1/rooms",
                        headers=headers,
                        json=body,
                    )
                    try:
                        response.raise_for_status()
                    except HTTPStatusError as e:
                        status = e.response.status_code
                        if status != 429 and status < 500:
                            # Permanent client-side error (bad request, auth,
                            # ...): retrying cannot succeed, so fail fast.
                            logger.error(f"Room creation rejected with HTTP {status}: {e}")
                            return None
                        raise
                    return response.json()

        # Not reached in practice: the loop either returns or raises.
        return None

    except RetryError as e:
        # All attempts exhausted
        last_exception = e.last_attempt.exception()
        logger.exception(f"Failed to create room after {max_retries} attempts: {last_exception}")
        return None

    except Exception as e:
        logger.exception(f"Unexpected error creating room: {e}")
        return None
|
||||
|
||||
|
||||
async def create_room_with_progress(
    index: int, total: int, progress_dict: Dict[str, int], **kwargs
) -> Optional[Dict]:
    """Create one room and record the outcome in the shared progress dict.

    Args:
        index: Index of this room creation (0-based); not used by the body
        total: Total number of rooms being created; not used by the body
        progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
        **kwargs: Arguments forwarded unchanged to create_daily_room

    Returns:
        Whatever create_daily_room returned (room dict, or None on failure)
    """
    room = await create_daily_room(**kwargs)

    # A None result counts as a failure, anything else as a completed room.
    outcome = "failed" if room is None else "completed"
    progress_dict[outcome] += 1

    return room
|
||||
|
||||
|
||||
async def test_create_rooms(
    num_rooms: int = 1000,
    progress_interval: float = 5.0,
) -> Dict[str, int | float]:
    """Attempt to create multiple Daily rooms concurrently.

    Launches ``num_rooms`` room-creation coroutines at once, runs a background
    progress logger while they execute, and logs a summary of the outcome.
    Rate limiting will likely occur when creating many rooms quickly.

    Args:
        num_rooms: Number of rooms to attempt to create (default 1000)
        progress_interval: How often to log progress in seconds (default 5.0)

    Returns:
        Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
    """
    logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
    start_time = time.time()

    # Shared progress tracking dictionary
    progress_dict = {"completed": 0, "failed": 0}

    # Start background progress logger
    stop_event = asyncio.Event()
    progress_task = asyncio.create_task(
        periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
    )

    # Create and execute all tasks concurrently
    logger.info(f"Executing {num_rooms} concurrent room creation requests...")
    tasks = [
        create_room_with_progress(
            index=i,
            total=num_rooms,
            progress_dict=progress_dict,
            name=None,  # Auto-generate names
            privacy="public",
            exp_minutes=10,
            max_retries=5,
        )
        for i in range(num_rooms)
    ]

    try:
        # return_exceptions=True keeps one failing coroutine from aborting the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        # Stop the progress logger
        stop_event.set()
        await progress_task

    # Count successes and failures
    success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
    failed_count = num_rooms - success_count

    elapsed_time = time.time() - start_time

    # Derived figures are guarded so that num_rooms == 0 produces a summary
    # instead of a ZeroDivisionError.
    success_rate = (success_count / num_rooms * 100) if num_rooms > 0 else 0.0
    average_seconds = (elapsed_time / num_rooms) if num_rooms > 0 else 0.0

    # Log statistics
    logger.info("=" * 60)
    logger.info("BULK ROOM CREATION SUMMARY")
    logger.info("=" * 60)
    logger.info(f"Total rooms attempted: {num_rooms}")
    logger.info(f"Successfully created: {success_count}")
    logger.info(f"Failed to create: {failed_count}")
    logger.info(f"Success rate: {success_rate:.2f}%")
    logger.info(f"Total time: {elapsed_time:.2f} seconds")
    logger.info(f"Average time per room: {average_seconds:.3f} seconds")
    logger.info("=" * 60)

    return {
        "success": success_count,
        "failed": failed_count,
        "total": num_rooms,
        "elapsed_seconds": elapsed_time,
    }
|
||||
|
||||
|
||||
# Example usage
|
||||
async def main():
    """Example usage of the room creation functions.

    Creates a single room, then runs the bulk creation test and logs its
    statistics.
    """
    # Test creating a single room
    logger.info("Testing single room creation...")
    room = await create_daily_room(exp_minutes=10)
    if room:
        logger.info(f"Created room: {room['name']} at {room['url']}")
    else:
        logger.error("Failed to create room")

    # Bulk creation runs unconditionally and issues 1000 concurrent requests
    # (warning: may hit rate limits!). Comment this section out to skip it.
    logger.info("\nTesting bulk room creation...")
    stats = await test_create_rooms(num_rooms=1000)
    logger.info(f"Final stats: {stats}")


if __name__ == "__main__":
    asyncio.run(main())
|
||||
@@ -21,8 +21,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.stt import CartesiaSTTService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
@@ -58,7 +58,7 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
|
||||
@@ -48,10 +48,7 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = CartesiaSTTService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
base_url=os.getenv("CARTESIA_BASE_URL"),
|
||||
)
|
||||
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
|
||||
|
||||
tl = TranscriptionLogger()
|
||||
|
||||
|
||||
182
examples/foundational/14x-function-calling-openpipe.py
Normal file
182
examples/foundational/14x-function-calling-openpipe.py
Normal file
@@ -0,0 +1,182 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openpipe.llm import OpenPipeLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
    """Answer the weather function call with a fixed, canned result."""
    canned_weather = {"conditions": "nice", "temperature": "75"}
    await params.result_callback(canned_weather)
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
    """Answer the restaurant function call with a fixed, canned result."""
    canned_recommendation = {"name": "The Golden Dragon"}
    await params.result_callback(canned_recommendation)
|
||||
|
||||
|
||||
def _audio_transport_kwargs():
    """Build a fresh set of the audio/VAD/turn settings shared by every transport."""
    return {
        "audio_in_enabled": True,
        "audio_out_enabled": True,
        "vad_analyzer": SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        "turn_analyzer": LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    }


# Each entry is a zero-argument factory rather than a ready-made params object,
# so nothing (e.g. SileroVADAnalyzer) is instantiated at import time. Only the
# factory of the transport that actually gets selected is called.
transport_params = {
    "daily": lambda: DailyParams(**_audio_transport_kwargs()),
    "twilio": lambda: FastAPIWebsocketParams(**_audio_transport_kwargs()),
    "webrtc": lambda: TransportParams(**_audio_transport_kwargs()),
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build and run the OpenPipe function-calling voice pipeline.

    Chains Deepgram speech-to-text, an OpenPipe LLM service with two
    registered tool functions, and Cartesia text-to-speech between the
    transport's input and output, then runs the resulting pipeline task.

    Args:
        transport: Transport supplying the pipeline's input and output processors.
        runner_args: Runner settings; provides the pipeline idle timeout and
            the SIGINT-handling flag.
    """
    logger.info("Starting bot")

    transcriber = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    speaker = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    # Tag the LLM service with a conversation id derived from the start time.
    conversation_id = f"pipecat-{int(time.time())}"
    openpipe_llm = OpenPipeLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
        tags={"conversation_id": conversation_id},
    )

    # Registering a function_name of None instead would send all functions to
    # the same callback, with an additional function_name parameter.
    openpipe_llm.register_function("get_current_weather", fetch_weather_from_api)
    openpipe_llm.register_function(
        "get_restaurant_recommendation", fetch_restaurant_recommendation
    )

    @openpipe_llm.event_handler("on_function_calls_started")
    async def on_function_calls_started(service, function_calls):
        # Queue a short spoken acknowledgement when function calls start.
        await speaker.queue_frame(TTSSpeakFrame("Let me check on that."))

    # Both tools take the same "location" argument; each schema gets its own copy.
    location_property = {
        "type": "string",
        "description": "The city and state, e.g. San Francisco, CA",
    }
    tools = ToolsSchema(
        standard_tools=[
            FunctionSchema(
                name="get_current_weather",
                description="Get the current weather",
                properties={
                    "location": dict(location_property),
                    "format": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "The temperature unit to use. Infer this from the user's location.",
                    },
                },
                required=["location", "format"],
            ),
            FunctionSchema(
                name="get_restaurant_recommendation",
                description="Get a restaurant recommendation",
                properties={"location": dict(location_property)},
                required=["location"],
            ),
        ]
    )

    system_prompt = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_prompt]

    context = LLMContext(messages, tools)
    context_aggregator = LLMContextAggregatorPair(context)

    processors = [
        transport.input(),
        transcriber,
        context_aggregator.user(),
        openpipe_llm,
        speaker,
        transport.output(),
        context_aggregator.assistant(),
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=task_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    pipeline_runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await pipeline_runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
    """Create the transport for `runner_args` and run the bot on it.

    Main bot entry point compatible with Pipecat Cloud.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    # Imported here so the runner entry point is only loaded when run as a script.
    from pipecat.runner.run import main

    main()
|
||||
142
examples/foundational/47-sentry-metrics.py
Normal file
142
examples/foundational/47-sentry-metrics.py
Normal file
@@ -0,0 +1,142 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
import sentry_sdk
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.metrics.sentry import SentryMetrics
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
def _audio_transport_kwargs():
    """Build the audio, VAD and turn-analyzer settings shared by every transport.

    A new analyzer pair is created on every call.
    """
    return {
        "audio_in_enabled": True,
        "audio_out_enabled": True,
        "vad_analyzer": SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        "turn_analyzer": LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    }


# Every entry is a zero-argument factory instead of a ready-made params
# object, so objects such as SileroVADAnalyzer are not instantiated up front.
# The factory is only called once the desired transport has been selected.
transport_params = {
    "daily": lambda: DailyParams(**_audio_transport_kwargs()),
    "twilio": lambda: FastAPIWebsocketParams(**_audio_transport_kwargs()),
    "webrtc": lambda: TransportParams(**_audio_transport_kwargs()),
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build and run a voice pipeline whose services report metrics via Sentry.

    Initializes the Sentry SDK, gives the STT, TTS and LLM services their own
    `SentryMetrics` instance each, chains them between the transport's input
    and output, and runs the resulting pipeline task.

    Args:
        transport: Transport supplying the pipeline's input and output processors.
        runner_args: Runner settings; provides the pipeline idle timeout and
            the SIGINT-handling flag.
    """
    logger.info("Starting bot")

    # Initialize Sentry before any service that reports metrics is created.
    sentry_sdk.init(dsn=os.getenv("SENTRY_DSN"), traces_sample_rate=1.0)

    transcriber = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"),
        metrics=SentryMetrics(),
    )

    speaker = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        metrics=SentryMetrics(),
    )

    openai_llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        metrics=SentryMetrics(),
    )

    system_prompt = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_prompt]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    processors = [
        transport.input(),  # Transport user input
        transcriber,
        context_aggregator.user(),  # User responses
        openai_llm,  # LLM
        speaker,  # TTS
        transport.output(),  # Transport bot output
        context_aggregator.assistant(),  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=task_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation: add an introduction request to the
        # message list, then ask the LLM to run.
        introduction_request = {
            "role": "system",
            "content": "Please introduce yourself to the user.",
        }
        messages.append(introduction_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    pipeline_runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await pipeline_runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
    """Create the transport for `runner_args` and run the bot on it.

    Main bot entry point compatible with Pipecat Cloud.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    # Imported here so the runner entry point is only loaded when run as a script.
    from pipecat.runner.run import main

    main()
|
||||
BIN
examples/foundational/assets/moondream.png
Normal file
BIN
examples/foundational/assets/moondream.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.1 MiB |
267
fetch_s3_recording.py
Executable file
267
fetch_s3_recording.py
Executable file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
    room_id: str,
    max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve a room's recording URL, retrying with exponential backoff.

    Calls `get_recording_s3_url` up to `max_retries` times. Another attempt is
    made, after an exponential wait bounded to 1-60 seconds, whenever the call
    raises (for example an `HTTPStatusError` for a 429 response) or the
    recording exists but its status is not yet "finished".

    Args:
        room_id: Daily.co room identifier.
        max_retries: Maximum number of attempts (default 5).

    Returns:
        Tuple of (recording_url, recording_signed_url).
        Returns (None, None) if no recording exists for the room, if every
        attempt failed, or if an unexpected error occurred. Failures are
        logged and never propagated to the caller.
    """
    try:
        async for attempt in AsyncRetrying(
            # `Exception` already covers `HTTPStatusError`, so listing both
            # was redundant.
            retry=retry_if_exception_type(Exception),
            stop=stop_after_attempt(max_retries),
            wait=wait_exponential(multiplier=1, min=1, max=60),
            # `reraise=True` is deliberately NOT set: with it tenacity
            # re-raises the last underlying exception instead of `RetryError`,
            # which made the `except RetryError` handler below unreachable and
            # logged exhausted retries as "unexpected" errors.
        ):
            with attempt:
                recording_url, recording_signed_url, status = await get_recording_s3_url(
                    room_id=room_id
                )

                # No status means no recording exists: return immediately, no retry.
                if status is None:
                    logger.debug(f"No recording found for room {room_id}")
                    return None, None

                # The recording exists but is not finished yet: raise so that
                # tenacity schedules another attempt.
                if status != "finished":
                    logger.warning(
                        f"Recording not finished for room {room_id}, status: {status} "
                        f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
                    )
                    raise Exception(f"Recording not ready, status: {status}")

                # Recording is finished, return the URLs.
                return recording_url, recording_signed_url

        # Not expected to be reached: the loop either returns or raises
        # `RetryError`. Kept so every code path returns a tuple.
        return None, None

    except RetryError as e:
        # All attempts exhausted; report the exception from the final one.
        last_exception = e.last_attempt.exception()
        logger.error(
            f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
            f"{last_exception}"
        )
        return None, None

    except Exception as e:
        logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
        return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
    room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Get recording URL using Daily's REST API.

    Lists the recordings for the room and, when the first listed recording is
    finished, requests its access link.

    Args:
        room_id: Daily.co room identifier

    Returns:
        Tuple of (recording_url, recording_signed_url, status)
        - recording_url: The download link for the recording
        - recording_signed_url: Same as recording_url (kept for backward compatibility)
        - status: Recording status from Daily API

        (None, None, None) is returned when the listing contains no
        recordings. (None, None, status) is returned when the first recording
        has no id, is not finished, or its access link has no download link.

    Raises:
        HTTPStatusError: When HTTP errors occur (including rate limits)
    """
    headers = {
        "Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
        "Content-Type": "application/json",
    }

    async with AsyncClient(timeout=180) as client:
        # List recordings for the room. The room name is passed through
        # `params` so it is URL-encoded instead of being spliced into the
        # query string verbatim.
        list_response = await client.get(
            url="https://api.daily.co/v1/recordings",
            params={"room_name": room_id},
            headers=headers,
        )
        list_response.raise_for_status()
        list_data = list_response.json()

        # A missing or empty "data" list means there is no recording.
        recordings = list_data.get("data")
        if not recordings:
            return (None, None, None)

        first_recording = recordings[0]
        recording_id = first_recording.get("id")
        status = first_recording.get("status")

        if not recording_id or status != "finished":
            return (None, None, status)

        # Get the recording access link
        link_response = await client.get(
            url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
            headers=headers,
        )
        link_response.raise_for_status()
        link_data = link_response.json()

        recording_url = link_data.get("download_link")
        if not recording_url:
            logger.warning(f"No download link found for recording {recording_id}")
            return (None, None, status)

        # Return the same URL for both fields for backward compatibility
        return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_recordings(limit: int = 100) -> list[str]:
    """Fetch recording IDs from Daily's recordings listing endpoint.

    Args:
        limit: Maximum number of listed recordings to consider (default 100).
            The cap is applied to the response, not sent to the API.

    Returns:
        List of recording IDs (strings); entries without an id are skipped.
        An empty list is returned if the request or parsing fails.
    """
    try:
        async with AsyncClient() as http:
            auth_headers = {
                "Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
                "Content-Type": "application/json",
            }
            listing = await http.get(
                url="https://api.daily.co/v1/recordings",
                headers=auth_headers,
                timeout=30,
            )
            listing.raise_for_status()

            payload = listing.json()
            recording_ids = []
            for entry in payload.get("data", [])[:limit]:
                entry_id = entry.get("id")
                if entry_id:
                    recording_ids.append(entry_id)

            logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
            return recording_ids

    except Exception as e:
        # Best effort: log the failure with its traceback and report nothing found.
        logger.exception(f"Failed to get recordings from Daily API: {e}")
        return []
|
||||
|
||||
|
||||
async def main():
    """Fetch access links for recent recordings concurrently and log a summary."""
    logger.info("Starting recording fetch test...")

    # Step 1: Get the most recent 100 recordings
    logger.info("Fetching recent recordings...")
    recording_ids = await get_recent_recordings(limit=100)

    if not recording_ids:
        logger.error("No recordings found. Cannot proceed with test.")
        return

    total = len(recording_ids)
    logger.info(f"Found {total} recordings to fetch")

    # Step 2: Fetch the access link of every recording concurrently
    logger.info(f"Attempting to fetch access links for {total} recordings concurrently...")

    async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
        """Get download link for a specific recording ID.

        Returns (url, url) on success and (None, None) when there is no
        download link or the request fails.
        """
        try:
            auth_headers = {
                "Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
                "Content-Type": "application/json",
            }

            async with AsyncClient(timeout=180) as http:
                # Get the recording access link
                link_response = await http.get(
                    url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
                    headers=auth_headers,
                )
                link_response.raise_for_status()

                download_link = link_response.json().get("download_link")
                if download_link:
                    return (download_link, download_link)

                logger.warning(f"No download link found for recording {recording_id}")
                return (None, None)

        except Exception as e:
            logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
            return (None, None)

    # One coroutine per recording, all awaited together. Exceptions are
    # returned as results rather than raised.
    results = await asyncio.gather(
        *(get_recording_link(recording_id) for recording_id in recording_ids),
        return_exceptions=True,
    )

    # Step 3: Classify every result
    success_count = not_found_count = failed_count = 0

    for position, (recording_id, outcome) in enumerate(zip(recording_ids, results), 1):
        if isinstance(outcome, Exception):
            failed_count += 1
            logger.error(f"❌ [{position}/{total}] Failed for {recording_id}: {outcome}")
            continue

        if not (isinstance(outcome, tuple) and len(outcome) == 2):
            failed_count += 1
            logger.error(f"❌ [{position}/{total}] Unexpected result type for {recording_id}")
            continue

        recording_url, _ = outcome
        if recording_url:
            success_count += 1
            logger.info(f"✅ [{position}/{total}] Found recording link for {recording_id}")
            logger.debug(f" URL: {recording_url}")
        else:
            not_found_count += 1
            logger.debug(f"ℹ️ [{position}/{total}] No link for {recording_id}")

    # Summary
    separator = "=" * 60
    logger.info("\n" + separator)
    logger.info("RECORDING FETCH TEST SUMMARY")
    logger.info(separator)
    logger.info(f"Total recordings checked: {total}")
    logger.info(f"✅ Recordings found: {success_count}")
    logger.info(f"ℹ️ No recordings: {not_found_count}")
    logger.info(f"❌ Failed: {failed_count}")
    logger.info(f"Success rate: {(success_count / total * 100):.2f}%")
    logger.info(separator)


if __name__ == "__main__":
    asyncio.run(main())
|
||||
238
fetch_s3_recording_by_room.py
Executable file
238
fetch_s3_recording_by_room.py
Executable file
@@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
    room_id: str,
    max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve a room's recording URL, retrying with exponential backoff.

    Calls `get_recording_s3_url` up to `max_retries` times. Another attempt is
    made, after an exponential wait bounded to 1-60 seconds, whenever the call
    raises (for example an `HTTPStatusError` for a 429 response) or the
    recording exists but its status is not yet "finished".

    Args:
        room_id: Daily.co room identifier.
        max_retries: Maximum number of attempts (default 5).

    Returns:
        Tuple of (recording_url, recording_signed_url).
        Returns (None, None) if no recording exists for the room, if every
        attempt failed, or if an unexpected error occurred. Failures are
        logged and never propagated to the caller.
    """
    try:
        async for attempt in AsyncRetrying(
            # `Exception` already covers `HTTPStatusError`, so listing both
            # was redundant.
            retry=retry_if_exception_type(Exception),
            stop=stop_after_attempt(max_retries),
            wait=wait_exponential(multiplier=1, min=1, max=60),
            # `reraise=True` is deliberately NOT set: with it tenacity
            # re-raises the last underlying exception instead of `RetryError`,
            # which made the `except RetryError` handler below unreachable and
            # logged exhausted retries as "unexpected" errors.
        ):
            with attempt:
                recording_url, recording_signed_url, status = await get_recording_s3_url(
                    room_id=room_id
                )

                # No status means no recording exists: return immediately, no retry.
                if status is None:
                    logger.debug(f"No recording found for room {room_id}")
                    return None, None

                # The recording exists but is not finished yet: raise so that
                # tenacity schedules another attempt.
                if status != "finished":
                    logger.warning(
                        f"Recording not finished for room {room_id}, status: {status} "
                        f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
                    )
                    raise Exception(f"Recording not ready, status: {status}")

                # Recording is finished, return the URLs.
                return recording_url, recording_signed_url

        # Not expected to be reached: the loop either returns or raises
        # `RetryError`. Kept so every code path returns a tuple.
        return None, None

    except RetryError as e:
        # All attempts exhausted; report the exception from the final one.
        last_exception = e.last_attempt.exception()
        logger.error(
            f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
            f"{last_exception}"
        )
        return None, None

    except Exception as e:
        logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
        return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
    room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Get recording URL using Daily's REST API.

    Lists the recordings for the room and, when the first listed recording is
    finished, requests its access link.

    Args:
        room_id: Daily.co room identifier

    Returns:
        Tuple of (recording_url, recording_signed_url, status)
        - recording_url: The download link for the recording
        - recording_signed_url: Same as recording_url (kept for backward compatibility)
        - status: Recording status from Daily API

        (None, None, None) is returned when the listing contains no
        recordings. (None, None, status) is returned when the first recording
        has no id, is not finished, or its access link has no download link.

    Raises:
        HTTPStatusError: When HTTP errors occur (including rate limits)
    """
    headers = {
        "Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
        "Content-Type": "application/json",
    }

    async with AsyncClient(timeout=180) as client:
        # List recordings for the room. The room name is passed through
        # `params` so it is URL-encoded instead of being spliced into the
        # query string verbatim.
        list_response = await client.get(
            url="https://api.daily.co/v1/recordings",
            params={"room_name": room_id},
            headers=headers,
        )
        list_response.raise_for_status()
        list_data = list_response.json()

        # A missing or empty "data" list means there is no recording.
        recordings = list_data.get("data")
        if not recordings:
            return (None, None, None)

        first_recording = recordings[0]
        recording_id = first_recording.get("id")
        status = first_recording.get("status")

        if not recording_id or status != "finished":
            return (None, None, status)

        # Get the recording access link
        link_response = await client.get(
            url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
            headers=headers,
        )
        link_response.raise_for_status()
        link_data = link_response.json()

        recording_url = link_data.get("download_link")
        if not recording_url:
            logger.warning(f"No download link found for recording {recording_id}")
            return (None, None, status)

        # Return the same URL for both fields for backward compatibility
        return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_rooms(limit: int = 100) -> list[str]:
    """Fetch room names from Daily's rooms listing endpoint.

    Args:
        limit: Maximum number of rooms to request (default 100). Values above
            100 are clamped to 100 before being sent.

    Returns:
        List of room names (strings); entries without a name are skipped.
        An empty list is returned if the request or parsing fails.
    """
    try:
        async with AsyncClient() as http:
            page_size = min(limit, 100)
            auth_headers = {
                "Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
                "Content-Type": "application/json",
            }
            listing = await http.get(
                url=f"https://api.daily.co/v1/rooms?limit={page_size}",
                headers=auth_headers,
                timeout=30,
            )
            listing.raise_for_status()

            payload = listing.json()
            room_names = []
            for entry in payload.get("data", []):
                entry_name = entry.get("name")
                if entry_name:
                    room_names.append(entry_name)

            logger.info(f"Retrieved {len(room_names)} room names from Daily API")
            return room_names

    except Exception as e:
        # Best effort: log the failure with its traceback and report nothing found.
        logger.exception(f"Failed to get rooms from Daily API: {e}")
        return []
|
||||
|
||||
|
||||
async def main():
    """Run get_recording_s3_url_with_retry over recent rooms and log a summary."""
    logger.info("Starting recording fetch test...")

    # Step 1: Get the most recent 100 rooms
    logger.info("Fetching recent rooms...")
    room_names = await get_recent_rooms(limit=100)

    if not room_names:
        logger.error("No rooms found. Cannot proceed with test.")
        return

    total = len(room_names)
    logger.info(f"Found {total} rooms to check for recordings")

    # Step 2: Look up every room's recording concurrently, with up to three
    # attempts per room. Exceptions are returned as results rather than raised.
    logger.info(f"Attempting to fetch recordings for {total} rooms concurrently...")
    results = await asyncio.gather(
        *(
            get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
            for room_name in room_names
        ),
        return_exceptions=True,
    )

    # Step 3: Classify every result
    success_count = not_found_count = failed_count = 0

    for position, (room_name, outcome) in enumerate(zip(room_names, results), 1):
        if isinstance(outcome, Exception):
            failed_count += 1
            logger.error(f"❌ [{position}/{total}] Failed for {room_name}: {outcome}")
            continue

        if not (isinstance(outcome, tuple) and len(outcome) == 2):
            failed_count += 1
            logger.error(f"❌ [{position}/{total}] Unexpected result type for {room_name}")
            continue

        recording_url, _ = outcome
        if recording_url:
            success_count += 1
            logger.info(f"✅ [{position}/{total}] Found recording for {room_name}")
            # Only the first 80 characters of the URL are logged.
            logger.debug(f" URL: {recording_url[:80]}...")
        else:
            not_found_count += 1
            logger.debug(f"ℹ️ [{position}/{total}] No recording for {room_name}")

    # Summary
    separator = "=" * 60
    logger.info("\n" + separator)
    logger.info("RECORDING FETCH TEST SUMMARY")
    logger.info(separator)
    logger.info(f"Total rooms checked: {total}")
    logger.info(f"✅ Recordings found: {success_count}")
    logger.info(f"ℹ️ No recordings: {not_found_count}")
    logger.info(f"❌ Failed: {failed_count}")
    logger.info(f"Success rate: {(success_count / total * 100):.2f}%")
    logger.info(separator)


if __name__ == "__main__":
    asyncio.run(main())
|
||||
@@ -34,7 +34,7 @@ dependencies = [
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0",
|
||||
"openai>=1.74.0,<=1.99.1",
|
||||
"openai>=1.74.0,<3",
|
||||
# Pinning numba to resolve package dependencies
|
||||
"numba==0.61.2",
|
||||
"wait_for2>=0.4.1; python_version<'3.12'",
|
||||
@@ -84,7 +84,7 @@ nim = []
|
||||
neuphonic = [ "pipecat-ai[websockets-base]" ]
|
||||
noisereduce = [ "noisereduce~=3.0.3" ]
|
||||
openai = [ "pipecat-ai[websockets-base]" ]
|
||||
openpipe = [ "openpipe~=4.50.0" ]
|
||||
openpipe = [ "openpipe>=4.50.0,<6" ]
|
||||
openrouter = []
|
||||
perplexity = []
|
||||
playht = [ "pipecat-ai[websockets-base]" ]
|
||||
@@ -102,7 +102,7 @@ silero = [ "onnxruntime>=1.20.1,<2" ]
|
||||
simli = [ "simli-ai~=0.1.10"]
|
||||
soniox = [ "pipecat-ai[websockets-base]" ]
|
||||
soundfile = [ "soundfile~=0.13.0" ]
|
||||
speechmatics = [ "speechmatics-rt>=0.4.0" ]
|
||||
speechmatics = [ "speechmatics-rt>=0.5.0" ]
|
||||
strands = [ "strands-agents>=1.9.1,<2" ]
|
||||
tavus=[]
|
||||
together = []
|
||||
|
||||
@@ -136,6 +136,7 @@ TESTS_14 = [
|
||||
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
("14x-function-calling-openpipe.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
# Currently not working.
|
||||
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
|
||||
|
||||
@@ -70,11 +70,15 @@ class PipelineRunner(BaseObject):
|
||||
"""
|
||||
logger.debug(f"Runner {self} started running {task}")
|
||||
self._tasks[task.name] = task
|
||||
params = PipelineTaskParams(loop=self._loop)
|
||||
|
||||
# PipelineTask handles asyncio.CancelledError to shutdown the pipeline
|
||||
# properly and re-raises it in case there's more cleanup to do.
|
||||
try:
|
||||
params = PipelineTaskParams(loop=self._loop)
|
||||
await task.run(params)
|
||||
except asyncio.CancelledError:
|
||||
await self._cancel()
|
||||
pass
|
||||
|
||||
del self._tasks[task.name]
|
||||
|
||||
# Cleanup base object.
|
||||
|
||||
@@ -269,6 +269,9 @@ class PipelineTask(BasePipelineTask):
|
||||
# StopFrame) has been received at the end of the pipeline.
|
||||
self._pipeline_end_event = asyncio.Event()
|
||||
|
||||
# This event is set when the pipeline truly finishes.
|
||||
self._pipeline_finished_event = asyncio.Event()
|
||||
|
||||
# This is the final pipeline. It is composed of a source processor,
|
||||
# followed by the user pipeline, and ending with a sink processor. The
|
||||
# source allows us to receive and react to upstream frames, and the sink
|
||||
@@ -401,11 +404,7 @@ class PipelineTask(BasePipelineTask):
|
||||
await self.queue_frame(EndFrame())
|
||||
|
||||
async def cancel(self):
|
||||
"""Immediately stop the running pipeline.
|
||||
|
||||
Cancels all running tasks and stops frame processing without
|
||||
waiting for completion.
|
||||
"""
|
||||
"""Request the running pipeline to cancel."""
|
||||
if not self._finished:
|
||||
await self._cancel()
|
||||
|
||||
@@ -417,51 +416,38 @@ class PipelineTask(BasePipelineTask):
|
||||
"""
|
||||
if self.has_finished():
|
||||
return
|
||||
cleanup_pipeline = True
|
||||
|
||||
# Setup processors.
|
||||
await self._setup(params)
|
||||
|
||||
# Create all main tasks and wait for the main push task. This is the
|
||||
# task that pushes frames to the very beginning of our pipeline (i.e. to
|
||||
# our controlled source processor).
|
||||
await self._create_tasks()
|
||||
|
||||
try:
|
||||
# Setup processors.
|
||||
await self._setup(params)
|
||||
|
||||
# Create all main tasks and wait of the main push task. This is the
|
||||
# task that pushes frames to the very beginning of our pipeline (our
|
||||
# controlled source processor).
|
||||
push_task = await self._create_tasks()
|
||||
await push_task
|
||||
|
||||
# We have already cleaned up the pipeline inside the task.
|
||||
cleanup_pipeline = False
|
||||
|
||||
# Pipeline has finished nicely.
|
||||
self._finished = True
|
||||
# Wait for pipeline to finish.
|
||||
await self._wait_for_pipeline_finished()
|
||||
except asyncio.CancelledError:
|
||||
# Raise exception back to the pipeline runner so it can cancel this
|
||||
# task properly.
|
||||
logger.debug(f"Pipeline task {self} got cancelled from outside...")
|
||||
# We have been cancelled from outside, let's just cancel everything.
|
||||
await self._cancel()
|
||||
# Wait again for pipeline to finish. This time we have really
|
||||
# cancelled, so it should really finish.
|
||||
await self._wait_for_pipeline_finished()
|
||||
# Re-raise in case there's more cleanup to do.
|
||||
raise
|
||||
finally:
|
||||
# We can reach this point for different reasons:
|
||||
#
|
||||
# 1. The task has finished properly (e.g. `EndFrame`).
|
||||
# 2. By calling `PipelineTask.cancel()`.
|
||||
# 3. By asyncio task cancellation.
|
||||
#
|
||||
# Case (1) will execute the code below without issues because
|
||||
# `self._finished` is true.
|
||||
#
|
||||
# Case (2) will execute the code below without issues because
|
||||
# `self._cancelled` is true.
|
||||
#
|
||||
# Case (3) will raise the exception above (because we are cancelling
|
||||
# the asyncio task). This will be then captured by the
|
||||
# `PipelineRunner` which will call `PipelineTask.cancel()` and
|
||||
# therefore becoming case (2).
|
||||
if self._finished or self._cancelled:
|
||||
logger.debug(f"Pipeline task {self} is finishing cleanup...")
|
||||
await self._cancel_tasks()
|
||||
await self._cleanup(cleanup_pipeline)
|
||||
if self._check_dangling_tasks:
|
||||
self._print_dangling_tasks()
|
||||
self._finished = True
|
||||
logger.debug(f"Pipeline task {self} has finished")
|
||||
# 1. The pipeline task has finished (try case).
|
||||
# 2. By an asyncio task cancellation (except case).
|
||||
logger.debug(f"Pipeline task {self} is finishing...")
|
||||
await self._cancel_tasks()
|
||||
if self._check_dangling_tasks:
|
||||
self._print_dangling_tasks()
|
||||
self._finished = True
|
||||
logger.debug(f"Pipeline task {self} has finished")
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
"""Queue a single frame to be pushed down the pipeline.
|
||||
@@ -489,19 +475,7 @@ class PipelineTask(BasePipelineTask):
|
||||
if not self._cancelled:
|
||||
logger.debug(f"Cancelling pipeline task {self}")
|
||||
self._cancelled = True
|
||||
cancel_frame = CancelFrame()
|
||||
# Make sure everything is cleaned up downstream. This is sent
|
||||
# out-of-band from the main streaming task which is what we want since
|
||||
# we want to cancel right away.
|
||||
await self._pipeline.queue_frame(cancel_frame)
|
||||
# Wait for CancelFrame to make it through the pipeline.
|
||||
await self._wait_for_pipeline_end(cancel_frame)
|
||||
# Only cancel the push task, we don't want to be able to process any
|
||||
# other frame after cancel. Everything else will be cancelled in
|
||||
# run().
|
||||
if self._process_push_task:
|
||||
await self._task_manager.cancel_task(self._process_push_task)
|
||||
self._process_push_task = None
|
||||
await self.queue_frame(CancelFrame())
|
||||
|
||||
async def _create_tasks(self):
|
||||
"""Create and start all pipeline processing tasks."""
|
||||
@@ -603,6 +577,17 @@ class PipelineTask(BasePipelineTask):
|
||||
|
||||
self._pipeline_end_event.clear()
|
||||
|
||||
# We are really done.
|
||||
self._pipeline_finished_event.set()
|
||||
|
||||
async def _wait_for_pipeline_finished(self):
|
||||
await self._pipeline_finished_event.wait()
|
||||
self._pipeline_finished_event.clear()
|
||||
# Make sure we wait for the main task to complete.
|
||||
if self._process_push_task:
|
||||
await self._process_push_task
|
||||
self._process_push_task = None
|
||||
|
||||
async def _setup(self, params: PipelineTaskParams):
|
||||
"""Set up the pipeline task and all processors."""
|
||||
mgr_params = TaskManagerParams(loop=params.loop)
|
||||
|
||||
@@ -82,6 +82,7 @@ async def configure(
|
||||
sip_enable_video: Optional[bool] = False,
|
||||
sip_num_endpoints: Optional[int] = 1,
|
||||
sip_codecs: Optional[Dict[str, List[str]]] = None,
|
||||
room_properties: Optional[DailyRoomProperties] = None,
|
||||
) -> DailyRoomConfig:
|
||||
"""Configure Daily room URL and token with optional SIP capabilities.
|
||||
|
||||
@@ -99,6 +100,10 @@ async def configure(
|
||||
sip_num_endpoints: Number of allowed SIP endpoints.
|
||||
sip_codecs: Codecs to support for audio and video. If None, uses Daily defaults.
|
||||
Example: {"audio": ["OPUS"], "video": ["H264"]}
|
||||
room_properties: Optional DailyRoomProperties to use instead of building from
|
||||
individual parameters. When provided, this overrides room_exp_duration and
|
||||
SIP-related parameters. If not provided, properties are built from the
|
||||
individual parameters as before.
|
||||
|
||||
Returns:
|
||||
DailyRoomConfig: Object with room_url, token, and optional sip_endpoint.
|
||||
@@ -115,6 +120,13 @@ async def configure(
|
||||
# SIP-enabled room
|
||||
sip_config = await configure(session, sip_caller_phone="+15551234567")
|
||||
print(f"SIP endpoint: {sip_config.sip_endpoint}")
|
||||
|
||||
# Custom room properties with recording enabled
|
||||
custom_props = DailyRoomProperties(
|
||||
enable_recording="cloud",
|
||||
max_participants=2,
|
||||
)
|
||||
config = await configure(session, room_properties=custom_props)
|
||||
"""
|
||||
# Check for required API key
|
||||
api_key = os.getenv("DAILY_API_KEY")
|
||||
@@ -124,9 +136,32 @@ async def configure(
|
||||
"Get your API key from https://dashboard.daily.co/developers"
|
||||
)
|
||||
|
||||
# Warn if both room_properties and individual parameters are provided
|
||||
if room_properties is not None:
|
||||
individual_params_provided = any(
|
||||
[
|
||||
room_exp_duration != 2.0,
|
||||
token_exp_duration != 2.0,
|
||||
sip_caller_phone is not None,
|
||||
sip_enable_video is not False,
|
||||
sip_num_endpoints != 1,
|
||||
sip_codecs is not None,
|
||||
]
|
||||
)
|
||||
if individual_params_provided:
|
||||
logger.warning(
|
||||
"Both room_properties and individual parameters (room_exp_duration, token_exp_duration, "
|
||||
"sip_*) were provided. The room_properties will be used and individual parameters "
|
||||
"will be ignored."
|
||||
)
|
||||
|
||||
# Determine if SIP mode is enabled
|
||||
sip_enabled = sip_caller_phone is not None
|
||||
|
||||
# If room_properties is provided, check if it has SIP configuration
|
||||
if room_properties and room_properties.sip:
|
||||
sip_enabled = True
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=api_key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
@@ -150,27 +185,29 @@ async def configure(
|
||||
room_name = f"{room_prefix}-{uuid.uuid4().hex[:8]}"
|
||||
logger.info(f"Creating new Daily room: {room_name}")
|
||||
|
||||
# Calculate expiration time
|
||||
expiration_time = time.time() + (room_exp_duration * 60 * 60)
|
||||
# Use provided room_properties or build from parameters
|
||||
if room_properties is None:
|
||||
# Calculate expiration time
|
||||
expiration_time = time.time() + (room_exp_duration * 60 * 60)
|
||||
|
||||
# Create room properties
|
||||
room_properties = DailyRoomProperties(
|
||||
exp=expiration_time,
|
||||
eject_at_room_exp=True,
|
||||
)
|
||||
|
||||
# Add SIP configuration if enabled
|
||||
if sip_enabled:
|
||||
sip_params = DailyRoomSipParams(
|
||||
display_name=sip_caller_phone,
|
||||
video=sip_enable_video,
|
||||
sip_mode="dial-in",
|
||||
num_endpoints=sip_num_endpoints,
|
||||
codecs=sip_codecs,
|
||||
# Create room properties
|
||||
room_properties = DailyRoomProperties(
|
||||
exp=expiration_time,
|
||||
eject_at_room_exp=True,
|
||||
)
|
||||
room_properties.sip = sip_params
|
||||
room_properties.enable_dialout = True # Enable outbound calls if needed
|
||||
room_properties.start_video_off = not sip_enable_video # Voice-only by default
|
||||
|
||||
# Add SIP configuration if enabled
|
||||
if sip_enabled:
|
||||
sip_params = DailyRoomSipParams(
|
||||
display_name=sip_caller_phone,
|
||||
video=sip_enable_video,
|
||||
sip_mode="dial-in",
|
||||
num_endpoints=sip_num_endpoints,
|
||||
codecs=sip_codecs,
|
||||
)
|
||||
room_properties.sip = sip_params
|
||||
room_properties.enable_dialout = True # Enable outbound calls if needed
|
||||
room_properties.start_video_off = not sip_enable_video # Voice-only by default
|
||||
|
||||
# Create room parameters
|
||||
room_params = DailyRoomParams(name=room_name, properties=room_properties)
|
||||
|
||||
@@ -70,12 +70,14 @@ import asyncio
|
||||
import mimetypes
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from http import HTTPMethod
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
import aiohttp
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi.responses import FileResponse, Response
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
@@ -166,6 +168,7 @@ def _create_server_app(
|
||||
host: str = "localhost",
|
||||
proxy: str,
|
||||
esp32_mode: bool = False,
|
||||
whatsapp_enabled: bool = False,
|
||||
folder: Optional[str] = None,
|
||||
):
|
||||
"""Create FastAPI app with transport-specific routes."""
|
||||
@@ -182,7 +185,8 @@ def _create_server_app(
|
||||
# Set up transport-specific routes
|
||||
if transport_type == "webrtc":
|
||||
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder)
|
||||
_setup_whatsapp_routes(app)
|
||||
if whatsapp_enabled:
|
||||
_setup_whatsapp_routes(app)
|
||||
elif transport_type == "daily":
|
||||
_setup_daily_routes(app)
|
||||
elif transport_type in TELEPHONY_TRANSPORTS:
|
||||
@@ -200,8 +204,10 @@ def _setup_webrtc_routes(
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.request_handler import (
|
||||
IceCandidate,
|
||||
SmallWebRTCPatchRequest,
|
||||
SmallWebRTCRequest,
|
||||
SmallWebRTCRequestHandler,
|
||||
)
|
||||
@@ -209,6 +215,16 @@ def _setup_webrtc_routes(
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
class IceConfig(TypedDict):
|
||||
iceServers: List[IceServer]
|
||||
|
||||
class StartBotResult(TypedDict, total=False):
|
||||
sessionId: str
|
||||
iceConfig: Optional[IceConfig]
|
||||
|
||||
# In-memory store of active sessions: session_id -> session info
|
||||
active_sessions: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
# Mount the frontend
|
||||
app.mount("/client", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@@ -254,6 +270,74 @@ def _setup_webrtc_routes(
|
||||
)
|
||||
return answer
|
||||
|
||||
@app.patch("/api/offer")
|
||||
async def ice_candidate(request: SmallWebRTCPatchRequest):
|
||||
"""Handle WebRTC new ice candidate requests."""
|
||||
logger.debug(f"Received patch request: {request}")
|
||||
await small_webrtc_handler.handle_patch_request(request)
|
||||
return {"status": "success"}
|
||||
|
||||
@app.post("/start")
|
||||
async def rtvi_start(request: Request):
|
||||
"""Mimic Pipecat Cloud's /start endpoint."""
|
||||
# Parse the request body
|
||||
try:
|
||||
request_data = await request.json()
|
||||
logger.debug(f"Received request: {request_data}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
|
||||
session_id = str(uuid.uuid4())
|
||||
active_sessions[session_id] = request_data
|
||||
|
||||
result: StartBotResult = {"sessionId": session_id}
|
||||
if request_data.get("enableDefaultIceServers"):
|
||||
result["iceConfig"] = IceConfig(
|
||||
iceServers=[IceServer(urls="stun:stun.l.google.com:19302")]
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
@app.api_route(
|
||||
"/sessions/{session_id}/{path:path}",
|
||||
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
|
||||
)
|
||||
async def proxy_request(
|
||||
session_id: str, path: str, request: Request, background_tasks: BackgroundTasks
|
||||
):
|
||||
"""Mimic Pipecat Cloud's proxy."""
|
||||
active_session = active_sessions.get(session_id)
|
||||
if not active_session:
|
||||
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
|
||||
|
||||
if path.endswith("api/offer"):
|
||||
# Parse the request body and convert to SmallWebRTCRequest
|
||||
try:
|
||||
request_data = await request.json()
|
||||
if request.method == HTTPMethod.POST.value:
|
||||
webrtc_request = SmallWebRTCRequest(
|
||||
sdp=request_data["sdp"],
|
||||
type=request_data["type"],
|
||||
pc_id=request_data.get("pc_id"),
|
||||
restart_pc=request_data.get("restart_pc"),
|
||||
request_data=request_data,
|
||||
)
|
||||
return await offer(webrtc_request, background_tasks)
|
||||
elif request.method == HTTPMethod.PATCH.value:
|
||||
patch_request = SmallWebRTCPatchRequest(
|
||||
pc_id=request_data["pc_id"],
|
||||
candidates=[IceCandidate(**c) for c in request_data.get("candidates", [])],
|
||||
)
|
||||
return await ice_candidate(patch_request)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse WebRTC request: {e}")
|
||||
return Response(content="Invalid WebRTC request", status_code=400)
|
||||
|
||||
logger.info(f"Received request for path: {path}")
|
||||
return Response(status_code=200)
|
||||
|
||||
@asynccontextmanager
|
||||
async def smallwebrtc_lifespan(app: FastAPI):
|
||||
"""Manage FastAPI application lifecycle and cleanup connections."""
|
||||
@@ -289,6 +373,29 @@ def _add_lifespan_to_app(app: FastAPI, new_lifespan):
|
||||
|
||||
def _setup_whatsapp_routes(app: FastAPI):
|
||||
"""Set up WhatsApp-specific routes."""
|
||||
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
|
||||
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
|
||||
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
|
||||
|
||||
if not all(
|
||||
[
|
||||
WHATSAPP_APP_SECRET,
|
||||
WHATSAPP_PHONE_NUMBER_ID,
|
||||
WHATSAPP_TOKEN,
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
|
||||
]
|
||||
):
|
||||
logger.error(
|
||||
"""Missing required environment variables for WhatsApp transport:
|
||||
WHATSAPP_APP_SECRET
|
||||
WHATSAPP_PHONE_NUMBER_ID
|
||||
WHATSAPP_TOKEN
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
|
||||
"""
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
@@ -300,24 +407,7 @@ def _setup_whatsapp_routes(app: FastAPI):
|
||||
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
|
||||
from pipecat.transports.whatsapp.client import WhatsAppClient
|
||||
except ImportError as e:
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
|
||||
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
|
||||
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
|
||||
|
||||
if not all(
|
||||
[
|
||||
WHATSAPP_TOKEN,
|
||||
WHATSAPP_PHONE_NUMBER_ID,
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
|
||||
]
|
||||
):
|
||||
logger.debug(
|
||||
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
|
||||
)
|
||||
logger.error(f"WhatsApp transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
# Global WhatsApp client instance
|
||||
@@ -487,8 +577,6 @@ def _setup_daily_routes(app: FastAPI):
|
||||
else:
|
||||
logger.debug("No body data provided in request")
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
@@ -576,8 +664,6 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
|
||||
async def _run_daily_direct():
|
||||
"""Run Daily bot with direct connection (no FastAPI server)."""
|
||||
try:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
except ImportError as e:
|
||||
logger.error("Daily transport dependencies not installed.")
|
||||
@@ -689,6 +775,12 @@ def main():
|
||||
parser.add_argument(
|
||||
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--whatsapp",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Ensure required WhatsApp environment variables are present",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -731,10 +823,11 @@ def main():
|
||||
print()
|
||||
if args.esp32:
|
||||
print(f"🚀 Bot ready! (ESP32 mode)")
|
||||
print(f" → Open http://{args.host}:{args.port}/client in your browser")
|
||||
elif args.whatsapp:
|
||||
print(f"🚀 Bot ready! (WhatsApp)")
|
||||
else:
|
||||
print(f"🚀 Bot ready!")
|
||||
print(f" → Open http://{args.host}:{args.port}/client in your browser")
|
||||
print(f" → Open http://{args.host}:{args.port}/client in your browser")
|
||||
print()
|
||||
elif args.transport == "daily":
|
||||
print()
|
||||
@@ -752,6 +845,7 @@ def main():
|
||||
host=args.host,
|
||||
proxy=args.proxy,
|
||||
esp32_mode=args.esp32,
|
||||
whatsapp_enabled=args.whatsapp,
|
||||
folder=args.folder,
|
||||
)
|
||||
|
||||
|
||||
@@ -108,6 +108,8 @@ class AssemblyAIConnectionParams(BaseModel):
|
||||
end_of_turn_confidence_threshold: Confidence threshold for end-of-turn detection.
|
||||
min_end_of_turn_silence_when_confident: Minimum silence duration when confident about end-of-turn.
|
||||
max_turn_silence: Maximum silence duration before forcing end-of-turn.
|
||||
keyterms_prompt: List of key terms to guide transcription. Will be JSON serialized before sending.
|
||||
speech_model: Select between English and multilingual models. Defaults to "universal-streaming-english".
|
||||
"""
|
||||
|
||||
sample_rate: int = 16000
|
||||
@@ -117,3 +119,7 @@ class AssemblyAIConnectionParams(BaseModel):
|
||||
end_of_turn_confidence_threshold: Optional[float] = None
|
||||
min_end_of_turn_silence_when_confident: Optional[int] = None
|
||||
max_turn_silence: Optional[int] = None
|
||||
keyterms_prompt: Optional[List[str]] = None
|
||||
speech_model: Literal["universal-streaming-english", "universal-streaming-multilingual"] = (
|
||||
"universal-streaming-english"
|
||||
)
|
||||
|
||||
@@ -174,11 +174,16 @@ class AssemblyAISTTService(STTService):
|
||||
|
||||
def _build_ws_url(self) -> str:
|
||||
"""Build WebSocket URL with query parameters using urllib.parse.urlencode."""
|
||||
params = {
|
||||
k: str(v).lower() if isinstance(v, bool) else v
|
||||
for k, v in self._connection_params.model_dump().items()
|
||||
if v is not None
|
||||
}
|
||||
params = {}
|
||||
for k, v in self._connection_params.model_dump().items():
|
||||
if v is not None:
|
||||
if k == "keyterms_prompt":
|
||||
params[k] = json.dumps(v)
|
||||
elif isinstance(v, bool):
|
||||
params[k] = str(v).lower()
|
||||
else:
|
||||
params[k] = v
|
||||
|
||||
if params:
|
||||
query_string = urlencode(params)
|
||||
return f"{self._api_endpoint_base_url}?{query_string}"
|
||||
@@ -197,6 +202,8 @@ class AssemblyAISTTService(STTService):
|
||||
)
|
||||
self._connected = True
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to AssemblyAI: {e}")
|
||||
self._connected = False
|
||||
@@ -238,6 +245,7 @@ class AssemblyAISTTService(STTService):
|
||||
self._websocket = None
|
||||
self._connected = False
|
||||
self._receive_task = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
"""Handle incoming WebSocket messages."""
|
||||
|
||||
@@ -235,6 +235,8 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
}
|
||||
|
||||
await self._get_websocket().send(json.dumps(init_msg))
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -252,6 +254,7 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
finally:
|
||||
self._websocket = None
|
||||
self._started = False
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
|
||||
@@ -286,6 +286,7 @@ class AWSTranscribeSTTService(STTService):
|
||||
|
||||
logger.info(f"{self} Successfully connected to AWS Transcribe")
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} Failed to connect to AWS Transcribe: {e}")
|
||||
await self._disconnect()
|
||||
@@ -310,6 +311,7 @@ class AWSTranscribeSTTService(STTService):
|
||||
logger.warning(f"{self} Error closing WebSocket connection: {e}")
|
||||
finally:
|
||||
self._ws_client = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
"""Convert internal language enum to AWS Transcribe language code.
|
||||
|
||||
@@ -28,13 +28,12 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.services.stt_service import WebsocketSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
try:
|
||||
import websockets
|
||||
from websockets.asyncio.client import connect as websocket_connect
|
||||
from websockets.protocol import State
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -124,7 +123,7 @@ class CartesiaLiveOptions:
|
||||
return cls(**json.loads(json_str))
|
||||
|
||||
|
||||
class CartesiaSTTService(STTService):
|
||||
class CartesiaSTTService(WebsocketSTTService):
|
||||
"""Speech-to-text service using Cartesia Live API.
|
||||
|
||||
Provides real-time speech transcription through WebSocket connection
|
||||
@@ -176,8 +175,7 @@ class CartesiaSTTService(STTService):
|
||||
self.set_model_name(merged_options.model)
|
||||
self._api_key = api_key
|
||||
self._base_url = base_url or "api.cartesia.ai"
|
||||
self._connection = None
|
||||
self._receiver_task = None
|
||||
self._receive_task = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if the service can generate processing metrics.
|
||||
@@ -214,6 +212,27 @@ class CartesiaSTTService(STTService):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def start_metrics(self):
|
||||
"""Start performance metrics collection for transcription processing."""
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and handle speech events.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: Direction of frame flow in the pipeline.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self.start_metrics()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
# Send finalize command to flush the transcription session
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
await self._websocket.send("finalize")
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Process audio data for speech-to-text transcription.
|
||||
|
||||
@@ -224,45 +243,71 @@ class CartesiaSTTService(STTService):
|
||||
None - transcription results are handled via WebSocket responses.
|
||||
"""
|
||||
# If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again
|
||||
if not self._connection or self._connection.state is State.CLOSED:
|
||||
if not self._websocket or self._websocket.state is State.CLOSED:
|
||||
await self._connect()
|
||||
|
||||
await self._connection.send(audio)
|
||||
await self._websocket.send(audio)
|
||||
yield None
|
||||
|
||||
async def _connect(self):
|
||||
params = self._settings.to_dict()
|
||||
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
|
||||
logger.debug(f"Connecting to Cartesia: {ws_url}")
|
||||
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
|
||||
await self._connect_websocket()
|
||||
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = asyncio.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
self._connection = await websocket_connect(ws_url, additional_headers=headers)
|
||||
# Setup the receiver task to handle the incoming messages from the Cartesia server
|
||||
if self._receiver_task is None or self._receiver_task.done():
|
||||
self._receiver_task = asyncio.create_task(self._receive_messages())
|
||||
logger.debug(f"Connected to Cartesia")
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
return
|
||||
logger.debug("Connecting to Cartesia STT")
|
||||
|
||||
params = self._settings.to_dict()
|
||||
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
|
||||
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
|
||||
|
||||
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: unable to connect to Cartesia: {e}")
|
||||
|
||||
async def _receive_messages(self):
|
||||
async def _disconnect_websocket(self):
|
||||
try:
|
||||
while True:
|
||||
if not self._connection or self._connection.state is State.CLOSED:
|
||||
break
|
||||
|
||||
message = await self._connection.recv()
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_response(data)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Received non-JSON message: {message}")
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except websockets.exceptions.ConnectionClosed as e:
|
||||
logger.debug(f"WebSocket connection closed: {e}")
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
logger.debug("Disconnecting from Cartesia STT")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message receiver: {e}")
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
async def _process_messages(self):
|
||||
async for message in self._get_websocket():
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_response(data)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Received non-JSON message: {message}")
|
||||
|
||||
async def _receive_messages(self):
|
||||
while True:
|
||||
await self._process_messages()
|
||||
# Cartesia times out after 5 minutes of inactivity (no keepalive
|
||||
# mechanism is available). So, we try to reconnect.
|
||||
logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
|
||||
await self._connect_websocket()
|
||||
|
||||
async def _process_response(self, data):
|
||||
if "type" in data:
|
||||
@@ -316,41 +361,3 @@ class CartesiaSTTService(STTService):
|
||||
language,
|
||||
)
|
||||
)
|
||||
|
||||
async def _disconnect(self):
|
||||
if self._receiver_task:
|
||||
self._receiver_task.cancel()
|
||||
try:
|
||||
await self._receiver_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected exception while cancelling task: {e}")
|
||||
self._receiver_task = None
|
||||
|
||||
if self._connection and self._connection.state is State.OPEN:
|
||||
logger.debug("Disconnecting from Cartesia")
|
||||
|
||||
await self._connection.close()
|
||||
self._connection = None
|
||||
|
||||
async def start_metrics(self):
|
||||
"""Start performance metrics collection for transcription processing."""
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and handle speech events.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: Direction of frame flow in the pipeline.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self.start_metrics()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
# Send finalize command to flush the transcription session
|
||||
if self._connection and self._connection.state is State.OPEN:
|
||||
await self._connection.send("finalize")
|
||||
|
||||
@@ -344,10 +344,11 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
try:
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
return
|
||||
logger.debug("Connecting to Cartesia")
|
||||
logger.debug("Connecting to Cartesia TTS")
|
||||
self._websocket = await websocket_connect(
|
||||
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
|
||||
)
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -365,6 +366,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
|
||||
@@ -205,6 +205,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
additional_headers={"Authorization": f"Token {self._api_key}"},
|
||||
)
|
||||
logger.debug("Connected to Deepgram Flux Websocket")
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -225,6 +226,9 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _send_close_stream(self) -> None:
|
||||
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.
|
||||
|
||||
@@ -168,16 +168,24 @@ def build_elevenlabs_voice_settings(
|
||||
|
||||
|
||||
def calculate_word_times(
|
||||
alignment_info: Mapping[str, Any], cumulative_time: float
|
||||
) -> List[Tuple[str, float]]:
|
||||
alignment_info: Mapping[str, Any],
|
||||
cumulative_time: float,
|
||||
partial_word: str = "",
|
||||
partial_word_start_time: float = 0.0,
|
||||
) -> tuple[List[Tuple[str, float]], str, float]:
|
||||
"""Calculate word timestamps from character alignment information.
|
||||
|
||||
Args:
|
||||
alignment_info: Character alignment data from ElevenLabs API.
|
||||
cumulative_time: Base time offset for this chunk.
|
||||
partial_word: Partial word carried over from previous chunk.
|
||||
partial_word_start_time: Start time of the partial word.
|
||||
|
||||
Returns:
|
||||
List of (word, timestamp) tuples.
|
||||
Tuple of (word_times, new_partial_word, new_partial_word_start_time):
|
||||
- word_times: List of (word, timestamp) tuples for complete words
|
||||
- new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
|
||||
- new_partial_word_start_time: Start time of the incomplete word
|
||||
"""
|
||||
chars = alignment_info["chars"]
|
||||
char_start_times_ms = alignment_info["charStartTimesMs"]
|
||||
@@ -186,41 +194,37 @@ def calculate_word_times(
|
||||
logger.error(
|
||||
f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(char_start_times_ms)}"
|
||||
)
|
||||
return []
|
||||
return ([], partial_word, partial_word_start_time)
|
||||
|
||||
# Build words and track their start positions
|
||||
words = []
|
||||
word_start_indices = []
|
||||
current_word = ""
|
||||
word_start_index = None
|
||||
word_start_times = []
|
||||
current_word = partial_word # Start with any partial word from previous chunk
|
||||
word_start_time = partial_word_start_time if partial_word else None
|
||||
|
||||
for i, char in enumerate(chars):
|
||||
if char == " ":
|
||||
# End of current word
|
||||
if current_word: # Only add non-empty words
|
||||
words.append(current_word)
|
||||
word_start_indices.append(word_start_index)
|
||||
word_start_times.append(word_start_time)
|
||||
current_word = ""
|
||||
word_start_index = None
|
||||
word_start_time = None
|
||||
else:
|
||||
# Building a word
|
||||
if word_start_index is None: # First character of new word
|
||||
word_start_index = i
|
||||
if word_start_time is None: # First character of new word
|
||||
# Convert from milliseconds to seconds and add cumulative offset
|
||||
word_start_time = cumulative_time + (char_start_times_ms[i] / 1000.0)
|
||||
current_word += char
|
||||
|
||||
# Handle the last word if there's no trailing space
|
||||
if current_word and word_start_index is not None:
|
||||
words.append(current_word)
|
||||
word_start_indices.append(word_start_index)
|
||||
# Build result for complete words
|
||||
word_times = list(zip(words, word_start_times))
|
||||
|
||||
# Calculate timestamps for each word
|
||||
word_times = []
|
||||
for word, start_idx in zip(words, word_start_indices):
|
||||
# Convert from milliseconds to seconds and add cumulative offset
|
||||
start_time_seconds = cumulative_time + (char_start_times_ms[start_idx] / 1000.0)
|
||||
word_times.append((word, start_time_seconds))
|
||||
# Return any incomplete word at the end of this chunk
|
||||
new_partial_word = current_word if current_word else ""
|
||||
new_partial_word_start_time = word_start_time if word_start_time is not None else 0.0
|
||||
|
||||
return word_times
|
||||
return (word_times, new_partial_word, new_partial_word_start_time)
|
||||
|
||||
|
||||
class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
@@ -332,6 +336,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
# there's an interruption or TTSStoppedFrame.
|
||||
self._started = False
|
||||
self._cumulative_time = 0
|
||||
# Track partial words that span across alignment chunks
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
|
||||
# Context management for v1 multi API
|
||||
self._context_id = None
|
||||
@@ -521,6 +528,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
|
||||
)
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -543,6 +551,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self._started = False
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
@@ -570,6 +579,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
logger.error(f"Error closing context on interruption: {e}")
|
||||
self._context_id = None
|
||||
self._started = False
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Handle incoming WebSocket messages from ElevenLabs."""
|
||||
@@ -609,7 +620,14 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
|
||||
if msg.get("alignment"):
|
||||
alignment = msg["alignment"]
|
||||
word_times = calculate_word_times(alignment, self._cumulative_time)
|
||||
word_times, self._partial_word, self._partial_word_start_time = (
|
||||
calculate_word_times(
|
||||
alignment,
|
||||
self._cumulative_time,
|
||||
self._partial_word,
|
||||
self._partial_word_start_time,
|
||||
)
|
||||
)
|
||||
|
||||
if word_times:
|
||||
await self.add_word_timestamps(word_times)
|
||||
@@ -683,6 +701,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
yield TTSStartedFrame()
|
||||
self._started = True
|
||||
self._cumulative_time = 0
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
# If a context ID does not exist, create a new one and
|
||||
# register it. If an ID exists, that means the Pipeline is
|
||||
# configured for allow_interruptions=False, so continue
|
||||
@@ -756,6 +776,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
base_url: str = "https://api.elevenlabs.io",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: Optional[InputParams] = None,
|
||||
aggregate_sentences: Optional[bool] = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the ElevenLabs HTTP TTS service.
|
||||
@@ -768,10 +789,11 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
base_url: Base URL for ElevenLabs HTTP API.
|
||||
sample_rate: Audio sample rate. If None, uses default.
|
||||
params: Additional input parameters for voice customization.
|
||||
aggregate_sentences: Whether to aggregate sentences within the TTSService.
|
||||
**kwargs: Additional arguments passed to the parent service.
|
||||
"""
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
aggregate_sentences=aggregate_sentences,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
sample_rate=sample_rate,
|
||||
@@ -809,6 +831,10 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
# Store previous text for context within a turn
|
||||
self._previous_text = ""
|
||||
|
||||
# Track partial words that span across alignment chunks
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
"""Convert pipecat Language to ElevenLabs language code.
|
||||
|
||||
@@ -836,6 +862,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
self._cumulative_time = 0
|
||||
self._started = False
|
||||
self._previous_text = ""
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
logger.debug(f"{self}: Reset internal state")
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
@@ -870,11 +898,13 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
|
||||
"""Calculate word timing from character alignment data.
|
||||
|
||||
This method handles partial words that may span across multiple alignment chunks.
|
||||
|
||||
Args:
|
||||
alignment_info: Character timing data from ElevenLabs.
|
||||
|
||||
Returns:
|
||||
List of (word, timestamp) pairs.
|
||||
List of (word, timestamp) pairs for complete words in this chunk.
|
||||
|
||||
Example input data::
|
||||
|
||||
@@ -900,30 +930,28 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
# Build the words and find their start times
|
||||
words = []
|
||||
word_start_times = []
|
||||
current_word = ""
|
||||
first_char_idx = -1
|
||||
# Start with any partial word from previous chunk
|
||||
current_word = self._partial_word
|
||||
word_start_time = self._partial_word_start_time if self._partial_word else None
|
||||
|
||||
for i, char in enumerate(chars):
|
||||
if char == " ":
|
||||
if current_word: # Only add non-empty words
|
||||
words.append(current_word)
|
||||
# Use time of the first character of the word, offset by cumulative time
|
||||
word_start_times.append(
|
||||
self._cumulative_time + char_start_times[first_char_idx]
|
||||
)
|
||||
word_start_times.append(word_start_time)
|
||||
current_word = ""
|
||||
first_char_idx = -1
|
||||
word_start_time = None
|
||||
else:
|
||||
if not current_word: # This is the first character of a new word
|
||||
first_char_idx = i
|
||||
if word_start_time is None: # First character of a new word
|
||||
# Use time of the first character of the word, offset by cumulative time
|
||||
word_start_time = self._cumulative_time + char_start_times[i]
|
||||
current_word += char
|
||||
|
||||
# Don't forget the last word if there's no trailing space
|
||||
if current_word and first_char_idx >= 0:
|
||||
words.append(current_word)
|
||||
word_start_times.append(self._cumulative_time + char_start_times[first_char_idx])
|
||||
# Store any incomplete word at the end of this chunk
|
||||
self._partial_word = current_word if current_word else ""
|
||||
self._partial_word_start_time = word_start_time if word_start_time is not None else 0.0
|
||||
|
||||
# Create word-time pairs
|
||||
# Create word-time pairs for complete words only
|
||||
word_times = list(zip(words, word_start_times))
|
||||
|
||||
return word_times
|
||||
@@ -959,6 +987,9 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
if self._voice_settings:
|
||||
payload["voice_settings"] = self._voice_settings
|
||||
|
||||
if self._settings["apply_text_normalization"] is not None:
|
||||
payload["apply_text_normalization"] = self._settings["apply_text_normalization"]
|
||||
|
||||
language = self._settings["language"]
|
||||
if self._model_name in ELEVENLABS_MULTILINGUAL_MODELS and language:
|
||||
payload["language_code"] = language
|
||||
@@ -979,8 +1010,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
}
|
||||
if self._settings["optimize_streaming_latency"] is not None:
|
||||
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
|
||||
if self._settings["apply_text_normalization"] is not None:
|
||||
params["apply_text_normalization"] = self._settings["apply_text_normalization"]
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
@@ -1041,6 +1070,14 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
logger.error(f"Error processing response: {e}", exc_info=True)
|
||||
continue
|
||||
|
||||
# After processing all chunks, emit any remaining partial word
|
||||
# since this is the end of the utterance
|
||||
if self._partial_word:
|
||||
final_word_time = [(self._partial_word, self._partial_word_start_time)]
|
||||
await self.add_word_timestamps(final_word_time)
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
|
||||
# After processing all chunks, add the total utterance duration
|
||||
# to the cumulative time to ensure next utterance starts after this one
|
||||
if utterance_duration > 0:
|
||||
|
||||
@@ -225,6 +225,8 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
start_message = {"event": "start", "request": {"text": "", **self._settings}}
|
||||
await self._websocket.send(ormsgpack.packb(start_message))
|
||||
logger.debug("Sent start message to Fish Audio")
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"Fish Audio initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -245,6 +247,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
self._request_id = None
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any buffered audio by sending a flush event to Fish Audio."""
|
||||
|
||||
@@ -730,6 +730,8 @@ class GoogleSTTService(STTService):
|
||||
self._request_queue = asyncio.Queue()
|
||||
self._streaming_task = self.create_task(self._stream_audio())
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
|
||||
async def _disconnect(self):
|
||||
"""Clean up streaming recognition resources."""
|
||||
if self._streaming_task:
|
||||
@@ -737,6 +739,8 @@ class GoogleSTTService(STTService):
|
||||
await self.cancel_task(self._streaming_task)
|
||||
self._streaming_task = None
|
||||
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _request_generator(self):
|
||||
"""Generates requests for the streaming recognize method."""
|
||||
recognizer_path = f"projects/{self._project_id}/locations/{self._location}/recognizers/_"
|
||||
|
||||
@@ -222,6 +222,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
# Send initialization message
|
||||
await self._websocket.send(json.dumps(init_msg))
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -243,6 +244,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
"""Get the WebSocket connection if available."""
|
||||
|
||||
@@ -293,6 +293,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
headers = {"x-api-key": self._api_key}
|
||||
|
||||
self._websocket = await websocket_connect(url, additional_headers=headers)
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -311,6 +313,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Receive and process messages from Neuphonic WebSocket."""
|
||||
|
||||
@@ -14,6 +14,7 @@ from typing import AsyncGenerator, Dict, Literal, Optional
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncOpenAI, BadRequestError
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
@@ -55,6 +56,17 @@ class OpenAITTSService(TTSService):
|
||||
|
||||
OPENAI_SAMPLE_RATE = 24000 # OpenAI TTS always outputs at 24kHz
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Input parameters for OpenAI TTS configuration.
|
||||
|
||||
Parameters:
|
||||
instructions: Instructions to guide voice synthesis behavior.
|
||||
speed: Voice speed control (0.25 to 4.0, default 1.0).
|
||||
"""
|
||||
|
||||
instructions: Optional[str] = None
|
||||
speed: Optional[float] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
@@ -65,6 +77,7 @@ class OpenAITTSService(TTSService):
|
||||
sample_rate: Optional[int] = None,
|
||||
instructions: Optional[str] = None,
|
||||
speed: Optional[float] = None,
|
||||
params: Optional[InputParams] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize OpenAI TTS service.
|
||||
@@ -77,7 +90,11 @@ class OpenAITTSService(TTSService):
|
||||
sample_rate: Output audio sample rate in Hz. If None, uses OpenAI's default 24kHz.
|
||||
instructions: Optional instructions to guide voice synthesis behavior.
|
||||
speed: Voice speed control (0.25 to 4.0, default 1.0).
|
||||
params: Optional synthesis controls (acting instructions, speed, ...).
|
||||
**kwargs: Additional keyword arguments passed to TTSService.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.
|
||||
"""
|
||||
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
|
||||
logger.warning(
|
||||
@@ -86,12 +103,26 @@ class OpenAITTSService(TTSService):
|
||||
)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._speed = speed
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice)
|
||||
self._instructions = instructions
|
||||
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)
|
||||
|
||||
if instructions or speed:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
self._settings = {
|
||||
"instructions": params.instructions if params else instructions,
|
||||
"speed": params.speed if params else speed,
|
||||
}
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
|
||||
@@ -144,11 +175,11 @@ class OpenAITTSService(TTSService):
|
||||
"response_format": "pcm",
|
||||
}
|
||||
|
||||
if self._instructions:
|
||||
create_params["instructions"] = self._instructions
|
||||
if self._settings["instructions"]:
|
||||
create_params["instructions"] = self._settings["instructions"]
|
||||
|
||||
if self._speed:
|
||||
create_params["speed"] = self._speed
|
||||
if self._settings["speed"]:
|
||||
create_params["speed"] = self._settings["speed"]
|
||||
|
||||
async with self._client.audio.speech.with_streaming_response.create(
|
||||
**create_params
|
||||
|
||||
@@ -269,6 +269,8 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
raise ValueError("WebSocket URL is not a string")
|
||||
|
||||
self._websocket = await websocket_connect(self._websocket_url)
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except ValueError as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -291,6 +293,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
finally:
|
||||
self._request_id = None
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _get_websocket_url(self):
|
||||
"""Retrieve WebSocket URL from PlayHT API."""
|
||||
|
||||
@@ -255,6 +255,8 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
url = f"{self._url}?{params}"
|
||||
headers = {"Authorization": f"Bearer {self._api_key}"}
|
||||
self._websocket = await websocket_connect(url, additional_headers=headers)
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -272,6 +274,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
"""Get active websocket connection or raise exception."""
|
||||
|
||||
@@ -583,7 +583,9 @@ class RivaSegmentedSTTService(SegmentedSTTService):
|
||||
self._config.language_code = self._language
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(self, transcript: str, language: Optional[Language] = None):
|
||||
async def _handle_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
pass
|
||||
|
||||
|
||||
@@ -76,17 +76,29 @@ class SarvamHttpTTSService(TTSService):
|
||||
|
||||
Example::
|
||||
|
||||
tts = SarvamTTSService(
|
||||
tts = SarvamHttpTTSService(
|
||||
api_key="your-api-key",
|
||||
voice_id="anushka",
|
||||
model="bulbul:v2",
|
||||
aiohttp_session=session,
|
||||
params=SarvamTTSService.InputParams(
|
||||
params=SarvamHttpTTSService.InputParams(
|
||||
language=Language.HI,
|
||||
pitch=0.1,
|
||||
pace=1.2
|
||||
)
|
||||
)
|
||||
|
||||
# For bulbul v3 beta with any speaker:
|
||||
tts_v3 = SarvamHttpTTSService(
|
||||
api_key="your-api-key",
|
||||
voice_id="speaker_name",
|
||||
model="bulbul:v3",
|
||||
aiohttp_session=session,
|
||||
params=SarvamHttpTTSService.InputParams(
|
||||
language=Language.HI,
|
||||
temperature=0.8
|
||||
)
|
||||
)
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
@@ -105,6 +117,14 @@ class SarvamHttpTTSService(TTSService):
|
||||
pace: Optional[float] = Field(default=1.0, ge=0.3, le=3.0)
|
||||
loudness: Optional[float] = Field(default=1.0, ge=0.1, le=3.0)
|
||||
enable_preprocessing: Optional[bool] = False
|
||||
temperature: Optional[float] = Field(
|
||||
default=0.6,
|
||||
ge=0.01,
|
||||
le=1.0,
|
||||
description="Controls the randomness of the output for bulbul v3 beta. "
|
||||
"Lower values make the output more focused and deterministic, while "
|
||||
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -124,7 +144,7 @@ class SarvamHttpTTSService(TTSService):
|
||||
api_key: Sarvam AI API subscription key.
|
||||
aiohttp_session: Shared aiohttp session for making requests.
|
||||
voice_id: Speaker voice ID (e.g., "anushka", "meera"). Defaults to "anushka".
|
||||
model: TTS model to use ("bulbul:v1" or "bulbul:v2"). Defaults to "bulbul:v2".
|
||||
model: TTS model to use ("bulbul:v2" or "bulbul:v3-beta" or "bulbul:v3"). Defaults to "bulbul:v2".
|
||||
base_url: Sarvam AI API base URL. Defaults to "https://api.sarvam.ai".
|
||||
sample_rate: Audio sample rate in Hz (8000, 16000, 22050, 24000). If None, uses default.
|
||||
params: Additional voice and preprocessing parameters. If None, uses defaults.
|
||||
@@ -138,16 +158,32 @@ class SarvamHttpTTSService(TTSService):
|
||||
self._base_url = base_url
|
||||
self._session = aiohttp_session
|
||||
|
||||
# Build base settings common to all models
|
||||
self._settings = {
|
||||
"language": (
|
||||
self.language_to_service_language(params.language) if params.language else "en-IN"
|
||||
),
|
||||
"pitch": params.pitch,
|
||||
"pace": params.pace,
|
||||
"loudness": params.loudness,
|
||||
"enable_preprocessing": params.enable_preprocessing,
|
||||
}
|
||||
|
||||
# Add model-specific parameters
|
||||
if model in ("bulbul:v3-beta", "bulbul:v3"):
|
||||
self._settings.update(
|
||||
{
|
||||
"temperature": getattr(params, "temperature", 0.6),
|
||||
"model": model,
|
||||
}
|
||||
)
|
||||
else:
|
||||
self._settings.update(
|
||||
{
|
||||
"pitch": params.pitch,
|
||||
"pace": params.pace,
|
||||
"loudness": params.loudness,
|
||||
"model": model,
|
||||
}
|
||||
)
|
||||
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
|
||||
@@ -275,6 +311,18 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
pace=1.2
|
||||
)
|
||||
)
|
||||
|
||||
# For bulbul v3 beta with any speaker and temperature:
|
||||
# Note: pace and loudness are not supported for bulbul v3 and bulbul v3 beta
|
||||
tts_v3 = SarvamTTSService(
|
||||
api_key="your-api-key",
|
||||
voice_id="speaker_name",
|
||||
model="bulbul:v3",
|
||||
params=SarvamTTSService.InputParams(
|
||||
language=Language.HI,
|
||||
temperature=0.8
|
||||
)
|
||||
)
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
@@ -310,6 +358,14 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
output_audio_codec: Optional[str] = "linear16"
|
||||
output_audio_bitrate: Optional[str] = "128k"
|
||||
language: Optional[Language] = Language.EN
|
||||
temperature: Optional[float] = Field(
|
||||
default=0.6,
|
||||
ge=0.01,
|
||||
le=1.0,
|
||||
description="Controls the randomness of the output for bulbul v3 beta. "
|
||||
"Lower values make the output more focused and deterministic, while "
|
||||
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -329,6 +385,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
Args:
|
||||
api_key: Sarvam API key for authenticating TTS requests.
|
||||
model: Identifier of the Sarvam speech model (default "bulbul:v2").
|
||||
Supports "bulbul:v2", "bulbul:v3-beta" and "bulbul:v3".
|
||||
voice_id: Voice identifier for synthesis (default "anushka").
|
||||
url: WebSocket URL for connecting to the TTS backend (default production URL).
|
||||
aiohttp_session: Optional shared aiohttp session. To maintain backward compatibility.
|
||||
@@ -371,15 +428,12 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
self._api_key = api_key
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
# Configuration parameters
|
||||
# Build base settings common to all models
|
||||
self._settings = {
|
||||
"target_language_code": (
|
||||
self.language_to_service_language(params.language) if params.language else "en-IN"
|
||||
),
|
||||
"pitch": params.pitch,
|
||||
"pace": params.pace,
|
||||
"speaker": voice_id,
|
||||
"loudness": params.loudness,
|
||||
"speech_sample_rate": 0,
|
||||
"enable_preprocessing": params.enable_preprocessing,
|
||||
"min_buffer_size": params.min_buffer_size,
|
||||
@@ -387,6 +441,24 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
"output_audio_codec": params.output_audio_codec,
|
||||
"output_audio_bitrate": params.output_audio_bitrate,
|
||||
}
|
||||
|
||||
# Add model-specific parameters
|
||||
if model in ("bulbul:v3-beta", "bulbul:v3"):
|
||||
self._settings.update(
|
||||
{
|
||||
"temperature": getattr(params, "temperature", 0.6),
|
||||
"model": model,
|
||||
}
|
||||
)
|
||||
else:
|
||||
self._settings.update(
|
||||
{
|
||||
"pitch": params.pitch,
|
||||
"pace": params.pace,
|
||||
"loudness": params.loudness,
|
||||
"model": model,
|
||||
}
|
||||
)
|
||||
self._started = False
|
||||
|
||||
self._receive_task = None
|
||||
@@ -525,6 +597,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
logger.debug("Connected to Sarvam TTS Websocket")
|
||||
await self._send_config()
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -556,6 +629,10 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
|
||||
@@ -577,6 +577,7 @@ class SpeechmaticsSTTService(STTService):
|
||||
),
|
||||
)
|
||||
logger.debug(f"{self} Connected to Speechmatics STT service")
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} Error connecting to Speechmatics: {e}")
|
||||
self._client = None
|
||||
@@ -595,6 +596,7 @@ class SpeechmaticsSTTService(STTService):
|
||||
logger.error(f"{self} Error closing Speechmatics client: {e}")
|
||||
finally:
|
||||
self._client = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _process_config(self) -> None:
|
||||
"""Create a formatted STT transcription config.
|
||||
@@ -618,7 +620,7 @@ class SpeechmaticsSTTService(STTService):
|
||||
transcription_config.additional_vocab = [
|
||||
{
|
||||
"content": e.content,
|
||||
"sounds_like": e.sounds_like,
|
||||
**({"sounds_like": e.sounds_like} if e.sounds_like else {}),
|
||||
}
|
||||
for e in self._params.additional_vocab
|
||||
]
|
||||
|
||||
@@ -35,6 +35,25 @@ class STTService(AIService):
|
||||
Provides common functionality for STT services including audio passthrough,
|
||||
muting, settings management, and audio processing. Subclasses must implement
|
||||
the run_stt method to provide actual speech recognition.
|
||||
|
||||
Event handlers:
|
||||
on_connected: Called when connected to the STT service.
|
||||
on_disconnected: Called when disconnected from the STT service.
|
||||
on_connection_error: Called when an error occurs on the connection to the STT service.
|
||||
|
||||
Example::
|
||||
|
||||
@stt.event_handler("on_connected")
|
||||
async def on_connected(stt: STTService):
|
||||
logger.debug(f"STT connected")
|
||||
|
||||
@stt.event_handler("on_disconnected")
|
||||
async def on_disconnected(stt: STTService):
|
||||
logger.debug(f"STT disconnected")
|
||||
|
||||
@stt.event_handler("on_connection_error")
|
||||
async def on_connection_error(stt: STTService, error: str):
|
||||
logger.error(f"STT connection error: {error}")
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -62,6 +81,10 @@ class STTService(AIService):
|
||||
self._muted: bool = False
|
||||
self._user_id: str = ""
|
||||
|
||||
self._register_event_handler("on_connected")
|
||||
self._register_event_handler("on_disconnected")
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
@property
|
||||
def is_muted(self) -> bool:
|
||||
"""Check if the STT service is currently muted.
|
||||
@@ -292,15 +315,6 @@ class WebsocketSTTService(STTService, WebsocketService):
|
||||
|
||||
Combines STT functionality with websocket connectivity, providing automatic
|
||||
error handling and reconnection capabilities.
|
||||
|
||||
Event handlers:
|
||||
on_connection_error: Called when a websocket connection error occurs.
|
||||
|
||||
Example::
|
||||
|
||||
@stt.event_handler("on_connection_error")
|
||||
async def on_connection_error(stt: STTService, error: str):
|
||||
logger.error(f"STT connection error: {error}")
|
||||
"""
|
||||
|
||||
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
|
||||
@@ -312,7 +326,6 @@ class WebsocketSTTService(STTService, WebsocketService):
|
||||
"""
|
||||
STTService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
|
||||
@@ -59,6 +59,25 @@ class TTSService(AIService):
|
||||
Provides common functionality for TTS services including text aggregation,
|
||||
filtering, audio generation, and frame management. Supports configurable
|
||||
sentence aggregation, silence insertion, and frame processing control.
|
||||
|
||||
Event handlers:
|
||||
on_connected: Called when connected to the TTS service.
|
||||
on_disconnected: Called when disconnected from the TTS service.
|
||||
on_connection_error: Called when a TTS service connection error occurs.
|
||||
|
||||
Example::
|
||||
|
||||
@tts.event_handler("on_connected")
|
||||
async def on_connected(tts: TTSService):
|
||||
logger.debug(f"TTS connected")
|
||||
|
||||
@tts.event_handler("on_disconnected")
|
||||
async def on_disconnected(tts: TTSService):
|
||||
logger.debug(f"TTS disconnected")
|
||||
|
||||
@tts.event_handler("on_connection_error")
|
||||
async def on_connection_error(tts: TTSService, error: str):
|
||||
logger.error(f"TTS connection error: {error}")
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -143,6 +162,10 @@ class TTSService(AIService):
|
||||
|
||||
self._processing_text: bool = False
|
||||
|
||||
self._register_event_handler("on_connected")
|
||||
self._register_event_handler("on_disconnected")
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""Get the current sample rate for audio output.
|
||||
@@ -626,7 +649,6 @@ class WebsocketTTSService(TTSService, WebsocketService):
|
||||
"""
|
||||
TTSService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
@@ -678,15 +700,6 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
|
||||
"""Base class for websocket-based TTS services that support word timestamps.
|
||||
|
||||
Combines word timestamp functionality with websocket connectivity.
|
||||
|
||||
Event handlers:
|
||||
on_connection_error: Called when a websocket connection error occurs.
|
||||
|
||||
Example::
|
||||
|
||||
@tts.event_handler("on_connection_error")
|
||||
async def on_connection_error(tts: TTSService, error: str):
|
||||
logger.error(f"TTS connection error: {error}")
|
||||
"""
|
||||
|
||||
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
|
||||
@@ -698,7 +711,6 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
|
||||
"""
|
||||
WordTTSService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
|
||||
@@ -232,6 +232,9 @@ class BaseInputTransport(FrameProcessor):
|
||||
"""
|
||||
# Cancel and wait for the audio input task to finish.
|
||||
await self._cancel_audio_task()
|
||||
# Stop audio filter.
|
||||
if self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.stop()
|
||||
|
||||
async def set_transport_ready(self, frame: StartFrame):
|
||||
"""Called when the transport is ready to stream.
|
||||
|
||||
@@ -293,15 +293,15 @@ class BaseOutputTransport(FrameProcessor):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
#
|
||||
# System frames (like InterruptionFrame) are pushed immediately. Other
|
||||
# frames require order so they are put in the sink queue.
|
||||
#
|
||||
if isinstance(frame, StartFrame):
|
||||
# Push StartFrame before start(), because we want StartFrame to be
|
||||
# processed by every processor before any other frame is processed.
|
||||
await self.push_frame(frame, direction)
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self.stop(frame)
|
||||
# Keep pushing EndFrame down so all the pipeline stops nicely.
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self.cancel(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -314,21 +314,6 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.write_dtmf(frame)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames.
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self.stop(frame)
|
||||
# Keep pushing EndFrame down so all the pipeline stops nicely.
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, MixerControlFrame):
|
||||
await self._handle_frame(frame)
|
||||
# Other frames.
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
|
||||
await self._handle_frame(frame)
|
||||
# TODO(aleix): Images and audio should support presentation timestamps.
|
||||
elif frame.pts:
|
||||
await self._handle_frame(frame)
|
||||
elif direction == FrameDirection.UPSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
@@ -410,6 +395,13 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
# Last time a BotSpeakingFrame was pushed.
|
||||
self._bot_speaking_frame_time = 0
|
||||
# How often a BotSpeakingFrame should be pushed (value should be
|
||||
# lower than the audio chunks).
|
||||
self._bot_speaking_frame_period = 0.2
|
||||
# Last time the bot actually spoke.
|
||||
self._bot_speech_last_time = 0
|
||||
|
||||
self._audio_task: Optional[asyncio.Task] = None
|
||||
self._video_task: Optional[asyncio.Task] = None
|
||||
@@ -601,39 +593,71 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
"""Handle bot started speaking event."""
|
||||
if not self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
|
||||
)
|
||||
if self._bot_speaking:
|
||||
return
|
||||
|
||||
downstream_frame = BotStartedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStartedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
|
||||
)
|
||||
|
||||
self._bot_speaking = True
|
||||
downstream_frame = BotStartedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStartedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = True
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
"""Handle bot stopped speaking event."""
|
||||
if self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
|
||||
)
|
||||
if not self._bot_speaking:
|
||||
return
|
||||
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
|
||||
)
|
||||
|
||||
self._bot_speaking = False
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
self._bot_speaking = False
|
||||
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
async def _bot_currently_speaking(self):
|
||||
"""Handle bot speaking event."""
|
||||
await self._bot_started_speaking()
|
||||
|
||||
diff_time = time.time() - self._bot_speaking_frame_time
|
||||
if diff_time >= self._bot_speaking_frame_period:
|
||||
await self._transport.push_frame(BotSpeakingFrame())
|
||||
await self._transport.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking_frame_time = time.time()
|
||||
|
||||
self._bot_speech_last_time = time.time()
|
||||
|
||||
async def _maybe_bot_currently_speaking(self, frame: SpeechOutputAudioRawFrame):
|
||||
if not is_silence(frame.audio):
|
||||
await self._bot_currently_speaking()
|
||||
else:
|
||||
silence_duration = time.time() - self._bot_speech_last_time
|
||||
if silence_duration > BOT_VAD_STOP_SECS:
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def _handle_bot_speech(self, frame: Frame):
|
||||
# TTS case.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_currently_speaking()
|
||||
# Speech stream case.
|
||||
elif isinstance(frame, SpeechOutputAudioRawFrame):
|
||||
await self._maybe_bot_currently_speaking(frame)
|
||||
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
"""Handle various frame types with appropriate processing.
|
||||
@@ -641,7 +665,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
Args:
|
||||
frame: The frame to handle.
|
||||
"""
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self._handle_bot_speech(frame)
|
||||
elif isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
@@ -705,39 +731,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
async def _audio_task_handler(self):
|
||||
"""Main audio processing task handler."""
|
||||
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
|
||||
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
|
||||
# every audio chunk.
|
||||
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
|
||||
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
|
||||
bot_speaking_counter = 0
|
||||
speech_last_speaking_time = 0
|
||||
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
is_speaking = False
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
is_speaking = True
|
||||
elif isinstance(frame, SpeechOutputAudioRawFrame):
|
||||
if not is_silence(frame.audio):
|
||||
is_speaking = True
|
||||
speech_last_speaking_time = time.time()
|
||||
else:
|
||||
silence_duration = time.time() - speech_last_speaking_time
|
||||
if silence_duration > BOT_VAD_STOP_SECS:
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
if is_speaking:
|
||||
await self._bot_started_speaking()
|
||||
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
|
||||
await self._transport.push_frame(BotSpeakingFrame())
|
||||
await self._transport.push_frame(
|
||||
BotSpeakingFrame(), FrameDirection.UPSTREAM
|
||||
)
|
||||
bot_speaking_counter = 0
|
||||
bot_speaking_counter += 1
|
||||
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
@@ -689,3 +689,8 @@ class SmallWebRTCConnection(BaseObject):
|
||||
)()
|
||||
if track:
|
||||
track.set_enabled(signalling_message.enabled)
|
||||
|
||||
async def add_ice_candidate(self, candidate):
|
||||
"""Handle incoming ICE candidates."""
|
||||
logger.debug(f"Adding remote candidate: {candidate}")
|
||||
await self.pc.addIceCandidate(candidate)
|
||||
|
||||
@@ -14,6 +14,7 @@ from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||
|
||||
from aiortc.sdp import candidate_from_sdp
|
||||
from fastapi import HTTPException
|
||||
from loguru import logger
|
||||
|
||||
@@ -39,6 +40,34 @@ class SmallWebRTCRequest:
|
||||
request_data: Optional[Any] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class IceCandidate:
|
||||
"""The remote ICE candidate object received from the peer connection.
|
||||
|
||||
Parameters:
|
||||
candidate: The ICE candidate patch SDP string (Session Description Protocol).
|
||||
sdp_mid: The SDP mid for the candidate patch.
|
||||
sdp_mline_index: The SDP mline index for the candidate patch.
|
||||
"""
|
||||
|
||||
candidate: str
|
||||
sdp_mid: str
|
||||
sdp_mline_index: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class SmallWebRTCPatchRequest:
|
||||
"""Small WebRTC transport session arguments for the runner.
|
||||
|
||||
Parameters:
|
||||
pc_id: Identifier for the peer connection.
|
||||
candidates: A list of ICE candidate patches.
|
||||
"""
|
||||
|
||||
pc_id: str
|
||||
candidates: List[IceCandidate]
|
||||
|
||||
|
||||
class ConnectionMode(Enum):
|
||||
"""Enum defining the connection handling modes."""
|
||||
|
||||
@@ -197,6 +226,19 @@ class SmallWebRTCRequestHandler:
|
||||
logger.debug(f"SmallWebRTC request details: {request}")
|
||||
raise
|
||||
|
||||
async def handle_patch_request(self, request: SmallWebRTCPatchRequest):
|
||||
"""Handle a SmallWebRTC patch candidate request."""
|
||||
peer_connection = self._pcs_map.get(request.pc_id)
|
||||
|
||||
if not peer_connection:
|
||||
raise HTTPException(status_code=404, detail="Peer connection not found")
|
||||
|
||||
for c in request.candidates:
|
||||
candidate = candidate_from_sdp(c.candidate)
|
||||
candidate.sdpMid = c.sdp_mid
|
||||
candidate.sdpMLineIndex = c.sdp_mline_index
|
||||
await peer_connection.add_ice_candidate(candidate)
|
||||
|
||||
async def close(self):
|
||||
"""Clear the connection map."""
|
||||
coros = [pc.disconnect() for pc in self._pcs_map.values()]
|
||||
|
||||
@@ -254,7 +254,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
|
||||
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
|
||||
timeout=1.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
@@ -290,7 +290,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
await task.queue_frame(TextFrame(text="Hello!"))
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
|
||||
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
|
||||
timeout=1.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
@@ -301,11 +301,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
identity = IdentityFilter()
|
||||
pipeline = Pipeline([identity])
|
||||
task = PipelineTask(pipeline, idle_timeout_secs=0.2)
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert True
|
||||
# This shouldn't freeze, so nothing to check really.
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
|
||||
async def test_no_idle_task(self):
|
||||
identity = IdentityFilter()
|
||||
@@ -313,7 +310,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
|
||||
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
|
||||
timeout=0.3,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
@@ -332,11 +329,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
),
|
||||
idle_timeout_secs=0.3,
|
||||
)
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert True
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
|
||||
async def test_idle_task_event_handler_no_frames(self):
|
||||
identity = IdentityFilter()
|
||||
@@ -351,11 +344,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
idle_timeout = True
|
||||
await task.cancel()
|
||||
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert idle_timeout
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert idle_timeout
|
||||
|
||||
async def test_idle_task_event_handler_quiet_user(self):
|
||||
identity = IdentityFilter()
|
||||
@@ -416,12 +406,15 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
asyncio.create_task(delayed_frames()),
|
||||
]
|
||||
|
||||
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
|
||||
diff_time = time.time() - start_time
|
||||
|
||||
self.assertGreater(diff_time, sleep_time_secs * 3)
|
||||
|
||||
# Wait for the pending tasks to complete.
|
||||
await asyncio.gather(*pending)
|
||||
|
||||
async def test_task_cancel_timeout(self):
|
||||
class CancelFilter(FrameProcessor):
|
||||
def __init__(self, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user