From 5e22ef251ddeddf12fdfd8e54667e584533a5e51 Mon Sep 17 00:00:00 2001 From: James Hush Date: Fri, 29 Nov 2024 13:06:45 +0800 Subject: [PATCH] fix: add logging and error handling for issue #721 (#755) --- src/pipecat/processors/frame_processor.py | 26 +++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index cc3b27050..93829c8fc 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -6,10 +6,11 @@ import asyncio import inspect - from enum import Enum from typing import Awaitable, Callable, Optional +from loguru import logger + from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( EndFrame, @@ -24,8 +25,6 @@ from pipecat.metrics.metrics import LLMTokenUsage, MetricsData from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.utils import obj_count, obj_id -from loguru import logger - class FrameDirection(Enum): DOWNSTREAM = 1 @@ -220,11 +219,16 @@ class FrameProcessor: # async def _start_interruption(self): - # Cancel the push frame task. This will stop pushing frames downstream. - await self.__cancel_push_task() + try: + # Cancel the push frame task. This will stop pushing frames downstream. + await self.__cancel_push_task() - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() + # Cancel the input task. This will stop processing queued frames. + await self.__cancel_input_task() + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) + raise # Create a new input queue and task. self.__create_input_task() @@ -281,7 +285,11 @@ class FrameProcessor: self.__input_queue.task_done() except asyncio.CancelledError: + logger.trace(f"Cancelled input task in {self}") break + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) def __create_push_task(self): self.__push_queue = asyncio.Queue() @@ -300,7 +308,11 @@ class FrameProcessor: running = not isinstance(frame, EndFrame) self.__push_queue.task_done() except asyncio.CancelledError: + logger.trace(f"Cancelled push task in {self}") break + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) async def _call_event_handler(self, event_name: str, *args, **kwargs): try: