Compare commits

8 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
8750c26cdc Merge pull request #4080 from pipecat-ai/changelog-0.0.106
Release 0.0.106 - Changelog Update
2026-03-18 23:39:22 -07:00
aconchillo
3e0c536fe7 Update changelog for version 0.0.106
2026-03-18 23:36:18 -07:00
Aleix Conchillo Flaqué
7ee5fa9e20 Merge pull request #4079 from pipecat-ai/aleix/fix-tavus-dtmf-callback
Add missing on_dtmf_event callback to Tavus transport
2026-03-18 21:47:28 -07:00
Aleix Conchillo Flaqué
7dfcaf8096 Add missing on_dtmf_event callback to Tavus transport
The on_dtmf_event callback was added to DailyCallbacks in #4047 but
the Tavus transport was not updated, causing a missing argument error.
2026-03-18 21:46:06 -07:00
Filipi da Silva Fuchter
4aea7784c9 Fixed the ordering of _maybe_pause_frame_processing call in TTSService (#4071)
* Fixing the invocation of pause_frame_processing at the correct time when receiving LLMFullResponseEndFrame and EndFrame.
2026-03-18 16:55:59 -04:00
Mark Backman
bad10177d4 Add WakePhraseUserTurnStartStrategy (#4064)
- Add WakePhraseUserTurnStartStrategy for gating interaction behind wake
  phrase detection, with timeout and single_activation modes
- Add default_user_turn_start_strategies() and
  default_user_turn_stop_strategies() helper functions
- Deprecate WakeCheckFilter in favor of the new strategy
- Extend ProcessFrameResult to stop strategies for short-circuit evaluation
- Fix MinWordsUserTurnStartStrategy including filtered text in output
2026-03-18 16:47:17 -04:00
Mark Backman
c4be513044 Improvements for Nova Sonic LLM and TTS output frames (#4042)
* Fix empty user transcription causing spurious interruption in Nova Sonic

Skip _report_user_transcription_ended() when _user_text_buffer is empty,
which happens when the initial prompt is text-only. Previously, an empty
TranscriptionFrame was pushed upstream, triggering a chain reaction:
on_user_turn_stopped → UserStartedSpeakingFrame → interruption →
premature BotStoppedSpeaking → multiple response start/stop cycles.

* Improve TextFrame and assistant end of turn logic

Now, SPECULATIVE text results are used to push the LLMTextFrame,
AggregatedTextFrame, and TTSTextFrame. Additionally, the TTSTextFrames
are pushed at the end of the corresponding audio segment.

* Remove BotStoppedSpeakingFrame fallback from Nova Sonic

Now that assistant response end is detected directly from Nova Sonic
contentEnd events (END_TURN and INTERRUPTED), the BotStoppedSpeakingFrame
handler is no longer needed. Inline the cleanup logic in reset_conversation.
2026-03-18 16:04:12 -04:00
Mark Backman
4b704e6d3a GradiumSTTService improvements (#4066)
* Remove duplicate reconnection logic from Gradium STT

The _receive_messages method had its own while-True reconnect loop,
duplicating the reconnection handling already provided by
WebsocketService._receive_task_handler (exponential backoff, max
retries, error reporting). Flatten to just the inner message loop
and let the base class handle reconnection.

* Align Gradium STT VAD handling with base class patterns

Replace the process_frame override with a _handle_vad_user_stopped_speaking
override, which is the proper hook provided by STTService. Move
start_processing_metrics() into run_stt (matching Gladia's pattern).
Remove unused FrameDirection and VADUserStartedSpeakingFrame imports.

* Add transcript aggregation delay after flushed to capture trailing tokens

Gradium flushed response can arrive before all text tokens have been
delivered. Instead of finalizing immediately on flushed, start a short
timer (100ms) that allows trailing tokens to accumulate before pushing
the final TranscriptionFrame.

* Add changelog for PR #4066

* Change default encoding to pcm_16000

* Decouple encoding from sample_rate in Gradium STT

The encoding parameter now takes just the base type (pcm, wav, opus)
and the sample rate is derived from the pipeline audio_in_sample_rate,
assembled dynamically via input_format_from_encoding(). This fixes the
mismatch where SAMPLE_RATE=24000 was passed to the base class while
encoding defaulted to pcm_16000.
2026-03-18 15:57:34 -04:00
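
The "aggregation delay after flushed" change in the Gradium STT commit above can be pictured with a small standalone sketch. This is not the `GradiumSTTService` code: `TrailingTokenAggregator`, `add_token()`, `flushed()`, and the `on_final` callback are hypothetical names used only for illustration. It assumes tokens are joined with spaces and that a single timer is started on the first flush signal.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class TrailingTokenAggregator:
    """Hypothetical helper: collect text tokens, finalize shortly after a flush."""

    def __init__(
        self,
        on_final: Callable[[str], Awaitable[None]],
        delay_secs: float = 0.1,  # the commit message mentions a 100ms timer
    ):
        self._on_final = on_final
        self._delay_secs = delay_secs
        self._tokens: List[str] = []
        self._timer: Optional[asyncio.Task] = None

    def add_token(self, text: str) -> None:
        # Tokens may still arrive after the flush signal; keep accumulating.
        self._tokens.append(text)

    def flushed(self) -> None:
        # Instead of finalizing immediately, start a short timer (only once).
        if self._timer is None:
            self._timer = asyncio.create_task(self._finalize_after_delay())

    async def _finalize_after_delay(self) -> None:
        await asyncio.sleep(self._delay_secs)
        text = " ".join(self._tokens).strip()
        self._tokens.clear()
        self._timer = None
        if text:
            await self._on_final(text)


async def demo() -> List[str]:
    finals: List[str] = []

    async def on_final(text: str) -> None:
        finals.append(text)

    aggregator = TrailingTokenAggregator(on_final, delay_secs=0.05)
    aggregator.add_token("hello")
    aggregator.flushed()
    aggregator.add_token("world")  # trailing token delivered after the flush
    await asyncio.sleep(0.2)  # give the timer time to fire
    return finals
```

Running `asyncio.run(demo())` yields one final string that includes the trailing token, which is the behavior the commit describes. The real service may differ in how it joins text and handles repeated flushes.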
48 changed files with 1254 additions and 264 deletions

View File

@@ -7,6 +7,218 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->
## [0.0.106] - 2026-03-18
### Added
- Added optional `service` field to `ServiceUpdateSettingsFrame` (and its
subclasses `LLMUpdateSettingsFrame`, `TTSUpdateSettingsFrame`,
`STTUpdateSettingsFrame`) to target a specific service instance. When
`service` is set, only the matching service applies the settings; others
forward the frame unchanged. This enables updating a single service when
multiple services of the same type exist in the pipeline.
(PR [#4004](https://github.com/pipecat-ai/pipecat/pull/4004))
- Added `sip_provider` and `room_geo` parameters to `configure()` in the Daily
runner. These convenience parameters let callers specify a SIP provider name
and geographic region directly without manually constructing
`DailyRoomProperties` and `DailyRoomSipParams`.
(PR [#4005](https://github.com/pipecat-ai/pipecat/pull/4005))
- Added `PerplexityLLMAdapter` that automatically transforms conversation
messages to satisfy Perplexity's stricter API constraints (strict role
alternation, no non-initial system messages, last message must be user/tool).
Previously, certain conversation histories could cause Perplexity API errors
that didn't occur with OpenAI (`PerplexityLLMService` subclasses
`OpenAILLMService` since Perplexity uses an OpenAI-compatible API).
(PR [#4009](https://github.com/pipecat-ai/pipecat/pull/4009))
- Added DTMF input event support to the Daily transport. Incoming DTMF tones
are now received via Daily's `on_dtmf_event` callback and pushed into the
pipeline as `InputDTMFFrame`, enabling bots to react to keypad presses from
phone callers.
(PR [#4047](https://github.com/pipecat-ai/pipecat/pull/4047))
- Added `WakePhraseUserTurnStartStrategy` for triggering user turns based on
wake phrases, with support for `single_activation` mode. Deprecates
`WakeCheckFilter`.
(PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
- Added `default_user_turn_start_strategies()` and
`default_user_turn_stop_strategies()` helper functions for composing custom
strategy lists.
(PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
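
As a rough illustration of wake-phrase gating with a timeout, here is a minimal standalone sketch. It does not use or reproduce `WakePhraseUserTurnStartStrategy`; the `WakePhraseGate` class, its `allows()` method, and the injected `clock` are hypothetical. It assumes that only a detected wake phrase opens (or re-opens) the window, and it does not model `single_activation` mode.

```python
import re
import time
from typing import Callable, List, Optional


class WakePhraseGate:
    """Hypothetical gate: allow transcripts only within a window after a wake phrase."""

    def __init__(
        self,
        phrases: List[str],
        timeout: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        # Case-insensitive, whole-word matching for each configured phrase.
        self._patterns = [
            re.compile(rf"\b{re.escape(phrase)}\b", re.IGNORECASE) for phrase in phrases
        ]
        self._timeout = timeout
        self._clock = clock
        self._awake_until: Optional[float] = None

    def allows(self, transcript: str) -> bool:
        now = self._clock()
        if any(pattern.search(transcript) for pattern in self._patterns):
            # Wake phrase heard: open the window for `timeout` seconds.
            self._awake_until = now + self._timeout
            return True
        # Otherwise allow only while the window is still open.
        return self._awake_until is not None and now < self._awake_until


# Usage with a fake clock so the timeout is easy to follow.
fake_now = [0.0]
gate = WakePhraseGate(["pipecat"], timeout=5.0, clock=lambda: fake_now[0])
before_wake = gate.allows("what's the weather")    # no wake phrase yet
on_wake = gate.allows("Hey Pipecat, hello")        # wake phrase opens the window
fake_now[0] = 3.0
inside_window = gate.allows("what's the weather")  # still within 5 seconds
fake_now[0] = 6.0
after_window = gate.allows("what's the weather")   # window expired
```

Injecting the clock keeps the sketch testable without sleeping; whether the real strategy refreshes its timeout on every utterance is not stated in the changelog.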
### Changed
- Changed tool result JSON serialization to use `ensure_ascii=False`,
preserving UTF-8 characters instead of escaping them. This reduces context
size and token usage for non-English languages.
(PR [#3457](https://github.com/pipecat-ai/pipecat/pull/3457))
- `OpenAIRealtimeSTTService`'s `noise_reduction` parameter is now part of
`OpenAIRealtimeSTTSettings`, making it runtime-updatable via
`STTUpdateSettingsFrame`. The direct `noise_reduction` init argument is
deprecated as of 0.0.106.
(PR [#3991](https://github.com/pipecat-ai/pipecat/pull/3991))
- Updated `sarvamai` dependency from `0.1.26a2` (alpha) to `0.1.26` (stable
release).
(PR [#3997](https://github.com/pipecat-ai/pipecat/pull/3997))
- `SimliVideoService` now extends `AIService` instead of `FrameProcessor`,
aligning it with the HeyGen and Tavus video services. It supports
`SimliVideoService.Settings(...)` for configuration and uses
`start()`/`stop()`/`cancel()` lifecycle methods. Existing constructor usage
(`api_key`, `face_id`, etc.) remains unchanged.
(PR [#4001](https://github.com/pipecat-ai/pipecat/pull/4001))
- Update `pipecat-ai-small-webrtc-prebuilt` to `2.4.0`.
(PR [#4023](https://github.com/pipecat-ai/pipecat/pull/4023))
- Nova Sonic assistant text transcripts are now delivered in real-time using
speculative text events instead of delayed final text events. Previously,
assistant text only arrived after all audio had finished playing, causing
laggy transcripts in client UIs. Speculative text arrives before each audio
chunk, providing text synchronized with what the bot is saying. This also
simplifies the internal text handling by removing the interruption re-push
hack and assistant text buffer.
(PR [#4042](https://github.com/pipecat-ai/pipecat/pull/4042))
- Updated `daily-python` dependency to 0.25.0.
(PR [#4047](https://github.com/pipecat-ai/pipecat/pull/4047))
- Added `enable_dialout` parameter to `configure()` in `pipecat.runner.daily`
to support dial-out rooms. Also narrowed misleading `Optional` type hints and
deduplicated token expiry calculation.
(PR [#4048](https://github.com/pipecat-ai/pipecat/pull/4048))
- Extended `ProcessFrameResult` to stop strategies, allowing a stop strategy to
short-circuit evaluation of subsequent strategies by returning `STOP`.
(PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
- `GradiumSTTService` now takes both `encoding` and `sample_rate` constructor
arguments, which are assembled in the class to form the `input_format`. PCM
accepts `8000`, `16000`, and `24000` Hz sample rates.
(PR [#4066](https://github.com/pipecat-ai/pipecat/pull/4066))
- Improved `GradiumSTTService` transcription accuracy by reworking how text
fragments are accumulated and finalized. Previously, trailing words could be
dropped when the server's `flushed` response arrived before all text tokens
were delivered. The service now uses a short aggregation delay after flush to
capture trailing tokens, producing complete utterances.
(PR [#4066](https://github.com/pipecat-ai/pipecat/pull/4066))
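
The effect of `ensure_ascii=False` on tool result serialization can be seen with the standard `json` module alone. The sample `result` dictionary is made up for illustration; only `json.dumps` and `json.loads` are used, and no Pipecat code is involved.

```python
import json

# Hypothetical tool result containing non-ASCII text.
result = {"city": "東京", "forecast": "晴れ"}

# Default behavior: non-ASCII characters are escaped as \uXXXX sequences.
escaped = json.dumps(result)

# With ensure_ascii=False the UTF-8 characters are preserved as-is.
readable = json.dumps(result, ensure_ascii=False)

# Both forms decode to the same data; the unescaped form is shorter.
same_data = json.loads(escaped) == json.loads(readable) == result
saved_chars = len(escaped) - len(readable)
```

Each escaped character takes six ASCII characters (`\uXXXX`) instead of one, which is why the unescaped form produces a smaller context for non-English text.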
### Deprecated
- `SimliVideoService.InputParams` is deprecated. Use the direct constructor
parameters `max_session_length`, `max_idle_time`, and `enable_logging`
instead.
(PR [#4001](https://github.com/pipecat-ai/pipecat/pull/4001))
- Deprecated `LocalSmartTurnAnalyzerV2` and `LocalCoreMLSmartTurnAnalyzer`. Use
`LocalSmartTurnAnalyzerV3` instead. Instantiating these analyzers will now
emit a `DeprecationWarning`.
(PR [#4012](https://github.com/pipecat-ai/pipecat/pull/4012))
- Deprecated `WakeCheckFilter` in favor of `WakePhraseUserTurnStartStrategy`.
(PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
### Fixed
- Fixed an issue where the default model for `OpenAILLMService` and
`AzureLLMService` was mistakenly reverted to `gpt-4o`. The defaults are now
restored to `gpt-4.1`.
(PR [#4000](https://github.com/pipecat-ai/pipecat/pull/4000))
- Fixed a race condition where `EndTaskFrame` could cause the pipeline to shut
down before in-flight frames (e.g. LLM function call responses) finished
processing. `EndTaskFrame` and `StopTaskFrame` now flow through the pipeline
as `ControlFrame`s, ensuring all pending work is flushed before shutdown
begins. `CancelTaskFrame` and `InterruptionTaskFrame` remain immediate
(`SystemFrame`).
(PR [#4006](https://github.com/pipecat-ai/pipecat/pull/4006))
- Fixed `ParallelPipeline` dropping or misordering frames during lifecycle
synchronization. Buffered frames are now flushed in the correct order
relative to synchronization frames (`StartFrame` goes first,
`EndFrame`/`CancelFrame` go after), and frames added to the buffer during
flush are also drained.
(PR [#4007](https://github.com/pipecat-ai/pipecat/pull/4007))
- Fixed `TTSService` potentially canceling in-flight audio during shutdown. The
stop sequence now waits for all queued audio contexts to finish processing
before canceling the stop frame task.
(PR [#4007](https://github.com/pipecat-ai/pipecat/pull/4007))
- Fixed `Language` enum values (e.g. `Language.ES`) not being converted to
service-specific codes when passed via
`settings=Service.Settings(language=Language.ES)` at init time. This caused
API errors (e.g. 400 from Rime) because the raw enum was sent instead of the
expected language code (e.g. `"spa"`). Runtime updates via
`UpdateSettingsFrame` were unaffected. The fix centralizes conversion in the
base `TTSService` and `STTService` classes so all services handle this
consistently.
(PR [#4024](https://github.com/pipecat-ai/pipecat/pull/4024))
- Fixed `DeepgramSTTService` ignoring the `base_url` scheme when using `ws://`
or `http://`. Previously these were silently overwritten with `wss://` /
`https://`, breaking air-gapped or private deployments that don't use TLS.
All scheme choices (`wss://`, `https://`, `ws://`, `http://`, or bare
hostname) are now respected.
(PR [#4026](https://github.com/pipecat-ai/pipecat/pull/4026))
- Fixed `LLMSwitcher.register_function()` and `register_direct_function()` not
accepting or forwarding the `timeout_secs` parameter.
(PR [#4037](https://github.com/pipecat-ai/pipecat/pull/4037))
- Fixed empty user transcriptions in Nova Sonic causing spurious interruptions.
Previously, an empty transcription could trigger an interruption of the
assistant's response even though the user hadn't actually spoken.
(PR [#4042](https://github.com/pipecat-ai/pipecat/pull/4042))
- Fixed `SonioxSTTService` and `OpenAIRealtimeSTTService` crash when language
parameters contain plain strings instead of `Language` enum values.
(PR [#4046](https://github.com/pipecat-ai/pipecat/pull/4046))
- Fixed premature user turn stops caused by late transcriptions arriving
between turns. A stale transcript from the previous turn could persist into
the next turn and trigger a stop before the current turn's real transcript
arrived. Stop strategies are now reset at both turn start and turn stop to
prevent state from leaking across turn boundaries.
(PR [#4057](https://github.com/pipecat-ai/pipecat/pull/4057))
- Fixed raw language strings like `"de-DE"` silently failing when passed to
TTS/STT services (e.g. ElevenLabs producing no audio). Raw strings now go
through the same `Language` enum resolution as enum values, so regional codes
like `"de-DE"` are properly converted to service-expected formats like
`"de"`. Unrecognized strings log a warning instead of failing silently.
(PR [#4058](https://github.com/pipecat-ai/pipecat/pull/4058))
- Fixed Deepgram STT list-type settings (`keyterm`, `keywords`, `search`,
`redact`, `replace`) being stringified instead of passed as lists to the SDK,
which caused them to be sent as literal strings (e.g. `"['pipecat']"`) in the
WebSocket query params.
(PR [#4063](https://github.com/pipecat-ai/pipecat/pull/4063))
- Fixed `MinWordsUserTurnStartStrategy` including text below the word threshold
in the output by resetting aggregation when the minimum word count is not
met.
(PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
- Fixed audio overlap and potential dropped TTS content when multiple assistant
turns occur in quick succession. `TTSService` now flushes remaining text
before pausing frame processing on `LLMFullResponseEndFrame`/`EndFrame`,
instead of pausing first.
(PR [#4071](https://github.com/pipecat-ai/pipecat/pull/4071))
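
The Deepgram list-settings fix above is easier to picture with a small standalone example. This sketch uses only `urllib.parse.urlencode` from the standard library, not the Deepgram SDK, and the `settings` dictionary is hypothetical. It shows how stringifying a list before building query parameters produces a literal like `"['pipecat']"`, while passing the list through yields one parameter per item.

```python
from urllib.parse import urlencode

# Hypothetical list-type setting, as in the `keyterm` example.
settings = {"keyterm": ["pipecat", "daily"]}

# Buggy shape: the list is converted to its string representation first,
# so the query carries the literal text "['pipecat', 'daily']".
stringified = urlencode({key: str(value) for key, value in settings.items()})

# Intended shape: the list is passed through, one query parameter per item.
as_list = urlencode(settings, doseq=True)
```

Here `as_list` is `keyterm=pipecat&keyterm=daily`, whereas `stringified` contains percent-encoded brackets and quotes. How the SDK itself encodes lists is not shown in this diff, so treat this only as an analogy for the bug.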
### Security
- Bumped PyJWT minimum version from 2.10.1 to 2.12.0 in the `livekit` extra to
address CVE-2026-32597 (GHSA-752w-5fwx-jx9f), where PyJWT <= 2.11.0 accepted
unknown `crit` header extensions.
(PR [#4035](https://github.com/pipecat-ai/pipecat/pull/4035))
## [0.0.105] - 2026-03-10
### Added

View File

@@ -1 +0,0 @@
- Changed tool result JSON serialization to use `ensure_ascii=False`, preserving UTF-8 characters instead of escaping them. This reduces context size and token usage for non-English languages.

View File

@@ -1 +0,0 @@
- `OpenAIRealtimeSTTService`'s `noise_reduction` parameter is now part of `OpenAIRealtimeSTTSettings`, making it runtime-updatable via `STTUpdateSettingsFrame`. The direct `noise_reduction` init argument is deprecated as of 0.0.106.

View File

@@ -1 +0,0 @@
- Updated `sarvamai` dependency from `0.1.26a2` (alpha) to `0.1.26` (stable release).

View File

@@ -1 +0,0 @@
- Fixed an issue where the default model for `OpenAILLMService` and `AzureLLMService` was mistakenly reverted to `gpt-4o`. The defaults are now restored to `gpt-4.1`.

View File

@@ -1 +0,0 @@
- `SimliVideoService` now extends `AIService` instead of `FrameProcessor`, aligning it with the HeyGen and Tavus video services. It supports `SimliVideoService.Settings(...)` for configuration and uses `start()`/`stop()`/`cancel()` lifecycle methods. Existing constructor usage (`api_key`, `face_id`, etc.) remains unchanged.

View File

@@ -1 +0,0 @@
- `SimliVideoService.InputParams` is deprecated. Use the direct constructor parameters `max_session_length`, `max_idle_time`, and `enable_logging` instead.

View File

@@ -1 +0,0 @@
- Added optional `service` field to `ServiceUpdateSettingsFrame` (and its subclasses `LLMUpdateSettingsFrame`, `TTSUpdateSettingsFrame`, `STTUpdateSettingsFrame`) to target a specific service instance. When `service` is set, only the matching service applies the settings; others forward the frame unchanged. This enables updating a single service when multiple services of the same type exist in the pipeline.

View File

@@ -1 +0,0 @@
- Added `sip_provider` and `room_geo` parameters to `configure()` in the Daily runner. These convenience parameters let callers specify a SIP provider name and geographic region directly without manually constructing `DailyRoomProperties` and `DailyRoomSipParams`.

View File

@@ -1 +0,0 @@
- Fixed a race condition where `EndTaskFrame` could cause the pipeline to shut down before in-flight frames (e.g. LLM function call responses) finished processing. `EndTaskFrame` and `StopTaskFrame` now flow through the pipeline as `ControlFrame`s, ensuring all pending work is flushed before shutdown begins. `CancelTaskFrame` and `InterruptionTaskFrame` remain immediate (`SystemFrame`).

View File

@@ -1 +0,0 @@
- Fixed `TTSService` potentially canceling in-flight audio during shutdown. The stop sequence now waits for all queued audio contexts to finish processing before canceling the stop frame task.

View File

@@ -1 +0,0 @@
- Fixed `ParallelPipeline` dropping or misordering frames during lifecycle synchronization. Buffered frames are now flushed in the correct order relative to synchronization frames (`StartFrame` goes first, `EndFrame`/`CancelFrame` go after), and frames added to the buffer during flush are also drained.

View File

@@ -1 +0,0 @@
- Added `PerplexityLLMAdapter` that automatically transforms conversation messages to satisfy Perplexity's stricter API constraints (strict role alternation, no non-initial system messages, last message must be user/tool). Previously, certain conversation histories could cause Perplexity API errors that didn't occur with OpenAI (`PerplexityLLMService` subclasses `OpenAILLMService` since Perplexity uses an OpenAI-compatible API).

View File

@@ -1 +0,0 @@
- Deprecated `LocalSmartTurnAnalyzerV2` and `LocalCoreMLSmartTurnAnalyzer`. Use `LocalSmartTurnAnalyzerV3` instead. Instantiating these analyzers will now emit a `DeprecationWarning`.

View File

@@ -1 +0,0 @@
- Update `pipecat-ai-small-webrtc-prebuilt` to `2.4.0`.

View File

@@ -1 +0,0 @@
- Fixed `Language` enum values (e.g. `Language.ES`) not being converted to service-specific codes when passed via `settings=Service.Settings(language=Language.ES)` at init time. This caused API errors (e.g. 400 from Rime) because the raw enum was sent instead of the expected language code (e.g. `"spa"`). Runtime updates via `UpdateSettingsFrame` were unaffected. The fix centralizes conversion in the base `TTSService` and `STTService` classes so all services handle this consistently.

View File

@@ -1 +0,0 @@
- Fixed `DeepgramSTTService` ignoring the `base_url` scheme when using `ws://` or `http://`. Previously these were silently overwritten with `wss://` / `https://`, breaking air-gapped or private deployments that don't use TLS. All scheme choices (`wss://`, `https://`, `ws://`, `http://`, or bare hostname) are now respected.

View File

@@ -1 +0,0 @@
- Bumped PyJWT minimum version from 2.10.1 to 2.12.0 in the `livekit` extra to address CVE-2026-32597 (GHSA-752w-5fwx-jx9f), where PyJWT <= 2.11.0 accepted unknown `crit` header extensions.

View File

@@ -1 +0,0 @@
- Fixed `LLMSwitcher.register_function()` and `register_direct_function()` not accepting or forwarding the `timeout_secs` parameter.

View File

@@ -1 +0,0 @@
- Fixed `SonioxSTTService` and `OpenAIRealtimeSTTService` crash when language parameters contain plain strings instead of `Language` enum values.

View File

@@ -1 +0,0 @@
- Added DTMF input event support to the Daily transport. Incoming DTMF tones are now received via Daily's `on_dtmf_event` callback and pushed into the pipeline as `InputDTMFFrame`, enabling bots to react to keypad presses from phone callers.

View File

@@ -1 +0,0 @@
- Updated `daily-python` dependency to 0.25.0.

View File

@@ -1 +0,0 @@
- Added `enable_dialout` parameter to `configure()` in `pipecat.runner.daily` to support dial-out rooms. Also narrowed misleading `Optional` type hints and deduplicated token expiry calculation.

View File

@@ -1 +0,0 @@
- Fixed premature user turn stops caused by late transcriptions arriving between turns. A stale transcript from the previous turn could persist into the next turn and trigger a stop before the current turn's real transcript arrived. Stop strategies are now reset at both turn start and turn stop to prevent state from leaking across turn boundaries.

View File

@@ -1 +0,0 @@
- Fixed raw language strings like `"de-DE"` silently failing when passed to TTS/STT services (e.g. ElevenLabs producing no audio). Raw strings now go through the same `Language` enum resolution as enum values, so regional codes like `"de-DE"` are properly converted to service-expected formats like `"de"`. Unrecognized strings log a warning instead of failing silently.

View File

@@ -1 +0,0 @@
- Fixed Deepgram STT list-type settings (`keyterm`, `keywords`, `search`, `redact`, `replace`) being stringified instead of passed as lists to the SDK, which caused them to be sent as literal strings (e.g. `"['pipecat']"`) in the WebSocket query params.

View File

@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -28,6 +27,11 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_start import WakePhraseUserTurnStartStrategy
from pipecat.turns.user_turn_strategies import (
UserTurnStrategies,
default_user_turn_start_strategies,
)
load_dotenv(override=True)
@@ -52,7 +56,12 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
settings=DeepgramSTTService.Settings(
keyterm=["pipecat"],
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -68,19 +77,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
hey_robot_filter = WakeCheckFilter(["hey robot", "hey, robot"])
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
start=[
WakePhraseUserTurnStartStrategy(
phrases=["pipecat"],
# Timeout before wake phrase must be spoken again
timeout=5.0,
),
*default_user_turn_start_strategies(),
]
),
vad_analyzer=SileroVADAnalyzer(),
),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
hey_robot_filter, # Filter out speech not directed at the robot
stt,
user_aggregator, # User responses
llm, # LLM
tts, # TTS
@@ -102,12 +120,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{
"role": "user",
"content": "Please introduce yourself. Tell the user they should say 'Hey Robot' before talking to you.",
}
)
context.add_message({"role": "user", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -447,6 +447,9 @@ class LLMUserAggregator(LLMContextAggregator):
self._user_turn_controller.add_event_handler(
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
)
self._user_turn_controller.add_event_handler(
"on_reset_aggregation", self._on_reset_aggregation
)
self._user_idle_controller = UserIdleController(
user_idle_timeout=self._params.user_idle_timeout
@@ -748,6 +751,12 @@ class LLMUserAggregator(LLMContextAggregator):
await self._maybe_emit_user_turn_stopped(strategy)
async def _on_reset_aggregation(
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
):
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
await self.reset()
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")

View File

@@ -6,6 +6,9 @@
"""Wake phrase detection filter for Pipecat transcription processing.
.. deprecated:: 0.0.106
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead.
This module provides a frame processor that filters transcription frames,
only allowing them through after wake phrases have been detected. Includes
keepalive functionality to maintain conversation flow after wake detection.
@@ -13,6 +16,7 @@ keepalive functionality to maintain conversation flow after wake detection.
import re
import time
import warnings
from enum import Enum
from typing import List
@@ -25,6 +29,11 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class WakeCheckFilter(FrameProcessor):
"""Frame processor that filters transcription frames based on wake phrase detection.
.. deprecated:: 0.0.106
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead,
which integrates with the user turn strategy system and supports configurable
timeouts and single-activation mode.
This filter monitors transcription frames for configured wake phrases and only
passes frames through after a wake phrase has been detected. Maintains a
keepalive timeout to allow continued conversation after wake detection.
@@ -65,12 +74,21 @@ class WakeCheckFilter(FrameProcessor):
def __init__(self, wake_phrases: List[str], keepalive_timeout: float = 3):
"""Initialize the wake phrase filter.
.. deprecated:: 0.0.106
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead.
Args:
wake_phrases: List of wake phrases to detect in transcriptions.
keepalive_timeout: Duration in seconds to keep passing frames after
wake detection. Defaults to 3 seconds.
"""
super().__init__()
warnings.warn(
"WakeCheckFilter is deprecated since v0.0.106. "
"Use WakePhraseUserTurnStartStrategy instead.",
DeprecationWarning,
stacklevel=2,
)
self._participant_states = {}
self._keepalive_timeout = keepalive_timeout
self._wake_patterns = []

View File

@@ -27,8 +27,8 @@ from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.frames.frames import (
AggregatedTextFrame,
AggregationType,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
@@ -424,18 +424,16 @@ class AWSNovaSonicLLMService(LLMService):
self._input_audio_content_name: Optional[str] = None
self._content_being_received: Optional[CurrentContent] = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time: Optional[float] = None
self._wants_connection = False
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
self._audio_input_started = False
self._pending_speculative_text: Optional[str] = None
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
@@ -505,11 +503,13 @@ class AWSNovaSonicLLMService(LLMService):
async def reset_conversation(self):
"""Reset the conversation state while preserving context.
Handles bot stopped speaking event, disconnects from the service,
and reconnects with the preserved context.
Cleans up any in-progress assistant response, disconnects from the
service, and reconnects with the preserved context.
"""
logger.debug("Resetting conversation")
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
if self._assistant_is_responding:
self._assistant_is_responding = False
await self._report_assistant_response_ended()
# Grab context to carry through disconnect/reconnect
context = self._context
@@ -540,8 +540,6 @@ class AWSNovaSonicLLMService(LLMService):
await self._handle_context(context)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption_frame()
@@ -569,49 +567,8 @@ class AWSNovaSonicLLMService(LLMService):
await self._send_user_audio_event(frame.audio)
async def _handle_bot_stopped_speaking(self, delay_to_catch_trailing_assistant_text: bool):
# Protect against back-to-back BotStoppedSpeaking calls, which I've observed
if self._handling_bot_stopped_speaking:
return
self._handling_bot_stopped_speaking = True
async def finalize_assistant_response():
if self._assistant_is_responding:
# Consider the assistant finished with their response (possibly after a short delay,
# to allow for any trailing FINAL assistant text block to come in that need to make
# it into context).
#
# TODO: ideally we could base this solely on the LLM output events, but I couldn't
# figure out a reliable way to determine when we've gotten our last FINAL text block
# after the LLM is done talking.
#
# First I looked at stopReason, but it doesn't seem like the last FINAL text block
# is reliably marked END_TURN (sometimes the *first* one is, but not the last...
# bug?)
#
# Then I considered schemes where we tally or match up SPECULATIVE text blocks with
# FINAL text blocks to know how many or which FINAL blocks to expect, but user
# interruptions throw a wrench in these schemes: depending on the exact timing of
# the interruption, we should or shouldn't expect some FINAL blocks.
if delay_to_catch_trailing_assistant_text:
# This delay length is a balancing act between "catching" trailing assistant
# text that is quite delayed but not waiting so long that user text comes in
# first and results in a bit of context message order scrambling.
await asyncio.sleep(1.25)
self._assistant_is_responding = False
await self._report_assistant_response_ended()
self._handling_bot_stopped_speaking = False
# Finalize the assistant response, either now or after a delay
if delay_to_catch_trailing_assistant_text:
self.create_task(finalize_assistant_response())
else:
await finalize_assistant_response()
async def _handle_interruption_frame(self):
if self._assistant_is_responding:
self._may_need_repush_assistant_text = True
pass
#
# LLM communication: lifecycle
@@ -771,17 +728,15 @@ class AWSNovaSonicLLMService(LLMService):
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time = None
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
self._audio_input_started = False
self._pending_speculative_text = None
logger.info("Finished disconnecting")
except Exception as e:
@@ -1153,10 +1108,11 @@ class AWSNovaSonicLLMService(LLMService):
self._content_being_received = content
if content.role == Role.ASSISTANT:
if content.type == ContentType.AUDIO:
# Note that an assistant response can comprise multiple audio blocks
if not self._assistant_is_responding:
# The assistant has started responding.
if content.type == ContentType.TEXT:
if (
content.text_stage == TextStage.SPECULATIVE
and not self._assistant_is_responding
):
self._assistant_is_responding = True
await self._report_user_transcription_ended() # Consider user turn over
await self._report_assistant_response_started()
@@ -1232,18 +1188,30 @@ class AWSNovaSonicLLMService(LLMService):
if content.role == Role.ASSISTANT:
if content.type == ContentType.TEXT:
# Ignore non-final text, and the "interrupted" message (which isn't meaningful text)
if content.text_stage == TextStage.FINAL and stop_reason != "INTERRUPTED":
if self._assistant_is_responding:
# Text added to the ongoing assistant response
await self._report_assistant_response_text_added(content.text_content)
if stop_reason != "INTERRUPTED":
if content.text_stage == TextStage.SPECULATIVE:
await self._report_llm_text(content.text_content)
elif self._assistant_is_responding:
# TEXT INTERRUPTED with no audio means the user interrupted
# before audio started. End the response here since no AUDIO
# contentEnd will arrive.
self._assistant_is_responding = False
await self._report_assistant_response_ended()
elif content.type == ContentType.AUDIO:
# Emit deferred TTSTextFrame after all audio chunks have been sent
await self._report_tts_text()
if stop_reason in ("END_TURN", "INTERRUPTED"):
# END_TURN: normal completion. INTERRUPTED: user interrupted
# mid-audio. Both mean no more audio for this turn.
self._assistant_is_responding = False
await self._report_assistant_response_ended()
elif content.role == Role.USER:
if content.type == ContentType.TEXT:
if content.text_stage == TextStage.FINAL:
# User transcription text added
await self._report_user_transcription_text_added(content.text_content)
async def _handle_completion_end_event(self, event_json):
async def _handle_completion_end_event(self, _):
pass
#
@@ -1256,29 +1224,40 @@ class AWSNovaSonicLLMService(LLMService):
async def _report_assistant_response_started(self):
logger.debug("Assistant response started")
# Report the start of the assistant response.
await self.push_frame(LLMFullResponseStartFrame())
# Report that the equivalent of TTS (this is a speech-to-speech model) started
await self.push_frame(TTSStartedFrame())
async def _report_assistant_response_text_added(self, text):
if not self._context: # should never happen
return
async def _report_llm_text(self, text):
"""Push speculative assistant text and defer TTSTextFrame.
logger.debug(f"Assistant response text added: {text}")
Speculative text arrives before each audio chunk, providing real-time
text that is synchronized with what the bot is saying. LLMTextFrame and
AggregatedTextFrame are pushed immediately for real-time text display.
TTSTextFrame emission is deferred to audio contentEnd so it aligns with
audio playout timing.
"""
logger.debug(f"Assistant speculative text: {text}")
# Report the text of the assistant response.
await self._push_assistant_response_text_frames(text)
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
# to do it, because the text arrives from Nova Sonic only after all the
# assistant audio frames have been pushed, meaning that if an
# interruption frame were to arrive we would lose all of it (the text
# frames sitting in the queue would be wiped).
self._assistant_text_buffer += text
aggregated_text_frame = AggregatedTextFrame(text, aggregated_by=AggregationType.SENTENCE)
aggregated_text_frame.append_to_context = False
await self.push_frame(aggregated_text_frame)
self._pending_speculative_text = text
async def _report_tts_text(self):
if self._pending_speculative_text:
tts_text_frame = TTSTextFrame(
self._pending_speculative_text, aggregated_by=AggregationType.SENTENCE
)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
self._pending_speculative_text = None
async def _report_assistant_response_ended(self):
if not self._context: # should never happen
@@ -1286,54 +1265,12 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response ended")
# If an interruption frame arrived while the assistant was responding
# we may have lost all of the assistant text (see HACK, above), so
# re-push it downstream to the aggregator now.
if self._may_need_repush_assistant_text:
# Just in case, check that assistant text hasn't already made it
# into the context (sometimes it does, despite the interruption).
messages = self._context.get_messages()
last_message = messages[-1] if messages else None
if (
not last_message
or last_message.get("role") != "assistant"
or last_message.get("content") != self._assistant_text_buffer
):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
await self._push_assistant_response_text_frames(self._assistant_text_buffer)
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame())
# Report that the equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
async def _push_assistant_response_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Nova Sonic combines both LLM and TTS functionality, you
# would think we wouldn't need to push LLMTextFrames at all. However,
# RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
#
# user transcription reporting
#
@@ -1363,6 +1300,12 @@ class AWSNovaSonicLLMService(LLMService):
if not self._context: # should never happen
return
# Nothing to report if no user speech was transcribed (e.g. the prompt
# was text-only, which is the case on the first user turn when the bot
# starts the conversation).
if not self._user_text_buffer:
return
logger.debug(f"User transcription ended")
# Report to the upstream user context aggregator that some new user


@@ -10,6 +10,7 @@ This module provides integration with Gradium's real-time speech-to-text
WebSocket API for streaming audio transcription.
"""
import asyncio
import base64
import json
from dataclasses import dataclass, field
@@ -22,6 +23,7 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
@@ -43,7 +45,37 @@ except ModuleNotFoundError as e:
logger.error('In order to use Gradium, you need to `pip install "pipecat-ai[gradium]"`.')
raise Exception(f"Missing module: {e}")
SAMPLE_RATE = 24000
# Seconds to wait after a "flushed" message for trailing text tokens to arrive
# before finalizing the transcription.
TRANSCRIPT_AGGREGATION_DELAY = 0.1
def _input_format_from_encoding(encoding: str, sample_rate: int) -> str:
"""Build Gradium input_format from encoding type and sample rate.
For PCM encoding, appends the sample rate (e.g., "pcm_16000").
For other encodings (wav, opus), returns the encoding as-is.
Args:
encoding: Base encoding type ("pcm", "wav", or "opus").
sample_rate: Audio sample rate in Hz.
Returns:
The full input_format string for the Gradium API.
"""
if encoding == "pcm":
match sample_rate:
case 8000:
return "pcm_8000"
case 16000:
return "pcm_16000"
case 24000:
return "pcm_24000"
logger.warning(
f"GradiumSTTService: unsupported sample rate {sample_rate} for PCM encoding, using pcm_16000"
)
return "pcm_16000"
return encoding
def language_to_gradium_language(language: Language) -> Optional[str]:
@@ -115,6 +147,8 @@ class GradiumSTTService(WebsocketSTTService):
*,
api_key: str,
api_endpoint_base_url: str = "wss://eu.api.gradium.ai/api/speech/asr",
encoding: str = "pcm",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
json_config: Optional[str] = None,
settings: Optional[Settings] = None,
@@ -126,6 +160,12 @@ class GradiumSTTService(WebsocketSTTService):
Args:
api_key: Gradium API key for authentication.
api_endpoint_base_url: WebSocket endpoint URL. Defaults to Gradium's streaming endpoint.
encoding: Base audio encoding type. One of "pcm", "wav", or "opus".
For PCM, the sample rate is appended automatically from the
pipeline's audio_in_sample_rate (e.g., "pcm" becomes "pcm_16000").
Defaults to "pcm".
sample_rate: Audio sample rate in Hz. If None, uses the pipeline
sample rate.
params: Configuration parameters for language and delay settings.
.. deprecated:: 0.0.105
@@ -153,7 +193,7 @@ class GradiumSTTService(WebsocketSTTService):
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model=None,
model="default",
language=None,
delay_in_frames=None,
)
@@ -173,7 +213,7 @@ class GradiumSTTService(WebsocketSTTService):
default_settings.apply_update(settings)
super().__init__(
sample_rate=SAMPLE_RATE,
sample_rate=sample_rate,
ttfs_p99_latency=ttfs_p99_latency,
settings=default_settings,
**kwargs,
@@ -181,19 +221,25 @@ class GradiumSTTService(WebsocketSTTService):
self._api_key = api_key
self._api_endpoint_base_url = api_endpoint_base_url
self._encoding = encoding
self._websocket = None
self._json_config = json_config
self._receive_task = None
self._input_format = ""
self._audio_buffer = bytearray()
self._chunk_size_ms = 80
self._chunk_size_bytes = 0
# Set from the ready message when connecting to the service.
# These values are used for flushing transcription.
self._delay_in_frames = 0
self._frame_size = 0
# Accumulates text fragments within a turn. Each "text" message
# appends to this list. On "flushed" a short aggregation delay
# allows trailing tokens to arrive before the full text is joined
# and pushed as a TranscriptionFrame.
self._accumulated_text: list[str] = []
self._flush_counter = 0
self._transcript_aggregation_task: Optional[asyncio.Task] = None
def can_generate_metrics(self) -> bool:
"""Check if the service can generate metrics.
@@ -228,6 +274,7 @@ class GradiumSTTService(WebsocketSTTService):
frame: Start frame to begin processing.
"""
await super().start(frame)
self._input_format = _input_format_from_encoding(self._encoding, self.sample_rate)
self._chunk_size_bytes = int(self._chunk_size_ms * self.sample_rate * 2 / 1000)
await self._connect()
@@ -249,56 +296,41 @@ class GradiumSTTService(WebsocketSTTService):
await super().cancel(frame)
await self._disconnect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with VAD-specific handling.
async def _start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_processing_metrics()
When VAD detects the user has stopped speaking, we flush the transcription
by sending silence frames. This makes the system more reactive by getting
the final transcription faster without closing the connection.
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: The direction of frame processing.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame):
await self.start_processing_metrics()
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
await self._flush_transcription()
await self._send_flush()
async def _flush_transcription(self):
"""Flush the transcription by sending silence frames.
async def _send_flush(self):
"""Send a flush request to process any buffered audio immediately.
When VAD detects the user stopped speaking, we send delay_in_frames
chunks of silence (zeros) to flush the remaining audio from the model's
buffer. This allows for faster turn-around without closing the connection.
From Gradium docs: "feed in delay_in_frames chunks of silence (vectors
of zeros). If those are fed in faster than realtime, the API also has
a possibility to process them faster."
Sends a flush message to tell the server to process buffered audio.
The server responds with text fragments followed by a "flushed"
acknowledgment, which triggers finalization.
"""
if not self._websocket or self._websocket.state is not State.OPEN:
return
if self._delay_in_frames <= 0:
logger.debug("No delay_in_frames set, skipping flush")
return
# Create a silence chunk (zeros) of frame_size samples
# Each sample is 2 bytes (16-bit PCM)
silence_bytes = bytes(self._frame_size * 2)
silence_b64 = base64.b64encode(silence_bytes).decode("utf-8")
logger.debug(f"Flushing Gradium STT with {self._delay_in_frames} silence frames")
for _ in range(self._delay_in_frames):
msg = {"type": "audio", "audio": silence_b64}
try:
await self._websocket.send(json.dumps(msg))
except Exception as e:
logger.warning(f"Failed to send silence frame: {e}")
break
self._flush_counter += 1
flush_id = str(self._flush_counter)
msg = {"type": "flush", "flush_id": flush_id}
try:
await self._websocket.send(json.dumps(msg))
except Exception as e:
logger.warning(f"Failed to send flush: {e}")
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process audio data for speech-to-text conversion.
@@ -353,7 +385,8 @@ class GradiumSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
setup_msg = {
"type": "setup",
"input_format": "pcm",
"model_name": self._settings.model,
"input_format": self._input_format,
}
# Build json_config: start with deprecated json_config, then override with params
json_config = {}
@@ -375,13 +408,7 @@ class GradiumSTTService(WebsocketSTTService):
if ready_msg["type"] != "ready":
raise Exception(f"unexpected first message type {ready_msg['type']}")
# Store delay_in_frames and frame_size for silence flushing
self._delay_in_frames = ready_msg.get("delay_in_frames", 0)
self._frame_size = ready_msg.get("frame_size", 1920)
logger.debug(
f"Connected to Gradium STT (delay_in_frames={self._delay_in_frames}, "
f"frame_size={self._frame_size})"
)
logger.debug("Connected to Gradium STT")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
@@ -390,6 +417,13 @@ class GradiumSTTService(WebsocketSTTService):
async def _disconnect(self):
await super()._disconnect()
if self._transcript_aggregation_task:
await self.cancel_task(self._transcript_aggregation_task)
self._transcript_aggregation_task = None
self._accumulated_text.clear()
self._flush_counter = 0
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
@@ -412,41 +446,75 @@ class GradiumSTTService(WebsocketSTTService):
return self._websocket
raise Exception("Websocket not connected")
async def _process_messages(self):
async def _receive_messages(self):
async for message in self._get_websocket():
try:
data = json.loads(message)
await self._process_response(data)
msg = json.loads(message)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
continue
async def _receive_messages(self):
while True:
await self._process_messages()
logger.debug(f"{self} Gradium connection was disconnected (timeout?), reconnecting")
await self._connect_websocket()
async def _process_response(self, msg):
type_ = msg.get("type", "")
if type_ == "text":
await self._handle_text(msg["text"])
elif type_ == "end_of_stream":
await self._handle_end_of_stream()
elif type_ == "error":
await self.push_error(error_msg=f"Error: {msg}")
async def _handle_end_of_stream(self):
"""Handle termination message."""
logger.debug("Received end_of_stream message from server")
type_ = msg.get("type", "")
if type_ == "text":
await self._handle_text(msg["text"])
elif type_ == "flushed":
await self._handle_flushed()
elif type_ == "end_of_stream":
logger.debug("Received end_of_stream message from server")
elif type_ == "error":
await self.push_error(error_msg=f"Error: {msg}")
async def _handle_text(self, text: str):
"""Handle transcription results."""
"""Handle streaming transcription fragment.
Accumulates text and pushes an InterimTranscriptionFrame with the
full accumulated text so far.
"""
self._accumulated_text.append(text)
accumulated = " ".join(self._accumulated_text)
await self.push_frame(
InterimTranscriptionFrame(
text=accumulated,
user_id=self._user_id,
timestamp=time_now_iso8601(),
language=self._settings.language,
)
)
await self.stop_processing_metrics()
async def _handle_flushed(self):
"""Handle flush completion by starting a transcript aggregation timer.
The "flushed" message confirms that buffered audio has been processed,
but text tokens may still arrive after this point. A short timer allows
trailing tokens to accumulate before finalizing the transcription.
"""
if self._transcript_aggregation_task:
await self.cancel_task(self._transcript_aggregation_task)
self._transcript_aggregation_task = self.create_task(
self._transcript_aggregation_handler(), "transcript_aggregation"
)
async def _transcript_aggregation_handler(self):
"""Wait for trailing tokens then finalize the accumulated transcription."""
await asyncio.sleep(TRANSCRIPT_AGGREGATION_DELAY)
await self._finalize_accumulated_text()
async def _finalize_accumulated_text(self):
"""Join accumulated text, push TranscriptionFrame, and clear state."""
if not self._accumulated_text:
return
self._transcript_aggregation_task = None
text = " ".join(self._accumulated_text)
self._accumulated_text.clear()
logger.debug(f"Final transcription: [{text}]")
await self.push_frame(
TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._settings.language,
)
)
await self._trace_transcription(text, is_final=True, language=None)
await self.stop_processing_metrics()
await self._trace_transcription(text, is_final=True, language=self._settings.language)
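The accumulate-then-finalize flow in this file can be sketched in isolation as follows. This is a minimal illustration, not the service's code: `TranscriptAccumulator` and its method names are hypothetical, and plain `asyncio` tasks stand in for the service's `create_task` / `cancel_task` helpers. It shows text fragments being joined into interim results and a short timer after "flushed" letting a trailing fragment join the final transcript.

```python
import asyncio

# Same value as TRANSCRIPT_AGGREGATION_DELAY in the diff above.
TRANSCRIPT_AGGREGATION_DELAY = 0.1


class TranscriptAccumulator:
    """Hypothetical stand-in for the accumulation logic in the STT service."""

    def __init__(self, delay: float = TRANSCRIPT_AGGREGATION_DELAY):
        self._delay = delay
        self._fragments: list[str] = []
        self._task: asyncio.Task | None = None
        self.finals: list[str] = []

    def on_text(self, text: str) -> str:
        """Store a fragment and return the interim transcript so far."""
        self._fragments.append(text)
        return " ".join(self._fragments)

    def on_flushed(self) -> None:
        """Restart the aggregation timer when the server acknowledges a flush."""
        if self._task:
            self._task.cancel()
        self._task = asyncio.get_running_loop().create_task(self._finalize_later())

    async def _finalize_later(self) -> None:
        # Wait briefly so trailing fragments can still be appended.
        await asyncio.sleep(self._delay)
        self._task = None
        if not self._fragments:
            return
        self.finals.append(" ".join(self._fragments))
        self._fragments.clear()


async def demo():
    acc = TranscriptAccumulator(delay=0.05)
    interim = [acc.on_text("hello"), acc.on_text("there")]
    acc.on_flushed()
    acc.on_text("friend")  # trailing fragment arriving after "flushed"
    await asyncio.sleep(0.2)
    return interim, acc.finals
```

Cancelling any pending timer in `on_flushed` mirrors the diff's handling of back-to-back flushes, so only the most recent flush finalizes the turn.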


@@ -720,11 +720,6 @@ class TTSService(AIService):
self._turn_context_id = self.create_context_id()
await self.push_frame(frame, direction)
elif isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
# We pause processing incoming frames if the LLM response included
# text (it might be that it's only a function calling response). We
# pause to avoid audio overlapping.
await self._maybe_pause_frame_processing()
# Flush any remaining text (including text waiting for lookahead)
remaining = await self._text_aggregator.flush()
# Stop the aggregation metric (no-op if already stopped on first sentence).
@@ -732,6 +727,11 @@ class TTSService(AIService):
if remaining:
await self._push_tts_frames(AggregatedTextFrame(remaining.text, remaining.type))
# We pause processing incoming frames if the LLM response included
# text (it might be that it's only a function calling response). We
# pause to avoid audio overlapping.
await self._maybe_pause_frame_processing()
# Log accumulated streamed text and emit aggregated usage metric.
if self._streamed_text:
logger.debug(f"{self}: Generating TTS [{self._streamed_text}]")


@@ -241,6 +241,7 @@ class TavusTransportClient:
on_dialout_stopped=partial(self._on_handle_callback, "on_dialout_stopped"),
on_dialout_error=partial(self._on_handle_callback, "on_dialout_error"),
on_dialout_warning=partial(self._on_handle_callback, "on_dialout_warning"),
on_dtmf_event=partial(self._on_handle_callback, "on_dtmf_event"),
on_participant_joined=self._callbacks.on_participant_joined,
on_participant_left=self._callbacks.on_participant_left,
on_participant_updated=partial(self._on_handle_callback, "on_participant_updated"),


@@ -0,0 +1,24 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Shared result type for user turn strategy frame processing."""
from enum import Enum
class ProcessFrameResult(Enum):
"""Result of processing a frame in a user turn strategy.
Controls whether the strategy loop in the controller continues to the
next strategy or stops early.
Attributes:
CONTINUE: Continue to the next strategy in the loop.
STOP: Stop evaluating further strategies for this frame.
"""
CONTINUE = "continue"
STOP = "stop"
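A rough sketch of how a controller loop might consume this result type is shown below. The controller itself is not part of this excerpt, so `run_strategies` and `RecordingStrategy` are hypothetical names; the only behavior taken from the diff is that STOP ends evaluation early and that a `None` return is treated as CONTINUE.

```python
import asyncio
from enum import Enum


class ProcessFrameResult(Enum):
    CONTINUE = "continue"
    STOP = "stop"


class RecordingStrategy:
    """Hypothetical strategy that records when it is evaluated."""

    def __init__(self, name, result, calls):
        self.name = name
        self._result = result
        self._calls = calls

    async def process_frame(self, frame):
        self._calls.append(self.name)
        return self._result


async def run_strategies(strategies, frame):
    """Evaluate strategies in order, stopping at the first STOP."""
    for strategy in strategies:
        result = await strategy.process_frame(frame)
        # A None return (older strategies) falls through like CONTINUE.
        if result is ProcessFrameResult.STOP:
            return ProcessFrameResult.STOP
    return ProcessFrameResult.CONTINUE


def demo():
    calls = []
    strategies = [
        RecordingStrategy("legacy", None, calls),
        RecordingStrategy("gate", ProcessFrameResult.STOP, calls),
        RecordingStrategy("never_reached", ProcessFrameResult.CONTINUE, calls),
    ]
    result = asyncio.run(run_strategies(strategies, frame=object()))
    return result, calls
```

Because evaluation stops at the first STOP, ordering matters: a gating strategy only blocks the strategies listed after it.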


@@ -9,6 +9,7 @@ from .external_user_turn_start_strategy import ExternalUserTurnStartStrategy
from .min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
from .transcription_user_turn_start_strategy import TranscriptionUserTurnStartStrategy
from .vad_user_turn_start_strategy import VADUserTurnStartStrategy
from .wake_phrase_user_turn_start_strategy import WakePhraseUserTurnStartStrategy
__all__ = [
"BaseUserTurnStartStrategy",
@@ -17,4 +18,5 @@ __all__ = [
"TranscriptionUserTurnStartStrategy",
"UserTurnStartedParams",
"VADUserTurnStartStrategy",
"WakePhraseUserTurnStartStrategy",
]


@@ -11,6 +11,7 @@ from typing import Optional, Type
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.turns.types import ProcessFrameResult
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -76,6 +77,7 @@ class BaseUserTurnStartStrategy(BaseObject):
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
self._register_event_handler("on_user_turn_started", sync=True)
self._register_event_handler("on_reset_aggregation", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
@@ -100,7 +102,7 @@ class BaseUserTurnStartStrategy(BaseObject):
"""Reset the strategy to its initial state."""
pass
async def process_frame(self, frame: Frame):
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame.
Subclasses should override this to implement logic that decides whether
@@ -108,6 +110,10 @@ class BaseUserTurnStartStrategy(BaseObject):
Args:
frame: The frame to be processed.
Returns:
A ProcessFrameResult indicating the outcome. Subclasses that return
None are treated as CONTINUE for backward compatibility.
"""
pass
@@ -138,3 +144,7 @@ class BaseUserTurnStartStrategy(BaseObject):
enable_user_speaking_frames=self._enable_user_speaking_frames,
),
)
async def trigger_reset_aggregation(self):
"""Trigger the `on_reset_aggregation` event."""
await self._call_event_handler("on_reset_aggregation")


@@ -7,6 +7,7 @@
"""User turn start strategy triggered by externally emitted frames."""
from pipecat.frames.frames import Frame, UserStartedSpeakingFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
@@ -27,13 +28,17 @@ class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""
super().__init__(enable_interruptions=False, enable_user_speaking_frames=False, **kwargs)
async def process_frame(self, frame: Frame):
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect user turn start.
Args:
frame: The frame to be analyzed.
"""
await super().process_frame(frame)
Returns:
STOP if a user started speaking frame was received, CONTINUE otherwise.
"""
if isinstance(frame, UserStartedSpeakingFrame):
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
return ProcessFrameResult.CONTINUE


@@ -15,6 +15,7 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
TranscriptionFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
@@ -47,7 +48,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
await super().reset()
self._bot_speaking = False
async def process_frame(self, frame: Frame):
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect the start of a user turn.
This method updates internal state based on transcription frames and
@@ -55,17 +56,20 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
Args:
frame: The frame to be analyzed.
"""
await super().process_frame(frame)
Returns:
STOP if the minimum word count was reached, CONTINUE otherwise.
"""
if isinstance(frame, BotStartedSpeakingFrame):
await self._handle_bot_started_speaking(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(frame)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
return await self._handle_transcription(frame)
elif isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
await self._handle_transcription(frame)
return await self._handle_transcription(frame)
return ProcessFrameResult.CONTINUE
async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
"""Handle bot started speaking frame.
@@ -87,11 +91,16 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""
self._bot_speaking = False
async def _handle_transcription(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
"""Handle a completed transcription frame and check word count.
async def _handle_transcription(
self, frame: TranscriptionFrame | InterimTranscriptionFrame
) -> ProcessFrameResult:
"""Handle a transcription frame and check word count.
Args:
frame: The transcription frame to be processed.
Returns:
STOP if the minimum word count was reached, CONTINUE otherwise.
"""
min_words = self._min_words if self._bot_speaking else 1
@@ -106,3 +115,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
if should_trigger:
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
await self.trigger_reset_aggregation()
return ProcessFrameResult.CONTINUE


@@ -7,6 +7,7 @@
"""User turn start strategy based on transcriptions."""
from pipecat.frames.frames import Frame, InterimTranscriptionFrame, TranscriptionFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
@@ -25,15 +26,20 @@ class TranscriptionUserTurnStartStrategy(BaseUserTurnStartStrategy):
super().__init__(**kwargs)
self._use_interim = use_interim
async def process_frame(self, frame: Frame):
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect the start of a user turn.
Args:
frame: The frame to be processed.
"""
await super().process_frame(frame)
Returns:
STOP if a transcription was received, CONTINUE otherwise.
"""
if isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
elif isinstance(frame, TranscriptionFrame):
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
return ProcessFrameResult.CONTINUE


@@ -7,6 +7,7 @@
"""User turn start strategy based on VAD events."""
from pipecat.frames.frames import Frame, VADUserStartedSpeakingFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
@@ -18,13 +19,17 @@ class VADUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""
async def process_frame(self, frame: Frame):
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect user turn start.
Args:
frame: The frame to be analyzed.
"""
await super().process_frame(frame)
Returns:
STOP if the user started speaking, CONTINUE otherwise.
"""
if isinstance(frame, VADUserStartedSpeakingFrame):
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
return ProcessFrameResult.CONTINUE


@@ -0,0 +1,281 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn start strategy that gates interaction behind wake phrase detection."""
import asyncio
import enum
import re
from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotSpeakingFrame,
Frame,
TranscriptionFrame,
UserSpeakingFrame,
VADUserStartedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class _WakeState(enum.Enum):
"""Internal state for wake phrase detection."""
IDLE = "idle"
AWAKE = "awake"
class WakePhraseUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""User turn start strategy that requires a wake phrase before interaction.
Blocks subsequent strategies until a wake phrase is detected in a final
transcription. After detection, allows interaction for a configurable
timeout period before requiring the wake phrase again. Use
``single_activation=True`` to require the wake phrase before every turn.
This strategy should be placed first in the start strategies list.
Event handlers available:
- on_wake_phrase_detected: Called when a wake phrase is matched.
- on_wake_phrase_timeout: Called when the inactivity timeout expires
(timeout mode only).
Example::
# Timeout mode (default): wake phrase unlocks interaction for 10s
strategy = WakePhraseUserTurnStartStrategy(
phrases=["hey pipecat", "ok pipecat"],
timeout=10.0,
)
# Single activation: wake phrase required before every turn
strategy = WakePhraseUserTurnStartStrategy(
phrases=["hey pipecat"],
single_activation=True,
)
@strategy.event_handler("on_wake_phrase_detected")
async def on_wake_phrase_detected(strategy, phrase):
...
@strategy.event_handler("on_wake_phrase_timeout")
async def on_wake_phrase_timeout(strategy):
...
Args:
phrases: List of wake phrases to detect.
timeout: Inactivity timeout in seconds before returning to IDLE.
In timeout mode, the timer resets on activity (user or bot speech).
In single activation mode, acts as a keepalive window — the strategy
stays AWAKE for this duration after wake phrase detection, allowing
the current turn to complete before returning to IDLE.
single_activation: If True, the wake phrase is required before every
turn. The strategy returns to IDLE after each turn completes.
**kwargs: Additional keyword arguments passed to parent.
"""
def __init__(
self,
*,
phrases: List[str],
timeout: float = 10.0,
single_activation: bool = False,
**kwargs,
):
"""Initialize the wake phrase user turn start strategy.
Args:
phrases: List of wake phrases to detect.
timeout: Inactivity timeout in seconds before returning to IDLE.
In timeout mode, the timer resets on activity. In single activation
mode, acts as a keepalive window after wake phrase detection.
single_activation: If True, the wake phrase is required before every
turn. The strategy returns to IDLE when the keepalive timeout expires.
**kwargs: Additional keyword arguments passed to parent.
"""
super().__init__(**kwargs)
self._phrases = phrases
self._timeout = timeout
self._single_activation = single_activation
self._patterns: List[re.Pattern] = []
for phrase in phrases:
pattern = re.compile(
r"\b" + r"\s*".join(re.escape(word) for word in phrase.split()) + r"\b",
re.IGNORECASE,
)
self._patterns.append(pattern)
self._state = _WakeState.IDLE
self._accumulated_text = ""
self._timeout_event = asyncio.Event()
self._timeout_task: Optional[asyncio.Task] = None
self._register_event_handler("on_wake_phrase_detected")
self._register_event_handler("on_wake_phrase_timeout")
@property
def state(self) -> _WakeState:
"""Returns the current wake state."""
return self._state
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
await super().setup(task_manager)
if not self._timeout_task:
self._timeout_task = self.task_manager.create_task(
self._timeout_task_handler(),
f"{self}::_timeout_task_handler",
)
async def cleanup(self):
"""Cleanup the strategy."""
await super().cleanup()
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
async def reset(self):
"""Reset the strategy.
In timeout mode, preserves state and refreshes timeout since reset
means a turn started (activity). In single activation mode, does
nothing — the keepalive timeout (started when the wake phrase was
detected) handles the transition back to IDLE.
"""
await super().reset()
if self._state == _WakeState.AWAKE:
if not self._single_activation:
self._refresh_timeout()
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame for wake phrase detection or passthrough.
Args:
frame: The frame to be processed.
Returns:
STOP when the wake phrase is detected or when in IDLE state
(blocks subsequent strategies), CONTINUE when in AWAKE state
(allows subsequent strategies to proceed).
"""
await super().process_frame(frame)
if self._state == _WakeState.IDLE:
return await self._process_idle(frame)
else:
return await self._process_awake(frame)
async def _process_idle(self, frame: Frame) -> ProcessFrameResult:
"""Process a frame while in IDLE state.
Only final ``TranscriptionFrame`` instances are checked for wake phrase
matches. When a match is found, a user turn start is triggered.
Transcription frames that don't match trigger an aggregation reset so
that pre-wake-phrase speech is not added to the LLM context. All frames
return STOP to block subsequent strategies.
"""
if isinstance(frame, TranscriptionFrame):
if self._check_wake_phrase(frame.text):
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
await self.trigger_reset_aggregation()
return ProcessFrameResult.STOP
async def _process_awake(self, frame: Frame) -> ProcessFrameResult:
"""Process a frame while in AWAKE state.
Refreshes the timeout on activity frames (timeout mode only). Returns
CONTINUE so subsequent strategies can process the frame.
"""
if not self._single_activation:
if isinstance(frame, (UserSpeakingFrame, BotSpeakingFrame)):
self._refresh_timeout()
elif isinstance(frame, TranscriptionFrame):
self._refresh_timeout()
elif isinstance(frame, VADUserStartedSpeakingFrame):
self._refresh_timeout()
return ProcessFrameResult.CONTINUE
@staticmethod
def _strip_punctuation(text: str) -> str:
"""Strip punctuation from text, keeping only letters, digits, and whitespace."""
return re.sub(r"[^\w\s]", "", text)
def _check_wake_phrase(self, text: str) -> bool:
"""Check if the accumulated text contains a wake phrase.
Punctuation is stripped before matching so that STT output like
"Hey, Pipecat!" still matches the phrase "hey pipecat".
Args:
text: New transcription text to append and check.
Returns:
True if a wake phrase was found, False otherwise.
"""
self._accumulated_text += " " + self._strip_punctuation(text)
# Cap accumulated text to prevent unbounded growth.
if len(self._accumulated_text) > 250:
self._accumulated_text = self._accumulated_text[-250:]
for i, pattern in enumerate(self._patterns):
if pattern.search(self._accumulated_text):
phrase = self._phrases[i]
logger.debug(f"{self} wake phrase detected: {phrase!r}")
self._transition_to_awake(phrase)
return True
return False
def _transition_to_awake(self, phrase: str):
"""Transition from IDLE to AWAKE state."""
self._state = _WakeState.AWAKE
self._accumulated_text = ""
self._refresh_timeout()
self.task_manager.create_task(
self._call_event_handler("on_wake_phrase_detected", phrase),
f"{self}::on_wake_phrase_detected",
)
def _transition_to_idle(self):
"""Transition from AWAKE to IDLE state."""
logger.debug(f"{self} wake phrase timeout, returning to IDLE")
self._state = _WakeState.IDLE
self._accumulated_text = ""
self.task_manager.create_task(
self._call_event_handler("on_wake_phrase_timeout"),
f"{self}::on_wake_phrase_timeout",
)
def _refresh_timeout(self):
"""Refresh the inactivity timeout."""
self._timeout_event.set()
async def _timeout_task_handler(self):
"""Background task that monitors inactivity timeout."""
while True:
try:
await asyncio.wait_for(
self._timeout_event.wait(),
timeout=self._timeout,
)
self._timeout_event.clear()
except asyncio.TimeoutError:
if self._state == _WakeState.AWAKE:
self._transition_to_idle()
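
The matching logic in `_check_wake_phrase` can be tried in isolation. The following is a minimal sketch, not the pipecat implementation: `WakePhraseMatcher`, `feed`, and `max_chars` are hypothetical names introduced here, while the regular expression, the punctuation stripping, and the 250-character cap mirror the code above.

```python
import re
from typing import List, Optional


class WakePhraseMatcher:
    """Hypothetical stand-alone matcher; not part of pipecat."""

    def __init__(self, phrases: List[str], max_chars: int = 250):
        self._phrases = phrases
        self._max_chars = max_chars
        self._accumulated = ""
        # Words of a phrase may be separated by any amount of whitespace,
        # including none, and the phrase must sit on word boundaries.
        self._patterns = [
            re.compile(
                r"\b" + r"\s*".join(re.escape(w) for w in p.split()) + r"\b",
                re.IGNORECASE,
            )
            for p in phrases
        ]

    def feed(self, text: str) -> Optional[str]:
        """Append a final transcription and return the matched phrase, if any."""
        # Drop punctuation so "Hey, Pipecat!" becomes "Hey Pipecat".
        self._accumulated += " " + re.sub(r"[^\w\s]", "", text)
        # Keep only the tail so the buffer cannot grow without bound.
        self._accumulated = self._accumulated[-self._max_chars:]
        for phrase, pattern in zip(self._phrases, self._patterns):
            if pattern.search(self._accumulated):
                # A match consumes the buffer, as in _transition_to_awake.
                self._accumulated = ""
                return phrase
        return None
```

Because text accumulates across calls, a phrase split over two transcriptions ("hey" followed by "pipecat") still matches, which is the behavior `test_accumulation_across_frames` checks.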


@@ -11,6 +11,7 @@ from typing import Optional, Type
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.turns.types import ProcessFrameResult
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -89,7 +90,7 @@ class BaseUserTurnStopStrategy(BaseObject):
"""Reset the strategy to its initial state."""
pass
- async def process_frame(self, frame: Frame):
+ async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to decide whether the user stopped speaking.
Subclasses should override this to implement logic that decides whether
@@ -97,6 +98,10 @@ class BaseUserTurnStopStrategy(BaseObject):
Args:
frame: The frame to be analyzed.
Returns:
A ProcessFrameResult indicating the outcome. Subclasses that return
None are treated as CONTINUE for backward compatibility.
"""
pass


@@ -16,6 +16,7 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@@ -69,7 +70,7 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
await self.task_manager.cancel_task(self._task)
self._task = None
- async def process_frame(self, frame: Frame):
+ async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update strategy state.
Updates internal transcription text and VAD state. The user end turn
@@ -78,6 +79,8 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
Args:
frame: The frame to be analyzed.
Returns:
Always returns CONTINUE so subsequent stop strategies are evaluated.
"""
if isinstance(frame, UserStartedSpeakingFrame):
await self._handle_user_started_speaking(frame)
@@ -88,6 +91,8 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
return ProcessFrameResult.CONTINUE
async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame):
"""Handle when the external service indicates the user is speaking."""
self._user_speaking = True


@@ -17,6 +17,7 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@@ -83,7 +84,7 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
- async def process_frame(self, frame: Frame):
+ async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update strategy state.
Updates internal transcription text and VAD state. The user end turn
@@ -92,6 +93,8 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
Args:
frame: The frame to be analyzed.
Returns:
Always returns CONTINUE so subsequent stop strategies are evaluated.
"""
if isinstance(frame, STTMetadataFrame):
self._stt_timeout = frame.ttfs_p99_latency
@@ -102,6 +105,8 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
return ProcessFrameResult.CONTINUE
async def _handle_vad_user_started_speaking(self, _: VADUserStartedSpeakingFrame):
"""Handle when the VAD indicates the user is speaking."""
self._vad_user_speaking = True


@@ -22,6 +22,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import MetricsData
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@@ -88,11 +89,14 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
- async def process_frame(self, frame: Frame):
+ async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update the turn analyzer and strategy state.
Args:
frame: The frame to be analyzed.
Returns:
Always returns CONTINUE so subsequent stop strategies are evaluated.
"""
await super().process_frame(frame)
@@ -109,6 +113,8 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
return ProcessFrameResult.CONTINUE
async def _start(self, frame: StartFrame):
"""Process the start frame to configure the turn analyzer."""
self._turn_analyzer.set_sample_rate(frame.audio_in_sample_rate)


@@ -19,7 +19,11 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
- from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
+ from pipecat.turns.types import ProcessFrameResult
+ from pipecat.turns.user_start import (
+ BaseUserTurnStartStrategy,
+ UserTurnStartedParams,
+ )
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@@ -94,6 +98,7 @@ class UserTurnController(BaseObject):
self._register_event_handler("on_user_turn_started", sync=True)
self._register_event_handler("on_user_turn_stopped", sync=True)
self._register_event_handler("on_user_turn_stop_timeout", sync=True)
self._register_event_handler("on_reset_aggregation", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
@@ -161,10 +166,14 @@ class UserTurnController(BaseObject):
await self._handle_transcription(frame)
for strategy in self._user_turn_strategies.start or []:
- await strategy.process_frame(frame)
+ result = await strategy.process_frame(frame)
+ if result == ProcessFrameResult.STOP:
+ break
for strategy in self._user_turn_strategies.stop or []:
- await strategy.process_frame(frame)
+ result = await strategy.process_frame(frame)
+ if result == ProcessFrameResult.STOP:
+ break
async def _setup_strategies(self):
for s in self._user_turn_strategies.start or []:
@@ -172,6 +181,7 @@ class UserTurnController(BaseObject):
s.add_event_handler("on_push_frame", self._on_push_frame)
s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
s.add_event_handler("on_user_turn_started", self._on_user_turn_started)
s.add_event_handler("on_reset_aggregation", self._on_reset_aggregation)
for s in self._user_turn_strategies.stop or []:
await s.setup(self.task_manager)
@@ -242,6 +252,9 @@ class UserTurnController(BaseObject):
):
await self._trigger_user_turn_stop(strategy, params)
async def _on_reset_aggregation(self, strategy: BaseUserTurnStartStrategy):
await self._call_event_handler("on_reset_aggregation", strategy)
async def _trigger_user_turn_start(
self, strategy: Optional[BaseUserTurnStartStrategy], params: UserTurnStartedParams
):
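
The short-circuit evaluation added to `UserTurnController` can be illustrated without pipecat. This is a sketch under stated assumptions: the `ProcessFrameResult` enum below is a stand-in whose member values are invented (only the names `CONTINUE` and `STOP` come from the diff), and `FakeStrategy`, `run_chain`, and `evaluated` are hypothetical helpers. It shows that a strategy returning `STOP` hides the frame from later strategies, while a legacy strategy returning `None` does not.

```python
import asyncio
from enum import Enum
from typing import List, Optional


class ProcessFrameResult(Enum):
    """Stand-in for pipecat.turns.types.ProcessFrameResult (values assumed)."""

    CONTINUE = "continue"
    STOP = "stop"


class FakeStrategy:
    """Hypothetical strategy that records its name and returns a fixed result."""

    def __init__(self, name: str, result: Optional[ProcessFrameResult], calls: List[str]):
        self._name = name
        self._result = result
        self._calls = calls

    async def process_frame(self, frame: object) -> Optional[ProcessFrameResult]:
        self._calls.append(self._name)
        return self._result


async def run_chain(strategies: List[FakeStrategy], frame: object) -> None:
    """Evaluate strategies in order, stopping at the first STOP."""
    for strategy in strategies:
        result = await strategy.process_frame(frame)
        # None (legacy strategies) is not STOP, so evaluation continues.
        if result == ProcessFrameResult.STOP:
            break


def evaluated(results: List[Optional[ProcessFrameResult]]) -> List[str]:
    """Return the names of the strategies that saw the frame."""
    calls: List[str] = []
    chain = [FakeStrategy(f"s{i}", r, calls) for i, r in enumerate(results)]
    asyncio.run(run_chain(chain, frame=object()))
    return calls
```

With a gating strategy placed first, `evaluated([ProcessFrameResult.STOP, ProcessFrameResult.CONTINUE])` reports that only the first strategy ran, which is why the wake phrase strategy is meant to lead the start list.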


@@ -23,6 +23,31 @@ from pipecat.turns.user_stop import (
)
def default_user_turn_start_strategies() -> List[BaseUserTurnStartStrategy]:
"""Return the default user turn start strategies.
Returns ``[VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy]``.
Useful when building a custom strategy list that extends the defaults.
Example::
start_strategies = [
WakePhraseUserTurnStartStrategy(phrases=["hey pipecat"]),
*default_user_turn_start_strategies(),
]
"""
return [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
def default_user_turn_stop_strategies() -> List[BaseUserTurnStopStrategy]:
"""Return the default user turn stop strategies.
Returns ``[TurnAnalyzerUserTurnStopStrategy(LocalSmartTurnAnalyzerV3)]``.
Useful when building a custom strategy list that extends the defaults.
"""
return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
@dataclass
class UserTurnStrategies:
"""Container for user turn start and stop strategies.
@@ -45,9 +70,9 @@ class UserTurnStrategies:
def __post_init__(self):
if not self.start:
- self.start = [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
+ self.start = default_user_turn_start_strategies()
if not self.stop:
- self.stop = [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
+ self.stop = default_user_turn_stop_strategies()
@dataclass
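
The default-helper pattern introduced in this file can be sketched with plain classes. Everything named below (`VADStart`, `TranscriptionStart`, `WakePhraseStart`, `default_start_strategies`, `Strategies`) is a hypothetical stand-in rather than the pipecat API. The sketch only illustrates the two properties the helpers appear designed for: each call returns fresh instances, and the result can be spliced into a custom list.

```python
from dataclasses import dataclass
from typing import List, Optional


class VADStart:
    """Hypothetical stand-in for a VAD-based start strategy."""


class TranscriptionStart:
    """Hypothetical stand-in for a transcription-based start strategy."""


class WakePhraseStart:
    """Hypothetical stand-in for a wake phrase gate."""

    def __init__(self, phrases: List[str]):
        self.phrases = phrases


def default_start_strategies() -> List[object]:
    # A new list with new instances on every call, so containers never
    # share strategy state by accident.
    return [VADStart(), TranscriptionStart()]


@dataclass
class Strategies:
    start: Optional[List[object]] = None

    def __post_init__(self):
        # Fall back to the defaults when no list (or an empty list) is given.
        if not self.start:
            self.start = default_start_strategies()


# Extending the defaults: the gate goes first, the defaults follow.
custom = Strategies(start=[WakePhraseStart(["hey pipecat"]), *default_start_strategies()])
```

Building the list through a function instead of a shared module-level constant keeps each container's strategies independent.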


@@ -0,0 +1,346 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
BotSpeakingFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
UserSpeakingFrame,
VADUserStartedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.wake_phrase_user_turn_start_strategy import (
WakePhraseUserTurnStartStrategy,
_WakeState,
)
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
class TestWakePhraseUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
def _create_strategy(self, **kwargs) -> WakePhraseUserTurnStartStrategy:
kwargs.setdefault("phrases", ["hey pipecat"])
kwargs.setdefault("timeout", 10.0)
return WakePhraseUserTurnStartStrategy(**kwargs)
async def _setup_strategy(self, strategy: WakePhraseUserTurnStartStrategy):
task_manager = TaskManager()
loop = asyncio.get_running_loop()
task_manager.setup(TaskManagerParams(loop=loop))
await strategy.setup(task_manager)
return task_manager
async def test_wake_phrase_in_final_transcription(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
async def test_interim_transcription_ignored(self):
"""Interim transcriptions are never used for wake phrase matching."""
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(
InterimTranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.IDLE)
await strategy.cleanup()
async def test_no_wake_phrase_returns_stop(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(
TranscriptionFrame(text="hello world", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.IDLE)
await strategy.cleanup()
async def test_non_matching_text_resets_aggregation(self):
"""Non-matching transcription triggers aggregation reset to prevent LLM context pollution."""
strategy = self._create_strategy()
await self._setup_strategy(strategy)
reset_called = False
@strategy.event_handler("on_reset_aggregation")
async def on_reset_aggregation(strategy):
nonlocal reset_called
reset_called = True
await strategy.process_frame(
TranscriptionFrame(text="hello world", user_id="user1", timestamp="")
)
self.assertTrue(reset_called)
await strategy.cleanup()
async def test_vad_frame_returns_stop_in_listening(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.IDLE)
await strategy.cleanup()
async def test_inactive_returns_continue(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
# Trigger wake phrase first.
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Subsequent frames should return CONTINUE.
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(result, ProcessFrameResult.CONTINUE)
result = await strategy.process_frame(
TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.CONTINUE)
await strategy.cleanup()
async def test_accumulation_across_frames(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(
TranscriptionFrame(text="hey", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.IDLE)
result = await strategy.process_frame(
TranscriptionFrame(text="pipecat", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
async def test_multiple_phrases(self):
strategy = self._create_strategy(phrases=["hey pipecat", "ok computer"])
await self._setup_strategy(strategy)
result = await strategy.process_frame(
TranscriptionFrame(text="ok computer", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
async def test_punctuation_stripped(self):
"""STT punctuation like 'Hey, Pipecat!' should still match."""
strategy = self._create_strategy()
await self._setup_strategy(strategy)
result = await strategy.process_frame(
TranscriptionFrame(text="Hey, Pipecat!", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
async def test_reset_preserves_inactive_state(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.reset()
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
async def test_timeout_returns_to_listening(self):
strategy = self._create_strategy(timeout=0.1)
await self._setup_strategy(strategy)
# Trigger wake phrase.
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Wait for timeout to expire.
await asyncio.sleep(0.3)
self.assertEqual(strategy.state, _WakeState.IDLE)
await strategy.cleanup()
async def test_activity_refreshes_timeout(self):
strategy = self._create_strategy(timeout=0.2)
await self._setup_strategy(strategy)
# Trigger wake phrase.
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Send activity before timeout.
await asyncio.sleep(0.1)
await strategy.process_frame(UserSpeakingFrame())
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Send more activity.
await asyncio.sleep(0.1)
await strategy.process_frame(BotSpeakingFrame())
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Wait for timeout to expire after last activity.
await asyncio.sleep(0.3)
self.assertEqual(strategy.state, _WakeState.IDLE)
await strategy.cleanup()
async def test_wake_phrase_detected_event(self):
strategy = self._create_strategy()
await self._setup_strategy(strategy)
detected_phrase = None
@strategy.event_handler("on_wake_phrase_detected")
async def on_wake_phrase_detected(strategy, phrase):
nonlocal detected_phrase
detected_phrase = phrase
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
# Event fires in a background task, give it a moment.
await asyncio.sleep(0.05)
self.assertEqual(detected_phrase, "hey pipecat")
await strategy.cleanup()
async def test_wake_phrase_timeout_event(self):
strategy = self._create_strategy(timeout=0.1)
await self._setup_strategy(strategy)
timeout_fired = False
@strategy.event_handler("on_wake_phrase_timeout")
async def on_wake_phrase_timeout(strategy):
nonlocal timeout_fired
timeout_fired = True
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
# Wait for timeout.
await asyncio.sleep(0.3)
self.assertTrue(timeout_fired)
await strategy.cleanup()
async def test_single_activation_stays_inactive_after_reset(self):
"""In single activation mode, reset() keeps the AWAKE state so the current turn can finish."""
strategy = self._create_strategy(single_activation=True, timeout=0.5)
await self._setup_strategy(strategy)
# Trigger wake phrase.
result = await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Simulate turn start (controller calls reset on all start strategies).
await strategy.reset()
# State remains AWAKE so frames continue to flow.
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Subsequent frames should pass through (CONTINUE).
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(result, ProcessFrameResult.CONTINUE)
result = await strategy.process_frame(
TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.CONTINUE)
await strategy.cleanup()
async def test_single_activation_timeout_returns_to_listening(self):
"""In single activation mode, the keepalive timeout returns to IDLE."""
strategy = self._create_strategy(single_activation=True, timeout=0.1)
await self._setup_strategy(strategy)
# Trigger wake phrase.
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
# Wait for keepalive timeout to expire.
await asyncio.sleep(0.3)
self.assertEqual(strategy.state, _WakeState.IDLE)
# Frames should now be blocked again.
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(result, ProcessFrameResult.STOP)
await strategy.cleanup()
async def test_single_activation_requires_wake_phrase_after_timeout(self):
"""Single activation mode requires wake phrase again after keepalive timeout."""
strategy = self._create_strategy(single_activation=True, timeout=0.1)
await self._setup_strategy(strategy)
# First turn: wake phrase -> AWAKE -> timeout -> IDLE.
await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await asyncio.sleep(0.3)
self.assertEqual(strategy.state, _WakeState.IDLE)
# Without wake phrase, frames are blocked.
result = await strategy.process_frame(
TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
# Second turn: wake phrase again.
result = await strategy.process_frame(
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
)
self.assertEqual(result, ProcessFrameResult.STOP)
self.assertEqual(strategy.state, _WakeState.AWAKE)
await strategy.cleanup()
if __name__ == "__main__":
unittest.main()
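
The timeout tests above exercise the `asyncio.Event` plus `asyncio.wait_for` loop used by `_timeout_task_handler`. Below is a stand-alone sketch of that pattern, assuming plain `asyncio.create_task` in place of pipecat's task manager; `InactivityTimer` and its methods are hypothetical names used only for illustration.

```python
import asyncio
from typing import Optional


class InactivityTimer:
    """Hypothetical helper: stays awake until no refresh arrives for `timeout` seconds."""

    def __init__(self, timeout: float):
        self._timeout = timeout
        self._event = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self.awake = False

    def start(self) -> None:
        # Must be called from inside a running event loop.
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    def wake(self) -> None:
        self.awake = True
        self.refresh()

    def refresh(self) -> None:
        # Setting the event restarts the wait below with a full timeout.
        self._event.set()

    async def _run(self) -> None:
        while True:
            try:
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
                self._event.clear()
            except asyncio.TimeoutError:
                # No activity within the window: fall back to idle.
                self.awake = False
```

Each `refresh()` wakes the waiting task, which clears the event and starts a new wait, so the timeout always measures time since the most recent activity.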