Merge pull request #2572 from pipecat-ai/aleix/deepgram-disconnect-task
ParallePipeline: wait for CancelFrame in all branches
This commit is contained in:
@@ -79,6 +79,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `ParallelPipeline` now waits for `CancelFrame` to finish in all branches
|
||||
before pushing it downstream.
|
||||
|
||||
- Added `sip_codecs` to the `DailyRoomSipParams`.
|
||||
|
||||
- Updated the `configure()` function in `pipecat.runner.daily` to include new
|
||||
|
||||
@@ -16,7 +16,7 @@ from typing import Dict, List
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import EndFrame, Frame, StartFrame
|
||||
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
|
||||
from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
|
||||
@@ -141,7 +141,7 @@ class ParallelPipeline(BasePipeline):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Parallel pipeline synchronized frames.
|
||||
if isinstance(frame, (StartFrame, EndFrame)):
|
||||
if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
|
||||
self._frame_counter[frame.id] = len(self._pipelines)
|
||||
await self.pause_processing_system_frames()
|
||||
await self.pause_processing_frames()
|
||||
@@ -158,7 +158,7 @@ class ParallelPipeline(BasePipeline):
|
||||
|
||||
async def _pipeline_sink_push_frame(self, frame: Frame, direction: FrameDirection):
|
||||
# Parallel pipeline synchronized frames.
|
||||
if isinstance(frame, (StartFrame, EndFrame)):
|
||||
if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
|
||||
# Decrement counter.
|
||||
frame_counter = self._frame_counter.get(frame.id, 0)
|
||||
if frame_counter > 0:
|
||||
|
||||
@@ -238,16 +238,15 @@ class DeepgramSTTService(STTService):
|
||||
async def _disconnect(self):
|
||||
if self._connection.is_connected:
|
||||
logger.debug("Disconnecting from Deepgram")
|
||||
result = await self._connection.finish()
|
||||
# Deepgram swallows asyncio.CancelledError internally and returns False instead.
|
||||
# This prevents proper cancellation propagation: tasks awaiting on queue.get()
|
||||
# would remain stuck if we didn’t normalize this back into a CancelledError.
|
||||
# Deepgram swallows asyncio.CancelledError internally which prevents
|
||||
# proper cancellation propagation. This issue was found with
|
||||
# parallel pipelines where `CancelFrame` was not awaited for to
|
||||
# finish in all branches and it was pushed downstream reaching the
|
||||
# end of the pipeline, which caused `cleanup()` to be called while
|
||||
# Deepgram disconnection was still finishing and therefore
|
||||
# preventing the task cancellation that occurs during `cleanup()`.
|
||||
# GH issue: https://github.com/deepgram/deepgram-python-sdk/issues/570
|
||||
if not result:
|
||||
logger.warning(f"{self}: Deepgram connection failed to disconnect, forcing cancel.")
|
||||
raise asyncio.CancelledError(
|
||||
f"{self}: Deepgram connection cancelled during disconnect"
|
||||
)
|
||||
await self._connection.finish()
|
||||
|
||||
async def start_metrics(self):
|
||||
"""Start TTFB and processing metrics collection."""
|
||||
|
||||
Reference in New Issue
Block a user