Compare commits
45 Commits
pk/optiona
...
v1.2.1
| Author | SHA1 | Date | |
|---|---|---|---|
| | 71feb42711 | | |
| | 6b93ca0cb6 | | |
| | b6ecce754b | | |
| | d39e6bf921 | | |
| | 63064860ef | | |
| | f5158d51e7 | | |
| | 94dbd2fa68 | | |
| | c6ea6c6522 | | |
| | 58a22aeeb1 | | |
| | 5403aa56e4 | | |
| | 0e0d76d020 | | |
| | ea296babe9 | | |
| | b13af2b053 | | |
| | 7b6d878f07 | | |
| | 8e405f15aa | | |
| | 44a40e8eb2 | | |
| | ea97cb1a78 | | |
| | 22650b1b56 | | |
| | b76831e677 | | |
| | b57111743f | | |
| | dcbb0070c9 | | |
| | 73278d3309 | | |
| | 49bda11ae8 | | |
| | 07640582ce | | |
| | 078af6969a | | |
| | 9f40ba21c2 | | |
| | 82f0896d6a | | |
| | 7e4cd23de4 | | |
| | 97f50c8aa2 | | |
| | 08680732f6 | | |
| | 064b68aa01 | | |
| | b0f8ea7e28 | | |
| | ad50c8d5d5 | | |
| | 39e7f9e354 | | |
| | 7cc7968abb | | |
| | 52d8008783 | | |
| | a3ce963b54 | | |
| | e70ee603b2 | | |
| | 111e59a7b1 | | |
| | 079282d140 | | |
| | 0ccdd808e6 | | |
| | 863a1bf177 | | |
| | 58333b2705 | | |
| | ecaff1d1eb | | |
| | 9b55d4ddd4 | | |
509
CHANGELOG.md
@@ -7,6 +7,515 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->

## [1.2.1] - 2026-05-15

### Changed

- Changed the default WebSocket endpoints for `GradiumSTTService` and
  `GradiumTTSService` to the region-neutral
  `wss://api.gradium.ai/api/speech/asr` and
  `wss://api.gradium.ai/api/speech/tts`. Gradium now automatically routes
  traffic to the nearest endpoint. Override the URL to pin to a specific
  region.
  (PR [#4500](https://github.com/pipecat-ai/pipecat/pull/4500))

### Fixed

- Fixed bot hangs when `filter_incomplete_user_turns` was enabled and the LLM
  responded by calling a tool. The user turn never finalized, so the assistant
  aggregator gated the tool-result context push and the LLM continuation never
  ran. Tool calls now finalize the turn the moment they start, before the
  function dispatches.
  (PR [#4501](https://github.com/pipecat-ai/pipecat/pull/4501))

## [1.2.0] - 2026-05-14

### Added

- Added a `session_id` field to `RunnerArguments` so bots can log or trace a
  per-session identifier in local development the same way they can in Pipecat
  Cloud. The development runner now mints a UUID at every construction site,
  and paths that already returned a `sessionId` to the caller (Daily `/start`,
  dial-in webhook) share that same UUID with the runner args instead of
  generating two. The SmallWebRTC `/api/offer` endpoint also accepts an
  optional `session_id` query parameter so the `/sessions/{session_id}/...`
  proxy can thread it through.
  (PR [#4385](https://github.com/pipecat-ai/pipecat/pull/4385))

- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService`
  for controlling Cartesia's server-side text buffering. When unset, Pipecat
  picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE`
  mode (custom buffering — avoids stacking client-side aggregation on top of
  Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode
  (Cartesia's managed buffering applies). Pass an explicit value (0–5000ms) to
  override.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

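The default selection described in the entry above can be sketched as follows. This is a minimal illustration, not Pipecat's code: `TextAggregationMode` and `resolve_max_buffer_delay_ms` are hypothetical names, and rejecting out-of-range values (rather than clamping them) is an assumption.

```python
from enum import Enum
from typing import Optional


class TextAggregationMode(Enum):
    """Hypothetical stand-in for the two aggregation modes named above."""

    SENTENCE = "sentence"
    TOKEN = "token"


def resolve_max_buffer_delay_ms(
    mode: TextAggregationMode, explicit: Optional[int] = None
) -> Optional[int]:
    """Return the delay to send to the server, or None to leave it unset."""
    if explicit is not None:
        # Assumption: values outside 0..5000 are rejected, not clamped.
        if not 0 <= explicit <= 5000:
            raise ValueError("max_buffer_delay_ms must be between 0 and 5000")
        return explicit
    if mode is TextAggregationMode.SENTENCE:
        # Client-side sentence aggregation already buffers the text.
        return 0
    # TOKEN mode: leave the field unset so the server default applies.
    return None
```

Returning `None` instead of a number keeps "unset" distinguishable from an explicit `0`, which matters because the two produce different server behavior.
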
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and
  `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model
  Improvement Program. When set, the value is forwarded to Deepgram as a query
  parameter on the speak request. Defaults to `None`, which preserves the
  existing behavior. See https://dpgr.am/deepgram-mip for pricing implications
  before enabling.
  (PR [#4400](https://github.com/pipecat-ai/pipecat/pull/4400))

- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set
  via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that
  appends a developer-role message to the context whenever `LLMSetToolsFrame`
  changes the set of advertised standard tools. Helps the LLM stay coherent
  across mid-conversation tool changes, mitigating several flavors of
  tool-call-related hallucination: calling tools that have been removed,
  avoiding tools that have been re-added, and hallucinating output (made-up
  answers or tool-call-shaped non-tool-calls) when tools are unavailable.
  (PR [#4404](https://github.com/pipecat-ai/pipecat/pull/4404))

- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in
  `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the
  inference-triggered event and suppresses `on_user_turn_stopped`, leaving
  finalization to another strategy in the chain such as
  `LLMTurnCompletionUserTurnStopStrategy`.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` —
  a generic stop strategy that finalizes the user turn whenever a
  `UserTurnInferenceCompletedFrame` arrives, regardless of which component
  produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base;
  future producers (Flux, custom end-of-turn classifiers, etc.) can use the
  base directly or subclass it to add producer-specific setup.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Added `on_user_turn_inference_triggered`, a new event on the user turn
  controller, processor, aggregator and stop strategies that fires when a
  strategy has enough signal to start LLM inference. By default it fires
  together with `on_user_turn_stopped`; a gating strategy can fire only the
  inference-triggered event and defer finalization to a peer.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Added `FilterIncompleteUserTurnStrategies` in
  `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization
  that wraps the detector chain with `deferred(...)` and appends
  `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case:
  `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass
  `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`.
  When installed, the strategy gates `on_user_turn_stopped` on a
  `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by
  any component that can judge turn completeness — e.g. the
  `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout`
  provides a safety net if no completion frame ever arrives.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Added first-class RTVI support for the UI Agent Protocol:
  - Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server
    messages, plus `ui-command` and `ui-task` server-to-client messages, with
    paired `*Data` / `*Message` pydantic models.
  - Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`,
    `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching
    default handlers live in `@pipecat-ai/client-react`.
  - Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`,
    and `ui-cancel-task` messages.
  - Adds five UI pipeline frames, mirroring the `client-message`
    frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` /
    `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` /
    `UITaskMessage` envelopes, while the processor pushes inbound
    `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame`
    alongside `on_ui_message`.
  - Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.
  (PR [#4407](https://github.com/pipecat-ai/pipecat/pull/4407))

- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore
  processor now resolve credentials via the standard boto3 provider chain (EC2
  instance profiles, EKS pod roles / IRSA, ECS task roles, SSO,
  `~/.aws/credentials`) when explicit credentials and `AWS_*` environment
  variables are absent. Services running with IAM roles no longer need to
  export static credentials.
  (PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))

- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can
  bias transcription for both file-based and realtime transcription.
  (PR [#4426](https://github.com/pipecat-ai/pipecat/pull/4426))

- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and
  `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum
  silence duration before the watchdog sends a silence packet to prevent
  dangling turns. The actual threshold is `max(chunk_duration * 2,
  watchdog_min_timeout)`, so it also adapts automatically to the audio chunk
  size in use.
  (PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))

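As a worked instance of the threshold formula quoted above, the sketch below evaluates it for a small and a large chunk size. The function name `watchdog_threshold` is hypothetical; only the formula and the `0.5` second default come from the entry.

```python
def watchdog_threshold(chunk_duration: float, watchdog_min_timeout: float = 0.5) -> float:
    """Seconds of silence tolerated before a silence packet is sent."""
    return max(chunk_duration * 2, watchdog_min_timeout)


# 20 ms chunks: the floor dominates.
print(watchdog_threshold(0.02))  # 0.5
# 400 ms chunks: two chunk durations exceed the floor.
print(watchdog_threshold(0.4))  # 0.8
```

With small chunks the floor keeps the previous fixed behavior, while large chunks sent at a lower rate raise the threshold so a normal gap between chunks is not mistaken for silence.
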
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on
  models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini
  2.x); the conversation now continues while the tool runs. On models that
  don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time
  warning explaining the limitation. (Note: an intermittent 1008 error can
  occasionally fire on Gemini 2.5 during long-running tool calls; we
  auto-reconnect.)
  (PR [#4448](https://github.com/pipecat-ai/pipecat/pull/4448))

- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition
  using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint.
  Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is
  VAD-aware, and automatically reconnects on error.
  (PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))

- Added NVIDIA Magpie TTS services via AWS SageMaker:
  `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM
  back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream
  with full interruption support via `InterruptibleTTSService`).
  (PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))

- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`,
  for use with reasoning-capable Realtime models such as `gpt-realtime-2`.
  (PR [#4470](https://github.com/pipecat-ai/pipecat/pull/4470))

- Inworld TTS updates:
  - Added `delivery_mode` setting (`STABLE`/`BALANCED`/`CREATIVE`) to
    `InworldTTSService` and `InworldHttpTTSService`, enabling the
    stability-vs-creativity tradeoff in `inworld-tts-2`.
  - Added language support to `InworldTTSService` and
    `InworldHttpTTSService`. The `language` setting is now forwarded to the API,
    and a new `language_to_inworld_language()` helper normalizes Pipecat
    `Language` enums to Inworld's BCP-47 locale tags.
  (PR [#4473](https://github.com/pipecat-ai/pipecat/pull/4473))

### Changed

- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the
  generally available `tts-rt-v1`.
  (PR [#4386](https://github.com/pipecat-ai/pipecat/pull/4386))

- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16`
  to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the
  `use_normalized_timestamps` and `max_buffer_delay_ms` fields.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead
  of the deprecated `use_original_timestamps` field. Word timestamps now
  reflect what was actually spoken (post text-normalization and
  pronunciation-dictionary substitution), matching the convention Pipecat uses
  for ElevenLabs. This is a behavior change for `sonic-3` users, who were
  previously receiving timestamps tied to the input transcript.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

- Broadened `tool_resources` to `app_resources` for easy access not just in
  tool handlers but in other places like custom `FrameProcessor`s. Three
  changes: a rename (`tool_resources` → `app_resources`), a new `app_resources`
  property on `PipelineTask`, and a new `pipeline_task` property on
  `FrameProcessor`. Tool handlers now read `params.app_resources`; custom
  processors read `self.pipeline_task.app_resources`. The previous
  `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and
  `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit
  `DeprecationWarning`s.
  (PR [#4395](https://github.com/pipecat-ai/pipecat/pull/4395))

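The rename-with-alias pattern in the entry above usually looks something like the sketch below. `TaskSketch` is a hypothetical stand-in, not `PipelineTask`; it only illustrates how an old attribute name can keep working while emitting a `DeprecationWarning`.

```python
import warnings
from typing import Any


class TaskSketch:
    """Hypothetical holder that exposes a new name plus a deprecated alias."""

    def __init__(self, app_resources: Any = None):
        self._app_resources = app_resources

    @property
    def app_resources(self) -> Any:
        return self._app_resources

    @property
    def tool_resources(self) -> Any:
        # The old name returns the same object but warns the caller.
        warnings.warn(
            "tool_resources is deprecated; use app_resources instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._app_resources
```

Passing `stacklevel=2` attributes the warning to the calling line, which makes remaining uses of the old name easier to find.
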
- Lowered the per-message log in
  `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App
  messages can be high-frequency and were noisy at debug level; set the loguru
  level to `TRACE` to see them again.
  (PR [#4397](https://github.com/pipecat-ai/pipecat/pull/4397))

- Changed the default model for `GrokRealtimeLLMService` to
  `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The
  previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is
  being removed.
  (PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))

- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to
  `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`,
  `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing
  users can pin the prior model explicitly via the `model`/`tts_model`
  argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid
  model IDs.
  (PR [#4422](https://github.com/pipecat-ai/pipecat/pull/4422))

- Changed the default model for `GrokLLMService` from `grok-3` to
  `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.
  (PR [#4429](https://github.com/pipecat-ai/pipecat/pull/4429))

- `DeepgramFluxSTT` watchdog silence threshold is now dynamic:
  `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms.
  This prevents false silence injections when large audio chunks are sent at
  lower frequency.
  (PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))

- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the
  turn is complete (on `on_turn_context_completed`) rather than waiting until
  all audio has finished playing back. The `isFinal` message from ElevenLabs is
  now used to signal `TTSStoppedFrame` and clean up the audio context,
  improving turn transition timing.
  (PR [#4433](https://github.com/pipecat-ai/pipecat/pull/4433))

- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio
  encoding by default, which returns audio bytes without headers.
  (PR [#4446](https://github.com/pipecat-ai/pipecat/pull/4446))

- Moved `create_task`, `cancel_task`, the `task_manager` property, and
  `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom
  `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit
  these methods directly instead of reimplementing the task manager wiring.
  Owners propagate the task manager to their child `BaseObject`s via `await
  child.setup(task_manager)`.
  (PR [#4449](https://github.com/pipecat-ai/pipecat/pull/4449))

- Changed the default OpenAI Realtime input audio transcription model from
  `gpt-4o-transcribe` to `gpt-realtime-whisper` for both
  `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does
  not accept the `prompt` parameter; if a prompt is supplied alongside
  `gpt-realtime-whisper`, it is dropped automatically and a warning is logged.
  To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or
  `"gpt-4o-mini-transcribe"`).
  (PR [#4450](https://github.com/pipecat-ai/pipecat/pull/4450))

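The prompt-dropping behavior described above might be sketched like this. It is an illustration under assumptions: `build_transcription_config` and `PROMPTLESS_MODELS` are hypothetical names, and the dictionary shape is not taken from the service's real request format.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Assumption: only this model rejects the `prompt` parameter.
PROMPTLESS_MODELS = {"gpt-realtime-whisper"}


def build_transcription_config(
    model: str = "gpt-realtime-whisper", prompt: Optional[str] = None
) -> dict:
    """Build a transcription config, dropping `prompt` where unsupported."""
    config = {"model": model}
    if prompt is not None:
        if model in PROMPTLESS_MODELS:
            logger.warning("%s does not accept a prompt; dropping it", model)
        else:
            config["prompt"] = prompt
    return config
```

Pinning `model="gpt-4o-transcribe"` in this sketch keeps the prompt in the resulting config, mirroring the opt-out the entry describes.
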
- Updated the default model for `CartesiaTTSService` and
  `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.
  (PR [#4462](https://github.com/pipecat-ai/pipecat/pull/4462))

- Changed the default model for `OpenAIRealtimeLLMService` from
  `gpt-realtime-1.5` to `gpt-realtime-2`.
  (PR [#4472](https://github.com/pipecat-ai/pipecat/pull/4472))

### Deprecated

- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use
  `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add
  `LLMTurnCompletionUserTurnStopStrategy` to a custom
  `user_turn_strategies.stop`) instead. Setting the legacy flag still works for
  one release: the aggregator emits a `DeprecationWarning` and rewires the
  strategies as if you had passed `FilterIncompleteUserTurnStrategies`
  directly.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the
  `create_file_resampler()` / `create_stream_resampler()` factories).
  Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class
  will be removed in Pipecat 2.0 along with the default `resampy` and `numba`
  dependencies.
  (PR [#4428](https://github.com/pipecat-ai/pipecat/pull/4428))

### Fixed

- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as
  `ErrorFrame`s. The latest API emits a `flush_done` per transcript when
  server-side buffering is disabled; Pipecat now consumes them silently since
  each turn already has its own `context_id`.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`,
  `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance
  (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both
  the class and an instance.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

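The failure mode behind the entry above is general Python behavior and can be reproduced in a few lines. Both classes below are hypothetical, and the `<spell>` markup is only a placeholder, not a statement about Cartesia's actual tag format.

```python
class BrokenTagHelpers:
    # A plain function in a class body: calling it on an instance passes the
    # instance as an extra positional argument, so the one-parameter function
    # receives two arguments and raises TypeError.
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


class FixedTagHelpers:
    # @staticmethod stops the instance from being passed implicitly.
    @staticmethod
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


try:
    BrokenTagHelpers().SPELL("hi")
except TypeError as exc:
    print("instance call failed:", type(exc).__name__)

# Works from both the class and an instance.
print(FixedTagHelpers.SPELL("hi") == FixedTagHelpers().SPELL("hi"))
```

The broken variant still works when called on the class itself, which is why this kind of bug tends to go unnoticed until someone calls the helper through a service instance.
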
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200
  response — one with the API's error text and a second, less informative
  "Unknown error" frame from the outer exception handler. It now pushes a
  single frame that includes the HTTP status code and returns cleanly.
  (PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))

- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally
  for user turn stop strategies. It is now only imported when
  `default_user_turn_stop_strategies()` is called. This improves startup time
  and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning
  when the default stop strategies are not used.
  (PR [#4393](https://github.com/pipecat-ai/pipecat/pull/4393))

- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was
  stored in `Settings` but never sent to xAI, so every session silently fell
  back to xAI's server-side default. The model is now passed via the `?model=`
  query parameter on the WebSocket URL as xAI's Voice Agent API requires.
  (PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))

- Fixed `on_user_turn_stopped` firing prematurely when
  `filter_incomplete_user_turns` was enabled. The event now fires only after
  the LLM confirms the user turn is complete (`✓`); previously the smart-turn
  detector's tentative stop was bubbling up before the LLM had a chance to veto
  it, causing observers, transcript appenders and UI indicators to receive an
  early — and sometimes duplicated — signal.
  (PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))

- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting
  across two assistant messages in the LLM context and not surfacing in
  `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted
  at the end of a TTS context now carries a PTS just past the last word so it
  can't overtake clock-queued `TTSTextFrame`s in the transport's output, and
  `LLMAssistantAggregator` now triggers
  `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the
  frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting
  transcripts).
  (PR [#4414](https://github.com/pipecat-ai/pipecat/pull/4414))

- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged
  words (e.g. `bookLook`) when using Flash models. Flash often splits sentences
  mid-stream into alignment chunks that begin with a real inter-word space, but
  the previous fix unconditionally stripped that space from every chunk.
  Leading spaces are now stripped only on the first alignment chunk of an
  utterance, so subsequent chunks correctly flush partial words across
  boundaries.
  (PR [#4415](https://github.com/pipecat-ai/pipecat/pull/4415))

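The difference between the two stripping rules described above can be shown on plain strings. This is a simplified sketch: the function names are hypothetical, and real alignment chunks carry per-character timing that is omitted here.

```python
from typing import Iterable


def join_stripping_every_chunk(chunks: Iterable[str]) -> str:
    """Old rule: drop the leading space of every chunk (merges words)."""
    return "".join(chunk.lstrip(" ") for chunk in chunks)


def join_stripping_first_chunk(chunks: Iterable[str]) -> str:
    """New rule: drop the leading space only on the first chunk."""
    parts = []
    for index, chunk in enumerate(chunks):
        parts.append(chunk.lstrip(" ") if index == 0 else chunk)
    return "".join(parts)


chunks = [" Read the book", " Look here"]
print(join_stripping_every_chunk(chunks))  # Read the bookLook here
print(join_stripping_first_chunk(chunks))  # Read the book Look here
```
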
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor
  erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
  was set in the environment. The half-populated kwargs are no longer forwarded
  to aioboto3; partial env-var configurations now fall through to the boto3
  credential chain like fully-unset configurations do.
  (PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))

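One way to avoid forwarding half-populated credentials, as the entry above describes, is to build the keyword arguments only when both halves of the key pair are present. The helper below is a hypothetical sketch, not the code from the PR; the keyword names match the standard boto3 session arguments.

```python
from typing import Mapping


def build_credential_kwargs(env: Mapping[str, str]) -> dict:
    """Return explicit credential kwargs, or {} to defer to the provider chain."""
    access_key = env.get("AWS_ACCESS_KEY_ID")
    secret_key = env.get("AWS_SECRET_ACCESS_KEY")
    if access_key and secret_key:
        kwargs = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
        }
        token = env.get("AWS_SESSION_TOKEN")
        if token:
            kwargs["aws_session_token"] = token
        return kwargs
    # Missing or partial configuration: pass nothing so the default
    # credential chain (instance profile, SSO, shared file) is used.
    return {}
```

In use, the result would be splatted into the session constructor, for example `Session(**build_credential_kwargs(os.environ))`, so an empty dictionary leaves credential resolution entirely to the provider chain.
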
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing
  romanized/normalized text to the LLM context. With non-Latin input (e.g.,
  Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao
  !` instead of `你好!`), which then degraded subsequent LLM turns. The services
  now consume `alignment` by default and only switch to `normalizedAlignment` /
  `normalized_alignment` when `pronunciation_dictionary_locators` is configured
  (where `alignment` has overlapping restarts that produce duplicated/garbled
  words, per #4316). Both fields are read with preferred-with-fallback
  semantics since each is nullable per the API schema.
  (PR [#4424](https://github.com/pipecat-ai/pipecat/pull/4424))

- Fixed a deadlock in `TTSService` that could permanently stall pipeline
  processing when all three conditions occurred together:
  `pause_frame_processing=True`, an interruption arrived before any TTS audio
  was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`,
  `FunctionCallResultFrame`) was in the processing queue at that moment. The
  process task would block on `__process_event.wait()` indefinitely because
  `BotStoppedSpeakingFrame` never arrives (no audio was played) and the
  interruption handler did not resume processing. Affects services using
  `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and
  ResembleAI.
  (PR [#4431](https://github.com/pipecat-ai/pipecat/pull/4431))

- Fixed interruptions being delayed when a slow interruptible frame was
  processing and an uninterruptible frame was waiting in the queue. The bot
  would stall until the slow frame finished instead of cancelling it
  immediately on interruption.
  (PR [#4434](https://github.com/pipecat-ai/pipecat/pull/4434))

- Fixed `TTSService` dropping uninterruptible frames (e.g.
  `FunctionCallResultFrame`) from its internal serialization queue when an
  interruption occurs. Previously, the queue was recreated on every
  interruption, silently discarding any queued frames. The queue is now reset
  instead of recreated, preserving uninterruptible frames so they are always
  delivered downstream.
  (PR [#4435](https://github.com/pipecat-ai/pipecat/pull/4435))

- Fixed a race condition in the Daily transport that caused `AttributeError:
  'NoneType' object has no attribute 'send_app_message'` when tearing down a
  pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the
  same `DailyTransportClient` and both call `cleanup()`, which was releasing
  the underlying `CallClient` on the first call — leaving the second caller
  with a `None` client.
  (PR [#4440](https://github.com/pipecat-ai/pipecat/pull/4440))

- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService`
  and `OpenAIRealtimeLLMService`. These services previously honored the flag by
  simply not cancelling in-flight function calls on interruption; the
  introduction of the new async-tool mechanism (which threads
  started/intermediate/final messages through the LLM context) broke that path
  because the realtime services didn't know how to interpret those messages.
  Note that new-style streamed intermediate results
  (`FunctionCallResultProperties(is_final=False)`) are not supported on these
  realtime services. Similar fixes for other impacted realtime services are
  forthcoming.
  (PR [#4441](https://github.com/pipecat-ai/pipecat/pull/4441))

- Fixed two misspelled Gemini TTS voice names in
  `GeminiTTSService.AVAILABLE_VOICES`.
  (PR [#4443](https://github.com/pipecat-ai/pipecat/pull/4443))

- Extended the `cancel_on_interruption=False` regression fix to
  `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and
  `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in
  #4441 (each service detects async-tool messages in the LLM context and routes
  the final result to its formal tool-result channel; Azure inherits
  transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different
  approach because its API freezes the conversation between
  `client_tool_invocation` and the matching `client_tool_result` — for
  async-registered functions it now ships a placeholder `client_tool_result`
  immediately when the function is invoked (to unfreeze the conversation), then
  injects the real result as user-side text once the tool finishes. Streamed
  intermediate results (`FunctionCallResultProperties(is_final=False)`) are
  still not supported on any of these realtime services. `GeminiLiveLLMService`
  and `InworldRealtimeLLMService` are excluded for now: Gemini Live's
  async-tool path needs deeper investigation, and Inworld tool calling needs to
  be sorted out first.
  (PR [#4447](https://github.com/pipecat-ai/pipecat/pull/4447))

- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses
  (observed with `gpt-realtime-2`). A single response can now contain more than
  one audio item, and the first item's `audio.done` may arrive after the second
  item's deltas have started. Deltas still arrive strictly in playback order,
  so we continue to forward them as received (matching OpenAI's reference
  implementation). The fix removes spurious warnings, ensures truncation always
  targets the latest audio item, and emits a single bracketing
  `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is
  now pushed on `response.done`).
  (PR [#4465](https://github.com/pipecat-ai/pipecat/pull/4465))

- Fixed missing `output` attribute on LLM OpenTelemetry spans when the LLM call
  is interrupted mid-stream.
  (PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))

- Fixed incorrect `metrics.ttfb` on STT OpenTelemetry spans, and parented them
  to the current turn span.
  (PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))

- Fixed incorrect `metrics.ttfb` on TTS OpenTelemetry spans for streaming
  services.
  (PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))

- Extended the `cancel_on_interruption=False` regression fix to
  `InworldRealtimeLLMService`. Uses the same approach as in #4441 (the service
  detects async-tool messages in the LLM context and routes the final result to
  its formal tool-result channel). Note: as of this writing, Inworld Realtime
  doesn't appear to handle the resulting delayed tool result reliably — the
  routing is best-effort and the service surfaces a one-time warning when
  async-tool messages are seen. Streamed intermediate results
  (`FunctionCallResultProperties(is_final=False)`) are still not supported on
  this realtime service. (Inworld was excluded from #4447 pending resolution of
  an unrelated tool-calling issue, which turned out to be an account-level
  matter.)
  (PR [#4474](https://github.com/pipecat-ai/pipecat/pull/4474))

- Fixed Cartesia TTS Korean word timestamps to use normal spacing rules,
  preserving word boundaries and per-word timestamp alignment during downstream
  aggregation.
  (PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))

- Fixed Cartesia TTS Chinese and Japanese timestamp grouping to preserve
  provider text spacing, avoiding artificial spaces when timestamp groups are
  reassembled downstream.
  (PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))

- Fixed `SonioxSTTService` final transcription frames missing detected language
  metadata when Soniox returns token-level language annotations.
  (PR [#4482](https://github.com/pipecat-ai/pipecat/pull/4482))

- Fixed Soniox final transcription language detection to use the most common
  recognized token language, avoiding mislabeling an utterance when the last
  token is tagged with a different language.
  (PR [#4495](https://github.com/pipecat-ai/pipecat/pull/4495))

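Picking the most common token language, rather than the last one, can be sketched with `collections.Counter`. The function name and the plain list of language codes are assumptions for illustration; the real service works on Soniox token objects.

```python
from collections import Counter
from typing import Optional, Sequence


def dominant_language(token_languages: Sequence[Optional[str]]) -> Optional[str]:
    """Return the most frequent language tag, ignoring untagged tokens."""
    counts = Counter(lang for lang in token_languages if lang)
    if not counts:
        return None
    return counts.most_common(1)[0][0]


# The last token is tagged "es", but the utterance is mostly "en".
print(dominant_language(["en", "en", "en", "es"]))  # en
```
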
- Fixed dropped audio in streaming TTS services whose wire protocol doesn't
  echo `context_id` back on incoming audio (Sarvam, Smallest, Soniox, Inworld,
  and others). Previously, audio that arrived between contexts or at the very
  start of a turn was tagged with `context_id=None` and silently dropped with
  an "unable to append audio to context: no context ID provided" debug log.
  `TTSService.get_active_audio_context_id()` now falls back to the
  synthesis-side `_turn_context_id` when the playback cursor isn't set yet.
  (PR [#4497](https://github.com/pipecat-ai/pipecat/pull/4497))

### Security
|
||||
|
||||
- Fixed a path traversal issue in the development runner's
|
||||
`/files/{filename:path}` download endpoint. Previously, when the runner was
|
||||
started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could
|
||||
escape the configured folder because `%2F`-encoded separators bypassed
|
||||
Starlette's path normalisation. The endpoint now resolves the joined path and
|
||||
rejects any filename that escapes the allowed base with a 403, and also
|
||||
returns 404 (instead of an implicit `null` 200) when `--folder` is unset.
|
||||
(PR [#4417](https://github.com/pipecat-ai/pipecat/pull/4417))
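
The containment check described in the security fix above can be sketched as follows. This is not the runner's actual code: `resolve_download_path` is a hypothetical helper that returns a status code and a path, and it assumes the filename has already been URL-decoded. It only illustrates the "resolve, then verify the result is still under the base" pattern.

```python
from pathlib import Path
from typing import Optional


def resolve_download_path(
    base_folder: Optional[str], filename: str
) -> tuple[int, Optional[Path]]:
    """Hypothetical helper: map a requested filename to (status, path)."""
    # No folder configured: nothing can be served.
    if base_folder is None:
        return 404, None

    base = Path(base_folder).resolve()
    # Resolve the joined path so ".." segments are collapsed before checking.
    candidate = (base / filename).resolve()

    # Reject anything that ends up outside the allowed base directory.
    if not candidate.is_relative_to(base):
        return 403, None
    return 200, candidate
```

A real endpoint would additionally check that the resolved file exists before serving it.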

## [1.1.0] - 2026-04-27

### Added

@@ -1 +0,0 @@
- Added a `session_id` field to `RunnerArguments` so bots can log or trace a per-session identifier in local development the same way they can in Pipecat Cloud. The development runner now mints a UUID at every construction site, and paths that already returned a `sessionId` to the caller (Daily `/start`, dial-in webhook) share that same UUID with the runner args instead of generating two. The SmallWebRTC `/api/offer` endpoint also accepts an optional `session_id` query parameter so the `/sessions/{session_id}/...` proxy can thread it through.
@@ -1 +0,0 @@
- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the generally available `tts-rt-v1`.
@@ -1 +0,0 @@
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService` for controlling Cartesia's server-side text buffering. When unset, Pipecat picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE` mode (custom buffering — avoids stacking client-side aggregation on top of Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode (Cartesia's managed buffering applies). Pass an explicit value (0–5000ms) to override.
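
As a rough illustration of the default selection described in this entry, the sketch below uses a local stand-in enum and a hypothetical helper, `effective_max_buffer_delay_ms`. Raising `ValueError` for out-of-range values is an assumption; the entry only states the accepted range.

```python
from enum import Enum
from typing import Optional


class TextAggregationMode(Enum):
    """Local stand-in for the aggregation mode; not imported from Pipecat."""

    SENTENCE = "sentence"
    TOKEN = "token"


def effective_max_buffer_delay_ms(
    mode: TextAggregationMode, explicit: Optional[int] = None
) -> Optional[int]:
    """Hypothetical helper mirroring the defaults described in the entry."""
    if explicit is not None:
        # Assumed validation: the entry gives 0-5000 ms as the valid range.
        if not 0 <= explicit <= 5000:
            raise ValueError("max_buffer_delay_ms must be between 0 and 5000")
        return explicit
    # SENTENCE mode: disable server-side buffering (0).
    # TOKEN mode: leave unset so the provider's managed buffering applies.
    return 0 if mode is TextAggregationMode.SENTENCE else None
```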
@@ -1 +0,0 @@
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16` to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the `use_normalized_timestamps` and `max_buffer_delay_ms` fields.
@@ -1 +0,0 @@
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead of the deprecated `use_original_timestamps` field. Word timestamps now reflect what was actually spoken (post text-normalization and pronunciation-dictionary substitution), matching the convention Pipecat uses for ElevenLabs. This is a behavior change for `sonic-3` users, who were previously receiving timestamps tied to the input transcript.
@@ -1 +0,0 @@
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200 response — one with the API's error text and a second, less informative "Unknown error" frame from the outer exception handler. It now pushes a single frame that includes the HTTP status code and returns cleanly.
@@ -1 +0,0 @@
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`, `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both the class and an instance.
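
The failure mode and the fix can be reproduced in isolation. The two classes below are hypothetical stand-ins, and the tag markup is illustrative only. A plain function defined in a class body receives the instance as its first argument when called on an instance, whereas a `@staticmethod` does not.

```python
class BrokenHelpers:
    # Plain function in the class body: works as BrokenHelpers.SPELL("hi"),
    # but an instance call passes the instance as an extra first argument.
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


class FixedHelpers:
    # @staticmethod makes the helper callable from the class and an instance.
    @staticmethod
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


try:
    BrokenHelpers().SPELL("hi")
except TypeError as exc:
    print(f"instance call failed: {exc}")

print(FixedHelpers.SPELL("hi"))
print(FixedHelpers().SPELL("hi"))
```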
@@ -1 +0,0 @@
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as `ErrorFrame`s. The latest API emits a `flush_done` per transcript when server-side buffering is disabled; Pipecat now consumes them silently since each turn already has its own `context_id`.
@@ -1 +0,0 @@
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally for user turn stop strategies. It is now only imported when `default_user_turn_stop_strategies()` is called. This improves startup time and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning when the default stop strategies are not used.
@@ -1 +0,0 @@
- Broadened `tool_resources` to `app_resources` for easy access not just in tool handlers but in other places like custom `FrameProcessor`s. Three changes: a rename (`tool_resources` → `app_resources`), a new `app_resources` property on `PipelineTask`, and a new `pipeline_task` property on `FrameProcessor`. Tool handlers now read `params.app_resources`; custom processors read `self.pipeline_task.app_resources`. The previous `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit `DeprecationWarning`s.
@@ -1 +0,0 @@
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.
@@ -1 +0,0 @@
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model Improvement Program. When set, the value is forwarded to Deepgram as a query parameter on the speak request. Defaults to `None`, which preserves the existing behavior. See https://dpgr.am/deepgram-mip for pricing implications before enabling.
@@ -1 +0,0 @@
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.
@@ -1 +0,0 @@
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.
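
Passing the model as a `?model=` query parameter amounts to a small URL rewrite. The sketch below uses only the standard library; `with_model_param` is a hypothetical helper and the host is a placeholder, not xAI's real endpoint.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_model_param(base_url: str, model: str) -> str:
    """Hypothetical helper: set or replace the `model` query parameter."""
    parts = urlsplit(base_url)
    # Keep any existing query parameters and override only `model`.
    query = dict(parse_qsl(parts.query))
    query["model"] = model
    return urlunsplit(parts._replace(query=urlencode(query)))


# Placeholder host, for illustration only.
print(with_model_param("wss://example.invalid/realtime", "grok-voice-think-fast-1.0"))
```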
@@ -1 +0,0 @@
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.
@@ -1 +0,0 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.
@@ -1 +0,0 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.
@@ -1 +0,0 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.
@@ -1 +0,0 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.
@@ -1 +0,0 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.
@@ -1 +0,0 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.
@@ -1 +0,0 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.
@@ -1,6 +0,0 @@
- Added first-class RTVI support for the UI Agent Protocol:
  - Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
  - Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
  - Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
  - Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
  - Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.
@@ -1 +0,0 @@
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).
@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.
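
The difference between stripping every chunk and stripping only the first can be shown with plain strings. This is a simplified sketch with a hypothetical helper; the real services work on character-level alignment data, not pre-joined text.

```python
def join_alignment_chunks(chunks: list[str]) -> str:
    """Hypothetical helper: strip the leading space on the first chunk only."""
    parts = []
    for index, chunk in enumerate(chunks):
        # Only the first chunk of an utterance loses its leading space; later
        # chunks keep theirs because it is a real inter-word separator.
        parts.append(chunk.lstrip(" ") if index == 0 else chunk)
    return "".join(parts)


chunks = [" Read the book.", " Look at this."]

# Stripping every chunk merges words across the chunk boundary.
print("".join(chunk.lstrip(" ") for chunk in chunks))  # Read the book.Look at this.

# Stripping only the first chunk preserves the boundary.
print(join_alignment_chunks(chunks))  # Read the book. Look at this.
```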
@@ -1 +0,0 @@
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.
@@ -1 +0,0 @@
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.
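
One way to picture the "all or nothing" handling of static credentials is the sketch below. `explicit_credential_kwargs` is a hypothetical helper, not Pipecat's implementation. An empty result means no credential kwargs are passed, which leaves resolution to the standard boto3 provider chain.

```python
import os
from collections.abc import Mapping
from typing import Optional


def explicit_credential_kwargs(
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Hypothetical helper: return static credentials only when both exist."""
    env = os.environ if env is None else env
    access_key = access_key or env.get("AWS_ACCESS_KEY_ID")
    secret_key = secret_key or env.get("AWS_SECRET_ACCESS_KEY")

    if access_key and secret_key:
        return {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
        }
    # Half-populated or unset: forward nothing and let the default
    # credential chain (instance profile, SSO, shared config, ...) decide.
    return {}
```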
@@ -1 +0,0 @@
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.
@@ -1 +0,0 @@
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.
@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.
@@ -1 +0,0 @@
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.
@@ -1 +0,0 @@
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.
@@ -1 +0,0 @@
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.
@@ -1 +0,0 @@
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.
@@ -1 +0,0 @@
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.
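
The threshold formula quoted in this entry is easy to check by hand. The function name below is hypothetical; the formula and the `0.5` second default come from the entry.

```python
def watchdog_silence_threshold(
    chunk_duration: float, watchdog_min_timeout: float = 0.5
) -> float:
    """Seconds of silence before the watchdog sends a silence packet."""
    return max(chunk_duration * 2, watchdog_min_timeout)


# 20 ms chunks: the floor dominates, so the threshold stays at 0.5 s.
print(watchdog_silence_threshold(0.02))  # 0.5

# 400 ms chunks: twice the chunk duration dominates, giving 0.8 s.
print(watchdog_silence_threshold(0.4))  # 0.8
```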
@@ -1 +0,0 @@
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.
@@ -1 +0,0 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.
@@ -1 +0,0 @@
- Fixed interruptions being delayed when a slow non-uninterruptible frame was processing and an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.
@@ -1 +0,0 @@
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.
@@ -1 +0,0 @@
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.
@@ -1 +0,0 @@
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.
@@ -1 +0,0 @@
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.
@@ -1 +0,0 @@
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.
@@ -1 +0,0 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.
@@ -1 +0,0 @@
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)
@@ -1 +0,0 @@
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.
@@ -1 +0,0 @@
- Changed the default OpenAI Realtime input audio transcription model from `gpt-4o-transcribe` to `gpt-realtime-whisper` for both `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does not accept the `prompt` parameter; if a prompt is supplied alongside `gpt-realtime-whisper`, it is dropped automatically and a warning is logged. To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or `"gpt-4o-mini-transcribe"`).
@@ -1 +0,0 @@
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.
@@ -1 +0,0 @@
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).
@@ -1 +0,0 @@
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.
@@ -1 +0,0 @@
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).
@@ -1 +0,0 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.
@@ -1 +0,0 @@
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.
@@ -68,9 +68,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    tts = OpenAITTSService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAITTSService.Settings(
            instructions="Please speak clearly and at a moderate pace.",
            voice="ballad",
        ),
        instructions="Please speak clearly and at a moderate pace.",
    )

    llm = OpenAILLMService(
@@ -71,8 +71,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

    llm = QwenLLMService(
        api_key=os.environ["QWEN_API_KEY"],
        model="qwen2.5-72b-instruct",
        settings=QwenLLMService.Settings(
            model="qwen2.5-72b-instruct",
            system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
        ),
    )
@@ -28,10 +28,14 @@ Usage:
"""

import os
import random
from datetime import datetime

from dotenv import load_dotenv
from loguru import logger

from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import (
    TranscriptionLogObserver,
@@ -48,6 +52,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -55,6 +60,43 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)


async def fetch_weather_from_api(params: FunctionCallParams):
    temperature = (
        random.randint(60, 85)
        if params.arguments["format"] == "fahrenheit"
        else random.randint(15, 30)
    )
    await params.result_callback(
        {
            "conditions": "nice",
            "temperature": temperature,
            "location": params.arguments["location"],
            "format": params.arguments["format"],
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
        }
    )


weather_function = FunctionSchema(
    name="get_current_weather",
    description="Get the current weather",
    properties={
        "location": {
            "type": "string",
            "description": "The city and state, e.g. San Francisco, CA",
        },
        "format": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "The temperature unit to use. Infer this from the user's location.",
        },
    },
    required=["location", "format"],
)

tools = ToolsSchema(standard_tools=[weather_function])


# --- Transport Configuration ---

# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
@@ -85,7 +127,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    # See: https://docs.inworld.ai/router/introduction
    llm = InworldRealtimeLLMService(
        api_key=os.environ["INWORLD_API_KEY"],
        llm_model="xai/grok-4-1-fast-non-reasoning",
        llm_model="openai/gpt-4.1-mini",
        voice="Sarah",
        settings=InworldRealtimeLLMService.Settings(
            system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
@@ -97,9 +139,14 @@ Always be helpful and proactive in offering assistance.""",
        ),
    )

    # Create context with initial message
    # Note: function calling requires a paid Inworld account and a
    # function-calling-capable model
    llm.register_function("get_current_weather", fetch_weather_from_api)

    # Create context with initial message + tools
    context = LLMContext(
        [{"role": "developer", "content": "Say hello and introduce yourself!"}],
        tools,
    )

    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -51,7 +51,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

    stt = GradiumSTTService(
        api_key=os.environ["GRADIUM_API_KEY"],
        api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
        settings=GradiumSTTService.Settings(
            language=Language.EN,
            delay_in_frames=8,
@@ -0,0 +1,201 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Example 22: Filter Incomplete Turns

Demonstrates LLM-based turn completion detection to suppress bot responses when
the user was cut off mid-thought. The LLM outputs one of three markers:
- ✓ (complete): User finished their thought, respond normally
- ○ (incomplete short): User was cut off, wait ~5s then prompt
- ◐ (incomplete long): User needs time to think, wait ~10s then prompt

When incomplete is detected, the bot's response is suppressed. After the timeout
expires, the LLM is automatically prompted to re-engage the user.
"""

import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    AssistantTurnStoppedMessage,
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
    UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies

load_dotenv(override=True)


# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}


async def get_weather(params: FunctionCallParams, location: str):
    """Return the current weather for a location.

    A stub that always reports the same conditions — replace with a real
    weather API in production.

    Args:
        location (str): The city and state or country, e.g. "Paris, France".
    """
    await params.result_callback(
        {
            "location": location,
            "temperature_celsius": 22,
            "conditions": "partly cloudy",
        }
    )


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info(f"Starting bot")

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])

    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction=(
                "You are a helpful assistant in a voice conversation. Your "
                "responses will be spoken aloud, so avoid emojis, bullet "
                "points, or other formatting that can't be spoken. Respond to "
                "what the user said in a creative, helpful, and brief way. "
                "If the user asks about the weather, call the get_weather "
                "tool and speak the result back naturally."
            ),
        ),
    )
    llm.register_direct_function(get_weather)

    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        settings=CartesiaTTSService.Settings(
            voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        ),
    )

    context = LLMContext(tools=ToolsSchema(standard_tools=[get_weather]))
    # `FilterIncompleteUserTurnStrategies` pairs the default detector
    # chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
    # trigger LLM inference but the public `on_user_turn_stopped` event
    # fires only when the LLM confirms ✓. The LLM marks each response
    # with one of:
    # ✓ = complete (respond normally)
    # ○ = incomplete short (wait 5s, then prompt)
    # ◐ = incomplete long (wait 10s, then prompt)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            vad_analyzer=SileroVADAnalyzer(),
            user_turn_strategies=FilterIncompleteUserTurnStrategies(
                # Optional: customize turn completion behavior
                # config=UserTurnCompletionConfig(
                #     incomplete_short_timeout=5.0,
                #     incomplete_long_timeout=10.0,
                #     incomplete_short_prompt="Custom prompt...",
                #     incomplete_long_prompt="Custom prompt...",
                #     instructions="Custom turn completion instructions...",
                # ),
            ),
        ),
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,
            user_aggregator,  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            assistant_aggregator,  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        # Kick off the conversation.
        context.add_message(
            {"role": "developer", "content": "Please introduce yourself to the user."}
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    @user_aggregator.event_handler("on_user_turn_stopped")
    async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
|
||||
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}assistant: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -50,10 +50,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info(f"Starting bot")

    stt = GradiumSTTService(
        api_key=os.environ["GRADIUM_API_KEY"],
        api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
    )
    stt = GradiumSTTService(api_key=os.environ["GRADIUM_API_KEY"])

    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],

@@ -55,7 +55,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    tts = GradiumTTSService(
        api_key=os.environ["GRADIUM_API_KEY"],
        settings=GradiumTTSService.Settings(voice="YTpq7expH9539ERJ"),
        url="wss://us.api.gradium.ai/api/speech/tts",
    )

    llm = OpenAILLMService(

@@ -54,7 +54,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

    stt = GradiumSTTService(
        api_key=os.environ["GRADIUM_API_KEY"],
        api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
        settings=GradiumSTTService.Settings(
            language=Language.EN,
        ),
@@ -62,7 +61,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

    tts = GradiumTTSService(
        api_key=os.environ["GRADIUM_API_KEY"],
        url="wss://us.api.gradium.ai/api/speech/tts",
        settings=GradiumTTSService.Settings(
            voice="YTpq7expH9539ERJ",
        ),

@@ -58,6 +58,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
            # Add strict mode to enforce the language hints
            language_hints=[Language.EN],
            language_hints_strict=True,
            enable_language_identification=True,
        ),
    )


@@ -242,6 +242,7 @@ TESTS_VIDEO_AVATAR = [

TESTS_TURN_MANAGEMENT = [
    ("turn-management/turn-management-filter-incomplete-turns.py", EVAL_COMPLETE_TURN),
    ("turn-management/turn-management-filter-incomplete-turns-function-calling.py", EVAL_WEATHER),
]

TESTS_THINKING = [

@@ -419,30 +419,30 @@ class CartesiaTTSService(WebsocketTTSService):
        """Convenience method to create a speed tag."""
        return f'<speed ratio="{speed}" />'

    def _is_cjk_language(self, language: str) -> bool:
        """Check if the given language is CJK (Chinese, Japanese, Korean).
    def _is_chinese_or_japanese_language(self, language: str) -> bool:
        """Check if the given language is Chinese or Japanese.

        Args:
            language: The language code to check.

        Returns:
            True if the language is Chinese, Japanese, or Korean.
            True if the language is Chinese or Japanese.
        """
        cjk_languages = {"zh", "ja", "ko"}
        base_lang = language.split("-")[0].lower()
        return base_lang in cjk_languages
        return base_lang in {"zh", "ja"}

    def _process_word_timestamps_for_language(
        self, words: list[str], starts: list[float]
    ) -> list[tuple[str, float]]:
        """Process word timestamps based on the current language.

        For CJK languages, Cartesia groups related characters in the same timestamp message.
        For Chinese and Japanese, Cartesia groups related characters in the same timestamp
        message.
        For example, in Japanese a single message might be `['こ', 'ん', 'に', 'ち', 'は', '。']`.
        We combine these into single words so the downstream aggregator can add natural
        spacing between meaningful units rather than individual characters.

        For non-CJK languages, words are already properly separated and are used as-is.
        For other languages, words are already properly separated and are used as-is.

        Args:
            words: List of words/characters from Cartesia.
@@ -453,10 +453,10 @@ class CartesiaTTSService(WebsocketTTSService):
        """
        current_language = assert_given(self._settings.language)

        # Check if this is a CJK language (if language is None, treat as non-CJK)
        if current_language and self._is_cjk_language(current_language):
            # For CJK languages, combine all characters in this message into one word
            # using the first character's start time
        # Check if this is a Chinese/Japanese language (if language is None, treat as other)
        if current_language and self._is_chinese_or_japanese_language(current_language):
            # For Chinese/Japanese, combine all characters in this message into one word
            # using the first character's start time.
            if words and starts:
                combined_word = "".join(words)
                first_start = starts[0]
@@ -467,6 +467,11 @@ class CartesiaTTSService(WebsocketTTSService):
        # For non-CJK languages, use as-is
        return list(zip(words, starts))

    def _word_timestamps_include_inter_frame_spaces(self) -> bool:
        """Whether timestamp text should be treated as carrying its own spacing."""
        current_language = assert_given(self._settings.language)
        return bool(current_language and self._is_chinese_or_japanese_language(current_language))

    def _build_msg(
        self,
        text: str = "",
@@ -660,7 +665,13 @@ class CartesiaTTSService(WebsocketTTSService):
                processed_timestamps = self._process_word_timestamps_for_language(
                    msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]
                )
                await self.add_word_timestamps(processed_timestamps, ctx_id)
                await self.add_word_timestamps(
                    processed_timestamps,
                    ctx_id,
                    includes_inter_frame_spaces=(
                        True if self._word_timestamps_include_inter_frame_spaces() else None
                    ),
                )
            elif msg["type"] == "chunk":
                frame = TTSAudioRawFrame(
                    audio=base64.b64decode(msg["data"]),
||||
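The grouping rule described in the Cartesia docstring above can be sketched in isolation. This is a minimal illustration, not the service's code: `is_chinese_or_japanese` and `combine_timestamps` are hypothetical free functions, and returning an empty list when a Chinese/Japanese message carries no words is an assumption, because that branch falls outside the hunks shown.

```python
from typing import Optional


def is_chinese_or_japanese(language: str) -> bool:
    """Match on the base language code, so "ja-JP" and "zh" both qualify."""
    return language.split("-")[0].lower() in {"zh", "ja"}


def combine_timestamps(
    language: Optional[str], words: list[str], starts: list[float]
) -> list[tuple[str, float]]:
    """Merge one timestamp message into a single word for Chinese/Japanese."""
    if language and is_chinese_or_japanese(language):
        if words and starts:
            # One combined word, stamped with the first character's start time.
            return [("".join(words), starts[0])]
        return []  # assumption: nothing to emit for an empty message
    # Other languages (including Korean after this change) pass through as-is.
    return list(zip(words, starts))


print(combine_timestamps("ja-JP", ["こ", "ん", "に", "ち", "は"], [0.1, 0.2, 0.3, 0.4, 0.5]))
print(combine_timestamps("ko", ["안녕", "하세요"], [0.0, 0.4]))
```

Korean is treated like any space-delimited language here, which mirrors the narrowing from `{"zh", "ja", "ko"}` to `{"zh", "ja"}` in the diff.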
@@ -150,7 +150,7 @@ class GradiumSTTService(WebsocketSTTService):
        self,
        *,
        api_key: str,
        api_endpoint_base_url: str = "wss://eu.api.gradium.ai/api/speech/asr",
        api_endpoint_base_url: str = "wss://api.gradium.ai/api/speech/asr",
        encoding: str = "pcm",
        sample_rate: int | None = None,
        params: InputParams | None = None,
@@ -163,7 +163,7 @@ class GradiumSTTService(WebsocketSTTService):

        Args:
            api_key: Gradium API key for authentication.
            api_endpoint_base_url: WebSocket endpoint URL. Defaults to Gradium's streaming endpoint.
            api_endpoint_base_url: WebSocket endpoint URL.
            encoding: Base audio encoding type. One of "pcm", "wav", or "opus".
                For PCM, the sample rate is appended automatically from the
                pipeline's audio_in_sample_rate (e.g., "pcm" becomes "pcm_16000").

@@ -68,7 +68,7 @@ class GradiumTTSService(WebsocketTTSService):
        *,
        api_key: str,
        voice_id: str | None = None,
        url: str = "wss://eu.api.gradium.ai/api/speech/tts",
        url: str = "wss://api.gradium.ai/api/speech/tts",
        model: str | None = None,
        json_config: str | None = None,
        params: InputParams | None = None,

@@ -48,7 +48,8 @@ from pipecat.frames.frames import (
    UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import (
@@ -361,6 +362,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
        self._messages_added_manually = {}
        self._pending_function_calls = {}
        self._completed_tool_calls = set()
        self._async_tool_warning_logged: bool = False

        self._register_event_handler("on_conversation_item_created")
        self._register_event_handler("on_conversation_item_updated")
@@ -629,6 +631,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
                self._receive_task = None

            self._completed_tool_calls = set()
            self._async_tool_warning_logged = False
            self._audio_buffer = b""
            self._interim_transcription_text = ""
            self._disconnecting = False
@@ -1000,9 +1003,80 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):

    async def _process_completed_function_calls(self, send_new_results: bool):
        """Process completed function calls and send results to the service."""
        # If the user registered a function with cancel_on_interruption=False,
        # the aggregator emits async-tool-style messages into the context. As
        # of this writing, Inworld Realtime doesn't appear to handle the
        # resulting delayed tool result reliably — the routing code below
        # is best-effort. Surface a one-time warning so users see they're not
        # getting what they expect.
        if not self._async_tool_warning_logged:
            for message in self._context.get_messages():
                if isinstance(message, LLMSpecificMessage):
                    continue
                if async_tool_messages.parse_message(message) is not None:
                    logger.error(
                        f"{self}: cancel_on_interruption=False is not reliably "
                        f"supported by Inworld Realtime as of this writing. "
                        f"Use cancel_on_interruption=True (the default), or "
                        f"consider another LLM service if your tool needs the "
                        f"async semantics."
                    )
                    await self.push_error(
                        error_msg=(
                            "cancel_on_interruption=False is not reliably supported "
                            "by Inworld Realtime as of this writing."
                        ),
                    )
                    self._async_tool_warning_logged = True
                    break

        sent_new_result = False

        for message in self._context.get_messages():
            # LLMSpecificMessages are opaque provider-specific payloads, not
            # standard tool-result messages — skip them.
            if isinstance(message, LLMSpecificMessage):
                continue

            # Async-tool messages live alongside regular tool messages in the
            # context; detect and route them before the regular logic so we
            # don't try to send the async-tool envelope JSON as a tool result.
            async_payload = async_tool_messages.parse_message(message)
            if async_payload is not None:
                if async_payload.tool_call_id in self._completed_tool_calls:
                    continue
                if async_payload.kind == "started":
                    # The provider already issued the tool call and natively
                    # awaits a result; nothing to send for the started marker.
                    continue
                if async_payload.kind == "intermediate":
                    logger.error(
                        f"{self}: Inworld Realtime does not support streamed async "
                        f"tool results; dropping intermediate result for "
                        f"tool_call_id={async_payload.tool_call_id}. Consider "
                        f"another LLM service if your tool needs to stream "
                        f"intermediate results."
                    )
                    await self.push_error(
                        error_msg="Inworld Realtime does not support streamed async tool results.",
                    )
                    continue
                if async_payload.kind == "final":
                    # Deliver via the formal tool-result channel — same path
                    # as a synchronous tool result, just delayed.
                    if send_new_results:
                        sent_new_result = True
                        await self._send_tool_result(
                            async_payload.tool_call_id, async_payload.result
                        )
                    self._completed_tool_calls.add(async_payload.tool_call_id)
                    continue
                # Defensive: any async-tool message must not fall through
                # to the regular tool-result block below, even if it
                # carries a kind we don't recognize.
                continue

            # Look for newly-completed "regular" (as opposed to async-tool) results
            if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS":
                tool_call_id = message.get("tool_call_id")
                if tool_call_id and tool_call_id not in self._completed_tool_calls:
@@ -1011,6 +1085,8 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
                        await self._send_tool_result(tool_call_id, message.get("content"))
                    self._completed_tool_calls.add(tool_call_id)

        # If we reported any new tool call results to the service, trigger
        # another response
        if sent_new_result:
            await self._create_response()

||||
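The async-tool routing in `_process_completed_function_calls` can be hard to follow inside the diff, so here is a stripped-down sketch of the same decision table. It assumes a hypothetical `AsyncPayload` dataclass in place of whatever `async_tool_messages.parse_message` returns, and it collects results in a list instead of sending them over a WebSocket.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class AsyncPayload:
    """Hypothetical stand-in for a parsed async-tool message."""

    tool_call_id: str
    kind: str  # "started", "intermediate", or "final"
    result: Any = None


def route_async_payloads(payloads, completed: set, send_new_results: bool = True):
    """Return the (tool_call_id, result) pairs that would be sent as tool results."""
    sent = []
    for payload in payloads:
        if payload.tool_call_id in completed:
            continue  # result already delivered for this call
        if payload.kind == "final":
            if send_new_results:
                sent.append((payload.tool_call_id, payload.result))
            # Marked complete even when sending is disabled, as in the diff.
            completed.add(payload.tool_call_id)
        # "started", "intermediate", and unknown kinds are never sent as results.
    return sent


completed_ids: set = set()
history = [
    AsyncPayload("call-1", "started"),
    AsyncPayload("call-1", "intermediate", "50%"),
    AsyncPayload("call-1", "final", {"ok": True}),
]
print(route_async_payloads(history, completed_ids))
```

Because the whole context is rescanned on every pass, the `completed` set is what keeps a final result from being delivered twice.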
@@ -60,9 +60,40 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts


def language_to_inworld_language(language: Language) -> str:
    """Convert a Language enum to an Inworld TTS BCP-47 language tag.

    Args:
        language: The Language enum value to convert.

    Returns:
        The corresponding Inworld BCP-47 language tag (e.g. ``"en-US"``).
        Unverified languages fall back to their BCP-47 string value with a warning.
    """
    LANGUAGE_MAP = {
        Language.AR: "ar-SA",
        Language.DE: "de-DE",
        Language.EN: "en-US",
        Language.ES: "es-ES",
        Language.FR: "fr-FR",
        Language.HE: "he-IL",
        Language.HI: "hi-IN",
        Language.IT: "it-IT",
        Language.JA: "ja-JP",
        Language.KO: "ko-KR",
        Language.NL: "nl-NL",
        Language.PL: "pl-PL",
        Language.PT: "pt-BR",
        Language.RU: "ru-RU",
        Language.ZH: "zh-CN",
    }
    return resolve_language(language, LANGUAGE_MAP, use_base_code=False)


@dataclass
class InworldTTSSettings(TTSSettings):
    """Settings for InworldTTSService and InworldHttpTTSService.
@@ -70,10 +101,18 @@ class InworldTTSSettings(TTSSettings):
    Parameters:
        speaking_rate: Speaking rate for speech synthesis.
        temperature: Temperature for speech synthesis.
        delivery_mode: Controls the stability vs. creativity tradeoff.
            ``"STABLE"`` produces reliable, predictable speech.
            ``"BALANCED"`` is the default midpoint.
            ``"CREATIVE"`` produces more expressive, emotionally varied speech.
            Only supported by ``inworld-tts-2``.
    """

    speaking_rate: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    temperature: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    delivery_mode: Literal["STABLE", "BALANCED", "CREATIVE"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )

    _aliases: ClassVar[dict[str, str]] = {
        "voiceId": "voice",
@@ -167,6 +206,7 @@ class InworldHttpTTSService(TTSService):
            language=None,
            speaking_rate=None,
            temperature=None,
            delivery_mode=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
@@ -227,6 +267,17 @@ class InworldHttpTTSService(TTSService):
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Inworld language format.

        Args:
            language: The language to convert.

        Returns:
            The Inworld-specific BCP-47 language code, or None if not supported.
        """
        return language_to_inworld_language(language)

    async def start(self, frame: StartFrame):
        """Start the Inworld TTS service.

@@ -313,6 +364,10 @@ class InworldHttpTTSService(TTSService):

        if self._settings.temperature is not None:
            payload["temperature"] = self._settings.temperature
        if self._settings.delivery_mode is not None:
            payload["deliveryMode"] = self._settings.delivery_mode
        if self._settings.language is not None:
            payload["language"] = self._settings.language

        # Use WORD timestamps for simplicity and correct spacing/capitalization
        payload["timestampType"] = self._timestamp_type
@@ -609,6 +664,7 @@ class InworldTTSService(WebsocketTTSService):
            language=None,
            speaking_rate=None,
            temperature=None,
            delivery_mode=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
@@ -700,6 +756,17 @@ class InworldTTSService(WebsocketTTSService):
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Inworld language format.

        Args:
            language: The language to convert.

        Returns:
            The Inworld-specific BCP-47 language code, or None if not supported.
        """
        return language_to_inworld_language(language)

    async def start(self, frame: StartFrame):
        """Start the Inworld WebSocket TTS service.

@@ -1089,6 +1156,10 @@ class InworldTTSService(WebsocketTTSService):

        if self._settings.temperature is not None:
            create_config["temperature"] = self._settings.temperature
        if self._settings.delivery_mode is not None:
            create_config["deliveryMode"] = self._settings.delivery_mode
        if self._settings.language is not None:
            create_config["language"] = self._settings.language
        if self._apply_text_normalization is not None:
            create_config["applyTextNormalization"] = self._apply_text_normalization
        if self._auto_mode is not None:

@@ -280,6 +280,8 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
        self._client: SageMakerBidiClient | None = None
        self._receive_task = None
        self._speech_completed_event = asyncio.Event()
        self._audio_buffer = b""
        self._playback_started = False

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.
@@ -377,7 +379,12 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
        logger.info(f"{self}: verifying if websocket connection is active {active}")
        return active

    def _reset_audio_buffer(self):
        self._audio_buffer = b""
        self._playback_started = False

    async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
        self._reset_audio_buffer()
        if self._bot_speaking and self._client:
            logger.debug(
                f"{self}: interruption detected, sending input_text.done and waiting for speech.completed"
@@ -391,6 +398,30 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
                logger.warning(f"{self}: timed out waiting for conversation.item.speech.completed")
        await super()._handle_interruption(frame, direction)

    async def _handle_audio_chunk(self, audio: bytes, context_id: str | None = None):
        """Buffer audio and emit frames using a jitter-buffer approach.

        Holds back audio until chunk_size bytes have been accumulated (to avoid
        glitches at the start of playback), then emits each subsequent chunk
        immediately as it arrives.
        """
        self._audio_buffer += audio

        if not self._playback_started:
            if len(self._audio_buffer) < self.chunk_size:
                return
            self._playback_started = True

        await self.push_frame(
            TTSAudioRawFrame(
                audio=self._audio_buffer,
                sample_rate=self.sample_rate,
                num_channels=1,
                context_id=context_id,
            )
        )
        self._audio_buffer = b""

    async def _receive_messages(self):
        """Receive NIM JSON events and push audio frames."""
        while self._client and self._client.is_active and not self._disconnecting:
@@ -415,14 +446,7 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
                msg = json.loads(payload.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Unexpected binary frame — treat as raw PCM
                await self.push_frame(
                    TTSAudioRawFrame(
                        audio=payload,
                        sample_rate=self.sample_rate,
                        num_channels=1,
                        context_id=context_id,
                    )
                )
                await self._handle_audio_chunk(payload, context_id)
                continue

            event_type = msg.get("type", "")
@@ -434,14 +458,7 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
                chunk_b64 = msg.get("audio", "")
                if chunk_b64:
                    await self.stop_ttfb_metrics()
                    await self.push_frame(
                        TTSAudioRawFrame(
                            audio=base64.b64decode(chunk_b64),
                            sample_rate=self.sample_rate,
                            num_channels=1,
                            context_id=context_id,
                        )
                    )
                    await self._handle_audio_chunk(base64.b64decode(chunk_b64), context_id)
            elif event_type == "error":
                await self.push_error(error_msg=f"NIM error: {msg.get('message', msg)}")
                # In case of error we need to reconnect, otherwise we are not going to receive audio from the TTS service anymore
||||
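The start-of-playback buffering added in `_handle_audio_chunk` can be exercised on its own. The sketch below is an illustration only: `StartupBuffer` is a hypothetical class, and it returns the bytes to emit instead of pushing a `TTSAudioRawFrame`.

```python
from typing import Optional


class StartupBuffer:
    """Hold audio back until `chunk_size` bytes exist, then pass chunks through."""

    def __init__(self, chunk_size: int):
        self.chunk_size = chunk_size
        self._buffer = b""
        self._playback_started = False

    def reset(self) -> None:
        """Drop buffered audio, e.g. on interruption, and re-arm the threshold."""
        self._buffer = b""
        self._playback_started = False

    def feed(self, audio: bytes) -> Optional[bytes]:
        """Return the bytes to emit now, or None while still pre-buffering."""
        self._buffer += audio
        if not self._playback_started:
            if len(self._buffer) < self.chunk_size:
                return None
            self._playback_started = True
        out, self._buffer = self._buffer, b""
        return out


buf = StartupBuffer(chunk_size=4)
print(buf.feed(b"ab"))   # still below the threshold
print(buf.feed(b"cde"))  # threshold crossed: everything buffered so far is emitted
print(buf.feed(b"f"))    # later chunks pass straight through
```

Only the first emission is delayed, so the added latency is bounded by the time it takes to collect `chunk_size` bytes.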
@@ -8,6 +8,7 @@

import json
import time
from collections import Counter
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any
@@ -201,6 +202,24 @@ def _prepare_language_hints(
    return list(set(prepared_languages))


def _language_from_tokens(tokens: list[dict]) -> Language | None:
    language_counts: Counter[Language] = Counter()

    for token in tokens:
        language = token.get("language")
        if not language:
            continue
        try:
            language_counts[Language(language)] += 1
        except ValueError:
            pass

    if not language_counts:
        return None

    return language_counts.most_common(1)[0][0]


@dataclass
class SonioxSTTSettings(STTSettings):
    """Settings for SonioxSTTService.
@@ -557,6 +576,7 @@ class SonioxSTTService(WebsocketSTTService):
        async def send_endpoint_transcript():
            if self._final_transcription_buffer:
                text = "".join(map(lambda token: token["text"], self._final_transcription_buffer))
                language = _language_from_tokens(self._final_transcription_buffer)
                # Soniox only pushes TranscriptionFrame when an end token is received,
                # so every TranscriptionFrame is inherently finalized
                await self.push_frame(
@@ -564,11 +584,12 @@ class SonioxSTTService(WebsocketSTTService):
                        text=text,
                        user_id=self._user_id,
                        timestamp=time_now_iso8601(),
                        language=language,
                        result=self._final_transcription_buffer,
                        finalized=True,
                    )
                )
                await self._handle_transcription(text, is_final=True)
                await self._handle_transcription(text, is_final=True, language=language)
                await self.stop_processing_metrics()
                self._final_transcription_buffer = []

||||
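`_language_from_tokens` picks the language that tags the most tokens in the finalized buffer. A self-contained sketch of that majority vote follows; the `KNOWN_LANGUAGES` set is a hypothetical stand-in for pipecat's `Language` enum, and the token dictionaries are invented sample data.

```python
from collections import Counter
from typing import Optional

# Hypothetical stand-in for the Language enum: codes outside this set are ignored,
# just as the diff swallows ValueError for values the enum does not know.
KNOWN_LANGUAGES = {"en", "es", "fr"}


def dominant_language(tokens: list[dict]) -> Optional[str]:
    """Return the most frequent recognized per-token language, or None."""
    counts: Counter = Counter()
    for token in tokens:
        language = token.get("language")
        if not language or language not in KNOWN_LANGUAGES:
            continue
        counts[language] += 1
    if not counts:
        return None
    return counts.most_common(1)[0][0]


sample = [
    {"text": "Hola", "language": "es"},
    {"text": " "},
    {"text": "amigo", "language": "es"},
    {"text": "ok", "language": "en"},
]
print(dominant_language(sample))
```

Tokens without a language field, such as whitespace, do not vote, so they cannot dilute the result.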
@@ -1283,10 +1283,17 @@ class TTSService(AIService):
    def get_active_audio_context_id(self) -> str | None:
        """Get the active audio context ID.

        Returns the playback cursor when set (during active playback), falling
        back to the current turn's synthesis context_id. The fallback covers
        the gap between contexts and the start of a turn before the playback
        task has popped the just-created context off the serialization queue —
        important for services whose wire protocol does not echo context_id
        back on incoming audio.

        Returns:
            The active context ID, or None if no context is active.
            The active context ID, or None if neither cursor is set.
        """
        return self._playing_context_id
        return self._playing_context_id or self._turn_context_id

    async def remove_active_audio_context(self):
        """Remove the active audio context."""

@@ -20,6 +20,7 @@ from loguru import logger

from pipecat.frames.frames import (
    Frame,
    FunctionCallsStartedFrame,
    InterruptionFrame,
    LLMFullResponseEndFrame,
    LLMMarkerFrame,
@@ -222,6 +223,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
        # ensures graceful degradation if the LLM disobeys and outputs additional text.
        self._turn_suppressed = False
        self._turn_complete_found = False  # True when ✓ (COMPLETE) is detected
        # Set when the LLM made a tool call during this turn. Informational
        # only — broadcasting is idempotency-gated by
        # ``_turn_completion_broadcasted``.
        self._turn_had_function_call = False
        # True once ``UserTurnInferenceCompletedFrame`` has been broadcast
        # for this turn. Prevents double-broadcast when ✓ and a tool call
        # both occur in the same turn.
        self._turn_completion_broadcasted = False

        # Timeout handling
        self._user_turn_completion_config = UserTurnCompletionConfig()
@@ -236,6 +245,27 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
        """
        self._user_turn_completion_config = config

    async def _broadcast_turn_completion(self):
        """Broadcast ``UserTurnInferenceCompletedFrame`` at most once per turn.

        Called from the two places we know the LLM has committed to a
        response for the current user turn:

        - the ``✓`` marker is detected in the text stream
        - a ``FunctionCallsStartedFrame`` is emitted — the LLM committed
          to a tool call before producing (or instead of) a marker.

        Broadcasting on the tool-call path matters for races: the
        downstream ``UserStoppedSpeakingFrame`` needs to propagate
        before the function actually executes and a
        ``FunctionCallResultFrame`` flows back to the assistant
        aggregator.
        """
        if self._turn_completion_broadcasted:
            return
        self._turn_completion_broadcasted = True
        await self.broadcast_frame(UserTurnInferenceCompletedFrame)

    async def _start_incomplete_timeout(self, incomplete_type: Literal["short", "long"]):
        """Start a timeout task for incomplete turn handling.

@@ -325,6 +355,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
        self._turn_text_buffer = ""
        self._turn_suppressed = False
        self._turn_complete_found = False
        self._turn_had_function_call = False
        self._turn_completion_broadcasted = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames, handling turn completion state resets.
@@ -351,7 +383,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
            frame: The frame to push downstream.
            direction: The direction of frame flow. Defaults to downstream.
        """
        if isinstance(frame, LLMFullResponseEndFrame):
        if isinstance(frame, FunctionCallsStartedFrame):
            self._turn_had_function_call = True
            # Broadcast turn completion now, before the function dispatches
            # — gives ``UserStoppedSpeakingFrame`` maximum time to propagate
            # so the assistant aggregator's ``_user_speaking`` is False by
            # the time a ``FunctionCallResultFrame`` arrives.
            await self._broadcast_turn_completion()
        elif isinstance(frame, LLMFullResponseEndFrame):
            await self._turn_reset()

        await super().push_frame(frame, direction)
@@ -427,7 +466,9 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
                # LLMTurnCompletionUserTurnStopStrategy) can fire
                # `on_user_turn_stopped`. Must fire before the marker so
                # downstream consumers see the signal before the response.
                await self.broadcast_frame(UserTurnInferenceCompletedFrame)
                # Idempotent: a tool call earlier in the turn may have
                # already broadcast.
                await self._broadcast_turn_completion()

                # Push the marker as a sideband signal that the assistant
                # aggregator will prepend to the upcoming aggregated text,
||||
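The once-per-turn guard in `_broadcast_turn_completion` is a small idempotency gate. The following sketch shows the pattern outside the frame processor; `TurnCompletionGate` and its `broadcast` callback are hypothetical names used only for illustration.

```python
import asyncio


class TurnCompletionGate:
    """Invoke an async callback at most once between resets."""

    def __init__(self, broadcast):
        self._broadcast = broadcast  # hypothetical async callable
        self._broadcasted = False

    async def signal(self) -> None:
        if self._broadcasted:
            return
        # Set the flag before awaiting so a re-entrant call cannot double-fire.
        self._broadcasted = True
        await self._broadcast()

    def reset(self) -> None:
        """Re-arm the gate at the end of the LLM response."""
        self._broadcasted = False


async def demo():
    calls = []

    async def broadcast():
        calls.append("turn completed")

    gate = TurnCompletionGate(broadcast)
    await gate.signal()  # e.g. a tool call starts
    await gate.signal()  # e.g. the completion marker arrives later in the same turn
    first_turn = len(calls)
    gate.reset()
    await gate.signal()  # next turn
    return first_turn, len(calls)


print(asyncio.run(demo()))
```

Either trigger may arrive first; whichever does wins, and the second becomes a no-op until the reset.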
@@ -11,7 +11,6 @@ rich information about service execution including configuration,
parameters, and performance metrics.
"""

import contextlib
import functools
import inspect
import json
@@ -24,7 +23,16 @@ if TYPE_CHECKING:
    from opentelemetry import context as context_api
    from opentelemetry import trace

from pipecat.frames.frames import (
    MetricsFrame,
    TranscriptionFrame,
    TTSStoppedFrame,
    UserStoppedSpeakingFrame,
    VADUserStartedSpeakingFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.processors.aggregators.llm_context import NOT_GIVEN
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.tracing.service_attributes import (
    add_gemini_live_span_attributes,
    add_llm_span_attributes,
@@ -175,6 +183,13 @@ def traced_tts(func: Callable | None = None, *, name: str | None = None) -> Call
    - Character count and text content
    - Performance metrics like TTFB

    The span is scoped to the full synthesis operation, from
    ``create_audio_context`` until ``TTSStoppedFrame`` (or
    ``remove_audio_context`` as a safety net), so TTFB and any other
    runtime-computed metrics land on the correct span even when audio
    chunks are delivered after ``run_tts`` returns (e.g. WebSocket
    streaming TTS services).

    Works with both async functions and generators.

    Args:
@@ -190,102 +205,224 @@ def traced_tts(func: Callable | None = None, *, name: str | None = None) -> Call
    def decorator(f):
        is_async_generator = inspect.isasyncgenfunction(f)

        @contextlib.asynccontextmanager
        async def tracing_context(self, text):
            """Async context manager for TTS tracing.
        def end_tts_span(service, context_id, *, interrupted=False):
            """End the TTS span for ``context_id`` if still open. Idempotent."""
            entry = service._tts_spans.pop(context_id, None)
            if not entry:
                return
            try:
                span = entry["span"]
                if interrupted:
                    span.set_attribute("tts.interrupted", True)
                span.end()
            except Exception as e:
                logging.warning(f"Error closing TTS span: {e}")

            Args:
                self: The TTS service instance.
                text: The text being synthesized.
|
||||
def install_audio_context_patches(service):
|
||||
"""Install per-instance wrappers on the audio-context methods.
|
||||
|
||||
Yields:
|
||||
The active span for the TTS operation.
|
||||
The wrappers own the lifetime of the TTS span:
|
||||
|
||||
- ``create_audio_context``: opens the span and records
|
||||
baseline attributes.
|
||||
- ``append_to_audio_context``: ends the span on
|
||||
``TTSStoppedFrame``.
|
||||
- ``push_frame``: records ``metrics.ttfb`` from the
|
||||
canonical ``TTFBMetricsData`` payload of any
|
||||
``MetricsFrame`` pushed by ``stop_ttfb_metrics``. Reading
|
||||
the value from the metrics event (instead of polling
|
||||
``_metrics.ttfb`` when the first audio is queued) avoids
|
||||
the ``ttfb`` property's in-progress fallback, which would
|
||||
otherwise report an under-estimate whenever a context's
|
||||
audio waits behind earlier queued audio before
|
||||
``_handle_audio_context`` actually stops the TTFB
|
||||
measurement.
|
||||
- ``remove_audio_context``: ends any still-open span as a
|
||||
safety net for error and cancellation paths.
|
||||
- ``on_audio_context_completed``: ends the span on natural
|
||||
completion. Needed because services that rely on the
|
||||
base class to auto-push ``TTSStoppedFrame`` (via
|
||||
``push_frame`` in ``_handle_audio_context``) bypass the
|
||||
``append_to_audio_context`` hook entirely.
|
||||
- ``reset_active_audio_context``: ends the currently
|
||||
playing context's span if still open. Always called from
|
||||
``_handle_interruption``, so this is the interruption
|
||||
hook.
|
||||
|
||||
The patches check ``_tracing_enabled`` at invocation time,
|
||||
so they are safe to install regardless of whether tracing
|
||||
is enabled.
|
||||
"""
|
||||
# Check if tracing is enabled for this service instance
|
||||
if not getattr(self, "_tracing_enabled", False):
|
||||
yield None
|
||||
if getattr(service, "__tts_tracing_patches_installed__", False):
|
||||
return
|
||||
service.__tts_tracing_patches_installed__ = True
|
||||
service._tts_spans = {}
|
||||
|
||||
orig_create = service.create_audio_context
|
||||
orig_append = service.append_to_audio_context
|
||||
orig_remove = service.remove_audio_context
|
||||
orig_completed = service.on_audio_context_completed
|
||||
orig_reset_active = service.reset_active_audio_context
|
||||
orig_push_frame = service.push_frame
|
||||
|
||||
async def traced_create_audio_context(context_id):
|
||||
if getattr(service, "_tracing_enabled", False):
|
||||
try:
|
||||
parent = _get_turn_context(service) or _get_parent_service_context(service)
|
||||
tracer = trace.get_tracer("pipecat")
|
||||
span = tracer.start_span("tts", context=parent)
|
||||
service._tts_spans[context_id] = {"span": span, "ttfb_recorded": False}
|
||||
|
||||
settings = getattr(service, "_settings", None)
|
||||
add_tts_span_attributes(
|
||||
span=span,
|
||||
service_name=service.__class__.__name__,
|
||||
model=_get_model_name(service),
|
||||
voice_id=getattr(settings, "voice", "unknown"),
|
||||
settings=settings,
|
||||
operation_name="tts",
|
||||
)
|
||||
except Exception as e:
|
||||
logging.warning(f"Error opening TTS span: {e}")
|
||||
return await orig_create(context_id)
|
||||
|
||||
async def traced_append_to_audio_context(context_id, frame):
|
||||
entry = service._tts_spans.get(context_id)
|
||||
if entry and frame is not None:
|
||||
try:
|
||||
if isinstance(frame, TTSStoppedFrame):
|
||||
entry["span"].end()
|
||||
service._tts_spans.pop(context_id, None)
|
||||
except Exception as e:
|
||||
logging.warning(f"Error updating TTS span: {e}")
|
||||
return await orig_append(context_id, frame)
|
||||
|
||||
async def traced_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
|
||||
await orig_push_frame(frame, direction)
|
||||
if not getattr(service, "_tracing_enabled", False):
|
||||
return
|
||||
if not isinstance(frame, MetricsFrame):
|
||||
return
|
||||
try:
|
||||
playing_id = getattr(service, "_playing_context_id", None)
|
||||
if playing_id is None:
|
||||
return
|
||||
entry = service._tts_spans.get(playing_id)
|
||||
if not entry or entry["ttfb_recorded"]:
|
||||
return
|
||||
for data in frame.data:
|
||||
if isinstance(data, TTFBMetricsData):
|
||||
entry["span"].set_attribute("metrics.ttfb", data.value)
|
||||
entry["ttfb_recorded"] = True
|
||||
break
|
||||
except Exception as e:
|
||||
logging.warning(f"Error recording TTS ttfb from MetricsFrame: {e}")
|
||||
|
||||
async def traced_remove_audio_context(context_id):
|
||||
entry = service._tts_spans.pop(context_id, None)
|
||||
if entry:
|
||||
try:
|
||||
entry["span"].end()
|
||||
except Exception as e:
|
||||
logging.warning(f"Error closing TTS span: {e}")
|
||||
return await orig_remove(context_id)
|
||||
|
||||
async def traced_on_audio_context_completed(context_id):
|
||||
end_tts_span(service, context_id)
|
||||
return await orig_completed(context_id)
|
||||
|
||||
def traced_reset_active_audio_context():
|
||||
playing_id = getattr(service, "_playing_context_id", None)
|
||||
if playing_id is not None:
|
||||
end_tts_span(service, playing_id, interrupted=True)
|
||||
return orig_reset_active()
|
||||
|
||||
service.create_audio_context = traced_create_audio_context
|
||||
service.append_to_audio_context = traced_append_to_audio_context
|
||||
service.push_frame = traced_push_frame
|
||||
service.remove_audio_context = traced_remove_audio_context
|
||||
service.on_audio_context_completed = traced_on_audio_context_completed
|
||||
service.reset_active_audio_context = traced_reset_active_audio_context
|
||||
|
||||
def patch_setup(owner):
|
||||
"""Wrap ``owner.setup`` so audio-context patches install per-instance.
|
||||
|
||||
Idempotent: if a parent class has already been wrapped,
|
||||
skip. The patches check ``_tracing_enabled`` at invocation
|
||||
time, so wrapping is always safe.
|
||||
"""
|
||||
original_setup = owner.setup
|
||||
if getattr(original_setup, "__tts_tracing_setup_wrapped__", False):
|
||||
return
|
||||
|
||||
service_class_name = self.__class__.__name__
|
||||
span_name = "tts"
|
||||
@functools.wraps(original_setup)
|
||||
async def patched_setup(self, setup):
|
||||
await original_setup(self, setup)
|
||||
install_audio_context_patches(self)
|
||||
|
||||
# Get parent context
|
||||
parent_context = _get_turn_context(self) or _get_parent_service_context(self)
|
||||
setattr(patched_setup, "__tts_tracing_setup_wrapped__", True)
|
||||
owner.setup = patched_setup
|
||||
|
||||
# Create span
|
||||
tracer = trace.get_tracer("pipecat")
|
||||
with tracer.start_as_current_span(span_name, context=parent_context) as span:
|
||||
try:
|
||||
settings = getattr(self, "_settings", None)
|
||||
add_tts_span_attributes(
|
||||
span=span,
|
||||
service_name=service_class_name,
|
||||
model=_get_model_name(self),
|
||||
voice_id=getattr(settings, "voice", "unknown"),
|
||||
text=text,
|
||||
settings=settings,
|
||||
character_count=len(text),
|
||||
operation_name="tts",
|
||||
cartesia_version=getattr(self, "_cartesia_version", None),
|
||||
context_id=getattr(self, "_context_id", None),
|
||||
)
|
||||
def attach_run_tts_attributes(service, text, args, kwargs):
|
||||
"""Attach text-specific attributes to the in-flight TTS span."""
|
||||
if not getattr(service, "_tracing_enabled", False):
|
||||
return
|
||||
try:
|
||||
context_id = args[0] if args else kwargs.get("context_id")
|
||||
entry = getattr(service, "_tts_spans", {}).get(context_id)
|
||||
if entry and text:
|
||||
span = entry["span"]
|
||||
span.set_attribute("text", text)
|
||||
span.set_attribute("metrics.character_count", len(text))
|
||||
except Exception as e:
|
||||
logging.warning(f"Error attaching TTS text to span: {e}")
|
||||
|
||||
yield span
|
||||
def make_run_tts_wrapper():
|
||||
"""Build the wrapper around ``run_tts`` that adds per-call attributes.
|
||||
|
||||
except Exception as e:
|
||||
logging.warning(f"Error in TTS tracing: {e}")
|
||||
raise
|
||||
finally:
|
||||
# Update TTFB metric at the end
|
||||
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
|
||||
if ttfb is not None:
|
||||
span.set_attribute("metrics.ttfb", ttfb)
|
||||
Span lifetime is owned by the audio-context patches. This
|
||||
wrapper only attaches the text and character count to the
|
||||
span that was opened by ``create_audio_context`` just
|
||||
before ``run_tts`` was invoked.
|
||||
"""
|
||||
if is_async_generator:
|
||||
|
||||
if is_async_generator:
|
||||
|
||||
@functools.wraps(f)
|
||||
async def gen_wrapper(self, text, *args, **kwargs):
|
||||
if not getattr(self, "_tracing_enabled", False):
|
||||
async for item in f(self, text, *args, **kwargs):
|
||||
yield item
|
||||
return
|
||||
|
||||
fn_called = False
|
||||
try:
|
||||
async with tracing_context(self, text):
|
||||
fn_called = True
|
||||
async for item in f(self, text, *args, **kwargs):
|
||||
yield item
|
||||
except Exception as e:
|
||||
if fn_called:
|
||||
raise
|
||||
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
|
||||
@functools.wraps(f)
|
||||
async def gen_wrapper(self, text, *args, **kwargs):
|
||||
attach_run_tts_attributes(self, text, args, kwargs)
|
||||
async for item in f(self, text, *args, **kwargs):
|
||||
yield item
|
||||
|
||||
return gen_wrapper
|
||||
else:
|
||||
return gen_wrapper
|
||||
|
||||
@functools.wraps(f)
|
||||
async def wrapper(self, text, *args, **kwargs):
|
||||
if not getattr(self, "_tracing_enabled", False):
|
||||
return await f(self, text, *args, **kwargs)
|
||||
async def coro_wrapper(self, text, *args, **kwargs):
|
||||
attach_run_tts_attributes(self, text, args, kwargs)
|
||||
return await f(self, text, *args, **kwargs)
|
||||
|
||||
fn_called = False
|
||||
try:
|
||||
async with tracing_context(self, text):
|
||||
fn_called = True
|
||||
return await f(self, text, *args, **kwargs)
|
||||
except Exception as e:
|
||||
if fn_called:
|
||||
raise
|
||||
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
|
||||
return await f(self, text, *args, **kwargs)
|
||||
return coro_wrapper
|
||||
|
||||
return wrapper
|
||||
class _TracedTTSDescriptor:
|
||||
"""Class-level descriptor that wires up TTS tracing at class definition time.
|
||||
|
||||
``__set_name__`` fires when the class body finishes evaluating,
|
||||
giving us a chance to wrap the owner's ``setup()`` so that the
|
||||
audio-context patches install on every instance before any
|
||||
``create_audio_context`` call (including the very first one).
|
||||
"""
|
||||
|
||||
def __set_name__(self, owner, attr_name):
|
||||
patch_setup(owner)
|
||||
setattr(owner, attr_name, make_run_tts_wrapper())
|
||||
|
||||
return _TracedTTSDescriptor()
|
||||
|
||||
if func is not None:
|
||||
return decorator(func)
|
||||
# ``decorator(func)`` returns a descriptor placeholder that
|
||||
# Python replaces with the real wrapped function once
|
||||
# ``__set_name__`` runs at class definition time. Pyright sees
|
||||
# only the descriptor instance, hence the ignore.
|
||||
return decorator(func) # type: ignore[return-value]
|
||||
return decorator
|
||||
|
||||
|
||||
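The `_TracedTTSDescriptor` above relies on Python's `__set_name__` hook: the decorator returns a placeholder object, and when the class body finishes evaluating, the placeholder patches the owning class and swaps itself out for the real function. A minimal sketch of that pattern follows. The names `traced`, `Service`, `setup`, `run`, and `calls` are hypothetical and chosen for illustration only; this is not the pipecat implementation.

```python
import functools


def traced(func):
    """Return a placeholder descriptor that patches its owner class.

    Hypothetical illustration only; none of these names are pipecat APIs.
    """

    class _Descriptor:
        def __set_name__(self, owner, attr_name):
            original_setup = owner.setup

            # Wrap ``setup`` only once, even if several methods are decorated
            # or a parent class was already wrapped.
            if not getattr(original_setup, "__setup_wrapped__", False):

                @functools.wraps(original_setup)
                def patched_setup(instance, *args, **kwargs):
                    result = original_setup(instance, *args, **kwargs)
                    # Per-instance work runs right after the original setup.
                    instance.calls.append("patched_setup")
                    return result

                patched_setup.__setup_wrapped__ = True
                owner.setup = patched_setup

            # Replace the placeholder with the real, undecorated function.
            setattr(owner, attr_name, func)

    return _Descriptor()


class Service:
    def __init__(self):
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    @traced
    def run(self, text):
        return text.upper()
```

After class creation, `Service.run` is an ordinary function again, while `Service.setup` carries the wrapper. This is why the per-instance patches can be installed before the first decorated call ever happens.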
@@ -299,72 +436,285 @@ def traced_stt(func: Callable | None = None, *, name: str | None = None) -> Call
|
||||
- Language information
|
||||
- Performance metrics like TTFB
|
||||
|
||||
The span is scoped to one STT segment, from
|
||||
``VADUserStartedSpeakingFrame`` (or the first ``TranscriptionFrame``
|
||||
when VAD did not fire, e.g. whispered speech) until a finalized
|
||||
``TranscriptionFrame``. Multiple finalized transcripts in a single
|
||||
user turn produce multiple sequential spans, each anchored at the
|
||||
point speech for that segment began. ``metrics.ttfb`` is read after
|
||||
the base ``push_frame`` runs ``stop_ttfb_metrics`` for the
|
||||
finalized frame, so the value is correct for the closing span.
|
||||
|
||||
Args:
|
||||
func: The STT method to trace.
|
||||
name: Custom span name. Defaults to function name.
|
||||
|
||||
Returns:
|
||||
Wrapped method with STT-specific tracing.
|
||||
The original method unchanged. The decorator's class-definition-
|
||||
time work is to install a ``push_frame`` wrapper on the owning
|
||||
class that owns the span lifetime.
|
||||
"""
|
||||
if not is_tracing_available():
|
||||
return _noop_decorator if func is None else _noop_decorator(func)
|
||||
|
||||
def decorator(f):
|
||||
@functools.wraps(f)
|
||||
async def wrapper(self, transcript, is_final, language=None):
|
||||
if not getattr(self, "_tracing_enabled", False):
|
||||
return await f(self, transcript, is_final, language)
|
||||
def patch_push_frame(owner):
|
||||
"""Wrap ``owner.push_frame`` to drive the STT span lifecycle.
|
||||
|
||||
fn_called = False
|
||||
try:
|
||||
service_class_name = self.__class__.__name__
|
||||
span_name = "stt"
|
||||
Idempotent: if a parent class has already been wrapped, skip.
|
||||
The wrapper checks ``_tracing_enabled`` at invocation time,
|
||||
so it is safe to install regardless of whether tracing is
|
||||
enabled.
|
||||
"""
|
||||
original_push_frame = owner.push_frame
|
||||
if getattr(original_push_frame, "__stt_tracing_push_frame_wrapped__", False):
|
||||
return
|
||||
|
||||
# Get the turn context first, then fall back to service context
|
||||
parent_context = _get_turn_context(self) or _get_parent_service_context(self)
|
||||
def update_transcript(state, new_text):
|
||||
"""Append or extend the current segment in ``state['segments']``.
|
||||
|
||||
# Create a new span as child of the turn span or service span
|
||||
If ``new_text`` starts with the last recorded segment,
|
||||
treat it as a continuation (interim accumulation) and
|
||||
replace the last segment. Otherwise treat it as a new
|
||||
segment and append. Some STT services (Deepgram with
|
||||
utterance_end_ms enabled, for example) emit several
|
||||
``TranscriptionFrame``s per turn where each carries a
|
||||
different segment rather than a cumulative update —
|
||||
without this logic the span's transcript would only
|
||||
show the last segment and the beginning would be lost.
|
||||
"""
|
||||
if not new_text:
|
||||
return
|
||||
segments = state["segments"]
|
||||
if not segments:
|
||||
segments.append(new_text)
|
||||
elif new_text.startswith(segments[-1]):
|
||||
segments[-1] = new_text
|
||||
else:
|
||||
segments.append(new_text)
|
||||
|
||||
def open_span(service, state):
|
||||
"""Open the STT span, anchored at ``segment_start_time`` if set."""
|
||||
parent = _get_turn_context(service) or _get_parent_service_context(service)
|
||||
tracer = trace.get_tracer("pipecat")
|
||||
with tracer.start_as_current_span(
|
||||
span_name, context=parent_context
|
||||
) as current_span:
|
||||
start_time_ns = (
|
||||
int(state["segment_start_time"] * 1e9)
|
||||
if state["segment_start_time"] is not None
|
||||
else None
|
||||
)
|
||||
span = tracer.start_span("stt", context=parent, start_time=start_time_ns)
|
||||
try:
|
||||
settings = getattr(service, "_settings", None)
|
||||
add_stt_span_attributes(
|
||||
span=span,
|
||||
service_name=service.__class__.__name__,
|
||||
model=_get_model_name(service),
|
||||
settings=settings,
|
||||
vad_enabled=getattr(service, "vad_enabled", False),
|
||||
)
|
||||
except Exception as e:
|
||||
logging.warning(f"Error setting STT span baseline attributes: {e}")
|
||||
state["span"] = span
|
||||
|
||||
def handle_pre_push(service, frame, state):
|
||||
"""Record speech-start anchor; lazy-open span on first transcript.
|
||||
|
||||
Lazy-opening on ``TranscriptionFrame`` (rather than on
|
||||
``VADUserStartedSpeakingFrame`` or
|
||||
``UserStartedSpeakingFrame``) avoids racing with
|
||||
``TurnTraceObserver._handle_turn_started``, which runs
|
||||
in a background task fired by ``_call_event_handler``
|
||||
(``base_object.py:232``) and may not have set the new
|
||||
turn's context yet — that produces STT spans parented
|
||||
to the previous turn. By the time STT actually emits
|
||||
a transcript, the turn observer has run.
|
||||
|
||||
Opening happens in pre-push (rather than post-push) so
|
||||
that the recursive ``push_frame`` that
|
||||
``STTService.push_frame`` triggers for the
|
||||
``MetricsFrame`` (via ``stop_ttfb_metrics`` at
|
||||
``stt_service.py:465``) sees the span already open and
|
||||
can attribute ``metrics.ttfb`` to it.
|
||||
"""
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
# Anchor the next span at the moment speech began.
|
||||
# Skip if we already have an anchor (intra-turn VAD
|
||||
# re-trigger) or a span open.
|
||||
if state["span"] is None and state["segment_start_time"] is None:
|
||||
state["segment_start_time"] = frame.timestamp - frame.start_secs
|
||||
elif isinstance(frame, TranscriptionFrame) and state["span"] is None:
|
||||
open_span(service, state)
|
||||
|
||||
async def handle_post_push(service, frame, state):
|
||||
"""Attach per-frame attrs; close on finalized; record TTFB from MetricsFrame.
|
||||
|
||||
``metrics.ttfb`` is read off the ``TTFBMetricsData``
|
||||
payload of any ``MetricsFrame`` pushed by
|
||||
``stop_ttfb_metrics`` — the canonical value the rest
|
||||
of the system uses — rather than from
|
||||
``_metrics.ttfb``, which has an in-progress fallback
|
||||
branch (``frame_processor_metrics.py:48-62``) that
|
||||
would return an under-estimate if read at the wrong
|
||||
time.
|
||||
|
||||
One STT span per finalized transcript: the span opens
|
||||
lazily on the first ``TranscriptionFrame`` (pre-push,
|
||||
anchored at speech start via ``segment_start_time``)
|
||||
and closes on ``finalized=True``. Multiple finalized
|
||||
transcripts in a single turn produce multiple spans.
|
||||
|
||||
For services that never set ``frame.finalized=True``
|
||||
(e.g. Deepgram, which only marks it via
|
||||
``confirm_finalize()``), the span closes on
|
||||
``UserStoppedSpeakingFrame``. To capture
|
||||
``metrics.ttfb`` for those spans we force-stop any
|
||||
pending TTFB measurement before closing — that pushes
|
||||
a ``MetricsFrame``, our post-push attributes the
|
||||
value, and ``patched_stop_ttfb_metrics`` closes the
|
||||
span. The ``stt.incomplete=true`` flag is only set if
|
||||
neither a finalized transcript nor a TTFB measurement
|
||||
ever finalized for the span.
|
||||
"""
|
||||
if isinstance(frame, UserStoppedSpeakingFrame):
|
||||
prev_span = state["span"]
|
||||
if prev_span is None:
|
||||
return
|
||||
metrics = getattr(service, "_metrics", None)
|
||||
if metrics is not None and getattr(metrics, "_start_ttfb_time", 0) > 0:
|
||||
last_transcript_time = getattr(service, "_last_transcript_time", 0) or None
|
||||
try:
|
||||
await service.stop_ttfb_metrics(end_time=last_transcript_time)
|
||||
except Exception as e:
|
||||
logging.warning(f"Error force-stopping STT TTFB on user turn end: {e}")
|
||||
# patched_stop_ttfb_metrics may have closed the span
|
||||
# via the timeout path; re-check.
|
||||
if state["span"] is None:
|
||||
state["segments"] = []
|
||||
return
|
||||
state["span"].set_attribute("stt.incomplete", True)
|
||||
state["span"].end()
|
||||
state["span"] = None
|
||||
state["segment_start_time"] = None
|
||||
state["segments"] = []
|
||||
elif isinstance(frame, MetricsFrame):
|
||||
span = state["span"]
|
||||
if span is None:
|
||||
return
|
||||
for data in frame.data:
|
||||
if isinstance(data, TTFBMetricsData):
|
||||
span.set_attribute("metrics.ttfb", data.value)
|
||||
break
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
span = state["span"]
|
||||
if span is None:
|
||||
return
|
||||
if frame.text:
|
||||
update_transcript(state, frame.text)
|
||||
span.set_attribute("transcript", " ".join(state["segments"]).strip())
|
||||
span.set_attribute("is_final", bool(frame.finalized))
|
||||
if frame.language:
|
||||
span.set_attribute("language", str(frame.language))
|
||||
if frame.user_id:
|
||||
span.set_attribute("user_id", frame.user_id)
|
||||
if frame.finalized:
|
||||
span.end()
|
||||
state["span"] = None
|
||||
state["segment_start_time"] = None
|
||||
state["segments"] = []
|
||||
|
||||
@functools.wraps(original_push_frame)
|
||||
async def patched_push_frame(self, frame, direction=FrameDirection.DOWNSTREAM):
|
||||
state = getattr(self, "_stt_span_state", None)
|
||||
if state is None:
|
||||
state = {"span": None, "segment_start_time": None, "segments": []}
|
||||
self._stt_span_state = state
|
||||
|
||||
if getattr(self, "_tracing_enabled", False):
|
||||
try:
|
||||
# Get TTFB metric if available
|
||||
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
|
||||
|
||||
# Use settings from the service if available
|
||||
settings = getattr(self, "_settings", None)
|
||||
|
||||
add_stt_span_attributes(
|
||||
span=current_span,
|
||||
service_name=service_class_name,
|
||||
model=_get_model_name(self),
|
||||
transcript=transcript,
|
||||
is_final=is_final,
|
||||
language=str(language) if language else None,
|
||||
user_id=getattr(self, "_user_id", None),
|
||||
vad_enabled=getattr(self, "vad_enabled", False),
|
||||
settings=settings,
|
||||
ttfb=ttfb,
|
||||
)
|
||||
|
||||
# Call the original function
|
||||
fn_called = True
|
||||
return await f(self, transcript, is_final, language)
|
||||
handle_pre_push(self, frame, state)
|
||||
except Exception as e:
|
||||
# Log any exception but don't disrupt the main flow
|
||||
logging.warning(f"Error in STT transcription tracing: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
if fn_called:
|
||||
raise
|
||||
logging.error(f"Error in STT tracing (continuing without tracing): {e}")
|
||||
return await f(self, transcript, is_final, language)
|
||||
logging.warning(f"Error in STT pre-push tracing: {e}")
|
||||
|
||||
return wrapper
|
||||
await original_push_frame(self, frame, direction)
|
||||
|
||||
if getattr(self, "_tracing_enabled", False):
|
||||
try:
|
||||
await handle_post_push(self, frame, state)
|
||||
except Exception as e:
|
||||
logging.warning(f"Error in STT post-push tracing: {e}")
|
||||
|
||||
setattr(patched_push_frame, "__stt_tracing_push_frame_wrapped__", True)
|
||||
owner.push_frame = patched_push_frame
|
||||
|
||||
def patch_stop_ttfb_metrics(owner):
|
||||
"""Wrap ``owner.stop_ttfb_metrics`` to close the span on the timeout path.
|
||||
|
||||
When ``stop_ttfb_metrics`` is invoked with ``end_time`` set,
|
||||
that signals the TTFB-timeout handler firing
|
||||
(`stt_service.py:566`), or our own force-stop from the
|
||||
``UserStoppedSpeakingFrame`` handler. In either case we
|
||||
anchor the span's end at ``end_time``
|
||||
(= ``_last_transcript_time``) rather than at whenever the
|
||||
coroutine resumed.
|
||||
|
||||
``metrics.ttfb`` attribution is not done here — the
|
||||
``MetricsFrame`` that ``stop_ttfb_metrics`` pushes flows
|
||||
through ``push_frame`` and gets recorded by
|
||||
``handle_post_push``, which reads the canonical
|
||||
``TTFBMetricsData.value`` rather than the in-progress
|
||||
``_metrics.ttfb`` property.
|
||||
"""
|
||||
original_stop = owner.stop_ttfb_metrics
|
||||
if getattr(original_stop, "__stt_tracing_stop_ttfb_wrapped__", False):
|
||||
return
|
||||
|
||||
@functools.wraps(original_stop)
|
||||
async def patched_stop(self, *, end_time=None):
|
||||
await original_stop(self, end_time=end_time)
|
||||
if end_time is None:
|
||||
return
|
||||
if not getattr(self, "_tracing_enabled", False):
|
||||
return
|
||||
state = getattr(self, "_stt_span_state", None)
|
||||
if not state or state["span"] is None:
|
||||
return
|
||||
try:
|
||||
span = state["span"]
|
||||
span.end(end_time=int(end_time * 1e9))
|
||||
state["span"] = None
|
||||
state["segment_start_time"] = None
|
||||
state["segments"] = []
|
||||
except Exception as e:
|
||||
logging.warning(f"Error in STT stop_ttfb_metrics tracing: {e}")
|
||||
|
||||
setattr(patched_stop, "__stt_tracing_stop_ttfb_wrapped__", True)
|
||||
owner.stop_ttfb_metrics = patched_stop
|
||||
|
||||
class _TracedSTTDescriptor:
|
||||
"""Class-level descriptor that wires up STT tracing at class definition time.
|
||||
|
||||
``__set_name__`` fires when the class body finishes evaluating,
|
||||
giving us a chance to wrap the owner's ``push_frame`` so that
|
||||
VAD, transcription, and finalization events drive the span
|
||||
lifecycle, and to wrap ``stop_ttfb_metrics`` so the
|
||||
TTFB-timeout path can attach metrics and close the span when
|
||||
no finalized transcript ever arrives. The decorated method
|
||||
itself runs unchanged.
|
||||
"""
|
||||
|
||||
def __set_name__(self, owner, attr_name):
|
||||
patch_push_frame(owner)
|
||||
patch_stop_ttfb_metrics(owner)
|
||||
setattr(owner, attr_name, f)
|
||||
|
||||
return _TracedSTTDescriptor()
|
||||
|
||||
if func is not None:
|
||||
return decorator(func)
|
||||
# ``decorator(func)`` returns a descriptor placeholder that
|
||||
# Python replaces with the real wrapped function once
|
||||
# ``__set_name__`` runs at class definition time. Pyright sees
|
||||
# only the descriptor instance, hence the ignore.
|
||||
return decorator(func) # type: ignore[return-value]
|
||||
return decorator
|
||||
|
||||
|
||||
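The segment handling described in the `update_transcript` docstring above can be tried in isolation. The sketch below mirrors that logic but, as a simplification, takes the segment list directly instead of the `state` dict used in the diff; `joined` is a hypothetical helper added only to show the resulting span transcript.

```python
def update_transcript(segments: list[str], new_text: str) -> None:
    """Extend the last segment on a cumulative update, else append a new one."""
    if not new_text:
        return
    if segments and new_text.startswith(segments[-1]):
        # Interim accumulation: the new text supersedes the last segment.
        segments[-1] = new_text
    else:
        # A different segment (or the very first one): keep the earlier text.
        segments.append(new_text)


def joined(frames: list[str]) -> str:
    """Hypothetical helper: feed frame texts in order, return the transcript."""
    segments: list[str] = []
    for text in frames:
        update_transcript(segments, text)
    return " ".join(segments).strip()
```

With cumulative updates such as `"hello"` followed by `"hello there"`, only the longer text is kept; a later, unrelated segment such as `"how are you"` is appended rather than overwriting what came before.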
@@ -549,10 +899,6 @@ def traced_llm(func: Callable | None = None, *, name: str | None = None) -> Call
|
||||
fn_called = True
|
||||
result = await f(self, context, *args, **kwargs)
|
||||
|
||||
# Add aggregated output after function completes, if available
|
||||
if output_text:
|
||||
current_span.set_attribute("output", output_text)
|
||||
|
||||
return result
|
||||
|
||||
finally:
|
||||
@@ -565,6 +911,15 @@ def traced_llm(func: Callable | None = None, *, name: str | None = None) -> Call
|
||||
):
|
||||
self.start_llm_usage_metrics = original_start_llm_usage_metrics
|
||||
|
||||
# Attach whatever output text we accumulated so
|
||||
# far. Doing this in finally captures partial
|
||||
# output when ``f`` is cancelled or raises mid-
|
||||
# stream (e.g. interruption during LLM
|
||||
# generation), rather than only on clean
|
||||
# completion.
|
||||
if output_text:
|
||||
current_span.set_attribute("output", output_text)
|
||||
|
||||
# Update TTFB metric
|
||||
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
|
||||
if ttfb is not None:
|
||||
|
||||
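The `traced_llm` change above moves the `output` attribute into the `finally` block so that partial text survives an interruption. A minimal sketch of that idea is shown below, under stated assumptions: `FakeSpan` is a stand-in for an OpenTelemetry span, and `stream_with_span` with its `fail_after` parameter is a hypothetical function that simulates a mid-stream failure with `RuntimeError`. An `asyncio.CancelledError` would pass through `finally` in the same way.

```python
import asyncio


class FakeSpan:
    """Stand-in for a tracing span; records attributes in a dict."""

    def __init__(self):
        self.attributes = {}

    def set_attribute(self, key, value):
        self.attributes[key] = value


async def stream_with_span(span, tokens, fail_after=None):
    """Accumulate tokens; attach whatever was produced, even on failure."""
    output = ""
    try:
        for index, token in enumerate(tokens):
            if fail_after is not None and index == fail_after:
                # Simulated interruption in the middle of generation.
                raise RuntimeError("interrupted")
            output += token
            await asyncio.sleep(0)  # yield control, as a real stream would
        return output
    finally:
        # Runs on clean completion, on exceptions, and on cancellation.
        if output:
            span.set_attribute("output", output)
```

Setting the attribute only after the call returns would leave the span without any output whenever generation is cut short.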
111
tests/test_cartesia_tts.py
Normal file
@@ -0,0 +1,111 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.settings import TTSSettings
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text


def _service(language: str) -> CartesiaTTSService:
    service = CartesiaTTSService.__new__(CartesiaTTSService)
    service._settings = TTSSettings(language=language)
    return service


def _process_word_timestamps(
    words: list[str], starts: list[float], language: str
) -> list[tuple[str, float]]:
    return _service(language)._process_word_timestamps_for_language(words, starts)


def _concatenate_processed_timestamps(
    timestamp_groups: list[tuple[list[str], list[float]]], language: str
) -> str:
    service = _service(language)
    text_parts = []
    for words, starts in timestamp_groups:
        processed_timestamps = service._process_word_timestamps_for_language(words, starts)
        includes_inter_frame_spaces = service._word_timestamps_include_inter_frame_spaces()
        text_parts.extend(
            TextPartForConcatenation(
                word,
                includes_inter_part_spaces=includes_inter_frame_spaces,
            )
            for word, _timestamp in processed_timestamps
        )
    return concatenate_aggregated_text(text_parts)


def test_cartesia_chinese_word_timestamps_join_without_spaces():
    assert _process_word_timestamps(
        words=["你", "好", "。"],
        starts=[0.0, 0.1, 0.2],
        language="zh",
    ) == [("你好。", 0.0)]


def test_cartesia_japanese_word_timestamps_join_without_spaces():
    assert _process_word_timestamps(
        words=["こ", "ん", "に", "ち", "は", "。"],
        starts=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5],
        language="ja",
    ) == [("こんにちは。", 0.0)]


def test_cartesia_korean_word_timestamps_preserve_words_and_timestamps():
    assert _process_word_timestamps(
        words=["안녕하세요", "반갑습니다"],
        starts=[0.0, 0.2],
        language="ko",
    ) == [("안녕하세요", 0.0), ("반갑습니다", 0.2)]


def test_cartesia_korean_word_timestamps_do_not_join_latin_and_hangul():
    assert _process_word_timestamps(
        words=["AI", "어시스턴트입니다."],
        starts=[3.7026982, 4.1999383],
        language="ko",
    ) == [("AI", 3.7026982), ("어시스턴트입니다.", 4.1999383)]


def test_cartesia_japanese_timestamp_groups_reassemble_without_spaces():
    assert (
        _concatenate_processed_timestamps(
            [
                (["こ", "ん", "に", "ち", "は", "、", "私"], [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]),
                (["は", "あ", "な", "た", "の"], [1.0, 1.1, 1.2, 1.3, 1.4]),
            ],
            language="ja",
        )
        == "こんにちは、私はあなたの"
    )


def test_cartesia_chinese_timestamp_groups_reassemble_without_spaces():
    assert (
        _concatenate_processed_timestamps(
            [
                (["你", "好", ",", "我", "是"], [0.1, 0.2, 0.3, 0.4, 0.5]),
                (["你", "的", "智", "能"], [1.0, 1.1, 1.2, 1.3]),
            ],
            language="zh",
        )
        == "你好,我是你的智能"
    )


def test_cartesia_korean_timestamp_groups_reassemble_with_spaces():
    assert (
        _concatenate_processed_timestamps(
            [
                (["저는"], [1.6]),
                (["여러분의"], [1.8]),
                (["AI", "어시스턴트입니다."], [3.7, 4.2]),
            ],
            language="ko",
        )
        == "저는 여러분의 AI 어시스턴트입니다."
    )
31
tests/test_inworld_tts_language.py
Normal file
@@ -0,0 +1,31 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Tests for Inworld TTS language code mapping."""

from pipecat.services.inworld.tts import language_to_inworld_language
from pipecat.transcriptions.language import Language


def test_inworld_base_languages_resolve_to_canonical_regional_tags():
    """Base GA languages should use the regional tags emitted by Inworld Playground."""
    assert language_to_inworld_language(Language.EN) == "en-US"
    assert language_to_inworld_language(Language.RU) == "ru-RU"
    assert language_to_inworld_language(Language.FR) == "fr-FR"
    assert language_to_inworld_language(Language.ZH) == "zh-CN"


def test_inworld_regional_languages_are_preserved():
    """Explicit regional variants should be passed through as supported BCP-47 tags."""
    assert language_to_inworld_language(Language.EN_GB) == "en-GB"
    assert language_to_inworld_language(Language.PT_PT) == "pt-PT"
    assert language_to_inworld_language(Language.RU_RU) == "ru-RU"


def test_inworld_other_languages_are_passed_through_as_bcp47_tags():
    """Languages outside the canonical locale map should keep their BCP-47 enum value."""
    assert language_to_inworld_language(Language.SV_SE) == "sv-SE"
    assert language_to_inworld_language(Language.UK_UA) == "uk-UA"

175
tests/test_soniox_stt.py
Normal file
@@ -0,0 +1,175 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import json

import pytest

from pipecat.frames.frames import TranscriptionFrame
from pipecat.services.soniox.stt import END_TOKEN, SonioxSTTService, _language_from_tokens
from pipecat.transcriptions.language import Language


class _FakeWebsocket:
    def __init__(self, messages):
        self._messages = messages

    def __aiter__(self):
        return self._iter_messages()

    async def _iter_messages(self):
        for message in self._messages:
            yield message


def test_language_from_tokens_uses_single_recognized_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " world", "language": "en"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_uses_most_common_language():
    tokens = [
        {"text": "Ik", "language": "nl"},
        {"text": " zoek", "language": "nl"},
        {"text": " computer", "language": "en"},
    ]

    assert _language_from_tokens(tokens) == Language.NL


def test_language_from_tokens_skips_unknown_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": "!", "language": "klingon"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_skips_missing_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " wereld"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_ignores_unknown_and_missing_languages():
    tokens = [
        {"text": "Hello", "language": "klingon"},
        {"text": " world"},
        {"text": "!"},
    ]

    assert _language_from_tokens(tokens) is None


def test_language_from_tokens_uses_first_language_on_tie():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " wereld", "language": "nl"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


@pytest.mark.asyncio
async def test_receive_messages_sets_final_transcription_language(monkeypatch):
    service = SonioxSTTService(api_key="test-key")
    pushed_frames = []
    traced_transcriptions = []

    async def fake_push_frame(frame):
        pushed_frames.append(frame)

    async def fake_handle_transcription(transcript, is_final, language=None):
        traced_transcriptions.append((transcript, is_final, language))

    async def fake_stop_processing_metrics():
        pass

    messages = [
        json.dumps(
            {
                "tokens": [
                    {"text": "Ik", "is_final": True, "language": "nl"},
                    {"text": " zoek", "is_final": True, "language": "nl"},
                    {"text": " computer", "is_final": True, "language": "en"},
                    {"text": END_TOKEN, "is_final": True},
                ]
            }
        ),
        json.dumps({"tokens": [], "finished": True}),
    ]

    service._websocket = _FakeWebsocket(messages)
    monkeypatch.setattr(service, "push_frame", fake_push_frame)
    monkeypatch.setattr(service, "_handle_transcription", fake_handle_transcription)
    monkeypatch.setattr(service, "stop_processing_metrics", fake_stop_processing_metrics)

    await service._receive_messages()

    final_frames = [frame for frame in pushed_frames if isinstance(frame, TranscriptionFrame)]
    assert len(final_frames) == 1
    assert final_frames[0].text == "Ik zoek computer"
    assert final_frames[0].language == Language.NL
    assert final_frames[0].finalized is True
    assert final_frames[0].result == [
        {"text": "Ik", "is_final": True, "language": "nl"},
        {"text": " zoek", "is_final": True, "language": "nl"},
        {"text": " computer", "is_final": True, "language": "en"},
    ]
    assert traced_transcriptions == [("Ik zoek computer", True, Language.NL)]


@pytest.mark.asyncio
async def test_receive_messages_allows_final_transcription_without_language(monkeypatch):
    service = SonioxSTTService(api_key="test-key")
    pushed_frames = []
    traced_transcriptions = []

    async def fake_push_frame(frame):
        pushed_frames.append(frame)

    async def fake_handle_transcription(transcript, is_final, language=None):
        traced_transcriptions.append((transcript, is_final, language))

    async def fake_stop_processing_metrics():
        pass

    messages = [
        json.dumps(
            {
                "tokens": [
                    {"text": "Tell", "is_final": True},
                    {"text": " me", "is_final": True},
                    {"text": " a", "is_final": True},
                    {"text": " joke.", "is_final": True},
                    {"text": END_TOKEN, "is_final": True},
                ]
            }
        ),
        json.dumps({"tokens": [], "finished": True}),
    ]

    service._websocket = _FakeWebsocket(messages)
    monkeypatch.setattr(service, "push_frame", fake_push_frame)
    monkeypatch.setattr(service, "_handle_transcription", fake_handle_transcription)
    monkeypatch.setattr(service, "stop_processing_metrics", fake_stop_processing_metrics)

    await service._receive_messages()

    final_frames = [frame for frame in pushed_frames if isinstance(frame, TranscriptionFrame)]
    assert len(final_frames) == 1
    assert final_frames[0].text == "Tell me a joke."
    assert final_frames[0].language is None
    assert final_frames[0].finalized is True
    assert traced_transcriptions == [("Tell me a joke.", True, None)]