Compare commits
11 Commits
mb/openrou
...
pk/optiona
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fee91ddec | ||
|
|
638294c1cc | ||
|
|
ea96b7aec7 | ||
|
|
666c619113 | ||
|
|
797d09a1d5 | ||
|
|
ee1538d18e | ||
|
|
8330c3487d | ||
|
|
4479a3a6af | ||
|
|
8631518388 | ||
|
|
47e2f7a037 | ||
|
|
6d21507e95 |
509
CHANGELOG.md
509
CHANGELOG.md
@@ -7,515 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
<!-- towncrier release notes start -->
|
||||
|
||||
## [1.2.1] - 2026-05-15
|
||||
|
||||
### Changed
|
||||
|
||||
- Changed the default WebSocket endpoints for `GradiumSTTService` and
|
||||
`GradiumTTSService` to the region-neutral
|
||||
`wss://api.gradium.ai/api/speech/asr` and
|
||||
`wss://api.gradium.ai/api/speech/tts`. Gradium now automatically routes
|
||||
traffic to the nearest endpoint. Override the url to pin to a specific
|
||||
region.
|
||||
(PR [#4500](https://github.com/pipecat-ai/pipecat/pull/4500))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed bot hangs when `filter_incomplete_user_turns` was enabled and the LLM
|
||||
responded by calling a tool. The user turn never finalized, so the assistant
|
||||
aggregator gated the tool-result context push and the LLM continuation never
|
||||
ran. Tool calls now finalize the turn the moment they start, before the
|
||||
function dispatches.
|
||||
(PR [#4501](https://github.com/pipecat-ai/pipecat/pull/4501))
|
||||
|
||||
## [1.2.0] - 2026-05-14
|
||||
|
||||
### Added
|
||||
|
||||
- Added a `session_id` field to `RunnerArguments` so bots can log or trace a
|
||||
per-session identifier in local development the same way they can in Pipecat
|
||||
Cloud. The development runner now mints a UUID at every construction site,
|
||||
and paths that already returned a `sessionId` to the caller (Daily `/start`,
|
||||
dial-in webhook) share that same UUID with the runner args instead of
|
||||
generating two. The SmallWebRTC `/api/offer` endpoint also accepts an
|
||||
optional `session_id` query parameter so the `/sessions/{session_id}/...`
|
||||
proxy can thread it through.
|
||||
(PR [#4385](https://github.com/pipecat-ai/pipecat/pull/4385))
|
||||
|
||||
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService`
|
||||
for controlling Cartesia's server-side text buffering. When unset, Pipecat
|
||||
picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE`
|
||||
mode (custom buffering — avoids stacking client-side aggregation on top of
|
||||
Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode
|
||||
(Cartesia's managed buffering applies). Pass an explicit value (0–5000ms) to
|
||||
override.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and
|
||||
`DeepgramHttpTTSService` so callers can opt out of the Deepgram Model
|
||||
Improvement Program. When set, the value is forwarded to Deepgram as a query
|
||||
parameter on the speak request. Defaults to `None`, which preserves the
|
||||
existing behavior. See https://dpgr.am/deepgram-mip for pricing implications
|
||||
before enabling.
|
||||
(PR [#4400](https://github.com/pipecat-ai/pipecat/pull/4400))
|
||||
|
||||
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set
|
||||
via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that
|
||||
appends a developer-role message to the context whenever `LLMSetToolsFrame`
|
||||
changes the set of advertised standard tools. Helps the LLM stay coherent
|
||||
across mid-conversation tool changes, mitigating several flavors of
|
||||
tool-call-related hallucination: calling tools that have been removed,
|
||||
avoiding tools that have been re-added, and hallucinating output (made-up
|
||||
answers or tool-call-shaped non-tool-calls) when tools are unavailable.
|
||||
(PR [#4404](https://github.com/pipecat-ai/pipecat/pull/4404))
|
||||
|
||||
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in
|
||||
`pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the
|
||||
inference-triggered event and suppresses `on_user_turn_stopped`, leaving
|
||||
finalization to another strategy in the chain such as
|
||||
`LLMTurnCompletionUserTurnStopStrategy`.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` —
|
||||
a generic stop strategy that finalizes the user turn whenever a
|
||||
`UserTurnInferenceCompletedFrame` arrives, regardless of which component
|
||||
produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base;
|
||||
future producers (Flux, custom end-of-turn classifiers, etc.) can use the
|
||||
base directly or subclass it to add producer-specific setup.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Added `on_user_turn_inference_triggered`, a new event on the user turn
|
||||
controller, processor, aggregator and stop strategies that fires when a
|
||||
strategy has enough signal to start LLM inference. By default it fires
|
||||
together with `on_user_turn_stopped`; a gating strategy can fire only the
|
||||
inference-triggered event and defer finalization to a peer.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Added `FilterIncompleteUserTurnStrategies` in
|
||||
`pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization
|
||||
that wraps the detector chain with `deferred(...)` and appends
|
||||
`LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case:
|
||||
`user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass
|
||||
`config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`.
|
||||
When installed, the strategy gates `on_user_turn_stopped` on a
|
||||
`UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by
|
||||
any component that can judge turn completeness — e.g. the
|
||||
`UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout`
|
||||
provides a safety net if no completion frame ever arrives.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Added first-class RTVI support for the UI Agent Protocol:
|
||||
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server
|
||||
messages, plus `ui-command` and `ui-task` server-to-client messages, with
|
||||
paired `*Data` / `*Message` pydantic models.
|
||||
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`,
|
||||
`Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching
|
||||
default handlers live in `@pipecat-ai/client-react`.
|
||||
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`,
|
||||
and `ui-cancel-task` messages.
|
||||
- Adds five UI pipeline frames, mirroring the `client-message`
|
||||
frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` /
|
||||
`RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` /
|
||||
`UITaskMessage` envelopes, while the processor pushes inbound
|
||||
`RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame`
|
||||
alongside `on_ui_message`.
|
||||
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.
|
||||
(PR [#4407](https://github.com/pipecat-ai/pipecat/pull/4407))
|
||||
|
||||
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore
|
||||
processor now resolve credentials via the standard boto3 provider chain (EC2
|
||||
instance profiles, EKS pod roles / IRSA, ECS task roles, SSO,
|
||||
`~/.aws/credentials`) when explicit credentials and `AWS_*` environment
|
||||
variables are absent. Services running with IAM roles no longer need to
|
||||
export static credentials.
|
||||
(PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))
|
||||
|
||||
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can
|
||||
bias transcription for both file-based and realtime transcription.
|
||||
(PR [#4426](https://github.com/pipecat-ai/pipecat/pull/4426))
|
||||
|
||||
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and
|
||||
`DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum
|
||||
silence duration before the watchdog sends a silence packet to prevent
|
||||
dangling turns. The actual threshold is `max(chunk_duration * 2,
|
||||
watchdog_min_timeout)`, so it also adapts automatically to the audio chunk
|
||||
size in use.
|
||||
(PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))
|
||||
|
||||
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on
|
||||
models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini
|
||||
2.x); the conversation now continues while the tool runs. On models that
|
||||
don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time
|
||||
warning explaining the limitation. (Note: an intermittent 1008 error can
|
||||
occasionally fire on Gemini 2.5 during long-running tool calls; we
|
||||
auto-reconnect.)
|
||||
(PR [#4448](https://github.com/pipecat-ai/pipecat/pull/4448))
|
||||
|
||||
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition
|
||||
using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint.
|
||||
Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is
|
||||
VAD-aware, and automatically reconnects on error.
|
||||
(PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))
|
||||
|
||||
- Added NVIDIA Magpie TTS services via AWS SageMaker:
|
||||
`NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM
|
||||
back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream
|
||||
with full interruption support via `InterruptibleTTSService`).
|
||||
(PR [#4464](https://github.com/pipecat-ai/pipecat/pull/4464))
|
||||
|
||||
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`,
|
||||
for use with reasoning-capable Realtime models such as `gpt-realtime-2`.
|
||||
(PR [#4470](https://github.com/pipecat-ai/pipecat/pull/4470))
|
||||
|
||||
- Inworld TTS updates:
|
||||
- Added `delivery_mode` setting (`STABLE`/`BALANCED`/`CREATIVE`) to
|
||||
`InworldTTSService` and `InworldHttpTTSService`, enabling the
|
||||
stability-vs-creativity tradeoff in `inworld-tts-2`.
|
||||
- Added language support to `InworldTTSService` and
|
||||
`InworldHttpTTSService`. The `language` setting is now forwarded to the API,
|
||||
and a new `language_to_inworld_language()` helper normalizes Pipecat
|
||||
`Language` enums to Inworld's BCP-47 locale tags.
|
||||
(PR [#4473](https://github.com/pipecat-ai/pipecat/pull/4473))
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the
|
||||
generally available `tts-rt-v1`.
|
||||
(PR [#4386](https://github.com/pipecat-ai/pipecat/pull/4386))
|
||||
|
||||
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16`
|
||||
to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the
|
||||
`use_normalized_timestamps` and `max_buffer_delay_ms` fields.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead
|
||||
of the deprecated `use_original_timestamps` field. Word timestamps now
|
||||
reflect what was actually spoken (post text-normalization and
|
||||
pronunciation-dictionary substitution), matching the convention Pipecat uses
|
||||
for ElevenLabs. This is a behavior change for `sonic-3` users, who were
|
||||
previously receiving timestamps tied to the input transcript.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- Broadened `tool_resources` to `app_resources` for easy access not just in
|
||||
tool handlers but in other places like custom `FrameProcessor`s. Three
|
||||
changes: a rename (`tool_resources` → `app_resources`), a new `app_resources`
|
||||
property on `PipelineTask`, and a new `pipeline_task` property on
|
||||
`FrameProcessor`. Tool handlers now read `params.app_resources`; custom
|
||||
processors read `self.pipeline_task.app_resources`. The previous
|
||||
`tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and
|
||||
`FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit
|
||||
`DeprecationWarning`s.
|
||||
(PR [#4395](https://github.com/pipecat-ai/pipecat/pull/4395))
|
||||
|
||||
- Lowered the per-message log in
|
||||
`SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App
|
||||
messages can be high-frequency and were noisy at debug level; set the loguru
|
||||
level to `TRACE` to see them again.
|
||||
(PR [#4397](https://github.com/pipecat-ai/pipecat/pull/4397))
|
||||
|
||||
- Changed the default model for `GrokRealtimeLLMService` to
|
||||
`grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The
|
||||
previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is
|
||||
being removed.
|
||||
(PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))
|
||||
|
||||
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to
|
||||
`inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`,
|
||||
`InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing
|
||||
users can pin the prior model explicitly via the `model`/`tts_model`
|
||||
argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid
|
||||
model IDs.
|
||||
(PR [#4422](https://github.com/pipecat-ai/pipecat/pull/4422))
|
||||
|
||||
- Changed the default model for `GrokLLMService` from `grok-3` to
|
||||
`grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.
|
||||
(PR [#4429](https://github.com/pipecat-ai/pipecat/pull/4429))
|
||||
|
||||
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic:
|
||||
`max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms.
|
||||
This prevents false silence injections when large audio chunks are sent at
|
||||
lower frequency.
|
||||
(PR [#4430](https://github.com/pipecat-ai/pipecat/pull/4430))
|
||||
|
||||
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the
|
||||
turn is complete (on `on_turn_context_completed`) rather than waiting until
|
||||
all audio has finished playing back. The `isFinal` message from ElevenLabs is
|
||||
now used to signal `TTSStoppedFrame` and clean up the audio context,
|
||||
improving turn transition timing.
|
||||
(PR [#4433](https://github.com/pipecat-ai/pipecat/pull/4433))
|
||||
|
||||
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio
|
||||
encoding by default, which returns audio bytes without headers.
|
||||
(PR [#4446](https://github.com/pipecat-ai/pipecat/pull/4446))
|
||||
|
||||
- Moved `create_task`, `cancel_task`, the `task_manager` property, and
|
||||
`setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom
|
||||
`BaseObject` subclasses (turn strategies, controllers, etc.) now inherit
|
||||
these methods directly instead of reimplementing the task manager wiring.
|
||||
Owners propagate the task manager to their child `BaseObject`s via `await
|
||||
child.setup(task_manager)`.
|
||||
(PR [#4449](https://github.com/pipecat-ai/pipecat/pull/4449))
|
||||
|
||||
- Changed the default OpenAI Realtime input audio transcription model from
|
||||
`gpt-4o-transcribe` to `gpt-realtime-whisper` for both
|
||||
`OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does
|
||||
not accept the `prompt` parameter; if a prompt is supplied alongside
|
||||
`gpt-realtime-whisper`, it is dropped automatically and a warning is logged.
|
||||
To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or
|
||||
`"gpt-4o-mini-transcribe"`).
|
||||
(PR [#4450](https://github.com/pipecat-ai/pipecat/pull/4450))
|
||||
|
||||
- Updated the default model for `CartesiaTTSService` and
|
||||
`CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.
|
||||
(PR [#4462](https://github.com/pipecat-ai/pipecat/pull/4462))
|
||||
|
||||
- Changed the default model for `OpenAIRealtimeLLMService` from
|
||||
`gpt-realtime-1.5` to `gpt-realtime-2`.
|
||||
(PR [#4472](https://github.com/pipecat-ai/pipecat/pull/4472))
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use
|
||||
`user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add
|
||||
`LLMTurnCompletionUserTurnStopStrategy` to a custom
|
||||
`user_turn_strategies.stop`) instead. Setting the legacy flag still works for
|
||||
one release: the aggregator emits a `DeprecationWarning` and rewires the
|
||||
strategies as if you had passed `FilterIncompleteUserTurnStrategies`
|
||||
directly.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the
|
||||
`create_file_resampler()` / `create_stream_resampler()` factories).
|
||||
Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class
|
||||
will be removed in Pipecat 2.0 along with the default `resampy` and `numba`
|
||||
dependencies.
|
||||
(PR [#4428](https://github.com/pipecat-ai/pipecat/pull/4428))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as
|
||||
`ErrorFrame`s. The latest API emits a `flush_done` per transcript when
|
||||
server-side buffering is disabled; Pipecat now consumes them silently since
|
||||
each turn already has its own `context_id`.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`,
|
||||
`VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance
|
||||
(e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both
|
||||
the class and an instance.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200
|
||||
response — one with the API's error text and a second, less informative
|
||||
"Unknown error" frame from the outer exception handler. It now pushes a
|
||||
single frame that includes the HTTP status code and returns cleanly.
|
||||
(PR [#4390](https://github.com/pipecat-ai/pipecat/pull/4390))
|
||||
|
||||
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally
|
||||
for user turn stop strategies. It is now only imported when
|
||||
`default_user_turn_stop_strategies()` is called. This improves startup time
|
||||
and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning
|
||||
when the default stop strategies are not used.
|
||||
(PR [#4393](https://github.com/pipecat-ai/pipecat/pull/4393))
|
||||
|
||||
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was
|
||||
stored in `Settings` but never sent to xAI, so every session silently fell
|
||||
back to xAI's server-side default. The model is now passed via the `?model=`
|
||||
query parameter on the WebSocket URL as xAI's Voice Agent API requires.
|
||||
(PR [#4401](https://github.com/pipecat-ai/pipecat/pull/4401))
|
||||
|
||||
- Fixed `on_user_turn_stopped` firing prematurely when
|
||||
`filter_incomplete_user_turns` was enabled. The event now fires only after
|
||||
the LLM confirms the user turn is complete (`✓`); previously the smart-turn
|
||||
detector's tentative stop was bubbling up before the LLM had a chance to veto
|
||||
it, causing observers, transcript appenders and UI indicators to receive an
|
||||
early — and sometimes duplicated — signal.
|
||||
(PR [#4405](https://github.com/pipecat-ai/pipecat/pull/4405))
|
||||
|
||||
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting
|
||||
across two assistant messages in the LLM context and not surfacing in
|
||||
`on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted
|
||||
at the end of a TTS context now carries a PTS just past the last word so it
|
||||
can't overtake clock-queued `TTSTextFrame`s in the transport's output, and
|
||||
`LLMAssistantAggregator` now triggers
|
||||
`on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the
|
||||
frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting
|
||||
transcripts).
|
||||
(PR [#4414](https://github.com/pipecat-ai/pipecat/pull/4414))
|
||||
|
||||
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged
|
||||
words (e.g. `bookLook`) when using Flash models. Flash often splits sentences
|
||||
mid-stream into alignment chunks that begin with a real inter-word space, but
|
||||
the previous fix unconditionally stripped that space from every chunk.
|
||||
Leading spaces are now stripped only on the first alignment chunk of an
|
||||
utterance, so subsequent chunks correctly flush partial words across
|
||||
boundaries.
|
||||
(PR [#4415](https://github.com/pipecat-ai/pipecat/pull/4415))
|
||||
|
||||
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor
|
||||
erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
|
||||
was set in the environment. The half-populated kwargs are no longer forwarded
|
||||
to aioboto3; partial env-var configurations now fall through to the boto3
|
||||
credential chain like fully-unset configurations do.
|
||||
(PR [#4416](https://github.com/pipecat-ai/pipecat/pull/4416))
|
||||
|
||||
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing
|
||||
romanized/normalized text to the LLM context. With non-Latin input (e.g.,
|
||||
Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao
|
||||
!` instead of `你好!`), which then degraded subsequent LLM turns. The services
|
||||
now consume `alignment` by default and only switch to `normalizedAlignment` /
|
||||
`normalized_alignment` when `pronunciation_dictionary_locators` is configured
|
||||
(where `alignment` has overlapping restarts that produce duplicated/garbled
|
||||
words, per #4316). Both fields are read with preferred-with-fallback
|
||||
semantics since each is nullable per the API schema.
|
||||
(PR [#4424](https://github.com/pipecat-ai/pipecat/pull/4424))
|
||||
|
||||
- Fixed a deadlock in `TTSService` that could permanently stall pipeline
|
||||
processing when all three conditions occurred together:
|
||||
`pause_frame_processing=True`, an interruption arrived before any TTS audio
|
||||
was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`,
|
||||
`FunctionCallResultFrame`) was in the processing queue at that moment. The
|
||||
process task would block on `__process_event.wait()` indefinitely because
|
||||
`BotStoppedSpeakingFrame` never arrives (no audio was played) and the
|
||||
interruption handler did not resume processing. Affects services using
|
||||
`pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and
|
||||
ResembleAI.
|
||||
(PR [#4431](https://github.com/pipecat-ai/pipecat/pull/4431))
|
||||
|
||||
- Fixed interruptions being delayed when a slow non-uninterruptible frame was
|
||||
processing and an uninterruptible frame was waiting in the queue. The bot
|
||||
would stall until the slow frame finished instead of cancelling it
|
||||
immediately on interruption.
|
||||
(PR [#4434](https://github.com/pipecat-ai/pipecat/pull/4434))
|
||||
|
||||
- Fixed `TTSService` dropping uninterruptible frames (e.g.
|
||||
`FunctionCallResultFrame`) from its internal serialization queue when an
|
||||
interruption occurs. Previously, the queue was recreated on every
|
||||
interruption, silently discarding any queued frames. The queue is now reset
|
||||
instead of recreated, preserving uninterruptible frames so they are always
|
||||
delivered downstream.
|
||||
(PR [#4435](https://github.com/pipecat-ai/pipecat/pull/4435))
|
||||
|
||||
- Fixed a race condition in the Daily transport that caused `AttributeError:
|
||||
'NoneType' object has no attribute 'send_app_message'` when tearing down a
|
||||
pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the
|
||||
same `DailyTransportClient` and both call `cleanup()`, which was releasing
|
||||
the underlying `CallClient` on the first call — leaving the second caller
|
||||
with a `None` client.
|
||||
(PR [#4440](https://github.com/pipecat-ai/pipecat/pull/4440))
|
||||
|
||||
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService`
|
||||
and `OpenAIRealtimeLLMService`. These services previously honored the flag by
|
||||
simply not cancelling in-flight function calls on interruption; the
|
||||
introduction of the new async-tool mechanism (which threads
|
||||
started/intermediate/final messages through the LLM context) broke that path
|
||||
because the realtime services didn't know how to interpret those messages.
|
||||
Note that new-style streamed intermediate results
|
||||
(`FunctionCallResultProperties(is_final=False)`) are not supported on these
|
||||
realtime services. Similar fixes for other impacted realtime services are
|
||||
forthcoming.
|
||||
(PR [#4441](https://github.com/pipecat-ai/pipecat/pull/4441))
|
||||
|
||||
- Fixed two misspelled Gemini TTS voice names in
|
||||
`GeminiTTSService.AVAILABLE_VOICES`.
|
||||
(PR [#4443](https://github.com/pipecat-ai/pipecat/pull/4443))
|
||||
|
||||
- Extended the `cancel_on_interruption=False` regression fix to
|
||||
`GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and
|
||||
`UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in
|
||||
#4441 (each service detects async-tool messages in the LLM context and routes
|
||||
the final result to its formal tool-result channel; Azure inherits
|
||||
transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different
|
||||
approach because its API freezes the conversation between
|
||||
`client_tool_invocation` and the matching `client_tool_result` — for
|
||||
async-registered functions it now ships a placeholder `client_tool_result`
|
||||
immediately when the function is invoked (to unfreeze the conversation), then
|
||||
injects the real result as user-side text once the tool finishes. Streamed
|
||||
intermediate results (`FunctionCallResultProperties(is_final=False)`) are
|
||||
still not supported on any of these realtime services. `GeminiLiveLLMService`
|
||||
and `InworldRealtimeLLMService` are excluded for now: Gemini Live's
|
||||
async-tool path needs deeper investigation, and Inworld tool calling needs to
|
||||
be sorted out first.
|
||||
(PR [#4447](https://github.com/pipecat-ai/pipecat/pull/4447))
|
||||
|
||||
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses
|
||||
(observed with `gpt-realtime-2`). A single response can now contain more than
|
||||
one audio item, and the first item's `audio.done` may arrive after the second
|
||||
item's deltas have started. Deltas still arrive strictly in playback order,
|
||||
so we continue to forward them as received (matching OpenAI's reference
|
||||
implementation). The fix removes spurious warnings, ensures truncation always
|
||||
targets the latest audio item, and emits a single bracketing
|
||||
`TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is
|
||||
now pushed on `response.done`).
|
||||
(PR [#4465](https://github.com/pipecat-ai/pipecat/pull/4465))
|
||||
|
||||
- Fixed missing `output` attribute on LLM OpenTelemetry spans when the LLM call
|
||||
is interrupted mid-stream.
|
||||
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
|
||||
|
||||
- Fixed incorrect `metrics.ttfb` on STT OpenTelemetry spans, and parented them
|
||||
to the current turn span.
|
||||
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
|
||||
|
||||
- Fixed incorrect `metrics.ttfb` on TTS OpenTelemetry spans for streaming
|
||||
services.
|
||||
(PR [#4467](https://github.com/pipecat-ai/pipecat/pull/4467))
|
||||
|
||||
- Extended the `cancel_on_interruption=False` regression fix to
|
||||
`InworldRealtimeLLMService`. Uses the same approach as in #4441 (the service
|
||||
detects async-tool messages in the LLM context and routes the final result to
|
||||
its formal tool-result channel). Note: as of this writing, Inworld Realtime
|
||||
doesn't appear to handle the resulting delayed tool result reliably — the
|
||||
routing is best-effort and the service surfaces a one-time warning when
|
||||
async-tool messages are seen. Streamed intermediate results
|
||||
(`FunctionCallResultProperties(is_final=False)`) are still not supported on
|
||||
this realtime service. (Inworld was excluded from #4447 pending resolution of
|
||||
an unrelated tool-calling issue, which turned out to be an account-level
|
||||
matter.)
|
||||
(PR [#4474](https://github.com/pipecat-ai/pipecat/pull/4474))
|
||||
|
||||
- Fixed Cartesia TTS Korean word timestamps to use normal spacing rules,
|
||||
preserving word boundaries and per-word timestamp alignment during downstream
|
||||
aggregation.
|
||||
(PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))
|
||||
|
||||
- Fixed Cartesia TTS Chinese and Japanese timestamp grouping to preserve
|
||||
provider text spacing, avoiding artificial spaces when timestamp groups are
|
||||
reassembled downstream.
|
||||
(PR [#4475](https://github.com/pipecat-ai/pipecat/pull/4475))
|
||||
|
||||
- Fixed `SonioxSTTService` final transcription frames missing detected language
|
||||
metadata when Soniox returns token-level language annotations.
|
||||
(PR [#4482](https://github.com/pipecat-ai/pipecat/pull/4482))
|
||||
|
||||
- Fixed Soniox final transcription language detection to use the most common
|
||||
recognized token language, avoiding mislabeling an utterance when the last
|
||||
token is tagged with a different language.
|
||||
(PR [#4495](https://github.com/pipecat-ai/pipecat/pull/4495))
|
||||
|
||||
- Fixed dropped audio in streaming TTS services whose wire protocol doesn't
|
||||
echo `context_id` back on incoming audio (Sarvam, Smallest, Soniox, Inworld,
|
||||
and others). Previously, audio that arrived between contexts or at the very
|
||||
start of a turn was tagged with `context_id=None` and silently dropped with
|
||||
an "unable to append audio to context: no context ID provided" debug log.
|
||||
`TTSService.get_active_audio_context_id()` now falls back to the
|
||||
synthesis-side `_turn_context_id` when the playback cursor isn't set yet.
|
||||
(PR [#4497](https://github.com/pipecat-ai/pipecat/pull/4497))
|
||||
|
||||
### Security
|
||||
|
||||
- Fixed a path traversal issue in the development runner's
|
||||
`/files/{filename:path}` download endpoint. Previously, when the runner was
|
||||
started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could
|
||||
escape the configured folder because `%2F`-encoded separators bypassed
|
||||
Starlette's path normalisation. The endpoint now resolves the joined path and
|
||||
rejects any filename that escapes the allowed base with a 403, and also
|
||||
returns 404 (instead of an implicit `null` 200) when `--folder` is unset.
|
||||
(PR [#4417](https://github.com/pipecat-ai/pipecat/pull/4417))
|
||||
|
||||
## [1.1.0] - 2026-04-27
|
||||
|
||||
### Added
|
||||
|
||||
1
changelog/4385.added.md
Normal file
1
changelog/4385.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added a `session_id` field to `RunnerArguments` so bots can log or trace a per-session identifier in local development the same way they can in Pipecat Cloud. The development runner now mints a UUID at every construction site, and paths that already returned a `sessionId` to the caller (Daily `/start`, dial-in webhook) share that same UUID with the runner args instead of generating two. The SmallWebRTC `/api/offer` endpoint also accepts an optional `session_id` query parameter so the `/sessions/{session_id}/...` proxy can thread it through.
|
||||
1
changelog/4386.changed.md
Normal file
1
changelog/4386.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Updated the default `SonioxTTSService` model from `tts-rt-v1-preview` to the generally available `tts-rt-v1`.
|
||||
1
changelog/4390.added.md
Normal file
1
changelog/4390.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService` for controlling Cartesia's server-side text buffering. When unset, Pipecat picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE` mode (custom buffering — avoids stacking client-side aggregation on top of Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode (Cartesia's managed buffering applies). Pass an explicit value (0–5000ms) to override.
|
||||
1
changelog/4390.changed.2.md
Normal file
1
changelog/4390.changed.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16` to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the `use_normalized_timestamps` and `max_buffer_delay_ms` fields.
|
||||
1
changelog/4390.changed.md
Normal file
1
changelog/4390.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead of the deprecated `use_original_timestamps` field. Word timestamps now reflect what was actually spoken (post text-normalization and pronunciation-dictionary substitution), matching the convention Pipecat uses for ElevenLabs. This is a behavior change for `sonic-3` users, who were previously receiving timestamps tied to the input transcript.
|
||||
1
changelog/4390.fixed.2.md
Normal file
1
changelog/4390.fixed.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200 response — one with the API's error text and a second, less informative "Unknown error" frame from the outer exception handler. It now pushes a single frame that includes the HTTP status code and returns cleanly.
|
||||
1
changelog/4390.fixed.3.md
Normal file
1
changelog/4390.fixed.3.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`, `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both the class and an instance.
|
||||
1
changelog/4390.fixed.md
Normal file
1
changelog/4390.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as `ErrorFrame`s. The latest API emits a `flush_done` per transcript when server-side buffering is disabled; Pipecat now consumes them silently since each turn already has its own `context_id`.
|
||||
1
changelog/4393.fixed.md
Normal file
1
changelog/4393.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally for user turn stop strategies. It is now only imported when `default_user_turn_stop_strategies()` is called. This improves startup time and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning when the default stop strategies are not used.
|
||||
1
changelog/4395.changed.md
Normal file
1
changelog/4395.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Broadened `tool_resources` to `app_resources` for easy access not just in tool handlers but in other places like custom `FrameProcessor`s. Three changes: a rename (`tool_resources` → `app_resources`), a new `app_resources` property on `PipelineTask`, and a new `pipeline_task` property on `FrameProcessor`. Tool handlers now read `params.app_resources`; custom processors read `self.pipeline_task.app_resources`. The previous `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit `DeprecationWarning`s.
|
||||
1
changelog/4397.changed.md
Normal file
1
changelog/4397.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.
|
||||
1
changelog/4400.added.md
Normal file
1
changelog/4400.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model Improvement Program. When set, the value is forwarded to Deepgram as a query parameter on the speak request. Defaults to `None`, which preserves the existing behavior. See https://dpgr.am/deepgram-mip for pricing implications before enabling.
|
||||
1
changelog/4401.changed.md
Normal file
1
changelog/4401.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.
|
||||
1
changelog/4401.fixed.md
Normal file
1
changelog/4401.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.
|
||||
1
changelog/4404.added.md
Normal file
1
changelog/4404.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.
|
||||
1
changelog/4405.added.2.md
Normal file
1
changelog/4405.added.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.
|
||||
1
changelog/4405.added.3.md
Normal file
1
changelog/4405.added.3.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.
|
||||
1
changelog/4405.added.4.md
Normal file
1
changelog/4405.added.4.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.
|
||||
1
changelog/4405.added.5.md
Normal file
1
changelog/4405.added.5.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.
|
||||
1
changelog/4405.added.md
Normal file
1
changelog/4405.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.
|
||||
1
changelog/4405.deprecated.md
Normal file
1
changelog/4405.deprecated.md
Normal file
@@ -0,0 +1 @@
|
||||
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.
|
||||
1
changelog/4405.fixed.md
Normal file
1
changelog/4405.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.
|
||||
6
changelog/4407.added.md
Normal file
6
changelog/4407.added.md
Normal file
@@ -0,0 +1,6 @@
|
||||
- Added first-class RTVI support for the UI Agent Protocol:
|
||||
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
|
||||
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
|
||||
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
|
||||
- Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
|
||||
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.
|
||||
1
changelog/4414.fixed.md
Normal file
1
changelog/4414.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).
|
||||
1
changelog/4415.fixed.md
Normal file
1
changelog/4415.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.
|
||||
1
changelog/4416.added.md
Normal file
1
changelog/4416.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.
|
||||
1
changelog/4416.fixed.md
Normal file
1
changelog/4416.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.
|
||||
1
changelog/4417.security.md
Normal file
1
changelog/4417.security.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.
|
||||
1
changelog/4422.changed.md
Normal file
1
changelog/4422.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.
|
||||
1
changelog/4424.fixed.md
Normal file
1
changelog/4424.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.
|
||||
1
changelog/4426.added.md
Normal file
1
changelog/4426.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.
|
||||
1
changelog/4428.deprecated.md
Normal file
1
changelog/4428.deprecated.md
Normal file
@@ -0,0 +1 @@
|
||||
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.
|
||||
1
changelog/4429.changed.md
Normal file
1
changelog/4429.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.
|
||||
1
changelog/4430.added.md
Normal file
1
changelog/4430.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.
|
||||
1
changelog/4430.changed.md
Normal file
1
changelog/4430.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.
|
||||
1
changelog/4431.fixed.md
Normal file
1
changelog/4431.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.
|
||||
1
changelog/4433.changed.md
Normal file
1
changelog/4433.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.
|
||||
1
changelog/4434.fixed.md
Normal file
1
changelog/4434.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed interruptions being delayed when a slow non-uninterruptible frame was processing and an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.
|
||||
1
changelog/4435.fixed.md
Normal file
1
changelog/4435.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.
|
||||
1
changelog/4440.fixed.md
Normal file
1
changelog/4440.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.
|
||||
1
changelog/4441.fixed.md
Normal file
1
changelog/4441.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.
|
||||
@@ -1 +0,0 @@
|
||||
- Added `GET /status` endpoint to the development runner that reports which transports the running instance accepts (all by default, or the single transport passed via `-t`).
|
||||
@@ -1 +0,0 @@
|
||||
- Added plain WebSocket transport support to the development runner. Bots can now accept connections from non-telephony WebSocket clients (e.g., browser apps using protobuf framing) via the `/ws-client` endpoint alongside other transports.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ The development runner now supports all transports (WebRTC, Daily, telephony, plain WebSocket) simultaneously from a single server. The `/start` endpoint accepts a `"transport"` field to select the transport per-request; omitting `-t` at startup enables all transports instead of defaulting to WebRTC. The Daily browser-redirect route moved from `GET /` to `GET /daily`.
|
||||
1
changelog/4443.fixed.md
Normal file
1
changelog/4443.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.
|
||||
1
changelog/4446.change.md
Normal file
1
changelog/4446.change.md
Normal file
@@ -0,0 +1 @@
|
||||
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.
|
||||
1
changelog/4447.fixed.md
Normal file
1
changelog/4447.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.
|
||||
1
changelog/4448.added.md
Normal file
1
changelog/4448.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)
|
||||
1
changelog/4449.changed.md
Normal file
1
changelog/4449.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.
|
||||
1
changelog/4450.changed.md
Normal file
1
changelog/4450.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Changed the default OpenAI Realtime input audio transcription model from `gpt-4o-transcribe` to `gpt-realtime-whisper` for both `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does not accept the `prompt` parameter; if a prompt is supplied alongside `gpt-realtime-whisper`, it is dropped automatically and a warning is logged. To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or `"gpt-4o-mini-transcribe"`).
|
||||
1
changelog/4462.changed.md
Normal file
1
changelog/4462.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.
|
||||
1
changelog/4464.added.2.md
Normal file
1
changelog/4464.added.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).
|
||||
1
changelog/4464.added.md
Normal file
1
changelog/4464.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.
|
||||
1
changelog/4465.fixed.md
Normal file
1
changelog/4465.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).
|
||||
1
changelog/4470.added.md
Normal file
1
changelog/4470.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.
|
||||
1
changelog/4472.changed.md
Normal file
1
changelog/4472.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.
|
||||
1
changelog/4480.added.md
Normal file
1
changelog/4480.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `wait_for_transcript_to_end_user_turn` on `LLMUserAggregatorParams` for pipelines where local turn detection drives a realtime service like Gemini Live. Set it to False to avoid unnecessary latency from transcript delay — the realtime service consumes user audio directly, so we don't need user transcripts in context before it can respond. The option makes it so that (1) turn strategies do not consider user transcripts, letting the user turn end sooner, and (2) user transcripts are then handled by the aggregator: a simple timer gives it time to gather those transcripts after the user turn ends, and once gathered, the aggregator emits a new `on_user_turn_message_finalized` event with the new user context message. The new event also fires in the default mode (coinciding with `on_user_turn_stopped`), so consumers that want the populated user transcript can subscribe to it uniformly. See `examples/realtime/realtime-gemini-live-local-vad.py` for the full pattern.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `ElevenLabsSTTService` crashing when `language` was passed as `None`. When `language` is not set, the service now lets ElevenLabs auto-detect the audio language.
|
||||
@@ -1 +0,0 @@
|
||||
- OpenRouter LLM requests now convert `developer` messages to `user` messages by default for broader model compatibility. Override this by subclassing `OpenRouterLLMService` or setting `llm.supports_developer_role = True` for models that support the `developer` role.
|
||||
@@ -1 +0,0 @@
|
||||
- OpenRouter LLM service now defaults to `openai/gpt-4.1`.
|
||||
@@ -133,9 +133,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -142,9 +142,6 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -143,9 +143,6 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -134,9 +134,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -131,9 +131,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -144,9 +144,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -149,9 +149,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -135,9 +135,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -149,9 +149,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -187,9 +187,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -68,9 +68,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.environ["OPENAI_API_KEY"],
|
||||
settings=OpenAITTSService.Settings(
|
||||
instructions="Please speak clearly and at a moderate pace.",
|
||||
voice="ballad",
|
||||
),
|
||||
instructions="Please speak clearly and at a moderate pace.",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
@@ -148,9 +148,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -76,6 +76,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
llm = OpenRouterLLMService(
|
||||
api_key=os.environ["OPENROUTER_API_KEY"],
|
||||
settings=OpenRouterLLMService.Settings(
|
||||
model="openai/gpt-4o-2024-11-20",
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
@@ -135,9 +136,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -71,8 +71,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
llm = QwenLLMService(
|
||||
api_key=os.environ["QWEN_API_KEY"],
|
||||
model="qwen2.5-72b-instruct",
|
||||
settings=QwenLLMService.Settings(
|
||||
model="qwen2.5-72b-instruct",
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
@@ -134,9 +134,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -133,9 +133,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
|
||||
@@ -20,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
UserMessageFinalizedMessage,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -70,10 +71,25 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
},
|
||||
],
|
||||
)
|
||||
# `wait_for_transcript_to_end_user_turn=False` is the right setting
|
||||
# for pipelines like this one — local turn detection driving a
|
||||
# realtime service. It avoids unnecessary latency from transcript
|
||||
# delay: the realtime service consumes user audio directly, so
|
||||
# we don't need user transcripts in context before it can respond.
|
||||
# With this option:
|
||||
#
|
||||
# - Turn strategies do not consider user transcripts, so the user
|
||||
# turn ends sooner.
|
||||
# - User transcripts are handled by the aggregator: a simple
|
||||
# post-turn transcript wait gives them time to arrive after the
|
||||
# user turn ends, then the aggregator emits
|
||||
# `on_user_turn_message_finalized` with the new user context
|
||||
# message.
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -107,8 +123,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# `on_user_turn_stopped` fires at the end of the user turn. With
|
||||
# `wait_for_transcript_to_end_user_turn=False`, no user
|
||||
# transcripts have arrived yet at this point, so
|
||||
# `message.content` is empty. Logged here to make the end-of-turn
|
||||
# signal visible alongside the later finalization event.
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
|
||||
|
||||
# `on_user_turn_message_finalized` fires when the user message has
|
||||
# been finalized into the context. Here it fires later than
|
||||
# `on_user_turn_stopped`, after the aggregator's post-turn
|
||||
# transcript wait completes.
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(
|
||||
aggregator, strategy, message: UserMessageFinalizedMessage
|
||||
):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
@@ -28,14 +28,10 @@ Usage:
|
||||
"""
|
||||
|
||||
import os
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.observers.loggers.transcription_log_observer import (
|
||||
TranscriptionLogObserver,
|
||||
@@ -52,7 +48,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
@@ -60,43 +55,6 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = (
|
||||
random.randint(60, 85)
|
||||
if params.arguments["format"] == "fahrenheit"
|
||||
else random.randint(15, 30)
|
||||
)
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"location": params.arguments["location"],
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
tools = ToolsSchema(standard_tools=[weather_function])
|
||||
|
||||
|
||||
# --- Transport Configuration ---
|
||||
|
||||
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
|
||||
@@ -127,7 +85,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# See: https://docs.inworld.ai/router/introduction
|
||||
llm = InworldRealtimeLLMService(
|
||||
api_key=os.environ["INWORLD_API_KEY"],
|
||||
llm_model="openai/gpt-4.1-mini",
|
||||
llm_model="xai/grok-4-1-fast-non-reasoning",
|
||||
voice="Sarah",
|
||||
settings=InworldRealtimeLLMService.Settings(
|
||||
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
|
||||
@@ -139,14 +97,9 @@ Always be helpful and proactive in offering assistance.""",
|
||||
),
|
||||
)
|
||||
|
||||
# Note: function calling requires a paid Inworld account and a
|
||||
# function-calling-capable model
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
|
||||
# Create context with initial message + tools
|
||||
# Create context with initial message
|
||||
context = LLMContext(
|
||||
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
182
examples/realtime/realtime-openai-local-vad.py
Normal file
182
examples/realtime/realtime-openai-local-vad.py
Normal file
@@ -0,0 +1,182 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
UserMessageFinalizedMessage,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.realtime.events import (
|
||||
AudioConfiguration,
|
||||
AudioInput,
|
||||
InputAudioTranscription,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# `turn_detection=False` disables OpenAI Realtime's server-side VAD,
|
||||
# so this pipeline's local turn detection drives turn boundaries.
|
||||
# The service then sends `input_audio_buffer.commit` +
|
||||
# `response.create` when it sees `UserStoppedSpeakingFrame`.
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.environ["OPENAI_API_KEY"],
|
||||
settings=OpenAIRealtimeLLMService.Settings(
|
||||
session_properties=SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
turn_detection=False,
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
context = LLMContext(
|
||||
[
|
||||
{
|
||||
"role": "developer",
|
||||
"content": "Say hello. Then ask if I want to hear a joke.",
|
||||
},
|
||||
],
|
||||
)
|
||||
# `wait_for_transcript_to_end_user_turn=False` is the right setting
|
||||
# for pipelines like this one — local turn detection driving a
|
||||
# realtime service. It avoids unnecessary latency from transcript
|
||||
# delay: the realtime service consumes user audio directly, so
|
||||
# we don't need user transcripts in context before it can respond.
|
||||
# With this option:
|
||||
#
|
||||
# - Turn strategies do not consider user transcripts, so the user
|
||||
# turn ends sooner.
|
||||
# - User transcripts are handled by the aggregator: a simple
|
||||
# post-turn transcript wait gives them time to arrive after the
|
||||
# user turn ends, then the aggregator emits
|
||||
# `on_user_turn_message_finalized` with the new user context
|
||||
# message.
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
user_aggregator,
|
||||
llm,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# `on_user_turn_stopped` fires at the end of the user turn. With
|
||||
# `wait_for_transcript_to_end_user_turn=False`, no user
|
||||
# transcripts have arrived yet at this point, so
|
||||
# `message.content` is empty. Logged here to make the end-of-turn
|
||||
# signal visible alongside the later finalization event.
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
|
||||
|
||||
# `on_user_turn_message_finalized` fires when the user message has
|
||||
# been finalized into the context. Here it fires later than
|
||||
# `on_user_turn_stopped`, after the aggregator's post-turn
|
||||
# transcript wait completes.
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(
|
||||
aggregator, strategy, message: UserMessageFinalizedMessage
|
||||
):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
|
||||
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}assistant: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -51,6 +51,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
stt = GradiumSTTService(
|
||||
api_key=os.environ["GRADIUM_API_KEY"],
|
||||
api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
|
||||
settings=GradiumSTTService.Settings(
|
||||
language=Language.EN,
|
||||
delay_in_frames=8,
|
||||
|
||||
@@ -1,201 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example 22: Filter Incomplete Turns
|
||||
|
||||
Demonstrates LLM-based turn completion detection to suppress bot responses when
|
||||
the user was cut off mid-thought. The LLM outputs one of three markers:
|
||||
- ✓ (complete): User finished their thought, respond normally
|
||||
- ○ (incomplete short): User was cut off, wait ~5s then prompt
|
||||
- ◐ (incomplete long): User needs time to think, wait ~10s then prompt
|
||||
|
||||
When incomplete is detected, the bot's response is suppressed. After the timeout
|
||||
expires, the LLM is automatically prompted to re-engage the user.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def get_weather(params: FunctionCallParams, location: str):
|
||||
"""Return the current weather for a location.
|
||||
|
||||
A stub that always reports the same conditions — replace with a real
|
||||
weather API in production.
|
||||
|
||||
Args:
|
||||
location (str): The city and state or country, e.g. "Paris, France".
|
||||
"""
|
||||
await params.result_callback(
|
||||
{
|
||||
"location": location,
|
||||
"temperature_celsius": 22,
|
||||
"conditions": "partly cloudy",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.environ["OPENAI_API_KEY"],
|
||||
settings=OpenAILLMService.Settings(
|
||||
system_instruction=(
|
||||
"You are a helpful assistant in a voice conversation. Your "
|
||||
"responses will be spoken aloud, so avoid emojis, bullet "
|
||||
"points, or other formatting that can't be spoken. Respond to "
|
||||
"what the user said in a creative, helpful, and brief way. "
|
||||
"If the user asks about the weather, call the get_weather "
|
||||
"tool and speak the result back naturally."
|
||||
),
|
||||
),
|
||||
)
|
||||
llm.register_direct_function(get_weather)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.environ["CARTESIA_API_KEY"],
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
context = LLMContext(tools=ToolsSchema(standard_tools=[get_weather]))
|
||||
# `FilterIncompleteUserTurnStrategies` pairs the default detector
|
||||
# chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
|
||||
# trigger LLM inference but the public `on_user_turn_stopped` event
|
||||
# fires only when the LLM confirms ✓. The LLM marks each response
|
||||
# with one of:
|
||||
# ✓ = complete (respond normally)
|
||||
# ○ = incomplete short (wait 5s, then prompt)
|
||||
# ◐ = incomplete long (wait 10s, then prompt)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
user_turn_strategies=FilterIncompleteUserTurnStrategies(
|
||||
# Optional: customize turn completion behavior
|
||||
# config=UserTurnCompletionConfig(
|
||||
# incomplete_short_timeout=5.0,
|
||||
# incomplete_long_timeout=10.0,
|
||||
# incomplete_short_prompt="Custom prompt...",
|
||||
# incomplete_long_prompt="Custom prompt...",
|
||||
# instructions="Custom turn completion instructions...",
|
||||
# ),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
|
||||
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}assistant: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -50,7 +50,10 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = GradiumSTTService(api_key=os.environ["GRADIUM_API_KEY"])
|
||||
stt = GradiumSTTService(
|
||||
api_key=os.environ["GRADIUM_API_KEY"],
|
||||
api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.environ["CARTESIA_API_KEY"],
|
||||
|
||||
@@ -55,6 +55,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
tts = GradiumTTSService(
|
||||
api_key=os.environ["GRADIUM_API_KEY"],
|
||||
settings=GradiumTTSService.Settings(voice="YTpq7expH9539ERJ"),
|
||||
url="wss://us.api.gradium.ai/api/speech/tts",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
|
||||
@@ -54,6 +54,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
stt = GradiumSTTService(
|
||||
api_key=os.environ["GRADIUM_API_KEY"],
|
||||
api_endpoint_base_url="wss://us.api.gradium.ai/api/speech/asr",
|
||||
settings=GradiumSTTService.Settings(
|
||||
language=Language.EN,
|
||||
),
|
||||
@@ -61,6 +62,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
tts = GradiumTTSService(
|
||||
api_key=os.environ["GRADIUM_API_KEY"],
|
||||
url="wss://us.api.gradium.ai/api/speech/tts",
|
||||
settings=GradiumTTSService.Settings(
|
||||
voice="YTpq7expH9539ERJ",
|
||||
),
|
||||
|
||||
@@ -58,7 +58,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Add strict mode to enforce the language hints
|
||||
language_hints=[Language.EN],
|
||||
language_hints_strict=True,
|
||||
enable_language_identification=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -103,7 +103,7 @@ piper = [ "piper-tts>=1.3.0,<2", "requests>=2.32.5,<3" ]
|
||||
qwen = []
|
||||
resembleai = [ "pipecat-ai[websockets-base]" ]
|
||||
rime = [ "pipecat-ai[websockets-base]" ]
|
||||
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-prebuilt>=1.0.0"]
|
||||
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-small-webrtc-prebuilt>=2.5.0"]
|
||||
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
|
||||
sambanova = []
|
||||
sarvam = [ "sarvamai==0.1.28", "pipecat-ai[websockets-base]" ]
|
||||
|
||||
@@ -242,7 +242,6 @@ TESTS_VIDEO_AVATAR = [
|
||||
|
||||
TESTS_TURN_MANAGEMENT = [
|
||||
("turn-management/turn-management-filter-incomplete-turns.py", EVAL_COMPLETE_TURN),
|
||||
("turn-management/turn-management-filter-incomplete-turns-function-calling.py", EVAL_WEATHER),
|
||||
]
|
||||
|
||||
TESTS_THINKING = [
|
||||
|
||||
@@ -55,6 +55,7 @@ from pipecat.frames.frames import (
|
||||
LLMThoughtStartFrame,
|
||||
LLMThoughtTextFrame,
|
||||
StartFrame,
|
||||
STTMetadataFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TranslationFrame,
|
||||
@@ -80,6 +81,7 @@ from pipecat.processors.aggregators.llm_context_summarizer import (
|
||||
SummaryAppliedEvent,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.turns.user_mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
@@ -127,6 +129,25 @@ class LLMUserAggregatorParams:
|
||||
has been idle (not speaking) for this duration. Set to 0 to disable
|
||||
idle detection.
|
||||
vad_analyzer: Voice Activity Detection analyzer instance.
|
||||
wait_for_transcript_to_end_user_turn: Defaults to True. Set to
|
||||
False for pipelines where local turn detection drives a
|
||||
realtime service like Gemini Live. The realtime service
|
||||
consumes user audio directly, so we don't need user
|
||||
transcripts in context before it can respond, and waiting
|
||||
for them is pure latency. When False:
|
||||
|
||||
- Turn strategies do not consider user transcripts, so the
|
||||
user turn ends sooner. ``on_user_turn_stopped`` fires at
|
||||
the end of turn with empty content. To achieve this,
|
||||
the aggregator drops ``TranscriptionUserTurnStartStrategy``
|
||||
from start strategies and flips
|
||||
``wait_for_transcript=False`` on any stop strategy that
|
||||
supports it.
|
||||
- User transcripts are handled by the aggregator: a simple
|
||||
post-turn transcript wait gives it time to receive them
|
||||
after the user turn ends, then the aggregator emits a
|
||||
new ``on_user_turn_message_finalized`` event with the
|
||||
new user context message.
|
||||
filter_incomplete_user_turns: [DEPRECATED] Use
|
||||
``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
|
||||
instead. When enabled, the LLM outputs a turn-completion
|
||||
@@ -157,6 +178,7 @@ class LLMUserAggregatorParams:
|
||||
user_turn_stop_timeout: float = 5.0
|
||||
user_idle_timeout: float = 0
|
||||
vad_analyzer: VADAnalyzer | None = None
|
||||
wait_for_transcript_to_end_user_turn: bool = True
|
||||
filter_incomplete_user_turns: bool = False
|
||||
user_turn_completion_config: UserTurnCompletionConfig | None = None
|
||||
|
||||
@@ -259,13 +281,43 @@ class LLMAssistantAggregatorParams:
|
||||
|
||||
@dataclass
|
||||
class UserTurnStoppedMessage:
|
||||
"""A user turn stopped message containing a user transcript update.
|
||||
"""A message accompanying ``on_user_turn_stopped`` (end of user turn).
|
||||
|
||||
A message in a conversation transcript containing the user content. This is
|
||||
the aggregated transcript that is then used in the context.
|
||||
With ``wait_for_transcript_to_end_user_turn=True`` (the default),
|
||||
the user message is finalized at the end of the turn, so
|
||||
``content`` carries the aggregated transcript. With it set to
|
||||
False, the aggregator is still in its post-turn transcript wait
|
||||
at this point, so ``content`` is ``None`` — subscribe to
|
||||
``on_user_turn_message_finalized`` for the assembled message.
|
||||
|
||||
Parameters:
|
||||
content: The message content/text.
|
||||
content: The aggregated user transcript, or ``None`` when
|
||||
``wait_for_transcript_to_end_user_turn=False`` (the
|
||||
aggregator is still in its post-turn transcript wait at
|
||||
this point).
|
||||
timestamp: When the user turn started.
|
||||
user_id: Optional identifier for the user.
|
||||
|
||||
"""
|
||||
|
||||
content: str | None
|
||||
timestamp: str
|
||||
user_id: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserMessageFinalizedMessage:
|
||||
"""A message accompanying ``on_user_turn_message_finalized``.
|
||||
|
||||
Fired when the user message has been finalized into the context.
|
||||
With ``wait_for_transcript_to_end_user_turn=True`` (the default)
|
||||
this coincides with ``on_user_turn_stopped``. With it set to
|
||||
False, the aggregator first runs a post-turn transcript wait, so
|
||||
this event fires later than ``on_user_turn_stopped``.
|
||||
``content`` is always populated.
|
||||
|
||||
Parameters:
|
||||
content: The aggregated user transcript.
|
||||
timestamp: When the user turn started.
|
||||
user_id: Optional identifier for the user.
|
||||
|
||||
@@ -526,8 +578,21 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
Event handlers available:
|
||||
|
||||
- on_user_turn_started: Called when the user turn starts
|
||||
- on_user_turn_stopped: Called when the user turn ends
|
||||
- on_user_turn_started: Called when the user turn starts.
|
||||
- on_user_turn_stopped: Called at the end of turn, with a
|
||||
``UserTurnStoppedMessage``. With
|
||||
``wait_for_transcript_to_end_user_turn=True`` (the default),
|
||||
``message.content`` carries the aggregated transcript. With it
|
||||
set to False, the aggregator is still in its post-turn transcript
|
||||
wait at this point, so ``message.content`` is ``None``; subscribe
|
||||
to ``on_user_turn_message_finalized`` for the assembled message.
|
||||
- on_user_turn_message_finalized: Called when the user message
|
||||
has been finalized into the context, with a
|
||||
``UserMessageFinalizedMessage``. With
|
||||
``wait_for_transcript_to_end_user_turn=True`` this coincides
|
||||
with ``on_user_turn_stopped``; with it set to False it fires
|
||||
later, after the aggregator's post-turn transcript wait window
|
||||
completes. ``message.content`` is always populated.
|
||||
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
|
||||
- on_user_turn_idle: Called when the user has been idle for the configured timeout
|
||||
- on_user_mute_started: Called when the user becomes muted
|
||||
@@ -543,6 +608,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def on_user_turn_stopped(aggregator, strategy: BaseUserTurnStopStrategy, message: UserTurnStoppedMessage):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(aggregator, strategy: BaseUserTurnStopStrategy, message: UserMessageFinalizedMessage):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_stop_timeout")
|
||||
async def on_user_turn_stop_timeout(aggregator):
|
||||
...
|
||||
@@ -586,12 +655,14 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
self._register_event_handler("on_user_turn_started")
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_message_finalized")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
self._register_event_handler("on_user_turn_inference_triggered")
|
||||
self._register_event_handler("on_user_mute_started")
|
||||
self._register_event_handler("on_user_mute_stopped")
|
||||
|
||||
user_provided_strategies = self._params.user_turn_strategies is not None
|
||||
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
|
||||
|
||||
# Deprecated path: translate filter_incomplete_user_turns into
|
||||
@@ -605,6 +676,17 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
)
|
||||
self._params.user_turn_strategies = user_turn_strategies
|
||||
|
||||
# When `wait_for_transcript_to_end_user_turn=False`, mutate the
|
||||
# user turn strategies so they don't consider user transcripts:
|
||||
# drop the transcription start strategy, flip
|
||||
# `wait_for_transcript=False` on stop strategies that support
|
||||
# it. Loud log if the user passed their own strategies (we're
|
||||
# overwriting parts of their config); quiet log otherwise.
|
||||
if not self._params.wait_for_transcript_to_end_user_turn:
|
||||
self._apply_no_transcript_wait_bundle(
|
||||
user_turn_strategies, user_provided_strategies=user_provided_strategies
|
||||
)
|
||||
|
||||
self._user_is_muted = False
|
||||
self._user_turn_start_timestamp = ""
|
||||
# Full transcript across the user turn. Each
|
||||
@@ -616,6 +698,20 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# inferences fire before finalization.
|
||||
self._full_user_turn_aggregation: str | None = None
|
||||
|
||||
# Post-turn transcript wait state, used when the aggregator
|
||||
# waits for transcripts after the user turn ends
|
||||
# (`_wait_for_post_turn_transcripts == True`):
|
||||
# `on_user_turn_stopped` has fired with empty content, and the
|
||||
# aggregator is waiting on `_post_turn_transcript_wait_task`
|
||||
# before finalizing the user message into context. The wait
|
||||
# window duration is taken from the last `STTMetadataFrame`
|
||||
# seen (`STTMetadataFrame.ttfs_p99_latency`), falling back to
|
||||
# `DEFAULT_TTFS_P99` if no STT service has reported one.
|
||||
self._post_turn_transcript_wait_strategy: BaseUserTurnStopStrategy | None = None
|
||||
self._inference_during_post_turn_transcript_wait: bool = False
|
||||
self._post_turn_transcript_wait_task: asyncio.Task | None = None
|
||||
self._stt_ttfs_p99_latency: float | None = None
|
||||
|
||||
self._user_turn_controller = UserTurnController(
|
||||
user_turn_strategies=user_turn_strategies,
|
||||
user_turn_stop_timeout=self._params.user_turn_stop_timeout,
|
||||
@@ -658,6 +754,81 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._vad_controller.add_event_handler("on_push_frame", self._on_push_frame)
|
||||
self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
|
||||
|
||||
@property
|
||||
def _wait_for_post_turn_transcripts(self) -> bool:
|
||||
"""True when the aggregator runs a post-turn transcript wait.
|
||||
|
||||
Inverse of the public ``wait_for_transcript_to_end_user_turn``
|
||||
param: when that's False, this is True. In this mode, turn
|
||||
strategies don't consider user transcripts (so the user turn
|
||||
ends sooner), and the aggregator runs a simple timer after the
|
||||
end of turn to receive any transcripts that arrive, then emits
|
||||
``on_user_turn_message_finalized`` with the assembled user
|
||||
context message. Always travels with the strategy-mutation
|
||||
bundle applied at init.
|
||||
"""
|
||||
return not self._params.wait_for_transcript_to_end_user_turn
|
||||
|
||||
def _apply_no_transcript_wait_bundle(
|
||||
self,
|
||||
user_turn_strategies: UserTurnStrategies,
|
||||
*,
|
||||
user_provided_strategies: bool,
|
||||
):
|
||||
"""Adjust strategies to match ``wait_for_transcript_to_end_user_turn=False``.
|
||||
|
||||
Mutates the user turn strategies so they don't consider user
|
||||
transcripts: drops ``TranscriptionUserTurnStartStrategy`` from
|
||||
start strategies (so late-arriving transcripts don't start
|
||||
new turns), and sets ``wait_for_transcript=False`` on any
|
||||
stop strategy that supports it. The net effect: the user turn
|
||||
ends sooner.
|
||||
|
||||
Logs loudly when adjusting user-provided strategies — we're
|
||||
mutating objects the caller passed in. Logs quietly when only
|
||||
synthesized defaults are in play.
|
||||
"""
|
||||
# Local import to avoid a top-level cycle with `turns.user_start`.
|
||||
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
|
||||
|
||||
adjustments: list[str] = []
|
||||
|
||||
if user_turn_strategies.start:
|
||||
filtered = [
|
||||
s
|
||||
for s in user_turn_strategies.start
|
||||
if not isinstance(s, TranscriptionUserTurnStartStrategy)
|
||||
]
|
||||
dropped = len(user_turn_strategies.start) - len(filtered)
|
||||
if dropped:
|
||||
user_turn_strategies.start = filtered
|
||||
adjustments.append(
|
||||
f"dropped {dropped} TranscriptionUserTurnStartStrategy from start strategies"
|
||||
)
|
||||
|
||||
flipped = 0
|
||||
for s in user_turn_strategies.stop or []:
|
||||
if hasattr(s, "_wait_for_transcript") and s._wait_for_transcript:
|
||||
s._wait_for_transcript = False
|
||||
flipped += 1
|
||||
if flipped:
|
||||
adjustments.append(
|
||||
f"set wait_for_transcript=False on {flipped} stop "
|
||||
f"strateg{'y' if flipped == 1 else 'ies'}"
|
||||
)
|
||||
|
||||
if not adjustments:
|
||||
return
|
||||
|
||||
message = (
|
||||
f"{self}: wait_for_transcript_to_end_user_turn=False adjusted "
|
||||
f"user turn strategies: {'; '.join(adjustments)}."
|
||||
)
|
||||
if user_provided_strategies:
|
||||
logger.warning(message)
|
||||
else:
|
||||
logger.info(message)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up processor resources."""
|
||||
await super().cleanup()
|
||||
@@ -697,6 +868,13 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# Interim transcriptions and translations are consumed here
|
||||
# and not pushed downstream, same as final TranscriptionFrame.
|
||||
pass
|
||||
elif isinstance(frame, STTMetadataFrame):
|
||||
# Record the STT service's reported P99 TTFS so the
|
||||
# post-turn transcript wait timer can size itself to the real
|
||||
# latency. Frame is also pushed downstream so other
|
||||
# processors keep seeing it.
|
||||
self._stt_ttfs_p99_latency = frame.ttfs_p99_latency
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, LLMRunFrame):
|
||||
await self._handle_llm_run(frame)
|
||||
elif isinstance(frame, LLMMessagesAppendFrame):
|
||||
@@ -747,13 +925,31 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await s.setup(self.task_manager)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
await self._maybe_emit_user_turn_stopped(on_session_end=True)
|
||||
await self._finalize_on_session_end()
|
||||
await self._cleanup()
|
||||
|
||||
async def _cancel(self, frame: CancelFrame):
|
||||
await self._maybe_emit_user_turn_stopped(on_session_end=True)
|
||||
await self._finalize_on_session_end()
|
||||
await self._cleanup()
|
||||
|
||||
async def _finalize_on_session_end(self):
|
||||
"""Flush any pending user message on session end.
|
||||
|
||||
If a post-turn transcript wait is in flight, complete it now so
|
||||
the user message is captured before the session shuts down.
|
||||
Otherwise, run the mode-appropriate finalize path on whatever
|
||||
is currently in the buffer.
|
||||
"""
|
||||
if (
|
||||
self._post_turn_transcript_wait_strategy is not None
|
||||
or self._inference_during_post_turn_transcript_wait
|
||||
):
|
||||
await self._complete_post_turn_transcript_wait(on_session_end=True)
|
||||
elif self._wait_for_post_turn_transcripts:
|
||||
await self._finalize_user_message(on_session_end=True)
|
||||
else:
|
||||
await self._finalize_user_turn(on_session_end=True)
|
||||
|
||||
async def _cleanup(self):
|
||||
if self._vad_controller:
|
||||
await self._vad_controller.cleanup()
|
||||
@@ -884,6 +1080,21 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
|
||||
|
||||
# Precondition guard: if the previous turn's post-turn
|
||||
# transcript wait is still active when the next turn starts,
|
||||
# the assumption that transcripts arrive before the next turn
|
||||
# has been violated. Complete the previous turn's wait now so
|
||||
# its user message is finalized before this turn proceeds.
|
||||
if (
|
||||
self._post_turn_transcript_wait_strategy is not None
|
||||
or self._inference_during_post_turn_transcript_wait
|
||||
):
|
||||
logger.warning(
|
||||
f"{self}: user turn started before previous turn's transcripts "
|
||||
f"arrived; flushing previous turn now"
|
||||
)
|
||||
await self._complete_post_turn_transcript_wait()
|
||||
|
||||
self._user_turn_start_timestamp = time_now_iso8601()
|
||||
self._full_user_turn_aggregation = None
|
||||
|
||||
@@ -904,6 +1115,14 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
|
||||
|
||||
if self._wait_for_post_turn_transcripts:
|
||||
# The aggregator is in its post-turn transcript wait.
|
||||
# Defer push_aggregation and event emission; they'll run
|
||||
# alongside user message finalization when the wait window
|
||||
# completes.
|
||||
self._inference_during_post_turn_transcript_wait = True
|
||||
return
|
||||
|
||||
# Push aggregation now: this writes the user message segment to
|
||||
# the context and emits LLMContextFrame, which kicks LLM
|
||||
# inference. Concatenate the segment into
|
||||
@@ -929,42 +1148,144 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User stopped speaking (strategy: {strategy})")
|
||||
|
||||
# End-of-turn side effects always fire on the strategy event,
|
||||
# regardless of whether user message finalization is deferred
|
||||
# to a post-turn transcript wait window.
|
||||
if params.enable_user_speaking_frames:
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
|
||||
|
||||
await self._maybe_emit_user_turn_stopped(strategy)
|
||||
if self._wait_for_post_turn_transcripts:
|
||||
# Fire `on_user_turn_stopped` now for the end of turn —
|
||||
# content is `None` because no transcripts have arrived
|
||||
# yet. Start the post-turn transcript wait timer; when it
|
||||
# completes, the aggregator finalizes the user message and
|
||||
# emits `on_user_turn_message_finalized`. Consumers wanting
|
||||
# the assembled message subscribe to
|
||||
# `on_user_turn_message_finalized`.
|
||||
end_of_turn_message = UserTurnStoppedMessage(
|
||||
content=None, timestamp=self._user_turn_start_timestamp
|
||||
)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, end_of_turn_message)
|
||||
|
||||
self._post_turn_transcript_wait_strategy = strategy
|
||||
wait_timeout = (
|
||||
self._stt_ttfs_p99_latency
|
||||
if self._stt_ttfs_p99_latency is not None
|
||||
else DEFAULT_TTFS_P99
|
||||
)
|
||||
self._post_turn_transcript_wait_task = self.create_task(
|
||||
self._post_turn_transcript_wait_handler(wait_timeout),
|
||||
f"{self}::post_turn_transcript_wait",
|
||||
)
|
||||
return
|
||||
|
||||
await self._finalize_user_turn(strategy)
|
||||
|
||||
async def _post_turn_transcript_wait_handler(self, timeout: float):
|
||||
"""Post-turn transcript wait timer.
|
||||
|
||||
Waits ``timeout`` seconds — giving transcripts time to arrive
|
||||
after the end of turn — then completes the wait and finalizes
|
||||
the user message into context, with whatever transcripts the
|
||||
aggregator has received by then (possibly none).
|
||||
|
||||
The simple-timer approach relies on the assumptions that
|
||||
transcripts don't arrive too late and that the assistant
|
||||
response won't finish before this timer.
|
||||
|
||||
Cancelled by reset, the next-turn precondition guard, or
|
||||
session end.
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(timeout)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
finally:
|
||||
self._post_turn_transcript_wait_task = None
|
||||
|
||||
await self._complete_post_turn_transcript_wait()
|
||||
|
||||
async def _complete_post_turn_transcript_wait(self, *, on_session_end: bool = False):
|
||||
"""Complete the active post-turn transcript wait window.
|
||||
|
||||
``on_user_turn_stopped`` already fired at the end of turn (with
|
||||
empty content) and the aggregator has been receiving
|
||||
transcripts since. This finalizes that work: flushes any
|
||||
inference-triggered segment whose push was deferred during the
|
||||
wait, then emits ``on_user_turn_message_finalized`` with the
|
||||
assembled user context message. Called from the post-turn
|
||||
transcript wait timer (the normal path), the precondition guard
|
||||
in ``_on_user_turn_started``, and the session-end paths.
|
||||
"""
|
||||
if self._post_turn_transcript_wait_task:
|
||||
await self.cancel_task(self._post_turn_transcript_wait_task)
|
||||
self._post_turn_transcript_wait_task = None
|
||||
|
||||
wait_strategy = self._post_turn_transcript_wait_strategy
|
||||
had_pending_inference = self._inference_during_post_turn_transcript_wait
|
||||
self._post_turn_transcript_wait_strategy = None
|
||||
self._inference_during_post_turn_transcript_wait = False
|
||||
|
||||
if had_pending_inference:
|
||||
segment = await self.push_aggregation()
|
||||
if segment:
|
||||
if self._full_user_turn_aggregation:
|
||||
self._full_user_turn_aggregation = (
|
||||
f"{self._full_user_turn_aggregation} {segment}".strip()
|
||||
)
|
||||
else:
|
||||
self._full_user_turn_aggregation = segment
|
||||
await self._call_event_handler("on_user_turn_inference_triggered", wait_strategy)
|
||||
|
||||
if wait_strategy is not None or on_session_end:
|
||||
# `on_user_turn_stopped` already fired at the end of turn;
|
||||
# this is the deferred user message finalization.
|
||||
await self._finalize_user_message(wait_strategy, on_session_end=on_session_end)
|
||||
|
||||
async def _on_reset_aggregation(
|
||||
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
|
||||
):
|
||||
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
|
||||
await self._cancel_post_turn_transcript_wait()
|
||||
await self.reset()
|
||||
|
||||
async def _cancel_post_turn_transcript_wait(self):
|
||||
"""Cancel any active post-turn transcript wait window without finalizing.
|
||||
|
||||
Called from reset paths (interruption, explicit reset).
|
||||
"Reset" means "throw it away" — we don't flush a partial
|
||||
transcript that was about to be invalidated anyway.
|
||||
"""
|
||||
if self._post_turn_transcript_wait_task:
|
||||
await self.cancel_task(self._post_turn_transcript_wait_task)
|
||||
self._post_turn_transcript_wait_task = None
|
||||
self._post_turn_transcript_wait_strategy = None
|
||||
self._inference_during_post_turn_transcript_wait = False
|
||||
|
||||
async def _on_user_turn_stop_timeout(self, controller):
|
||||
await self._call_event_handler("on_user_turn_stop_timeout")
|
||||
|
||||
async def _on_user_turn_idle(self, controller):
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
|
||||
async def _maybe_emit_user_turn_stopped(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Maybe emit user turn stopped event.
|
||||
async def _flush_user_message_to_context(
|
||||
self, on_session_end: bool = False
|
||||
) -> tuple[str, str] | None:
|
||||
"""Push the aggregated user message to context, return ``(content, timestamp)``.
|
||||
|
||||
Earlier inference triggers in the same turn have already pushed
|
||||
their segments to the context and accumulated them into
|
||||
``self._full_user_turn_aggregation``. Any aggregation that
|
||||
arrived after the last inference trigger is flushed here so
|
||||
end-of-turn content is never lost from the public event.
|
||||
Earlier inference triggers in the same turn already pushed their
|
||||
segments to the context and accumulated them in
|
||||
``self._full_user_turn_aggregation``; whatever arrived after the
|
||||
last inference trigger is flushed here so end-of-turn content is
|
||||
never lost.
|
||||
|
||||
Args:
|
||||
strategy: The strategy that triggered the turn stop.
|
||||
on_session_end: If True, only emit if there's unemitted content
|
||||
(avoids duplicate events when session ends).
|
||||
Returns ``(content, timestamp)`` for the just-finalized user
|
||||
message, or ``None`` when there's no content to flush and
|
||||
``on_session_end=True`` (avoids emitting empty events during
|
||||
session shutdown). Callers construct the appropriate message
|
||||
dataclass for each event they emit.
|
||||
"""
|
||||
segment = await self.push_aggregation()
|
||||
full_aggregation = self._full_user_turn_aggregation
|
||||
@@ -975,12 +1296,53 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
else:
|
||||
content = full_aggregation or segment
|
||||
|
||||
if not on_session_end or content:
|
||||
message = UserTurnStoppedMessage(
|
||||
content=content, timestamp=self._user_turn_start_timestamp
|
||||
)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, message)
|
||||
self._user_turn_start_timestamp = ""
|
||||
if on_session_end and not content:
|
||||
return None
|
||||
|
||||
timestamp = self._user_turn_start_timestamp
|
||||
self._user_turn_start_timestamp = ""
|
||||
return content, timestamp
|
||||
|
||||
async def _finalize_user_turn(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Finalize the user turn: flush the message, emit both events.
|
||||
|
||||
Used in the default mode (``_wait_for_post_turn_transcripts ==
|
||||
False``), where end of turn and user message finalization
|
||||
coincide. Emits both ``on_user_turn_stopped`` and
|
||||
``on_user_turn_message_finalized``.
|
||||
"""
|
||||
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
|
||||
if result is None:
|
||||
return
|
||||
content, timestamp = result
|
||||
stopped_msg = UserTurnStoppedMessage(content=content, timestamp=timestamp)
|
||||
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, stopped_msg)
|
||||
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
|
||||
|
||||
async def _finalize_user_message(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Finalize the user message: flush to context, emit one event.
|
||||
|
||||
Used when the aggregator runs a post-turn transcript wait
|
||||
(``_wait_for_post_turn_transcripts == True``), where user
|
||||
message finalization fires after the end of turn. Emits
|
||||
``on_user_turn_message_finalized`` only; ``on_user_turn_stopped``
|
||||
was already emitted at the end of turn.
|
||||
"""
|
||||
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
|
||||
if result is None:
|
||||
return
|
||||
content, timestamp = result
|
||||
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
|
||||
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
|
||||
|
||||
|
||||
class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
@@ -19,10 +19,6 @@ All bots must implement a `bot(runner_args)` async function as the entry point.
|
||||
The server automatically discovers and executes this function when connections
|
||||
are established.
|
||||
|
||||
By default the runner starts a single FastAPI server that supports WebRTC, Daily,
|
||||
and telephony transports simultaneously. Clients declare which transport they want
|
||||
via the ``transport`` field in the ``/start`` request body (default: ``"webrtc"``).
|
||||
|
||||
Single transport example::
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
@@ -59,33 +55,14 @@ Supported transports:
|
||||
- WebRTC - Provides local WebRTC interface with prebuilt UI
|
||||
- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo, Exotel
|
||||
|
||||
The ``/start`` endpoint accepts::
|
||||
|
||||
{
|
||||
"transport": "webrtc", // "webrtc" | "daily" | "twilio" | "telnyx" |
|
||||
// "plivo" | "exotel" — default: "webrtc"
|
||||
|
||||
// WebRTC-specific
|
||||
"enableDefaultIceServers": false,
|
||||
"body": {...},
|
||||
|
||||
// Daily-specific
|
||||
"createDailyRoom": true,
|
||||
"dailyRoomProperties": {...},
|
||||
"dailyMeetingTokenProperties": {...},
|
||||
"body": {...}
|
||||
}
|
||||
|
||||
To run locally:
|
||||
|
||||
- All transports (default): ``python bot.py``
|
||||
- WebRTC only: ``python bot.py -t webrtc``
|
||||
- ESP32: ``python bot.py -t webrtc --esp32 --host 192.168.1.100``
|
||||
- Daily only: ``python bot.py -t daily``
|
||||
- Daily (direct, testing only): ``python bot.py -d``
|
||||
- Telephony: ``python bot.py -t twilio -x your_username.ngrok.io``
|
||||
- Exotel: ``python bot.py -t exotel`` (no proxy needed, but ngrok connection to HTTP 7860 is required)
|
||||
- WhatsApp: ``python bot.py --whatsapp``
|
||||
- WebRTC: `python bot.py -t webrtc`
|
||||
- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100`
|
||||
- Daily (server): `python bot.py -t daily`
|
||||
- Daily (direct, testing only): `python bot.py -d`
|
||||
- Telephony: `python bot.py -t twilio -x your_username.ngrok.io`
|
||||
- Exotel: `python bot.py -t exotel` (no proxy needed, but ngrok connection to HTTP 7860 is required)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -209,33 +186,8 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
|
||||
await bot_module.bot(runner_args)
|
||||
|
||||
|
||||
async def _run_websocket_bot(websocket: WebSocket, args: argparse.Namespace):
|
||||
"""Run a bot for plain WebSocket transport."""
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
runner_args = WebSocketRunnerArguments(
|
||||
websocket=websocket,
|
||||
transport_type="websocket",
|
||||
session_id=str(uuid.uuid4()),
|
||||
)
|
||||
runner_args.cli_args = args
|
||||
|
||||
await bot_module.bot(runner_args)
|
||||
|
||||
|
||||
def _setup_websocket_routes(app: FastAPI, args: argparse.Namespace):
|
||||
"""Set up the plain WebSocket route at ``/ws-client``."""
|
||||
|
||||
@app.websocket("/ws-client")
|
||||
async def websocket_client_endpoint(websocket: WebSocket):
|
||||
"""Handle plain WebSocket connections (non-telephony)."""
|
||||
await websocket.accept()
|
||||
logger.debug("Plain WebSocket connection accepted")
|
||||
await _run_websocket_bot(websocket, args)
|
||||
|
||||
|
||||
def _configure_server_app(args: argparse.Namespace):
|
||||
"""Configure the module-level FastAPI app with routes for all transports."""
|
||||
"""Configure the module-level FastAPI app with transport-specific routes."""
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
@@ -244,198 +196,17 @@ def _configure_server_app(args: argparse.Namespace):
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Shared session store: session_id -> body data. Used by the WebRTC /start
|
||||
# flow and the /sessions/{session_id}/... proxy routes.
|
||||
active_sessions: dict[str, dict[str, Any]] = {}
|
||||
|
||||
_setup_frontend_routes(app)
|
||||
_setup_webrtc_routes(app, args, active_sessions)
|
||||
_setup_daily_routes(app, args)
|
||||
_setup_telephony_routes(app, args)
|
||||
_setup_websocket_routes(app, args)
|
||||
_setup_unified_start_route(app, args, active_sessions)
|
||||
|
||||
if args.whatsapp:
|
||||
_setup_whatsapp_routes(app, args)
|
||||
|
||||
|
||||
def _setup_unified_start_route(
|
||||
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
|
||||
):
|
||||
"""Register the unified POST /start and GET /status endpoints.
|
||||
|
||||
Handles WebRTC, Daily, and telephony transport start flows. Clients specify
|
||||
which transport they want via the ``transport`` field in the request body.
|
||||
When ``-t`` was passed on the command line, requests for any other transport
|
||||
are rejected with HTTP 400.
|
||||
"""
|
||||
ALL_TRANSPORTS = ["webrtc", "daily", *TELEPHONY_TRANSPORTS, "websocket"]
|
||||
|
||||
@app.get("/status")
|
||||
async def status():
|
||||
"""Return the transports supported by this runner instance."""
|
||||
transports = [args.transport] if args.transport is not None else ALL_TRANSPORTS
|
||||
return {"status": "ready", "transports": transports}
|
||||
|
||||
class IceServer(TypedDict, total=False):
|
||||
urls: str | list[str]
|
||||
|
||||
class IceConfig(TypedDict):
|
||||
iceServers: list[IceServer]
|
||||
|
||||
class StartBotResult(TypedDict, total=False):
|
||||
sessionId: str
|
||||
iceConfig: IceConfig | None
|
||||
dailyRoom: str | None
|
||||
dailyToken: str | None
|
||||
wsUrl: str | None
|
||||
token: str | None
|
||||
|
||||
@app.post("/start")
|
||||
async def start_agent(request: Request):
|
||||
"""Start a bot session.
|
||||
|
||||
Accepts::
|
||||
|
||||
{
|
||||
"transport": "webrtc", // "webrtc" | "daily" | "twilio" | "telnyx" |
|
||||
// "plivo" | "exotel" — default: "webrtc"
|
||||
|
||||
// WebRTC-specific
|
||||
"enableDefaultIceServers": false,
|
||||
"body": {...},
|
||||
|
||||
// Daily-specific
|
||||
"createDailyRoom": true,
|
||||
"dailyRoomProperties": {...},
|
||||
"dailyMeetingTokenProperties": {...},
|
||||
"body": {...}
|
||||
}
|
||||
"""
|
||||
try:
|
||||
request_data = await request.json()
|
||||
logger.debug(f"Received request: {request_data}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Determine transport: explicit field → legacy Daily hint → CLI default → webrtc
|
||||
transport = request_data.get("transport")
|
||||
if transport is None and request_data.get("createDailyRoom", False):
|
||||
transport = "daily"
|
||||
if transport is None:
|
||||
transport = args.transport or "webrtc"
|
||||
|
||||
# Enforce restriction when -t was explicitly set on the command line
|
||||
if args.transport is not None and transport != args.transport:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=(
|
||||
f"Transport '{transport}' is not allowed. "
|
||||
f"Server is configured for '{args.transport}' only (-t {args.transport})."
|
||||
),
|
||||
)
|
||||
|
||||
if transport == "webrtc":
|
||||
# WebRTC: register the session; the bot starts when the WebRTC offer arrives.
|
||||
session_id = str(uuid.uuid4())
|
||||
active_sessions[session_id] = request_data.get("body", {})
|
||||
|
||||
result = StartBotResult(
|
||||
sessionId=session_id,
|
||||
)
|
||||
if request_data.get("enableDefaultIceServers"):
|
||||
result["iceConfig"] = IceConfig(
|
||||
iceServers=[IceServer(urls=["stun:stun.l.google.com:19302"])]
|
||||
)
|
||||
return result
|
||||
|
||||
elif transport == "daily":
|
||||
create_daily_room = request_data.get("createDailyRoom", False)
|
||||
body = request_data.get("body", {})
|
||||
daily_room_properties_dict = request_data.get("dailyRoomProperties", None)
|
||||
daily_token_properties_dict = request_data.get("dailyMeetingTokenProperties", None)
|
||||
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
existing_room_url = os.getenv("DAILY_ROOM_URL")
|
||||
session_id = str(uuid.uuid4())
|
||||
result: StartBotResult | None = None
|
||||
|
||||
if create_daily_room or existing_room_url:
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.transports.daily.utils import (
|
||||
DailyMeetingTokenProperties,
|
||||
DailyRoomProperties,
|
||||
)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_properties = None
|
||||
if daily_room_properties_dict:
|
||||
daily_room_properties_dict.setdefault(
|
||||
"exp", time.time() + PIPECAT_ROOM_EXP_HOURS * 3600
|
||||
)
|
||||
daily_room_properties_dict.setdefault("eject_at_room_exp", True)
|
||||
try:
|
||||
room_properties = DailyRoomProperties(**daily_room_properties_dict)
|
||||
logger.debug(f"Using custom room properties: {room_properties}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse dailyRoomProperties: {e}")
|
||||
|
||||
token_properties = None
|
||||
if daily_token_properties_dict:
|
||||
try:
|
||||
token_properties = DailyMeetingTokenProperties(
|
||||
**daily_token_properties_dict
|
||||
)
|
||||
logger.debug(f"Using custom token properties: {token_properties}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse dailyMeetingTokenProperties: {e}")
|
||||
|
||||
room_url, token = await configure(
|
||||
session,
|
||||
room_exp_duration=PIPECAT_ROOM_EXP_HOURS,
|
||||
room_properties=room_properties,
|
||||
token_properties=token_properties,
|
||||
)
|
||||
runner_args = DailyRunnerArguments(
|
||||
room_url=room_url, token=token, body=body, session_id=session_id
|
||||
)
|
||||
result = StartBotResult(
|
||||
dailyRoom=room_url,
|
||||
dailyToken=token,
|
||||
sessionId=session_id,
|
||||
)
|
||||
else:
|
||||
runner_args = RunnerArguments(body=body, session_id=session_id)
|
||||
|
||||
runner_args.cli_args = args
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return result
|
||||
|
||||
elif transport in TELEPHONY_TRANSPORTS:
|
||||
# Telephony: the bot starts when the provider connects to /ws.
|
||||
# Return the WebSocket URL so the caller knows where to point their provider.
|
||||
scheme = "wss" if args.host != "localhost" else "ws"
|
||||
return StartBotResult(
|
||||
wsUrl=f"{scheme}://{args.host}:{args.port}/ws",
|
||||
)
|
||||
|
||||
elif transport == "websocket":
|
||||
# Plain WebSocket: the bot starts when the client connects to /ws-client.
|
||||
scheme = "wss" if args.host != "localhost" else "ws"
|
||||
session_id = str(uuid.uuid4())
|
||||
return StartBotResult(
|
||||
wsUrl=f"{scheme}://{args.host}:{args.port}/ws-client",
|
||||
sessionId=session_id,
|
||||
token="mock_token",
|
||||
)
|
||||
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Unknown transport '{transport}'.",
|
||||
)
|
||||
# Set up transport-specific routes
|
||||
if args.transport == "webrtc":
|
||||
_setup_webrtc_routes(app, args)
|
||||
if args.whatsapp:
|
||||
_setup_whatsapp_routes(app, args)
|
||||
elif args.transport == "daily":
|
||||
_setup_daily_routes(app, args)
|
||||
elif args.transport in TELEPHONY_TRANSPORTS:
|
||||
_setup_telephony_routes(app, args)
|
||||
else:
|
||||
logger.warning(f"Unknown transport type: {args.transport}")
|
||||
|
||||
|
||||
def _resolve_download_path(folder: str, filename: str) -> Path:
|
||||
@@ -449,27 +220,11 @@ def _resolve_download_path(folder: str, filename: str) -> Path:
|
||||
return file_path
|
||||
|
||||
|
||||
def _setup_frontend_routes(app: FastAPI):
|
||||
"""Mount the prebuilt frontend UI and root redirect for all transports."""
|
||||
try:
|
||||
from pipecat_ai_prebuilt.frontend import PipecatPrebuiltUI
|
||||
except ImportError as e:
|
||||
logger.error(f"Prebuilt frontend not available: {e}")
|
||||
return
|
||||
|
||||
app.mount("/client", PipecatPrebuiltUI)
|
||||
|
||||
@app.get("/", include_in_schema=False)
|
||||
async def root_redirect():
|
||||
"""Redirect root requests to client interface."""
|
||||
return RedirectResponse(url="/client/")
|
||||
|
||||
|
||||
def _setup_webrtc_routes(
|
||||
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
|
||||
):
|
||||
def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
|
||||
"""Set up WebRTC-specific routes."""
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.request_handler import (
|
||||
IceCandidate,
|
||||
@@ -481,6 +236,27 @@ def _setup_webrtc_routes(
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
class IceServer(TypedDict, total=False):
|
||||
urls: str | list[str]
|
||||
|
||||
class IceConfig(TypedDict):
|
||||
iceServers: list[IceServer]
|
||||
|
||||
class StartBotResult(TypedDict, total=False):
|
||||
sessionId: str
|
||||
iceConfig: IceConfig | None
|
||||
|
||||
# In-memory store of active sessions: session_id -> session info
|
||||
active_sessions: dict[str, dict[str, Any]] = {}
|
||||
|
||||
# Mount the frontend
|
||||
app.mount("/client", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@app.get("/", include_in_schema=False)
|
||||
async def root_redirect():
|
||||
"""Redirect root requests to client interface."""
|
||||
return RedirectResponse(url="/client/")
|
||||
|
||||
@app.get("/files/{filename:path}")
|
||||
async def download_file(filename: str):
|
||||
"""Handle file downloads."""
|
||||
@@ -539,6 +315,29 @@ def _setup_webrtc_routes(
|
||||
await small_webrtc_handler.handle_patch_request(request)
|
||||
return {"status": "success"}
|
||||
|
||||
@app.post("/start")
|
||||
async def rtvi_start(request: Request):
|
||||
"""Mimic Pipecat Cloud's /start endpoint."""
|
||||
# Parse the request body
|
||||
try:
|
||||
request_data = await request.json()
|
||||
logger.debug(f"Received request: {request_data}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
|
||||
session_id = str(uuid.uuid4())
|
||||
active_sessions[session_id] = request_data.get("body", {})
|
||||
|
||||
result: StartBotResult = {"sessionId": session_id}
|
||||
if request_data.get("enableDefaultIceServers"):
|
||||
result["iceConfig"] = IceConfig(
|
||||
iceServers=[IceServer(urls=["stun:stun.l.google.com:19302"])]
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
@app.api_route(
|
||||
"/sessions/{session_id}/{path:path}",
|
||||
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
|
||||
@@ -764,10 +563,12 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace):
|
||||
def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
|
||||
"""Set up Daily-specific routes."""
|
||||
|
||||
@app.get("/daily")
|
||||
@app.get("/")
|
||||
async def create_room_and_start_agent():
|
||||
"""Launch a Daily bot and redirect to room."""
|
||||
logger.debug("Starting bot with Daily transport and redirecting to Daily room")
|
||||
print("Starting bot with Daily transport and redirecting to Daily room")
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
@@ -783,6 +584,105 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return RedirectResponse(room_url)
|
||||
|
||||
@app.post("/start")
|
||||
async def start_agent(request: Request):
|
||||
"""Handler for /start endpoints.
|
||||
|
||||
Expects POST body like::
|
||||
{
|
||||
"createDailyRoom": true,
|
||||
"dailyRoomProperties": { "start_video_off": true },
|
||||
"dailyMeetingTokenProperties": { "is_owner": true, "user_name": "Bot" },
|
||||
"body": { "custom_data": "value" }
|
||||
}
|
||||
"""
|
||||
print("Starting bot with Daily transport")
|
||||
|
||||
# Parse the request body
|
||||
try:
|
||||
request_data = await request.json()
|
||||
logger.debug(f"Received request: {request_data}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
create_daily_room = request_data.get("createDailyRoom", False)
|
||||
body = request_data.get("body", {})
|
||||
daily_room_properties_dict = request_data.get("dailyRoomProperties", None)
|
||||
daily_token_properties_dict = request_data.get("dailyMeetingTokenProperties", None)
|
||||
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
existing_room_url = os.getenv("DAILY_ROOM_URL")
|
||||
|
||||
session_id = str(uuid.uuid4())
|
||||
result = None
|
||||
|
||||
# Configure room if:
|
||||
# 1. Explicitly requested via createDailyRoom in payload
|
||||
# 2. Using pre-configured room from DAILY_ROOM_URL env var
|
||||
if create_daily_room or existing_room_url:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.transports.daily.utils import (
|
||||
DailyMeetingTokenProperties,
|
||||
DailyRoomProperties,
|
||||
)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# Parse dailyRoomProperties if provided
|
||||
room_properties = None
|
||||
if daily_room_properties_dict:
|
||||
# Apply Pipecat Cloud's session policy if caller didn't override.
|
||||
daily_room_properties_dict.setdefault(
|
||||
"exp", time.time() + PIPECAT_ROOM_EXP_HOURS * 3600
|
||||
)
|
||||
daily_room_properties_dict.setdefault("eject_at_room_exp", True)
|
||||
try:
|
||||
room_properties = DailyRoomProperties(**daily_room_properties_dict)
|
||||
logger.debug(f"Using custom room properties: {room_properties}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse dailyRoomProperties: {e}")
|
||||
# Continue without custom properties
|
||||
|
||||
# Parse dailyMeetingTokenProperties if provided
|
||||
token_properties = None
|
||||
if daily_token_properties_dict:
|
||||
try:
|
||||
token_properties = DailyMeetingTokenProperties(
|
||||
**daily_token_properties_dict
|
||||
)
|
||||
logger.debug(f"Using custom token properties: {token_properties}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse dailyMeetingTokenProperties: {e}")
|
||||
# Continue without custom properties
|
||||
|
||||
room_url, token = await configure(
|
||||
session,
|
||||
room_exp_duration=PIPECAT_ROOM_EXP_HOURS,
|
||||
room_properties=room_properties,
|
||||
token_properties=token_properties,
|
||||
)
|
||||
runner_args = DailyRunnerArguments(
|
||||
room_url=room_url, token=token, body=body, session_id=session_id
|
||||
)
|
||||
result = {
|
||||
"dailyRoom": room_url,
|
||||
"dailyToken": token,
|
||||
"sessionId": session_id,
|
||||
}
|
||||
else:
|
||||
runner_args = RunnerArguments(body=body, session_id=session_id)
|
||||
|
||||
# Update CLI args.
|
||||
runner_args.cli_args = args
|
||||
|
||||
# Start the bot in the background
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
|
||||
return result
|
||||
|
||||
if args.dialin:
|
||||
|
||||
@app.post("/daily-dialin-webhook")
|
||||
@@ -831,6 +731,8 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
|
||||
detail="Missing required fields: From, To, callId, callDomain",
|
||||
)
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.runner.types import DailyDialinRequest, DialinSettings
|
||||
|
||||
@@ -899,51 +801,44 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
|
||||
|
||||
|
||||
def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
|
||||
"""Set up telephony-specific routes.
|
||||
|
||||
The WebSocket endpoint (``/ws``) is always registered so providers can
|
||||
connect directly. The XML webhook (``POST /``) is only registered when a
|
||||
specific telephony transport is chosen via ``-t`` because the XML template
|
||||
is provider-specific and requires a proxy hostname (``--proxy``).
|
||||
"""
|
||||
if args.transport in TELEPHONY_TRANSPORTS:
|
||||
# XML response templates (Exotel doesn't use XML webhooks)
|
||||
XML_TEMPLATES = {
|
||||
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
"""Set up telephony-specific routes."""
|
||||
# XML response templates (Exotel doesn't use XML webhooks)
|
||||
XML_TEMPLATES = {
|
||||
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{args.proxy}/ws"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>""",
|
||||
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{args.proxy}/ws" bidirectionalMode="rtp"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>""",
|
||||
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{args.proxy}/ws</Stream>
|
||||
</Response>""",
|
||||
}
|
||||
}
|
||||
|
||||
@app.post("/")
|
||||
async def start_call():
|
||||
"""Handle telephony webhook and return XML response."""
|
||||
if args.transport == "exotel":
|
||||
# Exotel doesn't use POST webhooks - redirect to proper documentation
|
||||
logger.debug("POST Exotel endpoint - not used")
|
||||
return {
|
||||
"error": "Exotel doesn't use POST webhooks",
|
||||
"websocket_url": f"wss://{args.proxy}/ws",
|
||||
"note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet",
|
||||
}
|
||||
else:
|
||||
logger.debug(f"POST {args.transport.upper()} XML")
|
||||
xml_content = XML_TEMPLATES.get(args.transport, "<Response></Response>")
|
||||
return HTMLResponse(content=xml_content, media_type="application/xml")
|
||||
@app.post("/")
|
||||
async def start_call():
|
||||
"""Handle telephony webhook and return XML response."""
|
||||
if args.transport == "exotel":
|
||||
# Exotel doesn't use POST webhooks - redirect to proper documentation
|
||||
logger.debug("POST Exotel endpoint - not used")
|
||||
return {
|
||||
"error": "Exotel doesn't use POST webhooks",
|
||||
"websocket_url": f"wss://{args.proxy}/ws",
|
||||
"note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet",
|
||||
}
|
||||
else:
|
||||
logger.debug(f"POST {args.transport.upper()} XML")
|
||||
xml_content = XML_TEMPLATES.get(args.transport, "<Response></Response>")
|
||||
return HTMLResponse(content=xml_content, media_type="application/xml")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
@@ -952,6 +847,11 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
|
||||
logger.debug("WebSocket connection accepted")
|
||||
await _run_telephony_bot(websocket, args)
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent():
|
||||
"""Simple status endpoint for telephony transports."""
|
||||
return {"status": f"Bot started with {args.transport}"}
|
||||
|
||||
|
||||
async def _run_daily_direct(args: argparse.Namespace):
|
||||
"""Run Daily bot with direct connection (no FastAPI server)."""
|
||||
@@ -1022,27 +922,22 @@ def runner_port() -> int:
|
||||
def main(parser: argparse.ArgumentParser | None = None):
|
||||
"""Start the Pipecat development runner.
|
||||
|
||||
Parses command-line arguments and starts a FastAPI server that supports
|
||||
WebRTC, Daily, and telephony transports simultaneously. Clients declare
|
||||
which transport to use via the ``transport`` field in the ``/start`` body.
|
||||
|
||||
When ``-t`` is provided, the server restricts ``/start`` to that transport
|
||||
only and displays transport-specific startup information.
|
||||
Parses command-line arguments and starts a FastAPI server configured
|
||||
for the specified transport type.
|
||||
|
||||
The runner discovers and runs any ``bot(runner_args)`` function found in the
|
||||
calling module.
|
||||
|
||||
Command-line arguments:
|
||||
- --host: Server host address (default: localhost)
|
||||
- --host: Server host address (default: localhost) 879
|
||||
- --port: Server port (default: 7860)
|
||||
- -t/--transport: Restrict to a single transport and set as default for /start
|
||||
(daily, webrtc, twilio, telnyx, plivo, exotel). Omit to support all transports.
|
||||
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
|
||||
- -x/--proxy: Public proxy hostname for telephony webhooks
|
||||
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
|
||||
- -f/--folder: Path to downloads folder
|
||||
- --dialin: Enable Daily PSTN dial-in webhook handling
|
||||
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
|
||||
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
|
||||
- --whatsapp: Ensure required WhatsApp environment variables are present
|
||||
- --whatsapp: Ensure requried WhatsApp environment variables are present
|
||||
- -v/--verbose: Increase logging verbosity
|
||||
|
||||
Args:
|
||||
@@ -1063,11 +958,8 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
"--transport",
|
||||
type=str,
|
||||
choices=["daily", "webrtc", *TELEPHONY_TRANSPORTS],
|
||||
default=None,
|
||||
help=(
|
||||
"Restrict the server to a single transport and set it as the default for /start. "
|
||||
"Omit to support all transports simultaneously (default behaviour)."
|
||||
),
|
||||
default="webrtc",
|
||||
help="Transport type",
|
||||
)
|
||||
parser.add_argument("-x", "--proxy", help="Public proxy host name")
|
||||
parser.add_argument(
|
||||
@@ -1085,7 +977,7 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
"--dialin",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Enable Daily PSTN dial-in webhook handling",
|
||||
help="Enable Daily PSTN dial-in webhook handling (requires Daily transport)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--esp32",
|
||||
@@ -1097,7 +989,7 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
"--whatsapp",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Ensure required WhatsApp environment variables are present",
|
||||
help="Ensure requried WhatsApp environment variables are present",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
@@ -1106,13 +998,12 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
if args.proxy:
|
||||
args.proxy = _validate_and_clean_proxy(args.proxy)
|
||||
|
||||
# --direct implies Daily transport
|
||||
if args.direct:
|
||||
if args.transport is None or args.transport == "daily":
|
||||
args.transport = "daily"
|
||||
else:
|
||||
logger.error("--direct flag only works with Daily transport (-t daily)")
|
||||
return
|
||||
# Auto-set transport to daily if --direct is used without explicit transport
|
||||
if args.direct and args.transport == "webrtc": # webrtc is the default
|
||||
args.transport = "daily"
|
||||
elif args.direct and args.transport != "daily":
|
||||
logger.error("--direct flag only works with Daily transport (-t daily)")
|
||||
return
|
||||
|
||||
# Validate ESP32 requirements
|
||||
if args.esp32 and args.host == "localhost":
|
||||
@@ -1120,7 +1011,7 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
return
|
||||
|
||||
# Validate dial-in requirements
|
||||
if args.dialin and args.transport is not None and args.transport != "daily":
|
||||
if args.dialin and args.transport != "daily":
|
||||
logger.error("--dialin flag only works with Daily transport (-t daily)")
|
||||
return
|
||||
|
||||
@@ -1138,38 +1029,28 @@ def main(parser: argparse.ArgumentParser | None = None):
|
||||
asyncio.run(_run_daily_direct(args))
|
||||
return
|
||||
|
||||
# Print startup message
|
||||
print()
|
||||
if args.transport is None:
|
||||
print("🚀 Bot ready!")
|
||||
print(f" → WebRTC: http://{args.host}:{args.port}/client")
|
||||
print(f" → Daily: http://{args.host}:{args.port}/daily")
|
||||
print(f" → Telephony: ws://{args.host}:{args.port}/ws")
|
||||
elif args.transport == "webrtc":
|
||||
# Print startup message for server-based transports
|
||||
if args.transport == "webrtc":
|
||||
print()
|
||||
if args.esp32:
|
||||
print("🚀 Bot ready! (ESP32 mode)")
|
||||
print(f"🚀 Bot ready! (ESP32 mode)")
|
||||
elif args.whatsapp:
|
||||
print("🚀 Bot ready! (WhatsApp)")
|
||||
print(f"🚀 Bot ready! (WhatsApp)")
|
||||
else:
|
||||
print("🚀 Bot ready! (WebRTC)")
|
||||
print(f"🚀 Bot ready!")
|
||||
print(f" → Open http://{args.host}:{args.port}/client in your browser")
|
||||
print()
|
||||
elif args.transport == "daily":
|
||||
print("🚀 Bot ready! (Daily)")
|
||||
print()
|
||||
print(f"🚀 Bot ready!")
|
||||
if args.dialin:
|
||||
print(
|
||||
f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook"
|
||||
)
|
||||
print(f" → Configure this URL in your Daily phone number settings")
|
||||
else:
|
||||
print(
|
||||
f" → Open http://{args.host}:{args.port}/daily in your browser to start a session"
|
||||
)
|
||||
elif args.transport in TELEPHONY_TRANSPORTS:
|
||||
print(f"🚀 Bot ready! ({args.transport.capitalize()})")
|
||||
if args.proxy:
|
||||
print(f" → XML webhook: http://{args.host}:{args.port}/")
|
||||
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
|
||||
print()
|
||||
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
|
||||
print()
|
||||
|
||||
RUNNER_DOWNLOADS_FOLDER = args.folder
|
||||
RUNNER_HOST = args.host
|
||||
|
||||
@@ -105,14 +105,10 @@ class WebSocketRunnerArguments(RunnerArguments):
|
||||
|
||||
Parameters:
|
||||
websocket: WebSocket connection for audio streaming
|
||||
transport_type: Transport type identifier. Set to ``"websocket"`` for plain
|
||||
WebSocket connections; ``None`` triggers auto-detection from the first
|
||||
telephony provider message.
|
||||
body: Additional request data
|
||||
"""
|
||||
|
||||
websocket: WebSocket
|
||||
transport_type: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -562,12 +562,6 @@ async def create_transport(
|
||||
)
|
||||
|
||||
elif isinstance(runner_args, WebSocketRunnerArguments):
|
||||
if runner_args.transport_type == "websocket":
|
||||
params = _get_transport_params("websocket", transport_params)
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketTransport
|
||||
|
||||
return FastAPIWebsocketTransport(websocket=runner_args.websocket, params=params)
|
||||
|
||||
# Parse once to determine the provider and get data
|
||||
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
|
||||
params = _get_transport_params(transport_type, transport_params)
|
||||
|
||||
@@ -419,30 +419,30 @@ class CartesiaTTSService(WebsocketTTSService):
|
||||
"""Convenience method to create a speed tag."""
|
||||
return f'<speed ratio="{speed}" />'
|
||||
|
||||
def _is_chinese_or_japanese_language(self, language: str) -> bool:
|
||||
"""Check if the given language is Chinese or Japanese.
|
||||
def _is_cjk_language(self, language: str) -> bool:
|
||||
"""Check if the given language is CJK (Chinese, Japanese, Korean).
|
||||
|
||||
Args:
|
||||
language: The language code to check.
|
||||
|
||||
Returns:
|
||||
True if the language is Chinese or Japanese.
|
||||
True if the language is Chinese, Japanese, or Korean.
|
||||
"""
|
||||
cjk_languages = {"zh", "ja", "ko"}
|
||||
base_lang = language.split("-")[0].lower()
|
||||
return base_lang in {"zh", "ja"}
|
||||
return base_lang in cjk_languages
|
||||
|
||||
def _process_word_timestamps_for_language(
|
||||
self, words: list[str], starts: list[float]
|
||||
) -> list[tuple[str, float]]:
|
||||
"""Process word timestamps based on the current language.
|
||||
|
||||
For Chinese and Japanese, Cartesia groups related characters in the same timestamp
|
||||
message.
|
||||
For CJK languages, Cartesia groups related characters in the same timestamp message.
|
||||
For example, in Japanese a single message might be `['こ', 'ん', 'に', 'ち', 'は', '。']`.
|
||||
We combine these into single words so the downstream aggregator can add natural
|
||||
spacing between meaningful units rather than individual characters.
|
||||
|
||||
For other languages, words are already properly separated and are used as-is.
|
||||
For non-CJK languages, words are already properly separated and are used as-is.
|
||||
|
||||
Args:
|
||||
words: List of words/characters from Cartesia.
|
||||
@@ -453,10 +453,10 @@ class CartesiaTTSService(WebsocketTTSService):
|
||||
"""
|
||||
current_language = assert_given(self._settings.language)
|
||||
|
||||
# Check if this is a Chinese/Japanese language (if language is None, treat as other)
|
||||
if current_language and self._is_chinese_or_japanese_language(current_language):
|
||||
# For Chinese/Japanese, combine all characters in this message into one word
|
||||
# using the first character's start time.
|
||||
# Check if this is a CJK language (if language is None, treat as non-CJK)
|
||||
if current_language and self._is_cjk_language(current_language):
|
||||
# For CJK languages, combine all characters in this message into one word
|
||||
# using the first character's start time
|
||||
if words and starts:
|
||||
combined_word = "".join(words)
|
||||
first_start = starts[0]
|
||||
@@ -467,11 +467,6 @@ class CartesiaTTSService(WebsocketTTSService):
|
||||
# For non-CJK languages, use as-is
|
||||
return list(zip(words, starts))
|
||||
|
||||
def _word_timestamps_include_inter_frame_spaces(self) -> bool:
|
||||
"""Whether timestamp text should be treated as carrying its own spacing."""
|
||||
current_language = assert_given(self._settings.language)
|
||||
return bool(current_language and self._is_chinese_or_japanese_language(current_language))
|
||||
|
||||
def _build_msg(
|
||||
self,
|
||||
text: str = "",
|
||||
@@ -665,13 +660,7 @@ class CartesiaTTSService(WebsocketTTSService):
|
||||
processed_timestamps = self._process_word_timestamps_for_language(
|
||||
msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]
|
||||
)
|
||||
await self.add_word_timestamps(
|
||||
processed_timestamps,
|
||||
ctx_id,
|
||||
includes_inter_frame_spaces=(
|
||||
True if self._word_timestamps_include_inter_frame_spaces() else None
|
||||
),
|
||||
)
|
||||
await self.add_word_timestamps(processed_timestamps, ctx_id)
|
||||
elif msg["type"] == "chunk":
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
|
||||
@@ -358,8 +358,7 @@ class ElevenLabsSTTService(SegmentedSTTService):
|
||||
|
||||
# Add required model_id and language_code
|
||||
data.add_field("model_id", self._settings.model)
|
||||
if self._settings.language:
|
||||
data.add_field("language_code", self._settings.language)
|
||||
data.add_field("language_code", self._settings.language)
|
||||
if self._settings.tag_audio_events is not None:
|
||||
data.add_field("tag_audio_events", str(self._settings.tag_audio_events).lower())
|
||||
keyterms = self._settings.keyterms
|
||||
|
||||
@@ -150,7 +150,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
api_endpoint_base_url: str = "wss://api.gradium.ai/api/speech/asr",
|
||||
api_endpoint_base_url: str = "wss://eu.api.gradium.ai/api/speech/asr",
|
||||
encoding: str = "pcm",
|
||||
sample_rate: int | None = None,
|
||||
params: InputParams | None = None,
|
||||
@@ -163,7 +163,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
|
||||
Args:
|
||||
api_key: Gradium API key for authentication.
|
||||
api_endpoint_base_url: WebSocket endpoint URL.
|
||||
api_endpoint_base_url: WebSocket endpoint URL. Defaults to Gradium's streaming endpoint.
|
||||
encoding: Base audio encoding type. One of "pcm", "wav", or "opus".
|
||||
For PCM, the sample rate is appended automatically from the
|
||||
pipeline's audio_in_sample_rate (e.g., "pcm" becomes "pcm_16000").
|
||||
|
||||
@@ -68,7 +68,7 @@ class GradiumTTSService(WebsocketTTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice_id: str | None = None,
|
||||
url: str = "wss://api.gradium.ai/api/speech/tts",
|
||||
url: str = "wss://eu.api.gradium.ai/api/speech/tts",
|
||||
model: str | None = None,
|
||||
json_config: str | None = None,
|
||||
params: InputParams | None = None,
|
||||
|
||||
@@ -48,8 +48,7 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators import async_tool_messages
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
|
||||
from pipecat.services.settings import (
|
||||
@@ -362,7 +361,6 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
self._messages_added_manually = {}
|
||||
self._pending_function_calls = {}
|
||||
self._completed_tool_calls = set()
|
||||
self._async_tool_warning_logged: bool = False
|
||||
|
||||
self._register_event_handler("on_conversation_item_created")
|
||||
self._register_event_handler("on_conversation_item_updated")
|
||||
@@ -631,7 +629,6 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
self._receive_task = None
|
||||
|
||||
self._completed_tool_calls = set()
|
||||
self._async_tool_warning_logged = False
|
||||
self._audio_buffer = b""
|
||||
self._interim_transcription_text = ""
|
||||
self._disconnecting = False
|
||||
@@ -1003,80 +1000,9 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
|
||||
async def _process_completed_function_calls(self, send_new_results: bool):
|
||||
"""Process completed function calls and send results to the service."""
|
||||
# If the user registered a function with cancel_on_interruption=False,
|
||||
# the aggregator emits async-tool-style messages into the context. As
|
||||
# of this writing, Inworld Realtime doesn't appear to handle the
|
||||
# resulting delayed tool result reliably — the routing code below
|
||||
# is best-effort. Surface a one-time warning so users see they're not
|
||||
# getting what they expect.
|
||||
if not self._async_tool_warning_logged:
|
||||
for message in self._context.get_messages():
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
continue
|
||||
if async_tool_messages.parse_message(message) is not None:
|
||||
logger.error(
|
||||
f"{self}: cancel_on_interruption=False is not reliably "
|
||||
f"supported by Inworld Realtime as of this writing. "
|
||||
f"Use cancel_on_interruption=True (the default), or "
|
||||
f"consider another LLM service if your tool needs the "
|
||||
f"async semantics."
|
||||
)
|
||||
await self.push_error(
|
||||
error_msg=(
|
||||
"cancel_on_interruption=False is not reliably supported "
|
||||
"by Inworld Realtime as of this writing."
|
||||
),
|
||||
)
|
||||
self._async_tool_warning_logged = True
|
||||
break
|
||||
|
||||
sent_new_result = False
|
||||
|
||||
for message in self._context.get_messages():
|
||||
# LLMSpecificMessages are opaque provider-specific payloads, not
|
||||
# standard tool-result messages — skip them.
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
continue
|
||||
|
||||
# Async-tool messages live alongside regular tool messages in the
|
||||
# context; detect and route them before the regular logic so we
|
||||
# don't try to send the async-tool envelope JSON as a tool result.
|
||||
async_payload = async_tool_messages.parse_message(message)
|
||||
if async_payload is not None:
|
||||
if async_payload.tool_call_id in self._completed_tool_calls:
|
||||
continue
|
||||
if async_payload.kind == "started":
|
||||
# The provider already issued the tool call and natively
|
||||
# awaits a result; nothing to send for the started marker.
|
||||
continue
|
||||
if async_payload.kind == "intermediate":
|
||||
logger.error(
|
||||
f"{self}: Inworld Realtime does not support streamed async "
|
||||
f"tool results; dropping intermediate result for "
|
||||
f"tool_call_id={async_payload.tool_call_id}. Consider "
|
||||
f"another LLM service if your tool needs to stream "
|
||||
f"intermediate results."
|
||||
)
|
||||
await self.push_error(
|
||||
error_msg="Inworld Realtime does not support streamed async tool results.",
|
||||
)
|
||||
continue
|
||||
if async_payload.kind == "final":
|
||||
# Deliver via the formal tool-result channel — same path
|
||||
# as a synchronous tool result, just delayed.
|
||||
if send_new_results:
|
||||
sent_new_result = True
|
||||
await self._send_tool_result(
|
||||
async_payload.tool_call_id, async_payload.result
|
||||
)
|
||||
self._completed_tool_calls.add(async_payload.tool_call_id)
|
||||
continue
|
||||
# Defensive: any async-tool message must not fall through
|
||||
# to the regular tool-result block below, even if it
|
||||
# carries a kind we don't recognize.
|
||||
continue
|
||||
|
||||
# Look for newly-completed "regular" (as opposed to async-tool) results
|
||||
if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS":
|
||||
tool_call_id = message.get("tool_call_id")
|
||||
if tool_call_id and tool_call_id not in self._completed_tool_calls:
|
||||
@@ -1085,8 +1011,6 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
await self._send_tool_result(tool_call_id, message.get("content"))
|
||||
self._completed_tool_calls.add(tool_call_id)
|
||||
|
||||
# If we reported any new tool call results to the service, trigger
|
||||
# another response
|
||||
if sent_new_result:
|
||||
await self._create_response()
|
||||
|
||||
|
||||
@@ -60,40 +60,9 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
|
||||
from pipecat.transcriptions.language import Language, resolve_language
|
||||
from pipecat.utils.tracing.service_decorators import traced_tts
|
||||
|
||||
|
||||
def language_to_inworld_language(language: Language) -> str:
|
||||
"""Convert a Language enum to an Inworld TTS BCP-47 language tag.
|
||||
|
||||
Args:
|
||||
language: The Language enum value to convert.
|
||||
|
||||
Returns:
|
||||
The corresponding Inworld BCP-47 language tag (e.g. ``"en-US"``).
|
||||
Unverified languages fall back to their BCP-47 string value with a warning.
|
||||
"""
|
||||
LANGUAGE_MAP = {
|
||||
Language.AR: "ar-SA",
|
||||
Language.DE: "de-DE",
|
||||
Language.EN: "en-US",
|
||||
Language.ES: "es-ES",
|
||||
Language.FR: "fr-FR",
|
||||
Language.HE: "he-IL",
|
||||
Language.HI: "hi-IN",
|
||||
Language.IT: "it-IT",
|
||||
Language.JA: "ja-JP",
|
||||
Language.KO: "ko-KR",
|
||||
Language.NL: "nl-NL",
|
||||
Language.PL: "pl-PL",
|
||||
Language.PT: "pt-BR",
|
||||
Language.RU: "ru-RU",
|
||||
Language.ZH: "zh-CN",
|
||||
}
|
||||
return resolve_language(language, LANGUAGE_MAP, use_base_code=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class InworldTTSSettings(TTSSettings):
|
||||
"""Settings for InworldTTSService and InworldHttpTTSService.
|
||||
@@ -101,18 +70,10 @@ class InworldTTSSettings(TTSSettings):
|
||||
Parameters:
|
||||
speaking_rate: Speaking rate for speech synthesis.
|
||||
temperature: Temperature for speech synthesis.
|
||||
delivery_mode: Controls the stability vs. creativity tradeoff.
|
||||
``"STABLE"`` produces reliable, predictable speech.
|
||||
``"BALANCED"`` is the default midpoint.
|
||||
``"CREATIVE"`` produces more expressive, emotionally varied speech.
|
||||
Only supported by ``inworld-tts-2``.
|
||||
"""
|
||||
|
||||
speaking_rate: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
temperature: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
delivery_mode: Literal["STABLE", "BALANCED", "CREATIVE"] | None | _NotGiven = field(
|
||||
default_factory=lambda: NOT_GIVEN
|
||||
)
|
||||
|
||||
_aliases: ClassVar[dict[str, str]] = {
|
||||
"voiceId": "voice",
|
||||
@@ -206,7 +167,6 @@ class InworldHttpTTSService(TTSService):
|
||||
language=None,
|
||||
speaking_rate=None,
|
||||
temperature=None,
|
||||
delivery_mode=None,
|
||||
)
|
||||
|
||||
# 2. Apply direct init arg overrides (deprecated)
|
||||
@@ -267,17 +227,6 @@ class InworldHttpTTSService(TTSService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
"""Convert a Language enum to Inworld language format.
|
||||
|
||||
Args:
|
||||
language: The language to convert.
|
||||
|
||||
Returns:
|
||||
The Inworld-specific BCP-47 language code, or None if not supported.
|
||||
"""
|
||||
return language_to_inworld_language(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the Inworld TTS service.
|
||||
|
||||
@@ -364,10 +313,6 @@ class InworldHttpTTSService(TTSService):
|
||||
|
||||
if self._settings.temperature is not None:
|
||||
payload["temperature"] = self._settings.temperature
|
||||
if self._settings.delivery_mode is not None:
|
||||
payload["deliveryMode"] = self._settings.delivery_mode
|
||||
if self._settings.language is not None:
|
||||
payload["language"] = self._settings.language
|
||||
|
||||
# Use WORD timestamps for simplicity and correct spacing/capitalization
|
||||
payload["timestampType"] = self._timestamp_type
|
||||
@@ -664,7 +609,6 @@ class InworldTTSService(WebsocketTTSService):
|
||||
language=None,
|
||||
speaking_rate=None,
|
||||
temperature=None,
|
||||
delivery_mode=None,
|
||||
)
|
||||
|
||||
# 2. Apply direct init arg overrides (deprecated)
|
||||
@@ -756,17 +700,6 @@ class InworldTTSService(WebsocketTTSService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
"""Convert a Language enum to Inworld language format.
|
||||
|
||||
Args:
|
||||
language: The language to convert.
|
||||
|
||||
Returns:
|
||||
The Inworld-specific BCP-47 language code, or None if not supported.
|
||||
"""
|
||||
return language_to_inworld_language(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the Inworld WebSocket TTS service.
|
||||
|
||||
@@ -1156,10 +1089,6 @@ class InworldTTSService(WebsocketTTSService):
|
||||
|
||||
if self._settings.temperature is not None:
|
||||
create_config["temperature"] = self._settings.temperature
|
||||
if self._settings.delivery_mode is not None:
|
||||
create_config["deliveryMode"] = self._settings.delivery_mode
|
||||
if self._settings.language is not None:
|
||||
create_config["language"] = self._settings.language
|
||||
if self._apply_text_normalization is not None:
|
||||
create_config["applyTextNormalization"] = self._apply_text_normalization
|
||||
if self._auto_mode is not None:
|
||||
|
||||
@@ -280,8 +280,6 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
|
||||
self._client: SageMakerBidiClient | None = None
|
||||
self._receive_task = None
|
||||
self._speech_completed_event = asyncio.Event()
|
||||
self._audio_buffer = b""
|
||||
self._playback_started = False
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
@@ -379,12 +377,7 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
|
||||
logger.info(f"{self}: verifying if websocket connection is active {active}")
|
||||
return active
|
||||
|
||||
def _reset_audio_buffer(self):
|
||||
self._audio_buffer = b""
|
||||
self._playback_started = False
|
||||
|
||||
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
|
||||
self._reset_audio_buffer()
|
||||
if self._bot_speaking and self._client:
|
||||
logger.debug(
|
||||
f"{self}: interruption detected, sending input_text.done and waiting for speech.completed"
|
||||
@@ -398,30 +391,6 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
|
||||
logger.warning(f"{self}: timed out waiting for conversation.item.speech.completed")
|
||||
await super()._handle_interruption(frame, direction)
|
||||
|
||||
async def _handle_audio_chunk(self, audio: bytes, context_id: str | None = None):
|
||||
"""Buffer audio and emit frames using a jitter-buffer approach.
|
||||
|
||||
Holds back audio until chunk_size bytes have been accumulated (to avoid
|
||||
glitches at the start of playback), then emits each subsequent chunk
|
||||
immediately as it arrives.
|
||||
"""
|
||||
self._audio_buffer += audio
|
||||
|
||||
if not self._playback_started:
|
||||
if len(self._audio_buffer) < self.chunk_size:
|
||||
return
|
||||
self._playback_started = True
|
||||
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=self._audio_buffer,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
context_id=context_id,
|
||||
)
|
||||
)
|
||||
self._audio_buffer = b""
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Receive NIM JSON events and push audio frames."""
|
||||
while self._client and self._client.is_active and not self._disconnecting:
|
||||
@@ -446,7 +415,14 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
|
||||
msg = json.loads(payload.decode("utf-8"))
|
||||
except (UnicodeDecodeError, json.JSONDecodeError):
|
||||
# Unexpected binary frame — treat as raw PCM
|
||||
await self._handle_audio_chunk(payload, context_id)
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=payload,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
context_id=context_id,
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
event_type = msg.get("type", "")
|
||||
@@ -458,7 +434,14 @@ class NvidiaSageMakerTTSService(InterruptibleTTSService):
|
||||
chunk_b64 = msg.get("audio", "")
|
||||
if chunk_b64:
|
||||
await self.stop_ttfb_metrics()
|
||||
await self._handle_audio_chunk(base64.b64decode(chunk_b64), context_id)
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=base64.b64decode(chunk_b64),
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
context_id=context_id,
|
||||
)
|
||||
)
|
||||
elif event_type == "error":
|
||||
await self.push_error(error_msg=f"NIM error: {msg.get('message', msg)}")
|
||||
# In case of error we need to reconnect, otherwise we are not going to receive audio from the TTS service anymore
|
||||
|
||||
@@ -37,7 +37,6 @@ class OpenRouterLLMService(OpenAILLMService):
|
||||
|
||||
Settings = OpenRouterLLMSettings
|
||||
_settings: Settings
|
||||
supports_developer_role = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -53,7 +52,7 @@ class OpenRouterLLMService(OpenAILLMService):
|
||||
Args:
|
||||
api_key: The API key for accessing OpenRouter's API. If None, will attempt
|
||||
to read from environment variables.
|
||||
model: The model identifier to use. Defaults to "openai/gpt-4.1".
|
||||
model: The model identifier to use. Defaults to "openai/gpt-4o-2024-11-20".
|
||||
|
||||
.. deprecated:: 0.0.105
|
||||
Use ``settings=OpenRouterLLMService.Settings(model=...)`` instead.
|
||||
@@ -64,7 +63,7 @@ class OpenRouterLLMService(OpenAILLMService):
|
||||
**kwargs: Additional keyword arguments passed to OpenAILLMService.
|
||||
"""
|
||||
# 1. Initialize default_settings with hardcoded defaults
|
||||
default_settings = self.Settings(model="openai/gpt-4.1")
|
||||
default_settings = self.Settings(model="openai/gpt-4o-2024-11-20")
|
||||
|
||||
# 2. Apply direct init arg overrides (deprecated)
|
||||
if model is not None:
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
import json
|
||||
import time
|
||||
from collections import Counter
|
||||
from collections.abc import AsyncGenerator
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
@@ -202,24 +201,6 @@ def _prepare_language_hints(
|
||||
return list(set(prepared_languages))
|
||||
|
||||
|
||||
def _language_from_tokens(tokens: list[dict]) -> Language | None:
|
||||
language_counts: Counter[Language] = Counter()
|
||||
|
||||
for token in tokens:
|
||||
language = token.get("language")
|
||||
if not language:
|
||||
continue
|
||||
try:
|
||||
language_counts[Language(language)] += 1
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if not language_counts:
|
||||
return None
|
||||
|
||||
return language_counts.most_common(1)[0][0]
|
||||
|
||||
|
||||
@dataclass
|
||||
class SonioxSTTSettings(STTSettings):
|
||||
"""Settings for SonioxSTTService.
|
||||
@@ -576,7 +557,6 @@ class SonioxSTTService(WebsocketSTTService):
|
||||
async def send_endpoint_transcript():
|
||||
if self._final_transcription_buffer:
|
||||
text = "".join(map(lambda token: token["text"], self._final_transcription_buffer))
|
||||
language = _language_from_tokens(self._final_transcription_buffer)
|
||||
# Soniox only pushes TranscriptionFrame when an end token is received,
|
||||
# so every TranscriptionFrame is inherently finalized
|
||||
await self.push_frame(
|
||||
@@ -584,12 +564,11 @@ class SonioxSTTService(WebsocketSTTService):
|
||||
text=text,
|
||||
user_id=self._user_id,
|
||||
timestamp=time_now_iso8601(),
|
||||
language=language,
|
||||
result=self._final_transcription_buffer,
|
||||
finalized=True,
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(text, is_final=True, language=language)
|
||||
await self._handle_transcription(text, is_final=True)
|
||||
await self.stop_processing_metrics()
|
||||
self._final_transcription_buffer = []
|
||||
|
||||
|
||||
@@ -1283,17 +1283,10 @@ class TTSService(AIService):
|
||||
def get_active_audio_context_id(self) -> str | None:
|
||||
"""Get the active audio context ID.
|
||||
|
||||
Returns the playback cursor when set (during active playback), falling
|
||||
back to the current turn's synthesis context_id. The fallback covers
|
||||
the gap between contexts and the start of a turn before the playback
|
||||
task has popped the just-created context off the serialization queue —
|
||||
important for services whose wire protocol does not echo context_id
|
||||
back on incoming audio.
|
||||
|
||||
Returns:
|
||||
The active context ID, or None if neither cursor is set.
|
||||
The active context ID, or None if no context is active.
|
||||
"""
|
||||
return self._playing_context_id or self._turn_context_id
|
||||
return self._playing_context_id
|
||||
|
||||
async def remove_active_audio_context(self):
|
||||
"""Remove the active audio context."""
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user