From f0dfab23e7bbd68c189d2e62ca1a444186588df8 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 21 Aug 2025 16:16:10 -0400 Subject: [PATCH] Cleanup --- .../foundational/44-voicemail-detection.py | 2 +- .../voicemail/voicemail_detector.py | 185 +++++++++--------- 2 files changed, 90 insertions(+), 97 deletions(-) diff --git a/examples/foundational/44-voicemail-detection.py b/examples/foundational/44-voicemail-detection.py index 60eefa0c2..f7019f937 100644 --- a/examples/foundational/44-voicemail-detection.py +++ b/examples/foundational/44-voicemail-detection.py @@ -83,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context_aggregator.user(), llm, tts, - voicemail.buffer(), # TTS buffering — Immediately after the TTS service + voicemail.gate(), # TTS buffering — Immediately after the TTS service transport.output(), context_aggregator.assistant(), ] diff --git a/src/pipecat/extensions/voicemail/voicemail_detector.py b/src/pipecat/extensions/voicemail/voicemail_detector.py index c1d3b723a..6c4ba221e 100644 --- a/src/pipecat/extensions/voicemail/voicemail_detector.py +++ b/src/pipecat/extensions/voicemail/voicemail_detector.py @@ -41,7 +41,7 @@ from pipecat.frames.frames import ( ) from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup from pipecat.services.llm_service import LLMService from pipecat.sync.base_notifier import BaseNotifier from pipecat.sync.event_notifier import EventNotifier @@ -72,6 +72,22 @@ class NotifierGate(FrameProcessor): self._gate_opened = True self._gate_task: Optional[asyncio.Task] = None + async def setup(self, setup: FrameProcessorSetup): + """Set up the processor with required components. 
+ + Args: + setup: Configuration object containing setup parameters. + """ + await super().setup(setup) + self._gate_task = self.create_task(self._wait_for_notification()) + + async def cleanup(self): + """Clean up the processor resources.""" + await super().cleanup() + if self._gate_task: + await self.cancel_task(self._gate_task) + self._gate_task = None + async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames and control gate state based on notifier signals. @@ -81,16 +97,6 @@ class NotifierGate(FrameProcessor): """ await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): - # Start the notification waiting task immediately - self._gate_task = self.create_task(self._wait_for_notification()) - - elif isinstance(frame, (EndFrame, CancelFrame)): - # Clean up the gate task when pipeline ends or is cancelled - if self._gate_task: - await self.cancel_task(self._gate_task) - self._gate_task = None - # Gate logic: open gate allows all frames, closed gate filters frames if self._gate_opened: await self.push_frame(frame, direction) @@ -118,17 +124,10 @@ class NotifierGate(FrameProcessor): This method blocks until the notifier signals, then closes the gate permanently to change frame filtering behavior. 
""" - try: - await self._notifier.wait() + await self._notifier.wait() - if self._gate_opened: - self._gate_opened = False - except asyncio.CancelledError: - logger.debug(f"{self}: {self._task_name} task was cancelled") - raise - except Exception as e: - logger.exception(f"{self}: Error in {self._task_name} task: {e}") - raise + if self._gate_opened: + self._gate_opened = False class ClassifierGate(NotifierGate): @@ -229,7 +228,25 @@ class ClassificationProcessor(FrameProcessor): # Voicemail timing state self._voicemail_detected = False - self._voicemail_handler_task: Optional[asyncio.Task] = None + self._voicemail_task: Optional[asyncio.Task] = None + self._voicemail_event = asyncio.Event() + self._voicemail_event.set() + + async def setup(self, setup: FrameProcessorSetup): + """Set up the processor with required components. + + Args: + setup: Configuration object containing setup parameters. + """ + await super().setup(setup) + self._voicemail_task = self.create_task(self._delayed_voicemail_handler()) + + async def cleanup(self): + """Clean up the processor resources.""" + await super().cleanup() + if self._voicemail_task: + await self.cancel_task(self._voicemail_task) + self._voicemail_task = None async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames and handle LLM classification responses. 
@@ -263,15 +280,12 @@ class ClassificationProcessor(FrameProcessor): self._response_buffer += frame.text elif isinstance(frame, UserStartedSpeakingFrame): - # User started speaking - cancel voicemail handler timer - if self._voicemail_handler_task: - await self.cancel_task(self._voicemail_handler_task) - self._voicemail_handler_task = None + # User started speaking - set the voicemail event + self._voicemail_event.set() elif isinstance(frame, UserStoppedSpeakingFrame): - # User stopped speaking - restart voicemail handler timer if voicemail detected - if self._voicemail_detected and not self._voicemail_handler_task: - self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler()) + # User stopped speaking - clear the voicemail event + self._voicemail_event.clear() else: # Pass all non-LLM frames through @@ -312,10 +326,8 @@ class ClassificationProcessor(FrameProcessor): # Interrupt the current pipeline to stop any ongoing processing await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM) - # Always start the handler timer immediately - # It will be cancelled and restarted if user starts/stops speaking - if not self._voicemail_handler_task: - self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler()) + # Clear the voicemail event so the delayed handler's timeout starts counting + self._voicemail_event.clear() else: # This can happen if the LLM is interrupted before completing the response @@ -328,22 +340,18 @@ class ClassificationProcessor(FrameProcessor): developer's voicemail event handler. The timer can be cancelled and restarted based on user speech patterns to ensure proper timing. 
""" - try: - await asyncio.sleep(self._voicemail_response_delay) - + while True: try: - logger.debug(f"{self}: Triggering voicemail detected event") + await asyncio.wait_for( + self._voicemail_event.wait(), timeout=self._voicemail_response_delay + ) + await asyncio.sleep(0.1) + except asyncio.TimeoutError: await self._call_event_handler("on_voicemail_detected") - except Exception as e: - logger.exception(f"{self}: Error in voicemail event handler: {e}") - - except asyncio.CancelledError: - raise - finally: - self._voicemail_handler_task = None + break -class TTSBuffer(FrameProcessor): +class TTSGate(FrameProcessor): """Buffers TTS frames until voicemail classification decision is made. This processor holds TTS output frames in a buffer while the voicemail @@ -376,6 +384,27 @@ class TTSBuffer(FrameProcessor): self._conversation_task: Optional[asyncio.Task] = None self._voicemail_task: Optional[asyncio.Task] = None + async def setup(self, setup: FrameProcessorSetup): + """Set up the processor with required components. + + Args: + setup: Configuration object containing setup parameters. + """ + await super().setup(setup) + + self._conversation_task = self.create_task(self._wait_for_conversation()) + self._voicemail_task = self.create_task(self._wait_for_voicemail()) + + async def cleanup(self): + """Clean up the processor resources.""" + await super().cleanup() + if self._conversation_task: + await self.cancel_task(self._conversation_task) + self._conversation_task = None + if self._voicemail_task: + await self.cancel_task(self._voicemail_task) + self._voicemail_task = None + async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames and handle buffering logic based on frame type. 
@@ -389,24 +418,8 @@ class TTSBuffer(FrameProcessor): """ await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): - # Start notification waiting tasks for both conversation and voicemail - self._conversation_task = self.create_task(self._wait_for_conversation()) - self._voicemail_task = self.create_task(self._wait_for_voicemail()) - await self.push_frame(frame, direction) - - elif isinstance(frame, (EndFrame, CancelFrame)): - # Clean up notification tasks when pipeline ends - if self._conversation_task: - await self.cancel_task(self._conversation_task) - self._conversation_task = None - if self._voicemail_task: - await self.cancel_task(self._voicemail_task) - self._voicemail_task = None - await self.push_frame(frame, direction) - # Core buffering logic: hold TTS frames, pass everything else through - elif self._buffering_active and isinstance( + if self._buffering_active and isinstance( frame, (TTSStartedFrame, TTSStoppedFrame, TTSTextFrame, TTSAudioRawFrame) ): # Buffer TTS frames while waiting for classification decision @@ -422,23 +435,13 @@ class TTSBuffer(FrameProcessor): in order to continue normal dialogue flow. This allows the bot to respond naturally to the human caller. 
""" - try: - await self._conversation_notifier.wait() + await self._conversation_notifier.wait() - # Release all buffered frames in original order - self._buffering_active = False - for frame, direction in self._frame_buffer: - await self.push_frame(frame, direction) - self._frame_buffer.clear() - - # Cancel the voicemail task since decision is final - if self._voicemail_task: - await self.cancel_task(self._voicemail_task) - self._voicemail_task = None - - except asyncio.CancelledError: - logger.debug(f"{self}: Conversation task was cancelled") - raise + # Release all buffered frames in original order + self._buffering_active = False + for frame, direction in self._frame_buffer: + await self.push_frame(frame, direction) + self._frame_buffer.clear() async def _wait_for_voicemail(self): """Wait for voicemail detection notification and clear buffered frames. @@ -448,21 +451,11 @@ class TTSBuffer(FrameProcessor): for voicemail systems. The developer event handlers will handle voicemail- specific audio output. 
""" - try: - await self._voicemail_notifier.wait() + await self._voicemail_notifier.wait() - # Clear buffered frames without playing them - self._buffering_active = False - self._frame_buffer.clear() - - # Cancel the conversation task since decision is final - if self._conversation_task: - await self.cancel_task(self._conversation_task) - self._conversation_task = None - - except asyncio.CancelledError: - logger.debug(f"{self}: Voicemail task was cancelled") - raise + # Clear buffered frames without playing them + self._buffering_active = False + self._frame_buffer.clear() class VoicemailDetector(ParallelPipeline): @@ -501,7 +494,7 @@ class VoicemailDetector(ParallelPipeline): context_aggregator.user(), llm, tts, - detector.buffer(), # TTS buffering + detector.gate(), # TTS buffering transport.output(), context_aggregator.assistant(), ]) @@ -589,7 +582,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo voicemail_notifier=self._voicemail_notifier, voicemail_response_delay=voicemail_response_delay, ) - self._voicemail_buffer = TTSBuffer(self._conversation_notifier, self._voicemail_notifier) + self._voicemail_gate = TTSGate(self._conversation_notifier, self._voicemail_notifier) # Initialize the parallel pipeline with conversation and classifier branches super().__init__( @@ -639,7 +632,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo """ return self - def buffer(self) -> TTSBuffer: + def gate(self) -> TTSGate: """Get the buffer processor for placement after TTS in the main pipeline. This should be placed after the TTS service and before the transport @@ -648,7 +641,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo Returns: The TTSBuffer processor instance. """ - return self._voicemail_buffer + return self._voicemail_gate def add_event_handler(self, event_name: str, handler): """Add an event handler for voicemail detection events.