PipelineTask: cleanup observers

This commit is contained in:
Aleix Conchillo Flaqué
2025-08-29 10:54:36 -07:00
parent c1b8d2acab
commit b5a644dd6f
3 changed files with 18 additions and 0 deletions

View File

@@ -30,6 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `pipecat.frames.frames.KeypadEntry` is deprecated use
`pipecat.audio.dtmf.types.KeypadEntry` instead.
## Fixed
- Fixed an issue where `PipelineTask` was not cleaning up the observers.
## [0.0.82] - 2025-08-28
### Added

View File

@@ -571,6 +571,10 @@ class PipelineTask(BasePipelineTask):
# Cleanup base object.
await self.cleanup()
# Cleanup observers.
if self._observer:
await self._observer.cleanup()
# End conversation tracing if it's active - this will also close any active turn span
if self._enable_tracing and hasattr(self, "_turn_trace_observer"):
self._turn_trace_observer.end_conversation_tracing()

View File

@@ -119,6 +119,16 @@ class TaskObserver(BaseObserver):
for proxy in self._proxies.values():
await self._task_manager.cancel_task(proxy.task)
async def cleanup(self):
"""Cleanup all proxy observers."""
await super().cleanup()
if not self._proxies:
return
for proxy in self._proxies:
await proxy.cleanup()
async def on_process_frame(self, data: FramePushed):
"""Queue frame data for all managed observers.