diff --git a/CHANGELOG.md b/CHANGELOG.md index 7536b20de..5a0b6966d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed an issue in `MiniMaxHttpTTSService` where the `pitch` param was the incorrect type. +- Fixed an issue with OpenTelemetry tracing where the `enable_tracing` flag did + not disable the internal tracing decorator functions. + - Fixed an issue in `OLLamaLLMService` where kwargs were not passed correctly to the parent class. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 9d73b44ff..e75a56fd9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -614,6 +614,7 @@ class StartFrame(SystemFrame): audio_out_sample_rate: Output audio sample rate in Hz. allow_interruptions: Whether to allow user interruptions. enable_metrics: Whether to enable performance metrics collection. + enable_tracing: Whether to enable OpenTelemetry tracing. enable_usage_metrics: Whether to enable usage metrics collection. interruption_strategies: List of interruption handling strategies. report_only_initial_ttfb: Whether to report only initial time-to-first-byte. @@ -623,6 +624,7 @@ class StartFrame(SystemFrame): audio_out_sample_rate: int = 24000 allow_interruptions: bool = False enable_metrics: bool = False + enable_tracing: bool = False enable_usage_metrics: bool = False interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list) report_only_initial_ttfb: bool = False diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 5ddd97628..a3dfbab24 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -638,6 +638,7 @@ class PipelineTask(BasePipelineTask): audio_in_sample_rate=self._params.audio_in_sample_rate, audio_out_sample_rate=self._params.audio_out_sample_rate, enable_metrics=self._params.enable_metrics, + enable_tracing=self._enable_tracing, enable_usage_metrics=self._params.enable_usage_metrics, report_only_initial_ttfb=self._params.report_only_initial_ttfb, interruption_strategies=self._params.interruption_strategies, diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index cc4cd3916..f3e2b843a 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -176,6 +176,7 @@ class LLMService(AIService): self._functions: Dict[Optional[str], FunctionCallRegistryItem] = {} self._function_call_tasks: Dict[asyncio.Task, FunctionCallRunnerItem] = {} self._sequential_runner_task: Optional[asyncio.Task] = None + self._tracing_enabled: bool = False self._register_event_handler("on_function_calls_started") self._register_event_handler("on_completion_timeout") @@ -218,6 +219,7 @@ class LLMService(AIService): await super().start(frame) if not self._run_in_parallel: await self._create_sequential_runner_task() + self._tracing_enabled = frame.enable_tracing async def stop(self, frame: EndFrame): """Stop the LLM service. diff --git a/src/pipecat/services/stt_service.py b/src/pipecat/services/stt_service.py index 104be4810..62d72da53 100644 --- a/src/pipecat/services/stt_service.py +++ b/src/pipecat/services/stt_service.py @@ -56,6 +56,7 @@ class STTService(AIService): self._init_sample_rate = sample_rate self._sample_rate = 0 self._settings: Dict[str, Any] = {} + self._tracing_enabled: bool = False self._muted: bool = False self._user_id: str = "" @@ -116,6 +117,7 @@ class STTService(AIService): """ await super().start(frame) self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate + self._tracing_enabled = frame.enable_tracing async def _update_settings(self, settings: Mapping[str, Any]): logger.info(f"Updating STT settings: {self._settings}") diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 62fcdd4e6..bcb485d33 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -116,6 +116,7 @@ class TTSService(AIService): self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator() self._text_filters: Sequence[BaseTextFilter] = text_filters or [] self._transport_destination: Optional[str] = transport_destination + self._tracing_enabled: bool = False if text_filter: import warnings @@ -224,6 +225,7 @@ class TTSService(AIService): self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate if self._push_stop_frames and not self._stop_frame_task: self._stop_frame_task = self.create_task(self._stop_frame_handler()) + self._tracing_enabled = frame.enable_tracing async def stop(self, frame: EndFrame): """Stop the TTS service. diff --git a/src/pipecat/utils/tracing/service_decorators.py b/src/pipecat/utils/tracing/service_decorators.py index c5612d2b2..2edec9862 100644 --- a/src/pipecat/utils/tracing/service_decorators.py +++ b/src/pipecat/utils/tracing/service_decorators.py @@ -134,7 +134,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - Yields: The active span for the TTS operation. """ - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): yield None return @@ -178,7 +179,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def gen_wrapper(self, text, *args, **kwargs): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): async for item in f(self, text, *args, **kwargs): yield item return @@ -198,7 +200,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def wrapper(self, text, *args, **kwargs): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): return await f(self, text, *args, **kwargs) async with tracing_context(self, text): @@ -239,7 +242,8 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def wrapper(self, transcript, is_final, language=None): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): return await f(self, transcript, is_final, language) service_class_name = self.__class__.__name__ @@ -320,7 +324,8 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) - @functools.wraps(f) async def wrapper(self, context, *args, **kwargs): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): return await f(self, context, *args, **kwargs) service_class_name = self.__class__.__name__ @@ -522,7 +527,8 @@ def traced_gemini_live(operation: str) -> Callable: @functools.wraps(func) async def wrapper(self, *args, **kwargs): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): return await func(self, *args, **kwargs) service_class_name = self.__class__.__name__ @@ -826,7 +832,8 @@ def traced_openai_realtime(operation: str) -> Callable: @functools.wraps(func) async def wrapper(self, *args, **kwargs): try: - if not is_tracing_available(): + # Check if tracing is enabled for this service instance + if not getattr(self, "_tracing_enabled", False): return await func(self, *args, **kwargs) service_class_name = self.__class__.__name__