diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 6d4b4c039..9a2f1d33d 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -49,6 +49,7 @@ from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.pipeline.task_observer import TaskObserver from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup +from pipecat.processors.frameworks.rtvi import RTVIObserverParams, RTVIProcessor from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver @@ -225,9 +226,12 @@ class PipelineTask(BasePipelineTask): conversation_id: Optional[str] = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, + enable_rtvi: bool = True, idle_timeout_frames: Tuple[Type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame), idle_timeout_secs: Optional[float] = IDLE_TIMEOUT_SECS, observers: Optional[List[BaseObserver]] = None, + rtvi_processor: Optional[RTVIProcessor] = None, + rtvi_observer_params: Optional[RTVIObserverParams] = None, task_manager: Optional[BaseTaskManager] = None, ): """Initialize the PipelineTask. @@ -244,6 +248,7 @@ class PipelineTask(BasePipelineTask): check_dangling_tasks: Whether to check for processors' tasks finishing properly. clock: Clock implementation for timing operations. conversation_id: Optional custom ID for the conversation. + enable_rtvi: Whether to automatically add RTVI support to the pipeline. enable_tracing: Whether to enable tracing. enable_turn_tracking: Whether to enable turn tracking. idle_timeout_frames: A tuple with the frames that should trigger an idle @@ -252,6 +257,8 @@ class PipelineTask(BasePipelineTask): None. If a pipeline is idle the pipeline task will be cancelled automatically. observers: List of observers for monitoring pipeline execution. + rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled. + rtvi_processor: The RTVI processor to add if RTVI is enabled. task_manager: Optional task manager for handling asyncio tasks. """ super().__init__() @@ -306,6 +313,16 @@ class PipelineTask(BasePipelineTask): self._heartbeat_push_task: Optional[asyncio.Task] = None self._heartbeat_monitor_task: Optional[asyncio.Task] = None + # RTVI support + self._rtvi = None + if enable_rtvi: + self._rtvi = rtvi_processor or RTVIProcessor() + observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params)) + + @self.rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi: RTVIProcessor): + await rtvi.set_bot_ready() + # This is the idle event. When selected frames are pushed from any # processor we consider the pipeline is not idle. We use an observer # which will be listening any part of the pipeline. @@ -335,7 +352,8 @@ class PipelineTask(BasePipelineTask): # allows us to receive and react to downstream frames. source = PipelineSource(self._source_push_frame, name=f"{self}::Source") sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink") - self._pipeline = Pipeline([pipeline], source=source, sink=sink) + processors = [self._rtvi, pipeline] if self._rtvi else [pipeline] + self._pipeline = Pipeline(processors, source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way, # we only need to pass a single observer (using the StartFrame) which @@ -398,6 +416,17 @@ class PipelineTask(BasePipelineTask): """ return self._turn_trace_observer + @property + def rtvi(self) -> RTVIProcessor: + """Get the RTVI processor if RTVI is enabled. + + Returns: + The RTVI processor added to the pipeline when RTVI is enabled. + """ + if not self._rtvi: + raise Exception(f"{self} RTVI is not enabled.") + return self._rtvi + def event_handler(self, event_name: str): """Decorator for registering event handlers.