Compare commits
4 Commits
mb/remove-
...
hush/openA
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e09028aca2 | ||
|
|
b17165b7ea | ||
|
|
19a4b97504 | ||
|
|
fda762d8e8 |
274
examples/foundational/99-open-ai-agent.py
Normal file
274
examples/foundational/99-open-ai-agent.py
Normal file
@@ -0,0 +1,274 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import field
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
import httpx
|
||||
from agents import Agent, Runner
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from openai import AsyncStream, BaseModel
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.ai_service import AIService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.base_llm import BaseOpenAILLMService
|
||||
from pipecat.services.openai.llm import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIContextAggregatorPair,
|
||||
OpenAILLMService,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class LlmMessage(BaseModel):
|
||||
# ...
|
||||
role: Literal["system", "user", "assistant", "tool"]
|
||||
content: Optional[str]
|
||||
|
||||
|
||||
class AgentResponse(BaseModel):
|
||||
content: str
|
||||
msgs: list[LlmMessage] = field(default_factory=list)
|
||||
|
||||
|
||||
class BackendBase(ABC):
|
||||
@abstractmethod
|
||||
async def get_resp(self, messages: list[LlmMessage], extra_params) -> AgentResponse:
|
||||
raise NotImplementedError("The method get_resp is not implemented.")
|
||||
|
||||
|
||||
class ChoiceDelta(BaseModel):
|
||||
content: Optional[str] = None
|
||||
"""The contents of the chunk message."""
|
||||
|
||||
|
||||
class Choice(BaseModel):
|
||||
delta: ChoiceDelta
|
||||
"""The contents of the chunk message."""
|
||||
|
||||
index: int
|
||||
"""The index of the choice in the list of choices."""
|
||||
|
||||
|
||||
class CustomLLMService(BaseOpenAILLMService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._client = Agent(
|
||||
name="Assistant agent",
|
||||
instructions="Respond with haikus.",
|
||||
# tools=[get_weather],
|
||||
)
|
||||
|
||||
def create_client(
|
||||
self,
|
||||
api_key=None,
|
||||
base_url=None,
|
||||
organization=None,
|
||||
project=None,
|
||||
default_headers=None,
|
||||
**kwargs,
|
||||
):
|
||||
return Agent(
|
||||
name="Assistant agent",
|
||||
instructions="Respond with haikus.",
|
||||
# tools=[get_weather],
|
||||
)
|
||||
|
||||
def create_context_aggregator(
|
||||
self,
|
||||
context: OpenAILLMContext,
|
||||
*,
|
||||
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
|
||||
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
|
||||
) -> OpenAIContextAggregatorPair:
|
||||
"""Create an instance of OpenAIContextAggregatorPair.
|
||||
|
||||
from an
|
||||
OpenAILLMContext. Constructor keyword arguments for both the user and
|
||||
assistant aggregators can be provided.
|
||||
|
||||
Args:
|
||||
context (OpenAILLMContext): The LLM context.
|
||||
user_params (LLMUserAggregatorParams, optional): User aggregator parameters.
|
||||
assistant_params (LLMAssistantAggregatorParams, optional): User aggregator parameters.
|
||||
|
||||
Returns:
|
||||
OpenAIContextAggregatorPair: A pair of context aggregators, one for
|
||||
the user and one for the assistant, encapsulated in an
|
||||
OpenAIContextAggregatorPair.
|
||||
|
||||
"""
|
||||
context.set_llm_adapter(self.get_llm_adapter())
|
||||
user = OpenAIUserContextAggregator(context, params=user_params)
|
||||
assistant = OpenAIAssistantContextAggregator(context, params=assistant_params)
|
||||
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
functions_list = []
|
||||
arguments_list = []
|
||||
tool_id_list = []
|
||||
func_idx = 0
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
tool_call_id = ""
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
result = Runner.run_streamed(
|
||||
# context=context,
|
||||
starting_agent=self._client,
|
||||
input=context.messages, # messages
|
||||
# ---
|
||||
# no func tool
|
||||
# input="give me a 2 sentences about life",
|
||||
)
|
||||
|
||||
logger.info(f"get_chat_completions: {result}")
|
||||
|
||||
if result is None:
|
||||
logger.error("Runner.run_streamed returned None")
|
||||
return
|
||||
|
||||
async for event in result.stream_events():
|
||||
if event.type == "raw_response_event":
|
||||
if event.data.type == "response.output_text.delta":
|
||||
await self.push_frame(LLMTextFrame(event.data.delta))
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
context = None
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
context: OpenAILLMContext = frame.context
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
if context:
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self._process_context(context)
|
||||
except httpx.TimeoutException:
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = CustomLLMService(model="gpt-4.1", api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages=messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
# messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
# await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
122
examples/foundational/agent.py
Normal file
122
examples/foundational/agent.py
Normal file
@@ -0,0 +1,122 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from agents import (
|
||||
Agent,
|
||||
FunctionTool,
|
||||
HandoffOutputItem,
|
||||
ItemHelpers,
|
||||
MessageOutputItem,
|
||||
RunContextWrapper,
|
||||
Runner,
|
||||
ToolCallItem,
|
||||
ToolCallOutputItem,
|
||||
function_tool,
|
||||
set_default_openai_api,
|
||||
set_default_openai_client,
|
||||
set_tracing_disabled,
|
||||
trace,
|
||||
)
|
||||
from httpx import get
|
||||
|
||||
|
||||
@function_tool
|
||||
async def get_weather(location: str) -> str:
|
||||
"""Fetch the weather for today.
|
||||
|
||||
Args:
|
||||
location: The location to fetch the weather for.
|
||||
"""
|
||||
return f"{location} is sunny"
|
||||
|
||||
|
||||
system_prompt = """
|
||||
you are a helpful assistant for a real estate brokerage AI assistant.
|
||||
"""
|
||||
|
||||
|
||||
bot = Agent(
|
||||
name="Assistant agent",
|
||||
instructions=system_prompt,
|
||||
# tools=[get_weather],
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
# res = await Runner.run(
|
||||
# starting_agent=bot,
|
||||
# input="What is the weather today?",
|
||||
# )
|
||||
# print(res)
|
||||
|
||||
result = Runner.run_streamed(
|
||||
starting_agent=bot,
|
||||
# ---
|
||||
# with func tool
|
||||
input="Tell a joke about pirates.",
|
||||
# ---
|
||||
# no func tool
|
||||
# input="give me a 2 sentences about life",
|
||||
)
|
||||
|
||||
final = []
|
||||
async for event in result.stream_events():
|
||||
# We'll ignore the raw responses event deltas
|
||||
name = getattr(event, "name", None)
|
||||
# print(f"Event: {event.type} - name {name}")
|
||||
# print(event)
|
||||
# continue
|
||||
if event.type == "raw_response_event":
|
||||
if event.data.type == "response.output_text.delta":
|
||||
final += event.data.delta
|
||||
|
||||
print(f"raw resp: {event}")
|
||||
# When the agent updates, print that
|
||||
elif event.type == "agent_updated_stream_event":
|
||||
print(f"Agent updated: {event.new_agent.name}")
|
||||
continue
|
||||
# When items are generated, print them
|
||||
elif event.type == "run_item_stream_event":
|
||||
if event.item.type == "tool_call_item":
|
||||
print("-- Tool was called")
|
||||
elif event.item.type == "tool_call_output_item":
|
||||
print(f"-- Tool output: {event.item.output}")
|
||||
elif event.item.type == "message_output_item":
|
||||
print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")
|
||||
else:
|
||||
print(f"-- Unknown item type: {event.item.type}")
|
||||
pass # Ignore other event types
|
||||
else:
|
||||
print(f"-- Unknown out item type: {event.item.type}")
|
||||
|
||||
print(f"----------------------")
|
||||
|
||||
print(f"FinalFinalFinal: {''.join(final)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
# no func tool:
|
||||
#
|
||||
# Event: agent_updated_stream_event - name None
|
||||
# Event: raw_response_event - name None
|
||||
# ...
|
||||
# Event: raw_response_event - name None
|
||||
# Event: run_item_stream_event - name message_output_created
|
||||
|
||||
# with func tool:
|
||||
#
|
||||
# Event: agent_updated_stream_event - name None
|
||||
# Event: raw_response_event - name None
|
||||
# ...
|
||||
# Event: raw_response_event - name None
|
||||
# Event: run_item_stream_event - name tool_called
|
||||
# Event: run_item_stream_event - name tool_output
|
||||
# Event: raw_response_event - name None
|
||||
# ...
|
||||
# Event: raw_response_event - name None
|
||||
# Event: run_item_stream_event - name message_output_created
|
||||
Reference in New Issue
Block a user