From 4835617b16a1f4a5fbbb2f70562454c75a568573 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 14 Nov 2025 15:20:01 -0800 Subject: [PATCH] ConsumerProcessor: queue frames internally instead of pushing them --- CHANGELOG.md | 4 ++++ src/pipecat/processors/consumer_processor.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4fce0917..04d789370 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `ConsumerProcessor` now queues frames from the producer internally instead of + pushing them directly. This allows us to subclass consumer processors and + manipulate frames before they are pushed. + - `BaseTextFilter` only require subclasses to implement the `filter()` method. - Extracted the logic for retrying connections, and create a new `send_with_retry` diff --git a/src/pipecat/processors/consumer_processor.py b/src/pipecat/processors/consumer_processor.py index 5445b492d..3654194ec 100644 --- a/src/pipecat/processors/consumer_processor.py +++ b/src/pipecat/processors/consumer_processor.py @@ -83,4 +83,4 @@ class ConsumerProcessor(FrameProcessor): while True: frame = await self._queue.get() new_frame = await self._transformer(frame) - await self.push_frame(new_frame, self._direction) + await self.queue_frame(new_frame, self._direction)