diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 79816a868..0c534a006 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -46,6 +46,7 @@ from pipecat.observers.turn_tracking_observer import TurnTrackingObserver from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.pipeline.task_observer import TaskObserver +from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams from pipecat.utils.tracing.setup import is_tracing_available @@ -704,7 +705,7 @@ class PipelineTask(BasePipelineTask): report_only_initial_ttfb=self._params.report_only_initial_ttfb, interruption_strategies=self._params.interruption_strategies, ) - start_frame.metadata = self._params.start_metadata + start_frame.metadata = self._create_start_metadata() await self._pipeline.queue_frame(start_frame) # Wait for the pipeline to be started before pushing any other frame. @@ -860,6 +861,7 @@ class PipelineTask(BasePipelineTask): return True async def _load_observer_files(self): + """Dynamically load observers from files listed in PIPECAT_OBSERVER_FILES.""" observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":") for f in observer_files: try: @@ -885,3 +887,26 @@ class PipelineTask(BasePipelineTask): tasks = [t.get_name() for t in self._task_manager.current_tasks()] if tasks: logger.warning(f"Dangling tasks detected: {tasks}") + + def _create_start_metadata(self) -> Dict[str, Any]: + """Build and return start metadata including user-provided values.""" + start_metadata = {} + + # NOTE(aleix): Remove when OpenAILLMContext/LLMUserContextAggregator is removed. + if self._find_deprecated_openaillmcontext(self._pipeline): + start_metadata["deprecated_openaillmcontext"] = True + + # Update with user provided metadata. + start_metadata.update(self._params.start_metadata) + + return start_metadata + + def _find_deprecated_openaillmcontext(self, processor: FrameProcessor) -> bool: + """Check whether there is a deprecated LLMUserContextAggregator in the pipeline.""" + if isinstance(processor, LLMUserContextAggregator): + return True + + for p in processor.processors: + if self._find_deprecated_openaillmcontext(p): + return True + return False diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 0a7e2751e..09ec36b9a 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -196,6 +196,7 @@ class FrameProcessor(BaseObject): # Other properties (deprecated) self._allow_interruptions = False self._interruption_strategies: List[BaseInterruptionStrategy] = [] + self._deprecated_openaillmcontext = False # Indicates whether we have received the StartFrame. self.__started = False @@ -794,6 +795,9 @@ class FrameProcessor(BaseObject): self._interruption_strategies = frame.interruption_strategies self._report_only_initial_ttfb = frame.report_only_initial_ttfb + # NOTE(aleix): Remove when OpenAILLMContext/LLMUserContextAggregator is removed. + self._deprecated_openaillmcontext = "deprecated_openaillmcontext" in frame.metadata + self.__create_process_task() async def __cancel(self, frame: CancelFrame): diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 14a6b547f..b34eadfb8 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -394,7 +394,7 @@ class BaseInputTransport(FrameProcessor): async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState) -> VADState: """Handle Voice Activity Detection results and generate appropriate frames.""" - if self._params.turn_analyzer: + if self._params.turn_analyzer or self._deprecated_openaillmcontext: return await self._deprecated_handle_vad(audio_frame, vad_state) else: return await self._new_handle_vad(audio_frame, vad_state)