Compare commits

34 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
ea296babe9 Merge pull request #4498 from pipecat-ai/changelog-1.2.0
Release 1.2.0 - Changelog Update
2026-05-14 14:47:47 -07:00
aconchillo
b13af2b053 Update changelog for version 1.2.0 2026-05-14 21:45:36 +00:00
Aleix Conchillo Flaqué
7b6d878f07 update uv.lock 2026-05-14 14:41:38 -07:00
Aleix Conchillo Flaqué
8e405f15aa changelog: fix 4446.change.md file name 2026-05-14 14:38:54 -07:00
Aleix Conchillo Flaqué
44a40e8eb2 Merge pull request #4497 from pipecat-ai/aleix/fix-tts-context-id-fallback
Fall back to _turn_context_id in get_active_audio_context_id
2026-05-14 13:34:34 -07:00
Aleix Conchillo Flaqué
ea97cb1a78 Add changelog for #4497 2026-05-14 13:22:50 -07:00
Aleix Conchillo Flaqué
22650b1b56 Move QwenLLMService model into Settings in the qwen example
Mirrors the deprecation in ``QwenLLMService.__init__``: ``model`` should
be passed via ``settings=QwenLLMService.Settings(model=...)`` instead of
as a direct constructor arg.
2026-05-14 13:22:07 -07:00
Aleix Conchillo Flaqué
b76831e677 Fall back to _turn_context_id in get_active_audio_context_id
TTS services whose wire protocol does not echo the context_id back on
incoming audio (Sarvam, Smallest, Soniox, Inworld, ...) call
``get_active_audio_context_id()`` to tag each chunk. That accessor
returned only ``_playing_context_id`` — the playback-side cursor set
asynchronously by ``_audio_context_task_handler`` when it pops a context
off the serialization queue.

Result: incoming audio that arrived in the gap between contexts or at
the very start of a turn (before the playback loop popped) had
``context_id=None`` and was dropped with
``unable to append audio to context: no context ID provided``.

Fall back to ``_turn_context_id`` (the synthesis-side cursor, set as
soon as the turn's context is created) so the gap is covered without
prematurely nulling the playback cursor.
2026-05-14 13:22:00 -07:00
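
The fallback described in the commit message above can be sketched as follows. This is a minimal, self-contained illustration rather than the actual `TTSService` code: the class name and the `start_turn` / `start_playback` / `finish_playback` helpers are hypothetical, and only the two cursor attributes and the accessor name come from the commit message.

```python
from typing import Optional


class AudioContextCursors:
    """Hypothetical stand-in for the two cursors named in the commit message."""

    def __init__(self) -> None:
        # Playback-side cursor: set asynchronously when a context is popped.
        self._playing_context_id: Optional[str] = None
        # Synthesis-side cursor: set as soon as the turn's context is created.
        self._turn_context_id: Optional[str] = None

    def start_turn(self, context_id: str) -> None:
        self._turn_context_id = context_id

    def start_playback(self, context_id: str) -> None:
        self._playing_context_id = context_id

    def finish_playback(self) -> None:
        self._playing_context_id = None

    def get_active_audio_context_id(self) -> Optional[str]:
        # Prefer the playback cursor; fall back to the turn cursor so audio
        # arriving before playback starts (or between contexts) is still tagged.
        if self._playing_context_id is not None:
            return self._playing_context_id
        return self._turn_context_id
```

With this shape, audio that arrives before the playback loop pops the context still gets a non-`None` ID, and the playback cursor is never cleared early to make that happen.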
Mark Backman
b57111743f Merge pull request #4495 from pipecat-ai/mb/soniox-stt-lang-counter 2026-05-14 15:57:31 -04:00
Mark Backman
dcbb0070c9 Add changelog for Soniox language selection 2026-05-14 15:42:43 -04:00
Mark Backman
73278d3309 Use majority language for Soniox transcripts 2026-05-14 15:18:43 -04:00
Mark Backman
49bda11ae8 Merge pull request #4482 from pipecat-ai/mb/soniox-stt-token-language
Propagate Soniox token language
2026-05-13 16:28:56 -04:00
Aleix Conchillo Flaqué
07640582ce Merge pull request #4467 from pipecat-ai/aleix/fix-tts-ttfb-tracing
Fix metrics.ttfb and partial output on TTS/STT/LLM OpenTelemetry spans
2026-05-13 13:10:52 -07:00
Mark Backman
078af6969a Merge pull request #4473 from timofey-TK/inworld-tts-v2
Add support for Inworld TTS v2 fields
2026-05-13 15:32:16 -04:00
Mark Backman
9f40ba21c2 Add changelog for Soniox language fix 2026-05-13 15:26:10 -04:00
Mark Backman
82f0896d6a Propagate Soniox token language 2026-05-13 15:23:22 -04:00
kompfner
7e4cd23de4 Merge pull request #4474 from pipecat-ai/pk/inworld-realtime-tools
Extend cancel_on_interruption=False to Inworld Realtime (best-effort + warning)
2026-05-13 15:12:34 -04:00
TimTk
97f50c8aa2 Address review: use resolve_language, narrow delivery_mode type, update changelog
- Replace custom LANGUAGE_MAP fallback in language_to_inworld_language with
  resolve_language(language, LANGUAGE_MAP, use_base_code=False) to match the
  pattern used by other services and restore the unverified-language warning
- Tighten delivery_mode type from str to Literal["STABLE", "BALANCED", "CREATIVE"]
- Update changelog entry to mention delivery_mode and language normalization
2026-05-13 21:43:02 +03:00
Mark Backman
08680732f6 Merge pull request #4475 from pipecat-ai/mb/cartesia-korean-fix
Fix Cartesia CJK timestamp spacing
2026-05-13 13:20:42 -04:00
Mark Backman
064b68aa01 Fix Cartesia CJK timestamp spacing 2026-05-13 13:13:40 -04:00
Filipi da Silva Fuchter
b0f8ea7e28 Merge pull request #4477 from pipecat-ai/filipi/nvidia_sagemaker_follow_up
NVidia TTS Sagemaker: Buffering audio to avoid glitches.
2026-05-13 14:06:44 -03:00
filipi87
ad50c8d5d5 Buffering audio to avoid glitches. 2026-05-13 14:01:03 -03:00
Timofey
39e7f9e354 Fix Inworld TTS v2 request fields 2026-05-13 11:17:31 +03:00
Aleix Conchillo Flaqué
7cc7968abb Fix pyright errors in service_decorators.py 2026-05-12 20:10:43 -07:00
Aleix Conchillo Flaqué
52d8008783 Add LLM interruption changelog entry for #4467 2026-05-12 20:10:43 -07:00
Aleix Conchillo Flaqué
a3ce963b54 Capture partial LLM output on interruption
traced_llm only attached the aggregated ``output`` attribute to the
span after the wrapped function returned successfully. When the LLM
call was cancelled mid-stream (e.g. interruption during generation),
the accumulated text was discarded — the span had no ``output``.

Moved the attribute assignment into the ``finally`` block alongside
the existing TTFB write so the partial text we already captured via
the patched ``push_frame`` lands on the span regardless of whether
``f`` returned normally, raised, or was cancelled.
2026-05-12 20:10:43 -07:00
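
The `finally`-block move described above can be illustrated with a small asyncio sketch. It is not the real `traced_llm` decorator: the span is a plain dict, the decorator name `traced_output` is hypothetical, and text is collected through an `emit` callback instead of the patched `push_frame` the commit mentions.

```python
import asyncio
import functools


def traced_output(span: dict):
    """Hypothetical decorator: record text emitted by the wrapped coroutine."""

    def decorator(f):
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            chunks = []
            try:
                # The wrapped coroutine reports text through the callback.
                return await f(chunks.append, *args, **kwargs)
            finally:
                # Runs on normal return, on exceptions, and on cancellation,
                # so partial text always lands on the span.
                span["output"] = "".join(chunks)

        return wrapper

    return decorator


async def demo() -> dict:
    span = {}
    streaming = asyncio.Event()

    @traced_output(span)
    async def generate(emit):
        emit("Hello")
        emit(", world")
        streaming.set()
        await asyncio.Event().wait()  # never set: simulates a stalled stream
        emit(" (never reached)")

    task = asyncio.create_task(generate())
    await streaming.wait()
    task.cancel()  # simulates an interruption during generation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return span
```

Had the assignment stayed after the `await` instead of in `finally`, the cancelled call would leave `span` without an `output` key, which is the behavior the commit fixes.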
Aleix Conchillo Flaqué
e70ee603b2 Add STT changelog entry for #4467 2026-05-12 20:10:43 -07:00
Aleix Conchillo Flaqué
111e59a7b1 Apply the same span-scope fix to traced_stt
@traced_stt had the same root issue as @traced_tts: the span lifetime
was tied to a per-transcript handler call, which doesn't match the
operation we want to trace. Now uses the __set_name__ pattern to
install:

- A push_frame wrapper that drives one STT span per finalized
  TranscriptionFrame. The span is anchored at speech start
  (VADUserStartedSpeakingFrame.timestamp - start_secs) but lazy-opened
  on the first TranscriptionFrame. Opening earlier (on VAD or
  UserStartedSpeakingFrame) races with TurnTraceObserver._handle_turn_started,
  which runs as a background task via _call_event_handler (sync=False),
  so the span would end up parented to the previous turn. Deferring
  the open to the first TranscriptionFrame avoids that race because
  STT only emits transcripts well after the turn observer has set
  the current turn's context.

- A stop_ttfb_metrics wrapper that closes the span on the TTFB-timeout
  path (called with end_time != None from stt_service.py:566). The
  span is marked stt.timed_out=True and its end_time is pinned to
  the timeout's end_time (= _last_transcript_time) so the duration
  reflects when STT actually stopped responding, not when the timeout
  fired.

Span lifecycle:
- Open: lazy on first TranscriptionFrame of a segment.
- Close (success): finalized=True attaches metrics.ttfb and closes
  the span. Multiple finalized transcripts in a single turn produce
  multiple spans.
- Close (timeout): stop_ttfb_metrics(end_time=...) closes with
  stt.timed_out=True.
- Close (orphan): UserStoppedSpeakingFrame closes any still-open
  span with stt.incomplete=True (covers turns where no finalized
  transcript and no timeout fired).

No changes required outside service_decorators.py — stt_service.py
and every per-service file are untouched.
2026-05-12 20:10:43 -07:00
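
The span lifecycle listed in the commit message above can be read as a small state machine. The sketch below is an illustration under assumptions, not the decorator's real code: `SttSpanTracker`, `Span`, and the method names are hypothetical, timestamps are plain floats, and resetting the speech-start anchor after each span is a simplification of this sketch.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Span:
    start_time: float
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)


class SttSpanTracker:
    """Hypothetical tracker mirroring the open/close rules in the commit."""

    def __init__(self) -> None:
        self.closed = []
        self._speech_start: Optional[float] = None
        self._open: Optional[Span] = None

    def on_speech_started(self, vad_timestamp: float, start_secs: float) -> None:
        # Only remember the anchor; the span is opened lazily later.
        self._speech_start = vad_timestamp - start_secs

    def on_transcription(self, now: float, finalized: bool,
                         ttfb: Optional[float] = None) -> None:
        if self._open is None:
            # Lazy open on the first transcript, anchored at speech start.
            start = self._speech_start if self._speech_start is not None else now
            self._open = Span(start_time=start)
            self._speech_start = None
        if finalized:
            if ttfb is not None:
                self._open.attributes["metrics.ttfb"] = ttfb
            self._close(now)

    def on_ttfb_timeout(self, end_time: float) -> None:
        if self._open is not None:
            self._open.attributes["stt.timed_out"] = True
            self._close(end_time)  # pin to when STT stopped responding

    def on_user_stopped_speaking(self, now: float) -> None:
        if self._open is not None:
            self._open.attributes["stt.incomplete"] = True
            self._close(now)

    def _close(self, end_time: float) -> None:
        self._open.end_time = end_time
        self.closed.append(self._open)
        self._open = None
```

Each finalized transcript closes one span, so a turn with several finalized transcripts yields several spans, as the commit message describes.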
Aleix Conchillo Flaqué
079282d140 Add changelog for #4467 2026-05-12 20:10:43 -07:00
Aleix Conchillo Flaqué
0ccdd808e6 Fix traced_tts so metrics.ttfb reflects the real TTFB
Previously @traced_tts scoped the span to the lifetime of run_tts(). For
streaming TTS services run_tts() returns as soon as the synthesis request
is sent, long before audio chunks arrive, so:

- The span duration measured the WebSocket-send time, not synthesis time.
- The first synthesis recorded the WS-send duration as metrics.ttfb (via
  the in-progress fallback in FrameProcessorMetrics.ttfb).
- Subsequent syntheses recorded the previous call's TTFB on the current
  span (off-by-one).

The decorator now uses a __set_name__ descriptor to wrap the owning
class's setup() at class definition time. setup() installs per-instance
patches on create_audio_context, append_to_audio_context,
remove_audio_context, on_audio_context_completed, and
reset_active_audio_context. These patches own the span lifetime:

- create_audio_context: open span, set baseline attributes.
- append_to_audio_context: record metrics.ttfb on the first
  TTSAudioRawFrame (when stop_ttfb_metrics has produced a real value),
  end span on appended TTSStoppedFrame.
- on_audio_context_completed: end span on natural completion (handles
  services that auto-push TTSStoppedFrame via push_frame, bypassing
  append_to_audio_context).
- remove_audio_context: safety net for explicit removal paths.
- reset_active_audio_context: interruption hook (always reached from
  _handle_interruption); marks the span tts.interrupted=true only when
  nothing else has closed it.

The run_tts wrapper now only attaches per-call attributes (text,
metrics.character_count) to the already-open span. No changes required
in tts_service.py or in any of the per-service files.
2026-05-12 20:10:43 -07:00
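
The `__set_name__` technique this commit relies on can be shown in isolation. The sketch below is a simplified, synchronous illustration: `traced_method`, `FakeTTS`, and `_install_patches` are hypothetical names, the "span" is just a list of opened context IDs, and the real `setup()` in the library may differ (for example, by being asynchronous).

```python
import functools


def _install_patches(instance) -> None:
    """Wrap one method on this instance only (hypothetical helper)."""
    instance.opened_contexts = []
    original_create = instance.create_audio_context  # bound method

    def create_audio_context(context_id):
        # A real tracer would open a span here; the sketch records the ID.
        instance.opened_contexts.append(context_id)
        return original_create(context_id)

    instance.create_audio_context = create_audio_context


class traced_method:
    """Decorator that hooks the owning class's setup() at class definition."""

    def __init__(self, func):
        self._func = func

    def __set_name__(self, owner, name):
        # Put the undecorated function back so it binds like a normal method.
        setattr(owner, name, self._func)
        original_setup = owner.setup

        @functools.wraps(original_setup)
        def setup(instance, *args, **kwargs):
            result = original_setup(instance, *args, **kwargs)
            _install_patches(instance)
            return result

        owner.setup = setup


class FakeTTS:
    def setup(self):
        self.ready = True

    def create_audio_context(self, context_id):
        return f"context:{context_id}"

    @traced_method
    def run_tts(self, text):
        return text.upper()
```

Because the hook is installed when the class body is evaluated, the service class itself needs no edits beyond the decorator, which matches the commit's note that no per-service files changed.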
Paul Kompfner
863a1bf177 Add changelog for #4474 2026-05-12 16:04:12 -04:00
Paul Kompfner
58333b2705 Extend cancel_on_interruption=False to InworldRealtimeLLMService (best-effort)
Same async-tool routing approach as #4441: detect async-tool messages in
the LLM context, deliver the final result via the formal tool-result
channel.

Caveat: as of this writing, Inworld Realtime doesn't appear to handle
the resulting delayed tool result reliably, so the routing is
best-effort and the service emits a one-time warning when async-tool
messages are seen. Streamed intermediate results remain unsupported.

Also adds function calling to the realtime-inworld.py example, and
softens the Inworld mention in the #4447 changelog now that the
exclusion is being closed.
2026-05-12 16:03:34 -04:00
TimTk
ecaff1d1eb Fix changelog fragment number 2026-05-12 22:21:59 +03:00
TimTk
9b55d4ddd4 Add support for Inworld TTS v2 fields 2026-05-12 22:13:09 +03:00
75 changed files with 2466 additions and 2136 deletions

@@ -7,6 +7,494 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->
## [1.2.0] - 2026-05-14
### Added
- Added a `session_id` field to `RunnerArguments` so bots can log or trace a
per-session identifier in local development the same way they can in Pipecat
Cloud. The development runner now mints a UUID at every construction site,
and paths that already returned a `sessionId` to the caller (Daily `/start`,
dial-in webhook) share that same UUID with the runner args instead of
generating two. The SmallWebRTC `/api/offer` endpoint also accepts an
optional `session_id` query parameter so the `/sessions/{session_id}/...`
proxy can thread it through.
(PR [#4385](https://github.com/pipecat-ai/pipecat/pull/4385))
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService`
for controlling Cartesia's server-side text buffering. When unset, Pipecat
picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE`
mode (custom buffering — avoids stacking client-side aggregation on top of
Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode
(Cartesia's managed buffering applies). Pass an explicit value (0-5000ms) to
override.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and
`DeepgramHttpTTSService` so callers can opt out of the Deepgram Model
Improvement Program. When set, the value is forwarded to Deepgram as a query
parameter on the speak request. Defaults to `None`, which preserves the
existing behavior. See https://dpgr.am/deepgram-mip for pricing implications
before enabling.
(PR [#4400](https://github.com/pipecat-ai/pipecat/pull/4400))
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set
via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that
appends a developer-role message to the context whenever `LLMSetToolsFrame`
changes the set of advertised standard tools. Helps the LLM stay coherent
across mid-conversation tool changes, mitigating several flavors of
tool-call-related hallucination: calling tools that have been removed,
avoiding tools that have been re-added, and hallucinating output (made-up
answers or tool-call-shaped non-tool-calls) when tools are unavailable.
(PR [#4404](https://github.com/pipecat-ai/pipecat/pull/4404))
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in
`pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the
inference-triggered event and suppresses `on_user_turn_stopped`, leaving
finalization to another strategy in the chain such as
`LLMTurnCompletionUserTurnStopStrategy`.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop`,
a generic stop strategy that finalizes the user turn whenever a
`UserTurnInferenceCompletedFrame` arrives, regardless of which component
produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base;
future producers (Flux, custom end-of-turn classifiers, etc.) can use the
base directly or subclass it to add producer-specific setup.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Added `on_user_turn_inference_triggered`, a new event on the user turn
controller, processor, aggregator and stop strategies that fires when a
strategy has enough signal to start LLM inference. By default it fires
together with `on_user_turn_stopped`; a gating strategy can fire only the
inference-triggered event and defer finalization to a peer.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Added `FilterIncompleteUserTurnStrategies` in
`pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization
that wraps the detector chain with `deferred(...)` and appends
`LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case:
`user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass
`config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`.
When installed, the strategy gates `on_user_turn_stopped` on a
`UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by
any component that can judge turn completeness — e.g. the
`UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout`
provides a safety net if no completion frame ever arrives.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Added first-class RTVI support for the UI Agent Protocol:
  - Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server
    messages, plus `ui-command` and `ui-task` server-to-client messages, with
    paired `*Data` / `*Message` pydantic models.
  - Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`,
    `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching
    default handlers live in `@pipecat-ai/client-react`.
  - Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`,
    and `ui-cancel-task` messages.
  - Adds five UI pipeline frames, mirroring the `client-message`
    frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` /
    `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` /
    `UITaskMessage` envelopes, while the processor pushes inbound
    `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame`
    alongside `on_ui_message`.
  - Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.
(PR [#4407](https://github.com/pipecat-ai/pipecat/pull/4407))
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore
processor now resolve credentials via the standard boto3 provider chain (EC2
instance profiles, EKS pod roles / IRSA, ECS task roles, SSO,
`~/.aws/credentials`) when explicit credentials and `AWS_*` environment
variables are absent. Services running with IAM roles no longer need to
export static credentials.
(PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can
bias transcription for both file-based and realtime transcription.
(PR [#4426](https://github.com/pipecat-ai/pipecat/pull/4426))
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and
`DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum
silence duration before the watchdog sends a silence packet to prevent
dangling turns. The actual threshold is `max(chunk_duration * 2,
watchdog_min_timeout)`, so it also adapts automatically to the audio chunk
size in use.
(PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on
models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini
2.x); the conversation now continues while the tool runs. On models that
don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time
warning explaining the limitation. (Note: an intermittent 1008 error can
occasionally fire on Gemini 2.5 during long-running tool calls; we
auto-reconnect.)
(PR [#4448](https://github.com/pipecat-ai/pipecat/pull/4448))
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition
using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint.
Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is
VAD-aware, and automatically reconnects on error.
(PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))
- Added NVIDIA Magpie TTS services via AWS SageMaker:
`NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM
back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream
with full interruption support via `InterruptibleTTSService`).
(PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`,
for use with reasoning-capable Realtime models such as `gpt-realtime-2`.
(PR [#4470](https://github.com/pipecat-ai/pipecat/pull/4470))
- Inworld TTS updates:
  - Added `delivery_mode` setting (`STABLE`/`BALANCED`/`CREATIVE`) to
    `InworldTTSService` and `InworldHttpTTSService`, enabling the
    stability-vs-creativity tradeoff in `inworld-tts-2`.
  - Added language support to `InworldTTSService` and
    `InworldHttpTTSService`. The `language` setting is now forwarded to the API,
    and a new `language_to_inworld_language()` helper normalizes Pipecat
    `Language` enums to Inworld's BCP-47 locale tags.
(PR [#4473](https://github.com/pipecat-ai/pipecat/pull/4473))
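
As an illustration of the `watchdog_min_timeout` entry in this section, the threshold formula can be written as a standalone function. This is a sketch only: the function name is hypothetical and it assumes `chunk_duration` is expressed in seconds, like the `0.5` default.

```python
def watchdog_silence_threshold(chunk_duration: float,
                               watchdog_min_timeout: float = 0.5) -> float:
    """Silence duration (seconds) before the watchdog sends a silence packet.

    Mirrors max(chunk_duration * 2, watchdog_min_timeout) from the entry.
    """
    return max(chunk_duration * 2, watchdog_min_timeout)
```

With 20 ms chunks the floor of 0.5 s applies; with 400 ms chunks the threshold grows to 0.8 s, so a single late chunk is not mistaken for silence.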
### Changed
- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the
generally available `tts-rt-v1`.
(PR [#4386](https://github.com/pipecat-ai/pipecat/pull/4386))
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16`
to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the
`use_normalized_timestamps` and `max_buffer_delay_ms` fields.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead
of the deprecated `use_original_timestamps` field. Word timestamps now
reflect what was actually spoken (post text-normalization and
pronunciation-dictionary substitution), matching the convention Pipecat uses
for ElevenLabs. This is a behavior change for `sonic-3` users, who were
previously receiving timestamps tied to the input transcript.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- Broadened `tool_resources` to `app_resources` for easy access not just in
tool handlers but in other places like custom `FrameProcessor`s. Three
changes: a rename (`tool_resources` → `app_resources`), a new `app_resources`
property on `PipelineTask`, and a new `pipeline_task` property on
`FrameProcessor`. Tool handlers now read `params.app_resources`; custom
processors read `self.pipeline_task.app_resources`. The previous
`tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and
`FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit
`DeprecationWarning`s.
(PR [#4395](https://github.com/pipecat-ai/pipecat/pull/4395))
- Lowered the per-message log in
`SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App
messages can be high-frequency and were noisy at debug level; set the loguru
level to `TRACE` to see them again.
(PR [#4397](https://github.com/pipecat-ai/pipecat/pull/4397))
- Changed the default model for `GrokRealtimeLLMService` to
`grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The
previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is
being removed.
(PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to
`inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`,
`InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing
users can pin the prior model explicitly via the `model`/`tts_model`
argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid
model IDs.
(PR [#4422](https://github.com/pipecat-ai/pipecat/pull/4422))
- Changed the default model for `GrokLLMService` from `grok-3` to
`grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.
(PR [#4429](https://github.com/pipecat-ai/pipecat/pull/4429))
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic:
`max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms.
This prevents false silence injections when large audio chunks are sent at
lower frequency.
(PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the
turn is complete (on `on_turn_context_completed`) rather than waiting until
all audio has finished playing back. The `isFinal` message from ElevenLabs is
now used to signal `TTSStoppedFrame` and clean up the audio context,
improving turn transition timing.
(PR [#4433](https://github.com/pipecat-ai/pipecat/pull/4433))
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio
encoding by default, which returns audio bytes without headers.
(PR [#4446](https://github.com/pipecat-ai/pipecat/pull/4446))
- Moved `create_task`, `cancel_task`, the `task_manager` property, and
`setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom
`BaseObject` subclasses (turn strategies, controllers, etc.) now inherit
these methods directly instead of reimplementing the task manager wiring.
Owners propagate the task manager to their child `BaseObject`s via `await
child.setup(task_manager)`.
(PR [#4449](https://github.com/pipecat-ai/pipecat/pull/4449))
- Changed the default OpenAI Realtime input audio transcription model from
`gpt-4o-transcribe` to `gpt-realtime-whisper` for both
`OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does
not accept the `prompt` parameter; if a prompt is supplied alongside
`gpt-realtime-whisper`, it is dropped automatically and a warning is logged.
To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or
`"gpt-4o-mini-transcribe"`).
(PR [#4450](https://github.com/pipecat-ai/pipecat/pull/4450))
- Updated the default model for `CartesiaTTSService` and
`CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.
(PR [#4462](https://github.com/pipecat-ai/pipecat/pull/4462))
- Changed the default model for `OpenAIRealtimeLLMService` from
`gpt-realtime-1.5` to `gpt-realtime-2`.
(PR [#4472](https://github.com/pipecat-ai/pipecat/pull/4472))
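
The deprecated-alias pattern described in the `tool_resources` to `app_resources` entry of this section generally looks like the sketch below. `ExampleTask` is a hypothetical stand-in, not `PipelineTask`; only the two property names and the use of `DeprecationWarning` come from the entry.

```python
import warnings


class ExampleTask:
    """Hypothetical class showing a renamed property with a deprecated alias."""

    def __init__(self, app_resources=None):
        self._app_resources = app_resources

    @property
    def app_resources(self):
        return self._app_resources

    @property
    def tool_resources(self):
        # Old name keeps working but points callers at the new one.
        warnings.warn(
            "`tool_resources` is deprecated; use `app_resources` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.app_resources
```

`stacklevel=2` attributes the warning to the caller's line rather than to the property body, which makes the deprecated call site easier to find.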
### Deprecated
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use
`user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add
`LLMTurnCompletionUserTurnStopStrategy` to a custom
`user_turn_strategies.stop`) instead. Setting the legacy flag still works for
one release: the aggregator emits a `DeprecationWarning` and rewires the
strategies as if you had passed `FilterIncompleteUserTurnStrategies`
directly.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the
`create_file_resampler()` / `create_stream_resampler()` factories).
Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class
will be removed in Pipecat 2.0 along with the default `resampy` and `numba`
dependencies.
(PR [#4428](https://github.com/pipecat-ai/pipecat/pull/4428))
### Fixed
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as
`ErrorFrame`s. The latest API emits a `flush_done` per transcript when
server-side buffering is disabled; Pipecat now consumes them silently since
each turn already has its own `context_id`.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`,
`VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance
(e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both
the class and an instance.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200
response — one with the API's error text and a second, less informative
"Unknown error" frame from the outer exception handler. It now pushes a
single frame that includes the HTTP status code and returns cleanly.
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally
for user turn stop strategies. It is now only imported when
`default_user_turn_stop_strategies()` is called. This improves startup time
and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning
when the default stop strategies are not used.
(PR [#4393](https://github.com/pipecat-ai/pipecat/pull/4393))
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was
stored in `Settings` but never sent to xAI, so every session silently fell
back to xAI's server-side default. The model is now passed via the `?model=`
query parameter on the WebSocket URL as xAI's Voice Agent API requires.
(PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))
- Fixed `on_user_turn_stopped` firing prematurely when
`filter_incomplete_user_turns` was enabled. The event now fires only after
the LLM confirms the user turn is complete (`✓`); previously the smart-turn
detector's tentative stop was bubbling up before the LLM had a chance to veto
it, causing observers, transcript appenders and UI indicators to receive an
early — and sometimes duplicated — signal.
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting
across two assistant messages in the LLM context and not surfacing in
`on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted
at the end of a TTS context now carries a PTS just past the last word so it
can't overtake clock-queued `TTSTextFrame`s in the transport's output, and
`LLMAssistantAggregator` now triggers
`on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the
frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting
transcripts).
(PR [#4414](https://github.com/pipecat-ai/pipecat/pull/4414))
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged
words (e.g. `bookLook`) when using Flash models. Flash often splits sentences
mid-stream into alignment chunks that begin with a real inter-word space, but
the previous fix unconditionally stripped that space from every chunk.
Leading spaces are now stripped only on the first alignment chunk of an
utterance, so subsequent chunks correctly flush partial words across
boundaries.
(PR [#4415](https://github.com/pipecat-ai/pipecat/pull/4415))
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor
erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
was set in the environment. The half-populated kwargs are no longer forwarded
to aioboto3; partial env-var configurations now fall through to the boto3
credential chain like fully-unset configurations do.
(PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing
romanized/normalized text to the LLM context. With non-Latin input (e.g.,
Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao
!` instead of `你好!`), which then degraded subsequent LLM turns. The services
now consume `alignment` by default and only switch to `normalizedAlignment` /
`normalized_alignment` when `pronunciation_dictionary_locators` is configured
(where `alignment` has overlapping restarts that produce duplicated/garbled
words, per #4316). Both fields are read with preferred-with-fallback
semantics since each is nullable per the API schema.
(PR [#4424](https://github.com/pipecat-ai/pipecat/pull/4424))
- Fixed a deadlock in `TTSService` that could permanently stall pipeline
processing when all three conditions occurred together:
`pause_frame_processing=True`, an interruption arrived before any TTS audio
was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`,
`FunctionCallResultFrame`) was in the processing queue at that moment. The
process task would block on `__process_event.wait()` indefinitely because
`BotStoppedSpeakingFrame` never arrives (no audio was played) and the
interruption handler did not resume processing. Affects services using
`pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and
ResembleAI.
(PR [#4431](https://github.com/pipecat-ai/pipecat/pull/4431))
- Fixed interruptions being delayed when a slow non-uninterruptible frame was
processing and an uninterruptible frame was waiting in the queue. The bot
would stall until the slow frame finished instead of cancelling it
immediately on interruption.
(PR [#4434](https://github.com/pipecat-ai/pipecat/pull/4434))
- Fixed `TTSService` dropping uninterruptible frames (e.g.
`FunctionCallResultFrame`) from its internal serialization queue when an
interruption occurs. Previously, the queue was recreated on every
interruption, silently discarding any queued frames. The queue is now reset
instead of recreated, preserving uninterruptible frames so they are always
delivered downstream.
(PR [#4435](https://github.com/pipecat-ai/pipecat/pull/4435))
- Fixed a race condition in the Daily transport that caused `AttributeError:
'NoneType' object has no attribute 'send_app_message'` when tearing down a
pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the
same `DailyTransportClient` and both call `cleanup()`, which was releasing
the underlying `CallClient` on the first call — leaving the second caller
with a `None` client.
(PR [#4440](https://github.com/pipecat-ai/pipecat/pull/4440))
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService`
and `OpenAIRealtimeLLMService`. These services previously honored the flag by
simply not cancelling in-flight function calls on interruption; the
introduction of the new async-tool mechanism (which threads
started/intermediate/final messages through the LLM context) broke that path
because the realtime services didn't know how to interpret those messages.
Note that new-style streamed intermediate results
(`FunctionCallResultProperties(is_final=False)`) are not supported on these
realtime services. Similar fixes for other impacted realtime services are
forthcoming.
(PR [#4441](https://github.com/pipecat-ai/pipecat/pull/4441))
- Fixed two misspelled Gemini TTS voice names in
`GeminiTTSService.AVAILABLE_VOICES`.
(PR [#4443](https://github.com/pipecat-ai/pipecat/pull/4443))
- Extended the `cancel_on_interruption=False` regression fix to
`GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and
`UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in
#4441 (each service detects async-tool messages in the LLM context and routes
the final result to its formal tool-result channel; Azure inherits
transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different
approach because its API freezes the conversation between
`client_tool_invocation` and the matching `client_tool_result` — for
async-registered functions it now ships a placeholder `client_tool_result`
immediately when the function is invoked (to unfreeze the conversation), then
injects the real result as user-side text once the tool finishes. Streamed
intermediate results (`FunctionCallResultProperties(is_final=False)`) are
still not supported on any of these realtime services. `GeminiLiveLLMService`
and `InworldRealtimeLLMService` are excluded for now: Gemini Live's
async-tool path needs deeper investigation, and Inworld tool calling needs to
be sorted out first.
(PR [#4447](https://github.com/pipecat-ai/pipecat/pull/4447))
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses
(observed with `gpt-realtime-2`). A single response can now contain more than
one audio item, and the first item's `audio.done` may arrive after the second
item's deltas have started. Deltas still arrive strictly in playback order,
so we continue to forward them as received (matching OpenAI's reference
implementation). The fix removes spurious warnings, ensures truncation always
targets the latest audio item, and emits a single bracketing
`TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is
now pushed on `response.done`).
(PR [#4465](https://github.com/pipecat-ai/pipecat/pull/4465))
- Fixed missing `output` attribute on LLM OpenTelemetry spans when the LLM call
is interrupted mid-stream.
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
- Fixed incorrect `metrics.ttfb` on STT OpenTelemetry spans, and parented them
to the current turn span.
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
- Fixed incorrect `metrics.ttfb` on TTS OpenTelemetry spans for streaming
services.
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
- Extended the `cancel_on_interruption=False` regression fix to
`InworldRealtimeLLMService`. Uses the same approach as in #4441 (the service
detects async-tool messages in the LLM context and routes the final result to
its formal tool-result channel). Note: as of this writing, Inworld Realtime
doesn't appear to handle the resulting delayed tool result reliably — the
routing is best-effort and the service surfaces a one-time warning when
async-tool messages are seen. Streamed intermediate results
(`FunctionCallResultProperties(is_final=False)`) are still not supported on
this realtime service. (Inworld was excluded from #4447 pending resolution of
an unrelated tool-calling issue, which turned out to be an account-level
matter.)
(PR [#4474](https://github.com/pipecat-ai/pipecat/pull/4474))
- Fixed Cartesia TTS Korean word timestamps to use normal spacing rules,
preserving word boundaries and per-word timestamp alignment during downstream
aggregation.
(PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))
- Fixed Cartesia TTS Chinese and Japanese timestamp grouping to preserve
provider text spacing, avoiding artificial spaces when timestamp groups are
reassembled downstream.
(PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))
- Fixed `SonioxSTTService` final transcription frames missing detected language
metadata when Soniox returns token-level language annotations.
(PR [#4482](https://github.com/pipecat-ai/pipecat/pull/4482))
- Fixed Soniox final transcription language detection to use the most common
recognized token language, avoiding mislabeling an utterance when the last
token is tagged with a different language.
(PR [#4495](https://github.com/pipecat-ai/pipecat/pull/4495))
- Fixed dropped audio in streaming TTS services whose wire protocol doesn't
echo `context_id` back on incoming audio (Sarvam, Smallest, Soniox, Inworld,
and others). Previously, audio that arrived between contexts or at the very
start of a turn was tagged with `context_id=None` and silently dropped with
an "unable to append audio to context: no context ID provided" debug log.
`TTSService.get_active_audio_context_id()` now falls back to the
synthesis-side `_turn_context_id` when the playback cursor isn't set yet.
(PR [#4497](https://github.com/pipecat-ai/pipecat/pull/4497))
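
The fallback described in the last entry above can be sketched as follows. This is a toy model, not the real `TTSService`: the class name `AudioContextCursors` is hypothetical, and the two attributes simply mirror the playback-side and synthesis-side cursors the entry mentions.

```python
from typing import Optional


class AudioContextCursors:
    """Toy model of the two cursors (hypothetical, not the real TTSService)."""

    def __init__(self) -> None:
        # Playback-side cursor: set later, once playback picks up a context.
        self._playing_context_id: Optional[str] = None
        # Synthesis-side cursor: set as soon as the turn's context is created.
        self._turn_context_id: Optional[str] = None

    def get_active_audio_context_id(self) -> Optional[str]:
        # Prefer the playback cursor; otherwise fall back to the turn cursor
        # so audio arriving early in a turn is still tagged with a context.
        if self._playing_context_id is not None:
            return self._playing_context_id
        return self._turn_context_id
```

With only `_turn_context_id` set, the accessor returns that value instead of `None`, which is the gap the fix is meant to cover.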
### Security
- Fixed a path traversal issue in the development runner's
`/files/{filename:path}` download endpoint. Previously, when the runner was
started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could
escape the configured folder because `%2F`-encoded separators bypassed
Starlette's path normalisation. The endpoint now resolves the joined path and
rejects any filename that escapes the allowed base with a 403, and also
returns 404 (instead of an implicit `null` 200) when `--folder` is unset.
(PR [#4417](https://github.com/pipecat-ai/pipecat/pull/4417))
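
The "resolve, then check containment" pattern behind this fix can be sketched in plain Python. This is a minimal illustration, not the runner's actual code: `resolve_download_path` is a hypothetical helper, the filename is assumed to arrive already percent-decoded, and a real endpoint would map the `PermissionError` to the 403 response.

```python
from pathlib import Path


def resolve_download_path(base_dir: str, filename: str) -> Path:
    """Join filename onto base_dir and refuse anything that escapes it.

    Hypothetical helper; requires Python 3.9+ for Path.is_relative_to.
    """
    base = Path(base_dir).resolve()
    # resolve() collapses ".." segments, so the check below sees the real target.
    candidate = (base / filename).resolve()
    if not candidate.is_relative_to(base):
        raise PermissionError(f"{filename!r} escapes the allowed folder")
    return candidate
```

Comparing resolved paths by their components (rather than with a string prefix test) also rejects sibling folders such as `/srv/files2` when the base is `/srv/files`.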
## [1.1.0] - 2026-04-27
### Added

@@ -1 +0,0 @@
- Added a `session_id` field to `RunnerArguments` so bots can log or trace a per-session identifier in local development the same way they can in Pipecat Cloud. The development runner now mints a UUID at every construction site, and paths that already returned a `sessionId` to the caller (Daily `/start`, dial-in webhook) share that same UUID with the runner args instead of generating two. The SmallWebRTC `/api/offer` endpoint also accepts an optional `session_id` query parameter so the `/sessions/{session_id}/...` proxy can thread it through.

@@ -1 +0,0 @@
- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the generally available `tts-rt-v1`.

@@ -1 +0,0 @@
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService` for controlling Cartesia's server-side text buffering. When unset, Pipecat picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE` mode (custom buffering, which avoids stacking client-side aggregation on top of Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode (Cartesia's managed buffering applies). Pass an explicit value (0 to 5000 ms) to override.
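
The default selection described above might look roughly like this. It is a sketch under assumptions: `TextAggregationMode` and `resolve_max_buffer_delay_ms` are hypothetical stand-ins, and `None` is used here to mean "leave the field unset so Cartesia's managed buffering applies".

```python
from enum import Enum
from typing import Optional


class TextAggregationMode(Enum):
    """Hypothetical stand-in for Pipecat's text aggregation modes."""

    SENTENCE = "sentence"
    TOKEN = "token"


def resolve_max_buffer_delay_ms(
    mode: TextAggregationMode, explicit: Optional[int] = None
) -> Optional[int]:
    # An explicit value always wins over the mode-based default.
    if explicit is not None:
        return explicit
    # SENTENCE mode already buffers client-side, so disable server buffering.
    if mode is TextAggregationMode.SENTENCE:
        return 0
    # TOKEN mode: return None, meaning the field is not sent at all.
    return None
```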

@@ -1 +0,0 @@
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16` to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the `use_normalized_timestamps` and `max_buffer_delay_ms` fields.

@@ -1 +0,0 @@
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead of the deprecated `use_original_timestamps` field. Word timestamps now reflect what was actually spoken (post text-normalization and pronunciation-dictionary substitution), matching the convention Pipecat uses for ElevenLabs. This is a behavior change for `sonic-3` users, who were previously receiving timestamps tied to the input transcript.

@@ -1 +0,0 @@
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200 response — one with the API's error text and a second, less informative "Unknown error" frame from the outer exception handler. It now pushes a single frame that includes the HTTP status code and returns cleanly.

@@ -1 +0,0 @@
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`, `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both the class and an instance.
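
The failure mode is a general Python one and is easy to reproduce. In this sketch the class names are hypothetical and the `<spell>` tag format is only illustrative. A helper defined in a class body without `self` works when called on the class, but calling it on an instance passes the instance as an extra positional argument.

```python
class BrokenTagHelpers:
    # Plain function in a class body: fine via the class, but an instance
    # call passes the instance as the first argument and raises TypeError.
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


class FixedTagHelpers:
    # @staticmethod stops Python from binding the instance to the call.
    @staticmethod
    def SPELL(text: str) -> str:
        return f"<spell>{text}</spell>"


try:
    BrokenTagHelpers().SPELL("hi")
except TypeError as exc:
    print(f"instance call failed: {exc}")

print(FixedTagHelpers().SPELL("hi"))
```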

@@ -1 +0,0 @@
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as `ErrorFrame`s. The latest API emits a `flush_done` per transcript when server-side buffering is disabled; Pipecat now consumes them silently since each turn already has its own `context_id`.

@@ -1 +0,0 @@
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally for user turn stop strategies. It is now only imported when `default_user_turn_stop_strategies()` is called. This improves startup time and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning when the default stop strategies are not used.

@@ -1 +0,0 @@
- Broadened `tool_resources` to `app_resources` for easy access not just in tool handlers but in other places like custom `FrameProcessor`s. Three changes: a rename (`tool_resources` to `app_resources`), a new `app_resources` property on `PipelineTask`, and a new `pipeline_task` property on `FrameProcessor`. Tool handlers now read `params.app_resources`; custom processors read `self.pipeline_task.app_resources`. The previous `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit `DeprecationWarning`s.
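
A deprecated alias of this kind is commonly written as a property that warns and forwards to the new name. The sketch below shows that general pattern only. `TaskLike` is a hypothetical class, and Pipecat's actual implementation may differ.

```python
import warnings
from typing import Any


class TaskLike:
    """Hypothetical object exposing a new name plus a deprecated alias."""

    def __init__(self, app_resources: Any = None) -> None:
        self._app_resources = app_resources

    @property
    def app_resources(self) -> Any:
        return self._app_resources

    @property
    def tool_resources(self) -> Any:
        # Old name keeps working but points callers at the new one.
        warnings.warn(
            "`tool_resources` is deprecated, use `app_resources` instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._app_resources
```

`stacklevel=2` attributes the warning to the caller's line, which makes the deprecated access easier to find.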

@@ -1 +0,0 @@
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.

@@ -1 +0,0 @@
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model Improvement Program. When set, the value is forwarded to Deepgram as a query parameter on the speak request. Defaults to `None`, which preserves the existing behavior. See https://dpgr.am/deepgram-mip for pricing implications before enabling.

@@ -1 +0,0 @@
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.

@@ -1 +0,0 @@
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.
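
Attaching the model as a `?model=` query parameter can be done with the standard library. The helper name and the base URL below are placeholders for illustration and are not xAI's real endpoint.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_model_param(base_url: str, model: str) -> str:
    """Return base_url with model=<model> set in its query string."""
    parts = urlsplit(base_url)
    # Keep any query parameters that are already present on the URL.
    query = dict(parse_qsl(parts.query))
    query["model"] = model
    return urlunsplit(parts._replace(query=urlencode(query)))


# Placeholder host, used only to show the shape of the resulting URL.
print(with_model_param("wss://example.invalid/v1/realtime", "grok-voice-think-fast-1.0"))
```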

@@ -1 +0,0 @@
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.

@@ -1 +0,0 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.
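
The "gate on a completion signal, with a timeout as a safety net" idea can be illustrated with plain `asyncio`. This is not the strategy's implementation: `wait_for_turn_completion` is a hypothetical helper, and an `asyncio.Event` stands in for the arrival of a `UserTurnInferenceCompletedFrame`.

```python
import asyncio


async def wait_for_turn_completion(
    completed: asyncio.Event, finalization_timeout: float
) -> str:
    """Wait for the completion signal, giving up after the timeout."""
    try:
        await asyncio.wait_for(completed.wait(), timeout=finalization_timeout)
        return "completed"
    except asyncio.TimeoutError:
        # No completion signal arrived in time: finalize anyway.
        return "timeout"
```

Either outcome would lead to the turn being finalized. The timeout only bounds how long finalization can be held back.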

@@ -1 +0,0 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.

@@ -1 +0,0 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.

@@ -1 +0,0 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.

@@ -1 +0,0 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.

@@ -1 +0,0 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.

@@ -1 +0,0 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.

@@ -1,6 +0,0 @@
- Added first-class RTVI support for the UI Agent Protocol:
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
- Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.

@@ -1 +0,0 @@
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).

@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.
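
A simplified model of the bug and the fix, working on plain text chunks rather than the character-level alignment data the services actually receive. The function name and the `strip_every_chunk` switch are hypothetical and exist only to contrast the two behaviors.

```python
from typing import List


def merge_alignment_chunks(chunks: List[str], strip_every_chunk: bool = False) -> str:
    """Concatenate chunks, stripping leading spaces per the chosen policy."""
    text = ""
    for index, chunk in enumerate(chunks):
        # Fixed policy: strip only the first chunk of the utterance.
        # Old policy (strip_every_chunk=True): strip every chunk.
        if strip_every_chunk or index == 0:
            chunk = chunk.lstrip(" ")
        text += chunk
    return text


chunks = [" Take the book", " Look inside."]
print(merge_alignment_chunks(chunks, strip_every_chunk=True))  # words merge
print(merge_alignment_chunks(chunks))  # inter-word space preserved
```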

@@ -1 +0,0 @@
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.

@@ -1 +0,0 @@
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.
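
One way to avoid forwarding half-populated credentials is to build the explicit keyword arguments only when both halves are present. The sketch below is an assumption about the shape of such a check, not Pipecat's code. `explicit_credential_kwargs` is a hypothetical helper, while the keyword names match the `aws_access_key_id` / `aws_secret_access_key` / `aws_session_token` parameters that boto3 sessions accept.

```python
from typing import Dict, Mapping


def explicit_credential_kwargs(env: Mapping[str, str]) -> Dict[str, str]:
    """Return credential kwargs only if both key and secret are set."""
    key = env.get("AWS_ACCESS_KEY_ID")
    secret = env.get("AWS_SECRET_ACCESS_KEY")
    if not (key and secret):
        # Partial or missing configuration: pass nothing, so the SDK's
        # default credential provider chain is used instead.
        return {}
    kwargs = {"aws_access_key_id": key, "aws_secret_access_key": secret}
    token = env.get("AWS_SESSION_TOKEN")
    if token:
        kwargs["aws_session_token"] = token
    return kwargs
```

In use, a caller would pass `os.environ` and splat the result into the session or client constructor.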

@@ -1 +0,0 @@
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.

@@ -1 +0,0 @@
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.

@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.

@@ -1 +0,0 @@
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.

@@ -1 +0,0 @@
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.

@@ -1 +0,0 @@
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.

@@ -1 +0,0 @@
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.

@@ -1 +0,0 @@
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.
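
A worked example of the threshold formula, with durations in seconds. The function name is hypothetical, and the formula and the `0.5` default are taken from the entries above.

```python
def watchdog_silence_threshold(
    chunk_duration: float, watchdog_min_timeout: float = 0.5
) -> float:
    """Silence threshold in seconds: max(chunk_duration * 2, watchdog_min_timeout)."""
    return max(chunk_duration * 2, watchdog_min_timeout)


# 20 ms chunks: twice the chunk is 40 ms, so the 0.5 s floor applies.
print(watchdog_silence_threshold(0.02))
# 400 ms chunks: twice the chunk is 0.8 s, which exceeds the floor.
print(watchdog_silence_threshold(0.4))
```

With small chunks the behavior matches the old fixed 500 ms. The threshold only grows once chunks are longer than half of `watchdog_min_timeout`.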

@@ -1 +0,0 @@
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.

@@ -1 +0,0 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.

@@ -1 +0,0 @@
- Fixed interruptions being delayed when a slow interruptible frame was being processed while an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.

@@ -1 +0,0 @@
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.
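
The difference between recreating and resetting a queue can be sketched with a small in-memory model. Everything here is hypothetical (`QueuedFrame`, `SerializationQueue`, and the assumption that a reset keeps exactly the frames flagged as uninterruptible), and the real queue is asynchronous.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List


@dataclass
class QueuedFrame:
    name: str
    uninterruptible: bool = False


class SerializationQueue:
    """Toy queue whose reset() keeps uninterruptible frames."""

    def __init__(self) -> None:
        self._items: Deque[QueuedFrame] = deque()

    def put(self, frame: QueuedFrame) -> None:
        self._items.append(frame)

    def reset(self) -> None:
        # Drop interruptible frames in place. Recreating the deque instead
        # would silently discard the uninterruptible ones as well.
        kept = [frame for frame in self._items if frame.uninterruptible]
        self._items.clear()
        self._items.extend(kept)

    def drain(self) -> List[str]:
        names = [frame.name for frame in self._items]
        self._items.clear()
        return names
```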

@@ -1 +0,0 @@
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.

@@ -1 +0,0 @@
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.

@@ -1 +0,0 @@
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.

@@ -1 +0,0 @@
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.

@@ -1 +0,0 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.

@@ -1 +0,0 @@
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)

@@ -1 +0,0 @@
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.

@@ -1 +0,0 @@
- Changed the default OpenAI Realtime input audio transcription model from `gpt-4o-transcribe` to `gpt-realtime-whisper` for both `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does not accept the `prompt` parameter; if a prompt is supplied alongside `gpt-realtime-whisper`, it is dropped automatically and a warning is logged. To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or `"gpt-4o-mini-transcribe"`).
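
The "drop the prompt and warn" behavior could be modeled as below. This is only a sketch: the helper name, the `PROMPTLESS_MODELS` set, and the shape of the returned dictionary are assumptions made for illustration, not the services' real settings objects.

```python
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

# Assumption for this sketch: models that reject the `prompt` parameter.
PROMPTLESS_MODELS = {"gpt-realtime-whisper"}


def build_transcription_settings(
    model: str = "gpt-realtime-whisper", prompt: Optional[str] = None
) -> Dict[str, Any]:
    """Build transcription settings, dropping `prompt` where unsupported."""
    settings: Dict[str, Any] = {"model": model}
    if prompt is not None:
        if model in PROMPTLESS_MODELS:
            logger.warning("%s does not accept `prompt`; dropping it", model)
        else:
            settings["prompt"] = prompt
    return settings
```

Pinning `model="gpt-4o-transcribe"` keeps the prompt in the resulting settings, which matches the workaround the entry recommends.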

@@ -1 +0,0 @@
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.

@@ -1 +0,0 @@
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).

@@ -1 +0,0 @@
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.

@@ -1 +0,0 @@
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).

@@ -1 +0,0 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.

@@ -1 +0,0 @@
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.

@@ -1 +0,0 @@
- Added `wait_for_transcript_to_end_user_turn` on `LLMUserAggregatorParams` for pipelines where local turn detection drives a realtime service like Gemini Live. Set it to False to avoid unnecessary latency from transcript delay — the realtime service consumes user audio directly, so we don't need user transcripts in context before it can respond. The option makes it so that (1) turn strategies do not consider user transcripts, letting the user turn end sooner, and (2) user transcripts are then handled by the aggregator: a simple timer gives it time to gather those transcripts after the user turn ends, and once gathered, the aggregator emits a new `on_user_turn_message_finalized` event with the new user context message. The new event also fires in the default mode (coinciding with `on_user_turn_stopped`), so consumers that want the populated user transcript can subscribe to it uniformly. See `examples/realtime/realtime-gemini-live-local-vad.py` for the full pattern.

@@ -71,8 +71,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = QwenLLMService(
api_key=os.environ["QWEN_API_KEY"],
-model="qwen2.5-72b-instruct",
settings=QwenLLMService.Settings(
+model="qwen2.5-72b-instruct",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)

@@ -20,7 +20,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
@@ -71,25 +70,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
@@ -123,23 +107,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")

@@ -28,10 +28,14 @@ Usage:
"""
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import (
TranscriptionLogObserver,
@@ -48,6 +52,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -55,6 +60,43 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
# --- Transport Configuration ---
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
@@ -85,7 +127,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# See: https://docs.inworld.ai/router/introduction
llm = InworldRealtimeLLMService(
api_key=os.environ["INWORLD_API_KEY"],
llm_model="xai/grok-4-1-fast-non-reasoning",
llm_model="openai/gpt-4.1-mini",
voice="Sarah",
settings=InworldRealtimeLLMService.Settings(
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
@@ -97,9 +139,14 @@ Always be helpful and proactive in offering assistance.""",
),
)
# Create context with initial message
# Note: function calling requires a paid Inworld account and a
# function-calling-capable model
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create context with initial message + tools
context = LLMContext(
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
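
Reviewer note: the handler added above can be exercised without Pipecat. The following is a minimal sketch, assuming the handler only reads `arguments` and awaits `result_callback`. `FakeParams` and `run_once` are hypothetical helpers written for this illustration; they are not part of Pipecat's API.

```python
import asyncio
import random
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Awaitable, Callable


@dataclass
class FakeParams:
    """Hypothetical stand-in for FunctionCallParams (illustration only)."""

    arguments: dict[str, Any]
    result_callback: Callable[[dict[str, Any]], Awaitable[None]]


async def fetch_weather(params: FakeParams) -> None:
    # Pick a plausible temperature range for the requested unit.
    fahrenheit = params.arguments["format"] == "fahrenheit"
    temperature = random.randint(60, 85) if fahrenheit else random.randint(15, 30)
    await params.result_callback(
        {
            "conditions": "nice",
            "temperature": temperature,
            "location": params.arguments["location"],
            "format": params.arguments["format"],
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
        }
    )


def run_once(location: str, unit: str) -> dict[str, Any]:
    """Invoke the handler once and return what it passed to the callback."""
    results: list[dict[str, Any]] = []

    async def capture(result: dict[str, Any]) -> None:
        results.append(result)

    asyncio.run(fetch_weather(FakeParams({"location": location, "format": unit}, capture)))
    return results[0]
```

Calling `run_once("San Francisco, CA", "celsius")` returns a dict whose `temperature` falls in the Celsius range and whose `location` and `format` echo the arguments, which is the payload the LLM would receive as the tool result.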


@@ -1,182 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# `turn_detection=False` disables OpenAI Realtime's server-side VAD,
# so this pipeline's local turn detection drives turn boundaries.
# The service then sends `input_audio_buffer.commit` +
# `response.create` when it sees `UserStoppedSpeakingFrame`.
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=False,
),
),
),
),
)
context = LLMContext(
[
{
"role": "developer",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()


@@ -58,6 +58,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Add strict mode to enforce the language hints
language_hints=[Language.EN],
language_hints_strict=True,
enable_language_identification=True,
),
)


@@ -55,7 +55,6 @@ from pipecat.frames.frames import (
LLMThoughtStartFrame,
LLMThoughtTextFrame,
StartFrame,
STTMetadataFrame,
TextFrame,
TranscriptionFrame,
TranslationFrame,
@@ -81,7 +80,6 @@ from pipecat.processors.aggregators.llm_context_summarizer import (
SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
@@ -129,25 +127,6 @@ class LLMUserAggregatorParams:
has been idle (not speaking) for this duration. Set to 0 to disable
idle detection.
vad_analyzer: Voice Activity Detection analyzer instance.
wait_for_transcript_to_end_user_turn: Defaults to True. Set to
False for pipelines where local turn detection drives a
realtime service like Gemini Live. The realtime service
consumes user audio directly, so we don't need user
transcripts in context before it can respond, and waiting
for them is pure latency. When False:
- Turn strategies do not consider user transcripts, so the
user turn ends sooner. ``on_user_turn_stopped`` fires at
the end of turn with empty content. To achieve this,
the aggregator drops ``TranscriptionUserTurnStartStrategy``
from start strategies and flips
``wait_for_transcript=False`` on any stop strategy that
supports it.
- User transcripts are handled by the aggregator: a simple
post-turn transcript wait gives it time to receive them
after the user turn ends, then the aggregator emits a
new ``on_user_turn_message_finalized`` event with the
new user context message.
filter_incomplete_user_turns: [DEPRECATED] Use
``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
instead. When enabled, the LLM outputs a turn-completion
@@ -178,7 +157,6 @@ class LLMUserAggregatorParams:
user_turn_stop_timeout: float = 5.0
user_idle_timeout: float = 0
vad_analyzer: VADAnalyzer | None = None
wait_for_transcript_to_end_user_turn: bool = True
filter_incomplete_user_turns: bool = False
user_turn_completion_config: UserTurnCompletionConfig | None = None
@@ -281,43 +259,13 @@ class LLMAssistantAggregatorParams:
@dataclass
class UserTurnStoppedMessage:
"""A message accompanying ``on_user_turn_stopped`` (end of user turn).
"""A user turn stopped message containing a user transcript update.
With ``wait_for_transcript_to_end_user_turn=True`` (the default),
the user message is finalized at the end of the turn, so
``content`` carries the aggregated transcript. With it set to
False, the aggregator is still in its post-turn transcript wait
at this point, so ``content`` is ``None`` — subscribe to
``on_user_turn_message_finalized`` for the assembled message.
A message in a conversation transcript containing the user content. This is
the aggregated transcript that is then used in the context.
Parameters:
content: The aggregated user transcript, or ``None`` when
``wait_for_transcript_to_end_user_turn=False`` (the
aggregator is still in its post-turn transcript wait at
this point).
timestamp: When the user turn started.
user_id: Optional identifier for the user.
"""
content: str | None
timestamp: str
user_id: str | None = None
@dataclass
class UserMessageFinalizedMessage:
"""A message accompanying ``on_user_turn_message_finalized``.
Fired when the user message has been finalized into the context.
With ``wait_for_transcript_to_end_user_turn=True`` (the default)
this coincides with ``on_user_turn_stopped``. With it set to
False, the aggregator first runs a post-turn transcript wait, so
this event fires later than ``on_user_turn_stopped``.
``content`` is always populated.
Parameters:
content: The aggregated user transcript.
content: The message content/text.
timestamp: When the user turn started.
user_id: Optional identifier for the user.
@@ -578,21 +526,8 @@ class LLMUserAggregator(LLMContextAggregator):
Event handlers available:
- on_user_turn_started: Called when the user turn starts.
- on_user_turn_stopped: Called at the end of turn, with a
``UserTurnStoppedMessage``. With
``wait_for_transcript_to_end_user_turn=True`` (the default),
``message.content`` carries the aggregated transcript. With it
set to False, the aggregator is still in its post-turn transcript
wait at this point, so ``message.content`` is ``None``; subscribe
to ``on_user_turn_message_finalized`` for the assembled message.
- on_user_turn_message_finalized: Called when the user message
has been finalized into the context, with a
``UserMessageFinalizedMessage``. With
``wait_for_transcript_to_end_user_turn=True`` this coincides
with ``on_user_turn_stopped``; with it set to False it fires
later, after the aggregator's post-turn transcript wait window
completes. ``message.content`` is always populated.
- on_user_turn_started: Called when the user turn starts
- on_user_turn_stopped: Called when the user turn ends
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
- on_user_turn_idle: Called when the user has been idle for the configured timeout
- on_user_mute_started: Called when the user becomes muted
@@ -608,10 +543,6 @@ class LLMUserAggregator(LLMContextAggregator):
async def on_user_turn_stopped(aggregator, strategy: BaseUserTurnStopStrategy, message: UserTurnStoppedMessage):
...
@aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(aggregator, strategy: BaseUserTurnStopStrategy, message: UserMessageFinalizedMessage):
...
@aggregator.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(aggregator):
...
@@ -655,14 +586,12 @@ class LLMUserAggregator(LLMContextAggregator):
self._register_event_handler("on_user_turn_started")
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_message_finalized")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_turn_inference_triggered")
self._register_event_handler("on_user_mute_started")
self._register_event_handler("on_user_mute_stopped")
user_provided_strategies = self._params.user_turn_strategies is not None
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
# Deprecated path: translate filter_incomplete_user_turns into
@@ -676,17 +605,6 @@ class LLMUserAggregator(LLMContextAggregator):
)
self._params.user_turn_strategies = user_turn_strategies
# When `wait_for_transcript_to_end_user_turn=False`, mutate the
# user turn strategies so they don't consider user transcripts:
# drop the transcription start strategy, flip
# `wait_for_transcript=False` on stop strategies that support
# it. Loud log if the user passed their own strategies (we're
# overwriting parts of their config); quiet log otherwise.
if not self._params.wait_for_transcript_to_end_user_turn:
self._apply_no_transcript_wait_bundle(
user_turn_strategies, user_provided_strategies=user_provided_strategies
)
self._user_is_muted = False
self._user_turn_start_timestamp = ""
# Full transcript across the user turn. Each
@@ -698,20 +616,6 @@ class LLMUserAggregator(LLMContextAggregator):
# inferences fire before finalization.
self._full_user_turn_aggregation: str | None = None
# Post-turn transcript wait state, used when the aggregator
# waits for transcripts after the user turn ends
# (`_wait_for_post_turn_transcripts == True`):
# `on_user_turn_stopped` has fired with empty content, and the
# aggregator is waiting on `_post_turn_transcript_wait_task`
# before finalizing the user message into context. The wait
# window duration is taken from the last `STTMetadataFrame`
# seen (`STTMetadataFrame.ttfs_p99_latency`), falling back to
# `DEFAULT_TTFS_P99` if no STT service has reported one.
self._post_turn_transcript_wait_strategy: BaseUserTurnStopStrategy | None = None
self._inference_during_post_turn_transcript_wait: bool = False
self._post_turn_transcript_wait_task: asyncio.Task | None = None
self._stt_ttfs_p99_latency: float | None = None
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies,
user_turn_stop_timeout=self._params.user_turn_stop_timeout,
@@ -754,81 +658,6 @@ class LLMUserAggregator(LLMContextAggregator):
self._vad_controller.add_event_handler("on_push_frame", self._on_push_frame)
self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
@property
def _wait_for_post_turn_transcripts(self) -> bool:
"""True when the aggregator runs a post-turn transcript wait.
Inverse of the public ``wait_for_transcript_to_end_user_turn``
param: when that's False, this is True. In this mode, turn
strategies don't consider user transcripts (so the user turn
ends sooner), and the aggregator runs a simple timer after the
end of turn to receive any transcripts that arrive, then emits
``on_user_turn_message_finalized`` with the assembled user
context message. Always travels with the strategy-mutation
bundle applied at init.
"""
return not self._params.wait_for_transcript_to_end_user_turn
def _apply_no_transcript_wait_bundle(
self,
user_turn_strategies: UserTurnStrategies,
*,
user_provided_strategies: bool,
):
"""Adjust strategies to match ``wait_for_transcript_to_end_user_turn=False``.
Mutates the user turn strategies so they don't consider user
transcripts: drops ``TranscriptionUserTurnStartStrategy`` from
start strategies (so late-arriving transcripts don't start
new turns), and sets ``wait_for_transcript=False`` on any
stop strategy that supports it. The net effect: the user turn
ends sooner.
Logs loudly when adjusting user-provided strategies — we're
mutating objects the caller passed in. Logs quietly when only
synthesized defaults are in play.
"""
# Local import to avoid a top-level cycle with `turns.user_start`.
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
adjustments: list[str] = []
if user_turn_strategies.start:
filtered = [
s
for s in user_turn_strategies.start
if not isinstance(s, TranscriptionUserTurnStartStrategy)
]
dropped = len(user_turn_strategies.start) - len(filtered)
if dropped:
user_turn_strategies.start = filtered
adjustments.append(
f"dropped {dropped} TranscriptionUserTurnStartStrategy from start strategies"
)
flipped = 0
for s in user_turn_strategies.stop or []:
if hasattr(s, "_wait_for_transcript") and s._wait_for_transcript:
s._wait_for_transcript = False
flipped += 1
if flipped:
adjustments.append(
f"set wait_for_transcript=False on {flipped} stop "
f"strateg{'y' if flipped == 1 else 'ies'}"
)
if not adjustments:
return
message = (
f"{self}: wait_for_transcript_to_end_user_turn=False adjusted "
f"user turn strategies: {'; '.join(adjustments)}."
)
if user_provided_strategies:
logger.warning(message)
else:
logger.info(message)
async def cleanup(self):
"""Clean up processor resources."""
await super().cleanup()
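
Reviewer note: the strategy adjustment removed in this hunk boils down to two in-place mutations. The sketch below reproduces that logic with stand-in classes so it can be read on its own. `VADStart`, `TranscriptionStart`, `StopStrategy`, `Strategies`, and `apply_no_transcript_wait` are hypothetical names for this illustration, not Pipecat classes, and the log wording is simplified.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class VADStart:
    """Hypothetical start strategy driven by voice activity."""


class TranscriptionStart:
    """Hypothetical stand-in for TranscriptionUserTurnStartStrategy."""


class StopStrategy:
    """Hypothetical stop strategy that can wait for a transcript."""

    def __init__(self, wait_for_transcript: bool = True):
        self._wait_for_transcript = wait_for_transcript


@dataclass
class Strategies:
    start: list = field(default_factory=list)
    stop: list = field(default_factory=list)


def apply_no_transcript_wait(strategies: Strategies) -> list[str]:
    """Mutate `strategies` in place and return a description of each change."""
    adjustments: list[str] = []

    # Drop start strategies that react to transcripts, so late-arriving
    # transcripts cannot start a new turn.
    filtered = [s for s in strategies.start if not isinstance(s, TranscriptionStart)]
    dropped = len(strategies.start) - len(filtered)
    if dropped:
        strategies.start = filtered
        adjustments.append(f"dropped {dropped} transcription start strategy(ies)")

    # Flip the flag only on stop strategies that have it and have it enabled.
    flipped = 0
    for s in strategies.stop:
        if getattr(s, "_wait_for_transcript", False):
            s._wait_for_transcript = False
            flipped += 1
    if flipped:
        adjustments.append(f"set wait_for_transcript=False on {flipped} stop strategy(ies)")

    return adjustments
```

An empty return value means nothing was changed, which is the case in which the removed method returned without logging.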
@@ -868,13 +697,6 @@ class LLMUserAggregator(LLMContextAggregator):
# Interim transcriptions and translations are consumed here
# and not pushed downstream, same as final TranscriptionFrame.
pass
elif isinstance(frame, STTMetadataFrame):
# Record the STT service's reported P99 TTFS so the
# post-turn transcript wait timer can size itself to the real
# latency. Frame is also pushed downstream so other
# processors keep seeing it.
self._stt_ttfs_p99_latency = frame.ttfs_p99_latency
await self.push_frame(frame, direction)
elif isinstance(frame, LLMRunFrame):
await self._handle_llm_run(frame)
elif isinstance(frame, LLMMessagesAppendFrame):
@@ -925,31 +747,13 @@ class LLMUserAggregator(LLMContextAggregator):
await s.setup(self.task_manager)
async def _stop(self, frame: EndFrame):
await self._finalize_on_session_end()
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _cancel(self, frame: CancelFrame):
await self._finalize_on_session_end()
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _finalize_on_session_end(self):
"""Flush any pending user message on session end.
If a post-turn transcript wait is in flight, complete it now so
the user message is captured before the session shuts down.
Otherwise, run the mode-appropriate finalize path on whatever
is currently in the buffer.
"""
if (
self._post_turn_transcript_wait_strategy is not None
or self._inference_during_post_turn_transcript_wait
):
await self._complete_post_turn_transcript_wait(on_session_end=True)
elif self._wait_for_post_turn_transcripts:
await self._finalize_user_message(on_session_end=True)
else:
await self._finalize_user_turn(on_session_end=True)
async def _cleanup(self):
if self._vad_controller:
await self._vad_controller.cleanup()
@@ -1080,21 +884,6 @@ class LLMUserAggregator(LLMContextAggregator):
):
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
# Precondition guard: if the previous turn's post-turn
# transcript wait is still active when the next turn starts,
# the assumption that transcripts arrive before the next turn
# has been violated. Complete the previous turn's wait now so
# its user message is finalized before this turn proceeds.
if (
self._post_turn_transcript_wait_strategy is not None
or self._inference_during_post_turn_transcript_wait
):
logger.warning(
f"{self}: user turn started before previous turn's transcripts "
f"arrived; flushing previous turn now"
)
await self._complete_post_turn_transcript_wait()
self._user_turn_start_timestamp = time_now_iso8601()
self._full_user_turn_aggregation = None
@@ -1115,14 +904,6 @@ class LLMUserAggregator(LLMContextAggregator):
):
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
if self._wait_for_post_turn_transcripts:
# The aggregator is in its post-turn transcript wait.
# Defer push_aggregation and event emission; they'll run
# alongside user message finalization when the wait window
# completes.
self._inference_during_post_turn_transcript_wait = True
return
# Push aggregation now: this writes the user message segment to
# the context and emits LLMContextFrame, which kicks LLM
# inference. Concatenate the segment into
@@ -1148,144 +929,42 @@ class LLMUserAggregator(LLMContextAggregator):
):
logger.debug(f"{self}: User stopped speaking (strategy: {strategy})")
# End-of-turn side effects always fire on the strategy event,
# regardless of whether user message finalization is deferred
# to a post-turn transcript wait window.
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
if self._wait_for_post_turn_transcripts:
# Fire `on_user_turn_stopped` now for the end of turn —
# content is `None` because no transcripts have arrived
# yet. Start the post-turn transcript wait timer; when it
# completes, the aggregator finalizes the user message and
# emits `on_user_turn_message_finalized`. Consumers wanting
# the assembled message subscribe to
# `on_user_turn_message_finalized`.
end_of_turn_message = UserTurnStoppedMessage(
content=None, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, end_of_turn_message)
self._post_turn_transcript_wait_strategy = strategy
wait_timeout = (
self._stt_ttfs_p99_latency
if self._stt_ttfs_p99_latency is not None
else DEFAULT_TTFS_P99
)
self._post_turn_transcript_wait_task = self.create_task(
self._post_turn_transcript_wait_handler(wait_timeout),
f"{self}::post_turn_transcript_wait",
)
return
await self._finalize_user_turn(strategy)
async def _post_turn_transcript_wait_handler(self, timeout: float):
"""Post-turn transcript wait timer.
Waits ``timeout`` seconds — giving transcripts time to arrive
after the end of turn — then completes the wait and finalizes
the user message into context, with whatever transcripts the
aggregator has received by then (possibly none).
The simple-timer approach relies on the assumptions that
transcripts don't arrive too late and that the assistant
response won't finish before this timer.
Cancelled by reset, the next-turn precondition guard, or
session end.
"""
try:
await asyncio.sleep(timeout)
except asyncio.CancelledError:
return
finally:
self._post_turn_transcript_wait_task = None
await self._complete_post_turn_transcript_wait()
async def _complete_post_turn_transcript_wait(self, *, on_session_end: bool = False):
"""Complete the active post-turn transcript wait window.
``on_user_turn_stopped`` already fired at the end of turn (with
empty content) and the aggregator has been receiving
transcripts since. This finalizes that work: flushes any
inference-triggered segment whose push was deferred during the
wait, then emits ``on_user_turn_message_finalized`` with the
assembled user context message. Called from the post-turn
transcript wait timer (the normal path), the precondition guard
in ``_on_user_turn_started``, and the session-end paths.
"""
if self._post_turn_transcript_wait_task:
await self.cancel_task(self._post_turn_transcript_wait_task)
self._post_turn_transcript_wait_task = None
wait_strategy = self._post_turn_transcript_wait_strategy
had_pending_inference = self._inference_during_post_turn_transcript_wait
self._post_turn_transcript_wait_strategy = None
self._inference_during_post_turn_transcript_wait = False
if had_pending_inference:
segment = await self.push_aggregation()
if segment:
if self._full_user_turn_aggregation:
self._full_user_turn_aggregation = (
f"{self._full_user_turn_aggregation} {segment}".strip()
)
else:
self._full_user_turn_aggregation = segment
await self._call_event_handler("on_user_turn_inference_triggered", wait_strategy)
if wait_strategy is not None or on_session_end:
# `on_user_turn_stopped` already fired at the end of turn;
# this is the deferred user message finalization.
await self._finalize_user_message(wait_strategy, on_session_end=on_session_end)
await self._maybe_emit_user_turn_stopped(strategy)
async def _on_reset_aggregation(
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
):
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
await self._cancel_post_turn_transcript_wait()
await self.reset()
async def _cancel_post_turn_transcript_wait(self):
"""Cancel any active post-turn transcript wait window without finalizing.
Called from reset paths (interruption, explicit reset).
"Reset" means "throw it away" — we don't flush a partial
transcript that was about to be invalidated anyway.
"""
if self._post_turn_transcript_wait_task:
await self.cancel_task(self._post_turn_transcript_wait_task)
self._post_turn_transcript_wait_task = None
self._post_turn_transcript_wait_strategy = None
self._inference_during_post_turn_transcript_wait = False
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")
async def _flush_user_message_to_context(
self, on_session_end: bool = False
) -> tuple[str, str] | None:
"""Push the aggregated user message to context, return ``(content, timestamp)``.
async def _maybe_emit_user_turn_stopped(
self,
strategy: BaseUserTurnStopStrategy | None = None,
on_session_end: bool = False,
):
"""Maybe emit user turn stopped event.
Earlier inference triggers in the same turn already pushed their
segments to the context and accumulated them in
``self._full_user_turn_aggregation``; whatever arrived after the
last inference trigger is flushed here so end-of-turn content is
never lost.
Earlier inference triggers in the same turn have already pushed
their segments to the context and accumulated them into
``self._full_user_turn_aggregation``. Any aggregation that
arrived after the last inference trigger is flushed here so
end-of-turn content is never lost from the public event.
Returns ``(content, timestamp)`` for the just-finalized user
message, or ``None`` when there's no content to flush and
``on_session_end=True`` (avoids emitting empty events during
session shutdown). Callers construct the appropriate message
dataclass for each event they emit.
Args:
strategy: The strategy that triggered the turn stop.
on_session_end: If True, only emit if there's unemitted content
(avoids duplicate events when session ends).
"""
segment = await self.push_aggregation()
full_aggregation = self._full_user_turn_aggregation
@@ -1296,53 +975,12 @@ class LLMUserAggregator(LLMContextAggregator):
else:
content = full_aggregation or segment
if on_session_end and not content:
return None
timestamp = self._user_turn_start_timestamp
self._user_turn_start_timestamp = ""
return content, timestamp
async def _finalize_user_turn(
self,
strategy: BaseUserTurnStopStrategy | None = None,
on_session_end: bool = False,
):
"""Finalize the user turn: flush the message, emit both events.
Used in the default mode (``_wait_for_post_turn_transcripts ==
False``), where end of turn and user message finalization
coincide. Emits both ``on_user_turn_stopped`` and
``on_user_turn_message_finalized``.
"""
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
if result is None:
return
content, timestamp = result
stopped_msg = UserTurnStoppedMessage(content=content, timestamp=timestamp)
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
await self._call_event_handler("on_user_turn_stopped", strategy, stopped_msg)
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
async def _finalize_user_message(
self,
strategy: BaseUserTurnStopStrategy | None = None,
on_session_end: bool = False,
):
"""Finalize the user message: flush to context, emit one event.
Used when the aggregator runs a post-turn transcript wait
(``_wait_for_post_turn_transcripts == True``), where user
message finalization fires after the end of turn. Emits
``on_user_turn_message_finalized`` only; ``on_user_turn_stopped``
was already emitted at the end of turn.
"""
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
if result is None:
return
content, timestamp = result
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
if not on_session_end or content:
message = UserTurnStoppedMessage(
content=content, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
class LLMAssistantAggregator(LLMContextAggregator):
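
Reviewer note: for readers tracing what this file's diff removes, the post-turn transcript wait is essentially "report end of turn now, finalize the message after a timer". The sketch below shows that pattern with plain `asyncio`. `PostTurnWait`, `demo`, the event names in the `events` list, and `DEFAULT_WAIT_SECS` are hypothetical; in particular the fallback value is a placeholder and not the value of Pipecat's `DEFAULT_TTFS_P99`.

```python
from __future__ import annotations

import asyncio

DEFAULT_WAIT_SECS = 1.0  # Placeholder fallback for this sketch only.


class PostTurnWait:
    """Collects late transcripts, then finalizes once a timer expires."""

    def __init__(self, reported_latency: float | None = None):
        # Prefer the latency reported by the STT service, if any.
        self._timeout = reported_latency if reported_latency is not None else DEFAULT_WAIT_SECS
        self._parts: list[str] = []
        self._task: asyncio.Task | None = None
        self.events: list[tuple[str, str | None]] = []

    def on_turn_stopped(self) -> None:
        # End of turn is reported immediately, before any transcript arrives.
        self.events.append(("turn_stopped", None))
        self._task = asyncio.create_task(self._wait())

    def on_transcript(self, text: str) -> None:
        self._parts.append(text)

    async def _wait(self) -> None:
        await asyncio.sleep(self._timeout)
        self._finalize()

    def _finalize(self) -> None:
        # Finalize with whatever arrived during the wait (possibly nothing).
        self.events.append(("message_finalized", " ".join(self._parts)))
        self._parts.clear()
        self._task = None

    def reset(self) -> None:
        # Reset discards the pending message instead of flushing it.
        if self._task:
            self._task.cancel()
            self._task = None
        self._parts.clear()


async def demo() -> list[tuple[str, str | None]]:
    wait = PostTurnWait(reported_latency=0.05)
    wait.on_turn_stopped()
    wait.on_transcript("tell me")
    wait.on_transcript("a joke")
    await asyncio.sleep(0.2)  # Longer than the wait window.
    return wait.events
```

Running `asyncio.run(demo())` yields the end-of-turn event with no content first, followed by the finalized message. The trade-off noted in the removed docstring applies here as well: a fixed timer only works if transcripts arrive within the window.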


@@ -419,30 +419,30 @@ class CartesiaTTSService(WebsocketTTSService):
"""Convenience method to create a speed tag."""
return f'<speed ratio="{speed}" />'
def _is_cjk_language(self, language: str) -> bool:
"""Check if the given language is CJK (Chinese, Japanese, Korean).
def _is_chinese_or_japanese_language(self, language: str) -> bool:
"""Check if the given language is Chinese or Japanese.
Args:
language: The language code to check.
Returns:
True if the language is Chinese, Japanese, or Korean.
True if the language is Chinese or Japanese.
"""
cjk_languages = {"zh", "ja", "ko"}
base_lang = language.split("-")[0].lower()
return base_lang in cjk_languages
return base_lang in {"zh", "ja"}
def _process_word_timestamps_for_language(
self, words: list[str], starts: list[float]
) -> list[tuple[str, float]]:
"""Process word timestamps based on the current language.
For CJK languages, Cartesia groups related characters in the same timestamp message.
For Chinese and Japanese, Cartesia groups related characters in the same timestamp
message.
For example, in Japanese a single message might be `['', '', '', '', '', '']`.
We combine these into single words so the downstream aggregator can add natural
spacing between meaningful units rather than individual characters.
For non-CJK languages, words are already properly separated and are used as-is.
For other languages, words are already properly separated and are used as-is.
Args:
words: List of words/characters from Cartesia.
@@ -453,10 +453,10 @@ class CartesiaTTSService(WebsocketTTSService):
"""
current_language = assert_given(self._settings.language)
# Check if this is a CJK language (if language is None, treat as non-CJK)
if current_language and self._is_cjk_language(current_language):
# For CJK languages, combine all characters in this message into one word
# using the first character's start time
# Check if this is a Chinese/Japanese language (if language is None, treat as other)
if current_language and self._is_chinese_or_japanese_language(current_language):
# For Chinese/Japanese, combine all characters in this message into one word
# using the first character's start time.
if words and starts:
combined_word = "".join(words)
first_start = starts[0]
@@ -467,6 +467,11 @@ class CartesiaTTSService(WebsocketTTSService):
# For non-CJK languages, use as-is
return list(zip(words, starts))
def _word_timestamps_include_inter_frame_spaces(self) -> bool:
"""Whether timestamp text should be treated as carrying its own spacing."""
current_language = assert_given(self._settings.language)
return bool(current_language and self._is_chinese_or_japanese_language(current_language))
def _build_msg(
self,
text: str = "",
@@ -660,7 +665,13 @@ class CartesiaTTSService(WebsocketTTSService):
processed_timestamps = self._process_word_timestamps_for_language(
msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]
)
await self.add_word_timestamps(processed_timestamps, ctx_id)
await self.add_word_timestamps(
processed_timestamps,
ctx_id,
includes_inter_frame_spaces=(
True if self._word_timestamps_include_inter_frame_spaces() else None
),
)
elif msg["type"] == "chunk":
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["data"]),
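
The Cartesia change above narrows the character-merging path from CJK to Chinese and Japanese only. A minimal standalone sketch of that logic follows. The free functions `is_chinese_or_japanese` and `process_word_timestamps` are hypothetical stand-ins for the private methods in the diff, and the return value of the merge branch is an assumption, since the hunk cuts off before it.

```python
from __future__ import annotations


def is_chinese_or_japanese(language: str) -> bool:
    """Return True for codes such as "zh", "zh-CN", "ja", or "JA-jp"."""
    base_lang = language.split("-")[0].lower()
    return base_lang in {"zh", "ja"}


def process_word_timestamps(
    words: list[str], starts: list[float], language: str | None
) -> list[tuple[str, float]]:
    """Merge per-character timestamps for Chinese/Japanese; pass others through."""
    if language and is_chinese_or_japanese(language):
        if words and starts:
            # Assumption: the merged unit keeps the first character's start time.
            return [("".join(words), starts[0])]
        return []
    # Other languages (now including Korean) are used as-is.
    return list(zip(words, starts))
```

With this sketch, Korean input takes the pass-through branch, which matches the intent of dropping `"ko"` from the set.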


@@ -48,7 +48,8 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import (
@@ -361,6 +362,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
self._messages_added_manually = {}
self._pending_function_calls = {}
self._completed_tool_calls = set()
self._async_tool_warning_logged: bool = False
self._register_event_handler("on_conversation_item_created")
self._register_event_handler("on_conversation_item_updated")
@@ -629,6 +631,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
self._receive_task = None
self._completed_tool_calls = set()
self._async_tool_warning_logged = False
self._audio_buffer = b""
self._interim_transcription_text = ""
self._disconnecting = False
@@ -1000,9 +1003,80 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
async def _process_completed_function_calls(self, send_new_results: bool):
"""Process completed function calls and send results to the service."""
# If the user registered a function with cancel_on_interruption=False,
# the aggregator emits async-tool-style messages into the context. As
# of this writing, Inworld Realtime doesn't appear to handle the
# resulting delayed tool result reliably — the routing code below
# is best-effort. Surface a one-time warning so users see they're not
# getting what they expect.
if not self._async_tool_warning_logged:
for message in self._context.get_messages():
if isinstance(message, LLMSpecificMessage):
continue
if async_tool_messages.parse_message(message) is not None:
logger.error(
f"{self}: cancel_on_interruption=False is not reliably "
f"supported by Inworld Realtime as of this writing. "
f"Use cancel_on_interruption=True (the default), or "
f"consider another LLM service if your tool needs the "
f"async semantics."
)
await self.push_error(
error_msg=(
"cancel_on_interruption=False is not reliably supported "
"by Inworld Realtime as of this writing."
),
)
self._async_tool_warning_logged = True
break
sent_new_result = False
for message in self._context.get_messages():
# LLMSpecificMessages are opaque provider-specific payloads, not
# standard tool-result messages — skip them.
if isinstance(message, LLMSpecificMessage):
continue
# Async-tool messages live alongside regular tool messages in the
# context; detect and route them before the regular logic so we
# don't try to send the async-tool envelope JSON as a tool result.
async_payload = async_tool_messages.parse_message(message)
if async_payload is not None:
if async_payload.tool_call_id in self._completed_tool_calls:
continue
if async_payload.kind == "started":
# The provider already issued the tool call and natively
# awaits a result; nothing to send for the started marker.
continue
if async_payload.kind == "intermediate":
logger.error(
f"{self}: Inworld Realtime does not support streamed async "
f"tool results; dropping intermediate result for "
f"tool_call_id={async_payload.tool_call_id}. Consider "
f"another LLM service if your tool needs to stream "
f"intermediate results."
)
await self.push_error(
error_msg="Inworld Realtime does not support streamed async tool results.",
)
continue
if async_payload.kind == "final":
# Deliver via the formal tool-result channel — same path
# as a synchronous tool result, just delayed.
if send_new_results:
sent_new_result = True
await self._send_tool_result(
async_payload.tool_call_id, async_payload.result
)
self._completed_tool_calls.add(async_payload.tool_call_id)
continue
# Defensive: any async-tool message must not fall through
# to the regular tool-result block below, even if it
# carries a kind we don't recognize.
continue
# Look for newly-completed "regular" (as opposed to async-tool) results
if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
@@ -1011,6 +1085,8 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
# If we reported any new tool call results to the service, trigger
# another response
if sent_new_result:
await self._create_response()
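
The async-tool routing added to `_process_completed_function_calls` can be summarised as a dispatch on the payload kind. The sketch below is a simplified, synchronous illustration only: `AsyncToolPayload` and `route_async_payloads` are hypothetical names, and it assumes a final result is marked completed even when `send_new_results` is false, which the flattened diff does not make explicit.

```python
from dataclasses import dataclass


@dataclass
class AsyncToolPayload:  # hypothetical stand-in for the parsed async-tool message
    tool_call_id: str
    kind: str  # "started", "intermediate", or "final"
    result: str = ""


def route_async_payloads(payloads, completed, send_new_results=True):
    """Return the (tool_call_id, result) pairs that should be sent to the service."""
    to_send = []
    for payload in payloads:
        if payload.tool_call_id in completed:
            continue  # result already delivered for this call
        if payload.kind == "final":
            if send_new_results:
                to_send.append((payload.tool_call_id, payload.result))
            # Assumption: the call is marked completed either way.
            completed.add(payload.tool_call_id)
        # "started", "intermediate", and unknown kinds send nothing.
    return to_send
```

Only the `final` kind produces a tool result, and the `completed` set makes repeated passes over the same context idempotent.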


@@ -60,9 +60,40 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
def language_to_inworld_language(language: Language) -> str:
"""Convert a Language enum to an Inworld TTS BCP-47 language tag.
Args:
language: The Language enum value to convert.
Returns:
The corresponding Inworld BCP-47 language tag (e.g. ``"en-US"``).
Unverified languages fall back to their BCP-47 string value with a warning.
"""
LANGUAGE_MAP = {
Language.AR: "ar-SA",
Language.DE: "de-DE",
Language.EN: "en-US",
Language.ES: "es-ES",
Language.FR: "fr-FR",
Language.HE: "he-IL",
Language.HI: "hi-IN",
Language.IT: "it-IT",
Language.JA: "ja-JP",
Language.KO: "ko-KR",
Language.NL: "nl-NL",
Language.PL: "pl-PL",
Language.PT: "pt-BR",
Language.RU: "ru-RU",
Language.ZH: "zh-CN",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=False)
@dataclass
class InworldTTSSettings(TTSSettings):
"""Settings for InworldTTSService and InworldHttpTTSService.
@@ -70,10 +101,18 @@ class InworldTTSSettings(TTSSettings):
Parameters:
speaking_rate: Speaking rate for speech synthesis.
temperature: Temperature for speech synthesis.
delivery_mode: Controls the stability vs. creativity tradeoff.
``"STABLE"`` produces reliable, predictable speech.
``"BALANCED"`` is the default midpoint.
``"CREATIVE"`` produces more expressive, emotionally varied speech.
Only supported by ``inworld-tts-2``.
"""
speaking_rate: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
temperature: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
delivery_mode: Literal["STABLE", "BALANCED", "CREATIVE"] | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
_aliases: ClassVar[dict[str, str]] = {
"voiceId": "voice",
@@ -167,6 +206,7 @@ class InworldHttpTTSService(TTSService):
language=None,
speaking_rate=None,
temperature=None,
delivery_mode=None,
)
# 2. Apply direct init arg overrides (deprecated)
@@ -227,6 +267,17 @@ class InworldHttpTTSService(TTSService):
"""
return True
def language_to_service_language(self, language: Language) -> str | None:
"""Convert a Language enum to Inworld language format.
Args:
language: The language to convert.
Returns:
The Inworld-specific BCP-47 language code, or None if not supported.
"""
return language_to_inworld_language(language)
async def start(self, frame: StartFrame):
"""Start the Inworld TTS service.
@@ -313,6 +364,10 @@ class InworldHttpTTSService(TTSService):
if self._settings.temperature is not None:
payload["temperature"] = self._settings.temperature
if self._settings.delivery_mode is not None:
payload["deliveryMode"] = self._settings.delivery_mode
if self._settings.language is not None:
payload["language"] = self._settings.language
# Use WORD timestamps for simplicity and correct spacing/capitalization
payload["timestampType"] = self._timestamp_type
@@ -609,6 +664,7 @@ class InworldTTSService(WebsocketTTSService):
language=None,
speaking_rate=None,
temperature=None,
delivery_mode=None,
)
# 2. Apply direct init arg overrides (deprecated)
@@ -700,6 +756,17 @@ class InworldTTSService(WebsocketTTSService):
"""
return True
def language_to_service_language(self, language: Language) -> str | None:
"""Convert a Language enum to Inworld language format.
Args:
language: The language to convert.
Returns:
The Inworld-specific BCP-47 language code, or None if not supported.
"""
return language_to_inworld_language(language)
async def start(self, frame: StartFrame):
"""Start the Inworld WebSocket TTS service.
@@ -1089,6 +1156,10 @@ class InworldTTSService(WebsocketTTSService):
if self._settings.temperature is not None:
create_config["temperature"] = self._settings.temperature
if self._settings.delivery_mode is not None:
create_config["deliveryMode"] = self._settings.delivery_mode
if self._settings.language is not None:
create_config["language"] = self._settings.language
if self._apply_text_normalization is not None:
create_config["applyTextNormalization"] = self._apply_text_normalization
if self._auto_mode is not None:


@@ -280,6 +280,8 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
self._client: SageMakerBidiClient | None = None
self._receive_task = None
self._speech_completed_event = asyncio.Event()
self._audio_buffer = b""
self._playback_started = False
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -377,7 +379,12 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
logger.info(f"{self}: verifying if websocket connection is active {active}")
return active
def _reset_audio_buffer(self):
self._audio_buffer = b""
self._playback_started = False
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
self._reset_audio_buffer()
if self._bot_speaking and self._client:
logger.debug(
f"{self}: interruption detected, sending input_text.done and waiting for speech.completed"
@@ -391,6 +398,30 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
logger.warning(f"{self}: timed out waiting for conversation.item.speech.completed")
await super()._handle_interruption(frame, direction)
async def _handle_audio_chunk(self, audio: bytes, context_id: str | None = None):
"""Buffer audio and emit frames using a jitter-buffer approach.
Holds back audio until chunk_size bytes have been accumulated (to avoid
glitches at the start of playback), then emits each subsequent chunk
immediately as it arrives.
"""
self._audio_buffer += audio
if not self._playback_started:
if len(self._audio_buffer) < self.chunk_size:
return
self._playback_started = True
await self.push_frame(
TTSAudioRawFrame(
audio=self._audio_buffer,
sample_rate=self.sample_rate,
num_channels=1,
context_id=context_id,
)
)
self._audio_buffer = b""
async def _receive_messages(self):
"""Receive NIM JSON events and push audio frames."""
while self._client and self._client.is_active and not self._disconnecting:
@@ -415,14 +446,7 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
msg = json.loads(payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
# Unexpected binary frame — treat as raw PCM
await self.push_frame(
TTSAudioRawFrame(
audio=payload,
sample_rate=self.sample_rate,
num_channels=1,
context_id=context_id,
)
)
await self._handle_audio_chunk(payload, context_id)
continue
event_type = msg.get("type", "")
@@ -434,14 +458,7 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
chunk_b64 = msg.get("audio", "")
if chunk_b64:
await self.stop_ttfb_metrics()
await self.push_frame(
TTSAudioRawFrame(
audio=base64.b64decode(chunk_b64),
sample_rate=self.sample_rate,
num_channels=1,
context_id=context_id,
)
)
await self._handle_audio_chunk(base64.b64decode(chunk_b64), context_id)
elif event_type == "error":
await self.push_error(error_msg=f"NIM error: {msg.get('message', msg)}")
# In case of error we need to reconnect, otherwise we are not going to receive audio from the TTS service anymore
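
The buffering behaviour of `_handle_audio_chunk` can be shown without any pipeline machinery. This is a sketch under assumptions: `StartupBuffer` is a hypothetical class, and `push` returns the bytes that would be emitted as a frame (or `None` while audio is still being held back) instead of calling `push_frame`.

```python
from __future__ import annotations


class StartupBuffer:
    """Hold back audio until chunk_size bytes arrive, then pass chunks through."""

    def __init__(self, chunk_size: int):
        self.chunk_size = chunk_size
        self._buffer = b""
        self._started = False

    def reset(self) -> None:
        # Mirrors the reset performed on interruption in the diff.
        self._buffer = b""
        self._started = False

    def push(self, audio: bytes) -> bytes | None:
        self._buffer += audio
        if not self._started:
            if len(self._buffer) < self.chunk_size:
                return None  # still filling the initial buffer
            self._started = True
        # Emit everything accumulated so far and start over with an empty buffer.
        out, self._buffer = self._buffer, b""
        return out
```

Note that only the first emission is gated on `chunk_size`; after playback has started, every chunk is forwarded as soon as it arrives, whatever its size.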


@@ -8,6 +8,7 @@
import json
import time
from collections import Counter
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any
@@ -201,6 +202,24 @@ def _prepare_language_hints(
return list(set(prepared_languages))
def _language_from_tokens(tokens: list[dict]) -> Language | None:
language_counts: Counter[Language] = Counter()
for token in tokens:
language = token.get("language")
if not language:
continue
try:
language_counts[Language(language)] += 1
except ValueError:
pass
if not language_counts:
return None
return language_counts.most_common(1)[0][0]
@dataclass
class SonioxSTTSettings(STTSettings):
"""Settings for SonioxSTTService.
@@ -557,6 +576,7 @@ class SonioxSTTService(WebsocketSTTService):
async def send_endpoint_transcript():
if self._final_transcription_buffer:
text = "".join(map(lambda token: token["text"], self._final_transcription_buffer))
language = _language_from_tokens(self._final_transcription_buffer)
# Soniox only pushes TranscriptionFrame when an end token is received,
# so every TranscriptionFrame is inherently finalized
await self.push_frame(
@@ -564,11 +584,12 @@ class SonioxSTTService(WebsocketSTTService):
text=text,
user_id=self._user_id,
timestamp=time_now_iso8601(),
language=language,
result=self._final_transcription_buffer,
finalized=True,
)
)
await self._handle_transcription(text, is_final=True)
await self._handle_transcription(text, is_final=True, language=language)
await self.stop_processing_metrics()
self._final_transcription_buffer = []
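
`_language_from_tokens` picks the language by majority vote over the final tokens. The sketch below reproduces that idea in isolation. `Lang` is a hypothetical two-member stand-in for pipecat's `Language` enum, and the token dictionaries are made-up sample data.

```python
from __future__ import annotations

from collections import Counter
from enum import Enum


class Lang(str, Enum):  # hypothetical stand-in for pipecat's Language enum
    EN = "en"
    ES = "es"


def language_from_tokens(tokens: list[dict]) -> Lang | None:
    """Return the most frequent recognised language among the tokens."""
    counts: Counter = Counter()
    for token in tokens:
        code = token.get("language")
        if not code:
            continue  # token carries no language tag
        try:
            counts[Lang(code)] += 1
        except ValueError:
            pass  # code not present in the enum; ignore it
    if not counts:
        return None
    return counts.most_common(1)[0][0]
```

On a tie, `Counter.most_common` returns equal-count entries in the order they were first seen, so the earliest language in the buffer wins.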


@@ -1283,10 +1283,17 @@ class TTSService(AIService):
def get_active_audio_context_id(self) -> str | None:
"""Get the active audio context ID.
Returns the playback cursor when set (during active playback), falling
back to the current turn's synthesis context_id. The fallback covers
the gap between contexts and the start of a turn before the playback
task has popped the just-created context off the serialization queue —
important for services whose wire protocol does not echo context_id
back on incoming audio.
Returns:
The active context ID, or None if no context is active.
The active context ID, or None if neither cursor is set.
"""
return self._playing_context_id
return self._playing_context_id or self._turn_context_id
async def remove_active_audio_context(self):
"""Remove the active audio context."""
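
The fallback in `get_active_audio_context_id` is a plain `or` between the two cursors. A tiny sketch, with a hypothetical free function in place of the method, makes the resulting cases explicit:

```python
def active_context_id(playing_context_id, turn_context_id):
    """Prefer the playback cursor; fall back to the synthesis cursor."""
    return playing_context_id or turn_context_id
```

Because `or` tests truthiness, an empty-string playback ID would also fall through to the turn ID. That is harmless if context IDs are always non-empty, which this sketch assumes.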


@@ -43,42 +43,18 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
(rearmed on each transcript). stt_timeout has no meaning here since it
is defined relative to VAD stop, and STT has already emitted a
transcript — so the stt wait is marked done immediately.
Set ``wait_for_transcript=False`` to make this strategy not consider
user transcripts, so the user turn ends sooner — as soon as the
user_speech_timeout elapses. Most callers don't set this directly:
it's flipped automatically by
``wait_for_transcript_to_end_user_turn=False`` on
``LLMUserAggregatorParams``, which also wires the aggregator to
gather user transcripts after the turn ends. That pattern fits
pipelines where local turn detection drives a realtime service like
Gemini Live — the realtime service consumes user audio directly,
so user transcripts don't need to be in context before it can
respond.
"""
def __init__(
self,
*,
user_speech_timeout: float = 0.6,
wait_for_transcript: bool = True,
**kwargs,
):
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
"""Initialize the speech timeout-based user turn stop strategy.
Args:
user_speech_timeout: Time to wait for the user to potentially
say more after they pause speaking. Defaults to 0.6 seconds.
wait_for_transcript: Whether the strategy considers user
transcripts in deciding when the user turn ends.
Defaults to True. Usually flipped indirectly via
``wait_for_transcript_to_end_user_turn=False`` on
``LLMUserAggregatorParams``.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self._user_speech_timeout = user_speech_timeout
self._wait_for_transcript = wait_for_transcript
self._stt_timeout: float = 0.0 # STT P99 latency from STTMetadataFrame
self._stop_secs: float = 0.0 # VAD stop_secs from VADUserStoppedSpeakingFrame
self._stop_secs_warned: bool = False
@@ -182,12 +158,11 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
# fallback-mode run of the same timer is superseded here.
await self._restart_user_speech_timer()
# stt_timeout is a safety net. Short-circuit it if we're not waiting
# for a transcript, if the transcript is already finalized, or if the
# VAD stop_secs already covered it.
# stt_timeout is a safety net. Short-circuit it if the transcript is
# already finalized, or if the VAD stop_secs already covered it.
self._stt_wait_done = False
effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs)
if not self._wait_for_transcript or self._transcript_finalized or effective_stt_wait <= 0:
if self._transcript_finalized or effective_stt_wait <= 0:
self._stt_wait_done = True
else:
self._stt_timeout_task = self.task_manager.create_task(
@@ -278,11 +253,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
Both timers must be done (stt is marked done immediately on the
fallback path and when finalization short-circuits the safety net),
the user must not be currently speaking, and at least one transcript
must have been received (skipped when ``wait_for_transcript`` is False).
must have been received.
"""
if self._vad_user_speaking:
return
if self._wait_for_transcript and not self._text:
if self._vad_user_speaking or not self._text:
return
if self._user_speech_wait_done and self._stt_wait_done:
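
After this change the STT safety-net wait in the speech-timeout strategy depends on two inputs only. The arithmetic can be sketched as follows; the function names are hypothetical and the numbers in the usage note are illustrative, not values taken from the library.

```python
def effective_stt_wait(stt_timeout: float, stop_secs: float) -> float:
    """Time still to wait for STT once the VAD stop delay has already elapsed."""
    return max(0.0, stt_timeout - stop_secs)


def stt_wait_done(stt_timeout: float, stop_secs: float, transcript_finalized: bool) -> bool:
    """True when the safety-net timer can be skipped entirely."""
    return transcript_finalized or effective_stt_wait(stt_timeout, stop_secs) <= 0
```

For example, with `stt_timeout=1.0` and `stop_secs=0.25` the strategy would still wait 0.75 seconds unless a finalized transcript has already arrived.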


@@ -42,41 +42,17 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
the turn can be triggered immediately once the finalized transcript is
received. Otherwise, an STT timeout (adjusted by VAD stop_secs) is used
as a fallback.
Set ``wait_for_transcript=False`` to make this strategy not consider
user transcripts, so the user turn ends sooner — as soon as the
analyzer concludes the turn is complete. Most callers don't set
this directly: it's flipped automatically by
``wait_for_transcript_to_end_user_turn=False`` on
``LLMUserAggregatorParams``, which also wires the aggregator to
gather user transcripts after the turn ends. That pattern fits
pipelines where local turn detection drives a realtime service like
Gemini Live — the realtime service consumes user audio directly,
so user transcripts don't need to be in context before it can
respond.
"""
def __init__(
self,
*,
turn_analyzer: BaseTurnAnalyzer,
wait_for_transcript: bool = True,
**kwargs,
):
def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, **kwargs):
"""Initialize the user turn stop strategy.
Args:
turn_analyzer: The turn detection analyzer instance to detect end of user turn.
wait_for_transcript: Whether the strategy considers user
transcripts in deciding when the user turn ends.
Defaults to True. Usually flipped indirectly via
``wait_for_transcript_to_end_user_turn=False`` on
``LLMUserAggregatorParams``.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self._turn_analyzer = turn_analyzer
self._wait_for_transcript = wait_for_transcript
self._stt_timeout: float = 0.0 # STT P99 latency from STTMetadataFrame
self._stop_secs: float = 0.0 # VAD stop_secs from VADUserStoppedSpeakingFrame
@@ -193,13 +169,6 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
# wait for transcriptions.
self._turn_complete = state == EndOfTurnState.COMPLETE
if not self._wait_for_transcript:
# No transcript to wait for. Trigger now if the turn is already
# complete; otherwise the analyzer's audio path will trigger once
# it indicates completion.
await self._maybe_trigger_user_turn_stopped()
return
# Start the STT timeout (adjusted by VAD stop_secs since that time already elapsed)
timeout = max(0, self._stt_timeout - self._stop_secs)
@@ -287,13 +256,11 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""Trigger user turn stopped if conditions are met.
Conditions:
- We have transcription text (skipped when ``wait_for_transcript`` is False)
- We have transcription text
- Turn analyzer indicates turn is complete
- Either the timeout has elapsed OR we have a finalized transcript
"""
if not self._turn_complete:
return
if self._wait_for_transcript and not self._text:
if not self._text or not self._turn_complete:
return
# For finalized transcripts, trigger immediately


@@ -11,7 +11,6 @@ rich information about service execution including configuration,
parameters, and performance metrics.
"""
import contextlib
import functools
import inspect
import json
@@ -24,7 +23,16 @@ if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry import trace
from pipecat.frames.frames import (
MetricsFrame,
TranscriptionFrame,
TTSStoppedFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.processors.aggregators.llm_context import NOT_GIVEN
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.tracing.service_attributes import (
add_gemini_live_span_attributes,
add_llm_span_attributes,
@@ -175,6 +183,13 @@ def traced_tts(func: Callable | None = None, *, name: str | None = None) -> Call
- Character count and text content
- Performance metrics like TTFB
The span is scoped to the full synthesis operation, from
``create_audio_context`` until ``TTSStoppedFrame`` (or
``remove_audio_context`` as a safety net), so TTFB and any other
runtime-computed metrics land on the correct span even when audio
chunks are delivered after ``run_tts`` returns (e.g. WebSocket
streaming TTS services).
Works with both async functions and generators.
Args:
@@ -190,102 +205,224 @@ def traced_tts(func: Callable | None = None, *, name: str | None = None) -> Call
def decorator(f):
is_async_generator = inspect.isasyncgenfunction(f)
@contextlib.asynccontextmanager
async def tracing_context(self, text):
"""Async context manager for TTS tracing.
def end_tts_span(service, context_id, *, interrupted=False):
"""End the TTS span for ``context_id`` if still open. Idempotent."""
entry = service._tts_spans.pop(context_id, None)
if not entry:
return
try:
span = entry["span"]
if interrupted:
span.set_attribute("tts.interrupted", True)
span.end()
except Exception as e:
logging.warning(f"Error closing TTS span: {e}")
Args:
self: The TTS service instance.
text: The text being synthesized.
def install_audio_context_patches(service):
"""Install per-instance wrappers on the audio-context methods.
Yields:
The active span for the TTS operation.
The wrappers own the lifetime of the TTS span:
- ``create_audio_context``: opens the span and records
baseline attributes.
- ``append_to_audio_context``: ends the span on
``TTSStoppedFrame``.
- ``push_frame``: records ``metrics.ttfb`` from the
canonical ``TTFBMetricsData`` payload of any
``MetricsFrame`` pushed by ``stop_ttfb_metrics``. Reading
the value from the metrics event (instead of polling
``_metrics.ttfb`` when the first audio is queued) avoids
the ``ttfb`` property's in-progress fallback, which would
otherwise report an under-estimate whenever a context's
audio waits behind earlier queued audio before
``_handle_audio_context`` actually stops the TTFB
measurement.
- ``remove_audio_context``: ends any still-open span as a
safety net for error and cancellation paths.
- ``on_audio_context_completed``: ends the span on natural
completion. Needed because services that rely on the
base class to auto-push ``TTSStoppedFrame`` (via
``push_frame`` in ``_handle_audio_context``) bypass the
``append_to_audio_context`` hook entirely.
- ``reset_active_audio_context``: ends the currently
playing context's span if still open. Always called from
``_handle_interruption``, so this is the interruption
hook.
The patches check ``_tracing_enabled`` at invocation time,
so they are safe to install regardless of whether tracing
is enabled.
"""
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
yield None
if getattr(service, "__tts_tracing_patches_installed__", False):
return
service.__tts_tracing_patches_installed__ = True
service._tts_spans = {}
orig_create = service.create_audio_context
orig_append = service.append_to_audio_context
orig_remove = service.remove_audio_context
orig_completed = service.on_audio_context_completed
orig_reset_active = service.reset_active_audio_context
orig_push_frame = service.push_frame
async def traced_create_audio_context(context_id):
if getattr(service, "_tracing_enabled", False):
try:
parent = _get_turn_context(service) or _get_parent_service_context(service)
tracer = trace.get_tracer("pipecat")
span = tracer.start_span("tts", context=parent)
service._tts_spans[context_id] = {"span": span, "ttfb_recorded": False}
settings = getattr(service, "_settings", None)
add_tts_span_attributes(
span=span,
service_name=service.__class__.__name__,
model=_get_model_name(service),
voice_id=getattr(settings, "voice", "unknown"),
settings=settings,
operation_name="tts",
)
except Exception as e:
logging.warning(f"Error opening TTS span: {e}")
return await orig_create(context_id)
async def traced_append_to_audio_context(context_id, frame):
entry = service._tts_spans.get(context_id)
if entry and frame is not None:
try:
if isinstance(frame, TTSStoppedFrame):
entry["span"].end()
service._tts_spans.pop(context_id, None)
except Exception as e:
logging.warning(f"Error updating TTS span: {e}")
return await orig_append(context_id, frame)
async def traced_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
await orig_push_frame(frame, direction)
if not getattr(service, "_tracing_enabled", False):
return
if not isinstance(frame, MetricsFrame):
return
try:
playing_id = getattr(service, "_playing_context_id", None)
if playing_id is None:
return
entry = service._tts_spans.get(playing_id)
if not entry or entry["ttfb_recorded"]:
return
for data in frame.data:
if isinstance(data, TTFBMetricsData):
entry["span"].set_attribute("metrics.ttfb", data.value)
entry["ttfb_recorded"] = True
break
except Exception as e:
logging.warning(f"Error recording TTS ttfb from MetricsFrame: {e}")
async def traced_remove_audio_context(context_id):
entry = service._tts_spans.pop(context_id, None)
if entry:
try:
entry["span"].end()
except Exception as e:
logging.warning(f"Error closing TTS span: {e}")
return await orig_remove(context_id)
async def traced_on_audio_context_completed(context_id):
end_tts_span(service, context_id)
return await orig_completed(context_id)
def traced_reset_active_audio_context():
playing_id = getattr(service, "_playing_context_id", None)
if playing_id is not None:
end_tts_span(service, playing_id, interrupted=True)
return orig_reset_active()
service.create_audio_context = traced_create_audio_context
service.append_to_audio_context = traced_append_to_audio_context
service.push_frame = traced_push_frame
service.remove_audio_context = traced_remove_audio_context
service.on_audio_context_completed = traced_on_audio_context_completed
service.reset_active_audio_context = traced_reset_active_audio_context
def patch_setup(owner):
"""Wrap ``owner.setup`` so audio-context patches install per-instance.
Idempotent: if a parent class has already been wrapped,
skip. The patches check ``_tracing_enabled`` at invocation
time, so wrapping is always safe.
"""
original_setup = owner.setup
if getattr(original_setup, "__tts_tracing_setup_wrapped__", False):
return
service_class_name = self.__class__.__name__
span_name = "tts"
@functools.wraps(original_setup)
async def patched_setup(self, setup):
await original_setup(self, setup)
install_audio_context_patches(self)
# Get parent context
parent_context = _get_turn_context(self) or _get_parent_service_context(self)
setattr(patched_setup, "__tts_tracing_setup_wrapped__", True)
owner.setup = patched_setup
# Create span
tracer = trace.get_tracer("pipecat")
with tracer.start_as_current_span(span_name, context=parent_context) as span:
try:
settings = getattr(self, "_settings", None)
add_tts_span_attributes(
span=span,
service_name=service_class_name,
model=_get_model_name(self),
voice_id=getattr(settings, "voice", "unknown"),
text=text,
settings=settings,
character_count=len(text),
operation_name="tts",
cartesia_version=getattr(self, "_cartesia_version", None),
context_id=getattr(self, "_context_id", None),
)
def attach_run_tts_attributes(service, text, args, kwargs):
"""Attach text-specific attributes to the in-flight TTS span."""
if not getattr(service, "_tracing_enabled", False):
return
try:
context_id = args[0] if args else kwargs.get("context_id")
entry = getattr(service, "_tts_spans", {}).get(context_id)
if entry and text:
span = entry["span"]
span.set_attribute("text", text)
span.set_attribute("metrics.character_count", len(text))
except Exception as e:
logging.warning(f"Error attaching TTS text to span: {e}")
yield span
def make_run_tts_wrapper():
"""Build the wrapper around ``run_tts`` that adds per-call attributes.
except Exception as e:
logging.warning(f"Error in TTS tracing: {e}")
raise
finally:
# Update TTFB metric at the end
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
if ttfb is not None:
span.set_attribute("metrics.ttfb", ttfb)
Span lifetime is owned by the audio-context patches. This
wrapper only attaches the text and character count to the
span that was opened by ``create_audio_context`` just
before ``run_tts`` was invoked.
"""
if is_async_generator:
if is_async_generator:
@functools.wraps(f)
async def gen_wrapper(self, text, *args, **kwargs):
if not getattr(self, "_tracing_enabled", False):
async for item in f(self, text, *args, **kwargs):
yield item
return
fn_called = False
try:
async with tracing_context(self, text):
fn_called = True
async for item in f(self, text, *args, **kwargs):
yield item
except Exception as e:
if fn_called:
raise
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
@functools.wraps(f)
async def gen_wrapper(self, text, *args, **kwargs):
attach_run_tts_attributes(self, text, args, kwargs)
async for item in f(self, text, *args, **kwargs):
yield item
return gen_wrapper
else:
return gen_wrapper
@functools.wraps(f)
async def wrapper(self, text, *args, **kwargs):
if not getattr(self, "_tracing_enabled", False):
return await f(self, text, *args, **kwargs)
async def coro_wrapper(self, text, *args, **kwargs):
attach_run_tts_attributes(self, text, args, kwargs)
return await f(self, text, *args, **kwargs)
fn_called = False
try:
async with tracing_context(self, text):
fn_called = True
return await f(self, text, *args, **kwargs)
except Exception as e:
if fn_called:
raise
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
return await f(self, text, *args, **kwargs)
return coro_wrapper
return wrapper
class _TracedTTSDescriptor:
"""Class-level descriptor that wires up TTS tracing at class definition time.
``__set_name__`` fires when the class body finishes evaluating,
giving us a chance to wrap the owner's ``setup()`` so that the
audio-context patches install on every instance before any
``create_audio_context`` call (including the very first one).
"""
def __set_name__(self, owner, attr_name):
patch_setup(owner)
setattr(owner, attr_name, make_run_tts_wrapper())
return _TracedTTSDescriptor()
if func is not None:
return decorator(func)
# ``decorator(func)`` returns a descriptor placeholder that
# Python replaces with the real wrapped function once
# ``__set_name__`` runs at class definition time. Pyright sees
# only the descriptor instance, hence the ignore.
return decorator(func) # type: ignore[return-value]
return decorator
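
A minimal sketch of the class-definition-time pattern that ``_TracedTTSDescriptor`` relies on, reduced to plain Python. The names ``traced``, ``Service``, ``patched`` and the marker attribute ``__setup_wrapped__`` are hypothetical stand-ins, not Pipecat APIs; only ``__set_name__`` itself is standard Python behavior.

```python
import functools


def traced(func):
    """Hypothetical decorator: patch the owning class once it is defined."""

    class _Descriptor:
        def __set_name__(self, owner, attr_name):
            # Called once, when the class body containing the decorated
            # method has finished evaluating.
            original_setup = owner.setup
            if not getattr(original_setup, "__setup_wrapped__", False):

                @functools.wraps(original_setup)
                def patched_setup(self, *args, **kwargs):
                    # Stand-in for installing per-instance tracing hooks.
                    self.patched = True
                    return original_setup(self, *args, **kwargs)

                patched_setup.__setup_wrapped__ = True
                owner.setup = patched_setup
            # Swap the descriptor placeholder for the real function.
            setattr(owner, attr_name, func)

    return _Descriptor()


class Service:
    def setup(self):
        return "setup done"

    @traced
    def run(self, text):
        return text.upper()
```

After the class statement finishes, ``Service.run`` is the plain function again and ``Service.setup`` is the wrapped version, so no instance ever sees the placeholder.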
@@ -299,72 +436,285 @@ def traced_stt(func: Callable | None = None, *, name: str | None = None) -> Call
- Language information
- Performance metrics like TTFB
The span is scoped to one STT segment, from
``VADUserStartedSpeakingFrame`` (or the first ``TranscriptionFrame``
when VAD did not fire, e.g. whispered speech) until a finalized
``TranscriptionFrame``. Multiple finalized transcripts in a single
user turn produce multiple sequential spans, each anchored at the
point speech for that segment began. ``metrics.ttfb`` is read after
the base ``push_frame`` runs ``stop_ttfb_metrics`` for the
finalized frame, so the value is correct for the closing span.
Args:
func: The STT method to trace.
name: Custom span name. Defaults to function name.
Returns:
Wrapped method with STT-specific tracing.
The original method unchanged. The decorator's class-definition-
time work is to install a ``push_frame`` wrapper on the owning
class that owns the span lifetime.
"""
if not is_tracing_available():
return _noop_decorator if func is None else _noop_decorator(func)
def decorator(f):
@functools.wraps(f)
async def wrapper(self, transcript, is_final, language=None):
if not getattr(self, "_tracing_enabled", False):
return await f(self, transcript, is_final, language)
def patch_push_frame(owner):
"""Wrap ``owner.push_frame`` to drive the STT span lifecycle.
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = "stt"
Idempotent: if a parent class has already been wrapped, skip.
The wrapper checks ``_tracing_enabled`` at invocation time,
so it is safe to install regardless of whether tracing is
enabled.
"""
original_push_frame = owner.push_frame
if getattr(original_push_frame, "__stt_tracing_push_frame_wrapped__", False):
return
# Get the turn context first, then fall back to service context
parent_context = _get_turn_context(self) or _get_parent_service_context(self)
def update_transcript(state, new_text):
"""Append or extend the current segment in ``state['segments']``.
# Create a new span as child of the turn span or service span
If ``new_text`` starts with the last recorded segment,
treat it as a continuation (interim accumulation) and
replace the last segment. Otherwise treat it as a new
segment and append. Some STT services (Deepgram with
utterance_end_ms enabled, for example) emit several
``TranscriptionFrame``s per turn where each carries a
different segment rather than a cumulative update —
without this logic the span's transcript would only
show the last segment and the beginning would be lost.
"""
if not new_text:
return
segments = state["segments"]
if not segments:
segments.append(new_text)
elif new_text.startswith(segments[-1]):
segments[-1] = new_text
else:
segments.append(new_text)
def open_span(service, state):
"""Open the STT span, anchored at ``segment_start_time`` if set."""
parent = _get_turn_context(service) or _get_parent_service_context(service)
tracer = trace.get_tracer("pipecat")
with tracer.start_as_current_span(
span_name, context=parent_context
) as current_span:
start_time_ns = (
int(state["segment_start_time"] * 1e9)
if state["segment_start_time"] is not None
else None
)
span = tracer.start_span("stt", context=parent, start_time=start_time_ns)
try:
settings = getattr(service, "_settings", None)
add_stt_span_attributes(
span=span,
service_name=service.__class__.__name__,
model=_get_model_name(service),
settings=settings,
vad_enabled=getattr(service, "vad_enabled", False),
)
except Exception as e:
logging.warning(f"Error setting STT span baseline attributes: {e}")
state["span"] = span
def handle_pre_push(service, frame, state):
"""Record speech-start anchor; lazy-open span on first transcript.
Lazy-opening on ``TranscriptionFrame`` (rather than on
``VADUserStartedSpeakingFrame`` or
``UserStartedSpeakingFrame``) avoids racing with
``TurnTraceObserver._handle_turn_started``, which runs
in a background task fired by ``_call_event_handler``
(``base_object.py:232``) and may not have set the new
turn's context yet — that produces STT spans parented
to the previous turn. By the time STT actually emits
a transcript, the turn observer has run.
Opening happens in pre-push (rather than post-push) so
that the recursive ``push_frame`` that
``STTService.push_frame`` triggers for the
``MetricsFrame`` (via ``stop_ttfb_metrics`` at
``stt_service.py:465``) sees the span already open and
can attribute ``metrics.ttfb`` to it.
"""
if isinstance(frame, VADUserStartedSpeakingFrame):
# Anchor the next span at the moment speech began.
# Skip if we already have an anchor (intra-turn VAD
# re-trigger) or a span open.
if state["span"] is None and state["segment_start_time"] is None:
state["segment_start_time"] = frame.timestamp - frame.start_secs
elif isinstance(frame, TranscriptionFrame) and state["span"] is None:
open_span(service, state)
async def handle_post_push(service, frame, state):
"""Attach per-frame attrs; close on finalized; record TTFB from MetricsFrame.
``metrics.ttfb`` is read off the ``TTFBMetricsData``
payload of any ``MetricsFrame`` pushed by
``stop_ttfb_metrics`` — the canonical value the rest
of the system uses — rather than from
``_metrics.ttfb``, which has an in-progress fallback
branch (``frame_processor_metrics.py:48-62``) that
would return an under-estimate if read at the wrong
time.
One STT span per finalized transcript: the span opens
lazily on the first ``TranscriptionFrame`` (pre-push,
anchored at speech start via ``segment_start_time``)
and closes on ``finalized=True``. Multiple finalized
transcripts in a single turn produce multiple spans.
For services that never set ``frame.finalized=True``
(e.g. Deepgram, which only marks it via
``confirm_finalize()``), the span closes on
``UserStoppedSpeakingFrame``. To capture
``metrics.ttfb`` for those spans we force-stop any
pending TTFB measurement before closing — that pushes
a ``MetricsFrame``, our post-push attributes the
value, and ``patched_stop_ttfb_metrics`` closes the
span. The ``stt.incomplete=true`` flag is set only if the
span was closed by neither a finalized transcript nor a
completed TTFB measurement.
"""
if isinstance(frame, UserStoppedSpeakingFrame):
prev_span = state["span"]
if prev_span is None:
return
metrics = getattr(service, "_metrics", None)
if metrics is not None and getattr(metrics, "_start_ttfb_time", 0) > 0:
last_transcript_time = getattr(service, "_last_transcript_time", 0) or None
try:
await service.stop_ttfb_metrics(end_time=last_transcript_time)
except Exception as e:
logging.warning(f"Error force-stopping STT TTFB on user turn end: {e}")
# patched_stop_ttfb_metrics may have closed the span
# via the timeout path; re-check.
if state["span"] is None:
state["segments"] = []
return
state["span"].set_attribute("stt.incomplete", True)
state["span"].end()
state["span"] = None
state["segment_start_time"] = None
state["segments"] = []
elif isinstance(frame, MetricsFrame):
span = state["span"]
if span is None:
return
for data in frame.data:
if isinstance(data, TTFBMetricsData):
span.set_attribute("metrics.ttfb", data.value)
break
elif isinstance(frame, TranscriptionFrame):
span = state["span"]
if span is None:
return
if frame.text:
update_transcript(state, frame.text)
span.set_attribute("transcript", " ".join(state["segments"]).strip())
span.set_attribute("is_final", bool(frame.finalized))
if frame.language:
span.set_attribute("language", str(frame.language))
if frame.user_id:
span.set_attribute("user_id", frame.user_id)
if frame.finalized:
span.end()
state["span"] = None
state["segment_start_time"] = None
state["segments"] = []
@functools.wraps(original_push_frame)
async def patched_push_frame(self, frame, direction=FrameDirection.DOWNSTREAM):
state = getattr(self, "_stt_span_state", None)
if state is None:
state = {"span": None, "segment_start_time": None, "segments": []}
self._stt_span_state = state
if getattr(self, "_tracing_enabled", False):
try:
# Get TTFB metric if available
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
# Use settings from the service if available
settings = getattr(self, "_settings", None)
add_stt_span_attributes(
span=current_span,
service_name=service_class_name,
model=_get_model_name(self),
transcript=transcript,
is_final=is_final,
language=str(language) if language else None,
user_id=getattr(self, "_user_id", None),
vad_enabled=getattr(self, "vad_enabled", False),
settings=settings,
ttfb=ttfb,
)
# Call the original function
fn_called = True
return await f(self, transcript, is_final, language)
handle_pre_push(self, frame, state)
except Exception as e:
# Log any exception but don't disrupt the main flow
logging.warning(f"Error in STT transcription tracing: {e}")
raise
except Exception as e:
if fn_called:
raise
logging.error(f"Error in STT tracing (continuing without tracing): {e}")
return await f(self, transcript, is_final, language)
logging.warning(f"Error in STT pre-push tracing: {e}")
return wrapper
await original_push_frame(self, frame, direction)
if getattr(self, "_tracing_enabled", False):
try:
await handle_post_push(self, frame, state)
except Exception as e:
logging.warning(f"Error in STT post-push tracing: {e}")
setattr(patched_push_frame, "__stt_tracing_push_frame_wrapped__", True)
owner.push_frame = patched_push_frame
def patch_stop_ttfb_metrics(owner):
"""Wrap ``owner.stop_ttfb_metrics`` to close the span on the timeout path.
When ``stop_ttfb_metrics`` is invoked with ``end_time`` set,
that signals the TTFB-timeout handler firing
(``stt_service.py:566``), or our own force-stop from the
``UserStoppedSpeakingFrame`` handler. In either case we
anchor the span's end at ``end_time``
(= ``_last_transcript_time``) rather than at whenever the
coroutine resumed.
``metrics.ttfb`` attribution is not done here — the
``MetricsFrame`` that ``stop_ttfb_metrics`` pushes flows
through ``push_frame`` and gets recorded by
``handle_post_push``, which reads the canonical
``TTFBMetricsData.value`` rather than the in-progress
``_metrics.ttfb`` property.
"""
original_stop = owner.stop_ttfb_metrics
if getattr(original_stop, "__stt_tracing_stop_ttfb_wrapped__", False):
return
@functools.wraps(original_stop)
async def patched_stop(self, *, end_time=None):
await original_stop(self, end_time=end_time)
if end_time is None:
return
if not getattr(self, "_tracing_enabled", False):
return
state = getattr(self, "_stt_span_state", None)
if not state or state["span"] is None:
return
try:
span = state["span"]
span.end(end_time=int(end_time * 1e9))
state["span"] = None
state["segment_start_time"] = None
state["segments"] = []
except Exception as e:
logging.warning(f"Error in STT stop_ttfb_metrics tracing: {e}")
setattr(patched_stop, "__stt_tracing_stop_ttfb_wrapped__", True)
owner.stop_ttfb_metrics = patched_stop
class _TracedSTTDescriptor:
"""Class-level descriptor that wires up STT tracing at class definition time.
``__set_name__`` fires when the class body finishes evaluating,
giving us a chance to wrap the owner's ``push_frame`` so that
VAD, transcription, and finalization events drive the span
lifecycle, and to wrap ``stop_ttfb_metrics`` so the
TTFB-timeout path can close the span when no finalized
transcript ever arrives (``metrics.ttfb`` itself is recorded
from the ``MetricsFrame`` in ``push_frame``). The decorated method
itself runs unchanged.
"""
def __set_name__(self, owner, attr_name):
patch_push_frame(owner)
patch_stop_ttfb_metrics(owner)
setattr(owner, attr_name, f)
return _TracedSTTDescriptor()
if func is not None:
return decorator(func)
# ``decorator(func)`` returns a descriptor placeholder that
# Python replaces with the real wrapped function once
# ``__set_name__`` runs at class definition time. Pyright sees
# only the descriptor instance, hence the ignore.
return decorator(func) # type: ignore[return-value]
return decorator
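
The segment-merging rule documented in ``update_transcript`` can be tried in isolation. The sketch below repeats that rule on a plain dict; the sample phrases are made up for illustration and do not come from any STT service.

```python
def update_transcript(state, new_text):
    """Extend the last segment on a cumulative update, else start a new one."""
    if not new_text:
        return
    segments = state["segments"]
    if segments and new_text.startswith(segments[-1]):
        # Interim accumulation: the new text extends the previous segment.
        segments[-1] = new_text
    else:
        # A different segment (or the very first one): keep both.
        segments.append(new_text)


state = {"segments": []}
for text in ["turn on", "turn on the lights", "and close the door"]:
    update_transcript(state, text)

transcript = " ".join(state["segments"]).strip()
```

Here the second frame replaces the first because it starts with it, while the third is appended, so the joined transcript keeps the beginning of the utterance.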
@@ -549,10 +899,6 @@ def traced_llm(func: Callable | None = None, *, name: str | None = None) -> Call
fn_called = True
result = await f(self, context, *args, **kwargs)
# Add aggregated output after function completes, if available
if output_text:
current_span.set_attribute("output", output_text)
return result
finally:
@@ -565,6 +911,15 @@ def traced_llm(func: Callable | None = None, *, name: str | None = None) -> Call
):
self.start_llm_usage_metrics = original_start_llm_usage_metrics
# Attach whatever output text we accumulated so
# far. Doing this in finally captures partial
# output when ``f`` is cancelled or raises mid-
# stream (e.g. interruption during LLM
# generation), rather than only on clean
# completion.
if output_text:
current_span.set_attribute("output", output_text)
# Update TTFB metric
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
if ttfb is not None:
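
Why moving the ``output`` attribute into ``finally`` matters can be shown with a small stand-alone sketch. ``fake_llm_stream``, ``run_llm`` and the ``attributes`` dict (standing in for the span) are hypothetical names used only for this illustration, and a ``RuntimeError`` stands in for an interruption.

```python
import asyncio


async def fake_llm_stream():
    yield "It's "
    yield "sunny"
    # Simulate the stream being cut short after two chunks.
    raise RuntimeError("interrupted mid-stream")


async def run_llm(attributes):
    output_text = ""
    try:
        async for chunk in fake_llm_stream():
            output_text += chunk
    finally:
        # Runs on clean completion, on error, and on cancellation alike,
        # so whatever was accumulated so far is still recorded.
        if output_text:
            attributes["output"] = output_text


async def main():
    attributes = {}
    try:
        await run_llm(attributes)
    except RuntimeError:
        pass
    return attributes


attributes = asyncio.run(main())
```

Had the assignment stayed after the loop inside ``try``, the exception would have skipped it and the partial text would have been lost.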

tests/test_cartesia_tts.py

@@ -0,0 +1,111 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.settings import TTSSettings
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
def _service(language: str) -> CartesiaTTSService:
service = CartesiaTTSService.__new__(CartesiaTTSService)
service._settings = TTSSettings(language=language)
return service
def _process_word_timestamps(
words: list[str], starts: list[float], language: str
) -> list[tuple[str, float]]:
return _service(language)._process_word_timestamps_for_language(words, starts)
def _concatenate_processed_timestamps(
timestamp_groups: list[tuple[list[str], list[float]]], language: str
) -> str:
service = _service(language)
text_parts = []
for words, starts in timestamp_groups:
processed_timestamps = service._process_word_timestamps_for_language(words, starts)
includes_inter_frame_spaces = service._word_timestamps_include_inter_frame_spaces()
text_parts.extend(
TextPartForConcatenation(
word,
includes_inter_part_spaces=includes_inter_frame_spaces,
)
for word, _timestamp in processed_timestamps
)
return concatenate_aggregated_text(text_parts)
def test_cartesia_chinese_word_timestamps_join_without_spaces():
assert _process_word_timestamps(
words=["你", "好", "。"],
starts=[0.0, 0.1, 0.2],
language="zh",
) == [("你好。", 0.0)]
def test_cartesia_japanese_word_timestamps_join_without_spaces():
assert _process_word_timestamps(
words=["こ", "ん", "に", "ち", "は", "。"],
starts=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5],
language="ja",
) == [("こんにちは。", 0.0)]
def test_cartesia_korean_word_timestamps_preserve_words_and_timestamps():
assert _process_word_timestamps(
words=["안녕하세요", "반갑습니다"],
starts=[0.0, 0.2],
language="ko",
) == [("안녕하세요", 0.0), ("반갑습니다", 0.2)]
def test_cartesia_korean_word_timestamps_do_not_join_latin_and_hangul():
assert _process_word_timestamps(
words=["AI", "어시스턴트입니다."],
starts=[3.7026982, 4.1999383],
language="ko",
) == [("AI", 3.7026982), ("어시스턴트입니다.", 4.1999383)]
def test_cartesia_japanese_timestamp_groups_reassemble_without_spaces():
assert (
_concatenate_processed_timestamps(
[
(["こ", "ん", "に", "ち", "は", "、", "私"], [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]),
(["は", "あ", "な", "た", "の"], [1.0, 1.1, 1.2, 1.3, 1.4]),
],
language="ja",
)
== "こんにちは、私はあなたの"
)
def test_cartesia_chinese_timestamp_groups_reassemble_without_spaces():
assert (
_concatenate_processed_timestamps(
[
(["你", "好", ",", "我", "是"], [0.1, 0.2, 0.3, 0.4, 0.5]),
(["你", "的", "智", "能"], [1.0, 1.1, 1.2, 1.3]),
],
language="zh",
)
== "你好,我是你的智能"
)
def test_cartesia_korean_timestamp_groups_reassemble_with_spaces():
assert (
_concatenate_processed_timestamps(
[
(["저는"], [1.6]),
(["여러분의"], [1.8]),
(["AI", "어시스턴트입니다."], [3.7, 4.2]),
],
language="ko",
)
== "저는 여러분의 AI 어시스턴트입니다."
)
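
The behavior these tests pin down can be summarized in a short sketch. This is not the implementation of ``_process_word_timestamps_for_language``, which is not shown here; ``join_tokens`` and its ``joins_without_spaces`` flag are hypothetical, and the sketch only mirrors the expected outputs above.

```python
def join_tokens(words, starts, joins_without_spaces):
    """Hypothetical helper mirroring the expectations in the tests above."""
    if not words:
        return []
    if joins_without_spaces:
        # Chinese/Japanese style: merge all tokens into one entry that
        # starts at the first token's timestamp.
        return [("".join(words), starts[0])]
    # Korean style: keep every word with its own timestamp.
    return list(zip(words, starts))


merged = join_tokens(["你", "好", "。"], [0.0, 0.1, 0.2], joins_without_spaces=True)
kept = join_tokens(["안녕하세요", "반갑습니다"], [0.0, 0.2], joins_without_spaces=False)
```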


@@ -763,474 +763,6 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["I'm thinking", "about pizza"])
async def test_no_wait_for_transcript_basic_flow(self):
"""``wait_for_transcript_to_end_user_turn=False`` splits the lifecycle:
- ``on_user_turn_stopped`` fires at the end of turn with empty
content (no transcripts have arrived yet).
- Transcripts arriving after the end of turn are captured into
``_aggregation``.
- When the post-turn transcript wait timer fires,
``on_user_turn_message_finalized`` fires with the populated
user context message.
"""
from unittest.mock import patch
from pipecat.processors.aggregators import llm_response_universal
# Shrink the timer so the test runs quickly.
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
events: list[tuple[str, str | None]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
pipeline = Pipeline([user_aggregator])
frames_to_send = [
VADUserStartedSpeakingFrame(),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
# Let the user_speech_timeout fire so the strategy
# fires turn-stopped.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
# Transcripts arrive after the end of turn (just one
# here for the basic case).
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# Wait for the post-turn transcript wait timer to fire.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(pipeline, frames_to_send=frames_to_send)
# Two events fired in order: end of turn first (empty),
# user message finalization later (populated).
self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")])
# Context contains the user message.
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
async def test_no_wait_for_transcript_uses_stt_metadata_for_wait_timer(self):
"""The post-turn transcript wait timer prefers the STT-reported P99 TTFS
over ``DEFAULT_TTFS_P99``. With a long ``DEFAULT_TTFS_P99`` and
a short STT-reported value, the wait completes by the shorter
time — if the timer fell back to ``DEFAULT_TTFS_P99``, this test
would hang.
"""
from unittest.mock import patch
from pipecat.frames.frames import STTMetadataFrame
from pipecat.processors.aggregators import llm_response_universal
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", 60.0):
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
events: list[tuple[str, str | None]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
pipeline = Pipeline([user_aggregator])
frames_to_send = [
# STT service advertises its P99 TTFS latency.
STTMetadataFrame(service_name="TestSTT", ttfs_p99_latency=TRANSCRIPTION_TIMEOUT),
VADUserStartedSpeakingFrame(),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
# Let the user_speech_timeout fire so the strategy
# fires turn-stopped.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# Wait for the post-turn transcript wait timer to fire (sized
# to the STT-reported TTFS, not DEFAULT_TTFS_P99).
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(pipeline, frames_to_send=frames_to_send)
self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")])
async def test_no_wait_for_transcript_no_transcripts_arrive(self):
"""When no transcripts arrive, the post-turn transcript wait timer still
runs — ``on_user_turn_message_finalized`` fires with empty
content and nothing is written to context.
"""
from unittest.mock import patch
from pipecat.processors.aggregators import llm_response_universal
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
events: list[tuple[str, str | None]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
pipeline = Pipeline([user_aggregator])
frames_to_send = [
VADUserStartedSpeakingFrame(),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
# Strategy fires turn-stopped.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
# Pending-finalization timer fires without any transcripts.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(pipeline, frames_to_send=frames_to_send)
self.assertEqual(events, [("stopped", None), ("finalized", "")])
# No user message added to context (empty aggregation).
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual(user_messages, [])
async def test_no_wait_for_transcript_next_turn_force_flushes_previous(self):
"""If a new user turn starts while the previous turn's
finalization is still pending (precondition violation), the
previous turn's finalization fires before the new turn's start.
Whatever transcripts were captured by then are what lands in
context.
"""
from unittest.mock import patch
from pipecat.processors.aggregators import llm_response_universal
with patch.object(
llm_response_universal,
"DEFAULT_TTFS_P99",
TRANSCRIPTION_TIMEOUT * 10, # timer should NOT fire during the test
):
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
events: list[tuple[str, str | None]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
@user_aggregator.event_handler("on_user_turn_started")
async def on_started(aggregator, strategy):
events.append(("started", ""))
pipeline = Pipeline([user_aggregator])
frames_to_send = [
# Turn 1
VADUserStartedSpeakingFrame(),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
# Late transcript for turn 1 arrives (just one here for
# simplicity).
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(),
# Turn 2 starts before turn 1's post-turn transcript wait timer
# fires — precondition violation. The aggregator should
# force-flush turn 1 first.
VADUserStartedSpeakingFrame(),
SleepFrame(),
]
await run_test(pipeline, frames_to_send=frames_to_send)
# The sequence must show turn 1's end of turn and user message
# finalization firing before turn 2's start event.
self.assertEqual(
events,
[
("started", ""), # turn 1 starts
("stopped", None), # turn 1 end of turn
("finalized", "Hello!"), # forced flush before turn 2 starts
("started", ""), # turn 2 starts
],
)
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
async def test_no_wait_for_transcript_context_order_with_assistant_response(self):
"""End-to-end ordering test: with both aggregators, verify the user
message lands in context *before* the assistant message, even
though the user's transcripts arrive after the end of turn.
Correct ordering requires the user aggregator's deferred
``push_aggregation`` to run before the assistant aggregator's
``push_aggregation`` (which fires on ``LLMFullResponseEndFrame``).
The patched-short post-turn transcript wait timer plus the sleep
between LLM start and end make that constraint hold here.
"""
from unittest.mock import patch
from pipecat.processors.aggregators import llm_response_universal
# Short timer so the user flush fires while the assistant
# response is still streaming.
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", 0.05):
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
pipeline = Pipeline([user_aggregator, assistant_aggregator])
frames_to_send = [
VADUserStartedSpeakingFrame(),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
# Strategy fires turn-stopped (end of turn).
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
# User transcripts arrive after end of turn (the realtime
# service has finally emitted them — just one here).
TranscriptionFrame(text="What's the weather?", user_id="", timestamp="now"),
# Bot starts responding. Ordering correctness depends on
# the user's post-turn transcript wait timer firing before
# LLMFullResponseEndFrame below.
LLMFullResponseStartFrame(),
LLMTextFrame("It's sunny."),
# Allow time for the user's post-turn transcript wait timer to
# fire (flushing the user message to context) before
# the assistant turn ends.
SleepFrame(sleep=0.1),
LLMFullResponseEndFrame(),
SleepFrame(),
]
await run_test(pipeline, frames_to_send=frames_to_send)
# Context must contain the user message before the assistant message.
roles_and_content = [(m.get("role"), m.get("content")) for m in context.get_messages()]
self.assertEqual(
roles_and_content,
[
("user", "What's the weather?"),
("assistant", "It's sunny."),
],
)
async def test_no_wait_for_transcript_strategies_are_mutated(self):
"""``wait_for_transcript_to_end_user_turn=False`` mutates the
provided strategies: drops ``TranscriptionUserTurnStartStrategy``
from start, flips ``wait_for_transcript=False`` on stop.
"""
from pipecat.turns.user_start import (
TranscriptionUserTurnStartStrategy,
VADUserTurnStartStrategy,
)
context = LLMContext()
stop = SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT,
wait_for_transcript=True, # explicitly True; bundle should flip to False
)
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
TranscriptionUserTurnStartStrategy(),
],
stop=[stop],
),
wait_for_transcript_to_end_user_turn=False,
),
)
# Start strategies: TranscriptionUserTurnStartStrategy dropped.
start_types = [type(s) for s in (user_aggregator._params.user_turn_strategies.start or [])]
self.assertEqual(start_types, [VADUserTurnStartStrategy])
# Stop strategy: wait_for_transcript flipped to False.
self.assertFalse(stop._wait_for_transcript)
async def test_transcript_fallback_default_mode(self):
"""The strategy's fallback path (transcripts with no prior VAD)
triggers turn-stopped correctly in default mode, and the user
message lands in context with the aggregated content.
"""
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
],
),
),
)
events: list[tuple[str, str]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
pipeline = Pipeline([user_aggregator])
# No VAD frames — fallback path: transcripts with no prior VAD
# (just one transcript here for simplicity).
frames_to_send = [
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(pipeline, frames_to_send=frames_to_send)
# Both events fire with the aggregated content.
self.assertEqual(events, [("stopped", "Hello!"), ("finalized", "Hello!")])
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
async def test_transcript_fallback_no_wait_for_transcript_mode(self):
"""The strategy's fallback path still gets the user message into
context when ``wait_for_transcript_to_end_user_turn=False``,
even though no end-of-turn event ever fires (the bundle drops
``TranscriptionUserTurnStartStrategy``, so a transcript-only
flow never starts a turn in the controller; the strategy's
stop-fire is dropped by the controller too).
At session end the aggregated text is flushed and
``on_user_turn_message_finalized`` fires with the content.
``on_user_turn_stopped`` doesn't fire — when the aggregator
runs a post-turn transcript wait, that event is reserved for
the end-of-turn path.
"""
from unittest.mock import patch
from pipecat.processors.aggregators import llm_response_universal
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(
user_speech_timeout=TRANSCRIPTION_TIMEOUT
)
],
),
wait_for_transcript_to_end_user_turn=False,
),
)
events: list[tuple[str, str]] = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(("stopped", message.content))
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_finalized(aggregator, strategy, message):
events.append(("finalized", message.content))
pipeline = Pipeline([user_aggregator])
frames_to_send = [
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# Wait long enough that the strategy's fallback timer
# has elapsed (its stop-fire is dropped by the
# controller, since no turn ever started).
SleepFrame(sleep=2 * TRANSCRIPTION_TIMEOUT + 0.1),
]
await run_test(pipeline, frames_to_send=frames_to_send)
# No end-of-turn event (no turn ever started in the controller).
# Only message_finalized fires, with the populated transcript.
self.assertEqual(events, [("finalized", "Hello!")])
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):


@@ -0,0 +1,31 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Tests for Inworld TTS language code mapping."""

from pipecat.services.inworld.tts import language_to_inworld_language
from pipecat.transcriptions.language import Language


def test_inworld_base_languages_resolve_to_canonical_regional_tags():
    """Base GA languages should use the regional tags emitted by Inworld Playground."""
    assert language_to_inworld_language(Language.EN) == "en-US"
    assert language_to_inworld_language(Language.RU) == "ru-RU"
    assert language_to_inworld_language(Language.FR) == "fr-FR"
    assert language_to_inworld_language(Language.ZH) == "zh-CN"


def test_inworld_regional_languages_are_preserved():
    """Explicit regional variants should be passed through as supported BCP-47 tags."""
    assert language_to_inworld_language(Language.EN_GB) == "en-GB"
    assert language_to_inworld_language(Language.PT_PT) == "pt-PT"
    assert language_to_inworld_language(Language.RU_RU) == "ru-RU"


def test_inworld_other_languages_are_passed_through_as_bcp47_tags():
    """Languages outside the canonical locale map should keep their BCP-47 enum value."""
    assert language_to_inworld_language(Language.SV_SE) == "sv-SE"
    assert language_to_inworld_language(Language.UK_UA) == "uk-UA"
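
The three tests above describe a mapping rule: bare base languages resolve to one canonical regional tag, while tags that already carry a region pass through unchanged. A minimal sketch of that rule on plain strings follows. It is not the `language_to_inworld_language` implementation, which is not shown in this diff; `CANONICAL_REGION` and `to_regional_tag` are hypothetical names, and the table only repeats the four pairs asserted in the tests.

```python
# Hypothetical locale table: base language code -> canonical regional tag.
# Only the pairs asserted in the tests above are listed here.
CANONICAL_REGION = {
    "en": "en-US",
    "ru": "ru-RU",
    "fr": "fr-FR",
    "zh": "zh-CN",
}


def to_regional_tag(tag: str) -> str:
    """Resolve a bare base language to a regional tag; keep regional tags as-is."""
    if "-" in tag:
        # Already has a region subtag (e.g. "en-GB", "sv-SE"): pass through.
        return tag
    # Unknown base languages fall back to the tag itself.
    return CANONICAL_REGION.get(tag, tag)
```

Under these assumptions, `to_regional_tag("en")` resolves to `"en-US"`, while `"en-GB"` and `"sv-SE"` come back unchanged.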

tests/test_soniox_stt.py

@@ -0,0 +1,175 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import json

import pytest

from pipecat.frames.frames import TranscriptionFrame
from pipecat.services.soniox.stt import END_TOKEN, SonioxSTTService, _language_from_tokens
from pipecat.transcriptions.language import Language


class _FakeWebsocket:
    def __init__(self, messages):
        self._messages = messages

    def __aiter__(self):
        return self._iter_messages()

    async def _iter_messages(self):
        for message in self._messages:
            yield message


def test_language_from_tokens_uses_single_recognized_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " world", "language": "en"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_uses_most_common_language():
    tokens = [
        {"text": "Ik", "language": "nl"},
        {"text": " zoek", "language": "nl"},
        {"text": " computer", "language": "en"},
    ]

    assert _language_from_tokens(tokens) == Language.NL


def test_language_from_tokens_skips_unknown_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": "!", "language": "klingon"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_skips_missing_language():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " wereld"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


def test_language_from_tokens_ignores_unknown_and_missing_languages():
    tokens = [
        {"text": "Hello", "language": "klingon"},
        {"text": " world"},
        {"text": "!"},
    ]

    assert _language_from_tokens(tokens) is None


def test_language_from_tokens_uses_first_language_on_tie():
    tokens = [
        {"text": "Hello", "language": "en"},
        {"text": " wereld", "language": "nl"},
    ]

    assert _language_from_tokens(tokens) == Language.EN


@pytest.mark.asyncio
async def test_receive_messages_sets_final_transcription_language(monkeypatch):
    service = SonioxSTTService(api_key="test-key")
    pushed_frames = []
    traced_transcriptions = []

    async def fake_push_frame(frame):
        pushed_frames.append(frame)

    async def fake_handle_transcription(transcript, is_final, language=None):
        traced_transcriptions.append((transcript, is_final, language))

    async def fake_stop_processing_metrics():
        pass

    messages = [
        json.dumps(
            {
                "tokens": [
                    {"text": "Ik", "is_final": True, "language": "nl"},
                    {"text": " zoek", "is_final": True, "language": "nl"},
                    {"text": " computer", "is_final": True, "language": "en"},
                    {"text": END_TOKEN, "is_final": True},
                ]
            }
        ),
        json.dumps({"tokens": [], "finished": True}),
    ]

    service._websocket = _FakeWebsocket(messages)
    monkeypatch.setattr(service, "push_frame", fake_push_frame)
    monkeypatch.setattr(service, "_handle_transcription", fake_handle_transcription)
    monkeypatch.setattr(service, "stop_processing_metrics", fake_stop_processing_metrics)

    await service._receive_messages()

    final_frames = [frame for frame in pushed_frames if isinstance(frame, TranscriptionFrame)]
    assert len(final_frames) == 1
    assert final_frames[0].text == "Ik zoek computer"
    assert final_frames[0].language == Language.NL
    assert final_frames[0].finalized is True
    assert final_frames[0].result == [
        {"text": "Ik", "is_final": True, "language": "nl"},
        {"text": " zoek", "is_final": True, "language": "nl"},
        {"text": " computer", "is_final": True, "language": "en"},
    ]
    assert traced_transcriptions == [("Ik zoek computer", True, Language.NL)]


@pytest.mark.asyncio
async def test_receive_messages_allows_final_transcription_without_language(monkeypatch):
    service = SonioxSTTService(api_key="test-key")
    pushed_frames = []
    traced_transcriptions = []

    async def fake_push_frame(frame):
        pushed_frames.append(frame)

    async def fake_handle_transcription(transcript, is_final, language=None):
        traced_transcriptions.append((transcript, is_final, language))

    async def fake_stop_processing_metrics():
        pass

    messages = [
        json.dumps(
            {
                "tokens": [
                    {"text": "Tell", "is_final": True},
                    {"text": " me", "is_final": True},
                    {"text": " a", "is_final": True},
                    {"text": " joke.", "is_final": True},
                    {"text": END_TOKEN, "is_final": True},
                ]
            }
        ),
        json.dumps({"tokens": [], "finished": True}),
    ]

    service._websocket = _FakeWebsocket(messages)
    monkeypatch.setattr(service, "push_frame", fake_push_frame)
    monkeypatch.setattr(service, "_handle_transcription", fake_handle_transcription)
    monkeypatch.setattr(service, "stop_processing_metrics", fake_stop_processing_metrics)

    await service._receive_messages()

    final_frames = [frame for frame in pushed_frames if isinstance(frame, TranscriptionFrame)]
    assert len(final_frames) == 1
    assert final_frames[0].text == "Tell me a joke."
    assert final_frames[0].language is None
    assert final_frames[0].finalized is True
    assert traced_transcriptions == [("Tell me a joke.", True, None)]
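
The `_language_from_tokens` tests in this file pin down a selection rule: count the per-token language codes, skip missing or unrecognized ones, pick the most frequent, and break ties in favor of the language seen first. A rough sketch of that rule is below. It is an illustration only, since the real `_language_from_tokens` is not part of this diff and returns pipecat `Language` values rather than strings; `KNOWN_LANGUAGES` and `most_common_language` are hypothetical names.

```python
from collections import Counter
from typing import Optional

# Hypothetical set of recognized codes; the real service resolves codes
# against pipecat's Language enum instead.
KNOWN_LANGUAGES = {"en", "nl", "fr"}


def most_common_language(tokens: list[dict]) -> Optional[str]:
    """Return the most frequent recognized language code, or None if there is none."""
    counts = Counter(
        token["language"]
        for token in tokens
        # Tokens without a "language" key yield None, which is never in the set.
        if token.get("language") in KNOWN_LANGUAGES
    )
    if not counts:
        return None
    # Counter keeps insertion order and max() returns the first maximal key,
    # so a tie goes to the language that appeared first in the token list.
    return max(counts, key=counts.__getitem__)
```

With the token lists used in the tests, this sketch would select `"nl"` for the mixed Dutch and English sentence and `"en"` for the one-to-one tie.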

uv.lock (generated): file diff suppressed because it is too large