PipelineTask: automatically add RTVI processor and RTVI observer

If `enable_rtvi` is enabled (enabled by default) and RTVI processor will be
added automatically to the pipeline. Also, and RTVI observer will be
registered.
This commit is contained in:
Aleix Conchillo Flaqué
2026-01-21 14:14:43 -08:00
parent cc61cdbba3
commit e85a00cc0e

View File

@@ -49,6 +49,7 @@ from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.processors.frameworks.rtvi import RTVIObserverParams, RTVIProcessor
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -225,9 +226,12 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional[str] = None,
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_rtvi: bool = True,
idle_timeout_frames: Tuple[Type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame),
idle_timeout_secs: Optional[float] = IDLE_TIMEOUT_SECS,
observers: Optional[List[BaseObserver]] = None,
rtvi_processor: Optional[RTVIProcessor] = None,
rtvi_observer_params: Optional[RTVIObserverParams] = None,
task_manager: Optional[BaseTaskManager] = None,
):
"""Initialize the PipelineTask.
@@ -244,6 +248,7 @@ class PipelineTask(BasePipelineTask):
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
clock: Clock implementation for timing operations.
conversation_id: Optional custom ID for the conversation.
enable_rtvi: Whether to automatically add RTVI support to the pipeline.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
idle_timeout_frames: A tuple with the frames that should trigger an idle
@@ -252,6 +257,8 @@ class PipelineTask(BasePipelineTask):
None. If a pipeline is idle the pipeline task will be cancelled
automatically.
observers: List of observers for monitoring pipeline execution.
rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled.
rtvi_processor: The RTVI processor to add if RTVI is enabled.
task_manager: Optional task manager for handling asyncio tasks.
"""
super().__init__()
@@ -306,6 +313,16 @@ class PipelineTask(BasePipelineTask):
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None
# RTVI support
self._rtvi = None
if enable_rtvi:
self._rtvi = rtvi_processor or RTVIProcessor()
observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params))
@self.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi: RTVIProcessor):
await rtvi.set_bot_ready()
# This is the idle event. When selected frames are pushed from any
# processor we consider the pipeline is not idle. We use an observer
# which will be listening any part of the pipeline.
@@ -335,7 +352,8 @@ class PipelineTask(BasePipelineTask):
# allows us to receive and react to downstream frames.
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
self._pipeline = Pipeline([pipeline], source=source, sink=sink)
processors = [self._rtvi, pipeline] if self._rtvi else [pipeline]
self._pipeline = Pipeline(processors, source=source, sink=sink)
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
@@ -398,6 +416,17 @@ class PipelineTask(BasePipelineTask):
"""
return self._turn_trace_observer
@property
def rtvi(self) -> RTVIProcessor:
"""Get the RTVI processor if RTVI is enabled.
Returns:
The RTVI processor added to the pipeline when RTVI is enabled.
"""
if not self._rtvi:
raise Exception(f"{self} RTVI is not enabled.")
return self._rtvi
def event_handler(self, event_name: str):
"""Decorator for registering event handlers.