diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bca536c6..f153dda13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an event loop blocking issue when using `SentryMetrics`. + - Fixed an issue in `FastAPIWebsocketClient` to ensure proper disconnection when the websocket is already closed. diff --git a/examples/freeze-test/freeze_test_bot.py b/examples/freeze-test/freeze_test_bot.py index 15c2f58ed..5be79ad12 100644 --- a/examples/freeze-test/freeze_test_bot.py +++ b/examples/freeze-test/freeze_test_bot.py @@ -7,7 +7,6 @@ import argparse import asyncio import os -import random from contextlib import asynccontextmanager from typing import Any, Dict @@ -26,13 +25,11 @@ from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, LLMFullResponseEndFrame, - LLMTextFrame, StartFrame, StartInterruptionFrame, StopFrame, StopInterruptionFrame, TranscriptionFrame, - TTSTextFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -49,7 +46,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor from pipecat.serializers.protobuf import ProtobufFrameSerializer from pipecat.services.cartesia.tts import CartesiaTTSService -from pipecat.services.deepgram import DeepgramSTTService +from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.network.fastapi_websocket import ( FastAPIWebsocketParams, diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 5f80c17c6..bee5ce91c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -220,11 +220,15 @@ class FrameProcessor(BaseObject): self._clock = setup.clock self._task_manager = setup.task_manager self._observer = setup.observer + if self._metrics is not None: + await self._metrics.setup(self._task_manager) async def cleanup(self): await super().cleanup() await self.__cancel_input_task() await self.__cancel_push_task() + if self._metrics is not None: + await self._metrics.cleanup() def link(self, processor: "FrameProcessor"): self._next = processor diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py index b24cfcd61..40a83fa38 100644 --- a/src/pipecat/processors/metrics/frame_processor_metrics.py +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -18,15 +18,29 @@ from pipecat.metrics.metrics import ( TTFBMetricsData, TTSUsageMetricsData, ) +from pipecat.utils.asyncio import TaskManager +from pipecat.utils.base_object import BaseObject -class FrameProcessorMetrics: +class FrameProcessorMetrics(BaseObject): def __init__(self): + super().__init__() + self._task_manager = None self._start_ttfb_time = 0 self._start_processing_time = 0 self._last_ttfb_time = 0 self._should_report_ttfb = True + async def setup(self, task_manager: TaskManager): + self._task_manager = task_manager + + async def cleanup(self): + await super().cleanup() + + @property + def task_manager(self) -> TaskManager: + return self._task_manager + @property def ttfb(self) -> Optional[float]: """Get the current TTFB value in seconds. diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py index f3ee40ae0..20d854ffb 100644 --- a/src/pipecat/processors/metrics/sentry.py +++ b/src/pipecat/processors/metrics/sentry.py @@ -4,8 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio + from loguru import logger +from pipecat.utils.asyncio import TaskManager + try: import sentry_sdk except ModuleNotFoundError as e: @@ -24,6 +28,25 @@ class SentryMetrics(FrameProcessorMetrics): self._sentry_available = sentry_sdk.is_initialized() if not self._sentry_available: logger.warning("Sentry SDK not initialized. Sentry features will be disabled.") + self._sentry_queue = asyncio.Queue() + self._sentry_task = None + + async def setup(self, task_manager: TaskManager): + await super().setup(task_manager) + if self._sentry_available: + self._sentry_queue = asyncio.Queue() + self._sentry_task = self.task_manager.create_task( + self._sentry_task_handler(), name=f"{self}::_sentry_task_handler" + ) + + async def cleanup(self): + await super().cleanup() + if self._sentry_task: + await self._sentry_queue.put(None) + await self.task_manager.wait_for_task(self._sentry_task) + self._sentry_task = None + logger.trace(f"{self} Flushing Sentry metrics") + sentry_sdk.flush(timeout=5.0) async def start_ttfb_metrics(self, report_only_initial_ttfb): await super().start_ttfb_metrics(report_only_initial_ttfb) @@ -34,14 +57,15 @@ class SentryMetrics(FrameProcessorMetrics): name=f"TTFB for {self._processor_name()}", ) logger.debug( - f"Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})" + f"{self} Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})" ) async def stop_ttfb_metrics(self): await super().stop_ttfb_metrics() if self._sentry_available and self._ttfb_metrics_tx: - self._ttfb_metrics_tx.finish() + await self._sentry_queue.put(self._ttfb_metrics_tx) + self._ttfb_metrics_tx = None async def start_processing_metrics(self): await super().start_processing_metrics() @@ -52,11 +76,20 @@ class SentryMetrics(FrameProcessorMetrics): name=f"Processing for {self._processor_name()}", ) logger.debug( - f"Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})" + f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})" ) async def stop_processing_metrics(self): await super().stop_processing_metrics() if self._sentry_available and self._processing_metrics_tx: - self._processing_metrics_tx.finish() + await self._sentry_queue.put(self._processing_metrics_tx) + self._processing_metrics_tx = None + + async def _sentry_task_handler(self): + running = True + while running: + tx = await self._sentry_queue.get() + if tx: + await self.task_manager.get_event_loop().run_in_executor(None, tx.finish) + running = tx is not None