diff --git a/src/pipecat/utils/tracing/service_decorators.py b/src/pipecat/utils/tracing/service_decorators.py index 94770db52..fe4bbe96e 100644 --- a/src/pipecat/utils/tracing/service_decorators.py +++ b/src/pipecat/utils/tracing/service_decorators.py @@ -23,9 +23,16 @@ if TYPE_CHECKING: from opentelemetry import context as context_api from opentelemetry import trace -from pipecat.frames.frames import MetricsFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + MetricsFrame, + TranscriptionFrame, + TTSStoppedFrame, + UserStoppedSpeakingFrame, + VADUserStartedSpeakingFrame, +) from pipecat.metrics.metrics import TTFBMetricsData from pipecat.processors.aggregators.llm_context import NOT_GIVEN +from pipecat.processors.frame_processor import FrameDirection from pipecat.utils.tracing.service_attributes import ( add_gemini_live_span_attributes, add_llm_span_attributes, @@ -425,69 +432,210 @@ def traced_stt(func: Callable | None = None, *, name: str | None = None) -> Call - Language information - Performance metrics like TTFB + The span is scoped to one STT segment, from + ``VADUserStartedSpeakingFrame`` (or the first ``TranscriptionFrame`` + when VAD did not fire, e.g. whispered speech) until a finalized + ``TranscriptionFrame``. Multiple finalized transcripts in a single + user turn produce multiple sequential spans; each is anchored at speech + start only if VAD fired for that segment. ``metrics.ttfb`` is read after + the base ``push_frame`` runs ``stop_ttfb_metrics`` for the + finalized frame, so the value is correct for the closing span. + Args: func: The STT method to trace. name: Custom span name. Defaults to function name. Returns: - Wrapped method with STT-specific tracing. + The original method unchanged. The decorator's class-definition- + time work is to wrap ``push_frame`` and ``stop_ttfb_metrics`` on the + owning class; those wrappers own the span lifetime. 
""" if not is_tracing_available(): return _noop_decorator if func is None else _noop_decorator(func) def decorator(f): - @functools.wraps(f) - async def wrapper(self, transcript, is_final, language=None): - if not getattr(self, "_tracing_enabled", False): - return await f(self, transcript, is_final, language) + def patch_push_frame(owner): + """Wrap ``owner.push_frame`` to drive the STT span lifecycle. - fn_called = False - try: - service_class_name = self.__class__.__name__ - span_name = "stt" + Idempotent: if a parent class has already been wrapped, skip. + The wrapper checks ``_tracing_enabled`` at invocation time, + so it is safe to install regardless of whether tracing is + enabled. + """ + original_push_frame = owner.push_frame + if getattr(original_push_frame, "__stt_tracing_push_frame_wrapped__", False): + return - # Get the turn context first, then fall back to service context - parent_context = _get_turn_context(self) or _get_parent_service_context(self) - - # Create a new span as child of the turn span or service span + def open_span(service, state): + """Open the STT span, anchored at ``segment_start_time`` if set.""" + parent = _get_turn_context(service) or _get_parent_service_context(service) tracer = trace.get_tracer("pipecat") - with tracer.start_as_current_span( - span_name, context=parent_context - ) as current_span: + kwargs = {"context": parent} + if state["segment_start_time"] is not None: + kwargs["start_time"] = int(state["segment_start_time"] * 1e9) + span = tracer.start_span("stt", **kwargs) + try: + settings = getattr(service, "_settings", None) + add_stt_span_attributes( + span=span, + service_name=service.__class__.__name__, + model=_get_model_name(service), + settings=settings, + vad_enabled=getattr(service, "vad_enabled", False), + ) + except Exception as e: + logging.warning(f"Error setting STT span baseline attributes: {e}") + state["span"] = span + + def handle_pre_push(service, frame, state): + """Record speech-start anchor. 
+ + The span itself is lazy-opened in ``handle_post_push`` + when the first ``TranscriptionFrame`` arrives. Opening + on ``VADUserStartedSpeakingFrame`` or + ``UserStartedSpeakingFrame`` would race with + ``TurnTraceObserver._handle_turn_started``, which runs + in a background task fired by ``_call_event_handler`` + (``base_object.py:232``) and may not have set the new + turn's context yet — that produces STT spans parented + to the previous turn. By the time STT actually emits a + transcript, the turn observer has run. + """ + if isinstance(frame, VADUserStartedSpeakingFrame): + # Anchor the next span at the moment speech began. + # Skip if we already have an anchor (intra-turn VAD + # re-trigger) or a span open. + if state["span"] is None and state["segment_start_time"] is None: + state["segment_start_time"] = frame.timestamp - frame.start_secs + + def handle_post_push(service, frame, state): + """Lazy-open span on first transcript; attach attrs; close on finalized. + + One STT span per finalized transcript: the span opens + on the first ``TranscriptionFrame`` (anchored at + speech start via ``segment_start_time``) and closes on + ``finalized=True``. Multiple finalized transcripts in + a single turn produce multiple spans; subsequent + spans lazy-open with no explicit anchor (start_time + defaults to "now"). Spans that never see a finalized + transcript are closed by ``UserStoppedSpeakingFrame`` + (marked ``stt.incomplete``) or by the TTFB timeout + (marked ``stt.timed_out``). 
+ """ + if isinstance(frame, UserStoppedSpeakingFrame): + prev_span = state["span"] + if prev_span is not None: + prev_span.set_attribute("stt.incomplete", True) + prev_span.end() + state["span"] = None + state["segment_start_time"] = None + elif isinstance(frame, TranscriptionFrame): + if state["span"] is None: + open_span(service, state) + span = state["span"] + if frame.text: + span.set_attribute("transcript", frame.text) + span.set_attribute("is_final", bool(frame.finalized)) + if frame.language: + span.set_attribute("language", str(frame.language)) + if frame.user_id: + span.set_attribute("user_id", frame.user_id) + if frame.finalized: + ttfb = getattr(getattr(service, "_metrics", None), "ttfb", None) + if ttfb is not None: + span.set_attribute("metrics.ttfb", ttfb) + span.end() + state["span"] = None + state["segment_start_time"] = None + + @functools.wraps(original_push_frame) + async def patched_push_frame(self, frame, direction=FrameDirection.DOWNSTREAM): + state = getattr(self, "_stt_span_state", None) + if state is None: + state = {"span": None, "segment_start_time": None} + self._stt_span_state = state + + if getattr(self, "_tracing_enabled", False): try: - # Get TTFB metric if available - ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None) - - # Use settings from the service if available - settings = getattr(self, "_settings", None) - - add_stt_span_attributes( - span=current_span, - service_name=service_class_name, - model=_get_model_name(self), - transcript=transcript, - is_final=is_final, - language=str(language) if language else None, - user_id=getattr(self, "_user_id", None), - vad_enabled=getattr(self, "vad_enabled", False), - settings=settings, - ttfb=ttfb, - ) - - # Call the original function - fn_called = True - return await f(self, transcript, is_final, language) + handle_pre_push(self, frame, state) except Exception as e: - # Log any exception but don't disrupt the main flow - logging.warning(f"Error in STT 
transcription tracing: {e}") - raise - except Exception as e: - if fn_called: - raise - logging.error(f"Error in STT tracing (continuing without tracing): {e}") - return await f(self, transcript, is_final, language) + logging.warning(f"Error in STT pre-push tracing: {e}") - return wrapper + await original_push_frame(self, frame, direction) + + if getattr(self, "_tracing_enabled", False): + try: + handle_post_push(self, frame, state) + except Exception as e: + logging.warning(f"Error in STT post-push tracing: {e}") + + patched_push_frame.__stt_tracing_push_frame_wrapped__ = True + owner.push_frame = patched_push_frame + + def patch_stop_ttfb_metrics(owner): + """Wrap ``owner.stop_ttfb_metrics`` to handle the TTFB-timeout path. + + When ``stop_ttfb_metrics`` is invoked with ``end_time`` set, + that signals the STT service's TTFB-timeout handler firing + without a finalized ``TranscriptionFrame`` ever arriving + (``stt_service.py:566``). In that case we attach + ``metrics.ttfb`` and close the span with + ``stt.timed_out=true``. The default ``end_time=None`` path + (called from ``push_frame`` on finalized frames) only + attaches ``metrics.ttfb``; closing happens in the + ``push_frame`` post-hook so the final transcript attributes + land first. 
+ """ + original_stop = owner.stop_ttfb_metrics + if getattr(original_stop, "__stt_tracing_stop_ttfb_wrapped__", False): + return + + @functools.wraps(original_stop) + async def patched_stop(self, *, end_time=None): + await original_stop(self, end_time=end_time) + if not getattr(self, "_tracing_enabled", False): + return + state = getattr(self, "_stt_span_state", None) + if not state or state["span"] is None: + return + span = state["span"] + try: + ttfb = getattr(getattr(self, "_metrics", None), "ttfb", None) + if ttfb is not None: + span.set_attribute("metrics.ttfb", ttfb) + if end_time is not None: + span.set_attribute("stt.timed_out", True) + # Use end_time (= _last_transcript_time) so the + # span ends where the timeout handler actually + # finalized TTFB, rather than after the timeout + # sleep. + span.end(end_time=int(end_time * 1e9)) + state["span"] = None + state["segment_start_time"] = None + except Exception as e: + logging.warning(f"Error in STT stop_ttfb_metrics tracing: {e}") + + patched_stop.__stt_tracing_stop_ttfb_wrapped__ = True + owner.stop_ttfb_metrics = patched_stop + + class _TracedSTTDescriptor: + """Class-level descriptor that wires up STT tracing at class definition time. + + ``__set_name__`` fires when the class body finishes evaluating, + giving us a chance to wrap the owner's ``push_frame`` so that + VAD, transcription, and finalization events drive the span + lifecycle, and to wrap ``stop_ttfb_metrics`` so the + TTFB-timeout path can attach metrics and close the span when + no finalized transcript ever arrives. The decorated method + itself runs unchanged. + """ + + def __set_name__(self, owner, attr_name): + patch_push_frame(owner) + patch_stop_ttfb_metrics(owner) + setattr(owner, attr_name, f) + + return _TracedSTTDescriptor() if func is not None: return decorator(func)