Merge pull request #2055 from pipecat-ai/aleix/fix-sentry-async
SentryMetrics: send metrics to sentry asynchronously
This commit is contained in:
@@ -84,6 +84,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an event loop blocking issue when using `SentryMetrics`.
|
||||
|
||||
- Fixed an issue in `FastAPIWebsocketClient` to ensure proper disconnection
|
||||
when the websocket is already closed.
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import random
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Dict
|
||||
|
||||
@@ -26,13 +25,11 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMTextFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopFrame,
|
||||
StopInterruptionFrame,
|
||||
TranscriptionFrame,
|
||||
TTSTextFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -49,7 +46,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor
|
||||
from pipecat.serializers.protobuf import ProtobufFrameSerializer
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
|
||||
@@ -220,11 +220,15 @@ class FrameProcessor(BaseObject):
|
||||
self._clock = setup.clock
|
||||
self._task_manager = setup.task_manager
|
||||
self._observer = setup.observer
|
||||
if self._metrics is not None:
|
||||
await self._metrics.setup(self._task_manager)
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
if self._metrics is not None:
|
||||
await self._metrics.cleanup()
|
||||
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
self._next = processor
|
||||
|
||||
@@ -18,15 +18,29 @@ from pipecat.metrics.metrics import (
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
from pipecat.utils.asyncio import TaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
|
||||
class FrameProcessorMetrics:
|
||||
class FrameProcessorMetrics(BaseObject):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._task_manager = None
|
||||
self._start_ttfb_time = 0
|
||||
self._start_processing_time = 0
|
||||
self._last_ttfb_time = 0
|
||||
self._should_report_ttfb = True
|
||||
|
||||
async def setup(self, task_manager: TaskManager):
|
||||
self._task_manager = task_manager
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
|
||||
@property
|
||||
def task_manager(self) -> TaskManager:
|
||||
return self._task_manager
|
||||
|
||||
@property
|
||||
def ttfb(self) -> Optional[float]:
|
||||
"""Get the current TTFB value in seconds.
|
||||
|
||||
@@ -4,8 +4,12 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.utils.asyncio import TaskManager
|
||||
|
||||
try:
|
||||
import sentry_sdk
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -24,6 +28,25 @@ class SentryMetrics(FrameProcessorMetrics):
|
||||
self._sentry_available = sentry_sdk.is_initialized()
|
||||
if not self._sentry_available:
|
||||
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
|
||||
self._sentry_queue = asyncio.Queue()
|
||||
self._sentry_task = None
|
||||
|
||||
async def setup(self, task_manager: TaskManager):
|
||||
await super().setup(task_manager)
|
||||
if self._sentry_available:
|
||||
self._sentry_queue = asyncio.Queue()
|
||||
self._sentry_task = self.task_manager.create_task(
|
||||
self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
if self._sentry_task:
|
||||
await self._sentry_queue.put(None)
|
||||
await self.task_manager.wait_for_task(self._sentry_task)
|
||||
self._sentry_task = None
|
||||
logger.trace(f"{self} Flushing Sentry metrics")
|
||||
sentry_sdk.flush(timeout=5.0)
|
||||
|
||||
async def start_ttfb_metrics(self, report_only_initial_ttfb):
|
||||
await super().start_ttfb_metrics(report_only_initial_ttfb)
|
||||
@@ -34,14 +57,15 @@ class SentryMetrics(FrameProcessorMetrics):
|
||||
name=f"TTFB for {self._processor_name()}",
|
||||
)
|
||||
logger.debug(
|
||||
f"Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
|
||||
f"{self} Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
|
||||
)
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
await super().stop_ttfb_metrics()
|
||||
|
||||
if self._sentry_available and self._ttfb_metrics_tx:
|
||||
self._ttfb_metrics_tx.finish()
|
||||
await self._sentry_queue.put(self._ttfb_metrics_tx)
|
||||
self._ttfb_metrics_tx = None
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
await super().start_processing_metrics()
|
||||
@@ -52,11 +76,20 @@ class SentryMetrics(FrameProcessorMetrics):
|
||||
name=f"Processing for {self._processor_name()}",
|
||||
)
|
||||
logger.debug(
|
||||
f"Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
|
||||
f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
|
||||
)
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
await super().stop_processing_metrics()
|
||||
|
||||
if self._sentry_available and self._processing_metrics_tx:
|
||||
self._processing_metrics_tx.finish()
|
||||
await self._sentry_queue.put(self._processing_metrics_tx)
|
||||
self._processing_metrics_tx = None
|
||||
|
||||
async def _sentry_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
tx = await self._sentry_queue.get()
|
||||
if tx:
|
||||
await self.task_manager.get_event_loop().run_in_executor(None, tx.finish)
|
||||
running = tx is not None
|
||||
|
||||
Reference in New Issue
Block a user