Compare commits
8 Commits
mb/remove-
...
hush/openA
| Author | SHA1 | Date | |
|---|---|---|---|
| | 29d4a56663 | | |
| | 373a09ecd6 | | |
| | 07f54c48f3 | | |
| | c8a3d65aa4 | | |
| | 50a2a0dc86 | | |
| | 0421d97954 | | |
| | 54c8f336c3 | | |
| | b086fbafe6 | | |
285
AGENTS.md
Normal file
@@ -0,0 +1,285 @@
# AGENTS.md

## Project Overview

Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. The codebase is organized around a pipeline architecture where data flows through connected services (STT → LLM → TTS).

## Development Environment Setup

### Prerequisites
- **Minimum Python Version:** 3.10
- **Recommended Python Version:** 3.12
- **Package Manager:** uv (recommended) or pip

### Setup Commands

```bash
# Clone the repository
git clone https://github.com/pipecat-ai/pipecat.git
cd pipecat

# Install dependencies with uv (recommended)
uv sync --group dev --all-extras \
  --no-extra gstreamer \
  --no-extra krisp \
  --no-extra local \
  --no-extra ultravox

# Or with pip
pip install -e ".[dev]"

# Install pre-commit hooks
uv run pre-commit install

# Set up environment variables
cp env.example .env
```

## Build and Test Commands

### Running Tests
```bash
# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/test_name.py

# Run tests with coverage
uv run pytest --cov=pipecat --cov-report=html
```

### Code Quality
```bash
# Format code (required before commits)
uv run ruff format

# Lint code
uv run ruff check

# Type checking
uv run mypy src/pipecat

# Run pre-commit checks manually
uv run pre-commit run --all-files
```

### Documentation
```bash
# Build API documentation
cd docs/api
./build-docs.sh

# Build docs manually
sphinx-build -b html . _build/html -W --keep-going
```

## Code Style Guidelines

### Python Standards
- **Formatting:** Strict PEP 8 via Ruff
- **Docstrings:** Google-style format
- **Type Hints:** Required for all public APIs
- **Import Organization:** Automated via Ruff

### Docstring Conventions
- **Classes:** Describe purpose + `__init__` with complete `Args:` section
- **Dataclasses:** Use `Parameters:` section, no `__init__` docstring
- **Methods:** Include `Args:` and `Returns:` sections
- **Properties:** Must have `Returns:` section
- **Examples:** Use `Examples:` section with `::` syntax

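As a rough illustration of the docstring conventions listed above, the sketch below applies them to a made-up dataclass and class. `RetryPolicy` and `RateLimiter` are hypothetical names invented for this example and are not part of Pipecat; the exact layout the project's doc tooling expects may differ in detail.

```python
from dataclasses import dataclass


@dataclass
class RetryPolicy:
    """Retry settings for a hypothetical service call.

    Parameters:
        max_attempts: Maximum number of attempts before giving up.
        backoff_secs: Seconds to wait between attempts.
    """

    max_attempts: int = 3
    backoff_secs: float = 0.5


class RateLimiter:
    """Track how many calls remain in a fixed budget.

    Examples:
        Basic usage::

            limiter = RateLimiter(budget=2)
            limiter.consume()
    """

    def __init__(self, budget: int):
        """Initialize the limiter.

        Args:
            budget: Total number of calls allowed.
        """
        self._remaining = budget

    @property
    def remaining(self) -> int:
        """Number of calls left.

        Returns:
            The remaining call budget.
        """
        return self._remaining

    def consume(self, count: int = 1) -> bool:
        """Use part of the budget.

        Args:
            count: Number of calls to consume.

        Returns:
            True if the budget covered the request, False otherwise.
        """
        if count > self._remaining:
            return False
        self._remaining -= count
        return True
```

The dataclass documents its fields under `Parameters:` and has no `__init__` docstring, while the regular class documents constructor arguments under `Args:` in `__init__`.
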
### File Organization
```
src/pipecat/           # Main package
├── processors/        # Frame processors
├── services/          # AI service integrations
├── transports/        # Communication layers
├── frames/            # Data frame definitions
└── pipeline/          # Pipeline orchestration

examples/foundational/ # Step-by-step tutorials
tests/                 # Test suite
```

## Testing Instructions

### Test Structure
- **Unit Tests:** Test individual components in isolation
- **Integration Tests:** Test service interactions
- **Example Tests:** Validate foundational examples work

### Adding Tests
```bash
# Test naming convention
test_<component>_<functionality>.py

# Run specific test pattern
uv run pytest -k "test_pipeline"

# Run with debugging
uv run pytest -s -vv tests/test_name.py::test_function
```

### Pre-commit Requirements
All commits must pass:
- Ruff formatting
- Ruff linting
- Type checking
- Basic test suite

## Dependency Management

### Using uv (Recommended)
```bash
# Add runtime dependency
uv add package-name

# Add optional dependency
uv add --optional service package-name

# Add development dependency
uv add --group dev package-name

# Update lockfile
uv lock

# Sync dependencies
uv sync
```

### Important Notes
- **Always commit both `pyproject.toml` and `uv.lock` together**
- **Never manually edit `uv.lock`** - it's auto-generated
- **Use extras for optional service dependencies** (e.g., `[openai]`, `[cartesia]`)

## Project Structure Guidelines

### Service Integration
When adding new AI services:
1. Create service class in `src/pipecat/services/<provider>/`
2. Follow existing patterns (e.g., STTService, LLMService)
3. Add to appropriate extras in `pyproject.toml`
4. Include tests in `tests/`
5. Add documentation examples

### Frame Processing
For custom processors:
1. Inherit from `FrameProcessor`
2. Implement `process_frame()` method. ALWAYS explicitly call `await super().process_frame(frame, direction)` at the top of this method.
3. Handle frame direction (FrameDirection.UPSTREAM/DOWNSTREAM)
4. Add proper type hints and docstrings

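The custom-processor steps above can be sketched as follows. To keep the snippet runnable without Pipecat installed, it uses small stand-in classes (`Frame`, `TextFrame`, `FrameDirection`, `FrameProcessor`, and a `push_frame` method) that only mimic the shape described in this document; these stand-ins and the `UppercaseProcessor` name are assumptions for illustration, and the real library's signatures may differ.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class FrameDirection(Enum):
    DOWNSTREAM = 1
    UPSTREAM = 2


@dataclass
class Frame:
    """Stand-in base frame (hypothetical, not the Pipecat class)."""


@dataclass
class TextFrame(Frame):
    text: str = ""


class FrameProcessor:
    """Stand-in base class that records calls instead of doing real work."""

    def __init__(self) -> None:
        self.base_called = 0
        self.pushed: list[tuple[Frame, FrameDirection]] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        self.base_called += 1

    async def push_frame(self, frame: Frame, direction: FrameDirection) -> None:
        self.pushed.append((frame, direction))


class UppercaseProcessor(FrameProcessor):
    """Uppercase downstream text frames and pass everything else through."""

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        # Step 2: call the base implementation first.
        await super().process_frame(frame, direction)

        # Step 3: only transform frames travelling downstream.
        if isinstance(frame, TextFrame) and direction is FrameDirection.DOWNSTREAM:
            frame = TextFrame(text=frame.text.upper())

        # Forward the frame in the direction it was travelling.
        await self.push_frame(frame, direction)


async def demo() -> UppercaseProcessor:
    processor = UppercaseProcessor()
    await processor.process_frame(TextFrame(text="hello"), FrameDirection.DOWNSTREAM)
    await processor.process_frame(TextFrame(text="hello"), FrameDirection.UPSTREAM)
    return processor


processor = asyncio.run(demo())
```

Calling the base method before any custom logic means the subclass never skips whatever bookkeeping the base class performs, regardless of which branch the frame takes afterwards.
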
### Transport Implementation
For new transport layers:
1. Inherit from `BaseTransport`
2. Implement required abstract methods
3. Handle connection lifecycle
4. Support both input and output streams

## Security Considerations

### API Keys
- **Never commit API keys** to the repository
- **Use environment variables** for all secrets
- **Reference `env.example`** for required variables
- **Use `.env` files** for local development

### Input Validation
- **Validate all external inputs** (audio, text, API responses)
- **Sanitize user data** before processing
- **Handle rate limiting** for external services
- **Implement proper timeout handling**

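A minimal sketch of two of the input-validation points above (timeout handling and sanitizing text), using only the standard library. The helper names `call_with_timeout` and `sanitize_text`, the 500-character cap, and the fake services are assumptions made up for this example; they are not Pipecat APIs.

```python
import asyncio


async def call_with_timeout(coro, timeout_secs: float, default=None):
    """Await `coro`, returning `default` if it exceeds `timeout_secs`."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_secs)
    except asyncio.TimeoutError:
        return default


def sanitize_text(text: str, max_chars: int = 500) -> str:
    """Drop non-printable characters, collapse whitespace, and cap length."""
    printable = "".join(ch for ch in text if ch.isprintable() or ch.isspace())
    return " ".join(printable.split())[:max_chars]


async def slow_service() -> str:
    # Stand-in for an external API call that takes too long.
    await asyncio.sleep(1.0)
    return "late reply"


async def fast_service() -> str:
    # Stand-in for an external API call that returns promptly.
    await asyncio.sleep(0)
    return "reply"


timed_out = asyncio.run(call_with_timeout(slow_service(), 0.05, default="fallback"))
completed = asyncio.run(call_with_timeout(fast_service(), 0.5))
cleaned = sanitize_text("  hello\x00   world\n")
```

Returning a default on timeout keeps a slow external service from stalling the rest of a real-time pipeline; whether a fallback value or an error frame is the better response depends on the service.
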
## Performance Guidelines

### Memory Management
- **Clean up resources** in transport disconnection handlers
- **Use async context managers** for service connections
- **Implement proper frame lifecycle** management

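One way to read the "async context managers" recommendation above is sketched below with the standard library's `asynccontextmanager`. `FakeConnection` and `service_connection` are hypothetical stand-ins for a real service client, used only to show that cleanup runs even when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a service connection (hypothetical, not a Pipecat class)."""

    def __init__(self) -> None:
        self.open = False

    async def connect(self) -> None:
        self.open = True

    async def close(self) -> None:
        self.open = False


@asynccontextmanager
async def service_connection():
    """Open a connection and guarantee it is closed, even on errors."""
    conn = FakeConnection()
    await conn.connect()
    try:
        yield conn
    finally:
        await conn.close()


async def demo():
    captured = None
    was_open_inside = False
    try:
        async with service_connection() as conn:
            captured = conn
            was_open_inside = conn.open
            raise RuntimeError("simulated failure mid-call")
    except RuntimeError:
        pass
    return captured, was_open_inside


conn, was_open_inside = asyncio.run(demo())
```

The `finally` clause is what releases the connection on the error path, so callers do not need their own cleanup code.
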
### Latency Optimization
- **Choose appropriate STT services** for latency requirements
- **Use streaming TTS** when possible
- **Implement connection pooling** for HTTP services
- **Consider WebRTC** for real-time applications

## Common Patterns

### Error Handling
```python
@transport.event_handler("on_error")
async def on_error(transport, error):
    logger.error(f"Transport error: {error}")

    # Shutdown the pipeline
    await task.queue_frame(EndFrame())
```

### Service Configuration
```python
# Use environment variables for configuration
service = OpenAILLMService(
    api_key=os.getenv("OPENAI_API_KEY", ""),
    model="gpt-4o",
    params={"temperature": 0.7}
)
```

### Pipeline Assembly
```python
pipeline = Pipeline([
    transport.input(),
    stt_service,
    context_aggregator.user(),
    llm_service,
    tts_service,
    transport.output(),
    context_aggregator.assistant(),
])
```

## Commit and PR Guidelines

### Commit Message Format
```
<type>(<scope>): <description>

[optional body]

[optional footer]
```

Types: `feat`, `fix`, `docs`, `style`, `refactor`, `test`, `chore`

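The header format above can be checked mechanically. The sketch below is one possible validator, not a tool the project ships: `parse_commit_header` is a hypothetical helper, and it treats the scope as mandatory only because the template shows it (whether scope-less headers are accepted is not stated here).

```python
import re

COMMIT_TYPES = ("feat", "fix", "docs", "style", "refactor", "test", "chore")

# <type>(<scope>): <description>, matched against the first line only.
HEADER_RE = re.compile(
    r"^(?P<type>" + "|".join(COMMIT_TYPES) + r")"
    r"\((?P<scope>[^()\s]+)\): "
    r"(?P<description>\S.*)$"
)


def parse_commit_header(message: str):
    """Return (type, scope, description) for a valid header, else None."""
    lines = message.splitlines()
    if not lines:
        return None
    match = HEADER_RE.match(lines[0])
    if match is None:
        return None
    return match.group("type"), match.group("scope"), match.group("description")
```

For example, `parse_commit_header("feat(services): add agent service")` yields the three header parts, while a message without a recognized type yields `None`.
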
### PR Requirements
- **All tests must pass**
- **Code must be properly formatted** (Ruff)
- **Include appropriate tests** for new functionality
- **Update documentation** if needed
- **Reference related issues** in description

### Review Process
1. Automated checks must pass
2. Manual code review by maintainers
3. Documentation review for user-facing changes
4. Integration testing for service additions

## Troubleshooting

### Common Issues
- **Import errors:** Run `uv sync` to ensure dependencies are installed
- **Test failures:** Check environment variables in `.env`
- **Format errors:** Run `uv run ruff format` before committing
- **Type errors:** Ensure all public methods have type hints

### Development Tips
- **Use foundational examples** as starting points for testing
- **Check existing services** for integration patterns
- **Run tests frequently** during development
- **Use IDE integration** for Ruff formatting

### Getting Help
- **Documentation:** [docs.pipecat.ai](https://docs.pipecat.ai)
- **Issues:** [GitHub Issues](https://github.com/pipecat-ai/pipecat/issues)
205
examples/foundational/45-openai-agent-basic.py
Normal file
@@ -0,0 +1,205 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""
Basic OpenAI Agent service example.

This example demonstrates how to use the OpenAI Agents SDK within a Pipecat
pipeline to create an interactive agent with tool calling capabilities.

Requirements:
- OpenAI API key
- OpenAI Agents SDK: pip install openai-agents
"""

import os
import random
from typing import Any, List

# Import agents SDK for tools and agent creation
from agents import Agent, function_tool
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionMessageParam

from pipecat.frames.frames import LLMRunFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai_agent.agent_service import OpenAIAgentService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)

# Transport configuration
transport_params = {
    "daily": lambda: DailyParams(audio_out_enabled=True, audio_in_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True, audio_in_enabled=True),
    "webrtc": lambda: TransportParams(audio_out_enabled=True, audio_in_enabled=True),
}


@function_tool
def get_weather(location: str) -> str:
    """Get the current weather for a location.

    Args:
        location: The location to get weather for

    Returns:
        A weather description string
    """
    # Mock weather data - in real usage, integrate with weather API
    weather_data = {
        "San Francisco": "Foggy, 65°F",
        "New York": "Sunny, 72°F",
        "London": "Rainy, 59°F",
        "Tokyo": "Partly cloudy, 68°F",
    }
    return weather_data.get(location, f"Weather data not available for {location}")


@function_tool
def get_random_fact() -> str:
    """Get a random interesting fact.

    Returns:
        A random fact string
    """
    facts = [
        "Honey never spoils. Archaeologists have found edible honey in ancient Egyptian tombs.",
        "Octopuses have three hearts and blue blood.",
        "The Great Wall of China isn't visible from space with the naked eye.",
        "Bananas are berries, but strawberries aren't.",
    ]
    return random.choice(facts)


def get_random_fact_tool():
    """Example tool function for random facts."""

    def get_random_fact() -> str:
        """Get a random interesting fact.

        Returns:
            A random fact string.
        """
        facts = [
            "Honey never spoils. Archaeologists have found edible honey in ancient Egyptian tombs.",
            "A group of flamingos is called a 'flamboyance'.",
            "Octopuses have three hearts and blue blood.",
            "The Great Wall of China isn't visible from space with the naked eye.",
            "Bananas are berries, but strawberries aren't.",
        ]
        return random.choice(facts)

    return get_random_fact


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting OpenAI Agent bot")

    # Set up STT for speech recognition
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY", ""),
        model="nova-2",
    )

    # Set up TTS for voice output
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY", ""),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    # Create tools for the agent
    tools: list[Any] = [
        get_weather,
        get_random_fact,
    ]

    # Create the agent with tools
    agent = Agent(
        name="Assistant",
        instructions="""You are a helpful assistant with access to weather information and random facts.
        You can:
        - Check weather for any location using the get_weather tool
        - Share interesting facts using the get_random_fact tool
        - Have natural conversations

        Be friendly, informative, and engaging in your responses.""",
        tools=tools,
    )

    # Initialize the OpenAI Agent service with the pre-configured agent
    agent_service = OpenAIAgentService(
        agent=agent,
        api_key=os.getenv("OPENAI_API_KEY"),
        streaming=True,
    )

    # Set up conversation context with initial system message
    messages: List[ChatCompletionMessageParam] = [
        {
            "role": "system",
            "content": "You are a helpful assistant with access to weather information and random facts. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = agent_service.create_context_aggregator(context)

    # Create the processing pipeline with context aggregators
    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # Speech to text
            context_aggregator.user(),  # User responses
            agent_service,  # OpenAI Agent processing
            tts,  # Text to speech
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    # Send an initial greeting when client connects
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected, sending greeting")
        # Kick off the conversation by adding system message and running LLM
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
276
examples/foundational/46-openai-agent-handoffs.py
Normal file
@@ -0,0 +1,276 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""
Advanced OpenAI Agent service example with handoffs.

This example demonstrates how to use multiple agents with handoffs in the
OpenAI Agents SDK within a Pipecat pipeline, showcasing agent orchestration
and specialization.

Requirements:
- OpenAI API key
- OpenAI Agents SDK: pip install openai-agents
"""

import os
import random
from typing import Any, Dict, List

from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionMessageParam

from pipecat.frames.frames import LLMRunFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai_agent.agent_service import OpenAIAgentService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)

# Transport configuration
transport_params = {
    "daily": lambda: DailyParams(audio_out_enabled=True, audio_in_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True, audio_in_enabled=True),
    "webrtc": lambda: TransportParams(audio_out_enabled=True, audio_in_enabled=True),
}


def create_weather_tools():
    """Create weather-related tools."""

    def get_weather(location: str) -> str:
        """Get current weather for a location."""
        conditions = ["sunny", "cloudy", "rainy", "snowy", "windy"]
        temp = random.randint(-10, 35)
        condition = random.choice(conditions)
        return f"The weather in {location} is {condition} with a temperature of {temp}°C."

    def get_forecast(location: str, days: int = 3) -> str:
        """Get weather forecast for multiple days."""
        forecast = []
        for i in range(days):
            conditions = ["sunny", "cloudy", "rainy", "snowy"]
            temp = random.randint(-5, 30)
            condition = random.choice(conditions)
            day = "today" if i == 0 else f"in {i} day{'s' if i > 1 else ''}"
            forecast.append(f"{day.capitalize()}: {condition}, {temp}°C")
        return f"Weather forecast for {location}:\n" + "\n".join(forecast)

    return [get_weather, get_forecast]


def create_trivia_tools():
    """Create trivia and fact tools."""

    def get_random_fact() -> str:
        """Get a random interesting fact."""
        facts = [
            "Honey never spoils. Archaeologists have found edible honey in ancient Egyptian tombs.",
            "A group of flamingos is called a 'flamboyance'.",
            "Octopuses have three hearts and blue blood.",
            "The Great Wall of China isn't visible from space with the naked eye.",
            "Bananas are berries, but strawberries aren't.",
            "Wombat poop is cube-shaped.",
            "A shrimp's heart is in its head.",
            "It's impossible to hum while holding your nose.",
        ]
        return random.choice(facts)

    def get_science_fact() -> str:
        """Get a random science fact."""
        facts = [
            "The speed of light in a vacuum is approximately 299,792,458 meters per second.",
            "DNA stands for Deoxyribonucleic Acid.",
            "The human brain uses about 20% of the body's total energy.",
            "There are more possible games of chess than atoms in the observable universe.",
            "A single bolt of lightning contains enough energy to toast 100,000 slices of bread.",
        ]
        return random.choice(facts)

    return [get_random_fact, get_science_fact]


def create_math_tools():
    """Create math calculation tools."""

    def calculate(expression: str) -> str:
        """Safely calculate a mathematical expression."""
        try:
            # Only allow basic math operations for safety. "**" passes the
            # character check but is rejected explicitly, because huge powers
            # can hang the process.
            allowed_chars = set("0123456789+-*/.() ")
            if "**" in expression or not all(c in allowed_chars for c in expression):
                return "Sorry, I can only calculate basic math expressions with +, -, *, /, and parentheses."

            # Evaluate with no builtins or names available.
            result = eval(expression, {"__builtins__": {}}, {})
            return f"{expression} = {result}"
        except Exception as e:
            return f"Error calculating '{expression}': {str(e)}"

    def generate_math_problem() -> str:
        """Generate a random math problem."""
        operations = ["+", "-", "*"]
        a = random.randint(1, 20)
        b = random.randint(1, 20)
        op = random.choice(operations)

        if op == "+":
            answer = a + b
        elif op == "-":
            answer = a - b
        else:  # multiplication
            answer = a * b

        return f"Here's a math problem for you: {a} {op} {b} = ?"

    return [calculate, generate_math_problem]


async def create_specialist_agents():
    """Create specialized agents for different domains."""

    # Weather specialist agent
    weather_agent = OpenAIAgentService(
        name="Weather Specialist",
        instructions="""You are a weather specialist. You provide detailed weather information,
        forecasts, and weather-related advice. Use your tools to get accurate weather data.
        Be informative and helpful about weather conditions and what they might mean for
        outdoor activities.""",
        tools=create_weather_tools(),
        api_key=os.getenv("OPENAI_API_KEY"),
        streaming=True,
    )

    # Trivia specialist agent
    trivia_agent = OpenAIAgentService(
        name="Trivia Master",
        instructions="""You are a trivia and facts specialist. You love sharing interesting
        facts, trivia, and educational content. Use your tools to provide fascinating
        information and engage users with fun facts. Make learning enjoyable!""",
        tools=create_trivia_tools(),
        api_key=os.getenv("OPENAI_API_KEY"),
        streaming=True,
    )

    # Math specialist agent
    math_agent = OpenAIAgentService(
        name="Math Helper",
        instructions="""You are a mathematics specialist. You help with calculations,
        math problems, and mathematical concepts. Use your tools to solve problems
        and generate practice questions. Make math accessible and fun!""",
        tools=create_math_tools(),
        api_key=os.getenv("OPENAI_API_KEY"),
        streaming=True,
    )

    return weather_agent, trivia_agent, math_agent


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting OpenAI Agent bot with handoffs")

    # Set up STT for speech recognition
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY", ""),
        model="nova-2",
    )

    # Set up TTS for voice output
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY", ""),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    # Create specialist agents
    weather_agent, trivia_agent, math_agent = await create_specialist_agents()

    # Create the main triage agent that can hand off to specialists
    triage_agent = OpenAIAgentService(
        name="Assistant Coordinator",
        instructions="""You are a helpful assistant coordinator. Your role is to understand
        what the user needs and direct them to the right specialist:

        - For weather questions, forecasts, or outdoor activity planning -> Weather Specialist
        - For interesting facts, trivia, or educational content -> Trivia Master
        - For calculations, math problems, or mathematical help -> Math Helper

        If the request doesn't clearly fit a specialist, you can handle general conversation
        yourself. Always be friendly and explain when you're connecting them to a specialist.""",
        handoffs=[weather_agent.agent, trivia_agent.agent, math_agent.agent],  # type: ignore
        api_key=os.getenv("OPENAI_API_KEY"),
        streaming=True,
    )

    # Set up conversation context with initial system message
    messages: List[ChatCompletionMessageParam] = [
        {
            "role": "system",
            "content": "You are a helpful assistant coordinator with access to weather information, trivia, and math tools. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = triage_agent.create_context_aggregator(context)

    # Create the processing pipeline with context aggregators
    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # Speech to text
            context_aggregator.user(),  # User responses
            triage_agent,  # OpenAI Agent processing
            tts,  # Text to speech
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    # Send an initial greeting when client connects
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected, sending greeting")
        # Kick off the conversation by adding system message and running LLM
        messages.append(
            {
                "role": "system",
                "content": "Please introduce yourself to the user as an AI assistant coordinator who works with specialists for weather, trivia, and math topics.",
            }
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
@@ -34,7 +34,7 @@ dependencies = [
     "pyloudnorm~=0.1.1",
     "resampy~=0.4.3",
     "soxr~=0.5.0",
-    "openai>=1.74.0,<=1.99.1",
+    "openai>=1.74.0,<2.0.0",
     # Pinning numba to resolve package dependencies
     "numba==0.61.2",
     "wait_for2>=0.4.1; python_version<'3.12'",
@@ -74,7 +74,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope
 livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity>=8.2.3,<10.0.0" ]
 lmnt = [ "websockets>=13.1,<15.0" ]
 local = [ "pyaudio~=0.2.14" ]
-mcp = [ "mcp[cli]~=1.9.4" ]
+mcp = [ "mcp[cli]>=1.11.0,<2.0.0" ]
 mem0 = [ "mem0ai~=0.1.94" ]
 mistral = []
 mlx-whisper = [ "mlx-whisper~=0.4.2" ]
@@ -83,7 +83,8 @@ nim = []
 neuphonic = [ "websockets>=13.1,<15.0" ]
 noisereduce = [ "noisereduce~=3.0.3" ]
 openai = [ "websockets>=13.1,<15.0" ]
-openpipe = [ "openpipe~=4.50.0" ]
+openai-agent = [ "openai-agents~=0.3.0" ]
+# openpipe = [ "openpipe~=4.50.0" ] # Temporarily disabled due to openai version conflict
 openrouter = []
 perplexity = []
 playht = [ "websockets>=13.1,<15.0" ]
209
src/pipecat/services/openai_agent/README.md
Normal file
@@ -0,0 +1,209 @@
# OpenAI Agents SDK Integration

This service integrates the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/) with Pipecat, enabling powerful agentic workflows with features like:

- **Agent loops** with tool calling and response streaming
- **Handoffs** between specialized agents
- **Guardrails** for input/output validation
- **Sessions** with automatic conversation history
- **Built-in tracing** and monitoring

## Installation

Install the OpenAI Agents SDK dependency:

```bash
pip install "pipecat-ai[openai-agent]"
# or
uv add "pipecat-ai[openai-agent]"
```

## Basic Usage

```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai_agent import OpenAIAgentService

# Create a simple agent
agent_service = OpenAIAgentService(
    name="Assistant",
    instructions="You are a helpful assistant.",
    api_key=os.getenv("OPENAI_API_KEY"),
    streaming=True,
)

# Use in a pipeline (transport, stt, and tts are assumed to be configured elsewhere)
pipeline = Pipeline([
    transport.input(),
    stt,
    agent_service,
    tts,
    transport.output(),
])
```

## Features

### Tool Integration

```python
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny, 22°C"

agent_service = OpenAIAgentService(
    name="Weather Assistant",
    instructions="Help users with weather information.",
    tools=[get_weather],
    api_key=os.getenv("OPENAI_API_KEY"),
)
```

### Agent Handoffs

```python
# Create specialized agents
weather_agent = OpenAIAgentService(
    name="Weather Specialist",
    instructions="Provide weather information and forecasts.",
    tools=[get_weather, get_forecast],
)

trivia_agent = OpenAIAgentService(
    name="Trivia Master",
    instructions="Share interesting facts and trivia.",
    tools=[get_random_fact],
)

# Create coordinator that can hand off to specialists
coordinator = OpenAIAgentService(
    name="Coordinator",
    instructions="Route users to the right specialist.",
    handoffs=[weather_agent.agent, trivia_agent.agent],
)
```

### Guardrails
|
||||
|
||||
```python
from agents import InputGuardrail, GuardrailFunctionOutput

async def content_filter(ctx, agent, input_data):
    # Check input for appropriate content
    # (is_inappropriate is a placeholder for your own check)
    if is_inappropriate(input_data):
        return GuardrailFunctionOutput(
            output_info="Content not allowed",
            tripwire_triggered=True,
        )
    # output_info has no default, so pass it explicitly
    return GuardrailFunctionOutput(output_info=None, tripwire_triggered=False)

agent_service = OpenAIAgentService(
    name="Safe Assistant",
    instructions="You are a helpful and safe assistant.",
    input_guardrails=[InputGuardrail(guardrail_function=content_filter)],
)
```
|
||||
|
||||
### Session Management
|
||||
|
||||
```python
agent_service = OpenAIAgentService(
    name="Personal Assistant",
    instructions="Remember user preferences and context.",
    session_config={
        "user_id": "user_123",
        "memory_enabled": True,
    },
)

# Update session context dynamically
agent_service.update_session_context({
    "user_preferences": {"language": "en", "style": "formal"}
})
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
### Basic Parameters
|
||||
|
||||
- `name`: Agent identifier for handoffs and tracing
|
||||
- `instructions`: System prompt defining agent behavior
|
||||
- `api_key`: OpenAI API key (or use `OPENAI_API_KEY` env var)
|
||||
- `streaming`: Enable real-time token streaming (default: True)
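
As a rough illustration of how `instructions` is handled: the service in this PR accepts either a single string or a sequence of strings, and joins a sequence with spaces before creating the agent. The helper name `normalize_instructions` below is hypothetical and only mirrors that logic; it is not part of the service's API.

```python
from typing import Sequence, Union


def normalize_instructions(instructions: Union[str, Sequence[str]]) -> str:
    """Return a single prompt string (hypothetical helper, for illustration)."""
    if isinstance(instructions, str):
        return instructions
    # A sequence of fragments is joined with single spaces.
    return " ".join(str(part) for part in instructions)


print(normalize_instructions(["Be concise.", "Answer in English."]))
```

Passing a list can be convenient when a prompt is assembled from several independent rules, at the cost of losing any paragraph structure between them.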
|
||||
|
||||
### Advanced Configuration
|
||||
|
||||
- `tools`: List of callable functions for the agent to use
|
||||
- `handoffs`: List of other agents this agent can transfer to
|
||||
- `input_guardrails`: Input validation and filtering
|
||||
- `output_guardrails`: Output validation and filtering
|
||||
- `model_config`: Model settings; the service currently reads only the `model` key (default: `gpt-4o`)
|
||||
- `session_config`: Dictionary of session data, passed to each agent run as its `context`
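
The model selection can be sketched as follows. This is a minimal illustration assuming the behavior of the constructor in this PR, which looks up `model` in `model_config` and otherwise falls back to `gpt-4o`; the function name `resolve_model_name` is hypothetical.

```python
from typing import Any, Dict, Optional

DEFAULT_MODEL = "gpt-4o"  # default used by the service in this PR


def resolve_model_name(model_config: Optional[Dict[str, Any]] = None) -> str:
    """Pick the model name from model_config, falling back to the default."""
    if model_config and "model" in model_config:
        return model_config["model"]
    # Other keys (temperature, max_tokens, ...) are not consulted here.
    return DEFAULT_MODEL


print(resolve_model_name({"model": "gpt-4o-mini", "temperature": 0.1}))
```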
|
||||
|
||||
### Model Configuration
|
||||
|
||||
```python
agent_service = OpenAIAgentService(
    name="Precise Assistant",
    instructions="Provide accurate, concise responses.",
    # Note: only "model" is currently forwarded to the agent;
    # the other keys are accepted but not applied.
    model_config={
        "model": "gpt-4o",
        "temperature": 0.1,
        "max_tokens": 150,
    },
)
```
|
||||
|
||||
## Examples
|
||||
|
||||
See the foundational examples:
|
||||
|
||||
- [`45-openai-agent-basic.py`](../examples/foundational/45-openai-agent-basic.py) - Basic agent with tools
|
||||
- [`46-openai-agent-handoffs.py`](../examples/foundational/46-openai-agent-handoffs.py) - Multi-agent system with handoffs
|
||||
|
||||
## Methods
|
||||
|
||||
### Core Methods
|
||||
|
||||
- `update_agent_config()` - Update instructions and the model name reported for metrics (the agent's model itself is fixed at creation)
|
||||
- `add_tool()` - Add new tools dynamically (async; call with `await`)
|
||||
- `add_handoff_agent()` - Add handoff destinations (async; call with `await`)
|
||||
- `get_session_context()` - Get current session state
|
||||
- `update_session_context()` - Update session variables
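
The two session-context methods can be pictured with the stand-in class below. It is a sketch that assumes the semantics of the implementation in this PR (the getter returns a shallow copy, the updater merges keys); `SessionContextStore` is a hypothetical name used only for illustration.

```python
from typing import Any, Dict, Optional


class SessionContextStore:
    """Hypothetical stand-in mirroring the two session-context methods."""

    def __init__(self, session_config: Optional[Dict[str, Any]] = None):
        self._session_config: Dict[str, Any] = session_config or {}

    def get_session_context(self) -> Dict[str, Any]:
        # A shallow copy: top-level changes by the caller do not leak back.
        return self._session_config.copy()

    def update_session_context(self, context: Dict[str, Any]) -> None:
        # Merge updates; existing keys are overwritten.
        self._session_config.update(context)


store = SessionContextStore({"user_id": "user_123"})
snapshot = store.get_session_context()
snapshot["user_id"] = "someone_else"  # only the copy changes
store.update_session_context({"style": "formal"})
print(store.get_session_context())
```

Because the copy is shallow, nested values such as a `user_preferences` dictionary are still shared with the stored context.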
|
||||
|
||||
### Lifecycle Methods
|
||||
|
||||
Inherited from `AIService`:
|
||||
- `start()` - Initialize the agent
|
||||
- `stop()` - Clean up resources
|
||||
- `cancel()` - Cancel ongoing operations
|
||||
|
||||
## Integration with Pipecat
|
||||
|
||||
The service processes `OpenAILLMContextFrame` and `TextFrame` inputs and generates:
|
||||
- `LLMFullResponseStartFrame` - Response beginning
|
||||
- `LLMTextFrame` - Streaming text tokens when streaming is enabled; otherwise a single frame with the final output
|
||||
- `LLMFullResponseEndFrame` - Response completion
|
||||
|
||||
These frames let the service slot into Pipecat's conversation pipeline and work with its context aggregators.
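
The frame sequence for one response can be sketched as below. The three classes are simplified stand-ins so that the example runs without Pipecat installed; the real frame types live in `pipecat.frames.frames`, and the helper names `frames_for_response` and `collect_text` are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, List


# Stand-in frame types so the sketch runs without Pipecat installed.
@dataclass
class LLMFullResponseStartFrame:
    pass


@dataclass
class LLMTextFrame:
    text: str


@dataclass
class LLMFullResponseEndFrame:
    pass


def frames_for_response(chunks: Iterable[str]) -> List[object]:
    """Wrap text chunks in start/end markers, skipping empty chunks."""
    frames: List[object] = [LLMFullResponseStartFrame()]
    frames.extend(LLMTextFrame(text=c) for c in chunks if c)
    frames.append(LLMFullResponseEndFrame())
    return frames


def collect_text(frames: Iterable[object]) -> str:
    """Join the text carried by the LLMTextFrame items, as a consumer might."""
    return "".join(f.text for f in frames if isinstance(f, LLMTextFrame))


frames = frames_for_response(["Hello ", "from ", "agent!"])
print([type(f).__name__ for f in frames])
print(collect_text(frames))
```

A downstream processor such as a TTS service can start on the first text frame and treat the end frame as the response boundary.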
|
||||
|
||||
## Error Handling
|
||||
|
||||
The service includes robust error handling for:
|
||||
- Missing API keys or SDK installation
|
||||
- Agent processing failures
|
||||
- Network connectivity issues
|
||||
- Malformed tool responses
|
||||
|
||||
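Errors raised while processing a request are emitted as `ErrorFrame` objects in the pipeline. A missing API key is only logged as a warning at construction time, and a missing SDK raises when the module is imported.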
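
The error path can be sketched as follows: the request handler is wrapped in a `try`/`except`, and a failure is turned into an error frame instead of propagating. `ErrorFrame` here is a simplified stand-in, and `run_with_error_frame` and `failing_handler` are hypothetical names for illustration only.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class ErrorFrame:  # stand-in for pipecat.frames.frames.ErrorFrame
    error: str


async def run_with_error_frame(
    handler: Callable[[str], Awaitable[None]],
    text: str,
    emitted: List[object],
) -> None:
    """Run handler; on failure record an ErrorFrame instead of raising."""
    try:
        await handler(text)
    except Exception as exc:
        emitted.append(ErrorFrame(error=f"Agent processing error: {exc}"))


async def failing_handler(text: str) -> None:
    # Simulates a failure inside the agent run.
    raise RuntimeError("network unreachable")


emitted: List[object] = []
asyncio.run(run_with_error_frame(failing_handler, "hi", emitted))
print(emitted)
```

An application would typically watch for these frames downstream and decide whether to retry, apologize to the user, or end the session.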
|
||||
|
||||
## Requirements
|
||||
|
||||
- OpenAI API key
|
||||
- `openai-agents` package
|
||||
- Python 3.10+
|
||||
|
||||
## Limitations
|
||||
|
||||
- Currently supports OpenAI models only (via Agents SDK)
|
||||
- Handoffs work within individual requests (no cross-request state)
|
||||
- Real-time voice features require additional setup
|
||||
11
src/pipecat/services/openai_agent/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Agents SDK service for Pipecat integration."""
|
||||
|
||||
from .agent_service import OpenAIAgentService
|
||||
|
||||
__all__ = ["OpenAIAgentService"]
|
||||
567
src/pipecat/services/openai_agent/agent_service.py
Normal file
@@ -0,0 +1,567 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Agents SDK integration service.
|
||||
|
||||
Provides integration with the OpenAI Agents SDK for building AI applications
|
||||
within Pipecat pipelines. This service allows leveraging agent loops, handoffs,
|
||||
guardrails, sessions, and tools from the OpenAI Agents SDK.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Protocol,
|
||||
Sequence,
|
||||
Union,
|
||||
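runtime_checkable,
|
||||
)
|
||||
|
||||
from loguru import logger
|
||||
# `typing.override` requires Python 3.12+, but the project supports 3.10 (assumes typing_extensions is installed)
|
||||
from typing_extensions import override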
|
||||
|
||||
try:
|
||||
from agents import Agent, InputGuardrail, OutputGuardrail, Runner, Tool
|
||||
from agents.result import RunResult, RunResultStreaming
|
||||
from agents.stream_events import StreamEvent
|
||||
except ImportError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI Agents SDK, you need to `pip install openai-agents`. "
|
||||
"Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartFrame,
|
||||
TextFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserAggregatorParams,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_service import AIService
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class ToolLike(Protocol):
|
||||
"""Protocol for tool-like objects."""
|
||||
|
||||
def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||
"""Tool call interface."""
|
||||
...
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class AgentLike(Protocol):
|
||||
"""Protocol for agent-like objects."""
|
||||
|
||||
name: str
|
||||
|
||||
def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||
"""Agent call interface."""
|
||||
...
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAIAgentContextAggregatorPair:
|
||||
"""Pair of OpenAI Agent context aggregators for user and assistant messages.
|
||||
|
||||
Parameters:
|
||||
_user: User context aggregator for processing user messages.
|
||||
_assistant: Assistant context aggregator for processing assistant messages.
|
||||
"""
|
||||
|
||||
_user: "OpenAIAgentUserContextAggregator"
|
||||
_assistant: "OpenAIAgentAssistantContextAggregator"
|
||||
|
||||
def user(self) -> "OpenAIAgentUserContextAggregator":
|
||||
"""Get the user context aggregator.
|
||||
|
||||
Returns:
|
||||
The user context aggregator instance.
|
||||
"""
|
||||
return self._user
|
||||
|
||||
def assistant(self) -> "OpenAIAgentAssistantContextAggregator":
|
||||
"""Get the assistant context aggregator.
|
||||
|
||||
Returns:
|
||||
The assistant context aggregator instance.
|
||||
"""
|
||||
return self._assistant
|
||||
|
||||
|
||||
class OpenAIAgentService(AIService):
|
||||
"""OpenAI Agents SDK service for Pipecat.
|
||||
|
||||
Integrates the OpenAI Agents SDK with Pipecat's pipeline architecture,
|
||||
enabling advanced agentic workflows with features like handoffs, guardrails,
|
||||
sessions, and tools within real-time conversational AI applications.
|
||||
|
||||
The service processes text input frames and generates streaming responses
|
||||
using the agent's configured capabilities.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
agent: Optional[Agent] = None,
|
||||
name: str = "Assistant",
|
||||
instructions: Union[str, Sequence[str]] = "You are a helpful assistant.",
|
||||
handoffs: Optional[Sequence[AgentLike]] = None,
|
||||
tools: Optional[Sequence[ToolLike]] = None,
|
||||
input_guardrails: Optional[Sequence[InputGuardrail]] = None,
|
||||
output_guardrails: Optional[Sequence[OutputGuardrail]] = None,
|
||||
model_config: Optional[Dict[str, Any]] = None,
|
||||
session_config: Optional[Dict[str, Any]] = None,
|
||||
api_key: Optional[str] = None,
|
||||
streaming: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the OpenAI Agent service.
|
||||
|
||||
Args:
|
||||
agent: Pre-configured Agent instance. If provided, other agent configuration
|
||||
parameters will be ignored.
|
||||
name: Name of the agent for identification and handoffs.
|
||||
instructions: System instructions that define the agent's behavior.
|
||||
handoffs: List of other agents this agent can hand off to.
|
||||
tools: List of callable functions the agent can use as tools.
|
||||
input_guardrails: List of input validation guardrails.
|
||||
output_guardrails: List of output validation guardrails.
|
||||
model_config: Configuration for the underlying language model.
|
||||
session_config: Configuration for session management.
|
||||
api_key: OpenAI API key. If not provided, will use OPENAI_API_KEY env var.
|
||||
streaming: Whether to use streaming responses for real-time output.
|
||||
**kwargs: Additional arguments passed to the parent AIService.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Set up API key
|
||||
if api_key:
|
||||
os.environ["OPENAI_API_KEY"] = api_key
|
||||
elif not os.getenv("OPENAI_API_KEY"):
|
||||
logger.warning("No OpenAI API key provided. Set OPENAI_API_KEY environment variable.")
|
||||
|
||||
# Create or use existing agent
|
||||
if agent:
|
||||
self._agent = agent
|
||||
else:
|
||||
# Convert sequences to lists and handle string instructions
|
||||
agent_handoffs: List[Any] = list(handoffs) if handoffs else []
|
||||
agent_tools: List[Any] = list(tools) if tools else []
|
||||
agent_input_guardrails: List[Any] = list(input_guardrails) if input_guardrails else []
|
||||
agent_output_guardrails: List[Any] = (
|
||||
list(output_guardrails) if output_guardrails else []
|
||||
)
|
||||
|
||||
# Handle instructions - convert sequence to string if needed
|
||||
if isinstance(instructions, str):
|
||||
agent_instructions = instructions
|
||||
else:
|
||||
agent_instructions = " ".join(str(instr) for instr in instructions)
|
||||
|
||||
self._agent = Agent(
|
||||
name=name,
|
||||
instructions=agent_instructions,
|
||||
handoffs=agent_handoffs,
|
||||
tools=agent_tools,
|
||||
input_guardrails=agent_input_guardrails,
|
||||
output_guardrails=agent_output_guardrails,
|
||||
model=model_config.get("model", "gpt-4o") if model_config else "gpt-4o",
|
||||
)
|
||||
|
||||
self._streaming = streaming
|
||||
self._session_config = session_config or {}
|
||||
self._current_session = None
|
||||
self._accumulated_text = ""
|
||||
|
||||
# Set model name for metrics
|
||||
if model_config and "model" in model_config:
|
||||
self.set_model_name(model_config["model"])
|
||||
else:
|
||||
self.set_model_name("gpt-4o") # Default model
|
||||
|
||||
logger.info(f"Initialized OpenAI Agent service: {self._agent.name}")
|
||||
|
||||
@property
|
||||
def agent(self) -> Agent:
|
||||
"""Get the underlying OpenAI Agent.
|
||||
|
||||
Returns:
|
||||
The configured Agent instance.
|
||||
"""
|
||||
return self._agent
|
||||
|
||||
def create_context_aggregator(
|
||||
self,
|
||||
context: OpenAILLMContext,
|
||||
*,
|
||||
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
|
||||
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
|
||||
) -> OpenAIAgentContextAggregatorPair:
|
||||
"""Create OpenAI-specific context aggregators for agent interactions.
|
||||
|
||||
Creates a pair of context aggregators optimized for OpenAI Agent interactions,
|
||||
including support for function calls, tool usage, and conversation management.
|
||||
|
||||
Args:
|
||||
context: The LLM context to create aggregators for.
|
||||
user_params: Parameters for user message aggregation.
|
||||
assistant_params: Parameters for assistant message aggregation.
|
||||
|
||||
Returns:
|
||||
OpenAIAgentContextAggregatorPair: A pair of context aggregators, one for
|
||||
the user and one for the assistant, encapsulated in an
|
||||
OpenAIAgentContextAggregatorPair.
|
||||
"""
|
||||
user = OpenAIAgentUserContextAggregator(context, params=user_params)
|
||||
assistant = OpenAIAgentAssistantContextAggregator(context, params=assistant_params)
|
||||
return OpenAIAgentContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
def update_agent_config(
|
||||
self,
|
||||
*,
|
||||
instructions: Optional[str] = None,
|
||||
model_config: Optional[Dict[str, Any]] = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Update agent configuration dynamically.
|
||||
|
||||
Args:
|
||||
instructions: New system instructions for the agent.
|
||||
model_config: Updated model configuration.
|
||||
**kwargs: Additional agent configuration parameters.
|
||||
"""
|
||||
if instructions:
|
||||
self._agent.instructions = instructions
|
||||
logger.info(f"Updated agent instructions for {self._agent.name}")
|
||||
|
||||
if model_config:
|
||||
# Note: OpenAI Agents SDK handles model configuration during agent creation
|
||||
# We can't update model_config after agent is created, but we can update our model name
|
||||
if "model" in model_config:
|
||||
self.set_model_name(model_config["model"])
|
||||
logger.info(f"Updated model config for {self._agent.name}")
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the OpenAI Agent service.
|
||||
|
||||
Initializes the agent session and prepares for processing.
|
||||
|
||||
Args:
|
||||
frame: The start frame containing initialization parameters.
|
||||
"""
|
||||
logger.info(f"Starting OpenAI Agent service: {self._agent.name}")
|
||||
await super().start(frame)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the OpenAI Agent service.
|
||||
|
||||
Cleans up resources and ends the current session.
|
||||
|
||||
Args:
|
||||
frame: The end frame.
|
||||
"""
|
||||
logger.info(f"Stopping OpenAI Agent service: {self._agent.name}")
|
||||
await super().stop(frame)
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
"""Cancel the OpenAI Agent service.
|
||||
|
||||
Cancels any ongoing operations.
|
||||
|
||||
Args:
|
||||
frame: The cancel frame.
|
||||
"""
|
||||
logger.info(f"Cancelling OpenAI Agent service: {self._agent.name}")
|
||||
await super().cancel(frame)
|
||||
|
||||
@override
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
|
||||
"""Process frames and handle agent interactions.
|
||||
|
||||
Processes OpenAILLMContextFrame and TextFrame by running them through the OpenAI Agent
|
||||
and streams the results back as LLM frames.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction of frame processing.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
# Process context frame through the agent
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
# Extract the latest user message from the context
|
||||
messages = frame.context.get_messages()
|
||||
if messages:
|
||||
# Get the last user message
|
||||
for message in reversed(messages):
|
||||
if message.get("role") == "user":
|
||||
content = message.get("content", "")
|
||||
if isinstance(content, list):
|
||||
# Extract text from content array
|
||||
text_parts = []
|
||||
for part in content:
|
||||
if isinstance(part, dict) and part.get("type") == "text":
|
||||
text_parts.append(part.get("text", ""))
|
||||
user_input = " ".join(text_parts)
|
||||
else:
|
||||
user_input = str(content)
|
||||
|
||||
if user_input.strip():
|
||||
await self._process_agent_request(user_input)
|
||||
break
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing agent context: {e}")
|
||||
await self.push_error(ErrorFrame(f"Agent processing error: {e}"))
|
||||
elif isinstance(frame, TextFrame):
|
||||
# Process text input through the agent directly (for backwards compatibility)
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self._process_agent_request(frame.text)
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing agent request: {e}")
|
||||
await self.push_error(ErrorFrame(f"Agent processing error: {e}"))
|
||||
else:
|
||||
# For frames we don't handle, pass them through with direction
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _process_agent_request(self, input_text: str):
|
||||
"""Process an agent request and stream the results.
|
||||
|
||||
Args:
|
||||
input_text: The user input text to process.
|
||||
"""
|
||||
logger.debug(f"Processing agent request: {input_text}")
|
||||
|
||||
if self._streaming:
|
||||
await self._process_streaming_response(input_text)
|
||||
else:
|
||||
await self._process_non_streaming_response(input_text)
|
||||
|
||||
async def _process_streaming_response(self, input_text: str):
|
||||
"""Process a streaming agent response.
|
||||
|
||||
Args:
|
||||
input_text: The user input text to process.
|
||||
"""
|
||||
try:
|
||||
# Run the agent with streaming
|
||||
result: RunResultStreaming = Runner.run_streamed(
|
||||
self._agent, input_text, context=self._session_config
|
||||
)
|
||||
|
||||
has_streaming_deltas = False
|
||||
|
||||
# Process the stream events
|
||||
async for event in result.stream_events():
|
||||
if event.type == "raw_response_event":
|
||||
# Handle token-by-token streaming
|
||||
# Only check for delta on events that are known to have it
|
||||
if hasattr(event.data, "delta") and getattr(event.data, "delta", None):
|
||||
delta_text = getattr(event.data, "delta", "")
|
||||
if delta_text:
|
||||
has_streaming_deltas = True
|
||||
self._accumulated_text += delta_text
|
||||
await self.push_frame(LLMTextFrame(text=delta_text))
|
||||
|
||||
elif event.type == "run_item_stream_event":
|
||||
# Handle completed items
|
||||
if event.item.type == "message_output_item":
|
||||
# Only process complete message if we didn't get streaming deltas
|
||||
if not has_streaming_deltas:
|
||||
message_text = self._extract_message_text(event.item)
|
||||
logger.debug(
|
||||
f"Processing complete message (no deltas): {message_text[:50]}..."
|
||||
if len(message_text) > 50
|
||||
else f"Processing complete message: {message_text}"
|
||||
)
|
||||
if message_text:
|
||||
await self.push_frame(LLMTextFrame(text=message_text))
|
||||
|
||||
elif event.item.type == "tool_call_item":
|
||||
# Use getattr for safe attribute access
|
||||
tool_name = getattr(event.item, "tool_name", "unknown")
|
||||
logger.debug(f"Tool called: {tool_name}")
|
||||
|
||||
elif event.item.type == "tool_call_output_item":
|
||||
output = getattr(event.item, "output", "no output")
|
||||
logger.debug(f"Tool output: {output}")
|
||||
|
||||
elif event.type == "agent_updated_stream_event":
|
||||
logger.debug(f"Agent updated: {event.new_agent.name}")
|
||||
|
||||
# Reset accumulated text for next request
|
||||
self._accumulated_text = ""
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in streaming response: {e}")
|
||||
raise
|
||||
|
||||
async def _process_non_streaming_response(self, input_text: str):
|
||||
"""Process a non-streaming agent response.
|
||||
|
||||
Args:
|
||||
input_text: The user input text to process.
|
||||
"""
|
||||
try:
|
||||
# Run the agent without streaming
|
||||
result: RunResult = await Runner.run(
|
||||
self._agent, input_text, context=self._session_config
|
||||
)
|
||||
|
||||
# Send the final output
|
||||
if result.final_output:
|
||||
await self.push_frame(LLMTextFrame(text=result.final_output))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in non-streaming response: {e}")
|
||||
raise
|
||||
|
||||
def _extract_message_text(self, item) -> str:
|
||||
"""Extract text from a message output item.
|
||||
|
||||
Args:
|
||||
item: The message output item from the agent.
|
||||
|
||||
Returns:
|
||||
The extracted text content.
|
||||
"""
|
||||
try:
|
||||
# Handle OpenAI Agents SDK MessageOutputItem format
|
||||
if hasattr(item, "raw_item") and hasattr(item.raw_item, "content"):
|
||||
content = item.raw_item.content
|
||||
if isinstance(content, list):
|
||||
text_parts = []
|
||||
for content_part in content:
|
||||
if hasattr(content_part, "text"):
|
||||
text_parts.append(content_part.text)
|
||||
elif (
|
||||
isinstance(content_part, dict)
|
||||
and content_part.get("type") == "output_text"
|
||||
):
|
||||
text_parts.append(content_part.get("text", ""))
|
||||
elif isinstance(content_part, dict) and content_part.get("type") == "text":
|
||||
text_parts.append(content_part.get("text", ""))
|
||||
return "".join(text_parts)
|
||||
elif isinstance(content, str):
|
||||
return content
|
||||
|
||||
# Handle direct content attribute
|
||||
elif hasattr(item, "content"):
|
||||
if isinstance(item.content, str):
|
||||
return item.content
|
||||
elif isinstance(item.content, list):
|
||||
# Extract text from content array
|
||||
text_parts = []
|
||||
for content_part in item.content:
|
||||
if isinstance(content_part, dict) and content_part.get("type") == "text":
|
||||
text_parts.append(content_part.get("text", ""))
|
||||
elif isinstance(content_part, str):
|
||||
text_parts.append(content_part)
|
||||
return "".join(text_parts)
|
||||
|
||||
# If no text content found, return empty string instead of str(item)
|
||||
logger.debug(f"No extractable text content found in item: {type(item)}")
|
||||
return ""
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not extract text from message item: {e}")
|
||||
return ""
|
||||
|
||||
async def add_tool(self, tool_function: ToolLike):
|
||||
"""Add a tool function to the agent.
|
||||
|
||||
Args:
|
||||
tool_function: A callable function or Tool object to add as a tool.
|
||||
"""
|
||||
if hasattr(self._agent, "tools"):
|
||||
# Cast to Any to handle the type variance issue
|
||||
tools_list: List[Any] = self._agent.tools
|
||||
tools_list.append(tool_function)
|
||||
tool_name = getattr(
|
||||
tool_function, "__name__", getattr(tool_function, "name", "unknown")
|
||||
)
|
||||
logger.info(f"Added tool {tool_name} to agent {self._agent.name}")
|
||||
|
||||
async def add_handoff_agent(self, agent: AgentLike):
|
||||
"""Add a handoff agent.
|
||||
|
||||
Args:
|
||||
agent: Another Agent instance or handoff object that this agent can hand off to.
|
||||
"""
|
||||
if hasattr(self._agent, "handoffs"):
|
||||
# Cast to Any to handle the type variance issue
|
||||
handoffs_list: List[Any] = self._agent.handoffs
|
||||
handoffs_list.append(agent)
|
||||
agent_name = getattr(agent, "name", "unknown")
|
||||
logger.info(f"Added handoff agent {agent_name} to agent {self._agent.name}")
|
||||
|
||||
def get_session_context(self) -> Dict[str, Any]:
|
||||
"""Get the current session context.
|
||||
|
||||
Returns:
|
||||
Dictionary containing the current session context.
|
||||
"""
|
||||
return self._session_config.copy()
|
||||
|
||||
def update_session_context(self, context: Dict[str, Any]):
|
||||
"""Update the session context.
|
||||
|
||||
Args:
|
||||
context: Dictionary of context updates to apply.
|
||||
"""
|
||||
self._session_config.update(context)
|
||||
logger.debug(f"Updated session context for agent {self._agent.name}")
|
||||
|
||||
|
||||
class OpenAIAgentUserContextAggregator(LLMUserContextAggregator):
|
||||
"""OpenAI Agent-specific user context aggregator.
|
||||
|
||||
Handles aggregation of user messages for OpenAI Agent services.
|
||||
Inherits all functionality from the base LLMUserContextAggregator.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class OpenAIAgentAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
"""OpenAI Agent-specific assistant context aggregator.
|
||||
|
||||
Handles aggregation of assistant messages for OpenAI Agent services,
|
||||
with specialized support for OpenAI's function calling format,
|
||||
tool usage tracking, and agent interaction management.
|
||||
"""
|
||||
|
||||
pass
|
||||
172
test_openai_agent.py
Normal file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""Simple test script for OpenAI Agent service."""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Mock the OpenAI API key for testing
|
||||
os.environ["OPENAI_API_KEY"] = "test-key-for-testing"
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.openai_agent import OpenAIAgentService
|
||||
|
||||
|
||||
async def test_basic_functionality():
|
||||
"""Test basic OpenAI Agent service functionality."""
|
||||
print("🧪 Testing OpenAI Agent Service...")
|
||||
|
||||
# Create a simple weather tool for testing
|
||||
def get_weather(location: str) -> str:
|
||||
"""Get weather for a location."""
|
||||
return f"The weather in {location} is sunny and 22°C."
|
||||
|
||||
try:
|
||||
# Create the service
|
||||
print("📋 Creating OpenAI Agent service...")
|
||||
service = OpenAIAgentService(
|
||||
name="Test Assistant",
|
||||
instructions="You are a helpful test assistant.",
|
||||
tools=[get_weather],
|
||||
api_key="test-key",
|
||||
streaming=True,
|
||||
)
|
||||
|
||||
print(f"✅ Service created successfully!")
|
||||
print(f" - Agent name: {service.agent.name}")
|
||||
print(f" - Model name: {service.model_name}")
|
||||
print(f" - Streaming enabled: {service._streaming}")
|
||||
|
||||
# Test basic configuration
|
||||
print("⚙️ Testing configuration updates...")
|
||||
service.update_agent_config(
|
||||
instructions="Updated test instructions",
|
||||
model_config={"model": "gpt-4o", "temperature": 0.5},
|
||||
)
|
||||
|
||||
print(f"✅ Configuration updated!")
|
||||
print(f" - New instructions: {service.agent.instructions}")
|
||||
print(f" - New model: {service.model_name}")
|
||||
|
||||
# Test session context
|
||||
print("💾 Testing session context...")
|
||||
service.update_session_context({"user_id": "test-user", "session": "test-session"})
|
||||
context = service.get_session_context()
|
||||
|
||||
print(f"✅ Session context managed!")
|
||||
print(f" - Context keys: {list(context.keys())}")
|
||||
|
||||
# Test adding tools
|
||||
print("🔧 Testing tool management...")
|
||||
|
||||
def get_time() -> str:
|
||||
"""Get current time."""
|
||||
return "The current time is 3:00 PM."
|
||||
|
||||
await service.add_tool(get_time)
|
||||
print(f"✅ Tool added successfully!")
|
||||
|
||||
print("\n🎉 All basic functionality tests passed!")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Test failed with error: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def test_frame_processing():
|
||||
"""Test frame processing with mocked responses."""
|
||||
print("\n🔄 Testing frame processing...")
|
||||
|
||||
try:
|
||||
# Mock the Runner to avoid actual API calls
|
||||
with patch("pipecat.services.openai_agent.agent_service.Runner") as mock_runner:
|
||||
# Set up mock responses
|
||||
mock_stream_result = MagicMock()
|
||||
|
||||
# Mock stream events
|
||||
async def mock_stream_events():
|
||||
# Simulate streaming response
|
||||
yield MagicMock(type="raw_response_event", data=MagicMock(delta="Hello "))
|
||||
yield MagicMock(type="raw_response_event", data=MagicMock(delta="from "))
|
||||
yield MagicMock(type="raw_response_event", data=MagicMock(delta="agent!"))
|
||||
|
||||
# Simulate completed message
|
||||
mock_item = MagicMock()
|
||||
mock_item.type = "message_output_item"
|
||||
mock_item.content = "Hello from agent!"
|
||||
yield MagicMock(type="run_item_stream_event", item=mock_item)
|
||||
|
||||
mock_stream_result.stream_events.return_value = mock_stream_events()
|
||||
mock_runner.run_streamed.return_value = mock_stream_result
|
||||
|
||||
# Create service with mocked runner
|
||||
service = OpenAIAgentService(
|
||||
name="Test Assistant",
|
||||
instructions="You are a helpful test assistant.",
|
||||
api_key="test-key",
|
||||
streaming=True,
|
||||
)
|
||||
|
||||
# Collect output frames
|
||||
output_frames = []
|
||||
|
||||
async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
|
||||
output_frames.append(frame)
|
||||
print(f" 📤 Frame: {type(frame).__name__}")
|
||||
if hasattr(frame, "text"):
|
||||
print(f" Text: '{frame.text}'")
|
||||
|
||||
service.push_frame = mock_push_frame
|
||||
|
||||
# Process a text frame
|
||||
print("📝 Processing text frame...")
|
||||
text_frame = TextFrame("Hello, how are you?")
|
||||
await service.process_frame(text_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
# Wait for async processing
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
print(f"✅ Frame processing completed!")
|
||||
print(f" - Generated {len(output_frames)} output frames")
|
||||
|
||||
# Check if we got expected frame types
|
||||
frame_types = [type(frame).__name__ for frame in output_frames]
|
||||
print(f" - Frame types: {frame_types}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Frame processing test failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def main():
|
||||
"""Run all tests."""
|
||||
print("🚀 Starting OpenAI Agent Service Tests\n")
|
||||
|
||||
try:
|
||||
# Run basic functionality tests
|
||||
basic_test = await test_basic_functionality()
|
||||
|
||||
# Run frame processing tests
|
||||
frame_test = await test_frame_processing()
|
||||
|
||||
# Summary
|
||||
print(f"\n📊 Test Results:")
|
||||
print(f" - Basic functionality: {'✅ PASS' if basic_test else '❌ FAIL'}")
|
||||
print(f" - Frame processing: {'✅ PASS' if frame_test else '❌ FAIL'}")
|
||||
|
||||
if basic_test and frame_test:
|
||||
print(f"\n🎉 All tests passed! The OpenAI Agent service is working correctly.")
|
||||
else:
|
||||
print(f"\n⚠️ Some tests failed. Please check the output above.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Test suite failed with error: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
33
test_simple_agent.py
Normal file
@@ -0,0 +1,33 @@
#!/usr/bin/env python3

import asyncio
import os

from loguru import logger

# Test the actual agents package API
try:
    from agents import Agent, Runner

    # Create a simple agent
    agent = Agent(
        name="test-agent",
        instructions="You are a helpful assistant.",
    )

    print("✅ Agent created successfully!")
    print(f"Agent name: {agent.name}")

    # Test a simple conversation
    async def test_agent():
        result = await Runner.run(agent, "Hello, how are you?")
        print(f"Agent response: {result.final_output}")

    # Run the test
    asyncio.run(test_agent())

except Exception as e:
    print(f"❌ Error: {e}")
    import traceback

    traceback.print_exc()
286
tests/test_openai_agent_service.py
Normal file
@@ -0,0 +1,286 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Tests for OpenAI Agent service."""

import asyncio
import os
import sys
import unittest.mock
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# Add src to path for testing
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMTextFrame,
    StartFrame,
    TextFrame,
)
from pipecat.processors.frame_processor import FrameDirection


class MockAgent:
    """Mock Agent for testing."""

    def __init__(self, name="Test Agent", instructions="Test instructions"):
        self.name = name
        self.instructions = instructions
        self.tools = []
        self.handoffs = []


class MockRunResult:
    """Mock RunResult for testing."""

    def __init__(self, final_output="Test response"):
        self.final_output = final_output


class MockStreamEvent:
    """Mock StreamEvent for testing."""

    def __init__(self, event_type, data=None, item=None):
        self.type = event_type
        self.data = data
        self.item = item


class MockMessageItem:
    """Mock message item for testing."""

    def __init__(self, content="Test content"):
        self.type = "message_output_item"
        self.content = content


class MockRunner:
    """Mock Runner for testing."""

    @staticmethod
    async def run(agent, input_text, context=None):
        return MockRunResult("Mocked response")

    @staticmethod
    def run_streamed(agent, input_text, context=None):
        class MockStreamResult:
            async def stream_events(self):
                yield MockStreamEvent("raw_response_event", data=MagicMock(delta="Test "))
                yield MockStreamEvent("raw_response_event", data=MagicMock(delta="response"))
                yield MockStreamEvent(
                    "run_item_stream_event", item=MockMessageItem("Test response")
                )

        return MockStreamResult()


@pytest.fixture
def mock_openai_agents():
    """Mock the OpenAI Agents SDK imports."""
    with patch.dict(
        "sys.modules",
        {
            "agents": MagicMock(),
            "agents.stream_events": MagicMock(),
            "agents.result": MagicMock(),
        },
    ):
        # Mock the classes and functions we need
        mock_agent = MagicMock()
        mock_agent.return_value = MockAgent()

        mock_runner = MagicMock()
        mock_runner.run = AsyncMock(return_value=MockRunResult())
        mock_runner.run_streamed = MagicMock(return_value=MockRunner.run_streamed(None, None))

        with (
            patch("pipecat.services.openai_agent.agent_service.Agent", mock_agent),
            patch("pipecat.services.openai_agent.agent_service.Runner", mock_runner),
        ):
            yield {
                "Agent": mock_agent,
                "Runner": mock_runner,
            }


@pytest.mark.asyncio
async def test_openai_agent_service_init(mock_openai_agents):
    """Test OpenAI Agent service initialization."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key", streaming=True
    )

    assert service.agent.name == "Test Agent"
    assert service._streaming is True


@pytest.mark.asyncio
async def test_openai_agent_service_process_text_frame_streaming(mock_openai_agents):
    """Test processing text frame with streaming enabled."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key", streaming=True
    )

    # Mock the push_frame method to capture output
    output_frames = []

    async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
        output_frames.append(frame)

    service.push_frame = mock_push_frame

    # Process a text frame
    text_frame = TextFrame("Hello, agent!")
    await service.process_frame(text_frame, FrameDirection.DOWNSTREAM)

    # Wait a bit for async processing
    await asyncio.sleep(0.1)

    # Check that appropriate frames were generated
    assert len(output_frames) > 0
    assert any(isinstance(frame, LLMFullResponseStartFrame) for frame in output_frames)


@pytest.mark.asyncio
async def test_openai_agent_service_process_text_frame_non_streaming(mock_openai_agents):
    """Test processing text frame with streaming disabled."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key", streaming=False
    )

    # Mock the push_frame method to capture output
    output_frames = []

    async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
        output_frames.append(frame)

    service.push_frame = mock_push_frame

    # Process a text frame
    text_frame = TextFrame("Hello, agent!")
    await service.process_frame(text_frame, FrameDirection.DOWNSTREAM)

    # Wait a bit for async processing
    await asyncio.sleep(0.1)

    # Check that appropriate frames were generated
    assert len(output_frames) > 0


@pytest.mark.asyncio
async def test_openai_agent_service_update_config(mock_openai_agents):
    """Test updating agent configuration."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key"
    )

    # Update configuration
    service.update_agent_config(
        instructions="Updated instructions", model_config={"model": "gpt-4o", "temperature": 0.7}
    )

    assert service.agent.instructions == "Updated instructions"
    assert service.agent.model_config["model"] == "gpt-4o"


@pytest.mark.asyncio
async def test_openai_agent_service_session_context(mock_openai_agents):
    """Test session context management."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent",
        instructions="Test instructions",
        api_key="test-key",
        session_config={"user_id": "test-user"},
    )

    # Get initial context
    context = service.get_session_context()
    assert context["user_id"] == "test-user"

    # Update context
    service.update_session_context({"session_id": "test-session"})

    updated_context = service.get_session_context()
    assert updated_context["user_id"] == "test-user"
    assert updated_context["session_id"] == "test-session"


@pytest.mark.asyncio
async def test_openai_agent_service_add_tools(mock_openai_agents):
    """Test adding tools to the agent."""
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key"
    )

    # Define a test tool
    def test_tool():
        return "test result"

    # Add the tool
    await service.add_tool(test_tool)

    # Check if tool was added (this depends on the mock implementation)
    assert hasattr(service.agent, "tools")


@pytest.mark.asyncio
async def test_openai_agent_service_lifecycle(mock_openai_agents):
    """Test service lifecycle methods."""
    from pipecat.frames.frames import CancelFrame, EndFrame, StartFrame
    from pipecat.services.openai_agent.agent_service import OpenAIAgentService

    service = OpenAIAgentService(
        name="Test Agent", instructions="Test instructions", api_key="test-key"
    )

    # Test start
    start_frame = StartFrame()
    await service.start(start_frame)

    # Test cancel
    cancel_frame = CancelFrame()
    await service.cancel(cancel_frame)

    # Test stop
    end_frame = EndFrame()
    await service.stop(end_frame)


def test_openai_agent_service_import_error():
    """Test that import error is handled gracefully."""
    # Mock the import to fail
    with patch.dict("sys.modules", {"agents": None}):
        with pytest.raises(Exception) as exc_info:
            # This should trigger the import error
            import importlib

            import pipecat.services.openai_agent.agent_service

            importlib.reload(pipecat.services.openai_agent.agent_service)

        assert "Missing module" in str(exc_info.value)


if __name__ == "__main__":
    pytest.main([__file__])