Compare commits

...

3 Commits

Author SHA1 Message Date
James Hush
4721c07712 Just start with the user idle timer 2025-05-02 14:16:34 +08:00
James Hush
1e4f04330a Use StartConversationFrame 2025-04-30 14:27:08 +08:00
James Hush
3b0d4aa87b demo: say a default message after 4 seconds of being idle or after user stops speaking once 2025-04-30 12:24:02 +08:00

View File

@@ -6,16 +6,30 @@
import argparse
import os
from dataclasses import dataclass
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
Frame,
LLMMessagesFrame,
StartInterruptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -27,6 +41,84 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
class DebugObserver(BaseObserver):
    """Observer that logs interruption and bot-speaking events to the console.

    Logs every pushed frame that is an instance of:

    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    Each log line names the processor that pushed the frame and the processor
    that receives it, so the path of these frames through the pipeline can be
    followed hop by hop.

    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    (the arrow is ``←`` for frames that are not travelling downstream).
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        """Log the frame if it is one of the observed event types.

        Args:
            src: Processor that pushed the frame.
            dst: Processor the frame is being pushed to.
            frame: The frame being pushed.
            direction: Direction the frame is travelling in.
            timestamp: Time of the push; divided by 1e9 below, i.e. treated
                as nanoseconds.
        """
        # Convert timestamp to seconds for readability
        time_sec = timestamp / 1_000_000_000

        # Direction arrow: both branches used to be the empty string, which
        # made upstream and downstream pushes indistinguishable in the log.
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
@dataclass
class StartConversationFrame(Frame):
    """Marker frame that asks the pipeline to begin a conversation.

    The frame declares no fields of its own; its type alone is the signal.
    Processors can react to it to trigger a specific action or response.
    """
class ConversationStarterProcessor(FrameProcessor):
    """Speak a fixed message the first time a trigger frame arrives.

    Trigger frames are ``StartConversationFrame`` and
    ``UserStoppedSpeakingFrame``. The first trigger is replaced by a
    ``TTSSpeakFrame`` carrying the configured message; every later trigger,
    and every other frame, is forwarded unchanged in the direction it was
    travelling.
    """

    def __init__(self, message: str = "Hi! I'm a default message!"):
        """Create the processor.

        Args:
            message: Text to speak when the first trigger frame is seen.
        """
        super().__init__()
        self.message = message
        # Number of trigger frames seen so far (both trigger types count).
        self._user_stopped_speaking_count = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Emit the default message on the first trigger, otherwise pass through.

        Args:
            frame: The frame to process
            direction: Direction of the frame flow
        """
        await super().process_frame(frame, direction)

        if not isinstance(frame, (StartConversationFrame, UserStoppedSpeakingFrame)):
            # Pass through other frames, keeping their direction so upstream
            # frames are not reflected back downstream.
            await self.push_frame(frame, direction)
            return

        self._user_stopped_speaking_count += 1
        logger.info(
            f"++ {frame.name} User stopped speaking, count: {self._user_stopped_speaking_count}"
        )

        if self._user_stopped_speaking_count == 1:
            # First trigger: speak the message instead of forwarding the frame.
            # NOTE(review): the triggering frame itself is dropped here, so a
            # first UserStoppedSpeakingFrame never reaches downstream
            # processors — confirm this is intended for the demo.
            await self.push_frame(TTSSpeakFrame(self.message))
        else:
            await self.push_frame(frame, direction)
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
logger.info(f"Starting bot")
@@ -59,15 +151,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
context_aggregator = llm.create_context_aggregator(context)
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
logger.info(f"User idle, timeout : {user_idle._timeout} retry count: {retry_count}")
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
# First attempt: Trigger the conversation starter
await user_idle.push_frame(StartConversationFrame())
return True
elif retry_count == 2:
# Second attempt: More direct prompt
@@ -84,16 +171,19 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
await user_idle.push_frame(EndFrame(), FrameDirection.UPSTREAM)
return False
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=4.0)
conversation_starter = ConversationStarterProcessor(message="This is a default message.")
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_idle, # Idle user check-in
conversation_starter,
context_aggregator.user(),
llm, # LLM
tts, # TTS
@@ -108,15 +198,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
observers=[DebugObserver()],
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
# Start the user idle timer
await task.queue_frames([BotSpeakingFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):