Merge pull request #2233 from pipecat-ai/mb/enable-tracing-flag

fix: enable_tracing PipelineParam controls the service class decorators
This commit is contained in:
Mark Backman
2025-07-22 08:14:32 -07:00
committed by GitHub
7 changed files with 26 additions and 7 deletions

View File

@@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue in `MiniMaxHttpTTSService` where the `pitch` param was the
incorrect type.
- Fixed an issue with OpenTelemetry tracing where the `enable_tracing` flag did
not disable the internal tracing decorator functions.
- Fixed an issue in `OLLamaLLMService` where kwargs were not passed correctly
to the parent class.

View File

@@ -614,6 +614,7 @@ class StartFrame(SystemFrame):
audio_out_sample_rate: Output audio sample rate in Hz.
allow_interruptions: Whether to allow user interruptions.
enable_metrics: Whether to enable performance metrics collection.
enable_tracing: Whether to enable OpenTelemetry tracing.
enable_usage_metrics: Whether to enable usage metrics collection.
interruption_strategies: List of interruption handling strategies.
report_only_initial_ttfb: Whether to report only initial time-to-first-byte.
@@ -623,6 +624,7 @@ class StartFrame(SystemFrame):
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_tracing: bool = False
enable_usage_metrics: bool = False
interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list)
report_only_initial_ttfb: bool = False

View File

@@ -638,6 +638,7 @@ class PipelineTask(BasePipelineTask):
audio_in_sample_rate=self._params.audio_in_sample_rate,
audio_out_sample_rate=self._params.audio_out_sample_rate,
enable_metrics=self._params.enable_metrics,
enable_tracing=self._enable_tracing,
enable_usage_metrics=self._params.enable_usage_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
interruption_strategies=self._params.interruption_strategies,

View File

@@ -176,6 +176,7 @@ class LLMService(AIService):
self._functions: Dict[Optional[str], FunctionCallRegistryItem] = {}
self._function_call_tasks: Dict[asyncio.Task, FunctionCallRunnerItem] = {}
self._sequential_runner_task: Optional[asyncio.Task] = None
self._tracing_enabled: bool = False
self._register_event_handler("on_function_calls_started")
self._register_event_handler("on_completion_timeout")
@@ -218,6 +219,7 @@ class LLMService(AIService):
await super().start(frame)
if not self._run_in_parallel:
await self._create_sequential_runner_task()
self._tracing_enabled = frame.enable_tracing
async def stop(self, frame: EndFrame):
"""Stop the LLM service.

View File

@@ -56,6 +56,7 @@ class STTService(AIService):
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._settings: Dict[str, Any] = {}
self._tracing_enabled: bool = False
self._muted: bool = False
self._user_id: str = ""
@@ -116,6 +117,7 @@ class STTService(AIService):
"""
await super().start(frame)
self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate
self._tracing_enabled = frame.enable_tracing
async def _update_settings(self, settings: Mapping[str, Any]):
logger.info(f"Updating STT settings: {self._settings}")

View File

@@ -116,6 +116,7 @@ class TTSService(AIService):
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
self._text_filters: Sequence[BaseTextFilter] = text_filters or []
self._transport_destination: Optional[str] = transport_destination
self._tracing_enabled: bool = False
if text_filter:
import warnings
@@ -224,6 +225,7 @@ class TTSService(AIService):
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
if self._push_stop_frames and not self._stop_frame_task:
self._stop_frame_task = self.create_task(self._stop_frame_handler())
self._tracing_enabled = frame.enable_tracing
async def stop(self, frame: EndFrame):
"""Stop the TTS service.

View File

@@ -134,7 +134,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
Yields:
The active span for the TTS operation.
"""
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
yield None
return
@@ -178,7 +179,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def gen_wrapper(self, text, *args, **kwargs):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
async for item in f(self, text, *args, **kwargs):
yield item
return
@@ -198,7 +200,8 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def wrapper(self, text, *args, **kwargs):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, text, *args, **kwargs)
async with tracing_context(self, text):
@@ -239,7 +242,8 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def wrapper(self, transcript, is_final, language=None):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, transcript, is_final, language)
service_class_name = self.__class__.__name__
@@ -320,7 +324,8 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
@functools.wraps(f)
async def wrapper(self, context, *args, **kwargs):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await f(self, context, *args, **kwargs)
service_class_name = self.__class__.__name__
@@ -522,7 +527,8 @@ def traced_gemini_live(operation: str) -> Callable:
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
service_class_name = self.__class__.__name__
@@ -826,7 +832,8 @@ def traced_openai_realtime(operation: str) -> Callable:
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
try:
if not is_tracing_available():
# Check if tracing is enabled for this service instance
if not getattr(self, "_tracing_enabled", False):
return await func(self, *args, **kwargs)
service_class_name = self.__class__.__name__