Compare commits
3 Commits
hush/usage
...
hush/inter
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdf0652141 | ||
|
|
237c400f2d | ||
|
|
b6afce2a92 |
@@ -5,16 +5,27 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||||
|
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||||
|
from pipecat.frames.frames import (
|
||||||
|
BotStartedSpeakingFrame,
|
||||||
|
Frame,
|
||||||
|
LLMFullResponseStartFrame,
|
||||||
|
LLMTextFrame,
|
||||||
|
TranscriptionFrame,
|
||||||
|
TTSSpeakFrame,
|
||||||
|
)
|
||||||
from pipecat.pipeline.pipeline import Pipeline
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
from pipecat.pipeline.runner import PipelineRunner
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||||
from pipecat.runner.types import RunnerArguments
|
from pipecat.runner.types import RunnerArguments
|
||||||
from pipecat.runner.utils import create_transport
|
from pipecat.runner.utils import create_transport
|
||||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||||
@@ -49,6 +60,65 @@ transport_params = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TranscriptionLogger(FrameProcessor):
    """Pass-through processor that writes transcription text to the log.

    Every frame is forwarded unchanged in the direction it arrived. Frames
    that are ``TranscriptionFrame`` instances additionally have their
    ``text`` logged at INFO level.
    """

    async def process_frame(self, frame, direction):
        """Run the base-class handling, log transcriptions, forward the frame.

        Args:
            frame: The frame handed to this processor.
            direction: The direction the frame is travelling; reused as-is
                when the frame is forwarded.
        """
        await super().process_frame(frame, direction)

        is_transcription = isinstance(frame, TranscriptionFrame)
        if is_transcription:
            logger.info(f"[TRANSCRIPTION]: {frame.text}")

        # Forward unconditionally so this processor never drops a frame.
        await self.push_frame(frame, direction)
|
||||||
|
|
||||||
|
|
||||||
|
class InterventionProcessor(FrameProcessor):
    """Pass-through processor that queues a spoken filler when the bot is slow.

    On each ``LLMFullResponseStartFrame`` a timer is (re)started. If a
    ``BotStartedSpeakingFrame`` arrives before the timer elapses, the timer
    is cancelled; otherwise a ``TTSSpeakFrame`` carrying the filler text is
    queued on this processor. LLM response starts, LLM text chunks and
    bot-started-speaking events are logged at INFO level. Every frame is
    forwarded unchanged in the direction it arrived.

    Args:
        delay_secs: Seconds to wait after an LLM response starts before
            queuing the filler. Defaults to 0.5.
        filler_text: Text placed in the queued ``TTSSpeakFrame``. Defaults
            to ``"um..."``.
    """

    def __init__(self, delay_secs: float = 0.5, filler_text: str = "um..."):
        super().__init__()
        self._delay_secs = delay_secs
        self._filler_text = filler_text
        # Handle of the pending filler timer, or None before the first one.
        self._timer_task = None

    async def process_frame(self, frame, direction):
        """Log LLM/bot-speaking frames, manage the timer, forward the frame.

        Args:
            frame: The frame handed to this processor.
            direction: The direction the frame is travelling; reused as-is
                when the frame is forwarded.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMFullResponseStartFrame):
            logger.info("[LLM_START]: Starting LLM response")
            # Restart the timer so only the latest response start is timed.
            self._cancel_timer()
            self._timer_task = asyncio.create_task(self._log_after_delay())
        elif isinstance(frame, BotStartedSpeakingFrame):
            # The bot spoke in time, so no filler is needed.
            logger.info("[BOT_SPEAKING]: Bot started speaking, canceling intervention timer")
            self._cancel_timer()
        elif isinstance(frame, LLMTextFrame):
            logger.info(f"[LLM_TEXT]: {frame.text}")

        # Always pass the frame through to maintain pipeline flow.
        await self.push_frame(frame, direction)

    async def cleanup(self):
        """Cancel any pending timer so it cannot fire after teardown.

        NOTE(review): assumes the FrameProcessor base class exposes an async
        ``cleanup()`` hook -- confirm against the installed pipecat version.
        """
        await super().cleanup()
        self._cancel_timer()

    def _cancel_timer(self) -> None:
        """Cancel the pending timer task, if one exists and has not finished."""
        if self._timer_task and not self._timer_task.done():
            self._timer_task.cancel()

    async def _log_after_delay(self):
        """Wait for the configured delay, then log and queue the filler frame.

        Cancellation is allowed to propagate: the task is only stored and
        cancelled, never awaited, so there is no caller to shield from
        ``asyncio.CancelledError``.
        """
        await asyncio.sleep(self._delay_secs)
        # With the default delay this renders as "500ms passed since ...".
        logger.info(
            f"{self._delay_secs * 1000:.0f}ms passed since LLMFullResponseStartFrame"
        )
        await self.queue_frame(TTSSpeakFrame(self._filler_text))
|
||||||
|
|
||||||
|
|
||||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||||
logger.info(f"Starting bot")
|
logger.info(f"Starting bot")
|
||||||
|
|
||||||
@@ -71,13 +141,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
|||||||
context = OpenAILLMContext(messages)
|
context = OpenAILLMContext(messages)
|
||||||
context_aggregator = llm.create_context_aggregator(context)
|
context_aggregator = llm.create_context_aggregator(context)
|
||||||
|
|
||||||
|
# Create transcription logger instance
|
||||||
|
transcription_logger = TranscriptionLogger()
|
||||||
|
|
||||||
|
# Create LLM logger instance
|
||||||
|
intervention = InterventionProcessor()
|
||||||
|
|
||||||
pipeline = Pipeline(
|
pipeline = Pipeline(
|
||||||
[
|
[
|
||||||
transport.input(), # Transport user input
|
transport.input(), # Transport user input
|
||||||
stt,
|
stt,
|
||||||
|
transcription_logger, # Log transcription frames
|
||||||
context_aggregator.user(), # User responses
|
context_aggregator.user(), # User responses
|
||||||
llm, # LLM
|
llm, # LLM
|
||||||
tts, # TTS
|
tts, # TTS
|
||||||
|
intervention, # Log LLM response frames
|
||||||
transport.output(), # Transport bot output
|
transport.output(), # Transport bot output
|
||||||
context_aggregator.assistant(), # Assistant spoken responses
|
context_aggregator.assistant(), # Assistant spoken responses
|
||||||
]
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user