Compare commits
3 Commits
hush/realt
...
hush/inter
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdf0652141 | ||
|
|
237c400f2d | ||
|
|
b6afce2a92 |
@@ -5,16 +5,27 @@
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
@@ -49,6 +60,65 @@ transport_params = {
|
||||
}
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
|
||||
"""Custom processor that logs transcription frames."""
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Only log TranscriptionFrame objects
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
logger.info(f"[TRANSCRIPTION]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class InterventionProcessor(FrameProcessor):
|
||||
"""Custom processor that logs LLM response frames."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._timer_task = None
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Log LLM response start frames
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
logger.info(f"[LLM_START]: Starting LLM response")
|
||||
|
||||
# Cancel any existing timer
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Start a new 500ms timer
|
||||
self._timer_task = asyncio.create_task(self._log_after_delay())
|
||||
|
||||
# Cancel timer if bot started speaking before 500ms
|
||||
elif isinstance(frame, BotStartedSpeakingFrame):
|
||||
logger.info(f"[BOT_SPEAKING]: Bot started speaking, canceling intervention timer")
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Log LLM text frames
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.info(f"[LLM_TEXT]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _log_after_delay(self):
|
||||
"""Log a message after 500ms delay."""
|
||||
try:
|
||||
await asyncio.sleep(0.5) # 500ms
|
||||
logger.info(f"500ms passed since LLMFullResponseStartFrame")
|
||||
await self.queue_frame(TTSSpeakFrame("um..."))
|
||||
except asyncio.CancelledError:
|
||||
# Timer was cancelled, which is fine
|
||||
pass
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
@@ -71,13 +141,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Create transcription logger instance
|
||||
transcription_logger = TranscriptionLogger()
|
||||
|
||||
# Create LLM logger instance
|
||||
intervention = InterventionProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
transcription_logger, # Log transcription frames
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
intervention, # Log LLM response frames
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user