This commit is contained in:
Mark Backman
2025-08-21 16:16:10 -04:00
parent fbc907c371
commit f0dfab23e7
2 changed files with 90 additions and 97 deletions

View File

@@ -83,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context_aggregator.user(),
llm,
tts,
voicemail.buffer(), # TTS buffering — Immediately after the TTS service
voicemail.gate(), # TTS buffering — Immediately after the TTS service
transport.output(),
context_aggregator.assistant(),
]

View File

@@ -41,7 +41,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.services.llm_service import LLMService
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.sync.event_notifier import EventNotifier
@@ -72,6 +72,22 @@ class NotifierGate(FrameProcessor):
self._gate_opened = True
self._gate_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
    """Prepare the gate and begin waiting on the notifier.

    Delegates to the parent ``setup`` first, then creates the task that
    runs ``_wait_for_notification`` and keeps a handle to it in
    ``_gate_task``.

    Args:
        setup: Setup object, forwarded unchanged to the parent implementation.
    """
    await super().setup(setup)

    # Keep the task handle so cleanup() can cancel it later.
    notification_waiter = self._wait_for_notification()
    self._gate_task = self.create_task(notification_waiter)
async def cleanup(self):
    """Release processor resources and stop the notifier-wait task.

    Runs the parent cleanup first. If a task is stored in ``_gate_task`` it
    is cancelled through ``cancel_task`` and the attribute is reset to
    ``None``.
    """
    await super().cleanup()

    pending_task = self._gate_task
    if not pending_task:
        # Nothing was started (or it was already cleared).
        return

    await self.cancel_task(pending_task)
    self._gate_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and control gate state based on notifier signals.
@@ -81,16 +97,6 @@ class NotifierGate(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
# Start the notification waiting task immediately
self._gate_task = self.create_task(self._wait_for_notification())
elif isinstance(frame, (EndFrame, CancelFrame)):
# Clean up the gate task when pipeline ends or is cancelled
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
# Gate logic: open gate allows all frames, closed gate filters frames
if self._gate_opened:
await self.push_frame(frame, direction)
@@ -118,17 +124,10 @@ class NotifierGate(FrameProcessor):
This method blocks until the notifier signals, then closes the gate
permanently to change frame filtering behavior.
"""
try:
await self._notifier.wait()
await self._notifier.wait()
if self._gate_opened:
self._gate_opened = False
except asyncio.CancelledError:
logger.debug(f"{self}: {self._task_name} task was cancelled")
raise
except Exception as e:
logger.exception(f"{self}: Error in {self._task_name} task: {e}")
raise
if self._gate_opened:
self._gate_opened = False
class ClassifierGate(NotifierGate):
@@ -229,7 +228,25 @@ class ClassificationProcessor(FrameProcessor):
# Voicemail timing state
self._voicemail_detected = False
self._voicemail_handler_task: Optional[asyncio.Task] = None
self._voicemail_task: Optional[asyncio.Task] = None
self._voicemail_event = asyncio.Event()
self._voicemail_event.set()
async def setup(self, setup: FrameProcessorSetup):
    """Prepare the processor and start the delayed voicemail handler task.

    Delegates to the parent ``setup`` first, then creates the task that
    runs ``_delayed_voicemail_handler`` and stores its handle in
    ``_voicemail_task``.

    Args:
        setup: Setup object, forwarded unchanged to the parent implementation.
    """
    await super().setup(setup)

    # Keep the task handle so cleanup() can cancel it later.
    delayed_handler = self._delayed_voicemail_handler()
    self._voicemail_task = self.create_task(delayed_handler)
async def cleanup(self):
    """Release processor resources and stop the voicemail handler task.

    Runs the parent cleanup first. If a task is stored in
    ``_voicemail_task`` it is cancelled through ``cancel_task`` and the
    attribute is reset to ``None``.
    """
    await super().cleanup()

    pending_task = self._voicemail_task
    if not pending_task:
        # Nothing was started (or it was already cleared).
        return

    await self.cancel_task(pending_task)
    self._voicemail_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and handle LLM classification responses.
@@ -263,15 +280,12 @@ class ClassificationProcessor(FrameProcessor):
self._response_buffer += frame.text
elif isinstance(frame, UserStartedSpeakingFrame):
# User started speaking - cancel voicemail handler timer
if self._voicemail_handler_task:
await self.cancel_task(self._voicemail_handler_task)
self._voicemail_handler_task = None
# User started speaking - set the voicemail event
self._voicemail_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
# User stopped speaking - restart voicemail handler timer if voicemail detected
if self._voicemail_detected and not self._voicemail_handler_task:
self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler())
# User stopped speaking - clear the voicemail event
self._voicemail_event.clear()
else:
# Pass all non-LLM frames through
@@ -312,10 +326,8 @@ class ClassificationProcessor(FrameProcessor):
# Interrupt the current pipeline to stop any ongoing processing
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
# Always start the handler timer immediately
# It will be cancelled and restarted if user starts/stops speaking
if not self._voicemail_handler_task:
self._voicemail_handler_task = self.create_task(self._delayed_voicemail_handler())
# Clear the voicemail event so the delayed handler's wait can time out and fire
self._voicemail_event.clear()
else:
# This can happen if the LLM is interrupted before completing the response
@@ -328,22 +340,18 @@ class ClassificationProcessor(FrameProcessor):
developer's voicemail event handler. The timer can be cancelled and restarted
based on user speech patterns to ensure proper timing.
"""
try:
await asyncio.sleep(self._voicemail_response_delay)
while True:
try:
logger.debug(f"{self}: Triggering voicemail detected event")
await asyncio.wait_for(
self._voicemail_event.wait(), timeout=self._voicemail_response_delay
)
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
await self._call_event_handler("on_voicemail_detected")
except Exception as e:
logger.exception(f"{self}: Error in voicemail event handler: {e}")
except asyncio.CancelledError:
raise
finally:
self._voicemail_handler_task = None
break
class TTSBuffer(FrameProcessor):
class TTSGate(FrameProcessor):
"""Buffers TTS frames until voicemail classification decision is made.
This processor holds TTS output frames in a buffer while the voicemail
@@ -376,6 +384,27 @@ class TTSBuffer(FrameProcessor):
self._conversation_task: Optional[asyncio.Task] = None
self._voicemail_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
    """Prepare the processor and start both notification-wait tasks.

    Delegates to the parent ``setup`` first, then creates one task running
    ``_wait_for_conversation`` and one running ``_wait_for_voicemail``,
    storing the handles in ``_conversation_task`` and ``_voicemail_task``.

    Args:
        setup: Setup object, forwarded unchanged to the parent implementation.
    """
    await super().setup(setup)

    # Conversation waiter is created first, then the voicemail waiter.
    conversation_waiter = self._wait_for_conversation()
    self._conversation_task = self.create_task(conversation_waiter)

    voicemail_waiter = self._wait_for_voicemail()
    self._voicemail_task = self.create_task(voicemail_waiter)
async def cleanup(self):
    """Release processor resources and stop both notification-wait tasks.

    Runs the parent cleanup first. Then, for ``_conversation_task`` followed
    by ``_voicemail_task``, any stored task is cancelled through
    ``cancel_task`` and the attribute is reset to ``None``.
    """
    await super().cleanup()

    # Same order as the tasks were created: conversation, then voicemail.
    for task_attr in ("_conversation_task", "_voicemail_task"):
        pending_task = getattr(self, task_attr)
        if pending_task:
            await self.cancel_task(pending_task)
            setattr(self, task_attr, None)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and handle buffering logic based on frame type.
@@ -389,24 +418,8 @@ class TTSBuffer(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
# Start notification waiting tasks for both conversation and voicemail
self._conversation_task = self.create_task(self._wait_for_conversation())
self._voicemail_task = self.create_task(self._wait_for_voicemail())
await self.push_frame(frame, direction)
elif isinstance(frame, (EndFrame, CancelFrame)):
# Clean up notification tasks when pipeline ends
if self._conversation_task:
await self.cancel_task(self._conversation_task)
self._conversation_task = None
if self._voicemail_task:
await self.cancel_task(self._voicemail_task)
self._voicemail_task = None
await self.push_frame(frame, direction)
# Core buffering logic: hold TTS frames, pass everything else through
elif self._buffering_active and isinstance(
if self._buffering_active and isinstance(
frame, (TTSStartedFrame, TTSStoppedFrame, TTSTextFrame, TTSAudioRawFrame)
):
# Buffer TTS frames while waiting for classification decision
@@ -422,23 +435,13 @@ class TTSBuffer(FrameProcessor):
in order to continue normal dialogue flow. This allows the bot to
respond naturally to the human caller.
"""
try:
await self._conversation_notifier.wait()
await self._conversation_notifier.wait()
# Release all buffered frames in original order
self._buffering_active = False
for frame, direction in self._frame_buffer:
await self.push_frame(frame, direction)
self._frame_buffer.clear()
# Cancel the voicemail task since decision is final
if self._voicemail_task:
await self.cancel_task(self._voicemail_task)
self._voicemail_task = None
except asyncio.CancelledError:
logger.debug(f"{self}: Conversation task was cancelled")
raise
# Release all buffered frames in original order
self._buffering_active = False
for frame, direction in self._frame_buffer:
await self.push_frame(frame, direction)
self._frame_buffer.clear()
async def _wait_for_voicemail(self):
"""Wait for voicemail detection notification and clear buffered frames.
@@ -448,21 +451,11 @@ class TTSBuffer(FrameProcessor):
for voicemail systems. The developer event handlers will handle voicemail-
specific audio output.
"""
try:
await self._voicemail_notifier.wait()
await self._voicemail_notifier.wait()
# Clear buffered frames without playing them
self._buffering_active = False
self._frame_buffer.clear()
# Cancel the conversation task since decision is final
if self._conversation_task:
await self.cancel_task(self._conversation_task)
self._conversation_task = None
except asyncio.CancelledError:
logger.debug(f"{self}: Voicemail task was cancelled")
raise
# Clear buffered frames without playing them
self._buffering_active = False
self._frame_buffer.clear()
class VoicemailDetector(ParallelPipeline):
@@ -501,7 +494,7 @@ class VoicemailDetector(ParallelPipeline):
context_aggregator.user(),
llm,
tts,
detector.buffer(), # TTS buffering
detector.gate(), # TTS buffering
transport.output(),
context_aggregator.assistant(),
])
@@ -589,7 +582,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo
voicemail_notifier=self._voicemail_notifier,
voicemail_response_delay=voicemail_response_delay,
)
self._voicemail_buffer = TTSBuffer(self._conversation_notifier, self._voicemail_notifier)
self._voicemail_gate = TTSGate(self._conversation_notifier, self._voicemail_notifier)
# Initialize the parallel pipeline with conversation and classifier branches
super().__init__(
@@ -639,7 +632,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo
"""
return self
def buffer(self) -> TTSBuffer:
def gate(self) -> TTSGate:
"""Get the gate processor for placement after TTS in the main pipeline.
This should be placed after the TTS service and before the transport
@@ -648,7 +641,7 @@ Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it's vo
Returns:
The TTSGate processor instance.
"""
return self._voicemail_buffer
return self._voicemail_gate
def add_event_handler(self, event_name: str, handler):
"""Add an event handler for voicemail detection events.