Compare commits

...

1 Commits

Author SHA1 Message Date
James Hush
6bb3cb2b83 demo: DelayProcessor 2025-09-11 16:05:08 +08:00

View File

@@ -4,17 +4,19 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import Frame, LLMFullResponseEndFrame, LLMRunFrame, LLMTextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -26,6 +28,62 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
class DelayProcessor(FrameProcessor):
"""Custom processor that queues LLM text frames until response is complete.
This creates a more natural conversation flow by preventing the agent from
responding immediately after the user stops speaking. It queues all LLMTextFrames
until it sees an LLMFullResponseEndFrame, then waits for the specified delay
before releasing all queued frames at once.
"""
def __init__(self, *, delay_seconds: float = 1.0, **kwargs) -> None:
"""Initialize the DelayProcessor.
Args:
delay_seconds: Number of seconds to delay before releasing queued frames (default: 1.0)
"""
super().__init__(**kwargs)
self._delay_seconds = delay_seconds
self._queued_frames = []
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process frames, queuing LLM text frames until response is complete.
Args:
frame: The frame to process
direction: Direction of the frame in the pipeline
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMTextFrame):
# Queue LLM text frames instead of pushing them immediately
logger.debug(f"Queuing LLMTextFrame: {frame.text}")
self._queued_frames.append((frame, direction))
elif isinstance(frame, LLMFullResponseEndFrame):
# When we see the end frame, wait for delay then push all queued frames
logger.debug(
f"LLM response complete, delaying {self._delay_seconds} seconds before releasing {len(self._queued_frames)} queued frames"
)
await asyncio.sleep(self._delay_seconds)
# Push all queued LLM text frames
for queued_frame, queued_direction in self._queued_frames:
logger.debug(f"Releasing queued LLMTextFrame: {queued_frame.text}")
await self.push_frame(queued_frame, queued_direction)
# Clear the queue
self._queued_frames.clear()
# Push the end frame
logger.debug("Pushing LLMFullResponseEndFrame")
await self.push_frame(frame, direction)
else:
# Push all other frames immediately
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -70,12 +128,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# Create delay processor to add 1-second delay before agent responses
delay_processor = DelayProcessor(delay_seconds=1.0)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
delay_processor, # Add delay before TTS
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses