diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 7b20d43bf..6f9c95bb0 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -173,7 +173,7 @@ class PipelineTask(BaseTask): if self.has_finished(): return try: - push_task = self._create_tasks() + push_task = await self._create_tasks() await self._task_manager.wait_for_task(push_task) except asyncio.CancelledError: # We are awaiting on the push task and it might be cancelled @@ -203,7 +203,7 @@ class PipelineTask(BaseTask): for frame in frames: await self.queue_frame(frame) - def _create_tasks(self): + async def _create_tasks(self): self._process_up_task = self._task_manager.create_task( self._process_up_queue(), f"{self}::_process_up_queue" ) @@ -214,6 +214,8 @@ class PipelineTask(BaseTask): self._process_push_queue(), f"{self}::_process_push_queue" ) + await self._observer.start() + return self._process_push_task def _maybe_start_heartbeat_tasks(self): diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index 5bf167479..abcbf513b 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -58,8 +58,9 @@ class TaskObserver(BaseObserver): def __init__(self, *, observers: List[BaseObserver] = [], task_manager: TaskManager): self._id: int = obj_id() self._name: str = f"{self.__class__.__name__}#{obj_count(self)}" - self._proxies: List[Proxy] = self._create_proxies(observers) + self._observers = observers self._task_manager = task_manager + self._proxies: List[Proxy] = [] @property def id(self) -> int: @@ -69,6 +70,10 @@ class TaskObserver(BaseObserver): def name(self) -> str: return self._name + async def start(self): + """Starts all proxy observer tasks.""" + self._proxies = self._create_proxies(self._observers) + async def stop(self): """Stops all proxy observer tasks.""" for proxy in self._proxies: