Compare commits

...

69 Commits

Author SHA1 Message Date
James Hush
d41ee5319e Add example for getting just recording links 2025-10-23 14:37:39 +08:00
James Hush
45256903b5 Remove log 2025-10-20 17:03:31 +08:00
James Hush
d37e63d972 Simplify create_daily_room.py 2025-10-20 17:02:05 +08:00
James Hush
013f869259 Simplfy fetch_s3_recording 2025-10-20 16:54:24 +08:00
James Hush
11594003e2 This is working 2025-10-20 16:49:37 +08:00
James Hush
2dd2a17b19 Backoff examples 2025-10-20 16:46:16 +08:00
Aleix Conchillo Flaqué
7aa01c1ca8 Merge pull request #2882 from pipecat-ai/aleix/base-transport-output-cleanup
base output transport cleanup
2025-10-18 07:38:13 -07:00
Mark Backman
4d6356748f Merge pull request #2819 from shreyas-sarvam/sarvam/tts-v3
feat: Add support for bulbul:v3
2025-10-18 09:36:57 -04:00
Mark Backman
5b1a182421 Merge branch 'main' into sarvam/tts-v3 2025-10-18 09:34:10 -04:00
Mark Backman
6ac0c34413 Merge pull request #2879 from sam-s10s/fix/smx-vocab
Fix for SpeechmaticsSTTService AdditionVocabEntry entries
2025-10-18 09:27:23 -04:00
Mark Backman
c115422dbf Merge pull request #2857 from dan-ince-aai/main
feat: add keyterms_prompt to AssemblyAI service
2025-10-18 09:20:43 -04:00
Mark Backman
a2a973be27 Merge pull request #2842 from nbyers-altira/fix-riva-segmented
Fix NVIDIA Riva Segmented STT by adding missing is_final parameter to _handle_transcription
2025-10-18 09:11:11 -04:00
Aleix Conchillo Flaqué
0407744950 BaseOutputTransport: simplify process_frame 2025-10-17 21:55:20 -07:00
Aleix Conchillo Flaqué
7ce370ccc6 BaseOutputTransport: simplify bot speaking logic 2025-10-17 15:13:20 -07:00
nbyers-altira
a4867f61aa be a tad more precise in changelog 2025-10-17 13:51:49 -04:00
nbyers-altira
a67a765783 add changelog, run linter 2025-10-17 13:49:52 -04:00
nbyers-altira
81221668b1 Merge remote-tracking branch 'upstream/main' into fix-riva-segmented 2025-10-17 13:45:59 -04:00
Daniel Ince
cc9c264940 Merge branch 'main' into main 2025-10-17 15:15:36 +01:00
Sam Sykes
f2c61ac9fd Fix for AdditionVocabEntry without sounds_like items. 2025-10-17 14:34:37 +01:00
Filipi da Silva Fuchter
88f8c10f63 Merge pull request #2875 from pipecat-ai/filipi/rtvi_routes
Creating the WebRTC routes that mimic the ones provided by Pipecat Cloud.
2025-10-17 10:13:45 -03:00
Filipi Fuchter
855f4842dd Creating the WebRTC routes that mimic the ones provided by Pipecat Cloud. 2025-10-17 10:10:19 -03:00
Filipi da Silva Fuchter
2bf44fe2af Merge pull request #2853 from pipecat-ai/filipi/trickle_ice
Adding support for trickle ice.
2025-10-17 09:00:32 -03:00
Filipi Fuchter
3e8a7cc254 Adding support for trickle ICE to the SmallWebRTCTransport. 2025-10-17 08:57:45 -03:00
Daniel Ince
a600c05570 Merge branch 'main' into main 2025-10-17 11:43:38 +01:00
dan-ince-aai
3ba6b55659 feat: multilingual + changelog updates 2025-10-17 11:38:03 +01:00
dan-ince-aai
d5f2dcfac0 lint 2025-10-17 11:32:06 +01:00
shreyas-sarvam
d12134038b chore: Update CHANGELOG 2025-10-17 10:07:58 +05:30
shreyas-sarvam
a22af3a7e0 Merge branch 'main' into sarvam/stt 2025-10-17 10:00:49 +05:30
Aleix Conchillo Flaqué
76e07c6c48 Merge pull request #2870 from pipecat-ai/aleix/openaitts-update-settings
OpenAITTSService: allow updating instructions and speed
2025-10-16 13:21:12 -07:00
Aleix Conchillo Flaqué
8d8503bca7 OpenAITTSService: allow updating instructions and speed 2025-10-16 13:20:49 -07:00
Aleix Conchillo Flaqué
a444097060 Merge pull request #2872 from pipecat-ai/aleix/pipeline-task-cancellation-fixes
PipelineTask: fix task cancellation issues
2025-10-16 13:18:13 -07:00
Aleix Conchillo Flaqué
1b9e96c016 PipelineTask: fix task cancellation issues 2025-10-16 13:16:19 -07:00
Vanessa Pyne
7967bc53c3 Merge pull request #2868 from pipecat-ai/vp-whatsapp-dep-mv
only import whatsapp deps if using whatsapp runner
2025-10-16 14:16:28 -05:00
vipyne
6381335346 Add --whatsapp flag to runner 2025-10-16 14:15:26 -05:00
vipyne
0fd5d26104 add WHATSAPP_APP_SECRET to required whatsapp env vars 2025-10-16 10:37:56 -05:00
vipyne
41f817bf04 only import whatsapp deps if using whatsapp runner 2025-10-16 10:37:56 -05:00
shreyas-sarvam
27115e6565 Merge branch 'main' into sarvam/tts-v3 2025-10-16 12:09:50 +05:30
Mark Backman
3c4807d7d4 Merge pull request #2859 from pipecat-ai/mb/openai-package-upgrade
Bump openai, openpipe versions, add 14x foundational example
2025-10-15 15:41:32 -04:00
Mark Backman
8902f1dc94 Bump openai, openpipe versions, add 14x foundational example 2025-10-15 15:17:22 -04:00
Mark Backman
a25333ee51 Merge pull request #2856 from pipecat-ai/mb/pr-2840-cleanup
Fix an issue in ElevenLabsHttpTTSService where the last word is not e…
2025-10-15 15:16:43 -04:00
Mark Backman
82c7d7ad83 Merge pull request #2867 from pipecat-ai/mb/update-moondream-readme
Update moondream chatbot README link
2025-10-15 15:16:19 -04:00
Mark Backman
ba2ab51ef7 Merge pull request #2866 from pipecat-ai/mb/add-sentry-foundational
Add foundation 47-sentry-metrics.py
2025-10-15 15:15:52 -04:00
Mark Backman
22557fa668 Fix an issue in ElevenLabsHttpTTSService where the last word is not emitted 2025-10-15 15:13:54 -04:00
Vanessa Pyne
3fbf59e7c6 Merge pull request #2864 from pipecat-ai/vp-trace-log
WhatsApp transport debug log -> trace log
2025-10-15 13:03:58 -05:00
vipyne
129ab5ea0e WhatsApp transport debug log -> trace log 2025-10-15 13:02:57 -05:00
Aleix Conchillo Flaqué
dc917523d0 Merge pull request #2855 from pipecat-ai/aleix/stt-tts-connected-disconnected-events
services: added on_connected/on_disconnected events
2025-10-15 10:41:38 -07:00
Aleix Conchillo Flaqué
5ea7cc9d32 services: added on_connected/on_disconnected events 2025-10-15 10:39:31 -07:00
Mark Backman
e11ede475b Update moondream chatbot README link 2025-10-15 13:22:56 -04:00
Mark Backman
90d29e04af Merge pull request #2861 from pipecat-ai/mb/11labs-http-apply-text-normalization-fix
fix: set apply_text_normalization as request parameter in ElevenLabsH…
2025-10-15 12:59:36 -04:00
Mark Backman
4c67136a8d Merge pull request #2858 from pipecat-ai/mb/daily-runner-room-properties
Add room_properties to the Daily runner configure() method
2025-10-15 12:58:18 -04:00
Mark Backman
9d78402a33 fix: set apply_text_normalization as request parameter in ElevenLabsHttpTTSService 2025-10-15 12:56:42 -04:00
Mark Backman
73877218e9 Add room_properties to the Daily runner configure() method 2025-10-15 12:55:19 -04:00
Mark Backman
6a1be90cbb Merge pull request #2862 from pipecat-ai/mb/11labs-http-aggregate-sentences
Add aggregate_sentences arg to ElevenLabsHttpTTSService
2025-10-15 12:54:23 -04:00
Aleix Conchillo Flaqué
fbac959ecb Merge pull request #2865 from pipecat-ai/aleix/stop-audio-filter-also-on-cancel
BaseInputTransport: stop audio filter on cancel
2025-10-15 09:53:24 -07:00
Aleix Conchillo Flaqué
18dd85431c Merge pull request #2854 from pipecat-ai/aleix/cartesia-stt-service-websocket
CartesiaSTTService to inherit from WebsocketSTTService
2025-10-15 09:51:42 -07:00
Aleix Conchillo Flaqué
abc569b3d2 examples(foundational/07): use CartesiaSTTService 2025-10-15 09:46:57 -07:00
Mark Backman
fa5d4ecf86 Add foundation 47-sentry-metrics.py 2025-10-15 12:45:07 -04:00
Aleix Conchillo Flaqué
83b0dc39f7 BaseInputTransport: stop audio filter on cancel 2025-10-15 09:22:48 -07:00
Mark Backman
0c31b5ef19 Add aggregate_sentences arg to ElevenLabsHttpTTSService 2025-10-15 11:49:26 -04:00
dan-ince-aai
d16c36c56d feat: add keyterms_prompt to AssemblyAI service 2025-10-15 14:27:52 +01:00
Mark Backman
8fe3bcd484 Merge pull request #2840 from Rickaym/fix--excess-space-in-elevelabs-word-timestamp-joins
fix: handle ElevenLabs partial word concatenation across alignment chunks gracefully
2025-10-15 09:01:05 -04:00
Aleix Conchillo Flaqué
be2858bfbb CartesiaSTTService: inherit from WebsocketSTTService 2025-10-14 14:14:57 -07:00
Pyae Sone Myo
b6b0997553 fix: add support for partial words 2025-10-14 23:06:13 +06:30
Pyae Sone Myo
3b751322d3 fix: add interruption reset for partial word states 2025-10-14 23:04:09 +06:30
nbyers-altira
cc66ac14f1 add is_final to segmented func. sig. instead so tracing is consistent 2025-10-13 10:48:41 -04:00
nbyers-altira
9ddec0f8b4 is_final is not part of the segmented _handle_transcription function signature 2025-10-13 10:44:25 -04:00
shreyas-sarvam
9babfe9fd9 refactor: Improve code reability and replace deprecated interruption frames 2025-10-13 08:54:29 +05:30
Pyae Sone Myo
21d8d148b8 fix: handle partial words across alignment chunks gracefully 2025-10-12 22:10:11 +06:30
shreyas-sarvam
7c1e2793c5 feat: Add support for bulbul:v3 and bulbul:v3-beta 2025-10-09 18:26:22 +05:30
42 changed files with 6023 additions and 4578 deletions

View File

@@ -9,15 +9,78 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
-
- Added support for trickle ICE to the `SmallWebRTCTransport`.
- Added support for updating `OpenAITTSService` settings (`instructions` and
`speed`) at runtime via `TTSUpdateSettingsFrame`.
- Added `--whatsapp` flag to runner to better surface WhatsApp transport logs.
- Added `on_connected` and `on_disconnected` events to TTS and STT
websocket-based services.
- Added an `aggregate_sentences` arg in `ElevenLabsHttpTTSService`, where the
default value is True.
- Added a `room_properties` arg to the Daily runner's `configure()` method,
allowing `DailyRoomProperties` to be provided.
- The runner `--folder` argument now supports downloading files from
subdirectories.
### Changed
- `CartesiaSTTService` now inherits from `WebsocketSTTService`.
- Package upgrades:
- `openai` upgraded to support up to 2.x.x.
- `openpipe` upgraded to support up to 5.x.x.
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
### Fixed
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
to a mismatch in the _handle_transcription method's signature.
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
now handled properly in `PipelineTask` making it possible to cancel an asyncio
task that it's executing a `PipelineRunner` cleanly. Also,
`PipelineTask.cancel()` does not block anymore waiting for the `CancelFrame`
to reach the end of the pipeline (going back to the behavior in < 0.0.83).
- Fixed an issue in `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` where
the Flash models would split words, resulting in a space being inserted
between words.
- Fixed an issue where audio filters' `stop()` would not be called when using
`CancelFrame`.
- Fixed an issue in `ElevenLabsHttpTTSService`, where
`apply_text_normalization` was incorrectly set as a query parameter. It's now
being added as a request parameter.
- Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate
incorrectly 16-bit aligned audio frames, potentially leading to internal
errors or static audio.
- Fixed an issue in `SpeechmaticsSTTService` where `AdditionalVocabEntry` items
needed to have `sounds_like` for the session to start.
### Other
- Added foundational example `47-sentry-metrics.py`, demonstrating how to use the
`SentryMetrics` processor.
- Added foundational example `14x-function-calling-openpipe.py`.
## [0.0.90] - 2025-10-10
### Added

View File

@@ -63,24 +63,24 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/moondream-chatbot/image.png" width="400" /></a>
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12-describe-video.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/assets/moondream.png" width="400" /></a>
</p>
## 🧩 Available services
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Category | Services |
| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)

250
create_daily_room.py Executable file
View File

@@ -0,0 +1,250 @@
#!/usr/bin/env -S uv run
"""Utilities for creating Daily.co rooms with retry logic.
This module provides functions to create Daily rooms via REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
import time
from typing import Dict, Optional
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def periodic_progress_logger(
progress_dict: Dict[str, int],
total: int,
interval_seconds: float = 5.0,
stop_event: Optional[asyncio.Event] = None,
):
"""Log progress periodically in the background.
Args:
progress_dict: Shared dict with 'completed' and 'failed' counts
total: Total number of items being processed
interval_seconds: How often to log progress (default 5 seconds)
stop_event: Event to signal when to stop logging
"""
if stop_event is None:
stop_event = asyncio.Event()
while not stop_event.is_set():
await asyncio.sleep(interval_seconds)
if stop_event.is_set():
break
total_processed = progress_dict["completed"] + progress_dict["failed"]
if total_processed > 0:
percentage = (total_processed / total) * 100
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
logger.info(
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
f"{progress_dict['completed']} succeeded, "
f"{progress_dict['failed']} failed"
)
async def create_daily_room(
name: Optional[str] = None,
privacy: str = "public",
exp_minutes: int = 10,
max_retries: int = 5,
) -> Optional[Dict]:
"""Create a Daily room with automatic retry on rate limit errors.
Uses tenacity library to handle rate limiting (429 errors) with
exponential backoff and automatic retries.
Args:
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
privacy: Room privacy setting ("public" or "private")
exp_minutes: Minutes until room expires (default 10)
max_retries: Maximum number of retry attempts on rate limit (default 5)
Returns:
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
"""
# Calculate expiration timestamp (unix timestamp in seconds)
exp_timestamp = int(time.time()) + (exp_minutes * 60)
# Build request body
body = {
"privacy": privacy,
"properties": {
"exp": exp_timestamp,
},
}
if name:
body["name"] = name
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type(HTTPStatusError),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
async with AsyncClient(timeout=30) as client:
response = await client.post(
url="https://api.daily.co/v1/rooms",
headers=headers,
json=body,
)
response.raise_for_status()
return response.json()
# This line should never be reached due to reraise=True, but satisfies type checker
return None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
return None
except Exception as e:
logger.exception(f"Unexpected error creating room: {e}")
return None
async def create_room_with_progress(
index: int, total: int, progress_dict: Dict[str, int], **kwargs
) -> Optional[Dict]:
"""Wrapper for create_daily_room that tracks progress.
Args:
index: Index of this room creation (0-based)
total: Total number of rooms being created
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
**kwargs: Arguments passed to create_daily_room
Returns:
Room object dict or None
"""
result = await create_daily_room(**kwargs)
# Update progress
if result is not None:
progress_dict["completed"] += 1
else:
progress_dict["failed"] += 1
return result
async def test_create_rooms(
num_rooms: int = 1000,
progress_interval: float = 5.0,
) -> Dict[str, int | float]:
"""Attempt to create multiple Daily rooms concurrently.
This function demonstrates concurrent room creation and tracks
success/failure statistics. Rate limiting will likely occur when
creating many rooms quickly.
Args:
num_rooms: Number of rooms to attempt to create (default 1000)
progress_interval: How often to log progress in seconds (default 5.0)
Returns:
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
"""
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
start_time = time.time()
# Shared progress tracking dictionary
progress_dict = {"completed": 0, "failed": 0}
# Start background progress logger
stop_event = asyncio.Event()
progress_task = asyncio.create_task(
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
)
# Create and execute all tasks concurrently
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
tasks = [
create_room_with_progress(
index=i,
total=num_rooms,
progress_dict=progress_dict,
name=None, # Auto-generate names
privacy="public",
exp_minutes=10,
max_retries=5,
)
for i in range(num_rooms)
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
finally:
# Stop the progress logger
stop_event.set()
await progress_task
# Count successes and failures
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
failed_count = num_rooms - success_count
elapsed_time = time.time() - start_time
# Log statistics
logger.info("=" * 60)
logger.info("BULK ROOM CREATION SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms attempted: {num_rooms}")
logger.info(f"Successfully created: {success_count}")
logger.info(f"Failed to create: {failed_count}")
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
logger.info(f"Total time: {elapsed_time:.2f} seconds")
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
logger.info("=" * 60)
return {
"success": success_count,
"failed": failed_count,
"total": num_rooms,
"elapsed_seconds": elapsed_time,
}
# Example usage
async def main():
"""Example usage of the room creation functions."""
# Test creating a single room
logger.info("Testing single room creation...")
room = await create_daily_room(exp_minutes=10)
if room:
logger.info(f"Created room: {room['name']} at {room['url']}")
else:
logger.error("Failed to create room")
# Uncomment to test bulk creation (warning: may hit rate limits!)
logger.info("\nTesting bulk room creation...")
stats = await test_create_rooms(num_rooms=1000)
logger.info(f"Final stats: {stats}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -21,8 +21,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -58,7 +58,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -48,10 +48,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = CartesiaSTTService(
api_key=os.getenv("CARTESIA_API_KEY"),
base_url=os.getenv("CARTESIA_BASE_URL"),
)
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
tl = TranscriptionLogger()

View File

@@ -0,0 +1,182 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import time
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openpipe.llm import OpenPipeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
timestamp = int(time.time())
llm = OpenPipeLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
tags={"conversation_id": f"pipecat-{timestamp}"},
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,142 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sentry_sdk
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Initialize Sentry
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN"),
traces_sample_rate=1.0,
)
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
metrics=SentryMetrics(),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
metrics=SentryMetrics(),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
metrics=SentryMetrics(),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

267
fetch_s3_recording.py Executable file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_recordings(limit: int = 100) -> list[str]:
"""Get list of recent recording IDs from Daily API.
Args:
limit: Maximum number of recordings to retrieve (default 100)
Returns:
List of recording IDs (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url="https://api.daily.co/v1/recordings",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
recordings = data.get("data", [])
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
return recording_ids
except Exception as e:
logger.exception(f"Failed to get recordings from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent recordings."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 recordings
logger.info("Fetching recent recordings...")
recording_ids = await get_recent_recordings(limit=100)
if not recording_ids:
logger.error("No recordings found. Cannot proceed with test.")
return
logger.info(f"Found {len(recording_ids)} recordings to fetch")
# Fetch access links for each recording concurrently
logger.info(
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
)
# Create tasks for all recordings
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get download link for a specific recording ID."""
try:
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None)
return (recording_url, recording_url)
except Exception as e:
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
return (None, None)
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
)
logger.debug(f" URL: {recording_url}")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(recording_ids)}] No link for {recording_id}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total recordings checked: {len(recording_ids)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

238
fetch_s3_recording_by_room.py Executable file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_rooms(limit: int = 100) -> list[str]:
"""Get list of recent room names from Daily API.
Args:
limit: Maximum number of rooms to retrieve (default 100, max 100)
Returns:
List of room names (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
rooms = data.get("data", [])
room_names = [room.get("name") for room in rooms if room.get("name")]
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
return room_names
except Exception as e:
logger.exception(f"Failed to get rooms from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent rooms."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 rooms
logger.info("Fetching recent rooms...")
room_names = await get_recent_rooms(limit=100)
if not room_names:
logger.error("No rooms found. Cannot proceed with test.")
return
logger.info(f"Found {len(room_names)} rooms to check for recordings")
# Call get_recording_s3_url_with_retry on each room concurrently
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
# Create tasks for all rooms
tasks = [
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
for room_name in room_names
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
logger.debug(f" URL: {recording_url[:80]}...")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(room_names)}] No recording for {room_name}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms checked: {len(room_names)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -34,7 +34,7 @@ dependencies = [
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
"openai>=1.74.0,<=1.99.1",
"openai>=1.74.0,<3",
# Pinning numba to resolve package dependencies
"numba==0.61.2",
"wait_for2>=0.4.1; python_version<'3.12'",
@@ -84,7 +84,7 @@ nim = []
neuphonic = [ "pipecat-ai[websockets-base]" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "pipecat-ai[websockets-base]" ]
openpipe = [ "openpipe~=4.50.0" ]
openpipe = [ "openpipe>=4.50.0,<6" ]
openrouter = []
perplexity = []
playht = [ "pipecat-ai[websockets-base]" ]
@@ -102,7 +102,7 @@ silero = [ "onnxruntime>=1.20.1,<2" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "pipecat-ai[websockets-base]" ]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.4.0" ]
speechmatics = [ "speechmatics-rt>=0.5.0" ]
strands = [ "strands-agents>=1.9.1,<2" ]
tavus=[]
together = []

View File

@@ -136,6 +136,7 @@ TESTS_14 = [
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14x-function-calling-openpipe.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),

View File

@@ -70,11 +70,15 @@ class PipelineRunner(BaseObject):
"""
logger.debug(f"Runner {self} started running {task}")
self._tasks[task.name] = task
params = PipelineTaskParams(loop=self._loop)
# PipelineTask handles asyncio.CancelledError to shutdown the pipeline
# properly and re-raises it in case there's more cleanup to do.
try:
params = PipelineTaskParams(loop=self._loop)
await task.run(params)
except asyncio.CancelledError:
await self._cancel()
pass
del self._tasks[task.name]
# Cleanup base object.

View File

@@ -269,6 +269,9 @@ class PipelineTask(BasePipelineTask):
# StopFrame) has been received at the end of the pipeline.
self._pipeline_end_event = asyncio.Event()
# This event is set when the pipeline truly finishes.
self._pipeline_finished_event = asyncio.Event()
# This is the final pipeline. It is composed of a source processor,
# followed by the user pipeline, and ending with a sink processor. The
# source allows us to receive and react to upstream frames, and the sink
@@ -401,11 +404,7 @@ class PipelineTask(BasePipelineTask):
await self.queue_frame(EndFrame())
async def cancel(self):
"""Immediately stop the running pipeline.
Cancels all running tasks and stops frame processing without
waiting for completion.
"""
"""Request the running pipeline to cancel."""
if not self._finished:
await self._cancel()
@@ -417,51 +416,38 @@ class PipelineTask(BasePipelineTask):
"""
if self.has_finished():
return
cleanup_pipeline = True
# Setup processors.
await self._setup(params)
# Create all main tasks and wait for the main push task. This is the
# task that pushes frames to the very beginning of our pipeline (i.e. to
# our controlled source processor).
await self._create_tasks()
try:
# Setup processors.
await self._setup(params)
# Create all main tasks and wait of the main push task. This is the
# task that pushes frames to the very beginning of our pipeline (our
# controlled source processor).
push_task = await self._create_tasks()
await push_task
# We have already cleaned up the pipeline inside the task.
cleanup_pipeline = False
# Pipeline has finished nicely.
self._finished = True
# Wait for pipeline to finish.
await self._wait_for_pipeline_finished()
except asyncio.CancelledError:
# Raise exception back to the pipeline runner so it can cancel this
# task properly.
logger.debug(f"Pipeline task {self} got cancelled from outside...")
# We have been cancelled from outside, let's just cancel everything.
await self._cancel()
# Wait again for pipeline to finish. This time we have really
# cancelled, so it should really finish.
await self._wait_for_pipeline_finished()
# Re-raise in case there's more cleanup to do.
raise
finally:
# We can reach this point for different reasons:
#
# 1. The task has finished properly (e.g. `EndFrame`).
# 2. By calling `PipelineTask.cancel()`.
# 3. By asyncio task cancellation.
#
# Case (1) will execute the code below without issues because
# `self._finished` is true.
#
# Case (2) will execute the code below without issues because
# `self._cancelled` is true.
#
# Case (3) will raise the exception above (because we are cancelling
# the asyncio task). This will be then captured by the
# `PipelineRunner` which will call `PipelineTask.cancel()` and
# therefore becoming case (2).
if self._finished or self._cancelled:
logger.debug(f"Pipeline task {self} is finishing cleanup...")
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
logger.debug(f"Pipeline task {self} has finished")
# 1. The pipeline task has finished (try case).
# 2. By an asyncio task cancellation (except case).
logger.debug(f"Pipeline task {self} is finishing...")
await self._cancel_tasks()
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
logger.debug(f"Pipeline task {self} has finished")
async def queue_frame(self, frame: Frame):
"""Queue a single frame to be pushed down the pipeline.
@@ -489,19 +475,7 @@ class PipelineTask(BasePipelineTask):
if not self._cancelled:
logger.debug(f"Cancelling pipeline task {self}")
self._cancelled = True
cancel_frame = CancelFrame()
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._pipeline.queue_frame(cancel_frame)
# Wait for CancelFrame to make it through the pipeline.
await self._wait_for_pipeline_end(cancel_frame)
# Only cancel the push task, we don't want to be able to process any
# other frame after cancel. Everything else will be cancelled in
# run().
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
await self.queue_frame(CancelFrame())
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
@@ -603,6 +577,17 @@ class PipelineTask(BasePipelineTask):
self._pipeline_end_event.clear()
# We are really done.
self._pipeline_finished_event.set()
async def _wait_for_pipeline_finished(self):
await self._pipeline_finished_event.wait()
self._pipeline_finished_event.clear()
# Make sure we wait for the main task to complete.
if self._process_push_task:
await self._process_push_task
self._process_push_task = None
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
mgr_params = TaskManagerParams(loop=params.loop)

View File

@@ -82,6 +82,7 @@ async def configure(
sip_enable_video: Optional[bool] = False,
sip_num_endpoints: Optional[int] = 1,
sip_codecs: Optional[Dict[str, List[str]]] = None,
room_properties: Optional[DailyRoomProperties] = None,
) -> DailyRoomConfig:
"""Configure Daily room URL and token with optional SIP capabilities.
@@ -99,6 +100,10 @@ async def configure(
sip_num_endpoints: Number of allowed SIP endpoints.
sip_codecs: Codecs to support for audio and video. If None, uses Daily defaults.
Example: {"audio": ["OPUS"], "video": ["H264"]}
room_properties: Optional DailyRoomProperties to use instead of building from
individual parameters. When provided, this overrides room_exp_duration and
SIP-related parameters. If not provided, properties are built from the
individual parameters as before.
Returns:
DailyRoomConfig: Object with room_url, token, and optional sip_endpoint.
@@ -115,6 +120,13 @@ async def configure(
# SIP-enabled room
sip_config = await configure(session, sip_caller_phone="+15551234567")
print(f"SIP endpoint: {sip_config.sip_endpoint}")
# Custom room properties with recording enabled
custom_props = DailyRoomProperties(
enable_recording="cloud",
max_participants=2,
)
config = await configure(session, room_properties=custom_props)
"""
# Check for required API key
api_key = os.getenv("DAILY_API_KEY")
@@ -124,9 +136,32 @@ async def configure(
"Get your API key from https://dashboard.daily.co/developers"
)
# Warn if both room_properties and individual parameters are provided
if room_properties is not None:
individual_params_provided = any(
[
room_exp_duration != 2.0,
token_exp_duration != 2.0,
sip_caller_phone is not None,
sip_enable_video is not False,
sip_num_endpoints != 1,
sip_codecs is not None,
]
)
if individual_params_provided:
logger.warning(
"Both room_properties and individual parameters (room_exp_duration, token_exp_duration, "
"sip_*) were provided. The room_properties will be used and individual parameters "
"will be ignored."
)
# Determine if SIP mode is enabled
sip_enabled = sip_caller_phone is not None
# If room_properties is provided, check if it has SIP configuration
if room_properties and room_properties.sip:
sip_enabled = True
daily_rest_helper = DailyRESTHelper(
daily_api_key=api_key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
@@ -150,27 +185,29 @@ async def configure(
room_name = f"{room_prefix}-{uuid.uuid4().hex[:8]}"
logger.info(f"Creating new Daily room: {room_name}")
# Calculate expiration time
expiration_time = time.time() + (room_exp_duration * 60 * 60)
# Use provided room_properties or build from parameters
if room_properties is None:
# Calculate expiration time
expiration_time = time.time() + (room_exp_duration * 60 * 60)
# Create room properties
room_properties = DailyRoomProperties(
exp=expiration_time,
eject_at_room_exp=True,
)
# Add SIP configuration if enabled
if sip_enabled:
sip_params = DailyRoomSipParams(
display_name=sip_caller_phone,
video=sip_enable_video,
sip_mode="dial-in",
num_endpoints=sip_num_endpoints,
codecs=sip_codecs,
# Create room properties
room_properties = DailyRoomProperties(
exp=expiration_time,
eject_at_room_exp=True,
)
room_properties.sip = sip_params
room_properties.enable_dialout = True # Enable outbound calls if needed
room_properties.start_video_off = not sip_enable_video # Voice-only by default
# Add SIP configuration if enabled
if sip_enabled:
sip_params = DailyRoomSipParams(
display_name=sip_caller_phone,
video=sip_enable_video,
sip_mode="dial-in",
num_endpoints=sip_num_endpoints,
codecs=sip_codecs,
)
room_properties.sip = sip_params
room_properties.enable_dialout = True # Enable outbound calls if needed
room_properties.start_video_off = not sip_enable_video # Voice-only by default
# Create room parameters
room_params = DailyRoomParams(name=room_name, properties=room_properties)

View File

@@ -70,12 +70,14 @@ import asyncio
import mimetypes
import os
import sys
import uuid
from contextlib import asynccontextmanager
from http import HTTPMethod
from pathlib import Path
from typing import Optional
from typing import Any, Dict, List, Optional, TypedDict
import aiohttp
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, Response
from loguru import logger
from pipecat.runner.types import (
@@ -166,6 +168,7 @@ def _create_server_app(
host: str = "localhost",
proxy: str,
esp32_mode: bool = False,
whatsapp_enabled: bool = False,
folder: Optional[str] = None,
):
"""Create FastAPI app with transport-specific routes."""
@@ -182,7 +185,8 @@ def _create_server_app(
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder)
_setup_whatsapp_routes(app)
if whatsapp_enabled:
_setup_whatsapp_routes(app)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in TELEPHONY_TRANSPORTS:
@@ -200,8 +204,10 @@ def _setup_webrtc_routes(
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
IceCandidate,
SmallWebRTCPatchRequest,
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
@@ -209,6 +215,16 @@ def _setup_webrtc_routes(
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
class IceConfig(TypedDict):
iceServers: List[IceServer]
class StartBotResult(TypedDict, total=False):
sessionId: str
iceConfig: Optional[IceConfig]
# In-memory store of active sessions: session_id -> session info
active_sessions: Dict[str, Dict[str, Any]] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@@ -254,6 +270,74 @@ def _setup_webrtc_routes(
)
return answer
@app.patch("/api/offer")
async def ice_candidate(request: SmallWebRTCPatchRequest):
"""Handle WebRTC new ice candidate requests."""
logger.debug(f"Received patch request: {request}")
await small_webrtc_handler.handle_patch_request(request)
return {"status": "success"}
@app.post("/start")
async def rtvi_start(request: Request):
"""Mimic Pipecat Cloud's /start endpoint."""
# Parse the request body
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
session_id = str(uuid.uuid4())
active_sessions[session_id] = request_data
result: StartBotResult = {"sessionId": session_id}
if request_data.get("enableDefaultIceServers"):
result["iceConfig"] = IceConfig(
iceServers=[IceServer(urls="stun:stun.l.google.com:19302")]
)
return result
@app.api_route(
"/sessions/{session_id}/{path:path}",
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_request(
session_id: str, path: str, request: Request, background_tasks: BackgroundTasks
):
"""Mimic Pipecat Cloud's proxy."""
active_session = active_sessions.get(session_id)
if not active_session:
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
if path.endswith("api/offer"):
# Parse the request body and convert to SmallWebRTCRequest
try:
request_data = await request.json()
if request.method == HTTPMethod.POST.value:
webrtc_request = SmallWebRTCRequest(
sdp=request_data["sdp"],
type=request_data["type"],
pc_id=request_data.get("pc_id"),
restart_pc=request_data.get("restart_pc"),
request_data=request_data,
)
return await offer(webrtc_request, background_tasks)
elif request.method == HTTPMethod.PATCH.value:
patch_request = SmallWebRTCPatchRequest(
pc_id=request_data["pc_id"],
candidates=[IceCandidate(**c) for c in request_data.get("candidates", [])],
)
return await ice_candidate(patch_request)
except Exception as e:
logger.error(f"Failed to parse WebRTC request: {e}")
return Response(content="Invalid WebRTC request", status_code=400)
logger.info(f"Received request for path: {path}")
return Response(status_code=200)
@asynccontextmanager
async def smallwebrtc_lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
@@ -289,6 +373,29 @@ def _add_lifespan_to_app(app: FastAPI, new_lifespan):
def _setup_whatsapp_routes(app: FastAPI):
"""Set up WebRTC-specific routes."""
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
if not all(
[
WHATSAPP_APP_SECRET,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_TOKEN,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.error(
"""Missing required environment variables for WhatsApp transport:
WHATSAPP_APP_SECRET
WHATSAPP_PHONE_NUMBER_ID
WHATSAPP_TOKEN
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
"""
)
return
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
@@ -300,24 +407,7 @@ def _setup_whatsapp_routes(app: FastAPI):
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
if not all(
[
WHATSAPP_TOKEN,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.debug(
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
)
logger.error(f"WhatsApp transport dependencies not installed: {e}")
return
# Global WhatsApp client instance
@@ -487,8 +577,6 @@ def _setup_daily_routes(app: FastAPI):
else:
logger.debug("No body data provided in request")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
@@ -576,8 +664,6 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
async def _run_daily_direct():
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
import aiohttp
from pipecat.runner.daily import configure
except ImportError as e:
logger.error("Daily transport dependencies not installed.")
@@ -689,6 +775,12 @@ def main():
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--whatsapp",
action="store_true",
default=False,
help="Ensure requried WhatsApp environment variables are present",
)
args = parser.parse_args()
@@ -731,10 +823,11 @@ def main():
print()
if args.esp32:
print(f"🚀 Bot ready! (ESP32 mode)")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
elif args.whatsapp:
print(f"🚀 Bot ready! (WhatsApp)")
else:
print(f"🚀 Bot ready!")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
print()
elif args.transport == "daily":
print()
@@ -752,6 +845,7 @@ def main():
host=args.host,
proxy=args.proxy,
esp32_mode=args.esp32,
whatsapp_enabled=args.whatsapp,
folder=args.folder,
)

View File

@@ -108,6 +108,8 @@ class AssemblyAIConnectionParams(BaseModel):
end_of_turn_confidence_threshold: Confidence threshold for end-of-turn detection.
min_end_of_turn_silence_when_confident: Minimum silence duration when confident about end-of-turn.
max_turn_silence: Maximum silence duration before forcing end-of-turn.
keyterms_prompt: List of key terms to guide transcription. Will be JSON serialized before sending.
speech_model: Select between English and multilingual models. Defaults to "universal-streaming-english".
"""
sample_rate: int = 16000
@@ -117,3 +119,7 @@ class AssemblyAIConnectionParams(BaseModel):
end_of_turn_confidence_threshold: Optional[float] = None
min_end_of_turn_silence_when_confident: Optional[int] = None
max_turn_silence: Optional[int] = None
keyterms_prompt: Optional[List[str]] = None
speech_model: Literal["universal-streaming-english", "universal-streaming-multilingual"] = (
"universal-streaming-english"
)

View File

@@ -174,11 +174,16 @@ class AssemblyAISTTService(STTService):
def _build_ws_url(self) -> str:
"""Build WebSocket URL with query parameters using urllib.parse.urlencode."""
params = {
k: str(v).lower() if isinstance(v, bool) else v
for k, v in self._connection_params.model_dump().items()
if v is not None
}
params = {}
for k, v in self._connection_params.model_dump().items():
if v is not None:
if k == "keyterms_prompt":
params[k] = json.dumps(v)
elif isinstance(v, bool):
params[k] = str(v).lower()
else:
params[k] = v
if params:
query_string = urlencode(params)
return f"{self._api_endpoint_base_url}?{query_string}"
@@ -197,6 +202,8 @@ class AssemblyAISTTService(STTService):
)
self._connected = True
self._receive_task = self.create_task(self._receive_task_handler())
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to connect to AssemblyAI: {e}")
self._connected = False
@@ -238,6 +245,7 @@ class AssemblyAISTTService(STTService):
self._websocket = None
self._connected = False
self._receive_task = None
await self._call_event_handler("on_disconnected")
async def _receive_task_handler(self):
"""Handle incoming WebSocket messages."""

View File

@@ -235,6 +235,8 @@ class AsyncAITTSService(InterruptibleTTSService):
}
await self._get_websocket().send(json.dumps(init_msg))
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -252,6 +254,7 @@ class AsyncAITTSService(InterruptibleTTSService):
finally:
self._websocket = None
self._started = False
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -286,6 +286,7 @@ class AWSTranscribeSTTService(STTService):
logger.info(f"{self} Successfully connected to AWS Transcribe")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Failed to connect to AWS Transcribe: {e}")
await self._disconnect()
@@ -310,6 +311,7 @@ class AWSTranscribeSTTService(STTService):
logger.warning(f"{self} Error closing WebSocket connection: {e}")
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
def language_to_service_language(self, language: Language) -> str | None:
"""Convert internal language enum to AWS Transcribe language code.

View File

@@ -28,13 +28,12 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
@@ -124,7 +123,7 @@ class CartesiaLiveOptions:
return cls(**json.loads(json_str))
class CartesiaSTTService(STTService):
class CartesiaSTTService(WebsocketSTTService):
"""Speech-to-text service using Cartesia Live API.
Provides real-time speech transcription through WebSocket connection
@@ -176,8 +175,7 @@ class CartesiaSTTService(STTService):
self.set_model_name(merged_options.model)
self._api_key = api_key
self._base_url = base_url or "api.cartesia.ai"
self._connection = None
self._receiver_task = None
self._receive_task = None
def can_generate_metrics(self) -> bool:
"""Check if the service can generate processing metrics.
@@ -214,6 +212,27 @@ class CartesiaSTTService(STTService):
await super().cancel(frame)
await self._disconnect()
async def start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send("finalize")
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process audio data for speech-to-text transcription.
@@ -224,45 +243,71 @@ class CartesiaSTTService(STTService):
None - transcription results are handled via WebSocket responses.
"""
# If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again
if not self._connection or self._connection.state is State.CLOSED:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
await self._connection.send(audio)
await self._websocket.send(audio)
yield None
async def _connect(self):
params = self._settings.to_dict()
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
logger.debug(f"Connecting to Cartesia: {ws_url}")
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = asyncio.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
self._connection = await websocket_connect(ws_url, additional_headers=headers)
# Setup the receiver task to handle the incoming messages from the Cartesia server
if self._receiver_task is None or self._receiver_task.done():
self._receiver_task = asyncio.create_task(self._receive_messages())
logger.debug(f"Connected to Cartesia")
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia STT")
params = self._settings.to_dict()
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self}: unable to connect to Cartesia: {e}")
async def _receive_messages(self):
async def _disconnect_websocket(self):
try:
while True:
if not self._connection or self._connection.state is State.CLOSED:
break
message = await self._connection.recv()
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
except asyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosed as e:
logger.debug(f"WebSocket connection closed: {e}")
if self._websocket and self._websocket.state is State.OPEN:
logger.debug("Disconnecting from Cartesia STT")
await self._websocket.close()
except Exception as e:
logger.error(f"Error in message receiver: {e}")
logger.error(f"{self} error closing websocket: {e}")
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _process_messages(self):
async for message in self._get_websocket():
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
async def _receive_messages(self):
while True:
await self._process_messages()
# Cartesia times out after 5 minutes of innactivity (no keepalive
# mechanism is available). So, we try to reconnect.
logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
await self._connect_websocket()
async def _process_response(self, data):
if "type" in data:
@@ -316,41 +361,3 @@ class CartesiaSTTService(STTService):
language,
)
)
async def _disconnect(self):
if self._receiver_task:
self._receiver_task.cancel()
try:
await self._receiver_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"Unexpected exception while cancelling task: {e}")
self._receiver_task = None
if self._connection and self._connection.state is State.OPEN:
logger.debug("Disconnecting from Cartesia")
await self._connection.close()
self._connection = None
async def start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._connection and self._connection.state is State.OPEN:
await self._connection.send("finalize")

View File

@@ -344,10 +344,11 @@ class CartesiaTTSService(AudioContextWordTTSService):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia")
logger.debug("Connecting to Cartesia TTS")
self._websocket = await websocket_connect(
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -365,6 +366,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
finally:
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -205,6 +205,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
additional_headers={"Authorization": f"Token {self._api_key}"},
)
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -225,6 +226,9 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _send_close_stream(self) -> None:
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.

View File

@@ -168,16 +168,24 @@ def build_elevenlabs_voice_settings(
def calculate_word_times(
alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
alignment_info: Mapping[str, Any],
cumulative_time: float,
partial_word: str = "",
partial_word_start_time: float = 0.0,
) -> tuple[List[Tuple[str, float]], str, float]:
"""Calculate word timestamps from character alignment information.
Args:
alignment_info: Character alignment data from ElevenLabs API.
cumulative_time: Base time offset for this chunk.
partial_word: Partial word carried over from previous chunk.
partial_word_start_time: Start time of the partial word.
Returns:
List of (word, timestamp) tuples.
Tuple of (word_times, new_partial_word, new_partial_word_start_time):
- word_times: List of (word, timestamp) tuples for complete words
- new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
- new_partial_word_start_time: Start time of the incomplete word
"""
chars = alignment_info["chars"]
char_start_times_ms = alignment_info["charStartTimesMs"]
@@ -186,41 +194,37 @@ def calculate_word_times(
logger.error(
f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(char_start_times_ms)}"
)
return []
return ([], partial_word, partial_word_start_time)
# Build words and track their start positions
words = []
word_start_indices = []
current_word = ""
word_start_index = None
word_start_times = []
current_word = partial_word # Start with any partial word from previous chunk
word_start_time = partial_word_start_time if partial_word else None
for i, char in enumerate(chars):
if char == " ":
# End of current word
if current_word: # Only add non-empty words
words.append(current_word)
word_start_indices.append(word_start_index)
word_start_times.append(word_start_time)
current_word = ""
word_start_index = None
word_start_time = None
else:
# Building a word
if word_start_index is None: # First character of new word
word_start_index = i
if word_start_time is None: # First character of new word
# Convert from milliseconds to seconds and add cumulative offset
word_start_time = cumulative_time + (char_start_times_ms[i] / 1000.0)
current_word += char
# Handle the last word if there's no trailing space
if current_word and word_start_index is not None:
words.append(current_word)
word_start_indices.append(word_start_index)
# Build result for complete words
word_times = list(zip(words, word_start_times))
# Calculate timestamps for each word
word_times = []
for word, start_idx in zip(words, word_start_indices):
# Convert from milliseconds to seconds and add cumulative offset
start_time_seconds = cumulative_time + (char_start_times_ms[start_idx] / 1000.0)
word_times.append((word, start_time_seconds))
# Return any incomplete word at the end of this chunk
new_partial_word = current_word if current_word else ""
new_partial_word_start_time = word_start_time if word_start_time is not None else 0.0
return word_times
return (word_times, new_partial_word, new_partial_word_start_time)
class ElevenLabsTTSService(AudioContextWordTTSService):
@@ -332,6 +336,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
# there's an interruption or TTSStoppedFrame.
self._started = False
self._cumulative_time = 0
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
# Context management for v1 multi API
self._context_id = None
@@ -521,6 +528,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -543,6 +551,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self._started = False
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
@@ -570,6 +579,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
self._started = False
self._partial_word = ""
self._partial_word_start_time = 0.0
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
@@ -609,7 +620,14 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
if msg.get("alignment"):
alignment = msg["alignment"]
word_times = calculate_word_times(alignment, self._cumulative_time)
word_times, self._partial_word, self._partial_word_start_time = (
calculate_word_times(
alignment,
self._cumulative_time,
self._partial_word,
self._partial_word_start_time,
)
)
if word_times:
await self.add_word_timestamps(word_times)
@@ -683,6 +701,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
yield TTSStartedFrame()
self._started = True
self._cumulative_time = 0
self._partial_word = ""
self._partial_word_start_time = 0.0
# If a context ID does not exist, create a new one and
# register it. If an ID exists, that means the Pipeline is
# configured for allow_interruptions=False, so continue
@@ -756,6 +776,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
base_url: str = "https://api.elevenlabs.io",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
aggregate_sentences: Optional[bool] = True,
**kwargs,
):
"""Initialize the ElevenLabs HTTP TTS service.
@@ -768,10 +789,11 @@ class ElevenLabsHttpTTSService(WordTTSService):
base_url: Base URL for ElevenLabs HTTP API.
sample_rate: Audio sample rate. If None, uses default.
params: Additional input parameters for voice customization.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
**kwargs: Additional arguments passed to the parent service.
"""
super().__init__(
aggregate_sentences=True,
aggregate_sentences=aggregate_sentences,
push_text_frames=False,
push_stop_frames=True,
sample_rate=sample_rate,
@@ -809,6 +831,10 @@ class ElevenLabsHttpTTSService(WordTTSService):
# Store previous text for context within a turn
self._previous_text = ""
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert pipecat Language to ElevenLabs language code.
@@ -836,6 +862,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._cumulative_time = 0
self._started = False
self._previous_text = ""
self._partial_word = ""
self._partial_word_start_time = 0.0
logger.debug(f"{self}: Reset internal state")
async def start(self, frame: StartFrame):
@@ -870,11 +898,13 @@ class ElevenLabsHttpTTSService(WordTTSService):
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
"""Calculate word timing from character alignment data.
This method handles partial words that may span across multiple alignment chunks.
Args:
alignment_info: Character timing data from ElevenLabs.
Returns:
List of (word, timestamp) pairs.
List of (word, timestamp) pairs for complete words in this chunk.
Example input data::
@@ -900,30 +930,28 @@ class ElevenLabsHttpTTSService(WordTTSService):
# Build the words and find their start times
words = []
word_start_times = []
current_word = ""
first_char_idx = -1
# Start with any partial word from previous chunk
current_word = self._partial_word
word_start_time = self._partial_word_start_time if self._partial_word else None
for i, char in enumerate(chars):
if char == " ":
if current_word: # Only add non-empty words
words.append(current_word)
# Use time of the first character of the word, offset by cumulative time
word_start_times.append(
self._cumulative_time + char_start_times[first_char_idx]
)
word_start_times.append(word_start_time)
current_word = ""
first_char_idx = -1
word_start_time = None
else:
if not current_word: # This is the first character of a new word
first_char_idx = i
if word_start_time is None: # First character of a new word
# Use time of the first character of the word, offset by cumulative time
word_start_time = self._cumulative_time + char_start_times[i]
current_word += char
# Don't forget the last word if there's no trailing space
if current_word and first_char_idx >= 0:
words.append(current_word)
word_start_times.append(self._cumulative_time + char_start_times[first_char_idx])
# Store any incomplete word at the end of this chunk
self._partial_word = current_word if current_word else ""
self._partial_word_start_time = word_start_time if word_start_time is not None else 0.0
# Create word-time pairs
# Create word-time pairs for complete words only
word_times = list(zip(words, word_start_times))
return word_times
@@ -959,6 +987,9 @@ class ElevenLabsHttpTTSService(WordTTSService):
if self._voice_settings:
payload["voice_settings"] = self._voice_settings
if self._settings["apply_text_normalization"] is not None:
payload["apply_text_normalization"] = self._settings["apply_text_normalization"]
language = self._settings["language"]
if self._model_name in ELEVENLABS_MULTILINGUAL_MODELS and language:
payload["language_code"] = language
@@ -979,8 +1010,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
}
if self._settings["optimize_streaming_latency"] is not None:
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
if self._settings["apply_text_normalization"] is not None:
params["apply_text_normalization"] = self._settings["apply_text_normalization"]
try:
await self.start_ttfb_metrics()
@@ -1041,6 +1070,14 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.error(f"Error processing response: {e}", exc_info=True)
continue
# After processing all chunks, emit any remaining partial word
# since this is the end of the utterance
if self._partial_word:
final_word_time = [(self._partial_word, self._partial_word_start_time)]
await self.add_word_timestamps(final_word_time)
self._partial_word = ""
self._partial_word_start_time = 0.0
# After processing all chunks, add the total utterance duration
# to the cumulative time to ensure next utterance starts after this one
if utterance_duration > 0:

View File

@@ -225,6 +225,8 @@ class FishAudioTTSService(InterruptibleTTSService):
start_message = {"event": "start", "request": {"text": "", **self._settings}}
await self._websocket.send(ormsgpack.packb(start_message))
logger.debug("Sent start message to Fish Audio")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Fish Audio initialization error: {e}")
self._websocket = None
@@ -245,6 +247,7 @@ class FishAudioTTSService(InterruptibleTTSService):
self._request_id = None
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
async def flush_audio(self):
"""Flush any buffered audio by sending a flush event to Fish Audio."""

View File

@@ -730,6 +730,8 @@ class GoogleSTTService(STTService):
self._request_queue = asyncio.Queue()
self._streaming_task = self.create_task(self._stream_audio())
await self._call_event_handler("on_connected")
async def _disconnect(self):
"""Clean up streaming recognition resources."""
if self._streaming_task:
@@ -737,6 +739,8 @@ class GoogleSTTService(STTService):
await self.cancel_task(self._streaming_task)
self._streaming_task = None
await self._call_event_handler("on_disconnected")
async def _request_generator(self):
"""Generates requests for the streaming recognize method."""
recognizer_path = f"projects/{self._project_id}/locations/{self._location}/recognizers/_"

View File

@@ -222,6 +222,7 @@ class LmntTTSService(InterruptibleTTSService):
# Send initialization message
await self._websocket.send(json.dumps(init_msg))
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -243,6 +244,7 @@ class LmntTTSService(InterruptibleTTSService):
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get the WebSocket connection if available."""

View File

@@ -293,6 +293,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
headers = {"x-api-key": self._api_key}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -311,6 +313,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _receive_messages(self):
"""Receive and process messages from Neuphonic WebSocket."""

View File

@@ -14,6 +14,7 @@ from typing import AsyncGenerator, Dict, Literal, Optional
from loguru import logger
from openai import AsyncOpenAI, BadRequestError
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
@@ -55,6 +56,17 @@ class OpenAITTSService(TTSService):
OPENAI_SAMPLE_RATE = 24000 # OpenAI TTS always outputs at 24kHz
class InputParams(BaseModel):
"""Input parameters for OpenAI TTS configuration.
Parameters:
instructions: Instructions to guide voice synthesis behavior.
speed: Voice speed control (0.25 to 4.0, default 1.0).
"""
instructions: Optional[str] = None
speed: Optional[float] = None
def __init__(
self,
*,
@@ -65,6 +77,7 @@ class OpenAITTSService(TTSService):
sample_rate: Optional[int] = None,
instructions: Optional[str] = None,
speed: Optional[float] = None,
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize OpenAI TTS service.
@@ -77,7 +90,11 @@ class OpenAITTSService(TTSService):
sample_rate: Output audio sample rate in Hz. If None, uses OpenAI's default 24kHz.
instructions: Optional instructions to guide voice synthesis behavior.
speed: Voice speed control (0.25 to 4.0, default 1.0).
params: Optional synthesis controls (acting instructions, speed, ...).
**kwargs: Additional keyword arguments passed to TTSService.
.. deprecated:: 0.0.91
The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.
"""
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
@@ -86,12 +103,26 @@ class OpenAITTSService(TTSService):
)
super().__init__(sample_rate=sample_rate, **kwargs)
self._speed = speed
self.set_model_name(model)
self.set_voice(voice)
self._instructions = instructions
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)
if instructions or speed:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The `instructions` and `speed` parameters are deprecated, use `InputParams` instead.",
DeprecationWarning,
stacklevel=2,
)
self._settings = {
"instructions": params.instructions if params else instructions,
"speed": params.speed if params else speed,
}
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -144,11 +175,11 @@ class OpenAITTSService(TTSService):
"response_format": "pcm",
}
if self._instructions:
create_params["instructions"] = self._instructions
if self._settings["instructions"]:
create_params["instructions"] = self._settings["instructions"]
if self._speed:
create_params["speed"] = self._speed
if self._settings["speed"]:
create_params["speed"] = self._settings["speed"]
async with self._client.audio.speech.with_streaming_response.create(
**create_params

View File

@@ -269,6 +269,8 @@ class PlayHTTTSService(InterruptibleTTSService):
raise ValueError("WebSocket URL is not a string")
self._websocket = await websocket_connect(self._websocket_url)
await self._call_event_handler("on_connected")
except ValueError as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -291,6 +293,7 @@ class PlayHTTTSService(InterruptibleTTSService):
finally:
self._request_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _get_websocket_url(self):
"""Retrieve WebSocket URL from PlayHT API."""

View File

@@ -255,6 +255,8 @@ class RimeTTSService(AudioContextWordTTSService):
url = f"{self._url}?{params}"
headers = {"Authorization": f"Bearer {self._api_key}"}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -272,6 +274,7 @@ class RimeTTSService(AudioContextWordTTSService):
finally:
self._context_id = None
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get active websocket connection or raise exception."""

View File

@@ -583,7 +583,9 @@ class RivaSegmentedSTTService(SegmentedSTTService):
self._config.language_code = self._language
@traced_stt
async def _handle_transcription(self, transcript: str, language: Optional[Language] = None):
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass

View File

@@ -76,17 +76,29 @@ class SarvamHttpTTSService(TTSService):
Example::
tts = SarvamTTSService(
tts = SarvamHttpTTSService(
api_key="your-api-key",
voice_id="anushka",
model="bulbul:v2",
aiohttp_session=session,
params=SarvamTTSService.InputParams(
params=SarvamHttpTTSService.InputParams(
language=Language.HI,
pitch=0.1,
pace=1.2
)
)
# For bulbul v3 beta with any speaker:
tts_v3 = SarvamHttpTTSService(
api_key="your-api-key",
voice_id="speaker_name",
model="bulbul:v3,
aiohttp_session=session,
params=SarvamHttpTTSService.InputParams(
language=Language.HI,
temperature=0.8
)
)
"""
class InputParams(BaseModel):
@@ -105,6 +117,14 @@ class SarvamHttpTTSService(TTSService):
pace: Optional[float] = Field(default=1.0, ge=0.3, le=3.0)
loudness: Optional[float] = Field(default=1.0, ge=0.1, le=3.0)
enable_preprocessing: Optional[bool] = False
temperature: Optional[float] = Field(
default=0.6,
ge=0.01,
le=1.0,
description="Controls the randomness of the output for bulbul v3 beta. "
"Lower values make the output more focused and deterministic, while "
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
)
def __init__(
self,
@@ -124,7 +144,7 @@ class SarvamHttpTTSService(TTSService):
api_key: Sarvam AI API subscription key.
aiohttp_session: Shared aiohttp session for making requests.
voice_id: Speaker voice ID (e.g., "anushka", "meera"). Defaults to "anushka".
model: TTS model to use ("bulbul:v1" or "bulbul:v2"). Defaults to "bulbul:v2".
model: TTS model to use ("bulbul:v2" or "bulbul:v3-beta" or "bulbul:v3"). Defaults to "bulbul:v2".
base_url: Sarvam AI API base URL. Defaults to "https://api.sarvam.ai".
sample_rate: Audio sample rate in Hz (8000, 16000, 22050, 24000). If None, uses default.
params: Additional voice and preprocessing parameters. If None, uses defaults.
@@ -138,16 +158,32 @@ class SarvamHttpTTSService(TTSService):
self._base_url = base_url
self._session = aiohttp_session
# Build base settings common to all models
self._settings = {
"language": (
self.language_to_service_language(params.language) if params.language else "en-IN"
),
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"enable_preprocessing": params.enable_preprocessing,
}
# Add model-specific parameters
if model in ("bulbul:v3-beta", "bulbul:v3"):
self._settings.update(
{
"temperature": getattr(params, "temperature", 0.6),
"model": model,
}
)
else:
self._settings.update(
{
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"model": model,
}
)
self.set_model_name(model)
self.set_voice(voice_id)
@@ -275,6 +311,18 @@ class SarvamTTSService(InterruptibleTTSService):
pace=1.2
)
)
# For bulbul v3 beta with any speaker and temperature:
# Note: pace and loudness are not supported for bulbul v3 and bulbul v3 beta
tts_v3 = SarvamTTSService(
api_key="your-api-key",
voice_id="speaker_name",
model="bulbul:v3",
params=SarvamTTSService.InputParams(
language=Language.HI,
temperature=0.8
)
)
"""
class InputParams(BaseModel):
@@ -310,6 +358,14 @@ class SarvamTTSService(InterruptibleTTSService):
output_audio_codec: Optional[str] = "linear16"
output_audio_bitrate: Optional[str] = "128k"
language: Optional[Language] = Language.EN
temperature: Optional[float] = Field(
default=0.6,
ge=0.01,
le=1.0,
description="Controls the randomness of the output for bulbul v3 beta. "
"Lower values make the output more focused and deterministic, while "
"higher values make it more random. Range: 0.01 to 1.0. Default: 0.6.",
)
def __init__(
self,
@@ -329,6 +385,7 @@ class SarvamTTSService(InterruptibleTTSService):
Args:
api_key: Sarvam API key for authenticating TTS requests.
model: Identifier of the Sarvam speech model (default "bulbul:v2").
Supports "bulbul:v2", "bulbul:v3-beta" and "bulbul:v3".
voice_id: Voice identifier for synthesis (default "anushka").
url: WebSocket URL for connecting to the TTS backend (default production URL).
aiohttp_session: Optional shared aiohttp session. To maintain backward compatibility.
@@ -371,15 +428,12 @@ class SarvamTTSService(InterruptibleTTSService):
self._api_key = api_key
self.set_model_name(model)
self.set_voice(voice_id)
# Configuration parameters
# Build base settings common to all models
self._settings = {
"target_language_code": (
self.language_to_service_language(params.language) if params.language else "en-IN"
),
"pitch": params.pitch,
"pace": params.pace,
"speaker": voice_id,
"loudness": params.loudness,
"speech_sample_rate": 0,
"enable_preprocessing": params.enable_preprocessing,
"min_buffer_size": params.min_buffer_size,
@@ -387,6 +441,24 @@ class SarvamTTSService(InterruptibleTTSService):
"output_audio_codec": params.output_audio_codec,
"output_audio_bitrate": params.output_audio_bitrate,
}
# Add model-specific parameters
if model in ("bulbul:v3-beta", "bulbul:v3"):
self._settings.update(
{
"temperature": getattr(params, "temperature", 0.6),
"model": model,
}
)
else:
self._settings.update(
{
"pitch": params.pitch,
"pace": params.pace,
"loudness": params.loudness,
"model": model,
}
)
self._started = False
self._receive_task = None
@@ -525,6 +597,7 @@ class SarvamTTSService(InterruptibleTTSService):
logger.debug("Connected to Sarvam TTS Websocket")
await self._send_config()
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -556,6 +629,10 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:

View File

@@ -577,6 +577,7 @@ class SpeechmaticsSTTService(STTService):
),
)
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Error connecting to Speechmatics: {e}")
self._client = None
@@ -595,6 +596,7 @@ class SpeechmaticsSTTService(STTService):
logger.error(f"{self} Error closing Speechmatics client: {e}")
finally:
self._client = None
await self._call_event_handler("on_disconnected")
def _process_config(self) -> None:
"""Create a formatted STT transcription config.
@@ -618,7 +620,7 @@ class SpeechmaticsSTTService(STTService):
transcription_config.additional_vocab = [
{
"content": e.content,
"sounds_like": e.sounds_like,
**({"sounds_like": e.sounds_like} if e.sounds_like else {}),
}
for e in self._params.additional_vocab
]

View File

@@ -35,6 +35,25 @@ class STTService(AIService):
Provides common functionality for STT services including audio passthrough,
muting, settings management, and audio processing. Subclasses must implement
the run_stt method to provide actual speech recognition.
Event handlers:
on_connected: Called when connected to the STT service.
on_connected: Called when disconnected from the STT service.
on_connection_error: Called when a connection to the STT service error occurs.
Example::
@stt.event_handler("on_connected")
async def on_connected(stt: STTService):
logger.debug(f"STT connected")
@stt.event_handler("on_disconnected")
async def on_disconnected(stt: STTService):
logger.debug(f"STT disconnected")
@stt.event_handler("on_connection_error")
async def on_connection_error(stt: STTService, error: str):
logger.error(f"STT connection error: {error}")
"""
def __init__(
@@ -62,6 +81,10 @@ class STTService(AIService):
self._muted: bool = False
self._user_id: str = ""
self._register_event_handler("on_connected")
self._register_event_handler("on_disconnected")
self._register_event_handler("on_connection_error")
@property
def is_muted(self) -> bool:
"""Check if the STT service is currently muted.
@@ -292,15 +315,6 @@ class WebsocketSTTService(STTService, WebsocketService):
Combines STT functionality with websocket connectivity, providing automatic
error handling and reconnection capabilities.
Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example::
@stt.event_handler("on_connection_error")
async def on_connection_error(stt: STTService, error: str):
logger.error(f"STT connection error: {error}")
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
@@ -312,7 +326,6 @@ class WebsocketSTTService(STTService, WebsocketService):
"""
STTService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)

View File

@@ -59,6 +59,25 @@ class TTSService(AIService):
Provides common functionality for TTS services including text aggregation,
filtering, audio generation, and frame management. Supports configurable
sentence aggregation, silence insertion, and frame processing control.
Event handlers:
on_connected: Called when connected to the STT service.
on_connected: Called when disconnected from the STT service.
on_connection_error: Called when a connection to the STT service error occurs.
Example::
@tts.event_handler("on_connected")
async def on_connected(tts: TTSService):
logger.debug(f"TTS connected")
@tts.event_handler("on_disconnected")
async def on_disconnected(tts: TTSService):
logger.debug(f"TTS disconnected")
@tts.event_handler("on_connection_error")
async def on_connection_error(stt: TTSService, error: str):
logger.error(f"TTS connection error: {error}")
"""
def __init__(
@@ -143,6 +162,10 @@ class TTSService(AIService):
self._processing_text: bool = False
self._register_event_handler("on_connected")
self._register_event_handler("on_disconnected")
self._register_event_handler("on_connection_error")
@property
def sample_rate(self) -> int:
"""Get the current sample rate for audio output.
@@ -626,7 +649,6 @@ class WebsocketTTSService(TTSService, WebsocketService):
"""
TTSService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
@@ -678,15 +700,6 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
"""Base class for websocket-based TTS services that support word timestamps.
Combines word timestamp functionality with websocket connectivity.
Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example::
@tts.event_handler("on_connection_error")
async def on_connection_error(tts: TTSService, error: str):
logger.error(f"TTS connection error: {error}")
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
@@ -698,7 +711,6 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
"""
WordTTSService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)

View File

@@ -232,6 +232,9 @@ class BaseInputTransport(FrameProcessor):
"""
# Cancel and wait for the audio input task to finish.
await self._cancel_audio_task()
# Stop audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.stop()
async def set_transport_ready(self, frame: StartFrame):
"""Called when the transport is ready to stream.

View File

@@ -293,15 +293,15 @@ class BaseOutputTransport(FrameProcessor):
"""
await super().process_frame(frame, direction)
#
# System frames (like InterruptionFrame) are pushed immediately. Other
# frames require order so they are put in the sink queue.
#
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, EndFrame):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
@@ -314,21 +314,6 @@ class BaseOutputTransport(FrameProcessor):
await self.write_dtmf(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames.
elif isinstance(frame, EndFrame):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, MixerControlFrame):
await self._handle_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await self._handle_frame(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._handle_frame(frame)
elif direction == FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
else:
@@ -410,6 +395,13 @@ class BaseOutputTransport(FrameProcessor):
# Indicates if the bot is currently speaking.
self._bot_speaking = False
# Last time a BotSpeakingFrame was pushed.
self._bot_speaking_frame_time = 0
# How often a BotSpeakingFrame should be pushed (value should be
# lower than the audio chunks).
self._bot_speaking_frame_period = 0.2
# Last time the bot actually spoke.
self._bot_speech_last_time = 0
self._audio_task: Optional[asyncio.Task] = None
self._video_task: Optional[asyncio.Task] = None
@@ -601,39 +593,71 @@ class BaseOutputTransport(FrameProcessor):
async def _bot_started_speaking(self):
"""Handle bot started speaking event."""
if not self._bot_speaking:
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
)
if self._bot_speaking:
return
downstream_frame = BotStartedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
)
self._bot_speaking = True
downstream_frame = BotStartedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = True
async def _bot_stopped_speaking(self):
"""Handle bot stopped speaking event."""
if self._bot_speaking:
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
)
if not self._bot_speaking:
return
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
)
self._bot_speaking = False
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def _bot_currently_speaking(self):
"""Handle bot speaking event."""
await self._bot_started_speaking()
diff_time = time.time() - self._bot_speaking_frame_time
if diff_time >= self._bot_speaking_frame_period:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking_frame_time = time.time()
self._bot_speech_last_time = time.time()
async def _maybe_bot_currently_speaking(self, frame: SpeechOutputAudioRawFrame):
if not is_silence(frame.audio):
await self._bot_currently_speaking()
else:
silence_duration = time.time() - self._bot_speech_last_time
if silence_duration > BOT_VAD_STOP_SECS:
await self._bot_stopped_speaking()
async def _handle_bot_speech(self, frame: Frame):
# TTS case.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_currently_speaking()
# Speech stream case.
elif isinstance(frame, SpeechOutputAudioRawFrame):
await self._maybe_bot_currently_speaking(frame)
async def _handle_frame(self, frame: Frame):
"""Handle various frame types with appropriate processing.
@@ -641,7 +665,9 @@ class BaseOutputTransport(FrameProcessor):
Args:
frame: The frame to handle.
"""
if isinstance(frame, OutputImageRawFrame):
if isinstance(frame, OutputAudioRawFrame):
await self._handle_bot_speech(frame)
elif isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
@@ -705,39 +731,7 @@ class BaseOutputTransport(FrameProcessor):
async def _audio_task_handler(self):
"""Main audio processing task handler."""
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
speech_last_speaking_time = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
is_speaking = False
if isinstance(frame, TTSAudioRawFrame):
is_speaking = True
elif isinstance(frame, SpeechOutputAudioRawFrame):
if not is_silence(frame.audio):
is_speaking = True
speech_last_speaking_time = time.time()
else:
silence_duration = time.time() - speech_last_speaking_time
if silence_duration > BOT_VAD_STOP_SECS:
await self._bot_stopped_speaking()
if is_speaking:
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(
BotSpeakingFrame(), FrameDirection.UPSTREAM
)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break

View File

@@ -689,3 +689,8 @@ class SmallWebRTCConnection(BaseObject):
)()
if track:
track.set_enabled(signalling_message.enabled)
async def add_ice_candidate(self, candidate):
"""Handle incoming ICE candidates."""
logger.debug(f"Adding remote candidate: {candidate}")
await self.pc.addIceCandidate(candidate)

View File

@@ -14,6 +14,7 @@ from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional
from aiortc.sdp import candidate_from_sdp
from fastapi import HTTPException
from loguru import logger
@@ -39,6 +40,34 @@ class SmallWebRTCRequest:
request_data: Optional[Any] = None
@dataclass
class IceCandidate:
"""The remote ice candidate object received from the peer connection.
Parameters:
candidate: The ice candidate patch SDP string (Session Description Protocol).
sdp_mid: The SDP mid for the candidate patch.
sdp_mline_index: The SDP mline index for the candidate patch.
"""
candidate: str
sdp_mid: str
sdp_mline_index: int
@dataclass
class SmallWebRTCPatchRequest:
"""Small WebRTC transport session arguments for the runner.
Parameters:
pc_id: Identifier for the peer connection.
candidates: A list of ICE candidate patches.
"""
pc_id: str
candidates: List[IceCandidate]
class ConnectionMode(Enum):
"""Enum defining the connection handling modes."""
@@ -197,6 +226,19 @@ class SmallWebRTCRequestHandler:
logger.debug(f"SmallWebRTC request details: {request}")
raise
async def handle_patch_request(self, request: SmallWebRTCPatchRequest):
"""Handle a SmallWebRTC patch candidate request."""
peer_connection = self._pcs_map.get(request.pc_id)
if not peer_connection:
raise HTTPException(status_code=404, detail="Peer connection not found")
for c in request.candidates:
candidate = candidate_from_sdp(c.candidate)
candidate.sdpMid = c.sdp_mid
candidate.sdpMLineIndex = c.sdp_mline_index
await peer_connection.add_ice_candidate(candidate)
async def close(self):
"""Clear the connection map."""
coros = [pc.disconnect() for pc in self._pcs_map.values()]

View File

@@ -254,7 +254,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
try:
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
timeout=1.0,
)
except asyncio.TimeoutError:
@@ -290,7 +290,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello!"))
try:
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
timeout=1.0,
)
except asyncio.TimeoutError:
@@ -301,11 +301,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2)
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert True
# This shouldn't freeze, so nothing to check really.
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
async def test_no_idle_task(self):
identity = IdentityFilter()
@@ -313,7 +310,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
try:
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
timeout=0.3,
)
except asyncio.TimeoutError:
@@ -332,11 +329,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
),
idle_timeout_secs=0.3,
)
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert True
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
async def test_idle_task_event_handler_no_frames(self):
identity = IdentityFilter()
@@ -351,11 +344,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
idle_timeout = True
await task.cancel()
try:
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert False
except asyncio.CancelledError:
assert idle_timeout
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
assert idle_timeout
async def test_idle_task_event_handler_quiet_user(self):
identity = IdentityFilter()
@@ -416,12 +406,15 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
asyncio.create_task(delayed_frames()),
]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
diff_time = time.time() - start_time
self.assertGreater(diff_time, sleep_time_secs * 3)
# Wait for the pending tasks to complete.
await asyncio.gather(*pending)
async def test_task_cancel_timeout(self):
class CancelFilter(FrameProcessor):
def __init__(self, **kwargs):

8320
uv.lock generated

File diff suppressed because it is too large Load Diff