diff --git a/CHANGELOG.md b/CHANGELOG.md index ab969fe52..220b721be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added supprt for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket support for `saarika` (STT) and `saaras` (STT-translate) models. +- Added support for including images or audio to LLM context messages using + `LLMContext.create_image_message()` or `LLMContext.create_image_url_message()` + (not all LLMs support URLs) and `LLMContext.create_audio_message()`. For + example, when creating `LLMMessagesAppendFrame`: + + ```python + message = LLMContext.create_image_message(image=..., size= ...) + await self.push_frame(LLMMessagesAppendFrame(messages=[message], run_llm=True)) + ``` + +- New event handlers for the `DeepgramFluxSTTService`: `on_start_of_turn`, + `on_turn_resumed`, `on_end_of_turn`, `on_eager_end_of_turn`, `on_update`. + - Added `generation_config` parameter support to `CartesiaTTSService` and `CartesiaHttpTTSService` for Cartesia Sonic-3 models. Includes a new `GenerationConfig` class with `volume` (0.5-2.0), `speed` (0.6-1.5), @@ -154,6 +167,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `UserImageRawFrame` new fields `add_to_context` and `text`. The + `add_to_context` field indicates if this image and text should be added to the + LLM context (by the LLM assistant aggregator). The `text` field, if set, might + also guide the LLM or the vision service on how to analyze the image. + +- `UserImageRequestFrame` new fields `add_to_context` and `text`. Both fields + will be used to set the same fields on the captured `UserImageRawFrame`. + +- `UserImageRequestFrame` doesn't require function call name and ID anymore. + +- Updated `MoondreamService` to process `UserImageRawFrame`. + +- `VisionService` expects `UserImageRawFrame` in order to analyze images. 
+ - `DailyTransport` triggers `on_error` event if transcription can't be started or stopped. @@ -178,6 +205,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Deprecated +- `LLMService.request_image_frame()` is deprecated, push a + `UserImageRequestFrame` instead. + +- `UserResponseAggregator` is deprecated and will be removed in a future version. + - The `send_transcription_frames` argument to `OpenAIRealtimeLLMService` is deprecated. Transcription frames are now always sent. They go upstream, to be handled by the user context aggregator. See "Added" section for details. @@ -196,6 +228,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue in `HumeTTSService` that was only using Octave 2, which does + not support the `description` field. Now, if a description is provided, it + switches to Octave 1. + - Fixed an issue where `DailyTransport` would timeout prematurely on join and on leave. @@ -205,7 +241,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed an issue in `ServiceSwitcher` where the `STTService`s would result in all STT services producing `TranscriptionFrame`s. -- Fixed an issue in `HumeTTSService` that was only using Octave 2, which does not support the `description` field. Now, if a description is provided, it switches to Octave 1. +### Other + +- Updated all vision 12-series foundational examples to load images from a file. + +- Added 14-series video examples for different services. These new examples + request an image from the user camera through a function call. 
## [0.0.91] - 2025-10-21 diff --git a/examples/foundational/07a-interruptible-speechmatics-vad.py b/examples/foundational/07a-interruptible-speechmatics-vad.py index 55514017f..6e78a5147 100644 --- a/examples/foundational/07a-interruptible-speechmatics-vad.py +++ b/examples/foundational/07a-interruptible-speechmatics-vad.py @@ -6,6 +6,7 @@ import os +import aiohttp from dotenv import load_dotenv from loguru import logger @@ -20,10 +21,10 @@ from pipecat.processors.aggregators.llm_response import ( from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.elevenlabs.tts import ElevenLabsTTSService from pipecat.services.openai.base_llm import BaseOpenAILLMService from pipecat.services.openai.llm import OpenAILLMService from pipecat.services.speechmatics.stt import SpeechmaticsSTTService +from pipecat.services.speechmatics.tts import SpeechmaticsTTSService from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -51,121 +52,127 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): - """Speechmatics STT Service Example + """Speechmatics STT and TTS Service Example - This example demonstrates using Speechmatics Speech-to-Text service with speaker diarization and intelligent speaker management. Key features: + This example demonstrates using Speechmatics Speech-to-Text and Text-to-Speech services + with speaker diarization and intelligent speaker management. Key features: - 1. Speaker Diarization + 1. Speaker Diarization (STT) - Automatically identifies and distinguishes between different speakers - First speaker is identified as 'S1', others get subsequent IDs - Uses `enable_diarization` parameter to manage speaker detection - 2. 
Smart Speaker Control + 2. Smart Speaker Control (STT) - `focus_speakers` parameter lets you target specific speakers (e.g. ["S1"]) - Other speakers will be wrapped in PASSIVE tags - Only processes speech from focused speakers - Words from all speakers are wrapped with XML tags for clear speaker identification - Other speakers' speech only sent when focused speaker is active - 3. Voice Activity Detection + 3. Voice Activity Detection (STT) - Built-in VAD using `enable_vad` parameter - Remove `vad_analyzer` from `transport` config to use module's VAD - Emits speaker started/stopped events - 4. Configuration Options + 4. Text-to-Speech (TTS) + - Low latency streaming audio synthesis + - Multiple voice options available including `sarah`, `theo`, and `megan` + + 5. Configuration Options - `operating_point` parameter defaults to `ENHANCED` for optimal accuracy - Configurable `end_of_utterance_silence_trigger` (default 0.5s) - Customizable speaker formatting - Additional diarization settings available - For detailed information about operating points and configuration: - https://docs.speechmatics.com/rt-api-ref + For detailed information: + - STT: https://docs.speechmatics.com/rt-api-ref + - TTS: https://docs.speechmatics.com/text-to-speech/quickstart """ logger.info(f"Starting bot") - - stt = SpeechmaticsSTTService( - api_key=os.getenv("SPEECHMATICS_API_KEY"), - params=SpeechmaticsSTTService.InputParams( - language=Language.EN, - enable_vad=True, - enable_diarization=True, - focus_speakers=["S1"], - end_of_utterance_silence_trigger=0.5, - speaker_active_format="<{speaker_id}>{text}", - speaker_passive_format="<{speaker_id}>{text}", - ), - ) - - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), - model="eleven_turbo_v2_5", - ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - params=BaseOpenAILLMService.InputParams(temperature=0.75), - ) - - messages = [ - { - "role": "system", - 
"content": ( - "You are a helpful British assistant called Alfred. " - "Your goal is to demonstrate your capabilities in a succinct way. " - "Your output will be converted to audio so don't include special characters in your answers. " - "Always include punctuation in your responses. " - "Give very short replies - do not give longer replies unless strictly necessary. " - "Respond to what the user said in a concise, funny, creative and helpful way. " - "Use `` tags to identify different speakers - do not use tags in your replies. " - "Do not respond to speakers within `` tags unless explicitly asked to. " + async with aiohttp.ClientSession() as session: + stt = SpeechmaticsSTTService( + api_key=os.getenv("SPEECHMATICS_API_KEY"), + params=SpeechmaticsSTTService.InputParams( + language=Language.EN, + enable_vad=True, + enable_diarization=True, + focus_speakers=["S1"], + end_of_utterance_silence_trigger=0.5, + speaker_active_format="<{speaker_id}>{text}", + speaker_passive_format="<{speaker_id}>{text}", ), - }, - ] + ) - context = LLMContext(messages) - context_aggregator = LLMContextAggregatorPair( - context, - user_params=LLMUserAggregatorParams(aggregation_timeout=0.005), - ) + tts = SpeechmaticsTTSService( + api_key=os.getenv("SPEECHMATICS_API_KEY"), + voice_id="sarah", + aiohttp_session=session, + ) - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + params=BaseOpenAILLMService.InputParams(temperature=0.75), + ) + + messages = [ + { + "role": "system", + "content": ( + "You are a helpful British assistant called Sarah. " + "Your goal is to demonstrate your capabilities in a succinct way. " + "Your output will be converted to audio so don't include special characters in your answers. 
" + "Always include punctuation in your responses. " + "Give very short replies - do not give longer replies unless strictly necessary. " + "Respond to what the user said in a concise, funny, creative and helpful way. " + "Use `` tags to identify different speakers - do not use tags in your replies. " + "Do not respond to speakers within `` tags unless explicitly asked to. " + ), + }, ] - ) - task = PipelineTask( - pipeline, - params=PipelineParams( - enable_metrics=True, - enable_usage_metrics=True, - ), - idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - ) + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(aggregation_timeout=0.005), + ) - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected") - # Kick off the conversation. - messages.append({"role": "system", "content": "Say a short hello to the user."}) - await task.queue_frames([LLMRunFrame()]) + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - await task.cancel() + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. 
+ messages.append({"role": "system", "content": "Say a short hello to the user."}) + await task.queue_frames([LLMRunFrame()]) - await runner.run(task) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) async def bot(runner_args: RunnerArguments): diff --git a/examples/foundational/07a-interruptible-speechmatics.py b/examples/foundational/07a-interruptible-speechmatics.py index 3d1e639b9..36ac39b82 100644 --- a/examples/foundational/07a-interruptible-speechmatics.py +++ b/examples/foundational/07a-interruptible-speechmatics.py @@ -6,6 +6,7 @@ import os +import aiohttp from dotenv import load_dotenv from loguru import logger @@ -24,10 +25,10 @@ from pipecat.processors.aggregators.llm_response import ( from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.elevenlabs.tts import ElevenLabsTTSService from pipecat.services.openai.base_llm import BaseOpenAILLMService from pipecat.services.openai.llm import OpenAILLMService from pipecat.services.speechmatics.stt import SpeechmaticsSTTService +from pipecat.services.speechmatics.tts import SpeechmaticsTTSService from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -61,100 +62,106 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): - """Run example using Speechmatics STT. + """Run example using Speechmatics STT and TTS. 
- This example will use diarization within our STT service and output the words spoken by - each individual speaker and wrap them with XML tags for the LLM to process. Note the - instructions in the system context for the LLM. This greatly improves the conversation - experience by allowing the LLM to understand who is speaking in a multi-party call. + This example demonstrates a complete Speechmatics integration with both Speech-to-Text + and Text-to-Speech services: - By default, this example will use our ENHANCED operating point, which is optimized for - high accuracy. You can change this by setting the `operating_point` parameter to a different - value. + STT Features: + - Diarization to identify and distinguish between different speakers + - Words spoken by each speaker are wrapped with XML tags for LLM processing + - System context instructions help the LLM understand multi-party conversations + - ENHANCED operating point by default for optimal accuracy - For more information on operating points, see the Speechmatics documentation: - https://docs.speechmatics.com/rt-api-ref + TTS Features: + - Low latency streaming audio synthesis + - Multiple voice options available including `sarah`, `theo`, and `megan` + + For more information: + - STT: https://docs.speechmatics.com/rt-api-ref + - TTS: https://docs.speechmatics.com/text-to-speech/quickstart """ logger.info(f"Starting bot") - stt = SpeechmaticsSTTService( - api_key=os.getenv("SPEECHMATICS_API_KEY"), - params=SpeechmaticsSTTService.InputParams( - language=Language.EN, - enable_diarization=True, - end_of_utterance_silence_trigger=0.5, - speaker_active_format="<{speaker_id}>{text}", - ), - ) - - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), - model="eleven_turbo_v2_5", - ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - params=BaseOpenAILLMService.InputParams(temperature=0.75), - ) - - messages = [ - { - "role": "system", 
- "content": ( - "You are a helpful British assistant called Alfred. " - "Your goal is to demonstrate your capabilities in a succinct way. " - "Your output will be converted to audio so don't include special characters in your answers. " - "Always include punctuation in your responses. " - "Give very short replies - do not give longer replies unless strictly necessary. " - "Respond to what the user said in a concise, funny, creative and helpful way. " - "Use `` tags to identify different speakers - do not use tags in your replies." + async with aiohttp.ClientSession() as session: + stt = SpeechmaticsSTTService( + api_key=os.getenv("SPEECHMATICS_API_KEY"), + params=SpeechmaticsSTTService.InputParams( + language=Language.EN, + enable_diarization=True, + end_of_utterance_silence_trigger=0.5, + speaker_active_format="<{speaker_id}>{text}", ), - }, - ] + ) - context = LLMContext(messages) - context_aggregator = LLMContextAggregatorPair( - context, - user_params=LLMUserAggregatorParams(aggregation_timeout=0.005), - ) + tts = SpeechmaticsTTSService( + api_key=os.getenv("SPEECHMATICS_API_KEY"), + voice_id="sarah", + aiohttp_session=session, + ) - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + params=BaseOpenAILLMService.InputParams(temperature=0.75), + ) + + messages = [ + { + "role": "system", + "content": ( + "You are a helpful British assistant called Sarah. " + "Your goal is to demonstrate your capabilities in a succinct way. " + "Your output will be converted to audio so don't include special characters in your answers. " + "Always include punctuation in your responses. " + "Give very short replies - do not give longer replies unless strictly necessary. 
" + "Respond to what the user said in a concise, funny, creative and helpful way. " + "Use `` tags to identify different speakers - do not use tags in your replies." + ), + }, ] - ) - task = PipelineTask( - pipeline, - params=PipelineParams( - enable_metrics=True, - enable_usage_metrics=True, - ), - idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - ) + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(aggregation_timeout=0.005), + ) - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected") - # Kick off the conversation. - messages.append({"role": "system", "content": "Say a short hello to the user."}) - await task.queue_frames([LLMRunFrame()]) + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - await task.cancel() + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. 
+ messages.append({"role": "system", "content": "Say a short hello to the user."}) + await task.queue_frames([LLMRunFrame()]) - await runner.run(task) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) async def bot(runner_args: RunnerArguments): diff --git a/examples/foundational/07c-interruptible-deepgram-flux.py b/examples/foundational/07c-interruptible-deepgram-flux.py index 1331ab2a3..75a022a5c 100644 --- a/examples/foundational/07c-interruptible-deepgram-flux.py +++ b/examples/foundational/07c-interruptible-deepgram-flux.py @@ -101,6 +101,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() + @stt.event_handler("on_update") + async def on_deepgram_flux_update(stt, transcript): + logger.debug(f"On deeggram flux update: {transcript}") + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12-describe-image-openai.py similarity index 50% rename from examples/foundational/12b-describe-video-gpt-4o.py rename to examples/foundational/12-describe-image-openai.py index 894d70d7b..97cb82054 100644 --- a/examples/foundational/12b-describe-video-gpt-4o.py +++ b/examples/foundational/12-describe-image-openai.py @@ -4,36 +4,25 @@ # SPDX-License-Identifier: BSD 2-Clause License # + import os -from typing import Optional from dotenv import load_dotenv from loguru import logger +from PIL import Image from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import 
VADParams -from pipecat.frames.frames import ( - Frame, - LLMContextFrame, - TextFrame, - TTSSpeakFrame, - UserImageRawFrame, - UserImageRequestFrame, -) +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments -from pipecat.runner.utils import ( - create_transport, - get_transport_client_id, - maybe_capture_participant_camera, -) +from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService @@ -43,49 +32,6 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -class UserImageRequester(FrameProcessor): - """Converts incoming text into requests for user images.""" - - def __init__(self, participant_id: Optional[str] = None): - super().__init__() - self._participant_id = participant_id - - def set_participant_id(self, participant_id: str): - self._participant_id = participant_id - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if self._participant_id and isinstance(frame, TextFrame): - await self.push_frame( - UserImageRequestFrame(self._participant_id, context=frame.text), - FrameDirection.UPSTREAM, - ) - else: - await self.push_frame(frame, direction) - - -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into context frames.""" - - async def 
process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - context = LLMContext() - context.add_image_frame_message( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - frame = LLMContextFrame(context) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) - - # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. @@ -93,14 +39,12 @@ transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, - video_in_enabled=True, vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_in_enabled=True, vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), ), @@ -110,33 +54,34 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - user_response = UserResponseAggregator() - - # Initialize the image requester without setting the participant ID yet - image_requester = UserImageRequester() - - image_processor = UserImageProcessor() - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - # OpenAI GPT-4o for vision analysis - openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. 
Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are also able to describe images.", + }, + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + pipeline = Pipeline( [ - transport.input(), - stt, - user_response, - image_requester, - image_processor, - openai, - tts, - transport.output(), + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -151,16 +96,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") + logger.info(f"Client connected") - await maybe_capture_participant_camera(transport, client) + if not runner_args.body: + script_dir = os.path.dirname(__file__) + runner_args.body = { + "image_path": os.path.join(script_dir, "assets", "cat.jpg"), + "question": "Describe this image", + } - # Set the participant ID in the image requester - client_id = get_transport_client_id(transport, client) - image_requester.set_participant_id(client_id) + image_path = runner_args.body["image_path"] + question = runner_args.body["question"] - # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see.")) + # Kick off the conversation. 
+ image = Image.open(image_path) + message = LLMContext.create_image_message( + image=image.tobytes(), + format="RGB", + size=image.size, + text=question, + ) + messages.append(message) + await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py deleted file mode 100644 index eb783ad75..000000000 --- a/examples/foundational/12-describe-video.py +++ /dev/null @@ -1,180 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import os -from typing import Optional - -from dotenv import load_dotenv -from loguru import logger - -from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams -from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import ( - Frame, - LLMContextFrame, - TextFrame, - TTSSpeakFrame, - UserImageRawFrame, - UserImageRequestFrame, -) -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.runner.types import RunnerArguments -from pipecat.runner.utils import ( - create_transport, - get_transport_client_id, - maybe_capture_participant_camera, -) -from pipecat.services.cartesia.tts import CartesiaTTSService -from pipecat.services.deepgram.stt import DeepgramSTTService -from pipecat.services.moondream.vision import MoondreamService -from pipecat.transports.base_transport import BaseTransport, TransportParams -from 
pipecat.transports.daily.transport import DailyParams - -load_dotenv(override=True) - - -class UserImageRequester(FrameProcessor): - """Converts incoming text into requests for user images.""" - - def __init__(self, participant_id: Optional[str] = None): - super().__init__() - self._participant_id = participant_id - - def set_participant_id(self, participant_id: str): - self._participant_id = participant_id - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if self._participant_id and isinstance(frame, TextFrame): - await self.push_frame( - UserImageRequestFrame(self._participant_id, context=frame.text), - FrameDirection.UPSTREAM, - ) - else: - await self.push_frame(frame, direction) - - -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into context frames.""" - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - context = LLMContext() - context.add_image_frame_message( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - frame = LLMContextFrame(context) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) - - -# We store functions so objects (e.g. SileroVADAnalyzer) don't get -# instantiated. The function will be called when the desired transport gets -# selected. 
-transport_params = { - "daily": lambda: DailyParams( - audio_in_enabled=True, - audio_out_enabled=True, - video_in_enabled=True, - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), - turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), - ), - "webrtc": lambda: TransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - video_in_enabled=True, - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), - turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), - ), -} - - -async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): - logger.info(f"Starting bot") - - user_response = UserResponseAggregator() - - # Initialize the image requester without setting the participant ID yet - image_requester = UserImageRequester() - - image_processor = UserImageProcessor() - - # If you run into weird description, try with use_cpu=True - moondream = MoondreamService() - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - - pipeline = Pipeline( - [ - transport.input(), - stt, - user_response, - image_requester, - image_processor, - moondream, - tts, - transport.output(), - ] - ) - - task = PipelineTask( - pipeline, - idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - ) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - - await maybe_capture_participant_camera(transport, client) - - # Set the participant ID in the image requester - client_id = get_transport_client_id(transport, client) - image_requester.set_participant_id(client_id) - - # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! 
Feel free to ask me about what I see.")) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - await task.cancel() - - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) - - await runner.run(task) - - -async def bot(runner_args: RunnerArguments): - """Main bot entry point compatible with Pipecat Cloud.""" - transport = await create_transport(runner_args, transport_params) - await run_bot(transport, runner_args) - - -if __name__ == "__main__": - from pipecat.runner.run import main - - main() diff --git a/examples/foundational/12c-describe-video-anthropic.py b/examples/foundational/12a-describe-image-anthropic.py similarity index 50% rename from examples/foundational/12c-describe-video-anthropic.py rename to examples/foundational/12a-describe-image-anthropic.py index a8134d535..1690a06bf 100644 --- a/examples/foundational/12c-describe-video-anthropic.py +++ b/examples/foundational/12a-describe-image-anthropic.py @@ -4,36 +4,25 @@ # SPDX-License-Identifier: BSD 2-Clause License # + import os -from typing import Optional from dotenv import load_dotenv from loguru import logger +from PIL import Image from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import ( - Frame, - LLMContextFrame, - TextFrame, - TTSSpeakFrame, - UserImageRawFrame, - UserImageRequestFrame, -) +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.user_response import UserResponseAggregator -from 
pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments -from pipecat.runner.utils import ( - create_transport, - get_transport_client_id, - maybe_capture_participant_camera, -) +from pipecat.runner.utils import create_transport from pipecat.services.anthropic.llm import AnthropicLLMService from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService @@ -43,49 +32,6 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -class UserImageRequester(FrameProcessor): - """Converts incoming text into requests for user images.""" - - def __init__(self, participant_id: Optional[str] = None): - super().__init__() - self._participant_id = participant_id - - def set_participant_id(self, participant_id: str): - self._participant_id = participant_id - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if self._participant_id and isinstance(frame, TextFrame): - await self.push_frame( - UserImageRequestFrame(self._participant_id, context=frame.text), - FrameDirection.UPSTREAM, - ) - else: - await self.push_frame(frame, direction) - - -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into context frames.""" - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - context = LLMContext() - context.add_image_frame_message( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - frame = LLMContextFrame(context) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) - - # We store functions so objects (e.g. 
SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. @@ -93,14 +39,12 @@ transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, - video_in_enabled=True, vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_in_enabled=True, vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), ), @@ -110,33 +54,34 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - user_response = UserResponseAggregator() - - # Initialize the image requester without setting the participant ID yet - image_requester = UserImageRequester() - - image_processor = UserImageProcessor() - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - # Anthropic for vision analysis - anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY")) - tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) + llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are also able to describe images.", + }, + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + pipeline = Pipeline( [ - transport.input(), - stt, - user_response, - image_requester, - image_processor, - anthropic, - tts, - transport.output(), + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -151,16 +96,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") + logger.info(f"Client connected") - await maybe_capture_participant_camera(transport, client) + if not runner_args.body: + script_dir = os.path.dirname(__file__) + runner_args.body = { + "image_path": os.path.join(script_dir, "assets", "cat.jpg"), + "question": "Describe this image", + } - # Set the participant ID in the image requester - client_id = get_transport_client_id(transport, client) - image_requester.set_participant_id(client_id) + image_path = runner_args.body["image_path"] + question = runner_args.body["question"] - # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see.")) + # Kick off the conversation. 
+ image = Image.open(image_path) + message = LLMContext.create_image_message( + image=image.tobytes(), + format="RGB", + size=image.size, + text=question, + ) + messages.append(message) + await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12b-describe-image-aws.py b/examples/foundational/12b-describe-image-aws.py new file mode 100644 index 000000000..1827c8906 --- /dev/null +++ b/examples/foundational/12b-describe-image-aws.py @@ -0,0 +1,148 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os + +from dotenv import load_dotenv +from loguru import logger +from PIL import Image + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.aws.llm import AWSBedrockLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams + +load_dotenv(override=True) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. 
The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AWSBedrockLLMService( + aws_region="us-west-2", + model="us.anthropic.claude-3-7-sonnet-20250219-v1:0", + # Note: usually, prefer providing latency="optimized" param. + # Here we can't because AWS Bedrock doesn't support it for Claude 3.7, + # which we need for image input. + params=AWSBedrockLLMService.InputParams(temperature=0.8), + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are also able to describe images.", + }, + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + + if not runner_args.body: + script_dir = os.path.dirname(__file__) + runner_args.body = { + "image_path": os.path.join(script_dir, "assets", "cat.jpg"), + "question": "Describe this image", + } + + image_path = runner_args.body["image_path"] + question = runner_args.body["question"] + + # Kick off the conversation. 
+ image = Image.open(image_path) + message = LLMContext.create_image_message( + image=image.tobytes(), + format="RGB", + size=image.size, + text=question, + ) + messages.append(message) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/12c-describe-image-gemini-flash.py b/examples/foundational/12c-describe-image-gemini-flash.py new file mode 100644 index 000000000..9a36785e8 --- /dev/null +++ b/examples/foundational/12c-describe-image-gemini-flash.py @@ -0,0 +1,141 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os + +from dotenv import load_dotenv +from loguru import logger +from PIL import Image + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import 
create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.google.llm import GoogleLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams + +load_dotenv(override=True) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are also able to describe images.", + }, + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + + if not runner_args.body: + script_dir = os.path.dirname(__file__) + runner_args.body = { + "image_path": os.path.join(script_dir, "assets", "cat.jpg"), + "question": "Describe this image", + } + + image_path = runner_args.body["image_path"] + question = runner_args.body["question"] + + # Kick off the conversation. 
+ image = Image.open(image_path) + message = LLMContext.create_image_message( + image=image.tobytes(), + format="RGB", + size=image.size, + text=question, + ) + messages.append(message) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/12d-describe-image-moondream.py b/examples/foundational/12d-describe-image-moondream.py new file mode 100644 index 000000000..ee6f328f1 --- /dev/null +++ b/examples/foundational/12d-describe-image-moondream.py @@ -0,0 +1,122 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os + +from dotenv import load_dotenv +from loguru import logger +from PIL import Image + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import UserImageRawFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.moondream.vision import MoondreamService +from 
pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams + +load_dotenv(override=True) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + vision = MoondreamService() + + pipeline = Pipeline( + [ + vision, # Vision + tts, # TTS + transport.output(), # Transport bot output + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + + if not runner_args.body: + script_dir = os.path.dirname(__file__) + runner_args.body = { + "image_path": os.path.join(script_dir, "assets", "cat.jpg"), + "question": "Describe this image", + } + + image_path = runner_args.body["image_path"] + question = runner_args.body["question"] + + # Describe the image. 
+ image = Image.open(image_path) + await task.queue_frames( + [ + UserImageRawFrame( + image=image.tobytes(), + format="RGB", + size=image.size, + text=question, + ) + ] + ) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/14b-function-calling-anthropic-video.py b/examples/foundational/14d-function-calling-anthropic-video.py similarity index 60% rename from examples/foundational/14b-function-calling-anthropic-video.py rename to examples/foundational/14d-function-calling-anthropic-video.py index 009f59500..b21b9fdba 100644 --- a/examples/foundational/14b-function-calling-anthropic-video.py +++ b/examples/foundational/14d-function-calling-anthropic-video.py @@ -4,8 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # - -import asyncio import os from dotenv import load_dotenv @@ -17,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from 
pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -39,34 +38,30 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -# Global variable to store the client ID -client_id = "" +async def fetch_user_image(params: FunctionCallParams): + """Fetch the user image and push it to the LLM. - -async def get_weather(params: FunctionCallParams): - location = params.arguments["location"] - await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.") - - -async def get_image(params: FunctionCallParams): + When called, this function pushes a UserImageRequestFrame upstream to the + transport. As a result, the transport will request the user image and push a + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. + """ + user_id = params.arguments["user_id"] question = params.arguments["question"] - logger.debug(f"Requesting image with user_id={client_id}, question={question}") + logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the image frame - await params.llm.request_image_frame( - user_id=client_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. 
+ await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) - # Wait a short time for the frame to be processed - await asyncio.sleep(0.5) + await params.result_callback(None) - # Return a result to complete the function call - await params.result_callback( - f"I've captured an image from your camera and I'm analyzing what you asked about: {question}" - ) + # Instead of None, it's possible to also provide a tool call answer to + # tell the LLM that we are grabbing the image to analyze. + # await params.result_callback({"result": "Image is being captured."}) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -100,70 +95,32 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) - llm = AnthropicLLMService( - api_key=os.getenv("ANTHROPIC_API_KEY"), - model="claude-3-7-sonnet-latest", - params=AnthropicLLMService.InputParams(enable_prompt_caching=True), - ) - llm.register_function("get_weather", get_weather) - llm.register_function("get_image", get_image) + # Anthropic for vision analysis + llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY")) + llm.register_function("fetch_user_image", fetch_user_image) - weather_function = FunctionSchema( - name="get_weather", - description="Get the current weather", + fetch_image_function = FunctionSchema( + name="fetch_user_image", + description="Called when the user requests a description of their camera feed", properties={ - "location": { + "user_id": { "type": "string", - "description": "The city and state, e.g. 
San Francisco, CA", + "description": "The ID of the user to grab the image from", }, - }, - required=["location"], - ) - get_image_function = FunctionSchema( - name="get_image", - description="Get an image from the video stream.", - properties={ "question": { "type": "string", - "description": "The question that the user is asking about the image.", - } + "description": "The question that the user is asking about the image", + }, }, - required=["question"], + required=["user_id", "question"], ) - tools = ToolsSchema(standard_tools=[weather_function, get_image_function]) - - system_prompt = """\ -You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions. - -Your response will be turned into speech so use only simple words and punctuation. - -You have access to two tools: get_weather and get_image. - -You can respond to questions about the weather using the get_weather tool. - -You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \ -indicate you should use the get_image tool are: -- What do you see? -- What's in the video? -- Can you describe the video? -- Tell me about what you see. -- Tell me something interesting about what you see. -- What's happening in the video? - -If you need to use a tool, simply use the tool. Do not tell the user the tool you are using. Be brief and concise. - """ + tools = ToolsSchema(standard_tools=[fetch_image_function]) messages = [ { "role": "system", - "content": [ - { - "type": "text", - "text": system_prompt, - } - ], + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are able to describe images from the user camera.", }, - {"role": "user", "content": "Start the conversation by introducing yourself."}, ] context = LLMContext(messages, tools) @@ -173,11 +130,11 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo [ transport.input(), # Transport user input stt, # STT - context_aggregator.user(), # User speech to text + context_aggregator.user(), # User responses llm, # LLM tts, # TTS transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses and tool context + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -196,10 +153,16 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo await maybe_capture_participant_camera(transport, client) - global client_id + # Set the participant ID in the image requester client_id = get_transport_client_id(transport, client) # Kick off the conversation. + messages.append( + { + "role": "system", + "content": f"Please introduce yourself to the user. 
Use '{client_id}' as the user ID during function calls.", + } + ) await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") diff --git a/examples/foundational/12d-describe-video-aws.py b/examples/foundational/14d-function-calling-aws-video.py similarity index 53% rename from examples/foundational/12d-describe-video-aws.py rename to examples/foundational/14d-function-calling-aws-video.py index 5436b81ba..dffee193c 100644 --- a/examples/foundational/12d-describe-video-aws.py +++ b/examples/foundational/14d-function-calling-aws-video.py @@ -5,29 +5,23 @@ # import os -from typing import Optional from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import ( - Frame, - LLMContextFrame, - TextFrame, - TTSSpeakFrame, - UserImageRawFrame, - UserImageRequestFrame, -) +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -37,54 +31,37 @@ from pipecat.runner.utils 
import ( from pipecat.services.aws.llm import AWSBedrockLLMService from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -class UserImageRequester(FrameProcessor): - """Converts incoming text into requests for user images.""" +async def fetch_user_image(params: FunctionCallParams): + """Fetch the user image and push it to the LLM. - def __init__(self, participant_id: Optional[str] = None): - super().__init__() - self._participant_id = participant_id + When called, this function pushes a UserImageRequestFrame upstream to the + transport. As a result, the transport will request the user image and push a + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. + """ + user_id = params.arguments["user_id"] + question = params.arguments["question"] + logger.debug(f"Requesting image with user_id={user_id}, question={question}") - def set_participant_id(self, participant_id: str): - self._participant_id = participant_id + # Request a user image frame and indicate that it should be added to the + # context. 
+ await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, + ) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) + await params.result_callback(None) - if self._participant_id and isinstance(frame, TextFrame): - await self.push_frame( - UserImageRequestFrame(self._participant_id, context=frame.text), - FrameDirection.UPSTREAM, - ) - else: - await self.push_frame(frame, direction) - - -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into context frames.""" - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - # Note: AWS Bedrock does not yet support the universal LLMContext - context = LLMContext() - context.add_image_frame_message( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - frame = LLMContextFrame(context) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) + # Instead of None, it's possible to also provide a tool call answer to + # tell the LLM that we are grabbing the image to analyze. + # await params.result_callback({"result": "Image is being captured."}) # We store functions so objects (e.g. 
SileroVADAnalyzer) don't get @@ -111,17 +88,15 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - user_response = UserResponseAggregator() - - # Initialize the image requester without setting the participant ID yet - image_requester = UserImageRequester() - - image_processor = UserImageProcessor() - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + # AWS for vision analysis - aws = AWSBedrockLLMService( + llm = AWSBedrockLLMService( aws_region="us-west-2", model="us.anthropic.claude-3-7-sonnet-20250219-v1:0", # Note: usually, prefer providing latency="optimized" param. @@ -129,22 +104,44 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # which we need for image input. params=AWSBedrockLLMService.InputParams(temperature=0.8), ) + llm.register_function("fetch_user_image", fetch_user_image) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + fetch_image_function = FunctionSchema( + name="fetch_user_image", + description="Called when the user requests a description of their camera feed", + properties={ + "user_id": { + "type": "string", + "description": "The ID of the user to grab the image from", + }, + "question": { + "type": "string", + "description": "The question that the user is asking about the image", + }, + }, + required=["user_id", "question"], ) + tools = ToolsSchema(standard_tools=[fetch_image_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.", + }, + ] + + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) pipeline = Pipeline( [ - transport.input(), - stt, - user_response, - image_requester, - image_processor, - aws, - tts, - transport.output(), + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -165,10 +162,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Set the participant ID in the image requester client_id = get_transport_client_id(transport, client) - image_requester.set_participant_id(client_id) - # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see.")) + # Kick off the conversation. + messages.append( + { + "role": "system", + "content": f"Please introduce yourself to the user. 
Use '{client_id}' as the user ID during function calls.", + } + ) + await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/14d-function-calling-gemini-flash-video.py similarity index 51% rename from examples/foundational/12a-describe-video-gemini-flash.py rename to examples/foundational/14d-function-calling-gemini-flash-video.py index 63c1fd677..acb977c68 100644 --- a/examples/foundational/12a-describe-video-gemini-flash.py +++ b/examples/foundational/14d-function-calling-gemini-flash-video.py @@ -5,29 +5,23 @@ # import os -from typing import Optional from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import ( - Frame, - LLMContextFrame, - TextFrame, - TTSSpeakFrame, - UserImageRawFrame, - UserImageRequestFrame, -) +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.user_response import UserResponseAggregator -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import 
RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -37,53 +31,37 @@ from pipecat.runner.utils import ( from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.google.llm import GoogleLLMService +from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -class UserImageRequester(FrameProcessor): - """Converts incoming text into requests for user images.""" +async def fetch_user_image(params: FunctionCallParams): + """Fetch the user image and push it to the LLM. - def __init__(self, participant_id: Optional[str] = None): - super().__init__() - self._participant_id = participant_id + When called, this function pushes a UserImageRequestFrame upstream to the + transport. As a result, the transport will request the user image and push a + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. + """ + user_id = params.arguments["user_id"] + question = params.arguments["question"] + logger.debug(f"Requesting image with user_id={user_id}, question={question}") - def set_participant_id(self, participant_id: str): - self._participant_id = participant_id + # Request a user image frame and indicate that it should be added to the + # context. 
+ await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, + ) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) + await params.result_callback(None) - if self._participant_id and isinstance(frame, TextFrame): - await self.push_frame( - UserImageRequestFrame(self._participant_id, context=frame.text), - FrameDirection.UPSTREAM, - ) - else: - await self.push_frame(frame, direction) - - -class UserImageProcessor(FrameProcessor): - """Converts incoming user images into context frames.""" - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, UserImageRawFrame): - if frame.request and frame.request.context: - context = LLMContext() - context.add_image_frame_message( - image=frame.image, - text=frame.request.context, - size=frame.size, - format=frame.format, - ) - frame = LLMContextFrame(context) - await self.push_frame(frame) - else: - await self.push_frame(frame, direction) + # Instead of None, it's possible to also provide a tool call answer to + # tell the LLM that we are grabbing the image to analyze. + # await params.result_callback({"result": "Image is being captured."}) # We store functions so objects (e.g. 
SileroVADAnalyzer) don't get @@ -110,33 +88,53 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - user_response = UserResponseAggregator() - - # Initialize the image requester without setting the participant ID yet - image_requester = UserImageRequester() - - image_processor = UserImageProcessor() - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - # Google Gemini model for vision analysis - google = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY")) - tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) + # Google Gemini model for vision analysis + llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY")) + llm.register_function("fetch_user_image", fetch_user_image) + + fetch_image_function = FunctionSchema( + name="fetch_user_image", + description="Called when the user requests a description of their camera feed", + properties={ + "user_id": { + "type": "string", + "description": "The ID of the user to grab the image from", + }, + "question": { + "type": "string", + "description": "The question that the user is asking about the image", + }, + }, + required=["user_id", "question"], + ) + tools = ToolsSchema(standard_tools=[fetch_image_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are able to describe images from the user camera.", + }, + ] + + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) + pipeline = Pipeline( [ - transport.input(), - stt, - user_response, - image_requester, - image_processor, - google, - tts, - transport.output(), + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -157,10 +155,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Set the participant ID in the image requester client_id = get_transport_client_id(transport, client) - image_requester.set_participant_id(client_id) - # Welcome message - await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see.")) + # Kick off the conversation. + messages.append( + { + "role": "system", + "content": f"Please introduce yourself to the user. 
Use '{client_id}' as the user ID during function calls.", + } + ) + await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): diff --git a/examples/foundational/14d-function-calling-moondream-video.py b/examples/foundational/14d-function-calling-moondream-video.py new file mode 100644 index 000000000..3a7889c00 --- /dev/null +++ b/examples/foundational/14d-function-calling-moondream-video.py @@ -0,0 +1,190 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame +from pipecat.pipeline.parallel_pipeline import ParallelPipeline +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import ( + create_transport, + get_transport_client_id, + maybe_capture_participant_camera, +) +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.moondream.vision import MoondreamService 
+from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams + +load_dotenv(override=True) + + +async def fetch_user_image(params: FunctionCallParams): + """Fetch the user image. + + When called, this function pushes a UserImageRequestFrame upstream to the + transport. As a result, the transport will request the user image and push a + UserImageRawFrame downstream. + """ + user_id = params.arguments["user_id"] + question = params.arguments["question"] + logger.debug(f"Requesting image with user_id={user_id}, question={question}") + + # Request a user image frame. In this case, we don't want the requested + # image to be added to the context because we will process it with + # Moondream. + await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=False), + FrameDirection.UPSTREAM, + ) + + await params.result_callback(None) + + # Instead of None, it's possible to also provide a tool call answer to + # tell the LLM that we are grabbing the image to analyze. + # await params.result_callback({"result": "Image is being captured."}) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. 
+transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + llm.register_function("fetch_user_image", fetch_user_image) + + fetch_image_function = FunctionSchema( + name="fetch_user_image", + description="Called when the user requests a description of their camera feed", + properties={ + "user_id": { + "type": "string", + "description": "The ID of the user to grab the image from", + }, + "question": { + "type": "string", + "description": "The question that the user is asking about the image", + }, + }, + required=["user_id", "question"], + ) + tools = ToolsSchema(standard_tools=[fetch_image_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. 
You are able to describe images from the user camera.", + }, + ] + + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) + + # If you run into weird description, try with use_cpu=True + moondream = MoondreamService() + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + ParallelPipeline( + [llm], # LLM + [moondream], + ), + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + + await maybe_capture_participant_camera(transport, client) + + # Set the participant ID in the image requester + client_id = get_transport_client_id(transport, client) + + # Kick off the conversation. + messages.append( + { + "role": "system", + "content": f"Please introduce yourself to the user. 
Use '{client_id}' as the user ID during function calls.", + } + ) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/14d-function-calling-video.py b/examples/foundational/14d-function-calling-openai-video.py similarity index 60% rename from examples/foundational/14d-function-calling-video.py rename to examples/foundational/14d-function-calling-openai-video.py index 48cf95ee9..ec6fb008b 100644 --- a/examples/foundational/14d-function-calling-video.py +++ b/examples/foundational/14d-function-calling-openai-video.py @@ -5,7 +5,6 @@ # -import asyncio import os from dotenv import load_dotenv @@ -17,12 +16,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types 
import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -39,34 +39,30 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -# Global variable to store the client ID -client_id = "" +async def fetch_user_image(params: FunctionCallParams): + """Fetch the user image and push it to the LLM. - -async def get_weather(params: FunctionCallParams): - location = params.arguments["location"] - await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.") - - -async def get_image(params: FunctionCallParams): + When called, this function pushes a UserImageRequestFrame upstream to the + transport. As a result, the transport will request the user image and push a + UserImageRawFrame downstream which will be added to the context by the LLM + assistant aggregator. + """ + user_id = params.arguments["user_id"] question = params.arguments["question"] - logger.debug(f"Requesting image with user_id={client_id}, question={question}") + logger.debug(f"Requesting image with user_id={user_id}, question={question}") - # Request the image frame - await params.llm.request_image_frame( - user_id=client_id, - function_name=params.function_name, - tool_call_id=params.tool_call_id, - text_content=question, + # Request a user image frame and indicate that it should be added to the + # context. + await params.llm.push_frame( + UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True), + FrameDirection.UPSTREAM, ) - # Wait a short time for the frame to be processed - await asyncio.sleep(0.5) + await params.result_callback(None) - # Return a result to complete the function call - await params.result_callback( - f"I've captured an image from your camera and I'm analyzing what you asked about: {question}" - ) + # Instead of None, it's possible to also provide a tool call answer to + # tell the LLM that we are grabbing the image to analyze. 
+ # await params.result_callback({"result": "Image is being captured."}) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -101,58 +97,30 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - llm.register_function("get_weather", get_weather) - llm.register_function("get_image", get_image) + llm.register_function("fetch_user_image", fetch_user_image) - weather_function = FunctionSchema( - name="get_weather", - description="Get the current weather", + fetch_image_function = FunctionSchema( + name="fetch_user_image", + description="Called when the user requests a description of their camera feed", properties={ - "location": { + "user_id": { "type": "string", - "description": "The city and state, e.g. San Francisco, CA", + "description": "The ID of the user to grab the image from", }, - "format": { - "type": "string", - "enum": ["celsius", "fahrenheit"], - "description": "The temperature unit to use. Infer this from the user's location.", - }, - }, - required=["location"], - ) - get_image_function = FunctionSchema( - name="get_image", - description="Get an image from the video stream.", - properties={ "question": { "type": "string", - "description": "The question that the user is asking about the image.", - } + "description": "The question that the user is asking about the image", + }, }, - required=["question"], + required=["user_id", "question"], ) - tools = ToolsSchema(standard_tools=[weather_function, get_image_function]) + tools = ToolsSchema(standard_tools=[fetch_image_function]) - system_prompt = """\ -You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions. - -Your response will be turned into speech so use only simple words and punctuation. - -You have access to two tools: get_weather and get_image. - -You can respond to questions about the weather using the get_weather tool. 
- -You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \ -indicate you should use the get_image tool are: -- What do you see? -- What's in the video? -- Can you describe the video? -- Tell me about what you see. -- Tell me something interesting about what you see. -- What's happening in the video? -""" messages = [ - {"role": "system", "content": system_prompt}, + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.", + }, ] context = LLMContext(messages, tools) @@ -160,13 +128,13 @@ indicate you should use the get_image tool are: pipeline = Pipeline( [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -185,10 +153,15 @@ indicate you should use the get_image tool are: await maybe_capture_participant_camera(transport, client) - global client_id client_id = get_transport_client_id(transport, client) # Kick off the conversation. + messages.append( + { + "role": "system", + "content": f"Please introduce yourself to the user. 
Use '{client_id}' as the user ID during function calls.", + } + ) await task.queue_frames([LLMRunFrame()]) @transport.event_handler("on_client_disconnected") diff --git a/examples/foundational/assets/cat.jpg b/examples/foundational/assets/cat.jpg new file mode 100644 index 000000000..700b5fc92 Binary files /dev/null and b/examples/foundational/assets/cat.jpg differ diff --git a/scripts/evals/eval.py b/scripts/evals/eval.py index 5a164ad84..e23a99420 100644 --- a/scripts/evals/eval.py +++ b/scripts/evals/eval.py @@ -10,9 +10,10 @@ import os import re import time import wave +from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple import aiofiles from deepgram import LiveOptions @@ -53,6 +54,14 @@ EVAL_TIMEOUT_SECS = 120 EvalPrompt = str | Tuple[str, ImageFile] +@dataclass +class EvalConfig: + prompt: EvalPrompt + eval: str + eval_speaks_first: bool = False + runner_args_body: Optional[Any] = None + + class EvalRunner: def __init__( self, @@ -93,9 +102,7 @@ class EvalRunner: async def run_eval( self, example_file: str, - prompt: EvalPrompt, - eval: str, - user_speaks_first: bool = False, + eval_config: EvalConfig, ): if not re.match(self._pattern, example_file): return @@ -112,10 +119,8 @@ class EvalRunner: try: tasks = [ - asyncio.create_task(run_example_pipeline(script_path)), - asyncio.create_task( - run_eval_pipeline(self, example_file, prompt, eval, user_speaks_first) - ), + asyncio.create_task(run_example_pipeline(script_path, eval_config)), + asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)), ] _, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS) if pending: @@ -177,7 +182,7 @@ class EvalRunner: return os.path.join(self._recordings_dir, f"{base_name}.wav") -async def run_example_pipeline(script_path: Path): +async def run_example_pipeline(script_path: Path, eval_config: EvalConfig): room_url = 
os.getenv("DAILY_SAMPLE_ROOM_URL") module = load_module_from_path(script_path) @@ -196,6 +201,7 @@ async def run_example_pipeline(script_path: Path): runner_args = RunnerArguments() runner_args.pipeline_idle_timeout_secs = PIPELINE_IDLE_TIMEOUT_SECS + runner_args.body = eval_config.runner_args_body await module.run_bot(transport, runner_args) @@ -203,9 +209,7 @@ async def run_example_pipeline(script_path: Path): async def run_eval_pipeline( eval_runner: EvalRunner, example_file: str, - prompt: EvalPrompt, - eval: str, - user_speaks_first: bool = False, + eval_config: EvalConfig, ): logger.info(f"Starting eval bot") @@ -262,17 +266,16 @@ async def run_eval_pipeline( # Load example prompt depending on image. example_prompt = "" example_image: Optional[ImageFile] = None - if isinstance(prompt, str): - example_prompt = prompt - elif isinstance(prompt, tuple): - example_prompt, example_image = prompt + if isinstance(eval_config.prompt, str): + example_prompt = eval_config.prompt + elif isinstance(eval_config.prompt, tuple): + example_prompt, example_image = eval_config.prompt - eval_prompt = f"The answer is correct if it matches: {eval}." common_system_prompt = ( "The user might say things other than the answer and that's allowed. " - f"You should only call the eval function with your assessment when the user actually answers the question. {eval_prompt}" + f"You should only call the eval function when the user: {eval_config.eval}" ) - if user_speaks_first: + if eval_config.eval_speaks_first: system_prompt = f"You are an LLM eval, be extremly brief. You will start the conversation by saying: '{example_prompt}'. {common_system_prompt}" else: system_prompt = f"You are an LLM eval, be extremly brief. Your goal is to first ask one question: {example_prompt}. 
{common_system_prompt}" @@ -330,9 +333,9 @@ async def run_eval_pipeline( # Default behavior is for the bot to speak first # If the eval bot speaks first, we append the prompt to the messages - if user_speaks_first: + if eval_config.eval_speaks_first: messages.append( - {"role": "user", "content": f"Start by saying this exactly: '{prompt}'"} + {"role": "user", "content": f"Start by saying this exactly: '{eval_config.prompt}'"} ) await task.queue_frames([LLMRunFrame()]) diff --git a/scripts/evals/run-release-evals.py b/scripts/evals/run-release-evals.py index 14f9dee52..ce0a32dd6 100644 --- a/scripts/evals/run-release-evals.py +++ b/scripts/evals/run-release-evals.py @@ -11,7 +11,7 @@ from datetime import datetime, timezone from pathlib import Path from dotenv import load_dotenv -from eval import EvalRunner +from eval import EvalConfig, EvalRunner from loguru import logger from PIL import Image from utils import check_env_variables @@ -24,189 +24,183 @@ ASSETS_DIR = SCRIPT_DIR / "assets" FOUNDATIONAL_DIR = SCRIPT_DIR.parent.parent / "examples" / "foundational" -# Speaking order constants -USER_SPEAKS_FIRST = True -BOT_SPEAKS_FIRST = False - -# Math -PROMPT_SIMPLE_MATH = "A simple math addition." -EVAL_SIMPLE_MATH = "Correct math addition." - -# Weather -PROMPT_WEATHER = "What's the weather in San Francisco?" -EVAL_WEATHER = ( - "Something specific about the current weather in San Francisco, including the degrees." +EVAL_SIMPLE_MATH = EvalConfig( + prompt="A simple math addition.", + eval="The user answers the math addition correctly.", ) -# Online search -PROMPT_ONLINE_SEARCH = "What's the date right now in London?" -EVAL_ONLINE_SEARCH = f"Today is {datetime.now(timezone.utc).strftime('%B %d, %Y')}." +EVAL_WEATHER = EvalConfig( + prompt="What's the weather in San Francisco?", + eval="The user says something specific about the current weather in San Francisco, including the degrees.", +) -# Switch language -PROMPT_SWITCH_LANGUAGE = "Say something in Spanish." 
-EVAL_SWITCH_LANGUAGE = "The user is now talking in Spanish." +EVAL_ONLINE_SEARCH = EvalConfig( + prompt="What's the date right now in London?", + eval=f"The user says today is {datetime.now(timezone.utc).strftime('%B %d, %Y')} in London.", +) -# Vision -PROMPT_VISION = ("What do you see?", Image.open(ASSETS_DIR / "cat.jpg")) -EVAL_VISION = "A cat description." +EVAL_SWITCH_LANGUAGE = EvalConfig( + prompt="Say something in Spanish.", + eval="The user talks in Spanish.", +) + +EVAL_VISION_CAMERA = EvalConfig( + prompt=("Briefly describe what you see.", Image.open(ASSETS_DIR / "cat.jpg")), + eval="The user provides a cat description.", +) + + +def EVAL_VISION_IMAGE(*, eval_speaks_first: bool = False): + return EvalConfig( + prompt="Briefly describe this image.", + eval="The user provides a cat description.", + eval_speaks_first=eval_speaks_first, + runner_args_body={ + "image_path": ASSETS_DIR / "cat.jpg", + "question": "Briefly describe this image.", + }, + ) + + +EVAL_VOICEMAIL = EvalConfig( + prompt="Please leave a message.", + eval="The user leaves a voicemail message.", + eval_speaks_first=True, +) + +EVAL_CONVERSATION = EvalConfig( + prompt="Hello, this is Mark.", + eval="The user replies with a greeting.", + eval_speaks_first=True, +) -# Voicemail -PROMPT_VOICEMAIL = "Please leave a message after the beep." -EVAL_VOICEMAIL = "Assess the conversation and determine if it is a voicemail." -PROMPT_CONVERSATION = "Hello, this is Mark." -EVAL_CONVERSATION = "A start of a conversation, not a voicemail." 
TESTS_07 = [ # 07 series - ("07-interruptible.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07aa-interruptible-soniox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07ab-interruptible-inworld-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07ac-interruptible-asyncai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07c-interruptible-deepgram-flux.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ( - "07d-interruptible-elevenlabs-http.py", - PROMPT_SIMPLE_MATH, - EVAL_SIMPLE_MATH, - BOT_SPEAKS_FIRST, - ), - ("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07j-interruptible-gladia.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07k-interruptible-lmnt.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07l-interruptible-groq.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07m-interruptible-aws.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07m-interruptible-aws-strands.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("07n-interruptible-gemini.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - 
("07n-interruptible-google.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07o-interruptible-assemblyai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07q-interruptible-rime.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07q-interruptible-rime-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07r-interruptible-riva-nim.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ( - "07s-interruptible-google-audio-in.py", - PROMPT_SIMPLE_MATH, - EVAL_SIMPLE_MATH, - BOT_SPEAKS_FIRST, - ), - ("07t-interruptible-fish.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07v-interruptible-neuphonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07v-interruptible-neuphonic-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ("07ae-interruptible-hume.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + ("07-interruptible.py", EVAL_SIMPLE_MATH), + ("07-interruptible-cartesia-http.py", EVAL_SIMPLE_MATH), + ("07a-interruptible-speechmatics.py", EVAL_SIMPLE_MATH), + ("07aa-interruptible-soniox.py", EVAL_SIMPLE_MATH), + ("07ab-interruptible-inworld-http.py", EVAL_SIMPLE_MATH), + ("07ac-interruptible-asyncai.py", EVAL_SIMPLE_MATH), + ("07ac-interruptible-asyncai-http.py", EVAL_SIMPLE_MATH), + ("07b-interruptible-langchain.py", EVAL_SIMPLE_MATH), + ("07c-interruptible-deepgram.py", EVAL_SIMPLE_MATH), + ("07c-interruptible-deepgram-flux.py", EVAL_SIMPLE_MATH), + ("07d-interruptible-elevenlabs.py", EVAL_SIMPLE_MATH), + ("07d-interruptible-elevenlabs-http.py", EVAL_SIMPLE_MATH), + ("07f-interruptible-azure.py", EVAL_SIMPLE_MATH), + ("07g-interruptible-openai.py", 
EVAL_SIMPLE_MATH), + ("07h-interruptible-openpipe.py", EVAL_SIMPLE_MATH), + ("07j-interruptible-gladia.py", EVAL_SIMPLE_MATH), + ("07k-interruptible-lmnt.py", EVAL_SIMPLE_MATH), + ("07l-interruptible-groq.py", EVAL_SIMPLE_MATH), + ("07m-interruptible-aws.py", EVAL_SIMPLE_MATH), + ("07m-interruptible-aws-strands.py", EVAL_WEATHER), + ("07n-interruptible-gemini.py", EVAL_SIMPLE_MATH), + ("07n-interruptible-google.py", EVAL_SIMPLE_MATH), + ("07o-interruptible-assemblyai.py", EVAL_SIMPLE_MATH), + ("07q-interruptible-rime.py", EVAL_SIMPLE_MATH), + ("07q-interruptible-rime-http.py", EVAL_SIMPLE_MATH), + ("07r-interruptible-riva-nim.py", EVAL_SIMPLE_MATH), + ("07s-interruptible-google-audio-in.py", EVAL_SIMPLE_MATH), + ("07t-interruptible-fish.py", EVAL_SIMPLE_MATH), + ("07v-interruptible-neuphonic.py", EVAL_SIMPLE_MATH), + ("07v-interruptible-neuphonic-http.py", EVAL_SIMPLE_MATH), + ("07w-interruptible-fal.py", EVAL_SIMPLE_MATH), + ("07y-interruptible-minimax.py", EVAL_SIMPLE_MATH), + ("07z-interruptible-sarvam.py", EVAL_SIMPLE_MATH), + ("07ae-interruptible-hume.py", EVAL_SIMPLE_MATH), # Needs a local XTTS docker instance running. - # ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + # ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH), # Needs a Krisp license. - # ("07p-interruptible-krisp.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + # ("07p-interruptible-krisp.py", EVAL_SIMPLE_MATH), # Needs GPU resources. 
- # ("07u-interruptible-ultravox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + # ("07u-interruptible-ultravox.py", EVAL_SIMPLE_MATH), ] TESTS_12 = [ - ("12-describe-video.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST), - ("12a-describe-video-gemini-flash.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST), - ("12b-describe-video-gpt-4o.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST), - ("12c-describe-video-anthropic.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST), + ("12-describe-image-openai.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), + ("12a-describe-image-anthropic.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), + ("12b-describe-image-aws.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), + ("12c-describe-image-gemini-flash.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), + ("12d-describe-image-moondream.py", EVAL_VISION_IMAGE()), ] TESTS_14 = [ - ("14-function-calling.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14a-function-calling-anthropic.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14b-function-calling-anthropic-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14d-function-calling-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14e-function-calling-google.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14f-function-calling-groq.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14g-function-calling-grok.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14h-function-calling-azure.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14i-function-calling-fireworks.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14j-function-calling-nim.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14k-function-calling-cerebras.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14m-function-calling-openrouter.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14n-function-calling-perplexity.py", PROMPT_WEATHER, EVAL_WEATHER, 
BOT_SPEAKS_FIRST), - ("14p-function-calling-gemini-vertex-ai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14q-function-calling-qwen.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("14x-function-calling-openpipe.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), + ("14-function-calling.py", EVAL_WEATHER), + ("14a-function-calling-anthropic.py", EVAL_WEATHER), + ("14e-function-calling-google.py", EVAL_WEATHER), + ("14f-function-calling-groq.py", EVAL_WEATHER), + ("14g-function-calling-grok.py", EVAL_WEATHER), + ("14h-function-calling-azure.py", EVAL_WEATHER), + ("14i-function-calling-fireworks.py", EVAL_WEATHER), + ("14j-function-calling-nim.py", EVAL_WEATHER), + ("14k-function-calling-cerebras.py", EVAL_WEATHER), + ("14m-function-calling-openrouter.py", EVAL_WEATHER), + ("14n-function-calling-perplexity.py", EVAL_WEATHER), + ("14p-function-calling-gemini-vertex-ai.py", EVAL_WEATHER), + ("14q-function-calling-qwen.py", EVAL_WEATHER), + ("14r-function-calling-aws.py", EVAL_WEATHER), + ("14v-function-calling-openai.py", EVAL_WEATHER), + ("14w-function-calling-mistral.py", EVAL_WEATHER), + ("14x-function-calling-openpipe.py", EVAL_WEATHER), + # Video + ("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA), + ("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA), + ("14d-function-calling-gemini-flash-video.py", EVAL_VISION_CAMERA), + ("14d-function-calling-moondream-video.py", EVAL_VISION_CAMERA), + ("14d-function-calling-openai-video.py", EVAL_VISION_CAMERA), # Currently not working. 
- # ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - # ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - # ("14o-function-calling-gemini-openai-format.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), + # ("14c-function-calling-together.py", EVAL_WEATHER), + # ("14l-function-calling-deepseek.py", EVAL_WEATHER), + # ("14o-function-calling-gemini-openai-format.py", EVAL_WEATHER), ] TESTS_15 = [ - ("15a-switch-languages.py", PROMPT_SWITCH_LANGUAGE, EVAL_SWITCH_LANGUAGE, BOT_SPEAKS_FIRST), + ("15a-switch-languages.py", EVAL_SWITCH_LANGUAGE), ] TESTS_19 = [ - ("19-openai-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), + ("19-openai-realtime.py", EVAL_WEATHER), + ("19-openai-realtime-beta.py", EVAL_WEATHER), # OpenAI Realtime not released on Azure yet - # ("19a-azure-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("19b-openai-realtime-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), - ("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST), + # ("19a-azure-realtime.py", EVAL_WEATHER), + ("19a-azure-realtime-beta.py", EVAL_WEATHER), + ("19b-openai-realtime-text.py", EVAL_WEATHER), + ("19b-openai-realtime-beta-text.py", EVAL_WEATHER), ] TESTS_21 = [ - ("21a-tavus-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + ("21a-tavus-video-service.py", EVAL_SIMPLE_MATH), ] TESTS_26 = [ - ("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ( - "26a-gemini-live-transcription.py", - PROMPT_SIMPLE_MATH, - EVAL_SIMPLE_MATH, - BOT_SPEAKS_FIRST, - ), - ( - "26b-gemini-live-function-calling.py", - PROMPT_WEATHER, - EVAL_WEATHER, - BOT_SPEAKS_FIRST, - ), - ("26c-gemini-live-video.py", PROMPT_SIMPLE_MATH, 
EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ( - "26e-gemini-multimodal-google-search.py", - PROMPT_ONLINE_SEARCH, - EVAL_ONLINE_SEARCH, - BOT_SPEAKS_FIRST, - ), + ("26-gemini-live.py", EVAL_SIMPLE_MATH), + ("26a-gemini-live-transcription.py", EVAL_SIMPLE_MATH), + ("26b-gemini-live-function-calling.py", EVAL_WEATHER), + ("26c-gemini-live-video.py", EVAL_SIMPLE_MATH), + ("26e-gemini-live-google-search.py", EVAL_ONLINE_SEARCH), + ("26h-gemini-live-vertex-function-calling.py", EVAL_WEATHER), # Currently not working. - # ("26d-gemini-live-text.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), - ( - "26h-gemini-live-vertex-function-calling.py", - PROMPT_WEATHER, - EVAL_WEATHER, - BOT_SPEAKS_FIRST, - ), + # ("26d-gemini-live-text.py", EVAL_SIMPLE_MATH), ] TESTS_27 = [ - ("27-simli-layer.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + ("27-simli-layer.py", EVAL_SIMPLE_MATH), ] TESTS_40 = [ - ("40-aws-nova-sonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + ("40-aws-nova-sonic.py", EVAL_SIMPLE_MATH), ] TESTS_43 = [ - ("43a-heygen-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST), + ("43a-heygen-video-service.py", EVAL_SIMPLE_MATH), ] TESTS_44 = [ - ("44-voicemail-detection.py", PROMPT_VOICEMAIL, EVAL_VOICEMAIL, USER_SPEAKS_FIRST), - ("44-voicemail-detection.py", PROMPT_CONVERSATION, EVAL_CONVERSATION, USER_SPEAKS_FIRST), + ("44-voicemail-detection.py", EVAL_VOICEMAIL), + ("44-voicemail-detection.py", EVAL_CONVERSATION), ] TESTS = [ @@ -244,9 +238,9 @@ async def main(args: argparse.Namespace): # Parse test config: (test, prompt, eval, user_speaks_first) for test_config in TESTS: - test, prompt, eval, user_speaks_first = test_config + test, eval_config = test_config - await runner.run_eval(test, prompt, eval, user_speaks_first) + await runner.run_eval(test, eval_config) runner.print_results() diff --git a/src/pipecat/adapters/services/anthropic_adapter.py b/src/pipecat/adapters/services/anthropic_adapter.py 
index a106b4de4..75fa5899d 100644 --- a/src/pipecat/adapters/services/anthropic_adapter.py +++ b/src/pipecat/adapters/services/anthropic_adapter.py @@ -245,13 +245,25 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]): item["text"] = "(empty)" # handle image_url -> image conversion if item["type"] == "image_url": - item["type"] = "image" - item["source"] = { - "type": "base64", - "media_type": "image/jpeg", - "data": item["image_url"]["url"].split(",")[1], - } - del item["image_url"] + if item["image_url"]["url"].startswith("data:"): + item["type"] = "image" + item["source"] = { + "type": "base64", + "media_type": "image/jpeg", + "data": item["image_url"]["url"].split(",")[1], + } + del item["image_url"] + elif item["image_url"]["url"].startswith("http"): + item["type"] = "image" + item["source"] = { + "type": "url", + "url": item["image_url"]["url"], + } + del item["image_url"] + else: + url = item["image_url"]["url"] + logger.warning(f"Unsupported 'image_url': {url}") + # In the case where there's a single image in the list (like what # would result from a UserImageRawFrame), ensure that the image # comes before text, as recommended by Anthropic docs diff --git a/src/pipecat/adapters/services/bedrock_adapter.py b/src/pipecat/adapters/services/bedrock_adapter.py index 852ea17a4..213e3b01c 100644 --- a/src/pipecat/adapters/services/bedrock_adapter.py +++ b/src/pipecat/adapters/services/bedrock_adapter.py @@ -256,15 +256,22 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]): new_content.append({"text": text_content}) # handle image_url -> image conversion if item["type"] == "image_url": - new_item = { - "image": { - "format": "jpeg", - "source": { - "bytes": base64.b64decode(item["image_url"]["url"].split(",")[1]) - }, + if item["image_url"]["url"].startswith("data:"): + new_item = { + "image": { + "format": "jpeg", + "source": { + "bytes": base64.b64decode( + item["image_url"]["url"].split(",")[1] + ) + }, + } } - } 
- new_content.append(new_item) + new_content.append(new_item) + else: + url = item["image_url"]["url"] + logger.warning(f"Unsupported 'image_url': {url}") + # In the case where there's a single image in the list (like what # would result from a UserImageRawFrame), ensure that the image # comes before text diff --git a/src/pipecat/adapters/services/gemini_adapter.py b/src/pipecat/adapters/services/gemini_adapter.py index 1fa8d9e6f..dc5bae559 100644 --- a/src/pipecat/adapters/services/gemini_adapter.py +++ b/src/pipecat/adapters/services/gemini_adapter.py @@ -343,7 +343,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): for c in content: if c["type"] == "text": parts.append(Part(text=c["text"])) - elif c["type"] == "image_url": + elif c["type"] == "image_url" and c["image_url"]["url"].startswith("data:"): parts.append( Part( inline_data=Blob( @@ -352,6 +352,9 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): ) ) ) + elif c["type"] == "image_url": + url = c["image_url"]["url"] + logger.warning(f"Unsupported 'image_url': {url}") elif c["type"] == "input_audio": input_audio = c["input_audio"] audio_bytes = base64.b64decode(input_audio["data"]) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 2a9651b83..70975d262 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1201,26 +1201,23 @@ class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame): class UserImageRequestFrame(SystemFrame): """Frame requesting an image from a specific user. - A frame to request an image from the given user. The frame might be - generated by a function call in which case the corresponding fields will be - properly set. + A frame to request an image from the given user. The request might come with + a text that can be later used to describe the requested image. Parameters: user_id: Identifier of the user to request image from. - context: Optional context for the image request. 
- function_name: Name of function that generated this request (if any). - tool_call_id: Tool call ID if generated by function call. + text: An optional text associated to the image request. + add_to_context: Whether the requested image should be added to an LLM context. video_source: Specific video source to capture from. """ user_id: str - context: Optional[Any] = None - function_name: Optional[str] = None - tool_call_id: Optional[str] = None + text: Optional[str] = None + add_to_context: Optional[bool] = None video_source: Optional[str] = None def __str__(self): - return f"{self.name}(user: {self.user_id}, video_source: {self.video_source}, function: {self.function_name}, request: {self.tool_call_id})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, add_to_context: {self.add_to_context}, {self.video_source})" @dataclass @@ -1294,15 +1291,17 @@ class UserImageRawFrame(InputImageRawFrame): Parameters: user_id: Identifier of the user who provided this image. - request: The original image request frame if this is a response. + text: An optional text associated to this image. + add_to_context: Whether this image should be added to an LLM context. 
""" user_id: str = "" - request: Optional[UserImageRequestFrame] = None + text: Optional[str] = None + add_to_context: Optional[bool] = None def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})" + return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, add_to_context: {self.add_to_context})" @dataclass diff --git a/src/pipecat/processors/aggregators/llm_context.py b/src/pipecat/processors/aggregators/llm_context.py index 8dc79fb50..d9280f9c0 100644 --- a/src/pipecat/processors/aggregators/llm_context.py +++ b/src/pipecat/processors/aggregators/llm_context.py @@ -16,6 +16,7 @@ service-specific adapter. import base64 import io +import wave from dataclasses import dataclass from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union @@ -113,6 +114,89 @@ class LLMContext: self._tools: ToolsSchema | NotGiven = LLMContext._normalize_and_validate_tools(tools) self._tool_choice: LLMContextToolChoice | NotGiven = tool_choice + @staticmethod + def create_image_url_message( + *, + role: str = "user", + url: str, + text: Optional[str] = None, + ) -> LLMContextMessage: + """Create a context message containing an image URL. + + Args: + role: The role of this message (defaults to "user"). + url: The URL of the image. + text: Optional text to include with the image. + """ + content = [] + if text: + content.append({"type": "text", "text": text}) + + content.append({"type": "image_url", "image_url": {"url": url}}) + + return {"role": role, "content": content} + + @staticmethod + def create_image_message( + *, + role: str = "user", + format: str, + size: tuple[int, int], + image: bytes, + text: Optional[str] = None, + ) -> LLMContextMessage: + """Create a context message containing an image. 
+ + Args: + role: The role of this message (defaults to "user"). + format: Image format (e.g., 'RGB', 'RGBA'). + size: Image dimensions as (width, height) tuple. + image: Raw image bytes. + text: Optional text to include with the image. + """ + buffer = io.BytesIO() + Image.frombytes(format, size, image).save(buffer, format="JPEG") + encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") + url = f"data:image/jpeg;base64,{encoded_image}" + + return LLMContext.create_image_url_message(role=role, url=url, text=text) + + @staticmethod + def create_audio_message( + *, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows" + ) -> LLMContextMessage: + """Create a context message containing audio. + + Args: + role: The role of this message (defaults to "user"). + audio_frames: List of audio frame objects to include. + text: Optional text to include with the audio. + """ + sample_rate = audio_frames[0].sample_rate + num_channels = audio_frames[0].num_channels + + content = [] + content.append({"type": "text", "text": text}) + data = b"".join(frame.audio for frame in audio_frames) + + with io.BytesIO() as buffer: + with wave.open(buffer, "wb") as wf: + wf.setsampwidth(2) + wf.setnchannels(num_channels) + wf.setframerate(sample_rate) + wf.writeframes(data) + + encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8") + + content.append( + { + "type": "input_audio", + "input_audio": {"data": encoded_audio, "format": "wav"}, + } + ) + + return {"role": role, "content": content} + @property def messages(self) -> List[LLMContextMessage]: """Get the current messages list. @@ -238,7 +322,7 @@ class LLMContext: self._tool_choice = tool_choice def add_image_frame_message( - self, *, format: str, size: tuple[int, int], image: bytes, text: str = None + self, *, format: str, size: tuple[int, int], image: bytes, text: Optional[str] = None ): """Add a message containing an image frame. 
@@ -248,17 +332,8 @@ class LLMContext: image: Raw image bytes. text: Optional text to include with the image. """ - buffer = io.BytesIO() - Image.frombytes(format, size, image).save(buffer, format="JPEG") - encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") - - content = [] - if text: - content.append({"type": "text", "text": text}) - content.append( - {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}}, - ) - self.add_message({"role": "user", "content": content}) + message = LLMContext.create_image_message(format=format, size=size, image=image, text=text) + self.add_message(message) def add_audio_frames_message( self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows" @@ -269,66 +344,8 @@ class LLMContext: audio_frames: List of audio frame objects to include. text: Optional text to include with the audio. """ - if not audio_frames: - return - - sample_rate = audio_frames[0].sample_rate - num_channels = audio_frames[0].num_channels - - content = [] - content.append({"type": "text", "text": text}) - data = b"".join(frame.audio for frame in audio_frames) - data = bytes( - self._create_wav_header( - sample_rate, - num_channels, - 16, - len(data), - ) - + data - ) - encoded_audio = base64.b64encode(data).decode("utf-8") - content.append( - { - "type": "input_audio", - "input_audio": {"data": encoded_audio, "format": "wav"}, - } - ) - self.add_message({"role": "user", "content": content}) - - def _create_wav_header(self, sample_rate, num_channels, bits_per_sample, data_size): - """Create a WAV file header for audio data. - - Args: - sample_rate: Audio sample rate in Hz. - num_channels: Number of audio channels. - bits_per_sample: Bits per audio sample. - data_size: Size of audio data in bytes. - - Returns: - WAV header as a bytearray. 
- """ - # RIFF chunk descriptor - header = bytearray() - header.extend(b"RIFF") # ChunkID - header.extend((data_size + 36).to_bytes(4, "little")) # ChunkSize: total size - 8 - header.extend(b"WAVE") # Format - # "fmt " sub-chunk - header.extend(b"fmt ") # Subchunk1ID - header.extend((16).to_bytes(4, "little")) # Subchunk1Size (16 for PCM) - header.extend((1).to_bytes(2, "little")) # AudioFormat (1 for PCM) - header.extend(num_channels.to_bytes(2, "little")) # NumChannels - header.extend(sample_rate.to_bytes(4, "little")) # SampleRate - # Calculate byte rate and block align - byte_rate = sample_rate * num_channels * (bits_per_sample // 8) - block_align = num_channels * (bits_per_sample // 8) - header.extend(byte_rate.to_bytes(4, "little")) # ByteRate - header.extend(block_align.to_bytes(2, "little")) # BlockAlign - header.extend(bits_per_sample.to_bytes(2, "little")) # BitsPerSample - # "data" sub-chunk - header.extend(b"data") # Subchunk2ID - header.extend(data_size.to_bytes(4, "little")) # Subchunk2Size - return header + message = LLMContext.create_audio_message(audio_frames=audio_frames, text=text) + self.add_message(message) @staticmethod def _normalize_and_validate_tools(tools: ToolsSchema | NotGiven) -> ToolsSchema | NotGiven: diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 9f1e04fe0..44c534b9b 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -616,7 +616,7 @@ class LLMAssistantAggregator(LLMContextAggregator): await self._handle_function_call_result(frame) elif isinstance(frame, FunctionCallCancelFrame): await self._handle_function_call_cancel(frame) - elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id: + elif isinstance(frame, UserImageRawFrame): await self._handle_user_image_frame(frame) elif isinstance(frame, 
BotStoppedSpeakingFrame): await self.push_aggregation() @@ -767,27 +767,16 @@ class LLMAssistantAggregator(LLMContextAggregator): message["content"] = result async def _handle_user_image_frame(self, frame: UserImageRawFrame): - logger.debug( - f"{self} UserImageRawFrame: [{frame.request.function_name}:{frame.request.tool_call_id}]" - ) - - if frame.request.tool_call_id not in self._function_calls_in_progress: - logger.warning( - f"UserImageRawFrame tool_call_id [{frame.request.tool_call_id}] is not running" - ) + if not frame.add_to_context: return - del self._function_calls_in_progress[frame.request.tool_call_id] + logger.debug(f"{self} Adding UserImageRawFrame to LLM context (size: {frame.size})") - # Update context with the image frame - self._update_function_call_result( - frame.request.function_name, frame.request.tool_call_id, "COMPLETED" - ) self._context.add_image_frame_message( format=frame.format, size=frame.size, image=frame.image, - text=frame.request.context, + text=frame.text, ) await self.push_aggregation() diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index 274a31d52..a7872c35e 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -27,11 +27,24 @@ class UserResponseAggregator(LLMUserAggregator): def __init__(self, **kwargs): """Initialize the user response aggregator. + .. deprecated:: 0.0.92 + `UserResponseAggregator` is deprecated and will be removed in a future version. + Args: **kwargs: Additional arguments passed to parent LLMUserAggregator. 
""" super().__init__(context=LLMContext(), **kwargs) + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "`UserResponseAggregator` is deprecated and will be removed in a future version.", + DeprecationWarning, + stacklevel=2, + ) + async def push_aggregation(self): """Push the aggregated user response as a TextFrame. diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index f0b1a5baa..78f615a4d 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -156,6 +156,12 @@ class DeepgramFluxSTTService(WebsocketSTTService): self._language = Language.EN self._websocket_url = None self._receive_task = None + # Flux event handlers + self._register_event_handler("on_start_of_turn") + self._register_event_handler("on_turn_resumed") + self._register_event_handler("on_end_of_turn") + self._register_event_handler("on_eager_end_of_turn") + self._register_event_handler("on_update") async def _connect(self): """Connect to WebSocket and start background tasks. @@ -523,6 +529,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM) await self.start_metrics() + await self._call_event_handler("on_start_of_turn", transcript) if transcript: logger.trace(f"Start of turn transcript: {transcript}") @@ -537,6 +544,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): event: The event type string for logging purposes. """ logger.trace(f"Received event TurnResumed: {event}") + await self._call_event_handler("on_turn_resumed") async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]): """Handle EndOfTurn events from Deepgram Flux. 
@@ -571,6 +579,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self.stop_processing_metrics() await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM) await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM) + await self._call_event_handler("on_end_of_turn", transcript) async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]): """Handle EagerEndOfTurn events from Deepgram Flux. @@ -615,6 +624,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): result=data, ) ) + await self._call_event_handler("on_eager_end_of_turn", transcript) async def _handle_update(self, transcript: str): """Handle Update events from Deepgram Flux. @@ -638,3 +648,4 @@ class DeepgramFluxSTTService(WebsocketSTTService): # both the "user started speaking" event and the first transcript simultaneously, # making this timing measurement meaningless in this context. # await self.stop_ttfb_metrics() + await self._call_event_handler("on_update", transcript) diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index 6f87b95a5..0a1a835f7 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -492,11 +492,19 @@ class LLMService(AIService): tool_call_id: Optional[str] = None, text_content: Optional[str] = None, video_source: Optional[str] = None, + timeout: Optional[float] = 10.0, ): """Request an image from a user. Pushes a UserImageRequestFrame upstream to request an image from the - specified user. + specified user. The user image can then be processed by the LLM. + + Use this function from a function call if you want the LLM to process + the image. If you expect the image to be processed by a vision service, + you might want to push a UserImageRequestFrame upstream directly. + + .. deprecated:: 0.0.92 + This method is deprecated, push a `UserImageRequestFrame` instead. Args: user_id: The ID of the user to request an image from. 
@@ -504,15 +512,19 @@ class LLMService(AIService): tool_call_id: Optional tool call ID associated with the request. text_content: Optional text content/context for the image request. video_source: Optional video source identifier. + timeout: Optional timeout for the requested image to be added to the LLM context. + """ + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Method `request_image_frame()` is deprecated, push a `UserImageRequestFrame` instead.", + DeprecationWarning, + ) await self.push_frame( - UserImageRequestFrame( - user_id=user_id, - function_name=function_name, - tool_call_id=tool_call_id, - context=text_content, - video_source=video_source, - ), + UserImageRequestFrame(user_id=user_id, text=text_content), FrameDirection.UPSTREAM, ) diff --git a/src/pipecat/services/moondream/vision.py b/src/pipecat/services/moondream/vision.py index bd01daf34..b7527f76c 100644 --- a/src/pipecat/services/moondream/vision.py +++ b/src/pipecat/services/moondream/vision.py @@ -11,15 +11,17 @@ for image analysis and description generation. """ import asyncio -import base64 -from io import BytesIO from typing import AsyncGenerator, Optional from loguru import logger from PIL import Image -from pipecat.frames.frames import ErrorFrame, Frame, TextFrame -from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TextFrame, + UserImageRawFrame, +) from pipecat.services.vision_service import VisionService try: @@ -92,16 +94,16 @@ class MoondreamService(VisionService): trust_remote_code=True, revision=revision, device_map={"": device}, - torch_dtype=dtype, + dtype=dtype, ).eval() logger.debug("Loaded Moondream model") - async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]: + async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]: """Analyze an image and generate a description. 
Args: - context: The context to process, containing image data. + frame: The image frame to process. Yields: Frame: TextFrame containing the generated image description, or ErrorFrame @@ -112,45 +114,14 @@ class MoondreamService(VisionService): yield ErrorFrame("Moondream model not available") return - image_bytes = None - text = None - try: - messages = context.get_messages() - last_message = messages[-1] - last_message_content = last_message.get("content") + logger.debug(f"Analyzing image (bytes length: {len(frame.image)})") - for item in last_message_content: - if isinstance(item, dict): - if ( - "image_url" in item - and isinstance(item["image_url"], dict) - and item["image_url"].get("url") - ): - image_bytes = base64.b64decode(item["image_url"]["url"].split(",")[1]) - elif "text" in item and isinstance(item["text"], str): - text = item["text"] - - except Exception as e: - logger.error(f"Exception during image extraction: {e}") - yield ErrorFrame("Failed to extract image from context") - return - - if not image_bytes: - logger.error("No image found in context") - yield ErrorFrame("No image found in context") - return - - logger.debug( - f"Analyzing image (bytes length: {len(image_bytes) if image_bytes else 'None'})" - ) - - def get_image_description(bytes: bytes, text: Optional[str]) -> str: - image_buffer = BytesIO(bytes) - image = Image.open(image_buffer) + def get_image_description(image_bytes: bytes, text: Optional[str]) -> str: + image = Image.frombytes(frame.format, frame.size, image_bytes) image_embeds = self._model.encode_image(image) description = self._model.query(image_embeds, text)["answer"] return description - description = await asyncio.to_thread(get_image_description, image_bytes, text) + description = await asyncio.to_thread(get_image_description, frame.image, frame.text) yield TextFrame(text=description) diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index 1cf2d5194..1447774e1 100644 --- 
a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -49,6 +49,33 @@ END_TOKEN = "" FINALIZED_TOKEN = "" +class SonioxContextGeneralItem(BaseModel): + """Represents a key-value pair for structured general context information.""" + + key: str + value: str + + +class SonioxContextTranslationTerm(BaseModel): + """Represents a custom translation mapping for ambiguous or domain-specific terms.""" + + source: str + target: str + + +class SonioxContextObject(BaseModel): + """Context object for models with context_version 2, for Soniox stt-rt-v3-preview and higher. + + Learn more about context in the documentation: + https://soniox.com/docs/stt/concepts/context + """ + + general: Optional[List[SonioxContextGeneralItem]] = None + text: Optional[str] = None + terms: Optional[List[str]] = None + translation_terms: Optional[List[SonioxContextTranslationTerm]] = None + + class SonioxInputParams(BaseModel): """Real-time transcription settings. @@ -60,9 +87,9 @@ class SonioxInputParams(BaseModel): audio_format: Audio format to use for transcription. num_channels: Number of channels to use for transcription. language_hints: List of language hints to use for transcription. - context: Customization for transcription. - enable_non_final_tokens: Whether to enable non-final tokens. If false, only final tokens will be returned. - max_non_final_tokens_duration_ms: Maximum duration of non-final tokens. + context: Customization for transcription. String for models with context_version 1 and ContextObject for models with context_version 2. + enable_speaker_diarization: Whether to enable speaker diarization. Tokens are annotated with speaker IDs. + enable_language_identification: Whether to enable language identification. Tokens are annotated with language IDs. client_reference_id: Client reference ID to use for transcription. 
""" @@ -72,10 +99,10 @@ class SonioxInputParams(BaseModel): num_channels: Optional[int] = 1 language_hints: Optional[List[Language]] = None - context: Optional[str] = None + context: Optional[SonioxContextObject | str] = None - enable_non_final_tokens: Optional[bool] = True - max_non_final_tokens_duration_ms: Optional[int] = None + enable_speaker_diarization: Optional[bool] = False + enable_language_identification: Optional[bool] = False client_reference_id: Optional[str] = None @@ -173,6 +200,10 @@ class SonioxSTTService(STTService): # Either one or the other is required. enable_endpoint_detection = not self._vad_force_turn_endpoint + context = self._params.context + if isinstance(context, SonioxContextObject): + context = context.model_dump() + # Send the initial configuration message. config = { "api_key": self._api_key, @@ -182,9 +213,9 @@ class SonioxSTTService(STTService): "enable_endpoint_detection": enable_endpoint_detection, "sample_rate": self.sample_rate, "language_hints": _prepare_language_hints(self._params.language_hints), - "context": self._params.context, - "enable_non_final_tokens": self._params.enable_non_final_tokens, - "max_non_final_tokens_duration_ms": self._params.max_non_final_tokens_duration_ms, + "context": context, + "enable_speaker_diarization": self._params.enable_speaker_diarization, + "enable_language_identification": self._params.enable_language_identification, "client_reference_id": self._params.client_reference_id, } diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py new file mode 100644 index 000000000..23d10c5e1 --- /dev/null +++ b/src/pipecat/services/speechmatics/tts.py @@ -0,0 +1,189 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Speechmatics TTS service integration.""" + +from typing import AsyncGenerator, Optional +from urllib.parse import urlencode + +import aiohttp +from loguru import logger +from pydantic import BaseModel + 
+from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.tts_service import TTSService +from pipecat.utils.tracing.service_decorators import traced_tts + +try: + from speechmatics.rt import __version__ +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`." + ) + raise Exception(f"Missing module: {e}") + + +class SpeechmaticsTTSService(TTSService): + """Speechmatics TTS service implementation. + + This service provides text-to-speech synthesis using the Speechmatics HTTP API. + It converts text to speech and returns raw PCM audio data for real-time playback. + """ + + SPEECHMATICS_SAMPLE_RATE = 16000 + + class InputParams(BaseModel): + """Optional input parameters for Speechmatics TTS configuration.""" + + pass + + def __init__( + self, + *, + api_key: str, + base_url: str = "https://preview.tts.speechmatics.com", + voice_id: str = "sarah", + aiohttp_session: aiohttp.ClientSession, + sample_rate: Optional[int] = SPEECHMATICS_SAMPLE_RATE, + params: Optional[InputParams] = None, + **kwargs, + ): + """Initialize the Speechmatics TTS service. + + Args: + api_key: Speechmatics API key for authentication. + base_url: Base URL for Speechmatics TTS API. + voice_id: Voice model to use for synthesis. + aiohttp_session: Shared aiohttp session for HTTP requests. + sample_rate: Audio sample rate in Hz. + params: Optional[InputParams]: Input parameters for the service. + **kwargs: Additional arguments passed to TTSService. + """ + if sample_rate and sample_rate != self.SPEECHMATICS_SAMPLE_RATE: + logger.warning( + f"Speechmatics TTS only supports {self.SPEECHMATICS_SAMPLE_RATE}Hz sample rate. " + f"Current rate of {sample_rate}Hz may cause issues." 
+ ) + super().__init__(sample_rate=sample_rate, **kwargs) + + # Service parameters + self._api_key: str = api_key + self._base_url: str = base_url + self._session = aiohttp_session + + # Check we have required attributes + if not self._api_key: + raise ValueError("Missing Speechmatics API key") + + # Default parameters + self._params = params or SpeechmaticsTTSService.InputParams() + + # Set voice from constructor parameter + self.set_voice(voice_id) + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Speechmatics service supports metrics generation. + """ + return True + + @traced_tts + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using Speechmatics' HTTP API. + + Args: + text: The text to synthesize into speech. + + Yields: + Frame: Audio frames containing the synthesized speech. + """ + logger.debug(f"{self}: Generating TTS [{text}]") + + headers = { + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + } + + payload = { + "text": text, + } + + url = _get_endpoint_url(self._base_url, self._voice_id, self.sample_rate) + + try: + await self.start_ttfb_metrics() + + async with self._session.post(url, json=payload, headers=headers) as response: + if response.status != 200: + error_message = f"Speechmatics TTS error: HTTP {response.status}" + logger.error(error_message) + yield ErrorFrame(error=error_message) + return + + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + + # Process the response in streaming chunks + first_chunk = True + buffer = b"" + + async for chunk in response.content.iter_any(): + if not chunk: + continue + if first_chunk: + await self.stop_ttfb_metrics() + first_chunk = False + + buffer += chunk + + # Emit all complete 2-byte int16 samples from buffer + if len(buffer) >= 2: + complete_samples = len(buffer) // 2 + complete_bytes = complete_samples * 2 + + audio_data 
= buffer[:complete_bytes] + buffer = buffer[complete_bytes:] # Keep remaining bytes for next iteration + + yield TTSAudioRawFrame( + audio=audio_data, + sample_rate=self.sample_rate, + num_channels=1, + ) + + except Exception as e: + logger.exception(f"Error generating TTS: {e}") + yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}") + finally: + yield TTSStoppedFrame() + + +def _get_endpoint_url(base_url: str, voice: str, sample_rate: int) -> str: + """Format the TTS endpoint URL with voice, output format, and version params. + + Args: + base_url: The base URL for the TTS endpoint. + voice: The voice model to use. + sample_rate: The audio sample rate. + + Returns: + str: The formatted TTS endpoint URL. + """ + query_params = {} + query_params["output_format"] = f"pcm_{sample_rate}" + query_params["sm-app"] = f"pipecat/{__version__}" + query = urlencode(query_params) + + return f"{base_url}/generate/{voice}?{query}" diff --git a/src/pipecat/services/vision_service.py b/src/pipecat/services/vision_service.py index 0eeee98cd..5de282896 100644 --- a/src/pipecat/services/vision_service.py +++ b/src/pipecat/services/vision_service.py @@ -14,8 +14,7 @@ visual content. from abc import abstractmethod from typing import AsyncGenerator -from pipecat.frames.frames import Frame, LLMContextFrame -from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.frames.frames import Frame, UserImageRawFrame from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService @@ -38,15 +37,15 @@ class VisionService(AIService): self._describe_text = None @abstractmethod - async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]: - """Process the latest image in the context and generate results. + async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]: + """Process the given vision image and generate results. 
This method must be implemented by subclasses to provide actual computer vision functionality such as image description, object detection, or visual question answering. Args: - context: The context to process, containing image data. + frame: The image frame to process. Yields: Frame: Frames containing the vision analysis results, typically TextFrame @@ -57,7 +56,7 @@ class VisionService(AIService): async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames, handling vision image frames for analysis. - Automatically processes VisionImageRawFrame objects by calling run_vision + Automatically processes UserImageRawFrame objects by calling run_vision and handles metrics tracking. Other frames are passed through unchanged. Args: @@ -66,9 +65,9 @@ class VisionService(AIService): """ await super().process_frame(frame, direction) - if isinstance(frame, LLMContextFrame): + if isinstance(frame, UserImageRawFrame) and frame.text: await self.start_processing_metrics() - await self.process_generator(self.run_vision(frame.context)) + await self.process_generator(self.run_vision(frame)) await self.stop_processing_metrics() else: await self.push_frame(frame, direction) diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 18c36ee56..c6dd06986 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -1839,10 +1839,11 @@ class DailyInputTransport(BaseInputTransport): if render_frame: frame = UserImageRawFrame( user_id=participant_id, - request=request_frame, image=video_frame.buffer, size=(video_frame.width, video_frame.height), format=video_frame.color_format, + text=request_frame.text if request_frame else None, + add_to_context=request_frame.add_to_context if request_frame else None, ) frame.transport_source = video_source await self.push_video_frame(frame) diff --git a/src/pipecat/transports/smallwebrtc/transport.py 
b/src/pipecat/transports/smallwebrtc/transport.py index cdefa976f..4bd18c9ef 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -15,7 +15,7 @@ import asyncio import fractions import time from collections import deque -from typing import Any, Awaitable, Callable, Optional +from typing import Any, Awaitable, Callable, List, Optional import numpy as np from loguru import logger @@ -567,7 +567,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): self._receive_audio_task = None self._receive_video_task = None self._receive_screen_video_task = None - self._image_requests = {} + self._image_requests: List[UserImageRequestFrame] = [] # Whether we have seen a StartFrame already. self._initialized = False @@ -657,23 +657,27 @@ class SmallWebRTCInputTransport(BaseInputTransport): if video_frame: await self.push_video_frame(video_frame) - # Check if there are any pending image requests and create UserImageRawFrame - if self._image_requests: - for req_id, request_frame in list(self._image_requests.items()): - if request_frame.video_source == video_source: - # Create UserImageRawFrame using the current video frame - image_frame = UserImageRawFrame( - user_id=request_frame.user_id, - request=request_frame, - image=video_frame.image, - size=video_frame.size, - format=video_frame.format, - ) - image_frame.transport_source = video_source - # Push the frame to the pipeline - await self.push_video_frame(image_frame) - # Remove from pending requests - del self._image_requests[req_id] + # Check if there are any pending image requests and create + # UserImageRawFrame. Use a shallow copy so we can remove + # elements. 
+ for request_frame in self._image_requests[:]: + if request_frame.video_source == video_source: + # Create UserImageRawFrame using the current video frame + image_frame = UserImageRawFrame( + user_id=request_frame.user_id, + image=video_frame.image, + size=video_frame.size, + format=video_frame.format, + text=request_frame.text if request_frame else None, + add_to_context=request_frame.add_to_context + if request_frame + else None, + ) + image_frame.transport_source = video_source + # Push the frame to the pipeline + await self.push_video_frame(image_frame) + # Remove from pending requests + self._image_requests.remove(request_frame) except Exception as e: logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})") @@ -701,8 +705,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): logger.debug(f"Requesting image from participant: {frame.user_id}") # Store the request - request_id = f"{frame.function_name}:{frame.tool_call_id}" - self._image_requests[request_id] = frame + self._image_requests.append(frame) # Default to camera if no source specified if frame.video_source is None: