diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..4413bb2a4 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,174 @@ +# AGENTS.md + +This file provides guidance to AI coding agents when working with code in this repository. + +## Project Overview + +Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture. + +## Common Commands + +```bash +# Setup development environment +uv sync --group dev --all-extras --no-extra gstreamer --no-extra local + +# Install pre-commit hooks +uv run pre-commit install + +# Run all tests +uv run pytest + +# Run a single test file +uv run pytest tests/test_name.py + +# Run a specific test +uv run pytest tests/test_name.py::test_function_name + +# Preview changelog +uv run towncrier build --draft --version Unreleased + +# Lint and format check +uv run ruff check +uv run ruff format --check + +# Update dependencies (after editing pyproject.toml) +uv lock && uv sync +``` + +## Architecture + +### Frame-Based Pipeline Processing + +All data flows as **Frame** objects through a pipeline of **FrameProcessors**: + +``` +[Processor1] → [Processor2] → ... → [ProcessorN] +``` + +**Key components:** + +- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors). + +- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream. + +- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together. + +- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel. + +- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`. + +- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc. + +- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`. + +- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`. + +- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law). + +- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels. + +- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks. + +### Important Patterns + +- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input + +- **Turn Management**: Turn management is done through `LLMUserAggregator` and + `LLMAssistantAggregator`, created with `LLMContextAggregatorPair` + +- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively. + +- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers. + +- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`. + +- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast. + +- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation. + +- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service). + +### Key Directories + +| Directory | Purpose | +| -------------------------- | -------------------------------------------------- | +| `src/pipecat/frames/` | Frame definitions (100+ types) | +| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio | +| `src/pipecat/pipeline/` | Pipeline orchestration | +| `src/pipecat/services/` | AI service integrations (60+ providers) | +| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) | +| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols | +| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow | +| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF | +| `src/pipecat/turns/` | User turn management | + +## Code Style + +- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section. +- **Deprecations**: Use the `.. deprecated:: ` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions. +- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting. +- **Type hints**: Required for complex async code. +- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically: + - `@dataclass`: Frame types, context aggregator pairs, internal data containers + - `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params + +### Docstring Example + +```python +class MyService(LLMService): + """Description of what the service does. + + More detailed description. + + Event handlers available: + + - on_connected: Called when we are connected + + Example:: + + @service.event_handler("on_connected") + async def on_connected(service, frame): + ... + """ + + def __init__(self, param1: str, **kwargs): + """Initialize the service. + + Args: + param1: Description of param1. + **kwargs: Additional arguments passed to parent. + """ + super().__init__(**kwargs) + + +# Pydantic params class with a deprecated field +class MyParams(BaseModel): + """Configuration parameters for MyService. + + Parameters: + new_setting: Replacement for ``old_setting``. + old_setting: Legacy setting, no longer used. + + .. deprecated:: 1.2.0 + Use ``new_setting`` instead. Will be removed in 2.0.0. + """ + + new_setting: str = "default" + old_setting: str | None = None +``` + +## Service Implementation + +When adding a new service: + +1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.) +2. Implement required abstract methods +3. Handle necessary frames +4. By default, all frames should be pushed in the direction they came +5. Push `ErrorFrame` on failures +6. Add metrics tracking via `MetricsData` if relevant +7. Follow the pattern of existing services in `src/pipecat/services/` + +## Testing + +Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames. diff --git a/CLAUDE.md b/CLAUDE.md index 977462d6f..43c994c2d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,174 +1 @@ -# CLAUDE.md - -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. - -## Project Overview - -Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture. - -## Common Commands - -```bash -# Setup development environment -uv sync --group dev --all-extras --no-extra gstreamer --no-extra local - -# Install pre-commit hooks -uv run pre-commit install - -# Run all tests -uv run pytest - -# Run a single test file -uv run pytest tests/test_name.py - -# Run a specific test -uv run pytest tests/test_name.py::test_function_name - -# Preview changelog -uv run towncrier build --draft --version Unreleased - -# Lint and format check -uv run ruff check -uv run ruff format --check - -# Update dependencies (after editing pyproject.toml) -uv lock && uv sync -``` - -## Architecture - -### Frame-Based Pipeline Processing - -All data flows as **Frame** objects through a pipeline of **FrameProcessors**: - -``` -[Processor1] → [Processor2] → ... → [ProcessorN] -``` - -**Key components:** - -- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors). - -- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream. - -- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together. - -- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel. - -- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`. - -- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc. - -- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`. - -- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`. - -- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law). - -- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels. - -- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks. - -### Important Patterns - -- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input - -- **Turn Management**: Turn management is done through `LLMUserAggregator` and - `LLMAssistantAggregator`, created with `LLMContextAggregatorPair` - -- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively. - -- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers. - -- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`. - -- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast. - -- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation. - -- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service). - -### Key Directories - -| Directory | Purpose | -| -------------------------- | -------------------------------------------------- | -| `src/pipecat/frames/` | Frame definitions (100+ types) | -| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio | -| `src/pipecat/pipeline/` | Pipeline orchestration | -| `src/pipecat/services/` | AI service integrations (60+ providers) | -| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) | -| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols | -| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow | -| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF | -| `src/pipecat/turns/` | User turn management | - -## Code Style - -- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section. -- **Deprecations**: Use the `.. deprecated:: ` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions. -- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting. -- **Type hints**: Required for complex async code. -- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically: - - `@dataclass`: Frame types, context aggregator pairs, internal data containers - - `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params - -### Docstring Example - -```python -class MyService(LLMService): - """Description of what the service does. - - More detailed description. - - Event handlers available: - - - on_connected: Called when we are connected - - Example:: - - @service.event_handler("on_connected") - async def on_connected(service, frame): - ... - """ - - def __init__(self, param1: str, **kwargs): - """Initialize the service. - - Args: - param1: Description of param1. - **kwargs: Additional arguments passed to parent. - """ - super().__init__(**kwargs) - - -# Pydantic params class with a deprecated field -class MyParams(BaseModel): - """Configuration parameters for MyService. - - Parameters: - new_setting: Replacement for ``old_setting``. - old_setting: Legacy setting, no longer used. - - .. deprecated:: 1.2.0 - Use ``new_setting`` instead. Will be removed in 2.0.0. - """ - - new_setting: str = "default" - old_setting: str | None = None -``` - -## Service Implementation - -When adding a new service: - -1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.) -2. Implement required abstract methods -3. Handle necessary frames -4. By default, all frames should be pushed in the direction they came -5. Push `ErrorFrame` on failures -6. Add metrics tracking via `MetricsData` if relevant -7. Follow the pattern of existing services in `src/pipecat/services/` - -## Testing - -Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames. +@AGENTS.md