From f3a4b416dff75167671b0c002049e9bf9a3872e2 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Fri, 5 Sep 2025 12:28:28 -0400 Subject: [PATCH] Remove `VisionImageRawFrame`, which was previously being handled directly by the LLM services, and deprecate the associated `VisionImageFrameAggregator`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removing `VisionImageRawFrame` lets us simplify LLM services' logic, getting us closer to the idealized architecture where all they care about is handling context frames. This change is in service of getting us closer to ready to deprecate usage of `OpenAILLMContext` and subclasses in favor of the universal `LLMContext`, at least for the traditional text-to-text LLMs. Why remove `VisionImageRawFrame` rather than deprecate? It's "internal"—only created by `VisionImageFrameAggregator`—and never intended to be used directly by users (it would be difficult to use directly anyway). Move the logic that was once in `VisionImageFrameAggregator` directly into the examples. Reasoning: - If `UserImageRequester` is defined in the examples, it makes sense for `UserImageProcessor` to be too, as it’s the flip side of the same coin, so to speak - The logic is now pretty trivial - This kind of one-shot, history-less image-describing pipeline shouldn't be common at all; it's ok for it to live in examples rather than as a dedicated class - In the short term, this enables us to create `LLMContext`s for services that support it and `OpenAILLMContext`s for services that don't yet (AWS) This commit also adds missing translation from OpenAI-format image context messages to AWS format. Note that this isn't a wasted effort in the face of the upcoming migration to universal `LLMContext`—this work will be reused as it has to be implemented there too. 
--- CHANGELOG.md | 10 + examples/foundational/12-describe-video.py | 46 ++++- .../12a-describe-video-gemini-flash.py | 46 ++++- .../foundational/12b-describe-video-gpt-4o.py | 46 ++++- .../12c-describe-video-anthropic.py | 46 ++++- .../foundational/12d-describe-video-aws.py | 186 ++++++++++++++++++ src/pipecat/frames/frames.py | 17 -- .../aggregators/vision_image_frame.py | 26 ++- src/pipecat/services/anthropic/llm.py | 23 --- src/pipecat/services/aws/llm.py | 52 ++--- src/pipecat/services/google/llm.py | 10 - src/pipecat/services/moondream/vision.py | 58 ++++-- src/pipecat/services/openai/base_llm.py | 15 +- src/pipecat/services/vision_service.py | 13 +- 14 files changed, 455 insertions(+), 139 deletions(-) create mode 100644 examples/foundational/12d-describe-video-aws.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 586c464d8..b15f132c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed a `OpenAIImageGenService` issue where it was not creating `URLImageRawFrame` correctly. +### Removed + +- Remove `VisionImageRawFrame` in favor of context frames (`LLMContextFrame` or + `OpenAILLMContextFrame`). + +### Deprecated + +- Deprecate `VisionImageFrameAggregator` because `VisionImageRawFrame` has been + removed. See the `12*` examples for the new recommended replacement pattern. 
+ ## [0.0.83] - 2025-09-03 ### Added diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py index d221aea21..aeae2b946 100644 --- a/examples/foundational/12-describe-video.py +++ b/examples/foundational/12-describe-video.py @@ -11,12 +11,19 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TextFrame, + TTSSpeakFrame, + UserImageRawFrame, + UserImageRequestFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( @@ -34,6 +41,8 @@ load_dotenv(override=True) class UserImageRequester(FrameProcessor): + """Converts incoming text into requests for user images.""" + def __init__(self, participant_id: Optional[str] = None): super().__init__() self._participant_id = participant_id @@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor): if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( - UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM + UserImageRequestFrame(self._participant_id, context=frame.text), + FrameDirection.UPSTREAM, ) - await self.push_frame(frame, direction) + else: + await self.push_frame(frame, direction) + + +class UserImageProcessor(FrameProcessor): + """Converts incoming user images into context frames.""" + + async def process_frame(self, frame: 
Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, UserImageRawFrame): + if frame.request and frame.request.context: + context = LLMContext() + context.add_image_frame_message( + image=frame.image, + text=frame.request.context, + size=frame.size, + format=frame.format, + ) + frame = LLMContextFrame(context) + await self.push_frame(frame) + else: + await self.push_frame(frame, direction) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Initialize the image requester without setting the participant ID yet image_requester = UserImageRequester() - vision_aggregator = VisionImageFrameAggregator() + image_processor = UserImageProcessor() # If you run into weird description, try with use_cpu=True moondream = MoondreamService() @@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt, user_response, image_requester, - vision_aggregator, + image_processor, moondream, tts, transport.output(), @@ -119,7 +151,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): image_requester.set_participant_id(client_id) # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see.")) + await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/12a-describe-video-gemini-flash.py index 0ad511e27..77997649e 100644 --- a/examples/foundational/12a-describe-video-gemini-flash.py +++ b/examples/foundational/12a-describe-video-gemini-flash.py @@ -11,12 +11,19 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TextFrame, + TTSSpeakFrame, + UserImageRawFrame, + UserImageRequestFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( @@ -34,6 +41,8 @@ load_dotenv(override=True) class UserImageRequester(FrameProcessor): + """Converts incoming text into requests for user images.""" + def __init__(self, participant_id: Optional[str] = None): super().__init__() self._participant_id = participant_id @@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor): if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( - UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM + UserImageRequestFrame(self._participant_id, context=frame.text), + FrameDirection.UPSTREAM, ) - await self.push_frame(frame, direction) + else: + await 
self.push_frame(frame, direction) + + +class UserImageProcessor(FrameProcessor): + """Converts incoming user images into context frames.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, UserImageRawFrame): + if frame.request and frame.request.context: + context = LLMContext() + context.add_image_frame_message( + image=frame.image, + text=frame.request.context, + size=frame.size, + format=frame.format, + ) + frame = LLMContextFrame(context) + await self.push_frame(frame) + else: + await self.push_frame(frame, direction) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Initialize the image requester without setting the participant ID yet image_requester = UserImageRequester() - vision_aggregator = VisionImageFrameAggregator() + image_processor = UserImageProcessor() stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) @@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt, user_response, image_requester, - vision_aggregator, + image_processor, google, tts, transport.output(), @@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): image_requester.set_participant_id(client_id) # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see.")) + await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12b-describe-video-gpt-4o.py index 0451a0341..39c600df0 100644 --- a/examples/foundational/12b-describe-video-gpt-4o.py +++ b/examples/foundational/12b-describe-video-gpt-4o.py @@ -11,12 +11,19 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TextFrame, + TTSSpeakFrame, + UserImageRawFrame, + UserImageRequestFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( @@ -34,6 +41,8 @@ load_dotenv(override=True) class UserImageRequester(FrameProcessor): + """Converts incoming text into requests for user images.""" + def __init__(self, participant_id: Optional[str] = None): super().__init__() self._participant_id = participant_id @@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor): if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( - UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM + UserImageRequestFrame(self._participant_id, context=frame.text), + FrameDirection.UPSTREAM, ) - await self.push_frame(frame, direction) + else: + await self.push_frame(frame, 
direction) + + +class UserImageProcessor(FrameProcessor): + """Converts incoming user images into context frames.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, UserImageRawFrame): + if frame.request and frame.request.context: + context = LLMContext() + context.add_image_frame_message( + image=frame.image, + text=frame.request.context, + size=frame.size, + format=frame.format, + ) + frame = LLMContextFrame(context) + await self.push_frame(frame) + else: + await self.push_frame(frame, direction) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Initialize the image requester without setting the participant ID yet image_requester = UserImageRequester() - vision_aggregator = VisionImageFrameAggregator() + image_processor = UserImageProcessor() stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) @@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt, user_response, image_requester, - vision_aggregator, + image_processor, openai, tts, transport.output(), @@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): image_requester.set_participant_id(client_id) # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see.")) + await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12c-describe-video-anthropic.py b/examples/foundational/12c-describe-video-anthropic.py index 54ebc20bf..236fcf68a 100644 --- a/examples/foundational/12c-describe-video-anthropic.py +++ b/examples/foundational/12c-describe-video-anthropic.py @@ -11,12 +11,19 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TextFrame, + TTSSpeakFrame, + UserImageRawFrame, + UserImageRequestFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( @@ -34,6 +41,8 @@ load_dotenv(override=True) class UserImageRequester(FrameProcessor): + """Converts incoming text into requests for user images.""" + def __init__(self, participant_id: Optional[str] = None): super().__init__() self._participant_id = participant_id @@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor): if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( - UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM + UserImageRequestFrame(self._participant_id, context=frame.text), + FrameDirection.UPSTREAM, ) - await self.push_frame(frame, direction) + else: + await 
self.push_frame(frame, direction) + + +class UserImageProcessor(FrameProcessor): + """Converts incoming user images into context frames.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, UserImageRawFrame): + if frame.request and frame.request.context: + context = LLMContext() + context.add_image_frame_message( + image=frame.image, + text=frame.request.context, + size=frame.size, + format=frame.format, + ) + frame = LLMContextFrame(context) + await self.push_frame(frame) + else: + await self.push_frame(frame, direction) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Initialize the image requester without setting the participant ID yet image_requester = UserImageRequester() - vision_aggregator = VisionImageFrameAggregator() + image_processor = UserImageProcessor() stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) @@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt, user_response, image_requester, - vision_aggregator, + image_processor, anthropic, tts, transport.output(), @@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): image_requester.set_participant_id(client_id) # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see.")) + await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12d-describe-video-aws.py b/examples/foundational/12d-describe-video-aws.py new file mode 100644 index 000000000..7c0535169 --- /dev/null +++ b/examples/foundational/12d-describe-video-aws.py @@ -0,0 +1,186 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os +from typing import Optional + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + TextFrame, + TTSSpeakFrame, + UserImageRawFrame, + UserImageRequestFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) +from pipecat.processors.aggregators.user_response import UserResponseAggregator +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import ( + create_transport, + get_transport_client_id, + maybe_capture_participant_camera, +) +from pipecat.services.aws.llm import AWSBedrockLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams + +load_dotenv(override=True) + + +class UserImageRequester(FrameProcessor): + """Converts incoming text into requests for user images.""" + + def __init__(self, participant_id: Optional[str] = None): + super().__init__() + self._participant_id = participant_id + + def set_participant_id(self, 
participant_id: str): + self._participant_id = participant_id + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if self._participant_id and isinstance(frame, TextFrame): + await self.push_frame( + UserImageRequestFrame(self._participant_id, context=frame.text), + FrameDirection.UPSTREAM, + ) + else: + await self.push_frame(frame, direction) + + +class UserImageProcessor(FrameProcessor): + """Converts incoming user images into context frames.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, UserImageRawFrame): + if frame.request and frame.request.context: + # Note: AWS Bedrock does not yet support the universal LLMContext + context = OpenAILLMContext() + context.add_image_frame_message( + image=frame.image, + text=frame.request.context, + size=frame.size, + format=frame.format, + ) + frame = OpenAILLMContextFrame(context) + await self.push_frame(frame) + else: + await self.push_frame(frame, direction) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. 
+transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + user_response = UserResponseAggregator() + + # Initialize the image requester without setting the participant ID yet + image_requester = UserImageRequester() + + image_processor = UserImageProcessor() + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + # AWS for vision analysis + aws = AWSBedrockLLMService( + aws_region="us-west-2", + model="us.anthropic.claude-3-7-sonnet-20250219-v1:0", + params=AWSBedrockLLMService.InputParams(temperature=0.8), + ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + pipeline = Pipeline( + [ + transport.input(), + stt, + user_response, + image_requester, + image_processor, + aws, + tts, + transport.output(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + + await maybe_capture_participant_camera(transport, client) + + # Set the participant ID in the image requester + client_id = get_transport_client_id(transport, client) + image_requester.set_participant_id(client_id) + + # Welcome message + await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4b380c845..83488f2a4 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1253,23 +1253,6 @@ class UserImageRawFrame(InputImageRawFrame): return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})" -@dataclass -class VisionImageRawFrame(InputImageRawFrame): - """Image frame for vision/image analysis with associated text prompt. - - An image with an associated text to ask for a description of it. - - Parameters: - text: Optional text prompt describing what to analyze in the image. 
- """ - - text: Optional[str] = None - - def __str__(self): - pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})" - - @dataclass class InputDTMFFrame(DTMFFrame, SystemFrame): """DTMF keypress input frame from transport.""" diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index de4d74186..ca87dd0c3 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -10,13 +10,22 @@ This module provides frame aggregation functionality to combine text and image frames into vision frames for multimodal processing. """ -from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class VisionImageFrameAggregator(FrameProcessor): """Aggregates consecutive text and image frames into vision frames. + .. deprecated:: 0.84.0 + VisionImageRawFrame has been removed in favor of context frames + (LLMContextFrame or OpenAILLMContextFrame), so this aggregator is not + needed anymore. See the 12* examples for the new recommended pattern. + This aggregator waits for a consecutive TextFrame and an InputImageRawFrame. After the InputImageRawFrame arrives it will output a VisionImageRawFrame combining both the text and image data for multimodal processing. @@ -28,6 +37,17 @@ class VisionImageFrameAggregator(FrameProcessor): The aggregator starts with no cached text, waiting for the first TextFrame to arrive before it can create vision frames. """ + import warnings + + warnings.warn( + "VisionImageFrameAggregator is deprecated. 
" + "VisionImageRawFrame has been removed in favor of context frames " + "(LLMContextFrame or OpenAILLMContextFrame), so this aggregator is " + "not needed anymore. See the 12* examples for the new recommended " + "pattern.", + DeprecationWarning, + stacklevel=2, + ) super().__init__() self._describe_text = None @@ -47,12 +67,14 @@ class VisionImageFrameAggregator(FrameProcessor): self._describe_text = frame.text elif isinstance(frame, InputImageRawFrame): if self._describe_text: - frame = VisionImageRawFrame( + context = OpenAILLMContext() + context.add_image_frame_message( text=self._describe_text, image=frame.image, size=frame.size, format=frame.format, ) + frame = OpenAILLMContextFrame(context) await self.push_frame(frame) self._describe_text = None else: diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index 4ec477dac..b5328be50 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -42,7 +42,6 @@ from pipecat.frames.frames import ( LLMTextFrame, LLMUpdateSettingsFrame, UserImageRawFrame, - VisionImageRawFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext @@ -495,12 +494,6 @@ class AnthropicLLMService(LLMService): context = frame.context elif isinstance(frame, LLMMessagesFrame): context = AnthropicLLMContext.from_messages(frame.messages) - elif isinstance(frame, VisionImageRawFrame): - # This is only useful in very simple pipelines because it creates - # a new context. Generally we want a context manager to catch - # UserImageRawFrames coming through the pipeline and add them - # to the context. 
- context = AnthropicLLMContext.from_image_frame(frame) elif isinstance(frame, LLMUpdateSettingsFrame): await self._update_settings(frame.settings) elif isinstance(frame, LLMEnablePromptCachingFrame): @@ -626,22 +619,6 @@ class AnthropicLLMContext(OpenAILLMContext): self._restructure_from_openai_messages() return self - @classmethod - def from_image_frame(cls, frame: VisionImageRawFrame) -> "AnthropicLLMContext": - """Create context from a vision image frame. - - Args: - frame: The vision image frame to process. - - Returns: - New Anthropic context with the image message. - """ - context = cls() - context.add_image_frame_message( - format=frame.format, size=frame.size, image=frame.image, text=frame.text - ) - return context - def set_messages(self, messages: List): """Set the messages list and reset cache tracking. diff --git a/src/pipecat/services/aws/llm.py b/src/pipecat/services/aws/llm.py index 32d1789bc..068d18149 100644 --- a/src/pipecat/services/aws/llm.py +++ b/src/pipecat/services/aws/llm.py @@ -39,7 +39,6 @@ from pipecat.frames.frames import ( LLMTextFrame, LLMUpdateSettingsFrame, UserImageRawFrame, - VisionImageRawFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext @@ -180,22 +179,6 @@ class AWSBedrockLLMContext(OpenAILLMContext): self._restructure_from_openai_messages() return self - @classmethod - def from_image_frame(cls, frame: VisionImageRawFrame) -> "AWSBedrockLLMContext": - """Create AWS Bedrock context from vision image frame. - - Args: - frame: The vision image frame to convert. - - Returns: - New AWS Bedrock LLM context instance. - """ - context = cls() - context.add_image_frame_message( - format=frame.format, size=frame.size, image=frame.image, text=frame.text - ) - return context - def set_messages(self, messages: List): """Set the messages list and restructure for Bedrock format. 
@@ -399,9 +382,34 @@ class AWSBedrockLLMContext(OpenAILLMContext): elif isinstance(content, list): new_content = [] for item in content: + # fix empty text if item.get("type", "") == "text": text_content = item["text"] if item["text"] != "" else "(empty)" new_content.append({"text": text_content}) + # handle image_url -> image conversion + elif item.get("type", "") == "image_url": + # NOTE: never print/log `item` here; it embeds the whole base64 image payload + new_item = { + "image": { + "format": "jpeg", + "source": { + "bytes": base64.b64decode(item["image_url"]["url"].split(",")[1]) + }, + } + } + new_content.append(new_item) + # In the case where there's a single image in the list (like what + # would result from a UserImageRawFrame), ensure that the image + # comes before text + image_indices = [i for i, item in enumerate(new_content) if "image" in item] + text_indices = [i for i, item in enumerate(new_content) if "text" in item] + if len(image_indices) == 1 and text_indices: + img_idx = image_indices[0] + first_txt_idx = text_indices[0] + if img_idx > first_txt_idx: + # Move image before the first text + image_item = new_content.pop(img_idx) + new_content.insert(first_txt_idx, image_item) return {"role": message["role"], "content": new_content} return message @@ -967,7 +975,9 @@ class AWSBedrockLLMService(LLMService): } # Add system message - request_params["system"] = context.system + system = getattr(context, "system", None) + if system: + request_params["system"] = system # Check if messages contain tool use or tool result content blocks has_tool_content = False @@ -1120,12 +1130,6 @@ class AWSBedrockLLMService(LLMService): raise NotImplementedError("Universal LLMContext is not yet supported for AWS Bedrock.") elif isinstance(frame, LLMMessagesFrame): context = AWSBedrockLLMContext.from_messages(frame.messages) - elif isinstance(frame, VisionImageRawFrame): - # This is only useful in very simple pipelines because it creates - # a new context. 
Generally we want a context manager to catch - # UserImageRawFrames coming through the pipeline and add them - # to the context. - context = AWSBedrockLLMContext.from_image_frame(frame) elif isinstance(frame, LLMUpdateSettingsFrame): await self._update_settings(frame.settings) else: diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index 63066c45a..70d4ca2bf 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -36,7 +36,6 @@ from pipecat.frames.frames import ( LLMTextFrame, LLMUpdateSettingsFrame, UserImageRawFrame, - VisionImageRawFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext @@ -1013,15 +1012,6 @@ class GoogleLLMService(LLMService): # NOTE: LLMMessagesFrame is deprecated, so we don't support the newer universal # LLMContext with it context = GoogleLLMContext(frame.messages) - elif isinstance(frame, VisionImageRawFrame): - # This is only useful in very simple pipelines because it creates - # a new context. Generally we want a context manager to catch - # UserImageRawFrames coming through the pipeline and add them - # to the context. - context = GoogleLLMContext() - context.add_image_frame_message( - format=frame.format, size=frame.size, image=frame.image, text=frame.text - ) elif isinstance(frame, LLMUpdateSettingsFrame): await self._update_settings(frame.settings) else: diff --git a/src/pipecat/services/moondream/vision.py b/src/pipecat/services/moondream/vision.py index a6239342f..bd01daf34 100644 --- a/src/pipecat/services/moondream/vision.py +++ b/src/pipecat/services/moondream/vision.py @@ -11,17 +11,20 @@ for image analysis and description generation. 
""" import asyncio -from typing import AsyncGenerator +import base64 +from io import BytesIO +from typing import AsyncGenerator, Optional from loguru import logger from PIL import Image -from pipecat.frames.frames import ErrorFrame, Frame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import ErrorFrame, Frame, TextFrame +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.services.vision_service import VisionService try: import torch - from transformers import AutoModelForCausalLM, AutoTokenizer + from transformers import AutoModelForCausalLM except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Moondream, you need to `pip install pipecat-ai[moondream]`.") @@ -94,11 +97,11 @@ class MoondreamService(VisionService): logger.debug("Loaded Moondream model") - async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: + async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]: """Analyze an image and generate a description. Args: - frame: Vision frame containing the image data and optional question text. + context: The context to process, containing image data. Yields: Frame: TextFrame containing the generated image description, or ErrorFrame @@ -109,22 +112,45 @@ class MoondreamService(VisionService): yield ErrorFrame("Moondream model not available") return - logger.debug(f"Analyzing image: {frame}") + image_bytes = None + text = None + try: + messages = context.get_messages() + last_message = messages[-1] + last_message_content = last_message.get("content") - def get_image_description(frame: VisionImageRawFrame): - """Generate description for the given image frame. 
+ for item in last_message_content: + if isinstance(item, dict): + if ( + "image_url" in item + and isinstance(item["image_url"], dict) + and item["image_url"].get("url") + ): + image_bytes = base64.b64decode(item["image_url"]["url"].split(",")[1]) + elif "text" in item and isinstance(item["text"], str): + text = item["text"] - Args: - frame: Vision frame containing image data and question. + except Exception as e: + logger.error(f"Exception during image extraction: {e}") + yield ErrorFrame("Failed to extract image from context") + return - Returns: - str: Generated description of the image. - """ - image = Image.frombytes(frame.format, frame.size, frame.image) + if not image_bytes: + logger.error("No image found in context") + yield ErrorFrame("No image found in context") + return + + logger.debug( + f"Analyzing image (bytes length: {len(image_bytes)})" + ) + + def get_image_description(image_data: bytes, text: Optional[str]) -> str: + image_buffer = BytesIO(image_data) + image = Image.open(image_buffer) image_embeds = self._model.encode_image(image) - description = self._model.query(image_embeds, frame.text)["answer"] + description = self._model.query(image_embeds, text)["answer"] return description - description = await asyncio.to_thread(get_image_description, frame) + description = await asyncio.to_thread(get_image_description, image_bytes, text) yield TextFrame(text=description) diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index b9b216441..6f6376c02 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -32,7 +32,6 @@ from pipecat.frames.frames import ( LLMMessagesFrame, LLMTextFrame, LLMUpdateSettingsFrame, - VisionImageRawFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext @@ -418,8 +417,8 @@ class BaseOpenAILLMService(LLMService): """Process frames for LLM completion 
requests. Handles OpenAILLMContextFrame, LLMContextFrame, LLMMessagesFrame, - VisionImageRawFrame, and LLMUpdateSettingsFrame to trigger LLM - completions and manage settings. + and LLMUpdateSettingsFrame to trigger LLM completions and manage + settings. Args: frame: The frame to process. @@ -438,16 +437,6 @@ class BaseOpenAILLMService(LLMService): # NOTE: LLMMessagesFrame is deprecated, so we don't support the newer universal # LLMContext with it context = OpenAILLMContext.from_messages(frame.messages) - elif isinstance(frame, VisionImageRawFrame): - # This is only useful in very simple pipelines because it creates - # a new context. Generally we want a context manager to catch - # UserImageRawFrames coming through the pipeline and add them - # to the context. - # TODO: support the newer universal LLMContext with a VisionImageRawFrame equivalent? - context = OpenAILLMContext() - context.add_image_frame_message( - format=frame.format, size=frame.size, image=frame.image, text=frame.text - ) elif isinstance(frame, LLMUpdateSettingsFrame): await self._update_settings(frame.settings) else: diff --git a/src/pipecat/services/vision_service.py b/src/pipecat/services/vision_service.py index e245e9a01..0eeee98cd 100644 --- a/src/pipecat/services/vision_service.py +++ b/src/pipecat/services/vision_service.py @@ -14,7 +14,8 @@ visual content. from abc import abstractmethod from typing import AsyncGenerator -from pipecat.frames.frames import Frame, VisionImageRawFrame +from pipecat.frames.frames import Frame, LLMContextFrame +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService @@ -37,15 +38,15 @@ class VisionService(AIService): self._describe_text = None @abstractmethod - async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: - """Process a vision image frame and generate results. 
+ async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]: + """Process the latest image in the context and generate results. This method must be implemented by subclasses to provide actual computer vision functionality such as image description, object detection, or visual question answering. Args: - frame: The vision image frame to process, containing image data. + context: The context to process, containing image data. Yields: Frame: Frames containing the vision analysis results, typically TextFrame @@ -65,9 +66,9 @@ class VisionService(AIService): """ await super().process_frame(frame, direction) - if isinstance(frame, VisionImageRawFrame): + if isinstance(frame, LLMContextFrame): await self.start_processing_metrics() - await self.process_generator(self.run_vision(frame)) + await self.process_generator(self.run_vision(frame.context)) await self.stop_processing_metrics() else: await self.push_frame(frame, direction)