intervention processor
This commit is contained in:
@@ -5,16 +5,27 @@
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
@@ -26,6 +37,14 @@ from pipecat.transports.services.daily import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# Create VAD parameters optimized for quiet speakers
|
||||
quiet_speaker_vad_params = VADParams(
|
||||
confidence=0.4, # Lower confidence threshold (default: 0.7)
|
||||
min_volume=0.3, # Lower volume threshold (default: 0.6)
|
||||
start_secs=0.1, # Faster response to speech start (default: 0.2)
|
||||
stop_secs=1.0, # Longer wait before stopping (default: 0.8)
|
||||
)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
@@ -34,21 +53,80 @@ transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_analyzer=SileroVADAnalyzer(params=quiet_speaker_vad_params),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
|
||||
"""Custom processor that logs transcription frames."""
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Only log TranscriptionFrame objects
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
logger.info(f"[TRANSCRIPTION]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class InterventionProcessor(FrameProcessor):
|
||||
"""Custom processor that logs LLM response frames."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._timer_task = None
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Log LLM response start frames
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
logger.info(f"[LLM_START]: Starting LLM response")
|
||||
|
||||
# Cancel any existing timer
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Start a new 500ms timer
|
||||
self._timer_task = asyncio.create_task(self._log_after_delay())
|
||||
|
||||
# Cancel timer if bot started speaking before 500ms
|
||||
elif isinstance(frame, BotStartedSpeakingFrame):
|
||||
logger.info(f"[BOT_SPEAKING]: Bot started speaking, canceling intervention timer")
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Log LLM text frames
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.info(f"[LLM_TEXT]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _log_after_delay(self):
|
||||
"""Log a message after 500ms delay."""
|
||||
try:
|
||||
await asyncio.sleep(0.5) # 500ms
|
||||
logger.info(f"500ms passed since LLMFullResponseStartFrame")
|
||||
await self.queue_frame(TTSSpeakFrame("um..."))
|
||||
except asyncio.CancelledError:
|
||||
# Timer was cancelled, which is fine
|
||||
pass
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
@@ -71,13 +149,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Create transcription logger instance
|
||||
transcription_logger = TranscriptionLogger()
|
||||
|
||||
# Create LLM logger instance
|
||||
intervention = InterventionProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
transcription_logger, # Log transcription frames
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
intervention, # Log LLM response frames
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user