don't tie UserImageRawFrame with function calls

This commit is contained in:
Aleix Conchillo Flaqué
2025-10-30 12:42:47 -07:00
parent 74fb6e7676
commit ec95618b94
13 changed files with 126 additions and 180 deletions

View File

@@ -9,10 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `VisionImageRawFrame`. This is an input image frame with an associated
text. It is usually processed by vision services (e.g. Moondream). The text
guides the vision service on how to analyze the image.
- Added support for including images or audio to LLM context messages using
`LLMContext.create_image_message()` or `LLMContext.create_image_url_message()`
(not all LLMs support URLs) and `LLMContext.create_audio_message()`. For
@@ -168,9 +164,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated `MoondreamService` to process `VisionImageRawFrame`.
- `UserImageRawFrame` new fields `add_to_context` and `text`. The
`add_to_context` field indicates if this image and text should be added to the
LLM context (by the LLM assistant aggregator). The `text` field, if set, might
also guide the LLM or the vision service on how to analyze the image.
- `VisionService` expects `VisionImageRawFrame` in order to analyze images.
- `UserImageRequestFrame` new fields `add_to_context` and `text`. Both fields
will be used to set the same fields on the captured `UserImageRawFrame`.
- `UserImageRequestFrame` doesn't require function call name and ID anymore.
- Updated `MoondreamService` to process `UserImageRawFrame`.
- `VisionService` expects `UserImageRawFrame` in order to analyze images.
- `DailyTransport` triggers `on_error` event if transcription can't be started
or stopped.
@@ -196,6 +202,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `LLMService.request_image_frame()` is deprecated, push a
`UserImageRequestFrame` instead.
- `UserResponseAggregator` is deprecated and will be removed in a future version.
- The `send_transcription_frames` argument to `OpenAIRealtimeLLMService` is

View File

@@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream associated to this request. When the
UserImageRawFrame reaches the LLM assistant aggregator, the image will be
added to the context.
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the user image frame. Note that this image is associated to a
# function call and will be handled by the LLM assistant aggregators.
await params.llm.request_image_frame(
user_id=user_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)

View File

@@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream associated to this request. When the
UserImageRawFrame reaches the LLM assistant aggregator, the image will be
added to the context.
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the user image frame. Note that this image is associated to a
# function call and will be handled by the LLM assistant aggregators.
await params.llm.request_image_frame(
user_id=user_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)

View File

@@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream associated to this request. When the
UserImageRawFrame reaches the LLM assistant aggregator, the image will be
added to the context.
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the user image frame. Note that this image is associated to a
# function call and will be handled by the LLM assistant aggregators.
await params.llm.request_image_frame(
user_id=user_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)

View File

@@ -15,20 +15,14 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMRunFrame,
UserImageRawFrame,
UserImageRequestFrame,
VisionImageRawFrame,
)
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -57,40 +51,17 @@ async def fetch_user_image(params: FunctionCallParams):
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the user image frame frame. In this case we don't use
# `llm.request_image_frame()` because we don't want the LLM to analyze it.
# Request a user image frame. In this case, we don't want the requested
# image to be added to the context because we will process it with
# Moondream.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, context=question), FrameDirection.UPSTREAM
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=False),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into vision frames.
This processor handles the UserImageRawFrame from the transport, converts it
to a VisionImageRawFrame and pushes it downstream so it can be handled by a
vision service.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
frame = VisionImageRawFrame(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -152,9 +123,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# This will get the get the user image frame and push it to the LLM.
image_processor = UserImageProcessor()
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
@@ -165,7 +133,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context_aggregator.user(), # User responses
ParallelPipeline(
[llm], # LLM
[image_processor, moondream],
[moondream],
),
tts, # TTS
transport.output(), # Transport bot output

View File

@@ -16,12 +16,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -43,21 +44,18 @@ async def fetch_user_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream associated to this request. When the
UserImageRawFrame reaches the LLM assistant aggregator, the image will be
added to the context.
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the user image frame. Note that this image is associated to a
# function call and will be handled by the LLM assistant aggregators.
await params.llm.request_image_frame(
user_id=user_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)

View File

@@ -11,7 +11,6 @@ including data frames, system frames, and control frames for audio, video, text,
and LLM processing.
"""
import asyncio
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
@@ -1202,27 +1201,23 @@ class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
class UserImageRequestFrame(SystemFrame):
"""Frame requesting an image from a specific user.
A frame to request an image from the given user. The frame might be
generated by a function call in which case the corresponding fields will be
properly set.
A frame to request an image from the given user. The request might come with
a text that can be later used to describe the requested image.
Parameters:
user_id: Identifier of the user to request image from.
context: Optional context for the image request.
function_name: Name of function that generated this request (if any).
tool_call_id: Tool call ID if generated by function call.
text: An optional text associated to the image request.
add_to_context: Whether the requested image should be added to an LLM context.
video_source: Specific video source to capture from.
"""
user_id: str
context: Optional[Any] = None
function_name: Optional[str] = None
tool_call_id: Optional[str] = None
text: Optional[str] = None
add_to_context: Optional[bool] = None
video_source: Optional[str] = None
request_event: Optional[asyncio.Event] = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, video_source: {self.video_source}, function: {self.function_name}, request: {self.tool_call_id})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, add_to_context: {self.add_to_context}, {self.video_source})"
@dataclass
@@ -1296,33 +1291,17 @@ class UserImageRawFrame(InputImageRawFrame):
Parameters:
user_id: Identifier of the user who provided this image.
request: The original image request frame if this is a response.
text: An optional text associated to this image.
add_to_context: Whether this image should be added to an LLM context.
"""
user_id: str = ""
request: Optional[UserImageRequestFrame] = None
text: Optional[str] = None
add_to_context: Optional[bool] = None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
"""Raw image input frame to be analyzed by vision services.
This is just an image with an associated text describing how the vision
service should analyze the image.
Parameters:
text: Description of how the vision service should analyze the image.
"""
text: str
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text})"
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, add_to_context: {self.add_to_context})"
@dataclass

View File

@@ -616,7 +616,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_function_call_result(frame)
elif isinstance(frame, FunctionCallCancelFrame):
await self._handle_function_call_cancel(frame)
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
elif isinstance(frame, UserImageRawFrame):
await self._handle_user_image_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_aggregation()
@@ -767,30 +767,21 @@ class LLMAssistantAggregator(LLMContextAggregator):
message["content"] = result
async def _handle_user_image_frame(self, frame: UserImageRawFrame):
logger.debug(
f"{self} UserImageRawFrame: [{frame.request.function_name}:{frame.request.tool_call_id}]"
)
if frame.request.tool_call_id not in self._function_calls_in_progress:
logger.warning(
f"UserImageRawFrame tool_call_id [{frame.request.tool_call_id}] is not running"
)
if not frame.add_to_context:
return
logger.debug(f"{self} Adding UserImageRawFrame to LLM context (size: {frame.size})")
self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
text=frame.request.context,
text=frame.text,
)
await self.push_aggregation()
await self.push_context_frame(FrameDirection.UPSTREAM)
# Notify who ever requested the image that we have added it to the context.
if frame.request and frame.request.request_event:
frame.request.request_event.set()
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
self._started += 1

View File

@@ -503,6 +503,9 @@ class LLMService(AIService):
the image. If you expect the image to be processed by a vision service,
you might want to push a UserImageRequestFrame upstream directly.
.. deprecated:: 0.0.92
This method is deprecated, push a `UserImageRequestFrame` instead.
Args:
user_id: The ID of the user to request an image from.
function_name: Optional function name associated with the request.
@@ -512,24 +515,19 @@ class LLMService(AIService):
timeout: Optional timeout for the requested image to be added to the LLM context.
"""
request_event = asyncio.Event() if timeout else None
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Method `request_image_frame()` is deprecated, push a `UserImageRequestFrame` instead.",
DeprecationWarning,
)
await self.push_frame(
UserImageRequestFrame(
user_id=user_id,
function_name=function_name,
tool_call_id=tool_call_id,
context=text_content,
video_source=video_source,
request_event=request_event,
),
UserImageRequestFrame(user_id=user_id, text=text_content),
FrameDirection.UPSTREAM,
)
# Wait for the requested image to be added to the context.
if request_event:
await asyncio.wait_for(request_event.wait(), timeout=timeout)
async def _create_sequential_runner_task(self):
if not self._sequential_runner_task:
self._sequential_runner_queue = asyncio.Queue()

View File

@@ -16,7 +16,12 @@ from typing import AsyncGenerator, Optional
from loguru import logger
from PIL import Image
from pipecat.frames.frames import ErrorFrame, Frame, TextFrame, VisionImageRawFrame
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TextFrame,
UserImageRawFrame,
)
from pipecat.services.vision_service import VisionService
try:
@@ -94,11 +99,11 @@ class MoondreamService(VisionService):
logger.debug("Loaded Moondream model")
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Analyze an image and generate a description.
Args:
frame: The vision image frame to process.
frame: The image frame to process.
Yields:
Frame: TextFrame containing the generated image description, or ErrorFrame

View File

@@ -14,7 +14,7 @@ visual content.
from abc import abstractmethod
from typing import AsyncGenerator
from pipecat.frames.frames import Frame, VisionImageRawFrame
from pipecat.frames.frames import Frame, UserImageRawFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
@@ -37,7 +37,7 @@ class VisionService(AIService):
self._describe_text = None
@abstractmethod
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Process the given vision image and generate results.
This method must be implemented by subclasses to provide actual computer
@@ -45,7 +45,7 @@ class VisionService(AIService):
visual question answering.
Args:
frame: The vision image frame to process.
frame: The image frame to process.
Yields:
Frame: Frames containing the vision analysis results, typically TextFrame
@@ -56,7 +56,7 @@ class VisionService(AIService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames, handling vision image frames for analysis.
Automatically processes VisionImageRawFrame objects by calling run_vision
Automatically processes UserImageRawFrame objects by calling run_vision
and handles metrics tracking. Other frames are passed through unchanged.
Args:
@@ -65,7 +65,7 @@ class VisionService(AIService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, VisionImageRawFrame):
if isinstance(frame, UserImageRawFrame) and frame.text:
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()

View File

@@ -1839,10 +1839,11 @@ class DailyInputTransport(BaseInputTransport):
if render_frame:
frame = UserImageRawFrame(
user_id=participant_id,
request=request_frame,
image=video_frame.buffer,
size=(video_frame.width, video_frame.height),
format=video_frame.color_format,
text=request_frame.text if request_frame else None,
add_to_context=request_frame.add_to_context if request_frame else None,
)
frame.transport_source = video_source
await self.push_video_frame(frame)

View File

@@ -15,7 +15,7 @@ import asyncio
import fractions
import time
from collections import deque
from typing import Any, Awaitable, Callable, Optional
from typing import Any, Awaitable, Callable, List, Optional
import numpy as np
from loguru import logger
@@ -567,7 +567,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
self._receive_audio_task = None
self._receive_video_task = None
self._receive_screen_video_task = None
self._image_requests = {}
self._image_requests: List[UserImageRequestFrame] = []
# Whether we have seen a StartFrame already.
self._initialized = False
@@ -657,23 +657,27 @@ class SmallWebRTCInputTransport(BaseInputTransport):
if video_frame:
await self.push_video_frame(video_frame)
# Check if there are any pending image requests and create UserImageRawFrame
if self._image_requests:
for req_id, request_frame in list(self._image_requests.items()):
if request_frame.video_source == video_source:
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
request=request_frame,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
)
image_frame.transport_source = video_source
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
del self._image_requests[req_id]
# Check if there are any pending image requests and create
# UserImageRawFrame. Use a shallow copy so we can remove
# elements.
for request_frame in self._image_requests[:]:
if request_frame.video_source == video_source:
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
text=request_frame.text if request_frame else None,
add_to_context=request_frame.add_to_context
if request_frame
else None,
)
image_frame.transport_source = video_source
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
self._image_requests.remove(request_frame)
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
@@ -701,8 +705,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
logger.debug(f"Requesting image from participant: {frame.user_id}")
# Store the request
request_id = f"{frame.function_name}:{frame.tool_call_id}"
self._image_requests[request_id] = frame
self._image_requests.append(frame)
# Default to camera if no source specified
if frame.video_source is None: