diff --git a/CHANGELOG.md b/CHANGELOG.md index 5396a1eb7..75e3f28b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `pipecat.frames.frames.KeypadEntry` is deprecated use `pipecat.audio.dtmf.types.KeypadEntry` instead. +## Fixed + +- Fixed an issue where `PipelineTask` was not cleaning up the observers. + ## [0.0.82] - 2025-08-28 ### Added diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 23b8c5cc2..46c6b8312 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -571,6 +571,10 @@ class PipelineTask(BasePipelineTask): # Cleanup base object. await self.cleanup() + # Cleanup observers. + if self._observer: + await self._observer.cleanup() + # End conversation tracing if it's active - this will also close any active turn span if self._enable_tracing and hasattr(self, "_turn_trace_observer"): self._turn_trace_observer.end_conversation_tracing() diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index 6f266a0db..7bf83480d 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -119,6 +119,16 @@ class TaskObserver(BaseObserver): for proxy in self._proxies.values(): await self._task_manager.cancel_task(proxy.task) + async def cleanup(self): + """Cleanup all proxy observers.""" + await super().cleanup() + + if not self._proxies: + return + + for proxy in self._proxies: + await proxy.cleanup() + async def on_process_frame(self, data: FramePushed): """Queue frame data for all managed observers.