From c4cdb2d809c7b0c679e70957a0bd3def3dc8090e Mon Sep 17 00:00:00 2001 From: Nikita Gamolsky Date: Sat, 2 Nov 2024 13:37:35 -0700 Subject: [PATCH] update to use context global --- .../foundational/99-anthropic-hackathon.py | 62 +++++++++++++++---- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/examples/foundational/99-anthropic-hackathon.py b/examples/foundational/99-anthropic-hackathon.py index 6022dbaf7..ab542a568 100644 --- a/examples/foundational/99-anthropic-hackathon.py +++ b/examples/foundational/99-anthropic-hackathon.py @@ -5,12 +5,16 @@ # import asyncio +import base64 +import io import os import sys +from collections import deque import aiohttp from dotenv import load_dotenv from loguru import logger +from PIL import Image from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer @@ -29,19 +33,22 @@ load_dotenv(override=True) logger.remove(0) logger.add(sys.stderr, level="DEBUG") +MAX_FRAMES = 5 # Constant to control number of frames to keep video_participant_id = None -most_recent_image_frame = None +anthropic_context = None + +recent_image_frames = deque(maxlen=MAX_FRAMES) class ImageFrameCatcher(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - global most_recent_image_frame + global recent_image_frames await super().process_frame(frame, direction) if isinstance(frame, ImageRawFrame): # logger.debug(f"ImageLogger: {frame}") - most_recent_image_frame = frame + recent_image_frames.append(frame) else: await self.push_frame(frame, direction) @@ -134,23 +141,54 @@ Your response will be turned into speech so use only simple words and punctuatio @transport.event_handler("on_app_message") async def on_app_message(transport, message, sender): - c = AnthropicLLMContext.upgrade_to_anthropic(context) + global anthropic_context + anthropic_context = AnthropicLLMContext.upgrade_to_anthropic(context) logger.debug(f"Received app message: {message} - {context}") - frame = most_recent_image_frame - if not frame: - logger.debug("No image frame to send") + if not recent_image_frames: + logger.debug("No image frames to send") return - c.add_image_frame_message( - format=frame.format, - size=frame.size, - image=frame.image, - text=message["message"], + + add_message_with_images( + anthropic_context, message["message"], frames=list(recent_image_frames) ) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() await runner.run(task) + def add_message_with_images(c, message, frames=None): + if frames is None: + frames = list(recent_image_frames) + + if not frames: + logger.debug("No image frames to send") + return + + # Create content list starting with all images + content = [] + for frame in frames: + buffer = io.BytesIO() + Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG") + encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") + + content.append( + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/jpeg", + "data": encoded_image, + }, + } + ) + + # Add text message at the end if provided + if message: + content.append({"type": "text", "text": message}) + + c.add_message({"role": "user", "content": content}) + if __name__ == "__main__": asyncio.run(main())