# AGENTS.md This file provides guidance to AI coding agents when working with code in this repository. ## Project Overview Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture. ## Common Commands ```bash # Setup development environment uv sync --group dev --all-extras --no-extra gstreamer --no-extra local # Install pre-commit hooks uv run pre-commit install # Run all tests uv run pytest # Run a single test file uv run pytest tests/test_name.py # Run a specific test uv run pytest tests/test_name.py::test_function_name # Preview changelog uv run towncrier build --draft --version Unreleased # Lint and format check uv run ruff check uv run ruff format --check # Update dependencies (after editing pyproject.toml) uv lock && uv sync ``` ## Architecture ### Frame-Based Pipeline Processing All data flows as **Frame** objects through a pipeline of **FrameProcessors**: ``` [Processor1] → [Processor2] → ... → [ProcessorN] ``` **Key components:** - **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors). - **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream. - **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together. - **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel. - **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`. - **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc. - **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`. - **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`. - **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law). - **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels. - **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks. ### Important Patterns - **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input - **Turn Management**: Turn management is done through `LLMUserAggregator` and `LLMAssistantAggregator`, created with `LLMContextAggregatorPair` - **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively. - **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers. - **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`. - **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast. - **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation. - **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service). ### Key Directories | Directory | Purpose | | -------------------------- | -------------------------------------------------- | | `src/pipecat/frames/` | Frame definitions (100+ types) | | `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio | | `src/pipecat/pipeline/` | Pipeline orchestration | | `src/pipecat/services/` | AI service integrations (60+ providers) | | `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) | | `src/pipecat/serializers/` | Frame serialization for WebSocket protocols | | `src/pipecat/observers/` | Pipeline observers for monitoring frame flow | | `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF | | `src/pipecat/turns/` | User turn management | ## Code Style - **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section. - **Deprecations**: Use the `.. deprecated:: ` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions. - **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting. - **Type hints**: Required for complex async code. - **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically: - `@dataclass`: Frame types, context aggregator pairs, internal data containers - `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params ### Docstring Example ```python class MyService(LLMService): """Description of what the service does. More detailed description. Event handlers available: - on_connected: Called when we are connected Example:: @service.event_handler("on_connected") async def on_connected(service, frame): ... """ def __init__(self, param1: str, **kwargs): """Initialize the service. Args: param1: Description of param1. **kwargs: Additional arguments passed to parent. """ super().__init__(**kwargs) # Pydantic params class with a deprecated field class MyParams(BaseModel): """Configuration parameters for MyService. Parameters: new_setting: Replacement for ``old_setting``. old_setting: Legacy setting, no longer used. .. deprecated:: 1.2.0 Use ``new_setting`` instead. Will be removed in 2.0.0. """ new_setting: str = "default" old_setting: str | None = None ``` ## Service Implementation When adding a new service: 1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.) 2. Implement required abstract methods 3. Handle necessary frames 4. By default, all frames should be pushed in the direction they came 5. Push `ErrorFrame` on failures 6. Add metrics tracking via `MetricsData` if relevant 7. Follow the pattern of existing services in `src/pipecat/services/` ## Testing Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.