class DebugObserver(BaseObserver):
    """Observer that logs selected image and text frames to the console.

    Every pushed frame is tested against the following types, in this order,
    and the first match decides the label used in the log line:
    - ImageRawFrame
    - StoryPageFrame
    - StoryImageFrame
    - TextFrame

    Frames that match none of these types produce no log output.

    Log format: ⚡ [LABEL]: [source processor] [→ or ←] [destination processor] at [timestamp]s, metadata: [frame metadata]
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        """Log one line for `frame` if it is an instance of a watched type.

        Args:
            src: Processor shown on the left-hand side of the arrow.
            dst: Processor shown on the right-hand side of the arrow.
            frame: Frame being pushed; its `metadata` is included in the log line.
            direction: FrameDirection.DOWNSTREAM is rendered as "→", anything
                else as "←".
            timestamp: Divided by 1_000_000_000 and printed as seconds
                (presumably a nanosecond value; confirm against the caller).
        """
        # Scale the raw timestamp down so it reads as seconds in the log
        seconds = timestamp / 1_000_000_000

        # Pick the arrow that shows which way the frame is travelling
        if direction == FrameDirection.DOWNSTREAM:
            pointer = "→"
        else:
            pointer = "←"

        # Order matters: the first matching type wins, exactly like an
        # if/elif chain would behave.
        watched = (
            (ImageRawFrame, "⚡ RAW IMAGE FRAME"),
            (StoryPageFrame, "⚡ STORY PAGE FRAME"),
            (StoryImageFrame, "⚡ STORY IMAGE FRAME"),
            (TextFrame, "⚡ TEXT FRAME"),
        )

        for frame_type, banner in watched:
            if not isinstance(frame, frame_type):
                continue
            logger.info(
                f"{banner}: {src} {pointer} {dst} at {seconds:.2f}s, metadata: {frame.metadata}"
            )
            break
generating image for story page frame # {frame.metadata['story_page_id']}") if self.pages == []: prompt = FIRST_IMAGE_PROMPT % frame.text else: @@ -98,10 +99,14 @@ class StoryImageProcessor(FrameProcessor): async for i in self._image_gen_service.run_image_gen( IMAGE_GEN_PROMPT % image_description ): + print( + f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}" + ) await self.push_frame(i) except TimeoutError: logger.debug("Image gen timeout") pass + print(f"### past image gen try block, source frame is {frame.name}") await self.stop_ttfb_metrics() # Push the StoryPageFrame so it gets TTS await self.push_frame(frame) @@ -128,6 +133,7 @@ class StoryProcessor(FrameProcessor): self._messages = messages self._text = "" self._story = story + self._current_page = 0 async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -187,7 +193,10 @@ class StoryProcessor(FrameProcessor): if len(before_break) > 2: self._story.append(before_break) - await self.push_frame(StoryPageFrame(before_break)) + spf = StoryPageFrame(before_break) + spf.metadata["story_page_id"] = self._current_page + self._current_page += 1 + await self.push_frame(spf) # await self.push_frame(sounds["ding"]) await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 260e53ef2..6fbcff712 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -84,6 +84,9 @@ class FrameProcessor: # exception to this rule. This create this task. 
self.__push_frame_task: Optional[asyncio.Task] = None + # This enables an input frame's metadata to be copied to output frames + self._input_frame_metadata = {} + @property def id(self) -> int: return self._id @@ -224,6 +227,12 @@ class FrameProcessor: self.__input_event.set() async def process_frame(self, frame: Frame, direction: FrameDirection): + # System frames skip the queue and blow up determinism + if not isinstance(frame, SystemFrame): + self._input_frame_metadata = frame.metadata + # print( + # f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}" + # ) if isinstance(frame, StartFrame): self._clock = frame.clock self._task_manager = frame.task_manager @@ -248,6 +257,14 @@ class FrameProcessor: if isinstance(frame, SystemFrame): await self.__internal_push_frame(frame, direction) else: + # print( + # f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}" + # ) + new_metadata = self._input_frame_metadata | frame.metadata + frame.metadata = new_metadata + # print( + # f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}" + # ) await self.__push_queue.put((frame, direction)) def event_handler(self, event_name: str):