From 9f45ad4d2e2ec9713a95c9d7c39b2bba5dd258cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 18 Nov 2025 08:38:45 -0800 Subject: [PATCH] LLMContext: create_image_message/create_audio_message are now async --- CHANGELOG.md | 5 ++ .../foundational/12-describe-image-openai.py | 2 +- .../12a-describe-image-anthropic.py | 2 +- .../foundational/12b-describe-image-aws.py | 2 +- .../12c-describe-image-gemini-flash.py | 2 +- .../processors/aggregators/llm_context.py | 46 ++++++++++++------- 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de1f520a7..82da28b26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- ⚠️ Breaking change: `LLMContext.create_image_message()` and + `LLMContext.create_audio_message()` are now async methods. This fixes an + issue where the asyncio event loop would be blocked while encoding audio or + images. + - `ConsumerProcessor` now queues frames from the producer internally instead of pushing them directly. This allows us to subclass consumer processors and manipulate frames before they are pushed. diff --git a/examples/foundational/12-describe-image-openai.py b/examples/foundational/12-describe-image-openai.py index 8c72075e8..477803da6 100644 --- a/examples/foundational/12-describe-image-openai.py +++ b/examples/foundational/12-describe-image-openai.py @@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Kick off the conversation. 
image = Image.open(image_path) - message = LLMContext.create_image_message( + message = await LLMContext.create_image_message( image=image.tobytes(), format="RGB", size=image.size, diff --git a/examples/foundational/12a-describe-image-anthropic.py b/examples/foundational/12a-describe-image-anthropic.py index 6a9891712..ac4e8f01c 100644 --- a/examples/foundational/12a-describe-image-anthropic.py +++ b/examples/foundational/12a-describe-image-anthropic.py @@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Kick off the conversation. image = Image.open(image_path) - message = LLMContext.create_image_message( + message = await LLMContext.create_image_message( image=image.tobytes(), format="RGB", size=image.size, diff --git a/examples/foundational/12b-describe-image-aws.py b/examples/foundational/12b-describe-image-aws.py index 441c49cfd..cf1ce66a0 100644 --- a/examples/foundational/12b-describe-image-aws.py +++ b/examples/foundational/12b-describe-image-aws.py @@ -117,7 +117,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Kick off the conversation. image = Image.open(image_path) - message = LLMContext.create_image_message( + message = await LLMContext.create_image_message( image=image.tobytes(), format="RGB", size=image.size, diff --git a/examples/foundational/12c-describe-image-gemini-flash.py b/examples/foundational/12c-describe-image-gemini-flash.py index 919bf3553..bfd7f5146 100644 --- a/examples/foundational/12c-describe-image-gemini-flash.py +++ b/examples/foundational/12c-describe-image-gemini-flash.py @@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Kick off the conversation. 
image = Image.open(image_path) - message = LLMContext.create_image_message( + message = await LLMContext.create_image_message( image=image.tobytes(), format="RGB", size=image.size, diff --git a/src/pipecat/processors/aggregators/llm_context.py b/src/pipecat/processors/aggregators/llm_context.py index d9280f9c0..b9216103a 100644 --- a/src/pipecat/processors/aggregators/llm_context.py +++ b/src/pipecat/processors/aggregators/llm_context.py @@ -14,6 +14,7 @@ translation from this universal context into whatever format it needs, using a service-specific adapter. """ +import asyncio import base64 import io import wave @@ -137,7 +138,7 @@ class LLMContext: return {"role": role, "content": content} @staticmethod - def create_image_message( + async def create_image_message( *, role: str = "user", format: str, @@ -154,15 +155,21 @@ class LLMContext: image: Raw image bytes. text: Optional text to include with the image. """ - buffer = io.BytesIO() - Image.frombytes(format, size, image).save(buffer, format="JPEG") - encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") + + def encode_image(): + buffer = io.BytesIO() + Image.frombytes(format, size, image).save(buffer, format="JPEG") + encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") + return encoded_image + + encoded_image = await asyncio.to_thread(encode_image) + url = f"data:image/jpeg;base64,{encoded_image}" return LLMContext.create_image_url_message(role=role, url=url, text=text) @staticmethod - def create_audio_message( + async def create_audio_message( *, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows" ) -> LLMContextMessage: """Create a context message containing audio. @@ -172,21 +179,26 @@ class LLMContext: audio_frames: List of audio frame objects to include. text: Optional text to include with the audio. 
""" - sample_rate = audio_frames[0].sample_rate - num_channels = audio_frames[0].num_channels - content = [] - content.append({"type": "text", "text": text}) - data = b"".join(frame.audio for frame in audio_frames) + def encode_audio(): + sample_rate = audio_frames[0].sample_rate + num_channels = audio_frames[0].num_channels - with io.BytesIO() as buffer: - with wave.open(buffer, "wb") as wf: - wf.setsampwidth(2) - wf.setnchannels(num_channels) - wf.setframerate(sample_rate) - wf.writeframes(data) + content = [] + content.append({"type": "text", "text": text}) + data = b"".join(frame.audio for frame in audio_frames) - encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8") + with io.BytesIO() as buffer: + with wave.open(buffer, "wb") as wf: + wf.setsampwidth(2) + wf.setnchannels(num_channels) + wf.setframerate(sample_rate) + wf.writeframes(data) + + encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8") + return encoded_audio + + encoded_audio = asyncio.to_thread(encode_audio) content.append( {