206 lines
6.8 KiB
Python
206 lines
6.8 KiB
Python
#
|
||
# Copyright (c) 2024–2025, Daily
|
||
#
|
||
# SPDX-License-Identifier: BSD 2-Clause License
|
||
#
|
||
|
||
"""
|
||
Basic OpenAI Agent service example.
|
||
|
||
This example demonstrates how to use the OpenAI Agents SDK within a Pipecat
|
||
pipeline to create an interactive agent with tool calling capabilities.
|
||
|
||
Requirements:
|
||
- OpenAI API key
|
||
- OpenAI Agents SDK: pip install openai-agents
|
||
"""
|
||
|
||
import os
|
||
import random
|
||
from typing import Any, List
|
||
|
||
# Import agents SDK for tools and agent creation
|
||
from agents import Agent, function_tool
|
||
from dotenv import load_dotenv
|
||
from loguru import logger
|
||
from openai.types.chat import ChatCompletionMessageParam
|
||
|
||
from pipecat.frames.frames import LLMRunFrame, TextFrame
|
||
from pipecat.pipeline.pipeline import Pipeline
|
||
from pipecat.pipeline.runner import PipelineRunner
|
||
from pipecat.pipeline.task import PipelineTask
|
||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||
from pipecat.runner.types import RunnerArguments
|
||
from pipecat.runner.utils import create_transport
|
||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||
from pipecat.services.openai_agent.agent_service import OpenAIAgentService
|
||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||
from pipecat.transports.daily.transport import DailyParams
|
||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||
|
||
load_dotenv(override=True)
|
||
|
||
# Transport configuration
|
||
transport_params = {
|
||
"daily": lambda: DailyParams(audio_out_enabled=True, audio_in_enabled=True),
|
||
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True, audio_in_enabled=True),
|
||
"webrtc": lambda: TransportParams(audio_out_enabled=True, audio_in_enabled=True),
|
||
}
|
||
|
||
|
||
@function_tool
|
||
def get_weather(location: str) -> str:
|
||
"""Get the current weather for a location.
|
||
|
||
Args:
|
||
location: The location to get weather for
|
||
|
||
Returns:
|
||
A weather description string
|
||
"""
|
||
# Mock weather data - in real usage, integrate with weather API
|
||
weather_data = {
|
||
"San Francisco": "Foggy, 65°F",
|
||
"New York": "Sunny, 72°F",
|
||
"London": "Rainy, 59°F",
|
||
"Tokyo": "Partly cloudy, 68°F",
|
||
}
|
||
return weather_data.get(location, f"Weather data not available for {location}")
|
||
|
||
|
||
@function_tool
|
||
def get_random_fact() -> str:
|
||
"""Get a random interesting fact.
|
||
|
||
Returns:
|
||
A random fact string
|
||
"""
|
||
facts = [
|
||
"Honey never spoils. Archaeologists have found edible honey in ancient Egyptian tombs.",
|
||
"Octopuses have three hearts and blue blood.",
|
||
"The Great Wall of China isn't visible from space with the naked eye.",
|
||
"Bananas are berries, but strawberries aren't.",
|
||
]
|
||
return random.choice(facts)
|
||
|
||
|
||
def get_random_fact_tool():
|
||
"""Example tool function for random facts."""
|
||
|
||
def get_random_fact() -> str:
|
||
"""Get a random interesting fact.
|
||
|
||
Returns:
|
||
A random fact string.
|
||
"""
|
||
facts = [
|
||
"Honey never spoils. Archaeologists have found edible honey in ancient Egyptian tombs.",
|
||
"A group of flamingos is called a 'flamboyance'.",
|
||
"Octopuses have three hearts and blue blood.",
|
||
"The Great Wall of China isn't visible from space with the naked eye.",
|
||
"Bananas are berries, but strawberries aren't.",
|
||
]
|
||
return random.choice(facts)
|
||
|
||
return get_random_fact
|
||
|
||
|
||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||
logger.info("Starting OpenAI Agent bot")
|
||
|
||
# Set up STT for speech recognition
|
||
stt = DeepgramSTTService(
|
||
api_key=os.getenv("DEEPGRAM_API_KEY", ""),
|
||
model="nova-2",
|
||
)
|
||
|
||
# Set up TTS for voice output
|
||
tts = CartesiaTTSService(
|
||
api_key=os.getenv("CARTESIA_API_KEY", ""),
|
||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||
)
|
||
|
||
# Create tools for the agent
|
||
tools: list[Any] = [
|
||
get_weather,
|
||
get_random_fact,
|
||
]
|
||
|
||
# Create the agent with tools
|
||
agent = Agent(
|
||
name="Assistant",
|
||
instructions="""You are a helpful assistant with access to weather information and random facts.
|
||
You can:
|
||
- Check weather for any location using the get_weather tool
|
||
- Share interesting facts using the get_random_fact tool
|
||
- Have natural conversations
|
||
|
||
Be friendly, informative, and engaging in your responses.""",
|
||
tools=tools,
|
||
)
|
||
|
||
# Initialize the OpenAI Agent service with the pre-configured agent
|
||
agent_service = OpenAIAgentService(
|
||
agent=agent,
|
||
api_key=os.getenv("OPENAI_API_KEY"),
|
||
streaming=True,
|
||
)
|
||
|
||
# Set up conversation context with initial system message
|
||
messages: List[ChatCompletionMessageParam] = [
|
||
{
|
||
"role": "system",
|
||
"content": "You are a helpful assistant with access to weather information and random facts. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||
},
|
||
]
|
||
|
||
context = OpenAILLMContext(messages)
|
||
context_aggregator = agent_service.create_context_aggregator(context)
|
||
|
||
# Create the processing pipeline with context aggregators
|
||
pipeline = Pipeline(
|
||
[
|
||
transport.input(), # Transport user input
|
||
stt, # Speech to text
|
||
context_aggregator.user(), # User responses
|
||
agent_service, # OpenAI Agent processing
|
||
tts, # Text to speech
|
||
transport.output(), # Transport bot output
|
||
context_aggregator.assistant(), # Assistant spoken responses
|
||
]
|
||
)
|
||
|
||
task = PipelineTask(
|
||
pipeline,
|
||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||
)
|
||
|
||
# Send an initial greeting when client connects
|
||
@transport.event_handler("on_client_connected")
|
||
async def on_client_connected(transport, client):
|
||
logger.info("Client connected, sending greeting")
|
||
# Kick off the conversation by adding system message and running LLM
|
||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||
await task.queue_frames([LLMRunFrame()])
|
||
|
||
@transport.event_handler("on_client_disconnected")
|
||
async def on_client_disconnected(transport, client):
|
||
logger.info("Client disconnected")
|
||
await task.cancel()
|
||
|
||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||
await runner.run(task)
|
||
|
||
|
||
async def bot(runner_args: RunnerArguments):
|
||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||
transport = await create_transport(runner_args, transport_params)
|
||
await run_bot(transport, runner_args)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
from pipecat.runner.run import main
|
||
|
||
main()
|