From 6f33aff0c6c1396b43e63b63c7764afac4dbbdff Mon Sep 17 00:00:00 2001 From: Rupesh Date: Fri, 27 Feb 2026 13:29:01 -0800 Subject: [PATCH 1/2] Fix PipelineTask double-inserting RTVIProcessor when custom RTVIObserver is provided When the user places an RTVIProcessor inside their pipeline and provides a custom RTVIObserver subclass in observers, PipelineTask correctly detects both and logs "skipping default ones." However it then unconditionally prepends self._rtvi to the pipeline, causing the processor to appear twice in the frame chain. Track whether the RTVIProcessor was found externally (inside the user pipeline) vs created internally. Only prepend it when created internally. Fixes #3867 --- changelog/3867.fixed.md | 1 + src/pipecat/pipeline/task.py | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 changelog/3867.fixed.md diff --git a/changelog/3867.fixed.md b/changelog/3867.fixed.md new file mode 100644 index 000000000..41ee584a2 --- /dev/null +++ b/changelog/3867.fixed.md @@ -0,0 +1 @@ +- Fixed `PipelineTask` double-inserting `RTVIProcessor` into the frame chain when the user provides both an `RTVIProcessor` in the pipeline and a custom `RTVIObserver` subclass in observers. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 2cfe26606..1db23e7d4 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -330,6 +330,7 @@ class PipelineTask(BasePipelineTask): # RTVI support self._rtvi = None + self._rtvi_external = False external_rtvi = self._find_processor(pipeline, RTVIProcessor) external_observer_found = any(isinstance(o, RTVIObserver) for o in observers) @@ -349,6 +350,7 @@ class PipelineTask(BasePipelineTask): "They are both added by default, no need to add them yourself." ) self._rtvi = external_rtvi + self._rtvi_external = True elif enable_rtvi: self._rtvi = rtvi_processor or RTVIProcessor() observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params)) @@ -388,7 +390,13 @@ class PipelineTask(BasePipelineTask): # allows us to receive and react to downstream frames. source = PipelineSource(self._source_push_frame, name=f"{self}::Source") sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink") - processors = [self._rtvi, pipeline] if self._rtvi else [pipeline] + # Only prepend the RTVIProcessor if we created it ourselves. When the + # user already placed it inside their pipeline we must not insert it + # again or it will appear twice in the frame chain. + if self._rtvi and not self._rtvi_external: + processors = [self._rtvi, pipeline] + else: + processors = [pipeline] self._pipeline = Pipeline(processors, source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way, From 56f2564ed10da68d96675e7a482a0f8f29e26561 Mon Sep 17 00:00:00 2001 From: Rupesh Date: Fri, 27 Feb 2026 14:45:37 -0800 Subject: [PATCH 2/2] Use local variable instead of instance variable for RTVI prepend decision Replace _rtvi_external instance variable with a local prepend_rtvi flag since it is only used during __init__ to decide whether to prepend the RTVIProcessor to the pipeline. --- src/pipecat/pipeline/task.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 1db23e7d4..eeb39c9b6 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -330,7 +330,7 @@ class PipelineTask(BasePipelineTask): # RTVI support self._rtvi = None - self._rtvi_external = False + prepend_rtvi = False external_rtvi = self._find_processor(pipeline, RTVIProcessor) external_observer_found = any(isinstance(o, RTVIObserver) for o in observers) @@ -350,10 +350,10 @@ class PipelineTask(BasePipelineTask): "They are both added by default, no need to add them yourself." ) self._rtvi = external_rtvi - self._rtvi_external = True elif enable_rtvi: self._rtvi = rtvi_processor or RTVIProcessor() observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params)) + prepend_rtvi = True if self._rtvi: # Automatically call RTVIProcessor.set_bot_ready() @@ -393,10 +393,7 @@ class PipelineTask(BasePipelineTask): # Only prepend the RTVIProcessor if we created it ourselves. When the # user already placed it inside their pipeline we must not insert it # again or it will appear twice in the frame chain. - if self._rtvi and not self._rtvi_external: - processors = [self._rtvi, pipeline] - else: - processors = [pipeline] + processors = [self._rtvi, pipeline] if prepend_rtvi else [pipeline] self._pipeline = Pipeline(processors, source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way,