Files
pipecat/examples/function-calling/function-calling-moondream-video.py
Mark Backman d3021b4590 Rename example files to prepend parent folder name, preventing package shadowing
Example files like openai.py shadow installed packages when Python adds the
script directory to sys.path. Prepend the parent folder name to each example
file (e.g. openai.py -> function-calling-openai.py). Also split
thinking-and-mcp/ into separate mcp/ and thinking/ directories.
2026-03-31 22:06:01 -04:00

231 lines
8.2 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
TextFrame,
TTSSpeakFrame,
UserImageRequestFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.moondream.vision import MoondreamService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image.
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream. The result_callback will be invoked once the
image is retrieved and processed.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request a user image frame. In this case, we don't want the requested
# image to be added to the context because we will process it with
# Moondream. Also associate it to the function call. Pass the result_callback
# so it can be invoked when the image is actually retrieved.
await params.llm.push_frame(
UserImageRequestFrame(
user_id=user_id,
text=question,
append_to_context=False,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
result_callback=params.result_callback,
),
FrameDirection.UPSTREAM,
)
class MoondreamTextFrameWrapper(FrameProcessor):
"""Wraps Moondream-provided TextFrames with LLM response start/end frames.
This processor detects TextFrames and automatically wraps them with
LLMFullResponseStartFrame and LLMFullResponseEndFrame to provide proper
response boundaries for downstream processors.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we receive a TextFrame, wrap it with response start/end frames
if isinstance(frame, TextFrame):
await self.push_frame(LLMFullResponseStartFrame(), direction)
await self.push_frame(frame, direction)
await self.push_frame(LLMFullResponseEndFrame(), direction)
else:
# For all other frames, just pass them through
await self.push_frame(frame, direction)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way. You are able to describe images from the user camera.",
),
)
llm.register_function("fetch_user_image", fetch_user_image)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[fetch_image_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
# Wrap TextFrames with LLM response start/end frames, which makes Moondream
# output be treated like LLM responses for the purpose of context
# aggregation. Without this, the assistant context aggregator would ignore
# Moondream output (if the TTS service is disabled).
moondream_text_wrapper = MoondreamTextFrameWrapper()
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
ParallelPipeline(
[llm], # LLM
[moondream, moondream_text_wrapper],
),
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
await maybe_capture_participant_camera(transport, client)
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
context.add_message(
{
"role": "developer",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()