The parameter video_in_enabled=True was missing in DailyParams, which prevented image capture from working. Without this parameter, UserImageRequestFrame would be sent but no actual image data would be captured from participants. This fix enables the "Let me take a look" functionality to work as intended by allowing the transport to capture video frames for vision processing with Moondream.
214 lines
7.2 KiB
Python
214 lines
7.2 KiB
Python
#
|
||
# Copyright (c) 2024–2025, Daily
|
||
#
|
||
# SPDX-License-Identifier: BSD 2-Clause License
|
||
#
|
||
|
||
import asyncio
|
||
import os
|
||
import sys
|
||
|
||
import aiohttp
|
||
from dotenv import load_dotenv
|
||
from loguru import logger
|
||
from PIL import Image
|
||
from runner import configure
|
||
|
||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||
from pipecat.frames.frames import (
|
||
BotStartedSpeakingFrame,
|
||
BotStoppedSpeakingFrame,
|
||
Frame,
|
||
ImageRawFrame,
|
||
OutputImageRawFrame,
|
||
SpriteFrame,
|
||
TextFrame,
|
||
UserImageRequestFrame,
|
||
)
|
||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||
from pipecat.pipeline.pipeline import Pipeline
|
||
from pipecat.pipeline.runner import PipelineRunner
|
||
from pipecat.pipeline.task import PipelineTask
|
||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||
from pipecat.processors.aggregators.sentence import SentenceAggregator
|
||
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
|
||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||
from pipecat.services.moondream.vision import MoondreamService
|
||
from pipecat.services.openai.llm import OpenAILLMService
|
||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||
|
||
load_dotenv(override=True)
|
||
|
||
logger.remove(0)
|
||
logger.add(sys.stderr, level="DEBUG")
|
||
|
||
user_request_answer = "Let me take a look."
|
||
|
||
sprites = []
|
||
|
||
script_dir = os.path.dirname(__file__)
|
||
|
||
for i in range(1, 26):
|
||
# Build the full path to the image file
|
||
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
|
||
# Get the filename without the extension to use as the dictionary key
|
||
# Open the image and convert it to bytes
|
||
with Image.open(full_path) as img:
|
||
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
|
||
|
||
flipped = sprites[::-1]
|
||
sprites.extend(flipped)
|
||
|
||
# When the bot isn't talking, show a static image of the cat listening
|
||
quiet_frame = sprites[0]
|
||
talking_frame = SpriteFrame(images=sprites)
|
||
|
||
|
||
class TalkingAnimation(FrameProcessor):
|
||
"""This class starts a talking animation when it receives an first BotStartedSpeakingFrame,
|
||
and then returns to a "quiet" sprite when it sees a BotStoppedSpeakingFrame.
|
||
"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self._is_talking = False
|
||
|
||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||
await super().process_frame(frame, direction)
|
||
|
||
if isinstance(frame, BotStartedSpeakingFrame):
|
||
if not self._is_talking:
|
||
await self.push_frame(talking_frame)
|
||
self._is_talking = True
|
||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||
await self.push_frame(quiet_frame)
|
||
self._is_talking = False
|
||
|
||
await self.push_frame(frame, direction)
|
||
|
||
|
||
class UserImageRequester(FrameProcessor):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.participant_id = None
|
||
|
||
def set_participant_id(self, participant_id: str):
|
||
self.participant_id = participant_id
|
||
|
||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||
await super().process_frame(frame, direction)
|
||
|
||
if self.participant_id and isinstance(frame, TextFrame):
|
||
if frame.text == user_request_answer:
|
||
await self.push_frame(
|
||
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
|
||
)
|
||
await self.push_frame(TextFrame("Describe the image in a short sentence."))
|
||
else:
|
||
await self.push_frame(frame, direction)
|
||
|
||
|
||
class TextFilterProcessor(FrameProcessor):
|
||
def __init__(self, text: str):
|
||
super().__init__()
|
||
self.text = text
|
||
|
||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||
await super().process_frame(frame, direction)
|
||
|
||
if isinstance(frame, TextFrame):
|
||
if frame.text != self.text:
|
||
await self.push_frame(frame)
|
||
else:
|
||
await self.push_frame(frame, direction)
|
||
|
||
|
||
class ImageFilterProcessor(FrameProcessor):
|
||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||
await super().process_frame(frame, direction)
|
||
|
||
if not isinstance(frame, ImageRawFrame):
|
||
await self.push_frame(frame, direction)
|
||
|
||
|
||
async def main():
|
||
async with aiohttp.ClientSession() as session:
|
||
(room_url, token) = await configure(session)
|
||
|
||
transport = DailyTransport(
|
||
room_url,
|
||
token,
|
||
"Chatbot",
|
||
DailyParams(
|
||
audio_in_enabled=True,
|
||
audio_out_enabled=True,
|
||
video_in_enabled=True,
|
||
video_out_enabled=True,
|
||
video_out_width=1024,
|
||
video_out_height=576,
|
||
transcription_enabled=True,
|
||
vad_analyzer=SileroVADAnalyzer(),
|
||
),
|
||
)
|
||
|
||
tts = CartesiaTTSService(
|
||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||
)
|
||
|
||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||
|
||
ta = TalkingAnimation()
|
||
|
||
sa = SentenceAggregator()
|
||
ir = UserImageRequester()
|
||
va = VisionImageFrameAggregator()
|
||
|
||
# If you run into weird description, try with use_cpu=True
|
||
moondream = MoondreamService()
|
||
|
||
tf = TextFilterProcessor(user_request_answer)
|
||
imgf = ImageFilterProcessor()
|
||
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": f"You are Chatbot, a friendly, helpful robot. Let the user know that you are capable of chatting or describing what you see. Your goal is to demonstrate your capabilities in a succinct way. Reply with only '{user_request_answer}' if the user asks you to describe what you see. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||
},
|
||
]
|
||
|
||
context = OpenAILLMContext(messages)
|
||
context_aggregator = llm.create_context_aggregator(context)
|
||
|
||
pipeline = Pipeline(
|
||
[
|
||
transport.input(),
|
||
context_aggregator.user(),
|
||
llm,
|
||
ParallelPipeline([sa, ir, va, moondream], [tf, imgf]),
|
||
tts,
|
||
ta,
|
||
transport.output(),
|
||
context_aggregator.assistant(),
|
||
]
|
||
)
|
||
|
||
task = PipelineTask(pipeline)
|
||
await task.queue_frame(quiet_frame)
|
||
|
||
@transport.event_handler("on_first_participant_joined")
|
||
async def on_first_participant_joined(transport, participant):
|
||
await transport.capture_participant_transcription(participant["id"])
|
||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||
ir.set_participant_id(participant["id"])
|
||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||
|
||
runner = PipelineRunner()
|
||
|
||
await runner.run(task)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|