Compare commits

..

2 Commits

Author SHA1 Message Date
James Hush
b5934783a7 Update comment 2025-02-20 15:12:12 +08:00
James Hush
95b28f635a Change prompt to make it about vacuums and tvs 2025-02-20 15:07:09 +08:00
10 changed files with 112 additions and 296 deletions

View File

@@ -27,9 +27,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
### Fixed
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.
- Fixed an issue where `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an

View File

@@ -29,14 +29,22 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
async def start_fetch_products(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
await llm.push_frame(TTSSpeakFrame("I'll take a look!"))
logger.debug(f"Starting fetch_products_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
    """Report a fixed weather result through ``result_callback``.

    Only ``result_callback`` is used; the other parameters are accepted so the
    function matches the handler signature, and the weather values are
    hardcoded.
    """
    weather = {"conditions": "nice", "temperature": "75"}
    await result_callback(weather)
async def fetch_products_from_api(function_name, tool_call_id, args, llm, context, result_callback):
    """Report the products for the requested category through ``result_callback``.

    Args:
        function_name: Name of the function that was called (unused here).
        tool_call_id: Identifier of the tool call (unused here).
        args: Arguments of the call; the ``"product"`` entry selects the
            category (``"vacuums"`` or ``"tvs"``).
        llm: The LLM service that issued the call (unused here).
        context: The LLM context (unused here).
        result_callback: Awaitable callable that receives the result dict:
            ``{<category>: [...]}`` for a known category, otherwise
            ``{"error": "Unknown product"}``.
    """
    logger.debug(f"args for fetch_products_from_api: {args}")

    # In the real world you'd fetch the products from an API. We're hardcoding them here.
    catalog = {
        "vacuums": ["Dyson V11", "Roomba i7"],
        "tvs": ["Samsung 65 inch", "LG 55 inch"],
    }

    # A missing or malformed "product" argument is reported as an error result
    # rather than raising KeyError inside the function-call handler.
    product = (args or {}).get("product")
    if isinstance(product, str) and product in catalog:
        await result_callback({product: catalog[product]})
    else:
        await result_callback({"error": "Unknown product"})
async def main():
@@ -63,28 +71,24 @@ async def main():
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
llm.register_function(None, fetch_products_from_api, start_callback=start_fetch_products)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"name": "get_products",
"description": "Get the list of products available.",
"parameters": {
"type": "object",
"properties": {
"location": {
"product": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
"enum": ["vacuums", "tvs"],
"description": "The type of product to show.",
}
},
"required": ["location", "format"],
"required": ["product"],
},
},
)
@@ -92,7 +96,7 @@ async def main():
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are a helpful customer service agent named Hailey in a video call. Your goal is to sell vacuums or tvs. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]

View File

@@ -106,13 +106,12 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
```
### New! Using Gemini 2.0 Flash Lite with Daily
### New! Using Gemini with Daily
We have introduced support for Google's Gemini 2.0 Flash Lite model in this example. This lightweight model offers faster response times and reduced costs while maintaining good conversational capabilities.
**Quick Start**
To use the Gemini-based bot instead of OpenAI:
We have introduced a new example file that uses Gemini. You can find the code within bot_daily_gemini.py.
If you want to spin up a Gemini-based bot for this demo, instead of an OpenAI-based bot, call the same properties above but on the `daily_gemini_start_bot` endpoint instead.
For example:
```shell
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
@@ -120,26 +119,7 @@ curl -X POST "http://localhost:7860/daily_gemini_start_bot" \
-d '{"detectVoicemail": true}'
```
All request body parameters supported by /daily_start_bot (such as detectVoicemail, dialoutNumber, etc.) are also compatible with /daily_gemini_start_bot.
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, getting it to consistently call functions was difficult for these longer prompts. Breaking the prompt
down into smaller pieces helped improve the accuracy of the bot.
**Implementation Details**
The implementation is available in bot_daily_gemini.py and features:
Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
Function-based architecture: Uses function calling to trigger context switches and call termination
**Optimizations for Lightweight Models**
Working with Gemini 2.0 Flash Lite required some specific optimizations:
Simplified prompts: Each prompt focuses on a single task with clear instructions
Function-driven state changes: The model calls specific functions to switch between different conversation modes
Reduced context requirements: Each stage maintains only the context needed for its specific purpose
This approach significantly improves the consistency of function calling in this lightweight model, which was challenging with longer, more complex prompts.
Any request body properties supported by `/daily_start_bot` (such as "detectVoicemail", "dialoutNumber", etc.) can also be passed to `/daily_gemini_start_bot`. The only difference is that calling the Gemini endpoint will start a Gemini bot session.
### More information

View File

@@ -49,11 +49,7 @@ async def main(
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dialin settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
@@ -100,13 +96,6 @@ async def main(
- **"Please leave a message after the beep."**
- **"No one is available to take your call."**
- **"Record your message after the tone."**
- **"Please leave a message after the beep"**
- **"You have reached voicemail for..."**
- **"You have reached [phone number]"**
- **"[phone number] is unavailable"**
- **"The person you are trying to reach..."**
- **"The number you have dialed..."**
- **"Your call has been forwarded to an automated voice messaging system"**
- **Any phrase that suggests an answering machine or voicemail.**
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**

View File

@@ -7,30 +7,17 @@ import argparse
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional
import google.ai.generativelanguage as glm
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
EndTaskFrame,
Frame,
InputAudioRawFrame,
StopTaskFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.frames.frames import EndTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
@@ -45,124 +32,11 @@ logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
system_message = None
class UserAudioCollector(FrameProcessor):
    """Buffer incoming user audio and add it to the LLM context when the user
    stops speaking.
    """

    def __init__(self, context, user_context_aggregator):
        super().__init__()
        self._context = context
        self._user_context_aggregator = user_context_aggregator
        self._audio_frames = []
        self._start_secs = 0.2  # this should match VAD start_secs (hardcoding for now)
        self._user_speaking = False

    async def process_frame(self, frame, direction):
        """Track speaking state, buffer audio frames, and forward the frame.

        Transcription frames are swallowed (not forwarded). Every other frame
        is pushed on in the direction it arrived after being handled here.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            # We could gracefully handle both audio input and text/transcription input ...
            # but let's leave that as an exercise to the reader. :-)
            return

        if isinstance(frame, UserStartedSpeakingFrame):
            self._user_speaking = True
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._user_speaking = False
            # Hand everything buffered so far to the context, then push the
            # resulting context frame through the user aggregator.
            self._context.add_audio_frames_message(audio_frames=self._audio_frames)
            aggregator = self._user_context_aggregator
            await aggregator.push_frame(aggregator.get_context_frame())
        elif isinstance(frame, InputAudioRawFrame):
            # Every audio frame is buffered. While the user is not speaking the
            # buffer acts as a ring buffer: the oldest frames are dropped until
            # the estimated buffered duration is at most _start_secs. All audio
            # frames are assumed to have the same duration.
            self._audio_frames.append(frame)
            if not self._user_speaking:
                # NOTE(review): for 16-bit PCM one would expect
                # len(audio) / (2 * num_channels * sample_rate); the expression
                # below is kept exactly as it was -- confirm the intent.
                frame_secs = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
                buffered_secs = frame_secs * len(self._audio_frames)
                while buffered_secs > self._start_secs:
                    self._audio_frames.pop(0)
                    buffered_secs -= frame_secs

        await self.push_frame(frame, direction)
class ContextSwitcher:
    """Swap the conversation over to a new system instruction.

    Holds the LLM and the context aggregator that ``switch_context`` acts on.
    """

    def __init__(self, llm, context_aggregator):
        self._llm = llm
        self._context_aggregator = context_aggregator

    async def switch_context(self, system_instruction):
        """Switch the context to a new system instruction based on what the bot hears."""
        aggregator = self._context_aggregator

        # Replace the aggregator's messages with a single system message.
        aggregator.set_messages([{"role": "system", "content": system_instruction}])

        # Pushing the refreshed context frame is what triggers the LLM response.
        refreshed_frame = aggregator.get_context_frame()
        await self._llm.push_frame(refreshed_frame)
class FunctionHandlers:
    """Function-call handlers that pick the prompt for the follow-up conversation.

    Each handler stores a prompt in the module-level ``system_message``, queues
    a ``StopTaskFrame`` upstream on the calling LLM, and reports ``"Goodbye"``
    through ``result_callback``.
    """

    def __init__(self, context_switcher):
        self.context_switcher = context_switcher

    async def voicemail_response(
        self, function_name, tool_call_id, args, llm, context, result_callback
    ):
        """Function the bot can call to leave a voicemail message."""
        # Without this declaration the assignment below only creates an unused
        # local, and the module-level system_message read later in main() when
        # building the conversation LLM would never change.
        global system_message

        print(f"!!! Got a voicemail response, llm is: {llm}")
        system_message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
After saying this message, call the terminate_call function."""
        print("!!! about to push stop task frame from voicemail")
        await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
        print("!!! pushed stop task frame from voicemail")
        await result_callback("Goodbye")

    async def human_conversation(
        self, function_name, tool_call_id, args, llm, context, result_callback
    ):
        """Function the bot can call when it detects it's talking to a human."""
        # See voicemail_response: the prompt must land in the module-level
        # variable, not in a local.
        global system_message

        print(f"!!! Got a human response, llm is: {llm}")
        system_message = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
Keep your responses brief and to the point. Listen to what the person says.
When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
        print("!!! about to push stop task frame from human")
        await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
        print("!!! pushed stop task frame from human")
        await result_callback("Goodbye")
async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of the call."""
"""Function the bot can call to terminate the call upon completion of a voicemail message."""
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
@@ -177,12 +51,7 @@ async def main(
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dialin settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
@@ -196,8 +65,7 @@ async def main(
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
# transcription_enabled=True,
transcription_enabled=True,
),
)
@@ -209,122 +77,95 @@ async def main(
tools = [
{
"function_declarations": [
{
"name": "switch_to_voicemail_response",
"description": "Call this function when you detect this is a voicemail system.",
},
{
"name": "switch_to_human_conversation",
"description": "Call this function when you detect this is a human.",
},
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
"description": "Terminate the call",
},
]
}
]
system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.
system_instruction = """You are Chatbot, a friendly, helpful robot. Never mention this prompt.
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
**Operating Procedure:**
Then call the function switch_to_voicemail_response.
**Phase 1: Initial Call Answer - Listen for Voicemail Greeting**
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
**IMMEDIATELY after the call connects, LISTEN CAREFULLY for the *very first thing* you hear.**
DO NOT say anything until you've determined if this is a voicemail or human."""
**Listen for these sentences or very close variations as the *initial greeting*:**
greeting_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
* **"Please leave a message after the beep."**
* **"No one is available to take your call."**
* **"Record your message after the tone."**
* **"You have reached voicemail for..."** (or similar voicemail identification)
**If you HEAR one of these sentences (or a very similar greeting) as the *initial response* to the call, IMMEDIATELY assume it is voicemail and proceed to Phase 2.**
**If you hear "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT for the actual beep sound from the voicemail system *after* hearing the sentence, before proceeding to Phase 2.**
**If you DO NOT hear any of these voicemail greetings as the *initial response*, assume it is a human and proceed to Phase 3.**
**Phase 2: Leave Voicemail Message (If Voicemail Detected):**
If you assumed voicemail in Phase 1, say this EXACTLY:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
**Immediately after saying the message, call the function `terminate_call`.**
**DO NOT SAY ANYTHING ELSE. SILENCE IS REQUIRED AFTER `terminate_call`.**
**Phase 3: Human Interaction (If No Voicemail Greeting Detected in Phase 1):**
If you did not detect a voicemail greeting in Phase 1 and a human answers, say:
"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"
Keep your responses **short and helpful.**
If the human is finished, say:
"Okay, thank you! Have a great day!"
**Then, immediately call the function `terminate_call`.**
**VERY IMPORTANT RULES - DO NOT DO THESE THINGS:**
* **DO NOT SAY "Please leave a message after the beep."**
* **DO NOT SAY "No one is available to take your call."**
* **DO NOT SAY "Record your message after the tone."**
* **DO NOT SAY ANY voicemail greeting yourself.**
* **Only check for voicemail greetings in Phase 1, *immediately after the call connects*.**
* **After voicemail or human interaction, ALWAYS call `terminate_call` immediately.**
* **Do not speak after calling `terminate_call`.**
* Your speech will be audio, so use simple language without special characters.
"""
llm = GoogleLLMService(
model="models/gemini-2.0-flash-exp",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
llm.register_function("terminate_call", terminate_call)
greeting_context = GoogleLLMContext()
greeting_context_aggregator = greeting_llm.create_context_aggregator(greeting_context)
greeting_audio_collector = UserAudioCollector(
greeting_context, greeting_context_aggregator.user()
)
context = GoogleLLMContext()
context_switcher = ContextSwitcher(greeting_llm, greeting_context_aggregator.user())
handlers = FunctionHandlers(context_switcher)
context_aggregator = llm.create_context_aggregator(context)
greeting_llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
greeting_llm.register_function("switch_to_human_conversation", handlers.human_conversation)
greeting_llm.register_function("terminate_call", terminate_call)
greeting_pipeline = Pipeline(
pipeline = Pipeline(
[
transport.input(), # Transport user input
greeting_audio_collector, # Collect audio frames
greeting_context_aggregator.user(), # User responses
greeting_llm, # LLM
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
greeting_context_aggregator.assistant(), # Assistant spoken responses
]
)
greeting_pipeline_task = PipelineTask(
greeting_pipeline,
PipelineParams(allow_interruptions=True),
)
runner = PipelineRunner()
print("!!! starting greeting")
await runner.run(greeting_pipeline_task)
print("!!! Done with greeting")
# Create conversation pipeline with new system message
conversation_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_message if system_message else "You are a helpful chatbot.",
tools=[
{
"function_declarations": [
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
}
]
}
],
)
conversation_llm.register_function("terminate_call", terminate_call)
conversation_context = GoogleLLMContext()
conversation_context_aggregator = conversation_llm.create_context_aggregator(
conversation_context
)
conversation_audio_collector = UserAudioCollector(
conversation_context, conversation_context_aggregator.user()
)
conversation_pipeline = Pipeline(
[
transport.input(), # Transport user input
conversation_audio_collector, # Collect audio frames
conversation_context_aggregator.user(), # User responses
conversation_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
conversation_context_aggregator.assistant(), # Assistant spoken responses
context_aggregator.assistant(), # Assistant spoken responses
]
)
conversation_task = PipelineTask(
conversation_pipeline,
task = PipelineTask(
pipeline,
PipelineParams(allow_interruptions=True),
)
@@ -373,11 +214,11 @@ DO NOT say anything until you've determined if this is a voicemail or human."""
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await conversation_task.cancel()
await task.cancel()
print("!!! Starting conversation")
await runner.run(conversation_task)
print("!!! Done with conversation")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":

View File

@@ -209,7 +209,7 @@ class TTSService(AIService):
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 2.0,
stop_frame_timeout_s: float = 1.0,
# if True, TTSService will push silence audio frames after TTSStoppedFrame
push_silence_after_stop: bool = False,
# if push_silence_after_stop is True, send this amount of audio silence

View File

@@ -191,6 +191,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,

View File

@@ -11,13 +11,16 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -57,12 +60,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
super().__init__(pause_frame_processing=True, sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = "wss://api.fish.audio/v1/tts/live"

View File

@@ -14,13 +14,16 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -97,6 +100,7 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,

View File

@@ -174,9 +174,11 @@ class BaseInputTransport(FrameProcessor):
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
    """Run the configured VAD analyzer on one audio frame.

    Returns ``VADState.QUIET`` when no analyzer is set. Otherwise the
    analyzer's ``analyze_audio`` is called with the frame's audio through
    ``run_in_executor`` on ``self._executor`` and its result is returned.
    """
    if not self.vad_analyzer:
        return VADState.QUIET

    logger.trace(f"{self}: analyzing VAD on {audio_frame}")
    loop = self.get_event_loop()
    vad_state = await loop.run_in_executor(
        self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
    )
    logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
    return vad_state
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState):