From c71cec04d331a6465f96929572893107d1c59b69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 3 Sep 2025 11:18:13 -0700 Subject: [PATCH 1/2] ParallelPipeline: wait for CancelFrame in all branches --- CHANGELOG.md | 3 +++ src/pipecat/pipeline/parallel_pipeline.py | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78322bcc0..0cedaa5d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `ParallelPipeline` now waits for `CancelFrame` to finish in all branches + before pushing it downstream. + - Added `sip_codecs` to the `DailyRoomSipParams`. - Updated the `configure()` function in `pipecat.runner.daily` to include new diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 148387de7..480eafd20 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -16,7 +16,7 @@ from typing import Dict, List from loguru import logger -from pipecat.frames.frames import EndFrame, Frame, StartFrame +from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup @@ -141,7 +141,7 @@ class ParallelPipeline(BasePipeline): await super().process_frame(frame, direction) # Parallel pipeline synchronized frames. - if isinstance(frame, (StartFrame, EndFrame)): + if isinstance(frame, (StartFrame, EndFrame, CancelFrame)): self._frame_counter[frame.id] = len(self._pipelines) await self.pause_processing_system_frames() await self.pause_processing_frames() @@ -158,7 +158,7 @@ class ParallelPipeline(BasePipeline): async def _pipeline_sink_push_frame(self, frame: Frame, direction: FrameDirection): # Parallel pipeline synchronized frames. - if isinstance(frame, (StartFrame, EndFrame)): + if isinstance(frame, (StartFrame, EndFrame, CancelFrame)): # Decrement counter. frame_counter = self._frame_counter.get(frame.id, 0) if frame_counter > 0: From b52296450ca1c893f67aa98d0c37b1cb63844176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 3 Sep 2025 09:35:38 -0700 Subject: [PATCH 2/2] DeepgramSTTService: remove raising CancelledError --- src/pipecat/services/deepgram/stt.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index e2fea34fb..fb5f67029 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -238,16 +238,15 @@ class DeepgramSTTService(STTService): async def _disconnect(self): if self._connection.is_connected: logger.debug("Disconnecting from Deepgram") - result = await self._connection.finish() - # Deepgram swallows asyncio.CancelledError internally and returns False instead. - # This prevents proper cancellation propagation: tasks awaiting on queue.get() - # would remain stuck if we didn’t normalize this back into a CancelledError. + # Deepgram swallows asyncio.CancelledError internally which prevents + # proper cancellation propagation. This issue was found with + # parallel pipelines where `CancelFrame` was not awaited for to + # finish in all branches and it was pushed downstream reaching the + # end of the pipeline, which caused `cleanup()` to be called while + # Deepgram disconnection was still finishing and therefore + # preventing the task cancellation that occurs during `cleanup()`. # GH issue: https://github.com/deepgram/deepgram-python-sdk/issues/570 - if not result: - logger.warning(f"{self}: Deepgram connection failed to disconnect, forcing cancel.") - raise asyncio.CancelledError( - f"{self}: Deepgram connection cancelled during disconnect" - ) + await self._connection.finish() async def start_metrics(self): """Start TTFB and processing metrics collection."""