Compare commits

...

113 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
ab61d09ec1 Merge pull request #2502 from pipecat-ai/aleix/pipecat-0.0.81
update CHANGELOG for 0.0.81
2025-08-25 09:28:21 -07:00
Aleix Conchillo Flaqué
e4afc0a13c update CHANGELOG for 0.0.81 2025-08-25 08:22:28 -07:00
Mark Backman
dde3d2395b Merge pull request #2491 from pipecat-ai/mb/update-quickstart 2025-08-23 06:34:37 -07:00
Aleix Conchillo Flaqué
30b36c3d6e Merge pull request #2497 from pipecat-ai/aleix/pipeline-task-fix-cancellation
PipelineTask: handle cancellations gracefully
2025-08-22 22:37:12 -07:00
Mark Backman
de4dfc3ed4 Update deployment steps 2025-08-23 00:19:26 -04:00
Aleix Conchillo Flaqué
a0128516ff PipelineTask: handle cancellations gracefully 2025-08-22 19:04:31 -07:00
Aleix Conchillo Flaqué
db3b8c7325 Merge pull request #2496 from pipecat-ai/aleix/release-evals-always-provide-eval-prompt
scripts(evals): always require an eval prompt
2025-08-22 18:11:33 -07:00
Aleix Conchillo Flaqué
9273ec0f25 scripts(evals): always require an eval prompt 2025-08-22 16:57:47 -07:00
Mark Backman
8dfa1187be Merge pull request #2402 from pipecat-ai/mb/voicemail-detection
Add voicemail detection
2025-08-22 14:51:13 -07:00
Mark Backman
e17fd580c6 Update README 2025-08-22 15:56:56 -04:00
mattie ruth backman
3e3d50a855 Fix issue with request images from the camera introduced in smallwebrtctransport 2025-08-22 15:02:33 -04:00
Mark Backman
402661ae03 Prevent user speaking frames from entering the classifier branch after a conversation is detected 2025-08-22 14:09:45 -04:00
Mark Backman
69c6a95b8a Simplify frames in the NotifierGate 2025-08-22 14:09:45 -04:00
Mark Backman
4d49210a73 Rename system_prompt to custom_system_prompt; improve dev ex for classification prompt requirements 2025-08-22 14:09:45 -04:00
Aleix Conchillo Flaqué
5f8a22ef2f Merge pull request #2493 from pipecat-ai/aleix/runner-task-asyncio-cancellation
PipelineRunner/PipelineTask: fix asyncio task cancellation
2025-08-22 09:13:58 -07:00
Aleix Conchillo Flaqué
606ad0826a Merge pull request #2492 from pipecat-ai/aleix/wait-for-task-deprecated
FrameProcessor: wait_for_task is now deprecated
2025-08-22 09:13:34 -07:00
Mark Backman
57028255ee Update changelog, mention text LLMs only 2025-08-22 12:12:17 -04:00
Mark Backman
87ebbab758 Only set/clear voicemail_event when voicemail is detected 2025-08-22 12:12:17 -04:00
Mark Backman
bd401e8d6f Rename TTSBuffer to TTSGate 2025-08-22 12:12:17 -04:00
Mark Backman
f0dfab23e7 Cleanup 2025-08-22 12:12:17 -04:00
Mark Backman
fbc907c371 Change path to extensions 2025-08-22 12:12:17 -04:00
Mark Backman
40b5ef485d Add base NotifierGate class and ClassifierGate, ConversationGate subclasses 2025-08-22 12:12:17 -04:00
Mark Backman
b30af3e155 Tests specify USER_SPEAKS_FIRST or BOT_SPEAKS_FIRST 2025-08-22 12:12:17 -04:00
Mark Backman
446bb5cddf Refactor callback to event 2025-08-22 12:12:17 -04:00
Mark Backman
1c1ee94074 Add 44 to evals, update evals to support user speaking first 2025-08-22 12:12:17 -04:00
Mark Backman
ac30083b45 Add CHANGELOG entry 2025-08-22 12:12:17 -04:00
Mark Backman
ce579d4266 Make on_voicemail_detected callback required, cleanup logging 2025-08-22 12:12:17 -04:00
Mark Backman
5a07b30c7a Class name changes, add TTSStarted/StoppedFrame to the TTSBuffer 2025-08-22 12:12:17 -04:00
Mark Backman
9da33f3897 Handle multiple user inputs from the user when a voicemail is detected; add a configurable timeout to emitting the callback 2025-08-22 12:12:17 -04:00
Mark Backman
5ca82ec61e Final docstrings, comments, and cleanup 2025-08-22 12:12:17 -04:00
Mark Backman
0067c7df47 Add aggregation to classifier LLM output and validate prompt 2025-08-22 12:12:17 -04:00
Mark Backman
ab03db5b0c Updated prompt, add custom system_prompt input 2025-08-22 12:12:17 -04:00
Mark Backman
238d6bf9ab Add buffering logic 2025-08-22 12:12:17 -04:00
Mark Backman
90ae85bab2 More updates—added new voicemail module 2025-08-22 12:12:17 -04:00
Mark Backman
29e09b2053 POC demo in progress 2025-08-22 12:12:17 -04:00
mattie ruth backman
bad9977e8c PR feedback and more explicit about only supporting exporting 1 video 2025-08-22 11:24:22 -04:00
mattie ruth backman
b987579d54 update smallWebRTC screen support to support the utils format for listening to screenshares 2025-08-22 11:24:22 -04:00
mattie ruth backman
40f1f4ff11 Add support to smallWebRTCTransport for receiving screenshare videos 2025-08-22 11:24:22 -04:00
Aleix Conchillo Flaqué
a3ad31d0f6 README: recommended python version is 3.12 2025-08-21 23:50:00 -07:00
Aleix Conchillo Flaqué
8044c4170d PipelineRunner/PipelineTask: fix asyncio task cancellation 2025-08-21 23:50:00 -07:00
Aleix Conchillo Flaqué
bc51e7abc6 FrameProcessor: wait_for_task is now deprecated 2025-08-21 21:17:47 -07:00
Aleix Conchillo Flaqué
256ecf4d71 Merge pull request #2490 from pipecat-ai/aleix/speechmatics-exceptions
Speechmatics exception handling
2025-08-21 19:48:43 -07:00
Aleix Conchillo Flaqué
c16969c4f5 Merge pull request #2489 from pipecat-ai/aleix/daily-python-0.19.7
pyproject: update daily-python to 0.19.7
2025-08-21 19:48:31 -07:00
Mark Backman
8ef64d8c8d Update quickstart, make it deployable 2025-08-21 22:32:34 -04:00
Aleix Conchillo Flaqué
4947d08733 GladiaSTTService: update loggin levels 2025-08-21 18:42:23 -07:00
Aleix Conchillo Flaqué
b61846534d SpeechmaticsSTTService: improve exception handling and loggin 2025-08-21 18:42:23 -07:00
Aleix Conchillo Flaqué
8f01cd220a pyproject: update daily-python to 0.19.7 2025-08-21 18:40:01 -07:00
Aleix Conchillo Flaqué
3abaaf80e0 Merge pull request #2487 from pipecat-ai/aleix/watchdog-timers-removal
remove watchdog timers and specific asyncio implementations
2025-08-21 18:37:35 -07:00
Aleix Conchillo Flaqué
13890fa021 github(tests): use python 3.12 to run unit tests/coverage 2025-08-21 18:09:56 -07:00
Aleix Conchillo Flaqué
802af28888 update pytest-asyncio to 1.1.0 2025-08-21 18:09:56 -07:00
Aleix Conchillo Flaqué
24a628c85e remove watchdog timers and specific asyncio implementations
Watchdog timers have been removed. They were introduced in 0.0.72 to help
diagnose pipeline freezes. Unfortunately, they proved ineffective since they
required developers to use Pipecat-specific queues, iterators, and events to
correctly reset the timer, which limited their usefulness and added friction.
2025-08-21 18:09:56 -07:00
Mark Backman
ddab95835b Merge pull request #2474 from pipecat-ai/mb/add-frames-pipeline-idle
Add UserStarted/StoppedSpeakingFrames to idle_timeout_frames
2025-08-21 03:45:46 -07:00
Mark Backman
cb13f4b4cb Add user speaking and transcription frames to idle_timeout_frames 2025-08-21 06:43:10 -04:00
Aleix Conchillo Flaqué
4793277d34 Merge pull request #2480 from pipecat-ai/aleix/replace-asyncio-waitfor
replace asyncio.wait_for for wait_for2.wait_for
2025-08-20 17:43:32 -07:00
Aleix Conchillo Flaqué
28c729cc36 replace asyncio.wait_for for wait_for2.wait_for 2025-08-20 15:26:57 -07:00
Aleix Conchillo Flaqué
4d07c7b77c Merge pull request #2479 from pipecat-ai/aleix/simplify-dtmf-aggregator
DTMFAggregator: no need for interruption task
2025-08-20 15:15:35 -07:00
Aleix Conchillo Flaqué
4ff0567025 BaseObject: allow keyword arguments 2025-08-20 15:14:31 -07:00
Aleix Conchillo Flaqué
1377dec01b DTMFAggregator: no need for interruption task
Now that system frames are queued there's no need to have an additional task to
push a `BotInterruptionFrame`.
2025-08-20 14:35:04 -07:00
Aleix Conchillo Flaqué
42f4d73a63 Merge pull request #2478 from pipecat-ai/aleix/fix-wait-for2-import
timeout: fix wait_for2 import
2025-08-20 14:29:19 -07:00
Aleix Conchillo Flaqué
f1c1ebf852 timeout: fix wait_for2 import 2025-08-20 14:24:16 -07:00
Aleix Conchillo Flaqué
eb6d43f6cb Merge pull request #2476 from pipecat-ai/aleix/add-asyncio-timeout
implement custom asyncio.wait_for()
2025-08-20 14:20:22 -07:00
Aleix Conchillo Flaqué
f387776985 add custom asyncio.wait_for()
This patch uses `wait_for2` package to implement `asyncio.wait_for()` for
Python < 3.12.

In Python 3.12, `asyncio.wait_for()` is implemented in terms of
`asyncio.timeout()` which fixed a bunch of issues. However, this was never
backported (because of the lack of `async.timeout()`) and there are still many
remainig issues, specially in Python 3.10, in `async.wait_for()`.

See https://github.com/python/cpython/pull/98518
2025-08-20 14:09:05 -07:00
Aleix Conchillo Flaqué
5286591826 Merge pull request #2464 from pipecat-ai/aleix/frame-processor-updates
various frame processor updates
2025-08-20 10:11:49 -07:00
Aleix Conchillo Flaqué
6831e63ec9 PipelineTask: use PipelineSource/PipelineSink and remove tasks 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
12bcb7db64 ParallelPipeline: use PipelineSource/PipelineSink and remove tasks 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
1b48b1d860 Pipeline: allow passing user source and sink processors 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
d161e2767f FrameProcessor: allow pausing/resuming system frames 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
4e3af00b6d tests: try to use default SleepFrame time 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
4015aedb86 tests: fix unit tests 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
75a6ee839b BaseObserver: added new on_process_frame 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
13ce02c896 FrameProcessor: add new entry_processors() method 2025-08-20 10:08:54 -07:00
Aleix Conchillo Flaqué
2fd5885dc3 pipeline: implement processors property 2025-08-20 07:40:21 -07:00
Aleix Conchillo Flaqué
d743586bfb BasePipeline: move processors_with_metrics() to FrameProcessor 2025-08-20 07:40:21 -07:00
Aleix Conchillo Flaqué
8051017895 pipeline: wrap with pipelines, use direct mode and reduce tasks 2025-08-20 07:40:21 -07:00
Aleix Conchillo Flaqué
dc7bf98ce5 Pipeline: improve performance by using direct mode 2025-08-20 07:40:21 -07:00
Aleix Conchillo Flaqué
609a43a191 FrameProcessor: added processors/next/previous properties 2025-08-20 07:40:19 -07:00
Aleix Conchillo Flaqué
4fb04422d9 FrameProcessor: remove unused set_parent/get_parent 2025-08-20 07:40:02 -07:00
Mark Backman
2f74a7e674 Merge pull request #2469 from pipecat-ai/mb/11labs-text-normalization
Add apply_text_normalization to ElevenLabs TTS services
2025-08-19 18:21:33 -07:00
Mark Backman
5205f56087 Add apply_text_normalization to ElevenLabs TTS services 2025-08-19 21:19:00 -04:00
Mark Backman
694c792af3 Merge pull request #2470 from pipecat-ai/mb/11labs-settings-reconnect
Update ElevenLabsTTSService: update runtime configuration
2025-08-19 18:18:14 -07:00
Mark Backman
406e82a842 Merge pull request #2438 from pipecat-ai/mb/delete-old-docs
Remove stale docs
2025-08-19 12:22:54 -07:00
Mark Backman
837de5f893 Merge pull request #2468 from pipecat-ai/mb/fix-mistral-docs-errors
Fix Mistral docstrings build errors
2025-08-19 12:22:26 -07:00
Mark Backman
10b9b1da2f Merge pull request #2471 from pipecat-ai/mb/add-13j
Add foundational 13j for Azure STT
2025-08-19 12:10:03 -07:00
Mark Backman
7854a2ec83 Add foundational 13j for Azure STT 2025-08-19 14:36:31 -04:00
Mark Backman
ac7c69078f Merge pull request #2442 from pipecat-ai/mb/retry-completion
retry_on_timeout: Anthropic, AWS Bedrock
2025-08-19 11:23:43 -07:00
Mark Backman
c9b4356ea6 Update changelog 2025-08-19 14:21:18 -04:00
Mark Backman
b3e4421191 Add retry_on_timeout to AWSBedrockLLMService 2025-08-19 14:20:35 -04:00
Mark Backman
84058c3948 Add retry_on_timeout to AnthropicLLMService 2025-08-19 14:20:35 -04:00
Mark Backman
aebc781419 Update ElevenLabsTTSService to update when voice_settings change 2025-08-19 13:51:10 -04:00
Mark Backman
4160446f4c Update ElevenLabsTTSService: reconnect on model and language changes 2025-08-19 11:32:54 -04:00
Mark Backman
05a14af184 Fix Mistral docstrings build errors 2025-08-19 10:31:03 -04:00
Filipi da Silva Fuchter
89d2ef2bde Merge pull request #2465 from pipecat-ai/filipi/heygen_changing_log_level
Changing heygen log level to trace.
2025-08-19 07:50:11 -03:00
Filipi Fuchter
f550015efb Changing heygen log level to trace. 2025-08-18 18:00:25 -03:00
Mark Backman
8fa44863fb Merge pull request #2455 from pipecat-ai/vp-log-line
log: add Disconnected from ElevenLabs debug log
2025-08-15 14:12:28 -07:00
vipyne
088cb56922 log: add Disconnected from ElevenLabs debug log 2025-08-15 15:05:07 -05:00
Aleix Conchillo Flaqué
a789e5feea Merge pull request #2451 from pipecat-ai/aleix/audio-buffer-processor-overlap
AudioBufferProcessor: fix overlap when buffer size is set
2025-08-14 15:31:50 -07:00
Aleix Conchillo Flaqué
16ca44131c Merge pull request #2452 from pipecat-ai/aleix/runner-daily-direct-handlesigint
Runner: set handle_sigint to True for Daily direct
2025-08-14 15:25:05 -07:00
Mark Backman
418860cf26 Merge pull request #2450 from pipecat-ai/mb/fix-openai-changelog-entry
fix: Move OpenAI retry changelog entry to the correct release
2025-08-14 15:23:00 -07:00
Aleix Conchillo Flaqué
e2fc8b3dce Runner: set handle_sigint to True for Daily direct 2025-08-14 14:55:52 -07:00
Aleix Conchillo Flaqué
8b641089f8 AudioBufferProcessor: fix overlap when buffer size is set 2025-08-14 14:44:08 -07:00
Mark Backman
d36ed755ce fix: Move OpenAI retry changelog entry to the correct release 2025-08-14 17:34:35 -04:00
Mark Backman
7aaf64fe55 Merge pull request #2447 from pipecat-ai/mb/update-foundational-readme
Improve the foundational example README
2025-08-14 09:51:01 -07:00
Mark Backman
5f52008974 Improve the foundational example README 2025-08-14 11:29:04 -04:00
Mark Backman
d520677b23 Merge pull request #2408 from pipecat-ai/mb/add-mistral-llm
Add MistralLLMService
2025-08-14 08:19:18 -07:00
Mark Backman
42bd1e9d40 Add Mistral to README and pyproject.toml 2025-08-14 11:15:52 -04:00
Mark Backman
7f0494aa04 Override build_chat_completion_params for Mistral 2025-08-14 10:32:18 -04:00
Mark Backman
b7ae2989ac Add foundational 14w-function-calling.py 2025-08-14 10:00:46 -04:00
Mark Backman
2b2b0f8121 Add MistralLLMService 2025-08-14 09:57:14 -04:00
Mark Backman
5ca33a2b00 Merge pull request #2445 from pipecat-ai/mb/fix-changelog-asyncai
fix: Changelog for Async AI bugfix
2025-08-14 06:48:08 -07:00
Mark Backman
938dcb613d fix: Changelog for Async AI bugfix 2025-08-14 09:13:03 -04:00
Mark Backman
bc748cf9d0 Merge pull request #2444 from ashotbagh/fix/asyncai-force-flush
fix(asyncai): force flush WS TTS to eliminate stalls
2025-08-14 06:10:16 -07:00
Ashot
3b55d16a49 fix(asyncai): force flush WS TTS to eliminate stalls 2025-08-14 16:34:34 +04:00
Mark Backman
2c220ca54e Remove stale docs 2025-08-13 14:11:41 -04:00
118 changed files with 5922 additions and 2134 deletions

View File

@@ -25,7 +25,7 @@ jobs:
version: "latest"
- name: Set up Python
run: uv python install 3.10
run: uv python install 3.12
- name: Install system packages
run: |

View File

@@ -23,17 +23,12 @@ jobs:
token: ${{ secrets.QUICKSTART_SYNC_TOKEN }}
path: quickstart-repo
- name: Sync files (excluding READMEs)
- name: Sync files (excluding uv.lock and README.md)
run: |
# Copy code files only, skip READMEs
cp examples/quickstart/bot.py quickstart-repo/
cp examples/quickstart/requirements.txt quickstart-repo/
cp examples/quickstart/env.example quickstart-repo/
# Copy any other files that aren't README.md
# Copy all files except uv.lock and README.md
find examples/quickstart -type f \
-not -name "README.md" \
-not -name "*.md" \
-not -name "uv.lock" \
-exec cp {} quickstart-repo/ \;
- name: Commit and push changes

View File

@@ -29,7 +29,7 @@ jobs:
version: "latest"
- name: Set up Python
run: uv python install 3.10
run: uv python install 3.12
- name: Install system packages
run: |

View File

@@ -5,6 +5,103 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.81] - 2025-08-25
### Added
- Added `pipecat.extensions.voicemail`, a module for detecting voicemail vs.
live conversation, primarily intended for use in outbound calling scenarios.
The voicemail module is optimized for text LLMs only.
- Added new frames to the `idle_timeout_frames` arg: `TranscriptionFrame`,
`InterimTranscriptionFrame`, `UserStartedSpeakingFrame`, and
`UserStoppedSpeakingFrame`. These additions serve as indicators of user
activity in the pipeline idle detection logic.
- Allow passing custom pipeline sink and source processors to a
`Pipeline`. Pipeline source and sink processors are used to know and control
what's coming in and out of a `Pipeline` processor.
- Added `FrameProcessor.pause_processing_system_frames()` and
`FrameProcessor.resume_processing_system_frames()`. These allow to pause and
resume the processing of system frame.
- Added new `on_process_frame()` observer method which makes it possible to know
when a frame is being processed.
- Added new `FrameProcessor.entry_processor()` method. This allows you to access
the first non-compound processor in a pipeline.
- Added `FrameProcessor` properties `processors`, `next` and `previous`.
- `ElevenLabsTTSService` now supports additional runtime changes to the `model`,
`language`, and `voice_settings` parameters.
- Added `apply_text_normalization` support to `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService`.
- Added `MistralLLMService`, using Mistral's chat completion API.
- Added the ability to retry executing a chat completion after a timeout period
for `OpenAILLMService` and its subclasses, `AnthropicLLMService`, and
`AWSBedrockLLMService`. The LLM services accept new args:
`retry_timeout_secs` and `retry_on_timeout`. This feature is disabled by
default.
### Changed
- Updated `daily-python` to 0.19.7.
### Deprecated
- `FrameProcessor.wait_for_task()` is deprecated. Use `await task` or `await
asyncio.wait_for(task, timeout)` instead.
### Removed
- Watchdog timers have been removed. They were introduced in 0.0.72 to help
diagnose pipeline freezes. Unfortunately, they proved ineffective since they
required developers to use Pipecat-specific queues, iterators, and events to
correctly reset the timer, which limited their usefulness and added friction.
- Removed unused `FrameProcessor.set_parent()` and
`FrameProcessor.get_parent()`.
### Fixed
- Fixed an issue that would cause `PipelineRunner` and `PipelineTask` to not
handle external asyncio task cancellation properly.
- Added `SpeechmaticsSTTService` exception handling on connection and sending.
- Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python <
3.12. because of issues regarding task cancellation (i.e. cancellation is
never propagated).
See https://bugs.python.org/issue42130
- Fixed an `AudioBufferProcessor` issues that would cause audio overlap when
setting a max buffer size.
- Fixed an issue where `AsyncAITTSService` had very high latency in responding
by adding `force=true` when sending the flush command.
### Performance
- Improve `PipelineTask` performance by using direct mode processors and by
removing unnecessary tasks.
- Improve `ParallelPipeline` performance by using direct mode, by not
creating a task for each frame and every sub-pipeline and also by removing
other unnecessary tasks.
- `Pipeline` performance improvements by using direct mode.
### Other
- Added `14w-function-calling-mistal.py` using `MistralLLMService`.
- Added `13j-azure-transcription.py` using `AzureSTTService`.
## [0.0.80] - 2025-08-13
### Added
@@ -13,11 +110,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Gemini model can be prompted to insert styled speech to control the TTS
output.
- For `OpenAILLMService` and its subclasses, added the ability to retry
executing a chat completion after a timeout period. The new args are
`retry_timeout_secs` and `retry_on_timeout`. This feature is disabled by
default.
- Added Exotel support to Pipecat's development runner. You can now connect
using the runner with `uv run bot.py -t exotel` and an ngrok connection to
HTTP port 7860.

View File

@@ -54,7 +54,7 @@ You can connect to Pipecat from any platform using our official SDKs:
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
@@ -115,7 +115,7 @@ You can get started with Pipecat running on your local machine, then move your a
### Prerequisites
**Minimum Python Version:** 3.10
**Recommended Python Version:** 3.11-3.12
**Recommended Python Version:** 3.12
### Setup Steps

View File

@@ -1,10 +0,0 @@
# Pipecat Docs
## [Architecture Overview](architecture.md)
Learn about the thinking behind the framework's design.
## [A Frame's Progress](frame-progress.md)
See how a Frame is processed through a Transport, a Pipeline, and a series of Frame Processors.

View File

@@ -1,17 +0,0 @@
# Pipecat architecture guide
## Frames
Frames can represent discrete chunks of data, for instance a chunk of text, a chunk of audio, or an image. They can also be used to as control flow, for instance a frame that indicates that there is no more data available, or that a user started or stopped talking. They can also represent more complex data structures, such as a message array used for an LLM completion.
## FrameProcessors
Frame processors operate on frames. Every frame processor implements a `process_frame` method that consumes one frame and produces zero or more frames. Frame processors can do simple transforms, such as concatenating text fragments into sentences, or they can treat frames as input for an AI Service, and emit chat completions based on message arrays or transform text into audio or images.
## Pipelines
Pipelines are lists of frame processors linked together. Frame processors can push frames upstream or downstream to their peers. A very simple pipeline might chain an LLM frame processor to a text-to-speech frame processor, with a transport as an output.
## Transports
Transports provide input and output frame processors to receive or send frames respectively. For example, the `DailyTransport` does this with a WebRTC session joined to a Daily.co room.

View File

@@ -1,46 +0,0 @@
# A Frame's Progress
1. A user says “Hello, LLM” and the cloud transcription service delivers a transcription to the Transport.
![A transcript frame arrives](images/frame-progress-01.png)
2. The Transport places a Transcription frame in the Pipelines source queue.
![Frame in source queue](images/frame-progress-02.png)
3. The Pipeline passes the Transcription frame to the first Frame Processor in its list, the LLM User Message Aggregator.
![To UMA](images/frame-progress-03.png)
4. The LLM User Message Aggregator updates the LLM Context with a `{“user”: “Hello LLM”}` message.
![Update context](images/frame-progress-04.png)
5. The LLM User Message Aggregator yields an LLM Message Frame, containing the updated LLM Context. The Pipeline passes this frame to the LLM Frame Processor.
![Update context](images/frame-progress-05.png)
6. The LLM Frame Processor creates a streaming chat completion based on the LLM context and yields the first chunk of a response, Text Frame with the value “Hi, “. The Pipeline passes this frame to the TTS Frame Processor. The TTS Frame Processor aggregates this response but doesnt yield anything, yet, because its waiting for a full sentence.
![LLM yields Text](images/frame-progress-06.png)
7. The LLM Frame Processor yields another Text Frame with the value “there.”. The Pipeline passes this frame to the TTS Frame Processor.
![LLM yields more Text](images/frame-progress-07.png)
8. The TTS Frame Processor now has a full sentence, so it starts streaming audio based on “Hi, there.” It yields the first chunk of streaming audio as an Audio frame, which the Pipeline passes to the LLM Assistant Message Aggregator.
![TTS yields Audio](images/frame-progress-08.png)
9. The LLM Assistant Message Aggregator doesnt do anything with Audio frames, so it immediately yields the frame, unchanged. This is the convention for all Frame Processors: frames that the processor doesnt process should be immediately yielded.
![pass-through](images/frame-progress-09.png)
10. The Pipeline places the first Audio frame in its sink queue, which is being watched by the Transport. Since the frame is now in a queue, the Pipeline can continue processing other frames. Note that the source and sink queues form a sort of “boundary of concurrent processing” between a Pipeline and the outside world. In a Pipeline, Frames are processed sequentially; once a Frame is on a queue it can be processed in parallel with the frames being processed by the Pipeline. TODO: link to a more in-depth section about this.
![sink queue](images/frame-progress-10.png)
11. The TTS Frame Processor yields another Audio frame as the Transport transmits the first Audio frame.
![parallel audio](images/frame-progress-11.png)
12. As before, the LLM Assistant Message Aggregator immediately yields the Audio frame and the Pipeline places the Audio frame in the sink queue.
![sink queue 2](images/frame-progress-12.png)
13. The TTS Frame Processor has no more frames to yield. The LLM Frame Processor emits an LLM Response End Frame, which the Pipeline passes to the TTS Frame Processor.
![response end](images/frame-progress-13.png)
14. The TTS Frame Processor immediately yields the LLM Response End Frame, so the Pipeline passes it along to the LLM Assistant Message Aggregator. The LLM Assistant Message Aggregator updates the LLM Context with the full response from the LLM. TODO TODO: I realized I forgot that the TSS Frame Processor also yields the Text frames that the LLM emitted so that the LLM Assistant Message Aggregator could accumulate them, arrggh.
![response end](images/frame-progress-14.png)
15. The system is quiet, and waiting for the next message from the Transport.
![response end](images/frame-progress-15.png)

View File

@@ -1,110 +0,0 @@
# Understanding Different Frame Types in the Pipecat System
In the Pipecat system, frames are used to represent different types of data and control signals that flow through the pipeline. Understanding these frame types is crucial for working with the system effectively. This tutorial will cover the main categories of frames and their specific uses.
## 1. Base Frame Classes
### Frame
The `Frame` class is the base class for all frames. It includes:
- `id`: A unique identifier
- `name`: A descriptive name
- `pts`: Presentation timestamp (optional)
### DataFrame
`DataFrame` is a subclass of `Frame` and serves as a base for most data-carrying frames.
## 2. Audio Frames
### AudioRawFrame
Represents a chunk of audio with properties:
- `audio`: Raw audio data
- `sample_rate`: Audio sample rate
- `num_channels`: Number of audio channels
Subclasses include:
- `InputAudioRawFrame`: For audio from input sources
- `OutputAudioRawFrame`: For audio to be played by output devices
- `TTSAudioRawFrame`: For audio generated by Text-to-Speech services
## 3. Image Frames
### ImageRawFrame
Represents an image with properties:
- `image`: Raw image data
- `size`: Image dimensions
- `format`: Image format (e.g., JPEG, PNG)
Subclasses include:
- `InputImageRawFrame`: For images from input sources
- `OutputImageRawFrame`: For images to be displayed
- `UserImageRawFrame`: For images associated with a specific user
- `VisionImageRawFrame`: For images with associated text for description
- `URLImageRawFrame`: For images with an associated URL
### SpriteFrame
Represents an animated sprite, containing a list of `ImageRawFrame` objects.
## 4. Text and Transcription Frames
### TextFrame
Represents a chunk of text, used for various purposes in the pipeline.
### TranscriptionFrame
A specialized `TextFrame` for speech transcriptions, including:
- `user_id`: ID of the speaking user
- `timestamp`: When the transcription was generated
- `language`: Detected language of the speech
### InterimTranscriptionFrame
Similar to `TranscriptionFrame`, but for interim (not final) transcriptions.
## 5. LLM (Language Model) Frames
### LLMMessagesFrame
Contains a list of messages for an LLM service to process.
### LLMMessagesAppendFrame and LLMMessagesUpdateFrame
Used to modify the current context of LLM messages.
### LLMSetToolsFrame
Specifies tools (functions) available for the LLM to use.
### LLMEnablePromptCachingFrame
Controls prompt caching in certain LLMs.
## 6. System and Control Frames
### SystemFrame
Base class for system-level frames.
Important system frames include:
- `StartFrame`: Initiates a pipeline
- `CancelFrame`: Stops a pipeline immediately
- `ErrorFrame`: Notifies of errors (with `FatalErrorFrame` for unrecoverable errors)
- `EndTaskFrame` and `CancelTaskFrame`: Control pipeline tasks
- `StartInterruptionFrame` and `StopInterruptionFrame`: Indicate user speech for interruptions
### ControlFrame
Base class for control-flow frames.
Notable control frames:
- `EndFrame`: Signals the end of a pipeline
- `LLMFullResponseStartFrame` and `LLMFullResponseEndFrame`: Bracket LLM responses
- `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame`: Indicate user speech activity
- `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame`: Indicate bot speech activity
- `TTSStartedFrame` and `TTSStoppedFrame`: Bracket Text-to-Speech responses
## 7. Special Purpose Frames
### MetricsFrame
Contains performance metrics data.
### FunctionCallInProgressFrame and FunctionCallResultFrame
Used for handling LLM function (tool) calls.
### ServiceUpdateSettingsFrame
Base class for updating service settings, with specific subclasses for LLM, TTS, and STT services.
## Conclusion
Understanding these frame types is essential for working with the Pipecat system. Each frame type serves a specific purpose in the pipeline, whether it's carrying data (like audio or images), controlling the flow of the pipeline, or managing system-level operations. By using the appropriate frame types, you can effectively process and transmit various kinds of information through your pipeline.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 95 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 111 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 117 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

View File

@@ -0,0 +1,88 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.azure.stt import AzureSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = AzureSTTService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(
pipeline,
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,165 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.mistral.llm import MistralLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = MistralLLMService(api_key=os.getenv("MISTRAL_API_KEY"))
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,139 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.frames.frames import EndTaskFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
classifier_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
voicemail = VoicemailDetector(llm=classifier_llm)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
voicemail.detector(), # Voicemail detection — between STT and User context aggregator
context_aggregator.user(),
llm,
tts,
voicemail.gate(), # TTS gating — Immediately after the TTS service
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@voicemail.event_handler("on_voicemail_detected")
async def handle_voicemail(processor):
logger.info("Voicemail detected! Leaving a message...")
# Push frames using standard Pipecat pattern
await processor.push_frame(
TTSSpeakFrame(
"Hello, this is Jamie calling about your appointment. Please call me back at 555-0123 when you get this."
)
)
# NOTE: A common pattern is to end pipeline after the voicemail is left.
# Uncomment the following line to end the pipeline after leaving the voicemail.
# await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,40 +4,32 @@ This directory contains examples showing how to build voice and multimodal agent
## Setup
1. Make sure you have uv installed:
1. Follow the [README](../../README.md#%EF%B8%8F-contributing-to-the-framework) steps to get your local environment configured.
```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```
> **Run from root directory**: Make sure you are running the steps from the root directory.
> **Need help?** Refer to the [uv install documentation](https://docs.astral.sh/uv/getting-started/installation/).
> **Using local audio?**: The `LocalAudioTransport` requires a system dependency for `portaudio`. Install the dependency to use the transport.
2. Create a venv and install example dependencies:
```bash
uv sync --all-extras --no-extra krisp
```
3. Create a `.env` file with your API keys:
2. Copy the [`env.example`](../../env.example) file and add API keys for services you plan to use:
```bash
cp env.example .env
# Edit .env with your API keys
```
4. Navigate to the examples directory:
3. Navigate to the examples directory if you aren't already there:
```bash
cd examples/foundational
```
5. Run any example:
4. Run any example:
```bash
uv run python 01-say-one-thing.py
```
6. Open the web interface at http://localhost:7860/client/ and click "Connect"
5. Open the web interface at http://localhost:7860/client/ and click "Connect"
## Running examples with other transports

View File

@@ -0,0 +1,16 @@
FROM dailyco/pipecat-base:latest
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy
# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=uv.lock,target=uv.lock \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
uv sync --locked --no-install-project --no-dev
# Copy the application code
COPY ./bot.py bot.py

View File

@@ -1,87 +1,159 @@
# Pipecat Quickstart
Run your first Pipecat bot in under 5 minutes. This example creates a voice AI bot that you can talk to in your browser.
Build and deploy your first voice AI bot in under 10 minutes. Develop locally, then scale to production on Pipecat Cloud.
## Prerequisites
**Two steps**: [🏠 Local Development](#run-your-bot-locally) → [☁️ Production Deployment](#deploy-to-production)
### Python 3.10+
> 🎯 Quick start: Local bot in 5 minutes, production deployment in 5 more
Pipecat requires Python 3.10 or newer. Check your version:
## Step 1: Local Development (5 min)
```bash
python --version
```
### Prerequisites
If you need to upgrade Python, we recommend using a version manager like `uv` or `pyenv`.
#### Environment
### AI Service API keys
- Python 3.10 or later
- [uv](https://docs.astral.sh/uv/getting-started/installation/) package manager installed
Pipecat orchestrates different AI services in a pipeline, ensuring low latency communication. In this quickstart example, we'll use:
#### AI Service API keys
- [Deepgram](https://console.deepgram.com/signup) for Speech-to-Text transcriptions
You'll need API keys from three services:
- [Deepgram](https://console.deepgram.com/signup) for Speech-to-Text
- [OpenAI](https://auth.openai.com/create-account) for LLM inference
- [Cartesia](https://play.cartesia.ai/sign-up) for Text-to-Speech audio generation
- [Cartesia](https://play.cartesia.ai/sign-up) for Text-to-Speech
Have your API keys ready. We'll add them to your `.env` shortly.
> 💡 **Tip**: Sign up for all three now. You'll need them for both local and cloud deployment.
## Setup
### Setup
1. Set up a virtual environment
Navigate to the quickstart directory and set up your environment.
From the `examples/quickstart` directory, run:
1. Install dependencies:
```bash
uv sync
```
2. Configure your API keys:
Create a `.env` file:
```bash
cp env.example .env
```
Then, add your API keys:
```ini
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
```
### Run your bot locally
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
uv run bot.py
```
> Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.
2. Install dependencies
```bash
pip install -r requirements.txt
```
> Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.
3. Configure environment variables
Create a `.env` file:
```bash
cp env.example .env
```
Then, add your API keys:
```
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
```
4. Run the example
Run your bot using:
```bash
python bot.py
```
> Using `uv`? Run your bot using: `uv run bot.py`.
**Open http://localhost:7860 in your browser** and click `Connect` to start talking to your bot.
> 💡 First run note: The initial startup may take ~10 seconds as Pipecat downloads required models, like the Silero VAD model.
> 💡 First run note: The initial startup may take ~20 seconds as Pipecat downloads required models and imports.
## Troubleshooting
🎉 **Success!** Your bot is running locally. Now let's deploy it to production so others can use it.
- **Browser permissions**: Make sure to allow microphone access when prompted by your browser.
- **Connection issues**: If the WebRTC connection fails, first try a different browser. If that fails, make sure you don't have a VPN or firewall rules blocking traffic. WebRTC uses UDP to communicate.
- **Audio issues**: Check that your microphone and speakers are working and not muted.
---
## Next Steps
## Step 2: Deploy to Production (5 min)
- **Read the docs**: Check out [Pipecat's docs](https://docs.pipecat.ai/) for guides and reference information.
- **Join Discord**: Join [Pipecat's Discord server](https://discord.gg/pipecat) to get help and learn about what others are building.
Transform your local bot into a production-ready service. Pipecat Cloud handles scaling, monitoring, and global deployment.
### Prerequisites
1. [Sign up for Pipecat Cloud](https://pipecat.daily.co/sign-up).
2. Install the Pipecat Cloud CLI:
```bash
uv add pipecatcloud
```
> 💡 Tip: You can run the `pipecatcloud` CLI using the `pcc` alias.
3. Set up Docker for building your bot image:
- **Install [Docker](https://www.docker.com/)** on your system
- **Create a [Docker Hub](https://hub.docker.com/) account**
- **Login to Docker Hub:**
```bash
docker login
```
### Configure your deployment
The `pcc-deploy.toml` file tells Pipecat Cloud how to run your bot. **Update the image field** with your Docker Hub username by editing `pcc-deploy.toml`.
```ini
agent_name = "quickstart"
image = "YOUR_DOCKERHUB_USERNAME/quickstart:0.1" # 👈 Update this line
secret_set = "quickstart-secrets"
[scaling]
min_agents = 1
```
**Understanding the TOML file settings:**
- `agent_name`: Your bot's name in Pipecat Cloud
- `image`: The Docker image to deploy (format: `username/image:version`)
- `secret_set`: Where your API keys are stored securely
- `min_agents`: Number of bot instances to keep ready (1 = instant start)
> 💡 Tip: [Set up `image_credentials`](https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets) in your TOML file for authenticated image pulls
### Configure secrets
Upload your API keys to Pipecat Cloud's secure storage:
```bash
uv run pcc secrets set quickstart-secrets --file .env
```
This creates a secret set called `quickstart-secrets` (matching your TOML file) and uploads all your API keys from `.env`.
### Build and deploy
Build your Docker image and push to Docker Hub:
```bash
# Update build.sh with your Docker Hub username, then:
./build.sh
```
Deploy to Pipecat Cloud:
```bash
uv run pcc deploy
```
### Connect to your agent
1. Open your [Pipecat Cloud dashboard](https://pipecat.daily.co/)
2. Select your `quickstart` agent → **Sandbox**
3. Allow microphone access and click **Connect**
---
## What's Next?
**🔧 Customize your bot**: Modify `bot.py` to change personality, add functions, or integrate with your data
**📚 Learn more**: Check out [Pipecat's docs](https://docs.pipecat.ai/) for advanced features
**💬 Get help**: Join [Pipecat's Discord](https://discord.gg/pipecat) to connect with the community
### Troubleshooting
- **Browser permissions**: Allow microphone access when prompted
- **Connection issues**: Try a different browser or check VPN/firewall settings
- **Audio issues**: Verify microphone and speakers are working and not muted

View File

@@ -7,18 +7,16 @@
"""Pipecat Quickstart Example.
The example runs a simple voice AI bot that you can connect to using your
browser and speak with it.
browser and speak with it. You can also deploy this bot to Pipecat Cloud.
Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)
The example connects between client and server using a P2P WebRTC connection.
Run the bot using::
python bot.py
uv run bot.py
"""
import os
@@ -27,7 +25,7 @@ from dotenv import load_dotenv
from loguru import logger
print("🚀 Starting Pipecat bot...")
print("⏳ Loading AI models (30-40 seconds first run, <2 seconds after)\n")
print("⏳ Loading models and imports (20 seconds first run only)\n")
logger.info("Loading Silero VAD model...")
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -40,15 +38,12 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
logger.info("✅ Pipeline components loaded")
logger.info("Loading WebRTC transport...")
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.daily import DailyParams
logger.info("✅ All components loaded successfully!")
@@ -121,14 +116,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def bot(runner_args: RunnerArguments):
"""Main bot entry point for the bot starter."""
transport = SmallWebRTCTransport(
params=TransportParams(
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)

19
examples/quickstart/build.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -e
VERSION="0.1"
DOCKER_USERNAME="your_username"
AGENT_NAME="quickstart"
# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
# Push the Docker images
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"

View File

@@ -1,3 +1,6 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
CARTESIA_API_KEY=your_cartesia_api_key
# Optional: Connect via Daily WebRTC locally
DAILY_API_KEY=your_daily_api_key

View File

@@ -0,0 +1,6 @@
agent_name = "quickstart"
image = "your_username/quickstart:0.1"
secret_set = "quickstart-secrets"
[scaling]
min_agents = 1

View File

@@ -0,0 +1,19 @@
[project]
name = "pipecat-quickstart"
version = "0.1.0"
description = "Quickstart example for building voice AI bots with Pipecat"
requires-python = ">=3.10"
dependencies = [
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,runner]>=0.0.79",
"pipecatcloud>=0.2.3"
]
[dependency-groups]
dev = [
"ruff~=0.12.1",
]
[tool.ruff]
line-length = 100
[tool.ruff.lint]
select = ["I"]

View File

@@ -1 +0,0 @@
pipecat-ai[webrtc,silero,deepgram,openai,cartesia,runner]>=0.0.77

3148
examples/quickstart/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -36,6 +36,7 @@ dependencies = [
"openai>=1.74.0,<=1.99.1",
# Pinning numba to resolve package dependencies
"numba==0.61.2",
"wait_for2>=0.4.1; python_version<'3.12'",
]
[project.urls]
@@ -52,7 +53,7 @@ azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "websockets>=13.1,<15.0" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.19.6" ]
daily = [ "daily-python~=0.19.7" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "websockets>=13.1,<15.0" ]
fal = [ "fal-client~=0.5.9" ]
@@ -73,6 +74,7 @@ lmnt = [ "websockets>=13.1,<15.0" ]
local = [ "pyaudio~=0.2.14" ]
mcp = [ "mcp[cli]~=1.9.4" ]
mem0 = [ "mem0ai~=0.1.94" ]
mistral = []
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0" ]
nim = []
@@ -113,7 +115,7 @@ dev = [
"pre-commit~=4.2.0",
"pyright~=1.1.402",
"pytest~=8.4.1",
"pytest-asyncio~=1.0.0",
"pytest-asyncio~=1.1.0",
"pytest-aiohttp==1.1.0",
"ruff~=0.12.1",
"setuptools~=78.1.1",

View File

@@ -89,7 +89,13 @@ class EvalRunner:
async def assert_eval_false(self):
await self._queue.put(False)
async def run_eval(self, example_file: str, prompt: EvalPrompt, eval: Optional[str] = None):
async def run_eval(
self,
example_file: str,
prompt: EvalPrompt,
eval: str,
user_speaks_first: bool = False,
):
if not re.match(self._pattern, example_file):
return
@@ -106,7 +112,9 @@ class EvalRunner:
try:
tasks = [
asyncio.create_task(run_example_pipeline(script_path)),
asyncio.create_task(run_eval_pipeline(self, example_file, prompt, eval)),
asyncio.create_task(
run_eval_pipeline(self, example_file, prompt, eval, user_speaks_first)
),
]
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
if pending:
@@ -195,7 +203,8 @@ async def run_eval_pipeline(
eval_runner: EvalRunner,
example_file: str,
prompt: EvalPrompt,
eval: Optional[str],
eval: str,
user_speaks_first: bool = False,
):
logger.info(f"Starting eval bot")
@@ -225,7 +234,7 @@ async def run_eval_pipeline(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
voice_id="97f4b8fb-f2fe-444b-bb9a-c109783a857a", # Nathan
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
@@ -257,15 +266,17 @@ async def run_eval_pipeline(
elif isinstance(prompt, tuple):
example_prompt, example_image = prompt
# See if we need to include an eval prompt.
eval_prompt = ""
if eval:
eval_prompt = f"The answer is correct if the user says [{eval}]."
eval_prompt = f"The answer is correct if it's appropriate for the context and matches: {eval}."
common_system_prompt = f"Call the eval function with your assessment only if the user answers the question. {eval_prompt}"
if user_speaks_first:
system_prompt = f"You are an LLM eval, be extremly brief. You will start the conversation by saying: '{example_prompt}'. {common_system_prompt}"
else:
system_prompt = f"You are an LLM eval, be extremly brief. Your goal is to first ask one question: {example_prompt}. {common_system_prompt}"
messages = [
{
"role": "system",
"content": f"You are an LLM eval, be extremly brief. Your goal is to only ask one question: {example_prompt}. Call the eval function only if the user answers the question and check if the answer is correct (words as numbers are valid). {eval_prompt}",
"content": system_prompt,
},
]
@@ -313,6 +324,14 @@ async def run_eval_pipeline(
)
await audio_buffer.start_recording()
# Default behavior is for the bot to speak first
# If the eval bot speaks first, we append the prompt to the messages
if user_speaks_first:
messages.append(
{"role": "user", "content": f"Start by saying this exactly: '{prompt}'"}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@@ -322,6 +341,8 @@ async def run_eval_pipeline(
async def on_pipeline_idle_timeout(task):
await eval_runner.assert_eval_false()
runner = PipelineRunner()
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
# eval and the example.
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -24,9 +24,13 @@ ASSETS_DIR = SCRIPT_DIR / "assets"
FOUNDATIONAL_DIR = SCRIPT_DIR.parent.parent / "examples" / "foundational"
# Speaking order constants
USER_SPEAKS_FIRST = True
BOT_SPEAKS_FIRST = False
# Math
PROMPT_SIMPLE_MATH = "A simple math addition."
EVAL_SIMPLE_MATH = "Correct math addition."
# Weather
PROMPT_WEATHER = "What's the weather in San Francisco?"
@@ -40,120 +44,157 @@ EVAL_ONLINE_SEARCH = f"Today is {datetime.now(timezone.utc).strftime('%B %d, %Y'
# Switch language
PROMPT_SWITCH_LANGUAGE = "Say something in Spanish."
EVAL_SWITCH_LANGUAGE = "Check if the user is now talking in Spanish."
EVAL_SWITCH_LANGUAGE = "The user is now talking in Spanish."
# Vision
PROMPT_VISION = ("What do you see?", Image.open(ASSETS_DIR / "cat.jpg"))
EVAL_VISION = "A cat description."
# Voicemail
PROMPT_VOICEMAIL = "Please leave a message after the beep."
EVAL_VOICEMAIL = "Assess the conversation and determine if it is a voicemail."
PROMPT_CONVERSATION = "Hello, this is Mark."
EVAL_CONVERSATION = "A start of a conversation, not a voicemail."
TESTS_07 = [
# 07 series
("07-interruptible.py", PROMPT_SIMPLE_MATH, None),
("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, None),
("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, None),
("07aa-interruptible-soniox.py", PROMPT_SIMPLE_MATH, None),
("07ab-interruptible-inworld-http.py", PROMPT_SIMPLE_MATH, None),
("07ac-interruptible-asyncai.py", PROMPT_SIMPLE_MATH, None),
("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, None),
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, None),
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, None),
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, None),
("07d-interruptible-elevenlabs-http.py", PROMPT_SIMPLE_MATH, None),
("07e-interruptible-playht.py", PROMPT_SIMPLE_MATH, None),
("07e-interruptible-playht-http.py", PROMPT_SIMPLE_MATH, None),
("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, None),
("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, None),
("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, None),
("07j-interruptible-gladia.py", PROMPT_SIMPLE_MATH, None),
("07k-interruptible-lmnt.py", PROMPT_SIMPLE_MATH, None),
("07l-interruptible-groq.py", PROMPT_SIMPLE_MATH, None),
("07m-interruptible-aws.py", PROMPT_SIMPLE_MATH, None),
("07n-interruptible-gemini.py", PROMPT_SIMPLE_MATH, None),
("07n-interruptible-google.py", PROMPT_SIMPLE_MATH, None),
("07o-interruptible-assemblyai.py", PROMPT_SIMPLE_MATH, None),
("07q-interruptible-rime.py", PROMPT_SIMPLE_MATH, None),
("07q-interruptible-rime-http.py", PROMPT_SIMPLE_MATH, None),
("07r-interruptible-riva-nim.py", PROMPT_SIMPLE_MATH, None),
("07s-interruptible-google-audio-in.py", PROMPT_SIMPLE_MATH, None),
("07t-interruptible-fish.py", PROMPT_SIMPLE_MATH, None),
("07v-interruptible-neuphonic.py", PROMPT_SIMPLE_MATH, None),
("07v-interruptible-neuphonic-http.py", PROMPT_SIMPLE_MATH, None),
("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, None),
("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, None),
("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, None),
("07-interruptible.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07aa-interruptible-soniox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ab-interruptible-inworld-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ac-interruptible-asyncai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"07d-interruptible-elevenlabs-http.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
("07e-interruptible-playht.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07e-interruptible-playht-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07j-interruptible-gladia.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07k-interruptible-lmnt.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07l-interruptible-groq.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07m-interruptible-aws.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07n-interruptible-gemini.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07n-interruptible-google.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07o-interruptible-assemblyai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07q-interruptible-rime.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07q-interruptible-rime-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07r-interruptible-riva-nim.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"07s-interruptible-google-audio-in.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
("07t-interruptible-fish.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07v-interruptible-neuphonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07v-interruptible-neuphonic-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, None),
# ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# Needs a Krisp license.
# ("07p-interruptible-krisp.py", PROMPT_SIMPLE_MATH, None),
# ("07p-interruptible-krisp.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# Needs GPU resources.
# ("07u-interruptible-ultravox.py", PROMPT_SIMPLE_MATH, None),
# ("07u-interruptible-ultravox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_12 = [
("12-describe-video.py", PROMPT_VISION, EVAL_VISION),
("12a-describe-video-gemini-flash.py", PROMPT_VISION, EVAL_VISION),
("12b-describe-video-gpt-4o.py", PROMPT_VISION, EVAL_VISION),
("12c-describe-video-anthropic.py", PROMPT_VISION, EVAL_VISION),
("12-describe-video.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12a-describe-video-gemini-flash.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12b-describe-video-gpt-4o.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12c-describe-video-anthropic.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
]
TESTS_14 = [
("14-function-calling.py", PROMPT_WEATHER, EVAL_WEATHER),
("14a-function-calling-anthropic.py", PROMPT_WEATHER, EVAL_WEATHER),
("14b-function-calling-anthropic-video.py", PROMPT_WEATHER, EVAL_WEATHER),
("14d-function-calling-video.py", PROMPT_WEATHER, EVAL_WEATHER),
("14e-function-calling-google.py", PROMPT_WEATHER, EVAL_WEATHER),
("14f-function-calling-groq.py", PROMPT_WEATHER, EVAL_WEATHER),
("14g-function-calling-grok.py", PROMPT_WEATHER, EVAL_WEATHER),
("14h-function-calling-azure.py", PROMPT_WEATHER, EVAL_WEATHER),
("14i-function-calling-fireworks.py", PROMPT_WEATHER, EVAL_WEATHER),
("14j-function-calling-nim.py", PROMPT_WEATHER, EVAL_WEATHER),
("14m-function-calling-openrouter.py", PROMPT_WEATHER, EVAL_WEATHER),
("14n-function-calling-perplexity.py", PROMPT_WEATHER, EVAL_WEATHER),
("14p-function-calling-gemini-vertex-ai.py", PROMPT_WEATHER, EVAL_WEATHER),
("14q-function-calling-qwen.py", PROMPT_WEATHER, EVAL_WEATHER),
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER),
("14-function-calling.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14a-function-calling-anthropic.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14b-function-calling-anthropic-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14d-function-calling-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14e-function-calling-google.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14f-function-calling-groq.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14g-function-calling-grok.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14h-function-calling-azure.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14i-function-calling-fireworks.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14j-function-calling-nim.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14m-function-calling-openrouter.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14n-function-calling-perplexity.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14p-function-calling-gemini-vertex-ai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14q-function-calling-qwen.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER),
# ("14k-function-calling-cerebras.py", PROMPT_WEATHER, EVAL_WEATHER),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER),
# ("14o-function-calling-gemini-openai-format.py", PROMPT_WEATHER, EVAL_WEATHER),
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14k-function-calling-cerebras.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14o-function-calling-gemini-openai-format.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
]
TESTS_15 = [
("15a-switch-languages.py", PROMPT_SWITCH_LANGUAGE, EVAL_SWITCH_LANGUAGE),
("15a-switch-languages.py", PROMPT_SWITCH_LANGUAGE, EVAL_SWITCH_LANGUAGE, BOT_SPEAKS_FIRST),
]
TESTS_19 = [
("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER),
("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER),
("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER),
("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
]
TESTS_21 = [
("21a-tavus-video-service.py", PROMPT_SIMPLE_MATH, None),
("21a-tavus-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_26 = [
("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, None),
("26a-gemini-multimodal-live-transcription.py", PROMPT_SIMPLE_MATH, None),
("26b-gemini-multimodal-live-function-calling.py", PROMPT_WEATHER, EVAL_WEATHER),
("26c-gemini-multimodal-live-video.py", PROMPT_SIMPLE_MATH, None),
("26e-gemini-multimodal-google-search.py", PROMPT_ONLINE_SEARCH, EVAL_ONLINE_SEARCH),
("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26a-gemini-multimodal-live-transcription.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
(
"26b-gemini-multimodal-live-function-calling.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
("26c-gemini-multimodal-live-video.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26e-gemini-multimodal-google-search.py",
PROMPT_ONLINE_SEARCH,
EVAL_ONLINE_SEARCH,
BOT_SPEAKS_FIRST,
),
# Currently not working.
# ("26d-gemini-multimodal-live-text.py", PROMPT_SIMPLE_MATH, None),
# ("26d-gemini-multimodal-live-text.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_27 = [
("27-simli-layer.py", PROMPT_SIMPLE_MATH, None),
("27-simli-layer.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_40 = [
("40-aws-nova-sonic.py", PROMPT_SIMPLE_MATH, None),
("40-aws-nova-sonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_43 = [
("43a-heygen-video-service.py", PROMPT_SIMPLE_MATH, None),
("43a-heygen-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
]
TESTS_44 = [
("44-voicemail-detection.py", PROMPT_VOICEMAIL, EVAL_VOICEMAIL, USER_SPEAKS_FIRST),
("44-voicemail-detection.py", PROMPT_CONVERSATION, EVAL_CONVERSATION, USER_SPEAKS_FIRST),
]
TESTS = [
@@ -167,6 +208,7 @@ TESTS = [
*TESTS_27,
*TESTS_40,
*TESTS_43,
*TESTS_44,
]
@@ -188,8 +230,11 @@ async def main(args: argparse.Namespace):
log_level=log_level,
)
for test, prompt, eval in TESTS:
await runner.run_eval(test, prompt, eval)
# Parse test config: (test, prompt, eval, user_speaks_first)
for test_config in TESTS:
test, prompt, eval, user_speaks_first = test_config
await runner.run_eval(test, prompt, eval, user_speaks_first)
runner.print_results()

View File

@@ -12,3 +12,20 @@ from loguru import logger
__version__ = version("pipecat-ai")
logger.info(f"ᓚᘏᗢ Pipecat {__version__} (Python {sys.version}) ᓚᘏᗢ")
# We replace `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < 3.12.
#
# In Python 3.12, `asyncio.wait_for()` is implemented in terms of
# `asyncio.timeout()` which fixed a bunch of issues. However, this was never
# backported (because of the lack of `async.timeout()`) and there are still many
# remainig issues, specially in Python 3.10, in `async.wait_for()`.
#
# See https://github.com/python/cpython/pull/98518
import asyncio
if sys.version_info < (3, 12):
import wait_for2
# Replace asyncio.wait_for.
asyncio.wait_for = wait_for2.wait_for

View File

@@ -28,7 +28,7 @@ SPEAKING_THRESHOLD = 20
def create_default_resampler(**kwargs) -> BaseAudioResampler:
"""Create a default audio resampler instance.
. deprecated:: 0.0.74
.. deprecated:: 0.0.74
This function is deprecated and will be removed in a future version.
Use `create_stream_resampler` for real-time processing scenarios or
`create_file_resampler` for batch processing of complete audio files.

View File

View File

@@ -0,0 +1,707 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Voicemail detection module for Pipecat.
This module provides voicemail detection capabilities using parallel pipeline
processing to classify incoming calls as either voicemail messages or live
conversations. It's specifically designed for outbound calling scenarios where
a bot needs to determine if a human answered or if the call went to voicemail.
Note:
The voicemail module is optimized for text LLMs only.
"""
import asyncio
from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotInterruptionFrame,
EndFrame,
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StopFrame,
SystemFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.services.llm_service import LLMService
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.sync.event_notifier import EventNotifier
class NotifierGate(FrameProcessor):
"""Base gate processor that controls frame flow based on notifier signals.
This base class provides common gate functionality for processors that need to
start open and close permanently when a notifier signals. Subclasses define
which frames are allowed through when the gate is closed.
The gate starts open to allow initial processing and closes permanently once
the notifier signals. This ensures controlled frame flow based on external
decisions or events.
"""
def __init__(self, notifier: BaseNotifier, task_name: str = "gate"):
"""Initialize the notifier gate.
Args:
notifier: Notifier that signals when the gate should close.
task_name: Name for the notification waiting task (for debugging).
"""
super().__init__()
self._notifier = notifier
self._task_name = task_name
self._gate_opened = True
self._gate_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
self._gate_task = self.create_task(self._wait_for_notification())
async def cleanup(self):
"""Clean up the processor resources."""
await super().cleanup()
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and control gate state based on notifier signals.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# Gate logic: open gate allows all frames, closed gate filters frames
if self._gate_opened:
await self.push_frame(frame, direction)
elif isinstance(
frame,
(SystemFrame, EndFrame, StopFrame),
):
await self.push_frame(frame, direction)
async def _wait_for_notification(self):
"""Wait for notifier signal and close the gate.
This method blocks until the notifier signals, then closes the gate
permanently to change frame filtering behavior.
"""
await self._notifier.wait()
if self._gate_opened:
self._gate_opened = False
class ClassifierGate(NotifierGate):
"""Gate processor that controls frame flow based on classification decisions.
Inherits from NotifierGate and starts open to allow initial classification
processing. Closes permanently once a classification decision is made
(CONVERSATION or VOICEMAIL). This ensures the classifier only runs until a
definitive decision is reached, preventing unnecessary LLM calls and maintaining
system efficiency.
When closed, only allows system frames and user speaking frames to continue.
Speaking frames are needed for voicemail timing control, but not for conversation.
"""
def __init__(self, gate_notifier: BaseNotifier, conversation_notifier: BaseNotifier):
"""Initialize the classifier gate.
Args:
gate_notifier: Notifier that signals when a classification decision has
been made and the gate should close.
conversation_notifier: Notifier that signals when conversation is detected.
"""
super().__init__(gate_notifier, task_name="classifier_gate")
self._conversation_notifier = conversation_notifier
self._conversation_detected = False
self._conversation_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
self._conversation_task = self.create_task(self._wait_for_conversation())
async def cleanup(self):
"""Clean up the processor resources."""
await super().cleanup()
if self._conversation_task:
await self.cancel_task(self._conversation_task)
self._conversation_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and control gate state based on notifier signals.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await FrameProcessor.process_frame(self, frame, direction)
# Gate logic: open gate allows all frames, closed gate filters frames
if self._gate_opened:
await self.push_frame(frame, direction)
elif isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame)):
# Only allow speaking frames if conversation was NOT detected (i.e., voicemail case)
# This prevents the UserContextAggregator from issuing a warning about no aggregation
# to push.
if not self._conversation_detected:
await self.push_frame(frame, direction)
elif isinstance(frame, (SystemFrame, EndFrame, StopFrame)):
# Always allow system frames through
# This includes the UserStartedSpeakingFrame and UserStoppedSpeakingFrame
# which are used to detect voicemail timing.
await self.push_frame(frame, direction)
async def _wait_for_conversation(self):
"""Wait for conversation detection notification and mark conversation detected."""
await self._conversation_notifier.wait()
self._conversation_detected = True
class ConversationGate(NotifierGate):
"""Gate processor that blocks conversation flow when voicemail is detected.
Inherits from NotifierGate and starts open to allow normal conversation
processing. Closes permanently when voicemail is detected to prevent the
main conversation LLM from processing additional input after voicemail
classification.
When closed, only allows system frames and user speaking frames to continue.
"""
def __init__(self, voicemail_notifier: BaseNotifier):
"""Initialize the conversation gate.
Args:
voicemail_notifier: Notifier that signals when voicemail has been
detected and the conversation should be blocked.
"""
super().__init__(voicemail_notifier, task_name="conversation_gate")
class ClassificationProcessor(FrameProcessor):
"""Processor that handles LLM classification responses and triggers events.
This processor aggregates LLM text tokens into complete responses and analyzes
them to determine if the call reached a voicemail system or a live person.
It uses the LLM response frame delimiters (LLMFullResponseStartFrame and
LLMFullResponseEndFrame) to ensure complete token aggregation regardless
of how the LLM tokenizes the response words.
The processor expects responses containing either "CONVERSATION" (indicating
a human answered) or "VOICEMAIL" (indicating an automated system). Once a
decision is made, it triggers the appropriate notifications and event handlers.
For voicemail detection, the event handler timer starts immediately and is cancelled
and restarted based on user speech patterns to ensure proper timing.
"""
def __init__(
self,
*,
gate_notifier: BaseNotifier,
conversation_notifier: BaseNotifier,
voicemail_notifier: BaseNotifier,
voicemail_response_delay: float,
):
"""Initialize the voicemail processor.
Args:
gate_notifier: Notifier to signal the ClassifierGate about classification
decisions so it can close and stop processing.
conversation_notifier: Notifier to signal the TTSGate to release
all gated TTS frames for normal conversation flow.
voicemail_notifier: Notifier to signal the TTSGate to clear
gated TTS frames since voicemail was detected.
voicemail_response_delay: Delay in seconds after user stops speaking
before triggering the voicemail event handler. This ensures the voicemail
greeting or user message is complete before responding.
"""
super().__init__()
self._gate_notifier = gate_notifier
self._conversation_notifier = conversation_notifier
self._voicemail_notifier = voicemail_notifier
self._voicemail_response_delay = voicemail_response_delay
# Register the voicemail detected event
self._register_event_handler("on_voicemail_detected")
# Aggregation state for collecting complete LLM responses
self._processing_response = False
self._response_buffer = ""
self._decision_made = False
# Voicemail timing state
self._voicemail_detected = False
self._voicemail_task: Optional[asyncio.Task] = None
self._voicemail_event = asyncio.Event()
self._voicemail_event.set()
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
self._voicemail_task = self.create_task(self._delayed_voicemail_handler())
async def cleanup(self):
"""Clean up the processor resources."""
await super().cleanup()
if self._voicemail_task:
await self.cancel_task(self._voicemail_task)
self._voicemail_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and handle LLM classification responses.
This method implements a state machine for aggregating LLM responses:
1. LLMFullResponseStartFrame: Begin collecting tokens
2. LLMTextFrame: Accumulate text tokens into buffer
3. LLMFullResponseEndFrame: Process complete response and make decision
4. UserStartedSpeakingFrame/UserStoppedSpeakingFrame: Manage voicemail timing
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
# Begin aggregating a new LLM response
self._processing_response = True
self._response_buffer = ""
elif isinstance(frame, LLMFullResponseEndFrame):
# Complete response received - make classification decision
if self._processing_response and not self._decision_made:
await self._process_classification(self._response_buffer.strip())
self._processing_response = False
self._response_buffer = ""
elif isinstance(frame, LLMTextFrame) and self._processing_response:
# Accumulate text tokens from the streaming LLM response
self._response_buffer += frame.text
elif isinstance(frame, UserStartedSpeakingFrame):
# User started speaking - set the voicemail event
if self._voicemail_detected:
self._voicemail_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
# User stopped speaking - clear the voicemail event
if self._voicemail_detected:
self._voicemail_event.clear()
else:
# Pass all non-LLM frames through
# Blocking LLM frames prevents interference with the downstream LLM
await self.push_frame(frame, direction)
async def _process_classification(self, full_response: str):
"""Process the complete LLM classification response and trigger actions.
Analyzes the aggregated response text to determine if it contains
"CONVERSATION" or "VOICEMAIL" and triggers the appropriate notifications
and callbacks based on the classification result.
Args:
full_response: The complete aggregated response text from the LLM.
"""
if self._decision_made:
return
response = full_response.upper()
logger.debug(f"{self}: Classifying response: '{full_response}'")
if "CONVERSATION" in response:
# Human answered - continue normal conversation flow
self._decision_made = True
logger.info(f"{self}: CONVERSATION detected")
await self._gate_notifier.notify() # Close the classifier gate
await self._conversation_notifier.notify() # Release buffered TTS frames
elif "VOICEMAIL" in response:
# Voicemail detected - trigger voicemail handling
self._decision_made = True
self._voicemail_detected = True
logger.info(f"{self}: VOICEMAIL detected")
await self._gate_notifier.notify() # Close the classifier gate
await self._voicemail_notifier.notify() # Clear buffered TTS frames
# Interrupt the current pipeline to stop any ongoing processing
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
# Set the voicemail event to trigger the voicemail handler
self._voicemail_event.clear()
else:
# This can happen if the LLM is interrupted before completing the response
logger.debug(f"{self}: No classification found: '{full_response}'")
async def _delayed_voicemail_handler(self):
"""Execute the voicemail event handler after the configured delay.
This method waits for the specified delay period, then triggers the
developer's voicemail event handler. The timer can be cancelled and restarted
based on user speech patterns to ensure proper timing.
"""
while True:
try:
await asyncio.wait_for(
self._voicemail_event.wait(), timeout=self._voicemail_response_delay
)
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
await self._call_event_handler("on_voicemail_detected")
break
class TTSGate(FrameProcessor):
"""Gates TTS frames until voicemail classification decision is made.
This processor holds TTS output frames in a gate while the voicemail
classification is in progress. This prevents audio from being played
to the caller before determining if they're human or a voicemail system.
The gate operates in two modes based on the classification result:
- CONVERSATION: Opens the gate to release all held frames for normal dialogue
- VOICEMAIL: Clears held frames since they're not needed for voicemail
The gating only applies to TTS-related frames (TTSTextFrame, TTSAudioRawFrame).
All other frames pass through immediately to maintain proper pipeline flow.
"""
def __init__(self, conversation_notifier: BaseNotifier, voicemail_notifier: BaseNotifier):
"""Initialize the TTS gate.
Args:
conversation_notifier: Notifier that signals when a conversation is
detected and gated frames should be released for playback.
voicemail_notifier: Notifier that signals when voicemail is detected
and gated frames should be cleared (not played).
"""
super().__init__()
self._conversation_notifier = conversation_notifier
self._voicemail_notifier = voicemail_notifier
self._frame_buffer: List[tuple[Frame, FrameDirection]] = []
self._gating_active = True
self._conversation_task: Optional[asyncio.Task] = None
self._voicemail_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
self._conversation_task = self.create_task(self._wait_for_conversation())
self._voicemail_task = self.create_task(self._wait_for_voicemail())
async def cleanup(self):
"""Clean up the processor resources."""
await super().cleanup()
if self._conversation_task:
await self.cancel_task(self._conversation_task)
self._conversation_task = None
if self._voicemail_task:
await self.cancel_task(self._voicemail_task)
self._voicemail_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and handle gating logic based on frame type.
TTS frames are gated while classification is active. All other frames
pass through immediately. The gating state is controlled by the
classification notifications.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# Core gating logic: hold TTS frames, pass everything else through
if self._gating_active and isinstance(
frame, (TTSStartedFrame, TTSStoppedFrame, TTSTextFrame, TTSAudioRawFrame)
):
# Gate TTS frames while waiting for classification decision
self._frame_buffer.append((frame, direction))
else:
# Pass through all non-TTS frames immediately
await self.push_frame(frame, direction)
async def _wait_for_conversation(self):
"""Wait for conversation detection notification and release gated frames.
When a conversation is detected, all gated TTS frames are released
in order to continue normal dialogue flow. This allows the bot to
respond naturally to the human caller.
"""
await self._conversation_notifier.wait()
# Release all gated frames in original order
self._gating_active = False
for frame, direction in self._frame_buffer:
await self.push_frame(frame, direction)
self._frame_buffer.clear()
async def _wait_for_voicemail(self):
"""Wait for voicemail detection notification and clear gated frames.
When voicemail is detected, all gated TTS frames are discarded
since they were intended for human conversation and are not appropriate
for voicemail systems. The developer event handlers will handle voicemail-
specific audio output.
"""
await self._voicemail_notifier.wait()
# Clear gated frames without playing them
self._gating_active = False
self._frame_buffer.clear()
class VoicemailDetector(ParallelPipeline):
"""Parallel pipeline for detecting voicemail vs. live conversation in outbound calls.
This detector uses a parallel pipeline architecture to perform real-time
classification of outbound phone calls without interrupting the conversation
flow. It determines whether a human answered the phone or if the call went
to a voicemail system.
Architecture:
- Conversation branch: Empty pipeline that allows normal frame flow
- Classification branch: Contains the LLM classifier and decision logic
The system uses a gate mechanism to control when classification runs and
a gating system to prevent TTS output until classification is complete.
Once a decision is made, the appropriate action is taken:
- CONVERSATION: Continue normal bot dialogue
- VOICEMAIL: Trigger developer event handler for custom voicemail handling
Example::
classification_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
detector = VoicemailDetector(llm=classification_llm)
@detector.event_handler("on_voicemail_detected")
async def handle_voicemail(processor):
await processor.push_frame(TTSSpeakFrame("Please leave a message."))
pipeline = Pipeline([
transport.input(),
stt,
detector.detector(), # Classification
context_aggregator.user(),
llm,
tts,
detector.gate(), # TTS gating
transport.output(),
context_aggregator.assistant(),
])
# For custom prompts, append the required response instruction:
custom_prompt = "Your custom classification logic here. " + VoicemailDetector.CLASSIFIER_RESPONSE_INSTRUCTION
Events:
on_voicemail_detected: Triggered when voicemail is detected after the configured
delay. The event handler receives one argument: the ClassificationProcessor
instance which can be used to push frames.
Constants:
CLASSIFIER_RESPONSE_INSTRUCTION: The exact text that must be included in custom
system prompts to ensure proper classification functionality.
"""
CLASSIFIER_RESPONSE_INSTRUCTION = 'Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it\'s voicemail/recording.'
DEFAULT_SYSTEM_PROMPT = (
"""You are a voicemail detection classifier for an OUTBOUND calling system. A bot has called a phone number and you need to determine if a human answered or if the call went to voicemail based on the provided text.
HUMAN ANSWERED - LIVE CONVERSATION (respond "CONVERSATION"):
- Personal greetings: "Hello?", "Hi", "Yeah?", "John speaking"
- Interactive responses: "Who is this?", "What do you want?", "Can I help you?"
- Conversational tone expecting back-and-forth dialogue
- Questions directed at the caller: "Hello? Anyone there?"
- Informal responses: "Yep", "What's up?", "Speaking"
- Natural, spontaneous speech patterns
- Immediate acknowledgment of the call
VOICEMAIL SYSTEM (respond "VOICEMAIL"):
- Automated voicemail greetings: "Hi, you've reached [name], please leave a message"
- Phone carrier messages: "The number you have dialed is not in service", "Please leave a message", "All circuits are busy"
- Professional voicemail: "This is [name], I'm not available right now"
- Instructions about leaving messages: "leave a message", "leave your name and number"
- References to callback or messaging: "call me back", "I'll get back to you"
- Carrier system messages: "mailbox is full", "has not been set up"
- Business hours messages: "our office is currently closed"
"""
+ CLASSIFIER_RESPONSE_INSTRUCTION
)
def __init__(
self,
*,
llm: LLMService,
voicemail_response_delay: float = 2.0,
custom_system_prompt: Optional[str] = None,
):
"""Initialize the voicemail detector with classification and buffering components.
Args:
llm: LLM service used for voicemail vs conversation classification.
Should be fast and reliable for real-time classification.
voicemail_response_delay: Delay in seconds after user stops speaking
before triggering the voicemail event handler. This allows voicemail
responses to be played back after a short delay to ensure the response
occurs during the voicemail recording. Default is 2.0 seconds.
custom_system_prompt: Optional custom system prompt for classification. If None,
uses the default prompt optimized for outbound calling scenarios.
Custom prompts should instruct the LLM to respond with exactly
"CONVERSATION" or "VOICEMAIL" for proper detection functionality.
"""
self._classifier_llm = llm
self._prompt = (
custom_system_prompt if custom_system_prompt is not None else self.DEFAULT_SYSTEM_PROMPT
)
self._voicemail_response_delay = voicemail_response_delay
# Validate custom prompts to ensure they work with the detection logic
if custom_system_prompt is not None:
self._validate_prompt(custom_system_prompt)
# Set up the LLM context with the classification prompt
self._messages = [
{
"role": "system",
"content": self._prompt,
},
]
# Create the LLM context and aggregators for conversation management
self._context = OpenAILLMContext(self._messages)
self._context_aggregator = llm.create_context_aggregator(self._context)
# Create notification system for coordinating between components
self._gate_notifier = EventNotifier() # Signals classification completion
self._conversation_notifier = EventNotifier() # Signals conversation detected
self._voicemail_notifier = EventNotifier() # Signals voicemail detected
# Create the processor components
self._classifier_gate = ClassifierGate(self._gate_notifier, self._conversation_notifier)
self._conversation_gate = ConversationGate(self._voicemail_notifier)
self._classification_processor = ClassificationProcessor(
gate_notifier=self._gate_notifier,
conversation_notifier=self._conversation_notifier,
voicemail_notifier=self._voicemail_notifier,
voicemail_response_delay=voicemail_response_delay,
)
self._voicemail_gate = TTSGate(self._conversation_notifier, self._voicemail_notifier)
# Initialize the parallel pipeline with conversation and classifier branches
super().__init__(
# Conversation branch: gate to blocks after voicemail detection
[self._conversation_gate],
# Classification branch: gate -> context -> LLM -> processor -> context
[
self._classifier_gate,
self._context_aggregator.user(),
self._classifier_llm,
self._classification_processor,
self._context_aggregator.assistant(),
],
)
# Register the voicemail detected event after super().__init__()
self._register_event_handler("on_voicemail_detected")
def _validate_prompt(self, prompt: str) -> None:
"""Validate custom prompt contains required response format instructions.
Custom prompts must instruct the LLM to respond with exactly "CONVERSATION"
or "VOICEMAIL" for the detection logic to work properly. This method
checks for the presence of these keywords and warns if they're missing.
Args:
prompt: The custom system prompt to validate.
"""
has_conversation = "CONVERSATION" in prompt
has_voicemail = "VOICEMAIL" in prompt
if not has_conversation or not has_voicemail:
logger.warning(
"Custom system prompt should instruct the LLM to respond with exactly "
'"CONVERSATION" or "VOICEMAIL" for proper detection functionality. '
f"Consider appending VoicemailDetector.CLASSIFIER_RESPONSE_INSTRUCTION to your prompt: "
f'"{self.CLASSIFIER_RESPONSE_INSTRUCTION}"'
)
def detector(self) -> "VoicemailDetector":
"""Get the detector pipeline for placement after STT in the main pipeline.
This should be placed after the STT service and before the context
aggregator in your main pipeline to enable voicemail classification.
Returns:
The VoicemailDetector instance itself (which is a ParallelPipeline).
"""
return self
def gate(self) -> TTSGate:
"""Get the gate processor for placement after TTS in the main pipeline.
This should be placed after the TTS service and before the transport
output to enable TTS frame gating during classification.
Returns:
The TTSGate processor instance.
"""
return self._voicemail_gate
def add_event_handler(self, event_name: str, handler):
"""Add an event handler for voicemail detection events.
Args:
event_name: The name of the event to handle.
handler: The function to call when the event occurs.
"""
if event_name == "on_voicemail_detected":
self._classification_processor.add_event_handler(event_name, handler)
else:
super().add_event_handler(event_name, handler)

View File

@@ -228,7 +228,7 @@ class OutputImageRawFrame(DataFrame, ImageRawFrame):
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {self.size}, format: {self.format})"
@dataclass

View File

@@ -11,7 +11,6 @@ processors without modifying the pipeline structure. Observers can be used
for logging, debugging, analytics, and monitoring pipeline behavior.
"""
from abc import abstractmethod
from dataclasses import dataclass
from typing_extensions import TYPE_CHECKING
@@ -23,6 +22,28 @@ if TYPE_CHECKING:
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class FrameProcessed:
"""Event data for frame processing in the pipeline.
Represents an event where a frame is being processed by a processor. This
data structure is typically used by observers to track the flow of frames
through the pipeline for logging, debugging, or analytics purposes.
Parameters:
processor: The processor processing the frame.
frame: The frame being processed.
direction: The direction of the frame (e.g., downstream or upstream).
timestamp: The time when the frame was pushed, based on the pipeline clock.
"""
processor: "FrameProcessor"
frame: Frame
direction: "FrameDirection"
timestamp: int
@dataclass
class FramePushed:
"""Event data for frame transfers between processors in the pipeline.
@@ -56,7 +77,18 @@ class BaseObserver(BaseObject):
performance analysis, and analytics collection.
"""
@abstractmethod
async def on_process_frame(self, data: FrameProcessed):
"""Handle the event when a frame is being processed by a processor.
This method should be implemented by subclasses to define specific
behavior (e.g., logging, monitoring, debugging) when a frame is
being processed by a processor.
Args:
data: The event data containing details about the frame processing.
"""
pass
async def on_push_frame(self, data: FramePushed):
"""Handle the event when a frame is pushed from one processor to another.

View File

@@ -6,31 +6,12 @@
"""Base pipeline implementation for frame processing."""
from abc import abstractmethod
from typing import List
from pipecat.processors.frame_processor import FrameProcessor
class BasePipeline(FrameProcessor):
"""Base class for all pipeline implementations.
"""Base class for all pipeline implementations."""
Provides the foundation for pipeline processors that need to support
metrics collection from their contained processors.
"""
def __init__(self):
def __init__(self, **kwargs):
"""Initialize the base pipeline."""
super().__init__()
@abstractmethod
def processors_with_metrics(self) -> List[FrameProcessor]:
"""Return processors that can generate metrics.
Implementing classes should collect and return all processors within
their pipeline that support metrics generation.
Returns:
List of frame processors that support metrics collection.
"""
pass
super().__init__(**kwargs)

View File

@@ -11,106 +11,15 @@ sub-pipelines concurrently, with coordination for system frames and proper
handling of pipeline lifecycle events.
"""
import asyncio
from itertools import chain
from typing import Awaitable, Callable, Dict, List
from typing import Dict, List
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
)
from pipecat.frames.frames import EndFrame, Frame, StartFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class ParallelPipelineSource(FrameProcessor):
"""Source processor for parallel pipeline branches.
Handles frame routing for parallel pipeline inputs, directing system frames
to the parent push function and other upstream frames to a queue for processing.
"""
def __init__(
self,
upstream_queue: asyncio.Queue,
push_frame_func: Callable[[Frame, FrameDirection], Awaitable[None]],
):
"""Initialize the parallel pipeline source.
Args:
upstream_queue: Queue for collecting upstream frames from this branch.
push_frame_func: Function to push frames to the parent parallel pipeline.
"""
super().__init__(enable_direct_mode=True)
self._up_queue = upstream_queue
self._push_frame_func = push_frame_func
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with special handling for system frames.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
if isinstance(frame, SystemFrame):
await self._push_frame_func(frame, direction)
else:
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
class ParallelPipelineSink(FrameProcessor):
"""Sink processor for parallel pipeline branches.
Handles frame routing for parallel pipeline outputs, directing system frames
to the parent push function and other downstream frames to a queue for coordination.
"""
def __init__(
self,
downstream_queue: asyncio.Queue,
push_frame_func: Callable[[Frame, FrameDirection], Awaitable[None]],
):
"""Initialize the parallel pipeline sink.
Args:
downstream_queue: Queue for collecting downstream frames from this branch.
push_frame_func: Function to push frames to the parent parallel pipeline.
"""
super().__init__(enable_direct_mode=True)
self._down_queue = downstream_queue
self._push_frame_func = push_frame_func
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with special handling for system frames.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
case FrameDirection.DOWNSTREAM:
if isinstance(frame, SystemFrame):
await self._push_frame_func(frame, direction)
else:
await self._down_queue.put(frame)
class ParallelPipeline(BasePipeline):
@@ -132,28 +41,69 @@ class ParallelPipeline(BasePipeline):
Exception: If no processor lists are provided.
TypeError: If any argument is not a list of processors.
"""
# We don't set it to direct mode because we use frame pausing and that
# requires queues.
super().__init__()
if len(args) == 0:
raise Exception(f"ParallelPipeline needs at least one argument")
self._args = args
self._sources = []
self._sinks = []
self._pipelines = []
self._seen_ids = set()
self._endframe_counter: Dict[int, int] = {}
self._start_frame_counter: Dict[int, int] = {}
self._started = False
self._frame_counter: Dict[int, int] = {}
self._up_task = None
self._down_task = None
logger.debug(f"Creating {self} pipelines")
for processors in args:
if not isinstance(processors, list):
raise TypeError(f"ParallelPipeline argument {processors} is not a list")
num_pipelines = len(self._pipelines)
# We add a source before the pipeline and a sink after so we control
# the frames that are pushed upstream and downstream.
source = PipelineSource(
self._parallel_push_frame, name=f"{self}::Source{num_pipelines}"
)
sink = PipelineSink(self._pipeline_sink_push_frame, name=f"{self}::Sink{num_pipelines}")
# Create pipeline
pipeline = Pipeline(processors, source=source, sink=sink)
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
#
# BasePipeline
# Frame processor
#
@property
def processors(self):
"""Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have
sub-processors. Non-compound processors will return an empty list.
Returns:
The list of sub-processors if this is a compound processor.
"""
return self._pipelines
@property
def entry_processors(self) -> List["FrameProcessor"]:
"""Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor
(e.g. pipelines, parallel pipelines). Note that pipelines can also be an
entry processor as pipelines are processors themselves. Non-compound
processors will simply return an empty list.
Returns:
The list of entry processors.
"""
return self._pipelines
def processors_with_metrics(self) -> List[FrameProcessor]:
"""Collect processors that can generate metrics from all parallel branches.
@@ -162,10 +112,6 @@ class ParallelPipeline(BasePipeline):
"""
return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines))
#
# Frame processor
#
async def setup(self, setup: FrameProcessorSetup):
"""Set up the parallel pipeline and all its branches.
@@ -176,39 +122,14 @@ class ParallelPipeline(BasePipeline):
TypeError: If any processor list argument is not actually a list.
"""
await super().setup(setup)
self._up_queue = WatchdogQueue(setup.task_manager)
self._down_queue = WatchdogQueue(setup.task_manager)
logger.debug(f"Creating {self} pipelines")
for processors in self._args:
if not isinstance(processors, list):
raise TypeError(f"ParallelPipeline argument {processors} is not a list")
# We will add a source before the pipeline and a sink after.
source = ParallelPipelineSource(self._up_queue, self._parallel_push_frame)
sink = ParallelPipelineSink(self._down_queue, self._pipeline_sink_push_frame)
self._sources.append(source)
self._sinks.append(sink)
# Create pipeline
pipeline = Pipeline(processors)
source.link(pipeline)
pipeline.link(sink)
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
await asyncio.gather(*[s.setup(setup) for s in self._sources])
await asyncio.gather(*[p.setup(setup) for p in self._pipelines])
await asyncio.gather(*[s.setup(setup) for s in self._sinks])
for p in self._pipelines:
await p.setup(setup)
async def cleanup(self):
"""Clean up the parallel pipeline and all its branches."""
await super().cleanup()
await asyncio.gather(*[s.cleanup() for s in self._sources])
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
await asyncio.gather(*[s.cleanup() for s in self._sinks])
for p in self._pipelines:
await p.cleanup()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames through all parallel branches with lifecycle coordination.
@@ -219,79 +140,15 @@ class ParallelPipeline(BasePipeline):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._start_frame_counter[frame.id] = len(self._pipelines)
elif isinstance(frame, EndFrame):
self._endframe_counter[frame.id] = len(self._pipelines)
elif isinstance(frame, CancelFrame):
await self._cancel()
# Parallel pipeline synchronized frames.
if isinstance(frame, (StartFrame, EndFrame)):
self._frame_counter[frame.id] = len(self._pipelines)
await self.pause_processing_system_frames()
await self.pause_processing_frames()
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])
# Handle interruptions after everything has been cancelled.
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruption()
# Wait for tasks to finish.
elif isinstance(frame, EndFrame):
await self._stop()
async def _start(self, frame: StartFrame):
"""Start the parallel pipeline processing tasks."""
await self._create_tasks()
async def _stop(self):
"""Stop all parallel pipeline processing tasks."""
if self._up_task:
# The up task doesn't receive an EndFrame, so we just cancel it.
await self.cancel_task(self._up_task)
self._up_task = None
if self._down_task:
# The down tasks waits for the last EndFrame sent by the internal
# pipelines.
await self._down_task
self._down_task = None
async def _cancel(self):
"""Cancel all parallel pipeline processing tasks."""
if self._up_task:
self._up_queue.cancel()
await self.cancel_task(self._up_task)
self._up_task = None
if self._down_task:
self._down_queue.cancel()
await self.cancel_task(self._down_task)
self._down_task = None
async def _create_tasks(self):
"""Create upstream and downstream processing tasks if not already running."""
if not self._up_task:
self._up_task = self.create_task(self._process_up_queue())
if not self._down_task:
self._down_task = self.create_task(self._process_down_queue())
async def _drain_queue(self, queue: asyncio.Queue):
try:
while not queue.empty():
queue.get_nowait()
except asyncio.QueueEmpty:
logger.debug(f"Draining {self} queue already empty")
async def _drain_queues(self):
"""Drain all frames from upstream and downstream queues."""
await self._drain_queue(self._up_queue)
await self._drain_queue(self._down_queue)
async def _handle_interruption(self):
"""Handle interruption by cancelling tasks, draining queues, and restarting."""
await self._cancel()
await self._drain_queues()
await self._create_tasks()
# Process frames in each of the sub-pipelines.
for p in self._pipelines:
await p.queue_frame(frame, direction)
async def _parallel_push_frame(self, frame: Frame, direction: FrameDirection):
"""Push frames while avoiding duplicates using frame ID tracking."""
@@ -300,52 +157,18 @@ class ParallelPipeline(BasePipeline):
await self.push_frame(frame, direction)
async def _pipeline_sink_push_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
# Decrement counter and check if all pipelines have processed the StartFrame
start_frame_counter = self._start_frame_counter.get(frame.id, 0)
if start_frame_counter > 0:
self._start_frame_counter[frame.id] -= 1
start_frame_counter = self._start_frame_counter[frame.id]
# Parallel pipeline synchronized frames.
if isinstance(frame, (StartFrame, EndFrame)):
# Decrement counter.
frame_counter = self._frame_counter.get(frame.id, 0)
if frame_counter > 0:
self._frame_counter[frame.id] -= 1
frame_counter = self._frame_counter[frame.id]
# Only push the StartFrame when all pipelines have processed it
if start_frame_counter == 0:
self._started = True
await self._start(frame)
# Only push the frame when all pipelines have processed it.
if frame_counter == 0:
await self._parallel_push_frame(frame, direction)
await self.resume_processing_system_frames()
await self.resume_processing_frames()
else:
if self._started:
await self._parallel_push_frame(frame, direction)
else:
await self._down_queue.put(frame)
async def _process_up_queue(self):
"""Process upstream frames from all parallel branches."""
while True:
frame = await self._up_queue.get()
await self._parallel_push_frame(frame, FrameDirection.UPSTREAM)
self._up_queue.task_done()
async def _process_down_queue(self):
"""Process downstream frames with EndFrame coordination.
Coordinates EndFrames to ensure they are only pushed upstream once
all parallel branches have completed processing them.
"""
running = True
while running:
frame = await self._down_queue.get()
endframe_counter = self._endframe_counter.get(frame.id, 0)
# If we have a counter, decrement it.
if endframe_counter > 0:
self._endframe_counter[frame.id] -= 1
endframe_counter = self._endframe_counter[frame.id]
# If we don't have a counter or we reached 0, push the frame.
if endframe_counter == 0:
await self._parallel_push_frame(frame, FrameDirection.DOWNSTREAM)
running = not (endframe_counter == 0 and isinstance(frame, EndFrame))
self._down_queue.task_done()
await self._parallel_push_frame(frame, direction)

View File

@@ -11,7 +11,7 @@ in sequence and manages frame flow between them, along with helper classes
for pipeline source and sink operations.
"""
from typing import Callable, Coroutine, List
from typing import Callable, Coroutine, List, Optional
from pipecat.frames.frames import Frame
from pipecat.pipeline.base_pipeline import BasePipeline
@@ -26,13 +26,14 @@ class PipelineSource(FrameProcessor):
provided upstream handler function.
"""
def __init__(self, upstream_push_frame: Callable[[Frame, FrameDirection], Coroutine]):
def __init__(self, upstream_push_frame: Callable[[Frame, FrameDirection], Coroutine], **kwargs):
"""Initialize the pipeline source.
Args:
upstream_push_frame: Coroutine function to handle upstream frames.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(enable_direct_mode=True)
super().__init__(enable_direct_mode=True, **kwargs)
self._upstream_push_frame = upstream_push_frame
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -59,13 +60,16 @@ class PipelineSink(FrameProcessor):
provided downstream handler function.
"""
def __init__(self, downstream_push_frame: Callable[[Frame, FrameDirection], Coroutine]):
def __init__(
self, downstream_push_frame: Callable[[Frame, FrameDirection], Coroutine], **kwargs
):
"""Initialize the pipeline sink.
Args:
downstream_push_frame: Coroutine function to handle downstream frames.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(enable_direct_mode=True)
super().__init__(enable_direct_mode=True, **kwargs)
self._downstream_push_frame = downstream_push_frame
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -92,26 +96,60 @@ class Pipeline(BasePipeline):
provides metrics collection from contained processors.
"""
def __init__(self, processors: List[FrameProcessor]):
def __init__(
self,
processors: List[FrameProcessor],
*,
source: Optional[FrameProcessor] = None,
sink: Optional[FrameProcessor] = None,
):
"""Initialize the pipeline with a list of processors.
Args:
processors: List of frame processors to connect in sequence.
source: An optional pipeline source processor.
sink: An optional pipeline sink processor.
"""
super().__init__()
super().__init__(enable_direct_mode=True)
# Add a source and a sink queue so we can forward frames upstream and
# downstream outside of the pipeline.
self._source = PipelineSource(self.push_frame)
self._sink = PipelineSink(self.push_frame)
self._source = source or PipelineSource(self.push_frame, name=f"{self}::Source")
self._sink = sink or PipelineSink(self.push_frame, name=f"{self}::Sink")
self._processors: List[FrameProcessor] = [self._source] + processors + [self._sink]
self._link_processors()
#
# BasePipeline
# Frame processor
#
@property
def processors(self):
"""Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have
sub-processors. Non-compound processors will return an empty list.
Returns:
The list of sub-processors if this is a compound processor.
"""
return self._processors
@property
def entry_processors(self) -> List["FrameProcessor"]:
"""Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor
(e.g. pipelines, parallel pipelines). Note that pipelines can also be an
entry processor as pipelines are processors themselves. Non-compound
processors will simply return an empty list.
Returns:
The list of entry processors.
"""
return [self._source]
def processors_with_metrics(self):
"""Return processors that can generate metrics.
@@ -122,17 +160,12 @@ class Pipeline(BasePipeline):
List of frame processors that can generate metrics.
"""
services = []
for p in self._processors:
if isinstance(p, BasePipeline):
services.extend(p.processors_with_metrics())
elif p.can_generate_metrics():
for p in self.processors:
if p.can_generate_metrics():
services.append(p)
services.extend(p.processors_with_metrics())
return services
#
# Frame processor
#
async def setup(self, setup: FrameProcessorSetup):
"""Set up the pipeline and all contained processors.
@@ -175,7 +208,5 @@ class Pipeline(BasePipeline):
"""Link all processors in sequence and set their parent."""
prev = self._processors[0]
for curr in self._processors[1:]:
prev.set_parent(self)
prev.link(curr)
prev = curr
prev.set_parent(self)

View File

@@ -71,7 +71,10 @@ class PipelineRunner(BaseObject):
logger.debug(f"Runner {self} started running {task}")
self._tasks[task.name] = task
params = PipelineTaskParams(loop=self._loop)
await task.run(params)
try:
await task.run(params)
except asyncio.CancelledError:
await self._cancel()
del self._tasks[task.name]
# Cleanup base object.
@@ -95,6 +98,10 @@ class PipelineRunner(BaseObject):
async def cancel(self):
"""Cancel all running tasks immediately."""
logger.debug(f"Cancelling runner {self}")
await self._cancel()
async def _cancel(self):
"""Cancel all running tasks immediately."""
await asyncio.gather(*[t.cancel() for t in self._tasks.values()])
def _setup_sigint(self):

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import ControlFrame, EndFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
@dataclass
@@ -128,40 +127,15 @@ class SyncParallelPipeline(BasePipeline):
if len(args) == 0:
raise Exception(f"SyncParallelPipeline needs at least one argument")
self._args = args
self._sinks = []
self._sources = []
self._pipelines = []
#
# BasePipeline
#
def processors_with_metrics(self) -> List[FrameProcessor]:
"""Collect processors that can generate metrics from all parallel pipelines.
Returns:
List of frame processors that support metrics collection from all parallel paths.
"""
return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines))
#
# Frame processor
#
async def setup(self, setup: FrameProcessorSetup):
"""Set up the parallel pipeline and all contained processors.
Args:
setup: Configuration for frame processor setup.
"""
await super().setup(setup)
self._up_queue = WatchdogQueue(setup.task_manager)
self._down_queue = WatchdogQueue(setup.task_manager)
self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
logger.debug(f"Creating {self} pipelines")
for processors in self._args:
for processors in args:
if not isinstance(processors, list):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
@@ -171,29 +145,68 @@ class SyncParallelPipeline(BasePipeline):
source = SyncParallelPipelineSource(up_queue)
sink = SyncParallelPipelineSink(down_queue)
# Create pipeline
pipeline = Pipeline(processors)
source.link(pipeline)
pipeline.link(sink)
self._pipelines.append(pipeline)
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Create pipeline
pipeline = Pipeline(processors, source=source, sink=sink)
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
await asyncio.gather(*[s["processor"].setup(setup) for s in self._sources])
#
# Frame processor
#
@property
def processors(self):
"""Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have
sub-processors. Non-compound processors will return an empty list.
Returns:
The list of sub-processors if this is a compound processor.
"""
return self._pipelines
@property
def entry_processors(self) -> List["FrameProcessor"]:
"""Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor
(e.g. pipelines, parallel pipelines). Note that pipelines can also be an
entry processor as pipelines are processors themselves. Non-compound
processors will simply return an empty list.
Returns:
The list of entry processors.
"""
return self._sources
def processors_with_metrics(self) -> List[FrameProcessor]:
"""Collect processors that can generate metrics from all parallel pipelines.
Returns:
List of frame processors that support metrics collection from all parallel paths.
"""
return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines))
async def setup(self, setup: FrameProcessorSetup):
"""Set up the parallel pipeline and all contained processors.
Args:
setup: Configuration for frame processor setup.
"""
await super().setup(setup)
await asyncio.gather(*[p.setup(setup) for p in self._pipelines])
await asyncio.gather(*[s["processor"].setup(setup) for s in self._sinks])
async def cleanup(self):
"""Clean up the parallel pipeline and all contained processors."""
await super().cleanup()
await asyncio.gather(*[s["processor"].cleanup() for s in self._sources])
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
await asyncio.gather(*[s["processor"].cleanup() for s in self._sinks])
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames through all parallel pipelines with synchronization.

View File

@@ -32,26 +32,24 @@ from pipecat.frames.frames import (
Frame,
HeartbeatFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
MetricsFrame,
StartFrame,
StopFrame,
StopTaskFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import (
WATCHDOG_TIMEOUT,
BaseTaskManager,
TaskManager,
TaskManagerParams,
)
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -101,70 +99,6 @@ class PipelineParams(BaseModel):
start_metadata: Dict[str, Any] = Field(default_factory=dict)
class PipelineTaskSource(FrameProcessor):
"""Source processor for pipeline tasks that handles frame routing.
This is the source processor that is linked at the beginning of the
pipeline given to the pipeline task. It allows us to easily push frames
downstream to the pipeline and also receive upstream frames coming from the
pipeline.
"""
def __init__(self, up_queue: asyncio.Queue):
"""Initialize the pipeline task source.
Args:
up_queue: Queue for upstream frame processing.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(enable_direct_mode=True)
self._up_queue = up_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and route them based on direction.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
class PipelineTaskSink(FrameProcessor):
"""Sink processor for pipeline tasks that handles final frame processing.
This is the sink processor that is linked at the end of the pipeline
given to the pipeline task. It allows us to receive downstream frames and
act on them, for example, waiting to receive an EndFrame.
"""
def __init__(self, down_queue: asyncio.Queue):
"""Initialize the pipeline task sink.
Args:
down_queue: Queue for downstream frame processing.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(enable_direct_mode=True)
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and route them to the downstream queue.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
await self._down_queue.put(frame)
class PipelineTask(BasePipelineTask):
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
@@ -196,7 +130,7 @@ class PipelineTask(BasePipelineTask):
def __init__(
self,
pipeline: BasePipeline,
pipeline: FrameProcessor,
*,
params: Optional[PipelineParams] = None,
additional_span_attributes: Optional[dict] = None,
@@ -206,16 +140,17 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional[str] = None,
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_watchdog_logging: bool = False,
enable_watchdog_timers: bool = False,
idle_timeout_frames: Tuple[Type[Frame], ...] = (
BotSpeakingFrame,
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
),
idle_timeout_secs: Optional[float] = 300,
observers: Optional[List[BaseObserver]] = None,
task_manager: Optional[BaseTaskManager] = None,
watchdog_timeout_secs: float = WATCHDOG_TIMEOUT,
):
"""Initialize the PipelineTask.
@@ -231,8 +166,6 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional custom ID for the conversation.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
enable_watchdog_logging: Whether to print task processing times.
enable_watchdog_timers: Whether to enable task watchdog timers.
idle_timeout_frames: A tuple with the frames that should trigger an idle
timeout if not received within `idle_timeout_seconds`.
idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
@@ -240,11 +173,8 @@ class PipelineTask(BasePipelineTask):
automatically.
observers: List of observers for monitoring pipeline execution.
task_manager: Optional task manager for handling asyncio tasks.
watchdog_timeout_secs: Watchdog timer timeout (in seconds). A warning
will be logged if the watchdog timer is not reset before this timeout.
"""
super().__init__()
self._pipeline = pipeline
self._params = params or PipelineParams()
self._additional_span_attributes = additional_span_attributes or {}
self._cancel_on_idle_timeout = cancel_on_idle_timeout
@@ -253,11 +183,8 @@ class PipelineTask(BasePipelineTask):
self._conversation_id = conversation_id
self._enable_tracing = enable_tracing and is_tracing_available()
self._enable_turn_tracking = enable_turn_tracking
self._enable_watchdog_logging = enable_watchdog_logging
self._enable_watchdog_timers = enable_watchdog_timers
self._idle_timeout_frames = idle_timeout_frames
self._idle_timeout_secs = idle_timeout_secs
self._watchdog_timeout_secs = watchdog_timeout_secs
if self._params.observers:
import warnings
@@ -288,40 +215,30 @@ class PipelineTask(BasePipelineTask):
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
# This queue receives frames coming from the pipeline upstream.
self._up_queue = WatchdogQueue(self._task_manager)
self._process_up_task: Optional[asyncio.Task] = None
# This queue receives frames coming from the pipeline downstream.
self._down_queue = WatchdogQueue(self._task_manager)
self._process_down_task: Optional[asyncio.Task] = None
# This queue is the queue used to push frames to the pipeline.
self._push_queue = WatchdogQueue(self._task_manager)
self._push_queue = asyncio.Queue()
self._process_push_task: Optional[asyncio.Task] = None
# This is the heartbeat queue. When a heartbeat frame is received in the
# down queue we add it to the heartbeat queue for processing.
self._heartbeat_queue = WatchdogQueue(self._task_manager)
self._heartbeat_queue = asyncio.Queue()
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None
# This is the idle queue. When frames are received downstream they are
# put in the queue. If no frame is received the pipeline is considered
# idle.
self._idle_queue = WatchdogQueue(self._task_manager)
self._idle_queue = asyncio.Queue()
self._idle_monitor_task: Optional[asyncio.Task] = None
# This event is used to indicate a finalize frame (e.g. EndFrame,
# StopFrame) has been received in the down queue.
self._pipeline_end_event = asyncio.Event()
# This is a source processor that we connect to the provided
# pipeline. This source processor allows up to receive and react to
# upstream frames.
self._source = PipelineTaskSource(self._up_queue)
self._source.link(pipeline)
# This is a sink processor that we connect to the provided
# pipeline. This sink processor allows up to receive and react to
# downstream frames.
self._sink = PipelineTaskSink(self._down_queue)
pipeline.link(self._sink)
# This is the final pipeline. It is composed of a source processor,
# followed by the user pipeline, and ending with a sink processor. The
# source allows us to receive and react to upstream frames, and the sink
# allows us to receive and react to downstream frames.
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
self._pipeline = Pipeline([pipeline], source=source, sink=sink)
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
@@ -446,24 +363,43 @@ class PipelineTask(BasePipelineTask):
# Create all main tasks and wait of the main push task. This is the
# task that pushes frames to the very beginning of our pipeline (our
# controlled PipelineTaskSource processor).
# controlled source processor).
push_task = await self._create_tasks()
await self._task_manager.wait_for_task(push_task)
await push_task
# We have already cleaned up the pipeline inside the task.
cleanup_pipeline = False
except asyncio.CancelledError:
# We are awaiting on the push task and it might be cancelled
# (e.g. Ctrl-C). This means we will get a CancelledError here as
# well, because you get a CancelledError in every place you are
# awaiting a task.
pass
finally:
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
# Pipeline has finished nicely.
self._finished = True
except asyncio.CancelledError:
# Raise exception back to the pipeline runner so it can cancel this
# task properly.
raise
finally:
# We can reach this point for different reasons:
#
# 1. The task has finished properly (e.g. `EndFrame`).
# 2. By calling `PipelineTask.cancel()`.
# 3. By asyncio task cancellation.
#
# Case (1) will execute the code below without issues because
# `self._finished` is true.
#
# Case (2) will execute the code below without issues because
# `self._cancelled` is true.
#
# Case (3) will raise the exception above (because we are cancelling
# the asyncio task). This will be then captured by the
# `PipelineRunner` which will call `PipelineTask.cancel()` and
# therefore becoming case (2).
if self._finished or self._cancelled:
logger.debug(f"Pipeline task {self} has finished, cleaning up resources")
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
async def queue_frame(self, frame: Frame):
"""Queue a single frame to be pushed down the pipeline.
@@ -494,7 +430,7 @@ class PipelineTask(BasePipelineTask):
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
await self._pipeline.queue_frame(CancelFrame())
# Wait for CancelFrame to make it throught the pipeline.
await self._wait_for_pipeline_end()
# Only cancel the push task, we don't want to be able to process any
@@ -506,12 +442,6 @@ class PipelineTask(BasePipelineTask):
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
self._process_up_task = self._task_manager.create_task(
self._process_up_queue(), f"{self}::_process_up_queue"
)
self._process_down_task = self._task_manager.create_task(
self._process_down_queue(), f"{self}::_process_down_queue"
)
self._process_push_task = self._task_manager.create_task(
self._process_push_queue(), f"{self}::_process_push_queue"
)
@@ -545,14 +475,6 @@ class PipelineTask(BasePipelineTask):
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
if self._process_up_task:
await self._task_manager.cancel_task(self._process_up_task)
self._process_up_task = None
if self._process_down_task:
await self._task_manager.cancel_task(self._process_down_task)
self._process_down_task = None
await self._maybe_cancel_heartbeat_tasks()
await self._maybe_cancel_idle_task()
@@ -572,7 +494,6 @@ class PipelineTask(BasePipelineTask):
async def _maybe_cancel_idle_task(self):
"""Cancel idle monitoring task if it is running."""
if self._idle_timeout_secs and self._idle_monitor_task:
self._idle_queue.cancel()
await self._task_manager.cancel_task(self._idle_monitor_task)
self._idle_monitor_task = None
@@ -592,23 +513,15 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
mgr_params = TaskManagerParams(
loop=params.loop,
enable_watchdog_logging=self._enable_watchdog_logging,
enable_watchdog_timers=self._enable_watchdog_timers,
watchdog_timeout=self._watchdog_timeout_secs,
)
mgr_params = TaskManagerParams(loop=params.loop)
self._task_manager.setup(mgr_params)
setup = FrameProcessorSetup(
clock=self._clock,
task_manager=self._task_manager,
observer=self._observer,
watchdog_timers_enabled=self._enable_watchdog_timers,
)
await self._source.setup(setup)
await self._pipeline.setup(setup)
await self._sink.setup(setup)
async def _cleanup(self, cleanup_pipeline: bool):
"""Clean up the pipeline task and processors."""
@@ -620,10 +533,8 @@ class PipelineTask(BasePipelineTask):
self._turn_trace_observer.end_conversation_tracing()
# Cleanup pipeline processors.
await self._source.cleanup()
if cleanup_pipeline:
await self._pipeline.cleanup()
await self._sink.cleanup()
async def _process_push_queue(self):
"""Process frames from the push queue and send them through the pipeline.
@@ -647,16 +558,16 @@ class PipelineTask(BasePipelineTask):
interruption_strategies=self._params.interruption_strategies,
)
start_frame.metadata = self._params.start_metadata
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._pipeline.queue_frame(start_frame)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
await self._source.queue_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
await self._pipeline.queue_frame(self._initial_metrics_frame())
running = True
cleanup_pipeline = True
while running:
frame = await self._push_queue.get()
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
await self._pipeline.queue_frame(frame)
if isinstance(frame, (CancelFrame, EndFrame, StopFrame)):
await self._wait_for_pipeline_end()
running = not isinstance(frame, (CancelFrame, EndFrame, StopFrame))
@@ -664,7 +575,7 @@ class PipelineTask(BasePipelineTask):
self._push_queue.task_done()
await self._cleanup(cleanup_pipeline)
async def _process_up_queue(self):
async def _source_push_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames coming upstream from the pipeline.
This is the task that processes frames coming upstream from the
@@ -672,33 +583,29 @@ class PipelineTask(BasePipelineTask):
pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
an EndFrame down the pipeline.
"""
while True:
frame = await self._up_queue.get()
if isinstance(frame, self._reached_upstream_types):
await self._call_event_handler("on_frame_reached_upstream", frame)
if isinstance(frame, self._reached_upstream_types):
await self._call_event_handler("on_frame_reached_upstream", frame)
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
await self.queue_frame(StopFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
await self.queue_frame(StopFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"Something went wrong: {frame}")
self._up_queue.task_done()
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"Something went wrong: {frame}")
async def _process_down_queue(self):
async def _sink_push_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames coming downstream from the pipeline.
This tasks process frames coming downstream from the pipeline. For
@@ -706,34 +613,30 @@ class PipelineTask(BasePipelineTask):
processors have handled the EndFrame and therefore we can exit the task
cleanly.
"""
while True:
frame = await self._down_queue.get()
# Queue received frame to the idle queue so we can monitor idle
# pipelines.
await self._idle_queue.put(frame)
# Queue received frame to the idle queue so we can monitor idle
# pipelines.
await self._idle_queue.put(frame)
if isinstance(frame, self._reached_downstream_types):
await self._call_event_handler("on_frame_reached_downstream", frame)
if isinstance(frame, self._reached_downstream_types):
await self._call_event_handler("on_frame_reached_downstream", frame)
if isinstance(frame, StartFrame):
await self._call_event_handler("on_pipeline_started", frame)
if isinstance(frame, StartFrame):
await self._call_event_handler("on_pipeline_started", frame)
# Start heartbeat tasks now that StartFrame has been processed
# by all processors in the pipeline
self._maybe_start_heartbeat_tasks()
elif isinstance(frame, EndFrame):
await self._call_event_handler("on_pipeline_ended", frame)
self._pipeline_end_event.set()
elif isinstance(frame, StopFrame):
await self._call_event_handler("on_pipeline_stopped", frame)
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
await self._call_event_handler("on_pipeline_cancelled", frame)
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
self._down_queue.task_done()
# Start heartbeat tasks now that StartFrame has been processed
# by all processors in the pipeline
self._maybe_start_heartbeat_tasks()
elif isinstance(frame, EndFrame):
await self._call_event_handler("on_pipeline_ended", frame)
self._pipeline_end_event.set()
elif isinstance(frame, StopFrame):
await self._call_event_handler("on_pipeline_stopped", frame)
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
await self._call_event_handler("on_pipeline_cancelled", frame)
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
async def _heartbeat_push_handler(self):
"""Push heartbeat frames at regular intervals."""
@@ -741,7 +644,7 @@ class PipelineTask(BasePipelineTask):
# Don't use `queue_frame()` because if an EndFrame is queued the
# task will just stop waiting for the pipeline to finish not
# allowing more frames to be pushed.
await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
await self._pipeline.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
await asyncio.sleep(self._params.heartbeats_period_secs)
async def _heartbeat_monitor_handler(self):
@@ -816,6 +719,10 @@ class PipelineTask(BasePipelineTask):
Returns:
Whether the pipeline task should continue running.
"""
# If we are cancelling, just exit the task.
if self._cancelled:
return True
logger.warning("Idle timeout detected. Last 10 frames received:")
for i, frame in enumerate(last_frames, 1):
logger.warning(f"Frame {i}: {frame}")

View File

@@ -13,13 +13,12 @@ the main pipeline execution.
import asyncio
import inspect
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
@dataclass
@@ -86,7 +85,7 @@ class TaskObserver(BaseObserver):
# If we already started, create a new proxy for the observer.
# Otherwise, it will be created in start().
if self._started():
if self._proxies:
proxy = self._create_proxy(observer)
self._proxies[observer] = proxy
@@ -97,7 +96,7 @@ class TaskObserver(BaseObserver):
observer: The observer to remove.
"""
# If the observer has a proxy, remove it.
if observer in self._proxies:
if self._proxies and observer in self._proxies:
proxy = self._proxies[observer]
# Remove the proxy so it doesn't get called anymore.
del self._proxies[observer]
@@ -120,22 +119,25 @@ class TaskObserver(BaseObserver):
for proxy in self._proxies.values():
await self._task_manager.cancel_task(proxy.task)
async def on_process_frame(self, data: FramePushed):
"""Queue frame data for all managed observers.
Args:
data: The frame push event data to distribute to observers.
"""
await self._send_to_proxy(data)
async def on_push_frame(self, data: FramePushed):
"""Queue frame data for all managed observers.
Args:
data: The frame push event data to distribute to observers.
"""
for proxy in self._proxies.values():
await proxy.queue.put(data)
def _started(self) -> bool:
"""Check if the task observer has been started."""
return self._proxies is not None
await self._send_to_proxy(data)
def _create_proxy(self, observer: BaseObserver) -> Proxy:
"""Create a proxy for a single observer."""
queue = WatchdogQueue(self._task_manager)
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",
@@ -151,6 +153,10 @@ class TaskObserver(BaseObserver):
proxies[observer] = proxy
return proxies
async def _send_to_proxy(self, data: Any):
for proxy in self._proxies.values():
await proxy.queue.put(data)
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
"""Handle frame processing for a single observer."""
on_push_frame_deprecated = False
@@ -169,11 +175,15 @@ class TaskObserver(BaseObserver):
while True:
data = await queue.get()
if on_push_frame_deprecated:
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
else:
await observer.on_push_frame(data)
if isinstance(data, FramePushed):
if on_push_frame_deprecated:
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
else:
await observer.on_push_frame(data)
elif isinstance(data, FrameProcessed):
await observer.on_process_frame(data)
queue.task_done()

View File

@@ -24,7 +24,7 @@ from pipecat.frames.frames import (
StartFrame,
TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.time import time_now_iso8601
@@ -64,7 +64,11 @@ class DTMFAggregator(FrameProcessor):
self._digit_event = asyncio.Event()
self._aggregation_task: Optional[asyncio.Task] = None
self._interruption_task: Optional[asyncio.Task] = None
async def cleanup(self) -> None:
"""Clean up resources."""
await super().cleanup()
await self._stop_aggregation_task()
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process incoming frames and handle DTMF aggregation.
@@ -82,7 +86,6 @@ class DTMFAggregator(FrameProcessor):
if self._aggregation:
await self._flush_aggregation()
await self._stop_aggregation_task()
await self._stop_interruption_task()
await self.push_frame(frame, direction)
elif isinstance(frame, InputDTMFFrame):
# Push the DTMF frame downstream first
@@ -102,7 +105,7 @@ class DTMFAggregator(FrameProcessor):
# For first digit, schedule interruption in separate task
if is_first_digit:
self._interruption_task = self.create_task(self._send_interruption_task())
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
# Check for immediate flush conditions
if frame.button == self._termination_digit:
@@ -111,16 +114,6 @@ class DTMFAggregator(FrameProcessor):
# Signal digit received for timeout handling
self._digit_event.set()
async def _send_interruption_task(self):
"""Send interruption frame safely in a separate task."""
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
async def _stop_interruption_task(self) -> None:
"""Stops the interruption task."""
if self._interruption_task:
await self.cancel_task(self._interruption_task)
self._interruption_task = None
def _create_aggregation_task(self) -> None:
"""Creates the aggregation task if it hasn't been created yet."""
if not self._aggregation_task:
@@ -139,7 +132,6 @@ class DTMFAggregator(FrameProcessor):
await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout)
self._digit_event.clear()
except asyncio.TimeoutError:
self.reset_watchdog()
if self._aggregation:
await self._flush_aggregation()
@@ -157,8 +149,3 @@ class DTMFAggregator(FrameProcessor):
await self.push_frame(transcription_frame)
self._aggregation = ""
async def cleanup(self) -> None:
"""Clean up resources."""
await super().cleanup()
await self._stop_aggregation_task()

View File

@@ -670,7 +670,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
if self._vad_params
else self._params.turn_emulated_vad_timeout
)
await asyncio.wait_for(self._aggregation_event.wait(), timeout)
await asyncio.wait_for(self._aggregation_event.wait(), timeout=timeout)
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:
@@ -684,7 +684,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
)
self._emulating_vad = False
finally:
self.reset_watchdog()
self._aggregation_event.clear()
async def _maybe_emulate_user_speaking(self):
@@ -986,10 +985,6 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
def _context_updated_task_finished(self, task: asyncio.Task):
self._context_updated_tasks.discard(task)
# The task is finished so this should exit immediately. We need to do
# this because otherwise the task manager would report a dangling task
# if we don't remove it.
asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop())
class LLMUserResponseAggregator(LLMUserContextAggregator):

View File

@@ -178,6 +178,7 @@ class AudioBufferProcessor(FrameProcessor):
Calls audio handlers with any remaining buffered audio before stopping.
"""
await self._call_on_audio_data_handler()
self._reset_recording()
self._recording = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -230,6 +231,7 @@ class AudioBufferProcessor(FrameProcessor):
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
await self._call_on_audio_data_handler()
self._reset_recording()
# Process turn recording with preprocessed data.
if self._enable_turn_audio:
@@ -288,8 +290,6 @@ class AudioBufferProcessor(FrameProcessor):
self._num_channels,
)
self._reset_audio_buffers()
def _buffer_has_audio(self, buffer: bytearray) -> bool:
"""Check if a buffer contains audio data."""
return buffer is not None and len(buffer) > 0

View File

@@ -12,7 +12,6 @@ from typing import Awaitable, Callable, Optional
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class ConsumerProcessor(FrameProcessor):
@@ -66,7 +65,7 @@ class ConsumerProcessor(FrameProcessor):
async def _start(self, _: StartFrame):
"""Start the consumer task and register with the producer."""
if not self._consumer_task:
self._queue: WatchdogQueue = self._producer.add_consumer()
self._queue = self._producer.add_consumer()
self._consumer_task = self.create_task(self._consumer_task_handler())
async def _stop(self, _: EndFrame):
@@ -77,7 +76,6 @@ class ConsumerProcessor(FrameProcessor):
async def _cancel(self, _: CancelFrame):
"""Cancel the consumer task."""
if self._consumer_task:
self._queue.cancel()
await self.cancel_task(self._consumer_task)
async def _consumer_task_handler(self):

View File

@@ -34,15 +34,9 @@ from pipecat.frames.frames import (
SystemFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
from pipecat.utils.asyncio.watchdog_priority_queue import (
WatchdogPriorityCancelSentinel,
WatchdogPriorityQueue,
)
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.base_object import BaseObject
@@ -69,16 +63,14 @@ class FrameProcessorSetup:
clock: The clock instance for timing operations.
task_manager: The task manager for handling async operations.
observer: Optional observer for monitoring frame processing events.
watchdog_timers_enabled: Whether to enable watchdog timers by default.
"""
clock: BaseClock
task_manager: BaseTaskManager
observer: Optional[BaseObserver] = None
watchdog_timers_enabled: bool = False
class FrameProcessorQueue(WatchdogPriorityQueue):
class FrameProcessorQueue(asyncio.PriorityQueue):
"""A priority queue for systems frames and other frames.
This is a specialized queue for frame processors that separates and
@@ -90,14 +82,14 @@ class FrameProcessorQueue(WatchdogPriorityQueue):
HIGH_PRIORITY = 1
LOW_PRIORITY = 2
def __init__(self, manager: BaseTaskManager):
def __init__(self):
"""Initialize the FrameProcessorQueue.
Args:
manager (BaseTaskManager): The task manager used by the internal watchdog queues.
"""
super().__init__(manager, tuple_size=3)
super().__init__()
self.__high_counter = 0
self.__low_counter = 0
@@ -151,10 +143,7 @@ class FrameProcessor(BaseObject):
*,
name: Optional[str] = None,
enable_direct_mode: bool = False,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
metrics: Optional[FrameProcessorMetrics] = None,
watchdog_timeout_secs: Optional[float] = None,
**kwargs,
):
"""Initialize the frame processor.
@@ -162,29 +151,16 @@ class FrameProcessor(BaseObject):
Args:
name: Optional name for this processor instance.
enable_direct_mode: Whether to process frames immediately or use internal queues.
enable_watchdog_logging: Whether to enable watchdog logging for tasks.
enable_watchdog_timers: Whether to enable watchdog timers for tasks.
metrics: Optional metrics collector for this processor.
watchdog_timeout_secs: Timeout in seconds for watchdog operations.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(name=name)
self._parent: Optional["FrameProcessor"] = None
super().__init__(name=name, **kwargs)
self._prev: Optional["FrameProcessor"] = None
self._next: Optional["FrameProcessor"] = None
# Enable direct mode to skip queues and process frames right away.
self._enable_direct_mode = enable_direct_mode
# Enable watchdog timers for all tasks created by this frame processor.
self._enable_watchdog_timers = enable_watchdog_timers
# Enable watchdog logging for all tasks created by this frame processor.
self._enable_watchdog_logging = enable_watchdog_logging
# Allow this frame processor to control their tasks timeout.
self._watchdog_timeout_secs = watchdog_timeout_secs
# Clock
self._clock: Optional[BaseClock] = None
@@ -226,6 +202,8 @@ class FrameProcessor(BaseObject):
# The input task that handles all types of frames. It processes system
# frames right away and queues non-system frames for later processing.
self.__should_block_system_frames = False
self.__input_event: Optional[asyncio.Event] = None
self.__input_frame_task: Optional[asyncio.Task] = None
# The process task processes non-system frames. Non-system frames will
@@ -234,6 +212,7 @@ class FrameProcessor(BaseObject):
# called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None
@property
@@ -254,6 +233,50 @@ class FrameProcessor(BaseObject):
"""
return self._name
@property
def processors(self) -> List["FrameProcessor"]:
"""Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have
sub-processors. Non-compound processors will return an empty list.
Returns:
The list of sub-processors if this is a compound processor.
"""
return []
@property
def entry_processors(self) -> List["FrameProcessor"]:
"""Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor
(e.g. pipelines, parallel pipelines). Note that pipelines can also be an
entry processor as pipelines are processors themselves. Non-compound
processors will simply return an empty list.
Returns:
The list of entry processors.
"""
return []
@property
def next(self) -> Optional["FrameProcessor"]:
"""Get the next processor.
Returns:
The next processor, or None if there's no next processor.
"""
return self._next
@property
def previous(self) -> Optional["FrameProcessor"]:
"""Get the previous processor.
Returns:
The previous processor, or None if there's no previous processor.
"""
return self._prev
@property
def interruptions_allowed(self):
"""Check if interruptions are allowed for this processor.
@@ -313,6 +336,17 @@ class FrameProcessor(BaseObject):
raise Exception(f"{self} TaskManager is still not initialized.")
return self._task_manager
def processors_with_metrics(self):
"""Return processors that can generate metrics.
Recursively collects all processors that support metrics generation,
including those from nested processors.
Returns:
List of frame processors that can generate metrics.
"""
return []
def can_generate_metrics(self) -> bool:
"""Check if this processor can generate metrics.
@@ -380,23 +414,12 @@ class FrameProcessor(BaseObject):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
def create_task(
self,
coroutine: Coroutine,
name: Optional[str] = None,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout_secs: Optional[float] = None,
) -> asyncio.Task:
def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task:
"""Create a new task managed by this processor.
Args:
coroutine: The coroutine to run in the task.
name: Optional name for the task.
enable_watchdog_logging: Whether to enable watchdog logging.
enable_watchdog_timers: Whether to enable watchdog timers.
watchdog_timeout_secs: Timeout in seconds for watchdog operations.
Returns:
The created asyncio task.
@@ -405,21 +428,7 @@ class FrameProcessor(BaseObject):
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(
coroutine,
name,
enable_watchdog_logging=(
enable_watchdog_logging
if enable_watchdog_logging
else self._enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers if enable_watchdog_timers else self._enable_watchdog_timers
),
watchdog_timeout=(
watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs
),
)
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
"""Cancel a task managed by this processor.
@@ -433,15 +442,27 @@ class FrameProcessor(BaseObject):
async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None):
"""Wait for a task to complete.
.. deprecated:: 0.0.81
This function is deprecated, use `await task` or
`await asyncio.wait_for(task, timeout) instead.
Args:
task: The task to wait for.
timeout: Optional timeout for waiting.
"""
await self.task_manager.wait_for_task(task, timeout)
import warnings
def reset_watchdog(self):
"""Reset the watchdog timer for the current task."""
self.task_manager.task_reset_watchdog()
warnings.warn(
"`FrameProcessor.wait_for_task()` is deprecated. "
"Use `await task` or `await asyncio.wait_for(task, timeout)` instead.",
DeprecationWarning,
stacklevel=2,
)
if timeout:
await asyncio.wait_for(task, timeout)
else:
await task
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
@@ -452,11 +473,6 @@ class FrameProcessor(BaseObject):
self._clock = setup.clock
self._task_manager = setup.task_manager
self._observer = setup.observer
self._watchdog_timers_enabled = (
self._enable_watchdog_timers
if self._enable_watchdog_timers
else setup.watchdog_timers_enabled
)
# Create processing tasks.
self.__create_input_task()
@@ -482,30 +498,6 @@ class FrameProcessor(BaseObject):
processor._prev = self
logger.debug(f"Linking {self} -> {self._next}")
def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get the event loop used by this processor.
Returns:
The asyncio event loop.
"""
return self.task_manager.get_event_loop()
def set_parent(self, parent: "FrameProcessor"):
"""Set the parent processor for this processor.
Args:
parent: The parent processor.
"""
self._parent = parent
def get_parent(self) -> Optional["FrameProcessor"]:
"""Get the parent processor.
Returns:
The parent processor, or None if no parent is set.
"""
return self._parent
def get_clock(self) -> BaseClock:
"""Get the clock used by this processor.
@@ -519,6 +511,14 @@ class FrameProcessor(BaseObject):
raise Exception(f"{self} Clock is still not initialized.")
return self._clock
def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get the event loop used by this processor.
Returns:
The asyncio event loop.
"""
return self.task_manager.get_event_loop()
async def queue_frame(
self,
frame: Frame,
@@ -546,12 +546,23 @@ class FrameProcessor(BaseObject):
logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True
async def pause_processing_system_frames(self):
"""Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True
async def resume_processing_frames(self):
"""Resume processing of queued frames."""
logger.trace(f"{self}: resuming frame processing")
if self.__process_event:
self.__process_event.set()
async def resume_processing_system_frames(self):
"""Resume processing of queued system frames."""
logger.trace(f"{self}: resuming system frame processing")
if self.__input_event:
self.__input_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame.
@@ -559,6 +570,16 @@ class FrameProcessor(BaseObject):
frame: The frame to process.
direction: The direction of frame flow.
"""
if self._observer:
timestamp = self._clock.get_time() if self._clock else 0
data = FrameProcessed(
processor=self,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self._observer.on_process_frame(data)
if isinstance(frame, StartFrame):
await self.__start(frame)
elif isinstance(frame, StartInterruptionFrame):
@@ -715,13 +736,13 @@ class FrameProcessor(BaseObject):
return
if not self.__input_frame_task:
self.__input_queue = FrameProcessorQueue(self.task_manager)
self.__input_event = asyncio.Event()
self.__input_queue = FrameProcessorQueue()
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
async def __cancel_input_task(self):
"""Cancel the frame input processing task."""
if self.__input_frame_task:
self.__input_queue.cancel()
await self.cancel_task(self.__input_frame_task)
self.__input_frame_task = None
@@ -732,14 +753,13 @@ class FrameProcessor(BaseObject):
if not self.__process_frame_task:
self.__should_block_frames = False
self.__process_event = WatchdogEvent(self.task_manager)
self.__process_queue = WatchdogQueue(self.task_manager)
self.__process_event = asyncio.Event()
self.__process_queue = asyncio.Queue()
self.__process_frame_task = self.create_task(self.__process_frame_task_handler())
async def __cancel_process_task(self):
"""Cancel the non-system frame processing task."""
if self.__process_frame_task:
self.__process_queue.cancel()
await self.cancel_task(self.__process_frame_task)
self.__process_frame_task = None
@@ -764,6 +784,13 @@ class FrameProcessor(BaseObject):
"""
while True:
if self.__should_block_system_frames and self.__input_event:
logger.trace(f"{self}: system frame processing paused")
await self.__input_event.wait()
self.__input_event.clear()
self.__should_block_system_frames = False
logger.trace(f"{self}: system frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get()
if isinstance(frame, SystemFrame):
@@ -780,7 +807,7 @@ class FrameProcessor(BaseObject):
async def __process_frame_task_handler(self):
"""Handle non-system frames from the process queue."""
while True:
if self.__should_block_frames:
if self.__should_block_frames and self.__process_event:
logger.trace(f"{self}: frame processing paused")
await self.__process_event.wait()
self.__process_event.clear()

View File

@@ -72,11 +72,9 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import (
FunctionCallParams, # TODO(aleix): we shouldn't import `services` from `processors`
)
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.string import match_endofsentence
RTVI_PROTOCOL_VERSION = "1.0.0"
@@ -1315,10 +1313,10 @@ class RTVIProcessor(FrameProcessor):
async def _start(self, frame: StartFrame):
"""Start the RTVI processor tasks."""
if not self._action_task:
self._action_queue = WatchdogQueue(self.task_manager)
self._action_queue = asyncio.Queue()
self._action_task = self.create_task(self._action_task_handler())
if not self._message_task:
self._message_queue = WatchdogQueue(self.task_manager)
self._message_queue = asyncio.Queue()
self._message_task = self.create_task(self._message_task_handler())
await self._call_event_handler("on_bot_started")
@@ -1333,12 +1331,10 @@ class RTVIProcessor(FrameProcessor):
async def _cancel_tasks(self):
"""Cancel all running tasks."""
if self._action_task:
self._action_queue.cancel()
await self.cancel_task(self._action_task)
self._action_task = None
if self._message_task:
self._message_queue.cancel()
await self.cancel_task(self._message_task)
self._message_task = None

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List, Optional
from pipecat.frames.frames import Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class IdleFrameProcessor(FrameProcessor):
@@ -78,7 +77,7 @@ class IdleFrameProcessor(FrameProcessor):
def _create_idle_task(self):
"""Create and start the idle monitoring task."""
if not self._idle_task:
self._idle_event = WatchdogEvent(self.task_manager)
self._idle_event = asyncio.Event()
self._idle_task = self.create_task(self._idle_task_handler())
async def _idle_task_handler(self):

View File

@@ -9,7 +9,6 @@
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
import sentry_sdk
@@ -51,7 +50,7 @@ class SentryMetrics(FrameProcessorMetrics):
"""
await super().setup(task_manager)
if self._sentry_available:
self._sentry_queue = WatchdogQueue(task_manager)
self._sentry_queue = asyncio.Queue()
self._sentry_task = self.task_manager.create_task(
self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
)
@@ -64,7 +63,7 @@ class SentryMetrics(FrameProcessorMetrics):
await super().cleanup()
if self._sentry_task:
await self._sentry_queue.put(None)
await self.task_manager.wait_for_task(self._sentry_task)
await self._sentry_task
self._sentry_task = None
logger.trace(f"{self} Flushing Sentry metrics")
sentry_sdk.flush(timeout=5.0)

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
async def identity_transformer(frame: Frame):
@@ -64,7 +63,7 @@ class ProducerProcessor(FrameProcessor):
Returns:
asyncio.Queue: The queue for the newly added consumer.
"""
queue = WatchdogQueue(self.task_manager)
queue = asyncio.Queue()
self._consumers.append(queue)
return queue

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class UserIdleProcessor(FrameProcessor):
@@ -78,7 +77,7 @@ class UserIdleProcessor(FrameProcessor):
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = None
self._idle_event = asyncio.Event()
def _wrap_callback(
self,
@@ -138,9 +137,6 @@ class UserIdleProcessor(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._idle_event = WatchdogEvent(self.task_manager)
# Check for end frames before processing
if isinstance(frame, (EndFrame, CancelFrame)):
# Stop the idle task, if it exists

View File

@@ -402,6 +402,7 @@ async def _run_daily_direct():
# Direct connections have no request body, so use empty dict
runner_args = DailyRunnerArguments(room_url=room_url, token=token)
runner_args.handle_sigint = True
# Get the bot module and run it directly
bot_module = _get_bot_module()

View File

@@ -260,6 +260,7 @@ async def maybe_capture_participant_screen(
await transport.capture_participant_video(
client["id"], framerate=framerate, video_source="screenVideo"
)
except ImportError:
pass

View File

@@ -53,11 +53,10 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
try:
from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven
from anthropic import NOT_GIVEN, APITimeoutError, AsyncAnthropic, NotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Anthropic, you need to `pip install pipecat-ai[anthropic]`.")
@@ -133,6 +132,8 @@ class AnthropicLLMService(LLMService):
model: str = "claude-sonnet-4-20250514",
params: Optional[InputParams] = None,
client=None,
retry_timeout_secs: Optional[float] = 5.0,
retry_on_timeout: Optional[bool] = False,
**kwargs,
):
"""Initialize the Anthropic LLM service.
@@ -142,6 +143,8 @@ class AnthropicLLMService(LLMService):
model: Model name to use. Defaults to "claude-sonnet-4-20250514".
params: Optional model parameters for inference.
client: Optional custom Anthropic client instance.
retry_timeout_secs: Request timeout in seconds for retry logic.
retry_on_timeout: Whether to retry the request once if it times out.
**kwargs: Additional arguments passed to parent LLMService.
"""
super().__init__(**kwargs)
@@ -150,6 +153,8 @@ class AnthropicLLMService(LLMService):
api_key=api_key
) # if the client is provided, use it and remove it, otherwise create a new one
self.set_model_name(model)
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout
self._settings = {
"max_tokens": params.max_tokens,
"enable_prompt_caching_beta": params.enable_prompt_caching_beta or False,
@@ -167,6 +172,31 @@ class AnthropicLLMService(LLMService):
"""
return True
async def _create_message_stream(self, api_call, params):
"""Create message stream with optional timeout and retry.
Args:
api_call: The Anthropic API method to call.
params: Parameters for the API call.
Returns:
Async stream of message events.
"""
if self._retry_on_timeout:
try:
response = await asyncio.wait_for(
api_call(**params), timeout=self._retry_timeout_secs
)
return response
except (APITimeoutError, asyncio.TimeoutError):
# Retry, this time without a timeout so we get a response
logger.debug(f"{self}: Retrying message creation due to timeout")
response = await api_call(**params)
return response
else:
response = await api_call(**params)
return response
@property
def enable_prompt_caching_beta(self) -> bool:
"""Check if prompt caching beta feature is enabled.
@@ -250,7 +280,7 @@ class AnthropicLLMService(LLMService):
params.update(self._settings["extra"])
response = await api_call(**params)
response = await self._create_message_stream(api_call, params)
await self.stop_ttfb_metrics()
@@ -259,7 +289,7 @@ class AnthropicLLMService(LLMService):
json_accumulator = ""
function_calls = []
async for event in WatchdogAsyncIterator(response, manager=self.task_manager):
async for event in response:
# Aggregate streaming content, create frames, trigger events
if event.type == "content_block_delta":

View File

@@ -219,10 +219,7 @@ class AssemblyAISTTService(STTService):
await self._websocket.send(json.dumps({"type": "Terminate"}))
try:
await asyncio.wait_for(
self._termination_event.wait(),
timeout=5.0,
)
await asyncio.wait_for(self._termination_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Timed out waiting for termination message from server")
@@ -247,11 +244,9 @@ class AssemblyAISTTService(STTService):
try:
while self._connected:
try:
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
message = await self._websocket.recv()
data = json.loads(message)
await self._handle_message(data)
except asyncio.TimeoutError:
self.reset_watchdog()
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:

View File

@@ -29,7 +29,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -276,9 +275,7 @@ class AsyncAITTSService(InterruptibleTTSService):
self._started = False
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
if not msg:
continue
@@ -301,9 +298,8 @@ class AsyncAITTSService(InterruptibleTTSService):
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 3
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:
@@ -335,7 +331,7 @@ class AsyncAITTSService(InterruptibleTTSService):
yield TTSStartedFrame()
self._started = True
msg = self._build_msg(text=text)
msg = self._build_msg(text=text, force=True)
try:
await self._get_websocket().send(msg)

View File

@@ -58,6 +58,7 @@ try:
import aioboto3
import httpx
from botocore.config import Config
from botocore.exceptions import ReadTimeoutError
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -724,6 +725,8 @@ class AWSBedrockLLMService(LLMService):
aws_region: str = "us-east-1",
params: Optional[InputParams] = None,
client_config: Optional[Config] = None,
retry_timeout_secs: Optional[float] = 5.0,
retry_on_timeout: Optional[bool] = False,
**kwargs,
):
"""Initialize the AWS Bedrock LLM service.
@@ -736,6 +739,8 @@ class AWSBedrockLLMService(LLMService):
aws_region: AWS region for the Bedrock service.
params: Model parameters and configuration.
client_config: Custom boto3 client configuration.
retry_timeout_secs: Request timeout in seconds for retry logic.
retry_on_timeout: Whether to retry the request once if it times out.
**kwargs: Additional arguments passed to parent LLMService.
"""
super().__init__(**kwargs)
@@ -762,6 +767,8 @@ class AWSBedrockLLMService(LLMService):
}
self.set_model_name(model)
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout
self._settings = {
"max_tokens": params.max_tokens,
"temperature": params.temperature,
@@ -782,6 +789,31 @@ class AWSBedrockLLMService(LLMService):
"""
return True
async def _create_converse_stream(self, client, request_params):
"""Create converse stream with optional timeout and retry.
Args:
client: The AWS Bedrock client instance.
request_params: Parameters for the converse_stream call.
Returns:
Async stream of response events.
"""
if self._retry_on_timeout:
try:
response = await asyncio.wait_for(
await client.converse_stream(**request_params), timeout=self._retry_timeout_secs
)
return response
except (ReadTimeoutError, asyncio.TimeoutError) as e:
# Retry, this time without a timeout so we get a response
logger.debug(f"{self}: Retrying converse_stream due to timeout")
response = await client.converse_stream(**request_params)
return response
else:
response = await client.converse_stream(**request_params)
return response
def create_context_aggregator(
self,
context: OpenAILLMContext,
@@ -911,7 +943,7 @@ class AWSBedrockLLMService(LLMService):
service_name="bedrock-runtime", **self._aws_params
) as client:
# Call AWS Bedrock with streaming
response = await client.converse_stream(**request_params)
response = await self._create_converse_stream(client, request_params)
await self.stop_ttfb_metrics()
@@ -922,8 +954,6 @@ class AWSBedrockLLMService(LLMService):
function_calls = []
async for event in response["stream"]:
self.reset_watchdog()
# Handle text content
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]

View File

@@ -480,7 +480,7 @@ class AWSTranscribeSTTService(STTService):
break
try:
response = await asyncio.wait_for(self._ws_client.recv(), timeout=1.0)
response = await self._ws_client.recv()
headers, payload = decode_event(response)
@@ -531,8 +531,6 @@ class AWSTranscribeSTTService(STTService):
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except asyncio.TimeoutError:
self.reset_watchdog()
except websockets.exceptions.ConnectionClosed as e:
logger.error(
f"{self} WebSocket connection closed in receive loop with code {e.code}: {e.reason}"

View File

@@ -62,7 +62,6 @@ from pipecat.services.aws_nova_sonic.context import (
)
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine
from pipecat.utils.time import time_now_iso8601
try:
@@ -795,7 +794,7 @@ class AWSNovaSonicLLMService(LLMService):
try:
while self._stream and not self._disconnecting:
output = await self._stream.await_output()
result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager)
result = await output[1].receive()
if result.value and result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")

View File

@@ -29,7 +29,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import AudioContextWordTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -388,9 +387,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
self._context_id = None
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
if not msg or not self.audio_context_available(msg["context_id"]):
continue

View File

@@ -38,7 +38,6 @@ from pipecat.services.tts_service import (
WordTTSService,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
@@ -245,6 +244,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
auto_mode: Whether to enable automatic mode optimization.
enable_ssml_parsing: Whether to parse SSML tags in text.
enable_logging: Whether to enable ElevenLabs logging.
apply_text_normalization: Text normalization mode ("auto", "on", "off").
"""
language: Optional[Language] = None
@@ -256,6 +256,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
auto_mode: Optional[bool] = True
enable_ssml_parsing: Optional[bool] = None
enable_logging: Optional[bool] = None
apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
def __init__(
self,
@@ -320,6 +321,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
"auto_mode": str(params.auto_mode).lower(),
"enable_ssml_parsing": params.enable_ssml_parsing,
"enable_logging": params.enable_logging,
"apply_text_normalization": params.apply_text_normalization,
}
self.set_model_name(model)
self.set_voice(voice_id)
@@ -370,13 +372,49 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._connect()
async def _update_settings(self, settings: Mapping[str, Any]):
"""Update service settings and reconnect if voice changed."""
"""Update service settings and reconnect if voice, model, or language changed."""
# Track previous values for settings that require reconnection
prev_voice = self._voice_id
prev_model = self.model_name
prev_language = self._settings.get("language")
# Create snapshot of current voice settings to detect changes after update
prev_voice_settings = self._voice_settings.copy() if self._voice_settings else None
await super()._update_settings(settings)
if not prev_voice == self._voice_id:
logger.info(f"Switching TTS voice to: [{self._voice_id}]")
# Update voice settings for the next context creation
self._voice_settings = self._set_voice_settings()
# Check if URL-level settings changed (these require reconnection)
url_changed = (
prev_voice != self._voice_id
or prev_model != self.model_name
or prev_language != self._settings.get("language")
)
# Check if only voice settings changed (speed, stability, etc.)
voice_settings_changed = prev_voice_settings != self._voice_settings
if url_changed:
# These settings are in the WebSocket URL, so we need to reconnect
logger.debug(
f"URL-level setting changed (voice/model/language), reconnecting WebSocket"
)
await self._disconnect()
await self._connect()
elif voice_settings_changed and self._context_id:
# Voice settings can be updated by closing current context
# so new one gets created with updated voice settings
logger.debug(f"Voice settings changed, closing current context to apply changes")
try:
if self._websocket:
await self._websocket.send(
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.warning(f"Error closing context for voice settings update: {e}")
self._context_id = None
self._started = False
async def start(self, frame: StartFrame):
"""Start the ElevenLabs TTS service.
@@ -465,6 +503,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
if self._settings["enable_logging"]:
url += f"&enable_logging={self._settings['enable_logging']}"
if self._settings["apply_text_normalization"] is not None:
url += f"&apply_text_normalization={self._settings['apply_text_normalization']}"
# Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
language = self._settings["language"]
if model in ELEVENLABS_MULTILINGUAL_MODELS and language is not None:
@@ -495,6 +536,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
if self._context_id:
await self._websocket.send(json.dumps({"close_socket": True}))
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
@@ -531,9 +573,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
@@ -592,9 +632,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 10
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:
@@ -695,6 +734,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
style: Style control for voice expression (0.0 to 1.0).
use_speaker_boost: Whether to use speaker boost enhancement.
speed: Voice speed control (0.25 to 4.0).
apply_text_normalization: Text normalization mode ("auto", "on", "off").
"""
language: Optional[Language] = None
@@ -704,6 +744,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
style: Optional[float] = None
use_speaker_boost: Optional[bool] = None
speed: Optional[float] = None
apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
def __init__(
self,
@@ -754,6 +795,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
"speed": params.speed,
"apply_text_normalization": params.apply_text_normalization,
}
self.set_model_name(model)
self.set_voice(voice_id)
@@ -937,6 +979,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
}
if self._settings["optimize_streaming_latency"] is not None:
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
if self._settings["apply_text_normalization"] is not None:
params["apply_text_normalization"] = self._settings["apply_text_normalization"]
try:
await self.start_ttfb_metrics()

View File

@@ -67,7 +67,6 @@ from pipecat.services.openai.llm import (
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt
@@ -929,7 +928,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
async def _receive_task_handler(self):
"""Handle incoming messages from the WebSocket connection."""
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
evt = events.parse_server_event(message)
# logger.debug(f"Received event: {message[:500]}")
# logger.debug(f"Received event: {evt}")

View File

@@ -15,7 +15,6 @@ import base64
import json
import warnings
from typing import Any, AsyncGenerator, Dict, Literal, Optional
from urllib.parse import urlencode
import aiohttp
from loguru import logger
@@ -32,7 +31,6 @@ from pipecat.frames.frames import (
from pipecat.services.gladia.config import GladiaInputParams
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -433,7 +431,7 @@ class GladiaSTTService(STTService):
try:
self._websocket = websocket
self._connection_active = True
logger.info("Connected to Gladia WebSocket")
logger.debug(f"{self} Connected to Gladia WebSocket")
# Send buffered audio if any
await self._send_buffered_audio()
@@ -526,7 +524,7 @@ class GladiaSTTService(STTService):
"""Send any buffered audio after reconnection."""
async with self._buffer_lock:
if self._audio_buffer:
logger.info(f"Sending {len(self._audio_buffer)} bytes of buffered audio")
logger.debug(f"{self} Sending {len(self._audio_buffer)} bytes of buffered audio")
await self._send_audio(bytes(self._audio_buffer))
async def _send_stop_recording(self):
@@ -536,9 +534,8 @@ class GladiaSTTService(STTService):
async def _keepalive_task_handler(self):
"""Send periodic empty audio chunks to keep the connection alive."""
try:
KEEPALIVE_SLEEP = 20 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 20
while self._connection_active:
self.reset_watchdog()
# Send keepalive (Gladia times out after 30 seconds)
await asyncio.sleep(KEEPALIVE_SLEEP)
if self._websocket and self._websocket.state is State.OPEN:
@@ -555,7 +552,7 @@ class GladiaSTTService(STTService):
async def _receive_task_handler(self):
try:
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
content = json.loads(message)
# Handle audio chunk acknowledgments
@@ -613,8 +610,6 @@ class GladiaSTTService(STTService):
translation, "", time_now_iso8601(), translated_language
)
)
self.reset_watchdog()
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
pass
@@ -631,8 +626,8 @@ class GladiaSTTService(STTService):
self._should_reconnect = False
return False
delay = self._reconnection_delay * (2 ** (self._reconnection_attempts - 1))
logger.info(
f"Reconnecting in {delay} seconds (attempt {self._reconnection_attempts}/{self._max_reconnection_attempts})"
logger.debug(
f"{self} Reconnecting in {delay} seconds (attempt {self._reconnection_attempts}/{self._max_reconnection_attempts})"
)
await asyncio.sleep(delay)
return True

View File

@@ -53,7 +53,6 @@ from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
# Suppress gRPC fork warnings
@@ -807,7 +806,7 @@ class GoogleLLMService(LLMService):
)
function_calls = []
async for chunk in WatchdogAsyncIterator(response, manager=self.task_manager):
async for chunk in response:
# Stop TTFB metrics after the first chunk
await self.stop_ttfb_metrics()
if chunk.usage_metadata:

View File

@@ -17,7 +17,6 @@ from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
@@ -77,7 +76,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -16,7 +16,6 @@ import json
import os
import time
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_stt
# Suppress gRPC fork warnings
@@ -781,7 +780,6 @@ class GoogleSTTService(STTService):
if self._request_queue.empty():
# wait for 10ms in case we don't have audio
await asyncio.sleep(0.01)
self.reset_watchdog()
continue
# Start bi-directional streaming
@@ -836,9 +834,7 @@ class GoogleSTTService(STTService):
async def _process_responses(self, streaming_recognize):
"""Process streaming recognition responses."""
try:
async for response in WatchdogAsyncIterator(
streaming_recognize, manager=self.task_manager
):
async for response in streaming_recognize:
# Check streaming limit
if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:
logger.debug("Stream timeout reached in response processing")

View File

@@ -31,7 +31,6 @@ from pipecat.processors.frame_processor import FrameProcessorSetup
from pipecat.services.heygen.api import HeyGenApi, HeyGenSession, NewSessionRequest
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
from livekit import rtc
@@ -104,7 +103,7 @@ class HeyGenClient:
self._connected = False
self._session_request = session_request
self._callbacks = callbacks
self._event_queue: Optional[WatchdogQueue] = None
self._event_queue: Optional[asyncio.Queue] = None
self._event_task = None
# Currently supporting to capture the audio and video from a single participant
self._video_task = None
@@ -149,7 +148,7 @@ class HeyGenClient:
try:
await self._initialize()
self._event_queue = WatchdogQueue(self._task_manager)
self._event_queue = asyncio.Queue()
self._event_task = self._task_manager.create_task(
self._callback_task_handler(self._event_queue),
f"{self}::event_callback_task",
@@ -170,7 +169,6 @@ class HeyGenClient:
self._connected = False
if self._event_task and self._task_manager:
self._event_queue.cancel()
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
except Exception as e:
@@ -231,11 +229,9 @@ class HeyGenClient:
"""Handle incoming WebSocket messages."""
while self._connected:
try:
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
message = await self._websocket.recv()
parsed_message = json.loads(message)
await self._handle_ws_server_event(parsed_message)
except asyncio.TimeoutError:
self._task_manager.task_reset_watchdog()
except ConnectionClosedOK:
break
except Exception as e:
@@ -248,7 +244,7 @@ class HeyGenClient:
if event_type == "agent.state":
logger.debug(f"HeyGenClient ws received agent status: {event}")
else:
logger.error(f"HeyGenClient ws received unknown event: {event_type}")
logger.trace(f"HeyGenClient ws received unknown event: {event_type}")
async def _ws_disconnect(self) -> None:
"""Disconnect from HeyGen websocket endpoint."""

View File

@@ -40,7 +40,6 @@ from pipecat.services.ai_service import AIService
from pipecat.services.heygen.api import NewSessionRequest
from pipecat.services.heygen.client import HEY_GEN_SAMPLE_RATE, HeyGenCallbacks, HeyGenClient
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
# Using the same values that we do in the BaseOutputTransport
AVATAR_VAD_STOP_SECS = 0.35
@@ -278,21 +277,14 @@ class HeyGenVideoService(AIService):
await self._client.stop()
async def _create_send_task(self):
"""Create the audio sending task if it doesn't exist.
Initializes a new WatchdogQueue and creates a task for handling audio sending.
"""
"""Create the audio sending task if it doesn't exist."""
if not self._send_task:
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
self._send_task = self.create_task(self._send_task_handler())
async def _cancel_send_task(self):
"""Cancel the audio sending task if it exists.
Cancels and cleans up the audio sending task and associated queue.
"""
"""Cancel the audio sending task if it exists."""
if self._send_task:
self._queue.cancel()
await self.cancel_task(self._send_task)
self._send_task = None

View File

@@ -487,7 +487,7 @@ class LLMService(AIService):
self._function_call_tasks[task] = runner_item
# Since we run tasks sequentially we don't need to call
# task.add_done_callback(self._function_call_task_finished).
await self.wait_for_task(task)
await task
del self._function_call_tasks[task]
async def _run_function_call(self, runner_item: FunctionCallRunnerItem):
@@ -616,7 +616,3 @@ class LLMService(AIService):
def _function_call_task_finished(self, task: asyncio.Task):
if task in self._function_call_tasks:
del self._function_call_tasks[task]
# The task is finished so this should exit immediately. We need to
# do this because otherwise the task manager would report a dangling
# task if we don't remove it.
asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop())

View File

View File

@@ -0,0 +1,185 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Mistral LLM service implementation using OpenAI-compatible interface."""
from typing import List, Sequence
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.frames.frames import FunctionCallFromLLM
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
class MistralLLMService(OpenAILLMService):
"""A service for interacting with Mistral's API using the OpenAI-compatible interface.
This service extends OpenAILLMService to connect to Mistral's API endpoint while
maintaining full compatibility with OpenAI's interface and functionality.
"""
def __init__(
self,
*,
api_key: str,
base_url: str = "https://api.mistral.ai/v1",
model: str = "mistral-small-latest",
**kwargs,
):
"""Initialize the Mistral LLM service.
Args:
api_key: The API key for accessing Mistral's API.
base_url: The base URL for Mistral API. Defaults to "https://api.mistral.ai/v1".
model: The model identifier to use. Defaults to "mistral-small-latest".
**kwargs: Additional keyword arguments passed to OpenAILLMService.
"""
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
def create_client(self, api_key=None, base_url=None, **kwargs):
"""Create OpenAI-compatible client for Mistral API endpoint.
Args:
api_key: The API key for authentication. If None, uses instance key.
base_url: The base URL for the API. If None, uses instance URL.
**kwargs: Additional arguments passed to the client constructor.
Returns:
An OpenAI-compatible client configured for Mistral API.
"""
logger.debug(f"Creating Mistral client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
def _apply_mistral_assistant_prefix(
self, messages: List[ChatCompletionMessageParam]
) -> List[ChatCompletionMessageParam]:
"""Apply Mistral's assistant message prefix requirement.
Mistral requires assistant messages to have prefix=True when they
are the final message in a conversation. According to Mistral's API:
- Assistant messages with prefix=True MUST be the last message
- Only add prefix=True to the final assistant message when needed
- This allows assistant messages to be accepted as the last message
Args:
messages: The original list of messages.
Returns:
Messages with Mistral prefix requirement applied to final assistant message.
"""
if not messages:
return messages
# Create a copy to avoid modifying the original
fixed_messages = [dict(msg) for msg in messages]
# Get the last message
last_message = fixed_messages[-1]
# Only add prefix=True to the last message if it's an assistant message
# and Mistral would otherwise reject it
if last_message.get("role") == "assistant" and "prefix" not in last_message:
last_message["prefix"] = True
return fixed_messages
async def run_function_calls(self, function_calls: Sequence[FunctionCallFromLLM]):
"""Execute function calls, filtering out already-completed ones.
Mistral and OpenAI have different function call detection patterns:
OpenAI (Stream-based detection):
- Detects function calls only from streaming chunks as the LLM generates them
- Second LLM completion doesn't re-detect existing tool_calls in message history
- Function calls execute exactly once
Mistral (Message-based detection):
- Detects function calls from the complete message history on each completion
- Second LLM completion with the response re-detects the same tool_calls from
previous messages
- Without filtering, function calls would execute twice
This method prevents duplicate execution by:
1. Checking message history for existing tool result messages
2. Filtering out function calls that already have corresponding results
3. Only executing function calls that haven't been completed yet
Note: This filtering prevents duplicate function execution, but the
on_function_calls_started event may still fire twice due to the detection
pattern difference. This is expected behavior.
Args:
function_calls: The function calls to potentially execute.
"""
if not function_calls:
return
# Filter out function calls that already have results
calls_to_execute = []
# Get messages from the first function call's context (they should all have the same context)
messages = function_calls[0].context.get_messages() if function_calls else []
# Get all tool_call_ids that already have results
executed_call_ids = set()
for msg in messages:
if msg.get("role") == "tool" and msg.get("tool_call_id"):
executed_call_ids.add(msg.get("tool_call_id"))
# Only include function calls that haven't been executed yet
for call in function_calls:
if call.tool_call_id not in executed_call_ids:
calls_to_execute.append(call)
else:
logger.trace(
f"Skipping already-executed function call: {call.function_name}:{call.tool_call_id}"
)
# Call parent method with filtered list
if calls_to_execute:
await super().run_function_calls(calls_to_execute)
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> dict:
"""Build parameters for Mistral chat completion request.
Handles Mistral-specific requirements including:
- Assistant message prefix requirement for API compatibility
- Parameter mapping (random_seed instead of seed)
- Core completion settings
"""
# Apply Mistral's assistant prefix requirement for API compatibility
fixed_messages = self._apply_mistral_assistant_prefix(messages)
params = {
"model": self.model_name,
"stream": True,
"messages": fixed_messages,
"tools": context.tools,
"tool_choice": context.tool_choice,
"frequency_penalty": self._settings["frequency_penalty"],
"presence_penalty": self._settings["presence_penalty"],
"temperature": self._settings["temperature"],
"top_p": self._settings["top_p"],
"max_tokens": self._settings["max_tokens"],
}
# Handle Mistral-specific parameter mapping
# Mistral uses "random_seed" instead of "seed"
if self._settings["seed"]:
params["random_seed"] = self._settings["seed"]
# Add any extra parameters
params.update(self._settings["extra"])
return params

View File

@@ -36,7 +36,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -315,7 +314,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _receive_messages(self):
"""Receive and process messages from Neuphonic WebSocket."""
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
if isinstance(message, str):
msg = json.loads(message)
if msg.get("data") and msg["data"].get("audio"):
@@ -327,9 +326,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _keepalive_task_handler(self):
"""Handle keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 10
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
await self._send_keepalive()

View File

@@ -39,7 +39,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -284,7 +283,7 @@ class BaseOpenAILLMService(LLMService):
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -53,7 +53,6 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
@@ -456,7 +455,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
#
async def _receive_task_handler(self):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
evt = events.parse_server_event(message)
if evt.type == "session.created":
await self._handle_evt_session_created(evt)

View File

@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
)
from pipecat.services.stt_service import SegmentedSTTService, STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -239,13 +238,13 @@ class RivaSTTService(STTService):
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
self._config = config
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
if not self._thread_task:
self._thread_task = self.create_task(self._thread_task_handler())
if not self._response_task:
self._response_queue = WatchdogQueue(self.task_manager)
self._response_queue = asyncio.Queue()
self._response_task = self.create_task(self._response_task_handler())
async def stop(self, frame: EndFrame):

View File

@@ -168,7 +168,7 @@ class RivaTTSService(TTSService):
await asyncio.to_thread(read_audio_responses, queue)
# Wait for the thread to start.
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
while resp:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
@@ -177,7 +177,7 @@ class RivaTTSService(TTSService):
num_channels=1,
)
yield frame
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")

View File

@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -127,7 +126,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
from av.audio.frame import AudioFrame
@@ -96,7 +95,7 @@ class SimliVideoService(FrameProcessor):
"""Consume audio frames from Simli and push them downstream."""
await self._pipecat_resampler_event.wait()
audio_iterator = self._simli_client.getAudioStreamIterator()
async for audio_frame in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager):
async for audio_frame in audio_iterator:
resampled_frames = self._pipecat_resampler.resample(audio_frame)
for resampled_frame in resampled_frames:
audio_array = resampled_frame.to_ndarray()
@@ -114,7 +113,7 @@ class SimliVideoService(FrameProcessor):
"""Consume video frames from Simli and convert them to output frames."""
await self._pipecat_resampler_event.wait()
video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24")
async for video_frame in WatchdogAsyncIterator(video_iterator, manager=self.task_manager):
async for video_frame in video_iterator:
# Process the video frame
convertedFrame: OutputImageRawFrame = OutputImageRawFrame(
image=video_frame.to_rgb().to_image().tobytes(),

View File

@@ -208,7 +208,7 @@ class SonioxSTTService(STTService):
if self._receive_task:
# Task cannot cancel itself. If task called _cleanup() we expect it to cancel itself.
if self._receive_task != asyncio.current_task():
await self.wait_for_task(self._receive_task)
await self._receive_task
self._receive_task = None
async def stop(self, frame: EndFrame):

View File

@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
BotInterruptionFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -463,8 +464,14 @@ class SpeechmaticsSTTService(STTService):
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Adds audio to the audio buffer and yields None."""
await self._client.send_audio(audio)
yield None
try:
if self._client:
await self._client.send_audio(audio)
yield None
except Exception as e:
logger.error(f"Speechmatics error: {e}")
yield ErrorFrame(f"Speechmatics error: {e}", fatal=False)
await self._disconnect()
def update_params(
self,
@@ -520,7 +527,7 @@ class SpeechmaticsSTTService(STTService):
)
# Log the event
logger.debug("Connected to Speechmatics STT service")
logger.debug(f"{self} Connecting to Speechmatics STT service")
# Recognition started event
@self._client.on(ServerMessageType.RECOGNITION_STARTED)
@@ -562,31 +569,36 @@ class SpeechmaticsSTTService(STTService):
)
# Start session
await self._client.start_session(
transcription_config=self._transcription_config,
audio_format=AudioFormat(
encoding=self._params.audio_encoding,
sample_rate=self.sample_rate,
chunk_size=self._params.chunk_size,
),
)
try:
await self._client.start_session(
transcription_config=self._transcription_config,
audio_format=AudioFormat(
encoding=self._params.audio_encoding,
sample_rate=self.sample_rate,
chunk_size=self._params.chunk_size,
),
)
logger.debug(f"{self} Connected to Speechmatics STT service")
except Exception as e:
logger.error(f"{self} Error connecting to Speechmatics: {e}")
finally:
self._client = None
async def _disconnect(self) -> None:
"""Disconnect from the STT service."""
# Disconnect the client
logger.debug(f"{self} Disconnecting from Speechmatics STT service")
try:
if self._client:
await asyncio.wait_for(self._client.close(), timeout=1.0)
await asyncio.wait_for(self._client.close(), timeout=5.0)
logger.debug(f"{self} Disconnected from Speechmatics STT service")
except asyncio.TimeoutError:
logger.warning("Timeout while closing Speechmatics client connection")
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"Error closing Speechmatics client: {e}")
logger.error(f"{self} Error closing Speechmatics client: {e}")
finally:
self._client = None
# Log the event
logger.debug("Disconnected from Speechmatics STT service")
def _process_config(self) -> None:
"""Create a formatted STT transcription config.

View File

@@ -35,7 +35,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.services.ai_service import AIService
from pipecat.transports.services.tavus import TavusCallbacks, TavusParams, TavusTransportClient
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class TavusVideoService(AIService):
@@ -255,13 +254,12 @@ class TavusVideoService(AIService):
async def _create_send_task(self):
"""Create the audio sending task if it doesn't exist."""
if not self._send_task:
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
self._send_task = self.create_task(self._send_task_handler())
async def _cancel_send_task(self):
"""Cancel the audio sending task if it exists."""
if self._send_task:
self._queue.cancel()
await self.cancel_task(self._send_task)
self._send_task = None

View File

@@ -37,7 +37,6 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.base_text_filter import BaseTextFilter
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator
@@ -258,7 +257,7 @@ class TTSService(AIService):
self._settings[key] = self.language_to_service_language(value)
elif key == "model":
self.set_model_name(value)
elif key == "voice":
elif key == "voice" or key == "voice_id":
self.set_voice(value)
elif key == "text_filter":
for filter in self._text_filters:
@@ -428,7 +427,7 @@ class TTSService(AIService):
while True:
try:
frame = await asyncio.wait_for(
self._stop_frame_queue.get(), self._stop_frame_timeout_s
self._stop_frame_queue.get(), timeout=self._stop_frame_timeout_s
)
if isinstance(frame, TTSStartedFrame):
has_started = True
@@ -438,8 +437,6 @@ class TTSService(AIService):
if has_started:
await self.push_frame(TTSStoppedFrame())
has_started = False
finally:
self.reset_watchdog()
class WordTTSService(TTSService):
@@ -526,7 +523,7 @@ class WordTTSService(TTSService):
def _create_words_task(self):
if not self._words_task:
self._words_queue = WatchdogQueue(self.task_manager)
self._words_queue = asyncio.Queue()
self._words_task = self.create_task(self._words_task_handler())
async def _stop_words_task(self):
@@ -797,7 +794,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
# Indicate no more audio contexts are available. this will end the
# task cleanly after all contexts have been processed.
await self._contexts_queue.put(None)
await self.wait_for_task(self._audio_context_task)
await self._audio_context_task
self._audio_context_task = None
async def cancel(self, frame: CancelFrame):
@@ -816,13 +813,12 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
def _create_audio_context_task(self):
if not self._audio_context_task:
self._contexts_queue = WatchdogQueue(self.task_manager)
self._contexts_queue = asyncio.Queue()
self._contexts: Dict[str, asyncio.Queue] = {}
self._audio_context_task = self.create_task(self._audio_context_task_handler())
async def _stop_audio_context_task(self):
if self._audio_context_task:
self._contexts_queue.cancel()
await self.cancel_task(self._audio_context_task)
self._audio_context_task = None
@@ -859,12 +855,10 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
while running:
try:
frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT)
self.reset_watchdog()
if frame:
await self.push_frame(frame)
running = frame is not None
except asyncio.TimeoutError:
self.reset_watchdog()
# We didn't get audio, so let's consider this context finished.
logger.trace(f"{self} time out on audio context {context_id}")
break

View File

@@ -36,7 +36,7 @@ class SleepFrame(SystemFrame):
sleep: Duration to sleep in seconds before processing the next frame.
"""
sleep: float = 0.1
sleep: float = 0.2
class HeartbeatsObserver(BaseObserver):
@@ -100,7 +100,7 @@ class QueuedFrameProcessor(FrameProcessor):
queue_direction: The direction of frames to capture (UPSTREAM or DOWNSTREAM).
ignore_start: Whether to ignore StartFrames when capturing.
"""
super().__init__()
super().__init__(enable_direct_mode=True)
self._queue = queue
self._queue_direction = queue_direction
self._ignore_start = ignore_start

View File

@@ -505,8 +505,6 @@ class BaseInputTransport(FrameProcessor):
if self._params.turn_analyzer:
self._params.turn_analyzer.clear()
await self._handle_user_interruption(UserStoppedSpeakingFrame())
finally:
self.reset_watchdog()
async def _handle_prediction_result(self, result: MetricsData):
"""Handle a prediction result event from the turn analyzer."""

View File

@@ -46,7 +46,6 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue
from pipecat.utils.time import nanoseconds_to_seconds
BOT_VAD_STOP_SECS = 0.35
@@ -436,9 +435,9 @@ class BaseOutputTransport(FrameProcessor):
# also need to wait for these tasks before cancelling the video task
# because it might be still rendering.
if self._audio_task:
await self._transport.wait_for_task(self._audio_task)
await self._audio_task
if self._clock_task:
await self._transport.wait_for_task(self._clock_task)
await self._clock_task
# Stop audio mixer.
if self._mixer:
@@ -626,10 +625,8 @@ class BaseOutputTransport(FrameProcessor):
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
self._transport.reset_watchdog()
yield frame
except asyncio.TimeoutError:
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
@@ -639,13 +636,11 @@ class BaseOutputTransport(FrameProcessor):
while True:
try:
frame = self._audio_queue.get_nowait()
self._transport.reset_watchdog()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
@@ -827,15 +822,12 @@ class BaseOutputTransport(FrameProcessor):
def _create_clock_task(self):
"""Create the clock/timing processing task."""
if not self._clock_task:
self._clock_queue = WatchdogPriorityQueue(
self._transport.task_manager, tuple_size=3
)
self._clock_queue = asyncio.PriorityQueue()
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):
"""Cancel and cleanup the clock processing task."""
if self._clock_task:
self._clock_queue.cancel()
await self._transport.cancel_task(self._clock_task)
self._clock_task = None

View File

@@ -37,7 +37,6 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
from fastapi import WebSocket
@@ -283,9 +282,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
async def _receive_messages(self):
"""Main message receiving loop for WebSocket messages."""
try:
async for message in WatchdogAsyncIterator(
self._client.receive(), manager=self.task_manager
):
async for message in self._client.receive():
if not self._params.serializer:
continue

View File

@@ -40,7 +40,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
import cv2
@@ -52,6 +51,10 @@ except ModuleNotFoundError as e:
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
raise Exception(f"Missing module: {e}")
CAM_VIDEO_SOURCE = "camera"
SCREEN_VIDEO_SOURCE = "screenVideo"
MIC_AUDIO_SOURCE = "microphone"
class SmallWebRTCCallbacks(BaseModel):
"""Callback handlers for SmallWebRTC events.
@@ -221,6 +224,7 @@ class SmallWebRTCClient:
self._video_output_track = None
self._audio_input_track: Optional[AudioStreamTrack] = None
self._video_input_track: Optional[VideoStreamTrack] = None
self._screen_video_track: Optional[VideoStreamTrack] = None
self._params = None
self._audio_in_channels = None
@@ -274,22 +278,30 @@ class SmallWebRTCClient:
return cv2.cvtColor(frame_array, conversion_code)
async def read_video_frame(self):
async def read_video_frame(self, video_source: str):
"""Read video frames from the WebRTC connection.
Reads a video frame from the given MediaStreamTrack, converts it to RGB,
and creates an InputImageRawFrame.
Args:
video_source: Video source to capture ("camera" or "screenVideo").
Yields:
UserImageRawFrame objects containing video data from the peer.
"""
while True:
if self._video_input_track is None:
video_track = (
self._video_input_track
if video_source == CAM_VIDEO_SOURCE
else self._screen_video_track
)
if video_track is None:
await asyncio.sleep(0.01)
continue
try:
frame = await asyncio.wait_for(self._video_input_track.recv(), timeout=2.0)
frame = await asyncio.wait_for(video_track.recv(), timeout=2.0)
except asyncio.TimeoutError:
if self._webrtc_connection.is_connected():
logger.warning("Timeout: No video frame received within the specified time.")
@@ -315,6 +327,7 @@ class SmallWebRTCClient:
size=(frame.width, frame.height),
format="RGB",
)
image_frame.transport_source = video_source
yield image_frame
@@ -436,6 +449,7 @@ class SmallWebRTCClient:
self._audio_input_track = self._webrtc_connection.audio_input_track()
self._video_input_track = self._webrtc_connection.video_input_track()
self._screen_video_track = self._webrtc_connection.screen_video_input_track()
if self._params.audio_out_enabled:
self._audio_output_track = RawAudioTrack(sample_rate=self._out_sample_rate)
self._webrtc_connection.replace_audio_track(self._audio_output_track)
@@ -452,6 +466,7 @@ class SmallWebRTCClient:
"""Handle peer disconnection cleanup."""
self._audio_input_track = None
self._video_input_track = None
self._screen_video_track = None
self._audio_output_track = None
self._video_output_track = None
@@ -459,6 +474,7 @@ class SmallWebRTCClient:
"""Handle client connection closure."""
self._audio_input_track = None
self._video_input_track = None
self._screen_video_track = None
self._audio_output_track = None
self._video_output_track = None
await self._callbacks.on_client_disconnected(self._webrtc_connection)
@@ -515,6 +531,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
self._params = params
self._receive_audio_task = None
self._receive_video_task = None
self._receive_screen_video_task = None
self._image_requests = {}
# Whether we have seen a StartFrame already.
@@ -547,11 +564,11 @@ class SmallWebRTCInputTransport(BaseInputTransport):
await self._client.setup(self._params, frame)
await self._client.connect()
await self.set_transport_ready(frame)
if not self._receive_audio_task and self._params.audio_in_enabled:
self._receive_audio_task = self.create_task(self._receive_audio())
if not self._receive_video_task and self._params.video_in_enabled:
self._receive_video_task = self.create_task(self._receive_video())
await self.set_transport_ready(frame)
self._receive_video_task = self.create_task(self._receive_video(CAM_VIDEO_SOURCE))
async def _stop_tasks(self):
"""Stop all background tasks."""
@@ -586,40 +603,42 @@ class SmallWebRTCInputTransport(BaseInputTransport):
"""Background task for receiving audio frames from WebRTC."""
try:
audio_iterator = self._client.read_audio_frame()
async for audio_frame in WatchdogAsyncIterator(
audio_iterator, manager=self.task_manager
):
async for audio_frame in audio_iterator:
if audio_frame:
await self.push_audio_frame(audio_frame)
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
async def _receive_video(self):
"""Background task for receiving video frames from WebRTC."""
async def _receive_video(self, video_source: str):
"""Background task for receiving video frames from WebRTC.
Args:
video_source: Video source to capture ("camera" or "screenVideo").
"""
try:
video_iterator = self._client.read_video_frame()
async for video_frame in WatchdogAsyncIterator(
video_iterator, manager=self.task_manager
):
video_iterator = self._client.read_video_frame(video_source)
async for video_frame in video_iterator:
if video_frame:
await self.push_video_frame(video_frame)
# Check if there are any pending image requests and create UserImageRawFrame
if self._image_requests:
for req_id, request_frame in list(self._image_requests.items()):
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
request=request_frame,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
)
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
del self._image_requests[req_id]
if request_frame.video_source == video_source:
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
request=request_frame,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
)
image_frame.transport_source = video_source
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
del self._image_requests[req_id]
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
@@ -650,10 +669,60 @@ class SmallWebRTCInputTransport(BaseInputTransport):
request_id = f"{frame.function_name}:{frame.tool_call_id}"
self._image_requests[request_id] = frame
# Default to camera if no source specified
if frame.video_source is None:
frame.video_source = CAM_VIDEO_SOURCE
# If we're not already receiving video, try to get a frame now
if not self._receive_video_task and self._params.video_in_enabled:
if (
frame.video_source == CAM_VIDEO_SOURCE
and not self._receive_video_task
and self._params.video_in_enabled
):
# Start video reception if it's not already running
self._receive_video_task = self.create_task(self._receive_video())
self._receive_video_task = self.create_task(self._receive_video(CAM_VIDEO_SOURCE))
elif (
frame.video_source == SCREEN_VIDEO_SOURCE
and not self._receive_screen_video_task
and self._params.video_in_enabled
):
# Start screen video reception if it's not already running
self._receive_screen_video_task = self.create_task(
self._receive_video(SCREEN_VIDEO_SOURCE)
)
async def capture_participant_media(
self,
source: str = CAM_VIDEO_SOURCE,
):
"""Capture media from a specific participant.
Args:
source: Media source to capture from. ("camera", "microphone", or "screenVideo")
"""
# If we're not already receiving video, try to get a frame now
if (
source == MIC_AUDIO_SOURCE
and not self._receive_audio_task
and self._params.audio_in_enabled
):
# Start audio reception if it's not already running
self._receive_audio_task = self.create_task(self._receive_audio())
elif (
source == CAM_VIDEO_SOURCE
and not self._receive_video_task
and self._params.video_in_enabled
):
# Start video reception if it's not already running
self._receive_video_task = self.create_task(self._receive_video(CAM_VIDEO_SOURCE))
elif (
source == SCREEN_VIDEO_SOURCE
and not self._receive_screen_video_task
and self._params.video_in_enabled
):
# Start screen video reception if it's not already running
self._receive_screen_video_task = self.create_task(
self._receive_video(SCREEN_VIDEO_SOURCE)
)
class SmallWebRTCOutputTransport(BaseOutputTransport):
@@ -840,3 +909,27 @@ class SmallWebRTCTransport(BaseTransport):
async def _on_client_disconnected(self, webrtc_connection):
"""Handle client disconnection events."""
await self._call_event_handler("on_client_disconnected", webrtc_connection)
async def capture_participant_video(
self,
video_source: str = CAM_VIDEO_SOURCE,
):
"""Capture video from a specific participant.
Args:
video_source: Video source to capture from ("camera" or "screenVideo").
"""
if self._input:
await self._input.capture_participant_media(source=video_source)
async def capture_participant_audio(
self,
audio_source: str = MIC_AUDIO_SOURCE,
):
"""Capture audio from a specific participant.
Args:
audio_source: Audio source to capture from. (currently, "microphone" is the only supported option)
"""
if self._input:
await self._input.capture_participant_media(source=audio_source)

Some files were not shown because too many files have changed in this diff Show More