Compare commits

..

1 Commits

Author SHA1 Message Date
Mark Backman
e1271aa3f7 Testing CI 2026-04-30 13:16:53 -04:00
248 changed files with 1634 additions and 11272 deletions

View File

@@ -1 +0,0 @@
../../.claude/skills/changelog

View File

@@ -1 +0,0 @@
../../.claude/skills/cleanup

View File

@@ -1 +0,0 @@
../../.claude/skills/code-review

View File

@@ -1 +0,0 @@
../../.claude/skills/docstring

View File

@@ -1 +0,0 @@
../../.claude/skills/pr-description

View File

@@ -1 +0,0 @@
../../.claude/skills/pr-submit

View File

@@ -1 +0,0 @@
../../.claude/skills/update-docs

View File

@@ -1,8 +1,3 @@
---
name: cleanup
description: Review, refactor, document, and validate code changes in the current branch
---
# Code Cleanup Skill
The **Code Cleanup Skill** reviews, refactors, and documents code changes in your current branch, ensuring alignment with **Pipecat's architecture, coding standards, and example patterns**.

View File

@@ -42,7 +42,6 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

View File

@@ -32,9 +32,7 @@ jobs:
run: uv python install 3.12
- name: Install development dependencies
# `--all-extras` (matching the dev setup in README.md) so pyright can
# resolve types from various optional dependencies.
run: uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
run: uv sync --group dev --extra daily --extra tracing
- name: Ruff formatter
id: ruff-format

View File

@@ -46,7 +46,6 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

174
AGENTS.md
View File

@@ -1,174 +0,0 @@
# AGENTS.md
This file provides guidance to AI coding agents when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Deprecations**: Use the `.. deprecated:: <version>` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
# Pydantic params class with a deprecated field
class MyParams(BaseModel):
"""Configuration parameters for MyService.
Parameters:
new_setting: Replacement for ``old_setting``.
old_setting: Legacy setting, no longer used.
.. deprecated:: 1.2.0
Use ``new_setting`` instead. Will be removed in 2.0.0.
"""
new_setting: str = "default"
old_setting: str | None = None
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

158
CLAUDE.md
View File

@@ -1 +1,157 @@
@AGENTS.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

View File

@@ -1 +0,0 @@
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService` for controlling Cartesia's server-side text buffering. When unset, Pipecat picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE` mode (custom buffering — avoids stacking client-side aggregation on top of Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode (Cartesia's managed buffering applies). Pass an explicit value (05000ms) to override.

View File

@@ -1 +0,0 @@
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16` to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the `use_normalized_timestamps` and `max_buffer_delay_ms` fields.

View File

@@ -1 +0,0 @@
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead of the deprecated `use_original_timestamps` field. Word timestamps now reflect what was actually spoken (post text-normalization and pronunciation-dictionary substitution), matching the convention Pipecat uses for ElevenLabs. This is a behavior change for `sonic-3` users, who were previously receiving timestamps tied to the input transcript.

View File

@@ -1 +0,0 @@
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200 response — one with the API's error text and a second, less informative "Unknown error" frame from the outer exception handler. It now pushes a single frame that includes the HTTP status code and returns cleanly.

View File

@@ -1 +0,0 @@
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`, `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both the class and an instance.

View File

@@ -1 +0,0 @@
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as `ErrorFrame`s. The latest API emits a `flush_done` per transcript when server-side buffering is disabled; Pipecat now consumes them silently since each turn already has its own `context_id`.

View File

@@ -1 +0,0 @@
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally for user turn stop strategies. It is now only imported when `default_user_turn_stop_strategies()` is called. This improves startup time and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning when the default stop strategies are not used.

View File

@@ -1 +0,0 @@
- Broadened `tool_resources` to `app_resources` for easy access not just in tool handlers but in other places like custom `FrameProcessor`s. Three changes: a rename (`tool_resources``app_resources`), a new `app_resources` property on `PipelineTask`, and a new `pipeline_task` property on `FrameProcessor`. Tool handlers now read `params.app_resources`; custom processors read `self.pipeline_task.app_resources`. The previous `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit `DeprecationWarning`s.

View File

@@ -1 +0,0 @@
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.

View File

@@ -1 +0,0 @@
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model Improvement Program. When set, the value is forwarded to Deepgram as a query parameter on the speak request. Defaults to `None`, which preserves the existing behavior. See https://dpgr.am/deepgram-mip for pricing implications before enabling.

View File

@@ -1 +0,0 @@
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.

View File

@@ -1 +0,0 @@
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.

View File

@@ -1 +0,0 @@
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.

View File

@@ -1 +0,0 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.

View File

@@ -1 +0,0 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.

View File

@@ -1 +0,0 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.

View File

@@ -1 +0,0 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.

View File

@@ -1 +0,0 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.

View File

@@ -1 +0,0 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.

View File

@@ -1 +0,0 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.

View File

@@ -1,6 +0,0 @@
- Added first-class RTVI support for the UI Agent Protocol:
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
- Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.

View File

@@ -1 +0,0 @@
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).

View File

@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.

View File

@@ -1 +0,0 @@
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.

View File

@@ -1 +0,0 @@
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.

View File

@@ -1 +0,0 @@
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.

View File

@@ -1 +0,0 @@
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.

View File

@@ -1 +0,0 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.

View File

@@ -1 +0,0 @@
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.

View File

@@ -1 +0,0 @@
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.

View File

@@ -1 +0,0 @@
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.

View File

@@ -1 +0,0 @@
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.

View File

@@ -1 +0,0 @@
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.

View File

@@ -1 +0,0 @@
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.

View File

@@ -1 +0,0 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.

View File

@@ -1 +0,0 @@
- Fixed interruptions being delayed when a slow non-uninterruptible frame was processing and an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.

View File

@@ -1 +0,0 @@
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.

View File

@@ -1 +0,0 @@
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.

View File

@@ -1 +0,0 @@
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.

View File

@@ -1 +0,0 @@
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.

View File

@@ -1 +0,0 @@
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.

View File

@@ -1 +0,0 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.

View File

@@ -1 +0,0 @@
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)

View File

@@ -1 +0,0 @@
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.

View File

@@ -1 +0,0 @@
- Changed the default OpenAI Realtime input audio transcription model from `gpt-4o-transcribe` to `gpt-realtime-whisper` for both `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does not accept the `prompt` parameter; if a prompt is supplied alongside `gpt-realtime-whisper`, it is dropped automatically and a warning is logged. To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or `"gpt-4o-mini-transcribe"`).

View File

@@ -1 +0,0 @@
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.

View File

@@ -1 +0,0 @@
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).

View File

@@ -1 +0,0 @@
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.

View File

@@ -1 +0,0 @@
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).

View File

@@ -1 +0,0 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.

View File

@@ -1 +0,0 @@
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.

View File

@@ -1 +0,0 @@
- Added `wait_for_transcript_to_end_user_turn` on `LLMUserAggregatorParams` for pipelines where local turn detection drives a realtime service like Gemini Live. Set it to False to avoid unnecessary latency from transcript delay — the realtime service consumes user audio directly, so we don't need user transcripts in context before it can respond. The option makes it so that (1) turn strategies do not consider user transcripts, letting the user turn end sooner, and (2) user transcripts are then handled by the aggregator: a simple timer gives it time to gather those transcripts after the user turn ends, and once gathered, the aggregator emits a new `on_user_turn_message_finalized` event with the new user context message. The new event also fires in the default mode (coinciding with `on_user_turn_stopped`), so consumers that want the populated user transcript can subscribe to it uniformly. See `examples/realtime/realtime-gemini-live-local-vad.py` for the full pattern.

View File

@@ -132,10 +132,6 @@ NOVITA_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
SAGEMAKER_ASR_ENDPOINT_NAME=...
SAGEMAKER_MAGPIE_ENDPOINT_NAME=...
# OpenAI
OPENAI_API_KEY=...

View File

@@ -1,232 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual validation harness for the ``add_tool_change_messages`` feature.
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination:
- **Forward hallucination** — calling a tool that has been removed.
- **Negative hallucination** — refusing to call a tool that has been
re-added (because recent context is full of "I can't" responses).
- **Hallucinated output when tools are unavailable** — making up an
answer rather than declining gracefully, or producing JSON that
*looks* like a tool call but is actually just an assistant text
response.
The ``add_tool_change_messages`` feature mitigates these by appending a
developer-role message to the conversation whenever ``LLMSetToolsFrame``
changes the set of advertised tools, so the LLM stays in sync with what's
actually available.
This harness exercises all of those flavors by flipping the advertised
tool set on a turn counter:
Phase 0 (turns 14): weather tool ACTIVE — confirm baseline.
Phase 1 (turns 58): tool REMOVED — keep asking for weather.
Phase 2 (turn 9+): tool RE-ADDED — does the LLM call it again?
Set ``ADD_TOOL_CHANGE_MESSAGES=0`` to disable the mitigation and see the
unmitigated behavior. The default is ON so a fresh run shows the feature
working.
Defaults to Llama 3.1 8B Instruct via a locally-running Ollama —
anecdotally one of the more hallucination-prone of the easily accessible
models. Pull the model once with ``ollama pull llama3.1:8b`` and make
sure ``ollama serve`` is running. Swap the LLM service to validate other
providers.
Run with::
uv run examples/features/features-add-tool-change-messages.py
ADD_TOOL_CHANGE_MESSAGES=0 uv run examples/features/features-add-tool-change-messages.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# Default ON so a fresh run shows the feature working. Set to "0" to A/B
# against the unmitigated behavior.
ADD_TOOL_CHANGE_MESSAGES = os.environ.get("ADD_TOOL_CHANGE_MESSAGES", "1") == "1"
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(
f"Starting add_tool_change_messages demo bot "
f"(ADD_TOOL_CHANGE_MESSAGES={ADD_TOOL_CHANGE_MESSAGES})"
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OLLamaLLMService(
settings=OLLamaLLMService.Settings(
# Llama 3.1 8B Instruct is anecdotally one of the more
# hallucination-prone of the easily accessible models — exactly
# what we want for this validation harness. Pull it with
# ``ollama pull llama3.1:8b`` and make sure ``ollama serve``
# is running.
model="llama3.1:8b",
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"If the user asks for the current weather, use the `get_current_weather` "
"function if it's available. IMPORTANT: if you do not have access to the function, "
"say something along the lines of 'Sorry, I can't check the weather right now.'."
),
),
)
llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
add_tool_change_messages=ADD_TOOL_CHANGE_MESSAGES,
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Phase controller: roughly 4 turns per phase.
user_turn_count = 0
REMOVE_AT_TURN = 5 # tool gone for turn N onward
READD_AT_TURN = 9 # tool back for turn N onward
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
nonlocal user_turn_count
user_turn_count += 1
logger.info(f"=== User turn {user_turn_count} complete ===")
if user_turn_count == REMOVE_AT_TURN - 1:
logger.info(
"=== Phase 1: weather tool REMOVED. Keep asking about the weather "
"to exercise hallucination scenarios. ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=NOT_GIVEN))
elif user_turn_count == READD_AT_TURN - 1:
logger.info(
"=== Phase 2: weather tool RE-ADDED. Ask for the weather again — "
"does the LLM call it, or keep refusing? (THIS IS THE TEST.) ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=weather_tools))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Phase 0: weather tool ACTIVE. Ask for the weather a few times "
"to confirm it's working. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite them "
"to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,187 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual demonstration of the missing-handler (developer-error) recovery path.
When a tool is advertised to the LLM via ``tools``/``LLMContext`` but
the developer forgets to call ``llm.register_function(...)`` to wire up
its handler, the LLM happily emits a tool call and then... nothing
happens on the Pipecat side, leaving the conversation stuck.
Pipecat's recovery path (``LLMService._missing_function_call_handler``)
catches this case:
- Logs a ``logger.error`` distinguishing **developer error** (tool advertised
but no handler registered) from a hallucination (tool not advertised),
pointing at the missing ``register_function`` call.
- Returns a neutral terminal tool result
(``LLMService.MISSING_FUNCTION_CALL_MESSAGE_TEMPLATE``: "The function
`X` is not currently available.") so the call still terminates with a
normal tool result instead of leaving the conversation stuck.
This example is **deliberately broken**: the weather schema is in
``tools`` but ``register_function`` is *not* called. Ask the bot about
the weather and observe:
1. The LLM emits a tool call for ``get_current_weather``.
2. ``logger.error`` fires with "advertised … but has no registered handler
— did you forget to call register_function()?"
3. The terminal tool result is fed back to the LLM.
4. The LLM responds in voice based on that result (typically something
like "the weather function isn't available right now").
Uses the OpenAI LLM service with defaults. Swap to another provider to
validate this behavior elsewhere.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Starting missing-handler demo bot (no handler is registered on purpose)")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"Always use the get_current_weather function to answer questions "
"about the current weather."
),
),
)
# *** DELIBERATELY OMITTED ***
# The whole point of this example is to demonstrate the missing-handler
# recovery path. Re-add this line to wire the tool up correctly:
#
# llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Ask for the weather. Watch for a logger.error about the missing "
"handler, and listen for the LLM's response based on the recovery "
"message. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite "
"them to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -29,7 +29,7 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -69,7 +69,13 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
stt = OpenAISTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAISTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
),
)
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],

View File

@@ -25,7 +25,7 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -63,7 +63,13 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
stt = OpenAISTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAISTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
),
)
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],

View File

@@ -4,33 +4,23 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example demonstrating ``PipelineTask(app_resources=...)``.
"""Example demonstrating ``PipelineTask(tool_resources=...)``.
``app_resources`` is an application-defined bag of anything your
application code may want to share across a session: database handles,
HTTP clients, feature flags, per-user state, observability clients,
in-memory caches whatever fits your app. Pipecat passes it through
untouched and exposes it as ``task.app_resources``, so any code with a
handle on the task can read or mutate it.
``tool_resources`` is an application-defined bag of anything you want every
tool handler in a session to share by reference: database handles, HTTP
clients, feature flags, per-user state, observability clients, in-memory
caches whatever fits your app. Pipecat passes it through untouched as
``FunctionCallParams.tool_resources``.
Two of the convenience aliases exercised below:
This example uses a small ``ToolCallLogger`` as a stand-in for that "shared
thing". A real app might just as easily pass a Postgres pool, a Redis
client, a Stripe SDK instance, or any combination thereof. The mechanics
shown here construct once, hand to the task, read it from each handler,
inspect it after the session are the same regardless of what you put in.
- Tool handlers read it from ``FunctionCallParams.app_resources``.
- Custom ``FrameProcessor`` subclasses read it from
``self.pipeline_task.app_resources``.
This example uses two small loggers as stand-ins for that "shared thing":
``ToolCallLogger`` (written from tool handlers) and
``TranscriptionLogger`` (written from a custom ``FrameProcessor`` that
sits in the pipeline). A real app might just as easily pass a Postgres
pool, a Redis client, a Stripe SDK instance, or any combination thereof.
The mechanics shown here construct once, hand to the task, read it
from each site, inspect it after the session are the same regardless
of what you put in.
We bundle resources in a typed ``AppResources`` dataclass and cast back
to it at each read site. Pipecat doesn't care what type you pass (a
plain dict works too), but a typed container gives you autocomplete and
We bundle resources in a typed ``SessionResources`` dataclass and cast back
to it at the top of each handler. Pipecat doesn't care what type you pass
(a plain dict works too), but a typed container gives you autocomplete and
refactor safety instead of dict-by-string-key lookups.
"""
@@ -38,7 +28,7 @@ import json
import os
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import UTC, datetime
from datetime import UTC, datetime, timezone
from typing import Any, cast
from dotenv import load_dotenv
@@ -47,7 +37,7 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMRunFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -56,7 +46,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -97,80 +86,30 @@ class ToolCallLogger:
return json.dumps(self._calls, indent=2)
class TranscriptionLogger:
"""Records final user transcriptions — written from a custom FrameProcessor."""
def __init__(self):
"""Initialize the logger with an empty list of recorded transcriptions."""
self._entries: list[dict[str, Any]] = []
def log_transcription(self, text: str) -> None:
"""Record a transcription.
Args:
text: The transcribed user utterance.
"""
entry = {
"timestamp": datetime.now(UTC).isoformat(),
"text": text,
}
self._entries.append(entry)
logger.info(f"[TranscriptionLogger] {text!r}")
def dump(self) -> str:
"""Return all recorded transcriptions as a JSON string."""
return json.dumps(self._entries, indent=2)
@dataclass
class AppResources:
"""Typed container for everything the app shares across this session.
class SessionResources:
"""Typed container for everything the tool handlers in this session share.
Add fields here as the app grows (e.g. ``db: AsyncConnection``,
``http: httpx.AsyncClient``). Read sites ``cast()`` to this type to
get autocomplete and refactor safety:
- In tools: ``cast(AppResources, params.app_resources)``.
- In custom processors: ``cast(AppResources, self.pipeline_task.app_resources)``.
``http: httpx.AsyncClient``). Handlers ``cast()`` ``params.tool_resources``
to this type to get autocomplete and refactor safety.
"""
tool_call_logger: ToolCallLogger
transcription_logger: TranscriptionLogger
async def fetch_weather_from_api(params: FunctionCallParams):
resources = cast(AppResources, params.app_resources)
resources = cast(SessionResources, params.tool_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
resources = cast(AppResources, params.app_resources)
resources = cast(SessionResources, params.tool_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"name": "The Golden Dragon"})
class TranscriptionLoggingProcessor(FrameProcessor):
"""Logs each final user transcription into the shared app resources.
Demonstrates the second read site for ``app_resources``: any custom
``FrameProcessor`` can reach the same bag every tool handler sees by
going through ``self.pipeline_task.app_resources``. ``pipeline_task``
is ``None`` until the task sets the processor up, so we guard against
that case.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Forward all frames; log final user transcriptions on the way through."""
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame) and self.pipeline_task is not None:
resources = cast(AppResources, self.pipeline_task.app_resources)
resources.transcription_logger.log_transcription(frame.text)
await self.push_frame(frame, direction)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -264,7 +203,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
[
transport.input(),
stt,
TranscriptionLoggingProcessor(),
user_aggregator,
llm,
tts,
@@ -273,14 +211,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Keep local handles so we can read collected state after the session
# Keep a local handle so we can read collected state after the session
# ends; Pipecat never copies or clears the object.
tool_call_logger = ToolCallLogger()
transcription_logger = TranscriptionLogger()
resources = AppResources(
tool_call_logger=tool_call_logger,
transcription_logger=transcription_logger,
)
resources = SessionResources(tool_call_logger=tool_call_logger)
task = PipelineTask(
pipeline,
@@ -289,7 +223,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
app_resources=resources,
tool_resources=resources,
)
@transport.event_handler("on_client_connected")
@@ -312,7 +246,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# The session has ended; read whatever state the handlers built up.
logger.info(f"Tool calls logged during session:\n{tool_call_logger.dump()}")
logger.info(f"Transcriptions logged during session:\n{transcription_logger.dump()}")
async def bot(runner_args: RunnerArguments):

View File

@@ -1,184 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the AWS Nova Sonic LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Nova Sonic via the
formal toolResult channel so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
llm = AWSNovaSonicLLMService(
secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
region=os.environ["AWS_REGION"],
session_token=os.getenv("AWS_SESSION_TOKEN"),
settings=AWSNovaSonicLLMService.Settings(
voice="tiffany",
system_instruction=system_instruction,
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -46,6 +46,11 @@ async def fetch_weather_from_api(params: FunctionCallParams):
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
# Simulate a long network delay.
# You can continue chatting while waiting for this to complete.
# With Nova 2 Sonic (the default model), the assistant will respond
# appropriately once the function call is complete.
await asyncio.sleep(5)
await params.result_callback(
{
"conditions": "nice",
@@ -145,7 +150,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register function for function calls
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function(
"get_current_weather", fetch_weather_from_api, cancel_on_interruption=False
)
# Set up context and context management.
context = LLMContext(tools=tools)

View File

@@ -1,195 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Azure Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Azure Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = AzureRealtimeLLMService(
api_key=os.environ["AZURE_REALTIME_API_KEY"],
base_url=os.environ["AZURE_REALTIME_BASE_URL"],
settings=AzureRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(model="whisper-1"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,25 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Gemini Live LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Gemini Live as a
FunctionResponse so the model can integrate it naturally into its next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -41,55 +31,33 @@ load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
@@ -109,6 +77,42 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
@@ -117,12 +121,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
tools=tools,
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
# You can provide the system instructions and tools in the context rather
# than as arguments to GeminiLiveLLMService, but note that doing so will
# trigger a (fast) reconnection when the GeminiLiveLLMService first
# receives the context (i.e. when we send the LLMRunFrame below).
context = LLMContext()
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -149,6 +154,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
@@ -160,6 +166,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -20,7 +20,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
@@ -71,25 +70,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
@@ -123,23 +107,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")

View File

@@ -6,13 +6,10 @@
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -26,7 +23,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -34,32 +30,6 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -81,55 +51,23 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
system_instruction=system_instruction,
voice="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."
),
tools=tools,
# inference_on_context_initialization=False,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
context = LLMContext()
context = LLMContext(
[
{
"role": "user",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -156,9 +94,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -1,179 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Grok Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Grok Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.xai.realtime.events import SessionProperties
from pipecat.services.xai.realtime.llm import GrokRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
# Note: Grok has built-in server-side VAD, so we don't need local VAD.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GrokRealtimeLLMService(
api_key=os.environ["XAI_API_KEY"],
settings=GrokRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
voice="Ara",
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,198 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the OpenAI Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to OpenAI Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=SemanticTurnDetection(),
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,182 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# `turn_detection=False` disables OpenAI Realtime's server-side VAD,
# so this pipeline's local turn detection drives turn boundaries.
# The service then sends `input_audio_buffer.commit` +
# `response.create` when it sees `UserStoppedSpeakingFrame`.
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=False,
),
),
),
),
)
context = LLMContext(
[
{
"role": "developer",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -232,20 +232,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# [LLMUpdateSettingsFrame(settings=SessionProperties(tools=new_tools).model_dump())]
# )
# Reasoning effort can be changed at runtime too. Only
# reasoning-capable Realtime models (e.g. gpt-realtime-2) support this.
# await task.queue_frames(
# [
# LLMUpdateSettingsFrame(
# delta=OpenAIRealtimeLLMService.Settings(
# session_properties=SessionProperties(
# reasoning=Reasoning(effort="xhigh"),
# ),
# )
# )
# ]
# )
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")

View File

@@ -1,186 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Ultravox Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
Ultravox's API freezes the conversation between ``client_tool_invocation``
and the matching ``client_tool_result``, so the service ships a placeholder
``client_tool_result`` immediately when an async-registered function is
invoked (to unfreeze the conversation). When the real tool finishes, the
actual result is injected as user-side text so the model picks it up.
"""
import asyncio
import datetime
import os
import random
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
system_prompt = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = UltravoxRealtimeLLMService(
params=OneShotInputParams(
api_key=os.environ["ULTRAVOX_API_KEY"],
system_prompt=system_prompt,
temperature=0.3,
max_duration=datetime.timedelta(minutes=3),
),
one_shot_selected_tools=ToolsSchema(standard_tools=[weather_function]),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext([])
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
vad_analyzer=SileroVADAnalyzer(),
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -49,7 +49,13 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
stt = OpenAIRealtimeSTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeSTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related to dogs, such as breed names.",
),
)
tl = TranscriptionLogger()
vad_processor = VADProcessor(vad_analyzer=SileroVADAnalyzer())

View File

@@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when
the user was cut off mid-thought. The LLM outputs one of three markers:
- ✓ (complete): User finished their thought, respond normally
- ○ (incomplete short): User was cut off, wait ~5s then prompt
- ◐ (incomplete long): User needs time to think, wait ~10s then prompt
- ◐ (incomplete long): User needs time to think, wait ~15s then prompt
When incomplete is detected, the bot's response is suppressed. After the timeout
expires, the LLM is automatically prompted to re-engage the user.
@@ -41,7 +41,6 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies
load_dotenv(override=True)
@@ -84,28 +83,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
context = LLMContext()
# `FilterIncompleteUserTurnStrategies` pairs the default detector
# chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
# trigger LLM inference but the public `on_user_turn_stopped` event
# fires only when the LLM confirms ✓. The LLM marks each response
# with one of:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 10s, then prompt)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
user_turn_strategies=FilterIncompleteUserTurnStrategies(
# Optional: customize turn completion behavior
# config=UserTurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=10.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
),
# Enable turn completion filtering - the LLM will output:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 15s, then prompt)
filter_incomplete_user_turns=True,
# Optional: customize turn completion behavior
# turn_completion_config=TurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=15.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
),
)

View File

@@ -1,129 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.nvidia.llm import NvidiaLLMService
from pipecat.services.nvidia.sagemaker.stt import NvidiaSageMakerSTTService
from pipecat.services.nvidia.sagemaker.tts import NvidiaSageMakerTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = NvidiaSageMakerSTTService(
endpoint_name=os.environ["SAGEMAKER_ASR_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
llm = NvidiaLLMService(
api_key=os.environ["NVIDIA_API_KEY"],
settings=NvidiaLLMService.Settings(
model="meta/llama-3.3-70b-instruct",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
tts = NvidiaSageMakerTTSService(
endpoint_name=os.environ["SAGEMAKER_MAGPIE_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -25,6 +25,7 @@ from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -52,7 +53,14 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
stt = OpenAIRealtimeSTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeSTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related to dogs, such as breed names.",
language=Language.EN,
),
)
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],
@@ -64,7 +72,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
system_instruction="You are very knowledgable about dogs. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
),
)

View File

@@ -6,54 +6,116 @@
"exclude": ["**/*_pb2.py", "**/__pycache__"],
"ignore": [
"tests",
"src/pipecat/adapters/services/anthropic_adapter.py",
"src/pipecat/adapters/services/aws_nova_sonic_adapter.py",
"src/pipecat/adapters/services/bedrock_adapter.py",
"src/pipecat/adapters/services/gemini_adapter.py",
"src/pipecat/adapters/services/grok_realtime_adapter.py",
"src/pipecat/adapters/services/inworld_realtime_adapter.py",
"src/pipecat/adapters/services/open_ai_adapter.py",
"src/pipecat/adapters/services/open_ai_realtime_adapter.py",
"src/pipecat/adapters/services/open_ai_responses_adapter.py",
"src/pipecat/adapters/services/perplexity_adapter.py",
"src/pipecat/audio/dtmf/utils.py",
"src/pipecat/audio/filters/aic_filter.py",
"src/pipecat/audio/filters/krisp_viva_filter.py",
"src/pipecat/audio/filters/rnnoise_filter.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py",
"src/pipecat/audio/vad/silero.py",
"src/pipecat/processors/aggregators/llm_context.py",
"src/pipecat/processors/aggregators/llm_response_universal.py",
"src/pipecat/processors/frame_processor.py",
"src/pipecat/processors/frameworks/langchain.py",
"src/pipecat/processors/frameworks/rtvi/observer.py",
"src/pipecat/processors/frameworks/rtvi/processor.py",
"src/pipecat/processors/frameworks/strands_agents.py",
"src/pipecat/services/anthropic/llm.py",
"src/pipecat/services/assemblyai/stt.py",
"src/pipecat/services/aws/agent_core.py",
"src/pipecat/services/aws/llm.py",
"src/pipecat/services/aws/nova_sonic/llm.py",
"src/pipecat/services/aws/sagemaker/bidi_client.py",
"src/pipecat/services/aws/stt.py",
"src/pipecat/services/aws/tts.py",
"src/pipecat/services/aws/utils.py",
"src/pipecat/services/azure/stt.py",
"src/pipecat/services/azure/tts.py",
"src/pipecat/services/cartesia/stt.py",
"src/pipecat/services/cartesia/tts.py",
"src/pipecat/services/deepgram/flux/base.py",
"src/pipecat/services/deepgram/flux/sagemaker/stt.py",
"src/pipecat/services/deepgram/flux/stt.py",
"src/pipecat/services/deepgram/sagemaker/stt.py",
"src/pipecat/services/deepgram/sagemaker/tts.py",
"src/pipecat/services/deepgram/tts.py",
"src/pipecat/services/elevenlabs/stt.py",
"src/pipecat/services/elevenlabs/tts.py",
"src/pipecat/services/fish/tts.py",
"src/pipecat/services/gladia/stt.py",
"src/pipecat/services/google/gemini_live/llm.py",
"src/pipecat/services/google/gemini_live/vertex/llm.py",
"src/pipecat/services/google/image.py",
"src/pipecat/services/google/llm.py",
"src/pipecat/services/google/stt.py",
"src/pipecat/services/google/tts.py",
"src/pipecat/services/gradium/stt.py",
"src/pipecat/services/groq/tts.py",
"src/pipecat/services/heygen/api_interactive_avatar.py",
"src/pipecat/services/heygen/base_api.py",
"src/pipecat/services/heygen/client.py",
"src/pipecat/services/heygen/video.py",
"src/pipecat/services/hume/tts.py",
"src/pipecat/services/inworld/realtime/llm.py",
"src/pipecat/services/inworld/tts.py",
"src/pipecat/services/kokoro/tts.py",
"src/pipecat/services/llm_service.py",
"src/pipecat/services/lmnt/tts.py",
"src/pipecat/services/mem0/memory.py",
"src/pipecat/services/mistral/stt.py",
"src/pipecat/services/mistral/tts.py",
"src/pipecat/services/moondream/vision.py",
"src/pipecat/services/neuphonic/tts.py",
"src/pipecat/services/nvidia/stt.py",
"src/pipecat/services/nvidia/tts.py",
"src/pipecat/services/openai/base_llm.py",
"src/pipecat/services/openai/image.py",
"src/pipecat/services/openai/llm.py",
"src/pipecat/services/openai/realtime/llm.py",
"src/pipecat/services/openai/responses/llm.py",
"src/pipecat/services/openai/stt.py",
"src/pipecat/services/openai/tts.py",
"src/pipecat/services/openrouter/llm.py",
"src/pipecat/services/piper/tts.py",
"src/pipecat/services/resembleai/tts.py",
"src/pipecat/services/rime/tts.py",
"src/pipecat/services/sambanova/llm.py",
"src/pipecat/services/sarvam/stt.py",
"src/pipecat/services/sarvam/tts.py",
"src/pipecat/services/simli/video.py",
"src/pipecat/services/smallest/tts.py",
"src/pipecat/services/soniox/stt.py",
"src/pipecat/services/speechmatics/stt.py",
"src/pipecat/services/stt_service.py",
"src/pipecat/services/tavus/video.py",
"src/pipecat/services/tts_service.py",
"src/pipecat/services/ultravox/llm.py",
"src/pipecat/services/websocket_service.py",
"src/pipecat/services/whisper/stt.py",
"src/pipecat/services/xai/realtime/llm.py",
"src/pipecat/services/xtts/tts.py",
"src/pipecat/transports/base_output.py",
"src/pipecat/transports/daily/transport.py",
"src/pipecat/transports/heygen/transport.py",
"src/pipecat/transports/lemonslice/transport.py",
"src/pipecat/transports/livekit/transport.py",
"src/pipecat/transports/smallwebrtc/connection.py",
"src/pipecat/transports/smallwebrtc/request_handler.py",
"src/pipecat/transports/smallwebrtc/transport.py",
"src/pipecat/transports/tavus/transport.py",
"src/pipecat/transports/websocket/server.py"
"src/pipecat/transports/websocket/client.py",
"src/pipecat/transports/websocket/server.py",
"src/pipecat/transports/whatsapp/client.py"
],
"reportMissingImports": false
}

View File

@@ -223,11 +223,12 @@ TESTS_REALTIME = [
# ("realtime/realtime-azure.py", EVAL_WEATHER),
("realtime/realtime-openai-text.py", EVAL_WEATHER),
("realtime/realtime-openai-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live.py", EVAL_WEATHER),
("realtime/realtime-gemini-live.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live-local-vad.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live-function-calling.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live-google-search.py", EVAL_ONLINE_SEARCH),
("realtime/realtime-gemini-live-vertex.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-vertex-function-calling.py", EVAL_WEATHER),
("realtime/realtime-aws-nova-sonic.py", EVAL_SIMPLE_MATH),
("realtime/realtime-ultravox.py", EVAL_ORDER),
("realtime/realtime-grok.py", EVAL_WEATHER),

View File

@@ -9,7 +9,7 @@
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, TypeGuard, TypeVar, cast
from typing import Any, TypedDict, TypeGuard, TypeVar
from anthropic import NOT_GIVEN, NotGiven
from anthropic.types.message_param import MessageParam
@@ -121,20 +121,16 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
messages = self._from_universal_context_messages(self.get_messages(context)).messages
# Sanitize messages for logging
messages_for_logging: list[dict[str, Any]] = []
messages_for_logging = []
for message in messages:
msg: dict[str, Any] = copy.deepcopy(dict(message))
content = msg.get("content")
if isinstance(content, list):
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "image":
source = item.get("source")
if isinstance(source, dict):
source["data"] = "..."
if item.get("type") == "thinking" and item.get("signature"):
item["signature"] = "..."
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item["type"] == "image":
item["source"]["data"] = "..."
if item["type"] == "thinking" and item.get("signature"):
item["signature"] = "..."
messages_for_logging.append(msg)
return messages_for_logging
@@ -189,13 +185,8 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
]
if isinstance(next_message["content"], str):
next_message["content"] = [{"type": "text", "text": next_message["content"]}]
# Concatenate the content. MessageParam types content as
# `str | Iterable[...]`, but this codebase assumes it's
# either a str or a list. The str case is handled above, so
# we assume that both are lists here.
cast(list[Any], current_message["content"]).extend(
cast(list[Any], next_message["content"])
)
# Concatenate the content
current_message["content"].extend(next_message["content"])
# Remove the next message from the list
messages.pop(i + 1)
else:
@@ -248,7 +239,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
}
# Fall back to assuming that the message is already in Anthropic format
return cast(MessageParam, copy.deepcopy(message.message))
return copy.deepcopy(message.message)
def _from_standard_message(self, message: LLMStandardMessage) -> MessageParam:
"""Convert standard universal context message to Anthropic format.
@@ -289,26 +280,20 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
]
}
"""
# ChatCompletionMessageParam (input) and MessageParam (output) are
# different TypedDicts — work with the message as a plain dict for the
# transformations below and cast back to MessageParam at return sites.
msg = cast(dict[str, Any], copy.deepcopy(message))
if msg["role"] == "tool":
return cast(
MessageParam,
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": msg["tool_call_id"],
"content": msg["content"],
},
],
},
)
if msg.get("tool_calls"):
tc = msg["tool_calls"]
message = copy.deepcopy(message)
if message["role"] == "tool":
return {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": message["tool_call_id"],
"content": message["content"],
},
],
}
if message.get("tool_calls"):
tc = message["tool_calls"]
ret = {"role": "assistant", "content": []}
for tool_call in tc:
function = tool_call["function"]
@@ -320,8 +305,8 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
"input": arguments,
}
ret["content"].append(new_tool_use)
return cast(MessageParam, ret)
content = msg.get("content")
return ret
content = message.get("content")
if isinstance(content, str):
# fix empty text
if content == "":
@@ -369,7 +354,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
image_item = content.pop(img_idx)
content.insert(first_txt_idx, image_item)
return cast(MessageParam, msg)
return message
def _with_cache_control_markers(self, messages: list[MessageParam]) -> list[MessageParam]:
"""Add cache control markers to messages for prompt caching.
@@ -384,16 +369,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
def add_cache_control_marker(message: MessageParam):
if isinstance(message["content"], str):
message["content"] = [{"type": "text", "text": message["content"]}]
# Assumptions on the next line:
# - content is a list (str case handled above; this codebase only
# ever constructs content as a str or a list)
# - the list is non-empty (guaranteed by the empty-content
# replacement in `_from_universal_context_messages`)
# - the last item is a dict. The standard-message path enforces
# this via TypedDicts (which are dicts at runtime); the
# LLMSpecificMessage passthrough doesn't, but in practice
# callers use dicts.
cast(list[Any], message["content"])[-1]["cache_control"] = {"type": "ephemeral"}
message["content"][-1]["cache_control"] = {"type": "ephemeral"}
try:
# Add cache control markers to the most recent two user messages.

View File

@@ -8,9 +8,9 @@
import copy
import json
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from enum import Enum
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
@@ -110,10 +110,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
return [
asdict(m)
for m in self._from_universal_context_messages(self.get_messages(context)).messages
]
return self._from_universal_context_messages(self.get_messages(context)).messages
@dataclass
class ConvertedMessages:
@@ -126,27 +123,18 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
self, universal_context_messages: list[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages: list[AWSNovaSonicConversationHistoryMessage] = []
messages = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
return self.ConvertedMessages()
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below (the role-extraction and conversion
# logic only applies to standard message dicts). If/when this
# adapter grows a per-provider passthrough like the Anthropic
# adapter has, LLMSpecific items can flow through.
ucm: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
universal_context_messages = copy.deepcopy(universal_context_messages)
# If we have a "system" message as our first message,
# pull that out into "instruction"
if ucm and ucm[0].get("role") == "system":
system = ucm.pop(0)
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
@@ -157,21 +145,19 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
# Convert any remaining "system"/"developer" messages to "user",
# as Nova Sonic only supports "user" and "assistant" in history.
for msg in ucm:
for msg in universal_context_messages:
if msg.get("role") in ("system", "developer"):
msg["role"] = "user"
# Process remaining messages to fill out conversation history.
for universal_context_message in ucm:
for universal_context_message in universal_context_messages:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(
self, message: dict[str, Any]
) -> AWSNovaSonicConversationHistoryMessage | None:
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
@@ -181,18 +167,17 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if role == "user" or role == "assistant":
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(content, list):
text_parts = []
for c in content:
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
text_parts.append(c.get("text"))
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = " ".join(t for t in text_parts if t)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history

View File

@@ -10,7 +10,7 @@ import base64
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
@@ -68,19 +68,16 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
system_instruction,
discard_context_system=True,
)
return cast(
AWSBedrockLLMInvocationParams,
{
"system": [{"text": effective_system}] if effective_system else None,
"messages": converted.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
# To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
# Eventually (when we don't have to maintain the non-LLMContext code path) we should do
# the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
"tool_choice": context.tool_choice,
},
)
return {
"system": [{"text": effective_system}] if effective_system else None,
"messages": converted.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
# To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
# Eventually (when we don't have to maintain the non-LLMContext code path) we should do
# the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
"tool_choice": context.tool_choice,
}
def get_messages_for_logging(self, context) -> list[dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Bedrock.
@@ -216,36 +213,35 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
]
}
"""
# ChatCompletionMessageParam (input) and the dict shape Bedrock expects
# are different — work with the deepcopied message as a plain dict for
# the transformations below.
msg = cast(dict[str, Any], copy.deepcopy(message))
if msg["role"] == "tool":
message = copy.deepcopy(message)
if message["role"] == "tool":
# Try to parse the content as JSON if it looks like JSON
try:
if msg["content"].strip().startswith("{") and msg["content"].strip().endswith("}"):
content_json = json.loads(msg["content"])
if message["content"].strip().startswith("{") and message[
"content"
].strip().endswith("}"):
content_json = json.loads(message["content"])
tool_result_content = [{"json": content_json}]
else:
tool_result_content = [{"text": msg["content"]}]
tool_result_content = [{"text": message["content"]}]
except (json.JSONDecodeError, ValueError, AttributeError):
tool_result_content = [{"text": msg["content"]}]
tool_result_content = [{"text": message["content"]}]
return {
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": msg["tool_call_id"],
"toolUseId": message["tool_call_id"],
"content": tool_result_content,
},
},
],
}
if msg.get("tool_calls"):
tc = msg["tool_calls"]
ret: dict[str, Any] = {"role": "assistant", "content": []}
if message.get("tool_calls"):
tc = message["tool_calls"]
ret = {"role": "assistant", "content": []}
for tool_call in tc:
function = tool_call["function"]
arguments = json.loads(function["arguments"])
@@ -260,12 +256,12 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
return ret
# Handle text content
content = msg.get("content")
content = message.get("content")
if isinstance(content, str):
if content == "":
return {"role": msg["role"], "content": [{"text": "(empty)"}]}
return {"role": message["role"], "content": [{"text": "(empty)"}]}
else:
return {"role": msg["role"], "content": [{"text": content}]}
return {"role": message["role"], "content": [{"text": content}]}
elif isinstance(content, list):
new_content = []
for item in content:
@@ -304,9 +300,9 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
# Move image before the first text
image_item = new_content.pop(img_idx)
new_content.insert(first_txt_idx, image_item)
return {"role": msg["role"], "content": new_content}
return {"role": message["role"], "content": new_content}
return msg
return message
@staticmethod
def _to_bedrock_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -9,7 +9,7 @@
import base64
import json
from dataclasses import dataclass, field
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
from openai import NotGiven
@@ -139,36 +139,6 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
return formatted_standard_tools + custom_gemini_tools
@staticmethod
def to_function_response_dict(content: Any) -> dict[str, Any]:
"""Convert a tool-result content value to Gemini's FunctionResponse.response shape.
Gemini's ``FunctionResponse.response`` field requires a dict, so
non-dict values (e.g. plain strings, JSON-encoded scalars, or
sentinel strings like ``"COMPLETED"`` used when a function returned
no value) are wrapped as ``{"value": <value>}``. JSON strings that
decode to a dict are passed through as-is.
Args:
content: The tool-result content. Typically the JSON-encoded
return value of a function, but can also be a plain string
(e.g. ``"COMPLETED"``) or already-parsed dict.
Returns:
A dict suitable for ``FunctionResponse.response``.
"""
if isinstance(content, dict):
return content
if not isinstance(content, str):
return {"value": content}
try:
decoded = json.loads(content)
except (json.JSONDecodeError, ValueError):
return {"value": content}
if isinstance(decoded, dict):
return decoded
return {"value": decoded}
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about Gemini.
@@ -184,12 +154,9 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
messages = self._from_universal_context_messages(self.get_messages(context)).messages
# Sanitize messages for logging
messages_for_logging: list[dict[str, Any]] = []
messages_for_logging = []
for message in messages:
# `to_json_dict()` returns `dict[str, object]`; treat as a plain
# dict for the value indexing/mutation below. The broad `except`
# below is the safety net if any item isn't shaped as expected.
obj: dict[str, Any] = cast(dict[str, Any], message.to_json_dict())
obj = message.to_json_dict()
try:
if "parts" in obj:
for part in obj["parts"]:
@@ -307,8 +274,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
# Check if we only have function-related messages (no regular text)
effective_system = extracted_system or system_instruction
has_regular_messages = any(
msg.parts is not None
and len(msg.parts) == 1
len(msg.parts) == 1
and getattr(msg.parts[0], "text", None)
and not getattr(msg.parts[0], "function_call", None)
and not getattr(msg.parts[0], "function_response", None)
@@ -380,11 +346,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
parts=[Part(function_call=FunctionCall(name="search", args={"query": "test"}))]
)
"""
# ChatCompletionMessageParam (a union of TypedDicts) doesn't allow
# the dict-style key access used below; treat it as a plain dict.
msg = cast(dict[str, Any], message)
role = msg["role"]
content = msg.get("content", [])
role = message["role"]
content = message.get("content", [])
# Convert non-initial system/developer messages to user role,
# as Gemini doesn't support these as input messages.
@@ -396,8 +359,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
parts = []
tool_call_id_to_name_mapping = {}
if msg.get("tool_calls"):
for tc in msg["tool_calls"]:
if message.get("tool_calls"):
for tc in message["tool_calls"]:
id = tc["id"]
name = tc["function"]["name"]
tool_call_id_to_name_mapping[id] = name
@@ -412,10 +375,19 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
)
elif role == "tool":
role = "user"
response_dict = self.to_function_response_dict(msg["content"])
try:
response = json.loads(message["content"])
if isinstance(response, dict):
response_dict = response
else:
response_dict = {"value": response}
except Exception as e:
# Response might not be JSON-deserializable.
# This occurs with a UserImageFrame, for example, where we get a plain "COMPLETED" string.
response_dict = {"value": message["content"]}
# Get function name from mapping using tool_call_id, or fallback
tool_call_id = msg.get("tool_call_id")
tool_call_id = message.get("tool_call_id")
function_name = "tool_call_result" # Default fallback
if tool_call_id and tool_call_id in params.tool_call_id_to_name_mapping:
function_name = params.tool_call_id_to_name_mapping[tool_call_id]
@@ -519,7 +491,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
def is_tool_call_message(msg: Content) -> bool:
"""Check if message contains only function_call parts."""
return bool(
return (
msg.role == "model"
and msg.parts
and all(getattr(part, "function_call", None) for part in msg.parts)
@@ -527,8 +499,6 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
def message_has_thought_signature(msg: Content) -> bool:
"""Check if any part in the message has a thought_signature."""
if msg.parts is None:
return False
return any(getattr(part, "thought_signature", None) for part in msg.parts)
merged_messages = []
@@ -594,8 +564,6 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
logger.debug(f"Thought signatures to apply: {len(thought_signature_dicts)}")
for ts in thought_signature_dicts:
bookmark = ts.get("bookmark")
if bookmark is None:
continue
if bookmark.get("function_call"):
logger.trace(f" - To function call: {bookmark['function_call']}")
elif bookmark.get("text"):
@@ -697,8 +665,6 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
if (
hasattr(part, "inline_data")
and part.inline_data
and part.inline_data.data is not None
and bookmark_inline_data.data is not None
# Comparing length should be good enough for matching inline data,
# especially since we're already matching thought signatures in
# strict message order. Comparing actual data is expensive.

View File

@@ -13,7 +13,7 @@ Grok's Voice Agent API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
@@ -85,10 +85,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:
@@ -114,20 +111,11 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
messages = copy.deepcopy(universal_context_messages)
system_instruction = None
# Extract system message as session instructions
if messages and messages[0].get("role") == "system":
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -145,9 +133,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
messages=[self._from_universal_context_message(messages[0])],
system_instruction=system_instruction,
)
@@ -195,29 +181,26 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Grok Realtime API.
"""
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if message.get("role") == "user":
content = message.get("content")
if isinstance(content, list):
text_parts = []
text_content = ""
for c in content:
if c.get("type") == "text":
text_parts.append(c.get("text"))
text_content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {msg}"
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = " ".join(t for t in text_parts if t).strip()
content = text_content.strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -225,7 +208,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
@staticmethod
def _to_grok_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -13,7 +13,7 @@ Inworld's Realtime API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
@@ -85,10 +85,7 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:
@@ -114,20 +111,11 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
messages = copy.deepcopy(universal_context_messages)
system_instruction = None
# Extract system message as session instructions
if messages and messages[0].get("role") == "system":
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -145,9 +133,7 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
messages=[self._from_universal_context_message(messages[0])],
system_instruction=system_instruction,
)
@@ -195,29 +181,26 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Inworld Realtime API.
"""
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if message.get("role") == "user":
content = message.get("content")
if isinstance(content, list):
text_parts = []
text_content = ""
for c in content:
if c.get("type") == "text":
text_parts.append(c.get("text"))
text_content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {msg}"
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = " ".join(t for t in text_parts if t).strip()
content = text_content.strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -225,7 +208,7 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
@staticmethod
def _to_inworld_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -127,15 +127,12 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
if system_instruction:
# Detect initial system message for warning purposes (don't extract).
# ChatCompletionMessageParam.content is `str | Iterable[...]`; we
# only forward it for warning purposes, so coerce non-strings to
# None — the resolver handles None.
initial_content: str | None = None
if messages and messages[0].get("role") == "system":
raw_content = messages[0].get("content", "")
if isinstance(raw_content, str):
initial_content = raw_content
# Detect initial system message for warning purposes (don't extract)
initial_content = (
messages[0].get("content", "")
if messages and messages[0].get("role") == "system"
else None
)
self._resolve_system_instruction(
initial_content,
system_instruction,
@@ -143,15 +140,12 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
messages = [{"role": "system", "content": system_instruction}] + messages
return cast(
OpenAILLMInvocationParams,
{
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
},
)
return {
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
}
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[ChatCompletionToolParam]:
"""Convert function schemas to OpenAI's function-calling format.
@@ -164,19 +158,13 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
with ChatCompletion API.
"""
functions_schema = tools_schema.standard_tools
# `function=...` expects a `FunctionDefinition` TypedDict; the dict
# produced by `to_default_dict()` is structurally compatible. Cast at
# the boundary.
formatted_standard_tools: list[ChatCompletionToolParam] = [
ChatCompletionToolParam(type="function", function=cast(Any, func.to_default_dict()))
formatted_standard_tools = [
ChatCompletionToolParam(type="function", function=func.to_default_dict())
for func in functions_schema
]
custom_openai_tools: list[ChatCompletionToolParam] = []
custom_openai_tools = []
if tools_schema.custom_tools:
custom_openai_tools = cast(
list[ChatCompletionToolParam],
tools_schema.custom_tools.get(AdapterType.OPENAI, []),
)
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
return formatted_standard_tools + custom_openai_tools
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
@@ -190,10 +178,7 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
Returns:
List of messages in a format ready for logging about OpenAI.
"""
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
return self.get_messages(context, truncate_large_values=True)
def _from_universal_context_messages(
self,

View File

@@ -9,7 +9,7 @@
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from loguru import logger
@@ -81,7 +81,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages in a format ready for logging about OpenAI Realtime.
"""
return cast(list[dict[str, Any]], self.get_messages(context, truncate_large_values=True))
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:
@@ -101,24 +101,12 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` — those
# are filtered out below. Other adapters (e.g. Anthropic) dispatch
# LLMSpecific items through a per-provider passthrough. For OpenAI
# Realtime, the strategy here packs a multi-message history into a
# single text message (see comment further down), which doesn't
# compose with opaque per-provider payloads. If/when this adapter
# adopts the per-message strategy, LLMSpecific items can flow
# through `_from_universal_context_message` like in other adapters.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
messages = copy.deepcopy(universal_context_messages)
system_instruction = None
# If we have a "system" message as our first message,
# pull that out into session "instructions"
if messages and messages[0].get("role") == "system":
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -136,9 +124,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
messages=[self._from_universal_context_message(messages[0])],
system_instruction=system_instruction,
)
@@ -156,18 +142,18 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
return self.ConvertedMessages(
messages=[
events.ConversationItem(
role="user",
type="message",
content=[
events.ItemContent(
type="input_text",
text="\n\n".join(
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
)
}
],
)
}
],
system_instruction=system_instruction,
)
@@ -175,34 +161,31 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
def _from_universal_context_message(
self, message: LLMContextMessage
) -> events.ConversationItem:
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in msg.get("content", []):
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {msg}"
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
@staticmethod
def _to_openai_realtime_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -6,7 +6,7 @@
"""OpenAI Responses API adapter for Pipecat."""
from typing import Any, Required, TypedDict, cast
from typing import Any, TypedDict
from openai._types import NotGiven as OpenAINotGiven
from openai.types.responses import FunctionToolParam, ResponseInputItemParam, ToolParam
@@ -23,10 +23,8 @@ from pipecat.processors.aggregators.llm_context import (
class OpenAIResponsesLLMInvocationParams(TypedDict, total=False):
"""Context-based parameters for invoking OpenAI Responses API."""
# `input` and `tools` are always populated by `get_llm_invocation_params`;
# `instructions` is only set when a system instruction is present.
input: Required[list[ResponseInputItemParam]]
tools: Required[list[ToolParam] | OpenAINotGiven]
input: list[ResponseInputItemParam]
tools: list[ToolParam] | OpenAINotGiven
instructions: str
@@ -66,11 +64,8 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
if system_instruction and messages:
first_msg = messages[0] if not isinstance(messages[0], LLMSpecificMessage) else None
if first_msg and first_msg.get("role") == "system":
# `content` is `str | Iterable[...]`; we only forward it for
# warning purposes. Coerce non-strings to None.
first_content = first_msg.get("content", "")
self._resolve_system_instruction(
first_content if isinstance(first_content, str) else None,
first_msg.get("content", ""),
system_instruction,
discard_context_system=False,
)
@@ -148,10 +143,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
Returns:
List of messages in a format ready for logging.
"""
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
return self.get_messages(context, truncate_large_values=True)
def _convert_messages_to_input(
self, messages: list[LLMContextMessage]
@@ -177,15 +169,13 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append(
cast(ResponseInputItemParam, {"role": "developer", "content": content})
)
result.append({"role": "developer", "content": content})
elif role == "user":
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append(cast(ResponseInputItemParam, {"role": "user", "content": content}))
result.append({"role": "user", "content": content})
elif role == "assistant":
tool_calls = message.get("tool_calls")
@@ -204,9 +194,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append(
cast(ResponseInputItemParam, {"role": "assistant", "content": content})
)
result.append({"role": "assistant", "content": content})
elif role == "tool":
content = message.get("content", "")

View File

@@ -28,7 +28,6 @@ the messages are sent to Perplexity's API.
"""
import copy
from typing import Any, cast
from openai.types.chat import ChatCompletionMessageParam
@@ -117,11 +116,7 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
if not messages:
return messages
# ChatCompletionMessageParam is a union of TypedDicts; the
# transformations below mutate by key/index in ways those TypedDicts
# don't permit. Work against a plain-dict view for the duration of
# the transformation and cast back at the return site.
msgs: list[dict[str, Any]] = cast(list[dict[str, Any]], copy.deepcopy(messages))
messages = copy.deepcopy(messages)
# Note: "developer" → "user" conversion is handled by the parent adapter
# via the convert_developer_to_user parameter.
@@ -130,10 +125,10 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# Perplexity allows system messages at the start, but rejects them
# after any non-system message.
in_initial_system_block = True
for i in range(len(msgs)):
if msgs[i].get("role") == "system":
for i in range(len(messages)):
if messages[i].get("role") == "system":
if not in_initial_system_block:
msgs[i]["role"] = "user"
messages[i]["role"] = "user"
else:
in_initial_system_block = False
@@ -142,9 +137,9 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# messages that violate Perplexity's strict alternation requirement.
# Skip consecutive system messages at the start — Perplexity allows those.
i = 0
while i < len(msgs) - 1:
current = msgs[i]
next_msg = msgs[i + 1]
while i < len(messages) - 1:
current = messages[i]
next_msg = messages[i + 1]
if current["role"] == next_msg["role"] == "system":
# Perplexity allows multiple initial system messages, don't merge
i += 1
@@ -159,7 +154,7 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
next_msg.get("content"), list
):
current["content"].extend(next_msg["content"])
msgs.pop(i + 1)
messages.pop(i + 1)
else:
i += 1
@@ -167,7 +162,7 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# Perplexity requires the last message to be "user" or "tool".
# OpenAI appears to silently ignore trailing assistant messages
# server-side, so removing them preserves equivalent behavior.
while msgs and msgs[-1].get("role") == "assistant":
msgs.pop()
while messages and messages[-1].get("role") == "assistant":
messages.pop()
return cast(list[ChatCompletionMessageParam], msgs)
return messages

View File

@@ -14,7 +14,7 @@ in-memory after first load to improve performance on subsequent accesses.
import asyncio
import io
import wave
from importlib.resources import as_file, files
from importlib.resources import files
import aiofiles
@@ -52,12 +52,10 @@ async def load_dtmf_audio(button: KeypadEntry, *, sample_rate: int = 8000) -> by
__DTMF_RESAMPLER__ = create_file_resampler()
dtmf_file_name = __DTMF_FILE_NAME.get(button, f"dtmf-{button.value}.wav")
# `as_file` materialises the resource as a real filesystem `Path`,
# which aiofiles can open. (For installed packages this is just the
# bundled file; for zipped distributions it would extract to a temp.)
with as_file(files("pipecat.audio.dtmf").joinpath(dtmf_file_name)) as dtmf_file_path:
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
dtmf_file_path = files("pipecat.audio.dtmf").joinpath(dtmf_file_name)
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
with io.BytesIO(data) as buffer:
with wave.open(buffer, "rb") as wf:

Some files were not shown because too many files have changed in this diff Show More