Compare commits
8 Commits
rtvi-send-
...
v0.0.106
| Author | SHA1 | Date | |
|---|---|---|---|
| | 8750c26cdc | | |
| | 3e0c536fe7 | | |
| | 7ee5fa9e20 | | |
| | 7dfcaf8096 | | |
| | 4aea7784c9 | | |
| | bad10177d4 | | |
| | c4be513044 | | |
| | 4b704e6d3a | | |
212
CHANGELOG.md
@@ -7,6 +7,218 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->

## [0.0.106] - 2026-03-18

### Added

- Added optional `service` field to `ServiceUpdateSettingsFrame` (and its
  subclasses `LLMUpdateSettingsFrame`, `TTSUpdateSettingsFrame`,
  `STTUpdateSettingsFrame`) to target a specific service instance. When
  `service` is set, only the matching service applies the settings; others
  forward the frame unchanged. This enables updating a single service when
  multiple services of the same type exist in the pipeline.
  (PR [#4004](https://github.com/pipecat-ai/pipecat/pull/4004))

- Added `sip_provider` and `room_geo` parameters to `configure()` in the Daily
  runner. These convenience parameters let callers specify a SIP provider name
  and geographic region directly without manually constructing
  `DailyRoomProperties` and `DailyRoomSipParams`.
  (PR [#4005](https://github.com/pipecat-ai/pipecat/pull/4005))

- Added `PerplexityLLMAdapter` that automatically transforms conversation
  messages to satisfy Perplexity's stricter API constraints (strict role
  alternation, no non-initial system messages, last message must be user/tool).
  Previously, certain conversation histories could cause Perplexity API errors
  that didn't occur with OpenAI (`PerplexityLLMService` subclasses
  `OpenAILLMService` since Perplexity uses an OpenAI-compatible API).
  (PR [#4009](https://github.com/pipecat-ai/pipecat/pull/4009))

- Added DTMF input event support to the Daily transport. Incoming DTMF tones
  are now received via Daily's `on_dtmf_event` callback and pushed into the
  pipeline as `InputDTMFFrame`, enabling bots to react to keypad presses from
  phone callers.
  (PR [#4047](https://github.com/pipecat-ai/pipecat/pull/4047))

- Added `WakePhraseUserTurnStartStrategy` for triggering user turns based on
  wake phrases, with support for `single_activation` mode. Deprecates
  `WakeCheckFilter`.
  (PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))

- Added `default_user_turn_start_strategies()` and
  `default_user_turn_stop_strategies()` helper functions for composing custom
  strategy lists.
  (PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
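
The message constraints listed for `PerplexityLLMAdapter` above can be pictured with a small sketch. This is not the adapter's code: `normalize_messages` is a hypothetical helper, and both the merge-by-newline rule and the demotion of later system messages to the `user` role are assumptions chosen for illustration. It handles plain string content only and does not address the "last message must be user/tool" constraint.

```python
from typing import Dict, List

Message = Dict[str, str]


def normalize_messages(messages: List[Message]) -> List[Message]:
    """Hypothetical helper: enforce role alternation on a chat history."""
    normalized: List[Message] = []
    for index, message in enumerate(messages):
        role = message["role"]
        # Assumption: a system message that is not first becomes a user message.
        if role == "system" and index > 0:
            role = "user"
        if normalized and normalized[-1]["role"] == role:
            # Same role twice in a row: merge so that roles keep alternating.
            normalized[-1]["content"] += "\n" + message["content"]
        else:
            normalized.append({"role": role, "content": message["content"]})
    return normalized


history = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Hi"},
    {"role": "user", "content": "Are you there?"},
    {"role": "assistant", "content": "Yes."},
    {"role": "system", "content": "Use metric units."},
]
normalized = normalize_messages(history)
print([m["role"] for m in normalized])
```

The input list is left untouched because every output entry is a new dict, which matters if the same history is also sent to a service that accepts it as-is.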

### Changed

- Changed tool result JSON serialization to use `ensure_ascii=False`,
  preserving UTF-8 characters instead of escaping them. This reduces context
  size and token usage for non-English languages.
  (PR [#3457](https://github.com/pipecat-ai/pipecat/pull/3457))

- `OpenAIRealtimeSTTService`'s `noise_reduction` parameter is now part of
  `OpenAIRealtimeSTTSettings`, making it runtime-updatable via
  `STTUpdateSettingsFrame`. The direct `noise_reduction` init argument is
  deprecated as of 0.0.106.
  (PR [#3991](https://github.com/pipecat-ai/pipecat/pull/3991))

- Updated `sarvamai` dependency from `0.1.26a2` (alpha) to `0.1.26` (stable
  release).
  (PR [#3997](https://github.com/pipecat-ai/pipecat/pull/3997))

- `SimliVideoService` now extends `AIService` instead of `FrameProcessor`,
  aligning it with the HeyGen and Tavus video services. It supports
  `SimliVideoService.Settings(...)` for configuration and uses
  `start()`/`stop()`/`cancel()` lifecycle methods. Existing constructor usage
  (`api_key`, `face_id`, etc.) remains unchanged.
  (PR [#4001](https://github.com/pipecat-ai/pipecat/pull/4001))

- Update `pipecat-ai-small-webrtc-prebuilt` to `2.4.0`.
  (PR [#4023](https://github.com/pipecat-ai/pipecat/pull/4023))

- Nova Sonic assistant text transcripts are now delivered in real-time using
  speculative text events instead of delayed final text events. Previously,
  assistant text only arrived after all audio had finished playing, causing
  laggy transcripts in client UIs. Speculative text arrives before each audio
  chunk, providing text synchronized with what the bot is saying. This also
  simplifies the internal text handling by removing the interruption re-push
  hack and assistant text buffer.
  (PR [#4042](https://github.com/pipecat-ai/pipecat/pull/4042))

- Updated `daily-python` dependency to 0.25.0.
  (PR [#4047](https://github.com/pipecat-ai/pipecat/pull/4047))

- Added `enable_dialout` parameter to `configure()` in `pipecat.runner.daily`
  to support dial-out rooms. Also narrowed misleading `Optional` type hints and
  deduplicated token expiry calculation.
  (PR [#4048](https://github.com/pipecat-ai/pipecat/pull/4048))

- Extended `ProcessFrameResult` to stop strategies, allowing a stop strategy to
  short-circuit evaluation of subsequent strategies by returning `STOP`.
  (PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))

- `GradiumSTTService` now takes both `encoding` and `sample_rate` constructor
  arguments, which are assembled in the class to form the `input_format`. PCM
  accepts `8000`, `16000`, and `24000` Hz sample rates.
  (PR [#4066](https://github.com/pipecat-ai/pipecat/pull/4066))

- Improved `GradiumSTTService` transcription accuracy by reworking how text
  fragments are accumulated and finalized. Previously, trailing words could be
  dropped when the server's `flushed` response arrived before all text tokens
  were delivered. The service now uses a short aggregation delay after flush to
  capture trailing tokens, producing complete utterances.
  (PR [#4066](https://github.com/pipecat-ai/pipecat/pull/4066))
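
The effect of the `ensure_ascii=False` change listed above can be seen with the standard library alone. The sketch below is not Pipecat's serialization code, and the `tool_result` payload is a made-up example; it only compares the two `json.dumps` modes.

```python
import json

# Hypothetical tool result containing non-ASCII text (illustrative only).
tool_result = {"city": "München", "forecast": "晴れ"}

# Default behaviour: every non-ASCII character becomes a \uXXXX escape.
escaped = json.dumps(tool_result)

# ensure_ascii=False keeps the characters as they are.
preserved = json.dumps(tool_result, ensure_ascii=False)

print(escaped)
print(preserved)
print(len(escaped), len(preserved))
```

Both strings decode to the same object, so the change affects only the size of the serialized text, not its meaning.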

### Deprecated

- `SimliVideoService.InputParams` is deprecated. Use the direct constructor
  parameters `max_session_length`, `max_idle_time`, and `enable_logging`
  instead.
  (PR [#4001](https://github.com/pipecat-ai/pipecat/pull/4001))

- Deprecated `LocalSmartTurnAnalyzerV2` and `LocalCoreMLSmartTurnAnalyzer`. Use
  `LocalSmartTurnAnalyzerV3` instead. Instantiating these analyzers will now
  emit a `DeprecationWarning`.
  (PR [#4012](https://github.com/pipecat-ai/pipecat/pull/4012))

- Deprecated `WakeCheckFilter` in favor of `WakePhraseUserTurnStartStrategy`.
  (PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))
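
Python hides `DeprecationWarning` by default in many contexts, so deprecations like the ones above are easy to miss. The sketch below shows one way to surface them in a test using only the `warnings` module. `OldAnalyzer` is a hypothetical stand-in class, not a Pipecat type.

```python
import warnings


class OldAnalyzer:
    """Hypothetical stand-in for a class that is deprecated on construction."""

    def __init__(self):
        warnings.warn(
            "OldAnalyzer is deprecated. Use NewAnalyzer instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )


with warnings.catch_warnings(record=True) as caught:
    # Record every warning, including ones the default filters would hide.
    warnings.simplefilter("always")
    OldAnalyzer()

deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)]
print(len(deprecations), deprecations[0].message)
```

Running a test suite with `python -W error::DeprecationWarning` is a stricter alternative that turns each such warning into an exception.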

### Fixed

- Fixed an issue where the default model for `OpenAILLMService` and
  `AzureLLMService` was mistakenly reverted to `gpt-4o`. The defaults are now
  restored to `gpt-4.1`.
  (PR [#4000](https://github.com/pipecat-ai/pipecat/pull/4000))

- Fixed a race condition where `EndTaskFrame` could cause the pipeline to shut
  down before in-flight frames (e.g. LLM function call responses) finished
  processing. `EndTaskFrame` and `StopTaskFrame` now flow through the pipeline
  as `ControlFrame`s, ensuring all pending work is flushed before shutdown
  begins. `CancelTaskFrame` and `InterruptionTaskFrame` remain immediate
  (`SystemFrame`).
  (PR [#4006](https://github.com/pipecat-ai/pipecat/pull/4006))

- Fixed `ParallelPipeline` dropping or misordering frames during lifecycle
  synchronization. Buffered frames are now flushed in the correct order
  relative to synchronization frames (`StartFrame` goes first,
  `EndFrame`/`CancelFrame` go after), and frames added to the buffer during
  flush are also drained.
  (PR [#4007](https://github.com/pipecat-ai/pipecat/pull/4007))

- Fixed `TTSService` potentially canceling in-flight audio during shutdown. The
  stop sequence now waits for all queued audio contexts to finish processing
  before canceling the stop frame task.
  (PR [#4007](https://github.com/pipecat-ai/pipecat/pull/4007))

- Fixed `Language` enum values (e.g. `Language.ES`) not being converted to
  service-specific codes when passed via
  `settings=Service.Settings(language=Language.ES)` at init time. This caused
  API errors (e.g. 400 from Rime) because the raw enum was sent instead of the
  expected language code (e.g. `"spa"`). Runtime updates via
  `UpdateSettingsFrame` were unaffected. The fix centralizes conversion in the
  base `TTSService` and `STTService` classes so all services handle this
  consistently.
  (PR [#4024](https://github.com/pipecat-ai/pipecat/pull/4024))

- Fixed `DeepgramSTTService` ignoring the `base_url` scheme when using `ws://`
  or `http://`. Previously these were silently overwritten with `wss://` /
  `https://`, breaking air-gapped or private deployments that don't use TLS.
  All scheme choices (`wss://`, `https://`, `ws://`, `http://`, or bare
  hostname) are now respected.
  (PR [#4026](https://github.com/pipecat-ai/pipecat/pull/4026))

- Fixed `LLMSwitcher.register_function()` and `register_direct_function()` not
  accepting or forwarding the `timeout_secs` parameter.
  (PR [#4037](https://github.com/pipecat-ai/pipecat/pull/4037))

- Fixed empty user transcriptions in Nova Sonic causing spurious interruptions.
  Previously, an empty transcription could trigger an interruption of the
  assistant's response even though the user hadn't actually spoken.
  (PR [#4042](https://github.com/pipecat-ai/pipecat/pull/4042))

- Fixed `SonioxSTTService` and `OpenAIRealtimeSTTService` crash when language
  parameters contain plain strings instead of `Language` enum values.
  (PR [#4046](https://github.com/pipecat-ai/pipecat/pull/4046))

- Fixed premature user turn stops caused by late transcriptions arriving
  between turns. A stale transcript from the previous turn could persist into
  the next turn and trigger a stop before the current turn's real transcript
  arrived. Stop strategies are now reset at both turn start and turn stop to
  prevent state from leaking across turn boundaries.
  (PR [#4057](https://github.com/pipecat-ai/pipecat/pull/4057))

- Fixed raw language strings like `"de-DE"` silently failing when passed to
  TTS/STT services (e.g. ElevenLabs producing no audio). Raw strings now go
  through the same `Language` enum resolution as enum values, so regional codes
  like `"de-DE"` are properly converted to service-expected formats like
  `"de"`. Unrecognized strings log a warning instead of failing silently.
  (PR [#4058](https://github.com/pipecat-ai/pipecat/pull/4058))

- Fixed Deepgram STT list-type settings (`keyterm`, `keywords`, `search`,
  `redact`, `replace`) being stringified instead of passed as lists to the SDK,
  which caused them to be sent as literal strings (e.g. `"['pipecat']"`) in the
  WebSocket query params.
  (PR [#4063](https://github.com/pipecat-ai/pipecat/pull/4063))

- Fixed `MinWordsUserTurnStartStrategy` including text below the word threshold
  in the output by resetting aggregation when the minimum word count is not
  met.
  (PR [#4064](https://github.com/pipecat-ai/pipecat/pull/4064))

- Fixed audio overlap and potential dropped TTS content when multiple assistant
  turns occur in quick succession. `TTSService` now flushes remaining text
  before pausing frame processing on `LLMFullResponseEndFrame`/`EndFrame`,
  instead of pausing first.
  (PR [#4071](https://github.com/pipecat-ai/pipecat/pull/4071))
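
The failure mode behind the Deepgram list-settings fix above is easy to reproduce with the standard library. The sketch below does not use the Deepgram SDK and says nothing about how the SDK builds its query string; it only contrasts stringifying a list with expanding it through `urllib.parse.urlencode`. The `settings` dict is a made-up example.

```python
from urllib.parse import urlencode

# Hypothetical list-type setting (illustrative only).
settings = {"keyterm": ["pipecat", "daily"]}

# Stringifying first sends the list's Python repr as one opaque value.
stringified = urlencode({key: str(value) for key, value in settings.items()})

# doseq=True emits one query parameter per list element instead.
expanded = urlencode(settings, doseq=True)

print(stringified)
print(expanded)
```

The first form carries the percent-encoded brackets and quotes of the list repr, while the second produces a repeated `keyterm` parameter, which is the shape a server expecting multiple values can parse.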

### Security

- Bumped PyJWT minimum version from 2.10.1 to 2.12.0 in the `livekit` extra to
  address CVE-2026-32597 (GHSA-752w-5fwx-jx9f), where PyJWT <= 2.11.0 accepted
  unknown `crit` header extensions.
  (PR [#4035](https://github.com/pipecat-ai/pipecat/pull/4035))

## [0.0.105] - 2026-03-10

### Added

@@ -1 +0,0 @@
|
||||
- Changed tool result JSON serialization to use `ensure_ascii=False`, preserving UTF-8 characters instead of escaping them. This reduces context size and token usage for non-English languages.
|
||||
@@ -1 +0,0 @@
|
||||
- `OpenAIRealtimeSTTService`'s `noise_reduction` parameter is now part of `OpenAIRealtimeSTTSettings`, making it runtime-updatable via `STTUpdateSettingsFrame`. The direct `noise_reduction` init argument is deprecated as of 0.0.106.
|
||||
@@ -1 +0,0 @@
|
||||
- Updated `sarvamai` dependency from `0.1.26a2` (alpha) to `0.1.26` (stable release).
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed an issue where the default model for `OpenAILLMService` and `AzureLLMService` was mistakenly reverted to `gpt-4o`. The defaults are now restored to `gpt-4.1`.
|
||||
@@ -1 +0,0 @@
|
||||
- `SimliVideoService` now extends `AIService` instead of `FrameProcessor`, aligning it with the HeyGen and Tavus video services. It supports `SimliVideoService.Settings(...)` for configuration and uses `start()`/`stop()`/`cancel()` lifecycle methods. Existing constructor usage (`api_key`, `face_id`, etc.) remains unchanged.
|
||||
@@ -1 +0,0 @@
|
||||
- `SimliVideoService.InputParams` is deprecated. Use the direct constructor parameters `max_session_length`, `max_idle_time`, and `enable_logging` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- Added optional `service` field to `ServiceUpdateSettingsFrame` (and its subclasses `LLMUpdateSettingsFrame`, `TTSUpdateSettingsFrame`, `STTUpdateSettingsFrame`) to target a specific service instance. When `service` is set, only the matching service applies the settings; others forward the frame unchanged. This enables updating a single service when multiple services of the same type exist in the pipeline.
|
||||
@@ -1 +0,0 @@
|
||||
- Added `sip_provider` and `room_geo` parameters to `configure()` in the Daily runner. These convenience parameters let callers specify a SIP provider name and geographic region directly without manually constructing `DailyRoomProperties` and `DailyRoomSipParams`.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed a race condition where `EndTaskFrame` could cause the pipeline to shut down before in-flight frames (e.g. LLM function call responses) finished processing. `EndTaskFrame` and `StopTaskFrame` now flow through the pipeline as `ControlFrame`s, ensuring all pending work is flushed before shutdown begins. `CancelTaskFrame` and `InterruptionTaskFrame` remain immediate (`SystemFrame`).
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `TTSService` potentially canceling in-flight audio during shutdown. The stop sequence now waits for all queued audio contexts to finish processing before canceling the stop frame task.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `ParallelPipeline` dropping or misordering frames during lifecycle synchronization. Buffered frames are now flushed in the correct order relative to synchronization frames (`StartFrame` goes first, `EndFrame`/`CancelFrame` go after), and frames added to the buffer during flush are also drained.
|
||||
@@ -1 +0,0 @@
|
||||
- Added `PerplexityLLMAdapter` that automatically transforms conversation messages to satisfy Perplexity's stricter API constraints (strict role alternation, no non-initial system messages, last message must be user/tool). Previously, certain conversation histories could cause Perplexity API errors that didn't occur with OpenAI (`PerplexityLLMService` subclasses `OpenAILLMService` since Perplexity uses an OpenAI-compatible API).
|
||||
@@ -1 +0,0 @@
|
||||
- Deprecated `LocalSmartTurnAnalyzerV2` and `LocalCoreMLSmartTurnAnalyzer`. Use `LocalSmartTurnAnalyzerV3` instead. Instantiating these analyzers will now emit a `DeprecationWarning`.
|
||||
@@ -1 +0,0 @@
|
||||
- Update `pipecat-ai-small-webrtc-prebuilt` to `2.4.0`.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `Language` enum values (e.g. `Language.ES`) not being converted to service-specific codes when passed via `settings=Service.Settings(language=Language.ES)` at init time. This caused API errors (e.g. 400 from Rime) because the raw enum was sent instead of the expected language code (e.g. `"spa"`). Runtime updates via `UpdateSettingsFrame` were unaffected. The fix centralizes conversion in the base `TTSService` and `STTService` classes so all services handle this consistently.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `DeepgramSTTService` ignoring the `base_url` scheme when using `ws://` or `http://`. Previously these were silently overwritten with `wss://` / `https://`, breaking air-gapped or private deployments that don't use TLS. All scheme choices (`wss://`, `https://`, `ws://`, `http://`, or bare hostname) are now respected.
|
||||
@@ -1 +0,0 @@
|
||||
- Bumped PyJWT minimum version from 2.10.1 to 2.12.0 in the `livekit` extra to address CVE-2026-32597 (GHSA-752w-5fwx-jx9f), where PyJWT <= 2.11.0 accepted unknown `crit` header extensions.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `LLMSwitcher.register_function()` and `register_direct_function()` not accepting or forwarding the `timeout_secs` parameter.
|
||||
@@ -1 +0,0 @@
|
||||
Fixed `SonioxSTTService` and `OpenAIRealtimeSTTService` crash when language parameters contain plain strings instead of `Language` enum values.
|
||||
@@ -1 +0,0 @@
|
||||
- Added DTMF input event support to the Daily transport. Incoming DTMF tones are now received via Daily's `on_dtmf_event` callback and pushed into the pipeline as `InputDTMFFrame`, enabling bots to react to keypad presses from phone callers.
|
||||
@@ -1 +0,0 @@
|
||||
- Updated `daily-python` dependency to 0.25.0.
|
||||
@@ -1 +0,0 @@
|
||||
- Added `enable_dialout` parameter to `configure()` in `pipecat.runner.daily` to support dial-out rooms. Also narrowed misleading `Optional` type hints and deduplicated token expiry calculation.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed premature user turn stops caused by late transcriptions arriving between turns. A stale transcript from the previous turn could persist into the next turn and trigger a stop before the current turn's real transcript arrived. Stop strategies are now reset at both turn start and turn stop to prevent state from leaking across turn boundaries.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed raw language strings like `"de-DE"` silently failing when passed to TTS/STT services (e.g. ElevenLabs producing no audio). Raw strings now go through the same `Language` enum resolution as enum values, so regional codes like `"de-DE"` are properly converted to service-expected formats like `"de"`. Unrecognized strings log a warning instead of failing silently.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed Deepgram STT list-type settings (`keyterm`, `keywords`, `search`, `redact`, `replace`) being stringified instead of passed as lists to the SDK, which caused them to be sent as literal strings (e.g. `"['pipecat']"`) in the WebSocket query params.
|
||||
@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
@@ -28,6 +27,11 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.user_start import WakePhraseUserTurnStartStrategy
|
||||
from pipecat.turns.user_turn_strategies import (
|
||||
UserTurnStrategies,
|
||||
default_user_turn_start_strategies,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -52,7 +56,12 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = DeepgramSTTService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
settings=DeepgramSTTService.Settings(
|
||||
keyterm=["pipecat"],
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
@@ -68,19 +77,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
),
|
||||
)
|
||||
|
||||
hey_robot_filter = WakeCheckFilter(["hey robot", "hey, robot"])
|
||||
|
||||
context = LLMContext()
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
user_params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
start=[
|
||||
WakePhraseUserTurnStartStrategy(
|
||||
phrases=["pipecat"],
|
||||
# Timeout before wake phrase must be spoken again
|
||||
timeout=5.0,
|
||||
),
|
||||
*default_user_turn_start_strategies(),
|
||||
]
|
||||
),
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
hey_robot_filter, # Filter out speech not directed at the robot
|
||||
stt,
|
||||
user_aggregator, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
@@ -102,12 +120,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Please introduce yourself. Tell the user they should say 'Hey Robot' before talking to you.",
|
||||
}
|
||||
)
|
||||
context.add_message({"role": "user", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -447,6 +447,9 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
|
||||
)
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_reset_aggregation", self._on_reset_aggregation
|
||||
)
|
||||
|
||||
self._user_idle_controller = UserIdleController(
|
||||
user_idle_timeout=self._params.user_idle_timeout
|
||||
@@ -748,6 +751,12 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
await self._maybe_emit_user_turn_stopped(strategy)
|
||||
|
||||
async def _on_reset_aggregation(
|
||||
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
|
||||
):
|
||||
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
|
||||
await self.reset()
|
||||
|
||||
async def _on_user_turn_stop_timeout(self, controller):
|
||||
await self._call_event_handler("on_user_turn_stop_timeout")
|
||||
|
||||
|
||||
@@ -6,6 +6,9 @@
|
||||
|
||||
"""Wake phrase detection filter for Pipecat transcription processing.
|
||||
|
||||
.. deprecated:: 0.0.106
|
||||
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead.
|
||||
|
||||
This module provides a frame processor that filters transcription frames,
|
||||
only allowing them through after wake phrases have been detected. Includes
|
||||
keepalive functionality to maintain conversation flow after wake detection.
|
||||
@@ -13,6 +16,7 @@ keepalive functionality to maintain conversation flow after wake detection.
|
||||
|
||||
import re
|
||||
import time
|
||||
import warnings
|
||||
from enum import Enum
|
||||
from typing import List
|
||||
|
||||
@@ -25,6 +29,11 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
class WakeCheckFilter(FrameProcessor):
|
||||
"""Frame processor that filters transcription frames based on wake phrase detection.
|
||||
|
||||
.. deprecated:: 0.0.106
|
||||
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead,
|
||||
which integrates with the user turn strategy system and supports configurable
|
||||
timeouts and single-activation mode.
|
||||
|
||||
This filter monitors transcription frames for configured wake phrases and only
|
||||
passes frames through after a wake phrase has been detected. Maintains a
|
||||
keepalive timeout to allow continued conversation after wake detection.
|
||||
@@ -65,12 +74,21 @@ class WakeCheckFilter(FrameProcessor):
|
||||
def __init__(self, wake_phrases: List[str], keepalive_timeout: float = 3):
|
||||
"""Initialize the wake phrase filter.
|
||||
|
||||
.. deprecated:: 0.0.106
|
||||
Use :class:`~pipecat.turns.user_start.WakePhraseUserTurnStartStrategy` instead.
|
||||
|
||||
Args:
|
||||
wake_phrases: List of wake phrases to detect in transcriptions.
|
||||
keepalive_timeout: Duration in seconds to keep passing frames after
|
||||
wake detection. Defaults to 3 seconds.
|
||||
"""
|
||||
super().__init__()
|
||||
warnings.warn(
|
||||
"WakeCheckFilter is deprecated since v0.0.106. "
|
||||
"Use WakePhraseUserTurnStartStrategy instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
self._participant_states = {}
|
||||
self._keepalive_timeout = keepalive_timeout
|
||||
self._wake_patterns = []
|
||||
|
||||
@@ -27,8 +27,8 @@ from pydantic import BaseModel, Field
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
|
||||
from pipecat.frames.frames import (
|
||||
AggregatedTextFrame,
|
||||
AggregationType,
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
@@ -424,18 +424,16 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._input_audio_content_name: Optional[str] = None
|
||||
self._content_being_received: Optional[CurrentContent] = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time: Optional[float] = None
|
||||
self._wants_connection = False
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
self._audio_input_started = False
|
||||
self._pending_speculative_text: Optional[str] = None
|
||||
|
||||
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
|
||||
with wave.open(file_path.open("rb"), "rb") as wav_file:
|
||||
@@ -505,11 +503,13 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
async def reset_conversation(self):
|
||||
"""Reset the conversation state while preserving context.
|
||||
|
||||
Handles bot stopped speaking event, disconnects from the service,
|
||||
and reconnects with the preserved context.
|
||||
Cleans up any in-progress assistant response, disconnects from the
|
||||
service, and reconnects with the preserved context.
|
||||
"""
|
||||
logger.debug("Resetting conversation")
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
|
||||
if self._assistant_is_responding:
|
||||
self._assistant_is_responding = False
|
||||
await self._report_assistant_response_ended()
|
||||
|
||||
# Grab context to carry through disconnect/reconnect
|
||||
context = self._context
|
||||
@@ -540,8 +540,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
await self._handle_context(context)
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
await self._handle_input_audio_frame(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
|
||||
elif isinstance(frame, InterruptionFrame):
|
||||
await self._handle_interruption_frame()
|
||||
|
||||
@@ -569,49 +567,8 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
await self._send_user_audio_event(frame.audio)
|
||||
|
||||
async def _handle_bot_stopped_speaking(self, delay_to_catch_trailing_assistant_text: bool):
|
||||
# Protect against back-to-back BotStoppedSpeaking calls, which I've observed
|
||||
if self._handling_bot_stopped_speaking:
|
||||
return
|
||||
self._handling_bot_stopped_speaking = True
|
||||
|
||||
async def finalize_assistant_response():
|
||||
if self._assistant_is_responding:
|
||||
# Consider the assistant finished with their response (possibly after a short delay,
|
||||
# to allow for any trailing FINAL assistant text block to come in that need to make
|
||||
# it into context).
|
||||
#
|
||||
# TODO: ideally we could base this solely on the LLM output events, but I couldn't
|
||||
# figure out a reliable way to determine when we've gotten our last FINAL text block
|
||||
# after the LLM is done talking.
|
||||
#
|
||||
# First I looked at stopReason, but it doesn't seem like the last FINAL text block
|
||||
# is reliably marked END_TURN (sometimes the *first* one is, but not the last...
|
||||
# bug?)
|
||||
#
|
||||
# Then I considered schemes where we tally or match up SPECULATIVE text blocks with
|
||||
# FINAL text blocks to know how many or which FINAL blocks to expect, but user
|
||||
# interruptions throw a wrench in these schemes: depending on the exact timing of
|
||||
# the interruption, we should or shouldn't expect some FINAL blocks.
|
||||
if delay_to_catch_trailing_assistant_text:
|
||||
# This delay length is a balancing act between "catching" trailing assistant
|
||||
# text that is quite delayed but not waiting so long that user text comes in
|
||||
# first and results in a bit of context message order scrambling.
|
||||
await asyncio.sleep(1.25)
|
||||
self._assistant_is_responding = False
|
||||
await self._report_assistant_response_ended()
|
||||
|
||||
self._handling_bot_stopped_speaking = False
|
||||
|
||||
# Finalize the assistant response, either now or after a delay
|
||||
if delay_to_catch_trailing_assistant_text:
|
||||
self.create_task(finalize_assistant_response())
|
||||
else:
|
||||
await finalize_assistant_response()
|
||||
|
||||
async def _handle_interruption_frame(self):
|
||||
if self._assistant_is_responding:
|
||||
self._may_need_repush_assistant_text = True
|
||||
pass
|
||||
|
||||
#
|
||||
# LLM communication: lifecycle
|
||||
@@ -771,17 +728,15 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._input_audio_content_name = None
|
||||
self._content_being_received = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time = None
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
self._audio_input_started = False
|
||||
self._pending_speculative_text = None
|
||||
|
||||
logger.info("Finished disconnecting")
|
||||
except Exception as e:
|
||||
@@ -1153,10 +1108,11 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._content_being_received = content
|
||||
|
||||
if content.role == Role.ASSISTANT:
|
||||
if content.type == ContentType.AUDIO:
|
||||
# Note that an assistant response can comprise multiple audio blocks
|
||||
if not self._assistant_is_responding:
|
||||
# The assistant has started responding.
|
||||
if content.type == ContentType.TEXT:
|
||||
if (
|
||||
content.text_stage == TextStage.SPECULATIVE
|
||||
and not self._assistant_is_responding
|
||||
):
|
||||
self._assistant_is_responding = True
|
||||
await self._report_user_transcription_ended() # Consider user turn over
|
||||
await self._report_assistant_response_started()
|
||||
@@ -1232,18 +1188,30 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
if content.role == Role.ASSISTANT:
|
||||
if content.type == ContentType.TEXT:
|
||||
# Ignore non-final text, and the "interrupted" message (which isn't meaningful text)
|
||||
if content.text_stage == TextStage.FINAL and stop_reason != "INTERRUPTED":
|
||||
if self._assistant_is_responding:
|
||||
# Text added to the ongoing assistant response
|
||||
await self._report_assistant_response_text_added(content.text_content)
|
||||
if stop_reason != "INTERRUPTED":
|
||||
if content.text_stage == TextStage.SPECULATIVE:
|
||||
await self._report_llm_text(content.text_content)
|
||||
elif self._assistant_is_responding:
|
||||
# TEXT INTERRUPTED with no audio means the user interrupted
|
||||
# before audio started. End the response here since no AUDIO
|
||||
# contentEnd will arrive.
|
||||
self._assistant_is_responding = False
|
||||
await self._report_assistant_response_ended()
|
||||
elif content.type == ContentType.AUDIO:
|
||||
# Emit deferred TTSTextFrame after all audio chunks have been sent
|
||||
await self._report_tts_text()
|
||||
if stop_reason in ("END_TURN", "INTERRUPTED"):
|
||||
# END_TURN: normal completion. INTERRUPTED: user interrupted
|
||||
# mid-audio. Both mean no more audio for this turn.
|
||||
self._assistant_is_responding = False
|
||||
await self._report_assistant_response_ended()
|
||||
elif content.role == Role.USER:
|
||||
if content.type == ContentType.TEXT:
|
||||
if content.text_stage == TextStage.FINAL:
|
||||
# User transcription text added
|
||||
await self._report_user_transcription_text_added(content.text_content)
|
||||
|
||||
async def _handle_completion_end_event(self, event_json):
|
||||
async def _handle_completion_end_event(self, _):
|
||||
pass
|
||||
|
||||
#
|
||||
@@ -1256,29 +1224,40 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
async def _report_assistant_response_started(self):
|
||||
logger.debug("Assistant response started")
|
||||
|
||||
# Report the start of the assistant response.
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
# Report that the equivalent of TTS (this is a speech-to-speech model) started
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
|
||||
async def _report_assistant_response_text_added(self, text):
|
||||
if not self._context: # should never happen
|
||||
return
|
||||
async def _report_llm_text(self, text):
|
||||
"""Push speculative assistant text and defer TTSTextFrame.
|
||||
|
||||
logger.debug(f"Assistant response text added: {text}")
|
||||
Speculative text arrives before each audio chunk, providing real-time
|
||||
text that is synchronized with what the bot is saying. LLMTextFrame and
|
||||
AggregatedTextFrame are pushed immediately for real-time text display.
|
||||
TTSTextFrame emission is deferred to audio contentEnd so it aligns with
|
||||
audio playout timing.
|
||||
"""
|
||||
logger.debug(f"Assistant speculative text: {text}")
|
||||
|
||||
# Report the text of the assistant response.
|
||||
await self._push_assistant_response_text_frames(text)
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# HACK: here we're also buffering the assistant text ourselves as a
|
||||
# backup rather than relying solely on the assistant context aggregator
|
||||
# to do it, because the text arrives from Nova Sonic only after all the
|
||||
# assistant audio frames have been pushed, meaning that if an
|
||||
# interruption frame were to arrive we would lose all of it (the text
|
||||
# frames sitting in the queue would be wiped).
|
||||
self._assistant_text_buffer += text
|
||||
aggregated_text_frame = AggregatedTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
aggregated_text_frame.append_to_context = False
|
||||
await self.push_frame(aggregated_text_frame)
|
||||
|
||||
self._pending_speculative_text = text
|
||||
|
||||
async def _report_tts_text(self):
|
||||
if self._pending_speculative_text:
|
||||
tts_text_frame = TTSTextFrame(
|
||||
self._pending_speculative_text, aggregated_by=AggregationType.SENTENCE
|
||||
)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
self._pending_speculative_text = None
|
||||
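The docstring of `_report_llm_text` describes a two-phase flow: speculative text is pushed right away for display, while the TTS text is held back until the audio content ends. A minimal sketch of that ordering follows. It stands apart from Pipecat: `DeferredTextReporter`, its method names, and the `(kind, text)` tuples standing in for frames are all hypothetical, chosen only to illustrate the sequencing.

```python
from typing import List, Optional, Tuple


class DeferredTextReporter:
    """Hypothetical stand-in for the speculative-text handling in the diff."""

    def __init__(self) -> None:
        # Records what would have been pushed downstream, in order.
        self.pushed: List[Tuple[str, str]] = []
        self._pending: Optional[str] = None

    def on_speculative_text(self, text: str) -> None:
        # Display-oriented text goes out immediately.
        self.pushed.append(("llm_text", text))
        self.pushed.append(("aggregated_text", text))
        # The TTS text is remembered and emitted later.
        self._pending = text

    def on_audio_content_end(self) -> None:
        # Emit the deferred TTS text once, then clear it.
        if self._pending:
            self.pushed.append(("tts_text", self._pending))
            self._pending = None


reporter = DeferredTextReporter()
reporter.on_speculative_text("Hi there.")
reporter.on_audio_content_end()
reporter.on_audio_content_end()  # no pending text, so nothing more is pushed
```

Clearing the pending text after emission is what keeps a second audio content end from producing a duplicate TTS text entry.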
|
||||
async def _report_assistant_response_ended(self):
|
||||
if not self._context: # should never happen
|
||||
@@ -1286,54 +1265,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug("Assistant response ended")
|
||||
|
||||
# If an interruption frame arrived while the assistant was responding
|
||||
# we may have lost all of the assistant text (see HACK, above), so
|
||||
# re-push it downstream to the aggregator now.
|
||||
if self._may_need_repush_assistant_text:
|
||||
# Just in case, check that assistant text hasn't already made it
|
||||
# into the context (sometimes it does, despite the interruption).
|
||||
messages = self._context.get_messages()
|
||||
last_message = messages[-1] if messages else None
|
||||
if (
|
||||
not last_message
|
||||
or last_message.get("role") != "assistant"
|
||||
or last_message.get("content") != self._assistant_text_buffer
|
||||
):
|
||||
# We also need to re-push the LLMFullResponseStartFrame since the
|
||||
# TTSTextFrame would be ignored otherwise (the interruption frame
|
||||
# would have cleared the assistant aggregator state).
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self._push_assistant_response_text_frames(self._assistant_text_buffer)
|
||||
self._may_need_repush_assistant_text = False
|
||||
|
||||
# Report the end of the assistant response.
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
# Report that the equivalent of TTS (this is a speech-to-speech model) stopped.
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
# Clear out the buffered assistant text
|
||||
self._assistant_text_buffer = ""
|
||||
|
||||
async def _push_assistant_response_text_frames(self, text: str):
|
||||
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
|
||||
# proceed beyond the TTS service. Therefore, since a speech-to-speech
|
||||
# service like Nova Sonic combines both LLM and TTS functionality, you
|
||||
# would think we wouldn't need to push LLMTextFrames at all. However,
|
||||
# RTVI relies on LLMTextFrames being pushed to trigger its
|
||||
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
|
||||
# appending it to context to avoid context message duplication.
|
||||
|
||||
# Push LLMTextFrame
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# Push TTSTextFrame
|
||||
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
|
||||
#
|
||||
# user transcription reporting
|
||||
#
|
||||
@@ -1363,6 +1300,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
if not self._context: # should never happen
|
||||
return
|
||||
|
||||
# Nothing to report if no user speech was transcribed (e.g. the prompt
|
||||
# was text-only, which is the case on the first user turn when the bot
|
||||
# starts the conversation).
|
||||
if not self._user_text_buffer:
|
||||
return
|
||||
|
||||
logger.debug(f"User transcription ended")
|
||||
|
||||
# Report to the upstream user context aggregator that some new user
|
||||
|
||||
@@ -10,6 +10,7 @@ This module provides integration with Gradium's real-time speech-to-text
|
||||
WebSocket API for streaming audio transcription.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
@@ -22,6 +23,7 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
@@ -43,7 +45,37 @@ except ModuleNotFoundError as e:
|
||||
logger.error('In order to use Gradium, you need to `pip install "pipecat-ai[gradium]"`.')
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
SAMPLE_RATE = 24000
|
||||
# Seconds to wait after a "flushed" message for trailing text tokens to arrive
|
||||
# before finalizing the transcription.
|
||||
TRANSCRIPT_AGGREGATION_DELAY = 0.1
|
||||
|
||||
|
||||
def _input_format_from_encoding(encoding: str, sample_rate: int) -> str:
|
||||
"""Build Gradium input_format from encoding type and sample rate.
|
||||
|
||||
For PCM encoding, appends the sample rate (e.g., "pcm_16000").
|
||||
For other encodings (wav, opus), returns the encoding as-is.
|
||||
|
||||
Args:
|
||||
encoding: Base encoding type ("pcm", "wav", or "opus").
|
||||
sample_rate: Audio sample rate in Hz.
|
||||
|
||||
Returns:
|
||||
The full input_format string for the Gradium API.
|
||||
"""
|
||||
if encoding == "pcm":
|
||||
match sample_rate:
|
||||
case 8000:
|
||||
return "pcm_8000"
|
||||
case 16000:
|
||||
return "pcm_16000"
|
||||
case 24000:
|
||||
return "pcm_24000"
|
||||
logger.warning(
|
||||
f"GradiumSTTService: unsupported sample rate {sample_rate} for PCM encoding, using pcm_16000"
|
||||
)
|
||||
return "pcm_16000"
|
||||
return encoding
|
||||
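The mapping performed by `_input_format_from_encoding` can be exercised on its own. The sketch below is a standalone reimplementation for illustration, not the code from the diff: it uses a tuple lookup instead of `match`, omits the warning log, and the names `SUPPORTED_PCM_RATES` and `input_format_from_encoding` are assumptions.

```python
# Sample rates the diff maps to a dedicated "pcm_<rate>" format string.
SUPPORTED_PCM_RATES = (8000, 16000, 24000)


def input_format_from_encoding(encoding: str, sample_rate: int) -> str:
    """Return the input_format string for a given encoding and sample rate."""
    if encoding != "pcm":
        # Non-PCM encodings (e.g. "wav", "opus") pass through unchanged.
        return encoding
    if sample_rate in SUPPORTED_PCM_RATES:
        return f"pcm_{sample_rate}"
    # Unsupported PCM rates fall back to 16 kHz, as in the diff.
    return "pcm_16000"


print(input_format_from_encoding("pcm", 24000))
print(input_format_from_encoding("pcm", 44100))
print(input_format_from_encoding("opus", 48000))
```

Note that the fallback only changes the format string sent to the service; it does not resample the audio, which is presumably why the original logs a warning.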
|
||||
|
||||
def language_to_gradium_language(language: Language) -> Optional[str]:
|
||||
@@ -115,6 +147,8 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
*,
|
||||
api_key: str,
|
||||
api_endpoint_base_url: str = "wss://eu.api.gradium.ai/api/speech/asr",
|
||||
encoding: str = "pcm",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: Optional[InputParams] = None,
|
||||
json_config: Optional[str] = None,
|
||||
settings: Optional[Settings] = None,
|
||||
@@ -126,6 +160,12 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
Args:
|
||||
api_key: Gradium API key for authentication.
|
||||
api_endpoint_base_url: WebSocket endpoint URL. Defaults to Gradium's streaming endpoint.
|
||||
encoding: Base audio encoding type. One of "pcm", "wav", or "opus".
|
||||
For PCM, the sample rate is appended automatically from the
|
||||
pipeline's audio_in_sample_rate (e.g., "pcm" becomes "pcm_16000").
|
||||
Defaults to "pcm".
|
||||
sample_rate: Audio sample rate in Hz. If None, uses the pipeline
|
||||
sample rate.
|
||||
params: Configuration parameters for language and delay settings.
|
||||
|
||||
.. deprecated:: 0.0.105
|
||||
@@ -153,7 +193,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
|
||||
# 1. Initialize default_settings with hardcoded defaults
|
||||
default_settings = self.Settings(
|
||||
model=None,
|
||||
model="default",
|
||||
language=None,
|
||||
delay_in_frames=None,
|
||||
)
|
||||
@@ -173,7 +213,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
default_settings.apply_update(settings)
|
||||
|
||||
super().__init__(
|
||||
sample_rate=SAMPLE_RATE,
|
||||
sample_rate=sample_rate,
|
||||
ttfs_p99_latency=ttfs_p99_latency,
|
||||
settings=default_settings,
|
||||
**kwargs,
|
||||
@@ -181,19 +221,25 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
|
||||
self._api_key = api_key
|
||||
self._api_endpoint_base_url = api_endpoint_base_url
|
||||
self._encoding = encoding
|
||||
self._websocket = None
|
||||
self._json_config = json_config
|
||||
|
||||
self._receive_task = None
|
||||
|
||||
self._input_format = ""
|
||||
|
||||
self._audio_buffer = bytearray()
|
||||
self._chunk_size_ms = 80
|
||||
self._chunk_size_bytes = 0
|
||||
|
||||
# Set from the ready message when connecting to the service.
|
||||
# These values are used for flushing transcription.
|
||||
self._delay_in_frames = 0
|
||||
self._frame_size = 0
|
||||
# Accumulates text fragments within a turn. Each "text" message
|
||||
# appends to this list. On "flushed" a short aggregation delay
|
||||
# allows trailing tokens to arrive before the full text is joined
|
||||
# and pushed as a TranscriptionFrame.
|
||||
self._accumulated_text: list[str] = []
|
||||
self._flush_counter = 0
|
||||
self._transcript_aggregation_task: Optional[asyncio.Task] = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if the service can generate metrics.
|
||||
@@ -228,6 +274,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
frame: Start frame to begin processing.
|
||||
"""
|
||||
await super().start(frame)
|
||||
self._input_format = _input_format_from_encoding(self._encoding, self.sample_rate)
|
||||
self._chunk_size_bytes = int(self._chunk_size_ms * self.sample_rate * 2 / 1000)
|
||||
await self._connect()
|
||||
|
||||
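The chunk size set in `start()` is plain arithmetic: milliseconds times samples per second times 2 bytes per sample, divided by 1000. The worked example below assumes 16-bit mono PCM (the factor of 2 in the diff suggests 2 bytes per sample); the helper name `chunk_size_bytes` is hypothetical.

```python
def chunk_size_bytes(chunk_ms: int, sample_rate: int, bytes_per_sample: int = 2) -> int:
    """Bytes needed to hold chunk_ms of mono audio at sample_rate."""
    return int(chunk_ms * sample_rate * bytes_per_sample / 1000)


# 80 ms chunks, matching the _chunk_size_ms value in the diff.
for rate in (8000, 16000, 24000):
    print(rate, chunk_size_bytes(80, rate))
```

Because the size now depends on `self.sample_rate` rather than a fixed constant, it has to be computed after `super().start(frame)` has resolved the pipeline sample rate, which matches the ordering in the hunk above.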
@@ -249,56 +296,41 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames with VAD-specific handling.
|
||||
async def _start_metrics(self):
|
||||
"""Start performance metrics collection for transcription processing."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
When VAD detects the user has stopped speaking, we flush the transcription
|
||||
by sending silence frames. This makes the system more reactive by getting
|
||||
the final transcription faster without closing the connection.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and handle speech events.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction of frame processing.
|
||||
direction: Direction of frame flow in the pipeline.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self.start_processing_metrics()
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
await self._flush_transcription()
|
||||
await self._send_flush()
|
||||
|
||||
async def _flush_transcription(self):
|
||||
"""Flush the transcription by sending silence frames.
|
||||
async def _send_flush(self):
|
||||
"""Send a flush request to process any buffered audio immediately.
|
||||
|
||||
When VAD detects the user stopped speaking, we send delay_in_frames
|
||||
chunks of silence (zeros) to flush the remaining audio from the model's
|
||||
buffer. This allows for faster turn-around without closing the connection.
|
||||
|
||||
From Gradium docs: "feed in delay_in_frames chunks of silence (vectors
|
||||
of zeros). If those are fed in faster than realtime, the API also has
|
||||
a possibility to process them faster."
|
||||
Sends a flush message to tell the server to process buffered audio.
|
||||
The server responds with text fragments followed by a "flushed"
|
||||
acknowledgment, which triggers finalization.
|
||||
"""
|
||||
if not self._websocket or self._websocket.state is not State.OPEN:
|
||||
return
|
||||
|
||||
if self._delay_in_frames <= 0:
|
||||
logger.debug("No delay_in_frames set, skipping flush")
|
||||
return
|
||||
|
||||
# Create a silence chunk (zeros) of frame_size samples
|
||||
# Each sample is 2 bytes (16-bit PCM)
|
||||
silence_bytes = bytes(self._frame_size * 2)
|
||||
silence_b64 = base64.b64encode(silence_bytes).decode("utf-8")
|
||||
|
||||
logger.debug(f"Flushing Gradium STT with {self._delay_in_frames} silence frames")
|
||||
|
||||
for _ in range(self._delay_in_frames):
|
||||
msg = {"type": "audio", "audio": silence_b64}
|
||||
try:
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to send silence frame: {e}")
|
||||
break
|
||||
self._flush_counter += 1
|
||||
flush_id = str(self._flush_counter)
|
||||
msg = {"type": "flush", "flush_id": flush_id}
|
||||
try:
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to send flush: {e}")
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Process audio data for speech-to-text conversion.
|
||||
@@ -353,7 +385,8 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
await self._call_event_handler("on_connected")
|
||||
setup_msg = {
|
||||
"type": "setup",
|
||||
"input_format": "pcm",
|
||||
"model_name": self._settings.model,
|
||||
"input_format": self._input_format,
|
||||
}
|
||||
# Build json_config: start with deprecated json_config, then override with params
|
||||
json_config = {}
|
||||
@@ -375,13 +408,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
if ready_msg["type"] != "ready":
|
||||
raise Exception(f"unexpected first message type {ready_msg['type']}")
|
||||
|
||||
# Store delay_in_frames and frame_size for silence flushing
|
||||
self._delay_in_frames = ready_msg.get("delay_in_frames", 0)
|
||||
self._frame_size = ready_msg.get("frame_size", 1920)
|
||||
logger.debug(
|
||||
f"Connected to Gradium STT (delay_in_frames={self._delay_in_frames}, "
|
||||
f"frame_size={self._frame_size})"
|
||||
)
|
||||
logger.debug("Connected to Gradium STT")
|
||||
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
|
||||
@@ -390,6 +417,13 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
async def _disconnect(self):
|
||||
await super()._disconnect()
|
||||
|
||||
if self._transcript_aggregation_task:
|
||||
await self.cancel_task(self._transcript_aggregation_task)
|
||||
self._transcript_aggregation_task = None
|
||||
|
||||
self._accumulated_text.clear()
|
||||
self._flush_counter = 0
|
||||
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
@@ -412,41 +446,75 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
async def _process_messages(self):
|
||||
async def _receive_messages(self):
|
||||
async for message in self._get_websocket():
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_response(data)
|
||||
msg = json.loads(message)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Received non-JSON message: {message}")
|
||||
continue
|
||||
|
||||
async def _receive_messages(self):
|
||||
while True:
|
||||
await self._process_messages()
|
||||
logger.debug(f"{self} Gradium connection was disconnected (timeout?), reconnecting")
|
||||
await self._connect_websocket()
|
||||
|
||||
async def _process_response(self, msg):
|
||||
type_ = msg.get("type", "")
|
||||
if type_ == "text":
|
||||
await self._handle_text(msg["text"])
|
||||
elif type_ == "end_of_stream":
|
||||
await self._handle_end_of_stream()
|
||||
elif type_ == "error":
|
||||
await self.push_error(error_msg=f"Error: {msg}")
|
||||
|
||||
async def _handle_end_of_stream(self):
|
||||
"""Handle termination message."""
|
||||
logger.debug("Received end_of_stream message from server")
|
||||
type_ = msg.get("type", "")
|
||||
if type_ == "text":
|
||||
await self._handle_text(msg["text"])
|
||||
elif type_ == "flushed":
|
||||
await self._handle_flushed()
|
||||
elif type_ == "end_of_stream":
|
||||
logger.debug("Received end_of_stream message from server")
|
||||
elif type_ == "error":
|
||||
await self.push_error(error_msg=f"Error: {msg}")
|
||||
|
||||
async def _handle_text(self, text: str):
|
||||
"""Handle transcription results."""
|
||||
"""Handle streaming transcription fragment.
|
||||
|
||||
Accumulates text and pushes an InterimTranscriptionFrame with the
|
||||
full accumulated text so far.
|
||||
"""
|
||||
self._accumulated_text.append(text)
|
||||
accumulated = " ".join(self._accumulated_text)
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(
|
||||
text=accumulated,
|
||||
user_id=self._user_id,
|
||||
timestamp=time_now_iso8601(),
|
||||
language=self._settings.language,
|
||||
)
|
||||
)
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
async def _handle_flushed(self):
|
||||
"""Handle flush completion by starting a transcript aggregation timer.
|
||||
|
||||
The "flushed" message confirms that buffered audio has been processed,
|
||||
but text tokens may still arrive after this point. A short timer allows
|
||||
trailing tokens to accumulate before finalizing the transcription.
|
||||
"""
|
||||
if self._transcript_aggregation_task:
|
||||
await self.cancel_task(self._transcript_aggregation_task)
|
||||
self._transcript_aggregation_task = self.create_task(
|
||||
self._transcript_aggregation_handler(), "transcript_aggregation"
|
||||
)
|
||||
|
||||
async def _transcript_aggregation_handler(self):
|
||||
"""Wait for trailing tokens then finalize the accumulated transcription."""
|
||||
await asyncio.sleep(TRANSCRIPT_AGGREGATION_DELAY)
|
||||
await self._finalize_accumulated_text()
|
||||
|
||||
async def _finalize_accumulated_text(self):
|
||||
"""Join accumulated text, push TranscriptionFrame, and clear state."""
|
||||
if not self._accumulated_text:
|
||||
return
|
||||
self._transcript_aggregation_task = None
|
||||
|
||||
text = " ".join(self._accumulated_text)
|
||||
self._accumulated_text.clear()
|
||||
logger.debug(f"Final transcription: [{text}]")
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(
|
||||
text,
|
||||
self._user_id,
|
||||
time_now_iso8601(),
|
||||
self._settings.language,
|
||||
)
|
||||
)
|
||||
await self._trace_transcription(text, is_final=True, language=None)
|
||||
await self.stop_processing_metrics()
|
||||
await self._trace_transcription(text, is_final=True, language=self._settings.language)
|
||||
|
||||
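The flush handling above follows a small pattern: accumulate text fragments, and when the "flushed" acknowledgment arrives, wait briefly for trailing tokens before emitting a single final transcript. The sketch below reproduces that pattern with plain `asyncio`. It is not the Pipecat implementation: `TranscriptAggregator`, its method names, and the shortened delay are assumptions made for the example.

```python
import asyncio
from typing import List, Optional


class TranscriptAggregator:
    """Hypothetical stand-in for the accumulate-then-finalize logic."""

    def __init__(self, delay: float = 0.05) -> None:
        self._delay = delay  # the diff uses 0.1 s; shortened here for the demo
        self._fragments: List[str] = []
        self._task: Optional[asyncio.Task] = None
        self.finals: List[str] = []

    def on_text(self, text: str) -> str:
        # Each fragment extends the interim transcript.
        self._fragments.append(text)
        return " ".join(self._fragments)

    def on_flushed(self) -> None:
        # Restart the timer if a previous one is still pending.
        if self._task and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._finalize_after_delay())

    async def _finalize_after_delay(self) -> None:
        await asyncio.sleep(self._delay)
        if not self._fragments:
            return
        self.finals.append(" ".join(self._fragments))
        self._fragments.clear()


async def demo() -> List[str]:
    agg = TranscriptAggregator()
    agg.on_text("hello")
    agg.on_flushed()
    agg.on_text("world")  # trailing token that arrives after "flushed"
    await asyncio.sleep(0.2)  # give the timer time to fire
    return agg.finals


print(asyncio.run(demo()))
```

The delay is a trade-off: a longer wait catches more late tokens but adds directly to the latency of the final transcription.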
@@ -720,11 +720,6 @@ class TTSService(AIService):
|
||||
self._turn_context_id = self.create_context_id()
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
|
||||
# We pause processing incoming frames if the LLM response included
|
||||
# text (it might be that it's only a function calling response). We
|
||||
# pause to avoid audio overlapping.
|
||||
await self._maybe_pause_frame_processing()
|
||||
|
||||
# Flush any remaining text (including text waiting for lookahead)
|
||||
remaining = await self._text_aggregator.flush()
|
||||
# Stop the aggregation metric (no-op if already stopped on first sentence).
|
||||
@@ -732,6 +727,11 @@ class TTSService(AIService):
|
||||
if remaining:
|
||||
await self._push_tts_frames(AggregatedTextFrame(remaining.text, remaining.type))
|
||||
|
||||
# We pause processing incoming frames if the LLM response included
|
||||
# text (it might be that it's only a function calling response). We
|
||||
# pause to avoid audio overlapping.
|
||||
await self._maybe_pause_frame_processing()
|
||||
|
||||
# Log accumulated streamed text and emit aggregated usage metric.
|
||||
if self._streamed_text:
|
||||
logger.debug(f"{self}: Generating TTS [{self._streamed_text}]")
|
||||
|
||||
@@ -241,6 +241,7 @@ class TavusTransportClient:
|
||||
on_dialout_stopped=partial(self._on_handle_callback, "on_dialout_stopped"),
|
||||
on_dialout_error=partial(self._on_handle_callback, "on_dialout_error"),
|
||||
on_dialout_warning=partial(self._on_handle_callback, "on_dialout_warning"),
|
||||
on_dtmf_event=partial(self._on_handle_callback, "on_dtmf_event"),
|
||||
on_participant_joined=self._callbacks.on_participant_joined,
|
||||
on_participant_left=self._callbacks.on_participant_left,
|
||||
on_participant_updated=partial(self._on_handle_callback, "on_participant_updated"),
|
||||
|
||||
24
src/pipecat/turns/types.py
Normal file
@@ -0,0 +1,24 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Shared result type for user turn strategy frame processing."""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class ProcessFrameResult(Enum):
|
||||
"""Result of processing a frame in a user turn strategy.
|
||||
|
||||
Controls whether the strategy loop in the controller continues to the
|
||||
next strategy or stops early.
|
||||
|
||||
Attributes:
|
||||
CONTINUE: Continue to the next strategy in the loop.
|
||||
STOP: Stop evaluating further strategies for this frame.
|
||||
"""
|
||||
|
||||
CONTINUE = "continue"
|
||||
STOP = "stop"
|
||||
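The controller loop that consumes `ProcessFrameResult` is not shown in this part of the diff, so the following is only a sketch of how such a loop might behave, based on the docstrings: `STOP` ends evaluation early, and a `None` return from an older strategy is treated as `CONTINUE`. The function `run_strategies` and the three toy strategy classes are hypothetical.

```python
import asyncio
from enum import Enum
from typing import Any, List, Optional


class ProcessFrameResult(Enum):
    CONTINUE = "continue"
    STOP = "stop"


class LegacyStrategy:
    """Older-style strategy that returns nothing."""

    async def process_frame(self, frame: Any) -> None:
        return None


class AlwaysStop:
    async def process_frame(self, frame: Any) -> ProcessFrameResult:
        return ProcessFrameResult.STOP


class Recorder:
    def __init__(self) -> None:
        self.seen: List[Any] = []

    async def process_frame(self, frame: Any) -> ProcessFrameResult:
        self.seen.append(frame)
        return ProcessFrameResult.CONTINUE


async def run_strategies(strategies: List[Any], frame: Any) -> int:
    """Feed frame to each strategy in order; return how many were consulted."""
    consulted = 0
    for strategy in strategies:
        consulted += 1
        result: Optional[ProcessFrameResult] = await strategy.process_frame(frame)
        if result is None:
            # Backward compatibility: no return value means CONTINUE.
            result = ProcessFrameResult.CONTINUE
        if result is ProcessFrameResult.STOP:
            break
    return consulted


recorder = Recorder()
count = asyncio.run(run_strategies([LegacyStrategy(), AlwaysStop(), recorder], "frame"))
print(count, recorder.seen)
```

In this arrangement the order of strategies matters: anything placed after a strategy that returns `STOP` never sees the frame.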
@@ -9,6 +9,7 @@ from .external_user_turn_start_strategy import ExternalUserTurnStartStrategy
|
||||
from .min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
|
||||
from .transcription_user_turn_start_strategy import TranscriptionUserTurnStartStrategy
|
||||
from .vad_user_turn_start_strategy import VADUserTurnStartStrategy
|
||||
from .wake_phrase_user_turn_start_strategy import WakePhraseUserTurnStartStrategy
|
||||
|
||||
__all__ = [
|
||||
"BaseUserTurnStartStrategy",
|
||||
@@ -17,4 +18,5 @@ __all__ = [
|
||||
"TranscriptionUserTurnStartStrategy",
|
||||
"UserTurnStartedParams",
|
||||
"VADUserTurnStartStrategy",
|
||||
"WakePhraseUserTurnStartStrategy",
|
||||
]
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import Optional, Type
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
@@ -76,6 +77,7 @@ class BaseUserTurnStartStrategy(BaseObject):
|
||||
self._register_event_handler("on_push_frame", sync=True)
|
||||
self._register_event_handler("on_broadcast_frame", sync=True)
|
||||
self._register_event_handler("on_user_turn_started", sync=True)
|
||||
self._register_event_handler("on_reset_aggregation", sync=True)
|
||||
|
||||
@property
|
||||
def task_manager(self) -> BaseTaskManager:
|
||||
@@ -100,7 +102,7 @@ class BaseUserTurnStartStrategy(BaseObject):
|
||||
"""Reset the strategy to its initial state."""
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame.
|
||||
|
||||
Subclasses should override this to implement logic that decides whether
|
||||
@@ -108,6 +110,10 @@ class BaseUserTurnStartStrategy(BaseObject):
|
||||
|
||||
Args:
|
||||
frame: The frame to be processed.
|
||||
|
||||
Returns:
|
||||
A ProcessFrameResult indicating the outcome. Subclasses that return
|
||||
None are treated as CONTINUE for backward compatibility.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -138,3 +144,7 @@ class BaseUserTurnStartStrategy(BaseObject):
|
||||
enable_user_speaking_frames=self._enable_user_speaking_frames,
|
||||
),
|
||||
)
|
||||
|
||||
async def trigger_reset_aggregation(self):
|
||||
"""Trigger the `on_reset_aggregation` event."""
|
||||
await self._call_event_handler("on_reset_aggregation")
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"""User turn start strategy triggered by externally emitted frames."""
|
||||
|
||||
from pipecat.frames.frames import Frame, UserStartedSpeakingFrame
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
|
||||
|
||||
@@ -27,13 +28,17 @@ class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
"""
|
||||
super().__init__(enable_interruptions=False, enable_user_speaking_frames=False, **kwargs)
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to detect user turn start.
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
Returns:
|
||||
STOP if a user started speaking frame was received, CONTINUE otherwise.
|
||||
"""
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
@@ -15,6 +15,7 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
|
||||
|
||||
@@ -47,7 +48,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
await super().reset()
|
||||
self._bot_speaking = False
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to detect the start of a user turn.
|
||||
|
||||
This method updates internal state based on transcription frames and
|
||||
@@ -55,17 +56,20 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
Returns:
|
||||
STOP if the minimum word count was reached, CONTINUE otherwise.
|
||||
"""
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
await self._handle_bot_started_speaking(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._handle_bot_stopped_speaking(frame)
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
return await self._handle_transcription(frame)
|
||||
elif isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
|
||||
await self._handle_transcription(frame)
|
||||
return await self._handle_transcription(frame)
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
|
||||
"""Handle bot started speaking frame.
|
||||
@@ -87,11 +91,16 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
"""
|
||||
self._bot_speaking = False
|
||||
|
||||
async def _handle_transcription(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
|
||||
"""Handle a completed transcription frame and check word count.
|
||||
async def _handle_transcription(
|
||||
self, frame: TranscriptionFrame | InterimTranscriptionFrame
|
||||
) -> ProcessFrameResult:
|
||||
"""Handle a transcription frame and check word count.
|
||||
|
||||
Args:
|
||||
frame: The transcription frame to be processed.
|
||||
|
||||
Returns:
|
||||
STOP if the minimum word count was reached, CONTINUE otherwise.
|
||||
"""
|
||||
min_words = self._min_words if self._bot_speaking else 1
|
||||
|
||||
@@ -106,3 +115,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
|
||||
if should_trigger:
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
await self.trigger_reset_aggregation()
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"""User turn start strategy based on transcriptions."""
|
||||
|
||||
from pipecat.frames.frames import Frame, InterimTranscriptionFrame, TranscriptionFrame
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
|
||||
|
||||
@@ -25,15 +26,20 @@ class TranscriptionUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
super().__init__(**kwargs)
|
||||
self._use_interim = use_interim
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to detect the start of a user turn.
|
||||
|
||||
Args:
|
||||
frame: The frame to be processed.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
Returns:
|
||||
STOP if a transcription was received, CONTINUE otherwise.
|
||||
"""
|
||||
if isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"""User turn start strategy based on VAD events."""
|
||||
|
||||
from pipecat.frames.frames import Frame, VADUserStartedSpeakingFrame
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
|
||||
|
||||
@@ -18,13 +19,17 @@ class VADUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to detect user turn start.
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
Returns:
|
||||
STOP if the user started speaking, CONTINUE otherwise.
|
||||
"""
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
@@ -0,0 +1,281 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""User turn start strategy that gates interaction behind wake phrase detection."""
|
||||
|
||||
import asyncio
|
||||
import enum
|
||||
import re
|
||||
from typing import List, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
Frame,
|
||||
TranscriptionFrame,
|
||||
UserSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
|
||||
class _WakeState(enum.Enum):
|
||||
"""Internal state for wake phrase detection."""
|
||||
|
||||
IDLE = "idle"
|
||||
AWAKE = "awake"
|
||||
|
||||
|
||||
class WakePhraseUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
"""User turn start strategy that requires a wake phrase before interaction.
|
||||
|
||||
Blocks subsequent strategies until a wake phrase is detected in a final
|
||||
transcription. After detection, allows interaction for a configurable
|
||||
timeout period before requiring the wake phrase again. Use
|
||||
``single_activation=True`` to require the wake phrase before every turn.
|
||||
|
||||
This strategy should be placed first in the start strategies list.
|
||||
|
||||
Event handlers available:
|
||||
|
||||
- on_wake_phrase_detected: Called when a wake phrase is matched.
|
||||
- on_wake_phrase_timeout: Called when the inactivity timeout expires
|
||||
(timeout mode only).
|
||||
|
||||
Example::
|
||||
|
||||
# Timeout mode (default): wake phrase unlocks interaction for 10s
|
||||
strategy = WakePhraseUserTurnStartStrategy(
|
||||
phrases=["hey pipecat", "ok pipecat"],
|
||||
timeout=10.0,
|
||||
)
|
||||
|
||||
# Single activation: wake phrase required before every turn
|
||||
strategy = WakePhraseUserTurnStartStrategy(
|
||||
phrases=["hey pipecat"],
|
||||
single_activation=True,
|
||||
)
|
||||
|
||||
@strategy.event_handler("on_wake_phrase_detected")
|
||||
async def on_wake_phrase_detected(strategy, phrase):
|
||||
...
|
||||
|
||||
@strategy.event_handler("on_wake_phrase_timeout")
|
||||
async def on_wake_phrase_timeout(strategy):
|
||||
...
|
||||
|
||||
Args:
|
||||
phrases: List of wake phrases to detect.
|
||||
timeout: Inactivity timeout in seconds before returning to IDLE.
|
||||
In timeout mode, the timer resets on activity (user or bot speech).
|
||||
In single activation mode, acts as a keepalive window — the strategy
|
||||
stays AWAKE for this duration after wake phrase detection, allowing
|
||||
the current turn to complete before returning to IDLE.
|
||||
single_activation: If True, the wake phrase is required before every
|
||||
turn. The strategy returns to IDLE after each turn completes.
|
||||
**kwargs: Additional keyword arguments passed to parent.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
phrases: List[str],
|
||||
timeout: float = 10.0,
|
||||
single_activation: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the wake phrase user turn start strategy.
|
||||
|
||||
Args:
|
||||
phrases: List of wake phrases to detect.
|
||||
timeout: Inactivity timeout in seconds before returning to IDLE.
|
||||
In timeout mode, the timer resets on activity. In single activation
|
||||
mode, acts as a keepalive window after wake phrase detection.
|
||||
single_activation: If True, the wake phrase is required before every
|
||||
turn. The strategy returns to IDLE after each turn completes.
|
||||
**kwargs: Additional keyword arguments passed to parent.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._phrases = phrases
|
||||
self._timeout = timeout
|
||||
self._single_activation = single_activation
|
||||
|
||||
self._patterns: List[re.Pattern] = []
|
||||
for phrase in phrases:
|
||||
pattern = re.compile(
|
||||
r"\b" + r"\s*".join(re.escape(word) for word in phrase.split()) + r"\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
self._patterns.append(pattern)
|
||||
|
||||
self._state = _WakeState.IDLE
|
||||
self._accumulated_text = ""
|
||||
|
||||
self._timeout_event = asyncio.Event()
|
||||
self._timeout_task: Optional[asyncio.Task] = None
|
||||
|
||||
self._register_event_handler("on_wake_phrase_detected")
|
||||
self._register_event_handler("on_wake_phrase_timeout")
|
||||
|
||||
@property
|
||||
def state(self) -> _WakeState:
|
||||
"""Returns the current wake state."""
|
||||
return self._state
|
||||
|
||||
async def setup(self, task_manager: BaseTaskManager):
|
||||
"""Initialize the strategy with the given task manager.
|
||||
|
||||
Args:
|
||||
task_manager: The task manager to be associated with this instance.
|
||||
"""
|
||||
await super().setup(task_manager)
|
||||
if not self._timeout_task:
|
||||
self._timeout_task = self.task_manager.create_task(
|
||||
self._timeout_task_handler(),
|
||||
f"{self}::_timeout_task_handler",
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup the strategy."""
|
||||
await super().cleanup()
|
||||
if self._timeout_task:
|
||||
await self.task_manager.cancel_task(self._timeout_task)
|
||||
self._timeout_task = None
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the strategy.
|
||||
|
||||
In timeout mode, preserves state and refreshes timeout since reset
|
||||
means a turn started (activity). In single activation mode, does
|
||||
nothing — the keepalive timeout (started when the wake phrase was
|
||||
detected) handles the transition back to IDLE.
|
||||
"""
|
||||
await super().reset()
|
||||
if self._state == _WakeState.AWAKE:
|
||||
if not self._single_activation:
|
||||
self._refresh_timeout()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame for wake phrase detection or passthrough.
|
||||
|
||||
Args:
|
||||
frame: The frame to be processed.
|
||||
|
||||
Returns:
|
||||
STOP when the wake phrase is detected or when in IDLE state
|
||||
(blocks subsequent strategies), CONTINUE when in AWAKE state
|
||||
(allows subsequent strategies to proceed).
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
if self._state == _WakeState.IDLE:
|
||||
return await self._process_idle(frame)
|
||||
else:
|
||||
return await self._process_awake(frame)
|
||||
|
||||
async def _process_idle(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process a frame while in IDLE state.
|
||||
|
||||
Only final ``TranscriptionFrame`` instances are checked for wake phrase
|
||||
matches. When a match is found, a user turn start is triggered.
|
||||
Transcription frames that don't match trigger an aggregation reset so that
|
||||
pre-wake-phrase speech is not added to the LLM context. All frames
|
||||
return STOP to block subsequent strategies.
|
||||
"""
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
if self._check_wake_phrase(frame.text):
|
||||
await self.trigger_user_turn_started()
|
||||
return ProcessFrameResult.STOP
|
||||
await self.trigger_reset_aggregation()
|
||||
|
||||
return ProcessFrameResult.STOP
|
||||
|
||||
async def _process_awake(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process a frame while in AWAKE state.
|
||||
|
||||
Refreshes the timeout on activity frames (timeout mode only). Returns
|
||||
CONTINUE so subsequent strategies can process the frame.
|
||||
"""
|
||||
if not self._single_activation:
|
||||
if isinstance(frame, (UserSpeakingFrame, BotSpeakingFrame)):
|
||||
self._refresh_timeout()
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
self._refresh_timeout()
|
||||
elif isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
self._refresh_timeout()
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
@staticmethod
|
||||
def _strip_punctuation(text: str) -> str:
|
||||
"""Strip punctuation from text, keeping only letters, digits, and whitespace."""
|
||||
return re.sub(r"[^\w\s]", "", text)
|
||||
|
||||
def _check_wake_phrase(self, text: str) -> bool:
|
||||
"""Check if the accumulated text contains a wake phrase.
|
||||
|
||||
Punctuation is stripped before matching so that STT output like
|
||||
"Hey, Pipecat!" still matches the phrase "hey pipecat".
|
||||
|
||||
Args:
|
||||
text: New transcription text to append and check.
|
||||
|
||||
Returns:
|
||||
True if a wake phrase was found, False otherwise.
|
||||
"""
|
||||
self._accumulated_text += " " + self._strip_punctuation(text)
|
||||
# Cap accumulated text to prevent unbounded growth.
|
||||
if len(self._accumulated_text) > 250:
|
||||
self._accumulated_text = self._accumulated_text[-250:]
|
||||
|
||||
for i, pattern in enumerate(self._patterns):
|
||||
if pattern.search(self._accumulated_text):
|
||||
phrase = self._phrases[i]
|
||||
logger.debug(f"{self} wake phrase detected: {phrase!r}")
|
||||
self._transition_to_awake(phrase)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _transition_to_awake(self, phrase: str):
|
||||
"""Transition from IDLE to AWAKE state."""
|
||||
self._state = _WakeState.AWAKE
|
||||
self._accumulated_text = ""
|
||||
self._refresh_timeout()
|
||||
self.task_manager.create_task(
|
||||
self._call_event_handler("on_wake_phrase_detected", phrase),
|
||||
f"{self}::on_wake_phrase_detected",
|
||||
)
|
||||
|
||||
def _transition_to_idle(self):
|
||||
"""Transition from AWAKE to IDLE state."""
|
||||
logger.debug(f"{self} wake phrase timeout, returning to IDLE")
|
||||
self._state = _WakeState.IDLE
|
||||
self._accumulated_text = ""
|
||||
self.task_manager.create_task(
|
||||
self._call_event_handler("on_wake_phrase_timeout"),
|
||||
f"{self}::on_wake_phrase_timeout",
|
||||
)
|
||||
|
||||
def _refresh_timeout(self):
|
||||
"""Refresh the inactivity timeout."""
|
||||
self._timeout_event.set()
|
||||
|
||||
async def _timeout_task_handler(self):
|
||||
"""Background task that monitors inactivity timeout."""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._timeout_event.wait(),
|
||||
timeout=self._timeout,
|
||||
)
|
||||
self._timeout_event.clear()
|
||||
except asyncio.TimeoutError:
|
||||
if self._state == _WakeState.AWAKE:
|
||||
self._transition_to_idle()
|
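The wake phrase matching in `_check_wake_phrase` can be tried in isolation. The following is a minimal standalone sketch, not part of the change itself: `WakePhraseMatcher`, `compile_wake_patterns`, and `feed` are hypothetical names introduced for illustration, and only the regex construction, punctuation stripping, and 250-character cap are taken from the strategy above.

```python
import re
from typing import List, Optional

# Same cap the strategy above applies to its accumulated text.
MAX_ACCUMULATED_CHARS = 250


def compile_wake_patterns(phrases: List[str]) -> List[re.Pattern]:
    """Compile one case-insensitive pattern per phrase (hypothetical helper)."""
    return [
        re.compile(
            # Words are joined with \s*, so "hey pipecat" also matches "heypipecat".
            r"\b" + r"\s*".join(re.escape(word) for word in phrase.split()) + r"\b",
            re.IGNORECASE,
        )
        for phrase in phrases
    ]


class WakePhraseMatcher:
    """Accumulates final transcriptions and reports the first phrase matched."""

    def __init__(self, phrases: List[str]):
        self._phrases = list(phrases)
        self._patterns = compile_wake_patterns(self._phrases)
        self._accumulated = ""

    def feed(self, text: str) -> Optional[str]:
        # Strip punctuation so "Hey, Pipecat!" is compared as "Hey Pipecat".
        self._accumulated += " " + re.sub(r"[^\w\s]", "", text)
        # Keep only the tail to bound memory use.
        self._accumulated = self._accumulated[-MAX_ACCUMULATED_CHARS:]
        for phrase, pattern in zip(self._phrases, self._patterns):
            if pattern.search(self._accumulated):
                # Clear the buffer on a match, as the strategy does when it wakes.
                self._accumulated = ""
                return phrase
        return None
```

Because text accumulates across calls, a phrase split over two final transcriptions ("hey" followed by "pipecat") still matches, which is the behavior `test_accumulation_across_frames` exercises.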
||||
@@ -11,6 +11,7 @@ from typing import Optional, Type
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
@@ -89,7 +90,7 @@ class BaseUserTurnStopStrategy(BaseObject):
|
||||
"""Reset the strategy to its initial state."""
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to decide whether the user stopped speaking.
|
||||
|
||||
Subclasses should override this to implement logic that decides whether
|
||||
@@ -97,6 +98,10 @@ class BaseUserTurnStopStrategy(BaseObject):
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
|
||||
Returns:
|
||||
A ProcessFrameResult indicating the outcome. Subclasses that return
|
||||
None are treated as CONTINUE for backward compatibility.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ from pipecat.frames.frames import (
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
@@ -69,7 +70,7 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
await self.task_manager.cancel_task(self._task)
|
||||
self._task = None
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to update strategy state.
|
||||
|
||||
Updates internal transcription text and VAD state. The user end turn
|
||||
@@ -78,6 +79,8 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
|
||||
Returns:
|
||||
Always returns CONTINUE so subsequent stop strategies are evaluated.
|
||||
"""
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self._handle_user_started_speaking(frame)
|
||||
@@ -88,6 +91,8 @@ class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame):
|
||||
"""Handle when the external service indicates the user is speaking."""
|
||||
self._user_speaking = True
|
||||
|
||||
@@ -17,6 +17,7 @@ from pipecat.frames.frames import (
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
@@ -83,7 +84,7 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
await self.task_manager.cancel_task(self._timeout_task)
|
||||
self._timeout_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to update strategy state.
|
||||
|
||||
Updates internal transcription text and VAD state. The user end turn
|
||||
@@ -92,6 +93,8 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
|
||||
Returns:
|
||||
Always returns CONTINUE so subsequent stop strategies are evaluated.
|
||||
"""
|
||||
if isinstance(frame, STTMetadataFrame):
|
||||
self._stt_timeout = frame.ttfs_p99_latency
|
||||
@@ -102,6 +105,8 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
async def _handle_vad_user_started_speaking(self, _: VADUserStartedSpeakingFrame):
|
||||
"""Handle when the VAD indicates the user is speaking."""
|
||||
self._vad_user_speaking = True
|
||||
|
||||
@@ -22,6 +22,7 @@ from pipecat.frames.frames import (
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
@@ -88,11 +89,14 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
await self.task_manager.cancel_task(self._timeout_task)
|
||||
self._timeout_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Process an incoming frame to update the turn analyzer and strategy state.
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
|
||||
Returns:
|
||||
Always returns CONTINUE so subsequent stop strategies are evaluated.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
@@ -109,6 +113,8 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
"""Process the start frame to configure the turn analyzer."""
|
||||
self._turn_analyzer.set_sample_rate(frame.audio_in_sample_rate)
|
||||
|
||||
@@ -19,7 +19,11 @@ from pipecat.frames.frames import (
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start import (
|
||||
BaseUserTurnStartStrategy,
|
||||
UserTurnStartedParams,
|
||||
)
|
||||
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from pipecat.turns.user_turn_strategies import UserTurnStrategies
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
@@ -94,6 +98,7 @@ class UserTurnController(BaseObject):
|
||||
self._register_event_handler("on_user_turn_started", sync=True)
|
||||
self._register_event_handler("on_user_turn_stopped", sync=True)
|
||||
self._register_event_handler("on_user_turn_stop_timeout", sync=True)
|
||||
self._register_event_handler("on_reset_aggregation", sync=True)
|
||||
|
||||
@property
|
||||
def task_manager(self) -> BaseTaskManager:
|
||||
@@ -161,10 +166,14 @@ class UserTurnController(BaseObject):
|
||||
await self._handle_transcription(frame)
|
||||
|
||||
for strategy in self._user_turn_strategies.start or []:
|
||||
await strategy.process_frame(frame)
|
||||
result = await strategy.process_frame(frame)
|
||||
if result == ProcessFrameResult.STOP:
|
||||
break
|
||||
|
||||
for strategy in self._user_turn_strategies.stop or []:
|
||||
await strategy.process_frame(frame)
|
||||
result = await strategy.process_frame(frame)
|
||||
if result == ProcessFrameResult.STOP:
|
||||
break
|
||||
|
||||
async def _setup_strategies(self):
|
||||
for s in self._user_turn_strategies.start or []:
|
||||
@@ -172,6 +181,7 @@ class UserTurnController(BaseObject):
|
||||
s.add_event_handler("on_push_frame", self._on_push_frame)
|
||||
s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
|
||||
s.add_event_handler("on_user_turn_started", self._on_user_turn_started)
|
||||
s.add_event_handler("on_reset_aggregation", self._on_reset_aggregation)
|
||||
|
||||
for s in self._user_turn_strategies.stop or []:
|
||||
await s.setup(self.task_manager)
|
||||
@@ -242,6 +252,9 @@ class UserTurnController(BaseObject):
|
||||
):
|
||||
await self._trigger_user_turn_stop(strategy, params)
|
||||
|
||||
async def _on_reset_aggregation(self, strategy: BaseUserTurnStartStrategy):
|
||||
await self._call_event_handler("on_reset_aggregation", strategy)
|
||||
|
||||
async def _trigger_user_turn_start(
|
||||
self, strategy: Optional[BaseUserTurnStartStrategy], params: UserTurnStartedParams
|
||||
):
|
||||
|
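The short-circuit loop added to `UserTurnController` can be illustrated without pipecat installed. This is a hedged sketch under stated assumptions: the `ProcessFrameResult` enum below is a local stand-in (its member values are assumed, only the names `CONTINUE` and `STOP` come from the diff), and `GateStrategy`, `LegacyStrategy`, and `run_chain` are hypothetical names.

```python
import asyncio
import enum
from typing import Any, List


class ProcessFrameResult(enum.Enum):
    """Local stand-in for pipecat.turns.types.ProcessFrameResult (values assumed)."""

    CONTINUE = "continue"
    STOP = "stop"


class GateStrategy:
    """Hypothetical strategy that blocks later strategies while closed."""

    def __init__(self, is_open: bool):
        self.is_open = is_open

    async def process_frame(self, frame: Any) -> ProcessFrameResult:
        return ProcessFrameResult.CONTINUE if self.is_open else ProcessFrameResult.STOP


class LegacyStrategy:
    """Hypothetical strategy that records frames and returns None, like older subclasses."""

    def __init__(self):
        self.seen: List[Any] = []

    async def process_frame(self, frame: Any) -> None:
        self.seen.append(frame)


async def run_chain(strategies: List[Any], frame: Any) -> None:
    """Evaluate strategies in order, stopping at the first STOP."""
    for strategy in strategies:
        result = await strategy.process_frame(frame)
        # None never equals STOP, so legacy strategies behave as CONTINUE.
        if result == ProcessFrameResult.STOP:
            break
```

Placing the gate first in the list is what lets a closed gate hide frames from every strategy behind it, which matches the docstring's advice to put the wake phrase strategy first.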
||||
@@ -23,6 +23,31 @@ from pipecat.turns.user_stop import (
|
||||
)
|
||||
|
||||
|
||||
def default_user_turn_start_strategies() -> List[BaseUserTurnStartStrategy]:
|
||||
"""Return the default user turn start strategies.
|
||||
|
||||
Returns ``[VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy]``.
|
||||
Useful when building a custom strategy list that extends the defaults.
|
||||
|
||||
Example::
|
||||
|
||||
start_strategies = [
|
||||
WakePhraseUserTurnStartStrategy(phrases=["hey pipecat"]),
|
||||
*default_user_turn_start_strategies(),
|
||||
]
|
||||
"""
|
||||
return [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
|
||||
|
||||
|
||||
def default_user_turn_stop_strategies() -> List[BaseUserTurnStopStrategy]:
|
||||
"""Return the default user turn stop strategies.
|
||||
|
||||
Returns ``[TurnAnalyzerUserTurnStopStrategy(LocalSmartTurnAnalyzerV3)]``.
|
||||
Useful when building a custom strategy list that extends the defaults.
|
||||
"""
|
||||
return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserTurnStrategies:
|
||||
"""Container for user turn start and stop strategies.
|
||||
@@ -45,9 +70,9 @@ class UserTurnStrategies:
|
||||
|
||||
def __post_init__(self):
|
||||
if not self.start:
|
||||
self.start = [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
|
||||
self.start = default_user_turn_start_strategies()
|
||||
if not self.stop:
|
||||
self.stop = [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
|
||||
self.stop = default_user_turn_stop_strategies()
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
346
tests/test_wake_phrase_user_turn_start_strategy.py
Normal file
@@ -0,0 +1,346 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
UserSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_start.wake_phrase_user_turn_start_strategy import (
|
||||
WakePhraseUserTurnStartStrategy,
|
||||
_WakeState,
|
||||
)
|
||||
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
||||
|
||||
|
||||
class TestWakePhraseUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
def _create_strategy(self, **kwargs) -> WakePhraseUserTurnStartStrategy:
|
||||
kwargs.setdefault("phrases", ["hey pipecat"])
|
||||
kwargs.setdefault("timeout", 10.0)
|
||||
return WakePhraseUserTurnStartStrategy(**kwargs)
|
||||
|
||||
async def _setup_strategy(self, strategy: WakePhraseUserTurnStartStrategy):
|
||||
task_manager = TaskManager()
|
||||
loop = asyncio.get_running_loop()
|
||||
task_manager.setup(TaskManagerParams(loop=loop))
|
||||
await strategy.setup(task_manager)
|
||||
return task_manager
|
||||
|
||||
async def test_wake_phrase_in_final_transcription(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_interim_transcription_ignored(self):
|
||||
"""Interim transcriptions are never used for wake phrase matching."""
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
InterimTranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_no_wake_phrase_returns_stop(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="hello world", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_non_matching_text_resets_aggregation(self):
|
||||
"""Non-matching transcription triggers aggregation reset to prevent LLM context pollution."""
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
reset_called = False
|
||||
|
||||
@strategy.event_handler("on_reset_aggregation")
|
||||
async def on_reset_aggregation(strategy):
|
||||
nonlocal reset_called
|
||||
reset_called = True
|
||||
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hello world", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertTrue(reset_called)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_vad_frame_returns_stop_in_listening(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_inactive_returns_continue(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
# Trigger wake phrase first.
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
# Subsequent frames should return CONTINUE.
|
||||
result = await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
self.assertEqual(result, ProcessFrameResult.CONTINUE)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.CONTINUE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_accumulation_across_frames(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_multiple_phrases(self):
|
||||
strategy = self._create_strategy(phrases=["hey pipecat", "ok computer"])
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="ok computer", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_punctuation_stripped(self):
|
||||
"""STT punctuation like 'Hey, Pipecat!' should still match."""
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
result = await strategy.process_frame(
|
||||
TranscriptionFrame(text="Hey, Pipecat!", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(result, ProcessFrameResult.STOP)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_reset_preserves_inactive_state(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.reset()
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_timeout_returns_to_listening(self):
|
||||
strategy = self._create_strategy(timeout=0.1)
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
# Trigger wake phrase.
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
# Wait for timeout to expire.
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_activity_refreshes_timeout(self):
|
||||
strategy = self._create_strategy(timeout=0.2)
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
# Trigger wake phrase.
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
# Send activity before timeout.
|
||||
await asyncio.sleep(0.1)
|
||||
await strategy.process_frame(UserSpeakingFrame())
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
# Send more activity.
|
||||
await asyncio.sleep(0.1)
|
||||
await strategy.process_frame(BotSpeakingFrame())
|
||||
self.assertEqual(strategy.state, _WakeState.AWAKE)
|
||||
|
||||
# Wait for timeout to expire after last activity.
|
||||
await asyncio.sleep(0.3)
|
||||
self.assertEqual(strategy.state, _WakeState.IDLE)
|
||||
|
||||
await strategy.cleanup()
|
||||
|
||||
async def test_wake_phrase_detected_event(self):
|
||||
strategy = self._create_strategy()
|
||||
await self._setup_strategy(strategy)
|
||||
|
||||
detected_phrase = None
|
||||
|
||||
@strategy.event_handler("on_wake_phrase_detected")
|
||||
async def on_wake_phrase_detected(strategy, phrase):
|
||||
nonlocal detected_phrase
|
||||
detected_phrase = phrase
|
||||
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
|
||||
)
|
||||
|
||||
# Event fires in a background task, give it a moment.
|
||||
await asyncio.sleep(0.05)
|
||||
self.assertEqual(detected_phrase, "hey pipecat")
|
||||
|
||||
await strategy.cleanup()

    async def test_wake_phrase_timeout_event(self):
        strategy = self._create_strategy(timeout=0.1)
        await self._setup_strategy(strategy)

        timeout_fired = False

        @strategy.event_handler("on_wake_phrase_timeout")
        async def on_wake_phrase_timeout(strategy):
            nonlocal timeout_fired
            timeout_fired = True

        await strategy.process_frame(
            TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
        )

        # Wait for timeout.
        await asyncio.sleep(0.3)
        self.assertTrue(timeout_fired)

        await strategy.cleanup()

    async def test_single_activation_stays_inactive_after_reset(self):
        """In single activation mode, reset() keeps the state AWAKE so the current turn can finish."""
        strategy = self._create_strategy(single_activation=True, timeout=0.5)
        await self._setup_strategy(strategy)

        # Trigger wake phrase.
        result = await strategy.process_frame(
            TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
        )
        self.assertEqual(result, ProcessFrameResult.STOP)
        self.assertEqual(strategy.state, _WakeState.AWAKE)

        # Simulate turn start (controller calls reset on all start strategies).
        await strategy.reset()
        # State remains AWAKE so frames continue to flow.
        self.assertEqual(strategy.state, _WakeState.AWAKE)

        # Subsequent frames should pass through (CONTINUE).
        result = await strategy.process_frame(VADUserStartedSpeakingFrame())
        self.assertEqual(result, ProcessFrameResult.CONTINUE)

        result = await strategy.process_frame(
            TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
        )
        self.assertEqual(result, ProcessFrameResult.CONTINUE)

        await strategy.cleanup()

    async def test_single_activation_timeout_returns_to_listening(self):
        """In single activation mode, the keepalive timeout returns the state to IDLE."""
        strategy = self._create_strategy(single_activation=True, timeout=0.1)
        await self._setup_strategy(strategy)

        # Trigger wake phrase.
        await strategy.process_frame(
            TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
        )
        self.assertEqual(strategy.state, _WakeState.AWAKE)

        # Wait for keepalive timeout to expire.
        await asyncio.sleep(0.3)
        self.assertEqual(strategy.state, _WakeState.IDLE)

        # Frames should now be blocked again.
        result = await strategy.process_frame(VADUserStartedSpeakingFrame())
        self.assertEqual(result, ProcessFrameResult.STOP)

        await strategy.cleanup()

    async def test_single_activation_requires_wake_phrase_after_timeout(self):
        """Single activation mode requires wake phrase again after keepalive timeout."""
        strategy = self._create_strategy(single_activation=True, timeout=0.1)
        await self._setup_strategy(strategy)

        # First turn: wake phrase -> AWAKE -> timeout -> IDLE.
        await strategy.process_frame(
            TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
        )
        self.assertEqual(strategy.state, _WakeState.AWAKE)
        await asyncio.sleep(0.3)
        self.assertEqual(strategy.state, _WakeState.IDLE)

        # Without wake phrase, frames are blocked.
        result = await strategy.process_frame(
            TranscriptionFrame(text="what is the weather", user_id="user1", timestamp="")
        )
        self.assertEqual(result, ProcessFrameResult.STOP)

        # Second turn: wake phrase again.
        result = await strategy.process_frame(
            TranscriptionFrame(text="hey pipecat", user_id="user1", timestamp="")
        )
        self.assertEqual(result, ProcessFrameResult.STOP)
        self.assertEqual(strategy.state, _WakeState.AWAKE)

        await strategy.cleanup()


if __name__ == "__main__":
    unittest.main()