From 71a752c971d9ef2231b4f3820df8d55d3b7a683d Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 11 Feb 2026 22:42:58 -0500 Subject: [PATCH] Add tests for TracingContext and TurnTraceObserver Cover pipeline-scoped tracing context lifecycle, span hierarchy, conversation/turn context management, and concurrent pipeline isolation. --- .github/workflows/coverage.yaml | 1 + .github/workflows/tests.yaml | 1 + tests/test_tracing_context.py | 127 ++++++++ tests/test_turn_trace_observer.py | 505 ++++++++++++++++++++++++++++++ 4 files changed, 634 insertions(+) create mode 100644 tests/test_tracing_context.py create mode 100644 tests/test_turn_trace_observer.py diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml index 852611eba..b78067c97 100644 --- a/.github/workflows/coverage.yaml +++ b/.github/workflows/coverage.yaml @@ -41,6 +41,7 @@ jobs: --extra livekit \ --extra local-smart-turn-v3 \ --extra piper \ + --extra tracing \ --extra websocket - name: Run tests with coverage diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 54495911b..5bdfb94f4 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -45,6 +45,7 @@ jobs: --extra livekit \ --extra local-smart-turn-v3 \ --extra piper \ + --extra tracing \ --extra websocket - name: Test with pytest diff --git a/tests/test_tracing_context.py b/tests/test_tracing_context.py new file mode 100644 index 000000000..06aa34683 --- /dev/null +++ b/tests/test_tracing_context.py @@ -0,0 +1,127 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import unittest + +try: + from opentelemetry.sdk.trace import TracerProvider + + HAS_OPENTELEMETRY = True +except ImportError: + HAS_OPENTELEMETRY = False + +from pipecat.utils.tracing.tracing_context import TracingContext + + +@unittest.skipUnless(HAS_OPENTELEMETRY, "opentelemetry not installed") +class TestTracingContext(unittest.TestCase): + """Tests for TracingContext.""" + + @classmethod + def setUpClass(cls): + """Set up a tracer provider for generating span contexts.""" + cls._provider = TracerProvider() + cls._tracer = cls._provider.get_tracer("test") + + def test_initial_state_is_empty(self): + """Test that a new TracingContext starts with no context set.""" + ctx = TracingContext() + self.assertIsNone(ctx.get_conversation_context()) + self.assertIsNone(ctx.get_turn_context()) + self.assertIsNone(ctx.conversation_id) + + def test_set_and_get_conversation_context(self): + """Test setting and retrieving conversation context.""" + ctx = TracingContext() + span = self._tracer.start_span("conv") + span_context = span.get_span_context() + + ctx.set_conversation_context(span_context, "conv-123") + + self.assertIsNotNone(ctx.get_conversation_context()) + self.assertEqual(ctx.conversation_id, "conv-123") + span.end() + + def test_clear_conversation_context(self): + """Test clearing conversation context by passing None.""" + ctx = TracingContext() + span = self._tracer.start_span("conv") + + ctx.set_conversation_context(span.get_span_context(), "conv-123") + self.assertIsNotNone(ctx.get_conversation_context()) + + ctx.set_conversation_context(None) + self.assertIsNone(ctx.get_conversation_context()) + self.assertIsNone(ctx.conversation_id) + span.end() + + def test_set_and_get_turn_context(self): + """Test setting and retrieving turn context.""" + ctx = TracingContext() + span = self._tracer.start_span("turn") + span_context = span.get_span_context() + + ctx.set_turn_context(span_context) + + self.assertIsNotNone(ctx.get_turn_context()) + span.end() + + def test_clear_turn_context(self): + """Test clearing turn context by passing None.""" + ctx = TracingContext() + span = self._tracer.start_span("turn") + + ctx.set_turn_context(span.get_span_context()) + self.assertIsNotNone(ctx.get_turn_context()) + + ctx.set_turn_context(None) + self.assertIsNone(ctx.get_turn_context()) + span.end() + + def test_generate_conversation_id(self): + """Test that generated conversation IDs are unique UUIDs.""" + id1 = TracingContext.generate_conversation_id() + id2 = TracingContext.generate_conversation_id() + self.assertIsInstance(id1, str) + self.assertNotEqual(id1, id2) + + def test_instances_are_isolated(self): + """Test that two TracingContext instances do not share state.""" + ctx_a = TracingContext() + ctx_b = TracingContext() + + span = self._tracer.start_span("turn") + + ctx_a.set_turn_context(span.get_span_context()) + ctx_a.set_conversation_context(span.get_span_context(), "conv-a") + + # ctx_b should still be empty + self.assertIsNone(ctx_b.get_turn_context()) + self.assertIsNone(ctx_b.get_conversation_context()) + self.assertIsNone(ctx_b.conversation_id) + span.end() + + def test_conversation_and_turn_are_independent(self): + """Test that clearing turn context does not affect conversation context.""" + ctx = TracingContext() + conv_span = self._tracer.start_span("conv") + turn_span = self._tracer.start_span("turn") + + ctx.set_conversation_context(conv_span.get_span_context(), "conv-1") + ctx.set_turn_context(turn_span.get_span_context()) + + # Clear turn but conversation should remain + ctx.set_turn_context(None) + self.assertIsNone(ctx.get_turn_context()) + self.assertIsNotNone(ctx.get_conversation_context()) + self.assertEqual(ctx.conversation_id, "conv-1") + + conv_span.end() + turn_span.end() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_turn_trace_observer.py b/tests/test_turn_trace_observer.py new file mode 100644 index 000000000..41ff41b8b --- /dev/null +++ b/tests/test_turn_trace_observer.py @@ -0,0 +1,505 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import threading +import unittest + +try: + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult + + HAS_OPENTELEMETRY = True +except ImportError: + HAS_OPENTELEMETRY = False + +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.observers.turn_tracking_observer import TurnTrackingObserver +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver +from pipecat.processors.filters.identity_filter import IdentityFilter +from pipecat.tests.utils import SleepFrame, run_test +from pipecat.utils.tracing.tracing_context import TracingContext +from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver + +if HAS_OPENTELEMETRY: + + class _InMemorySpanExporter(SpanExporter): + """Simple in-memory span exporter for testing.""" + + def __init__(self): + """Initialize the exporter.""" + self._spans = [] + self._lock = threading.Lock() + + def export(self, spans): + """Export spans to memory.""" + with self._lock: + self._spans.extend(spans) + return SpanExportResult.SUCCESS + + def get_finished_spans(self): + """Return collected spans.""" + with self._lock: + return list(self._spans) + + def clear(self): + """Clear collected spans.""" + with self._lock: + self._spans.clear() + + +@unittest.skipUnless(HAS_OPENTELEMETRY, "opentelemetry not installed") +class TestTurnTraceObserver(unittest.IsolatedAsyncioTestCase): + """Tests for TurnTraceObserver.""" + + def setUp(self): + """Set up a fresh provider and exporter for each test. + + We create a dedicated TracerProvider per test and inject its tracer + directly into the observer, avoiding the global provider singleton. + """ + self._exporter = _InMemorySpanExporter() + self._provider = TracerProvider() + self._provider.add_span_processor(SimpleSpanProcessor(self._exporter)) + self._tracer = self._provider.get_tracer("pipecat.turn") + + def tearDown(self): + """Shut down the provider to flush spans.""" + self._provider.shutdown() + + def _create_observers(self, conversation_id=None, tracing_context=None): + """Create a standard set of turn/trace observers. + + Args: + conversation_id: Optional conversation ID. + tracing_context: Optional TracingContext instance. + + Returns: + Tuple of (turn_tracker, latency_tracker, trace_observer, tracing_context). + """ + tracing_context = tracing_context or TracingContext() + turn_tracker = TurnTrackingObserver(turn_end_timeout_secs=0.2) + latency_tracker = UserBotLatencyObserver() + trace_observer = TurnTraceObserver( + turn_tracker, + latency_tracker=latency_tracker, + conversation_id=conversation_id, + tracing_context=tracing_context, + ) + # Inject the test tracer so spans go to our in-memory exporter + trace_observer._tracer = self._tracer + return turn_tracker, latency_tracker, trace_observer, tracing_context + + def _all_observers(self, trace_observer): + """Return the list of observers needed for run_test.""" + return [trace_observer._turn_tracker, trace_observer._latency_tracker, trace_observer] + + def _get_spans_by_name(self, name): + """Return finished spans with the given name.""" + return [s for s in self._exporter.get_finished_spans() if s.name == name] + + async def test_conversation_span_created_on_start_frame(self): + """Test that a conversation span is created when StartFrame is observed.""" + _, _, trace_observer, _ = self._create_observers(conversation_id="test-conv") + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # End conversation to flush the conversation span (normally done by PipelineTask._cleanup) + trace_observer.end_conversation_tracing() + + conv_spans = self._get_spans_by_name("conversation") + self.assertEqual(len(conv_spans), 1) + self.assertEqual(conv_spans[0].attributes["conversation.id"], "test-conv") + self.assertEqual(conv_spans[0].attributes["conversation.type"], "voice") + + async def test_turn_spans_created_for_each_turn(self): + """Test that a turn span is created for each conversation turn.""" + _, _, trace_observer, _ = self._create_observers() + processor = IdentityFilter() + + frames_to_send = [ + # Turn 1 + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.05), + # Turn 2 + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + turn_spans = self._get_spans_by_name("turn") + self.assertEqual(len(turn_spans), 2) + turn_numbers = {s.attributes["turn.number"] for s in turn_spans} + self.assertEqual(turn_numbers, {1, 2}) + + async def test_turn_spans_are_children_of_conversation(self): + """Test that turn spans are parented under the conversation span.""" + _, _, trace_observer, _ = self._create_observers() + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # End conversation to flush the conversation span + trace_observer.end_conversation_tracing() + + conv_spans = self._get_spans_by_name("conversation") + turn_spans = self._get_spans_by_name("turn") + self.assertEqual(len(conv_spans), 1) + self.assertEqual(len(turn_spans), 1) + + # Turn span's parent should be the conversation span + conv_span_id = conv_spans[0].context.span_id + turn_parent_id = turn_spans[0].parent.span_id + self.assertEqual(turn_parent_id, conv_span_id) + + async def test_interrupted_turn_marked(self): + """Test that an interrupted turn span has was_interrupted=True.""" + _, _, trace_observer, _ = self._create_observers() + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + # User interrupts + UserStartedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + UserStartedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # End conversation to flush remaining spans + trace_observer.end_conversation_tracing() + + turn_spans = self._get_spans_by_name("turn") + self.assertGreaterEqual(len(turn_spans), 1) + # First turn should be interrupted + interrupted_turns = [s for s in turn_spans if s.attributes.get("turn.was_interrupted")] + self.assertGreaterEqual(len(interrupted_turns), 1) + + async def test_tracing_context_updated_during_turn(self): + """Test that TracingContext is populated during a turn and cleared after.""" + tracing_ctx = TracingContext() + _, _, trace_observer, _ = self._create_observers(tracing_context=tracing_ctx) + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # After the turn ends, turn context should be cleared + self.assertIsNone(tracing_ctx.get_turn_context()) + + async def test_tracing_context_cleared_after_conversation_end(self): + """Test that TracingContext is cleared when conversation tracing ends.""" + tracing_ctx = TracingContext() + _, _, trace_observer, _ = self._create_observers(tracing_context=tracing_ctx) + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # Manually end conversation tracing (as PipelineTask._cleanup does) + trace_observer.end_conversation_tracing() + + self.assertIsNone(tracing_ctx.get_conversation_context()) + self.assertIsNone(tracing_ctx.get_turn_context()) + self.assertIsNone(tracing_ctx.conversation_id) + + async def test_additional_span_attributes(self): + """Test that additional span attributes are added to the conversation span.""" + extra_attrs = {"deployment.id": "abc-123", "customer.tier": "premium"} + tracing_ctx = TracingContext() + turn_tracker = TurnTrackingObserver(turn_end_timeout_secs=0.2) + latency_tracker = UserBotLatencyObserver() + trace_observer = TurnTraceObserver( + turn_tracker, + latency_tracker=latency_tracker, + additional_span_attributes=extra_attrs, + tracing_context=tracing_ctx, + ) + trace_observer._tracer = self._tracer + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=[turn_tracker, latency_tracker, trace_observer], + ) + + # End conversation to flush the conversation span + trace_observer.end_conversation_tracing() + + conv_spans = self._get_spans_by_name("conversation") + self.assertEqual(len(conv_spans), 1) + self.assertEqual(conv_spans[0].attributes["deployment.id"], "abc-123") + self.assertEqual(conv_spans[0].attributes["customer.tier"], "premium") + + async def test_concurrent_pipelines_are_isolated(self): + """Test that two pipelines with separate TracingContexts don't interfere.""" + tracing_ctx_a = TracingContext() + tracing_ctx_b = TracingContext() + + _, _, trace_observer_a, _ = self._create_observers( + conversation_id="conv-a", tracing_context=tracing_ctx_a + ) + _, _, trace_observer_b, _ = self._create_observers( + conversation_id="conv-b", tracing_context=tracing_ctx_b + ) + + processor_a = IdentityFilter() + processor_b = IdentityFilter() + + frames = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + # Run both pipelines concurrently + await asyncio.gather( + run_test( + processor_a, + frames_to_send=frames, + expected_down_frames=expected, + observers=self._all_observers(trace_observer_a), + ), + run_test( + processor_b, + frames_to_send=frames, + expected_down_frames=expected, + observers=self._all_observers(trace_observer_b), + ), + ) + + # End both conversations to flush spans + trace_observer_a.end_conversation_tracing() + trace_observer_b.end_conversation_tracing() + + # Each TracingContext should have its own conversation ID + conv_spans = self._get_spans_by_name("conversation") + conv_ids = {s.attributes["conversation.id"] for s in conv_spans} + self.assertEqual(conv_ids, {"conv-a", "conv-b"}) + + # Turn spans should be children of their own conversation span, not cross-linked + turn_spans = self._get_spans_by_name("turn") + conv_span_map = {s.context.span_id: s.attributes["conversation.id"] for s in conv_spans} + for turn_span in turn_spans: + parent_id = turn_span.parent.span_id + turn_conv_id = turn_span.attributes["conversation.id"] + parent_conv_id = conv_span_map[parent_id] + self.assertEqual( + turn_conv_id, + parent_conv_id, + f"Turn span for {turn_conv_id} parented under {parent_conv_id}", + ) + + async def test_end_conversation_closes_active_turn(self): + """Test that end_conversation_tracing closes any active turn span.""" + _, _, trace_observer, _ = self._create_observers() + + # Manually start conversation and a turn + trace_observer.start_conversation_tracing("conv-end-test") + await trace_observer._handle_turn_started(1) + + self.assertIsNotNone(trace_observer._current_span) + self.assertIsNotNone(trace_observer._conversation_span) + + # End conversation — should close both turn and conversation + trace_observer.end_conversation_tracing() + + self.assertIsNone(trace_observer._current_span) + self.assertIsNone(trace_observer._conversation_span) + + # Check span attributes + turn_spans = self._get_spans_by_name("turn") + self.assertEqual(len(turn_spans), 1) + self.assertTrue(turn_spans[0].attributes["turn.was_interrupted"]) + self.assertTrue(turn_spans[0].attributes["turn.ended_by_conversation_end"]) + + async def test_conversation_id_auto_generated(self): + """Test that a conversation ID is auto-generated when none is provided.""" + _, _, trace_observer, _ = self._create_observers(conversation_id=None) + processor = IdentityFilter() + + frames_to_send = [ + UserStartedSpeakingFrame(), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + SleepFrame(sleep=0.4), + ] + + expected_down_frames = [ + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=self._all_observers(trace_observer), + ) + + # End conversation to flush the conversation span + trace_observer.end_conversation_tracing() + + conv_spans = self._get_spans_by_name("conversation") + self.assertEqual(len(conv_spans), 1) + # Should have an auto-generated UUID as conversation.id + conv_id = conv_spans[0].attributes["conversation.id"] + self.assertIsNotNone(conv_id) + self.assertGreater(len(conv_id), 0) + + +if __name__ == "__main__": + unittest.main()