#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Gemini Bot Implementation.

This module implements a chatbot using Google's Gemini Multimodal Live model.
It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Speech-to-speech model

The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow using Gemini's streaming capabilities.
"""

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    EndFrame,
    Frame,
    OutputImageRawFrame,
    SpriteFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import (
    RTVIBotTranscriptionProcessor,
    RTVIMetricsProcessor,
    RTVISpeakingProcessor,
    RTVIUserTranscriptionProcessor,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)

# Swap loguru's default handler (id 0) for a DEBUG-level stderr handler.
try:
    logger.remove(0)
except ValueError:
    # Handler 0 is already gone (e.g. removed by another module); nothing to undo.
    pass
logger.add(sys.stderr, level="DEBUG")

sprites = []
script_dir = os.path.dirname(__file__)

for i in range(1, 26):
    # Build the full path to the image file. The name is the literal prefix
    # "robot0" followed by the index (robot01 ... robot025), not a zero-padded number.
    full_path = os.path.join(script_dir, "assets", f"robot0{i}.png")
    # Open the image and wrap its raw bytes in an output image frame
    with Image.open(full_path) as img:
        sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))

# Create a smooth animation by adding reversed frames
flipped = sprites[::-1]
sprites.extend(flipped)

# Define static and animated states
quiet_frame = sprites[0]  # Static frame for when bot is listening
talking_frame = SpriteFrame(images=sprites)  # Animation sequence for when bot is talking


class TalkingAnimation(FrameProcessor):
    """Manages the bot's visual animation states.

    Switches between static (listening) and animated (talking) states based on
    the bot's current speaking status.
    """

    def __init__(self):
        super().__init__()
        # True while the talking animation is the most recently pushed sprite.
        self._is_talking = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and update animation state.

        Every incoming frame is forwarded in its original direction after any
        animation frame has been pushed.

        Args:
            frame: The incoming frame to process
            direction: The direction of frame flow in the pipeline
        """
        await super().process_frame(frame, direction)

        # Switch to talking animation when bot starts speaking
        if isinstance(frame, BotStartedSpeakingFrame):
            # Only push the animation once per speaking turn.
            if not self._is_talking:
                await self.push_frame(talking_frame)
                self._is_talking = True
        # Return to static frame when bot stops speaking
        elif isinstance(frame, BotStoppedSpeakingFrame):
            await self.push_frame(quiet_frame)
            self._is_talking = False

        await self.push_frame(frame, direction)


async def main():
    """Main bot execution function.

    Sets up and runs the bot pipeline including:
    - Daily video transport with specific audio parameters
    - Gemini Live multimodal model integration
    - Voice activity detection
    - Animation processing
    - RTVI event handling
    """
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        # Set up Daily transport with specific audio/video parameters for Gemini
        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                audio_out_enabled=True,
                camera_out_enabled=True,
                camera_out_width=1024,
                camera_out_height=576,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
            ),
        )

        # Initialize the Gemini Multimodal Live model
        llm = GeminiMultimodalLiveLLMService(
            api_key=os.getenv("GEMINI_API_KEY"),
            voice_id="Puck",  # Aoede, Charon, Fenrir, Kore, Puck
            transcribe_user_audio=True,
            transcribe_model_audio=True,
        )

        messages = [
            {
                "role": "user",
                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
            },
        ]

        # Set up conversation context and management
        # The context_aggregator will automatically collect conversation context
        context = OpenAILLMContext(messages)
        context_aggregator = llm.create_context_aggregator(context)

        ta = TalkingAnimation()

        #
        # RTVI events for Pipecat client UI
        #

        # This will send `user-*-speaking` and `bot-*-speaking` messages.
        rtvi_speaking = RTVISpeakingProcessor()

        # This will emit UserTranscript events.
        rtvi_user_transcription = RTVIUserTranscriptionProcessor()

        # This will emit BotTranscript events.
        rtvi_bot_transcription = RTVIBotTranscriptionProcessor()

        # This will send `metrics` messages.
        rtvi_metrics = RTVIMetricsProcessor()

        pipeline = Pipeline(
            [
                transport.input(),
                context_aggregator.user(),
                llm,
                rtvi_speaking,
                rtvi_user_transcription,
                rtvi_bot_transcription,
                ta,
                rtvi_metrics,
                transport.output(),
                context_aggregator.assistant(),
            ]
        )

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )
        # Queue the static (listening) sprite before the pipeline starts running.
        await task.queue_frame(quiet_frame)

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            # Queue the current context frame to start the conversation.
            await task.queue_frames([context_aggregator.user().get_context_frame()])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            # Log through loguru like the rest of the module instead of bare print.
            logger.info(f"Participant left: {participant}")
            # Queue an EndFrame on the task when a participant leaves.
            await task.queue_frame(EndFrame())

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())