Apply the same span-scope fix to traced_stt

@traced_stt had the same root issue as @traced_tts: the span lifetime
was tied to a per-transcript handler call, which doesn't match the
operation we want to trace. Now uses the __set_name__ pattern to
install:

- A push_frame wrapper that drives one STT span per finalized
  TranscriptionFrame. The span is anchored at speech start
  (VADUserStartedSpeakingFrame.timestamp - start_secs) but lazy-opened
  on the first TranscriptionFrame. Opening earlier (on VAD or
  UserStartedSpeakingFrame) races with TurnTraceObserver._handle_turn_started,
  which runs as a background task via _call_event_handler (sync=False),
  so the span would end up parented to the previous turn. Deferring
  the open to the first TranscriptionFrame avoids that race because
  STT only emits transcripts well after the turn observer has set
  the current turn's context.

- A stop_ttfb_metrics wrapper that closes the span on the TTFB-timeout
  path (called with end_time != None from stt_service.py:566). The
  span is marked stt.timed_out=True and its end_time is pinned to
  the timeout's end_time (= _last_transcript_time) so the duration
  reflects when STT actually stopped responding, not when the timeout
  fired.

Span lifecycle:
- Open: lazy on first TranscriptionFrame of a segment.
- Close (success): finalized=True attaches metrics.ttfb and closes
  the span. Multiple finalized transcripts in a single turn produce
  multiple spans.
- Close (timeout): stop_ttfb_metrics(end_time=...) closes with
  stt.timed_out=True.
- Close (orphan): UserStoppedSpeakingFrame closes any still-open
  span with stt.incomplete=True (covers turns where no finalized
  transcript and no timeout fired).

No changes required outside service_decorators.py — stt_service.py
and every per-service file are untouched.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-12 12:00:46 -07:00
parent 079282d140
commit 111e59a7b1

View File

@@ -23,9 +23,16 @@ if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry import trace
from pipecat.frames.frames import MetricsFrame, TTSStoppedFrame
from pipecat.frames.frames import (
MetricsFrame,
TranscriptionFrame,
TTSStoppedFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.processors.aggregators.llm_context import NOT_GIVEN
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.tracing.service_attributes import (
add_gemini_live_span_attributes,
add_llm_span_attributes,
@@ -425,69 +432,210 @@ def traced_stt(func: Callable | None = None, *, name: str | None = None) -> Call
- Language information
- Performance metrics like TTFB
The span is scoped to one STT segment, from
``VADUserStartedSpeakingFrame`` (or the first ``TranscriptionFrame``
when VAD did not fire, e.g. whispered speech) until a finalized
``TranscriptionFrame``. Multiple finalized transcripts in a single
user turn produce multiple sequential spans, each anchored at the
point speech for that segment began. ``metrics.ttfb`` is read after
the base ``push_frame`` runs ``stop_ttfb_metrics`` for the
finalized frame, so the value is correct for the closing span.
Args:
func: The STT method to trace.
name: Custom span name. Defaults to function name.
Returns:
Wrapped method with STT-specific tracing.
The original method unchanged. The decorator's class-definition-
time work is to install a ``push_frame`` wrapper on the owning
class that owns the span lifetime.
"""
if not is_tracing_available():
return _noop_decorator if func is None else _noop_decorator(func)
def decorator(f):
@functools.wraps(f)
async def wrapper(self, transcript, is_final, language=None):
if not getattr(self, "_tracing_enabled", False):
return await f(self, transcript, is_final, language)
def patch_push_frame(owner):
"""Wrap ``owner.push_frame`` to drive the STT span lifecycle.
fn_called = False
try:
service_class_name = self.__class__.__name__
span_name = "stt"
Idempotent: if a parent class has already been wrapped, skip.
The wrapper checks ``_tracing_enabled`` at invocation time,
so it is safe to install regardless of whether tracing is
enabled.
"""
original_push_frame = owner.push_frame
if getattr(original_push_frame, "__stt_tracing_push_frame_wrapped__", False):
return
# Get the turn context first, then fall back to service context
parent_context = _get_turn_context(self) or _get_parent_service_context(self)
# Create a new span as child of the turn span or service span
def open_span(service, state):
"""Open the STT span, anchored at ``segment_start_time`` if set."""
parent = _get_turn_context(service) or _get_parent_service_context(service)
tracer = trace.get_tracer("pipecat")
with tracer.start_as_current_span(
span_name, context=parent_context
) as current_span:
kwargs = {"context": parent}
if state["segment_start_time"] is not None:
kwargs["start_time"] = int(state["segment_start_time"] * 1e9)
span = tracer.start_span("stt", **kwargs)
try:
settings = getattr(service, "_settings", None)
add_stt_span_attributes(
span=span,
service_name=service.__class__.__name__,
model=_get_model_name(service),
settings=settings,
vad_enabled=getattr(service, "vad_enabled", False),
)
except Exception as e:
logging.warning(f"Error setting STT span baseline attributes: {e}")
state["span"] = span
def handle_pre_push(service, frame, state):
"""Record speech-start anchor.
The span itself is lazy-opened in ``handle_post_push``
when the first ``TranscriptionFrame`` arrives. Opening
on ``VADUserStartedSpeakingFrame`` or
``UserStartedSpeakingFrame`` would race with
``TurnTraceObserver._handle_turn_started``, which runs
in a background task fired by ``_call_event_handler``
(``base_object.py:232``) and may not have set the new
turn's context yet — that produces STT spans parented
to the previous turn. By the time STT actually emits a
transcript, the turn observer has run.
"""
if isinstance(frame, VADUserStartedSpeakingFrame):
# Anchor the next span at the moment speech began.
# Skip if we already have an anchor (intra-turn VAD
# re-trigger) or a span open.
if state["span"] is None and state["segment_start_time"] is None:
state["segment_start_time"] = frame.timestamp - frame.start_secs
def handle_post_push(service, frame, state):
"""Lazy-open span on first transcript; attach attrs; close on finalized.
One STT span per finalized transcript: the span opens
on the first ``TranscriptionFrame`` (anchored at
speech start via ``segment_start_time``) and closes on
``finalized=True``. Multiple finalized transcripts in
a single turn produce multiple spans; subsequent
spans lazy-open with no explicit anchor (start_time
defaults to "now"). Spans that never see a finalized
transcript are closed by ``UserStoppedSpeakingFrame``
(marked ``stt.incomplete``) or by the TTFB timeout
(marked ``stt.timed_out``).
"""
if isinstance(frame, UserStoppedSpeakingFrame):
prev_span = state["span"]
if prev_span is not None:
prev_span.set_attribute("stt.incomplete", True)
prev_span.end()
state["span"] = None
state["segment_start_time"] = None
elif isinstance(frame, TranscriptionFrame):
if state["span"] is None:
open_span(service, state)
span = state["span"]
if frame.text:
span.set_attribute("transcript", frame.text)
span.set_attribute("is_final", bool(frame.finalized))
if frame.language:
span.set_attribute("language", str(frame.language))
if frame.user_id:
span.set_attribute("user_id", frame.user_id)
if frame.finalized:
ttfb = getattr(getattr(service, "_metrics", None), "ttfb", None)
if ttfb is not None:
span.set_attribute("metrics.ttfb", ttfb)
span.end()
state["span"] = None
state["segment_start_time"] = None
@functools.wraps(original_push_frame)
async def patched_push_frame(self, frame, direction=FrameDirection.DOWNSTREAM):
state = getattr(self, "_stt_span_state", None)
if state is None:
state = {"span": None, "segment_start_time": None}
self._stt_span_state = state
if getattr(self, "_tracing_enabled", False):
try:
# Get TTFB metric if available
ttfb: float | None = getattr(getattr(self, "_metrics", None), "ttfb", None)
# Use settings from the service if available
settings = getattr(self, "_settings", None)
add_stt_span_attributes(
span=current_span,
service_name=service_class_name,
model=_get_model_name(self),
transcript=transcript,
is_final=is_final,
language=str(language) if language else None,
user_id=getattr(self, "_user_id", None),
vad_enabled=getattr(self, "vad_enabled", False),
settings=settings,
ttfb=ttfb,
)
# Call the original function
fn_called = True
return await f(self, transcript, is_final, language)
handle_pre_push(self, frame, state)
except Exception as e:
# Log any exception but don't disrupt the main flow
logging.warning(f"Error in STT transcription tracing: {e}")
raise
except Exception as e:
if fn_called:
raise
logging.error(f"Error in STT tracing (continuing without tracing): {e}")
return await f(self, transcript, is_final, language)
logging.warning(f"Error in STT pre-push tracing: {e}")
return wrapper
await original_push_frame(self, frame, direction)
if getattr(self, "_tracing_enabled", False):
try:
handle_post_push(self, frame, state)
except Exception as e:
logging.warning(f"Error in STT post-push tracing: {e}")
patched_push_frame.__stt_tracing_push_frame_wrapped__ = True
owner.push_frame = patched_push_frame
def patch_stop_ttfb_metrics(owner):
"""Wrap ``owner.stop_ttfb_metrics`` to handle the TTFB-timeout path.
When ``stop_ttfb_metrics`` is invoked with ``end_time`` set,
that signals the STT service's TTFB-timeout handler firing
without a finalized ``TranscriptionFrame`` ever arriving
(`stt_service.py:566`). In that case we attach
``metrics.ttfb`` and close the span with
``stt.timed_out=true``. The default ``end_time=None`` path
(called from ``push_frame`` on finalized frames) only
attaches ``metrics.ttfb``; closing happens in the
``push_frame`` post-hook so the final transcript attributes
land first.
"""
original_stop = owner.stop_ttfb_metrics
if getattr(original_stop, "__stt_tracing_stop_ttfb_wrapped__", False):
return
@functools.wraps(original_stop)
async def patched_stop(self, *, end_time=None):
await original_stop(self, end_time=end_time)
if not getattr(self, "_tracing_enabled", False):
return
state = getattr(self, "_stt_span_state", None)
if not state or state["span"] is None:
return
span = state["span"]
try:
ttfb = getattr(getattr(self, "_metrics", None), "ttfb", None)
if ttfb is not None:
span.set_attribute("metrics.ttfb", ttfb)
if end_time is not None:
span.set_attribute("stt.timed_out", True)
# Use end_time (= _last_transcript_time) so the
# span ends where the timeout handler actually
# finalized TTFB, rather than after the timeout
# sleep.
span.end(end_time=int(end_time * 1e9))
state["span"] = None
state["segment_start_time"] = None
except Exception as e:
logging.warning(f"Error in STT stop_ttfb_metrics tracing: {e}")
patched_stop.__stt_tracing_stop_ttfb_wrapped__ = True
owner.stop_ttfb_metrics = patched_stop
class _TracedSTTDescriptor:
"""Class-level descriptor that wires up STT tracing at class definition time.
``__set_name__`` fires when the class body finishes evaluating,
giving us a chance to wrap the owner's ``push_frame`` so that
VAD, transcription, and finalization events drive the span
lifecycle, and to wrap ``stop_ttfb_metrics`` so the
TTFB-timeout path can attach metrics and close the span when
no finalized transcript ever arrives. The decorated method
itself runs unchanged.
"""
def __set_name__(self, owner, attr_name):
patch_push_frame(owner)
patch_stop_ttfb_metrics(owner)
setattr(owner, attr_name, f)
return _TracedSTTDescriptor()
if func is not None:
return decorator(func)