Compare commits

105 Commits

Author SHA1 Message Date
James Hush
6f2ffa8fed fix: add type annotations to event_handler decorator
Add proper generic type annotations to the event_handler decorator
so that static type checkers (pyright, mypy) understand the decorated
function is returned and used, eliminating false reportUnusedFunction
warnings.

Changes:
- Import TypeVar and Callable from typing
- Define F TypeVar bound to Callable[..., Any]
- Add Callable[[F], F] return type to event_handler method
- Add F type annotations to inner decorator function
2026-01-19 11:07:56 +08:00
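The typing pattern described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: `EventEmitter`, its `_handlers` dictionary, and the `on_connected` handler are hypothetical names. Only the `F` TypeVar bound to `Callable[..., Any]` and the `Callable[[F], F]` return type come from the commit message.

```python
from typing import Any, Callable, Dict, List, TypeVar

# F stands for "the exact callable type that was passed in".
# The bound restricts it to callables of any signature.
F = TypeVar("F", bound=Callable[..., Any])


class EventEmitter:
    """Hypothetical stand-in for the class that owns event_handler."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[..., Any]]] = {}

    def event_handler(self, event_name: str) -> Callable[[F], F]:
        """Return a decorator that registers the handler and hands it back unchanged."""

        def decorator(handler: F) -> F:
            self._handlers.setdefault(event_name, []).append(handler)
            # Returning the same object with the same type F tells the type
            # checker that the decorated function is still defined and used.
            return handler

        return decorator


emitter = EventEmitter()


@emitter.event_handler("on_connected")
def on_connected(client: str) -> str:
    return f"connected: {client}"
```

Because the decorator is typed as `Callable[[F], F]`, a checker sees `on_connected` keep its original signature after decoration instead of treating it as an unused local function.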
Mark Backman
829c5f4604 Merge pull request #3169 from Incanta/hathora
Add Hathora STT and TTS services
2026-01-17 16:25:12 -05:00
Mike Seese
dc8ea615d9 add hathora to run-release-evals.py 2026-01-17 10:33:58 -08:00
Mike Seese
a3d206050d move hathora example as requested 2026-01-17 10:31:08 -08:00
Mike Seese
f48a567873 run the linter 2026-01-17 10:30:47 -08:00
Mark Backman
e69ccd8ea7 Merge pull request #3490 from pipecat-ai/mb/on-user-mute-events
Add on_user_mute_started and on_user_mute_stopped events
2026-01-17 11:05:15 -05:00
Mark Backman
11924bb980 Add on_user_mute_started and on_user_mute_stopped events 2026-01-17 11:01:46 -05:00
Mark Backman
af89154e96 Merge pull request #3489 from pipecat-ai/mb/fix-azure-tts-punctuation-spacing
fix: AzureTTSService punctuation spacing
2026-01-17 11:00:30 -05:00
Mark Backman
1485ea0831 Merge pull request #3488 from pipecat-ai/mb/on-user-turn-idle
Update on_user_idle to on_user_turn_idle
2026-01-17 11:00:16 -05:00
Mark Backman
e22bc777d8 Fix spacing for CJK languages 2026-01-17 09:04:50 -05:00
Mark Backman
043403fe23 fix: AzureTTSService punctuation spacing 2026-01-17 08:18:31 -05:00
Mark Backman
1e1160906e Update on_user_idle to on_user_turn_idle 2026-01-17 07:04:27 -05:00
Aleix Conchillo Flaqué
f7d3e63063 Merge pull request #3474 from pipecat-ai/fix/optional-member-access-function-call-cancel
Fix Pylance reportOptionalMemberAccess in _handle_function_call_cancel
2026-01-16 22:06:45 -08:00
Mark Backman
473d39791b Merge pull request #3482 from pipecat-ai/mb/user-idle-in-user-aggregator
Add UserIdleController, deprecate UserIdleProcessor
2026-01-16 18:47:10 -05:00
Aleix Conchillo Flaqué
2114abb8c6 add changelog file for 3484 2026-01-16 15:46:29 -08:00
Aleix Conchillo Flaqué
4fb4c26f55 Merge pull request #3484 from amichyrpi/main
Remove async_mode parameter from Mem0 storage
2026-01-16 15:44:52 -08:00
Mark Backman
2e8e574ea5 Add UserIdleController, deprecate UserIdleProcessor 2026-01-16 18:44:19 -05:00
Aleix Conchillo Flaqué
84c7e97be2 Merge pull request #3483 from pipecat-ai/aleix/throttle-user-speaking-frame
throttle user speaking frame
2026-01-16 15:29:37 -08:00
Amory Hen
a6e7c99d55 Remove async_mode parameter from Mem0 storage 2026-01-17 00:26:38 +01:00
Aleix Conchillo Flaqué
ac3fa7f91f BaseOutputTransport: minor cleanup 2026-01-16 15:15:49 -08:00
Aleix Conchillo Flaqué
6eadad53b2 BaseInputTransport: throttle UserSpeakingFrame 2026-01-16 15:15:49 -08:00
kompfner
b11150f31f Merge pull request #3480 from pipecat-ai/pk/fix-grok-realtime-smallwebrtc
Fix an issue where Grok Realtime would error out when running with Sm…
2026-01-16 15:46:27 -05:00
Paul Kompfner
836cf60611 Fix an issue where Grok Realtime would error out when running with SmallWebRTC transport.
The underlying issue was related to the fact that we were sending audio to Grok before we had configured the Grok session with our default input sample rate (16000), so Grok was interpreting those initial audio chunks as having its default sample rate (24000). We didn't see this issue when using the Daily transport simply because in our test environments Daily took a smidge longer than a reflexive (localhost) pure WebRTC connection, so we would only send audio to Grok *after* we had configured the Grok session with the desired sample rate.
2026-01-16 15:41:33 -05:00
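The ordering problem described in this commit message can be illustrated with a small sketch. It is not the Grok service implementation: `GatedAudioSender`, `mark_session_configured`, and `duration_ms` are hypothetical names, and dropping early chunks (rather than buffering them) is an assumption, since the commit message does not say which approach the fix takes. The sample rates 16000 and 24000 are the ones quoted in the message.

```python
from typing import List


def duration_ms(num_samples: int, sample_rate: int) -> float:
    """Playback duration of a mono chunk at the given sample rate."""
    return 1000.0 * num_samples / sample_rate


class GatedAudioSender:
    """Hypothetical sender that withholds audio until the session is configured."""

    def __init__(self) -> None:
        self._session_configured = False
        self.sent: List[bytes] = []
        self.dropped = 0

    def mark_session_configured(self) -> None:
        # Call this once the remote side has accepted the input sample rate.
        self._session_configured = True

    def send_audio(self, chunk: bytes) -> bool:
        if not self._session_configured:
            # Assumption: early audio is dropped. Buffering is another option.
            self.dropped += 1
            return False
        self.sent.append(chunk)
        return True
```

A chunk of 320 samples captured at 16000 Hz lasts `duration_ms(320, 16000)`, which is 20 ms. If the receiver assumes 24000 Hz, the same samples are treated as roughly 13.3 ms of audio, which is why chunks sent before configuration are misinterpreted.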
James Hush
1c13ad95a5 Fix Pylance reportOptionalMemberAccess in _handle_function_call_cancel
Extract dictionary value to local variable and check for None before
accessing cancel_on_interruption attribute, since the dictionary values
are typed as Optional[FunctionCallInProgressFrame].
2026-01-16 15:04:26 -05:00
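The narrowing pattern this commit message describes looks roughly like the sketch below. The `FunctionCallInProgressFrame` dataclass here is a simplified, hypothetical stand-in that carries only the attribute named in the message, and `should_cancel` is an illustrative helper, not the body of `_handle_function_call_cancel`.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FunctionCallInProgressFrame:
    """Simplified stand-in; the real frame has more fields."""

    tool_call_id: str
    cancel_on_interruption: bool = False


def should_cancel(
    calls: Dict[str, Optional[FunctionCallInProgressFrame]], call_id: str
) -> bool:
    # Extract the value once so the type checker can narrow it.
    frame = calls.get(call_id)
    if frame is None:
        return False
    # Here `frame` is known to be a FunctionCallInProgressFrame, so the
    # attribute access no longer triggers reportOptionalMemberAccess.
    return frame.cancel_on_interruption
```

Indexing the dictionary twice (once in the check, once in the access) does not narrow the type, because the checker cannot assume both lookups return the same value. Binding the value to a local variable is what makes the `None` check effective.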
Mark Backman
1e8516e91d Merge pull request #3476 from pipecat-ai/mb/project-urls
Update project.urls for PyPI
2026-01-16 14:57:39 -05:00
Mark Backman
32c775311d Merge pull request #3471 from pipecat-ai/mb/fix-pydantic-2.12-docs
Revert pydantic 2.12 extra type annotation
2026-01-16 14:57:24 -05:00
Mark Backman
28d0bb98de Merge pull request #3472 from pipecat-ai/mb/whisker-dev
Add whisker_setup.py setup file to .gitignore
2026-01-16 14:55:48 -05:00
Aleix Conchillo Flaqué
a9a9f3aeaa Merge pull request #3462 from pipecat-ai/aleix/fix-min-words-transcription-aggregation
MinWordsUserTurnStartStrategy: don't aggregate transcriptions
2026-01-16 11:18:23 -08:00
Aleix Conchillo Flaqué
c2a0735975 MinWordsUserTurnStartStrategy: don't aggregate transcriptions
If we aggregate transcriptions we will get incorrect interruptions. For example,
if we have a strategy with min_words=3 and we say "One" and pause, then "Two"
and pause and then "Three", this would trigger the start of the turn when it
shouldn't. We should only look at the incoming transcription text and not
aggregate it with the previous ones.
2026-01-16 11:16:06 -08:00
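The difference between the two behaviors in this commit message can be sketched with two small functions. Both are hypothetical illustrations of the word-count check only, not the `MinWordsUserTurnStartStrategy` implementation.

```python
from typing import Iterable, List


def turn_starts_aggregated(transcriptions: Iterable[str], min_words: int) -> bool:
    """Old behavior: words accumulate across separate transcriptions."""
    words: List[str] = []
    for text in transcriptions:
        words.extend(text.split())
        if len(words) >= min_words:
            return True
    return False


def turn_starts_per_transcription(transcriptions: Iterable[str], min_words: int) -> bool:
    """New behavior: each incoming transcription is counted on its own."""
    return any(len(text.split()) >= min_words for text in transcriptions)
```

With `min_words=3` and the paused utterances `["One", "Two", "Three"]` from the example above, the aggregated check reports a turn start while the per-transcription check does not. A single transcription such as `"One two three"` still triggers the per-transcription check.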
Aleix Conchillo Flaqué
41cb53f6c2 Merge pull request #3479 from pipecat-ai/aleix/turns-mute-to-user-mute
turns: move mute to user_mute
2026-01-16 11:11:50 -08:00
Aleix Conchillo Flaqué
58552af8fd examples(foundational): remove STTMuteFilter example 2026-01-16 11:07:20 -08:00
Aleix Conchillo Flaqué
c7ab87b0cc turns: move mute to user_mute 2026-01-16 11:07:20 -08:00
Mark Backman
11ecc5fdee Update project.urls for PyPI 2026-01-16 12:48:13 -05:00
kompfner
19fb3eed9f Merge pull request #3466 from pipecat-ai/pk/fix-aws-nova-sonic-rtvi-bot-output
Fix realtime (speech-to-speech) services' RTVI event compatibility
2026-01-16 09:56:13 -05:00
Mark Backman
b292b32374 Merge pull request #3461 from glennpow/glenn/websocket-headers
Allow WebsocketClientTransport to send custom headers
2026-01-15 20:26:36 -05:00
Mark Backman
63d1393bb0 Add whisker_setup.py to .gitignore 2026-01-15 20:21:25 -05:00
Glenn Powell
37914cb062 Removed import and added changelog entry. 2026-01-15 16:47:15 -08:00
Mark Backman
ec40696854 Revert pydantic 2.12 extra type annotation 2026-01-15 19:16:15 -05:00
Mike Seese
2249f3d673 add requested changes from code review 2026-01-15 15:27:56 -08:00
Mike Seese
d2df324f29 fix some bugs after testing changes 2026-01-15 15:27:56 -08:00
Mike Seese
67fdb0b659 use parent _settings dict instead of self._params pattern 2026-01-15 15:27:56 -08:00
Mike Seese
e77bdf66f9 add can_generate_metrics functions 2026-01-15 15:27:56 -08:00
Mike Seese
1b3b67779c switch hathora services to use InputParams pattern 2026-01-15 15:27:55 -08:00
Mike Seese
6c7e386391 remove traced_stt from run_stt 2026-01-15 15:27:55 -08:00
Mike Seese
ba25b279d6 fix issues with PR suggestions 2026-01-15 15:27:55 -08:00
Mike Seese
e7c83c19b6 port turn_start_strategies to the newer user_turn_strategies 2026-01-15 15:27:55 -08:00
Mike Seese
7be7fb49a3 remove turn_analyzer args from transport params 2026-01-15 15:27:54 -08:00
Mike Seese
bcccb4cbb3 put fallback sample_rate value in function arg 2026-01-15 15:27:54 -08:00
Mike Seese
e9f1d951d3 Apply suggestions from code review
Co-authored-by: Mark Backman <m.backman@gmail.com>
2026-01-15 15:27:54 -08:00
Mike Seese
e5632a9339 transition Hathora service to use the unified API and apply PR feedback
add Hathora to root files

Hathora run linter

added hathora changelog
2026-01-15 15:27:53 -08:00
Mike Seese
1510fb4fc0 add Hathora STT and TTS services 2026-01-15 15:26:52 -08:00
Mark Backman
64a1ad2649 Merge pull request #3470 from pipecat-ai/mb/fix-docs-0.0.99
Docs fixes after 0.0.99
2026-01-15 17:34:44 -05:00
Mark Backman
4458ca1d24 Mock FastAPI 2026-01-15 17:29:47 -05:00
Mark Backman
21aaa48e62 Fix pydantic issues impacting autodoc 2026-01-15 17:29:47 -05:00
Mark Backman
e75c241030 Merge pull request #3468 from pipecat-ai/mb/camb-cleanuo
Clean up CambTTSService
2026-01-15 17:16:28 -05:00
Mark Backman
60216048a8 Docs fixes after 0.0.99 2026-01-15 16:40:42 -05:00
Mark Backman
f3c2e29fb4 Clean up CambTTSService 2026-01-15 15:59:17 -05:00
Paul Kompfner
ce99924be4 Add CHANGELOG entry describing fix for the missing "bot-llm-text" RTVI event when using realtime (speech-to-speech) services 2026-01-15 15:55:39 -05:00
Paul Kompfner
5de80a60d4 Fix "bot-llm-text" not firing when using Grok Realtime 2026-01-15 15:30:00 -05:00
Paul Kompfner
5753762350 Fix "bot-llm-text" not firing when using OpenAI Realtime 2026-01-15 15:16:08 -05:00
Paul Kompfner
885b318b04 Fix "bot-llm-text" not firing when using Gemini Live 2026-01-15 15:03:45 -05:00
Paul Kompfner
7a22d58cf4 Fix "bot-llm-text" not firing when using AWS Nova Sonic 2026-01-15 14:56:50 -05:00
Mark Backman
c8e4b462c9 Merge pull request #3460 from pipecat-ai/mb/reorder-07-examples
Renumber the 07 foundational examples
2026-01-15 14:44:21 -05:00
Mark Backman
30a3f42255 Merge pull request #3349 from eRuaro/feat/camb-tts-integration
Add Camb.ai TTS integration with MARS models
2026-01-15 14:43:12 -05:00
Neil Ruaro
26ddb2de2f minimal uv.lock update for camb-sdk 2026-01-16 03:18:01 +08:00
Neil Ruaro
f60eeaa212 reverted uv.lock, updated readthedocs.yaml, copyright year updates 2026-01-16 02:50:18 +08:00
Neil Ruaro
8cf72b36cb manually add camb-sdk to uv.lock, exclude camb from docs build 2026-01-16 02:26:38 +08:00
Neil Ruaro
38c3bcef96 exclude camb from docs build 2026-01-16 02:20:26 +08:00
Neil Ruaro
80604ba7b6 remove _update_settings method 2026-01-16 02:00:48 +08:00
Neil Ruaro
256c70c631 use UserTurnStrategies 2026-01-16 01:32:08 +08:00
Glenn Powell
0e3532c529 Allow WebsocketClientTransport to send custom headers 2026-01-15 09:31:48 -08:00
Neil Ruaro
9942fcfeb2 updated per PR reviews 2026-01-16 01:20:17 +08:00
Neil Ruaro
003c24ca6e Make model parameter explicit in docstring example 2026-01-16 01:18:37 +08:00
Neil Ruaro
ed120d014d Add model-specific sample rates, transport example, and fix audio buffer alignment 2026-01-16 01:18:37 +08:00
Neil Ruaro
e76a3d04f0 Update Camb TTS to 48kHz sample rate 2026-01-16 01:18:37 +08:00
Neil Ruaro
641d17007f Clean up Camb TTS service and tests 2026-01-16 01:18:37 +08:00
Neil Ruaro
9293b5f24a Migrate Camb TTS service from raw HTTP to official SDK
- Replace aiohttp with camb SDK (AsyncCambAI client)
- Add support for passing existing SDK client instance
- Simplify API: no longer requires aiohttp_session parameter
- Update example to use simplified initialization
- Rewrite tests to mock SDK client instead of HTTP servers
2026-01-16 01:18:37 +08:00
Neil Ruaro
c1f3cbd1d4 Yield TTSAudioRawFrame directly instead of calling private method 2026-01-16 01:18:37 +08:00
Neil Ruaro
78fa2ab65e Update default voice ID, fix MARS naming, and clean up example 2026-01-16 01:18:37 +08:00
Neil Ruaro
56da2caeed Update Camb.ai TTS inference options 2026-01-16 01:18:37 +08:00
Neil Ruaro
a541d65255 Update MARS model names to mars-flash, mars-pro, mars-instruct
Rename model identifiers from mars-8-* to the new naming convention:
- mars-8-flash -> mars-flash (default)
- mars-8 -> removed
- mars-8-instruct -> mars-instruct
- Added mars-pro
2026-01-16 01:18:37 +08:00
Neil Ruaro
a3d7e9eafe Address PR feedback: add --voice-id arg, remove test script
- Add --voice-id CLI argument to example (default: 2681)
- Remove test_camb_quick.py from examples/ (tests belong in tests/)
- Update docstring with new usage
2026-01-16 01:18:36 +08:00
Neil Ruaro
54933bea2a Rename changelog to PR number 2026-01-16 01:18:36 +08:00
Neil Ruaro
fcab9899cc Add changelog entry for Camb.ai TTS integration 2026-01-16 01:18:36 +08:00
Neil Ruaro
be098e85db Remove non-working Daily/WebRTC example
The Daily transport example had authentication issues. Keeping the
local audio example (07zb-interruptible-camb-local.py) which works.
2026-01-16 01:18:36 +08:00
Neil Ruaro
ed0ff46a87 added local test 2026-01-16 01:18:36 +08:00
Neil Ruaro
7ae0d651d6 added cambai tts integration 2026-01-16 01:18:36 +08:00
Mark Backman
efd4432cfb Renumber the 07 foundational examples 2026-01-15 10:26:17 -05:00
kompfner
24082b84f2 Merge pull request #3453 from pipecat-ai/pk/consistency-pass-on-user-started-stopped-speaking-frames
Do a consistency pass on how we're sending `UserStartedSpeakingFrame`…
2026-01-15 09:24:14 -05:00
Aleix Conchillo Flaqué
dcd5840341 Merge pull request #3455 from pipecat-ai/aleix/reset-user-turn-start-strategies
UserTurnController: reset user turn start strategies when turn triggered
2026-01-14 19:28:32 -08:00
Aleix Conchillo Flaqué
9e705ce768 UserTurnController: reset user turn start strategies when turn triggered 2026-01-14 18:20:29 -08:00
Mark Backman
965466cc09 Merge pull request #3454 from pipecat-ai/mb/external-turn-strategies-timeout
fix to make on_user_turn_stop_timeout work with ExternalUserTurnStrat…
2026-01-14 20:15:31 -05:00
Mark Backman
f3993f1775 fix to make on_user_turn_stop_timeout work with ExternalUserTurnStrategies 2026-01-14 20:10:56 -05:00
Paul Kompfner
e107902b14 Do a consistency pass on how we're sending UserStartedSpeakingFrames and UserStoppedSpeakingFrames. The codebase is now consistent in broadcasting both types of frames up and downstream. 2026-01-14 18:47:15 -05:00
kompfner
e7b5ff49f4 Merge pull request #3447 from pipecat-ai/pk/add-pr-3420-to-changelog
Add PR 3420 to CHANGELOG (it was missing)
2026-01-14 15:33:44 -05:00
Paul Kompfner
e33172c44e Add PR 3420 to CHANGELOG (it was missing) 2026-01-14 15:33:07 -05:00
Mark Backman
3d858e8aa6 Merge pull request #3444 from pipecat-ai/mb/update-quickstart-0.0.99
Update quickstart example for 0.0.99
2026-01-14 10:29:55 -05:00
Mark Backman
eab059c49a Merge pull request #3446 from pipecat-ai/mb/add-3392-changelog
Add PR 3392 to changelog, linting cleanup
2026-01-14 10:28:57 -05:00
Mark Backman
4aaff04fb3 Add PR 3392 to changelog, linting cleanup 2026-01-14 09:43:17 -05:00
Mark Backman
cb364f3cab Update quickstart example for 0.0.99 2026-01-14 08:59:20 -05:00
Mark Backman
a9bfb090c3 Merge pull request #3287 from ashotbagh/feature/asyncai-multicontext-wss
Fix TTFB metric and add multi-context WebSocket support for Async TTS
2026-01-14 07:52:52 -05:00
Ashot
c4ae4025f3 Adjustments of Async TTS for multicontext websocket support 2026-01-14 16:33:30 +04:00
Ashot
15067c678d adapt Async TTS to updated AudioContextTTSService 2026-01-14 15:45:27 +04:00
Ashot
5ae592f38e Improve Async TTS interruption handling by using AudioContextTTSService class and add changelog fragments 2026-01-14 15:45:27 +04:00
Ashot
9cdbc56be3 Fix TTFB metric and add multi-context WebSocket support for Async TTS 2026-01-14 15:45:27 +04:00
90 changed files with 2237 additions and 420 deletions

5
.gitignore vendored

@@ -51,4 +51,7 @@ docs/api/_build/
docs/api/api
# uv
.python-version
.python-version
# Pipecat
whisker_setup.py


@@ -24,39 +24,40 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
A list of strategies can be specified for both strategies; strategies are
evaluated in order until one evaluates to true.
Available user turn start strategies:
- VADUserTurnStartStrategy
- TranscriptionUserTurnStartStrategy
- MinWordsUserTurnStartStrategy
- ExternalUserTurnStartStrategy
Available user turn start strategies:
Available user turn stop strategies:
- TranscriptionUserTurnStopStrategy
- TurnAnalyzerUserTurnStopStrategy
- ExternalUserTurnStopStrategy
- VADUserTurnStartStrategy
- TranscriptionUserTurnStartStrategy
- MinWordsUserTurnStartStrategy
- ExternalUserTurnStartStrategy
The default strategies are:
Available user turn stop strategies:
- start: [VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy]
- stop: [TranscriptionUserTurnStopStrategy]
- TranscriptionUserTurnStopStrategy
- TurnAnalyzerUserTurnStopStrategy
- ExternalUserTurnStopStrategy
urn strategies are configured when setting up `LLMContextAggregatorPair`.
The default strategies are:
- start: [VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy]
- stop: [TranscriptionUserTurnStopStrategy]
Turn strategies are configured when setting up `LLMContextAggregatorPair`.
For example:
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
)
],
)
),
)
```
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
)
],
)
),
)
```
In order to use the user turn strategies you must update to the new
universal `LLMContext` and `LLMContextAggregatorPair`.
@@ -69,13 +70,13 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
- Added `GrokRealtimeLLMService` for xAI's Grok Voice Agent API with real-time
voice conversations:
- Support for real-time audio streaming with WebSocket connection
- Built-in server-side VAD (Voice Activity Detection)
- Multiple voice options: Ara, Rex, Sal, Eve, Leo
- Built-in tools support: web_search, x_search, file_search
- Custom function calling with standard Pipecat tools schema
- Configurable audio formats (PCM at 8kHz-48kHz)
(PR [#3267](https://github.com/pipecat-ai/pipecat/pull/3267))
- Support for real-time audio streaming with WebSocket connection
- Built-in server-side VAD (Voice Activity Detection)
- Multiple voice options: Ara, Rex, Sal, Eve, Leo
- Built-in tools support: web_search, x_search, file_search
- Custom function calling with standard Pipecat tools schema
- Configurable audio formats (PCM at 8kHz-48kHz)
(PR [#3267](https://github.com/pipecat-ai/pipecat/pull/3267))
- Added an approximation of TTFB for Ultravox.
(PR [#3268](https://github.com/pipecat-ai/pipecat/pull/3268))
@@ -86,11 +87,12 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
(PR [#3289](https://github.com/pipecat-ai/pipecat/pull/3289))
- `LLMUserAggregator` now exposes the following events:
- `on_user_turn_started`: triggered when a user turn starts
- `on_user_turn_stopped`: triggered when a user turn ends
- `on_user_turn_stop_timeout`: triggered when a user turn does not stop
and times out
(PR [#3291](https://github.com/pipecat-ai/pipecat/pull/3291))
- `on_user_turn_started`: triggered when a user turn starts
- `on_user_turn_stopped`: triggered when a user turn ends
- `on_user_turn_stop_timeout`: triggered when a user turn does not stop
and times out
(PR [#3291](https://github.com/pipecat-ai/pipecat/pull/3291))
- Introducing user mute strategies. User mute strategies indicate when user
input should be muted based on the current system state.
@@ -104,12 +106,12 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
frame is muted if any of the configured strategies indicates it should be
muted.
Available user mute strategies:
Available user mute strategies:
* `FirstSpeechUserMuteStrategy`
* `MuteUntilFirstBotCompleteUserMuteStrategy`
* `AlwaysUserMuteStrategy`
* `FunctionCallUserMuteStrategy`
- `FirstSpeechUserMuteStrategy`
- `MuteUntilFirstBotCompleteUserMuteStrategy`
- `AlwaysUserMuteStrategy`
- `FunctionCallUserMuteStrategy`
User mute strategies replace the legacy `STTMuteFilter` and provide a more
flexible and composable approach to muting user input.
@@ -117,16 +119,16 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
User mute strategies are configured when setting up the
`LLMContextAggregatorPair`. For example:
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_mute_strategies=[
FirstSpeechUserMuteStrategy(),
]
),
)
```
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_mute_strategies=[
FirstSpeechUserMuteStrategy(),
]
),
)
```
In order to use user mute strategies you should update to the new universal
`LLMContext` and `LLMContextAggregatorPair`.
@@ -159,16 +161,17 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
(PR [#3357](https://github.com/pipecat-ai/pipecat/pull/3357))
- Added image support to `OpenAIRealtimeLLMService` via `InputImageRawFrame`:
- New `start_video_paused` parameter to control initial video input state
- New `video_frame_detail` parameter to set image processing quality
("auto",
"low", or "high"). This corresponds to OpenAI Realtime's `image_detail`
parameter.
- `set_video_input_paused()` method to pause/resume video input at runtime
- `set_video_frame_detail()` method to adjust video frame quality
dynamically
- Automatic rate limiting (1 frame per second) to prevent API overload
(PR [#3360](https://github.com/pipecat-ai/pipecat/pull/3360))
- New `start_video_paused` parameter to control initial video input state
- New `video_frame_detail` parameter to set image processing quality
("auto",
"low", or "high"). This corresponds to OpenAI Realtime's `image_detail`
parameter.
- `set_video_input_paused()` method to pause/resume video input at runtime
- `set_video_frame_detail()` method to adjust video frame quality
dynamically
- Automatic rate limiting (1 frame per second) to prevent API overload
(PR [#3360](https://github.com/pipecat-ai/pipecat/pull/3360))
- Added `UserTurnProcessor`, a frame processor built on `UserTurnController`
that pushes `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` frames
@@ -188,11 +191,12 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
(PR [#3374](https://github.com/pipecat-ai/pipecat/pull/3374))
- `LLMAssistantAggregator` now exposes the following events:
- `on_assistant_turn_started`: triggered when the assistant turn starts
- `on_assistant_turn_stopped`: triggered when the assistant turn ends
- `on_assistant_thought`: triggered when there's an assistant thought
available
(PR [#3385](https://github.com/pipecat-ai/pipecat/pull/3385))
- `on_assistant_turn_started`: triggered when the assistant turn starts
- `on_assistant_turn_stopped`: triggered when the assistant turn ends
- `on_assistant_thought`: triggered when there's an assistant thought
available
(PR [#3385](https://github.com/pipecat-ai/pipecat/pull/3385))
- Added `KrispVivaTurn` analyzer for end of turn detection using the Krisp VIVA
SDK (requires `krisp_audio`).
@@ -202,13 +206,14 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
register custom pipeline task setup files by setting the
`PIPECAT_SETUP_FILES` environment variable. This variable should contain a
colon-separated list of Python files (e.g. `export
PIPECAT_SETUP_FILES="setup1.py:setup.py:..."`). Each file must define a
PIPECAT_SETUP_FILES="setup1.py:setup.py:..."`). Each file must define a
function with the following signature:
```python
async def setup_pipeline_task(task: PipelineTask):
...
```
```python
async def setup_pipeline_task(task: PipelineTask):
...
```
(PR [#3397](https://github.com/pipecat-ai/pipecat/pull/3397))
- Added a keepalive task for `InworldTTSService` to keep the service connected
@@ -238,12 +243,14 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
- Updated `ElevenLabsRealtimeSTTService` to accept the
`include_language_detection` parameter to detect language.
```python
stt = ElevenLabsRealtimeSTTService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
include_language_detection=True
)
```
```python
stt = ElevenLabsRealtimeSTTService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
include_language_detection=True
)
```
(PR [#3216](https://github.com/pipecat-ai/pipecat/pull/3216))
- Updated `SpeechmaticsSTTService` to use new Python Voice SDK with improved
@@ -251,16 +258,18 @@ turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
without any impact on accuracy. Use the `turn_detection_mode` parameter to control
the endpointing of speech, with `TurnDetectionMode.EXTERNAL` (default),
`TurnDetectionMode.ADAPTIVE`, or `TurnDetectionMode.SMART_TURN`.
```python
```python
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
),
)
```
```
(PR [#3225](https://github.com/pipecat-ai/pipecat/pull/3225))
- `daily-python` updated to 0.23.0.
@@ -273,10 +282,15 @@ turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
- Updates to Inworld TTS services:
- Improved `InworldTTSService`'s websocket implementation to better flush
and close context to better handle long inputs.
- Improved docstrings for `InworldTTSService` and `InworldHttpTTSService`.
(PR [#3288](https://github.com/pipecat-ai/pipecat/pull/3288))
- Improved `InworldTTSService`'s websocket implementation to better flush
and close context to better handle long inputs.
- Improved docstrings for `InworldTTSService` and `InworldHttpTTSService`.
(PR [#3288](https://github.com/pipecat-ai/pipecat/pull/3288))
- Improved the error handling and reconnection logic for `WebsocketServer` by
distinguishing between errors when disconnecting and websocket communication
errors.
(PR [#3392](https://github.com/pipecat-ai/pipecat/pull/3392))
- Updated `DeepgramSTTService` to push user started/stopped speaking and
interruption frames when `vad_enabled` is set to true. This centralizes the
@@ -308,7 +322,8 @@ turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
- Smart Turn now takes into account `vad_start_seconds` when buffering audio,
meaning that the start of the turn audio is not cut off. This improves
accuracy for short utterances.
- The default value of `pre_speech_ms` is now set to 500ms for Smart Turn.
- The default value of `pre_speech_ms` is now set to 500ms for Smart Turn.
(PR [#3377](https://github.com/pipecat-ai/pipecat/pull/3377))
- Improved Krisp SDK management to allow `KrispVivaTurn` and `KrispVivaFilter`
@@ -376,17 +391,18 @@ turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
From the developer's point of view, switching to using `LLMContext`
machinery will usually be a matter of going from this:
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
To this:
To this:
```
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
```
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(PR [#3263](https://github.com/pipecat-ai/pipecat/pull/3263))
- `STTMuteFilter` is deprecated and will be removed in a future version. Use
@@ -401,16 +417,17 @@ turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
`LLMUserAggregator`'s new parameter `user_turn_strategies` instead. For
example, to disable interruptions but still get user turns you can do:
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
start=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
),
),
)
```
```python
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
start=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
),
),
)
```
(PR [#3297](https://github.com/pipecat-ai/pipecat/pull/3297))
- `TranscriptProcessor` and related data classes and frames
@@ -433,7 +450,9 @@ start=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
### Fixed
- Improved error handling in `ElevenLabsRealtimeSTTService`
- Fixed an issue in `ElevenLabsRealtimeSTTService` causing an infinite loop
(PR [#3233](https://github.com/pipecat-ai/pipecat/pull/3233))
- Fixed an issue in `ElevenLabsRealtimeSTTService` causing an infinite loop
that blocks the process if the websocket disconnects due to an error
(PR [#3233](https://github.com/pipecat-ai/pipecat/pull/3233))
@@ -446,13 +465,14 @@ start=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
(PR [#3322](https://github.com/pipecat-ai/pipecat/pull/3322))
- Updated `SpeechmaticsSTTService` for version `0.0.99+`:
- Fixed `SpeechmaticsSTTService` to listen for `VADUserStoppedSpeakingFrame`
in order to finalize transcription.
- Default to `TurnDetectionMode.FIXED` for Pipecat-controlled end of turn
detection.
- Only emit VAD + interruption frames if VAD is enabled within the plugin
(modes other than `TurnDetectionMode.FIXED` or `TurnDetectionMode.EXTERNAL`).
(PR [#3328](https://github.com/pipecat-ai/pipecat/pull/3328))
- Fixed `SpeechmaticsSTTService` to listen for `VADUserStoppedSpeakingFrame`
in order to finalize transcription.
- Default to `TurnDetectionMode.FIXED` for Pipecat-controlled end of turn
detection.
- Only emit VAD + interruption frames if VAD is enabled within the plugin
(modes other than `TurnDetectionMode.FIXED` or `TurnDetectionMode.EXTERNAL`).
(PR [#3328](https://github.com/pipecat-ai/pipecat/pull/3328))
- Fixed an issue with function calling where a handler failing to invoke its
result callback could leave the context stuck in IN_PROGRESS, causing LLM
@@ -481,6 +501,9 @@ start=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
guard was set.
(PR [#3400](https://github.com/pipecat-ai/pipecat/pull/3400))
- Fixed parallel function calling when using Gemini thinking.
(PR [#3420](https://github.com/pipecat-ai/pipecat/pull/3420))
- Fixed an issue in `traced_llm` where `model_name` in OpenTelemetry appears as
`unknown`.
(PR [#3422](https://github.com/pipecat-ai/pipecat/pull/3422))


@@ -73,9 +73,9 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| Category | Services |
| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [Hathora](https://docs.pipecat.ai/server/services/stt/hathora), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hathora](https://docs.pipecat.ai/server/services/tts/hathora), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/server/services/s2s/ultravox) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) |

changelog/3169.added.md Normal file

@@ -0,0 +1 @@
- Added Hathora service to support Hathora-hosted TTS and STT models (only non-streaming)

View File

@@ -0,0 +1 @@
- Enhanced interruption handling in `AsyncAITTSService` by supporting multi-context WebSocket sessions for more robust context management.

changelog/3287.fixed.md Normal file

@@ -0,0 +1 @@
- Corrected TTFB metric calculation in `AsyncAIHttpTTSService`.

changelog/3349.added.md Normal file

@@ -0,0 +1 @@
- Added `CambTTSService`, using Camb.ai's TTS integration with MARS models (mars-flash, mars-pro, mars-instruct) for high-quality text-to-speech synthesis.

changelog/3446.fixed.md Normal file

@@ -0,0 +1,8 @@
- Fixed an issue where the "bot-llm-text" RTVI event would not fire for realtime (speech-to-speech) services:
- `AWSNovaSonicLLMService`
- `GeminiLiveLLMService`
- `OpenAIRealtimeLLMService`
- `GrokRealtimeLLMService`
The issue was that these services weren't pushing `LLMTextFrame`s. Now they do.

changelog/3454.fixed.md Normal file

@@ -0,0 +1 @@
- Fixed an issue where `on_user_turn_stop_timeout` could fire while a user is talking when using `ExternalUserTurnStrategies`.

changelog/3455.fixed.md Normal file

@@ -0,0 +1 @@
- Fixed an issue where user turn start strategies were not being reset after a user turn started, causing incorrect strategy behavior.

changelog/3461.added.md Normal file

@@ -0,0 +1 @@
- Added the `additional_headers` param to `WebsocketClientParams`, allowing `WebsocketClientTransport` to send custom headers on connect, for cases such as authentication.

changelog/3462.fixed.md Normal file

@@ -0,0 +1 @@
- Fixed `MinWordsUserTurnStartStrategy` to not aggregate transcriptions, preventing incorrect turn starts when words are spoken with pauses between them.
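
The difference between the two behaviours can be illustrated with a small standalone sketch. It does not use Pipecat's classes: `should_start_turn` and `AggregatingCheck` are hypothetical names, and the threshold of three words is an assumed value chosen only for the example.

```python
def count_words(text: str) -> int:
    """Count whitespace-separated words in a transcription."""
    return len(text.split())


def should_start_turn(transcription: str, min_words: int = 3) -> bool:
    """Per-transcription check: every transcription is judged on its own."""
    return count_words(transcription) >= min_words


class AggregatingCheck:
    """Hypothetical illustration of the problematic behaviour.

    Words accumulate across transcriptions, so isolated words separated by
    pauses eventually cross the threshold.
    """

    def __init__(self, min_words: int = 3):
        self._min_words = min_words
        self._text = ""

    def should_start_turn(self, transcription: str) -> bool:
        self._text = f"{self._text} {transcription}".strip()
        return count_words(self._text) >= self._min_words


# Three single words spoken with pauses between them.
transcriptions = ["um", "well", "okay"]

aggregating = AggregatingCheck()
aggregated_results = [aggregating.should_start_turn(t) for t in transcriptions]
independent_results = [should_start_turn(t) for t in transcriptions]
```

With the aggregating variant the third isolated word triggers a turn start, while the per-transcription check never does.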

View File

@@ -0,0 +1 @@
- For consistency with other package names, deprecated `pipecat.turns.mute` (introduced in Pipecat 0.0.99) in favor of `pipecat.turns.user_mute`.

changelog/3480.fixed.md Normal file

@@ -0,0 +1 @@
- Fixed an issue where Grok Realtime would error out when running with SmallWebRTC transport.

changelog/3482.added.md Normal file

@@ -0,0 +1 @@
- Added `UserIdleController` for detecting user idle state, integrated into `LLMUserAggregator` and `UserTurnProcessor` via optional `user_idle_timeout` parameter. Emits `on_user_turn_idle` event for application-level handling. Deprecated `UserIdleProcessor` in favor of the new compositional approach.
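
The idea of an idle timeout that restarts on user activity can be sketched with plain `asyncio`. This is only an illustration under stated assumptions: `IdleTimer`, `activity`, and `stop` are hypothetical names, and Pipecat's actual `UserIdleController` may be structured quite differently.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class IdleTimer:
    """Hypothetical idle timer: calls `on_idle` after `timeout` quiet seconds."""

    def __init__(self, timeout: float, on_idle: Callable[[], Awaitable[None]]):
        self._timeout = timeout
        self._on_idle = on_idle
        self._task: Optional[asyncio.Task] = None

    def activity(self) -> None:
        """Restart the countdown. Must be called from a running event loop."""
        self.stop()
        self._task = asyncio.create_task(self._wait())

    def stop(self) -> None:
        """Cancel any pending countdown."""
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _wait(self) -> None:
        await asyncio.sleep(self._timeout)
        # Clear the reference first so the callback can safely restart the timer.
        self._task = None
        await self._on_idle()
```

In an application, the callback would play the role of an `on_user_turn_idle` handler, and `activity()` would be called whenever the user starts a new turn.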

View File

@@ -0,0 +1 @@
- Throttle `UserSpeakingFrame` to broadcast at most every 200ms instead of on every audio chunk, reducing frame processing overhead during user speech.
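
A time-based throttle of this kind can be sketched as follows. The `Throttle` class and its injectable `clock` parameter are hypothetical and exist only for illustration; the 200ms interval is the one stated above, and the real implementation inside Pipecat is not shown in this diff.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Hypothetical helper: allows an action at most once per `min_interval` seconds."""

    def __init__(
        self,
        min_interval: float = 0.2,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._min_interval = min_interval
        self._clock = clock
        self._last: Optional[float] = None

    def allow(self) -> bool:
        """Return True if enough time has passed since the last allowed call."""
        now = self._clock()
        if self._last is None or now - self._last >= self._min_interval:
            self._last = now
            return True
        return False
```

An audio loop would call `allow()` once per chunk and broadcast the frame only when it returns `True`, so the number of frames depends on elapsed time rather than on chunk size.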

changelog/3484.fixed.md Normal file

@@ -0,0 +1 @@
- Fixed a `Mem0MemoryService` issue where passing `async_mode: true` was causing an error. See https://docs.mem0.ai/platform/features/async-mode-default-change.

changelog/3489.fixed.md Normal file

@@ -0,0 +1,3 @@
- Fixed `AzureTTSService` transcript formatting issues:
- Punctuation now appears without extra spaces (e.g., "Hello!" instead of "Hello !")
- CJK languages (Chinese, Japanese, Korean) no longer have unwanted spaces between characters
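
The two spacing rules described above can be illustrated with a standalone token joiner. This is a sketch, not the code in `AzureTTSService`: `join_tokens` and `is_cjk` are hypothetical helpers, and the Unicode ranges below are a simplified assumption covering only the most common CJK blocks.

```python
from typing import List

ASCII_PUNCTUATION = set(".,!?;:")


def is_cjk(ch: str) -> bool:
    """Rough CJK test over a few common Unicode blocks (simplified)."""
    code = ord(ch)
    return (
        0x3000 <= code <= 0x303F  # CJK symbols and punctuation
        or 0x3040 <= code <= 0x30FF  # Hiragana and Katakana
        or 0x4E00 <= code <= 0x9FFF  # CJK Unified Ideographs
        or 0xAC00 <= code <= 0xD7AF  # Hangul syllables
    )


def join_tokens(tokens: List[str]) -> str:
    """Join word tokens without a space before punctuation or between CJK characters."""
    out = ""
    for token in tokens:
        if not token:
            continue
        needs_space = bool(out) and not (
            token[0] in ASCII_PUNCTUATION or (is_cjk(out[-1]) and is_cjk(token[0]))
        )
        out += (" " if needs_space else "") + token
    return out
```

Latin-script words are still separated by a single space, which is why the check requires both neighbouring characters to be CJK before dropping the space.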

changelog/3490.added.md Normal file

@@ -0,0 +1 @@
- Added `on_user_mute_started` and `on_user_mute_stopped` event handlers to `LLMUserAggregator` for tracking user mute state changes.

View File

@@ -91,6 +91,25 @@ autodoc_mock_imports = [
# MLX dependencies (Apple Silicon specific)
"mlx",
"mlx_whisper", # Note: might need underscore format too
# Pydantic v2 compatibility issues in third-party SDKs
"hume",
"hume.tts",
"hume.tts.types",
"cartesia",
"camb",
"sarvamai",
"openpipe",
"openai.types.beta.realtime",
"langchain_core",
"langchain_core.messages",
# FastAPI - Pydantic v2 compatibility issues during Sphinx autodoc
"fastapi",
"fastapi.applications",
"fastapi.routing",
"fastapi.params",
"fastapi.middleware",
"fastapi.responses",
"uvicorn",
]
# HTML output settings

View File

@@ -31,6 +31,9 @@ AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Camb.ai
CAMB_API_KEY=...
# Cartesia
CARTESIA_API_KEY=...
CARTESIA_VOICE_ID=...
@@ -82,6 +85,9 @@ GROK_API_KEY=...
# Groq
GROQ_API_KEY=...
# Hathora
HATHORA_API_KEY=...
# Heygen
HEYGEN_API_KEY=...
HEYGEN_LIVE_AVATAR_API_KEY=...

View File

@@ -0,0 +1,138 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies

load_dotenv(override=True)

# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting Camb AI TTS bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CambTTSService(
        api_key=os.getenv("CAMB_API_KEY"),
        model="mars-flash",
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant powered by Camb AI text-to-speech. "
            "Keep your responses concise and conversational since they will be spoken aloud. "
            "Avoid special characters, emojis, or bullet points.",
        },
    ]

    context = LLMContext(messages)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            user_turn_strategies=UserTurnStrategies(
                stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
            ),
        ),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
            audio_out_sample_rate=22050,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

View File

@@ -1,18 +1,14 @@
#
# Copyright (c) 2024-2026, Daily
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
@@ -25,12 +21,10 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.hathora.stt import HathoraSTTService
from pipecat.services.hathora.tts import HathoraTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -40,15 +34,6 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Add a delay to test interruption during function calls
logger.info("Weather API call starting...")
await asyncio.sleep(5) # 5-second delay
logger.info("Weather API call completed")
await params.result_callback({"conditions": "nice", "temperature": "75"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -74,50 +59,30 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Configure the mute processor with both strategies
stt_mute_processor = STTMuteFilter(
config=STTMuteConfig(
strategies={
STTMuteStrategy.MUTE_UNTIL_FIRST_BOT_COMPLETE,
STTMuteStrategy.FUNCTION_CALL,
}
),
stt = HathoraSTTService(
model="nvidia-parakeet-tdt-0.6b-v3",
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_function("get_current_weather", fetch_weather_from_api)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
tts = HathoraTTSService(
model="hexgrad-kokoro-82m",
)
# See https://models.hathora.dev/model/qwen3-30b-a3b
llm = OpenAILLMService(
base_url="https://app-362f7ca1-6975-4e18-a605-ab202bf2c315.app.hathora.dev/v1",
api_key=os.getenv("HATHORA_API_KEY"),
model=None,
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
"role": "system",
"content": "You are a helpful assistant who can check the weather. Always check the weather when a location is mentioned. Respond concisely and naturally. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages, tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
@@ -129,13 +94,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
stt_mute_processor, # Add the mute processor between STT and context aggregator
user_aggregator, # User responses
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -151,13 +115,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation with a weather-related prompt
messages.append(
{
"role": "system",
"content": "Ask the user what city they'd like to know the weather for.",
}
)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -13,7 +13,12 @@ from loguru import logger
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, LLMRunFrame, TTSSpeakFrame
from pipecat.frames.frames import (
EndTaskFrame,
LLMMessagesAppendFrame,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -22,7 +27,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -36,6 +41,43 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
class IdleHandler:
    """Helper class to manage user idle retry logic."""

    def __init__(self):
        self._retry_count = 0

    def reset(self):
        """Reset the retry count when user becomes active."""
        self._retry_count = 0

    async def handle_idle(self, aggregator):
        """Handle user idle event with escalating prompts."""
        self._retry_count += 1

        if self._retry_count == 1:
            # First attempt: Add a gentle prompt to the conversation
            message = {
                "role": "system",
                "content": "The user has been quiet. Politely and briefly ask if they're still there.",
            }
            await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
        elif self._retry_count == 2:
            # Second attempt: More direct prompt
            message = {
                "role": "system",
                "content": "The user is still inactive. Ask if they'd like to continue our conversation.",
            }
            await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
        else:
            # Third attempt: End the conversation
            await aggregator.push_frame(
                TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
            )
            await aggregator.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -84,42 +126,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
user_turn_strategies=UserTurnStrategies(
stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
),
user_idle_timeout=5.0, # Detect user idle after 5 seconds
),
)
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
message = {
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
message = {
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
else:
# Third attempt: End the conversation
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
return False
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_idle, # Idle user check-in
user_aggregator,
user_aggregator, # User aggregator with built-in idle detection
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
@@ -136,6 +151,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Set up idle handling with retry logic
idle_handler = IdleHandler()
@user_aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
await idle_handler.handle_idle(aggregator)
@user_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy):
idle_handler.reset()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")

View File

@@ -119,7 +119,7 @@ class CompletenessCheck(FrameProcessor):
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("Completeness check NO")

View File

@@ -322,7 +322,7 @@ class CompletenessCheck(FrameProcessor):
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("!!! Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("!!! Completeness check NO")

View File

@@ -451,7 +451,7 @@ class CompletenessCheck(FrameProcessor):
logger.debug("Completeness check YES")
if self._idle_task:
await self.cancel_task(self._idle_task)
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._audio_accumulator.reset()
await self._notifier.notify()
elif isinstance(frame, TextFrame):

View File

@@ -34,7 +34,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.mute import (
from pipecat.turns.user_mute import (
FunctionCallUserMuteStrategy,
MuteUntilFirstBotCompleteUserMuteStrategy,
)
@@ -161,6 +161,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
@user_aggregator.event_handler("on_user_mute_started")
async def on_user_mute_started(aggregator):
logger.info(f"User mute started")
@user_aggregator.event_handler("on_user_mute_stopped")
async def on_user_mute_stopped(aggregator):
logger.info(f"User mute stopped")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025, Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025, Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -44,8 +44,11 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -53,6 +56,10 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.turns.user_stop.turn_analyzer_user_turn_stop_strategy import (
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
logger.info("✅ All components loaded successfully!")
@@ -79,20 +86,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
),
),
)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi, # RTVI processor
stt,
context_aggregator.user(), # User responses
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
assistant_aggregator, # Assistant spoken responses
]
)
@@ -130,13 +144,11 @@ async def bot(runner_args: RunnerArguments):
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(),
),
}

View File

@@ -41,8 +41,11 @@ dependencies = [
]
[project.urls]
Homepage = "https://pipecat.ai"
Documentation = "https://docs.pipecat.ai/"
Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
Issues = "https://github.com/pipecat-ai/pipecat/issues"
Changelog = "https://github.com/pipecat-ai/pipecat/blob/main/CHANGELOG.md"
[project.optional-dependencies]
aic = [ "aic-sdk~=1.2.0" ]
@@ -53,6 +56,7 @@ aws = [ "aioboto3~=15.5.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.2.0; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.44.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
camb = [ "camb-sdk>=1.5.4" ]
cerebras = []
daily = [ "daily-python~=0.23.0" ]
deepgram = [ "deepgram-sdk~=4.7.0", "pipecat-ai[websockets-base]" ]
@@ -96,7 +100,7 @@ qwen = []
remote-smart-turn = []
rime = [ "pipecat-ai[websockets-base]" ]
riva = [ "pipecat-ai[nvidia]" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.122.0", "pipecat-ai-small-webrtc-prebuilt>=2.0.4"]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.128.0", "pipecat-ai-small-webrtc-prebuilt>=2.0.4"]
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
sambanova = []
sarvam = [ "sarvamai==0.1.21", "pipecat-ai[websockets-base]" ]
@@ -112,7 +116,7 @@ together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "pipecat-ai[websockets-base]" ]
webrtc = [ "aiortc>=1.14.0,<2", "opencv-python>=4.11.0.86,<5" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.122.0" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.128.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]

View File

@@ -97,15 +97,6 @@ TESTS_07 = [
("07-interruptible-cartesia-http.py", EVAL_SIMPLE_MATH),
("07a-interruptible-speechmatics.py", EVAL_SIMPLE_MATH),
("07a-interruptible-speechmatics-vad.py", EVAL_SIMPLE_MATH),
("07aa-interruptible-soniox.py", EVAL_SIMPLE_MATH),
("07ab-interruptible-inworld.py", EVAL_SIMPLE_MATH),
("07ab-interruptible-inworld-http.py", EVAL_SIMPLE_MATH),
("07ac-interruptible-asyncai.py", EVAL_SIMPLE_MATH),
("07ac-interruptible-asyncai-http.py", EVAL_SIMPLE_MATH),
# Need license key to run
# ("07ad-interruptible-aicoustics.py", EVAL_SIMPLE_MATH),
("07ae-interruptible-hume.py", EVAL_SIMPLE_MATH),
("07af-interruptible-gradium.py", EVAL_SIMPLE_MATH),
("07b-interruptible-langchain.py", EVAL_SIMPLE_MATH),
("07c-interruptible-deepgram.py", EVAL_SIMPLE_MATH),
("07c-interruptible-deepgram-flux.py", EVAL_SIMPLE_MATH),
@@ -137,6 +128,16 @@ TESTS_07 = [
("07y-interruptible-minimax.py", EVAL_SIMPLE_MATH),
("07z-interruptible-sarvam.py", EVAL_SIMPLE_MATH),
("07z-interruptible-sarvam-http.py", EVAL_SIMPLE_MATH),
("07za-interruptible-soniox.py", EVAL_SIMPLE_MATH),
("07zb-interruptible-inworld.py", EVAL_SIMPLE_MATH),
("07zb-interruptible-inworld-http.py", EVAL_SIMPLE_MATH),
("07zc-interruptible-asyncai.py", EVAL_SIMPLE_MATH),
("07zc-interruptible-asyncai-http.py", EVAL_SIMPLE_MATH),
# Need license key to run
# ("07zd-interruptible-aicoustics.py", EVAL_SIMPLE_MATH),
("07ze-interruptible-hume.py", EVAL_SIMPLE_MATH),
("07zf-interruptible-gradium.py", EVAL_SIMPLE_MATH),
("07zh-interruptible-hathora.py", EVAL_SIMPLE_MATH),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
# Needs a Krisp license.


@@ -61,6 +61,7 @@ class KrispFilter(BaseAudioFilter):
Provides real-time noise reduction for audio streams using Krisp's
proprietary noise suppression algorithms. Requires a Krisp model file
for operation.
.. deprecated:: 0.0.94
The KrispFilter is deprecated and will be removed in a future version.
Use KrispVivaFilter instead.


@@ -1024,10 +1024,8 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
logger.debug(
f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
)
if frame.tool_call_id not in self._function_calls_in_progress:
return
if self._function_calls_in_progress[frame.tool_call_id].cancel_on_interruption:
function_call = self._function_calls_in_progress.get(frame.tool_call_id)
if function_call and function_call.cancel_on_interruption:
await self.handle_function_call_cancel(frame)
del self._function_calls_in_progress[frame.tool_call_id]
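
The hunk above swaps a membership test plus indexing for a single `dict.get` lookup, so a type checker can narrow the optional value before its attribute is read. Below is a minimal standalone sketch of that pattern. `FunctionCallInProgress`, `calls`, and `should_cancel` are hypothetical stand-ins for illustration, not pipecat names.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FunctionCallInProgress:
    """Hypothetical stand-in for an in-progress function call record."""

    cancel_on_interruption: bool


def should_cancel(
    calls: Dict[str, Optional[FunctionCallInProgress]], tool_call_id: str
) -> bool:
    # .get() returns None for a missing key (or a stored None), so a single
    # truthiness check covers both cases and narrows the Optional type.
    call = calls.get(tool_call_id)
    return bool(call and call.cancel_on_interruption)


calls = {
    "a": FunctionCallInProgress(cancel_on_interruption=True),
    "b": FunctionCallInProgress(cancel_on_interruption=False),
    "c": None,
}
print(should_cancel(calls, "a"), should_cancel(calls, "missing"))
```

The same check also guards against entries whose stored value is `None`, which a plain `in` test would let through.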


@@ -62,7 +62,8 @@ from pipecat.processors.aggregators.llm_context import (
NotGiven,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.turns.mute import BaseUserMuteStrategy
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_controller import UserTurnController
@@ -80,11 +81,16 @@ class LLMUserAggregatorParams:
user_mute_strategies: List of user mute strategies.
user_turn_stop_timeout: Time in seconds to wait before considering the
user's turn finished.
user_idle_timeout: Optional timeout in seconds for detecting user idle state.
If set, the aggregator will emit an `on_user_turn_idle` event when the user
has been idle (not speaking) for this duration. Set to None to disable
idle detection.
"""
user_turn_strategies: Optional[UserTurnStrategies] = None
user_mute_strategies: List[BaseUserMuteStrategy] = field(default_factory=list)
user_turn_stop_timeout: float = 5.0
user_idle_timeout: Optional[float] = None
@dataclass
@@ -291,11 +297,14 @@ class LLMUserAggregator(LLMContextAggregator):
- on_user_turn_started: Called when the user turn starts
- on_user_turn_stopped: Called when the user turn ends
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
- on_user_turn_idle: Called when the user has been idle for the configured timeout
- on_user_mute_started: Called when the user becomes muted
- on_user_mute_stopped: Called when the user becomes unmuted
Example::
@aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy]):
async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy):
...
@aggregator.event_handler("on_user_turn_stopped")
@@ -306,6 +315,18 @@ class LLMUserAggregator(LLMContextAggregator):
async def on_user_turn_stop_timeout(aggregator):
...
@aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
...
@aggregator.event_handler("on_user_mute_started")
async def on_user_mute_started(aggregator):
...
@aggregator.event_handler("on_user_mute_stopped")
async def on_user_mute_stopped(aggregator):
...
"""
def __init__(
@@ -328,6 +349,9 @@ class LLMUserAggregator(LLMContextAggregator):
self._register_event_handler("on_user_turn_started")
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_mute_started")
self._register_event_handler("on_user_mute_stopped")
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
@@ -350,6 +374,16 @@ class LLMUserAggregator(LLMContextAggregator):
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
)
# Optional user idle controller
self._user_idle_controller: Optional[UserIdleController] = None
if self._params.user_idle_timeout:
self._user_idle_controller = UserIdleController(
user_idle_timeout=self._params.user_idle_timeout
)
self._user_idle_controller.add_event_handler(
"on_user_turn_idle", self._on_user_turn_idle
)
async def cleanup(self):
"""Clean up processor resources."""
await super().cleanup()
@@ -405,6 +439,9 @@ class LLMUserAggregator(LLMContextAggregator):
await self._user_turn_controller.process_frame(frame)
if self._user_idle_controller:
await self._user_idle_controller.process_frame(frame)
async def push_aggregation(self) -> str:
"""Push the current aggregation."""
if len(self._aggregation) == 0:
@@ -420,6 +457,9 @@ class LLMUserAggregator(LLMContextAggregator):
async def _start(self, frame: StartFrame):
await self._user_turn_controller.setup(self.task_manager)
if self._user_idle_controller:
await self._user_idle_controller.setup(self.task_manager)
for s in self._params.user_mute_strategies:
await s.setup(self.task_manager)
@@ -432,6 +472,9 @@ class LLMUserAggregator(LLMContextAggregator):
async def _cleanup(self):
await self._user_turn_controller.cleanup()
if self._user_idle_controller:
await self._user_idle_controller.cleanup()
for s in self._params.user_mute_strategies:
await s.cleanup()
@@ -461,6 +504,12 @@ class LLMUserAggregator(LLMContextAggregator):
logger.debug(f"{self}: user is now {'muted' if should_mute_next_time else 'unmuted'}")
self._user_is_muted = should_mute_next_time
# Emit mute state change events
if self._user_is_muted:
await self._call_event_handler("on_user_mute_started")
else:
await self._call_event_handler("on_user_mute_stopped")
return should_mute_frame
async def _handle_llm_run(self, frame: LLMRunFrame):
@@ -565,6 +614,9 @@ class LLMUserAggregator(LLMContextAggregator):
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")
class LLMAssistantAggregator(LLMContextAggregator):
"""Assistant LLM aggregator that processes bot responses and function calls.
@@ -858,10 +910,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
logger.debug(
f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
)
if frame.tool_call_id not in self._function_calls_in_progress:
return
if self._function_calls_in_progress[frame.tool_call_id].cancel_on_interruption:
function_call = self._function_calls_in_progress.get(frame.tool_call_id)
if function_call and function_call.cancel_on_interruption:
# Update context with the function call cancellation
self._update_function_call_result(frame.function_name, frame.tool_call_id, "CANCELLED")
del self._function_calls_in_progress[frame.tool_call_id]
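
The `user_idle_timeout` parameter added to `LLMUserAggregatorParams` above is described as firing `on_user_turn_idle` after a period without user speech. The sketch below illustrates that general idea with plain `asyncio`. It is not the `UserIdleController` implementation, which is not shown in this diff; `IdleTimer`, `activity`, and `demo` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class IdleTimer:
    """Hypothetical idle detector: calls `on_idle` after `timeout` quiet seconds."""

    def __init__(self, timeout: float, on_idle: Callable[[], Awaitable[None]]):
        self._timeout = timeout
        self._on_idle = on_idle
        self._task: Optional[asyncio.Task] = None

    def activity(self) -> None:
        # Any activity cancels the pending countdown and starts a new one.
        if self._task:
            self._task.cancel()
        self._task = asyncio.get_running_loop().create_task(self._wait())

    async def _wait(self) -> None:
        await asyncio.sleep(self._timeout)
        await self._on_idle()

    def stop(self) -> None:
        if self._task:
            self._task.cancel()
            self._task = None


async def demo() -> List[str]:
    events: List[str] = []

    async def on_idle() -> None:
        events.append("idle")

    timer = IdleTimer(0.05, on_idle)
    timer.activity()
    await asyncio.sleep(0.02)
    timer.activity()  # restart before the timeout elapses
    await asyncio.sleep(0.1)  # long enough for the idle callback to fire once
    timer.stop()
    return events


events = asyncio.run(demo())
print(events)
```

A production controller would likely also account for bot speech and for whether a conversation has started, which this sketch deliberately leaves out.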


@@ -8,6 +8,7 @@
import asyncio
import inspect
import warnings
from typing import Awaitable, Callable, Union
from pipecat.frames.frames import (
@@ -26,6 +27,10 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class UserIdleProcessor(FrameProcessor):
"""Monitors user inactivity and triggers callbacks after timeout periods.
.. deprecated::
UserIdleProcessor is deprecated in 0.0.100 and will be removed in a future version.
Use LLMUserAggregator with user_idle_timeout parameter instead.
This processor tracks user activity and triggers configurable callbacks when
users become idle. It starts monitoring only after the first conversation
activity and supports both basic and retry-based callback patterns.
@@ -70,6 +75,14 @@ class UserIdleProcessor(FrameProcessor):
**kwargs: Additional arguments passed to FrameProcessor.
"""
super().__init__(**kwargs)
warnings.warn(
"UserIdleProcessor is deprecated in 0.0.100 and will be removed in a "
"future version. Use LLMUserAggregator with user_idle_timeout parameter "
"instead.",
DeprecationWarning,
)
self._callback = self._wrap_callback(callback)
self._timeout = timeout
self._retry_count = 0
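
The constructor above emits a `DeprecationWarning` when the class is instantiated. A minimal standalone sketch of that pattern, and of how a caller or test can observe it, follows. `OldProcessor` is a hypothetical class, and the `stacklevel=2` argument is an optional addition that is not part of the diff above; it attributes the warning to the caller rather than to `__init__`.

```python
import warnings


class OldProcessor:
    """Hypothetical deprecated class, used only for illustration."""

    def __init__(self) -> None:
        warnings.warn(
            "OldProcessor is deprecated; use NewProcessor instead.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the code that instantiates the class
        )


# Record warnings so they can be inspected instead of printed to stderr.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    OldProcessor()

print(caught[0].category.__name__, "-", caught[0].message)
```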


@@ -34,8 +34,7 @@ class VonageFrameSerializer(FrameSerializer):
WebSocket streaming protocol.
Note:
Ref docs:
https://developer.vonage.com/en/video/guides/audio-connector
Ref docs: https://developer.vonage.com/en/video/guides/audio-connector
"""
class InputParams(BaseModel):


@@ -9,6 +9,7 @@
import asyncio
import base64
import json
import uuid
from typing import AsyncGenerator, Optional
import aiohttp
@@ -27,7 +28,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.services.tts_service import AudioContextTTSService, TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -72,7 +73,7 @@ def language_to_async_language(language: Language) -> Optional[str]:
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
class AsyncAITTSService(InterruptibleTTSService):
class AsyncAITTSService(AudioContextTTSService):
"""Async TTS service with WebSocket streaming.
Provides text-to-speech using Async's streaming WebSocket API.
@@ -148,6 +149,7 @@ class AsyncAITTSService(InterruptibleTTSService):
self._receive_task = None
self._keepalive_task = None
self._started = False
self._context_id = None
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -168,8 +170,8 @@ class AsyncAITTSService(InterruptibleTTSService):
"""
return language_to_async_language(language)
def _build_msg(self, text: str = "", force: bool = False) -> str:
msg = {"transcript": text, "force": force}
def _build_msg(self, text: str = "", context_id: str = "", force: bool = False) -> str:
msg = {"transcript": text, "context_id": context_id, "force": force}
return json.dumps(msg)
async def start(self, frame: StartFrame):
@@ -253,11 +255,16 @@ class AsyncAITTSService(InterruptibleTTSService):
if self._websocket:
logger.debug("Disconnecting from Async")
# Close all contexts and the socket
if self._context_id:
await self._websocket.send(json.dumps({"terminate": True}))
await self._websocket.close()
logger.debug("Disconnected from Async")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._websocket = None
self._context_id = None
self._started = False
await self._call_event_handler("on_disconnected")
@@ -268,10 +275,10 @@ class AsyncAITTSService(InterruptibleTTSService):
async def flush_audio(self):
"""Flush any pending audio."""
if not self._websocket:
if not self._context_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio")
msg = self._build_msg(text=" ", force=True)
msg = self._build_msg(text=" ", context_id=self._context_id, force=True)
await self._websocket.send(msg)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -291,35 +298,75 @@ class AsyncAITTSService(InterruptibleTTSService):
if not msg:
continue
elif msg.get("audio"):
received_ctx_id = msg.get("context_id")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("final") is True:
logger.trace(f"Received final message for context {received_ctx_id}")
continue
# Check if this message belongs to the current context.
if not self.audio_context_available(received_ctx_id):
if self._context_id == received_ctx_id:
logger.debug(
f"Received a delayed message, recreating the context: {self._context_id}"
)
await self.create_audio_context(self._context_id)
else:
# This can happen if a message is received _after_ we have closed a context
# due to user interruption but _before_ the `isFinal` message for the context
# is received.
logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
continue
if msg.get("audio"):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["audio"]),
sample_rate=self.sample_rate,
num_channels=1,
)
await self.push_frame(frame)
elif msg.get("error_code"):
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg['message']}")
else:
await self.push_error(error_msg=f"Unknown message type: {msg}")
audio = base64.b64decode(msg["audio"])
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
await self.append_to_audio_context(received_ctx_id, frame)
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 3
KEEPALIVE_SLEEP = 10
while True:
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:
keepalive_message = {"transcript": " "}
logger.trace("Sending keepalive message")
if self._context_id:
keepalive_message = {
"transcript": " ",
"context_id": self._context_id,
}
logger.trace("Sending keepalive message")
else:
# It's possible to have a user interruption which clears the context
# without generating a new TTS response. In this case, we'll just send
# an empty message to keep the connection alive.
keepalive_message = {"transcript": " "}
logger.trace("Sending keepalive without context")
await self._websocket.send(json.dumps(keepalive_message))
except websockets.ConnectionClosed as e:
logger.warning(f"{self} keepalive error: {e}")
break
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by closing the current context."""
await super()._handle_interruption(frame, direction)
# Close the current context when interrupted without closing the websocket
if self._context_id and self._websocket:
try:
await self._websocket.send(
json.dumps(
{"context_id": self._context_id, "close_context": True, "transcript": ""}
)
)
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
self._started = False
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Async API websocket endpoint.
@@ -336,21 +383,29 @@ class AsyncAITTSService(InterruptibleTTSService):
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
if not self._started:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
self._started = True
msg = self._build_msg(text=text, force=True)
try:
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
if not self._started:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
self._started = True
if not self._context_id:
self._context_id = str(uuid.uuid4())
if not self.audio_context_available(self._context_id):
await self.create_audio_context(self._context_id)
msg = self._build_msg(text=text, force=True, context_id=self._context_id)
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
else:
if self._websocket and self._context_id:
msg = self._build_msg(text=text, force=True, context_id=self._context_id)
await self._get_websocket().send(msg)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
self._started = False
return
yield None
except Exception as e:
@@ -490,7 +545,14 @@ class AsyncAIHttpTTSService(TTSService):
await self.push_error(error_msg=f"Async API error: {error_text}")
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
# Read streaming bytes; stop TTFB on the *first* received chunk
buffer = bytearray()
async for chunk in response.content.iter_chunked(64 * 1024):
if not chunk:
continue
await self.stop_ttfb_metrics()
buffer.extend(chunk)
audio_data = bytes(buffer)
await self.start_tts_usage_metrics(text)
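
The WebSocket changes above tag every outgoing message with a `context_id` and drop incoming audio whose context was closed by an interruption. The sketch below shows that routing idea in isolation, using only the standard library. `build_msg` mirrors the message shape from the diff, while `ContextRouter` and its methods are hypothetical and do not reproduce `AudioContextTTSService`.

```python
import json
import uuid
from typing import Dict, List, Optional, Set


def build_msg(text: str = "", context_id: str = "", force: bool = False) -> str:
    # Same three fields as the message built in the diff above.
    return json.dumps({"transcript": text, "context_id": context_id, "force": force})


class ContextRouter:
    """Hypothetical receiver that keeps audio only for contexts that are still open."""

    def __init__(self) -> None:
        self._open: Set[str] = set()
        self.audio: Dict[str, List[str]] = {}

    def open(self) -> str:
        context_id = str(uuid.uuid4())
        self._open.add(context_id)
        self.audio[context_id] = []
        return context_id

    def close(self, context_id: str) -> None:
        # Called on interruption; chunks arriving later for this context are ignored.
        self._open.discard(context_id)

    def handle(self, raw: str) -> bool:
        msg = json.loads(raw)
        context_id: Optional[str] = msg.get("context_id")
        if context_id not in self._open or not msg.get("audio"):
            return False
        self.audio[context_id].append(msg["audio"])
        return True


router = ContextRouter()
first = router.open()
accepted = router.handle(json.dumps({"context_id": first, "audio": "AAAA"}))
router.close(first)
late = router.handle(json.dumps({"context_id": first, "audio": "BBBB"}))
print(accepted, late)
```

Keeping the socket open and closing only the context lets the next response start with a new identifier without paying for a reconnect.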


@@ -38,6 +38,7 @@ from pipecat.frames.frames import (
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
@@ -1077,9 +1078,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report the text of the assistant response.
frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_assistant_response_text_frames(text)
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
@@ -1112,11 +1111,7 @@ class AWSNovaSonicLLMService(LLMService):
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(
self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE
)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_assistant_response_text_frames(self._assistant_text_buffer)
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
@@ -1128,6 +1123,25 @@ class AWSNovaSonicLLMService(LLMService):
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
async def _push_assistant_response_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Nova Sonic combines both LLM and TTS functionality, you
# would think we wouldn't need to push LLMTextFrames at all. However,
# RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
#
# user transcription reporting
#
@@ -1187,7 +1201,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
)
await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStartedSpeakingFrame)
# Send the transcription upstream for the user context aggregator
frame = TranscriptionFrame(
@@ -1197,7 +1211,7 @@ class AWSNovaSonicLLMService(LLMService):
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStoppedSpeakingFrame)
# Clear out the buffered user text
self._user_text_buffer = ""


@@ -277,6 +277,8 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
self._started = False
self._first_chunk = True
self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds
self._last_word: Optional[str] = None # Track last word for punctuation merging
self._last_timestamp: Optional[float] = None # Track last timestamp
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -346,9 +348,34 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
await self.cancel_task(self._word_processor_task)
self._word_processor_task = None
def _is_cjk_language(self) -> bool:
"""Check if the configured language is CJK (Chinese, Japanese, Korean).
Returns:
True if the language is CJK, False otherwise.
"""
language = self._settings.get("language", "").lower()
# Check if language starts with CJK language codes
return language.startswith(("zh", "ja", "ko", "cmn", "yue", "wuu"))
def _is_punctuation_only(self, text: str) -> bool:
"""Check if text consists only of punctuation and whitespace.
Args:
text: Text to check.
Returns:
True if text is only punctuation/whitespace, False otherwise.
"""
return bool(text) and all(not c.isalnum() for c in text)
def _handle_word_boundary(self, evt):
"""Handle word boundary events from Azure SDK.
Azure sends punctuation as separate word boundaries, and breaks CJK text
into individual characters/particles. This method routes to language-specific
handlers to properly merge and emit word boundaries.
Args:
evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK
containing word text and audio offset timing.
@@ -362,13 +389,75 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
# Add cumulative offset to get absolute timestamp across sentences
absolute_seconds = self._cumulative_audio_offset + sentence_relative_seconds
# Queue word timestamp for async processing
# Use thread-safe queue since this is called from Azure SDK thread
if word:
logger.trace(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s")
# Put in temporary queue - will be processed by async task
# Store as (word, timestamp_in_seconds) tuple
self._word_boundary_queue.put_nowait((word, absolute_seconds))
if not word:
return
# Route to language-specific handler
if self._is_cjk_language():
self._handle_cjk_word_boundary(word, absolute_seconds)
else:
self._handle_non_cjk_word_boundary(word, absolute_seconds)
def _emit_pending_word(self):
"""Emit the currently buffered word if one exists."""
if self._last_word is not None:
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
self._last_word = None
self._last_timestamp = None
def _handle_cjk_word_boundary(self, word: str, timestamp: float):
"""Handle word boundaries for CJK languages (Chinese, Japanese, Korean).
CJK languages don't use spaces between words, so we merge characters together
and only emit at natural break points (punctuation or whitespace boundaries).
Without this logic, we don't get word output for CJK languages.
Args:
word: The word/character from Azure.
timestamp: Timestamp in seconds.
"""
# First word: just store it
if self._last_word is None:
self._last_word = word
self._last_timestamp = timestamp
return
# Punctuation: merge and emit (natural break)
if self._is_punctuation_only(word):
self._last_word += word
self._emit_pending_word()
return
# Whitespace: emit before boundary, start new segment
if word.strip() != word:
self._emit_pending_word()
self._last_word = word
self._last_timestamp = timestamp
return
# Default: continue merging CJK characters
self._last_word += word
def _handle_non_cjk_word_boundary(self, word: str, timestamp: float):
"""Handle word boundaries for non-CJK languages.
Non-CJK languages use spaces between words, so we emit each word separately
after merging any trailing punctuation.
Args:
word: The word from Azure.
timestamp: Timestamp in seconds.
"""
# Punctuation: merge with previous word (don't emit yet)
if self._is_punctuation_only(word) and self._last_word is not None:
self._last_word += word
return
# Regular word: emit previous, store current
if self._last_word is not None:
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
self._last_word = word
self._last_timestamp = timestamp
async def _word_processor_task_handler(self):
"""Process word timestamps from the queue and call add_word_timestamps."""
@@ -397,6 +486,12 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
Args:
evt: Completion event from Azure Speech SDK.
"""
# Flush any pending word before completing
if self._last_word is not None:
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
self._last_word = None
self._last_timestamp = None
# Update cumulative audio offset for next sentence
if evt.result and evt.result.audio_duration:
self._cumulative_audio_offset += evt.result.audio_duration.total_seconds()
@@ -435,6 +530,8 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
self._started = False
self._first_chunk = True
self._cumulative_audio_offset = 0.0
self._last_word = None
self._last_timestamp = None
async def flush_audio(self):
"""Flush any pending audio data."""

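The non-CJK handler above buffers one word at a time so that punctuation reported as its own boundary is attached to the preceding word, and the buffered word is flushed when synthesis completes. The sketch below restates that logic as a pure function over a list of `(word, timestamp)` pairs. `merge_punctuation` is a hypothetical helper written for illustration; it uses no Azure SDK calls.

```python
from typing import Iterable, List, Optional, Tuple


def is_punctuation_only(text: str) -> bool:
    # True only for non-empty text that contains no letters or digits.
    return bool(text) and all(not c.isalnum() for c in text)


def merge_punctuation(
    boundaries: Iterable[Tuple[str, float]],
) -> List[Tuple[str, Optional[float]]]:
    """Attach punctuation-only tokens to the previous word, keeping that word's timestamp."""
    merged: List[Tuple[str, Optional[float]]] = []
    last_word: Optional[str] = None
    last_ts: Optional[float] = None
    for word, ts in boundaries:
        if not word:
            continue
        if is_punctuation_only(word) and last_word is not None:
            last_word += word  # merge, but do not emit yet
            continue
        if last_word is not None:
            merged.append((last_word, last_ts))  # emit the previous word
        last_word, last_ts = word, ts
    if last_word is not None:
        merged.append((last_word, last_ts))  # final flush, as on synthesis completion
    return merged


result = merge_punctuation([("Hello", 0.0), (",", 0.4), ("world", 0.5), ("!", 0.9)])
print(result)  # [('Hello,', 0.0), ('world!', 0.5)]
```

Because each word is emitted only when the next one arrives, the final flush is what prevents the last word of a sentence from being lost.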

@@ -0,0 +1,5 @@
#
# Copyright (c) 2024–2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#


@@ -0,0 +1,323 @@
#
# Copyright (c) 2024–2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Camb.ai MARS text-to-speech service implementation.
This module provides TTS functionality using Camb.ai's MARS model family,
offering high-quality text-to-speech synthesis with streaming support.
Features:
- MARS models: mars-flash (fast), mars-pro (high quality)
- 140+ languages supported
- Real-time streaming via official SDK
- Model-specific sample rates: mars-pro (48kHz), mars-flash (22.05kHz)
"""
from typing import Any, AsyncGenerator, Dict, Optional
from camb import StreamTtsOutputConfiguration
from camb.client import AsyncCambAI
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
# Model-specific sample rates
MODEL_SAMPLE_RATES: Dict[str, int] = {
"mars-flash": 22050, # 22.05kHz
"mars-pro": 48000, # 48kHz
"mars-instruct": 22050, # 22.05kHz
}
def language_to_camb_language(language: Language) -> Optional[str]:
"""Convert a Pipecat Language enum to Camb.ai language code.
Args:
language: The Language enum value to convert.
Returns:
The corresponding Camb.ai language code (BCP-47 format), or None if not supported.
"""
LANGUAGE_MAP = {
Language.EN: "en-us",
Language.EN_US: "en-us",
Language.EN_GB: "en-gb",
Language.EN_AU: "en-au",
Language.ES: "es-es",
Language.ES_ES: "es-es",
Language.ES_MX: "es-mx",
Language.FR: "fr-fr",
Language.FR_FR: "fr-fr",
Language.FR_CA: "fr-ca",
Language.DE: "de-de",
Language.DE_DE: "de-de",
Language.IT: "it-it",
Language.PT: "pt-pt",
Language.PT_BR: "pt-br",
Language.PT_PT: "pt-pt",
Language.NL: "nl-nl",
Language.PL: "pl-pl",
Language.RU: "ru-ru",
Language.JA: "ja-jp",
Language.KO: "ko-kr",
Language.ZH: "zh-cn",
Language.ZH_CN: "zh-cn",
Language.ZH_TW: "zh-tw",
Language.AR: "ar-sa",
Language.HI: "hi-in",
Language.TR: "tr-tr",
Language.VI: "vi-vn",
Language.TH: "th-th",
Language.ID: "id-id",
Language.MS: "ms-my",
Language.SV: "sv-se",
Language.DA: "da-dk",
Language.NO: "no-no",
Language.FI: "fi-fi",
Language.CS: "cs-cz",
Language.EL: "el-gr",
Language.HE: "he-il",
Language.HU: "hu-hu",
Language.RO: "ro-ro",
Language.SK: "sk-sk",
Language.UK: "uk-ua",
Language.BG: "bg-bg",
Language.HR: "hr-hr",
Language.SR: "sr-rs",
Language.SL: "sl-si",
Language.CA: "ca-es",
Language.EU: "eu-es",
Language.GL: "gl-es",
Language.AF: "af-za",
Language.SW: "sw-ke",
Language.TA: "ta-in",
Language.TE: "te-in",
Language.BN: "bn-in",
Language.MR: "mr-in",
Language.GU: "gu-in",
Language.KN: "kn-in",
Language.ML: "ml-in",
Language.PA: "pa-in",
Language.UR: "ur-pk",
Language.FA: "fa-ir",
Language.TL: "tl-ph",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
def _get_aligned_audio(buffer: bytes) -> tuple[bytes, bytes]:
"""Split buffer into aligned audio (2-byte samples) and remainder.
Args:
buffer: Raw audio bytes to align.
Returns:
Tuple of (aligned audio bytes, remaining bytes).
"""
aligned_size = (len(buffer) // 2) * 2
return buffer[:aligned_size], buffer[aligned_size:]
class CambTTSService(TTSService):
"""Camb.ai MARS text-to-speech service using the official SDK.
Converts text to speech using Camb.ai's MARS TTS models with support for
multiple languages.
Models:
- mars-flash: Fast inference, 22.05kHz output (default)
- mars-pro: High quality, 48kHz output
Example::
# Basic usage with mars-flash (fast)
tts = CambTTSService(api_key="your-api-key", model="mars-flash")
# High quality with mars-pro
tts = CambTTSService(
api_key="your-api-key",
voice_id=12345,
model="mars-pro",
)
"""
class InputParams(BaseModel):
"""Input parameters for Camb.ai TTS configuration.
Parameters:
language: Language for synthesis (BCP-47 format). Defaults to English.
user_instructions: Custom instructions for mars-instruct model only.
Ignored for other models. Max 1000 characters.
"""
language: Optional[Language] = Language.EN
user_instructions: Optional[str] = Field(
default=None,
max_length=1000,
description="Custom instructions for mars-instruct model only. "
"Use to control tone, style, or pronunciation. Max 1000 characters.",
)
def __init__(
self,
*,
api_key: str,
voice_id: int = 147320,
model: str = "mars-flash",
timeout: float = 60.0,
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Camb.ai TTS service.
Args:
api_key: Camb.ai API key for authentication.
voice_id: Voice ID to use. Defaults to 147320.
model: TTS model to use. Options: "mars-flash" (fast), "mars-pro" (high quality).
Defaults to "mars-flash".
timeout: Request timeout in seconds. Defaults to 60.0 (minimum recommended
by Camb.ai).
sample_rate: Audio sample rate in Hz. If None, uses model-specific default.
params: Additional voice parameters. If None, uses defaults.
**kwargs: Additional arguments passed to parent TTSService.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
params = params or CambTTSService.InputParams()
self._client = AsyncCambAI(api_key=api_key, timeout=timeout)
# Warn if sample rate doesn't match model's supported rate
if sample_rate and sample_rate != MODEL_SAMPLE_RATES.get(model):
logger.warning(
f"Camb.ai's {model} model only supports {MODEL_SAMPLE_RATES.get(model)}Hz "
f"sample rate. Current rate of {sample_rate}Hz may cause issues."
)
# Build settings
self._settings = {
"language": (
self.language_to_service_language(params.language) if params.language else "en-us"
),
"user_instructions": params.user_instructions,
}
self.set_model_name(model)
self.set_voice(str(voice_id))
self._voice_id = voice_id
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Camb.ai service supports metrics generation.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Camb.ai language format.
Args:
language: The language to convert.
Returns:
The Camb.ai-specific language code, or None if not supported.
"""
return language_to_camb_language(language)
async def start(self, frame: StartFrame):
"""Start the Camb.ai TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
# Use model-specific sample rate if not explicitly specified
if not self._init_sample_rate:
self._sample_rate = MODEL_SAMPLE_RATES.get(self.model_name, 22050)
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Camb.ai's TTS API.
Args:
text: The text to synthesize into speech (max 3000 characters).
Yields:
Frame: Audio frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
# Validate text length
if len(text) > 3000:
logger.warning("Text too long for Camb.ai TTS (max 3000 chars), truncating")
text = text[:3000]
try:
await self.start_ttfb_metrics()
# Build SDK parameters
tts_kwargs: Dict[str, Any] = {
"text": text,
"voice_id": self._voice_id,
"language": self._settings["language"],
"speech_model": self.model_name,
"output_configuration": StreamTtsOutputConfiguration(format="pcm_s16le"),
}
# Add user instructions if using mars-instruct model
if self._model_name == "mars-instruct" and self._settings.get("user_instructions"):
tts_kwargs["user_instructions"] = self._settings["user_instructions"]
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
# Buffer for aligning chunks to 2-byte boundaries (16-bit PCM)
audio_buffer = b""
# Stream audio chunks from SDK
async for chunk in self._client.text_to_speech.tts(**tts_kwargs):
if chunk:
await self.stop_ttfb_metrics()
audio_buffer += chunk
# Only yield complete 16-bit samples (2 bytes per sample)
aligned_audio, audio_buffer = _get_aligned_audio(audio_buffer)
if aligned_audio:
yield TTSAudioRawFrame(
audio=aligned_audio,
sample_rate=self.sample_rate,
num_channels=1,
)
# Yield any remaining complete samples
if len(audio_buffer) >= 2:
aligned_audio, _ = _get_aligned_audio(audio_buffer)
if aligned_audio:
yield TTSAudioRawFrame(
audio=aligned_audio,
sample_rate=self.sample_rate,
num_channels=1,
)
except Exception as e:
yield ErrorFrame(error=f"Camb.ai TTS error: {e}")
finally:
yield TTSStoppedFrame()
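
The streaming loop in `run_tts` above depends on a helper, `_get_aligned_audio`, whose body is not part of this diff. The following is a minimal sketch of what such a helper might look like, assuming 16-bit mono PCM (2 bytes per sample). The name `split_aligned` and the sample chunks are hypothetical, chosen only for illustration, and are not the service's actual implementation.

```python
from typing import Tuple

BYTES_PER_SAMPLE = 2  # 16-bit PCM (pcm_s16le), assumed for this sketch


def split_aligned(buffer: bytes, width: int = BYTES_PER_SAMPLE) -> Tuple[bytes, bytes]:
    """Split a buffer into (complete samples, leftover bytes)."""
    usable = len(buffer) - (len(buffer) % width)
    return buffer[:usable], buffer[usable:]


# Network chunks may have odd lengths; the leftover byte is carried
# over and prepended to the next chunk before splitting again.
pending = b""
emitted = []
for chunk in (b"\x01\x02\x03", b"\x04\x05", b"\x06"):
    aligned, pending = split_aligned(pending + chunk)
    if aligned:
        emitted.append(aligned)
```

Carrying the remainder forward means a sample is never cut in half across two frames, which is why the service only yields the aligned part of each chunk.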

View File

@@ -676,8 +676,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._handle_transcription(transcript, True, self._language)
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._call_event_handler("on_end_of_turn", transcript)
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):

View File

@@ -1710,11 +1710,26 @@ class GeminiLiveLLMService(LLMService):
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self._push_output_transcription_text_frames(text)
await self.push_frame(frame)
async def _push_output_transcription_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Gemini Live combines both LLM and TTS functionality, you
# might think we wouldn't need to push LLMTextFrames at all. However,
# RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
"""Handle dedicated grounding metadata messages."""

View File

@@ -33,6 +33,7 @@ from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMSetToolsFrame,
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
TranscriptionFrame,
@@ -619,9 +620,26 @@ class GrokRealtimeLLMService(LLMService):
async def _handle_evt_audio_transcript_delta(self, evt):
"""Handle audio transcript delta event."""
if evt.delta:
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_output_transcript_text_frames(evt.delta)
async def _push_output_transcript_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Grok Realtime combines both LLM and TTS functionality,
# you might think we wouldn't need to push LLMTextFrames at all.
# However, RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Handle function call arguments done event."""
@@ -659,7 +677,7 @@ class GrokRealtimeLLMService(LLMService):
"""Handle speech stopped event from VAD."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _handle_evt_error(self, evt):
"""Handle error event."""
@@ -734,6 +752,14 @@ class GrokRealtimeLLMService(LLMService):
async def _send_user_audio(self, frame):
"""Send user audio to Grok."""
# Don't send audio if conversation setup is still pending, as it can
# lead to errors. For example: audio sent before conversation setup
# will be interpreted as having Grok's default sample rate (24000),
# and if that differs from the sample rate we eventually set through
# the conversation setup, Grok will error out.
if self._llm_needs_conversation_setup:
return
payload = base64.b64encode(frame.audio).decode("utf-8")
await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload))

View File

@@ -0,0 +1,160 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""[Hathora-hosted](https://models.hathora.dev) speech-to-text services."""
import base64
import os
from typing import AsyncGenerator, Optional
import aiohttp
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TranscriptionFrame,
)
from pipecat.services.stt_service import SegmentedSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
from .utils import ConfigOption
class HathoraSTTService(SegmentedSTTService):
"""This service supports several different speech-to-text models hosted by Hathora.
[Documentation](https://models.hathora.dev)
"""
class InputParams(BaseModel):
"""Optional input parameters for Hathora STT configuration.
Parameters:
language: Language code (if supported by model).
config: Some models support additional config, refer to
[docs](https://models.hathora.dev) for each model to see
what is supported.
"""
language: Optional[str] = None
config: Optional[list[ConfigOption]] = None
def __init__(
self,
*,
model: str,
sample_rate: Optional[int] = None,
api_key: Optional[str] = None,
base_url: str = "https://api.models.hathora.dev/inference/v1/stt",
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Hathora STT service.
Args:
model: Model to use; find available models
[here](https://models.hathora.dev).
sample_rate: The sample rate for audio input. If None, will be determined
from the start frame.
api_key: API key for authentication with the Hathora service;
provision one [here](https://models.hathora.dev/tokens).
base_url: Base API URL for the Hathora STT service.
params: Configuration parameters.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(
sample_rate=sample_rate,
**kwargs,
)
self._model = model
self._api_key = api_key or os.getenv("HATHORA_API_KEY")
self._base_url = base_url
params = params or HathoraSTTService.InputParams()
self._settings = {
"language": params.language,
"config": params.config,
}
self.set_model_name(model)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True
"""
return True
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Run speech-to-text on the provided audio data.
Args:
audio: Raw audio bytes to transcribe.
Yields:
Frame: A TranscriptionFrame containing the transcription result, or an
ErrorFrame if the request fails.
"""
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
url = f"{self._base_url}"
payload = {
"model": self._model,
}
if self._settings["language"] is not None:
payload["language"] = self._settings["language"]
if self._settings["config"] is not None:
payload["model_config"] = [
{"name": option.name, "value": option.value}
for option in self._settings["config"]
]
base64_audio = base64.b64encode(audio).decode("utf-8")
payload["audio"] = base64_audio
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers={"Authorization": f"Bearer {self._api_key}"},
json=payload,
) as resp:
response = await resp.json()
if response and "text" in response:
text = response["text"].strip()
if text: # Only yield non-empty text
# Hathora's API currently doesn't return language info
# so we default to the requested language or "en"
response_language = self._settings["language"] or "en"
await self._handle_transcription(text, True, response_language)
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
Language(response_language),
result=response,
)
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
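
The request body built in `run_stt` above is plain JSON with the audio embedded as base64 text. The sketch below shows that shape in isolation, under the assumption that only `model`, `language`, and `audio` are sent. The function name `build_stt_payload` and the model name `example-model` are placeholders, not part of the Hathora API or of this change.

```python
import base64
import json
from typing import Any, Dict, Optional


def build_stt_payload(
    model: str, audio: bytes, language: Optional[str] = None
) -> Dict[str, Any]:
    """Build a JSON-serializable request body with base64-encoded audio."""
    payload: Dict[str, Any] = {"model": model}
    # Optional fields are only included when they were explicitly set.
    if language is not None:
        payload["language"] = language
    # Raw bytes cannot go into JSON directly, so encode them as ASCII text.
    payload["audio"] = base64.b64encode(audio).decode("utf-8")
    return payload


# "example-model" is a placeholder model name used only for this sketch.
payload = build_stt_payload("example-model", b"\x00\x01\x02\x03", language="en")
body = json.dumps(payload)
```

Base64 grows the audio by roughly a third, which is acceptable for the short segments a segmented STT service sends per request.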

View File

@@ -0,0 +1,173 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""[Hathora-hosted](https://models.hathora.dev) text-to-speech services."""
import io
import os
import wave
from typing import AsyncGenerator, Optional, Tuple
import aiohttp
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
from .utils import ConfigOption
def _decode_audio_payload(
audio_bytes: bytes,
*,
fallback_sample_rate: int = 24000,
fallback_channels: int = 1,
) -> Tuple[bytes, int, int]:
"""Convert a WAV/PCM payload into raw PCM samples for TTSAudioRawFrame."""
try:
with wave.open(io.BytesIO(audio_bytes), "rb") as wav_reader:
channels = wav_reader.getnchannels()
sample_rate = wav_reader.getframerate()
frames = wav_reader.readframes(wav_reader.getnframes())
return frames, sample_rate, channels
except (wave.Error, EOFError):
# If the payload is already raw PCM, just pass it through.
return audio_bytes, fallback_sample_rate, fallback_channels
class HathoraTTSService(TTSService):
"""This service supports several different text-to-speech models hosted by Hathora.
[Documentation](https://models.hathora.dev)
"""
class InputParams(BaseModel):
"""Optional input parameters for Hathora TTS configuration.
Parameters:
speed: Speech speed multiplier (if supported by model).
config: Some models support additional config, refer to
[docs](https://models.hathora.dev) for each model to see
what is supported.
"""
speed: Optional[float] = None
config: Optional[list[ConfigOption]] = None
def __init__(
self,
*,
model: str,
voice_id: Optional[str] = None,
sample_rate: Optional[int] = None,
api_key: Optional[str] = None,
base_url: str = "https://api.models.hathora.dev/inference/v1/tts",
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Hathora TTS service.
Args:
model: Model to use; find available models
[here](https://models.hathora.dev).
voice_id: Voice to use for synthesis (if supported by model).
sample_rate: Output sample rate for generated audio.
api_key: API key for authentication with the Hathora service;
provision one [here](https://models.hathora.dev/tokens).
base_url: Base API URL for the Hathora TTS service.
params: Configuration parameters.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(
sample_rate=sample_rate,
**kwargs,
)
self._model = model
self._api_key = api_key or os.getenv("HATHORA_API_KEY")
self._base_url = base_url
params = params or HathoraTTSService.InputParams()
self._settings = {
"speed": params.speed,
"config": params.config,
}
self.set_model_name(model)
self.set_voice(voice_id)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Run text-to-speech synthesis on the provided text.
Args:
text: The text to synthesize into speech.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
url = f"{self._base_url}"
payload = {"model": self._model, "text": text}
if self._voice_id is not None:
payload["voice"] = self._voice_id
if self._settings["speed"] is not None:
payload["speed"] = self._settings["speed"]
if self._settings["config"] is not None:
payload["model_config"] = [
{"name": option.name, "value": option.value}
for option in self._settings["config"]
]
yield TTSStartedFrame()
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers={"Authorization": f"Bearer {self._api_key}"},
json=payload,
) as resp:
audio_data = await resp.read()
pcm_audio, sample_rate, num_channels = _decode_audio_payload(
audio_data,
fallback_sample_rate=self.sample_rate,
)
frame = TTSAudioRawFrame(
audio=pcm_audio,
sample_rate=self.sample_rate,
num_channels=num_channels,
)
yield frame
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
yield TTSStoppedFrame()
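
`_decode_audio_payload` above has two paths: parse the response as a WAV container, or fall back to treating the bytes as raw PCM. The sketch below exercises both paths using only the standard library. `make_wav` and `decode_wav_or_pcm` are hypothetical names written for this illustration, and 16-bit samples are an assumption.

```python
import io
import wave
from typing import Tuple


def make_wav(pcm: bytes, sample_rate: int, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM in a WAV container (test helper for this sketch)."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as writer:
        writer.setnchannels(channels)
        writer.setsampwidth(2)  # 2 bytes per sample = 16-bit audio
        writer.setframerate(sample_rate)
        writer.writeframes(pcm)
    return buf.getvalue()


def decode_wav_or_pcm(
    data: bytes, fallback_rate: int = 24000, fallback_channels: int = 1
) -> Tuple[bytes, int, int]:
    """Return (pcm, sample_rate, channels), reading the WAV header if present."""
    try:
        with wave.open(io.BytesIO(data), "rb") as reader:
            frames = reader.readframes(reader.getnframes())
            return frames, reader.getframerate(), reader.getnchannels()
    except (wave.Error, EOFError):
        # No valid RIFF/WAVE header: assume the payload is already raw PCM.
        return data, fallback_rate, fallback_channels


pcm = b"\x00\x00\x10\x00\x20\x00\x30\x00"
wav_result = decode_wav_or_pcm(make_wav(pcm, 16000))
raw_result = decode_wav_or_pcm(pcm)
```

When the header is present, the sample rate and channel count come from the file itself; the fallback values only apply to headerless payloads.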

View File

@@ -0,0 +1,22 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Utilities and types for [Hathora-hosted](https://models.hathora.dev) voice services."""
from dataclasses import dataclass
@dataclass
class ConfigOption:
"""Extra configuration option passed into model_config for Hathora (if supported by model).
Args:
name: Name of the configuration option.
value: Value of the configuration option.
"""
name: str
value: str
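
Both Hathora services turn a list of `ConfigOption` values into the `model_config` array of the request body. The sketch below shows that conversion on its own. The dataclass is redeclared so the example is self-contained, `to_model_config` is a hypothetical helper, and the option name `temperature` is invented for illustration (the options a model accepts are listed in the Hathora docs).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ConfigOption:
    """Local copy of the dataclass above, so the sketch runs standalone."""

    name: str
    value: str


def to_model_config(
    options: Optional[List[ConfigOption]],
) -> Optional[List[Dict[str, str]]]:
    """Convert config options into the list-of-dicts shape used in the payload."""
    if options is None:
        return None
    return [{"name": option.name, "value": option.value} for option in options]


# "temperature" is a made-up option name, not a documented Hathora setting.
options = [ConfigOption(name="temperature", value="0.7")]
model_config = to_model_config(options)
```

Values are strings in the dataclass, so numeric settings are passed as text in this sketch.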

View File

@@ -121,7 +121,6 @@ class Mem0MemoryService(FrameProcessor):
try:
logger.debug(f"Storing {len(messages)} messages in Mem0")
params = {
"async_mode": True,
"messages": messages,
"metadata": {"platform": "pipecat"},
"output_format": "v1.1",

View File

@@ -1002,17 +1002,14 @@ class TokenDetails(BaseModel):
image_tokens: Number of image tokens used (for input only).
"""
model_config = ConfigDict(extra="allow")
cached_tokens: Optional[int] = 0
text_tokens: Optional[int] = 0
audio_tokens: Optional[int] = 0
cached_tokens_details: Optional[CachedTokensDetails] = None
image_tokens: Optional[int] = 0
class Config:
"""Pydantic configuration for TokenDetails."""
extra = "allow"
class Usage(BaseModel):
"""Token usage statistics for a response.

View File

@@ -724,10 +724,26 @@ class OpenAIRealtimeLLMService(LLMService):
# We receive audio transcript deltas (as opposed to text deltas) when
# the output modality is "audio" (the default)
if evt.delta:
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_output_transcript_text_frames(evt.delta)
async def _push_output_transcript_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like OpenAI Realtime combines both LLM and TTS functionality,
# you might think we wouldn't need to push LLMTextFrames at all.
# However, RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Handle completion of function call arguments.
@@ -776,7 +792,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
"""Maybe handle an error event related to retrieving a conversation item.

View File

@@ -877,15 +877,12 @@ class TokenDetails(BaseModel):
audio_tokens: Number of audio tokens used. Defaults to 0.
"""
model_config = ConfigDict(extra="allow")
cached_tokens: Optional[int] = 0
text_tokens: Optional[int] = 0
audio_tokens: Optional[int] = 0
class Config:
"""Pydantic configuration for TokenDetails."""
extra = "allow"
class Usage(BaseModel):
"""Token usage statistics for a response.

View File

@@ -662,7 +662,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame())
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
"""Maybe handle an error event related to retrieving a conversation item.

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025 Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -11,6 +11,7 @@ input processing, including VAD, turn analysis, and interruption management.
"""
import asyncio
import time
from typing import Optional
from loguru import logger
@@ -77,6 +78,11 @@ class BaseInputTransport(FrameProcessor):
# Track user speaking state for interruption logic
self._user_speaking = False
# Last time a UserSpeakingFrame was pushed.
self._user_speaking_frame_time = 0
# How often a UserSpeakingFrame should be pushed (value should be
# greater than the audio chunks to have any effect).
self._user_speaking_frame_period = 0.2
# Task to process incoming audio (VAD) and push audio frames downstream
# if passthrough is enabled.
@@ -423,7 +429,7 @@ class BaseInputTransport(FrameProcessor):
await self._deprecated_run_turn_analyzer(frame, vad_state, previous_vad_state)
if vad_state == VADState.SPEAKING:
await self.broadcast_frame(UserSpeakingFrame)
await self._user_currently_speaking()
# Push audio downstream if passthrough is set.
if self._params.audio_in_passthrough:
@@ -444,6 +450,13 @@ class BaseInputTransport(FrameProcessor):
else:
await self.push_frame(VADUserStoppedSpeakingFrame())
async def _user_currently_speaking(self):
"""Handle user speaking frame."""
diff_time = time.time() - self._user_speaking_frame_time
if diff_time >= self._user_speaking_frame_period:
await self.broadcast_frame(UserSpeakingFrame)
self._user_speaking_frame_time = time.time()
#
# DEPRECATED.
#
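
`_user_currently_speaking` above rate-limits `UserSpeakingFrame` so that it is broadcast at most once per period rather than once per audio chunk. The sketch below isolates that throttling pattern. The `Throttle` class is hypothetical, and it swaps in `time.monotonic` with an injectable clock instead of the `time.time()` calls used in the transport, purely to make the behavior easy to test.

```python
import time
from typing import Callable


class Throttle:
    """Allow an action at most once every `period` seconds."""

    def __init__(self, period: float, clock: Callable[[], float] = time.monotonic):
        self._period = period
        self._clock = clock
        self._last = float("-inf")  # so the very first call is always allowed

    def ready(self) -> bool:
        """Return True (and restart the period) if enough time has passed."""
        now = self._clock()
        if now - self._last >= self._period:
            self._last = now
            return True
        return False


# Drive the throttle with a fake clock: one call per simulated audio chunk.
fake_now = [0.0]
throttle = Throttle(0.2, clock=lambda: fake_now[0])
results = []
for t in (0.0, 0.05, 0.1, 0.2, 0.25, 0.45):
    fake_now[0] = t
    results.append(throttle.ready())
```

This also illustrates the updated comment in the diff: the period only has an effect when it is longer than the interval between audio chunks, otherwise every chunk passes the check.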

View File

@@ -403,7 +403,7 @@ class BaseOutputTransport(FrameProcessor):
# Last time a BotSpeakingFrame was pushed.
self._bot_speaking_frame_time = 0
# How often a BotSpeakingFrame should be pushed (value should be
# lower than the audio chunks).
# greater than the audio chunks to have any effect).
self._bot_speaking_frame_period = 0.2
# Last time the bot actually spoke.
self._bot_speech_last_time = 0
@@ -644,8 +644,7 @@ class BaseOutputTransport(FrameProcessor):
diff_time = time.time() - self._bot_speaking_frame_time
if diff_time >= self._bot_speaking_frame_period:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
await self._transport.broadcast_frame(BotSpeakingFrame)
self._bot_speaking_frame_time = time.time()
self._bot_speech_last_time = time.time()

View File

@@ -10,11 +10,11 @@ Methods that wrap the Daily API to create rooms, check room URLs, and get meetin
"""
import time
from typing import Dict, List, Literal, Optional
from typing import Any, Dict, List, Literal, Optional
from urllib.parse import urlparse
import aiohttp
from pydantic import BaseModel, Field, ValidationError
from pydantic import BaseModel, ConfigDict, Field, ValidationError
class DailyRoomSipParams(BaseModel):
@@ -77,7 +77,7 @@ class TranscriptionBucketConfig(BaseModel):
allow_api_access: bool = False
class DailyRoomProperties(BaseModel, extra="allow"):
class DailyRoomProperties(BaseModel):
"""Properties for configuring a Daily room.
Reference: https://docs.daily.co/reference/rest-api/rooms/create-room#properties
@@ -100,6 +100,8 @@ class DailyRoomProperties(BaseModel, extra="allow"):
start_video_off: Whether video is off by default.
"""
model_config = ConfigDict(extra="allow")
exp: Optional[float] = None
enable_chat: bool = False
enable_prejoin_ui: bool = False
@@ -113,7 +115,7 @@ class DailyRoomProperties(BaseModel, extra="allow"):
recordings_bucket: Optional[RecordingsBucketConfig] = None
transcription_bucket: Optional[TranscriptionBucketConfig] = None
sip: Optional[DailyRoomSipParams] = None
sip_uri: Optional[dict] = None
sip_uri: Optional[Dict[str, Any]] = None
start_video_off: bool = False
@property
@@ -203,7 +205,7 @@ class DailyMeetingTokenProperties(BaseModel):
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = None
enable_prejoin_ui: Optional[bool] = None
start_cloud_recording: Optional[bool] = None
permissions: Optional[dict] = None
permissions: Optional[Dict[str, Any]] = None
class DailyMeetingTokenParams(BaseModel):

View File

@@ -50,6 +50,7 @@ class WebsocketClientParams(TransportParams):
"""
add_wav_header: bool = True
additional_headers: Optional[dict[str, str]] = None
serializer: Optional[FrameSerializer] = None
@@ -130,7 +131,11 @@ class WebsocketClientSession:
return
try:
self._websocket = await websocket_connect(uri=self._uri, open_timeout=10)
self._websocket = await websocket_connect(
uri=self._uri,
open_timeout=10,
additional_headers=self._params.additional_headers,
)
self._client_task = self.task_manager.create_task(
self._client_task_handler(),
f"{self._transport_name}::WebsocketClientSession::_client_task_handler",

View File

@@ -4,10 +4,21 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.turns.mute.always_user_mute_strategy import AlwaysUserMuteStrategy
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.mute.first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
from pipecat.turns.mute.function_call_user_mute_strategy import FunctionCallUserMuteStrategy
from pipecat.turns.mute.mute_until_first_bot_complete_user_mute_strategy import (
import warnings
from pipecat.turns.user_mute.always_user_mute_strategy import AlwaysUserMuteStrategy
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.user_mute.first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
from pipecat.turns.user_mute.function_call_user_mute_strategy import FunctionCallUserMuteStrategy
from pipecat.turns.user_mute.mute_until_first_bot_complete_user_mute_strategy import (
MuteUntilFirstBotCompleteUserMuteStrategy,
)
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.turns.mute are deprecated. "
"Please use the equivalent types from pipecat.turns.user_mute instead.",
DeprecationWarning,
stacklevel=2,
)
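
The shim above keeps the old `pipecat.turns.mute` import path working while warning callers to migrate. The sketch below reproduces the warning pattern as a function so it can be exercised outside of an import. `warn_deprecated_module` and the `pkg.old_name` / `pkg.new_name` module paths are hypothetical.

```python
import warnings


def warn_deprecated_module(old: str, new: str) -> None:
    """Emit a DeprecationWarning pointing callers from `old` to `new`."""
    with warnings.catch_warnings():
        # DeprecationWarning is hidden by default in most contexts, so
        # temporarily force it to be shown for this one call.
        warnings.simplefilter("always")
        warnings.warn(
            f"Types in {old} are deprecated. "
            f"Please use the equivalent types from {new} instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this helper
        )


# Capture the warning instead of printing it, to inspect what was emitted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_deprecated_module("pkg.old_name", "pkg.new_name")
```

Scoping the filter change inside `catch_warnings()` restores the caller's warning filters afterwards, so the shim does not alter global warning behavior.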

View File

@@ -0,0 +1,173 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This module defines a controller for managing user idle detection."""
import asyncio
from typing import Optional
from pipecat.frames.frames import (
BotSpeakingFrame,
Frame,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
UserSpeakingFrame,
UserStartedSpeakingFrame,
)
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
class UserIdleController(BaseObject):
"""Controller for managing user idle detection.
This class monitors user activity and triggers an event when the user has been
idle (not speaking) for a configured timeout period. It only starts monitoring
after the first conversation activity and does not trigger while the bot is
speaking or function calls are in progress.
The controller tracks activity using continuous frames (UserSpeakingFrame and
BotSpeakingFrame) which are emitted repeatedly while speaking is happening, and
state-based tracking for function calls (FunctionCallsStartedFrame and
FunctionCallResultFrame) which are only sent at start and end.
Event handlers available:
- on_user_turn_idle: Emitted when the user has been idle for the timeout period.
Example::
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
# Handle user idle - send reminder, prompt, etc.
...
"""
def __init__(
self,
*,
user_idle_timeout: float,
):
"""Initialize the user idle controller.
Args:
user_idle_timeout: Timeout in seconds before considering the user idle.
"""
super().__init__()
self._user_idle_timeout = user_idle_timeout
self._task_manager: Optional[BaseTaskManager] = None
self._conversation_started = False
self._function_call_in_progress = False
self.user_idle_event = asyncio.Event()
self.user_idle_task: Optional[asyncio.Task] = None
self._register_event_handler("on_user_turn_idle", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
"""Returns the configured task manager."""
if not self._task_manager:
raise RuntimeError(f"{self} user idle controller was not properly setup")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
if not self.user_idle_task:
self.user_idle_task = self.task_manager.create_task(
self.user_idle_task_handler(),
f"{self}::user_idle_task_handler",
)
async def cleanup(self):
"""Cleanup the controller."""
await super().cleanup()
if self.user_idle_task:
await self.task_manager.cancel_task(self.user_idle_task)
self.user_idle_task = None
async def process_frame(self, frame: Frame):
"""Process an incoming frame to track user activity state.
Args:
frame: The frame to be processed.
"""
# Start monitoring on first conversation activity
if not self._conversation_started:
if isinstance(frame, (UserStartedSpeakingFrame, BotSpeakingFrame)):
self._conversation_started = True
self.user_idle_event.set()
else:
return
# Reset idle timer on continuous activity frames
if isinstance(frame, (UserSpeakingFrame, BotSpeakingFrame)):
await self._handle_activity(frame)
# Track function call state (start/end frames, not continuous)
elif isinstance(frame, FunctionCallsStartedFrame):
await self._handle_function_calls_started(frame)
elif isinstance(frame, FunctionCallResultFrame):
await self._handle_function_call_result(frame)
async def _handle_activity(self, _: UserSpeakingFrame | BotSpeakingFrame):
"""Handle continuous activity frames that should reset the idle timer.
These frames are emitted continuously while the user or bot is speaking,
so we simply reset the timer whenever we receive them.
Args:
_: The activity frame (unused).
"""
self.user_idle_event.set()
async def _handle_function_calls_started(self, _: FunctionCallsStartedFrame):
"""Handle function calls started event.
Function calls can take longer than the timeout, so we track their state
to prevent idle callbacks while they're in progress.
Args:
_: The FunctionCallsStartedFrame (unused).
"""
self._function_call_in_progress = True
self.user_idle_event.set()
async def _handle_function_call_result(self, _: FunctionCallResultFrame):
"""Handle function call result event.
Args:
_: The FunctionCallResultFrame (unused).
"""
self._function_call_in_progress = False
self.user_idle_event.set()
async def user_idle_task_handler(self):
"""Monitors for idle timeout and triggers events.
Runs in a loop until cancelled. The idle timer is reset whenever activity
frames are received (UserSpeakingFrame or BotSpeakingFrame). Function calls
are tracked via state since they only send start/end frames. If no activity
is detected for the configured timeout period and no function call is in
progress, the on_user_turn_idle event is triggered.
"""
while True:
try:
await asyncio.wait_for(self.user_idle_event.wait(), timeout=self._user_idle_timeout)
self.user_idle_event.clear()
except asyncio.TimeoutError:
# Only trigger if conversation has started and no function call is in progress
if self._conversation_started and not self._function_call_in_progress:
await self._call_event_handler("on_user_turn_idle")
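
The core of `user_idle_task_handler` above is an `asyncio.Event` that activity sets and a `wait_for` timeout that fires when nothing sets it. The sketch below shows only that mechanism. `IdleTimer` and `demo` are hypothetical, the timings are arbitrary, and the conversation-started and function-call checks from the controller are deliberately left out.

```python
import asyncio
from typing import Awaitable, Callable, List


class IdleTimer:
    """Call `on_idle` whenever no activity is reported for `timeout` seconds."""

    def __init__(self, timeout: float, on_idle: Callable[[], Awaitable[None]]):
        self._timeout = timeout
        self._on_idle = on_idle
        self._event = asyncio.Event()

    def activity(self) -> None:
        """Report activity, which restarts the idle countdown."""
        self._event.set()

    async def run(self) -> None:
        while True:
            try:
                # Returns early if activity() is called before the timeout.
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
                self._event.clear()
            except asyncio.TimeoutError:
                await self._on_idle()


async def demo() -> List[int]:
    idle_calls: List[int] = []

    async def on_idle() -> None:
        idle_calls.append(1)

    timer = IdleTimer(timeout=0.2, on_idle=on_idle)
    task = asyncio.create_task(timer.run())
    await asyncio.sleep(0.05)
    timer.activity()  # resets the countdown before it can expire
    await asyncio.sleep(0.05)
    calls_while_active = len(idle_calls)
    await asyncio.sleep(0.5)  # long enough for the timeout to fire
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return [calls_while_active, len(idle_calls)]
```

Running `asyncio.run(demo())` reports how many idle callbacks fired before and after the quiet period. Because the loop keeps running after a timeout, the callback repeats every `timeout` seconds until activity resumes or the task is cancelled.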

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from .always_user_mute_strategy import AlwaysUserMuteStrategy
from .base_user_mute_strategy import BaseUserMuteStrategy
from .first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
from .function_call_user_mute_strategy import FunctionCallUserMuteStrategy
from .mute_until_first_bot_complete_user_mute_strategy import (
MuteUntilFirstBotCompleteUserMuteStrategy,
)
__all__ = [
"AlwaysUserMuteStrategy",
"BaseUserMuteStrategy",
"FirstSpeechUserMuteStrategy",
"FunctionCallUserMuteStrategy",
"MuteUntilFirstBotCompleteUserMuteStrategy",
]

View File

@@ -7,7 +7,7 @@
"""User mute strategy that always mutes the user while the bot is speaking."""
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
class AlwaysUserMuteStrategy(BaseUserMuteStrategy):

View File

@@ -7,7 +7,7 @@
"""User mute strategy that mutes the user only during the bot's first speech."""
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
class FirstSpeechUserMuteStrategy(BaseUserMuteStrategy):

View File

@@ -14,7 +14,7 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
FunctionCallsStartedFrame,
)
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
class FunctionCallUserMuteStrategy(BaseUserMuteStrategy):

View File

@@ -7,7 +7,7 @@
"""User mute strategy that mutes the user until the bot completes its first speech."""
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
class MuteUntilFirstBotCompleteUserMuteStrategy(BaseUserMuteStrategy):

View File

@@ -4,15 +4,17 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.turns.user_start.base_user_turn_start_strategy import (
BaseUserTurnStartStrategy,
UserTurnStartedParams,
)
from pipecat.turns.user_start.external_user_turn_start_strategy import ExternalUserTurnStartStrategy
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
from pipecat.turns.user_start.transcription_user_turn_start_strategy import (
TranscriptionUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import VADUserTurnStartStrategy
from .base_user_turn_start_strategy import BaseUserTurnStartStrategy, UserTurnStartedParams
from .external_user_turn_start_strategy import ExternalUserTurnStartStrategy
from .min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
from .transcription_user_turn_start_strategy import TranscriptionUserTurnStartStrategy
from .vad_user_turn_start_strategy import VADUserTurnStartStrategy
__all__ = [
"BaseUserTurnStartStrategy",
"ExternalUserTurnStartStrategy",
"MinWordsUserTurnStartStrategy",
"TranscriptionUserTurnStartStrategy",
"UserTurnStartedParams",
"VADUserTurnStartStrategy",
]

View File

@@ -41,12 +41,10 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
self._min_words = min_words
self._use_interim = use_interim
self._bot_speaking = False
self._text = ""
async def reset(self):
"""Reset the strategy to its initial state."""
await super().reset()
self._text = ""
self._bot_speaking = False
async def process_frame(self, frame: Frame):
@@ -67,7 +65,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
elif isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
await self._handle_interim_transcription(frame)
await self._handle_transcription(frame)
async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
"""Handle bot started speaking frame.
@@ -89,41 +87,21 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""
self._bot_speaking = False
async def _handle_transcription(self, frame: TranscriptionFrame):
async def _handle_transcription(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
"""Handle a final or interim transcription frame and check word count.
Args:
frame: The transcription frame to be processed.
"""
self._text += frame.text
min_words = self._min_words if self._bot_speaking else 1
word_count = len(self._text.split())
should_trigger = word_count >= min_words
logger.debug(
f"{self} should_trigger={should_trigger} num_spoken_words={word_count} "
f"min_words={min_words} bot_speaking={self._bot_speaking}"
)
if should_trigger:
await self.trigger_user_turn_started()
async def _handle_interim_transcription(self, frame: InterimTranscriptionFrame):
"""Handle an interim transcription frame and check word count.
Args:
frame: The interim transcription frame to be processed.
"""
min_words = self._min_words if self._bot_speaking else 1
word_count = len(frame.text.split())
should_trigger = word_count >= min_words
is_interim = isinstance(frame, InterimTranscriptionFrame)
logger.debug(
f"{self} interim=True should_trigger={should_trigger} num_spoken_words={word_count} "
f"min_words={min_words} bot_speaking={self._bot_speaking}"
f"{self} should_trigger={should_trigger} num_spoken_words={word_count} "
f"min_words={min_words} bot_speaking={self._bot_speaking} interim_transcription={is_interim}"
)
if should_trigger:
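The decision made in this hunk can be sketched as a standalone function. This is a minimal illustration, not the strategy class itself: `should_start_user_turn` is a hypothetical name, and it assumes (as the updated tests suggest) that each transcription is judged on its own text rather than on text accumulated across frames.

```python
def should_start_user_turn(text: str, min_words: int, bot_speaking: bool) -> bool:
    """Return True when a single transcription has enough words to start a turn."""
    # While the bot is silent, a single word is enough to start a turn.
    required = min_words if bot_speaking else 1
    word_count = len(text.split())
    return word_count >= required


# Each transcription is evaluated independently; nothing is carried over between calls.
print(should_start_user_turn("One", min_words=3, bot_speaking=True))             # False
print(should_start_user_turn("One two three!", min_words=3, bot_speaking=True))  # True
print(should_start_user_turn("Hi!", min_words=3, bot_speaking=False))            # True
```

Because final and interim transcriptions go through the same check, one function covers both cases; only the log line needs to record which kind of frame was seen.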

View File

@@ -4,14 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.turns.user_stop.base_user_turn_stop_strategy import (
BaseUserTurnStopStrategy,
UserTurnStoppedParams,
)
from pipecat.turns.user_stop.external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
from pipecat.turns.user_stop.transcription_user_turn_stop_strategy import (
TranscriptionUserTurnStopStrategy,
)
from pipecat.turns.user_stop.turn_analyzer_user_turn_stop_strategy import (
TurnAnalyzerUserTurnStopStrategy,
)
from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
from .transcription_user_turn_stop_strategy import TranscriptionUserTurnStopStrategy
from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy
__all__ = [
"BaseUserTurnStopStrategy",
"ExternalUserTurnStopStrategy",
"UserTurnStoppedParams",
"TranscriptionUserTurnStopStrategy",
"TurnAnalyzerUserTurnStopStrategy",
]

View File

@@ -12,6 +12,8 @@ from typing import Optional, Type
from pipecat.frames.frames import (
Frame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
@@ -80,7 +82,7 @@ class UserTurnController(BaseObject):
self._task_manager: Optional[BaseTaskManager] = None
self._vad_user_speaking = False
self._user_speaking = False
self._user_turn = False
self._user_turn_stop_timeout_event = asyncio.Event()
@@ -146,7 +148,11 @@ class UserTurnController(BaseObject):
frame: The frame to be processed.
"""
if isinstance(frame, VADUserStartedSpeakingFrame):
if isinstance(frame, UserStartedSpeakingFrame):
await self._handle_user_started_speaking(frame)
elif isinstance(frame, UserStoppedSpeakingFrame):
await self._handle_user_stopped_speaking(frame)
elif isinstance(frame, VADUserStartedSpeakingFrame):
await self._handle_vad_user_started_speaking(frame)
elif isinstance(frame, VADUserStoppedSpeakingFrame):
await self._handle_vad_user_stopped_speaking(frame)
@@ -179,14 +185,26 @@ class UserTurnController(BaseObject):
for s in self._user_turn_strategies.stop or []:
await s.cleanup()
async def _handle_user_started_speaking(self, frame: UserStartedSpeakingFrame):
self._user_speaking = True
# The user started talking, let's reset the user turn timeout.
self._user_turn_stop_timeout_event.set()
async def _handle_user_stopped_speaking(self, frame: UserStoppedSpeakingFrame):
self._user_speaking = False
# The user stopped talking, let's reset the user turn timeout.
self._user_turn_stop_timeout_event.set()
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
self._vad_user_speaking = True
self._user_speaking = True
# The user started talking, let's reset the user turn timeout.
self._user_turn_stop_timeout_event.set()
async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
self._vad_user_speaking = False
self._user_speaking = False
# The user stopped talking, let's reset the user turn timeout.
self._user_turn_stop_timeout_event.set()
@@ -233,6 +251,10 @@ class UserTurnController(BaseObject):
self._user_turn = True
self._user_turn_stop_timeout_event.set()
# Reset all user turn start strategies to start fresh.
for s in self._user_turn_strategies.start or []:
await s.reset()
await self._call_event_handler("on_user_turn_started", strategy, params)
async def _trigger_user_turn_stop(
@@ -260,7 +282,7 @@ class UserTurnController(BaseObject):
)
self._user_turn_stop_timeout_event.clear()
except asyncio.TimeoutError:
if self._user_turn and not self._vad_user_speaking:
if self._user_turn and not self._user_speaking:
await self._call_event_handler("on_user_turn_stop_timeout")
await self._trigger_user_turn_stop(
None, UserTurnStoppedParams(enable_user_speaking_frames=True)
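The timeout handling above relies on an `asyncio.Event` that is set whenever activity occurs, which restarts the wait. A minimal sketch of that pattern follows; `watch_for_timeout` and `demo` are hypothetical names, and the sketch leaves out the controller's extra guard (an active user turn and a user who is not speaking).

```python
import asyncio


async def watch_for_timeout(activity: asyncio.Event, timeout: float, max_timeouts: int) -> int:
    """Count how many times `timeout` seconds pass without `activity` being set."""
    timeouts = 0
    while timeouts < max_timeouts:
        try:
            # Wait for activity; if it arrives in time, clear the event and wait again.
            await asyncio.wait_for(activity.wait(), timeout=timeout)
            activity.clear()
        except asyncio.TimeoutError:
            # No activity within the window: a timeout handler would run here.
            timeouts += 1
    return timeouts


async def demo() -> int:
    activity = asyncio.Event()
    watcher = asyncio.create_task(watch_for_timeout(activity, timeout=0.2, max_timeouts=1))
    # Signal activity a few times, each well inside the timeout window.
    for _ in range(3):
        await asyncio.sleep(0.05)
        activity.set()
    # Then go quiet and let the watcher time out once.
    return await watcher
```

Running `asyncio.run(demo())` should return once a single quiet window has elapsed after the last `set()` call.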

View File

@@ -19,6 +19,7 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_controller import UserTurnController
@@ -38,6 +39,7 @@ class UserTurnProcessor(FrameProcessor):
- on_user_turn_started: Emitted when a user turn starts.
- on_user_turn_stopped: Emitted when a user turn stops.
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
- on_user_turn_idle: Emitted when the user has been idle for the configured timeout.
Example::
@@ -53,6 +55,10 @@ class UserTurnProcessor(FrameProcessor):
async def on_user_turn_stop_timeout(processor):
...
@processor.event_handler("on_user_turn_idle")
async def on_user_turn_idle(processor):
...
"""
def __init__(
@@ -60,6 +66,7 @@ class UserTurnProcessor(FrameProcessor):
*,
user_turn_strategies: Optional[UserTurnStrategies] = None,
user_turn_stop_timeout: float = 5.0,
user_idle_timeout: Optional[float] = None,
**kwargs,
):
"""Initialize the user turn processor.
@@ -68,6 +75,10 @@ class UserTurnProcessor(FrameProcessor):
user_turn_strategies: Configured strategies for starting and stopping user turns.
user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
if no activity is detected.
user_idle_timeout: Optional timeout in seconds for detecting user idle state.
If set, the processor will emit an `on_user_turn_idle` event when the user
has been idle (not speaking) for this duration. Set to None to disable
idle detection.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
@@ -75,6 +86,7 @@ class UserTurnProcessor(FrameProcessor):
self._register_event_handler("on_user_turn_started")
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies or UserTurnStrategies(),
@@ -92,6 +104,14 @@ class UserTurnProcessor(FrameProcessor):
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
)
# Optional user idle controller
self._user_idle_controller: Optional[UserIdleController] = None
if user_idle_timeout:
self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout)
self._user_idle_controller.add_event_handler(
"on_user_turn_idle", self._on_user_turn_idle
)
async def cleanup(self):
"""Clean up processor resources."""
await super().cleanup()
@@ -129,9 +149,15 @@ class UserTurnProcessor(FrameProcessor):
await self._user_turn_controller.process_frame(frame)
if self._user_idle_controller:
await self._user_idle_controller.process_frame(frame)
async def _start(self, frame: StartFrame):
await self._user_turn_controller.setup(self.task_manager)
if self._user_idle_controller:
await self._user_idle_controller.setup(self.task_manager)
async def _stop(self, frame: EndFrame):
await self._cleanup()
@@ -141,6 +167,9 @@ class UserTurnProcessor(FrameProcessor):
async def _cleanup(self):
await self._user_turn_controller.cleanup()
if self._user_idle_controller:
await self._user_idle_controller.cleanup()
async def _on_push_frame(
self, controller, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
@@ -180,3 +209,6 @@ class UserTurnProcessor(FrameProcessor):
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")

View File

@@ -16,12 +16,15 @@ import inspect
import traceback
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
# TypeVar for preserving function signatures in decorators
F = TypeVar("F", bound=Callable[..., Any])
@dataclass
class EventHandler:
@@ -99,7 +102,7 @@ class BaseObject(ABC):
logger.debug(f"{self}: waiting on event handlers to finish {list(event_names)}...")
await asyncio.wait(tasks)
def event_handler(self, event_name: str):
def event_handler(self, event_name: str) -> Callable[[F], F]:
"""Decorator for registering event handlers.
Args:
@@ -109,7 +112,7 @@ class BaseObject(ABC):
The decorator function that registers the handler.
"""
def decorator(handler):
def decorator(handler: F) -> F:
self.add_event_handler(event_name, handler)
return handler
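The effect of the new annotations can be seen in a small self-contained sketch. `HandlerRegistry` and `handlers_for` are hypothetical stand-ins for illustration only; the point is that a decorator typed as `Callable[[F], F]` tells a type checker that the decorated function comes back unchanged and is in use.

```python
from typing import Any, Callable, Dict, List, TypeVar

# TypeVar bound to any callable, so the decorator returns the same type it receives.
F = TypeVar("F", bound=Callable[..., Any])


class HandlerRegistry:
    """Hypothetical stand-in for an object that stores named event handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[..., Any]]] = {}

    def event_handler(self, event_name: str) -> Callable[[F], F]:
        def decorator(handler: F) -> F:
            self._handlers.setdefault(event_name, []).append(handler)
            # Returning the handler unchanged keeps its signature visible to type checkers.
            return handler

        return decorator

    def handlers_for(self, event_name: str) -> List[Callable[..., Any]]:
        return list(self._handlers.get(event_name, []))


registry = HandlerRegistry()


@registry.event_handler("on_user_turn_idle")
async def on_user_turn_idle(source: object) -> None:
    ...
```

At runtime the behavior is the same as with an untyped decorator; the annotations only change what static analysis can infer.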

View File

@@ -40,7 +40,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMUserAggregatorParams,
)
from pipecat.tests.utils import SleepFrame, run_test
from pipecat.turns.mute import FirstSpeechUserMuteStrategy, FunctionCallUserMuteStrategy
from pipecat.turns.user_mute import FirstSpeechUserMuteStrategy, FunctionCallUserMuteStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025 Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025 Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -9,7 +9,7 @@ import os
import sys
import tempfile
import unittest
from unittest.mock import AsyncMock, MagicMock, Mock, patch
from unittest.mock import MagicMock, patch
import numpy as np

View File

@@ -0,0 +1,216 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
BotSpeakingFrame,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
UserSpeakingFrame,
UserStartedSpeakingFrame,
)
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
USER_IDLE_TIMEOUT = 0.2
class TestUserIdleController(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.task_manager = TaskManager()
self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
async def test_basic_idle_detection(self):
"""Test that idle event is triggered after timeout when no activity."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Start conversation
await controller.process_frame(UserStartedSpeakingFrame())
# Wait for idle timeout
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
self.assertTrue(idle_triggered)
await controller.cleanup()
async def test_user_speaking_resets_idle_timer(self):
"""Test that continuous UserSpeakingFrame frames reset the idle timer."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Start conversation
await controller.process_frame(UserStartedSpeakingFrame())
# Send UserSpeakingFrame continuously to reset timer
for _ in range(5):
await asyncio.sleep(USER_IDLE_TIMEOUT * 0.5) # 50% of timeout period
await controller.process_frame(UserSpeakingFrame())
self.assertFalse(idle_triggered)
await controller.cleanup()
async def test_bot_speaking_resets_idle_timer(self):
"""Test that BotSpeakingFrame frames reset the idle timer."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Start conversation
await controller.process_frame(UserStartedSpeakingFrame())
# Bot speaking should reset timer
for _ in range(5):
await asyncio.sleep(USER_IDLE_TIMEOUT * 0.6) # 60% of timeout
await controller.process_frame(BotSpeakingFrame())
self.assertFalse(idle_triggered)
await controller.cleanup()
async def test_function_call_prevents_idle(self):
"""Test that function calls in progress prevent idle event."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Start conversation
await controller.process_frame(UserStartedSpeakingFrame())
# Start function call
await controller.process_frame(FunctionCallsStartedFrame(function_calls=[]))
# Wait longer than idle timeout
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
# Should not trigger idle because function call is in progress
self.assertFalse(idle_triggered)
# Complete function call
await controller.process_frame(
FunctionCallResultFrame(
function_name="test",
tool_call_id="123",
arguments={},
result=None,
run_llm=False,
)
)
# Now idle should trigger
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
self.assertTrue(idle_triggered)
await controller.cleanup()
async def test_no_idle_before_conversation_starts(self):
"""Test that idle monitoring doesn't start before first conversation activity."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Wait without starting conversation
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
self.assertFalse(idle_triggered)
await controller.cleanup()
async def test_idle_starts_with_bot_speaking(self):
"""Test that monitoring starts with BotSpeakingFrame, not just user speech."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_triggered = False
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_triggered
idle_triggered = True
# Start conversation with bot speaking
await controller.process_frame(BotSpeakingFrame())
# Wait for idle timeout
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
self.assertTrue(idle_triggered)
await controller.cleanup()
async def test_multiple_idle_events(self):
"""Test that idle event can trigger multiple times."""
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
await controller.setup(self.task_manager)
idle_count = 0
@controller.event_handler("on_user_turn_idle")
async def on_user_turn_idle(controller):
nonlocal idle_count
idle_count += 1
# Start conversation
await controller.process_frame(UserStartedSpeakingFrame())
# First idle
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
first_count = idle_count
self.assertGreaterEqual(first_count, 1)
# Second idle
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
second_count = idle_count
self.assertGreater(second_count, first_count)
# User activity resets timer
await controller.process_frame(UserSpeakingFrame())
# Give a moment for the timer to reset
await asyncio.sleep(0.1)
# Third idle
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
third_count = idle_count
self.assertGreater(third_count, second_count)
await controller.cleanup()
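The conditions these tests exercise can be summarized as a small state object. This is a sketch under assumptions: `IdleGate` and its method names are hypothetical, and it models only the guard (conversation started, no function call in progress), not the timer or the frame handling of the real controller.

```python
from dataclasses import dataclass


@dataclass
class IdleGate:
    """Hypothetical helper: decides whether an elapsed idle timer should emit an event."""

    conversation_started: bool = False
    function_call_in_progress: bool = False

    def on_activity(self) -> None:
        # Any user or bot speech marks the conversation as started.
        self.conversation_started = True

    def on_function_call_started(self) -> None:
        self.function_call_in_progress = True

    def on_function_call_finished(self) -> None:
        self.function_call_in_progress = False

    def should_emit_idle(self) -> bool:
        # Both conditions must hold before an idle event is emitted.
        return self.conversation_started and not self.function_call_in_progress


gate = IdleGate()
print(gate.should_emit_idle())  # False: no conversation activity yet
gate.on_activity()
print(gate.should_emit_idle())  # True
gate.on_function_call_started()
print(gate.should_emit_idle())  # False: a function call is in progress
```

Keeping the guard separate from the timer makes each condition testable without sleeping.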

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
FunctionCallsStartedFrame,
InterruptionFrame,
)
from pipecat.turns.mute import (
from pipecat.turns.user_mute import (
AlwaysUserMuteStrategy,
FirstSpeechUserMuteStrategy,
FunctionCallUserMuteStrategy,

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2026 Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -8,12 +8,18 @@ import asyncio
import unittest
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
USER_TURN_STOP_TIMEOUT = 0.2
@@ -56,6 +62,44 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
self.assertTrue(should_start)
self.assertTrue(should_stop)
async def test_user_turn_start_reset(self):
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(
start=[MinWordsUserTurnStartStrategy(min_words=3)]
),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
await controller.setup(self.task_manager)
should_start = 0
@controller.event_handler("on_user_turn_started")
async def on_user_turn_started(controller, strategy, params):
nonlocal should_start
should_start += 1
await controller.process_frame(BotStartedSpeakingFrame())
await controller.process_frame(TranscriptionFrame(text="One", user_id="cat", timestamp=""))
self.assertEqual(should_start, 0)
await controller.process_frame(
TranscriptionFrame(text="One two three!", user_id="cat", timestamp="")
)
self.assertEqual(should_start, 1)
# Wait for the stop timeout to end the user turn so that a new user turn can start.
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
await controller.process_frame(BotStartedSpeakingFrame())
await controller.process_frame(TranscriptionFrame(text="Hi!", user_id="cat", timestamp=""))
self.assertEqual(should_start, 1)
await controller.process_frame(
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
)
self.assertEqual(should_start, 2)
async def test_user_turn_stop_timeout_no_transcription(self):
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(),
@@ -96,3 +140,53 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertTrue(timeout)
async def test_external_user_turn_strategies_no_timeout_while_speaking(self):
"""Test that timeout does not trigger when user is still speaking with external strategies."""
controller = UserTurnController(
user_turn_strategies=ExternalUserTurnStrategies(),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
await controller.setup(self.task_manager)
should_start = None
should_stop = None
timeout = None
@controller.event_handler("on_user_turn_started")
async def on_user_turn_started(controller, strategy, params):
nonlocal should_start
should_start = True
@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy, params):
nonlocal should_stop
should_stop = True
@controller.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(controller):
nonlocal timeout
timeout = True
# Simulate external service (like Deepgram Flux) broadcasting UserStartedSpeakingFrame
await controller.process_frame(UserStartedSpeakingFrame())
self.assertTrue(should_start)
self.assertFalse(should_stop)
self.assertFalse(timeout)
# User is still speaking, timeout should not trigger
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
self.assertTrue(should_start)
self.assertFalse(should_stop)
self.assertFalse(timeout)
# Now external service broadcasts UserStoppedSpeakingFrame
await controller.process_frame(UserStoppedSpeakingFrame())
# But no transcription, so timeout should trigger
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertTrue(timeout)

View File

@@ -38,7 +38,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
self.assertFalse(should_start)
await strategy.process_frame(
TranscriptionFrame(text=" there!", user_id="cat", timestamp="")
TranscriptionFrame(text="Hello there!", user_id="cat", timestamp="")
)
self.assertTrue(should_start)
@@ -55,6 +55,26 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
)
self.assertTrue(should_start)
async def test_bot_speaking_single_words(self):
strategy = MinWordsUserTurnStartStrategy(min_words=3)
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy, params):
nonlocal should_start
should_start = True
await strategy.process_frame(BotStartedSpeakingFrame())
await strategy.process_frame(TranscriptionFrame(text="One", user_id="cat", timestamp=""))
self.assertFalse(should_start)
await strategy.process_frame(TranscriptionFrame(text="Two", user_id="cat", timestamp=""))
self.assertFalse(should_start)
await strategy.process_frame(TranscriptionFrame(text="Three", user_id="cat", timestamp=""))
self.assertFalse(should_start)
async def test_bot_speaking_interim_transcriptions(self):
strategy = MinWordsUserTurnStartStrategy(min_words=2)

47
uv.lock generated
View File

@@ -629,6 +629,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/84/c2/80633736cd183ee4a62107413def345f7e6e3c01563dbca1417363cf957e/build-1.2.2.post1-py3-none-any.whl", hash = "sha256:1d61c0887fa860c01971625baae8bdd338e517b836a2f70dd1f7aa3a6b2fc5b5", size = 22950, upload-time = "2024-10-06T17:22:23.299Z" },
]
[[package]]
name = "camb-sdk"
version = "1.5.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
{ name = "pydantic" },
{ name = "typing-extensions" },
{ name = "websocket-client" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/68/44/b4161f5da381f1b0d35a1abc7d69588ff24af6dacc9f13bccfb6f9889c46/camb_sdk-1.5.4.tar.gz", hash = "sha256:e882fb1b32aa45243ead5a558fed4e4e7ff8542af9f1802565a5fb4b7114fb5c", size = 83074, upload-time = "2026-01-12T20:19:34.318Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d6/c9/f718b6028d6e457391d1a6a8f93ae6073055731d77279ef097734b84bdcb/camb_sdk-1.5.4-py3-none-any.whl", hash = "sha256:b9ad43fddc0d749dd522839a06136df89d109510f57e68102414a8c0078e496a", size = 152143, upload-time = "2026-01-12T20:19:31.769Z" },
]
[[package]]
name = "cartesia"
version = "2.0.17"
@@ -1430,7 +1446,7 @@ wheels = [
[[package]]
name = "fastapi"
version = "0.121.3"
version = "0.127.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "annotated-doc" },
@@ -1438,9 +1454,9 @@ dependencies = [
{ name = "starlette" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/80/f0/086c442c6516195786131b8ca70488c6ef11d2f2e33c9a893576b2b0d3f7/fastapi-0.121.3.tar.gz", hash = "sha256:0055bc24fe53e56a40e9e0ad1ae2baa81622c406e548e501e717634e2dfbc40b", size = 344501, upload-time = "2025-11-19T16:53:39.243Z" }
sdist = { url = "https://files.pythonhosted.org/packages/96/8a/6b9ba6eb8ff3817caae83120495965d9e70afb4d6348cb120e464ee199f4/fastapi-0.127.1.tar.gz", hash = "sha256:946a87ee5d931883b562b6bada787d6c8178becee2683cb3f9b980d593206359", size = 391876, upload-time = "2025-12-26T13:04:47.075Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/b6/4f620d7720fc0a754c8c1b7501d73777f6ba43b57c8ab99671f4d7441eb8/fastapi-0.121.3-py3-none-any.whl", hash = "sha256:0c78fc87587fcd910ca1bbf5bc8ba37b80e119b388a7206b39f0ecc95ebf53e9", size = 109801, upload-time = "2025-11-19T16:53:37.918Z" },
{ url = "https://files.pythonhosted.org/packages/d2/f3/a6858d147ed2645c095d11dc2440f94a5f1cd8f4df888e3377e6b5281a0f/fastapi-0.127.1-py3-none-any.whl", hash = "sha256:31d670a4f9373cc6d7994420f98e4dc46ea693145207abc39696746c83a44430", size = 112332, upload-time = "2025-12-26T13:04:45.329Z" },
]
[package.optional-dependencies]
@@ -1997,6 +2013,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/32/6a/33d1702184d94106d3cdd7bfb788e19723206fce152e303473ca3b946c7b/greenlet-3.3.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:6f8496d434d5cb2dce025773ba5597f71f5410ae499d5dd9533e0653258cdb3d", size = 273658, upload-time = "2025-12-04T14:23:37.494Z" },
{ url = "https://files.pythonhosted.org/packages/d6/b7/2b5805bbf1907c26e434f4e448cd8b696a0b71725204fa21a211ff0c04a7/greenlet-3.3.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b96dc7eef78fd404e022e165ec55327f935b9b52ff355b067eb4a0267fc1cffb", size = 574810, upload-time = "2025-12-04T14:50:04.154Z" },
{ url = "https://files.pythonhosted.org/packages/94/38/343242ec12eddf3d8458c73f555c084359883d4ddc674240d9e61ec51fd6/greenlet-3.3.0-cp310-cp310-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:73631cd5cccbcfe63e3f9492aaa664d278fda0ce5c3d43aeda8e77317e38efbd", size = 586248, upload-time = "2025-12-04T14:57:39.35Z" },
{ url = "https://files.pythonhosted.org/packages/f0/d0/0ae86792fb212e4384041e0ef8e7bc66f59a54912ce407d26a966ed2914d/greenlet-3.3.0-cp310-cp310-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b299a0cb979f5d7197442dccc3aee67fce53500cd88951b7e6c35575701c980b", size = 597403, upload-time = "2025-12-04T15:07:10.831Z" },
{ url = "https://files.pythonhosted.org/packages/b6/a8/15d0aa26c0036a15d2659175af00954aaaa5d0d66ba538345bd88013b4d7/greenlet-3.3.0-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7dee147740789a4632cace364816046e43310b59ff8fb79833ab043aefa72fd5", size = 586910, upload-time = "2025-12-04T14:25:59.705Z" },
{ url = "https://files.pythonhosted.org/packages/e1/9b/68d5e3b7ccaba3907e5532cf8b9bf16f9ef5056a008f195a367db0ff32db/greenlet-3.3.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:39b28e339fc3c348427560494e28d8a6f3561c8d2bcf7d706e1c624ed8d822b9", size = 1547206, upload-time = "2025-12-04T15:04:21.027Z" },
{ url = "https://files.pythonhosted.org/packages/66/bd/e3086ccedc61e49f91e2cfb5ffad9d8d62e5dc85e512a6200f096875b60c/greenlet-3.3.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:b3c374782c2935cc63b2a27ba8708471de4ad1abaa862ffdb1ef45a643ddbb7d", size = 1613359, upload-time = "2025-12-04T14:27:26.548Z" },
@@ -2004,6 +2021,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1f/cb/48e964c452ca2b92175a9b2dca037a553036cb053ba69e284650ce755f13/greenlet-3.3.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e29f3018580e8412d6aaf5641bb7745d38c85228dacf51a73bd4e26ddf2a6a8e", size = 274908, upload-time = "2025-12-04T14:23:26.435Z" },
{ url = "https://files.pythonhosted.org/packages/28/da/38d7bff4d0277b594ec557f479d65272a893f1f2a716cad91efeb8680953/greenlet-3.3.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a687205fb22794e838f947e2194c0566d3812966b41c78709554aa883183fb62", size = 577113, upload-time = "2025-12-04T14:50:05.493Z" },
{ url = "https://files.pythonhosted.org/packages/3c/f2/89c5eb0faddc3ff014f1c04467d67dee0d1d334ab81fadbf3744847f8a8a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:4243050a88ba61842186cb9e63c7dfa677ec146160b0efd73b855a3d9c7fcf32", size = 590338, upload-time = "2025-12-04T14:57:41.136Z" },
{ url = "https://files.pythonhosted.org/packages/80/d7/db0a5085035d05134f8c089643da2b44cc9b80647c39e93129c5ef170d8f/greenlet-3.3.0-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:670d0f94cd302d81796e37299bcd04b95d62403883b24225c6b5271466612f45", size = 601098, upload-time = "2025-12-04T15:07:11.898Z" },
{ url = "https://files.pythonhosted.org/packages/dc/a6/e959a127b630a58e23529972dbc868c107f9d583b5a9f878fb858c46bc1a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6cb3a8ec3db4a3b0eb8a3c25436c2d49e3505821802074969db017b87bc6a948", size = 590206, upload-time = "2025-12-04T14:26:01.254Z" },
{ url = "https://files.pythonhosted.org/packages/48/60/29035719feb91798693023608447283b266b12efc576ed013dd9442364bb/greenlet-3.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:2de5a0b09eab81fc6a382791b995b1ccf2b172a9fec934747a7a23d2ff291794", size = 1550668, upload-time = "2025-12-04T15:04:22.439Z" },
{ url = "https://files.pythonhosted.org/packages/0a/5f/783a23754b691bfa86bd72c3033aa107490deac9b2ef190837b860996c9f/greenlet-3.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4449a736606bd30f27f8e1ff4678ee193bc47f6ca810d705981cfffd6ce0d8c5", size = 1615483, upload-time = "2025-12-04T14:27:28.083Z" },
@@ -2011,6 +2029,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" },
{ url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" },
{ url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" },
{ url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" },
{ url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" },
{ url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" },
{ url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" },
@@ -2018,6 +2037,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" },
{ url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" },
{ url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" },
{ url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" },
{ url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" },
{ url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" },
{ url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" },
@@ -2025,6 +2045,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" },
{ url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" },
{ url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" },
{ url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = "2025-12-04T15:07:15.789Z" },
{ url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" },
{ url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" },
{ url = "https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" },
@@ -2032,6 +2053,7 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" },
{ url = "https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" },
{ url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = "2025-12-04T14:57:47.007Z" },
{ url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" },
{ url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" },
{ url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" },
{ url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" },
@@ -4259,6 +4281,9 @@ aws-nova-sonic = [
azure = [
{ name = "azure-cognitiveservices-speech" },
]
camb = [
{ name = "camb-sdk" },
]
cartesia = [
{ name = "cartesia" },
{ name = "websockets" },
@@ -4480,6 +4505,7 @@ requires-dist = [
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.2.0" },
{ name = "aws-sdk-sagemaker-runtime-http2", marker = "python_full_version >= '3.12' and extra == 'sagemaker'" },
{ name = "azure-cognitiveservices-speech", marker = "extra == 'azure'", specifier = "~=1.44.0" },
{ name = "camb-sdk", marker = "extra == 'camb'", specifier = ">=1.5.4" },
{ name = "cartesia", marker = "extra == 'cartesia'", specifier = "~=2.0.3" },
{ name = "coremltools", marker = "extra == 'local-smart-turn'", specifier = ">=8.0" },
{ name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.23.0" },
@@ -4487,8 +4513,8 @@ requires-dist = [
{ name = "docstring-parser", specifier = "~=0.16" },
{ name = "einops", marker = "extra == 'moondream'", specifier = "~=0.8.0" },
{ name = "fal-client", marker = "extra == 'fal'", specifier = "~=0.5.9" },
{ name = "fastapi", marker = "extra == 'runner'", specifier = ">=0.115.6,<0.122.0" },
{ name = "fastapi", marker = "extra == 'websocket'", specifier = ">=0.115.6,<0.122.0" },
{ name = "fastapi", marker = "extra == 'runner'", specifier = ">=0.115.6,<0.128.0" },
{ name = "fastapi", marker = "extra == 'websocket'", specifier = ">=0.115.6,<0.128.0" },
{ name = "faster-whisper", marker = "extra == 'whisper'", specifier = "~=1.1.1" },
{ name = "google-cloud-speech", marker = "extra == 'google'", specifier = ">=2.33.0,<3" },
{ name = "google-cloud-texttospeech", marker = "extra == 'google'", specifier = ">=2.31.0,<3" },
@@ -4573,7 +4599,7 @@ requires-dist = [
{ name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1" },
{ name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" },
]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "krisp", "langchain", "livekit", "lmnt", "local", "local-smart-turn", "local-smart-turn-v3", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "playht", "qwen", "remote-smart-turn", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "krisp", "langchain", "livekit", "lmnt", "local", "local-smart-turn", "local-smart-turn-v3", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "playht", "qwen", "remote-smart-turn", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
[package.metadata.requires-dev]
dev = [
@@ -7558,6 +7584,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6e/d4/ed38dd3b1767193de971e694aa544356e63353c33a85d948166b5ff58b9e/watchfiles-1.1.1-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3e6f39af2eab0118338902798b5aa6664f46ff66bc0280de76fca67a7f262a49", size = 457546, upload-time = "2025-10-14T15:06:13.372Z" },
]
[[package]]
name = "websocket-client"
version = "1.9.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2c/41/aa4bf9664e4cda14c3b39865b12251e8e7d239f4cd0e3cc1b6c2ccde25c1/websocket_client-1.9.0.tar.gz", hash = "sha256:9e813624b6eb619999a97dc7958469217c3176312b3a16a4bd1bc7e08a46ec98", size = 70576, upload-time = "2025-10-07T21:16:36.495Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/34/db/b10e48aa8fff7407e67470363eac595018441cf32d5e1001567a7aeba5d2/websocket_client-1.9.0-py3-none-any.whl", hash = "sha256:af248a825037ef591efbf6ed20cc5faa03d3b47b9e5a2230a529eeee1c1fc3ef", size = 82616, upload-time = "2025-10-07T21:16:34.951Z" },
]
[[package]]
name = "websockets"
version = "13.1"