Merge pull request #3735 from pipecat-ai/mb/fix-llm-tracing-error-handilng

Fix double execution of service functions on tracing errors
This commit is contained in:
Mark Backman
2026-02-13 13:23:55 -05:00
committed by GitHub
2 changed files with 45 additions and 32 deletions

1
changelog/3735.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed tracing service decorators executing the wrapped function twice when the function itself raised an exception (e.g., LLM rate limit, TTS timeout).

View File

@@ -230,19 +230,21 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def gen_wrapper(self, text, *args, **kwargs):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
async for item in f(self, text, *args, **kwargs):
yield item
return
if not getattr(self, "_tracing_enabled", False):
async for item in f(self, text, *args, **kwargs):
yield item
return
fn_called = False
try:
async with tracing_context(self, text):
fn_called = True
async for item in f(self, text, *args, **kwargs):
yield item
except Exception as e:
if fn_called:
raise
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
async for item in f(self, text, *args, **kwargs):
yield item
@@ -251,16 +253,18 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def wrapper(self, text, *args, **kwargs):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, text, *args, **kwargs)
if not getattr(self, "_tracing_enabled", False):
return await f(self, text, *args, **kwargs)
fn_called = False
try:
async with tracing_context(self, text):
fn_called = True
return await f(self, text, *args, **kwargs)
except Exception as e:
if fn_called:
raise
logging.error(f"Error in TTS tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
return await f(self, text, *args, **kwargs)
return wrapper
@@ -293,11 +297,11 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) -
def decorator(f):
@functools.wraps(f)
async def wrapper(self, transcript, is_final, language=None):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, transcript, is_final, language)
if not getattr(self, "_tracing_enabled", False):
return await f(self, transcript, is_final, language)
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = "stt"
@@ -332,14 +336,16 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) -
)
# Call the original function
fn_called = True
return await f(self, transcript, is_final, language)
except Exception as e:
# Log any exception but don't disrupt the main flow
logging.warning(f"Error in STT transcription tracing: {e}")
raise
except Exception as e:
if fn_called:
raise
logging.error(f"Error in STT tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
return await f(self, transcript, is_final, language)
return wrapper
@@ -374,11 +380,11 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
def decorator(f):
@functools.wraps(f)
async def wrapper(self, context, *args, **kwargs):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, context, *args, **kwargs)
if not getattr(self, "_tracing_enabled", False):
return await f(self, context, *args, **kwargs)
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = "llm"
@@ -525,6 +531,7 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
# Don't raise - let the function execute anyway
# Run function with modified push_frame to capture the output
fn_called = True
result = await f(self, context, *args, **kwargs)
# Add aggregated output after function completes, if available
@@ -550,8 +557,9 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if ttfb is not None:
current_span.set_attribute("metrics.ttfb", ttfb)
except Exception as e:
if fn_called:
raise
logging.error(f"Error in LLM tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
return await f(self, context, *args, **kwargs)
return wrapper
@@ -583,11 +591,11 @@ def traced_gemini_live(operation: str) -> Callable:
def decorator(func):
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = f"{operation}"
@@ -849,6 +857,7 @@ def traced_gemini_live(operation: str) -> Callable:
current_span.set_attribute("metrics.ttfb", ttfb)
# Run the original function
fn_called = True
result = await func(self, *args, **kwargs)
return result
@@ -859,8 +868,9 @@ def traced_gemini_live(operation: str) -> Callable:
raise
except Exception as e:
if fn_called:
raise
logging.error(f"Error in Gemini Live tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
return await func(self, *args, **kwargs)
return wrapper
@@ -889,11 +899,11 @@ def traced_openai_realtime(operation: str) -> Callable:
def decorator(func):
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
try:
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = f"{operation}"
@@ -1072,6 +1082,7 @@ def traced_openai_realtime(operation: str) -> Callable:
current_span.set_attribute("metrics.ttfb", ttfb)
# Run the original function
fn_called = True
result = await func(self, *args, **kwargs)
return result
@@ -1082,8 +1093,9 @@ def traced_openai_realtime(operation: str) -> Callable:
raise
except Exception as e:
if fn_called:
raise
logging.error(f"Error in OpenAI Realtime tracing (continuing without tracing): {e}")
# If tracing fails, fall back to the original function
return await func(self, *args, **kwargs)
return wrapper