diff --git a/examples/foundational/07d-interruptible-elevenlabs.py b/examples/foundational/07d-interruptible-elevenlabs.py index 473ebd5d0..fadf78eeb 100644 --- a/examples/foundational/07d-interruptible-elevenlabs.py +++ b/examples/foundational/07d-interruptible-elevenlabs.py @@ -5,16 +5,27 @@ # +import asyncio import os from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + Frame, + LLMFullResponseStartFrame, + LLMTextFrame, + TranscriptionFrame, + TTSSpeakFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService @@ -26,6 +37,14 @@ from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) +# Create VAD parameters optimized for quiet speakers +quiet_speaker_vad_params = VADParams( + confidence=0.4, # Lower confidence threshold (default: 0.7) + min_volume=0.3, # Lower volume threshold (default: 0.6) + start_secs=0.1, # Faster response to speech start (default: 0.2) + stop_secs=1.0, # Longer wait before stopping (default: 0.8) +) + # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets @@ -34,21 +53,80 @@ transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, - vad_analyzer=SileroVADAnalyzer(), + vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params), ), "twilio": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, - vad_analyzer=SileroVADAnalyzer(), + vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params), ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, - vad_analyzer=SileroVADAnalyzer(), + vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params), ), } +class TranscriptionLogger(FrameProcessor): + """Custom processor that logs transcription frames.""" + + async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + + # Only log TranscriptionFrame objects + if isinstance(frame, TranscriptionFrame): + logger.info(f"[TRANSCRIPTION]: {frame.text}") + + # Always pass the frame through to maintain pipeline flow + await self.push_frame(frame, direction) + + +class InterventionProcessor(FrameProcessor): + """Custom processor that logs LLM response frames.""" + + def __init__(self): + super().__init__() + self._timer_task = None + + async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + + # Log LLM response start frames + if isinstance(frame, LLMFullResponseStartFrame): + logger.info(f"[LLM_START]: Starting LLM response") + + # Cancel any existing timer + if self._timer_task and not self._timer_task.done(): + self._timer_task.cancel() + + # Start a new 500ms timer + self._timer_task = asyncio.create_task(self._log_after_delay()) + + # Cancel timer if bot started speaking before 500ms + elif isinstance(frame, BotStartedSpeakingFrame): + logger.info(f"[BOT_SPEAKING]: Bot started speaking, canceling intervention timer") + if self._timer_task and not self._timer_task.done(): + self._timer_task.cancel() + + # Log LLM text frames + elif isinstance(frame, LLMTextFrame): + logger.info(f"[LLM_TEXT]: {frame.text}") + + # Always pass the frame through to maintain pipeline flow + await self.push_frame(frame, direction) + + async def _log_after_delay(self): + """Log a message after 500ms delay.""" + try: + await asyncio.sleep(0.5) # 500ms + logger.info(f"500ms passed since LLMFullResponseStartFrame") + await self.queue_frame(TTSSpeakFrame("um...")) + except asyncio.CancelledError: + # Timer was cancelled, which is fine + pass + + async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") @@ -71,13 +149,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) + # Create transcription logger instance + transcription_logger = TranscriptionLogger() + + # Create LLM logger instance + intervention = InterventionProcessor() + pipeline = Pipeline( [ transport.input(), # Transport user input stt, + transcription_logger, # Log transcription frames context_aggregator.user(), # User responses llm, # LLM tts, # TTS + intervention, # Log LLM response frames transport.output(), # Transport bot output context_aggregator.assistant(), # Assistant spoken responses ]