From e50b138ab21da65fad3182c91d66a286054d6615 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 12 Feb 2026 22:03:55 -0500 Subject: [PATCH 1/2] Fix double execution of service functions when tracing errors occur The outer try/except in each service decorator caught both tracing setup errors and application errors from the wrapped function. If the function itself raised (e.g. LLM rate limit, TTS timeout), the exception was caught and the function was called a second time. Fix by tracking whether the original function was called via a fn_called flag. If the function was already called, re-raise the exception instead of falling back to untraced re-execution. --- .../utils/tracing/service_decorators.py | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/src/pipecat/utils/tracing/service_decorators.py b/src/pipecat/utils/tracing/service_decorators.py index 0fd939162..968fe8e8a 100644 --- a/src/pipecat/utils/tracing/service_decorators.py +++ b/src/pipecat/utils/tracing/service_decorators.py @@ -230,19 +230,21 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def gen_wrapper(self, text, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - async for item in f(self, text, *args, **kwargs): - yield item - return + if not getattr(self, "_tracing_enabled", False): + async for item in f(self, text, *args, **kwargs): + yield item + return + fn_called = False + try: async with tracing_context(self, text): + fn_called = True async for item in f(self, text, *args, **kwargs): yield item except Exception as e: + if fn_called: + raise logging.error(f"Error in TTS tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function async for item in f(self, text, *args, **kwargs): yield item @@ -251,16 +253,18 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def wrapper(self, text, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, text, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await f(self, text, *args, **kwargs) + fn_called = False + try: async with tracing_context(self, text): + fn_called = True return await f(self, text, *args, **kwargs) except Exception as e: + if fn_called: + raise logging.error(f"Error in TTS tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, text, *args, **kwargs) return wrapper @@ -293,11 +297,11 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) - def decorator(f): @functools.wraps(f) async def wrapper(self, transcript, is_final, language=None): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, transcript, is_final, language) + if not getattr(self, "_tracing_enabled", False): + return await f(self, transcript, is_final, language) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = "stt" @@ -332,14 +336,16 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) - ) # Call the original function + fn_called = True return await f(self, transcript, is_final, language) except Exception as e: # Log any exception but don't disrupt the main flow logging.warning(f"Error in STT transcription tracing: {e}") raise except Exception as e: + if fn_called: + raise logging.error(f"Error in STT tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, transcript, is_final, language) return wrapper @@ -374,11 +380,11 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - def decorator(f): @functools.wraps(f) async def wrapper(self, context, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await f(self, context, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await f(self, context, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = "llm" @@ -525,6 +531,7 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - # Don't raise - let the function execute anyway # Run function with modified push_frame to capture the output + fn_called = True result = await f(self, context, *args, **kwargs) # Add aggregated output after function completes, if available @@ -550,8 +557,9 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - if ttfb is not None: current_span.set_attribute("metrics.ttfb", ttfb) except Exception as e: + if fn_called: + raise logging.error(f"Error in LLM tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await f(self, context, *args, **kwargs) return wrapper @@ -583,11 +591,11 @@ def traced_gemini_live(operation: str) -> Callable: def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await func(self, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await func(self, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = f"{operation}" @@ -849,6 +857,7 @@ def traced_gemini_live(operation: str) -> Callable: current_span.set_attribute("metrics.ttfb", ttfb) # Run the original function + fn_called = True result = await func(self, *args, **kwargs) return result @@ -859,8 +868,9 @@ def traced_gemini_live(operation: str) -> Callable: raise except Exception as e: + if fn_called: + raise logging.error(f"Error in Gemini Live tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await func(self, *args, **kwargs) return wrapper @@ -889,11 +899,11 @@ def traced_openai_realtime(operation: str) -> Callable: def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): - try: - # Check if tracing is enabled for this service instance - if not getattr(self, "_tracing_enabled", False): - return await func(self, *args, **kwargs) + if not getattr(self, "_tracing_enabled", False): + return await func(self, *args, **kwargs) + fn_called = False + try: service_class_name = self.__class__.__name__ span_name = f"{operation}" @@ -1072,6 +1082,7 @@ def traced_openai_realtime(operation: str) -> Callable: current_span.set_attribute("metrics.ttfb", ttfb) # Run the original function + fn_called = True result = await func(self, *args, **kwargs) return result @@ -1082,8 +1093,9 @@ def traced_openai_realtime(operation: str) -> Callable: raise except Exception as e: + if fn_called: + raise logging.error(f"Error in OpenAI Realtime tracing (continuing without tracing): {e}") - # If tracing fails, fall back to the original function return await func(self, *args, **kwargs) return wrapper From a5f95acaf59e3e8a526362e65cc546b970048257 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 12 Feb 2026 22:07:09 -0500 Subject: [PATCH 2/2] Add changelog for PR #3735 --- changelog/3735.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/3735.fixed.md diff --git a/changelog/3735.fixed.md b/changelog/3735.fixed.md new file mode 100644 index 000000000..02de936c7 --- /dev/null +++ b/changelog/3735.fixed.md @@ -0,0 +1 @@ +- Fixed tracing service decorators executing the wrapped function twice when the function itself raised an exception (e.g., LLM rate limit, TTS timeout).