From 946f0f4e77f89c00654756ed95f7536086ca2331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 9 Feb 2026 16:19:11 -0800 Subject: [PATCH] CLAUDE.md: add pipeline task and pipeline runner --- CLAUDE.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index de5efec0f..cf4c5baec 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,7 +42,7 @@ uv lock && uv sync All data flows as **Frame** objects through a pipeline of **FrameProcessors**: ``` -Transport Input → Pipeline Source → [Processor1] → [Processor2] → ... → Pipeline Sink → Transport Output +[Processor1] → [Processor2] → ... → [ProcessorN] ``` **Key components:** @@ -55,7 +55,11 @@ Transport Input → Pipeline Source → [Processor1] → [Processor2] → ... - **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel. -- **Transports** (`src/pipecat/transports/`): External I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`. +- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`. + +- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc. + +- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`. - **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`. @@ -78,7 +82,7 @@ Transport Input → Pipeline Source → [Processor1] → [Processor2] → ... - **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`. -- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to exectue fast. +- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast. - **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation. @@ -147,7 +151,7 @@ When adding a new service: ## Testing -Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames. +Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames. ## Pull Requests