diff --git a/examples/open-telemetry-tracing-langfuse/README.md b/examples/open-telemetry-tracing-langfuse/README.md new file mode 100644 index 000000000..266c62757 --- /dev/null +++ b/examples/open-telemetry-tracing-langfuse/README.md @@ -0,0 +1,140 @@ +# Langfuse Tracing for Pipecat via OpenTelemetry + +This demo showcases [Langfuse](https://langfuse.com) tracing integration for Pipecat services via OpenTelemetry, allowing you to visualize service calls, performance metrics, and dependencies. + +This is a fork of the [OpenTelemetry Tracing for Pipecat](../open-telemetry-tracing) demo, but uses Langfuse instead of Jaeger. In contrast to the original demo, this demo uses the `opentelemetry-exporter-otlp-proto-http` exporter as the `grpc` exporter is not supported by Langfuse. + +Pipecat trace in Langfuse: + +https://github.com/user-attachments/assets/13dd7431-bf5e-42e3-8d6d-2ed84c51195d + +## Features + +- **Hierarchical Tracing**: Track entire conversations, turns, and service calls +- **Service Tracing**: Detailed spans for TTS, STT, and LLM services with rich context +- **TTFB Metrics**: Capture Time To First Byte metrics for latency analysis +- **Usage Statistics**: Track character counts for TTS and token usage for LLMs + +## Trace Structure + +Traces are organized hierarchically: + +``` +Conversation (conversation-uuid) +├── turn-1 +│ ├── stt_deepgramsttservice +│ ├── llm_openaillmservice +│ └── tts_cartesiattsservice +└── turn-2 + ├── stt_deepgramsttservice + ├── llm_openaillmservice + └── tts_cartesiattsservice + turn-N + └── ... +``` + +This organization helps you track conversation-to-conversation and turn-to-turn. + +## Setup Instructions + +### 1. Create a Langfuse Project and get API keys + +[Self-host](https://langfuse.com/self-hosting) Langfuse or create a free [Langfuse Cloud](https://cloud.langfuse.com) account. +Create a new project and get the API keys. + +### 2. Environment Configuration + +Base64 encode your Langfuse public and secret key: + +```bash +echo -n "pk-lf-1234567890:sk-lf-1234567890" | base64 +``` + +Create a `.env` file with your API keys to enable tracing: + +``` +ENABLE_TRACING=true +# OTLP endpoint (defaults to localhost:4317 if not set) +OTEL_EXPORTER_OTLP_ENDPOINT=http://cloud.langfuse.com/api/public/otel +OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20 +# Set to any value to enable console output for debugging +# OTEL_CONSOLE_EXPORT=true +``` + +### 3. Configure Your Pipeline Task + +Enable tracing in your Pipecat application: + +```python +# Initialize OpenTelemetry with your chosen exporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +# Configured automatically from .env +exporter = OTLPSpanExporter() + +setup_tracing( + service_name="pipecat-demo", + exporter=exporter, + console_export=os.getenv("OTEL_CONSOLE_EXPORT", "false").lower() == "true", +) + +# Enable tracing in your PipelineTask +task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, # Required for some service metrics + ), + enable_tracing=True, # Enables both turn and conversation tracing + conversation_id="customer-123", # Optional - will auto-generate if not provided +) +``` + +### 4. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 5. Run the Demo + +```bash +python bot.py +``` + +### 6. View Traces in Langfuse + +Open your browser to [https://cloud.langfuse.com](https://cloud.langfuse.com) to view traces. + +## Understanding the Traces + +- **Conversation Spans**: The top-level span representing an entire conversation +- **Turn Spans**: Child spans of conversations that represent each turn in the dialog +- **Service Spans**: Detailed service operations nested under turns +- **Service Attributes**: Each service includes rich context about its operation: + - **TTS**: Voice ID, character count, service type + - **STT**: Transcription text, language, model + - **LLM**: Messages, tokens used, model, service configuration +- **Metrics**: Performance data like `metrics.ttfb_ms` and processing durations + +## How It Works + +The tracing system consists of: + +1. **TurnTrackingObserver**: Detects conversation turns +2. **TurnTraceObserver**: Creates spans for turns and conversations +3. **Service Decorators**: `@traced_tts`, `@traced_stt`, `@traced_llm` for service-specific tracing +4. **Context Providers**: Share context between different parts of the pipeline + +## Troubleshooting + +- **No Traces in Langfuse**: Ensure that your credentials are correct and follow this [troubleshooting guide](https://langfuse.com/faq/all/missing-traces) +- **Debugging Traces**: Set `OTEL_CONSOLE_EXPORT=true` to print traces to the console for debugging +- **Missing Metrics**: Check that `enable_metrics=True` in PipelineParams +- **Connection Errors**: Verify network connectivity to Langfuse +- **Exporter Issues**: Try the Console exporter (`OTEL_CONSOLE_EXPORT=true`) to verify tracing works + +## References + +- [OpenTelemetry Python Documentation](https://opentelemetry-python.readthedocs.io/) +- [Langfuse OpenTelemetry Documentation](https://langfuse.com/docs/opentelemetry/get-started) diff --git a/examples/open-telemetry-tracing-langfuse/bot.py b/examples/open-telemetry-tracing-langfuse/bot.py new file mode 100644 index 000000000..f4d6d76ac --- /dev/null +++ b/examples/open-telemetry-tracing-langfuse/bot.py @@ -0,0 +1,156 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +from dotenv import load_dotenv +from loguru import logger +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import TTSSpeakFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.utils.tracing.setup import setup_tracing + +load_dotenv(override=True) + +IS_TRACING_ENABLED = bool(os.getenv("ENABLE_TRACING")) + +# Initialize tracing if enabled +if IS_TRACING_ENABLED: + # Create the exporter + otlp_exporter = OTLPSpanExporter() + + # Set up tracing with the exporter + setup_tracing( + service_name="pipecat-demo", + exporter=otlp_exporter, + console_export=bool(os.getenv("OTEL_CONSOLE_EXPORT")), + ) + logger.info("OpenTelemetry tracing initialized") + + +async def fetch_weather_from_api(params: FunctionCallParams): + await params.llm.push_frame(TTSSpeakFrame("Let me check on that.")) + await params.result_callback({"conditions": "nice", "temperature": "75"}) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), params=OpenAILLMService.InputParams(temperature=0.5) + ) + + # You can also register a function_name of None to get all functions + # sent to the same callback with an additional function_name parameter. + llm.register_function("get_current_weather", fetch_weather_from_api) + + weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the user's location.", + }, + }, + required=["location", "format"], + ) + tools = ToolsSchema(standard_tools=[weather_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + enable_tracing=IS_TRACING_ENABLED, + # Optionally, add a conversation ID to track the conversation + # conversation_id="8df26cc1-6db0-4a7a-9930-1e037c8f1fa2", + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/examples/open-telemetry-tracing-langfuse/env.example b/examples/open-telemetry-tracing-langfuse/env.example new file mode 100644 index 000000000..c86485674 --- /dev/null +++ b/examples/open-telemetry-tracing-langfuse/env.example @@ -0,0 +1,11 @@ +DEEPGRAM_API_KEY=your_deepgram_key +CARTESIA_API_KEY=your_cartesia_key +OPENAI_API_KEY=your_openai_key + +# Set to any value to enable tracing +ENABLE_TRACING=true +# OTLP endpoint (change to us.cloud.langfuse.com if you use the US data region) +OTEL_EXPORTER_OTLP_ENDPOINT=http://cloud.langfuse.com/api/public/otel +OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20 +# Set to any value to enable console output for debugging +# OTEL_CONSOLE_EXPORT=true \ No newline at end of file diff --git a/examples/open-telemetry-tracing-langfuse/requirements.txt b/examples/open-telemetry-tracing-langfuse/requirements.txt new file mode 100644 index 000000000..fe0f32468 --- /dev/null +++ b/examples/open-telemetry-tracing-langfuse/requirements.txt @@ -0,0 +1,6 @@ +fastapi +uvicorn +python-dotenv +pipecat-ai[webrtc,silero,cartesia,deepgram,openai,tracing] +pipecat-ai-small-webrtc-prebuilt +opentelemetry-exporter-otlp-proto-http \ No newline at end of file diff --git a/examples/open-telemetry-tracing-langfuse/run.py b/examples/open-telemetry-tracing-langfuse/run.py new file mode 100644 index 000000000..e7012c9e9 --- /dev/null +++ b/examples/open-telemetry-tracing-langfuse/run.py @@ -0,0 +1,205 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import asyncio +import importlib.util +import os +import sys +from contextlib import asynccontextmanager +from inspect import iscoroutinefunction, signature +from typing import Any, Callable, Dict, Optional, Tuple + +import uvicorn +from dotenv import load_dotenv +from fastapi import BackgroundTasks, FastAPI +from fastapi.responses import RedirectResponse +from loguru import logger +from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI + +from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection + +# Load environment variables +load_dotenv(override=True) + +app = FastAPI() + +# Store connections by pc_id +pcs_map: Dict[str, SmallWebRTCConnection] = {} + +ice_servers = [ + IceServer( + urls="stun:stun.l.google.com:19302", + ) +] + +# Mount the frontend at / +app.mount("/client", SmallWebRTCPrebuiltUI) + +# Store program arguments +args: argparse.Namespace = argparse.Namespace() + +# Store the bot module and function info +bot_module: Any = None +run_bot_func: Optional[Callable] = None +is_webrtc_bot: bool = True + + +def import_bot_file(file_path: str) -> Tuple[Any, Callable, bool]: + """Dynamically import the bot file and determine how to run it. + + Returns: + tuple: (module, run_function, is_webrtc_bot) + - module: The imported module + - run_function: Either run_bot or main function + - is_webrtc_bot: True if run_bot function exists and accepts a WebRTC connection + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"Bot file not found: {file_path}") + + # Extract module name without extension + module_name = os.path.splitext(os.path.basename(file_path))[0] + + # Load the module + spec = importlib.util.spec_from_file_location(module_name, file_path) + if not spec or not spec.loader: + raise ImportError(f"Could not load spec for {file_path}") + + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + + # Check for run_bot function first + if hasattr(module, "run_bot"): + run_func = module.run_bot + # Check if the function accepts a WebRTC connection + sig = signature(run_func) + is_webrtc = len(sig.parameters) > 0 + return module, run_func, is_webrtc + + # Fall back to main function + if hasattr(module, "main") and iscoroutinefunction(module.main): + return module, module.main, False + + raise AttributeError(f"No run_bot or async main function found in {file_path}") + + +@app.get("/", include_in_schema=False) +async def root_redirect(): + return RedirectResponse(url="/client/") + + +@app.post("/api/offer") +async def offer(request: dict, background_tasks: BackgroundTasks): + global run_bot_func, is_webrtc_bot + + if not run_bot_func: + raise RuntimeError("No bot file has been loaded") + + if not is_webrtc_bot: + return { + "error": "This bot doesn't support WebRTC connections, it's running in standalone mode" + } + + pc_id = request.get("pc_id") + + if pc_id and pc_id in pcs_map: + pipecat_connection = pcs_map[pc_id] + logger.info(f"Reusing existing connection for pc_id: {pc_id}") + await pipecat_connection.renegotiate( + sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False) + ) + else: + pipecat_connection = SmallWebRTCConnection(ice_servers) + await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"]) + + @pipecat_connection.event_handler("closed") + async def handle_disconnected(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}") + pcs_map.pop(webrtc_connection.pc_id, None) + + # We've already checked that run_bot_func exists + assert run_bot_func is not None + background_tasks.add_task(run_bot_func, pipecat_connection, args) + + answer = pipecat_connection.get_answer() + # Updating the peer connection inside the map + pcs_map[answer["pc_id"]] = pipecat_connection + + return answer + + +@asynccontextmanager +async def lifespan(app: FastAPI): + yield # Run app + coros = [pc.close() for pc in pcs_map.values()] + await asyncio.gather(*coros) + pcs_map.clear() + + +async def run_standalone_bot() -> None: + """Run a standalone bot that doesn't require WebRTC""" + global run_bot_func + if run_bot_func is not None: + await run_bot_func() + else: + raise RuntimeError("No bot function available to run") + + +def main(parser: Optional[argparse.ArgumentParser] = None): + global args + + if not parser: + parser = argparse.ArgumentParser(description="Pipecat Bot Runner") + parser.add_argument("bot_file", nargs="?", help="Path to the bot file", default=None) + parser.add_argument( + "--host", default="localhost", help="Host for HTTP server (default: localhost)" + ) + parser.add_argument( + "--port", type=int, default=7860, help="Port for HTTP server (default: 7860)" + ) + parser.add_argument("--verbose", "-v", action="count", default=0) + args = parser.parse_args() + + logger.remove(0) + if args.verbose: + logger.add(sys.stderr, level="TRACE") + else: + logger.add(sys.stderr, level="DEBUG") + + # Infer the bot file from the caller if not provided explicitly + bot_file = args.bot_file + if bot_file is None: + # Get the __file__ of the script that called main() + import inspect + + caller_frame = inspect.stack()[1] + caller_globals = caller_frame.frame.f_globals + bot_file = caller_globals.get("__file__") + + if not bot_file: + print("❌ Could not determine the bot file. Pass it explicitly to main().") + sys.exit(1) + + # Import the bot file + try: + global run_bot_func, bot_module, is_webrtc_bot + bot_module, run_bot_func, is_webrtc_bot = import_bot_file(bot_file) + logger.info(f"Successfully loaded bot from {bot_file}") + + if is_webrtc_bot: + logger.info("Detected WebRTC-compatible bot, starting web server...") + uvicorn.run(app, host=args.host, port=args.port) + else: + logger.info("Detected standalone bot, running directly...") + asyncio.run(run_standalone_bot()) + except Exception as e: + logger.error(f"Error loading bot file: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/open-telemetry-tracing/README.md b/examples/open-telemetry-tracing/README.md index 0635450f4..8695a8751 100644 --- a/examples/open-telemetry-tracing/README.md +++ b/examples/open-telemetry-tracing/README.md @@ -118,6 +118,13 @@ Many cloud providers offer OpenTelemetry-compatible observability services: See the OpenTelemetry documentation for specific exporter configurations: https://opentelemetry.io/ecosystem/vendors/ +#### LLM Tracing and Evaluation Providers + +Many LLM-focused tracing and evaluation projects support OpenTelemetry, for example: + +- Langfuse ([integration example](../open-telemetry-tracing-langfuse/)) +- Arize Phoenix + ### 5. Install Dependencies ```bash