This commit is contained in:
Chad Bailey
2025-01-30 23:13:07 +00:00
parent bb9a2560c3
commit 74b85a450f
3 changed files with 77 additions and 4 deletions

View File

@@ -12,19 +12,21 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from processors import StoryImageProcessor, StoryProcessor
from processors import StoryImageFrame, StoryImageProcessor, StoryPageFrame, StoryProcessor
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
from utils.helpers import load_images, load_sounds
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.frames.frames import EndFrame, Frame, ImageRawFrame, TextFrame
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.google import GoogleLLMService
from pipecat.services.google import GoogleImageGenService, GoogleLLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
@@ -40,6 +42,50 @@ sounds = load_sounds(["listening.wav"])
images = load_images(["book1.png", "book2.png"])
class DebugObserver(BaseObserver):
"""Observer to log interruptions and bot speaking events to the console.
Logs all frame instances of:
- StartInterruptionFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
This allows you to see the frame flow from processor to processor through the pipeline for these frames.
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
# Convert timestamp to seconds for readability
time_sec = timestamp / 1_000_000_000
# Create direction arrow
arrow = "" if direction == FrameDirection.DOWNSTREAM else ""
if isinstance(frame, ImageRawFrame):
logger.info(
f"⚡ RAW IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, StoryPageFrame):
logger.info(
f"⚡ STORY PAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, StoryImageFrame):
logger.info(
f"⚡ STORY IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, TextFrame):
logger.info(
f"⚡ TEXT FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
async def main(room_url, token=None):
async with aiohttp.ClientSession() as session:
# -------------- Transport --------------- #
@@ -109,6 +155,7 @@ async def main(room_url, token=None):
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[DebugObserver()],
),
)

View File

@@ -75,6 +75,7 @@ class StoryImageProcessor(FrameProcessor):
if isinstance(frame, StoryPageFrame):
# Special syntax for the first page
print(f"!!! generating image for story page frame # {frame.metadata['story_page_id']}")
if self.pages == []:
prompt = FIRST_IMAGE_PROMPT % frame.text
else:
@@ -98,10 +99,14 @@ class StoryImageProcessor(FrameProcessor):
async for i in self._image_gen_service.run_image_gen(
IMAGE_GEN_PROMPT % image_description
):
print(
f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}"
)
await self.push_frame(i)
except TimeoutError:
logger.debug("Image gen timeout")
pass
print(f"### past image gen try block, source frame is {frame.name}")
await self.stop_ttfb_metrics()
# Push the StoryPageFrame so it gets TTS
await self.push_frame(frame)
@@ -128,6 +133,7 @@ class StoryProcessor(FrameProcessor):
self._messages = messages
self._text = ""
self._story = story
self._current_page = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -187,7 +193,10 @@ class StoryProcessor(FrameProcessor):
if len(before_break) > 2:
self._story.append(before_break)
await self.push_frame(StoryPageFrame(before_break))
spf = StoryPageFrame(before_break)
spf.metadata["story_page_id"] = self._current_page
self._current_page += 1
await self.push_frame(spf)
# await self.push_frame(sounds["ding"])
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))

View File

@@ -84,6 +84,9 @@ class FrameProcessor:
# exception to this rule. This create this task.
self.__push_frame_task: Optional[asyncio.Task] = None
# This enables an input frame's metadata to be copied to output frames
self._input_frame_metadata = {}
@property
def id(self) -> int:
return self._id
@@ -224,6 +227,12 @@ class FrameProcessor:
self.__input_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
# System frames skip the queue and blow up determinism
if not isinstance(frame, SystemFrame):
self._input_frame_metadata = frame.metadata
# print(
# f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}"
# )
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._task_manager = frame.task_manager
@@ -248,6 +257,14 @@ class FrameProcessor:
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
# print(
# f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}"
# )
new_metadata = self._input_frame_metadata | frame.metadata
frame.metadata = new_metadata
# print(
# f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}"
# )
await self.__push_queue.put((frame, direction))
def event_handler(self, event_name: str):