Compare commits

..

3 Commits

Author SHA1 Message Date
James Hush
c6d759e043 Save 2025-05-09 12:17:15 +08:00
kompfner
9643296e29 Merge pull request #1779 from pipecat-ai/pk/aws-nova-sonic-missing-params-export
Add missing `Params` export to AWS Nova Sonic module
2025-05-08 16:04:38 -04:00
Paul Kompfner
c83c5b5a34 Add missing Params export to AWS Nova Sonic module 2025-05-08 15:23:25 -04:00
4 changed files with 14 additions and 407 deletions

View File

@@ -34,6 +34,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.google.llm import GoogleLLMContext, GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -286,16 +287,18 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
conversation_llm = GoogleLLMService(
name="Conversation",
model="gemini-2.0-flash-001",
# model="gemini-exp-1121",
api_key=os.getenv("GOOGLE_API_KEY"),
# we can give the GoogleLLMService a system instruction to use directly
# in the GenerativeModel constructor. Let's do that rather than put
# our system message in the messages list.
system_instruction=conversation_system_message,
)
# conversation_llm = GoogleLLMService(
# name="Conversation",
# model="gemini-2.0-flash-001",
# # model="gemini-exp-1121",
# api_key=os.getenv("GOOGLE_API_KEY"),
# # we can give the GoogleLLMService a system instruction to use directly
# # in the GenerativeModel constructor. Let's do that rather than put
# # our system message in the messages list.
# system_instruction=conversation_system_message,
# )
conversation_llm = OpenAILLMService(name="Conversation", api_key=os.getenv("OPENAI_API_KEY"))
input_transcription_llm = GoogleLLMService(
name="Transcription",

View File

@@ -1,274 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from abc import ABC, abstractmethod
from dataclasses import field
from typing import List, Literal, Optional
import httpx
from agents import Agent, Runner
from dotenv import load_dotenv
from loguru import logger
from openai import AsyncStream, BaseModel
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMTextFrame,
LLMUpdateSettingsFrame,
VisionImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_service import AIService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIContextAggregatorPair,
OpenAILLMService,
OpenAIUserContextAggregator,
)
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
class LlmMessage(BaseModel):
    """A single role-tagged chat message."""

    # Author of the message; restricted to the four literal role names.
    role: Literal["system", "user", "assistant", "tool"]
    # Text of the message; the annotation allows None.
    content: Optional[str]
class AgentResponse(BaseModel):
    """Result returned by a backend: reply text plus a list of messages."""

    # Reply text.
    content: str
    # NOTE(review): `field` is `dataclasses.field`, while `BaseModel` is imported
    # from `openai` — confirm the model base honours a dataclass field as a
    # default factory; otherwise switch to the model library's own field helper.
    msgs: list[LlmMessage] = field(default_factory=list)
class BackendBase(ABC):
    """Abstract interface for a backend that turns messages into a response."""

    @abstractmethod
    async def get_resp(self, messages: list[LlmMessage], extra_params) -> AgentResponse:
        """Produce a response for the given messages.

        Args:
            messages: The messages to respond to.
            extra_params: Additional parameters; their meaning is left to the
                implementation.

        Returns:
            The backend's response.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("The method get_resp is not implemented.")
class ChoiceDelta(BaseModel):
    """Delta payload carried by a streamed chunk choice."""

    # The contents of the chunk message.
    content: Optional[str] = None
class Choice(BaseModel):
    """One choice entry of a streamed chunk."""

    # The contents of the chunk message.
    delta: ChoiceDelta
    # The index of the choice in the list of choices.
    index: int
class CustomLLMService(BaseOpenAILLMService):
    """LLM service that answers through an `Agent` executed by `Runner`.

    Incoming context or message frames are handed to `Runner.run_streamed`;
    every streamed text delta is pushed onward as an `LLMTextFrame`, bracketed
    by `LLMFullResponseStartFrame` / `LLMFullResponseEndFrame`.
    """

    def __init__(self, **kwargs):
        """Initialise the base service, then install the agent as the client.

        Args:
            **kwargs: Forwarded unchanged to `BaseOpenAILLMService`.
        """
        super().__init__(**kwargs)
        self._client = self._build_agent()

    @staticmethod
    def _build_agent():
        """Build the agent used to answer every request."""
        return Agent(
            name="Assistant agent",
            instructions="Respond with haikus.",
            # tools=[get_weather],
        )

    def create_client(
        self,
        api_key=None,
        base_url=None,
        organization=None,
        project=None,
        default_headers=None,
        **kwargs,
    ):
        """Return a fresh agent instead of an OpenAI client.

        All arguments are accepted for signature compatibility and ignored.
        """
        return self._build_agent()

    def create_context_aggregator(
        self,
        context: OpenAILLMContext,
        *,
        user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
        assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
    ) -> OpenAIContextAggregatorPair:
        """Create an OpenAIContextAggregatorPair from an OpenAILLMContext.

        Constructor keyword arguments for both the user and assistant
        aggregators can be provided.

        Args:
            context (OpenAILLMContext): The LLM context.
            user_params (LLMUserAggregatorParams, optional): User aggregator
                parameters.
            assistant_params (LLMAssistantAggregatorParams, optional):
                Assistant aggregator parameters.

        Returns:
            OpenAIContextAggregatorPair: A pair of context aggregators, one for
            the user and one for the assistant, encapsulated in an
            OpenAIContextAggregatorPair.
        """
        context.set_llm_adapter(self.get_llm_adapter())
        user = OpenAIUserContextAggregator(context, params=user_params)
        assistant = OpenAIAssistantContextAggregator(context, params=assistant_params)
        return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)

    async def _process_context(self, context: OpenAILLMContext):
        """Run the agent on the context's messages and stream its text out.

        Args:
            context: Context whose `messages` are passed as the agent input.
        """
        await self.start_ttfb_metrics()

        result = Runner.run_streamed(
            starting_agent=self._client,
            input=context.messages,
        )
        logger.info(f"get_chat_completions: {result}")
        if result is None:
            logger.error("Runner.run_streamed returned None")
            return

        async for event in result.stream_events():
            if event.type != "raw_response_event":
                continue
            if event.data.type == "response.output_text.delta":
                # Close the time-to-first-byte measurement opened above; it was
                # previously started and never stopped.
                await self.stop_ttfb_metrics()
                await self.push_frame(LLMTextFrame(event.data.delta))

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle a frame, running the agent for context and message frames.

        `OpenAILLMContextFrame` and `LLMMessagesFrame` trigger a response; any
        other frame is pushed on unchanged in the same direction.

        Args:
            frame: The incoming frame.
            direction: The direction the frame is travelling in.
        """
        await super().process_frame(frame, direction)

        context = None
        if isinstance(frame, OpenAILLMContextFrame):
            context = frame.context
        elif isinstance(frame, LLMMessagesFrame):
            context = OpenAILLMContext.from_messages(frame.messages)
        else:
            await self.push_frame(frame, direction)

        if context:
            try:
                await self.push_frame(LLMFullResponseStartFrame())
                await self.start_processing_metrics()
                await self._process_context(context)
            except httpx.TimeoutException:
                await self._call_event_handler("on_completion_timeout")
            finally:
                # Always close the metrics window and the response bracket,
                # even when processing failed.
                await self.stop_processing_metrics()
                await self.push_frame(LLMFullResponseEndFrame())
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
    """Assemble the voice pipeline for one WebRTC connection and run it.

    Args:
        webrtc_connection: The connection the transport is built on.
        _: Parsed command-line arguments; unused.
    """
    logger.info("Starting bot")

    transport_params = TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    )
    transport = SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=transport_params,
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = CustomLLMService(model="gpt-4.1", api_key=os.getenv("OPENAI_API_KEY"))

    system_message = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_message]

    context = OpenAILLMContext(messages=messages)
    context_aggregator = llm.create_context_aggregator(context)

    stages = [
        transport.input(),  # Transport user input
        stt,
        context_aggregator.user(),  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        context_aggregator.assistant(),  # Assistant spoken responses
    ]
    pipeline = Pipeline(stages)

    task_params = PipelineParams(
        allow_interruptions=True,
        enable_metrics=True,
        enable_usage_metrics=True,
        report_only_initial_ttfb=True,
    )
    task = PipelineTask(pipeline, params=task_params)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        # messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        # await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        # Cancel the pipeline task once the client has closed the connection.
        await task.cancel()

    await PipelineRunner(handle_sigint=False).run(task)
if __name__ == "__main__":
    # Imported only when executed as a script: `main` comes from the `run` module.
    from run import main

    main()

View File

@@ -1,122 +0,0 @@
import asyncio
import logging
import os
from datetime import datetime
from agents import (
Agent,
FunctionTool,
HandoffOutputItem,
ItemHelpers,
MessageOutputItem,
RunContextWrapper,
Runner,
ToolCallItem,
ToolCallOutputItem,
function_tool,
set_default_openai_api,
set_default_openai_client,
set_tracing_disabled,
trace,
)
from httpx import get
@function_tool
async def get_weather(location: str) -> str:
    """Fetch the weather for today.

    Args:
        location: The location to fetch the weather for.
    """
    # NOTE(review): the docstring wording is kept verbatim — presumably the
    # `function_tool` decorator reads it; confirm before rewording.
    # Canned reply: no lookup is performed.
    return f"{location} is sunny"
# Instructions handed to the agent below.
system_prompt = """
you are a helpful assistant for a real estate brokerage AI assistant.
"""

# Module-level agent driven by `main`; the weather tool is left disabled.
bot = Agent(
    name="Assistant agent",
    instructions=system_prompt,
    # tools=[get_weather],
)
async def main():
    """Stream one run of `bot` and print the events it emits.

    Text deltas from raw response events are collected and printed as one
    joined string once the stream ends.
    """
    result = Runner.run_streamed(
        starting_agent=bot,
        # Alternative prompt that was tried here:
        # input="give me a 2 sentences about life",
        input="Tell a joke about pirates.",
    )

    chunks = []
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if event.data.type == "response.output_text.delta":
                # append keeps one entry per delta instead of one per character.
                chunks.append(event.data.delta)
                print(f"raw resp: {event}")
        elif event.type == "agent_updated_stream_event":
            # When the agent updates, print that and skip the separator line.
            print(f"Agent updated: {event.new_agent.name}")
            continue
        elif event.type == "run_item_stream_event":
            # When items are generated, print them.
            item = event.item
            if item.type == "tool_call_item":
                print("-- Tool was called")
            elif item.type == "tool_call_output_item":
                print(f"-- Tool output: {item.output}")
            elif item.type == "message_output_item":
                print(f"-- Message output:\n {ItemHelpers.text_message_output(item)}")
            else:
                print(f"-- Unknown item type: {item.type}")
        else:
            # Only run-item events are read for `.item` above, so report the
            # event's own type here rather than dereferencing `event.item`.
            print(f"-- Unknown out item type: {event.type}")
        print("----------------------")

    print(f"FinalFinalFinal: {''.join(chunks)}")
if __name__ == "__main__":
    # Script entry point: drive the async demo to completion.
    asyncio.run(main())
# no func tool:
#
# Event: agent_updated_stream_event - name None
# Event: raw_response_event - name None
# ...
# Event: raw_response_event - name None
# Event: run_item_stream_event - name message_output_created
# with func tool:
#
# Event: agent_updated_stream_event - name None
# Event: raw_response_event - name None
# ...
# Event: raw_response_event - name None
# Event: run_item_stream_event - name tool_called
# Event: run_item_stream_event - name tool_output
# Event: raw_response_event - name None
# ...
# Event: raw_response_event - name None
# Event: run_item_stream_event - name message_output_created

View File

@@ -1 +1 @@
from .aws import AWSNovaSonicLLMService
from .aws import AWSNovaSonicLLMService, Params