Merge pull request #3871 from rupesh-svg/fix/rtvi-processor-double-insert

Fix PipelineTask double-inserting RTVIProcessor with custom RTVIObserver
This commit is contained in:
Mark Backman
2026-02-27 19:34:46 -05:00
committed by GitHub
2 changed files with 7 additions and 1 deletions

1
changelog/3867.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `PipelineTask` double-inserting `RTVIProcessor` into the frame chain when the user provides both an `RTVIProcessor` in the pipeline and a custom `RTVIObserver` subclass in observers.

View File

@@ -330,6 +330,7 @@ class PipelineTask(BasePipelineTask):
# RTVI support
self._rtvi = None
prepend_rtvi = False
external_rtvi = self._find_processor(pipeline, RTVIProcessor)
external_observer_found = any(isinstance(o, RTVIObserver) for o in observers)
@@ -352,6 +353,7 @@ class PipelineTask(BasePipelineTask):
elif enable_rtvi:
self._rtvi = rtvi_processor or RTVIProcessor()
observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params))
prepend_rtvi = True
if self._rtvi:
# Automatically call RTVIProcessor.set_bot_ready()
@@ -388,7 +390,10 @@ class PipelineTask(BasePipelineTask):
# allows us to receive and react to downstream frames.
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
processors = [self._rtvi, pipeline] if self._rtvi else [pipeline]
# Only prepend the RTVIProcessor if we created it ourselves. When the
# user already placed it inside their pipeline we must not insert it
# again or it will appear twice in the frame chain.
processors = [self._rtvi, pipeline] if prepend_rtvi else [pipeline]
self._pipeline = Pipeline(processors, source=source, sink=sink)
# The task observer acts as a proxy to the provided observers. This way,