Compare commits
1 Commits
hush/usage
...
hush/delay
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bb3cb2b83 |
@@ -4,17 +4,19 @@
|
|||||||
# SPDX-License-Identifier: BSD 2-Clause License
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
#
|
#
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||||
from pipecat.frames.frames import LLMRunFrame
|
from pipecat.frames.frames import Frame, LLMFullResponseEndFrame, LLMRunFrame, LLMTextFrame
|
||||||
from pipecat.pipeline.pipeline import Pipeline
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
from pipecat.pipeline.runner import PipelineRunner
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||||
from pipecat.runner.types import RunnerArguments
|
from pipecat.runner.types import RunnerArguments
|
||||||
from pipecat.runner.utils import create_transport
|
from pipecat.runner.utils import create_transport
|
||||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||||
@@ -26,6 +28,62 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
|||||||
|
|
||||||
load_dotenv(override=True)
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
|
||||||
|
class DelayProcessor(FrameProcessor):
    """Custom processor that queues LLM text frames until the response is complete.

    This creates a more natural conversation flow by preventing the agent from
    responding immediately after the user stops speaking. Every ``LLMTextFrame``
    is buffered instead of being forwarded. When an ``LLMFullResponseEndFrame``
    arrives, the processor waits ``delay_seconds``, pushes the buffered text
    frames in arrival order, and then pushes the end frame itself. All other
    frames are forwarded immediately.
    """

    def __init__(self, *, delay_seconds: float = 1.0, **kwargs) -> None:
        """Initialize the DelayProcessor.

        Args:
            delay_seconds: Number of seconds to delay before releasing queued
                frames (default: 1.0).
            **kwargs: Forwarded unchanged to the ``FrameProcessor`` initializer.
        """
        super().__init__(**kwargs)
        self._delay_seconds = delay_seconds
        # (frame, direction) pairs buffered for the response currently in progress.
        self._queued_frames: list[tuple[Frame, FrameDirection]] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Process frames, queuing LLM text frames until the response is complete.

        Args:
            frame: The frame to process.
            direction: Direction of the frame in the pipeline.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMTextFrame):
            # Queue LLM text frames instead of pushing them immediately.
            logger.debug(f"Queuing LLMTextFrame: {frame.text}")
            self._queued_frames.append((frame, direction))
            return

        if not isinstance(frame, LLMFullResponseEndFrame):
            # Push all other frames immediately.
            await self.push_frame(frame, direction)
            return

        # Detach the finished batch from the instance *before* the first await.
        # If the awaiting task is cancelled during the delay or mid-release
        # (asyncio raises CancelledError at the await), the batch is dropped
        # with this local instead of lingering in ``self._queued_frames`` and
        # being released together with the next response.
        pending, self._queued_frames = self._queued_frames, []

        # Only delay when there is text to hold back; a response that produced
        # no text frames has nothing to release, so stalling it gains nothing.
        if pending:
            logger.debug(
                f"LLM response complete, delaying {self._delay_seconds} seconds before releasing {len(pending)} queued frames"
            )
            await asyncio.sleep(self._delay_seconds)

            # Push all queued LLM text frames in the order they arrived.
            for queued_frame, queued_direction in pending:
                logger.debug(f"Releasing queued LLMTextFrame: {queued_frame.text}")
                await self.push_frame(queued_frame, queued_direction)

        # Push the end frame last so downstream sees text before end-of-response.
        logger.debug("Pushing LLMFullResponseEndFrame")
        await self.push_frame(frame, direction)
|
||||||
|
|
||||||
|
|
||||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||||
# instantiated. The function will be called when the desired transport gets
|
# instantiated. The function will be called when the desired transport gets
|
||||||
# selected.
|
# selected.
|
||||||
@@ -70,12 +128,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
|||||||
context = OpenAILLMContext(messages)
|
context = OpenAILLMContext(messages)
|
||||||
context_aggregator = llm.create_context_aggregator(context)
|
context_aggregator = llm.create_context_aggregator(context)
|
||||||
|
|
||||||
|
# Create delay processor to add 1-second delay before agent responses
|
||||||
|
delay_processor = DelayProcessor(delay_seconds=1.0)
|
||||||
|
|
||||||
pipeline = Pipeline(
|
pipeline = Pipeline(
|
||||||
[
|
[
|
||||||
transport.input(), # Transport user input
|
transport.input(), # Transport user input
|
||||||
stt,
|
stt,
|
||||||
context_aggregator.user(), # User responses
|
context_aggregator.user(), # User responses
|
||||||
llm, # LLM
|
llm, # LLM
|
||||||
|
delay_processor, # Add delay before TTS
|
||||||
tts, # TTS
|
tts, # TTS
|
||||||
transport.output(), # Transport bot output
|
transport.output(), # Transport bot output
|
||||||
context_aggregator.assistant(), # Assistant spoken responses
|
context_aggregator.assistant(), # Assistant spoken responses
|
||||||
|
|||||||
Reference in New Issue
Block a user