Add tests for TracingContext and TurnTraceObserver

Cover pipeline-scoped tracing context lifecycle, span hierarchy,
conversation/turn context management, and concurrent pipeline isolation.
This commit is contained in:
Mark Backman
2026-02-11 22:42:58 -05:00
parent 358f237507
commit 71a752c971
4 changed files with 634 additions and 0 deletions

View File

@@ -41,6 +41,7 @@ jobs:
--extra livekit \
--extra local-smart-turn-v3 \
--extra piper \
--extra tracing \
--extra websocket
- name: Run tests with coverage

View File

@@ -45,6 +45,7 @@ jobs:
--extra livekit \
--extra local-smart-turn-v3 \
--extra piper \
--extra tracing \
--extra websocket
- name: Test with pytest

View File

@@ -0,0 +1,127 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
try:
from opentelemetry.sdk.trace import TracerProvider
HAS_OPENTELEMETRY = True
except ImportError:
HAS_OPENTELEMETRY = False
from pipecat.utils.tracing.tracing_context import TracingContext
@unittest.skipUnless(HAS_OPENTELEMETRY, "opentelemetry not installed")
class TestTracingContext(unittest.TestCase):
    """Tests for TracingContext.

    A single TracerProvider is shared by all tests in the class; it is only
    used to mint real span contexts to store in the TracingContext under test.
    """

    @classmethod
    def setUpClass(cls):
        """Set up a tracer provider for generating span contexts."""
        cls._provider = TracerProvider()
        cls._tracer = cls._provider.get_tracer("test")

    @classmethod
    def tearDownClass(cls):
        """Shut down the shared tracer provider once all tests have run."""
        cls._provider.shutdown()

    def _start_span(self, name):
        """Start a span and guarantee it is ended when the test finishes.

        Registering ``span.end`` as a cleanup means the span is closed even
        when an assertion fails part-way through the test.

        Args:
            name: Name given to the span.

        Returns:
            The started span.
        """
        span = self._tracer.start_span(name)
        self.addCleanup(span.end)
        return span

    def test_initial_state_is_empty(self):
        """Test that a new TracingContext starts with no context set."""
        ctx = TracingContext()

        self.assertIsNone(ctx.get_conversation_context())
        self.assertIsNone(ctx.get_turn_context())
        self.assertIsNone(ctx.conversation_id)

    def test_set_and_get_conversation_context(self):
        """Test setting and retrieving conversation context."""
        ctx = TracingContext()
        span = self._start_span("conv")

        ctx.set_conversation_context(span.get_span_context(), "conv-123")

        self.assertIsNotNone(ctx.get_conversation_context())
        self.assertEqual(ctx.conversation_id, "conv-123")

    def test_clear_conversation_context(self):
        """Test clearing conversation context by passing None."""
        ctx = TracingContext()
        span = self._start_span("conv")

        ctx.set_conversation_context(span.get_span_context(), "conv-123")
        self.assertIsNotNone(ctx.get_conversation_context())

        ctx.set_conversation_context(None)
        self.assertIsNone(ctx.get_conversation_context())
        self.assertIsNone(ctx.conversation_id)

    def test_set_and_get_turn_context(self):
        """Test setting and retrieving turn context."""
        ctx = TracingContext()
        span = self._start_span("turn")

        ctx.set_turn_context(span.get_span_context())

        self.assertIsNotNone(ctx.get_turn_context())

    def test_clear_turn_context(self):
        """Test clearing turn context by passing None."""
        ctx = TracingContext()
        span = self._start_span("turn")

        ctx.set_turn_context(span.get_span_context())
        self.assertIsNotNone(ctx.get_turn_context())

        ctx.set_turn_context(None)
        self.assertIsNone(ctx.get_turn_context())

    def test_generate_conversation_id(self):
        """Test that generated conversation IDs are unique UUIDs."""
        import uuid

        id1 = TracingContext.generate_conversation_id()
        id2 = TracingContext.generate_conversation_id()

        for generated in (id1, id2):
            self.assertIsInstance(generated, str)
            # uuid.UUID raises ValueError for anything that is not a
            # well-formed UUID string, which fails the test.
            uuid.UUID(generated)
        self.assertNotEqual(id1, id2)

    def test_instances_are_isolated(self):
        """Test that two TracingContext instances do not share state."""
        ctx_a = TracingContext()
        ctx_b = TracingContext()
        span = self._start_span("turn")

        ctx_a.set_turn_context(span.get_span_context())
        ctx_a.set_conversation_context(span.get_span_context(), "conv-a")

        # ctx_b should still be empty
        self.assertIsNone(ctx_b.get_turn_context())
        self.assertIsNone(ctx_b.get_conversation_context())
        self.assertIsNone(ctx_b.conversation_id)

    def test_conversation_and_turn_are_independent(self):
        """Test that clearing turn context does not affect conversation context."""
        ctx = TracingContext()
        conv_span = self._start_span("conv")
        turn_span = self._start_span("turn")

        ctx.set_conversation_context(conv_span.get_span_context(), "conv-1")
        ctx.set_turn_context(turn_span.get_span_context())

        # Clear turn but conversation should remain
        ctx.set_turn_context(None)
        self.assertIsNone(ctx.get_turn_context())
        self.assertIsNotNone(ctx.get_conversation_context())
        self.assertEqual(ctx.conversation_id, "conv-1")
# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,505 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import threading
import unittest
try:
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult
HAS_OPENTELEMETRY = True
except ImportError:
HAS_OPENTELEMETRY = False
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.tests.utils import SleepFrame, run_test
from pipecat.utils.tracing.tracing_context import TracingContext
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
if HAS_OPENTELEMETRY:

    class _InMemorySpanExporter(SpanExporter):
        """Span exporter that keeps every exported span in a list for tests.

        All access to the backing list happens while holding a lock.
        """

        def __init__(self):
            """Create an empty span store and the lock that guards it."""
            self._lock = threading.Lock()
            self._spans = []

        def export(self, spans):
            """Append the given spans to the store and report success."""
            self._lock.acquire()
            try:
                self._spans += spans
            finally:
                self._lock.release()
            return SpanExportResult.SUCCESS

        def get_finished_spans(self):
            """Return a new list holding the spans collected so far."""
            with self._lock:
                snapshot = self._spans[:]
            return snapshot

        def clear(self):
            """Remove every collected span from the store."""
            with self._lock:
                del self._spans[:]
@unittest.skipUnless(HAS_OPENTELEMETRY, "opentelemetry not installed")
class TestTurnTraceObserver(unittest.IsolatedAsyncioTestCase):
    """Tests for TurnTraceObserver.

    Each test drives an IdentityFilter pipeline through ``run_test`` with a
    TurnTrackingObserver, a UserBotLatencyObserver and the TurnTraceObserver
    under test attached, then inspects the spans captured by an in-memory
    exporter.
    """

    # Frame types expected downstream for one complete user/bot exchange.
    # Mirrors the frames sent by _single_turn_frames(), minus the SleepFrame.
    _SINGLE_TURN_EXPECTED = (
        UserStartedSpeakingFrame,
        UserStoppedSpeakingFrame,
        BotStartedSpeakingFrame,
        BotStoppedSpeakingFrame,
    )

    def setUp(self):
        """Set up a fresh provider and exporter for each test.

        We create a dedicated TracerProvider per test and inject its tracer
        directly into the observer, avoiding the global provider singleton.
        """
        self._exporter = _InMemorySpanExporter()
        self._provider = TracerProvider()
        self._provider.add_span_processor(SimpleSpanProcessor(self._exporter))
        self._tracer = self._provider.get_tracer("pipecat.turn")

    def tearDown(self):
        """Shut down the provider to flush spans."""
        self._provider.shutdown()

    def _create_observers(self, conversation_id=None, tracing_context=None, **observer_kwargs):
        """Create a standard set of turn/trace observers.

        Args:
            conversation_id: Optional conversation ID.
            tracing_context: Optional TracingContext instance. A new one is
                created when omitted.
            **observer_kwargs: Extra keyword arguments forwarded unchanged to
                the TurnTraceObserver constructor (for example
                ``additional_span_attributes``).

        Returns:
            Tuple of (turn_tracker, latency_tracker, trace_observer, tracing_context).
        """
        if tracing_context is None:
            tracing_context = TracingContext()
        turn_tracker = TurnTrackingObserver(turn_end_timeout_secs=0.2)
        latency_tracker = UserBotLatencyObserver()
        trace_observer = TurnTraceObserver(
            turn_tracker,
            latency_tracker=latency_tracker,
            conversation_id=conversation_id,
            tracing_context=tracing_context,
            **observer_kwargs,
        )
        # Inject the test tracer so spans go to our in-memory exporter
        trace_observer._tracer = self._tracer
        return turn_tracker, latency_tracker, trace_observer, tracing_context

    def _all_observers(self, trace_observer):
        """Return the list of observers needed for run_test."""
        return [trace_observer._turn_tracker, trace_observer._latency_tracker, trace_observer]

    def _get_spans_by_name(self, name):
        """Return finished spans with the given name."""
        return [s for s in self._exporter.get_finished_spans() if s.name == name]

    @staticmethod
    def _single_turn_frames(settle_secs=0.4):
        """Build fresh frames for one complete user/bot exchange.

        New frame instances are created on every call so that no frame object
        is ever shared between two pipeline runs.

        Args:
            settle_secs: Duration of the trailing SleepFrame. The default of
                0.4s is longer than the 0.2s ``turn_end_timeout_secs`` used by
                ``_create_observers`` (presumably so the turn-end timeout can
                fire before the pipeline finishes).

        Returns:
            List of frames to pass as ``frames_to_send``.
        """
        return [
            UserStartedSpeakingFrame(),
            UserStoppedSpeakingFrame(),
            BotStartedSpeakingFrame(),
            BotStoppedSpeakingFrame(),
            SleepFrame(sleep=settle_secs),
        ]

    async def _run_single_turn(self, trace_observer):
        """Run one complete user/bot exchange through a new IdentityFilter.

        Args:
            trace_observer: The TurnTraceObserver whose observer set is
                attached to the pipeline.
        """
        await run_test(
            IdentityFilter(),
            frames_to_send=self._single_turn_frames(),
            expected_down_frames=list(self._SINGLE_TURN_EXPECTED),
            observers=self._all_observers(trace_observer),
        )

    async def test_conversation_span_created_on_start_frame(self):
        """Test that a conversation span is created when StartFrame is observed."""
        _, _, trace_observer, _ = self._create_observers(conversation_id="test-conv")

        await self._run_single_turn(trace_observer)

        # End conversation to flush the conversation span (normally done by PipelineTask._cleanup)
        trace_observer.end_conversation_tracing()

        conv_spans = self._get_spans_by_name("conversation")
        self.assertEqual(len(conv_spans), 1)
        self.assertEqual(conv_spans[0].attributes["conversation.id"], "test-conv")
        self.assertEqual(conv_spans[0].attributes["conversation.type"], "voice")

    async def test_turn_spans_created_for_each_turn(self):
        """Test that a turn span is created for each conversation turn."""
        _, _, trace_observer, _ = self._create_observers()

        # Turn 1 is followed by a short pause, turn 2 by the full settle delay.
        frames_to_send = self._single_turn_frames(settle_secs=0.05) + self._single_turn_frames()
        expected_down_frames = list(self._SINGLE_TURN_EXPECTED) * 2

        await run_test(
            IdentityFilter(),
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
            observers=self._all_observers(trace_observer),
        )

        turn_spans = self._get_spans_by_name("turn")
        self.assertEqual(len(turn_spans), 2)

        turn_numbers = {s.attributes["turn.number"] for s in turn_spans}
        self.assertEqual(turn_numbers, {1, 2})

    async def test_turn_spans_are_children_of_conversation(self):
        """Test that turn spans are parented under the conversation span."""
        _, _, trace_observer, _ = self._create_observers()

        await self._run_single_turn(trace_observer)

        # End conversation to flush the conversation span
        trace_observer.end_conversation_tracing()

        conv_spans = self._get_spans_by_name("conversation")
        turn_spans = self._get_spans_by_name("turn")
        self.assertEqual(len(conv_spans), 1)
        self.assertEqual(len(turn_spans), 1)

        # Turn span's parent should be the conversation span
        conv_span_id = conv_spans[0].context.span_id
        turn_parent_id = turn_spans[0].parent.span_id
        self.assertEqual(turn_parent_id, conv_span_id)

    async def test_interrupted_turn_marked(self):
        """Test that an interrupted turn span has was_interrupted=True."""
        _, _, trace_observer, _ = self._create_observers()

        frames_to_send = [
            UserStartedSpeakingFrame(),
            UserStoppedSpeakingFrame(),
            BotStartedSpeakingFrame(),
            # User interrupts
            UserStartedSpeakingFrame(),
            SleepFrame(sleep=0.4),
        ]
        expected_down_frames = [
            UserStartedSpeakingFrame,
            UserStoppedSpeakingFrame,
            BotStartedSpeakingFrame,
            UserStartedSpeakingFrame,
        ]

        await run_test(
            IdentityFilter(),
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
            observers=self._all_observers(trace_observer),
        )

        # End conversation to flush remaining spans
        trace_observer.end_conversation_tracing()

        turn_spans = self._get_spans_by_name("turn")
        self.assertGreaterEqual(len(turn_spans), 1)

        # The first turn specifically must be the interrupted one. Checking
        # only that "some" turn is interrupted is not enough: a turn that is
        # force-closed by end_conversation_tracing() is also flagged as
        # interrupted and would satisfy that weaker check.
        spans_by_number = {s.attributes.get("turn.number"): s for s in turn_spans}
        self.assertIn(1, spans_by_number)
        self.assertTrue(spans_by_number[1].attributes.get("turn.was_interrupted"))

    async def test_tracing_context_updated_during_turn(self):
        """Test that the turn context is cleared once the turn has ended.

        Only the state after the turn is inspected here; the value held by the
        TracingContext while the turn is in progress is not observed.
        """
        tracing_ctx = TracingContext()
        _, _, trace_observer, _ = self._create_observers(tracing_context=tracing_ctx)

        await self._run_single_turn(trace_observer)

        # After the turn ends, turn context should be cleared
        self.assertIsNone(tracing_ctx.get_turn_context())

    async def test_tracing_context_cleared_after_conversation_end(self):
        """Test that TracingContext is cleared when conversation tracing ends."""
        tracing_ctx = TracingContext()
        _, _, trace_observer, _ = self._create_observers(tracing_context=tracing_ctx)

        await self._run_single_turn(trace_observer)

        # Manually end conversation tracing (as PipelineTask._cleanup does)
        trace_observer.end_conversation_tracing()

        self.assertIsNone(tracing_ctx.get_conversation_context())
        self.assertIsNone(tracing_ctx.get_turn_context())
        self.assertIsNone(tracing_ctx.conversation_id)

    async def test_additional_span_attributes(self):
        """Test that additional span attributes are added to the conversation span."""
        extra_attrs = {"deployment.id": "abc-123", "customer.tier": "premium"}
        _, _, trace_observer, _ = self._create_observers(
            tracing_context=TracingContext(),
            additional_span_attributes=extra_attrs,
        )

        await self._run_single_turn(trace_observer)

        # End conversation to flush the conversation span
        trace_observer.end_conversation_tracing()

        conv_spans = self._get_spans_by_name("conversation")
        self.assertEqual(len(conv_spans), 1)
        self.assertEqual(conv_spans[0].attributes["deployment.id"], "abc-123")
        self.assertEqual(conv_spans[0].attributes["customer.tier"], "premium")

    async def test_concurrent_pipelines_are_isolated(self):
        """Test that two pipelines with separate TracingContexts don't interfere."""
        tracing_ctx_a = TracingContext()
        tracing_ctx_b = TracingContext()

        _, _, trace_observer_a, _ = self._create_observers(
            conversation_id="conv-a", tracing_context=tracing_ctx_a
        )
        _, _, trace_observer_b, _ = self._create_observers(
            conversation_id="conv-b", tracing_context=tracing_ctx_b
        )

        # Run both pipelines concurrently. Each call builds its own processor
        # and its own frame instances, so the pipelines share nothing but the
        # tracer/exporter.
        await asyncio.gather(
            self._run_single_turn(trace_observer_a),
            self._run_single_turn(trace_observer_b),
        )

        # End both conversations to flush spans
        trace_observer_a.end_conversation_tracing()
        trace_observer_b.end_conversation_tracing()

        # Each TracingContext should have its own conversation ID
        conv_spans = self._get_spans_by_name("conversation")
        self.assertEqual(len(conv_spans), 2)
        conv_ids = {s.attributes["conversation.id"] for s in conv_spans}
        self.assertEqual(conv_ids, {"conv-a", "conv-b"})

        # One turn per pipeline; asserting the count keeps the parent check
        # below from passing vacuously when no turn spans were exported.
        turn_spans = self._get_spans_by_name("turn")
        self.assertEqual(len(turn_spans), 2)

        # Turn spans should be children of their own conversation span, not cross-linked
        conv_span_map = {s.context.span_id: s.attributes["conversation.id"] for s in conv_spans}
        for turn_span in turn_spans:
            parent_id = turn_span.parent.span_id
            self.assertIn(parent_id, conv_span_map)
            turn_conv_id = turn_span.attributes["conversation.id"]
            parent_conv_id = conv_span_map[parent_id]
            self.assertEqual(
                turn_conv_id,
                parent_conv_id,
                f"Turn span for {turn_conv_id} parented under {parent_conv_id}",
            )

    async def test_end_conversation_closes_active_turn(self):
        """Test that end_conversation_tracing closes any active turn span."""
        _, _, trace_observer, _ = self._create_observers()

        # Manually start conversation and a turn
        trace_observer.start_conversation_tracing("conv-end-test")
        await trace_observer._handle_turn_started(1)

        self.assertIsNotNone(trace_observer._current_span)
        self.assertIsNotNone(trace_observer._conversation_span)

        # End conversation — should close both turn and conversation
        trace_observer.end_conversation_tracing()

        self.assertIsNone(trace_observer._current_span)
        self.assertIsNone(trace_observer._conversation_span)

        # Check span attributes
        turn_spans = self._get_spans_by_name("turn")
        self.assertEqual(len(turn_spans), 1)
        self.assertTrue(turn_spans[0].attributes["turn.was_interrupted"])
        self.assertTrue(turn_spans[0].attributes["turn.ended_by_conversation_end"])

    async def test_conversation_id_auto_generated(self):
        """Test that a conversation ID is auto-generated when none is provided."""
        _, _, trace_observer, _ = self._create_observers(conversation_id=None)

        await self._run_single_turn(trace_observer)

        # End conversation to flush the conversation span
        trace_observer.end_conversation_tracing()

        conv_spans = self._get_spans_by_name("conversation")
        self.assertEqual(len(conv_spans), 1)

        # Should have an auto-generated UUID as conversation.id
        conv_id = conv_spans[0].attributes["conversation.id"]
        self.assertIsNotNone(conv_id)
        self.assertGreater(len(conv_id), 0)
# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
unittest.main()