Compare commits

...

62 Commits

Author SHA1 Message Date
Chad Bailey
33d813ed8f added JSON serializtion and additional testing utilities 2025-10-27 15:57:24 +00:00
kompfner
20f4b0e8ff Merge pull request #2914 from pipecat-ai/pk/gemini-function-calling-fixes
Gemini function calling fixes
2025-10-27 09:45:29 -04:00
Paul Kompfner
6feaf91789 Fix a bug in GeminiLLMAdapter's handling of Gemini-specific context messages 2025-10-27 09:42:24 -04:00
Mark Backman
91d3ae07b3 Merge pull request #2915 from Rickaym/fix--rounding-the-edges-of-observer-function-method-deprecation
fix: use correct  property names
2025-10-24 19:42:34 -04:00
Pyae Sone Myo
71841f71ef fix: use correct property names 2025-10-25 00:47:46 +06:30
Paul Kompfner
949b807023 Close genai client more gracefully to avoid printed warnings. We're now following the genai library guidance: https://github.com/googleapis/python-genai?tab=readme-ov-file#close-a-client 2025-10-24 11:36:25 -04:00
Paul Kompfner
4ad15f9a01 Update Gemini service to include function name when sending function responses in context 2025-10-24 11:04:52 -04:00
Paul Kompfner
99d94fc625 Update Gemini service to use "user" role for function responses, as shown in the Gemini docs 2025-10-24 10:05:14 -04:00
Mark Backman
a3d630c0d1 Merge pull request #2908 from pipecat-ai/mb/runner-daily-start-route
fix: add support for DAILY_SAMPLE_ROOM_URL when calling /start for Da…
2025-10-23 14:15:42 -04:00
Mark Backman
04b482c445 Merge branch 'main' into mb/runner-daily-start-route 2025-10-23 14:11:38 -04:00
Mark Backman
b2bce4916f Merge pull request #2900 from pipecat-ai/mb/quickstart-pipecat-cli
Quickstart to use Pipecat CLI
2025-10-23 10:55:42 -04:00
Mark Backman
60e9817f16 fix: add support for DAILY_SAMPLE_ROOM_URL when calling /start for DailyTransport 2025-10-22 16:48:30 -04:00
kompfner
c655d0d313 Merge pull request #2907 from pipecat-ai/mb/service-switcher-updates
ServiceSwitcher updates
2025-10-22 11:23:48 -04:00
Paul Kompfner
ea6e146f2d Update TestServiceSwitcher to exercise targeting system frames only to the active service 2025-10-22 11:14:27 -04:00
Mark Backman
ec890a834f Rename to filter_system_frames 2025-10-22 11:01:33 -04:00
Mark Backman
5b921fc054 fix: FunctionFilter adds block_system_frame arg 2025-10-22 10:53:01 -04:00
Mark Backman
f1040100f4 Update ServiceSwitcher and LLMSwitcher docstrings 2025-10-22 10:51:03 -04:00
Mark Backman
54691ee781 Merge pull request #2904 from pipecat-ai/mb/bump-aws-sdk-bedrock-runtime
Upgrade aws_sdk_bedrock_runtime to v0.1.1
2025-10-22 08:58:48 -04:00
Mark Backman
49239a23c6 Upgrade aws_sdk_bedrock_runtime to v0.1.1 2025-10-21 23:27:38 -04:00
Aleix Conchillo Flaqué
e0c43de13f Merge pull request #2903 from pipecat-ai/aleix/pipecat-0.0.91
update CHANGELOG for 0.0.91
2025-10-21 19:09:23 -07:00
Aleix Conchillo Flaqué
cc4c96d099 update CHANGELOG for 0.0.91 2025-10-21 19:00:51 -07:00
Aleix Conchillo Flaqué
788465cb04 Merge pull request #2901 from pipecat-ai/pk/llmcontext-messages
Add `messages` property to `LLMContext` for usage parity with `OpenAI…
2025-10-21 18:00:33 -07:00
Aleix Conchillo Flaqué
db934eade0 Merge pull request #2891 from pipecat-ai/aleix/daily-pipecat-runner-args
runner: allow starting a bot from Daily's /start endpoint
2025-10-21 17:59:13 -07:00
Mark Backman
0b8c966a11 Merge pull request #2892 from pipecat-ai/mb/aws-llm-claude-fix
fix: AWSBedrockLLMService compatibility for newer Claude models
2025-10-21 20:50:20 -04:00
Mark Backman
5849485bc6 fix: AWSBedrockLLMService compatibility for newer Claude models 2025-10-21 19:47:27 -04:00
Aleix Conchillo Flaqué
459af58540 runner: allow starting a bot from Daily's /start endpoint 2025-10-21 16:28:11 -07:00
Aleix Conchillo Flaqué
576bd67e85 runner: add body field to RunnerArguments 2025-10-21 16:27:48 -07:00
Aleix Conchillo Flaqué
1e8629bf96 runner: allow passing an api_key to configure 2025-10-21 16:27:48 -07:00
Paul Kompfner
776a3526f9 Add messages property to LLMContext for usage parity with OpenAILLMContext.
This wasn't really an issue before, when folks were *knowingly* migrating from `OpenAILLMContext` to `LLMContext`. But in the latest AWS Nova Sonic change, we're swapping it out from under folks, so this kind of compatibility is more important.

For context, the reason we *didn't* offer the `messages` property earlier was to aid in the development of `LLMContext`—we wanted to draw attention to all the places where messages were being read from context, so we could find the places where we might need to pass an argument to the read.
2025-10-21 17:38:39 -04:00
kompfner
2ced044418 Merge pull request #2896 from pipecat-ai/pk/add-back-types-that-were-meant-to-be-deprecated-not-removed
Add back types that were removed, when they should only have been dep…
2025-10-21 17:33:17 -04:00
Mark Backman
151f187837 Merge pull request #2895 from pipecat-ai/mb/update-env-example
Organize the env.example file
2025-10-21 17:15:22 -04:00
Mark Backman
67afa718d0 Merge pull request #2898 from pipecat-ai/mb/ellipses-changelog
Changelog entry for PR #2877
2025-10-21 17:02:08 -04:00
Mark Backman
52ab0eccc0 Quickstart to use Pipecat CLI 2025-10-21 15:57:45 -04:00
Vanessa Pyne
d1f1b68b71 Merge pull request #2863 from pipecat-ai/vp-custom-frame-processor-ex
add 08-custom-frame-processor.py to foundational examples
2025-10-21 14:15:38 -05:00
Mark Backman
a479c32665 Merge pull request #2894 from pipecat-ai/mb/cli-readme
Add Pipecat CLI to README's ecosystem section
2025-10-21 13:20:12 -04:00
Mark Backman
9f66b0ba41 Add Pipecat CLI to README's ecosystem section 2025-10-21 13:17:37 -04:00
vipyne
23385ca3d2 replace foundational example 08-bots-arguing.py with 08-custom-frame-processor.py 2025-10-21 11:56:35 -05:00
vipyne
8b24bae9c5 pr notes 2025-10-21 11:42:06 -05:00
Mark Backman
0502ec6c44 Changelog entry for PR #2877 2025-10-21 11:58:27 -04:00
Mark Backman
81645910e0 Merge pull request #2877 from nimobeeren/patch-1
Add ellipsis character to sentence ending punctuation list
2025-10-21 11:53:17 -04:00
Filipi da Silva Fuchter
d6ab4c41b0 Merge pull request #2897 from pipecat-ai/filipi/fix_proxy_route
Fixed an issue in the runner's proxy_request
2025-10-21 12:28:04 -03:00
Filipi Fuchter
2f92cb8781 Fixed an issue in the runner's proxy_request where a session that exists but has empty data was being treated as invalid. 2025-10-21 11:41:52 -03:00
Paul Kompfner
fbf274374c Add back types that were removed, when they should only have been deprecated 2025-10-21 09:56:31 -04:00
Mark Backman
427efecf5b Organize the env.example file 2025-10-21 09:43:46 -04:00
Filipi da Silva Fuchter
b3e54546ac Merge pull request #2888 from pipecat-ai/filipi/rtvi_duplicated_frames
Fixed an issue where the RTVIProcessor was sending duplicate UserStartedSpeakingFrame and UserStoppedSpeakingFrame messages.
2025-10-21 08:57:32 -03:00
Filipi Fuchter
de46631bac Fixed an issue where the RTVIProcessor was sending duplicate UserStartedSpeakingFrame and UserStoppedSpeakingFrame messages. 2025-10-20 18:39:00 -03:00
vipyne
abf0150261 add 47-custom-frame-processor.py to foundational examples 2025-10-20 12:11:40 -05:00
Aleix Conchillo Flaqué
a0c93ab6de update CHANGELOG cosmetics 2025-10-20 09:07:50 -07:00
Aleix Conchillo Flaqué
4bec566bbf Merge pull request #2885 from pipecat-ai/aleix/daily-python-0.20.0
pyproject: update daily-python to 0.20.0
2025-10-20 08:04:52 -07:00
Aleix Conchillo Flaqué
ec3cd24182 pyproject: update daily-python to 0.20.0 2025-10-20 08:04:34 -07:00
kompfner
e36e64c2e8 Merge pull request #2750 from pipecat-ai/pk/aws-nova-sonic-universal-llmcontext-1
Support new `LLMContext` pattern with `AWSNovaSonicLLMService`
2025-10-20 10:12:53 -04:00
Paul Kompfner
02a88022dd Add a bit more detail to CHANGELOG related to AWSNovaSonicLLMService's support for LLMContext 2025-10-20 10:06:09 -04:00
Paul Kompfner
6cae61f2cc Add a bit more detail to CHANGELOG entry about AWSNovaSonicLLMService's support for LLMContext 2025-10-20 09:50:23 -04:00
Paul Kompfner
3b40079120 Add a detailed warning when trying to import things from pipecat.services.aws_nova_sonic.context or pipecat.services.aws.nova_sonic.context 2025-10-20 09:49:05 -04:00
Paul Kompfner
ff0b38859b Remove AWS Nova Sonic's context.py, which was always concerned with types for internal use only. Now those types are either gone or moved elsewhere. 2025-10-20 09:49:05 -04:00
Paul Kompfner
4d499324d1 Re-apply a change to AWSNovaSonicLLMService that was lost in a rebase 2025-10-20 09:49:05 -04:00
Paul Kompfner
f13e006db2 Bump version in deprecation message in docstring 2025-10-20 09:49:05 -04:00
Paul Kompfner
87d9e8c9cd Re-apply a couple of recent changes to AWSNovaSonicLLMService that were lost in a rebase 2025-10-20 09:49:05 -04:00
Paul Kompfner
4820f1c059 Address some AWSNovaSonicLLMService context-recording edge cases 2025-10-20 09:49:05 -04:00
Paul Kompfner
860c39d1b1 Get rid of LLMContext.get_messages_for_persistent_storage().
The reason for its `system_instruction` argument was to support usage with LLMs where you might pass the system instruction as a parameter to the `LLMService` rather than specifying it in the context.

But as I thought about it more I became unconvinced that the `system_instruction` argument was really beneficial:

- If you specified your system instruction in your context in the first place, it'll still be there when you read messages for persistent storage
- If you didn't specify your system instruction in the context and instead passed it in as an `LLMService` parameter, you most likely *don't* want it to be in the context when you read messages for persistent storage
- ...and if you really really do need to inject it at the start of the context, it's quite easy to do anyway

And if we remove the `system_instruction` argument from `get_messages_for_persistent_storage()`, then it's essentially just `get_messages()`.
2025-10-20 09:49:05 -04:00
Paul Kompfner
ae5c5ed7f6 Update AWSNovaSonicLLMService to work with LLMContext and LLMContextAggregatorPair 2025-10-20 09:49:00 -04:00
Nimo Beeren
d1d74c571c add ellipsis character to sentence ending punctuation list 2025-10-17 10:38:06 +02:00
40 changed files with 1686 additions and 535 deletions

View File

@@ -7,14 +7,82 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- `FunctionFilter` now has a `filter_system_frames` arg, which controls whether
or not SystemFrames are filtered.
- Upgraded `aws_sdk_bedrock_runtime` to v0.1.1 to resolve potential CPU issues
when running `AWSNovaSonicLLMService`.
### Fixed
- Fixed an issue in the runner where starting a DailyTransport room via
`/start` didn't support using the `DAILY_SAMPLE_ROOM_URL` env var.
- Fixed an issue in `ServiceSwitcher` where the `STTService`s would result in
all STT services producing `TranscriptionFrame`s.
## [0.0.91] - 2025-10-21
### Added
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
- It is now possible to start a bot from the `/start` endpoint when using the
runner Daily's transport. This follows the Pipecat Cloud format with
`createDailyRoom` and `body` fields in the POST request body.
- Added an ellipsis character (`…`) to the end of sentence detection in the
string utils.
- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
As a reminder, the context-setup pattern when using `LLMContext` is:
```python
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(Note that even though `AWSNovaSonicLLMService` now supports the universal
`LLMContext`, it is not meant to be swapped out for another LLM service at
runtime.)
Worth noting: whether or not you use the new context-setup pattern with
`AWSNovaSonicLLMService`, some types have changed under the hood:
```python
## BEFORE:
# Context aggregator type
context_aggregator: AWSNovaSonicContextAggregatorPair
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: AWSNovaSonicLLMContext
# or
context: OpenAILLMContext
## AFTER:
# Context aggregator type
context_aggregator: LLMContextAggregatorPair
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
```
- Added support for `bulbul:v3` model in `SarvamTTSService` and
`SarvamHttpTTSService`.
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
-
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the
multilingual model.
- Added support for trickle ICE to the `SmallWebRTCTransport`.
- Added support for updating `OpenAITTSService` settings (`instructions` and
@@ -36,19 +104,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `RunnerArguments` now include the `body` field, so there's no need to add it
to subclasses. Also, all `RunnerArguments` fields are now keyword-only.
- `CartesiaSTTService` now inherits from `WebsocketSTTService`.
- Package upgrades:
- `daily-python` upgraded to 0.20.0.
- `openai` upgraded to support up to 2.x.x.
- `openpipe` upgraded to support up to 5.x.x.
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
### Deprecated
- The `send_transcription_frames` argument to `AWSNovaSonicLLMService` is
deprecated. Transcription frames are now always sent. They go upstream, to be
handled by the user context aggregator. See "Added" section for details.
- Types in `pipecat.services.aws.nova_sonic.context` have been deprecated due
to changes to support `LLMContext`. See "Changed" section for details.
### Fixed
- Fixed an issue where the `RTVIProcessor` was sending duplicate
`UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` messages.
- Fixed an issue in `AWSBedrockLLMService` where both `temperature` and `top_p`
were always sent together, causing conflicts with models like Claude Sonnet 4.5
that don't allow both parameters simultaneously. The service now only includes
inference parameters that are explicitly set, and `InputParams` defaults have
been changed to `None` to rely on AWS Bedrock's built-in model defaults.
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
to a mismatch in the _handle_transcription method's signature.
to a mismatch in the `_handle_transcription` method's signature.
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
now handled properly in `PipelineTask` making it possible to cancel an asyncio

View File

@@ -44,6 +44,10 @@ Looking to build structured conversations? Check out [Pipecat Flows](https://git
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
### 🛠️ Create and deploy projects
Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.
### 🔍 Debugging
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.

View File

@@ -4,6 +4,9 @@ AICOUSTICS_LICENSE_KEY=...
# Anthropic
ANTHROPIC_API_KEY=...
# Assembly AI
ASSEMBLYAI_API_KEY=...
# Async
ASYNCAI_API_KEY=...
ASYNCAI_VOICE_ID=...
@@ -21,12 +24,19 @@ AZURE_CHATGPT_API_KEY=...
AZURE_CHATGPT_ENDPOINT=https://...
AZURE_CHATGPT_MODEL=...
AZURE_REALTIME_API_KEY=...
AZURE_REALTIME_BASE_URL=...
AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
CARTESIA_VOICE_ID=...
# Cerebras
CEREBRAS_API_KEY=...
# Daily
DAILY_API_KEY=...
@@ -35,57 +45,48 @@ DAILY_SAMPLE_ROOM_URL=https://...
# Deepgram
DEEPGRAM_API_KEY=...
# DeepSeek
DEEPSEEK_API_KEY=...
# ElevenLabs
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...
# Neuphonic
NEUPHONIC_API_KEY=...
# Fal
FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Fish Audio
FISH_API_KEY=...
# Gladia
GLADIA_API_KEY=...
GLADIA_REGION=...
# Google
GOOGLE_API_KEY=...
GOOGLE_CLOUD_PROJECT_ID=...
GOOGLE_TEST_CREDENTIALS=...
GOOGLE_VERTEX_TEST_CREDENTIALS=...
GOOGLE_CLOUD_PROJECT_ID=...
GOOGLE_CLOUD_LOCATION=...
GOOGLE_TEST_CREDENTIALS=...
# Grok
GROK_API_KEY=...
# Groq
GROQ_API_KEY=...
# Heygen
HEYGEN_API_KEY=...
# Hume
HUME_API_KEY=...
HUME_VOICE_ID=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
# Perplexity
PERPLEXITY_API_KEY=...
# PlayHT
PLAYHT_USER_ID=...
PLAYHT_API_KEY=...
# OpenAI
OPENAI_API_KEY=...
# OpenPipe
OPENPIPE_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
TAVUS_PERSONA_ID=...
# Simli
SIMLI_API_KEY=...
SIMLI_FACE_ID=...
# Inworld
INWORLD_API_KEY=...
# Krisp
KRISP_MODEL_PATH=...
@@ -93,77 +94,100 @@ KRISP_MODEL_PATH=...
# Krisp Viva
KRISP_VIVA_MODEL_PATH=...
# DeepSeek
DEEPSEEK_API_KEY=...
# LiveKit
LIVEKIT_API_KEY=...
LIVEKIT_API_SECRET=...
# Groq
GROQ_API_KEY=...
# Grok
GROK_API_KEY=...
# Inworld
INWORLD_API_KEY=...
# Together.ai
TOGETHER_API_KEY=...
# Cerebras
CEREBRAS_API_KEY=...
# Fish Audio
FISH_API_KEY=...
# Assembly AI
ASSEMBLYAI_API_KEY=...
# OpenRouter
OPENROUTER_API_KEY=...
# Piper
PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=...
FAL_SMART_TURN_API_KEY=...
# Twilio
TWILIO_ACCOUNT_SID=...
TWILIO_AUTH_TOKEN=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
# MiniMax
MINIMAX_API_KEY=...
MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
# Soniox
SONIOX_API_KEY=
# Speechmatics
SPEECHMATICS_API_KEY=...
# SambaNova
SAMBANOVA_API_KEY=...
# Sentry
SENTRY_DSN=...
# Heygen
HEYGEN_API_KEY=...
# Mistral
MISTRAL_API_KEY=...
# Neuphonic
NEUPHONIC_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# OpenAI
OPENAI_API_KEY=...
# OpenPipe
OPENPIPE_API_KEY=...
# OpenRouter
OPENROUTER_API_KEY=...
# Perplexity
PERPLEXITY_API_KEY=...
# Picovoice Koala
KOALA_ACCESS_KEY=...
# Piper
PIPER_BASE_URL=...
# PlayHT
PLAYHT_USER_ID=...
PLAYHT_API_KEY=...
# Plivo
PLIVO_AUTH_ID=...
PLIVO_AUTH_TOKEN=...
# Qwen
QWEN_API_KEY=...
# Rime
RIME_API_KEY=...
RIME_VOICE_ID=...
# SambaNova
SAMBANOVA_API_KEY=...
# Sarvam AI
SARVAM_API_KEY=...
# Sentry
SENTRY_DSN=...
# Simli
SIMLI_API_KEY=...
SIMLI_FACE_ID=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=...
FAL_SMART_TURN_API_KEY=...
# Soniox
SONIOX_API_KEY=...
# Speechmatics
SPEECHMATICS_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
# Telnyx
TELNYX_API_KEY=...
TELNYX_ACCOUNT_SID=...
# Together.ai
TOGETHER_API_KEY=...
# Twilio
TWILIO_ACCOUNT_SID=...
TWILIO_AUTH_TOKEN=...
# WhatsApp
WHATSAPP_TOKEN=
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=
WHATSAPP_PHONE_NUMBER_ID=
WHATSAPP_APP_SECRET=
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
WHATSAPP_PHONE_NUMBER_ID=...
WHATSAPP_APP_SECRET=...

View File

@@ -67,8 +67,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
messages = [

View File

@@ -1,147 +0,0 @@
import asyncio
import logging
import os
from typing import Tuple
import aiohttp
from dotenv import load_dotenv
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMContextFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.daily import configure
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.daily.transport import DailyTransport
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("pipecat")
logger.setLevel(logging.DEBUG)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
duration_minutes=10,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=True,
camera_width=1024,
camera_height=1024,
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
tts1 = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts2 = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="jBpfuIE2acCO8z3wKNLl",
)
dalle = FalImageGenService(
params=FalImageGenService.InputParams(image_size="1024x1024"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
bot1_messages = [
{
"role": "system",
"content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long.",
},
]
bot2_messages = [
{
"role": "system",
"content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich.",
},
]
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
"""This function streams text from the LLM and uses the TTS service to convert
that text to speech as it's received.
"""
source_queue = asyncio.Queue()
sink_queue = asyncio.Queue()
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
await source_queue.put(LLMContextFrame(LLMContext(messages)))
await source_queue.put(EndFrame())
await pipeline.run_pipeline()
message = ""
all_audio = bytearray()
while sink_queue.qsize():
frame = sink_queue.get_nowait()
if isinstance(frame, TextFrame):
message += frame.text
elif isinstance(frame, AudioFrame):
all_audio.extend(frame.audio)
return (message, all_audio)
async def get_bot1_statement():
message, audio = await get_text_and_audio(bot1_messages)
bot1_messages.append({"role": "assistant", "content": message})
bot2_messages.append({"role": "user", "content": message})
return audio
async def get_bot2_statement():
message, audio = await get_text_and_audio(bot2_messages)
bot2_messages.append({"role": "assistant", "content": message})
bot1_messages.append({"role": "user", "content": message})
return audio
async def argue():
for i in range(100):
print(f"In iteration {i}")
bot1_description = "A woman conservatively dressed as a librarian in a library surrounded by books, cartoon, serious, highly detailed"
(audio1, image_data1) = await asyncio.gather(
get_bot1_statement(), dalle.run_image_gen(bot1_description)
)
await transport.send_queue.put(
[
ImageFrame(image_data1[1], image_data1[2]),
AudioFrame(audio1),
]
)
bot2_description = "A cat dressed in a hot dog costume, cartoon, bright colors, funny, highly detailed"
(audio2, image_data2) = await asyncio.gather(
get_bot2_statement(), dalle.run_image_gen(bot2_description)
)
await transport.send_queue.put(
[
ImageFrame(image_data2[1], image_data2[2]),
AudioFrame(audio2),
]
)
await asyncio.gather(transport.run(), argue())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,170 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
import re
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMRunFrame,
MetricsFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
def format_metrics(metrics, indent=0):
lines = []
tab = "\t" * indent
for metric in metrics:
lines.append(tab + type(metric).__name__)
for field, value in vars(metric).items():
if hasattr(value, "__dict__") and not isinstance(
value, (str, int, float, bool, type(None))
):
lines.append(f"{tab}\t{field}={type(value).__name__}")
for k, v in vars(value).items():
lines.append(f"{tab}\t\t{k}={repr(v)}")
else:
lines.append(f"{tab}\t{field}={repr(value)}")
return "\n".join(lines)
class MetricsFrameLogger(FrameProcessor):
"""MetricsFrameLogger formats and logs all MetericsFrames"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, MetricsFrame):
logger.info(f"{frame.name}\n {format_metrics(frame.data)}")
await self.push_frame(frame, direction)
# ALWAYS push all frames
else:
# SUPER IMPORTANT: always push every frame!
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
metrics_frame_processor = MetricsFrameLogger()
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
metrics_frame_processor, # pretty print metrics frames
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -79,8 +79,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
# You can also register a function_name of None to get all functions

View File

@@ -72,7 +72,6 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()

View File

@@ -90,7 +90,6 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message (the instruction to save the context)
messages.pop()

View File

@@ -20,6 +20,8 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
@@ -75,7 +77,7 @@ async def save_conversation(params: FunctionCallParams):
filename = f"{BASE_FILENAME}{timestamp}.json"
try:
with open(filename, "w") as file:
messages = params.context.get_messages_for_persistent_storage()
messages = params.context.get_messages()
# remove the last few messages. in reverse order, they are:
# - the in progress save tool call
# - the invocation of the save tool call
@@ -223,13 +225,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(
context = LLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
],
tools=tools,
)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[

View File

@@ -18,7 +18,8 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -119,9 +120,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
# AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
# what's expected by Nova Sonic.
context = OpenAILLMContext(
context = LLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
{
@@ -131,7 +130,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
tools=tools,
)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = LLMContextAggregatorPair(context)
# Build the pipeline
pipeline = Pipeline(

View File

@@ -0,0 +1,153 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, ManuallySwitchServiceFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategyManual
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt_cartesia = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
stt_deepgram = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt_switcher = ServiceSwitcher(
services=[stt_cartesia, stt_deepgram], strategy_type=ServiceSwitcherStrategyManual
)
tts_cartesia = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",
)
tts_deepgram = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts_switcher = ServiceSwitcher(
services=[tts_cartesia, tts_deepgram], strategy_type=ServiceSwitcherStrategyManual
)
llm_openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm_google = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
llm_switcher = ServiceSwitcher(
services=[llm_openai, llm_google], strategy_type=ServiceSwitcherStrategyManual
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt_switcher,
context_aggregator.user(), # User responses
llm_switcher, # LLM
tts_switcher, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
await asyncio.sleep(15)
print(f"Switching to {stt_deepgram}")
await task.queue_frames([ManuallySwitchServiceFrame(service=stt_deepgram)])
await asyncio.sleep(15)
print(f"Switching to {llm_google}")
await task.queue_frames([ManuallySwitchServiceFrame(service=llm_google)])
await asyncio.sleep(15)
print(f"Switching to {tts_deepgram}")
await task.queue_frames([ManuallySwitchServiceFrame(service=tts_deepgram)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -73,13 +73,13 @@ Transform your local bot into a production-ready service. Pipecat Cloud handles
1. [Sign up for Pipecat Cloud](https://pipecat.daily.co/sign-up).
2. Install the Pipecat Cloud CLI:
2. Install the Pipecat CLI:
```bash
uv add pipecatcloud
uv tool install pipecat-ai-cli
```
> 💡 Tip: You can run the `pipecatcloud` CLI using the `pcc` alias.
> 💡 Tip: You can run the `pipecat` CLI using the `pc` alias.
3. Set up Docker for building your bot image:
@@ -113,12 +113,22 @@ secret_set = "quickstart-secrets"
> 💡 Tip: [Set up `image_credentials`](https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets) in your TOML file for authenticated image pulls
### Log in to Pipecat Cloud
To start using the CLI, authenticate to Pipecat Cloud:
```bash
pipecat cloud auth login
```
You'll be presented with a link that you can click to authenticate your client.
### Configure secrets
Upload your API keys to Pipecat Cloud's secure storage:
```bash
uv run pcc secrets set quickstart-secrets --file .env
pipecat cloud secrets set quickstart-secrets --file .env
```
This creates a secret set called `quickstart-secrets` (matching your TOML file) and uploads all your API keys from `.env`.
@@ -128,13 +138,13 @@ This creates a secret set called `quickstart-secrets` (matching your TOML file)
Build your Docker image and push to Docker Hub:
```bash
uv run pcc docker build-push
pipecat cloud docker build-push
```
Deploy to Pipecat Cloud:
```bash
uv run pcc deploy
pipecat cloud deploy
```
### Connect to your agent

View File

@@ -1,6 +1,11 @@
agent_name = "quickstart"
image = "your_username/quickstart:0.1"
secret_set = "quickstart-secrets"
agent_profile = "agent-1x"
# RECOMMENDED: Set an image pull secret:
# https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets
# image_credentials = "your_image_pull_secret"
[scaling]
min_agents = 1

View File

@@ -4,13 +4,14 @@ version = "0.1.0"
description = "Quickstart example for building voice AI bots with Pipecat"
requires-python = ">=3.10"
dependencies = [
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.86",
"pipecatcloud>=0.2.4"
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]",
"pipecat-ai-cli"
]
[dependency-groups]
dev = [
"ruff~=0.12.1",
"pyright>=1.1.404,<2",
"ruff>=0.12.11,<1",
]
[tool.ruff]

View File

@@ -50,12 +50,12 @@ anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.0; python_version>='3.12'" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.1; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.19.9" ]
daily = [ "daily-python~=0.20.0" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]

View File

@@ -110,7 +110,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
system = NOT_GIVEN
messages = []
# first, map messages using self._from_universal_context_message(m)
# First, map messages using self._from_universal_context_message(m)
try:
messages = [self._from_universal_context_message(m) for m in universal_context_messages]
except Exception as e:

View File

@@ -6,13 +6,47 @@
"""AWS Nova Sonic LLM adapter for Pipecat."""
import copy
import json
from typing import Any, Dict, List, TypedDict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
class AWSNovaSonicLLMInvocationParams(TypedDict):
@@ -21,7 +55,9 @@ class AWSNovaSonicLLMInvocationParams(TypedDict):
This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic.
"""
pass
system_instruction: Optional[str]
messages: List[AWSNovaSonicConversationHistoryMessage]
tools: List[Dict[str, Any]]
class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@@ -34,7 +70,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic."""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
return "aws-nova-sonic"
def get_llm_invocation_params(self, context: LLMContext) -> AWSNovaSonicLLMInvocationParams:
"""Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context.
@@ -47,7 +83,13 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
Dictionary of parameters for invoking AWS Nova Sonic's LLM API.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
messages = self._from_universal_context_messages(self.get_messages(context))
return {
"system_instruction": messages.system_instruction,
"messages": messages.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
}
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic.
@@ -62,7 +104,75 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
return self._from_universal_context_messages(self.get_messages(context)).messages
@dataclass
class ConvertedMessages:
"""Container for Google-formatted messages converted from universal context."""
messages: List[AWSNovaSonicConversationHistoryMessage]
system_instruction: Optional[str] = None
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages()
universal_context_messages = copy.deepcopy(universal_context_messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
elif isinstance(content, list):
system_instruction = content[0].get("text")
if system_instruction:
self._system_instruction = system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for universal_context_message in universal_context_messages:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
@staticmethod
def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> Dict[str, Any]:

View File

@@ -107,7 +107,7 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
system = None
messages = []
# first, map messages using self._from_universal_context_message(m)
# First, map messages using self._from_universal_context_message(m)
try:
messages = [self._from_universal_context_message(m) for m in universal_context_messages]
except Exception as e:

View File

@@ -8,8 +8,8 @@
import base64
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, TypedDict
from loguru import logger
from openai import NotGiven
@@ -133,6 +133,28 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
messages: List[Content]
system_instruction: Optional[str] = None
@dataclass
class MessageConversionResult:
"""Result of converting a single universal context message to Google format.
Either content (a Google Content object) or a system instruction string
is guaranteed to be set.
Also returns a tool call ID to name mapping for any tool calls
discovered in the message.
"""
content: Optional[Content] = None
system_instruction: Optional[str] = None
tool_call_id_to_name_mapping: Dict[str, str] = field(default_factory=dict)
@dataclass
class MessageConversionParams:
"""Parameters for converting a single universal context message to Google format."""
already_have_system_instruction: bool
tool_call_id_to_name_mapping: Dict[str, str]
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
@@ -156,24 +178,26 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
"""
system_instruction = None
messages = []
tool_call_id_to_name_mapping = {}
# Process each message, preserving Google-formatted messages and converting others
for message in universal_context_messages:
if isinstance(message, LLMSpecificMessage):
# Assume that LLMSpecificMessage wraps a message in Google format
messages.append(message.message)
continue
# Convert standard format to Google format
converted = self._from_standard_message(
message, already_have_system_instruction=bool(system_instruction)
result = self._from_universal_context_message(
message,
params=self.MessageConversionParams(
already_have_system_instruction=bool(system_instruction),
tool_call_id_to_name_mapping=tool_call_id_to_name_mapping,
),
)
if isinstance(converted, Content):
# Regular (non-system) message
messages.append(converted)
else:
# System instruction
system_instruction = converted
# Each result is either a Content or a system instruction
if result.content:
messages.append(result.content)
elif result.system_instruction:
system_instruction = result.system_instruction
# Merge tool call ID to name mapping
if result.tool_call_id_to_name_mapping:
tool_call_id_to_name_mapping.update(result.tool_call_id_to_name_mapping)
# Check if we only have function-related messages (no regular text)
has_regular_messages = any(
@@ -193,9 +217,16 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(
self, message: LLMContextMessage, *, params: MessageConversionParams
) -> MessageConversionResult:
if isinstance(message, LLMSpecificMessage):
return self.MessageConversionResult(content=message.message)
return self._from_standard_message(message, params=params)
def _from_standard_message(
self, message: LLMStandardMessage, already_have_system_instruction: bool
) -> Content | str:
self, message: LLMStandardMessage, *, params: MessageConversionParams
) -> MessageConversionResult:
"""Convert standard universal context message to Google Content object.
Handles conversion of text, images, and function calls to Google's
@@ -205,10 +236,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
Args:
message: Message in standard universal context format.
already_have_system_instruction: Whether we already have a system instruction
params: Parameters for conversion.
Returns:
Content object with role and parts, or a plain string for system
messages.
MessageConversionResult containing either a Content object or a
system instruction string.
Examples:
Standard text message::
@@ -242,38 +274,48 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
Converts to Google Content with::
Content(
role="model",
role="user",
parts=[Part(function_call=FunctionCall(name="search", args={"query": "test"}))]
)
"""
role = message["role"]
content = message.get("content", [])
if role == "system":
if already_have_system_instruction:
if params.already_have_system_instruction:
role = "user" # Convert system message to user role if we already have a system instruction
else:
# System instructions are returned as plain text
system_instruction: str = None
if isinstance(content, str):
return content
system_instruction = content
elif isinstance(content, list):
# If content is a list, we assume it's a list of text parts, per the standard
return " ".join(part["text"] for part in content if part.get("type") == "text")
system_instruction = " ".join(
part["text"] for part in content if part.get("type") == "text"
)
if system_instruction:
return self.MessageConversionResult(system_instruction=system_instruction)
elif role == "assistant":
role = "model"
parts = []
tool_call_id_to_name_mapping = {}
if message.get("tool_calls"):
for tc in message["tool_calls"]:
id = tc["id"]
name = tc["function"]["name"]
tool_call_id_to_name_mapping[id] = name
parts.append(
Part(
function_call=FunctionCall(
name=tc["function"]["name"],
name=name,
args=json.loads(tc["function"]["arguments"]),
)
)
)
elif role == "tool":
role = "model"
role = "user"
try:
response = json.loads(message["content"])
if isinstance(response, dict):
@@ -284,12 +326,17 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
# Response might not be JSON-deserializable.
# This occurs with a UserImageFrame, for example, where we get a plain "COMPLETED" string.
response_dict = {"value": message["content"]}
# Get function name from mapping using tool_call_id, or fallback
tool_call_id = message.get("tool_call_id")
function_name = "tool_call_result" # Default fallback
if tool_call_id and tool_call_id in params.tool_call_id_to_name_mapping:
function_name = params.tool_call_id_to_name_mapping[tool_call_id]
parts.append(
Part(
function_response=FunctionResponse(
name="tool_call_result", # seems to work to hard-code the same name every time
response=response_dict,
)
Part.from_function_response(
name=function_name,
response=response_dict,
)
)
elif isinstance(content, str):
@@ -312,4 +359,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
audio_bytes = base64.b64decode(input_audio["data"])
parts.append(Part(inline_data=Blob(mime_type="audio/wav", data=audio_bytes)))
return Content(role=role, parts=parts)
return self.MessageConversionResult(
content=Content(role=role, parts=parts),
tool_call_id_to_name_mapping=tool_call_id_to_name_mapping,
)

View File

@@ -14,20 +14,41 @@ from pipecat.services.llm_service import LLMService
class LLMSwitcher(ServiceSwitcher[StrategyType]):
"""A pipeline that switches between different LLMs at runtime."""
"""A pipeline that switches between different LLMs at runtime.
Example::
llm_switcher = LLMSwitcher(
llms=[openai_llm, anthropic_llm],
strategy_type=ServiceSwitcherStrategyManual
)
"""
def __init__(self, llms: List[LLMService], strategy_type: Type[StrategyType]):
"""Initialize the service switcher with a list of LLMs and a switching strategy."""
"""Initialize the service switcher with a list of LLMs and a switching strategy.
Args:
llms: List of LLM services to switch between.
strategy_type: The strategy class to use for switching between LLMs.
"""
super().__init__(llms, strategy_type)
@property
def llms(self) -> List[LLMService]:
"""Get the list of LLMs managed by this switcher."""
"""Get the list of LLMs managed by this switcher.
Returns:
List of LLM services managed by this switcher.
"""
return self.services
@property
def active_llm(self) -> Optional[LLMService]:
"""Get the currently active LLM, if any."""
"""Get the currently active LLM.
Returns:
The currently active LLM service, or None if no LLM is active.
"""
return self.strategy.active_service
async def run_inference(self, context: LLMContext) -> Optional[str]:

View File

@@ -21,10 +21,22 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class ServiceSwitcherStrategy:
"""Base class for service switching strategies."""
"""Base class for service switching strategies.
Note:
Strategy classes are instantiated internally by ServiceSwitcher.
Developers should pass the strategy class (not an instance) to ServiceSwitcher.
"""
def __init__(self, services: List[FrameProcessor]):
"""Initialize the service switcher strategy with a list of services."""
"""Initialize the service switcher strategy with a list of services.
Note:
This is called internally by ServiceSwitcher. Do not instantiate directly.
Args:
services: List of frame processors to switch between.
"""
self.services = services
self.active_service: Optional[FrameProcessor] = None
@@ -46,10 +58,24 @@ class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
This strategy allows the user to manually select which service is active.
The initial active service is the first one in the list.
Example::
stt_switcher = ServiceSwitcher(
services=[stt_1, stt_2],
strategy_type=ServiceSwitcherStrategyManual
)
"""
def __init__(self, services: List[FrameProcessor]):
"""Initialize the manual service switcher strategy with a list of services."""
"""Initialize the manual service switcher strategy with a list of services.
Note:
This is called internally by ServiceSwitcher. Do not instantiate directly.
Args:
services: List of frame processors to switch between.
"""
super().__init__(services)
self.active_service = services[0] if services else None
@@ -85,7 +111,12 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
"""A pipeline that switches between different services at runtime."""
def __init__(self, services: List[FrameProcessor], strategy_type: Type[StrategyType]):
"""Initialize the service switcher with a list of services and a switching strategy."""
"""Initialize the service switcher with a list of services and a switching strategy.
Args:
services: List of frame processors to switch between.
strategy_type: The strategy class to use for switching between services.
"""
strategy = strategy_type(services)
super().__init__(*self._make_pipeline_definitions(services, strategy))
self.services = services
@@ -100,14 +131,20 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
active_service: FrameProcessor,
direction: FrameDirection,
):
"""Initialize the service switcher filter with a strategy and direction."""
"""Initialize the service switcher filter with a strategy and direction.
Args:
wrapped_service: The service that this filter wraps.
active_service: The currently active service.
direction: The direction of frame flow to filter.
"""
self._wrapped_service = wrapped_service
self._active_service = active_service
async def filter(_: Frame) -> bool:
return self._wrapped_service == self._active_service
super().__init__(filter, direction)
self._wrapped_service = wrapped_service
self._active_service = active_service
super().__init__(filter, direction, filter_system_frames=True)
async def process_frame(self, frame, direction):
"""Process a frame through the filter, handling special internal filter-updating frames."""

View File

@@ -189,7 +189,7 @@ class TaskObserver(BaseObserver):
if isinstance(data, FramePushed):
if on_push_frame_deprecated:
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
data.source, data.destination, data.frame, data.direction, data.timestamp
)
else:
await observer.on_push_frame(data)

View File

@@ -15,9 +15,10 @@ service-specific adapter.
"""
import base64
import copy
import io
from dataclasses import dataclass
from typing import Any, List, Optional, TypeAlias, Union
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
@@ -31,6 +32,9 @@ from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import AudioRawFrame
if TYPE_CHECKING:
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
# "Re-export" types from OpenAI that we're using as universal context types.
# NOTE: if universal message types need to someday diverge from OpenAI's, we
# should consider managing our own definitions. But we should do so carefully,
@@ -65,6 +69,26 @@ class LLMContext:
and content formatting.
"""
@staticmethod
def from_openai_context(openai_context: "OpenAILLMContext") -> "LLMContext":
"""Create a universal LLM context from an OpenAI-specific context.
NOTE: this should only be used internally, for facilitating migration
from OpenAILLMContext to LLMContext. New user code should use
LLMContext directly.
Args:
openai_context: The OpenAI LLM context to convert.
Returns:
New LLMContext instance with converted messages and settings.
"""
return LLMContext(
messages=openai_context.get_messages(),
tools=openai_context.tools,
tool_choice=openai_context.tool_choice,
)
def __init__(
self,
messages: Optional[List[LLMContextMessage]] = None,
@@ -82,6 +106,19 @@ class LLMContext:
self._tools: ToolsSchema | NotGiven = LLMContext._normalize_and_validate_tools(tools)
self._tool_choice: LLMContextToolChoice | NotGiven = tool_choice
@property
def messages(self) -> List[LLMContextMessage]:
"""Get the current messages list.
NOTE: This is equivalent to calling `get_messages()` with no filter. If
you want to filter out LLM-specific messages that don't pertain to your
LLM, use `get_messages()` directly.
Returns:
List of conversation messages.
"""
return self.get_messages()
def get_messages(self, llm_specific_filter: Optional[str] = None) -> List[LLMContextMessage]:
"""Get the current messages list.
@@ -89,7 +126,8 @@ class LLMContext:
llm_specific_filter: Optional filter to return LLM-specific
messages for the given LLM, in addition to the standard
messages. If messages end up being filtered, an error will be
logged.
logged; this is intended to catch accidental use of
incompatible LLM-specific messages.
Returns:
List of conversation messages.

View File

@@ -12,7 +12,7 @@ allowing for flexible frame filtering logic in processing pipelines.
from typing import Awaitable, Callable
from pipecat.frames.frames import EndFrame, Frame, SystemFrame
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame, SystemFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -28,6 +28,7 @@ class FunctionFilter(FrameProcessor):
self,
filter: Callable[[Frame], Awaitable[bool]],
direction: FrameDirection = FrameDirection.DOWNSTREAM,
filter_system_frames: bool = False,
):
"""Initialize the function filter.
@@ -36,22 +37,32 @@ class FunctionFilter(FrameProcessor):
frame should pass through, False otherwise.
direction: The direction to apply filtering. Only frames moving in
this direction will be filtered. Defaults to DOWNSTREAM.
filter_system_frames: Whether to filter system frames. Defaults to False.
"""
super().__init__()
self._filter = filter
self._direction = direction
self._filter_system_frames = filter_system_frames
#
# Frame processor
#
# Ignore system frames, end frames and frames that are not following the
# direction of this gate
def _should_passthrough_frame(self, frame, direction):
"""Check if a frame should pass through without filtering."""
# Ignore system frames, end frames and frames that are not following the
# direction of this gate
return isinstance(frame, (SystemFrame, EndFrame)) or direction != self._direction
# Always passthrough frames in the wrong direction
if direction != self._direction:
return True
# Always passthrough lifecycle frames
if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
return True
# If not filtering system frames, passthrough all other system frames
if not self._filter_system_frames and isinstance(frame, SystemFrame):
return True
return False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame through the filter.

View File

@@ -1018,6 +1018,7 @@ class RTVIObserver(BaseObserver):
if (
isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
and (direction == FrameDirection.DOWNSTREAM)
and self._params.user_speaking_enabled
):
await self._handle_interruptions(frame)

View File

@@ -76,6 +76,7 @@ class DailyRoomConfig(BaseModel):
async def configure(
aiohttp_session: aiohttp.ClientSession,
*,
api_key: Optional[str] = None,
room_exp_duration: Optional[float] = 2.0,
token_exp_duration: Optional[float] = 2.0,
sip_caller_phone: Optional[str] = None,
@@ -92,6 +93,7 @@ async def configure(
Args:
aiohttp_session: HTTP session for making API requests.
api_key: Daily API key.
room_exp_duration: Room expiration time in hours.
token_exp_duration: Token expiration time in hours.
sip_caller_phone: Phone number or identifier for SIP display name.
@@ -129,7 +131,7 @@ async def configure(
config = await configure(session, room_properties=custom_props)
"""
# Check for required API key
api_key = os.getenv("DAILY_API_KEY")
api_key = api_key or os.getenv("DAILY_API_KEY")
if not api_key:
raise Exception(
"DAILY_API_KEY environment variable is required. "

View File

@@ -82,6 +82,7 @@ from loguru import logger
from pipecat.runner.types import (
DailyRunnerArguments,
RunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
@@ -309,7 +310,7 @@ def _setup_webrtc_routes(
):
"""Mimic Pipecat Cloud's proxy."""
active_session = active_sessions.get(session_id)
if not active_session:
if active_session is None:
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
if path.endswith("api/offer"):
@@ -529,9 +530,9 @@ def _setup_daily_routes(app: FastAPI):
"""Set up Daily-specific routes."""
@app.get("/")
async def start_agent():
async def create_room_and_start_agent():
"""Launch a Daily bot and redirect to room."""
print("Starting bot with Daily transport")
print("Starting bot with Daily transport and redirecting to Daily room")
import aiohttp
@@ -546,11 +547,11 @@ def _setup_daily_routes(app: FastAPI):
asyncio.create_task(bot_module.bot(runner_args))
return RedirectResponse(room_url)
async def _handle_rtvi_request(request: Request):
"""Common handler for both /start and /connect endpoints.
@app.post("/start")
async def start_agent(request: Request):
"""Handler for /start endpoints.
Expects POST body like::
{
"createDailyRoom": true,
"dailyRoomProperties": { "start_video_off": true },
@@ -567,45 +568,38 @@ def _setup_daily_routes(app: FastAPI):
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Extract the body data that should be passed to the bot
# This mimics Pipecat Cloud's behavior
bot_body = request_data.get("body", {})
create_daily_room = request_data.get("createDailyRoom", False)
body = request_data.get("body", {})
# Log the extracted body data for debugging
if bot_body:
logger.info(f"Extracted body data for bot: {bot_body}")
bot_module = _get_bot_module()
existing_room_url = os.getenv("DAILY_SAMPLE_ROOM_URL")
result = None
# Configure room if:
# 1. Explicitly requested via createDailyRoom in payload
# 2. Using pre-configured room from DAILY_SAMPLE_ROOM_URL env var
if create_daily_room or existing_room_url:
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=body)
result = {
"dailyRoom": room_url,
"dailyToken": token,
"sessionId": str(uuid.uuid4()),
}
else:
logger.debug("No body data provided in request")
runner_args = RunnerArguments(body=body)
from pipecat.runner.daily import configure
# Start the bot in the background
asyncio.create_task(bot_module.bot(runner_args))
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background with extracted body data
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=bot_body)
asyncio.create_task(bot_module.bot(runner_args))
# Match PCC /start endpoint response format:
return {"dailyRoom": room_url, "dailyToken": token}
@app.post("/start")
async def rtvi_start(request: Request):
"""Launch a Daily bot and return connection info for RTVI clients."""
return await _handle_rtvi_request(request)
@app.post("/connect")
async def rtvi_connect(request: Request):
"""Launch a Daily bot and return connection info for RTVI clients.
.. deprecated:: 0.0.78
Use /start instead. This endpoint will be removed in a future version.
"""
logger.warning(
"DEPRECATED: /connect endpoint is deprecated. Please use /start instead. "
"This endpoint will be removed in a future version."
)
return await _handle_rtvi_request(request)
return result
def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):

View File

@@ -20,9 +20,11 @@ from fastapi import WebSocket
class RunnerArguments:
"""Base class for runner session arguments."""
handle_sigint: bool = field(init=False)
handle_sigterm: bool = field(init=False)
pipeline_idle_timeout_secs: int = field(init=False)
# Use kw_only so subclasses don't need to worry about ordering.
handle_sigint: bool = field(init=False, kw_only=True)
handle_sigterm: bool = field(init=False, kw_only=True)
pipeline_idle_timeout_secs: int = field(init=False, kw_only=True)
body: Optional[Any] = field(default_factory=dict, kw_only=True)
def __post_init__(self):
self.handle_sigint = False
@@ -42,7 +44,6 @@ class DailyRunnerArguments(RunnerArguments):
room_url: str
token: Optional[str] = None
body: Optional[Any] = field(default_factory=dict)
@dataclass
@@ -55,7 +56,6 @@ class WebSocketRunnerArguments(RunnerArguments):
"""
websocket: WebSocket
body: Optional[Any] = field(default_factory=dict)
@dataclass

View File

@@ -720,11 +720,11 @@ class AWSBedrockLLMService(LLMService):
additional_model_request_fields: Additional model-specific parameters.
"""
max_tokens: Optional[int] = Field(default_factory=lambda: 4096, ge=1)
temperature: Optional[float] = Field(default_factory=lambda: 0.7, ge=0.0, le=1.0)
top_p: Optional[float] = Field(default_factory=lambda: 0.999, ge=0.0, le=1.0)
max_tokens: Optional[int] = Field(default=None, ge=1)
temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0)
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
stop_sequences: Optional[List[str]] = Field(default_factory=lambda: [])
latency: Optional[str] = Field(default_factory=lambda: "standard")
latency: Optional[str] = Field(default=None)
additional_model_request_fields: Optional[Dict[str, Any]] = Field(default_factory=dict)
def __init__(
@@ -801,6 +801,24 @@ class AWSBedrockLLMService(LLMService):
"""
return True
def _build_inference_config(self) -> Dict[str, Any]:
"""Build inference config with only the parameters that are set.
This prevents conflicts with models (e.g., Claude Sonnet 4.5) that don't
allow certain parameter combinations like temperature and top_p together.
Returns:
Dictionary containing only the inference parameters that are not None.
"""
inference_config = {}
if self._settings["max_tokens"] is not None:
inference_config["maxTokens"] = self._settings["max_tokens"]
if self._settings["temperature"] is not None:
inference_config["temperature"] = self._settings["temperature"]
if self._settings["top_p"] is not None:
inference_config["topP"] = self._settings["top_p"]
return inference_config
async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]:
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
@@ -826,16 +844,16 @@ class AWSBedrockLLMService(LLMService):
model_id = self.model_name
# Prepare request parameters
inference_config = self._build_inference_config()
request_params = {
"modelId": model_id,
"messages": messages,
"inferenceConfig": {
"maxTokens": 8192,
"temperature": 0.7,
"topP": 0.9,
},
}
if inference_config:
request_params["inferenceConfig"] = inference_config
if system:
request_params["system"] = system
@@ -974,21 +992,20 @@ class AWSBedrockLLMService(LLMService):
tools = params_from_context["tools"]
tool_choice = params_from_context["tool_choice"]
# Set up inference config
inference_config = {
"maxTokens": self._settings["max_tokens"],
"temperature": self._settings["temperature"],
"topP": self._settings["top_p"],
}
# Set up inference config - only include parameters that are set
inference_config = self._build_inference_config()
# Prepare request parameters
request_params = {
"modelId": self.model_name,
"messages": messages,
"inferenceConfig": inference_config,
"additionalModelRequestFields": self._settings["additional_model_request_fields"],
}
# Only add inference config if it has parameters
if inference_config:
request_params["inferenceConfig"] = inference_config
# Add system message
if system:
request_params["system"] = system

View File

@@ -8,8 +8,77 @@
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
.. deprecated:: 0.0.91
AWS Nova Sonic no longer uses types from this module under the hood.
It now uses `LLMContext` and `LLMContextAggregatorPair`.
Using the new patterns should allow you to not need types from this module.
BEFORE:
```
# Setup
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: AWSNovaSonicLLMContext
# or
context: OpenAILLMContext
```
AFTER:
```
# Setup
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
```
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws.nova_sonic.context (or "
"pipecat.services.aws_nova_sonic.context) are deprecated. \n"
"AWS Nova Sonic no longer uses types from this module under the hood. \n"
"It now uses `LLMContext` and `LLMContextAggregatorPair`. \n"
"Using the new patterns should allow you to not need types from this module.\n\n"
"BEFORE:\n"
"```\n"
"# Setup\n"
"context = OpenAILLMContext(messages, tools)\n"
"context_aggregator = llm.create_context_aggregator(context)\n\n"
"# Context frame type\n"
"frame: OpenAILLMContextFrame\n\n"
"# Context type\n"
"context: AWSNovaSonicLLMContext\n"
"# or\n"
"context: OpenAILLMContext\n\n"
"```\n\n"
"AFTER:\n"
"```\n"
"# Setup\n"
"context = LLMContext(messages, tools)\n"
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
"# Context frame type\n"
"frame: LLMContextFrame\n\n"
"# Context type\n"
"context: LLMContext\n\n"
"```",
DeprecationWarning,
stacklevel=2,
)
import copy
from dataclasses import dataclass, field
from enum import Enum

View File

@@ -25,7 +25,7 @@ from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
@@ -33,35 +33,30 @@ from pipecat.frames.frames import (
Frame,
FunctionCallFromLLM,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.context import (
AWSNovaSonicAssistantContextAggregator,
AWSNovaSonicContextAggregatorPair,
AWSNovaSonicLLMContext,
AWSNovaSonicUserContextAggregator,
Role,
)
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.utils.time import time_now_iso8601
@@ -217,6 +212,11 @@ class AWSNovaSonicLLMService(LLMService):
system_instruction: System-level instruction for the model.
tools: Available tools/functions for the model to use.
send_transcription_frames: Whether to emit transcription frames.
.. deprecated:: 0.0.91
This parameter is deprecated and will be removed in a future version.
Transcription frames are always sent.
**kwargs: Additional arguments passed to the parent LLMService.
"""
super().__init__(**kwargs)
@@ -230,8 +230,20 @@ class AWSNovaSonicLLMService(LLMService):
self._params = params or Params()
self._system_instruction = system_instruction
self._tools = tools
self._send_transcription_frames = send_transcription_frames
self._context: Optional[AWSNovaSonicLLMContext] = None
if not send_transcription_frames:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`send_transcription_frames` is deprecated and will be removed in a future version. "
"Transcription frames are always sent.",
DeprecationWarning,
stacklevel=2,
)
self._context: Optional[LLMContext] = None
self._stream: Optional[
DuplexEventStream[
InvokeModelWithBidirectionalStreamInput,
@@ -244,12 +256,17 @@ class AWSNovaSonicLLMService(LLMService):
self._input_audio_content_name: Optional[str] = None
self._content_being_received: Optional[CurrentContent] = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time: Optional[float] = None
self._wants_connection = False
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
@@ -302,12 +319,12 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Resetting conversation")
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
# Carry over previous context through disconnect
# Grab context to carry through disconnect/reconnect
context = self._context
await self._disconnect()
self._context = context
await self._disconnect()
await self._start_connecting()
await self._handle_context(context)
#
# frame processing
@@ -322,28 +339,35 @@ class AWSNovaSonicLLMService(LLMService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
await self._handle_context(frame.context)
elif isinstance(frame, LLMContextFrame):
raise NotImplementedError(
"Universal LLMContext is not yet supported for AWS Nova Sonic."
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
context = (
frame.context
if isinstance(frame, LLMContextFrame)
else LLMContext.from_openai_context(frame.context)
)
await self._handle_context(context)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
elif isinstance(frame, AWSNovaSonicFunctionCallResultFrame):
await self._handle_function_call_result(frame)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption_frame()
await self.push_frame(frame, direction)
async def _handle_context(self, context: OpenAILLMContext):
async def _handle_context(self, context: LLMContext):
if self._disconnecting:
return
if not self._context:
# We got our initial context - try to finish connecting
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(
context, self._system_instruction
)
# We got our initial context
# Try to finish connecting
self._context = context
await self._finish_connecting_if_context_available()
else:
# We got an updated context
# Send results for any newly-completed function calls
await self._process_completed_function_calls(send_new_results=True)
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
# Wait until we're done sending the assistant response trigger audio before sending audio
@@ -393,9 +417,9 @@ class AWSNovaSonicLLMService(LLMService):
else:
await finalize_assistant_response()
async def _handle_function_call_result(self, frame: AWSNovaSonicFunctionCallResultFrame):
result = frame.result_frame
await self._send_tool_result(tool_call_id=result.tool_call_id, result=result.result)
async def _handle_interruption_frame(self):
if self._assistant_is_responding:
self._may_need_repush_assistant_text = True
#
# LLM communication: lifecycle
@@ -431,6 +455,17 @@ class AWSNovaSonicLLMService(LLMService):
logger.error(f"{self} initialization error: {e}")
await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
# Check for set of completed function calls in the context
for message in self._context.get_messages():
if message.get("role") and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
# Found a newly-completed function call - send the result to the service
if send_new_results:
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to
# send it
@@ -439,30 +474,38 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finishing connecting (setting up session)...")
# Initialize our bookkeeping of already-completed tool calls in the
# context
await self._process_completed_function_calls(send_new_results=False)
# Read context
history = self._context.get_messages_for_initializing_history()
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
llm_connection_params = adapter.get_llm_invocation_params(self._context)
# Send prompt start event, specifying tools.
# Tools from context take priority over self._tools.
tools = (
self._context.tools
if self._context.tools
else self.get_llm_adapter().from_standard_tools(self._tools)
llm_connection_params["tools"]
if llm_connection_params["tools"]
else adapter.from_standard_tools(self._tools)
)
logger.debug(f"Using tools: {tools}")
await self._send_prompt_start_event(tools)
# Send system instruction.
# Instruction from context takes priority over self._system_instruction.
# (NOTE: this prioritizing occurred automatically behind the scenes: the context was
# initialized with self._system_instruction and then updated itself from its messages when
# get_messages_for_initializing_history() was called).
logger.debug(f"Using system instruction: {history.system_instruction}")
if history.system_instruction:
await self._send_text_event(text=history.system_instruction, role=Role.SYSTEM)
system_instruction = (
llm_connection_params["system_instruction"]
if llm_connection_params["system_instruction"]
else self._system_instruction
)
logger.debug(f"Using system instruction: {system_instruction}")
if system_instruction:
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
# Send conversation history
for message in history.messages:
for message in llm_connection_params["messages"]:
# logger.debug(f"Seeding conversation history with message: {message}")
await self._send_text_event(text=message.text, role=message.role)
# Start audio input
@@ -492,9 +535,12 @@ class AWSNovaSonicLLMService(LLMService):
await self._send_session_end_events()
self._client = None
# Clean up context
self._context = None
# Clean up stream
if self._stream:
await self._stream.input_stream.close()
await self._stream.close()
self._stream = None
# NOTE: see explanation of HACK, below
@@ -510,15 +556,23 @@ class AWSNovaSonicLLMService(LLMService):
self._receive_task = None
# Reset remaining connection-specific state
# Should be all private state except:
# - _wants_connection
# - _assistant_response_trigger_audio
self._prompt_name = None
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time = None
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
logger.info("Finished disconnecting")
except Exception as e:
@@ -826,6 +880,10 @@ class AWSNovaSonicLLMService(LLMService):
# Handle the LLM completion ending
await self._handle_completion_end_event(event_json)
except Exception as e:
if self._disconnecting:
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
logger.error(f"{self} error processing responses: {e}")
if self._wants_connection:
await self.reset_conversation()
@@ -956,7 +1014,7 @@ class AWSNovaSonicLLMService(LLMService):
async def _report_assistant_response_started(self):
logger.debug("Assistant response started")
# Report that the assistant has started their response.
# Report the start of the assistant response.
await self.push_frame(LLMFullResponseStartFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) started
@@ -968,23 +1026,16 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report some text added to the ongoing assistant response
await self.push_frame(LLMTextFrame(text))
# Report some text added to the *equivalent* of TTS (this is a speech-to-speech model)
# Report the text of the assistant response.
await self.push_frame(TTSTextFrame(text))
# TODO: this is a (hopefully temporary) HACK. Here we directly manipulate the context rather
# than relying on the frames pushed to the assistant context aggregator. The pattern of
# receiving full-sentence text after the assistant has spoken does not easily fit with the
# Pipecat expectation of chunks of text streaming in while the assistant is speaking.
# Interruption handling was especially challenging. Rather than spend days trying to fit a
# square peg in a round hole, I decided on this hack for the time being. We can most cleanly
# abandon this hack if/when AWS Nova Sonic implements streaming smaller text chunks
# interspersed with audio. Note that when we move away from this hack, we need to make sure
# that on an interruption we avoid sending LLMFullResponseEndFrame, which gets the
# LLMAssistantContextAggregator into a bad state.
self._context.buffer_assistant_text(text)
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
# to do it, because the text arrives from Nova Sonic only after all the
# assistant audio frames have been pushed, meaning that if an
# interruption frame were to arrive we would lose all of it (the text
# frames sitting in the queue would be wiped).
self._assistant_text_buffer += text
async def _report_assistant_response_ended(self):
if not self._context: # should never happen
@@ -992,14 +1043,34 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response ended")
# Report that the assistant has finished their response.
# If an interruption frame arrived while the assistant was responding
# we may have lost all of the assistant text (see HACK, above), so
# re-push it downstream to the aggregator now.
if self._may_need_repush_assistant_text:
# Just in case, check that assistant text hasn't already made it
# into the context (sometimes it does, despite the interruption).
messages = self._context.get_messages()
last_message = messages[-1] if messages else None
if (
not last_message
or last_message.get("role") != "assistant"
or last_message.get("content") != self._assistant_text_buffer
):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSTextFrame(self._assistant_text_buffer))
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())
# For an explanation of this hack, see _report_assistant_response_text_added.
self._context.flush_aggregated_assistant_text()
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
#
# user transcription reporting
@@ -1016,33 +1087,67 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"User transcription text added: {text}")
# Manually add new user transcription text to context.
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
self._context.buffer_user_text(text)
# Report that some new user transcription text is available.
if self._send_transcription_frames:
await self.push_frame(
InterimTranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
)
# HACK: here we're buffering the user text ourselves rather than
# relying on the upstream user context aggregator to do it, because the
# text arrives in fairly large chunks spaced fairly far apart in time.
# That means the user text would be split between different messages in
# context. Even if we sent placeholder InterimTranscriptionFrames in
# between each TranscriptionFrame to tell the aggregator to hold off on
# finalizing the user message, the aggregator would likely get the last
# chunk too late.
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
async def _report_user_transcription_ended(self):
if not self._context: # should never happen
return
# Manually add user transcription to context (if any has been buffered).
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
transcription = self._context.flush_aggregated_user_text()
if not transcription:
return
logger.debug(f"User transcription ended")
if self._send_transcription_frames:
await self.push_frame(
TranscriptionFrame(text=transcription, user_id="", timestamp=time_now_iso8601())
# Report to the upstream user context aggregator that some new user
# transcription text is available.
# HACK: Check if this transcription was triggered by our own
# assistant response trigger. If so, we need to wrap it with
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
# would fire an EmulatedUserStartedSpeakingFrame, which would
# trigger an interruption, which would prevent us from writing the
# assistant response to context.
#
# Sending an EmulateUserStartedSpeakingFrame ourselves doesn't
# work: it just causes the interruption we're trying to avoid.
#
# Setting enable_emulated_vad_interruptions also doesn't work: at
# the time the user aggregator receives the TranscriptionFrame, it
# doesn't yet know the assistant has started responding, so it
# doesn't know that emulating the user starting to speak would
# cause an interruption.
should_wrap_in_user_started_stopped_speaking_frames = (
self._waiting_for_trigger_transcription
and self._user_text_buffer.strip().lower() == "ready"
)
# Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
logger.debug(
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
)
await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Send the transcription upstream for the user context aggregator
frame = TranscriptionFrame(
text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
)
await self.push_frame(frame, direction=FrameDirection.UPSTREAM)
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Clear out the buffered user text
self._user_text_buffer = ""
# We're no longer waiting for a trigger transcription
self._waiting_for_trigger_transcription = False
#
# context
@@ -1054,23 +1159,26 @@ class AWSNovaSonicLLMService(LLMService):
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> AWSNovaSonicContextAggregatorPair:
) -> LLMContextAggregatorPair:
"""Create context aggregator pair for managing conversation context.
NOTE: this method exists only for backward compatibility. New code
should instead do:
context = LLMContext(...)
context_aggregator = LLMContextAggregatorPair(context)
Args:
context: The OpenAI LLM context to upgrade.
context: The OpenAI LLM context.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
Returns:
A pair of user and assistant context aggregators.
"""
context.set_llm_adapter(self.get_llm_adapter())
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)
return AWSNovaSonicContextAggregatorPair(user, assistant)
context = LLMContext.from_openai_context(context)
return LLMContextAggregatorPair(
context, user_params=user_params, assistant_params=assistant_params
)
#
# assistant response trigger (HACK)
@@ -1108,6 +1216,8 @@ class AWSNovaSonicLLMService(LLMService):
try:
logger.debug("Sending assistant response trigger...")
self._waiting_for_trigger_transcription = True
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration

View File

@@ -8,18 +8,14 @@
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
.. deprecated:: 0.0.91
AWS Nova Sonic no longer uses types from this module under the hood.
It now uses `LLMContext` and `LLMContextAggregatorPair`.
Using the new patterns should allow you to not need types from this module.
See deprecation warning in pipecat.services.aws.nova_sonic.context for more
details.
"""
import warnings
from pipecat.services.aws.nova_sonic.context import *
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws_nova_sonic.context are deprecated. "
"Please use the equivalent types from "
"pipecat.services.aws.nova_sonic.context instead.",
DeprecationWarning,
stacklevel=2,
)

View File

@@ -1034,6 +1034,23 @@ class GoogleLLMService(LLMService):
if context:
await self._process_context(context)
async def stop(self, frame):
"""Override stop to gracefully close the client."""
await super().stop(frame)
await self._close_client()
async def cancel(self, frame):
"""Override cancel to gracefully close the client."""
await super().cancel(frame)
await self._close_client()
async def _close_client(self):
try:
await self._client.aio.aclose()
except Exception:
# Do nothing - we're shutting down anyway
pass
def create_context_aggregator(
self,
context: OpenAILLMContext,

View File

@@ -0,0 +1,12 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Public testing API for Pipecat frame processors."""
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
from .test_runner import run_test_from_file
__all__ = ["dict_to_frame", "frame_to_dict", "load_frames_from_json", "run_test_from_file"]

View File

@@ -0,0 +1,150 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Frame serialization and deserialization for testing."""
import base64
import inspect
import json
from pathlib import Path
from typing import Any, Dict, List
from pipecat.frames import frames
def _get_frame_class(frame_type: str):
"""Get a frame class by name from the frames module.
Args:
frame_type: The name of the frame class (e.g., "TextFrame")
Returns:
The frame class object
Raises:
ValueError: If the frame type is not found
"""
if not hasattr(frames, frame_type):
raise ValueError(f"Unknown frame type: {frame_type}")
cls = getattr(frames, frame_type)
if not inspect.isclass(cls) or not issubclass(cls, frames.Frame):
raise ValueError(f"{frame_type} is not a valid Frame class")
return cls
def dict_to_frame(data: Dict[str, Any]) -> frames.Frame:
"""Convert a dictionary to a Frame object.
Args:
data: Dictionary containing frame data with a "type" key
Returns:
A Frame instance
Raises:
ValueError: If frame type is missing or invalid
Example:
>>> dict_to_frame({"type": "TextFrame", "text": "hello"})
TextFrame(text="hello")
"""
if "type" not in data:
raise ValueError("Frame dictionary must contain a 'type' field")
frame_type = data["type"]
frame_cls = _get_frame_class(frame_type)
# Build kwargs from data, excluding 'type'
kwargs = {k: v for k, v in data.items() if k != "type"}
# Special handling for audio frames with base64 encoded audio
if "audio" in kwargs and isinstance(kwargs["audio"], str):
kwargs["audio"] = base64.b64decode(kwargs["audio"])
# Special handling for image frames with base64 encoded images
if "image" in kwargs and isinstance(kwargs["image"], str):
kwargs["image"] = base64.b64decode(kwargs["image"])
try:
return frame_cls(**kwargs)
except TypeError as e:
raise ValueError(f"Failed to create {frame_type}: {e}")
def load_frames_from_json(filepath: str) -> List[frames.Frame]:
"""Load frames from a JSON file.
Args:
filepath: Path to JSON file containing frame data
Returns:
List of Frame objects
Raises:
FileNotFoundError: If the file doesn't exist
ValueError: If JSON is invalid or frames cannot be deserialized
Example JSON format:
{
"input_frames": [
{"type": "TextFrame", "text": "hello"},
{"type": "EndFrame"}
]
}
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Frame file not found: {filepath}")
with open(path, "r") as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError("JSON must contain a dictionary")
if "input_frames" not in data:
raise ValueError("JSON must contain an 'input_frames' key")
frame_dicts = data["input_frames"]
if not isinstance(frame_dicts, list):
raise ValueError("'input_frames' must be a list")
return [dict_to_frame(frame_dict) for frame_dict in frame_dicts]
def frame_to_dict(frame: frames.Frame) -> Dict[str, Any]:
"""Convert a Frame object to a dictionary.
Args:
frame: Frame object to serialize
Returns:
Dictionary representation of the frame
Example:
>>> frame_to_dict(TextFrame(text="hello"))
{"type": "TextFrame", "text": "hello"}
"""
result = {"type": frame.__class__.__name__}
# Get all fields from the dataclass
if hasattr(frame, "__dataclass_fields__"):
for field_name in frame.__dataclass_fields__:
# Skip internal fields from base Frame class
if field_name in ("id", "name", "pts", "metadata", "transport_source", "transport_destination"):
continue
value = getattr(frame, field_name, None)
if value is not None:
# Special handling for bytes (audio/image data)
if isinstance(value, bytes):
result[field_name] = base64.b64encode(value).decode("utf-8")
else:
result[field_name] = value
return result

View File

@@ -0,0 +1,169 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Test runner for frame processors from JSON test files."""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
async def run_test_from_file(
processor: FrameProcessor,
test_file: str,
) -> Tuple[List[Frame], Optional[List[Dict[str, Any]]], bool]:
"""Run a processor test from a JSON test file.
Args:
processor: The frame processor to test
test_file: Path to JSON test file
Returns:
Tuple of (output_frames, expected_output, passed)
- output_frames: List of Frame objects that were output
- expected_output: List of expected frame dicts (None if not specified)
- passed: True if test passed, False if failed, None if no validation
Raises:
FileNotFoundError: If test file doesn't exist
ValueError: If test file is invalid
Example test file format:
{
"input_frames": [
{"type": "TextFrame", "text": "hello"}
],
"expected_output": [
{"type": "TextFrame"},
{"type": "EndFrame"}
]
}
"""
path = Path(test_file)
if not path.exists():
raise FileNotFoundError(f"Test file not found: {test_file}")
with open(path, "r") as f:
test_data = json.load(f)
# Load input frames
if "input_frames" not in test_data:
raise ValueError("Test file must contain 'input_frames'")
input_frames = [dict_to_frame(frame_dict) for frame_dict in test_data["input_frames"]]
# Load expected output (optional)
expected_output = test_data.get("expected_output", None)
# Run the test
# Note: run_test() only collects frames if expected_down_frames is provided,
# so we need to manually collect from the pipeline ourselves
import asyncio
from pipecat.frames.frames import EndFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.tests.utils import QueuedFrameProcessor
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
# Set up the test pipeline manually
received_down = asyncio.Queue()
received_up = asyncio.Queue()
source = QueuedFrameProcessor(
queue=received_up,
queue_direction=FrameDirection.UPSTREAM,
ignore_start=True,
)
sink = QueuedFrameProcessor(
queue=received_down,
queue_direction=FrameDirection.DOWNSTREAM,
ignore_start=True,
)
pipeline = Pipeline([source, processor, sink])
task = PipelineTask(
pipeline,
params=PipelineParams(),
observers=[],
cancel_on_idle_timeout=False,
)
async def push_frames():
await asyncio.sleep(0.01)
for frame in input_frames:
await task.queue_frame(frame)
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await asyncio.gather(runner.run(task), push_frames())
# Collect all frames from the downstream queue
downstream_frames = []
while not received_down.empty():
frame = await received_down.get()
if not isinstance(frame, EndFrame):
downstream_frames.append(frame)
# Validate if expected_output is provided
passed = None
if expected_output is not None:
passed = _validate_output(downstream_frames, expected_output)
return downstream_frames, expected_output, passed
def _validate_output(actual_frames: List[Frame], expected_output: List[Dict[str, Any]]) -> bool:
"""Validate actual output frames against expected output.
Args:
actual_frames: List of frames that were actually output
expected_output: List of expected frame specifications
Returns:
True if validation passed, False otherwise
"""
if len(actual_frames) != len(expected_output):
return False
for actual, expected in zip(actual_frames, expected_output):
# Check frame type
if "type" not in expected:
return False
expected_type = expected["type"]
if actual.__class__.__name__ != expected_type:
return False
# Check specific fields if provided
for field_name, expected_value in expected.items():
if field_name == "type":
continue
if not hasattr(actual, field_name):
return False
actual_value = getattr(actual, field_name)
# Special handling for different types
if isinstance(expected_value, str) and isinstance(actual_value, str):
# For string fields, support partial matching with "contains"
if field_name.endswith("_contains"):
base_field = field_name.replace("_contains", "")
if hasattr(actual, base_field):
actual_text = getattr(actual, base_field)
if expected_value not in actual_text:
return False
elif actual_value != expected_value:
return False
elif actual_value != expected_value:
return False
return True

View File

@@ -47,6 +47,7 @@ SENTENCE_ENDING_PUNCTUATION: FrozenSet[str] = frozenset(
"!",
"?",
";",
"",
# East Asian punctuation (Chinese (Traditional & Simplified), Japanese, Korean)
"", # Ideographic full stop
"", # Full-width question mark

View File

@@ -7,10 +7,12 @@
"""Unit tests for ServiceSwitcher and related components."""
import unittest
from dataclasses import dataclass
from pipecat.frames.frames import (
Frame,
ManuallySwitchServiceFrame,
SystemFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
@@ -52,6 +54,13 @@ class MockFrameProcessor(FrameProcessor):
self.frame_count = 0
@dataclass
class DummySystemFrame(SystemFrame):
"""A dummy system frame for testing purposes."""
text: str = ""
class TestServiceSwitcherStrategyManual(unittest.IsolatedAsyncioTestCase):
"""Test cases for ServiceSwitcherStrategyManual."""
@@ -140,14 +149,22 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
# Send some test frames
frames_to_send = [
TextFrame(text="Hello 1"),
DummySystemFrame(text="System Message 1"),
TextFrame(text="Hello 2"),
DummySystemFrame(text="System Message 2"),
TextFrame(text="Hello 3"),
]
await run_test(
switcher,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame, TextFrame, TextFrame],
expected_down_frames=[
DummySystemFrame,
DummySystemFrame,
TextFrame,
TextFrame,
TextFrame,
],
expected_up_frames=[], # Expect no error frames
)
@@ -156,7 +173,13 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
text_frames = [f for f in self.service1.processed_frames if isinstance(f, TextFrame)]
self.assertEqual(len(text_frames), 3)
# Check that other services don't receive text frames (they might get StartFrame/EndFrame)
# Only service1 should have processed the system frames
system_frames = [
f for f in self.service1.processed_frames if isinstance(f, DummySystemFrame)
]
self.assertEqual(len(system_frames), 2)
# Check that other services don't receive text frames (they still get StartFrame/EndFrame)
service2_text_frames = [
f for f in self.service2.processed_frames if isinstance(f, TextFrame)
]
@@ -166,10 +189,24 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
self.assertEqual(len(service2_text_frames), 0)
self.assertEqual(len(service3_text_frames), 0)
# Check that other services don't receive dummy system frames (they still get StartFrame/EndFrame)
service2_system_frames = [
f for f in self.service2.processed_frames if isinstance(f, DummySystemFrame)
]
service3_system_frames = [
f for f in self.service3.processed_frames if isinstance(f, DummySystemFrame)
]
self.assertEqual(len(service2_system_frames), 0)
self.assertEqual(len(service3_system_frames), 0)
# Verify the actual text frames processed
for i, frame in enumerate(text_frames):
self.assertEqual(frame.text, f"Hello {i + 1}")
# Verify the actual system frames processed
for i, frame in enumerate(system_frames):
self.assertEqual(frame.text, f"System Message {i + 1}")
async def test_service_switching(self):
"""Test that after service switching using ManuallySwitchServiceFrame, the new active service receives frames while others don't."""
switcher = ServiceSwitcher(self.services, ServiceSwitcherStrategyManual)

86
uv.lock generated
View File

@@ -410,16 +410,16 @@ wheels = [
[[package]]
name = "aws-sdk-bedrock-runtime"
version = "0.1.0"
version = "0.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "smithy-aws-core", extra = ["eventstream", "json"], marker = "python_full_version >= '3.12'" },
{ name = "smithy-core", marker = "python_full_version >= '3.12'" },
{ name = "smithy-http", extra = ["awscrt"], marker = "python_full_version >= '3.12'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/84/e1/39971b907c83a7525bab112c9b395e1bb6d4bc23bc1712d6d7a050662217/aws_sdk_bedrock_runtime-0.1.0.tar.gz", hash = "sha256:bd062de5a48404f64e1dfe6fb8841fbbf68e8f1798c357d14eb427274cb96a2b", size = 85419, upload-time = "2025-09-29T19:40:01.855Z" }
sdist = { url = "https://files.pythonhosted.org/packages/1d/78/48574454b3cac869df67665e4a403ebfc3abfcfba2c2ff01ccfd67d55f8f/aws_sdk_bedrock_runtime-0.1.1.tar.gz", hash = "sha256:c896f99e675c3a1ab600633a07b785f3dc9fe8ab94f640b1f992b63da2dfc784", size = 82446, upload-time = "2025-10-21T20:25:25.845Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3d/e1/5b36bffe85010cdcd44730d1c2d5244653d57c002f440141d7fc3b9f1347/aws_sdk_bedrock_runtime-0.1.0-py3-none-any.whl", hash = "sha256:aac6ff47069d456ca5e23083d96a01e3e0cbc215414e6753c289d7d9efef3335", size = 78853, upload-time = "2025-09-29T19:40:00.341Z" },
{ url = "https://files.pythonhosted.org/packages/83/07/62c0b70223d178c138f29124ac2f7973a6ba803abc7735b6a01a85217f3d/aws_sdk_bedrock_runtime-0.1.1-py3-none-any.whl", hash = "sha256:c0336b377b2112cf88197d3d44302fbeb3efb1101989fa49ae55e78f49cfe345", size = 74954, upload-time = "2025-10-21T20:25:24.973Z" },
]
[[package]]
@@ -433,31 +433,31 @@ wheels = [
[[package]]
name = "awscrt"
version = "0.28.1"
version = "0.28.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a0/1c/5c9e6a7375c2a1355aadeb2d06c96c95934ec37ff29ebaab2919f59c3ff1/awscrt-0.28.1.tar.gz", hash = "sha256:70a28fd6ff3e0abb7854ea8a9133bc9e5de681a0d9bdbd8a599a23d13a448685", size = 37956730, upload-time = "2025-09-19T00:58:31.564Z" }
sdist = { url = "https://files.pythonhosted.org/packages/d4/1b/a885a699217967c3ff0e1c49ac5b1e2a050d1a8b87d1e85e958a56e3d3f5/awscrt-0.28.2.tar.gz", hash = "sha256:9715a888f2042e710dc8aeb355963a29b77e7a4cc25a14659cebd21a5fa476c1", size = 37894849, upload-time = "2025-10-14T19:06:16.867Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2d/75/dd62276f2907a9ffcf9f8f780c08ce9938bd0550a15c887db198b47f24d3/awscrt-0.28.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:47f885104065918d311102e2b08b943966717c0f3b0c5de5908d2fd08de32198", size = 3376838, upload-time = "2025-09-19T00:57:32.988Z" },
{ url = "https://files.pythonhosted.org/packages/a7/93/562709cdf13a7606548426ecc31326ba3f6839f91e98a1e9230208308afb/awscrt-0.28.1-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:3df2316e77ad88c456b7eb2c9928007d379ed892154c1969d35b98653617e576", size = 3821522, upload-time = "2025-09-19T00:57:35.456Z" },
{ url = "https://files.pythonhosted.org/packages/43/f0/6c6ff81f5a4c6d085eb450854149087bf9240c37c467c747521f47901b32/awscrt-0.28.1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:3a060d930939f142345f46a344e19ffc0dada657b04d02216b8adffba550c0a0", size = 4087344, upload-time = "2025-09-19T00:57:36.62Z" },
{ url = "https://files.pythonhosted.org/packages/37/0a/71c097505add4ceea4ac05153311715acb7489cd82ec69db4570130f4698/awscrt-0.28.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:43f81ca6bfe85c38ad9765605aaaa646a1ed6fd7210dbedf67c113dd245f425e", size = 3745148, upload-time = "2025-09-19T00:57:38Z" },
{ url = "https://files.pythonhosted.org/packages/79/1b/2b02b705a47b64e6c4d401087ddd30d4ad9af70172812ae8c62fb2b7a70c/awscrt-0.28.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:fc8e2307d9dbe76842015a14701ff7e9cf2619d674621b2d55b769414e17b3fc", size = 3972439, upload-time = "2025-09-19T00:57:39.74Z" },
{ url = "https://files.pythonhosted.org/packages/f1/19/429c81c7a0d81a5edce9cc6d9a878c8b65d8b5b69fa5a2725a6e0b1380c1/awscrt-0.28.1-cp310-cp310-win32.whl", hash = "sha256:6e7b094587e5332d428300340dcc18794a1fcfa76d636f216fc0f5c8405ba604", size = 3915231, upload-time = "2025-09-19T00:57:41.096Z" },
{ url = "https://files.pythonhosted.org/packages/83/81/769ad51fc6dcfd8bf9e0aa59c252013da0eb9e32c050ecbd1fc25f71689a/awscrt-0.28.1-cp310-cp310-win_amd64.whl", hash = "sha256:ac02f10f7384fdb68187f8d5d94743a271b16fa94be81481ce7684942f6a4b35", size = 4051668, upload-time = "2025-09-19T00:57:42.696Z" },
{ url = "https://files.pythonhosted.org/packages/9e/55/0ee537d146f24d6e76eaf02d462a83c572788233603bb9bda969fbf23307/awscrt-0.28.1-cp311-abi3-macosx_10_15_universal2.whl", hash = "sha256:cb36052f9aa34e77687a8037559bbea331fc9d5d77cd71ab0cf4e6d72af73f72", size = 3376673, upload-time = "2025-09-19T00:57:43.875Z" },
{ url = "https://files.pythonhosted.org/packages/f0/54/12700a4b9545680baa3e2d4d0e543bb4775a639df56ee51cbb29b71e0947/awscrt-0.28.1-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:fc59829152a5806eb2708aca5c5084c11dd18ecbe765e03eb314d5a360eeaa62", size = 3782870, upload-time = "2025-09-19T00:57:45.737Z" },
{ url = "https://files.pythonhosted.org/packages/1d/e7/7b189ace9e187b9b55ed4a6ec9a451579b2f16bd01d402f79a19cc8e1603/awscrt-0.28.1-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d2f20bc774599b9d85ce66689415da529ddd1d2215da818e005deedc4688fe61", size = 4048789, upload-time = "2025-09-19T00:57:47.327Z" },
{ url = "https://files.pythonhosted.org/packages/9c/e0/2e5472019906dfcc5fadcdba4bad9e69dabb95bbc0c110cfe555ee8461dc/awscrt-0.28.1-cp311-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:491b8b9c73a288cfd5e0cbdac16aabb5313d5cfc33bbe461763a5ddc26624f70", size = 3687832, upload-time = "2025-09-19T00:57:48.563Z" },
{ url = "https://files.pythonhosted.org/packages/71/f2/7e05d371bb888ee9f15e83d189287838f7b6ea40dfc91eacb3acd24b8529/awscrt-0.28.1-cp311-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:4c6c7125b7e9fcc999eb685d1cace8d4f2ffc63f8f3d8ef7f77e1a97d9552863", size = 3913378, upload-time = "2025-09-19T00:57:50.185Z" },
{ url = "https://files.pythonhosted.org/packages/79/6b/a542a65a22edb85d64742970c21721e66e0f9f67911a11c7a5c3626a1b17/awscrt-0.28.1-cp311-abi3-win32.whl", hash = "sha256:1dcb33d7cf8f69881ac6ef75a5b9b40816be58678b1bb07ccbe0230281bdbc81", size = 3912809, upload-time = "2025-09-19T00:57:51.797Z" },
{ url = "https://files.pythonhosted.org/packages/df/64/16cc8a0011e3ca5dda13605befa7e6db29bfb3073c67f6e8dad90be0a8ae/awscrt-0.28.1-cp311-abi3-win_amd64.whl", hash = "sha256:670caaf556876913bcfb9d8183d43d67a6c7b52998f2f398abd1c21632a006f8", size = 4048979, upload-time = "2025-09-19T00:57:53.061Z" },
{ url = "https://files.pythonhosted.org/packages/ca/ac/debbd3a2f03c5953b56b1c3b321bab16293f857ea3005e3f7e5dded5e0b2/awscrt-0.28.1-cp313-abi3-macosx_10_15_universal2.whl", hash = "sha256:22311d25135b937ee5617e35a6554961727527dcfa3e06efdefe187a6abe65c4", size = 3375565, upload-time = "2025-09-19T00:57:54.598Z" },
{ url = "https://files.pythonhosted.org/packages/ea/4f/9388917ad45c043acd7c4ab2c28b9e2b5ddf29e21a82bfc01a7626c18c04/awscrt-0.28.1-cp313-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:e58740cf0e41552fdf7909e10814b312ab090ebe54741354a61507e0c6d4ebfd", size = 3775366, upload-time = "2025-09-19T00:57:56.238Z" },
{ url = "https://files.pythonhosted.org/packages/8a/e3/3ef301cdef76b22ce14b041e04c6cf65ba4491d00e9f5b400c0699f6c63e/awscrt-0.28.1-cp313-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:9e69f163a207a8b172abbfea1f51045301ed1ac8bbaf76958a6b5e81d72e5b89", size = 4043403, upload-time = "2025-09-19T00:57:57.4Z" },
{ url = "https://files.pythonhosted.org/packages/60/9c/4f89922333724c4da851752549ca97dd147420734ef6c4ece56d5dd65e09/awscrt-0.28.1-cp313-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:592f4b234ecafa6cde86e55e42c4fe84c4e1ffe9fb11b0a8b8f0ffb8c62fa2cc", size = 3678742, upload-time = "2025-09-19T00:57:59.055Z" },
{ url = "https://files.pythonhosted.org/packages/0e/d4/adb97ba5f888ed201aa1f9e9f8d6cfc0dbaf80f0e937b3acb7411febdaa8/awscrt-0.28.1-cp313-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:b16321f1d2bf5b4991a213059c1b5dc07954edfc424d154b093824465ec94ce2", size = 3908438, upload-time = "2025-09-19T00:58:00.71Z" },
{ url = "https://files.pythonhosted.org/packages/41/ac/600ea0a6f4ba6543c50417c8e78b09f2cd73dd0f0d4c3e9e52220a8badbe/awscrt-0.28.1-cp313-abi3-win32.whl", hash = "sha256:3e0a23635aa75b4af163ff9bf5a0873928369b1ac32c8b1351741a95472ccf71", size = 3907625, upload-time = "2025-09-19T00:58:03.235Z" },
{ url = "https://files.pythonhosted.org/packages/9e/24/d22c7197b1e53c76b5eb71d640a4728b9b7621075d8dbcc054e16b5b98f0/awscrt-0.28.1-cp313-abi3-win_amd64.whl", hash = "sha256:9849c88ca0830396724acf988e2759895118fe7dd2a23dab21978c8600d01a11", size = 4043878, upload-time = "2025-09-19T00:58:04.595Z" },
{ url = "https://files.pythonhosted.org/packages/73/b4/1a566e493bdfa6e918ba78bcd2e45dda99a25407a4fd974db2666228d154/awscrt-0.28.2-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:bec19c0dd780293a26c809aabb9f7675b28cb3a1bf05b4a5bc9f28d5ced75a81", size = 3380735, upload-time = "2025-10-14T19:05:16.58Z" },
{ url = "https://files.pythonhosted.org/packages/1f/53/6602a87aead1d413c7bd77d059b301745146635cda99ee2a61ec0d23691e/awscrt-0.28.2-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:01f33076759ba6285f25ccc6016355607df2e715d0bab3a1ef2416b87a6c3ade", size = 3827084, upload-time = "2025-10-14T19:05:19.335Z" },
{ url = "https://files.pythonhosted.org/packages/d8/62/61fe39ae5950ad00e10dcbf6e4f4f344dc93957757160c0000390331a11b/awscrt-0.28.2-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:2b5c807b9972795ce54c05aea6918c60983c51d879ebbff7a67adb8b0d28a121", size = 4092678, upload-time = "2025-10-14T19:05:20.8Z" },
{ url = "https://files.pythonhosted.org/packages/25/7d/e38f18cfb203e8f09842c0e3f422992887ce285ecc3bf18816d559a13c80/awscrt-0.28.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:bf4ff9c8c6a233246320c2d41d939b6e25cdae97728d827186e4771a9edda688", size = 3749978, upload-time = "2025-10-14T19:05:22.16Z" },
{ url = "https://files.pythonhosted.org/packages/16/6f/e8a3c0daed8f7b60c76fc2721bd4e83580ddecace24e0cb0ebb99564f699/awscrt-0.28.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0c738b83b66d1a8b43089556247fbe4adf2b73d610c7938d3bae1718a0fe8b1d", size = 3977237, upload-time = "2025-10-14T19:05:23.368Z" },
{ url = "https://files.pythonhosted.org/packages/92/3d/8400203f02dd924bcc8255703179b0c26efd03c84f838db6f026fcef9ba6/awscrt-0.28.2-cp310-cp310-win32.whl", hash = "sha256:23c30004c736a2f826a32c9720f1ccf71e8e4deb8535da5915d6073604853098", size = 3919413, upload-time = "2025-10-14T19:05:24.477Z" },
{ url = "https://files.pythonhosted.org/packages/c0/5e/b5ccf377880a70425b100f1e5f5ba516ff75e291585b3dc129239fbd1ec3/awscrt-0.28.2-cp310-cp310-win_amd64.whl", hash = "sha256:859ae8a195d51f15b631147d6792953a563bfe0a1cc7a75b6750977634de54b8", size = 4056024, upload-time = "2025-10-14T19:05:25.956Z" },
{ url = "https://files.pythonhosted.org/packages/ed/79/94e9f0ee7c60ec6233c7ad6293589c56d5145172e49eb5328eda37d3fdd1/awscrt-0.28.2-cp311-abi3-macosx_10_15_universal2.whl", hash = "sha256:025eab99b58586d8c95f8fafe1f4695ad477eda20d1207240ee4f8ee79742059", size = 3381061, upload-time = "2025-10-14T19:05:27.187Z" },
{ url = "https://files.pythonhosted.org/packages/2d/b8/0da80dd58682ddf3ec204e877d5891198654647c085e65b6b8eacd214edb/awscrt-0.28.2-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:e5c18d035d6cd92228e1db2f043517c1bcf9e0f6430c0af60cc34257dcca092c", size = 3788011, upload-time = "2025-10-14T19:05:28.768Z" },
{ url = "https://files.pythonhosted.org/packages/d6/d2/f51cf4364364399fe90d557e2fed14c1f114720191a5825898b1242bd607/awscrt-0.28.2-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c75f077e90d0220a49b75a9bca914e5aa1a3c8f28af6bce4d0332be0b98dd3cb", size = 4055226, upload-time = "2025-10-14T19:05:30.054Z" },
{ url = "https://files.pythonhosted.org/packages/41/47/0fde8738a8c76de278ce431d8468ef18aeaca424329decca9ad5092df812/awscrt-0.28.2-cp311-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:1432c5c59a7e36b33eb2746cfbf30058f19ed43f2c117863897681f70bc246ba", size = 3692839, upload-time = "2025-10-14T19:05:31.471Z" },
{ url = "https://files.pythonhosted.org/packages/18/25/cb3762f6b47fe503eea7f337eca7cfd044ab28bcc2452fbf298c6492ec8b/awscrt-0.28.2-cp311-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:f96703c30b22ba1e43e1bb2fe996ac7af513bea411c54dbf09a3a1af329b9a76", size = 3918023, upload-time = "2025-10-14T19:05:33.162Z" },
{ url = "https://files.pythonhosted.org/packages/95/0a/0b609acd45dbb83c04c7ecb8c7c789f5c15bbdd422129360bde093bc4a99/awscrt-0.28.2-cp311-abi3-win32.whl", hash = "sha256:3e94f63497b454d30892d7a7ce917a451c6f33590964d3a475d93f93b20083b6", size = 3917048, upload-time = "2025-10-14T19:05:34.745Z" },
{ url = "https://files.pythonhosted.org/packages/d1/38/bf33abd6d09c8572f8e09488db2b0a60124767d7f5d6d9a33cf8b051b7af/awscrt-0.28.2-cp311-abi3-win_amd64.whl", hash = "sha256:3e094772b1f6fd0f8c5f7cf37655d0984739f99493f66f534979a2a7bb7fc9f6", size = 4052877, upload-time = "2025-10-14T19:05:36.01Z" },
{ url = "https://files.pythonhosted.org/packages/10/71/4be198e472d95702434cee1f9dd889c56e22bea8554b466fad754148fd24/awscrt-0.28.2-cp313-abi3-macosx_10_15_universal2.whl", hash = "sha256:5fda9e7d0eb800491fadebe2b6c2560ac2f5742b60f4106440dca4b49da7fb03", size = 3379585, upload-time = "2025-10-14T19:05:37.225Z" },
{ url = "https://files.pythonhosted.org/packages/43/09/77084249d07dca71352341ad3fbcfa75deaccf25bd65f9fdbb36ce1f978b/awscrt-0.28.2-cp313-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:994a795bdc83344922a15891abb30155ec292093e856eef3929dd63dd6cadaca", size = 3779843, upload-time = "2025-10-14T19:05:38.774Z" },
{ url = "https://files.pythonhosted.org/packages/a6/bb/fcee9365e58e5860582398317571a9a5517da258cd81c3d987b9882f61d4/awscrt-0.28.2-cp313-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:28537c4517168927ef74aa007a2e0c9f436921227934d82da31e9a1cec7e0c4a", size = 4049154, upload-time = "2025-10-14T19:05:40.301Z" },
{ url = "https://files.pythonhosted.org/packages/ba/8e/ac92b2707dbe05e56d0dd5af73cb4e07a3da4aee66936071123966523759/awscrt-0.28.2-cp313-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:b9fc6be63832da3ff244d56c7d9a43326d89d79e68162419c35f33e6ad033be0", size = 3683672, upload-time = "2025-10-14T19:05:41.536Z" },
{ url = "https://files.pythonhosted.org/packages/ef/d0/15308ec37e762691f5d1871b0f1a6e462da8e421c6c38d6724e3cf0994b2/awscrt-0.28.2-cp313-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:efb57103a368de1d33148cb70a382c4f82ac376c744de9484e0f621cef8313f3", size = 3912823, upload-time = "2025-10-14T19:05:43.781Z" },
{ url = "https://files.pythonhosted.org/packages/bc/cd/7693b1d72069908b7a3ee30e4ef2b5fc8f54948a96397729277cb0b0c7b4/awscrt-0.28.2-cp313-abi3-win32.whl", hash = "sha256:594dc61f4f0c1c9fb7292364d25c21810b3608cd67c0de78a032ad48f7bfd88c", size = 3911514, upload-time = "2025-10-14T19:05:45.019Z" },
{ url = "https://files.pythonhosted.org/packages/93/d6/5d8545c967690f03d55d44ed56ceff26d88363cd7d0435fd80a1c843ac2a/awscrt-0.28.2-cp313-abi3-win_amd64.whl", hash = "sha256:a17f0ab9dc5e5301da0fb00ccc4511a136d13abbd4a9564827547333fcd7ba16", size = 4047912, upload-time = "2025-10-14T19:05:46.302Z" },
]
[[package]]
@@ -1282,13 +1282,13 @@ wheels = [
[[package]]
name = "daily-python"
version = "0.19.9"
version = "0.20.0"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/22/85/6064c3225e5b190e522e8f3bc6a460efc5e3e6632f16fd5f9799c44ba57a/daily_python-0.19.9-cp37-abi3-macosx_10_15_x86_64.whl", hash = "sha256:cbc558ad7d49e79b550bf7567b9ceae75e2864d4fcaf41c90377b620e38a2461", size = 13365213, upload-time = "2025-09-06T00:31:00.224Z" },
{ url = "https://files.pythonhosted.org/packages/23/58/af986c6881180a46a7b60dd418ce58d6d7c0c4ffc48d261748067c679317/daily_python-0.19.9-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:446bb9ee848d88bc68ca29a2216793c9b5ebaf5991bf604daf76f7c5a53d5919", size = 11711673, upload-time = "2025-09-06T00:31:02.526Z" },
{ url = "https://files.pythonhosted.org/packages/9d/48/1cad4c3e92cdb5ef06467d972c76a510fe5e807513334b10ad7f8c21bf74/daily_python-0.19.9-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2facaf82b614404c642c70bbf0874fb045d8ad46400acb051470cd4df93cb4db", size = 13679393, upload-time = "2025-09-06T00:31:04.999Z" },
{ url = "https://files.pythonhosted.org/packages/3c/e9/354f4699619e83d13e266256b2352b21741ac527e3e5ab5f2264d5c482cd/daily_python-0.19.9-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:ffc205efca7b47739efd358febab17577248c8db2ebc4d17d819307a83b9eefc", size = 14221932, upload-time = "2025-09-06T00:31:07.471Z" },
{ url = "https://files.pythonhosted.org/packages/9b/02/ce81ebf11a04cd133a5539e08f85060574711fff05a1d6ad29705f0755c1/daily_python-0.20.0-cp37-abi3-macosx_10_15_x86_64.whl", hash = "sha256:7da3f1df8cd9ef7f7fcc96ce688348dc903f62d82b6dd155a53bc64b7a74f3a7", size = 13259887, upload-time = "2025-10-16T22:14:12.262Z" },
{ url = "https://files.pythonhosted.org/packages/4a/1e/51f06f3486c978e1184af2271e800ce6a6e8a8f95d61ee6624bae88ae9cd/daily_python-0.20.0-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:d02fd7b8c8079ceaa550ef23db052cdf70a8ffaf8ab6a8bc1a1e97bf0b939464", size = 11642453, upload-time = "2025-10-16T22:14:14.477Z" },
{ url = "https://files.pythonhosted.org/packages/71/c9/f767f0b479abd39330569ad61fb9db4661aae56cd74bb27c6f3483595463/daily_python-0.20.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:a5c8718982c221dc18b41fb0692c9f8435f115f72e74994c94d3b9c6dad7c534", size = 13634216, upload-time = "2025-10-16T22:14:16.235Z" },
{ url = "https://files.pythonhosted.org/packages/e8/10/5c6d7b000bee36c2a0587a092a34c7486d2de831fc8e44ed42b16a6bd99f/daily_python-0.20.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:ca9132aef1bdb5be663d1894b440dab1f998ebb3f45dfc31d44effabded4bc08", size = 14282189, upload-time = "2025-10-16T22:14:18.229Z" },
]
[[package]]
@@ -4633,11 +4633,11 @@ requires-dist = [
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.13.0,<2" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" },
{ name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.1.0" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.1.1" },
{ name = "azure-cognitiveservices-speech", marker = "extra == 'azure'", specifier = "~=1.42.0" },
{ name = "cartesia", marker = "extra == 'cartesia'", specifier = "~=2.0.3" },
{ name = "coremltools", marker = "extra == 'local-smart-turn'", specifier = ">=8.0" },
{ name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.19.9" },
{ name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.20.0" },
{ name = "deepgram-sdk", marker = "extra == 'deepgram'", specifier = "~=4.7.0" },
{ name = "docstring-parser", specifier = "~=0.16" },
{ name = "einops", marker = "extra == 'moondream'", specifier = "~=0.8.0" },
@@ -4708,7 +4708,7 @@ requires-dist = [
{ name = "simli-ai", marker = "extra == 'simli'", specifier = "~=0.1.10" },
{ name = "soundfile", marker = "extra == 'soundfile'", specifier = "~=0.13.0" },
{ name = "soxr", specifier = "~=0.5.0" },
{ name = "speechmatics-rt", marker = "extra == 'speechmatics'", specifier = ">=0.4.0" },
{ name = "speechmatics-rt", marker = "extra == 'speechmatics'", specifier = ">=0.5.0" },
{ name = "strands-agents", marker = "extra == 'strands'", specifier = ">=1.9.1,<2" },
{ name = "tenacity", marker = "extra == 'livekit'", specifier = ">=8.2.3,<10.0.0" },
{ name = "timm", marker = "extra == 'moondream'", specifier = "~=1.0.13" },
@@ -6483,16 +6483,16 @@ wheels = [
[[package]]
name = "smithy-aws-core"
version = "0.1.0"
version = "0.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aws-sdk-signers", marker = "python_full_version >= '3.12'" },
{ name = "smithy-core", marker = "python_full_version >= '3.12'" },
{ name = "smithy-http", marker = "python_full_version >= '3.12'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ec/e8/8cef48be92ed09a112c54747a4515313ba96e767e7e0118a769aeb147e07/smithy_aws_core-0.1.0.tar.gz", hash = "sha256:5f197b69ad1380e9118e1e3c9032e0e305525ef56fb4fc97dea6414281065526", size = 11135, upload-time = "2025-09-29T19:37:13.072Z" }
sdist = { url = "https://files.pythonhosted.org/packages/56/d3/f847e0fd36b95aa36ce3a4c9ce1a08e16b2aa9a56b71714045c9c924e282/smithy_aws_core-0.1.1.tar.gz", hash = "sha256:78dfd7040fc2bc72b6af293096642fc9a7bfd2db28ddbdf7c4110535eab9d662", size = 11196, upload-time = "2025-10-21T20:21:18.648Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/46/7e/6d05275646bc2cdf7b0749e9bd54958a4e808aafeee4d8ff2fdaa8233dc2/smithy_aws_core-0.1.0-py3-none-any.whl", hash = "sha256:a8cda4011562f45f1fc5957c3a981b6016d736178450e5f2a1586937632af487", size = 18959, upload-time = "2025-09-29T19:37:12.041Z" },
{ url = "https://files.pythonhosted.org/packages/d0/04/87cb06f0f6d664b5cffdef6d4042dd52c11c138436084d30ffdaa3543031/smithy_aws_core-0.1.1-py3-none-any.whl", hash = "sha256:0d1634f276c2999dc2a04fafef63b9d28309de50d939d1d49df952773a7063c4", size = 18963, upload-time = "2025-10-21T20:21:17.692Z" },
]
[package.optional-dependencies]
@@ -6526,14 +6526,14 @@ wheels = [
[[package]]
name = "smithy-http"
version = "0.1.0"
version = "0.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "smithy-core", marker = "python_full_version >= '3.12'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/4e/62/5ba46c7432fbb0852acf8340402879ba53bb4c009b875e1b5b2e9df844ff/smithy_http-0.1.0.tar.gz", hash = "sha256:ed44552531f594e31101f7186c7b01b508ecd38a860b45390a1cce7da700df4b", size = 28269, upload-time = "2025-09-29T19:37:18.629Z" }
sdist = { url = "https://files.pythonhosted.org/packages/3c/1c/44e99a7dfb8c39bf0c3d998accdf4573a7a3488863b90f21af260cec2d45/smithy_http-0.2.0.tar.gz", hash = "sha256:2382562fa9af326be455f14b18615f16ffe9db756e51b2a4ca0d23e1b881cff8", size = 26729, upload-time = "2025-10-21T20:21:06.146Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5b/23/d18076ea45b3000c5e9eb8ebd75a4ea1b65b5c59e5c2080a119e2679dfba/smithy_http-0.1.0-py3-none-any.whl", hash = "sha256:7657aaf4b9e025cb9d317406f417b49cf19fba9d1b2ab4f5e6d9dc5a2dd7cdba", size = 38995, upload-time = "2025-09-29T19:37:17.506Z" },
{ url = "https://files.pythonhosted.org/packages/d4/e2/d475fad81ac74ec0e145cb6d72afe5ecde4e2358bd632c2fd5d3f4bc87dc/smithy_http-0.2.0-py3-none-any.whl", hash = "sha256:49ee2402d7737798d70f99f491fbfb2a5767283ae562e21b6f86e3fd14f3e3e0", size = 37328, upload-time = "2025-10-21T20:21:05.362Z" },
]
[package.optional-dependencies]
@@ -6619,14 +6619,14 @@ wheels = [
[[package]]
name = "speechmatics-rt"
version = "0.4.2"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/17/2e/d694390d58b9b6807280441d1275856f5a316c3e8a815c2037502636bbea/speechmatics_rt-0.4.2.tar.gz", hash = "sha256:c0f7ed34442b0f505a12d1b19c8cc8dc2cc0b1a423aeb5669ca0738fc5e59f0d", size = 26142, upload-time = "2025-09-30T10:50:36.804Z" }
sdist = { url = "https://files.pythonhosted.org/packages/57/26/10359e1f16c2aa6a198eb11a9056f4a86a8bb8d4e610bbbe4a118b227b59/speechmatics_rt-0.5.0.tar.gz", hash = "sha256:ca974a186a012f946fd997deeaf3bf1c4f203f6d6e05a866172d27709183afc8", size = 26832, upload-time = "2025-10-15T15:54:25.695Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9a/c7/2cd551c71e14256ca463f31feec17f466b57c2730d636e20803e7a541104/speechmatics_rt-0.4.2-py3-none-any.whl", hash = "sha256:70b91ff750e2f7516eaf1839d39f7a8ac65ff6665638b837cf67bab9cc9967bc", size = 32131, upload-time = "2025-09-30T10:50:35.656Z" },
{ url = "https://files.pythonhosted.org/packages/47/2e/9931ebe9360e9d385c68826b33137c2c9a4cfa361cd929d1ac6e72ebfe53/speechmatics_rt-0.5.0-py3-none-any.whl", hash = "sha256:58151488f891fa00cf7054f0cfab1b1eb94b55c3441be587f7941c726caef991", size = 32850, upload-time = "2025-10-15T15:54:24.5Z" },
]
[[package]]