diff --git a/examples/foundational/44-voicemail-detection.py b/examples/foundational/44-voicemail-detection.py index f2957309d..84e24cf2f 100644 --- a/examples/foundational/44-voicemail-detection.py +++ b/examples/foundational/44-voicemail-detection.py @@ -50,27 +50,6 @@ transport_params = { } -async def handle_voicemail(processor): - """Called when a voicemail is detected. - - Args: - processor: The VoicemailProcessor instance. processor.push_frame() is - available to push frames. - """ - logger.info("Voicemail detected! Leaving a message...") - - # Push frames using standard Pipecat pattern - await processor.push_frame( - TTSSpeakFrame( - "Hello, this is Jamie calling about your appointment. Please call me back at 555-0123 when you get this." - ) - ) - - # NOTE: A common pattern is to end pipeline after the voicemail is left. - # Uncomment the following line to end the pipeline after leaving the voicemail. - # await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - - async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") @@ -84,7 +63,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) classifier_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - voicemail = VoicemailDetector(llm=classifier_llm, on_voicemail_detected=handle_voicemail) + voicemail = VoicemailDetector(llm=classifier_llm) messages = [ { @@ -128,6 +107,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() + @voicemail.event_handler("on_voicemail_detected") + async def handle_voicemail(processor): + logger.info("Voicemail detected! Leaving a message...") + + # Push frames using standard Pipecat pattern + await processor.push_frame( + TTSSpeakFrame( + "Hello, this is Jamie calling about your appointment. Please call me back at 555-0123 when you get this." + ) + ) + + # NOTE: A common pattern is to end pipeline after the voicemail is left. + # Uncomment the following line to end the pipeline after leaving the voicemail. + # await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/src/pipecat/utils/voicemail/voicemail_detector.py b/src/pipecat/utils/voicemail/voicemail_detector.py index af7692918..c8ed6efd6 100644 --- a/src/pipecat/utils/voicemail/voicemail_detector.py +++ b/src/pipecat/utils/voicemail/voicemail_detector.py @@ -13,7 +13,7 @@ a bot needs to determine if a human answered or if the call went to voicemail. """ import asyncio -from typing import Awaitable, Callable, List, Optional +from typing import List, Optional from loguru import logger @@ -208,7 +208,7 @@ class ConversationGate(FrameProcessor): class ClassificationProcessor(FrameProcessor): - """Processor that handles LLM classification responses and triggers callbacks. + """Processor that handles LLM classification responses and triggers events. This processor aggregates LLM text tokens into complete responses and analyzes them to determine if the call reached a voicemail system or a live person. @@ -218,9 +218,9 @@ class ClassificationProcessor(FrameProcessor): The processor expects responses containing either "CONVERSATION" (indicating a human answered) or "VOICEMAIL" (indicating an automated system). Once a - decision is made, it triggers the appropriate notifications and callbacks. + decision is made, it triggers the appropriate notifications and event handlers. - For voicemail detection, the callback timer starts immediately and is cancelled + For voicemail detection, the event handler timer starts immediately and is cancelled and restarted based on user speech patterns to ensure proper timing. """ @@ -230,9 +230,6 @@ class ClassificationProcessor(FrameProcessor): gate_notifier: BaseNotifier, conversation_notifier: BaseNotifier, voicemail_notifier: BaseNotifier, - on_voicemail_detected: Optional[ - Callable[["ClassificationProcessor"], Awaitable[None]] - ] = None, voicemail_response_delay: float, ): """Initialize the voicemail processor. @@ -244,20 +241,19 @@ class ClassificationProcessor(FrameProcessor): all buffered TTS frames for normal conversation flow. voicemail_notifier: Notifier to signal the TTSBuffer to clear buffered TTS frames since voicemail was detected. - on_voicemail_detected: Optional callback function called when voicemail - is detected. The callback receives this processor instance and can - use it to push custom frames (like voicemail greetings). voicemail_response_delay: Delay in seconds after user stops speaking - before triggering the voicemail callback. This ensures the voicemail + before triggering the voicemail event handler. This ensures the voicemail greeting or user message is complete before responding. """ super().__init__() self._gate_notifier = gate_notifier self._conversation_notifier = conversation_notifier self._voicemail_notifier = voicemail_notifier - self._on_voicemail_detected = on_voicemail_detected self._voicemail_response_delay = voicemail_response_delay + # Register the voicemail detected event + self._register_event_handler("on_voicemail_detected") + # Aggregation state for collecting complete LLM responses self._processing_response = False self._response_buffer = "" @@ -265,7 +261,7 @@ class ClassificationProcessor(FrameProcessor): # Voicemail timing state self._voicemail_detected = False - self._voicemail_callback_task: Optional[asyncio.Task] = None + self._voicemail_handler_task: Optional[asyncio.Task] = None async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames and handle LLM classification responses. @@ -299,15 +295,15 @@ class ClassificationProcessor(FrameProcessor): self._response_buffer += frame.text elif isinstance(frame, UserStartedSpeakingFrame): - # User started speaking - cancel voicemail callback timer - if self._voicemail_callback_task: - await self.cancel_task(self._voicemail_callback_task) - self._voicemail_callback_task = None + # User started speaking - cancel voicemail handler timer + if self._voicemail_handler_task: + await self.cancel_task(self._voicemail_handler_task) + self._voicemail_handler_task = None elif isinstance(frame, UserStoppedSpeakingFrame): - # User stopped speaking - restart voicemail callback timer if voicemail detected - if self._voicemail_detected and not self._voicemail_callback_task: - self._voicemail_callback_task = self.create_task(self._delayed_voicemail_callback()) + # User stopped speaking - restart voicemail handler timer if voicemail detected + if self._voicemail_detected and not self._voicemail_handler_task: + self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler()) else: # Pass all non-LLM frames through @@ -348,36 +344,35 @@ class ClassificationProcessor(FrameProcessor): # Interrupt the current pipeline to stop any ongoing processing await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM) - # Always start the callback timer immediately + # Always start the handler timer immediately # It will be cancelled and restarted if user starts/stops speaking - if not self._voicemail_callback_task: - self._voicemail_callback_task = self.create_task(self._delayed_voicemail_callback()) + if not self._voicemail_handler_task: + self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler()) else: # This can happen if the LLM is interrupted before completing the response logger.debug(f"{self}: No classification found: '{full_response}'") - async def _delayed_voicemail_callback(self): - """Execute the voicemail callback after the configured delay. + async def _delayed_voicemail_handler(self): + """Execute the voicemail event handler after the configured delay. This method waits for the specified delay period, then triggers the - developer's voicemail callback. The timer can be cancelled and restarted + developer's voicemail event handler. The timer can be cancelled and restarted based on user speech patterns to ensure proper timing. """ try: await asyncio.sleep(self._voicemail_response_delay) - if self._on_voicemail_detected: - try: - logger.debug(f"{self}: Executing voicemail callback") - await self._on_voicemail_detected(self) - except Exception as e: - logger.exception(f"{self}: Error in voicemail callback: {e}") + try: + logger.debug(f"{self}: Triggering voicemail detected event") + await self._call_event_handler("on_voicemail_detected") + except Exception as e: + logger.exception(f"{self}: Error in voicemail event handler: {e}") except asyncio.CancelledError: raise finally: - self._voicemail_callback_task = None + self._voicemail_handler_task = None class TTSBuffer(FrameProcessor): @@ -482,7 +477,7 @@ class TTSBuffer(FrameProcessor): When voicemail is detected, all buffered TTS frames are discarded since they were intended for human conversation and are not appropriate - for voicemail systems. The developer callback will handle voicemail- + for voicemail systems. The developer event handlers will handle voicemail- specific audio output. """ try: @@ -520,20 +515,17 @@ class VoicemailDetector(ParallelPipeline): Once a decision is made, the appropriate action is taken: - CONVERSATION: Continue normal bot dialogue - - VOICEMAIL: Trigger developer callback for custom voicemail handling + - VOICEMAIL: Trigger developer event handler for custom voicemail handling Example:: classification_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + detector = VoicemailDetector(llm=classification_llm) + @detector.event_handler("on_voicemail_detected") async def handle_voicemail(processor): await processor.push_frame(TTSSpeakFrame("Please leave a message.")) - detector = VoicemailDetector( - llm=classification_llm, - on_voicemail_detected=handle_voicemail - ) - pipeline = Pipeline([ transport.input(), stt, @@ -545,6 +537,11 @@ class VoicemailDetector(ParallelPipeline): transport.output(), context_aggregator.assistant(), ]) + + Events: + on_voicemail_detected: Triggered when voicemail is detected after the configured + delay. The event handler receives one argument: the ClassificationProcessor + instance which can be used to push frames. """ DEFAULT_SYSTEM_PROMPT = """You are a voicemail detection classifier for an OUTBOUND calling system. A bot has called a phone number and you need to determine if a human answered or if the call went to voicemail based on the provided text. @@ -573,7 +570,6 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo self, *, llm: LLMService, - on_voicemail_detected: Callable[["ClassificationProcessor"], Awaitable[None]], voicemail_response_delay: float = 2.0, system_prompt: Optional[str] = None, ): @@ -582,11 +578,8 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo Args: llm: LLM service used for voicemail vs conversation classification. Should be fast and reliable for real-time classification. - on_voicemail_detected: Optional callback function invoked when voicemail - is detected. Receives the ClassificationProcessor instance which can be - used to push frames (like custom voicemail greetings). voicemail_response_delay: Delay in seconds after user stops speaking - before triggering the voicemail callback. This allows voicemail + before triggering the voicemail event handler. This allows voicemail responses to be played back after a short delay to ensure the response occurs during the voicemail recording. Default is 2.0 seconds. system_prompt: Optional custom system prompt for classification. If None, @@ -626,7 +619,6 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo gate_notifier=self._gate_notifier, conversation_notifier=self._conversation_notifier, voicemail_notifier=self._voicemail_notifier, - on_voicemail_detected=on_voicemail_detected, voicemail_response_delay=voicemail_response_delay, ) self._voicemail_buffer = TTSBuffer(self._conversation_notifier, self._voicemail_notifier) @@ -645,6 +637,9 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo ], ) + # Register the voicemail detected event after super().__init__() + self._register_event_handler("on_voicemail_detected") + def _validate_prompt(self, prompt: str) -> None: """Validate custom prompt contains required response format instructions. @@ -686,3 +681,15 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo The TTSBuffer processor instance. """ return self._voicemail_buffer + + def add_event_handler(self, event_name: str, handler): + """Add an event handler for voicemail detection events. + + Args: + event_name: The name of the event to handle. + handler: The function to call when the event occurs. + """ + if event_name == "on_voicemail_detected": + self._classification_processor.add_event_handler(event_name, handler) + else: + super().add_event_handler(event_name, handler)