#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys
from typing import List, Optional

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
    CancelFrame,
    TranscriptionMessage,
    TranscriptionUpdateFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)

# Replace loguru's default sink (handler id 0) with a DEBUG-level stderr sink.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


class TranscriptHandler:
    """Handles real-time transcript processing and output.

    Maintains a list of conversation messages and outputs them either to a log
    or to a file as they are received. Each message includes its timestamp and
    role.

    Attributes:
        messages: List of all processed transcript messages
        output_file: Optional path to file where transcript is saved. If None,
            outputs to log only.
    """

    def __init__(self, output_file: Optional[str] = None):
        """Initialize handler with optional file output.

        Args:
            output_file: Path to output file. If None, outputs to log only.
        """
        self.messages: List[TranscriptionMessage] = []
        self.output_file: Optional[str] = output_file
        # File appends run in a worker thread (see save_message); this lock
        # keeps them in the order the messages were received.
        self._write_lock = asyncio.Lock()
        logger.debug(
            f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
        )

    def _append_line(self, line: str) -> None:
        """Append one line to the output file (blocking; run off the event loop)."""
        with open(self.output_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    async def save_message(self, message: TranscriptionMessage):
        """Save a single transcript message.

        Outputs the message to the log and optionally to a file. The file
        write is performed in a worker thread so that disk I/O does not block
        the event loop that is also driving the pipeline.

        Args:
            message: The message to save
        """
        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
        line = f"{timestamp}{message.role}: {message.content}"

        # Always log the message
        logger.info(f"Transcript: {line}")

        # Optionally write to file
        if not self.output_file:
            return

        try:
            async with self._write_lock:
                await asyncio.to_thread(self._append_line, line)
        except Exception as e:
            # Deliberately best-effort: a failed transcript write is logged
            # and must never take the pipeline down.
            logger.error(f"Error saving transcript message to file: {e}")

    async def on_transcript_update(
        self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
    ):
        """Handle new transcript messages.

        Args:
            processor: The TranscriptProcessor that emitted the update
            frame: TranscriptionUpdateFrame containing new messages
        """
        logger.debug(f"Received transcript update with {len(frame.messages)} new messages")

        for msg in frame.messages:
            self.messages.append(msg)
            await self.save_message(msg)


async def main():
    """Build the STT -> LLM -> TTS pipeline with transcript capture and run it."""
    async with aiohttp.ClientSession() as session:
        # NOTE(review): configure() also returns a token, but the transport
        # below is created with None in its place -- confirm this is intended.
        (room_url, _) = await configure(session)

        transport = DailyTransport(
            room_url,
            None,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_audio_passthrough=True,
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        llm = AnthropicLLMService(
            api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20241022"
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way.",
            },
            {"role": "user", "content": "Say hello."},
        ]

        context = OpenAILLMContext(messages)
        context_aggregator = llm.create_context_aggregator(context)

        # Create transcript processor and handler
        transcript = TranscriptProcessor()
        transcript_handler = TranscriptHandler()  # Output to log only
        # transcript_handler = TranscriptHandler(output_file="transcript.txt")  # Output to file and log

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,  # STT
                transcript.user(),  # User transcripts
                context_aggregator.user(),  # User responses
                llm,  # LLM
                tts,  # TTS
                transport.output(),  # Transport bot output
                transcript.assistant(),  # Assistant transcripts
                context_aggregator.assistant(),  # Assistant spoken responses
            ]
        )

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            """Start capturing the participant's transcription and open the conversation."""
            await transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            await task.queue_frames([context_aggregator.user().get_context_frame()])

        # Register event handler for transcript updates
        @transcript.event_handler("on_transcript_update")
        async def on_transcript_update(processor, frame):
            """Forward transcript updates to the TranscriptHandler."""
            await transcript_handler.on_transcript_update(processor, frame)

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            """Cancel the pipeline task when a participant leaves."""
            # Stop the pipeline immediately when the participant leaves
            await task.queue_frame(CancelFrame())

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())