diff --git a/changelog/3867.fixed.md b/changelog/3867.fixed.md new file mode 100644 index 000000000..41ee584a2 --- /dev/null +++ b/changelog/3867.fixed.md @@ -0,0 +1 @@ +- Fixed `PipelineTask` double-inserting `RTVIProcessor` into the frame chain when the user provides both an `RTVIProcessor` in the pipeline and a custom `RTVIObserver` subclass in observers. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 2cfe26606..eeb39c9b6 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -330,6 +330,7 @@ class PipelineTask(BasePipelineTask): # RTVI support self._rtvi = None + prepend_rtvi = False external_rtvi = self._find_processor(pipeline, RTVIProcessor) external_observer_found = any(isinstance(o, RTVIObserver) for o in observers) @@ -352,6 +353,7 @@ class PipelineTask(BasePipelineTask): elif enable_rtvi: self._rtvi = rtvi_processor or RTVIProcessor() observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params)) + prepend_rtvi = True if self._rtvi: # Automatically call RTVIProcessor.set_bot_ready() @@ -388,7 +390,10 @@ class PipelineTask(BasePipelineTask): # allows us to receive and react to downstream frames. source = PipelineSource(self._source_push_frame, name=f"{self}::Source") sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink") - processors = [self._rtvi, pipeline] if self._rtvi else [pipeline] + # Only prepend the RTVIProcessor if we created it ourselves. When the + # user already placed it inside their pipeline we must not insert it + # again or it will appear twice in the frame chain. + processors = [self._rtvi, pipeline] if prepend_rtvi else [pipeline] self._pipeline = Pipeline(processors, source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way,