Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0c43de13f | ||
|
|
cc4c96d099 | ||
|
|
788465cb04 | ||
|
|
db934eade0 | ||
|
|
0b8c966a11 | ||
|
|
5849485bc6 | ||
|
|
459af58540 | ||
|
|
576bd67e85 | ||
|
|
1e8629bf96 | ||
|
|
776a3526f9 | ||
|
|
2ced044418 | ||
|
|
151f187837 | ||
|
|
67afa718d0 | ||
|
|
d1f1b68b71 | ||
|
|
a479c32665 | ||
|
|
9f66b0ba41 | ||
|
|
23385ca3d2 | ||
|
|
8b24bae9c5 | ||
|
|
0502ec6c44 | ||
|
|
81645910e0 | ||
|
|
d6ab4c41b0 | ||
|
|
2f92cb8781 | ||
|
|
fbf274374c | ||
|
|
427efecf5b | ||
|
|
b3e54546ac | ||
|
|
de46631bac | ||
|
|
abf0150261 | ||
|
|
d1d74c571c |
29
CHANGELOG.md
29
CHANGELOG.md
@@ -5,10 +5,17 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
## [0.0.91] - 2025-10-21
|
||||
|
||||
### Added
|
||||
|
||||
- It is now possible to start a bot from the `/start` endpoint when using the
|
||||
runner Daily's transport. This follows the Pipecat Cloud format with
|
||||
`createDailyRoom` and `body` fields in the POST request body.
|
||||
|
||||
- Added an ellipsis character (`…`) to the end of sentence detection in the
|
||||
string utils.
|
||||
|
||||
- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
|
||||
As a reminder, the context-setup pattern when using `LLMContext` is:
|
||||
|
||||
@@ -38,9 +45,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
|
||||
## AFTER:
|
||||
|
||||
# Context aggregator type
|
||||
@@ -51,9 +55,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
|
||||
- Added support for `bulbul:v3` model in `SarvamTTSService` and
|
||||
@@ -85,6 +86,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `RunnerArguments` now include the `body` field, so there's no need to add it
|
||||
to subclasses. Also, all `RunnerArguments` fields are now keyword-only.
|
||||
|
||||
- `CartesiaSTTService` now inherits from `WebsocketSTTService`.
|
||||
|
||||
- Package upgrades:
|
||||
@@ -106,8 +110,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where the `RTVIProcessor` was sending duplicate
|
||||
`UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` messages.
|
||||
|
||||
- Fixed an issue in `AWSBedrockLLMService` where both `temperature` and `top_p`
|
||||
were always sent together, causing conflicts with models like Claude Sonnet 4.5
|
||||
that don't allow both parameters simultaneously. The service now only includes
|
||||
inference parameters that are explicitly set, and `InputParams` defaults have
|
||||
been changed to `None` to rely on AWS Bedrock's built-in model defaults.
|
||||
|
||||
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
|
||||
to a mismatch in the _handle_transcription method's signature.
|
||||
to a mismatch in the `_handle_transcription` method's signature.
|
||||
|
||||
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
|
||||
now handled properly in `PipelineTask` making it possible to cancel an asyncio
|
||||
|
||||
@@ -44,6 +44,10 @@ Looking to build structured conversations? Check out [Pipecat Flows](https://git
|
||||
|
||||
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
|
||||
|
||||
### 🛠️ Create and deploy projects
|
||||
|
||||
Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.
|
||||
|
||||
### 🔍 Debugging
|
||||
|
||||
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
|
||||
|
||||
200
env.example
200
env.example
@@ -4,6 +4,9 @@ AICOUSTICS_LICENSE_KEY=...
|
||||
# Anthropic
|
||||
ANTHROPIC_API_KEY=...
|
||||
|
||||
# Assembly AI
|
||||
ASSEMBLYAI_API_KEY=...
|
||||
|
||||
# Async
|
||||
ASYNCAI_API_KEY=...
|
||||
ASYNCAI_VOICE_ID=...
|
||||
@@ -21,12 +24,19 @@ AZURE_CHATGPT_API_KEY=...
|
||||
AZURE_CHATGPT_ENDPOINT=https://...
|
||||
AZURE_CHATGPT_MODEL=...
|
||||
|
||||
AZURE_REALTIME_API_KEY=...
|
||||
AZURE_REALTIME_BASE_URL=...
|
||||
|
||||
AZURE_DALLE_API_KEY=...
|
||||
AZURE_DALLE_ENDPOINT=https://...
|
||||
AZURE_DALLE_MODEL=...
|
||||
|
||||
# Cartesia
|
||||
CARTESIA_API_KEY=...
|
||||
CARTESIA_VOICE_ID=...
|
||||
|
||||
# Cerebras
|
||||
CEREBRAS_API_KEY=...
|
||||
|
||||
# Daily
|
||||
DAILY_API_KEY=...
|
||||
@@ -35,57 +45,48 @@ DAILY_SAMPLE_ROOM_URL=https://...
|
||||
# Deepgram
|
||||
DEEPGRAM_API_KEY=...
|
||||
|
||||
# DeepSeek
|
||||
DEEPSEEK_API_KEY=...
|
||||
|
||||
# ElevenLabs
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
|
||||
# Neuphonic
|
||||
NEUPHONIC_API_KEY=...
|
||||
|
||||
# Fal
|
||||
FAL_KEY=...
|
||||
|
||||
# Fireworks
|
||||
FIREWORKS_API_KEY=...
|
||||
|
||||
# Fish Audio
|
||||
FISH_API_KEY=...
|
||||
|
||||
# Gladia
|
||||
GLADIA_API_KEY=...
|
||||
GLADIA_REGION=...
|
||||
|
||||
# Google
|
||||
GOOGLE_API_KEY=...
|
||||
GOOGLE_CLOUD_PROJECT_ID=...
|
||||
GOOGLE_TEST_CREDENTIALS=...
|
||||
GOOGLE_VERTEX_TEST_CREDENTIALS=...
|
||||
GOOGLE_CLOUD_PROJECT_ID=...
|
||||
GOOGLE_CLOUD_LOCATION=...
|
||||
GOOGLE_TEST_CREDENTIALS=...
|
||||
|
||||
# Grok
|
||||
GROK_API_KEY=...
|
||||
|
||||
# Groq
|
||||
GROQ_API_KEY=...
|
||||
|
||||
# Heygen
|
||||
HEYGEN_API_KEY=...
|
||||
|
||||
# Hume
|
||||
HUME_API_KEY=...
|
||||
HUME_VOICE_ID=...
|
||||
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
|
||||
# Perplexity
|
||||
PERPLEXITY_API_KEY=...
|
||||
|
||||
# PlayHT
|
||||
PLAYHT_USER_ID=...
|
||||
PLAYHT_API_KEY=...
|
||||
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
TAVUS_PERSONA_ID=...
|
||||
|
||||
# Simli
|
||||
SIMLI_API_KEY=...
|
||||
SIMLI_FACE_ID=...
|
||||
# Inworld
|
||||
INWORLD_API_KEY=...
|
||||
|
||||
# Krisp
|
||||
KRISP_MODEL_PATH=...
|
||||
@@ -93,77 +94,100 @@ KRISP_MODEL_PATH=...
|
||||
# Krisp Viva
|
||||
KRISP_VIVA_MODEL_PATH=...
|
||||
|
||||
# DeepSeek
|
||||
DEEPSEEK_API_KEY=...
|
||||
# LiveKit
|
||||
LIVEKIT_API_KEY=...
|
||||
LIVEKIT_API_SECRET=...
|
||||
|
||||
# Groq
|
||||
GROQ_API_KEY=...
|
||||
|
||||
# Grok
|
||||
GROK_API_KEY=...
|
||||
|
||||
# Inworld
|
||||
INWORLD_API_KEY=...
|
||||
|
||||
# Together.ai
|
||||
TOGETHER_API_KEY=...
|
||||
|
||||
# Cerebras
|
||||
CEREBRAS_API_KEY=...
|
||||
|
||||
# Fish Audio
|
||||
FISH_API_KEY=...
|
||||
|
||||
# Assembly AI
|
||||
ASSEMBLYAI_API_KEY=...
|
||||
|
||||
# OpenRouter
|
||||
OPENROUTER_API_KEY=...
|
||||
|
||||
# Piper
|
||||
PIPER_BASE_URL=...
|
||||
|
||||
# Smart turn
|
||||
LOCAL_SMART_TURN_MODEL_PATH=...
|
||||
FAL_SMART_TURN_API_KEY=...
|
||||
|
||||
# Twilio
|
||||
TWILIO_ACCOUNT_SID=...
|
||||
TWILIO_AUTH_TOKEN=...
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
|
||||
# MiniMax
|
||||
MINIMAX_API_KEY=...
|
||||
MINIMAX_GROUP_ID=...
|
||||
|
||||
# Sarvam AI
|
||||
SARVAM_API_KEY=...
|
||||
|
||||
# Soniox
|
||||
SONIOX_API_KEY=
|
||||
|
||||
# Speechmatics
|
||||
SPEECHMATICS_API_KEY=...
|
||||
|
||||
# SambaNova
|
||||
SAMBANOVA_API_KEY=...
|
||||
|
||||
# Sentry
|
||||
SENTRY_DSN=...
|
||||
|
||||
# Heygen
|
||||
HEYGEN_API_KEY=...
|
||||
|
||||
# Mistral
|
||||
MISTRAL_API_KEY=...
|
||||
|
||||
# Neuphonic
|
||||
NEUPHONIC_API_KEY=...
|
||||
|
||||
# NVIDIA
|
||||
NVIDIA_API_KEY=...
|
||||
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# OpenRouter
|
||||
OPENROUTER_API_KEY=...
|
||||
|
||||
# Perplexity
|
||||
PERPLEXITY_API_KEY=...
|
||||
|
||||
# Picovoice Koala
|
||||
KOALA_ACCESS_KEY=...
|
||||
|
||||
# Piper
|
||||
PIPER_BASE_URL=...
|
||||
|
||||
# PlayHT
|
||||
PLAYHT_USER_ID=...
|
||||
PLAYHT_API_KEY=...
|
||||
|
||||
# Plivo
|
||||
PLIVO_AUTH_ID=...
|
||||
PLIVO_AUTH_TOKEN=...
|
||||
|
||||
# Qwen
|
||||
QWEN_API_KEY=...
|
||||
|
||||
# Rime
|
||||
RIME_API_KEY=...
|
||||
RIME_VOICE_ID=...
|
||||
|
||||
# SambaNova
|
||||
SAMBANOVA_API_KEY=...
|
||||
|
||||
# Sarvam AI
|
||||
SARVAM_API_KEY=...
|
||||
|
||||
# Sentry
|
||||
SENTRY_DSN=...
|
||||
|
||||
# Simli
|
||||
SIMLI_API_KEY=...
|
||||
SIMLI_FACE_ID=...
|
||||
|
||||
# Smart turn
|
||||
LOCAL_SMART_TURN_MODEL_PATH=...
|
||||
FAL_SMART_TURN_API_KEY=...
|
||||
|
||||
# Soniox
|
||||
SONIOX_API_KEY=...
|
||||
|
||||
# Speechmatics
|
||||
SPEECHMATICS_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
|
||||
# Telnyx
|
||||
TELNYX_API_KEY=...
|
||||
TELNYX_ACCOUNT_SID=...
|
||||
|
||||
# Together.ai
|
||||
TOGETHER_API_KEY=...
|
||||
|
||||
# Twilio
|
||||
TWILIO_ACCOUNT_SID=...
|
||||
TWILIO_AUTH_TOKEN=...
|
||||
|
||||
# WhatsApp
|
||||
WHATSAPP_TOKEN=
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=
|
||||
WHATSAPP_PHONE_NUMBER_ID=
|
||||
WHATSAPP_APP_SECRET=
|
||||
WHATSAPP_TOKEN=...
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
|
||||
WHATSAPP_PHONE_NUMBER_ID=...
|
||||
WHATSAPP_APP_SECRET=...
|
||||
@@ -67,8 +67,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
llm = AWSBedrockLLMService(
|
||||
aws_region="us-west-2",
|
||||
model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
|
||||
params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
|
||||
model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
|
||||
params=AWSBedrockLLMService.InputParams(temperature=0.8),
|
||||
)
|
||||
|
||||
messages = [
|
||||
|
||||
@@ -1,147 +0,0 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from typing import Tuple
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMContextFrame, TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators import SentenceAggregator
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.services.azure import AzureLLMService, AzureTTSService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.fal import FalImageGenService
|
||||
from pipecat.transports.daily.transport import DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("pipecat")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
duration_minutes=10,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
tts1 = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"),
|
||||
)
|
||||
tts2 = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl",
|
||||
)
|
||||
dalle = FalImageGenService(
|
||||
params=FalImageGenService.InputParams(image_size="1024x1024"),
|
||||
aiohttp_session=session,
|
||||
key=os.getenv("FAL_KEY"),
|
||||
)
|
||||
|
||||
bot1_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long.",
|
||||
},
|
||||
]
|
||||
bot2_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich.",
|
||||
},
|
||||
]
|
||||
|
||||
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
|
||||
"""This function streams text from the LLM and uses the TTS service to convert
|
||||
that text to speech as it's received.
|
||||
"""
|
||||
source_queue = asyncio.Queue()
|
||||
sink_queue = asyncio.Queue()
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
|
||||
|
||||
await source_queue.put(LLMContextFrame(LLMContext(messages)))
|
||||
await source_queue.put(EndFrame())
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
message = ""
|
||||
all_audio = bytearray()
|
||||
while sink_queue.qsize():
|
||||
frame = sink_queue.get_nowait()
|
||||
if isinstance(frame, TextFrame):
|
||||
message += frame.text
|
||||
elif isinstance(frame, AudioFrame):
|
||||
all_audio.extend(frame.audio)
|
||||
|
||||
return (message, all_audio)
|
||||
|
||||
async def get_bot1_statement():
|
||||
message, audio = await get_text_and_audio(bot1_messages)
|
||||
|
||||
bot1_messages.append({"role": "assistant", "content": message})
|
||||
bot2_messages.append({"role": "user", "content": message})
|
||||
|
||||
return audio
|
||||
|
||||
async def get_bot2_statement():
|
||||
message, audio = await get_text_and_audio(bot2_messages)
|
||||
|
||||
bot2_messages.append({"role": "assistant", "content": message})
|
||||
bot1_messages.append({"role": "user", "content": message})
|
||||
|
||||
return audio
|
||||
|
||||
async def argue():
|
||||
for i in range(100):
|
||||
print(f"In iteration {i}")
|
||||
|
||||
bot1_description = "A woman conservatively dressed as a librarian in a library surrounded by books, cartoon, serious, highly detailed"
|
||||
|
||||
(audio1, image_data1) = await asyncio.gather(
|
||||
get_bot1_statement(), dalle.run_image_gen(bot1_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(image_data1[1], image_data1[2]),
|
||||
AudioFrame(audio1),
|
||||
]
|
||||
)
|
||||
|
||||
bot2_description = "A cat dressed in a hot dog costume, cartoon, bright colors, funny, highly detailed"
|
||||
|
||||
(audio2, image_data2) = await asyncio.gather(
|
||||
get_bot2_statement(), dalle.run_image_gen(bot2_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(image_data2[1], image_data2[2]),
|
||||
AudioFrame(audio2),
|
||||
]
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), argue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
170
examples/foundational/08-custom-frame-processor.py
Normal file
170
examples/foundational/08-custom-frame-processor.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMRunFrame,
|
||||
MetricsFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
def format_metrics(metrics, indent=0):
|
||||
lines = []
|
||||
tab = "\t" * indent
|
||||
|
||||
for metric in metrics:
|
||||
lines.append(tab + type(metric).__name__)
|
||||
for field, value in vars(metric).items():
|
||||
if hasattr(value, "__dict__") and not isinstance(
|
||||
value, (str, int, float, bool, type(None))
|
||||
):
|
||||
lines.append(f"{tab}\t{field}={type(value).__name__}")
|
||||
for k, v in vars(value).items():
|
||||
lines.append(f"{tab}\t\t{k}={repr(v)}")
|
||||
else:
|
||||
lines.append(f"{tab}\t{field}={repr(value)}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class MetricsFrameLogger(FrameProcessor):
|
||||
"""MetricsFrameLogger formats and logs all MetericsFrames"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, MetricsFrame):
|
||||
logger.info(f"{frame.name}\n {format_metrics(frame.data)}")
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
# ALWAYS push all frames
|
||||
else:
|
||||
# SUPER IMPORTANT: always push every frame!
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
metrics_frame_processor = MetricsFrameLogger()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
metrics_frame_processor, # pretty print metrics frames
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -79,8 +79,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
llm = AWSBedrockLLMService(
|
||||
aws_region="us-west-2",
|
||||
model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
|
||||
params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
|
||||
model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
|
||||
params=AWSBedrockLLMService.InputParams(temperature=0.8),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
|
||||
@@ -106,6 +106,19 @@ class LLMContext:
|
||||
self._tools: ToolsSchema | NotGiven = LLMContext._normalize_and_validate_tools(tools)
|
||||
self._tool_choice: LLMContextToolChoice | NotGiven = tool_choice
|
||||
|
||||
@property
|
||||
def messages(self) -> List[LLMContextMessage]:
|
||||
"""Get the current messages list.
|
||||
|
||||
NOTE: This is equivalent to calling `get_messages()` with no filter. If
|
||||
you want to filter out LLM-specific messages that don't pertain to your
|
||||
LLM, use `get_messages()` directly.
|
||||
|
||||
Returns:
|
||||
List of conversation messages.
|
||||
"""
|
||||
return self.get_messages()
|
||||
|
||||
def get_messages(self, llm_specific_filter: Optional[str] = None) -> List[LLMContextMessage]:
|
||||
"""Get the current messages list.
|
||||
|
||||
@@ -113,7 +126,8 @@ class LLMContext:
|
||||
llm_specific_filter: Optional filter to return LLM-specific
|
||||
messages for the given LLM, in addition to the standard
|
||||
messages. If messages end up being filtered, an error will be
|
||||
logged.
|
||||
logged; this is intended to catch accidental use of
|
||||
incompatible LLM-specific messages.
|
||||
|
||||
Returns:
|
||||
List of conversation messages.
|
||||
|
||||
@@ -1018,6 +1018,7 @@ class RTVIObserver(BaseObserver):
|
||||
|
||||
if (
|
||||
isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
|
||||
and (direction == FrameDirection.DOWNSTREAM)
|
||||
and self._params.user_speaking_enabled
|
||||
):
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
@@ -76,6 +76,7 @@ class DailyRoomConfig(BaseModel):
|
||||
async def configure(
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
*,
|
||||
api_key: Optional[str] = None,
|
||||
room_exp_duration: Optional[float] = 2.0,
|
||||
token_exp_duration: Optional[float] = 2.0,
|
||||
sip_caller_phone: Optional[str] = None,
|
||||
@@ -92,6 +93,7 @@ async def configure(
|
||||
|
||||
Args:
|
||||
aiohttp_session: HTTP session for making API requests.
|
||||
api_key: Daily API key.
|
||||
room_exp_duration: Room expiration time in hours.
|
||||
token_exp_duration: Token expiration time in hours.
|
||||
sip_caller_phone: Phone number or identifier for SIP display name.
|
||||
@@ -129,7 +131,7 @@ async def configure(
|
||||
config = await configure(session, room_properties=custom_props)
|
||||
"""
|
||||
# Check for required API key
|
||||
api_key = os.getenv("DAILY_API_KEY")
|
||||
api_key = api_key or os.getenv("DAILY_API_KEY")
|
||||
if not api_key:
|
||||
raise Exception(
|
||||
"DAILY_API_KEY environment variable is required. "
|
||||
|
||||
@@ -82,6 +82,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
DailyRunnerArguments,
|
||||
RunnerArguments,
|
||||
SmallWebRTCRunnerArguments,
|
||||
WebSocketRunnerArguments,
|
||||
)
|
||||
@@ -309,7 +310,7 @@ def _setup_webrtc_routes(
|
||||
):
|
||||
"""Mimic Pipecat Cloud's proxy."""
|
||||
active_session = active_sessions.get(session_id)
|
||||
if not active_session:
|
||||
if active_session is None:
|
||||
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
|
||||
|
||||
if path.endswith("api/offer"):
|
||||
@@ -529,9 +530,9 @@ def _setup_daily_routes(app: FastAPI):
|
||||
"""Set up Daily-specific routes."""
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent():
|
||||
async def create_room_and_start_agent():
|
||||
"""Launch a Daily bot and redirect to room."""
|
||||
print("Starting bot with Daily transport")
|
||||
print("Starting bot with Daily transport and redirecting to Daily room")
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -546,11 +547,11 @@ def _setup_daily_routes(app: FastAPI):
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return RedirectResponse(room_url)
|
||||
|
||||
async def _handle_rtvi_request(request: Request):
|
||||
"""Common handler for both /start and /connect endpoints.
|
||||
@app.post("/start")
|
||||
async def start_agent(request: Request):
|
||||
"""Handler for /start endpoints.
|
||||
|
||||
Expects POST body like::
|
||||
|
||||
{
|
||||
"createDailyRoom": true,
|
||||
"dailyRoomProperties": { "start_video_off": true },
|
||||
@@ -567,45 +568,32 @@ def _setup_daily_routes(app: FastAPI):
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Extract the body data that should be passed to the bot
|
||||
# This mimics Pipecat Cloud's behavior
|
||||
bot_body = request_data.get("body", {})
|
||||
create_daily_room = request_data.get("createDailyRoom", False)
|
||||
body = request_data.get("body", {})
|
||||
|
||||
# Log the extracted body data for debugging
|
||||
if bot_body:
|
||||
logger.info(f"Extracted body data for bot: {bot_body}")
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
result = None
|
||||
if create_daily_room:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=body)
|
||||
result = {
|
||||
"dailyRoom": room_url,
|
||||
"dailyToken": token,
|
||||
"sessionId": str(uuid.uuid4()),
|
||||
}
|
||||
else:
|
||||
logger.debug("No body data provided in request")
|
||||
runner_args = RunnerArguments(body=body)
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
# Start the bot in the background
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
|
||||
# Start the bot in the background with extracted body data
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=bot_body)
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
# Match PCC /start endpoint response format:
|
||||
return {"dailyRoom": room_url, "dailyToken": token}
|
||||
|
||||
@app.post("/start")
|
||||
async def rtvi_start(request: Request):
|
||||
"""Launch a Daily bot and return connection info for RTVI clients."""
|
||||
return await _handle_rtvi_request(request)
|
||||
|
||||
@app.post("/connect")
|
||||
async def rtvi_connect(request: Request):
|
||||
"""Launch a Daily bot and return connection info for RTVI clients.
|
||||
|
||||
.. deprecated:: 0.0.78
|
||||
Use /start instead. This endpoint will be removed in a future version.
|
||||
"""
|
||||
logger.warning(
|
||||
"DEPRECATED: /connect endpoint is deprecated. Please use /start instead. "
|
||||
"This endpoint will be removed in a future version."
|
||||
)
|
||||
return await _handle_rtvi_request(request)
|
||||
return result
|
||||
|
||||
|
||||
def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
|
||||
|
||||
@@ -20,9 +20,11 @@ from fastapi import WebSocket
|
||||
class RunnerArguments:
|
||||
"""Base class for runner session arguments."""
|
||||
|
||||
handle_sigint: bool = field(init=False)
|
||||
handle_sigterm: bool = field(init=False)
|
||||
pipeline_idle_timeout_secs: int = field(init=False)
|
||||
# Use kw_only so subclasses don't need to worry about ordering.
|
||||
handle_sigint: bool = field(init=False, kw_only=True)
|
||||
handle_sigterm: bool = field(init=False, kw_only=True)
|
||||
pipeline_idle_timeout_secs: int = field(init=False, kw_only=True)
|
||||
body: Optional[Any] = field(default_factory=dict, kw_only=True)
|
||||
|
||||
def __post_init__(self):
|
||||
self.handle_sigint = False
|
||||
@@ -42,7 +44,6 @@ class DailyRunnerArguments(RunnerArguments):
|
||||
|
||||
room_url: str
|
||||
token: Optional[str] = None
|
||||
body: Optional[Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -55,7 +56,6 @@ class WebSocketRunnerArguments(RunnerArguments):
|
||||
"""
|
||||
|
||||
websocket: WebSocket
|
||||
body: Optional[Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -720,11 +720,11 @@ class AWSBedrockLLMService(LLMService):
|
||||
additional_model_request_fields: Additional model-specific parameters.
|
||||
"""
|
||||
|
||||
max_tokens: Optional[int] = Field(default_factory=lambda: 4096, ge=1)
|
||||
temperature: Optional[float] = Field(default_factory=lambda: 0.7, ge=0.0, le=1.0)
|
||||
top_p: Optional[float] = Field(default_factory=lambda: 0.999, ge=0.0, le=1.0)
|
||||
max_tokens: Optional[int] = Field(default=None, ge=1)
|
||||
temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
stop_sequences: Optional[List[str]] = Field(default_factory=lambda: [])
|
||||
latency: Optional[str] = Field(default_factory=lambda: "standard")
|
||||
latency: Optional[str] = Field(default=None)
|
||||
additional_model_request_fields: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
def __init__(
|
||||
@@ -801,6 +801,24 @@ class AWSBedrockLLMService(LLMService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def _build_inference_config(self) -> Dict[str, Any]:
|
||||
"""Build inference config with only the parameters that are set.
|
||||
|
||||
This prevents conflicts with models (e.g., Claude Sonnet 4.5) that don't
|
||||
allow certain parameter combinations like temperature and top_p together.
|
||||
|
||||
Returns:
|
||||
Dictionary containing only the inference parameters that are not None.
|
||||
"""
|
||||
inference_config = {}
|
||||
if self._settings["max_tokens"] is not None:
|
||||
inference_config["maxTokens"] = self._settings["max_tokens"]
|
||||
if self._settings["temperature"] is not None:
|
||||
inference_config["temperature"] = self._settings["temperature"]
|
||||
if self._settings["top_p"] is not None:
|
||||
inference_config["topP"] = self._settings["top_p"]
|
||||
return inference_config
|
||||
|
||||
async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]:
|
||||
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
|
||||
|
||||
@@ -826,16 +844,16 @@ class AWSBedrockLLMService(LLMService):
|
||||
model_id = self.model_name
|
||||
|
||||
# Prepare request parameters
|
||||
inference_config = self._build_inference_config()
|
||||
|
||||
request_params = {
|
||||
"modelId": model_id,
|
||||
"messages": messages,
|
||||
"inferenceConfig": {
|
||||
"maxTokens": 8192,
|
||||
"temperature": 0.7,
|
||||
"topP": 0.9,
|
||||
},
|
||||
}
|
||||
|
||||
if inference_config:
|
||||
request_params["inferenceConfig"] = inference_config
|
||||
|
||||
if system:
|
||||
request_params["system"] = system
|
||||
|
||||
@@ -974,21 +992,20 @@ class AWSBedrockLLMService(LLMService):
|
||||
tools = params_from_context["tools"]
|
||||
tool_choice = params_from_context["tool_choice"]
|
||||
|
||||
# Set up inference config
|
||||
inference_config = {
|
||||
"maxTokens": self._settings["max_tokens"],
|
||||
"temperature": self._settings["temperature"],
|
||||
"topP": self._settings["top_p"],
|
||||
}
|
||||
# Set up inference config - only include parameters that are set
|
||||
inference_config = self._build_inference_config()
|
||||
|
||||
# Prepare request parameters
|
||||
request_params = {
|
||||
"modelId": self.model_name,
|
||||
"messages": messages,
|
||||
"inferenceConfig": inference_config,
|
||||
"additionalModelRequestFields": self._settings["additional_model_request_fields"],
|
||||
}
|
||||
|
||||
# Only add inference config if it has parameters
|
||||
if inference_config:
|
||||
request_params["inferenceConfig"] = inference_config
|
||||
|
||||
# Add system message
|
||||
if system:
|
||||
request_params["system"] = system
|
||||
|
||||
@@ -10,7 +10,8 @@ This module provides specialized context aggregators and message handling for AW
|
||||
including conversation history management and role-specific message processing.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
|
||||
AWS Nova Sonic no longer uses types from this module under the hood.
|
||||
It now uses `LLMContext` and `LLMContextAggregatorPair`.
|
||||
Using the new patterns should allow you to not need types from this module.
|
||||
|
||||
BEFORE:
|
||||
@@ -26,9 +27,6 @@ including conversation history management and role-specific message processing.
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
```
|
||||
|
||||
AFTER:
|
||||
@@ -42,9 +40,6 @@ including conversation history management and role-specific message processing.
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
"""
|
||||
|
||||
@@ -53,8 +48,10 @@ import warnings
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.services.aws.nova_sonic.context are deprecated. \n"
|
||||
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Types in pipecat.services.aws.nova_sonic.context (or "
|
||||
"pipecat.services.aws_nova_sonic.context) are deprecated. \n"
|
||||
"AWS Nova Sonic no longer uses types from this module under the hood. \n"
|
||||
"It now uses `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Using the new patterns should allow you to not need types from this module.\n\n"
|
||||
"BEFORE:\n"
|
||||
"```\n"
|
||||
@@ -67,8 +64,6 @@ with warnings.catch_warnings():
|
||||
"context: AWSNovaSonicLLMContext\n"
|
||||
"# or\n"
|
||||
"context: OpenAILLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```\n\n"
|
||||
"AFTER:\n"
|
||||
"```\n"
|
||||
@@ -79,9 +74,363 @@ with warnings.catch_warnings():
|
||||
"frame: LLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: LLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
import copy
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
DataFrame,
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InterruptionFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolChoiceFrame,
|
||||
LLMSetToolsFrame,
|
||||
TextFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
|
||||
from pipecat.services.openai.llm import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
|
||||
|
||||
class Role(Enum):
|
||||
"""Roles supported in AWS Nova Sonic conversations.
|
||||
|
||||
Parameters:
|
||||
SYSTEM: System-level messages (not used in conversation history).
|
||||
USER: Messages sent by the user.
|
||||
ASSISTANT: Messages sent by the assistant.
|
||||
TOOL: Messages sent by tools (not used in conversation history).
|
||||
"""
|
||||
|
||||
SYSTEM = "SYSTEM"
|
||||
USER = "USER"
|
||||
ASSISTANT = "ASSISTANT"
|
||||
TOOL = "TOOL"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistoryMessage:
|
||||
"""A single message in AWS Nova Sonic conversation history.
|
||||
|
||||
Parameters:
|
||||
role: The role of the message sender (USER or ASSISTANT only).
|
||||
text: The text content of the message.
|
||||
"""
|
||||
|
||||
role: Role # only USER and ASSISTANT
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistory:
|
||||
"""Complete conversation history for AWS Nova Sonic initialization.
|
||||
|
||||
Parameters:
|
||||
system_instruction: System-level instruction for the conversation.
|
||||
messages: List of conversation messages between user and assistant.
|
||||
"""
|
||||
|
||||
system_instruction: str = None
|
||||
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
|
||||
|
||||
|
||||
class AWSNovaSonicLLMContext(OpenAILLMContext):
|
||||
"""Specialized LLM context for AWS Nova Sonic service.
|
||||
|
||||
Extends OpenAI context with Nova Sonic-specific message handling,
|
||||
conversation history management, and text buffering capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self, messages=None, tools=None, **kwargs):
|
||||
"""Initialize AWS Nova Sonic LLM context.
|
||||
|
||||
Args:
|
||||
messages: Initial messages for the context.
|
||||
tools: Available tools for the context.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(messages=messages, tools=tools, **kwargs)
|
||||
self.__setup_local()
|
||||
|
||||
def __setup_local(self, system_instruction: str = ""):
|
||||
self._assistant_text = ""
|
||||
self._user_text = ""
|
||||
self._system_instruction = system_instruction
|
||||
|
||||
@staticmethod
|
||||
def upgrade_to_nova_sonic(
|
||||
obj: OpenAILLMContext, system_instruction: str
|
||||
) -> "AWSNovaSonicLLMContext":
|
||||
"""Upgrade an OpenAI context to AWS Nova Sonic context.
|
||||
|
||||
Args:
|
||||
obj: The OpenAI context to upgrade.
|
||||
system_instruction: System instruction for the context.
|
||||
|
||||
Returns:
|
||||
The upgraded AWS Nova Sonic context.
|
||||
"""
|
||||
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
|
||||
obj.__class__ = AWSNovaSonicLLMContext
|
||||
obj.__setup_local(system_instruction)
|
||||
return obj
|
||||
|
||||
# NOTE: this method has the side-effect of updating _system_instruction from messages
|
||||
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
|
||||
"""Get conversation history for initializing AWS Nova Sonic session.
|
||||
|
||||
Processes stored messages and extracts system instruction and conversation
|
||||
history in the format expected by AWS Nova Sonic.
|
||||
|
||||
Returns:
|
||||
Formatted conversation history with system instruction and messages.
|
||||
"""
|
||||
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
|
||||
|
||||
# Bail if there are no messages
|
||||
if not self.messages:
|
||||
return history
|
||||
|
||||
messages = copy.deepcopy(self.messages)
|
||||
|
||||
# If we have a "system" message as our first message, let's pull that out into "instruction"
|
||||
if messages[0].get("role") == "system":
|
||||
system = messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
history.system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
history.system_instruction = content[0].get("text")
|
||||
if history.system_instruction:
|
||||
self._system_instruction = history.system_instruction
|
||||
|
||||
# Process remaining messages to fill out conversation history.
|
||||
# Nova Sonic supports "user" and "assistant" messages in history.
|
||||
for message in messages:
|
||||
history_message = self.from_standard_message(message)
|
||||
if history_message:
|
||||
history.messages.append(history_message)
|
||||
|
||||
return history
|
||||
|
||||
def get_messages_for_persistent_storage(self):
|
||||
"""Get messages formatted for persistent storage.
|
||||
|
||||
Returns:
|
||||
List of messages including system instruction if present.
|
||||
"""
|
||||
messages = super().get_messages_for_persistent_storage()
|
||||
# If we have a system instruction and messages doesn't already contain it, add it
|
||||
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
|
||||
messages.insert(0, {"role": "system", "content": self._system_instruction})
|
||||
return messages
|
||||
|
||||
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
|
||||
"""Convert standard message format to Nova Sonic format.
|
||||
|
||||
Args:
|
||||
message: Standard message dictionary to convert.
|
||||
|
||||
Returns:
|
||||
Nova Sonic conversation history message, or None if not convertible.
|
||||
"""
|
||||
role = message.get("role")
|
||||
if message.get("role") == "user" or message.get("role") == "assistant":
|
||||
content = message.get("content")
|
||||
if isinstance(message.get("content"), list):
|
||||
content = ""
|
||||
for c in message.get("content"):
|
||||
if c.get("type") == "text":
|
||||
content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
# There won't be content if this is an assistant tool call entry.
|
||||
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
|
||||
# history
|
||||
if content:
|
||||
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
|
||||
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
|
||||
# Sonic conversation history
|
||||
|
||||
def buffer_user_text(self, text):
|
||||
"""Buffer user text for later flushing to context.
|
||||
|
||||
Args:
|
||||
text: User text to buffer.
|
||||
"""
|
||||
self._user_text += f" {text}" if self._user_text else text
|
||||
# logger.debug(f"User text buffered: {self._user_text}")
|
||||
|
||||
def flush_aggregated_user_text(self) -> str:
|
||||
"""Flush buffered user text to context as a complete message.
|
||||
|
||||
Returns:
|
||||
The flushed user text, or empty string if no text was buffered.
|
||||
"""
|
||||
if not self._user_text:
|
||||
return ""
|
||||
user_text = self._user_text
|
||||
message = {
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": user_text}],
|
||||
}
|
||||
self._user_text = ""
|
||||
self.add_message(message)
|
||||
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
|
||||
return user_text
|
||||
|
||||
def buffer_assistant_text(self, text):
|
||||
"""Buffer assistant text for later flushing to context.
|
||||
|
||||
Args:
|
||||
text: Assistant text to buffer.
|
||||
"""
|
||||
self._assistant_text += text
|
||||
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
|
||||
|
||||
def flush_aggregated_assistant_text(self):
|
||||
"""Flush buffered assistant text to context as a complete message."""
|
||||
if not self._assistant_text:
|
||||
return
|
||||
message = {
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": self._assistant_text}],
|
||||
}
|
||||
self._assistant_text = ""
|
||||
self.add_message(message)
|
||||
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
|
||||
"""Frame containing updated AWS Nova Sonic context.
|
||||
|
||||
Parameters:
|
||||
context: The updated AWS Nova Sonic LLM context.
|
||||
"""
|
||||
|
||||
context: AWSNovaSonicLLMContext
|
||||
|
||||
|
||||
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
|
||||
"""Context aggregator for user messages in AWS Nova Sonic conversations.
|
||||
|
||||
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
|
||||
context update frames.
|
||||
"""
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
|
||||
):
|
||||
"""Process frames and emit Nova Sonic-specific context updates.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction the frame is traveling.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Parent does not push LLMMessagesUpdateFrame
|
||||
if isinstance(frame, LLMMessagesUpdateFrame):
|
||||
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
|
||||
|
||||
|
||||
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
|
||||
|
||||
Provides specialized handling for assistant responses and function calls
|
||||
in AWS Nova Sonic context, with custom frame processing logic.
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames with Nova Sonic-specific logic.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction the frame is traveling.
|
||||
"""
|
||||
# HACK: For now, disable the context aggregator by making it just pass through all frames
|
||||
# that the parent handles (except the function call stuff, which we still need).
|
||||
# For an explanation of this hack, see
|
||||
# AWSNovaSonicLLMService._report_assistant_response_text_added.
|
||||
if isinstance(
|
||||
frame,
|
||||
(
|
||||
InterruptionFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
TextFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMSetToolChoiceFrame,
|
||||
UserImageRawFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
),
|
||||
):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
|
||||
"""Handle function call results for AWS Nova Sonic.
|
||||
|
||||
Args:
|
||||
frame: The function call result frame to handle.
|
||||
"""
|
||||
await super().handle_function_call_result(frame)
|
||||
|
||||
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
|
||||
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
|
||||
# context. Let's push a special frame to do that.
|
||||
await self.push_frame(
|
||||
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicContextAggregatorPair:
|
||||
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
|
||||
|
||||
Parameters:
|
||||
_user: The user context aggregator.
|
||||
_assistant: The assistant context aggregator.
|
||||
"""
|
||||
|
||||
_user: AWSNovaSonicUserContextAggregator
|
||||
_assistant: AWSNovaSonicAssistantContextAggregator
|
||||
|
||||
def user(self) -> AWSNovaSonicUserContextAggregator:
|
||||
"""Get the user context aggregator.
|
||||
|
||||
Returns:
|
||||
The user context aggregator instance.
|
||||
"""
|
||||
return self._user
|
||||
|
||||
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
|
||||
"""Get the assistant context aggregator.
|
||||
|
||||
Returns:
|
||||
The assistant context aggregator instance.
|
||||
"""
|
||||
return self._assistant
|
||||
|
||||
@@ -10,78 +10,12 @@ This module provides specialized context aggregators and message handling for AW
|
||||
including conversation history management and role-specific message processing.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
|
||||
AWS Nova Sonic no longer uses types from this module under the hood.
|
||||
It now uses `LLMContext` and `LLMContextAggregatorPair`.
|
||||
Using the new patterns should allow you to not need types from this module.
|
||||
|
||||
BEFORE:
|
||||
```
|
||||
# Setup
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Context frame type
|
||||
frame: OpenAILLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
```
|
||||
|
||||
AFTER:
|
||||
```
|
||||
# Setup
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# Context frame type
|
||||
frame: LLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
See deprecation warning in pipecat.services.aws.nova_sonic.context for more
|
||||
details.
|
||||
"""
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.services.aws_nova_sonic.context are deprecated. \n"
|
||||
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Using the new patterns should allow you to not need types from this module.\n\n"
|
||||
"BEFORE:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = OpenAILLMContext(messages, tools)\n"
|
||||
"context_aggregator = llm.create_context_aggregator(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: OpenAILLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: AWSNovaSonicLLMContext\n"
|
||||
"# or\n"
|
||||
"context: OpenAILLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```\n\n"
|
||||
"AFTER:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = LLMContext(messages, tools)\n"
|
||||
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: LLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: LLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
from pipecat.services.aws.nova_sonic.context import *
|
||||
|
||||
@@ -47,6 +47,7 @@ SENTENCE_ENDING_PUNCTUATION: FrozenSet[str] = frozenset(
|
||||
"!",
|
||||
"?",
|
||||
";",
|
||||
"…",
|
||||
# East Asian punctuation (Chinese (Traditional & Simplified), Japanese, Korean)
|
||||
"。", # Ideographic full stop
|
||||
"?", # Full-width question mark
|
||||
|
||||
Reference in New Issue
Block a user