From 14f309ce2bc05b5d45ccfc12a7d110a73fb98fec Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 10 Dec 2024 22:13:55 -0500 Subject: [PATCH] Add Gemini Live bot file --- examples/simple-chatbot/server/README.md | 31 ++- examples/simple-chatbot/server/bot-gemini.py | 188 ++++++++++++++++++ .../server/{bot.py => bot-openai.py} | 25 ++- examples/simple-chatbot/server/env.example | 4 +- examples/simple-chatbot/server/runner.py | 4 +- examples/simple-chatbot/server/server.py | 43 ++-- 6 files changed, 269 insertions(+), 26 deletions(-) create mode 100644 examples/simple-chatbot/server/bot-gemini.py rename examples/simple-chatbot/server/{bot.py => bot-openai.py} (89%) diff --git a/examples/simple-chatbot/server/README.md b/examples/simple-chatbot/server/README.md index d13425b46..8d5147522 100644 --- a/examples/simple-chatbot/server/README.md +++ b/examples/simple-chatbot/server/README.md @@ -1,11 +1,11 @@ # Simple Chatbot Server -A FastAPI server that manages bot instances and provides endpoints for both Daily Prebuilt and RTVI client connections. +A FastAPI server that manages bot instances and provides endpoints for both Daily Prebuilt and Pipecat client connections. 
## Endpoints - `GET /` - Direct browser access, redirects to a Daily Prebuilt room -- `POST /connect` - RTVI client connection endpoint +- `POST /connect` - Pipecat client connection endpoint - `GET /status/{pid}` - Get status of a specific bot process ## Environment Variables @@ -13,14 +13,37 @@ A FastAPI server that manages bot instances and provides endpoints for both Dail Copy `env.example` to `.env` and configure: ```ini +# Required API Keys DAILY_API_KEY= # Your Daily API key +OPENAI_API_KEY= # Your OpenAI API key (required for OpenAI bot) +GEMINI_API_KEY= # Your Gemini API key (required for Gemini bot) +ELEVENLABS_API_KEY= # Your ElevenLabs API key + +# Bot Selection +BOT_IMPLEMENTATION= # Options: 'openai' or 'gemini' + +# Optional Configuration DAILY_API_URL= # Optional: Daily API URL (defaults to https://api.daily.co/v1) -OPENAI_API_KEY= # Your OpenAI API key -CARTESIA_API_KEY= # Your Cartesia API key +DAILY_SAMPLE_ROOM_URL= # Optional: Fixed room URL for development HOST= # Optional: Host address (defaults to 0.0.0.0) FAST_API_PORT= # Optional: Port number (defaults to 7860) ``` +## Available Bots + +The server supports two bot implementations: + +1. **OpenAI Bot** (Default) + + - Uses GPT-4 for conversation + - Requires OPENAI_API_KEY + +2. **Gemini Bot** + - Uses Google's Gemini model + - Requires GEMINI_API_KEY + +Select your preferred bot by setting `BOT_IMPLEMENTATION` in your `.env` file. 
+ ## Running the Server Set up and activate your virtual environment: diff --git a/examples/simple-chatbot/server/bot-gemini.py b/examples/simple-chatbot/server/bot-gemini.py new file mode 100644 index 000000000..a81e213c9 --- /dev/null +++ b/examples/simple-chatbot/server/bot-gemini.py @@ -0,0 +1,188 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from PIL import Image +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + EndFrame, + Frame, + LLMMessagesFrame, + OutputImageRawFrame, + SpriteFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frameworks.rtvi import ( + RTVIBotTranscriptionProcessor, + RTVIMetricsProcessor, + RTVISpeakingProcessor, + RTVIUserTranscriptionProcessor, +) +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +sprites = [] + +script_dir = os.path.dirname(__file__) + +for i in range(1, 26): + # Build the full path to the image file + full_path = os.path.join(script_dir, f"assets/robot0{i}.png") + # Get the filename without the extension to use as the dictionary key + # Open the image and 
convert it to bytes + with Image.open(full_path) as img: + sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + +flipped = sprites[::-1] +sprites.extend(flipped) + +# When the bot isn't talking, show a static image of the robot listening +quiet_frame = sprites[0] +talking_frame = SpriteFrame(images=sprites) + + +class TalkingAnimation(FrameProcessor): + """This class starts a talking animation when it receives a first AudioFrame. + + It then returns to a "quiet" sprite when it sees a TTSStoppedFrame. + """ + + def __init__(self): + super().__init__() + self._is_talking = False + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, BotStartedSpeakingFrame): + if not self._is_talking: + await self.push_frame(talking_frame) + self._is_talking = True + elif isinstance(frame, BotStoppedSpeakingFrame): + await self.push_frame(quiet_frame) + self._is_talking = False + + await self.push_frame(frame, direction) + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Chatbot", + DailyParams( + audio_in_sample_rate=16000, + audio_out_sample_rate=24000, + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=576, + vad_enabled=True, + vad_audio_passthrough=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + ) + + llm = GeminiMultimodalLiveLLMService( + api_key=os.getenv("GEMINI_API_KEY"), + voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck + transcribe_user_audio=True, + transcribe_model_audio=True, + ) + + messages = [ + { + "role": "user", + "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + ta = TalkingAnimation() + + # RTVI + + # This will send `user-*-speaking` and `bot-*-speaking` messages. + rtvi_speaking = RTVISpeakingProcessor() + + # This will emit UserTranscript events. + rtvi_user_transcription = RTVIUserTranscriptionProcessor() + + # This will emit BotTranscript events. + rtvi_bot_transcription = RTVIBotTranscriptionProcessor() + + # This will send `metrics` messages. + rtvi_metrics = RTVIMetricsProcessor() + + pipeline = Pipeline( + [ + transport.input(), + context_aggregator.user(), + llm, + rtvi_speaking, + rtvi_user_transcription, + rtvi_bot_transcription, + ta, + rtvi_metrics, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + await task.queue_frame(quiet_frame) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + print(f"Participant left: {participant}") + await task.queue_frame(EndFrame()) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/simple-chatbot/server/bot.py b/examples/simple-chatbot/server/bot-openai.py similarity index 89% rename from examples/simple-chatbot/server/bot.py rename to examples/simple-chatbot/server/bot-openai.py index 5bfb4a8fe..4db90c932 100644 --- a/examples/simple-chatbot/server/bot.py +++ 
b/examples/simple-chatbot/server/bot-openai.py @@ -31,6 +31,8 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import ( RTVIBotTranscriptionProcessor, + RTVIMetricsProcessor, + RTVISpeakingProcessor, RTVIUserTranscriptionProcessor, ) from pipecat.services.elevenlabs import ElevenLabsTTSService @@ -63,9 +65,9 @@ talking_frame = SpriteFrame(images=sprites) class TalkingAnimation(FrameProcessor): - """ - This class starts a talking animation when it receives an first AudioFrame, - and then returns to a "quiet" sprite when it sees a TTSStoppedFrame. + """This class starts a talking animation when it receives a first AudioFrame. + + It then returns to a "quiet" sprite when it sees a TTSStoppedFrame. """ def __init__(self): @@ -149,27 +151,42 @@ async def main(): # RTVI + # This will send `user-*-speaking` and `bot-*-speaking` messages. + rtvi_speaking = RTVISpeakingProcessor() + # This will emit UserTranscript events. rtvi_user_transcription = RTVIUserTranscriptionProcessor() # This will emit BotTranscript events. rtvi_bot_transcription = RTVIBotTranscriptionProcessor() + # This will send `metrics` messages. 
+ rtvi_metrics = RTVIMetricsProcessor() + pipeline = Pipeline( [ transport.input(), + rtvi_speaking, rtvi_user_transcription, context_aggregator.user(), llm, rtvi_bot_transcription, tts, ta, + rtvi_metrics, transport.output(), context_aggregator.assistant(), ] ) - task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) await task.queue_frame(quiet_frame) @transport.event_handler("on_first_participant_joined") diff --git a/examples/simple-chatbot/server/env.example b/examples/simple-chatbot/server/env.example index d368ae510..0eab9845a 100644 --- a/examples/simple-chatbot/server/env.example +++ b/examples/simple-chatbot/server/env.example @@ -1,4 +1,6 @@ DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev) DAILY_API_KEY=7df... OPENAI_API_KEY=sk-PL... -ELEVENLABS_API_KEY=aeb... \ No newline at end of file +GEMINI_API_KEY=AIza... +ELEVENLABS_API_KEY=aeb... 
+BOT_IMPLEMENTATION= # Options: 'openai' or 'gemini' \ No newline at end of file diff --git a/examples/simple-chatbot/server/runner.py b/examples/simple-chatbot/server/runner.py index 3df3ee81f..0c03a96bf 100644 --- a/examples/simple-chatbot/server/runner.py +++ b/examples/simple-chatbot/server/runner.py @@ -4,14 +4,16 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import aiohttp import argparse import os +import aiohttp + from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper async def configure(aiohttp_session: aiohttp.ClientSession): + """Configure the Daily room and Daily REST helper.""" parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") parser.add_argument( "-u", "--url", type=str, required=False, help="URL of the Daily room to join" diff --git a/examples/simple-chatbot/server/server.py b/examples/simple-chatbot/server/server.py index 13d8d7781..7dc6d1f5c 100644 --- a/examples/simple-chatbot/server/server.py +++ b/examples/simple-chatbot/server/server.py @@ -4,8 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -""" -RTVI Bot Server Implementation +"""RTVI Bot Server Implementation. This FastAPI server manages RTVI bot instances and provides endpoints for both direct browser access and RTVI client connections. It handles: @@ -49,8 +48,8 @@ daily_helpers = {} def cleanup(): - """ - Cleanup function to terminate all bot processes. + """Cleanup function to terminate all bot processes. + Called during server shutdown. """ for entry in bot_procs.values(): @@ -59,10 +58,22 @@ def cleanup(): proc.wait() +def get_bot_file(): + bot_implementation = os.getenv("BOT_IMPLEMENTATION", "openai").lower().strip() + # If blank or None, default to openai + if not bot_implementation: + bot_implementation = "openai" + if bot_implementation not in ["openai", "gemini"]: + raise ValueError( + f"Invalid BOT_IMPLEMENTATION: {bot_implementation}. 
Must be 'openai' or 'gemini'" + ) + return f"bot-{bot_implementation}" + + @asynccontextmanager async def lifespan(app: FastAPI): - """ - FastAPI lifespan manager that handles startup and shutdown tasks: + """FastAPI lifespan manager that handles startup and shutdown tasks. + - Creates aiohttp session - Initializes Daily API helper - Cleans up resources on shutdown @@ -92,8 +103,7 @@ app.add_middleware( async def create_room_and_token() -> tuple[str, str]: - """ - Helper function to create a Daily room and generate an access token. + """Helper function to create a Daily room and generate an access token. Returns: tuple[str, str]: A tuple containing (room_url, token) @@ -114,8 +124,8 @@ async def create_room_and_token() -> tuple[str, str]: @app.get("/") async def start_agent(request: Request): - """ - Endpoint for direct browser access to the bot. + """Endpoint for direct browser access to the bot. + Creates a room, starts a bot instance, and redirects to the Daily room URL. Returns: @@ -137,8 +147,9 @@ async def start_agent(request: Request): # Spawn a new bot process try: + bot_file = get_bot_file() proc = subprocess.Popen( - [f"python3 -m bot -u {room_url} -t {token}"], + [f"python3 -m {bot_file} -u {room_url} -t {token}"], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)), @@ -152,8 +163,8 @@ async def start_agent(request: Request): @app.post("/connect") async def rtvi_connect(request: Request) -> Dict[Any, Any]: - """ - RTVI connect endpoint that creates a room and returns connection credentials. + """RTVI connect endpoint that creates a room and returns connection credentials. + This endpoint is called by RTVI clients to establish a connection. 
Returns: @@ -168,8 +179,9 @@ async def rtvi_connect(request: Request) -> Dict[Any, Any]: # Start the bot process try: + bot_file = get_bot_file() proc = subprocess.Popen( - [f"python3 -m bot -u {room_url} -t {token}"], + [f"python3 -m {bot_file} -u {room_url} -t {token}"], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)), @@ -184,8 +196,7 @@ async def rtvi_connect(request: Request) -> Dict[Any, Any]: @app.get("/status/{pid}") def get_status(pid: int): - """ - Get the status of a specific bot process. + """Get the status of a specific bot process. Args: pid (int): Process ID of the bot