@@ -6,10 +6,11 @@
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
@@ -24,8 +25,6 @@ from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
|
||||
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class FrameDirection(Enum):
|
||||
DOWNSTREAM = 1
|
||||
@@ -220,11 +219,16 @@ class FrameProcessor:
|
||||
#
|
||||
|
||||
async def _start_interruption(self):
|
||||
# Cancel the push frame task. This will stop pushing frames downstream.
|
||||
await self.__cancel_push_task()
|
||||
try:
|
||||
# Cancel the push frame task. This will stop pushing frames downstream.
|
||||
await self.__cancel_push_task()
|
||||
|
||||
# Cancel the input task. This will stop processing queued frames.
|
||||
await self.__cancel_input_task()
|
||||
# Cancel the input task. This will stop processing queued frames.
|
||||
await self.__cancel_input_task()
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
raise
|
||||
|
||||
# Create a new input queue and task.
|
||||
self.__create_input_task()
|
||||
@@ -281,7 +285,11 @@ class FrameProcessor:
|
||||
|
||||
self.__input_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
logger.trace(f"Cancelled input task in {self}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
|
||||
def __create_push_task(self):
|
||||
self.__push_queue = asyncio.Queue()
|
||||
@@ -300,7 +308,11 @@ class FrameProcessor:
|
||||
running = not isinstance(frame, EndFrame)
|
||||
self.__push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
logger.trace(f"Cancelled push task in {self}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
|
||||
async def _call_event_handler(self, event_name: str, *args, **kwargs):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user