Compare commits

...

1 Commits

Author SHA1 Message Date
vipyne
f2982b4b68 add observer to trace basic STT, LLM, & TTS spans 2025-12-02 16:15:14 -06:00
2 changed files with 302 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
InterruptionFrame,
LLMRunFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.observers.stt_llm_tts_trace_observer import STTLLMTTSTraceObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[
STTLLMTTSTraceObserver(),
],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,155 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LLM logging observer for Pipecat."""
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
TranscriptionFrame,
TTSStartedFrame,
TTSStoppedFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.stt_service import STTService
from pipecat.services.tts_service import TTSService
from pipecat.transports.base_output import BaseOutputTransport
class STTLLMTTSTraceObserver(BaseObserver):
"""Observer to basic STT, LLM, & TTS activity to the console.
Logs all frame instances of:
- UserStartedSpeakingFrame
- TranscriptionFrame
- LLMFullResponseStartFrame
- LLMFullResponseEndFrame
- TTSStartedFrame
- TTSStoppedFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
"""
def __init__(self):
"""Initialize frame start times to calculate span times."""
super().__init__()
self._last_user_started_speaking_frame_time = 0
self._last_transcription_frame_time = 0
self._last_llm_response_start_frame_time = 0
self._last_tts_started_frame_time = 0
self._last_tts_stopped_frame_time = 0
self._last_bot_started_speaking_frame_time = 0
self._arrow = ""
async def on_push_frame(self, data: FramePushed):
"""Handle frame push events and log STT, LLM, & TTS activities.
Args:
data: The frame push event data containing source, destination,
frame, direction, and timestamp information.
"""
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
time_sec = timestamp / 1_000_000_000
if isinstance(src, BaseOutputTransport):
# Trace STT
if isinstance(frame, UserStartedSpeakingFrame):
self._handle_UserStartedSpeakingFrame(src, dst, frame, time_sec)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._handle_UserStoppedSpeakingFrame(src, dst, frame, time_sec)
# TTS
if isinstance(dst, TTSService):
if isinstance(frame, BotStartedSpeakingFrame):
self._handle_BotStartedSpeakingFrame(src, dst, frame, time_sec)
elif isinstance(frame, BotStoppedSpeakingFrame):
self._handle_BotStoppedSpeakingFrame(src, dst, frame, time_sec)
# STT
elif isinstance(src, STTService):
if isinstance(frame, TranscriptionFrame):
self._handle_TranscriptionFrame(src, dst, frame, time_sec)
# Trace LLM
elif isinstance(src, LLMService):
if isinstance(frame, LLMFullResponseStartFrame):
self._handle_LLMFullResponseStartFrame(src, dst, frame, time_sec)
elif isinstance(frame, LLMFullResponseEndFrame):
self._handle_LLMFullResponseEndFrame(src, dst, frame, time_sec)
# Trace TTS
elif isinstance(src, TTSService):
if isinstance(frame, TTSStartedFrame):
self._handle_TTSStartedFrame(src, dst, frame, time_sec)
elif isinstance(frame, TTSStoppedFrame):
self._handle_TTSStoppedFrame(src, dst, frame, time_sec)
# STT frame handlers
def _handle_UserStartedSpeakingFrame(self, src, dst, frame, time_sec):
self._last_user_started_speaking_frame_time = time_sec
logger.debug(f"🙂🟢 UserStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_UserStoppedSpeakingFrame(self, src, dst, frame, time_sec):
logger.debug(f"🙂🔴 UserStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_TranscriptionFrame(self, src, dst, frame, time_sec):
self._last_transcription_frame_time = time_sec
logger.debug(f"🙂📝 TranscriptionFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
if 0 != self._last_user_started_speaking_frame_time:
stt_generation_time = time_sec - self._last_user_started_speaking_frame_time
self._last_user_started_speaking_frame_time = 0
logger.info(f"📝⏰ STT span: {stt_generation_time:.4f}s")
# LLM frame handlers
def _handle_LLMFullResponseStartFrame(self, src, dst, frame, time_sec):
self._last_llm_response_start_frame_time = time_sec
logger.debug(
f"🧠🟢 LLMFullResponseStartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
)
def _handle_LLMFullResponseEndFrame(self, src, dst, frame, time_sec):
logger.debug(f"🧠🔴 LLMFullResponseEndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
llm_time = time_sec - self._last_llm_response_start_frame_time
logger.info(f"🧠⏰ LLM span: {llm_time:.4f}s")
# TTS frame handlers
def _handle_TTSStartedFrame(self, src, dst, frame, time_sec):
self._last_tts_started_frame_time = time_sec
logger.debug(f"📢🟢 TTSStartedFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_TTSStoppedFrame(self, src, dst, frame, time_sec):
self._last_tts_stopped_frame_time = time_sec
tts_time = time_sec - self._last_tts_started_frame_time
logger.debug(f"📢🔴 TTSStoppedFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
logger.info(f"📢⏰ TTS generation span: {tts_time:.4f}s")
def _handle_BotStartedSpeakingFrame(self, src, dst, frame, time_sec):
self._last_bot_started_speaking_frame_time = time_sec
tts_time = time_sec - self._last_tts_started_frame_time
logger.debug(f"🤖🟢 BotStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
logger.info(f"📢⏰ TTS to first speech span: {tts_time:.4f}s")
def _handle_BotStoppedSpeakingFrame(self, src, dst, frame, time_sec):
logger.debug(f"🤖🔴 BotStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")