Remove VisionImageRawFrame, which was previously being handled directly by the LLM services, and deprecate the associated VisionImageFrameAggregator.
Removing `VisionImageRawFrame` lets us simplify LLM services' logic, getting us closer to the idealized architecture where all they care about is handling context frames. This change is in service of getting us closer to ready to deprecate usage of `OpenAILLMContext` and subclasses in favor of the universal `LLMContext`, at least for the traditional text-to-text LLMs. Why remove `VisionImageRawFrame` rather than deprecate? It's "internal"—only created by `VisionImageFrameAggregator`—and never intended to be used directly by users (it would be difficult to use directly anyway). Move the logic that was once in `VisionImageFrameAggregator` directly into the examples. Reasoning: - If `UserImageRequester` is defined in the examples, it makes sense for `UserImageProcessor` to be too, as it’s the flip side of the same coin, so to speak - The logic is now pretty trivial - This kind of one-shot, history-less image-describing pipeline shouldn't be common at all; it's ok for it to live in examples rather than as a dedicated class - In the short term, this enables us to create `LLMContext`s for services that support it and `OpenAILLMContext`s for services that don't yet (AWS) This commit also adds missing translation from OpenAI-format image context messages to AWS format. Note that this isn't a wasted effort in the face of the upcoming migration to universal `LLMContext`—this work will be reused as it has to be implemented there too.
This commit is contained in:
10
CHANGELOG.md
10
CHANGELOG.md
@@ -55,6 +55,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Fixed a `OpenAIImageGenService` issue where it was not creating
|
||||
`URLImageRawFrame` correctly.
|
||||
|
||||
### Removed
|
||||
|
||||
- Remove `VisionImageRawFrame` in favor of context frames (`LLMContextFrame` or
|
||||
`OpenAILLMContextFrame`).
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Deprecate `VisionImageFrameAggregator` because `VisionImageRawFrame` has been
|
||||
removed. See the `12*` examples for the new recommended replacement pattern.
|
||||
|
||||
## [0.0.83] - 2025-09-03
|
||||
|
||||
### Added
|
||||
|
||||
@@ -11,12 +11,19 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMContextFrame,
|
||||
TextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.user_response import UserResponseAggregator
|
||||
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
@@ -34,6 +41,8 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
"""Converts incoming text into requests for user images."""
|
||||
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
@@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor):
|
||||
|
||||
if self._participant_id and isinstance(frame, TextFrame):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
|
||||
UserImageRequestFrame(self._participant_id, context=frame.text),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class UserImageProcessor(FrameProcessor):
|
||||
"""Converts incoming user images into context frames."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame):
|
||||
if frame.request and frame.request.context:
|
||||
context = LLMContext()
|
||||
context.add_image_frame_message(
|
||||
image=frame.image,
|
||||
text=frame.request.context,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = LLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Initialize the image requester without setting the participant ID yet
|
||||
image_requester = UserImageRequester()
|
||||
|
||||
vision_aggregator = VisionImageFrameAggregator()
|
||||
image_processor = UserImageProcessor()
|
||||
|
||||
# If you run into weird description, try with use_cpu=True
|
||||
moondream = MoondreamService()
|
||||
@@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
stt,
|
||||
user_response,
|
||||
image_requester,
|
||||
vision_aggregator,
|
||||
image_processor,
|
||||
moondream,
|
||||
tts,
|
||||
transport.output(),
|
||||
@@ -119,7 +151,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,12 +11,19 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMContextFrame,
|
||||
TextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.user_response import UserResponseAggregator
|
||||
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
@@ -34,6 +41,8 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
"""Converts incoming text into requests for user images."""
|
||||
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
@@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor):
|
||||
|
||||
if self._participant_id and isinstance(frame, TextFrame):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
|
||||
UserImageRequestFrame(self._participant_id, context=frame.text),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class UserImageProcessor(FrameProcessor):
|
||||
"""Converts incoming user images into context frames."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame):
|
||||
if frame.request and frame.request.context:
|
||||
context = LLMContext()
|
||||
context.add_image_frame_message(
|
||||
image=frame.image,
|
||||
text=frame.request.context,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = LLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Initialize the image requester without setting the participant ID yet
|
||||
image_requester = UserImageRequester()
|
||||
|
||||
vision_aggregator = VisionImageFrameAggregator()
|
||||
image_processor = UserImageProcessor()
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
@@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
stt,
|
||||
user_response,
|
||||
image_requester,
|
||||
vision_aggregator,
|
||||
image_processor,
|
||||
google,
|
||||
tts,
|
||||
transport.output(),
|
||||
@@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,12 +11,19 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMContextFrame,
|
||||
TextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.user_response import UserResponseAggregator
|
||||
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
@@ -34,6 +41,8 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
"""Converts incoming text into requests for user images."""
|
||||
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
@@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor):
|
||||
|
||||
if self._participant_id and isinstance(frame, TextFrame):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
|
||||
UserImageRequestFrame(self._participant_id, context=frame.text),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class UserImageProcessor(FrameProcessor):
|
||||
"""Converts incoming user images into context frames."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame):
|
||||
if frame.request and frame.request.context:
|
||||
context = LLMContext()
|
||||
context.add_image_frame_message(
|
||||
image=frame.image,
|
||||
text=frame.request.context,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = LLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Initialize the image requester without setting the participant ID yet
|
||||
image_requester = UserImageRequester()
|
||||
|
||||
vision_aggregator = VisionImageFrameAggregator()
|
||||
image_processor = UserImageProcessor()
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
@@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
stt,
|
||||
user_response,
|
||||
image_requester,
|
||||
vision_aggregator,
|
||||
image_processor,
|
||||
openai,
|
||||
tts,
|
||||
transport.output(),
|
||||
@@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,12 +11,19 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMContextFrame,
|
||||
TextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.user_response import UserResponseAggregator
|
||||
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
@@ -34,6 +41,8 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
"""Converts incoming text into requests for user images."""
|
||||
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
@@ -46,9 +55,32 @@ class UserImageRequester(FrameProcessor):
|
||||
|
||||
if self._participant_id and isinstance(frame, TextFrame):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
|
||||
UserImageRequestFrame(self._participant_id, context=frame.text),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class UserImageProcessor(FrameProcessor):
|
||||
"""Converts incoming user images into context frames."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame):
|
||||
if frame.request and frame.request.context:
|
||||
context = LLMContext()
|
||||
context.add_image_frame_message(
|
||||
image=frame.image,
|
||||
text=frame.request.context,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = LLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -78,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Initialize the image requester without setting the participant ID yet
|
||||
image_requester = UserImageRequester()
|
||||
|
||||
vision_aggregator = VisionImageFrameAggregator()
|
||||
image_processor = UserImageProcessor()
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
@@ -96,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
stt,
|
||||
user_response,
|
||||
image_requester,
|
||||
vision_aggregator,
|
||||
image_processor,
|
||||
anthropic,
|
||||
tts,
|
||||
transport.output(),
|
||||
@@ -123,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
186
examples/foundational/12d-describe-video-aws.py
Normal file
186
examples/foundational/12d-describe-video-aws.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
TextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.user_response import UserResponseAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
create_transport,
|
||||
get_transport_client_id,
|
||||
maybe_capture_participant_camera,
|
||||
)
|
||||
from pipecat.services.aws.llm import AWSBedrockLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
"""Converts incoming text into requests for user images."""
|
||||
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
|
||||
def set_participant_id(self, participant_id: str):
|
||||
self._participant_id = participant_id
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if self._participant_id and isinstance(frame, TextFrame):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(self._participant_id, context=frame.text),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class UserImageProcessor(FrameProcessor):
|
||||
"""Converts incoming user images into context frames."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame):
|
||||
if frame.request and frame.request.context:
|
||||
# Note: AWS Bedrock does not yet support the universal LLMContext
|
||||
context = OpenAILLMContext()
|
||||
context.add_image_frame_message(
|
||||
image=frame.image,
|
||||
text=frame.request.context,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_in_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_in_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
user_response = UserResponseAggregator()
|
||||
|
||||
# Initialize the image requester without setting the participant ID yet
|
||||
image_requester = UserImageRequester()
|
||||
|
||||
image_processor = UserImageProcessor()
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
# AWS for vision analysis
|
||||
aws = AWSBedrockLLMService(
|
||||
aws_region="us-west-2",
|
||||
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
|
||||
params=AWSBedrockLLMService.InputParams(temperature=0.8),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_response,
|
||||
image_requester,
|
||||
image_processor,
|
||||
aws,
|
||||
tts,
|
||||
transport.output(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
|
||||
await maybe_capture_participant_camera(transport, client)
|
||||
|
||||
# Set the participant ID in the image requester
|
||||
client_id = get_transport_client_id(transport, client)
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -1253,23 +1253,6 @@ class UserImageRawFrame(InputImageRawFrame):
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class VisionImageRawFrame(InputImageRawFrame):
|
||||
"""Image frame for vision/image analysis with associated text prompt.
|
||||
|
||||
An image with an associated text to ask for a description of it.
|
||||
|
||||
Parameters:
|
||||
text: Optional text prompt describing what to analyze in the image.
|
||||
"""
|
||||
|
||||
text: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputDTMFFrame(DTMFFrame, SystemFrame):
|
||||
"""DTMF keypress input frame from transport."""
|
||||
|
||||
@@ -10,13 +10,22 @@ This module provides frame aggregation functionality to combine text and image
|
||||
frames into vision frames for multimodal processing.
|
||||
"""
|
||||
|
||||
from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame, VisionImageRawFrame
|
||||
from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class VisionImageFrameAggregator(FrameProcessor):
|
||||
"""Aggregates consecutive text and image frames into vision frames.
|
||||
|
||||
.. deprecated:: 0.84.0
|
||||
VisionImageRawFrame has been removed in favor of context frames
|
||||
(LLMContextFrame or OpenAILLMContextFrame), so this aggregator is not
|
||||
needed anymore. See the 12* examples for the new recommended pattern.
|
||||
|
||||
This aggregator waits for a consecutive TextFrame and an InputImageRawFrame.
|
||||
After the InputImageRawFrame arrives it will output a VisionImageRawFrame
|
||||
combining both the text and image data for multimodal processing.
|
||||
@@ -28,6 +37,17 @@ class VisionImageFrameAggregator(FrameProcessor):
|
||||
The aggregator starts with no cached text, waiting for the first
|
||||
TextFrame to arrive before it can create vision frames.
|
||||
"""
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"VisionImageFrameAggregator is deprecated. "
|
||||
"VisionImageRawFrame has been removed in favor of context frames "
|
||||
"(LLMContextFrame or OpenAILLMContextFrame), so this aggregator is "
|
||||
"not needed anymore. See the 12* examples for the new recommended "
|
||||
"pattern.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
super().__init__()
|
||||
self._describe_text = None
|
||||
|
||||
@@ -47,12 +67,14 @@ class VisionImageFrameAggregator(FrameProcessor):
|
||||
self._describe_text = frame.text
|
||||
elif isinstance(frame, InputImageRawFrame):
|
||||
if self._describe_text:
|
||||
frame = VisionImageRawFrame(
|
||||
context = OpenAILLMContext()
|
||||
context.add_image_frame_message(
|
||||
text=self._describe_text,
|
||||
image=frame.image,
|
||||
size=frame.size,
|
||||
format=frame.format,
|
||||
)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
await self.push_frame(frame)
|
||||
self._describe_text = None
|
||||
else:
|
||||
|
||||
@@ -42,7 +42,6 @@ from pipecat.frames.frames import (
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
UserImageRawFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
@@ -495,12 +494,6 @@ class AnthropicLLMService(LLMService):
|
||||
context = frame.context
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = AnthropicLLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
# This is only useful in very simple pipelines because it creates
|
||||
# a new context. Generally we want a context manager to catch
|
||||
# UserImageRawFrames coming through the pipeline and add them
|
||||
# to the context.
|
||||
context = AnthropicLLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
elif isinstance(frame, LLMEnablePromptCachingFrame):
|
||||
@@ -626,22 +619,6 @@ class AnthropicLLMContext(OpenAILLMContext):
|
||||
self._restructure_from_openai_messages()
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_image_frame(cls, frame: VisionImageRawFrame) -> "AnthropicLLMContext":
|
||||
"""Create context from a vision image frame.
|
||||
|
||||
Args:
|
||||
frame: The vision image frame to process.
|
||||
|
||||
Returns:
|
||||
New Anthropic context with the image message.
|
||||
"""
|
||||
context = cls()
|
||||
context.add_image_frame_message(
|
||||
format=frame.format, size=frame.size, image=frame.image, text=frame.text
|
||||
)
|
||||
return context
|
||||
|
||||
def set_messages(self, messages: List):
|
||||
"""Set the messages list and reset cache tracking.
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ from pipecat.frames.frames import (
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
UserImageRawFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
@@ -180,22 +179,6 @@ class AWSBedrockLLMContext(OpenAILLMContext):
|
||||
self._restructure_from_openai_messages()
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_image_frame(cls, frame: VisionImageRawFrame) -> "AWSBedrockLLMContext":
|
||||
"""Create AWS Bedrock context from vision image frame.
|
||||
|
||||
Args:
|
||||
frame: The vision image frame to convert.
|
||||
|
||||
Returns:
|
||||
New AWS Bedrock LLM context instance.
|
||||
"""
|
||||
context = cls()
|
||||
context.add_image_frame_message(
|
||||
format=frame.format, size=frame.size, image=frame.image, text=frame.text
|
||||
)
|
||||
return context
|
||||
|
||||
def set_messages(self, messages: List):
|
||||
"""Set the messages list and restructure for Bedrock format.
|
||||
|
||||
@@ -399,9 +382,34 @@ class AWSBedrockLLMContext(OpenAILLMContext):
|
||||
elif isinstance(content, list):
|
||||
new_content = []
|
||||
for item in content:
|
||||
# fix empty text
|
||||
if item.get("type", "") == "text":
|
||||
text_content = item["text"] if item["text"] != "" else "(empty)"
|
||||
new_content.append({"text": text_content})
|
||||
# handle image_url -> image conversion
|
||||
if item["type"] == "image_url":
|
||||
print(f"[pk] Converting image_url item: {item}")
|
||||
new_item = {
|
||||
"image": {
|
||||
"format": "jpeg",
|
||||
"source": {
|
||||
"bytes": base64.b64decode(item["image_url"]["url"].split(",")[1])
|
||||
},
|
||||
}
|
||||
}
|
||||
new_content.append(new_item)
|
||||
# In the case where there's a single image in the list (like what
|
||||
# would result from a UserImageRawFrame), ensure that the image
|
||||
# comes before text
|
||||
image_indices = [i for i, item in enumerate(new_content) if "image" in item]
|
||||
text_indices = [i for i, item in enumerate(new_content) if "text" in item]
|
||||
if len(image_indices) == 1 and text_indices:
|
||||
img_idx = image_indices[0]
|
||||
first_txt_idx = text_indices[0]
|
||||
if img_idx > first_txt_idx:
|
||||
# Move image before the first text
|
||||
image_item = new_content.pop(img_idx)
|
||||
new_content.insert(first_txt_idx, image_item)
|
||||
return {"role": message["role"], "content": new_content}
|
||||
|
||||
return message
|
||||
@@ -967,7 +975,9 @@ class AWSBedrockLLMService(LLMService):
|
||||
}
|
||||
|
||||
# Add system message
|
||||
request_params["system"] = context.system
|
||||
system = getattr(context, "system", None)
|
||||
if system:
|
||||
request_params["system"] = system
|
||||
|
||||
# Check if messages contain tool use or tool result content blocks
|
||||
has_tool_content = False
|
||||
@@ -1120,12 +1130,6 @@ class AWSBedrockLLMService(LLMService):
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Bedrock.")
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = AWSBedrockLLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
# This is only useful in very simple pipelines because it creates
|
||||
# a new context. Generally we want a context manager to catch
|
||||
# UserImageRawFrames coming through the pipeline and add them
|
||||
# to the context.
|
||||
context = AWSBedrockLLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
else:
|
||||
|
||||
@@ -36,7 +36,6 @@ from pipecat.frames.frames import (
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
UserImageRawFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
@@ -1013,15 +1012,6 @@ class GoogleLLMService(LLMService):
|
||||
# NOTE: LLMMessagesFrame is deprecated, so we don't support the newer universal
|
||||
# LLMContext with it
|
||||
context = GoogleLLMContext(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
# This is only useful in very simple pipelines because it creates
|
||||
# a new context. Generally we want a context manager to catch
|
||||
# UserImageRawFrames coming through the pipeline and add them
|
||||
# to the context.
|
||||
context = GoogleLLMContext()
|
||||
context.add_image_frame_message(
|
||||
format=frame.format, size=frame.size, image=frame.image, text=frame.text
|
||||
)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
else:
|
||||
|
||||
@@ -11,17 +11,20 @@ for image analysis and description generation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, TextFrame, VisionImageRawFrame
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, TextFrame
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.services.vision_service import VisionService
|
||||
|
||||
try:
|
||||
import torch
|
||||
from transformers import AutoModelForCausalLM, AutoTokenizer
|
||||
from transformers import AutoModelForCausalLM
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use Moondream, you need to `pip install pipecat-ai[moondream]`.")
|
||||
@@ -94,11 +97,11 @@ class MoondreamService(VisionService):
|
||||
|
||||
logger.debug("Loaded Moondream model")
|
||||
|
||||
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
|
||||
async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]:
|
||||
"""Analyze an image and generate a description.
|
||||
|
||||
Args:
|
||||
frame: Vision frame containing the image data and optional question text.
|
||||
context: The context to process, containing image data.
|
||||
|
||||
Yields:
|
||||
Frame: TextFrame containing the generated image description, or ErrorFrame
|
||||
@@ -109,22 +112,45 @@ class MoondreamService(VisionService):
|
||||
yield ErrorFrame("Moondream model not available")
|
||||
return
|
||||
|
||||
logger.debug(f"Analyzing image: {frame}")
|
||||
image_bytes = None
|
||||
text = None
|
||||
try:
|
||||
messages = context.get_messages()
|
||||
last_message = messages[-1]
|
||||
last_message_content = last_message.get("content")
|
||||
|
||||
def get_image_description(frame: VisionImageRawFrame):
|
||||
"""Generate description for the given image frame.
|
||||
for item in last_message_content:
|
||||
if isinstance(item, dict):
|
||||
if (
|
||||
"image_url" in item
|
||||
and isinstance(item["image_url"], dict)
|
||||
and item["image_url"].get("url")
|
||||
):
|
||||
image_bytes = base64.b64decode(item["image_url"]["url"].split(",")[1])
|
||||
elif "text" in item and isinstance(item["text"], str):
|
||||
text = item["text"]
|
||||
|
||||
Args:
|
||||
frame: Vision frame containing image data and question.
|
||||
except Exception as e:
|
||||
logger.error(f"Exception during image extraction: {e}")
|
||||
yield ErrorFrame("Failed to extract image from context")
|
||||
return
|
||||
|
||||
Returns:
|
||||
str: Generated description of the image.
|
||||
"""
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
if not image_bytes:
|
||||
logger.error("No image found in context")
|
||||
yield ErrorFrame("No image found in context")
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
f"Analyzing image (bytes length: {len(image_bytes) if image_bytes else 'None'})"
|
||||
)
|
||||
|
||||
def get_image_description(bytes: bytes, text: Optional[str]) -> str:
|
||||
image_buffer = BytesIO(bytes)
|
||||
image = Image.open(image_buffer)
|
||||
image_embeds = self._model.encode_image(image)
|
||||
description = self._model.query(image_embeds, frame.text)["answer"]
|
||||
description = self._model.query(image_embeds, text)["answer"]
|
||||
return description
|
||||
|
||||
description = await asyncio.to_thread(get_image_description, frame)
|
||||
description = await asyncio.to_thread(get_image_description, image_bytes, text)
|
||||
|
||||
yield TextFrame(text=description)
|
||||
|
||||
@@ -32,7 +32,6 @@ from pipecat.frames.frames import (
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
@@ -418,8 +417,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
"""Process frames for LLM completion requests.
|
||||
|
||||
Handles OpenAILLMContextFrame, LLMContextFrame, LLMMessagesFrame,
|
||||
VisionImageRawFrame, and LLMUpdateSettingsFrame to trigger LLM
|
||||
completions and manage settings.
|
||||
and LLMUpdateSettingsFrame to trigger LLM completions and manage
|
||||
settings.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
@@ -438,16 +437,6 @@ class BaseOpenAILLMService(LLMService):
|
||||
# NOTE: LLMMessagesFrame is deprecated, so we don't support the newer universal
|
||||
# LLMContext with it
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
# This is only useful in very simple pipelines because it creates
|
||||
# a new context. Generally we want a context manager to catch
|
||||
# UserImageRawFrames coming through the pipeline and add them
|
||||
# to the context.
|
||||
# TODO: support the newer universal LLMContext with a VisionImageRawFrame equivalent?
|
||||
context = OpenAILLMContext()
|
||||
context.add_image_frame_message(
|
||||
format=frame.format, size=frame.size, image=frame.image, text=frame.text
|
||||
)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
else:
|
||||
|
||||
@@ -14,7 +14,8 @@ visual content.
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.frames.frames import Frame, VisionImageRawFrame
|
||||
from pipecat.frames.frames import Frame, LLMContextFrame
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_service import AIService
|
||||
|
||||
@@ -37,15 +38,15 @@ class VisionService(AIService):
|
||||
self._describe_text = None
|
||||
|
||||
@abstractmethod
|
||||
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
|
||||
"""Process a vision image frame and generate results.
|
||||
async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]:
|
||||
"""Process the latest image in the context and generate results.
|
||||
|
||||
This method must be implemented by subclasses to provide actual computer
|
||||
vision functionality such as image description, object detection, or
|
||||
visual question answering.
|
||||
|
||||
Args:
|
||||
frame: The vision image frame to process, containing image data.
|
||||
context: The context to process, containing image data.
|
||||
|
||||
Yields:
|
||||
Frame: Frames containing the vision analysis results, typically TextFrame
|
||||
@@ -65,9 +66,9 @@ class VisionService(AIService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VisionImageRawFrame):
|
||||
if isinstance(frame, LLMContextFrame):
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_vision(frame))
|
||||
await self.process_generator(self.run_vision(frame.context))
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
Reference in New Issue
Block a user