diff --git a/changelog/3735.fixed.md b/changelog/3735.fixed.md new file mode 100644 index 000000000..02de936c7 --- /dev/null +++ b/changelog/3735.fixed.md @@ -0,0 +1 @@ +- Fixed tracing service decorators executing the wrapped function twice when the function itself raised an exception (e.g., LLM rate limit, TTS timeout). diff --git a/src/pipecat/utils/tracing/service_decorators.py b/src/pipecat/utils/tracing/service_decorators.py index 0fd939162..968fe8e8a 100644 --- a/src/pipecat/utils/tracing/service_decorators.py +++ b/src/pipecat/utils/tracing/service_decorators.py @@ -230,19 +230,21 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def gen_wrapper(self, text, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - async for item in f(self, text, *args, **kwargs): - yield item - return + if not getattr(self, "_tracing_enabled", False): + async for item in f(self, text, *args, **kwargs): + yield item + return + fn_called = False + try: async with tracing_context(self, text): + fn_called = True async for item in f(self, text, *args, **kwargs): yield item except Exception as e: + if fn_called: + raise logging.error(f"Error in TTS tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function async for item in f(self, text, *args, **kwargs): yield item @@ -251,16 +253,18 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def wrapper(self, text, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, text, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await f(self, text, *args, **kwargs) + fn_called = False + try: async with tracing_context(self, text): + fn_called = True return await f(self, text, *args, **kwargs) except Exception as e: + if fn_called: + raise logging.error(f"Error in TTS tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, text, *args, **kwargs) return wrapper @@ -293,11 +297,11 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) - def decorator(f): @functools.wraps(f) async def wrapper(self, transcript, is_final, language=None): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, transcript, is_final, language) + if not getattr(self, "_tracing_enabled", False): + return await f(self, transcript, is_final, language) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = "stt" @@ -332,14 +336,16 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) - ) # Call the original function + fn_called = True return await f(self, transcript, is_final, language) except Exception as e: # Log any exception but don't disrupt the main flow logging.warning(f"Error in STT transcription tracing: {e}") raise except Exception as e: + if fn_called: + raise logging.error(f"Error in STT tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, transcript, is_final, language) return wrapper @@ -374,11 +380,11 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - def decorator(f): @functools.wraps(f) async def wrapper(self, context, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, context, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await f(self, context, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = "llm" @@ -525,6 +531,7 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - # Don't raise - let the function execute anyway # Run function with modified push_frame to capture the output + fn_called = True result = await f(self, context, *args, **kwargs) # Add aggregated output after function completes, if available @@ -550,8 +557,9 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - if ttfb is not None: current_span.set_attribute("metrics.ttfb", ttfb) except Exception as e: + if fn_called: + raise logging.error(f"Error in LLM tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, context, *args, **kwargs) return wrapper @@ -583,11 +591,11 @@ def traced_gemini_live(operation: str) -> Callable: def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await func(self, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await func(self, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = f"{operation}" @@ -849,6 +857,7 @@ def traced_gemini_live(operation: str) -> Callable: current_span.set_attribute("metrics.ttfb", ttfb) # Run the original function + fn_called = True result = await func(self, *args, **kwargs) return result @@ -859,8 +868,9 @@ def traced_gemini_live(operation: str) -> Callable: raise except Exception as e: + if fn_called: + raise logging.error(f"Error in Gemini Live tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await func(self, *args, **kwargs) return wrapper @@ -889,11 +899,11 @@ def traced_openai_realtime(operation: str) -> Callable: def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await func(self, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await func(self, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = f"{operation}" @@ -1072,6 +1082,7 @@ def traced_openai_realtime(operation: str) -> Callable: current_span.set_attribute("metrics.ttfb", ttfb) # Run the original function + fn_called = True result = await func(self, *args, **kwargs) return result @@ -1082,8 +1093,9 @@ def traced_openai_realtime(operation: str) -> Callable: raise except Exception as e: + if fn_called: + raise logging.error(f"Error in OpenAI Realtime tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await func(self, *args, **kwargs) return wrapper