Compare commits

...

55 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
d8cd28bb8b Merge pull request #2640 from pipecat-ai/aleix/pipecat-0.0.85
update CHANGELOG for 0.0.85
2025-09-12 11:06:41 -07:00
Aleix Conchillo Flaqué
c2df6c8aee update CHANGELOG for 0.0.85 2025-09-12 11:03:32 -07:00
Aleix Conchillo Flaqué
82478be861 scripts(evals): add 19b-openai-realtime-text 2025-09-12 11:03:32 -07:00
Aleix Conchillo Flaqué
0f2b7bc01b examples(foundational): fix 19b-openai-realtime-beta-text 2025-09-12 11:03:32 -07:00
Aleix Conchillo Flaqué
1b2a5df017 Merge pull request #2622 from pipecat-ai/mb/call-data-runner
Add to, from phone info and custom data to the development runner
2025-09-12 10:28:17 -07:00
Mark Backman
2f496ac74f Add optional body parameter to WebsocketRunnerArguments 2025-09-12 11:28:12 -04:00
Mark Backman
22633a63b0 Update changelog 2025-09-12 11:15:03 -04:00
Mark Backman
e5ed0424e4 Remove to/from data from Plivo, as it will rely on body information 2025-09-12 11:10:03 -04:00
Mark Backman
99cfcb1d4e Parsed custom data from Plivo extraHeaders 2025-09-12 08:11:30 -04:00
Mark Backman
d595676436 Add custom data handling for Twilio 2025-09-12 08:11:30 -04:00
Aleix Conchillo Flaqué
0190812ee8 Merge pull request #2639 from pipecat-ai/aleix/min-words-interruption-unit-test
MinWordsInterruptionStrategy unit test
2025-09-11 18:52:39 -07:00
Aleix Conchillo Flaqué
2a24061bbb examples(07ad): remove deprecated user_continuous_stream 2025-09-11 18:50:00 -07:00
Aleix Conchillo Flaqué
89f7e7d199 update CHANGELOG with BaseOutputTransport fix 2025-09-11 16:58:44 -07:00
Aleix Conchillo Flaqué
384814e640 Merge pull request #2456 from a6kme/patch-1
Only set last_frame_time when handling OutputAudioRawFrame
2025-09-11 16:56:25 -07:00
Aleix Conchillo Flaqué
ab4364b833 update CHANGELOG and fix formatting 2025-09-11 15:34:47 -07:00
Aleix Conchillo Flaqué
fafdadad3c Merge pull request #2473 from TheNotary/adds-interim-transcription-frame-support
adds support to Azure STT for creating InterimTranscriptFrames
2025-09-11 15:33:38 -07:00
Aleix Conchillo Flaqué
05dc2fa916 updated CHANGELOG.md with GoogleTTSService updates 2025-09-11 14:36:21 -07:00
Aleix Conchillo Flaqué
0c30cc6ea6 Merge pull request #2547 from manishkjs/feat/google-tts-voice-cloning
feat: add voice cloning and speaking rate to GoogleTTSService
2025-09-11 14:32:21 -07:00
Aleix Conchillo Flaqué
c26d336e34 Merge pull request #2545 from pipecat-ai/aleix/aws-nova-sonic-pre-load-cue
AWSNovaSonicLLMService: pre-load audio cue in the constructor
2025-09-11 14:31:26 -07:00
Mark Backman
37b6198787 Merge pull request #2635 from pipecat-ai/mb/openai-tts-speed 2025-09-11 14:22:51 -07:00
kompfner
3c271da94c Merge pull request #2633 from pipecat-ai/pk/uv-readme-updates
Updating the README to reflect that:
2025-09-11 17:07:41 -04:00
kompfner
be28d3f93b Merge pull request #2637 from pipecat-ai/pk/llm-context-evals-and-bug-fix
`LLMContext` evals and bug fix
2025-09-11 17:00:07 -04:00
marcus-daily
d2f210e960 Bundle Smart Turn v3 with Pipecat 2025-09-11 21:37:16 +01:00
Aleix Conchillo Flaqué
57add41971 tests: add unit test for MinWordsInterruptionStrategy 2025-09-11 13:07:30 -07:00
Aleix Conchillo Flaqué
74b38b59d6 tests(utils): allow passing PipelineParams to run_test() 2025-09-11 13:02:21 -07:00
kompfner
dac58deffc Merge pull request #2636 from pipecat-ai/pk/uv-lock-update-for-smart-turn-v3
uv.lock update for Smart Turn v3
2025-09-11 14:35:36 -04:00
Paul Kompfner
aff11f5121 Fix missing import in llm_response_universal.py 2025-09-11 14:33:17 -04:00
Paul Kompfner
a4023d3915 Update evals to include examples that exercise the universal LLMContext 2025-09-11 14:32:56 -04:00
Paul Kompfner
d6543d244d uv.lock update for Smart Turn v3 2025-09-11 14:07:17 -04:00
Mark Backman
fafcd79870 OpenAITTSService: add speed arg 2025-09-11 13:53:52 -04:00
Paul Kompfner
6a717fbbd1 Updating the README to reflect that:
- various dependencies that previously didn't work with Python 3.13 now seem to
- ultravox isn't fully supported on macOS
2025-09-11 12:27:43 -04:00
Aleix Conchillo Flaqué
9b3f6927c2 Merge pull request #2621 from pipecat-ai/aleix/interruption-task-frame
interruption task frame
2025-09-11 09:22:35 -07:00
Aleix Conchillo Flaqué
0b21f8a6bd FrameProcessor: add push_interruption_task_frame_and_wait() 2025-09-11 09:19:44 -07:00
Aleix Conchillo Flaqué
8249b014f0 frames: BotInterruptionFrame is deprecated, use InterruptionTaskFrame 2025-09-11 09:01:54 -07:00
Aleix Conchillo Flaqué
9d9f10ae0e frames: StartInterruptionFrame is deprecated, use InterruptionFrame 2025-09-11 09:01:54 -07:00
Aleix Conchillo Flaqué
e27b23694d frames: add new TaskFrame
TaskFrame is a base class for other frames that are meant to be sent to the
pipeline task.
2025-09-11 09:01:52 -07:00
marcus-daily
66ce5fe6bd Ruff fixes 2025-09-11 16:04:56 +01:00
marcus-daily
a9b53dc800 Update inference session options 2025-09-11 16:04:56 +01:00
marcus-daily
818352a300 Formatting 2025-09-11 16:04:56 +01:00
marcus-daily
3e9fc7be19 Update onnxruntime version 2025-09-11 16:04:56 +01:00
marcus-daily
a2e76bcad8 Smart Turn V3 support 2025-09-11 16:04:56 +01:00
Mark Backman
8e8e42717b Add to and from phone information to the development runner 2025-09-11 10:44:21 -04:00
kompfner
b31322e38e Merge pull request #2619 from pipecat-ai/pk/aws-universal-context
Expand universal `LLMContext` support to AWS Bedrock
2025-09-11 09:33:08 -04:00
Paul Kompfner
fedb8a201f Update 12d example to use LLMContext, now that AWS Bedrock supports it 2025-09-09 16:24:13 -04:00
Paul Kompfner
8ccd220a60 Add universal LLMContext support to AWSBedrockLLMService.run_inference() 2025-09-09 16:00:32 -04:00
Paul Kompfner
fe79de8f27 When converting universal LLMContext messages to AWS Bedrock expected format, automatically update non-initial "system"-role messages to "user"-role messages, as we do in other non-OpenAI LLM services 2025-09-09 15:50:03 -04:00
Paul Kompfner
176573c342 Add to CHANGELOG AWS Bedrock's support for universal LLMContext 2025-09-09 15:31:56 -04:00
Paul Kompfner
75f9914f49 Add support for universal LLMContext to AWS Bedrock LLM service 2025-09-09 15:25:04 -04:00
Paul Kompfner
f4d6715e32 Add foundational example using AWS Bedrock with universal LLMContext 2025-09-09 10:49:51 -04:00
TheNotary
7366b1aee0 adds missing InterimTranscriptionFrame import 2025-09-06 14:40:19 -05:00
Manish Kumar
4699ee8d86 docs: add docstring for voice_cloning_key and update CHANGELOG 2025-09-04 22:45:51 +05:30
Aleix Conchillo Flaqué
e3597801d4 AWSNovaSonicLLMService: pre-load audio cue in the constructor 2025-09-04 09:31:39 -07:00
Manish Kumar
2ee481d541 feat: add voice cloning and speaking rate to GoogleTTSService 2025-08-30 23:04:59 +05:30
TheNotary
48b3ad8f8f adds support for creating InterimTranscriptFrames for Azure speech services 2025-08-19 17:00:42 -05:00
Abhishek
8bbdc7c8d1 Only set last_frame_time when handling OutputAudioRawFrame
We don't want to set `last_frame_time` on other frames like `HeartBeatFrame`, `LLMGeneratedTextFrame`, `InterruptionFrames` so that we can calculate `diff_time` and compare it against `vad_stop_secs` properly
2025-08-16 16:25:14 +05:30
73 changed files with 1282 additions and 369 deletions


@@ -5,15 +5,68 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.85] - 2025-09-12
### Added
- `AzureSTTService` now pushes interim transcriptions.
- Added `voice_cloning_key` to `GoogleTTSService` to support custom cloned
voices.
- Added `speaking_rate` to `GoogleTTSService.InputParams` to control the
speaking rate.
- Added a `speed` arg to `OpenAITTSService` to control the speed of the voice
response.
- Added `FrameProcessor.push_interruption_task_frame_and_wait()`. Use this
method to programmatically interrupt the bot from any part of the
pipeline. This guarantees that all the processors in the pipeline are
interrupted in order (from upstream to downstream). Internally, this works by
first pushing an `InterruptionTaskFrame` upstream until it reaches the
pipeline task. The pipeline task then generates an `InterruptionFrame`, which
flows downstream through all processors. Once the `InterruptionFrame`
reaches the processor waiting for the interruption, the function returns and
execution continues after the call. Think of it as sending an upstream request
for interruption and waiting until the acknowledgment flows back downstream.
- Added new base `TaskFrame` (which is a system frame). This is the base class
for all task frames (`EndTaskFrame`, `CancelTaskFrame`, etc.) that are meant
to be pushed upstream to reach the pipeline task.
- Expanded support for universal `LLMContext` to the AWS Bedrock LLM service.
Using the universal `LLMContext` and associated `LLMContextAggregatorPair` is
a pre-requisite for using `LLMSwitcher` to switch between LLMs at runtime.
- Added new fields to the development runner's `parse_telephony_websocket`
method in support of providing dynamic data to a bot.
- Twilio: Added a new `body` parameter, which parses the websocket message
for `customParameters`. Provide data via the `Parameter` nouns in your
TwiML to use this feature.
- Telnyx & Exotel: Both providers make the `to` and `from` phone numbers
available in the websocket messages. You can now access these numbers as
`call_data["to"]` and `call_data["from"]`.
Note: Each telephony provider offers different features. Refer to the
corresponding example in `pipecat-examples` to see how to pass custom data
to your bot.
- Added `body` to the `WebsocketRunnerArguments` as an optional parameter.
Custom `body` information can be passed from the server into the bot file via
the `bot()` method using this new parameter.
- Added video streaming support to `LiveKitTransport`.
- Added `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` which provide
access to OpenAI Realtime.
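
The round trip described above for `FrameProcessor.push_interruption_task_frame_and_wait()` can be pictured with a toy model. The sketch below does not import Pipecat: `ToyProcessor`, `ToyTask`, and their methods are hypothetical names invented for illustration, frames are plain strings, and the real implementation may well differ. It only shows the shape of the flow: a request travels upstream to the task, the task sends an interruption downstream through every processor in order, and the caller resumes once that frame has reached it.

```python
import asyncio


class ToyProcessor:
    """Hypothetical stand-in for a frame processor (not Pipecat's class)."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.prev = None  # upstream neighbor, or the task for the first processor
        self.next = None  # downstream neighbor
        self._interrupted = asyncio.Event()

    async def handle_upstream(self, frame):
        # Pass the request along toward the pipeline task.
        await self.prev.handle_upstream(frame)

    async def handle_downstream(self, frame):
        if frame == "InterruptionFrame":
            self.log.append(self.name)  # record the order of interruption
            self._interrupted.set()
        if self.next is not None:
            await self.next.handle_downstream(frame)

    async def push_interruption_and_wait(self):
        # Send the request upstream, then wait for the acknowledgment
        # to flow back downstream to this processor.
        self._interrupted.clear()
        await self.prev.handle_upstream("InterruptionTaskFrame")
        await self._interrupted.wait()


class ToyTask:
    """Hypothetical stand-in for the pipeline task at the top of the chain."""

    def __init__(self, processors):
        self.first = processors[0]
        self.first.prev = self
        for upstream, downstream in zip(processors, processors[1:]):
            upstream.next = downstream
            downstream.prev = upstream

    async def handle_upstream(self, frame):
        # Turn the upstream request into a downstream interruption.
        if frame == "InterruptionTaskFrame":
            await self.first.handle_downstream("InterruptionFrame")


async def demo():
    log = []
    stt, llm, tts = (ToyProcessor(name, log) for name in ("stt", "llm", "tts"))
    ToyTask([stt, llm, tts])
    await llm.push_interruption_and_wait()  # called from the middle of the chain
    return log


INTERRUPT_ORDER = asyncio.run(demo())
print(INTERRUPT_ORDER)
```

In this toy model everything runs through direct calls, so the event is already set by the time the caller waits on it. In a real pipeline the frames would move through queues, which is why the method is awaited.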
### Changed
- `pipeline.tests.utils.run_test()` now allows passing `PipelineParams` instead
of individual parameters.
### Removed
- Remove `VisionImageRawFrame` in favor of context frames (`LLMContextFrame` or
@@ -21,6 +74,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `BotInterruptionFrame` is now deprecated, use `InterruptionTaskFrame` instead.
- `StartInterruptionFrame` is now deprecated, use `InterruptionFrame` instead.
- Deprecate `VisionImageFrameAggregator` because `VisionImageRawFrame` has been
removed. See the `12*` examples for the new recommended replacement pattern.
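
For codebases with many references to the deprecated frame names, a small text rewrite can give a first pass at the migration. This is a minimal sketch using only the standard library; `RENAMES` and `migrate_source` are hypothetical names, not part of Pipecat, and the mapping simply follows the two deprecation entries above.

```python
import re

# Old name -> replacement, taken from the deprecation notes above.
RENAMES = {
    "StartInterruptionFrame": "InterruptionFrame",
    "BotInterruptionFrame": "InterruptionTaskFrame",
}

# Match whole identifiers only, so longer names that merely contain
# one of the old names are left untouched.
_PATTERN = re.compile(r"\b(" + "|".join(map(re.escape, RENAMES)) + r")\b")


def migrate_source(source: str) -> str:
    """Return `source` with deprecated frame names replaced."""
    return _PATTERN.sub(lambda match: RENAMES[match.group(1)], source)


before = "await task.queue_frames([StartInterruptionFrame(), UserStartedSpeakingFrame()])"
print(migrate_source(before))
```

Each hit should still be reviewed by hand: in one of the example diffs in this comparison, a `BotInterruptionFrame()` queued directly on the task is replaced with `InterruptionFrame()` rather than `InterruptionTaskFrame()`, so a blind rename is not always the right change.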
@@ -33,6 +90,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `BaseOutputTransport` issue that caused incorrect detection of when
the bot stopped talking while using an audio mixer.
- Fixed a `LiveKitTransport` issue where RTVI messages were not properly
encoded.
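
The `BaseOutputTransport` fix above appears to correspond to the commit "Only set last_frame_time when handling OutputAudioRawFrame", whose message explains that other frames should not refresh `last_frame_time`, so that `diff_time` can be compared against `vad_stop_secs` properly. The sketch below illustrates that idea in isolation. `ToyFrame` and `BotSpeechTracker` are hypothetical names, the timestamps and the `0.35` threshold are made-up values, and this is not the transport's actual code.

```python
from dataclasses import dataclass


@dataclass
class ToyFrame:
    kind: str  # "audio" stands in for OutputAudioRawFrame


class BotSpeechTracker:
    """Toy model of detecting when the bot has stopped talking."""

    def __init__(self, vad_stop_secs: float):
        self.vad_stop_secs = vad_stop_secs
        self.last_frame_time = 0.0

    def handle(self, frame: ToyFrame, now: float) -> None:
        # Only audio frames refresh the timestamp; heartbeats and other
        # frames must not, or the silence gap would never grow.
        if frame.kind == "audio":
            self.last_frame_time = now

    def bot_stopped(self, now: float) -> bool:
        diff_time = now - self.last_frame_time
        return diff_time > self.vad_stop_secs


tracker = BotSpeechTracker(vad_stop_secs=0.35)
tracker.handle(ToyFrame("audio"), now=1.0)
tracker.handle(ToyFrame("heartbeat"), now=1.3)  # ignored for timing
STOPPED = tracker.bot_stopped(now=1.5)
print(STOPPED)
```

Had the heartbeat at `1.3` refreshed the timestamp, the gap at `1.5` would be only `0.2` seconds and the tracker would wrongly report that the bot was still talking.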


@@ -153,7 +153,11 @@ You can get started with Pipecat running on your local machine, then move your a
2. Install development and testing dependencies:
```bash
uv sync --group dev --all-extras --no-extra gstreamer --no-extra krisp --no-extra local
uv sync --group dev --all-extras \
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
--no-extra ultravox # (ultravox not fully supported on macOS)
```
3. Install the git pre-commit hooks:
@@ -162,23 +166,6 @@ You can get started with Pipecat running on your local machine, then move your a
uv run pre-commit install
```
### Python 3.13+ Compatibility
Some features require PyTorch, which doesn't yet support Python 3.13+. Install using:
```bash
uv sync --group dev --all-extras \
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
--no-extra local-smart-turn \
--no-extra mlx-whisper \
--no-extra moondream \
--no-extra ultravox
```
> **Tip:** For full compatibility, use Python 3.12: `uv python pin 3.12`
> **Note**: Some extras (local, gstreamer) require system dependencies. See documentation if you encounter build errors.
### Running tests


@@ -14,7 +14,7 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotInterruptionFrame,
InterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -115,7 +115,7 @@ async def main():
await task.queue_frames(
[
BotInterruptionFrame(),
InterruptionFrame(),
UserStartedSpeakingFrame(),
TranscriptionFrame(
user_id=participant_id,


@@ -36,7 +36,6 @@ load_dotenv(override=True)
audiobuffer = AudioBufferProcessor(
num_channels=2, # 1 for mono, 2 for stereo (user left, bot right)
enable_turn_audio=False, # Enable per-turn audio recording
user_continuous_stream=True, # User has continuous audio stream
)


@@ -12,8 +12,8 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
InterruptionFrame,
LLMRunFrame,
StartInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -97,7 +97,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@stt.event_handler("on_speech_started")
async def on_speech_started(stt, *args, **kwargs):
await task.queue_frames([StartInterruptionFrame(), UserStartedSpeakingFrame()])
await task.queue_frames([InterruptionFrame(), UserStartedSpeakingFrame()])
@stt.event_handler("on_utterance_end")
async def on_utterance_end(stt, *args, **kwargs):


@@ -16,10 +16,10 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -181,9 +181,7 @@ class TranscriptionContextFixup(FrameProcessor):
if isinstance(frame, MagicDemoTranscriptionFrame):
self._transcript = frame.text
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(
frame, StartInterruptionFrame
):
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, InterruptionFrame):
self.swap_user_audio()
self.add_transcript_back_to_inference_output()
self._transcript = ""


@@ -13,6 +13,7 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
@@ -21,10 +22,7 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
@@ -73,14 +71,14 @@ class UserImageProcessor(FrameProcessor):
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
# Note: AWS Bedrock does not yet support the universal LLMContext
context = OpenAILLMContext()
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = OpenAILLMContextFrame(context)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
@@ -121,6 +119,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
aws = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
# Note: usually, prefer providing latency="optimized" param.
# Here we can't because AWS Bedrock doesn't support it for Claude 3.7,
# which we need for image input.
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)


@@ -0,0 +1,214 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# Global variable to store the client ID
client_id = ""
async def get_weather(params: FunctionCallParams):
location = params.arguments["location"]
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(params: FunctionCallParams):
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={client_id}, question={question}")
# Request the image frame
await params.llm.request_image_frame(
user_id=client_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
)
# Wait a short time for the frame to be processed
await asyncio.sleep(0.5)
# Return a result to complete the function call
await params.result_callback(
f"I've captured an image from your camera and I'm analyzing what you asked about: {question}"
)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
# Note: usually, prefer providing latency="optimized" param.
# Here we can't because AWS Bedrock doesn't support it for Claude 3.7,
# which we need for image input.
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
},
required=["question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
If you need to use a tool, simply use the tool. Do not tell the user the tool you are using. Be brief and concise.
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
await maybe_capture_participant_camera(transport, client)
global client_id
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()


@@ -22,7 +22,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai_realtime_beta import (
InputAudioNoiseReduction,
@@ -31,7 +31,6 @@ from pipecat.services.openai_realtime_beta import (
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai_realtime_beta.events import AudioConfiguration, AudioInput
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -114,18 +113,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
session_properties = SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=SemanticTurnDetection(),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
output_modalities=["text"],
input_audio_transcription=InputAudioTranscription(),
modalities=["text"],
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=SemanticTurnDetection(),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
input_audio_noise_reduction=InputAudioNoiseReduction(type="near_field"),
# tools=tools,
instructions="""You are a helpful and friendly AI.


@@ -18,9 +18,9 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterruptionFrame,
LLMRunFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -144,7 +144,7 @@ class OutputGate(FrameProcessor):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)
@@ -232,7 +232,7 @@ class TurnDetectionLLM(Pipeline):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, InterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)


@@ -18,9 +18,9 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterruptionFrame,
LLMRunFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -347,7 +347,7 @@ class OutputGate(FrameProcessor):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)
@@ -426,7 +426,7 @@ class TurnDetectionLLM(Pipeline):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, InterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)


@@ -20,10 +20,10 @@ from pipecat.frames.frames import (
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
InterruptionFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -570,7 +570,7 @@ class OutputGate(FrameProcessor):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)


@@ -15,8 +15,8 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
InterruptionFrame,
LLMRunFrame,
StartInterruptionFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
@@ -48,7 +48,7 @@ class CustomObserver(BaseObserver):
"""Observer to log interruptions and bot speaking events to the console.
Logs all frame instances of:
- StartInterruptionFrame
- InterruptionFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
@@ -69,7 +69,7 @@ class CustomObserver(BaseObserver):
# Create direction arrow
arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"
if isinstance(frame, StartInterruptionFrame) and isinstance(src, BaseOutputTransport):
if isinstance(frame, InterruptionFrame) and isinstance(src, BaseOutputTransport):
logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
elif isinstance(frame, BotStartedSpeakingFrame):
logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")


@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v2 import LocalSmartTurnAnalyzerV2
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
@@ -31,20 +31,7 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
# to the path where the smart-turn repo is cloned.
#
# Example setup:
#
# # Git LFS (Large File Storage)
# brew install git-lfs
# # Hugging Face uses LFS to store large model files, including .mlpackage
# git lfs install
# # Clone the repo with the smart_turn_classifier.mlpackage
# git clone https://huggingface.co/pipecat-ai/smart-turn-v2
#
# Then set the env variable:
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
# or add it to your .env file
# to the Smart Turn v3 ONNX model file.
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -55,7 +42,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
@@ -63,7 +50,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
@@ -71,7 +58,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),


@@ -95,8 +95,9 @@ sambanova = []
sarvam = [ "websockets>=13.1,<15.0" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
local-smart-turn-v3 = [ "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3", "onnxruntime>=1.20.1, <2" ]
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
silero = [ "onnxruntime>=1.20.1, <2" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "websockets>=13.1,<15.0" ]
soundfile = [ "soundfile~=0.13.0" ]
@@ -154,6 +155,7 @@ where = ["src"]
"src/pipecat/audio/dtmf/dtmf-star.wav",
]
"pipecat.services.aws_nova_sonic" = ["src/pipecat/services/aws_nova_sonic/ready.wav"]
"pipecat.audio.turn.smart_turn.data" = ["src/pipecat/audio/turn/smart_turn/data/smart-turn-v3.0.onnx"]
[tool.pytest.ini_options]
addopts = "--verbose"
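
The `silero` extra in this hunk moves from `onnxruntime~=1.20.1` to `onnxruntime>=1.20.1, <2`. A rough way to see what that loosens is sketched below. It is a simplified approximation for plain numeric versions only, not a full implementation of Python's version specifier rules; the function names and the candidate version list are illustrative assumptions.

```python
def parse(version: str) -> tuple:
    """Turn '1.20.1' into (1, 20, 1). Handles plain numeric versions only."""
    return tuple(int(part) for part in version.split("."))


def matches_compatible_release(version: str, base: str) -> bool:
    """Approximate `~=base`: at least `base`, same prefix except the last component."""
    v, b = parse(version), parse(base)
    return v >= b and v[: len(b) - 1] == b[:-1]


def matches_range(version: str, minimum: str, below_major: int) -> bool:
    """Approximate `>=minimum, <below_major`."""
    return parse(minimum) <= parse(version) < (below_major,)


# Illustrative version strings, not a list of actual releases.
CANDIDATES = ["1.20.1", "1.20.5", "1.22.0", "2.0.0"]
OLD_PIN = [v for v in CANDIDATES if matches_compatible_release(v, "1.20.1")]
NEW_PIN = [v for v in CANDIDATES if matches_range(v, "1.20.1", 2)]
print(OLD_PIN, NEW_PIN)
```

The old specifier stays within the `1.20.*` series, while the new one also admits later `1.x` minor versions and still excludes `2.0`.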


@@ -135,6 +135,25 @@ TESTS_14 = [
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14x-function-calling-universal-context.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
(
"14y-function-calling-google-universal-context.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
(
"14z-function-calling-anthropic-universal-context.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
(
"14aa-function-calling-aws-universal-context.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
@@ -148,6 +167,7 @@ TESTS_15 = [
TESTS_19 = [
("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
]


@@ -9,7 +9,7 @@
import copy
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
from typing import Any, Dict, List, TypedDict
from anthropic import NOT_GIVEN, NotGiven
from anthropic.types.message_param import MessageParam
@@ -28,10 +28,7 @@ from pipecat.processors.aggregators.llm_context import (
class AnthropicLLMInvocationParams(TypedDict):
"""Context-based parameters for invoking Anthropic's LLM API.
This is a placeholder until support for universal LLMContext machinery is added for Anthropic.
"""
"""Context-based parameters for invoking Anthropic's LLM API."""
system: str | NotGiven
messages: List[MessageParam]
@@ -50,8 +47,6 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
) -> AnthropicLLMInvocationParams:
"""Get Anthropic-specific LLM invocation parameters from a universal LLM context.
This is a placeholder until support for universal LLMContext machinery is added for Anthropic.
Args:
context: The LLM context containing messages, tools, etc.
enable_prompt_caching: Whether prompt caching should be enabled.
@@ -76,8 +71,6 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
Removes or truncates sensitive data like image content for safe logging.
This is a placeholder until support for universal LLMContext machinery is added for Anthropic.
Args:
context: The LLM context containing messages.


@@ -6,21 +6,33 @@
"""AWS Bedrock LLM adapter for Pipecat."""
from typing import Any, Dict, List, TypedDict
import base64
import copy
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMContextToolChoice,
LLMSpecificMessage,
LLMStandardMessage,
)
class AWSBedrockLLMInvocationParams(TypedDict):
"""Context-based parameters for invoking AWS Bedrock's LLM API.
"""Context-based parameters for invoking AWS Bedrock's LLM API."""
This is a placeholder until support for universal LLMContext machinery is added for Bedrock.
"""
pass
system: Optional[List[dict[str, Any]]] # [{"text": "system message"}]
messages: List[dict[str, Any]]
tools: List[dict[str, Any]]
tool_choice: LLMContextToolChoice
class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
@@ -33,30 +45,239 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
def get_llm_invocation_params(self, context: LLMContext) -> AWSBedrockLLMInvocationParams:
"""Get AWS Bedrock-specific LLM invocation parameters from a universal LLM context.
This is a placeholder until support for universal LLMContext machinery is added for Bedrock.
Args:
context: The LLM context containing messages, tools, etc.
Returns:
Dictionary of parameters for invoking AWS Bedrock's LLM API.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Bedrock.")
messages = self._from_universal_context_messages(self._get_messages(context))
return {
"system": messages.system,
"messages": messages.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
# To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
# Eventually (when we don't have to maintain the non-LLMContext code path) we should do
# the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
"tool_choice": context.tool_choice,
}
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Bedrock.
Removes or truncates sensitive data like image content for safe logging.
This is a placeholder until support for universal LLMContext machinery is added for Bedrock.
Args:
context: The LLM context containing messages.
Returns:
List of messages in a format ready for logging about AWS Bedrock.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Bedrock.")
# Get messages in AWS Bedrock's format
messages = self._from_universal_context_messages(self._get_messages(context)).messages
# Sanitize messages for logging
messages_for_logging = []
for message in messages:
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item.get("image"):
item["image"]["source"]["bytes"] = "..."
messages_for_logging.append(msg)
return messages_for_logging
def _get_messages(self, context: LLMContext) -> List[LLMContextMessage]:
return context.get_messages("anthropic")
@dataclass
class ConvertedMessages:
"""Container for AWS Bedrock-formatted messages converted from universal context."""
messages: List[dict[str, Any]]
system: Optional[str]
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
system = None
messages = []
# first, map messages using self._from_universal_context_message(m)
try:
messages = [self._from_universal_context_message(m) for m in universal_context_messages]
except Exception as e:
logger.error(f"Error mapping messages: {e}")
# See if we should pull the system message out of our messages list
if messages and messages[0]["role"] == "system":
system = messages[0]["content"]
messages.pop(0)
# Convert any subsequent "system"-role messages to "user"-role
# messages, as AWS Bedrock doesn't support system input messages.
for message in messages:
if message["role"] == "system":
message["role"] = "user"
# Merge consecutive messages with the same role.
i = 0
while i < len(messages) - 1:
current_message = messages[i]
next_message = messages[i + 1]
if current_message["role"] == next_message["role"]:
# Convert content to list of dictionaries if it's a string
if isinstance(current_message["content"], str):
current_message["content"] = [
{"type": "text", "text": current_message["content"]}
]
if isinstance(next_message["content"], str):
next_message["content"] = [{"type": "text", "text": next_message["content"]}]
# Concatenate the content
current_message["content"].extend(next_message["content"])
# Remove the next message from the list
messages.pop(i + 1)
else:
i += 1
# Avoid empty content in messages
for message in messages:
if isinstance(message["content"], str) and message["content"] == "":
message["content"] = "(empty)"
elif isinstance(message["content"], list) and len(message["content"]) == 0:
message["content"] = [{"type": "text", "text": "(empty)"}]
return self.ConvertedMessages(messages=messages, system=system)
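The merge step above can be read in isolation. The following is a minimal sketch of that idea, assuming plain dictionaries with `role` and `content` keys; `merge_consecutive_roles` and `as_blocks` are hypothetical names used only for illustration and are not part of the adapter.

```python
from typing import Any, Dict, List


def as_blocks(content: Any) -> List[Dict[str, Any]]:
    """Return content as a list of blocks, wrapping a bare string in a text block."""
    if isinstance(content, str):
        return [{"type": "text", "text": content}]
    return list(content)


def merge_consecutive_roles(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge neighbouring messages that share a role (illustrative helper only)."""
    merged: List[Dict[str, Any]] = []
    for message in messages:
        message = dict(message)  # shallow copy so the caller's dicts are untouched
        if merged and merged[-1]["role"] == message["role"]:
            previous = merged[-1]
            # Content is converted to a block list only when a merge happens.
            previous["content"] = as_blocks(previous["content"]) + as_blocks(
                message["content"]
            )
        else:
            merged.append(message)
    return merged


example = merge_consecutive_roles(
    [
        {"role": "user", "content": "a"},
        {"role": "user", "content": "b"},
        {"role": "assistant", "content": "c"},
    ]
)
```

As in the diff, a message that is never merged keeps its original string content; only merged messages end up with a list of blocks.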
def _from_universal_context_message(self, message: LLMContextMessage) -> dict[str, Any]:
if isinstance(message, LLMSpecificMessage):
return copy.deepcopy(message.message)
return self._from_standard_message(message)
def _from_standard_message(self, message: LLMStandardMessage) -> dict[str, Any]:
"""Convert standard format message to AWS Bedrock format.
Handles conversion of text content, tool calls, and tool results.
Empty text content is converted to "(empty)".
Args:
message: Message in standard format.
Returns:
Message in AWS Bedrock format.
Examples:
Standard format input::
{
"role": "assistant",
"tool_calls": [
{
"id": "123",
"function": {"name": "search", "arguments": '{"q": "test"}'}
}
]
}
AWS Bedrock format output::
{
"role": "assistant",
"content": [
{
"toolUse": {
"toolUseId": "123",
"name": "search",
"input": {"q": "test"}
}
}
]
}
"""
message = copy.deepcopy(message)
if message["role"] == "tool":
# Try to parse the content as JSON if it looks like JSON
try:
if message["content"].strip().startswith("{") and message[
"content"
].strip().endswith("}"):
content_json = json.loads(message["content"])
tool_result_content = [{"json": content_json}]
else:
tool_result_content = [{"text": message["content"]}]
except Exception:
tool_result_content = [{"text": message["content"]}]
return {
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": message["tool_call_id"],
"content": tool_result_content,
},
},
],
}
if message.get("tool_calls"):
tc = message["tool_calls"]
ret = {"role": "assistant", "content": []}
for tool_call in tc:
function = tool_call["function"]
arguments = json.loads(function["arguments"])
new_tool_use = {
"toolUse": {
"toolUseId": tool_call["id"],
"name": function["name"],
"input": arguments,
}
}
ret["content"].append(new_tool_use)
return ret
# Handle text content
content = message.get("content")
if isinstance(content, str):
if content == "":
return {"role": message["role"], "content": [{"text": "(empty)"}]}
else:
return {"role": message["role"], "content": [{"text": content}]}
elif isinstance(content, list):
new_content = []
for item in content:
# fix empty text
if item.get("type", "") == "text":
text_content = item["text"] if item["text"] != "" else "(empty)"
new_content.append({"text": text_content})
# handle image_url -> image conversion
if item["type"] == "image_url":
new_item = {
"image": {
"format": "jpeg",
"source": {
"bytes": base64.b64decode(item["image_url"]["url"].split(",")[1])
},
}
}
new_content.append(new_item)
# In the case where there's a single image in the list (like what
# would result from a UserImageRawFrame), ensure that the image
# comes before text
image_indices = [i for i, item in enumerate(new_content) if "image" in item]
text_indices = [i for i, item in enumerate(new_content) if "text" in item]
if len(image_indices) == 1 and text_indices:
img_idx = image_indices[0]
first_txt_idx = text_indices[0]
if img_idx > first_txt_idx:
# Move image before the first text
image_item = new_content.pop(img_idx)
new_content.insert(first_txt_idx, image_item)
return {"role": message["role"], "content": new_content}
return message
@staticmethod
def _to_bedrock_function_format(function: FunctionSchema) -> Dict[str, Any]:
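For reviewers who want to try the tool-call mapping from `_from_standard_message` outside the adapter, here is a standalone sketch. It assumes the standard message shape shown in the docstring example above; `tool_calls_to_bedrock` and `tool_result_to_bedrock` are hypothetical helper names, not functions from this change.

```python
import json
from typing import Any, Dict


def tool_calls_to_bedrock(message: Dict[str, Any]) -> Dict[str, Any]:
    """Map standard-format tool calls to Bedrock-style toolUse blocks (sketch)."""
    content = []
    for tool_call in message["tool_calls"]:
        function = tool_call["function"]
        content.append(
            {
                "toolUse": {
                    "toolUseId": tool_call["id"],
                    "name": function["name"],
                    # Arguments arrive as a JSON string and are sent as an object.
                    "input": json.loads(function["arguments"]),
                }
            }
        )
    return {"role": "assistant", "content": content}


def tool_result_to_bedrock(message: Dict[str, Any]) -> Dict[str, Any]:
    """Map a standard-format tool result to a Bedrock-style toolResult block (sketch)."""
    text = message["content"]
    stripped = text.strip()
    block: Dict[str, Any] = {"text": text}
    # Only attempt JSON parsing when the payload looks like a JSON object.
    if stripped.startswith("{") and stripped.endswith("}"):
        try:
            block = {"json": json.loads(stripped)}
        except json.JSONDecodeError:
            pass  # fall back to the plain-text block
    return {
        "role": "user",
        "content": [
            {"toolResult": {"toolUseId": message["tool_call_id"], "content": [block]}}
        ],
    }


call = tool_calls_to_bedrock(
    {
        "role": "assistant",
        "tool_calls": [
            {"id": "123", "function": {"name": "search", "arguments": '{"q": "test"}'}}
        ],
    }
)
```

The tool result is returned with the `user` role, matching the behaviour in the diff.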


@@ -0,0 +1,124 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Local turn analyzer for on-device ML inference using the smart-turn-v3 model.
This module provides a smart turn analyzer that uses an ONNX model for
local end-of-turn detection without requiring network connectivity.
"""
from typing import Any, Dict, Optional
import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
try:
import onnxruntime as ort
from transformers import WhisperFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use LocalSmartTurnAnalyzerV3, you need to `pip install pipecat-ai[local-smart-turn-v3]`."
)
raise Exception(f"Missing module: {e}")
class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
"""Local turn analyzer using the smart-turn-v3 ONNX model.
Provides end-of-turn detection using a locally stored ONNX model,
enabling offline operation without network dependencies.
"""
def __init__(self, *, smart_turn_model_path: Optional[str] = None, **kwargs):
"""Initialize the local ONNX smart-turn-v3 analyzer.
Args:
smart_turn_model_path: Path to the ONNX model file. If this is not
set, the bundled smart-turn-v3.0 model will be used.
**kwargs: Additional arguments passed to BaseSmartTurn.
"""
super().__init__(**kwargs)
logger.debug("Loading Local Smart Turn v3 model...")
if not smart_turn_model_path:
# Load bundled model
model_name = "smart-turn-v3.0.onnx"
package_path = "pipecat.audio.turn.smart_turn.data"
try:
import importlib_resources as impresources
smart_turn_model_path = str(impresources.files(package_path).joinpath(model_name))
except BaseException:
from importlib import resources as impresources
try:
with impresources.path(package_path, model_name) as f:
smart_turn_model_path = f
except BaseException:
smart_turn_model_path = str(
impresources.files(package_path).joinpath(model_name)
)
so = ort.SessionOptions()
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
so.inter_op_num_threads = 1
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
self._feature_extractor = WhisperFeatureExtractor(chunk_length=8)
self._session = ort.InferenceSession(smart_turn_model_path, sess_options=so)
logger.debug("Loaded Local Smart Turn v3")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local ONNX model."""
def truncate_audio_to_last_n_seconds(audio_array, n_seconds=8, sample_rate=16000):
"""Truncate audio to last n seconds or pad with zeros to meet n seconds."""
max_samples = n_seconds * sample_rate
if len(audio_array) > max_samples:
return audio_array[-max_samples:]
elif len(audio_array) < max_samples:
# Pad with zeros at the beginning
padding = max_samples - len(audio_array)
return np.pad(audio_array, (padding, 0), mode="constant", constant_values=0)
return audio_array
# Truncate to 8 seconds (keeping the end) or pad to 8 seconds
audio_array = truncate_audio_to_last_n_seconds(audio_array, n_seconds=8)
# Process audio using Whisper's feature extractor
inputs = self._feature_extractor(
audio_array,
sampling_rate=16000,
return_tensors="pt",
padding="max_length",
max_length=8 * 16000,
truncation=True,
do_normalize=True,
)
# Convert to numpy and ensure correct shape for ONNX
input_features = inputs.input_features.squeeze(0).numpy().astype(np.float32)
input_features = np.expand_dims(input_features, axis=0) # Add batch dimension
# Run ONNX inference
outputs = self._session.run(None, {"input_features": input_features})
# Extract probability (ONNX model returns sigmoid probabilities)
probability = outputs[0][0].item()
# Make prediction (1 for Complete, 0 for Incomplete)
prediction = 1 if probability > 0.5 else 0
return {
"prediction": prediction,
"probability": probability,
}
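The keep-the-tail-or-left-pad step in `_predict_endpoint` is easy to get backwards, so here is a small sketch of it. It uses plain Python lists instead of NumPy to stay dependency-free, which is an assumption of this example (the analyzer itself uses `np.pad`); `last_n_seconds` is a hypothetical name.

```python
from typing import List


def last_n_seconds(
    samples: List[float], n_seconds: int = 8, sample_rate: int = 16000
) -> List[float]:
    """Keep the final n_seconds of audio, or left-pad with zeros to that length."""
    max_samples = n_seconds * sample_rate
    if len(samples) > max_samples:
        # Too long: keep the end of the audio, where the turn ending is.
        return samples[-max_samples:]
    # Too short (or exact): pad with silence at the beginning.
    padding = max_samples - len(samples)
    return [0.0] * padding + list(samples)


# Tiny sample rate so the behaviour is easy to inspect: 2 s * 4 Hz = 8 samples.
short = last_n_seconds([1.0, 1.0, 1.0], n_seconds=2, sample_rate=4)
long = last_n_seconds([float(i) for i in range(10)], n_seconds=2, sample_rate=4)
```

Padding goes at the front so that the most recent speech always sits at the end of the fixed-length window.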


@@ -21,7 +21,6 @@ from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotInterruptionFrame,
EndFrame,
Frame,
LLMFullResponseEndFrame,
@@ -360,7 +359,7 @@ class ClassificationProcessor(FrameProcessor):
await self._voicemail_notifier.notify() # Clear buffered TTS frames
# Interrupt the current pipeline to stop any ongoing processing
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
await self.push_interruption_task_frame_and_wait()
# Set the voicemail event to trigger the voicemail handler
self._voicemail_event.clear()


@@ -788,43 +788,6 @@ class FatalErrorFrame(ErrorFrame):
fatal: bool = field(default=True, init=False)
@dataclass
class EndTaskFrame(SystemFrame):
"""Frame to request graceful pipeline task closure.
This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream. This frame should be pushed upstream.
"""
pass
@dataclass
class CancelTaskFrame(SystemFrame):
"""Frame to request immediate pipeline task cancellation.
This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream. This frame
should be pushed upstream.
"""
pass
@dataclass
class StopTaskFrame(SystemFrame):
"""Frame to request pipeline task stop while keeping processors running.
This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state. This frame should be pushed
upstream.
"""
pass
@dataclass
class FrameProcessorPauseUrgentFrame(SystemFrame):
"""Frame to pause frame processing immediately.
@@ -857,7 +820,7 @@ class FrameProcessorResumeUrgentFrame(SystemFrame):
@dataclass
class StartInterruptionFrame(SystemFrame):
class InterruptionFrame(SystemFrame):
"""Frame indicating user started speaking (interruption detected).
Emitted by the BaseInputTransport to indicate that a user has started
@@ -869,6 +832,34 @@ class StartInterruptionFrame(SystemFrame):
pass
@dataclass
class StartInterruptionFrame(InterruptionFrame):
"""Frame indicating user started speaking (interruption detected).
.. deprecated:: 0.0.85
This frame is deprecated and will be removed in a future version.
Instead, use `InterruptionFrame`.
Emitted by the BaseInputTransport to indicate that a user has started
speaking (i.e. is interrupting). This is similar to
UserStartedSpeakingFrame except that it should be pushed concurrently
with other frames (so the order is not guaranteed).
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"StartInterruptionFrame is deprecated and will be removed in a future version. "
"Instead, use InterruptionFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class UserStartedSpeakingFrame(SystemFrame):
"""Frame indicating user has started speaking.
@@ -944,20 +935,6 @@ class VADUserStoppedSpeakingFrame(SystemFrame):
pass
@dataclass
class BotInterruptionFrame(SystemFrame):
"""Frame indicating the bot should be interrupted.
Emitted when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
This frame should be pushed upstream. It results in the BaseInputTransport
starting an interruption by pushing a StartInterruptionFrame downstream.
"""
pass
@dataclass
class BotStartedSpeakingFrame(SystemFrame):
"""Frame indicating the bot started speaking.
@@ -1289,6 +1266,103 @@ class SpeechControlParamsFrame(SystemFrame):
turn_params: Optional[SmartTurnParams] = None
#
# Task frames
#
@dataclass
class TaskFrame(SystemFrame):
"""Base frame for task frames.
This is a base class for frames that are meant to be sent and handled
upstream by the pipeline task. This might result in a corresponding frame
sent downstream (e.g. `InterruptionTaskFrame` / `InterruptionFrame` or
`EndTaskFrame` / `EndFrame`).
"""
pass
@dataclass
class EndTaskFrame(TaskFrame):
"""Frame to request graceful pipeline task closure.
This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream. This frame should be pushed upstream.
"""
pass
@dataclass
class CancelTaskFrame(TaskFrame):
"""Frame to request immediate pipeline task cancellation.
This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream. This frame
should be pushed upstream.
"""
pass
@dataclass
class StopTaskFrame(TaskFrame):
"""Frame to request pipeline task stop while keeping processors running.
This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state. This frame should be pushed
upstream.
"""
pass
@dataclass
class InterruptionTaskFrame(TaskFrame):
"""Frame indicating the bot should be interrupted.
Emitted when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
This frame should be pushed upstream.
"""
pass
@dataclass
class BotInterruptionFrame(InterruptionTaskFrame):
"""Frame indicating the bot should be interrupted.
.. deprecated:: 0.0.85
This frame is deprecated and will be removed in a future version.
Instead, use `InterruptionTaskFrame`.
Emitted when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
This frame should be pushed upstream.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"BotInterruptionFrame is deprecated and will be removed in a future version. "
"Instead, use InterruptionTaskFrame.",
DeprecationWarning,
stacklevel=2,
)
#
# Control frames
#
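The renames above keep the old names as subclasses of the new ones, so existing `isinstance` checks against the new class still match frames created under the old name. A minimal sketch of that pattern follows, using hypothetical `NewFrame` / `OldFrame` classes rather than the real Pipecat frames.

```python
import warnings
from dataclasses import dataclass


@dataclass
class NewFrame:
    """Hypothetical replacement class (stands in for the new frame name)."""

    name: str = "interruption"

    def __post_init__(self):
        pass


@dataclass
class OldFrame(NewFrame):
    """Hypothetical deprecated alias kept for backwards compatibility."""

    def __post_init__(self):
        super().__post_init__()
        with warnings.catch_warnings():
            # Force the warning to be shown even if a filter would hide it.
            warnings.simplefilter("always")
            warnings.warn(
                "OldFrame is deprecated. Instead, use NewFrame.",
                DeprecationWarning,
                stacklevel=2,
            )


def handle(frame: object) -> str:
    """Handlers only need to check for the new class."""
    return "interrupt" if isinstance(frame, NewFrame) else "ignore"


# Record warnings so the deprecation can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = OldFrame()
```

Code that still constructs the old class keeps working and gets a `DeprecationWarning`, while handlers only have to be updated once to check for the new class.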


@@ -54,7 +54,7 @@ class DebugLogObserver(BaseObserver):
Log frames with specific source/destination filters::
from pipecat.frames.frames import StartInterruptionFrame, UserStartedSpeakingFrame, LLMTextFrame
from pipecat.frames.frames import InterruptionFrame, UserStartedSpeakingFrame, LLMTextFrame
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.services.stt_service import STTService
@@ -62,8 +62,8 @@ class DebugLogObserver(BaseObserver):
observers=[
DebugLogObserver(
frame_types={
# Only log StartInterruptionFrame when source is BaseOutputTransport
StartInterruptionFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
# Only log InterruptionFrame when source is BaseOutputTransport
InterruptionFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
# Only log UserStartedSpeakingFrame when destination is STTService
UserStartedSpeakingFrame: (STTService, FrameEndpoint.DESTINATION),
# Log LLMTextFrame regardless of source or destination type


@@ -32,6 +32,8 @@ from pipecat.frames.frames import (
Frame,
HeartbeatFrame,
InputAudioRawFrame,
InterruptionFrame,
InterruptionTaskFrame,
MetricsFrame,
StartFrame,
StopFrame,
@@ -627,13 +629,23 @@ class PipelineTask(BasePipelineTask):
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
logger.debug(f"{self}: received end task frame {frame}")
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
logger.debug(f"{self}: received cancel task frame {frame}")
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
logger.debug(f"{self}: received stop task frame {frame}")
await self.queue_frame(StopFrame())
elif isinstance(frame, InterruptionTaskFrame):
# Tell the task we should interrupt the pipeline. Note that we
# bypass the push queue and queue directly into the
# pipeline. This is in case the push task is blocked waiting for a
# pipeline-ending frame to finish traversing the pipeline.
logger.debug(f"{self}: received interruption task frame {frame}")
await self._pipeline.queue_frame(InterruptionFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
@@ -642,7 +654,7 @@ class PipelineTask(BasePipelineTask):
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"Something went wrong: {frame}")
logger.warning(f"{self}: Something went wrong: {frame}")
async def _sink_push_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames coming downstream from the pipeline.


@@ -16,7 +16,6 @@ from typing import Optional
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.frames.frames import (
BotInterruptionFrame,
CancelFrame,
EndFrame,
Frame,
@@ -24,7 +23,7 @@ from pipecat.frames.frames import (
StartFrame,
TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.time import time_now_iso8601
@@ -105,7 +104,7 @@ class DTMFAggregator(FrameProcessor):
# For first digit, schedule interruption.
if is_first_digit:
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
await self.push_interruption_task_frame_and_wait()
# Check for immediate flush conditions
if frame.button == self._termination_digit:


@@ -22,7 +22,6 @@ from pipecat.audio.interruptions.base_interruption_strategy import BaseInterrupt
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -36,6 +35,7 @@ from pipecat.frames.frames import (
FunctionCallsStartedFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
@@ -48,7 +48,6 @@ from pipecat.frames.frames import (
OpenAILLMContextAssistantTimestampFrame,
SpeechControlParamsFrame,
StartFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserImageRawFrame,
@@ -138,7 +137,7 @@ class LLMFullResponseAggregator(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._call_event_handler("on_completion", self._aggregation, False)
self._aggregation = ""
self._started = False
@@ -532,9 +531,9 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
if should_interrupt:
logger.debug(
"Interruption conditions met - pushing BotInterruptionFrame and aggregation"
"Interruption conditions met - pushing interruption and aggregation"
)
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
await self.push_interruption_task_frame_and_wait()
await self._process_aggregation()
else:
logger.debug("Interruption conditions not met - not pushing aggregation")
@@ -838,7 +837,7 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMFullResponseStartFrame):
@@ -904,7 +903,7 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
if frame.run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: StartInterruptionFrame):
async def _handle_interruptions(self, frame: InterruptionFrame):
await self.push_aggregation()
self._started = 0
await self.reset()


@@ -13,7 +13,6 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, Set
from loguru import logger
@@ -23,7 +22,6 @@ from pipecat.audio.interruptions.base_interruption_strategy import BaseInterrupt
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -37,6 +35,7 @@ from pipecat.frames.frames import (
FunctionCallsStartedFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
@@ -48,7 +47,6 @@ from pipecat.frames.frames import (
LLMSetToolsFrame,
SpeechControlParamsFrame,
StartFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserImageRawFrame,
@@ -311,9 +309,9 @@ class LLMUserAggregator(LLMContextAggregator):
if should_interrupt:
logger.debug(
"Interruption conditions met - pushing BotInterruptionFrame and aggregation"
"Interruption conditions met - pushing interruption and aggregation"
)
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
await self.push_interruption_task_frame_and_wait()
await self._process_aggregation()
else:
logger.debug("Interruption conditions not met - not pushing aggregation")
@@ -579,7 +577,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMFullResponseStartFrame):
@@ -645,7 +643,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
if frame.run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: StartInterruptionFrame):
async def _handle_interruptions(self, frame: InterruptionFrame):
await self._push_aggregation()
self._started = 0
await self.reset()


@@ -25,8 +25,8 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
STTMuteFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -204,7 +204,7 @@ class STTMuteFilter(FrameProcessor):
if isinstance(
frame,
(
StartInterruptionFrame,
InterruptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
UserStartedSpeakingFrame,


@@ -28,8 +28,9 @@ from pipecat.frames.frames import (
FrameProcessorPauseUrgentFrame,
FrameProcessorResumeFrame,
FrameProcessorResumeUrgentFrame,
InterruptionFrame,
InterruptionTaskFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
@@ -219,6 +220,9 @@ class FrameProcessor(BaseObject):
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None
self._wait_for_interruption = False
self._wait_interruption_event = asyncio.Event()
@property
def id(self) -> int:
"""Get the unique identifier for this processor.
@@ -542,6 +546,14 @@ class FrameProcessor(BaseObject):
if self._cancelling:
return
# If we are waiting for an interruption we will bypass all queued system
# frames and we will process the frame right away. This is because a
# previous system frame might be waiting for the interruption frame and
# it's blocking the input task.
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
await self.__process_frame(frame, direction, callback)
return
if self._enable_direct_mode:
await self.__process_frame(frame, direction, callback)
else:
@@ -588,7 +600,7 @@ class FrameProcessor(BaseObject):
if isinstance(frame, StartFrame):
await self.__start(frame)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self._start_interruption()
await self.stop_all_metrics()
elif isinstance(frame, CancelFrame):
@@ -620,6 +632,32 @@ class FrameProcessor(BaseObject):
await self.__internal_push_frame(frame, direction)
if isinstance(frame, InterruptionFrame):
self._wait_interruption_event.set()
async def push_interruption_task_frame_and_wait(self):
"""Push an interruption task frame upstream and wait for the interruption.
This function sends an `InterruptionTaskFrame` upstream to the pipeline
task and waits to receive the corresponding `InterruptionFrame`. When
the function finishes it is guaranteed that the `InterruptionFrame` has
been pushed downstream.
"""
self._wait_for_interruption = True
await self.push_frame(InterruptionTaskFrame(), FrameDirection.UPSTREAM)
# Wait for an `InterruptionFrame` to come to this processor and be
# pushed. Take a look at `push_frame()` to see how we first push the
# `InterruptionFrame` and then we set the event in order to maintain
# frame ordering.
await self._wait_interruption_event.wait()
# Clean the event.
self._wait_interruption_event.clear()
self._wait_for_interruption = False
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
@@ -669,20 +707,22 @@ class FrameProcessor(BaseObject):
async def _start_interruption(self):
"""Start handling an interruption by cancelling current tasks."""
try:
# Cancel the process task. This will stop processing queued frames.
await self.__cancel_process_task()
if self._wait_for_interruption:
# If we get here we know the process task was just waiting for
# an interruption (push_interruption_task_frame_and_wait()), so
# we can't cancel the task because it might still need to do
# more things (e.g. pushing a frame after the
# interruption). Instead we just drain the queue because this is
# an interruption.
self.__reset_process_task()
else:
# Cancel and re-create the process task including the queue.
await self.__cancel_process_task()
self.__create_process_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
await self.push_error(ErrorFrame(str(e)))
# Create a new process queue and task.
self.__create_process_task()
async def _stop_interruption(self):
"""Stop handling an interruption."""
# Nothing to do right now.
pass
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
"""Internal method to push frames to adjacent processors.
@@ -764,6 +804,17 @@ class FrameProcessor(BaseObject):
self.__process_queue = asyncio.Queue()
self.__process_frame_task = self.create_task(self.__process_frame_task_handler())
def __reset_process_task(self):
"""Reset non-system frame processing task."""
if self._enable_direct_mode:
return
self.__should_block_frames = False
self.__process_event = asyncio.Event()
while not self.__process_queue.empty():
self.__process_queue.get_nowait()
self.__process_queue.task_done()
async def __cancel_process_task(self):
"""Cancel the non-system frame processing task."""
if self.__process_frame_task:

@@ -30,7 +30,6 @@ from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from pipecat.frames.frames import (
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -1206,7 +1205,7 @@ class RTVIProcessor(FrameProcessor):
async def interrupt_bot(self):
"""Send an interruption task frame upstream and wait for the interruption."""
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
await self.push_interruption_task_frame_and_wait()
async def send_server_message(self, data: Any):
"""Send a server message to the client."""

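`interrupt_bot()` now relies on `push_interruption_task_frame_and_wait()`, which pushes a request upstream and blocks on an `asyncio.Event` until the interruption has passed back through the processor. The sketch below shows only that ordering idea (push first, then set the event). `InterruptionWaiter`, the string "frames", and the `answer` callable are hypothetical stand-ins, not Pipecat classes.

```python
import asyncio


class InterruptionWaiter:
    """Toy processor that waits until an interruption has been pushed."""

    def __init__(self):
        self._event = asyncio.Event()
        self.pushed = []  # frames pushed downstream, in order

    async def push_frame(self, frame: str):
        # Push first, then signal, so the waiter only resumes after the
        # interruption is already downstream (this preserves frame ordering).
        self.pushed.append(frame)
        if frame == "interruption":
            self._event.set()

    async def request_interruption_and_wait(self, answer):
        # `answer` stands in for the pipeline task reacting to the request.
        pending = asyncio.create_task(answer())
        await self._event.wait()
        self._event.clear()  # make the event reusable for the next request
        await pending
        self.pushed.append("after-interruption")
        return self.pushed


async def demo():
    waiter = InterruptionWaiter()

    async def pipeline_task():
        await waiter.push_frame("interruption")

    return await waiter.request_interruption_and_wait(pipeline_task)
```

Anything the caller pushes after the wait returns is therefore ordered after the interruption.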
@@ -19,7 +19,7 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartInterruptionFrame,
InterruptionFrame,
TranscriptionFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
@@ -86,7 +86,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
transcript messages. Utterances are completed when:
- The bot stops speaking (BotStoppedSpeakingFrame)
- The bot is interrupted (StartInterruptionFrame)
- The bot is interrupted (InterruptionFrame)
- The pipeline ends (EndFrame)
"""
@@ -185,7 +185,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
- TTSTextFrame: Aggregates text for current utterance
- BotStoppedSpeakingFrame: Completes current utterance
- StartInterruptionFrame: Completes current utterance due to interruption
- InterruptionFrame: Completes current utterance due to interruption
- EndFrame: Completes current utterance at pipeline end
- CancelFrame: Completes current utterance due to cancellation
@@ -195,7 +195,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, (StartInterruptionFrame, CancelFrame)):
if isinstance(frame, (InterruptionFrame, CancelFrame)):
# Push frame first otherwise our emitted transcription update frame
# might get cleaned up.
await self.push_frame(frame, direction)

@@ -51,9 +51,11 @@ class WebSocketRunnerArguments(RunnerArguments):
Parameters:
websocket: WebSocket connection for audio streaming
body: Additional request data
"""
websocket: WebSocket
body: Optional[Any] = field(default_factory=dict)
@dataclass

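The new `body` field uses `field(default_factory=dict)`, so each instance gets its own empty dict instead of sharing one mutable default. The sketch below is a simplified illustration: `RunnerArgs` is a hypothetical stand-in that does not inherit from `RunnerArguments` as the real class does.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class RunnerArgs:
    """Hypothetical, simplified stand-in for WebSocketRunnerArguments."""

    websocket: Any
    body: Optional[Any] = field(default_factory=dict)


first = RunnerArgs(websocket=object())
second = RunnerArgs(websocket=object(), body={"user_id": "abc"})

# Mutating one instance's default body does not leak into new instances.
first.body["seen"] = True
third = RunnerArgs(websocket=object())
```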
@@ -99,16 +99,35 @@ async def parse_telephony_websocket(websocket: WebSocket):
tuple: (transport_type: str, call_data: dict)
call_data contains provider-specific fields:
- Twilio: {"stream_id": str, "call_id": str}
- Telnyx: {"stream_id": str, "call_control_id": str, "outbound_encoding": str}
- Plivo: {"stream_id": str, "call_id": str}
- Exotel: {"stream_id": str, "call_id": str, "account_sid": str}
- Twilio: {
"stream_id": str,
"call_id": str,
"body": dict
}
- Telnyx: {
"stream_id": str,
"call_control_id": str,
"outbound_encoding": str,
"from": str,
"to": str,
}
- Plivo: {
"stream_id": str,
"call_id": str,
}
- Exotel: {
"stream_id": str,
"call_id": str,
"account_sid": str,
"from": str,
"to": str,
}
Example usage::
transport_type, call_data = await parse_telephony_websocket(websocket)
if transport_type == "telnyx":
outbound_encoding = call_data["outbound_encoding"]
if transport_type == "twilio":
user_id = call_data["body"]["user_id"]
"""
# Read first two messages
start_data = websocket.iter_text()
@@ -151,9 +170,12 @@ async def parse_telephony_websocket(websocket: WebSocket):
# Extract provider-specific data
if transport_type == "twilio":
start_data = call_data_raw.get("start", {})
body_data = start_data.get("customParameters", {})
call_data = {
"stream_id": start_data.get("streamSid"),
"call_id": start_data.get("callSid"),
# All custom parameters
"body": body_data,
}
elif transport_type == "telnyx":
@@ -163,6 +185,8 @@ async def parse_telephony_websocket(websocket: WebSocket):
"outbound_encoding": call_data_raw.get("start", {})
.get("media_format", {})
.get("encoding"),
"from": call_data_raw.get("start", {}).get("from", ""),
"to": call_data_raw.get("start", {}).get("to", ""),
}
elif transport_type == "plivo":
@@ -178,6 +202,8 @@ async def parse_telephony_websocket(websocket: WebSocket):
"stream_id": start_data.get("stream_sid"),
"call_id": start_data.get("call_sid"),
"account_sid": start_data.get("account_sid"),
"from": start_data.get("from", ""),
"to": start_data.get("to", ""),
}
else:

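The Twilio branch above copies `customParameters` from the start message into `call_data["body"]`. The sketch below applies the same extraction to a hand-written sample message; the helper name `extract_twilio_call_data` and the sample values are made up for illustration.

```python
from typing import Any, Dict


def extract_twilio_call_data(call_data_raw: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror the Twilio branch: pull IDs and custom parameters."""
    start_data = call_data_raw.get("start", {})
    return {
        "stream_id": start_data.get("streamSid"),
        "call_id": start_data.get("callSid"),
        # All custom parameters, or an empty dict when none were sent.
        "body": start_data.get("customParameters", {}),
    }


# Illustrative payload only; the values are placeholders.
sample_start_message = {
    "event": "start",
    "start": {
        "streamSid": "MZ-example",
        "callSid": "CA-example",
        "customParameters": {"user_id": "42"},
    },
}
call_data = extract_twilio_call_data(sample_start_message)
```

Because every lookup goes through `.get()`, a message without a `start` key yields `None` IDs and an empty `body` rather than raising `KeyError`.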
@@ -20,8 +20,8 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -98,7 +98,7 @@ class ExotelFrameSerializer(FrameSerializer):
Returns:
Serialized data as string or bytes, or None if the frame isn't handled.
"""
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):

@@ -22,8 +22,8 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -122,7 +122,7 @@ class PlivoFrameSerializer(FrameSerializer):
self._hangup_attempted = True
await self._hang_up_call()
return None
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
answer = {"event": "clearAudio", "streamId": self._stream_id}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):

@@ -29,8 +29,8 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -137,7 +137,7 @@ class TelnyxFrameSerializer(FrameSerializer):
self._hangup_attempted = True
await self._hang_up_call()
return None
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
answer = {"event": "clear"}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):

@@ -22,8 +22,8 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -122,7 +122,7 @@ class TwilioFrameSerializer(FrameSerializer):
self._hangup_attempted = True
await self._hang_up_call()
return None
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):

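The four serializer hunks above each turn an `InterruptionFrame` into a provider-specific "clear" message. The sketch below gathers the payloads shown in those hunks in one place for comparison. The function `build_clear_message` and its `provider` argument are hypothetical; the real serializers are separate classes.

```python
import json
from typing import Optional


def build_clear_message(provider: str, stream_id: Optional[str] = None) -> str:
    """Return the JSON sent to flush buffered audio on an interruption."""
    if provider in ("twilio", "exotel"):
        payload = {"event": "clear", "streamSid": stream_id}
    elif provider == "plivo":
        payload = {"event": "clearAudio", "streamId": stream_id}
    elif provider == "telnyx":
        payload = {"event": "clear"}
    else:
        raise ValueError(f"Unknown provider: {provider}")
    return json.dumps(payload)
```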
@@ -20,8 +20,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -275,7 +275,7 @@ class AsyncAITTSService(InterruptibleTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
self._started = False
async def _receive_messages(self):

@@ -25,7 +25,10 @@ from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field
from pipecat.adapters.services.bedrock_adapter import AWSBedrockLLMAdapter
from pipecat.adapters.services.bedrock_adapter import (
AWSBedrockLLMAdapter,
AWSBedrockLLMInvocationParams,
)
from pipecat.frames.frames import (
Frame,
FunctionCallCancelFrame,
@@ -812,14 +815,10 @@ class AWSBedrockLLMService(LLMService):
messages = []
system = []
if isinstance(context, LLMContext):
# Future code will be something like this:
# adapter = self.get_llm_adapter()
# params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(context)
# messages = params["messages"]
# system = params["system_instruction"] # [{"text": "system message"}]
raise NotImplementedError(
"Universal LLMContext is not yet supported for AWS Bedrock."
)
adapter: AWSBedrockLLMAdapter = self.get_llm_adapter()
params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(context)
messages = params["messages"]
system = params["system"] # [{"text": "system message"}]
else:
context = AWSBedrockLLMContext.upgrade_to_bedrock(context)
messages = context.messages
@@ -940,8 +939,25 @@ class AWSBedrockLLMService(LLMService):
}
}
def _get_llm_invocation_params(
self, context: OpenAILLMContext | LLMContext
) -> AWSBedrockLLMInvocationParams:
# Universal LLMContext
if isinstance(context, LLMContext):
adapter: AWSBedrockLLMAdapter = self.get_llm_adapter()
params = adapter.get_llm_invocation_params(context)
return params
# AWS Bedrock-specific context
return AWSBedrockLLMInvocationParams(
system=getattr(context, "system", None),
messages=context.messages,
tools=context.tools or [],
tool_choice=context.tool_choice,
)
@traced_llm
async def _process_context(self, context: AWSBedrockLLMContext):
async def _process_context(self, context: AWSBedrockLLMContext | LLMContext):
# Usage tracking
prompt_tokens = 0
completion_tokens = 0
@@ -958,6 +974,12 @@ class AWSBedrockLLMService(LLMService):
await self.start_ttfb_metrics()
params_from_context = self._get_llm_invocation_params(context)
messages = params_from_context["messages"]
system = params_from_context["system"]
tools = params_from_context["tools"]
tool_choice = params_from_context["tool_choice"]
# Set up inference config
inference_config = {
"maxTokens": self._settings["max_tokens"],
@@ -968,19 +990,18 @@ class AWSBedrockLLMService(LLMService):
# Prepare request parameters
request_params = {
"modelId": self.model_name,
"messages": context.messages,
"messages": messages,
"inferenceConfig": inference_config,
"additionalModelRequestFields": self._settings["additional_model_request_fields"],
}
# Add system message
system = getattr(context, "system", None)
if system:
request_params["system"] = system
# Check if messages contain tool use or tool result content blocks
has_tool_content = False
for message in context.messages:
for message in messages:
if isinstance(message.get("content"), list):
for content_item in message["content"]:
if "toolUse" in content_item or "toolResult" in content_item:
@@ -990,7 +1011,6 @@ class AWSBedrockLLMService(LLMService):
break
# Handle tools: use current tools, or no-op if tool content exists but no current tools
tools = context.tools or []
if has_tool_content and not tools:
tools = [self._create_no_op_tool()]
using_noop_tool = True
@@ -999,17 +1019,15 @@ class AWSBedrockLLMService(LLMService):
tool_config = {"tools": tools}
# Only add tool_choice if we have real tools (not just no-op)
if not using_noop_tool and context.tool_choice:
if context.tool_choice == "auto":
if not using_noop_tool and tool_choice:
if tool_choice == "auto":
tool_config["toolChoice"] = {"auto": {}}
elif context.tool_choice == "none":
elif tool_choice == "none":
# Skip adding toolChoice for "none"
pass
elif (
isinstance(context.tool_choice, dict) and "function" in context.tool_choice
):
elif isinstance(tool_choice, dict) and "function" in tool_choice:
tool_config["toolChoice"] = {
"tool": {"name": context.tool_choice["function"]["name"]}
"tool": {"name": tool_choice["function"]["name"]}
}
request_params["toolConfig"] = tool_config
@@ -1019,9 +1037,16 @@ class AWSBedrockLLMService(LLMService):
request_params["performanceConfig"] = {"latency": self._settings["latency"]}
# Log request params with messages redacted for logging
log_params = dict(request_params)
log_params["messages"] = context.get_messages_for_logging()
logger.debug(f"Calling AWS Bedrock model with: {log_params}")
if isinstance(context, LLMContext):
adapter = self.get_llm_adapter()
context_type_for_logging = "universal"
messages_for_logging = adapter.get_messages_for_logging(context)
else:
context_type_for_logging = "LLM-specific"
messages_for_logging = context.get_messages_for_logging()
logger.debug(
f"{self}: Generating chat from {context_type_for_logging} context [{system}] | {messages_for_logging}"
)
async with self._aws_session.client(
service_name="bedrock-runtime", **self._aws_params
@@ -1129,7 +1154,7 @@ class AWSBedrockLLMService(LLMService):
if isinstance(frame, OpenAILLMContextFrame):
context = AWSBedrockLLMContext.upgrade_to_bedrock(frame.context)
if isinstance(frame, LLMContextFrame):
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Bedrock.")
context = frame.context
elif isinstance(frame, LLMMessagesFrame):
context = AWSBedrockLLMContext.from_messages(frame.messages)
elif isinstance(frame, LLMUpdateSettingsFrame):

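The Bedrock hunks above translate an OpenAI-style `tool_choice` into a `toolChoice` entry. The sketch below isolates that mapping as a standalone function. The name `build_tool_config` and the early return for an empty tool list are assumptions made for this example; the surrounding service code is not fully visible in the diff.

```python
from typing import Any, Dict, List, Optional, Union

ToolChoice = Union[str, Dict[str, Any], None]


def build_tool_config(
    tools: List[Dict[str, Any]],
    tool_choice: ToolChoice,
    using_noop_tool: bool = False,
) -> Optional[Dict[str, Any]]:
    """Map an OpenAI-style tool_choice onto a Bedrock-style tool config."""
    if not tools:
        return None  # assumption: no tool config when there are no tools
    tool_config: Dict[str, Any] = {"tools": tools}
    # Only add toolChoice if we have real tools (not just the no-op tool).
    if not using_noop_tool and tool_choice:
        if tool_choice == "auto":
            tool_config["toolChoice"] = {"auto": {}}
        elif tool_choice == "none":
            pass  # "none" is expressed by omitting toolChoice
        elif isinstance(tool_choice, dict) and "function" in tool_choice:
            tool_config["toolChoice"] = {
                "tool": {"name": tool_choice["function"]["name"]}
            }
    return tool_config
```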
@@ -247,13 +247,14 @@ class AWSNovaSonicLLMService(LLMService):
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._assistant_response_trigger_audio: Optional[bytes] = (
None # Not cleared on _disconnect()
)
self._disconnecting = False
self._connected_time: Optional[float] = None
self._wants_connection = False
file_path = files("pipecat.services.aws_nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
self._assistant_response_trigger_audio = wav_file.readframes(wav_file.getnframes())
#
# standard AIService frame handling
#
@@ -1099,20 +1100,13 @@ class AWSNovaSonicLLMService(LLMService):
self._triggering_assistant_response = True
# Read audio bytes, if we don't already have them cached
if not self._assistant_response_trigger_audio:
file_path = files("pipecat.services.aws_nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
self._assistant_response_trigger_audio = wav_file.readframes(wav_file.getnframes())
# Send the trigger audio, if we're fully connected and set up
if self._connected_time is not None:
if self._connected_time:
await self._send_assistant_response_trigger()
async def _send_assistant_response_trigger(self):
if (
not self._assistant_response_trigger_audio or self._connected_time is None
): # should never happen
if not self._connected_time:
# should never happen
return
try:

@@ -21,13 +21,13 @@ from pipecat.frames.frames import (
DataFrame,
Frame,
FunctionCallResultFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
StartInterruptionFrame,
TextFrame,
UserImageRawFrame,
)
@@ -306,7 +306,7 @@ class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
if isinstance(
frame,
(
StartInterruptionFrame,
InterruptionFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TextFrame,

@@ -19,6 +19,7 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
@@ -140,6 +141,7 @@ class AzureSTTService(STTService):
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
@@ -197,3 +199,15 @@ class AzureSTTService(STTService):
self._handle_transcription(event.result.text, True, language), self.get_event_loop()
)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
def _on_handle_recognizing(self, event):
if event.result.reason == ResultReason.RecognizingSpeech and len(event.result.text) > 0:
language = getattr(event.result, "language", None) or self._settings.get("language")
frame = InterimTranscriptionFrame(
event.result.text,
self._user_id,
time_now_iso8601(),
language,
result=event,
)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())

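`_on_handle_recognizing` is connected to an SDK event and hands its frame to the event loop with `asyncio.run_coroutine_threadsafe`, which is the usual way to schedule a coroutine from a thread that is not running the loop. The sketch below shows that hand-off with a plain worker thread. It assumes the callback runs off the loop thread; all names are illustrative, and the `future.result()` call exists only to make the demo deterministic (the diff does not wait on the future).

```python
import asyncio
import threading


async def demo() -> list:
    loop = asyncio.get_running_loop()
    received = []

    async def push_frame(text: str) -> None:
        received.append(text)

    def on_recognizing(text: str) -> None:
        # Runs on a non-asyncio thread, like an SDK callback. Empty interim
        # results are skipped, as in the hunk above.
        if len(text) > 0:
            future = asyncio.run_coroutine_threadsafe(push_frame(text), loop)
            future.result(timeout=1)  # demo only: wait for the loop to run it

    def sdk_thread() -> None:
        for text in ("", "hel", "hello"):
            on_recognizing(text)

    worker = threading.Thread(target=sdk_thread)
    worker.start()
    # Join off the loop thread so the loop stays free to run push_frame().
    await asyncio.to_thread(worker.join)
    return received
```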
@@ -20,8 +20,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -371,7 +371,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
if self._context_id:

@@ -25,9 +25,9 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -460,7 +460,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])
@@ -549,7 +549,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by closing the current context."""
await super()._handle_interruption(frame, direction)
@@ -558,7 +558,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
logger.trace(f"Closing context {self._context_id} due to interruption")
try:
# ElevenLabs requires that Pipecat manages the contexts and closes them
# when they're no longer in use. Since a StartInterruptionFrame is pushed
# when they're no longer in use. Since an InterruptionFrame is pushed
# every time the user speaks, we'll use this as a trigger to close the context
# and reset the state.
# Note: We do not need to call remove_audio_context here, as the context is
@@ -856,7 +856,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (StartInterruptionFrame, TTSStoppedFrame)):
if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
# Reset timing on interruption or stop
self._reset_state()

@@ -21,8 +21,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -259,7 +259,7 @@ class FishAudioTTSService(InterruptibleTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
self._request_id = None

@@ -33,6 +33,7 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
InputImageRawFrame,
InputTextRawFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -41,7 +42,6 @@ from pipecat.frames.frames import (
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
StartInterruptionFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -752,7 +752,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
elif isinstance(frame, InputImageRawFrame):
await self._send_user_video(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption()
await self.push_frame(frame, direction)
elif isinstance(frame, UserStartedSpeakingFrame):

@@ -500,9 +500,11 @@ class GoogleTTSService(TTSService):
Parameters:
language: Language for synthesis. Defaults to English.
speaking_rate: The speaking rate, in the range [0.25, 4.0].
"""
language: Optional[Language] = Language.EN
speaking_rate: Optional[float] = None
def __init__(
self,
@@ -510,6 +512,7 @@ class GoogleTTSService(TTSService):
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
voice_id: str = "en-US-Chirp3-HD-Charon",
voice_cloning_key: Optional[str] = None,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
@@ -520,6 +523,7 @@ class GoogleTTSService(TTSService):
credentials: JSON string containing Google Cloud service account credentials.
credentials_path: Path to Google Cloud service account JSON file.
voice_id: Google TTS voice identifier (e.g., "en-US-Chirp3-HD-Charon").
voice_cloning_key: The voice cloning key for Chirp 3 custom voices.
sample_rate: Audio sample rate in Hz. If None, uses default.
params: Additional configuration parameters (language and speaking rate).
**kwargs: Additional arguments passed to parent TTSService.
@@ -532,8 +536,10 @@ class GoogleTTSService(TTSService):
"language": self.language_to_service_language(params.language)
if params.language
else "en-US",
"speaking_rate": params.speaking_rate,
}
self.set_voice(voice_id)
self._voice_cloning_key = voice_cloning_key
self._client: texttospeech_v1.TextToSpeechAsyncClient = self._create_client(
credentials, credentials_path
)
@@ -600,15 +606,24 @@ class GoogleTTSService(TTSService):
try:
await self.start_ttfb_metrics()
voice = texttospeech_v1.VoiceSelectionParams(
language_code=self._settings["language"], name=self._voice_id
)
if self._voice_cloning_key:
voice_clone_params = texttospeech_v1.VoiceCloneParams(
voice_cloning_key=self._voice_cloning_key
)
voice = texttospeech_v1.VoiceSelectionParams(
language_code=self._settings["language"], voice_clone=voice_clone_params
)
else:
voice = texttospeech_v1.VoiceSelectionParams(
language_code=self._settings["language"], name=self._voice_id
)
streaming_config = texttospeech_v1.StreamingSynthesizeConfig(
voice=voice,
streaming_audio_config=texttospeech_v1.StreamingAudioConfig(
audio_encoding=texttospeech_v1.AudioEncoding.PCM,
sample_rate_hertz=self.sample_rate,
speaking_rate=self._settings["speaking_rate"],
),
)
config_request = texttospeech_v1.StreamingSynthesizeRequest(

@@ -36,12 +36,12 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
FunctionCallResultProperties,
FunctionCallsStartedFrame,
InterruptionFrame,
LLMConfigureOutputFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
StartInterruptionFrame,
UserImageRequestFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
@@ -269,7 +269,7 @@ class LLMService(AIService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions(frame)
elif isinstance(frame, LLMConfigureOutputFrame):
self._skip_tts = frame.skip_tts
@@ -286,7 +286,7 @@ class LLMService(AIService):
await super().push_frame(frame, direction)
async def _handle_interruptions(self, _: StartInterruptionFrame):
async def _handle_interruptions(self, _: InterruptionFrame):
for function_name, entry in self._functions.items():
if entry.cancel_on_interruption:
await self._cancel_function_call(function_name)

@@ -16,8 +16,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -180,7 +180,7 @@ class LmntTTSService(InterruptibleTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
self._started = False
async def _connect(self):

@@ -25,9 +25,9 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
@@ -224,7 +224,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
self._started = False
async def process_frame(self, frame: Frame, direction: FrameDirection):

@@ -64,6 +64,7 @@ class OpenAITTSService(TTSService):
model: str = "gpt-4o-mini-tts",
sample_rate: Optional[int] = None,
instructions: Optional[str] = None,
speed: Optional[float] = None,
**kwargs,
):
"""Initialize OpenAI TTS service.
@@ -75,6 +76,7 @@ class OpenAITTSService(TTSService):
model: TTS model to use. Defaults to "gpt-4o-mini-tts".
sample_rate: Output audio sample rate in Hz. If None, uses OpenAI's default 24kHz.
instructions: Optional instructions to guide voice synthesis behavior.
speed: Voice speed control (0.25 to 4.0, default 1.0).
**kwargs: Additional keyword arguments passed to TTSService.
"""
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
@@ -84,6 +86,7 @@ class OpenAITTSService(TTSService):
)
super().__init__(sample_rate=sample_rate, **kwargs)
self._speed = speed
self.set_model_name(model)
self.set_voice(voice)
self._instructions = instructions
@@ -133,17 +136,22 @@ class OpenAITTSService(TTSService):
try:
await self.start_ttfb_metrics()
# Setup extra body parameters
extra_body = {}
# Setup API parameters
create_params = {
"input": text,
"model": self.model_name,
"voice": VALID_VOICES[self._voice_id],
"response_format": "pcm",
}
if self._instructions:
extra_body["instructions"] = self._instructions
create_params["instructions"] = self._instructions
if self._speed:
create_params["speed"] = self._speed
async with self._client.audio.speech.with_streaming_response.create(
input=text,
model=self.model_name,
voice=VALID_VOICES[self._voice_id],
response_format="pcm",
extra_body=extra_body,
**create_params
) as r:
if r.status_code != 200:
error = await r.text()

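The OpenAI TTS hunk above replaces `extra_body` with a single `create_params` dict that gains `instructions` and `speed` only when they are set. The sketch below shows that parameter-building step on its own. `build_speech_params` is a hypothetical helper, and the voice value in the usage note is a placeholder.

```python
from typing import Any, Dict, Optional


def build_speech_params(
    text: str,
    model: str,
    voice: str,
    instructions: Optional[str] = None,
    speed: Optional[float] = None,
) -> Dict[str, Any]:
    """Build keyword arguments, adding optional keys only when provided."""
    create_params: Dict[str, Any] = {
        "input": text,
        "model": model,
        "voice": voice,
        "response_format": "pcm",
    }
    if instructions:
        create_params["instructions"] = instructions
    if speed:
        create_params["speed"] = speed
    return create_params
```

For example, `build_speech_params("Hi", "gpt-4o-mini-tts", "example-voice", speed=1.5)` includes a `speed` key, while omitting `speed` leaves the key out so the API default applies.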
@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -31,7 +32,6 @@ from pipecat.frames.frames import (
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
StartInterruptionFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -366,7 +366,7 @@ class OpenAIRealtimeLLMService(LLMService):
elif isinstance(frame, InputAudioRawFrame):
if not self._audio_input_paused:
await self._send_user_audio(frame)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption()
elif isinstance(frame, UserStartedSpeakingFrame):
await self._handle_user_started_speaking(frame)
@@ -716,14 +716,12 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_speech_started(self, evt):
await self._truncate_current_audio_response()
await self._start_interruption() # cancels this processor task
await self.push_frame(StartInterruptionFrame()) # cancels downstream tasks
await self.push_interruption_task_frame_and_wait()
await self.push_frame(UserStartedSpeakingFrame())
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self._stop_interruption()
await self.push_frame(UserStoppedSpeakingFrame())
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):

@@ -24,6 +24,7 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -32,7 +33,6 @@ from pipecat.frames.frames import (
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
StartInterruptionFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -364,7 +364,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
elif isinstance(frame, InputAudioRawFrame):
if not self._audio_input_paused:
await self._send_user_audio(frame)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption()
elif isinstance(frame, UserStartedSpeakingFrame):
await self._handle_user_started_speaking(frame)
@@ -658,14 +658,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_speech_started(self, evt):
await self._truncate_current_audio_response()
await self._start_interruption() # cancels this processor task
await self.push_frame(StartInterruptionFrame()) # cancels downstream tasks
await self.push_interruption_task_frame_and_wait()
await self.push_frame(UserStartedSpeakingFrame())
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self._stop_interruption()
await self.push_frame(UserStoppedSpeakingFrame())
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):

@@ -25,8 +25,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -312,7 +312,7 @@ class PlayHTTTSService(InterruptibleTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by stopping metrics and clearing request ID."""
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()

@@ -24,15 +24,14 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import AudioContextWordTTSService, TTSService
from pipecat.transcriptions import language
from pipecat.transcriptions.language import Language
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
@@ -280,7 +279,7 @@ class RimeTTSService(AudioContextWordTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by clearing current context."""
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
@@ -375,7 +374,7 @@ class RimeTTSService(AudioContextWordTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])

@@ -20,9 +20,9 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -455,7 +455,7 @@ class SarvamTTSService(InterruptibleTTSService):
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
self._started = False
async def process_frame(self, frame: Frame, direction: FrameDirection):

@@ -15,8 +15,8 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterruptionFrame,
OutputImageRawFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
UserStartedSpeakingFrame,
@@ -179,7 +179,7 @@ class SimliVideoService(FrameProcessor):
return
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
elif isinstance(frame, (StartInterruptionFrame, UserStartedSpeakingFrame)):
elif isinstance(frame, (InterruptionFrame, UserStartedSpeakingFrame)):
if not self._previously_interrupted:
await self._simli_client.clearBuffer()
self._previously_interrupted = self._is_trinity_avatar

@@ -19,7 +19,6 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotInterruptionFrame,
CancelFrame,
EndFrame,
ErrorFrame,
@@ -749,14 +748,13 @@ class SpeechmaticsSTTService(STTService):
return
# Frames to send
upstream_frames: list[Frame] = []
downstream_frames: list[Frame] = []
# If VAD is enabled, then send a speaking frame
if self._params.enable_vad and not self._is_speaking:
logger.debug("User started speaking")
self._is_speaking = True
upstream_frames += [BotInterruptionFrame()]
await self.push_interruption_task_frame_and_wait()
downstream_frames += [UserStartedSpeakingFrame()]
# If final, then re-parse into TranscriptionFrame
@@ -794,10 +792,6 @@ class SpeechmaticsSTTService(STTService):
self._is_speaking = False
downstream_frames += [UserStoppedSpeakingFrame()]
# Send UPSTREAM frames
for frame in upstream_frames:
await self.push_frame(frame, FrameDirection.UPSTREAM)
# Send the DOWNSTREAM frames
for frame in downstream_frames:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)

@@ -23,12 +23,12 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterruptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
OutputTransportReadyFrame,
SpeechOutputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
)
@@ -222,7 +222,7 @@ class TavusVideoService(AIService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions()
await self.push_frame(frame, direction)
elif isinstance(frame, TTSAudioRawFrame):

@@ -20,10 +20,10 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
StartFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
TTSAudioRawFrame,
@@ -309,7 +309,7 @@ class TTSService(AIService):
and not isinstance(frame, TranscriptionFrame)
):
await self._process_text_frame(frame)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption(frame, direction)
await self.push_frame(frame, direction)
elif isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
@@ -367,14 +367,14 @@ class TTSService(AIService):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame)
isinstance(frame, InterruptionFrame)
or isinstance(frame, TTSStartedFrame)
or isinstance(frame, TTSAudioRawFrame)
or isinstance(frame, TTSStoppedFrame)
):
await self._stop_frame_queue.put(frame)
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
self._processing_text = False
await self._text_aggregator.handle_interruption()
for filter in self._text_filters:
@@ -438,7 +438,7 @@ class TTSService(AIService):
)
if isinstance(frame, TTSStartedFrame):
has_started = True
elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
elif isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
has_started = False
except asyncio.TimeoutError:
if has_started:
@@ -523,7 +523,7 @@ class WordTTSService(TTSService):
elif isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
await self.flush_audio()
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
self._llm_response_started = False
self.reset_word_timestamps()
@@ -613,7 +613,7 @@ class InterruptibleTTSService(WebsocketTTSService):
# user interrupts we need to reconnect.
self._bot_speaking = False
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
if self._bot_speaking:
await self._disconnect()
@@ -685,7 +685,7 @@ class InterruptibleWordTTSService(WebsocketWordTTSService):
# user interrupts we need to reconnect.
self._bot_speaking = False
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
if self._bot_speaking:
await self._disconnect()
@@ -813,7 +813,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
await super().cancel(frame)
await self._stop_audio_context_task()
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self._stop_audio_context_task()
self._create_audio_context_task()

@@ -128,7 +128,7 @@ async def run_test(
expected_up_frames: Optional[Sequence[type]] = None,
ignore_start: bool = True,
observers: Optional[List[BaseObserver]] = None,
start_metadata: Optional[Dict[str, Any]] = None,
pipeline_params: Optional[PipelineParams] = None,
send_end_frame: bool = True,
) -> Tuple[Sequence[Frame], Sequence[Frame]]:
"""Run a test pipeline with the specified processor and validate frame flow.
@@ -144,7 +144,7 @@ async def run_test(
expected_up_frames: Expected frame types flowing upstream (optional).
ignore_start: Whether to ignore StartFrames in frame validation.
observers: Optional list of observers to attach to the pipeline.
start_metadata: Optional metadata to include with the StartFrame.
pipeline_params: Optional pipeline parameters.
send_end_frame: Whether to send an EndFrame at the end of the test.
Returns:
@@ -154,7 +154,7 @@ async def run_test(
AssertionError: If the received frames don't match the expected frame types.
"""
observers = observers or []
start_metadata = start_metadata or {}
pipeline_params = pipeline_params or PipelineParams()
received_up = asyncio.Queue()
received_down = asyncio.Queue()
@@ -173,7 +173,7 @@ async def run_test(
task = PipelineTask(
pipeline,
params=PipelineParams(start_metadata=start_metadata),
params=pipeline_params,
observers=observers,
cancel_on_idle_timeout=False,
)

@@ -22,7 +22,6 @@ from pipecat.audio.turn.base_turn_analyzer import (
)
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -36,7 +35,6 @@ from pipecat.frames.frames import (
MetricsFrame,
SpeechControlParamsFrame,
StartFrame,
StartInterruptionFrame,
StopFrame,
SystemFrame,
UserSpeakingFrame,
@@ -289,8 +287,6 @@ class BaseInputTransport(FrameProcessor):
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, BotInterruptionFrame):
await self._handle_bot_interruption(frame)
elif isinstance(frame, BotStartedSpeakingFrame):
await self._handle_bot_started_speaking(frame)
await self.push_frame(frame, direction)
@@ -335,13 +331,6 @@ class BaseInputTransport(FrameProcessor):
# Handle interruptions
#
async def _handle_bot_interruption(self, frame: BotInterruptionFrame):
"""Handle bot interruption frames."""
logger.debug("Bot interruption")
if self.interruptions_allowed:
await self._start_interruption()
await self.push_frame(StartInterruptionFrame())
async def _handle_user_interruption(self, vad_state: VADState, emulated: bool = False):
"""Handle user interruption events based on speaking state."""
if vad_state == VADState.SPEAKING:
@@ -353,7 +342,7 @@ class BaseInputTransport(FrameProcessor):
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
# Only push StartInterruptionFrame if:
# Only push InterruptionFrame if:
# 1. No interruption config is set, OR
# 2. Interruption config is set but bot is not speaking
should_push_immediate_interruption = (
@@ -362,11 +351,7 @@ class BaseInputTransport(FrameProcessor):
# Make sure we notify about interruptions quickly out-of-band.
if should_push_immediate_interruption and self.interruptions_allowed:
await self._start_interruption()
# Push an out-of-band frame (i.e. not using the ordered push
# frame task) to stop everything, specially at the output
# transport.
await self.push_frame(StartInterruptionFrame())
await self.push_interruption_task_frame_and_wait()
elif self.interruption_strategies and self._bot_speaking:
logger.debug(
"User started speaking while bot is speaking with interruption config - "
@@ -381,9 +366,6 @@ class BaseInputTransport(FrameProcessor):
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
if self.interruptions_allowed:
await self._stop_interruption()
#
# Handle bot speaking state
#

@@ -30,6 +30,7 @@ from pipecat.frames.frames import (
EndFrame,
Frame,
InputTransportMessageUrgentFrame,
InterruptionFrame,
MixerControlFrame,
OutputAudioRawFrame,
OutputDTMFFrame,
@@ -39,7 +40,6 @@ from pipecat.frames.frames import (
SpeechOutputAudioRawFrame,
SpriteFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
@@ -287,9 +287,8 @@ class BaseOutputTransport(FrameProcessor):
await super().process_frame(frame, direction)
#
# System frames (like StartInterruptionFrame) are pushed
# immediately. Other frames require order so they are put in the sink
# queue.
# System frames (like InterruptionFrame) are pushed immediately. Other
# frames require order so they are put in the sink queue.
#
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
@@ -299,7 +298,7 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, StartInterruptionFrame):
elif isinstance(frame, InterruptionFrame):
await self.push_frame(frame, direction)
await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame) and not isinstance(
@@ -340,7 +339,7 @@ class BaseOutputTransport(FrameProcessor):
sender = self._media_senders[frame.transport_destination]
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await sender.handle_interruptions(frame)
elif isinstance(frame, OutputAudioRawFrame):
await sender.handle_audio_frame(frame)
@@ -491,7 +490,7 @@ class BaseOutputTransport(FrameProcessor):
await self._cancel_clock_task()
await self._cancel_video_task()
async def handle_interruptions(self, _: StartInterruptionFrame):
async def handle_interruptions(self, _: InterruptionFrame):
"""Handle interruption events by restarting tasks and clearing buffers.
Args:
@@ -672,7 +671,7 @@ class BaseOutputTransport(FrameProcessor):
frame = self._audio_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio)
last_frame_time = time.time()
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.

@@ -25,9 +25,9 @@ from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -618,7 +618,7 @@ class TavusOutputTransport(BaseOutputTransport):
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions()
async def _handle_interruptions(self):

@@ -26,9 +26,9 @@ from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -398,7 +398,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._write_frame(frame)
self._next_send_time = 0

@@ -25,9 +25,9 @@ from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -334,7 +334,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
if isinstance(frame, InterruptionFrame):
await self._write_frame(frame)
self._next_send_time = 0

@@ -8,25 +8,31 @@ import json
import unittest
from typing import Any
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
EmulateUserStartedSpeakingFrame,
EmulateUserStoppedSpeakingFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
FunctionCallResultProperties,
InterimTranscriptionFrame,
InterruptionFrame,
InterruptionTaskFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
OpenAILLMContextAssistantTimestampFrame,
SpeechControlParamsFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
@@ -36,6 +42,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.anthropic.llm import (
AnthropicAssistantContextAggregator,
AnthropicLLMContext,
@@ -481,6 +488,103 @@ class BaseTestUserContextAggregator:
)
self.check_message_content(context, 0, "How are you?")
async def test_min_words_interruption_strategy_one_word(self):
assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass"
assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass"
class ContextProcessor(FrameProcessor):
def __init__(self):
super().__init__()
self.context_received = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
self.context_received = True
await self.push_frame(frame, direction)
context = self.CONTEXT_CLASS()
aggregator = self.AGGREGATOR_CLASS(context)
context_processor = ContextProcessor()
pipeline = Pipeline([aggregator, context_processor])
frames_to_send = [
BotStartedSpeakingFrame(),
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Can", user_id="cat", timestamp=""),
SleepFrame(),
UserStoppedSpeakingFrame(),
]
expected_down_frames = [
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
pipeline_params=PipelineParams(
interruption_strategies=[MinWordsInterruptionStrategy(min_words=2)]
),
)
assert not context_processor.context_received
async def test_min_words_interruption_strategy_two_words(self):
assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass"
assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass"
class ContextProcessor(FrameProcessor):
def __init__(self):
super().__init__()
self.context_received = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
self.context_received = True
elif isinstance(frame, InterruptionFrame):
self.context_received = False
await self.push_frame(frame, direction)
context = self.CONTEXT_CLASS()
aggregator = self.AGGREGATOR_CLASS(context)
context_processor = ContextProcessor()
pipeline = Pipeline([aggregator, context_processor])
frames_to_send = [
BotStartedSpeakingFrame(),
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Can you", user_id="cat", timestamp=""),
SleepFrame(),
UserStoppedSpeakingFrame(),
]
expected_up_frames = [InterruptionTaskFrame]
expected_down_frames = [
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
InterruptionFrame,
UserStoppedSpeakingFrame,
*self.EXPECTED_CONTEXT_FRAMES,
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_up_frames=expected_up_frames,
expected_down_frames=expected_down_frames,
pipeline_params=PipelineParams(
interruption_strategies=[MinWordsInterruptionStrategy(min_words=2)]
),
)
self.check_message_content(context, 0, "Can you")
# If the context is not received or it has been cleared by the
# interruption then we have an issue.
assert context_processor.context_received
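
The two tests above differ only in the transcription text: with `min_words=2`, one word ("Can") must not trigger an interruption, while two words ("Can you") must. A minimal, self-contained sketch of that threshold check follows. `MinWordsSketch` and its methods are hypothetical stand-ins for illustration, not pipecat's `MinWordsInterruptionStrategy` implementation, and counting whitespace-separated tokens is an assumption.

```python
class MinWordsSketch:
    """Hypothetical stand-in: allow an interruption once enough words arrive."""

    def __init__(self, min_words: int):
        self._min_words = min_words
        self._text = ""

    def append_text(self, text: str) -> None:
        # Accumulate the transcription text for the current user turn.
        self._text = f"{self._text} {text}".strip()

    def should_interrupt(self) -> bool:
        # Assumption: a "word" is any whitespace-separated token.
        return len(self._text.split()) >= self._min_words

    def reset(self) -> None:
        # Clear the accumulated text at the end of the turn.
        self._text = ""
```

Under these assumptions, feeding "Can" leaves `should_interrupt()` false and feeding "Can you" makes it true, which matches the frame sequences the two tests expect.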
class BaseTestAssistantContextAggreagator:
CONTEXT_CLASS = None # To be set in subclasses
@@ -618,7 +722,7 @@ class BaseTestAssistantContextAggreagator:
TextFrame(text="Pipecat."),
LLMFullResponseEndFrame(),
SleepFrame(AGGREGATION_SLEEP),
StartInterruptionFrame(),
InterruptionFrame(),
LLMFullResponseStartFrame(),
TextFrame(text="How are "),
TextFrame(text="you?"),
@@ -626,7 +730,7 @@ class BaseTestAssistantContextAggreagator:
]
expected_down_frames = [
*self.EXPECTED_CONTEXT_FRAMES,
StartInterruptionFrame,
InterruptionFrame,
*self.EXPECTED_CONTEXT_FRAMES,
]
await run_test(

@@ -10,6 +10,7 @@ from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.frames.frames import (
EndFrame,
InputDTMFFrame,
InterruptionFrame,
TranscriptionFrame,
)
from pipecat.processors.aggregators.dtmf_aggregator import DTMFAggregator
@@ -28,6 +29,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
InputDTMFFrame,
InputDTMFFrame,
@@ -59,9 +61,11 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
TranscriptionFrame, # First aggregation "12"
InputDTMFFrame,
InterruptionFrame,
TranscriptionFrame, # Second aggregation "3"
]
@@ -93,10 +97,12 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
InputDTMFFrame,
TranscriptionFrame, # "12#"
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
TranscriptionFrame, # "45"
]
@@ -125,6 +131,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
TranscriptionFrame, # Should flush before EndFrame
EndFrame,
@@ -152,6 +159,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
TranscriptionFrame,
]
@@ -178,6 +186,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InputDTMFFrame,
InputDTMFFrame,
TranscriptionFrame,
@@ -214,7 +223,11 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
# All the InputDTMFFrames plus one TranscriptionFrame
expected_down_frames = [InputDTMFFrame] * len(frames_to_send) + [TranscriptionFrame]
expected_down_frames = (
[InputDTMFFrame, InterruptionFrame]
+ [InputDTMFFrame] * (len(frames_to_send) - 1)
+ [TranscriptionFrame]
)
received_down_frames, _ = await run_test(
aggregator,
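
The updated expectation above places a single `InterruptionFrame` right after the first `InputDTMFFrame`, followed by the remaining DTMF frames and one `TranscriptionFrame`. The sketch below rebuilds that list with empty placeholder classes (they are not pipecat's frame types) and a hypothetical helper, `expected_types`, assuming every frame sent is an `InputDTMFFrame`.

```python
class InputDTMFFrame:
    """Placeholder for the real frame type."""


class InterruptionFrame:
    """Placeholder for the real frame type."""


class TranscriptionFrame:
    """Placeholder for the real frame type."""


def expected_types(num_sent: int) -> list:
    # First DTMF frame, then the interruption it triggers, then the
    # remaining DTMF frames, then the single aggregated transcription.
    return (
        [InputDTMFFrame, InterruptionFrame]
        + [InputDTMFFrame] * (num_sent - 1)
        + [TranscriptionFrame]
    )
```

The result always has `num_sent + 2` entries: every DTMF frame that was sent, plus one interruption and one transcription.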

@@ -7,10 +7,10 @@
import unittest
from pipecat.frames.frames import (
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
)
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.tests.utils import SleepFrame, run_test
@@ -113,7 +113,7 @@ class TestLLMFullResponseAggregator(unittest.IsolatedAsyncioTestCase):
LLMFullResponseStartFrame(),
LLMTextFrame("Hello "),
SleepFrame(),
StartInterruptionFrame(),
InterruptionFrame(),
LLMFullResponseStartFrame(),
LLMTextFrame("Hello "),
LLMTextFrame("there!"),
@@ -122,7 +122,7 @@ class TestLLMFullResponseAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
InterruptionFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
LLMTextFrame,

@@ -65,7 +65,7 @@ class TestPipeline(unittest.IsolatedAsyncioTestCase):
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
ignore_start=False,
start_metadata={"foo": "bar"},
pipeline_params=PipelineParams(start_metadata={"foo": "bar"}),
)
assert "foo" in received_down[-1].metadata
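
The call above reflects the `run_test` signature change in this diff: `start_metadata` is no longer a separate argument and now travels inside `pipeline_params`. A minimal sketch of the defaulting pattern is shown below. `PipelineParamsSketch` and `resolve_params` are hypothetical stand-ins and do not reproduce pipecat's `PipelineParams`; only the two fields used in this diff are modeled.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class PipelineParamsSketch:
    """Hypothetical stand-in with only the fields this diff touches."""

    start_metadata: Dict[str, Any] = field(default_factory=dict)
    interruption_strategies: List[Any] = field(default_factory=list)


def resolve_params(
    pipeline_params: Optional[PipelineParamsSketch] = None,
) -> PipelineParamsSketch:
    # Mirrors `pipeline_params = pipeline_params or PipelineParams()`:
    # fall back to default parameters when the caller passes none.
    return pipeline_params or PipelineParamsSketch()
```

Passing one parameters object lets a test set `start_metadata`, `interruption_strategies`, or both without adding another keyword argument to the helper.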

@@ -14,7 +14,7 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
StartInterruptionFrame,
InterruptionFrame,
TranscriptionFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
@@ -238,7 +238,7 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
TTSTextFrame(text="Hello"),
TTSTextFrame(text="world!"),
SleepFrame(),
StartInterruptionFrame(), # User interrupts here
InterruptionFrame(), # User interrupts here
SleepFrame(),
BotStartedSpeakingFrame(),
TTSTextFrame(text="New"),
@@ -252,7 +252,7 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
BotStartedSpeakingFrame,
TTSTextFrame, # "Hello"
TTSTextFrame, # "world!"
StartInterruptionFrame,
InterruptionFrame,
TranscriptionUpdateFrame, # First message (emitted due to interruption)
BotStartedSpeakingFrame,
TTSTextFrame, # "New"

uv.lock (generated)

@@ -4307,6 +4307,12 @@ local-smart-turn = [
{ name = "torchaudio" },
{ name = "transformers" },
]
local-smart-turn-v3 = [
{ name = "onnxruntime" },
{ name = "torch" },
{ name = "torchaudio" },
{ name = "transformers" },
]
mcp = [
{ name = "mcp", extra = ["cli"] },
]
@@ -4460,7 +4466,8 @@ requires-dist = [
{ name = "numba", specifier = "==0.61.2" },
{ name = "numpy", specifier = ">=1.26.4,<3" },
{ name = "nvidia-riva-client", marker = "extra == 'riva'", specifier = "~=2.21.1" },
{ name = "onnxruntime", marker = "extra == 'silero'", specifier = "~=1.20.1" },
{ name = "onnxruntime", marker = "extra == 'local-smart-turn-v3'", specifier = ">=1.20.1,<2" },
{ name = "onnxruntime", marker = "extra == 'silero'", specifier = ">=1.20.1,<2" },
{ name = "openai", specifier = ">=1.74.0,<=1.99.1" },
{ name = "opencv-python", marker = "extra == 'webrtc'", specifier = "~=4.11.0.86" },
{ name = "openpipe", marker = "extra == 'openpipe'", specifier = "~=4.50.0" },
@@ -4488,8 +4495,11 @@ requires-dist = [
{ name = "tenacity", marker = "extra == 'livekit'", specifier = ">=8.2.3,<10.0.0" },
{ name = "timm", marker = "extra == 'moondream'", specifier = "~=1.0.13" },
{ name = "torch", marker = "extra == 'local-smart-turn'", specifier = ">=2.5.0,<3" },
{ name = "torch", marker = "extra == 'local-smart-turn-v3'", specifier = ">=2.5.0,<3" },
{ name = "torchaudio", marker = "extra == 'local-smart-turn'", specifier = ">=2.5.0,<3" },
{ name = "torchaudio", marker = "extra == 'local-smart-turn-v3'", specifier = ">=2.5.0,<3" },
{ name = "transformers", marker = "extra == 'local-smart-turn'" },
{ name = "transformers", marker = "extra == 'local-smart-turn-v3'" },
{ name = "transformers", marker = "extra == 'moondream'", specifier = ">=4.48.0" },
{ name = "transformers", marker = "extra == 'ultravox'", specifier = ">=4.48.0" },
{ name = "uvicorn", marker = "extra == 'runner'", specifier = ">=0.32.0,<1.0.0" },
@@ -4513,7 +4523,7 @@ requires-dist = [
{ name = "websockets", marker = "extra == 'soniox'", specifier = ">=13.1,<15.0" },
{ name = "websockets", marker = "extra == 'websocket'", specifier = ">=13.1,<15.0" },
]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "deepseek", "daily", "deepgram", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "inworld", "krisp", "koala", "langchain", "livekit", "lmnt", "local", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nim", "neuphonic", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "rime", "riva", "runner", "sambanova", "sarvam", "sentry", "local-smart-turn", "remote-smart-turn", "silero", "simli", "soniox", "soundfile", "speechmatics", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "whisper"]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "deepseek", "daily", "deepgram", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "inworld", "krisp", "koala", "langchain", "livekit", "lmnt", "local", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nim", "neuphonic", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "rime", "riva", "runner", "sambanova", "sarvam", "sentry", "local-smart-turn", "local-smart-turn-v3", "remote-smart-turn", "silero", "simli", "soniox", "soundfile", "speechmatics", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "whisper"]
[package.metadata.requires-dev]
dev = [