Compare commits
1 Commits
hush/TurnT
...
hush/delay
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bb3cb2b83 |
@@ -4,17 +4,19 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.frames.frames import Frame, LLMFullResponseEndFrame, LLMRunFrame, LLMTextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
@@ -26,6 +28,62 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class DelayProcessor(FrameProcessor):
    """Hold back LLM text frames until the response ends, then release them after a delay.

    Every ``LLMTextFrame`` is buffered instead of being forwarded. When an
    ``LLMFullResponseEndFrame`` arrives, the processor sleeps for
    ``delay_seconds``, pushes the buffered text frames in arrival order (each
    in the direction it was received with), and finally pushes the end frame.
    All other frame types are forwarded immediately and unchanged.

    The intent is a more natural conversation flow: the agent does not start
    responding the instant the LLM has produced its answer.
    """

    def __init__(self, *, delay_seconds: float = 1.0, **kwargs) -> None:
        """Initialize the DelayProcessor.

        Args:
            delay_seconds: Number of seconds to wait after the end of an LLM
                response before releasing the buffered text frames
                (default: 1.0).
            **kwargs: Forwarded unchanged to ``FrameProcessor.__init__``.
        """
        super().__init__(**kwargs)
        self._delay_seconds = delay_seconds
        # (frame, direction) pairs of the response currently being buffered.
        self._queued_frames: list[tuple[Frame, FrameDirection]] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Buffer LLM text frames; flush them after the delay once the response ends.

        Args:
            frame: The frame to process.
            direction: Direction of the frame in the pipeline.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMTextFrame):
            # Buffer LLM text instead of forwarding it right away.
            logger.debug(f"Queuing LLMTextFrame: {frame.text}")
            self._queued_frames.append((frame, direction))
        elif isinstance(frame, LLMFullResponseEndFrame):
            # Detach the buffer *before* the first await. If this coroutine is
            # cancelled while sleeping or part-way through the release loop,
            # the abandoned response must not stay in the shared queue, where
            # it would be emitted again together with the next response.
            # NOTE(review): a response interrupted before its end frame arrives
            # can still leave text buffered; clearing the queue when a new
            # response starts would cover that case -- confirm against the
            # pipeline's interruption handling.
            pending, self._queued_frames = self._queued_frames, []

            logger.debug(
                f"LLM response complete, delaying {self._delay_seconds} seconds "
                f"before releasing {len(pending)} queued frames"
            )
            await asyncio.sleep(self._delay_seconds)

            # Release the buffered text frames in arrival order.
            for queued_frame, queued_direction in pending:
                logger.debug(f"Releasing queued LLMTextFrame: {queued_frame.text}")
                await self.push_frame(queued_frame, queued_direction)

            # The end frame goes last so downstream sees a complete response.
            logger.debug("Pushing LLMFullResponseEndFrame")
            await self.push_frame(frame, direction)
        else:
            # Everything else passes straight through.
            await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
@@ -70,12 +128,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Create delay processor to add 1-second delay before agent responses
|
||||
delay_processor = DelayProcessor(delay_seconds=1.0)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
delay_processor, # Add delay before TTS
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
|
||||
Reference in New Issue
Block a user