BaseInputTransport: fix OpenAILLMContext backwards compatibility

This commit is contained in:
Aleix Conchillo Flaqué
2025-12-29 19:12:55 -08:00
parent 00548769cb
commit 4dba9ea329
3 changed files with 31 additions and 2 deletions

View File

@@ -46,6 +46,7 @@ from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
@@ -704,7 +705,7 @@ class PipelineTask(BasePipelineTask):
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
interruption_strategies=self._params.interruption_strategies,
)
start_frame.metadata = self._params.start_metadata
start_frame.metadata = self._create_start_metadata()
await self._pipeline.queue_frame(start_frame)
# Wait for the pipeline to be started before pushing any other frame.
@@ -860,6 +861,7 @@ class PipelineTask(BasePipelineTask):
return True
async def _load_observer_files(self):
"""Dynamically load observers from files listed in PIPECAT_OBSERVER_FILES."""
observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":")
for f in observer_files:
try:
@@ -885,3 +887,26 @@ class PipelineTask(BasePipelineTask):
tasks = [t.get_name() for t in self._task_manager.current_tasks()]
if tasks:
logger.warning(f"Dangling tasks detected: {tasks}")
def _create_start_metadata(self) -> Dict[str, Any]:
"""Build and return start metadata including user-provided values."""
start_metadata = {}
# NOTE(aleix): Remove when OpenAILLMContext/LLMUserContextAggregator is removed.
if self._find_deprecated_openaillmcontext(self._pipeline):
start_metadata["deprecated_openaillmcontext"] = True
# Update with user provided metadata.
start_metadata.update(self._params.start_metadata)
return start_metadata
def _find_deprecated_openaillmcontext(self, processor: FrameProcessor) -> bool:
"""Check whether there is a deprecated LLMUserContextAggregator in the pipeline."""
if isinstance(processor, LLMUserContextAggregator):
return True
for p in processor.processors:
if self._find_deprecated_openaillmcontext(p):
return True
return False

View File

@@ -196,6 +196,7 @@ class FrameProcessor(BaseObject):
# Other properties (deprecated)
self._allow_interruptions = False
self._interruption_strategies: List[BaseInterruptionStrategy] = []
self._deprecated_openaillmcontext = False
# Indicates whether we have received the StartFrame.
self.__started = False
@@ -794,6 +795,9 @@ class FrameProcessor(BaseObject):
self._interruption_strategies = frame.interruption_strategies
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
# NOTE(aleix): Remove when OpenAILLMContext/LLMUserContextAggregator is removed.
self._deprecated_openaillmcontext = "deprecated_openaillmcontext" in frame.metadata
self.__create_process_task()
async def __cancel(self, frame: CancelFrame):

View File

@@ -394,7 +394,7 @@ class BaseInputTransport(FrameProcessor):
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState) -> VADState:
"""Handle Voice Activity Detection results and generate appropriate frames."""
if self._params.turn_analyzer:
if self._params.turn_analyzer or self._deprecated_openaillmcontext:
return await self._deprecated_handle_vad(audio_frame, vad_state)
else:
return await self._new_handle_vad(audio_frame, vad_state)