diff --git a/CHANGELOG.md b/CHANGELOG.md index 78620da43..32952afd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,10 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `VisionImageRawFrame`. This is an input image frame with an associated - text. It is usually processed by vision services (e.g. Moondream). The text - guides the vision service on how to analyze the image. - - Added support for including images or audio to LLM context messages using `LLMContext.create_image_message()` or `LLMContext.create_image_url_message()` (not all LLMs support URLs) and `LLMContext.create_audio_message()`. For @@ -168,9 +164,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Updated `MoondreamService` to process `VisionImageRawFrame`. +- `UserImageRawFrame` new fields `add_to_context` and `text`. The + `add_to_context` field indicates if this image and text should be added to the + LLM context (by the LLM assistant aggregator). The `text` field, if set, might + also guide the LLM or the vision service on how to analyze the image. -- `VisionService` expects `VisionImageRawFrame` in order to analyze images. +- `UserImageRequestFrame` new fields `add_to_context` and `text`. Both fields + will be used to set the same fields on the captured `UserImageRawFrame`. + +- `UserImageRequestFrame` doesn't require function call name and ID anymore. + +- Updated `MoondreamService` to process `UserImageRawFrame`. + +- `VisionService` expects `UserImageRawFrame` in order to analyze images. - `DailyTransport` triggers `on_error` event if transcription can't be started or stopped. @@ -196,6 +202,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Deprecated +- `LLMService.request_image_frame()` is deprecated, push a + `UserImageRequestFrame` instead. + - `UserResponseAggregator` is deprecated and will be removed in a future version. 
- The `send_transcription_frames` argument to `OpenAIRealtimeLLMService` is diff --git a/examples/foundational/14d-function-calling-anthropic-video.py b/examples/foundational/14d-function-calling-anthropic-video.py index bf35b9648..db4c75c48 100644 --- a/examples/foundational/14d-function-calling-anthropic-video.py +++ b/examples/foundational/14d-function-calling-anthropic-video.py @@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams): When called, this function pushes a UserImageRequestFrame upstream to the transport. As a result, the transport will request the user image and push a - UserImageRawFrame downstream associated to this request. When the - UserImageRawFrame reaches the LLM assistant aggregator, the image will be - added to the context. + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. """ user_id = params.arguments["user_id"] question = params.arguments["question"] logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the user image frame. 
Note that this image is associated to a - # function call and will be handled by the LLM assistant aggregators. - await params.llm.request_image_frame( - user_id=user_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. + await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) await params.result_callback(None) diff --git a/examples/foundational/14d-function-calling-aws-video.py b/examples/foundational/14d-function-calling-aws-video.py index 20367bad2..dcd8cf202 100644 --- a/examples/foundational/14d-function-calling-aws-video.py +++ b/examples/foundational/14d-function-calling-aws-video.py @@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams): When called, this function pushes a UserImageRequestFrame upstream to the transport. As a result, the transport will request the user image and push a - UserImageRawFrame downstream associated to this request. 
When the - UserImageRawFrame reaches the LLM assistant aggregator, the image will be - added to the context. + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. """ user_id = params.arguments["user_id"] question = params.arguments["question"] logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the user image frame. Note that this image is associated to a - # function call and will be handled by the LLM assistant aggregators. - await params.llm.request_image_frame( - user_id=user_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. + await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) await params.result_callback(None) diff --git a/examples/foundational/14d-function-calling-gemini-flash-video.py b/examples/foundational/14d-function-calling-gemini-flash-video.py index 867fc3180..7a8eecca1 100644 --- a/examples/foundational/14d-function-calling-gemini-flash-video.py +++ b/examples/foundational/14d-function-calling-gemini-flash-video.py @@ -15,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from 
pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -42,21 +43,18 @@ async def fetch_user_image(params: FunctionCallParams): When called, this function pushes a UserImageRequestFrame upstream to the transport. As a result, the transport will request the user image and push a - UserImageRawFrame downstream associated to this request. When the - UserImageRawFrame reaches the LLM assistant aggregator, the image will be - added to the context. + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. """ user_id = params.arguments["user_id"] question = params.arguments["question"] logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the user image frame. Note that this image is associated to a - # function call and will be handled by the LLM assistant aggregators. - await params.llm.request_image_frame( - user_id=user_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. 
+ await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) await params.result_callback(None) diff --git a/examples/foundational/14d-function-calling-moondream-video.py b/examples/foundational/14d-function-calling-moondream-video.py index eb06556a9..a14ba14e5 100644 --- a/examples/foundational/14d-function-calling-moondream-video.py +++ b/examples/foundational/14d-function-calling-moondream-video.py @@ -15,20 +15,14 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import ( - Frame, - LLMRunFrame, - UserImageRawFrame, - UserImageRequestFrame, - VisionImageRawFrame, -) +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -57,40 +51,17 @@ async def fetch_user_image(params: FunctionCallParams): question = params.arguments["question"] logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the user image frame frame. In this case we don't use - # `llm.request_image_frame()` because we don't want the LLM to analyze it. + # Request a user image frame. 
In this case, we don't want the requested + # image to be added to the context because we will process it with + # Moondream. await params.llm.push_frame( - UserImageRequestFrame(user_id=user_id, context=question), FrameDirection.UPSTREAM + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=False), + FrameDirection.UPSTREAM, ) await params.result_callback(None) -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into vision frames. - - This processor handles the UserImageRawFrame from the transport, converts it - to a VisionImageRawFrame and pushes it downstream so it can be handled by a - vision service. - - """ - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - frame = VisionImageRawFrame( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) - - # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. @@ -152,9 +123,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages, tools) context_aggregator = LLMContextAggregatorPair(context) - # This will get the get the user image frame and push it to the LLM. 
- image_processor = UserImageProcessor() - # If you run into weird description, try with use_cpu=True moondream = MoondreamService() @@ -165,7 +133,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context_aggregator.user(), # User responses ParallelPipeline( [llm], # LLM - [image_processor, moondream], + [moondream], ), tts, # TTS transport.output(), # Transport bot output diff --git a/examples/foundational/14d-function-calling-openai-video.py b/examples/foundational/14d-function-calling-openai-video.py index c09d39231..2dbdc7e63 100644 --- a/examples/foundational/14d-function-calling-openai-video.py +++ b/examples/foundational/14d-function-calling-openai-video.py @@ -16,12 +16,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -43,21 +44,18 @@ async def fetch_user_image(params: FunctionCallParams): When called, this function pushes a UserImageRequestFrame upstream to the transport. As a result, the transport will request the user image and push a - UserImageRawFrame downstream associated to this request. When the - UserImageRawFrame reaches the LLM assistant aggregator, the image will be - added to the context. 
+ UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. """ user_id = params.arguments["user_id"] question = params.arguments["question"] logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the user image frame. Note that this image is associated to a - # function call and will be handled by the LLM assistant aggregators. - await params.llm.request_image_frame( - user_id=user_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. + await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) await params.result_callback(None) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index afa647b1c..70975d262 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -11,7 +11,6 @@ including data frames, system frames, and control frames for audio, video, text, and LLM processing. """ -import asyncio from dataclasses import dataclass, field from typing import ( TYPE_CHECKING, @@ -1202,27 +1201,23 @@ class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame): class UserImageRequestFrame(SystemFrame): """Frame requesting an image from a specific user. - A frame to request an image from the given user. The frame might be - generated by a function call in which case the corresponding fields will be - properly set. + A frame to request an image from the given user. The request might come with + a text that can be later used to describe the requested image. Parameters: user_id: Identifier of the user to request image from. - context: Optional context for the image request. - function_name: Name of function that generated this request (if any). - tool_call_id: Tool call ID if generated by function call. 
+ text: An optional text associated to the image request. + add_to_context: Whether the requested image should be added to an LLM context. video_source: Specific video source to capture from. """ user_id: str - context: Optional[Any] = None - function_name: Optional[str] = None - tool_call_id: Optional[str] = None + text: Optional[str] = None + add_to_context: Optional[bool] = None video_source: Optional[str] = None - request_event: Optional[asyncio.Event] = None def __str__(self): - return f"{self.name}(user: {self.user_id}, video_source: {self.video_source}, function: {self.function_name}, request: {self.tool_call_id})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, add_to_context: {self.add_to_context}, {self.video_source})" @dataclass @@ -1296,33 +1291,17 @@ class UserImageRawFrame(InputImageRawFrame): Parameters: user_id: Identifier of the user who provided this image. - request: The original image request frame if this is a response. + text: An optional text associated to this image. + add_to_context: Whether this image should be added to an LLM context. """ user_id: str = "" - request: Optional[UserImageRequestFrame] = None + text: Optional[str] = None + add_to_context: Optional[bool] = None def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})" - - -@dataclass -class VisionImageRawFrame(InputImageRawFrame): - """Raw image input frame to be analyzed by vision services. - - This is just an image with an associated text describing how the vision - service should analyze the image. - - Parameters: - text: Description of how the vision service should analyze the image. 
- """ - - text: str - - def __str__(self): - pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text})" + return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, add_to_context: {self.add_to_context})" @dataclass diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 920775853..44c534b9b 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -616,7 +616,7 @@ class LLMAssistantAggregator(LLMContextAggregator): await self._handle_function_call_result(frame) elif isinstance(frame, FunctionCallCancelFrame): await self._handle_function_call_cancel(frame) - elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id: + elif isinstance(frame, UserImageRawFrame): await self._handle_user_image_frame(frame) elif isinstance(frame, BotStoppedSpeakingFrame): await self.push_aggregation() @@ -767,30 +767,21 @@ class LLMAssistantAggregator(LLMContextAggregator): message["content"] = result async def _handle_user_image_frame(self, frame: UserImageRawFrame): - logger.debug( - f"{self} UserImageRawFrame: [{frame.request.function_name}:{frame.request.tool_call_id}]" - ) - - if frame.request.tool_call_id not in self._function_calls_in_progress: - logger.warning( - f"UserImageRawFrame tool_call_id [{frame.request.tool_call_id}] is not running" - ) + if not frame.add_to_context: return + logger.debug(f"{self} Adding UserImageRawFrame to LLM context (size: {frame.size})") + self._context.add_image_frame_message( format=frame.format, size=frame.size, image=frame.image, - text=frame.request.context, + text=frame.text, ) await self.push_aggregation() await 
self.push_context_frame(FrameDirection.UPSTREAM) - # Notify who ever requested the image that we have added it to the context. - if frame.request and frame.request.request_event: - frame.request.request_event.set() - async def _handle_llm_start(self, _: LLMFullResponseStartFrame): self._started += 1 diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index c75893ea3..0a1a835f7 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -503,6 +503,9 @@ class LLMService(AIService): the image. If you expect the image to be processed by a vision service, you might want to push a UserImageRequestFrame upstream directly. + .. deprecated:: 0.0.92 + This method is deprecated, push a `UserImageRequestFrame` instead. + Args: user_id: The ID of the user to request an image from. function_name: Optional function name associated with the request. @@ -512,24 +515,19 @@ class LLMService(AIService): timeout: Optional timeout for the requested image to be added to the LLM context. """ - request_event = asyncio.Event() if timeout else None + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Method `request_image_frame()` is deprecated, push a `UserImageRequestFrame` instead.", + DeprecationWarning, + ) await self.push_frame( - UserImageRequestFrame( - user_id=user_id, - function_name=function_name, - tool_call_id=tool_call_id, - context=text_content, - video_source=video_source, - request_event=request_event, - ), + UserImageRequestFrame(user_id=user_id, text=text_content), FrameDirection.UPSTREAM, ) - # Wait for the requested image to be added to the context. 
- if request_event: - await asyncio.wait_for(request_event.wait(), timeout=timeout) - async def _create_sequential_runner_task(self): if not self._sequential_runner_task: self._sequential_runner_queue = asyncio.Queue() diff --git a/src/pipecat/services/moondream/vision.py b/src/pipecat/services/moondream/vision.py index 6e470071e..b7527f76c 100644 --- a/src/pipecat/services/moondream/vision.py +++ b/src/pipecat/services/moondream/vision.py @@ -16,7 +16,12 @@ from typing import AsyncGenerator, Optional from loguru import logger from PIL import Image -from pipecat.frames.frames import ErrorFrame, Frame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TextFrame, + UserImageRawFrame, +) from pipecat.services.vision_service import VisionService try: @@ -94,11 +99,11 @@ class MoondreamService(VisionService): logger.debug("Loaded Moondream model") - async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: + async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]: """Analyze an image and generate a description. Args: - frame: The vision image frame to process. + frame: The image frame to process. Yields: Frame: TextFrame containing the generated image description, or ErrorFrame diff --git a/src/pipecat/services/vision_service.py b/src/pipecat/services/vision_service.py index 16b25277c..5de282896 100644 --- a/src/pipecat/services/vision_service.py +++ b/src/pipecat/services/vision_service.py @@ -14,7 +14,7 @@ visual content. 
from abc import abstractmethod from typing import AsyncGenerator -from pipecat.frames.frames import Frame, VisionImageRawFrame +from pipecat.frames.frames import Frame, UserImageRawFrame from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService @@ -37,7 +37,7 @@ class VisionService(AIService): self._describe_text = None @abstractmethod - async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: + async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]: """Process the given vision image and generate results. This method must be implemented by subclasses to provide actual computer @@ -45,7 +45,7 @@ class VisionService(AIService): visual question answering. Args: - frame: The vision image frame to process. + frame: The image frame to process. Yields: Frame: Frames containing the vision analysis results, typically TextFrame @@ -56,7 +56,7 @@ class VisionService(AIService): async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames, handling vision image frames for analysis. - Automatically processes VisionImageRawFrame objects by calling run_vision + Automatically processes UserImageRawFrame objects by calling run_vision and handles metrics tracking. Other frames are passed through unchanged. 
Args: @@ -65,7 +65,7 @@ class VisionService(AIService): """ await super().process_frame(frame, direction) - if isinstance(frame, VisionImageRawFrame): + if isinstance(frame, UserImageRawFrame) and frame.text: await self.start_processing_metrics() await self.process_generator(self.run_vision(frame)) await self.stop_processing_metrics() diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 18c36ee56..c6dd06986 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -1839,10 +1839,11 @@ class DailyInputTransport(BaseInputTransport): if render_frame: frame = UserImageRawFrame( user_id=participant_id, - request=request_frame, image=video_frame.buffer, size=(video_frame.width, video_frame.height), format=video_frame.color_format, + text=request_frame.text if request_frame else None, + add_to_context=request_frame.add_to_context if request_frame else None, ) frame.transport_source = video_source await self.push_video_frame(frame) diff --git a/src/pipecat/transports/smallwebrtc/transport.py b/src/pipecat/transports/smallwebrtc/transport.py index cdefa976f..4bd18c9ef 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -15,7 +15,7 @@ import asyncio import fractions import time from collections import deque -from typing import Any, Awaitable, Callable, Optional +from typing import Any, Awaitable, Callable, List, Optional import numpy as np from loguru import logger @@ -567,7 +567,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): self._receive_audio_task = None self._receive_video_task = None self._receive_screen_video_task = None - self._image_requests = {} + self._image_requests: List[UserImageRequestFrame] = [] # Whether we have seen a StartFrame already. 
self._initialized = False @@ -657,23 +657,27 @@ class SmallWebRTCInputTransport(BaseInputTransport): if video_frame: await self.push_video_frame(video_frame) - # Check if there are any pending image requests and create UserImageRawFrame - if self._image_requests: - for req_id, request_frame in list(self._image_requests.items()): - if request_frame.video_source == video_source: - # Create UserImageRawFrame using the current video frame - image_frame = UserImageRawFrame( - user_id=request_frame.user_id, - request=request_frame, - image=video_frame.image, - size=video_frame.size, - format=video_frame.format, - ) - image_frame.transport_source = video_source - # Push the frame to the pipeline - await self.push_video_frame(image_frame) - # Remove from pending requests - del self._image_requests[req_id] + # Check if there are any pending image requests and create + # UserImageRawFrame. Use a shallow copy so we can remove + # elements. + for request_frame in self._image_requests[:]: + if request_frame.video_source == video_source: + # Create UserImageRawFrame using the current video frame + image_frame = UserImageRawFrame( + user_id=request_frame.user_id, + image=video_frame.image, + size=video_frame.size, + format=video_frame.format, + text=request_frame.text if request_frame else None, + add_to_context=request_frame.add_to_context + if request_frame + else None, + ) + image_frame.transport_source = video_source + # Push the frame to the pipeline + await self.push_video_frame(image_frame) + # Remove from pending requests + self._image_requests.remove(request_frame) except Exception as e: logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})") @@ -701,8 +705,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): logger.debug(f"Requesting image from participant: {frame.user_id}") # Store the request - request_id = f"{frame.function_name}:{frame.tool_call_id}" - self._image_requests[request_id] = frame + self._image_requests.append(frame) # Default 
to camera if no source specified if frame.video_source is None: