Merge pull request #1835 from marcklingen/langfuse-tracing

Add examples/open-telemetry-tracing-langfuse
This commit is contained in:
Mark Backman
2025-05-20 18:22:55 -04:00
committed by GitHub
6 changed files with 525 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
# Langfuse Tracing for Pipecat via OpenTelemetry
This demo showcases [Langfuse](https://langfuse.com) tracing integration for Pipecat services via OpenTelemetry, allowing you to visualize service calls, performance metrics, and dependencies.
This is a fork of the [OpenTelemetry Tracing for Pipecat](../open-telemetry-tracing) demo, but uses Langfuse instead of Jaeger. In contrast to the original demo, this demo uses the `opentelemetry-exporter-otlp-proto-http` exporter as the `grpc` exporter is not supported by Langfuse.
Pipecat trace in Langfuse:
https://github.com/user-attachments/assets/13dd7431-bf5e-42e3-8d6d-2ed84c51195d
## Features
- **Hierarchical Tracing**: Track entire conversations, turns, and service calls
- **Service Tracing**: Detailed spans for TTS, STT, and LLM services with rich context
- **TTFB Metrics**: Capture Time To First Byte metrics for latency analysis
- **Usage Statistics**: Track character counts for TTS and token usage for LLMs
## Trace Structure
Traces are organized hierarchically:
```
Conversation (conversation-uuid)
├── turn-1
│ ├── stt_deepgramsttservice
│ ├── llm_openaillmservice
│ └── tts_cartesiattsservice
└── turn-2
├── stt_deepgramsttservice
├── llm_openaillmservice
└── tts_cartesiattsservice
turn-N
└── ...
```
This organization helps you track conversation-to-conversation and turn-to-turn.
## Setup Instructions
### 1. Create a Langfuse Project and get API keys
[Self-host](https://langfuse.com/self-hosting) Langfuse or create a free [Langfuse Cloud](https://cloud.langfuse.com) account.
Create a new project and get the API keys.
### 2. Environment Configuration
Base64 encode your Langfuse public and secret key:
```bash
echo -n "pk-lf-1234567890:sk-lf-1234567890" | base64
```
Create a `.env` file with your API keys to enable tracing:
```
ENABLE_TRACING=true
# OTLP endpoint (defaults to localhost:4317 if not set)
OTEL_EXPORTER_OTLP_ENDPOINT=http://cloud.langfuse.com/api/public/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20<base64_encoded_api_key>
# Set to any value to enable console output for debugging
# OTEL_CONSOLE_EXPORT=true
```
### 3. Configure Your Pipeline Task
Enable tracing in your Pipecat application:
```python
# Initialize OpenTelemetry with your chosen exporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# Configured automatically from .env
exporter = OTLPSpanExporter()
setup_tracing(
service_name="pipecat-demo",
exporter=exporter,
console_export=os.getenv("OTEL_CONSOLE_EXPORT", "false").lower() == "true",
)
# Enable tracing in your PipelineTask
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True, # Required for some service metrics
),
enable_tracing=True, # Enables both turn and conversation tracing
conversation_id="customer-123", # Optional - will auto-generate if not provided
)
```
### 4. Install Dependencies
```bash
pip install -r requirements.txt
```
### 5. Run the Demo
```bash
python bot.py
```
### 6. View Traces in Langfuse
Open your browser to [https://cloud.langfuse.com](https://cloud.langfuse.com) to view traces.
## Understanding the Traces
- **Conversation Spans**: The top-level span representing an entire conversation
- **Turn Spans**: Child spans of conversations that represent each turn in the dialog
- **Service Spans**: Detailed service operations nested under turns
- **Service Attributes**: Each service includes rich context about its operation:
- **TTS**: Voice ID, character count, service type
- **STT**: Transcription text, language, model
- **LLM**: Messages, tokens used, model, service configuration
- **Metrics**: Performance data like `metrics.ttfb_ms` and processing durations
## How It Works
The tracing system consists of:
1. **TurnTrackingObserver**: Detects conversation turns
2. **TurnTraceObserver**: Creates spans for turns and conversations
3. **Service Decorators**: `@traced_tts`, `@traced_stt`, `@traced_llm` for service-specific tracing
4. **Context Providers**: Share context between different parts of the pipeline
## Troubleshooting
- **No Traces in Langfuse**: Ensure that your credentials are correct and follow this [troubleshooting guide](https://langfuse.com/faq/all/missing-traces)
- **Debugging Traces**: Set `OTEL_CONSOLE_EXPORT=true` to print traces to the console for debugging
- **Missing Metrics**: Check that `enable_metrics=True` in PipelineParams
- **Connection Errors**: Verify network connectivity to Langfuse
- **Exporter Issues**: Try the Console exporter (`OTEL_CONSOLE_EXPORT=true`) to verify tracing works
## References
- [OpenTelemetry Python Documentation](https://opentelemetry-python.readthedocs.io/)
- [Langfuse OpenTelemetry Documentation](https://langfuse.com/docs/opentelemetry/get-started)

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.utils.tracing.setup import setup_tracing
load_dotenv(override=True)
IS_TRACING_ENABLED = bool(os.getenv("ENABLE_TRACING"))
# Initialize tracing if enabled
if IS_TRACING_ENABLED:
# Create the exporter
otlp_exporter = OTLPSpanExporter()
# Set up tracing with the exporter
setup_tracing(
service_name="pipecat-demo",
exporter=otlp_exporter,
console_export=bool(os.getenv("OTEL_CONSOLE_EXPORT")),
)
logger.info("OpenTelemetry tracing initialized")
async def fetch_weather_from_api(params: FunctionCallParams):
await params.llm.push_frame(TTSSpeakFrame("Let me check on that."))
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
logger.info(f"Starting bot")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"), params=OpenAILLMService.InputParams(temperature=0.5)
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
enable_tracing=IS_TRACING_ENABLED,
# Optionally, add a conversation ID to track the conversation
# conversation_id="8df26cc1-6db0-4a7a-9930-1e037c8f1fa2",
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,11 @@
DEEPGRAM_API_KEY=your_deepgram_key
CARTESIA_API_KEY=your_cartesia_key
OPENAI_API_KEY=your_openai_key
# Set to any value to enable tracing
ENABLE_TRACING=true
# OTLP endpoint (change to us.cloud.langfuse.com if you use the US data region)
OTEL_EXPORTER_OTLP_ENDPOINT=http://cloud.langfuse.com/api/public/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20<base64_encoded_api_keys>
# Set to any value to enable console output for debugging
# OTEL_CONSOLE_EXPORT=true

View File

@@ -0,0 +1,6 @@
fastapi
uvicorn
python-dotenv
pipecat-ai[webrtc,silero,cartesia,deepgram,openai,tracing]
pipecat-ai-small-webrtc-prebuilt
opentelemetry-exporter-otlp-proto-http

View File

@@ -0,0 +1,205 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import importlib.util
import os
import sys
from contextlib import asynccontextmanager
from inspect import iscoroutinefunction, signature
from typing import Any, Callable, Dict, Optional, Tuple
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)
# Store program arguments
args: argparse.Namespace = argparse.Namespace()
# Store the bot module and function info
bot_module: Any = None
run_bot_func: Optional[Callable] = None
is_webrtc_bot: bool = True
def import_bot_file(file_path: str) -> Tuple[Any, Callable, bool]:
"""Dynamically import the bot file and determine how to run it.
Returns:
tuple: (module, run_function, is_webrtc_bot)
- module: The imported module
- run_function: Either run_bot or main function
- is_webrtc_bot: True if run_bot function exists and accepts a WebRTC connection
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"Bot file not found: {file_path}")
# Extract module name without extension
module_name = os.path.splitext(os.path.basename(file_path))[0]
# Load the module
spec = importlib.util.spec_from_file_location(module_name, file_path)
if not spec or not spec.loader:
raise ImportError(f"Could not load spec for {file_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
# Check for run_bot function first
if hasattr(module, "run_bot"):
run_func = module.run_bot
# Check if the function accepts a WebRTC connection
sig = signature(run_func)
is_webrtc = len(sig.parameters) > 0
return module, run_func, is_webrtc
# Fall back to main function
if hasattr(module, "main") and iscoroutinefunction(module.main):
return module, module.main, False
raise AttributeError(f"No run_bot or async main function found in {file_path}")
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/client/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
global run_bot_func, is_webrtc_bot
if not run_bot_func:
raise RuntimeError("No bot file has been loaded")
if not is_webrtc_bot:
return {
"error": "This bot doesn't support WebRTC connections, it's running in standalone mode"
}
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False)
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
# We've already checked that run_bot_func exists
assert run_bot_func is not None
background_tasks.add_task(run_bot_func, pipecat_connection, args)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
async def run_standalone_bot() -> None:
"""Run a standalone bot that doesn't require WebRTC"""
global run_bot_func
if run_bot_func is not None:
await run_bot_func()
else:
raise RuntimeError("No bot function available to run")
def main(parser: Optional[argparse.ArgumentParser] = None):
global args
if not parser:
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("bot_file", nargs="?", help="Path to the bot file", default=None)
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count", default=0)
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
else:
logger.add(sys.stderr, level="DEBUG")
# Infer the bot file from the caller if not provided explicitly
bot_file = args.bot_file
if bot_file is None:
# Get the __file__ of the script that called main()
import inspect
caller_frame = inspect.stack()[1]
caller_globals = caller_frame.frame.f_globals
bot_file = caller_globals.get("__file__")
if not bot_file:
print("❌ Could not determine the bot file. Pass it explicitly to main().")
sys.exit(1)
# Import the bot file
try:
global run_bot_func, bot_module, is_webrtc_bot
bot_module, run_bot_func, is_webrtc_bot = import_bot_file(bot_file)
logger.info(f"Successfully loaded bot from {bot_file}")
if is_webrtc_bot:
logger.info("Detected WebRTC-compatible bot, starting web server...")
uvicorn.run(app, host=args.host, port=args.port)
else:
logger.info("Detected standalone bot, running directly...")
asyncio.run(run_standalone_bot())
except Exception as e:
logger.error(f"Error loading bot file: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -118,6 +118,13 @@ Many cloud providers offer OpenTelemetry-compatible observability services:
See the OpenTelemetry documentation for specific exporter configurations:
https://opentelemetry.io/ecosystem/vendors/
#### LLM Tracing and Evaluation Providers
Many LLM-focused tracing and evaluation projects support OpenTelemetry, for example:
- Langfuse ([integration example](../open-telemetry-tracing-langfuse/))
- Arize Phoenix
### 5. Install Dependencies
```bash