Compare commits

...

2 Commits

Author SHA1 Message Date
Chad Bailey
0369733f9c explicit MetadataFrame 2025-01-31 19:27:01 +00:00
Chad Bailey
74b85a450f wip 2025-01-31 19:27:01 +00:00
4 changed files with 120 additions and 4 deletions

View File

@@ -12,19 +12,33 @@ import sys
import aiohttp import aiohttp
from dotenv import load_dotenv from dotenv import load_dotenv
from loguru import logger from loguru import logger
from processors import StoryImageProcessor, StoryProcessor from processors import StoryImageFrame, StoryImageProcessor, StoryPageFrame, StoryProcessor
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
from utils.helpers import load_images, load_sounds from utils.helpers import load_images, load_sounds
from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame from pipecat.frames.frames import (
AudioRawFrame,
EndFrame,
Frame,
ImageRawFrame,
MetadataFrame,
SystemFrame,
TextFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService from pipecat.services.fal import FalImageGenService
from pipecat.services.google import GoogleLLMService from pipecat.services.google import GoogleImageGenService, GoogleLLMService
from pipecat.transports.services.daily import ( from pipecat.transports.services.daily import (
DailyParams, DailyParams,
DailyTransport, DailyTransport,
@@ -40,6 +54,62 @@ sounds = load_sounds(["listening.wav"])
images = load_images(["book1.png", "book2.png"]) images = load_images(["book1.png", "book2.png"])
class DebugObserver(BaseObserver):
"""Observer to log interruptions and bot speaking events to the console.
Logs all frame instances of:
- StartInterruptionFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
This allows you to see the frame flow from processor to processor through the pipeline for these frames.
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
# Convert timestamp to seconds for readability
time_sec = timestamp / 1_000_000_000
# Create direction arrow
arrow = "" if direction == FrameDirection.DOWNSTREAM else ""
if isinstance(frame, ImageRawFrame):
logger.info(
f"⚡ RAW IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, StoryPageFrame):
logger.info(
f"⚡ STORY PAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, StoryImageFrame):
logger.info(
f"⚡ STORY IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, TextFrame):
logger.info(
f"⚡ TEXT FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, TTSStartedFrame):
logger.info(
f"⚡ TTS STARTED FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, TTSStoppedFrame):
logger.info(
f"⚡ TTS STOPPED FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
elif isinstance(frame, MetadataFrame):
logger.info(
f"⚡ METADATA FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
)
async def main(room_url, token=None): async def main(room_url, token=None):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
# -------------- Transport --------------- # # -------------- Transport --------------- #
@@ -90,13 +160,18 @@ async def main(room_url, token=None):
runner = PipelineRunner() runner = PipelineRunner()
logger.debug("Waiting for participant...") logger.debug("Waiting for participant...")
after = FrameLogger("After", "red", ignored_frame_types=[SystemFrame, AudioRawFrame])
before = FrameLogger("Before", "cyan", ignored_frame_types=[SystemFrame, AudioRawFrame])
main_pipeline = Pipeline( main_pipeline = Pipeline(
[ [
transport.input(), transport.input(),
context_aggregator.user(), context_aggregator.user(),
llm_service, llm_service,
story_processor, story_processor,
# SyncParallelPipeline([image_processor], [tts_service]),
before,
image_processor, image_processor,
after,
tts_service, tts_service,
transport.output(), transport.output(),
context_aggregator.assistant(), context_aggregator.assistant(),
@@ -109,6 +184,7 @@ async def main(room_url, token=None):
allow_interruptions=True, allow_interruptions=True,
enable_metrics=True, enable_metrics=True,
enable_usage_metrics=True, enable_usage_metrics=True,
observers=[DebugObserver()],
), ),
) )

View File

@@ -17,6 +17,7 @@ from pipecat.frames.frames import (
Frame, Frame,
LLMFullResponseEndFrame, LLMFullResponseEndFrame,
LLMMessagesFrame, LLMMessagesFrame,
MetadataFrame,
TextFrame, TextFrame,
UserStoppedSpeakingFrame, UserStoppedSpeakingFrame,
) )
@@ -75,6 +76,7 @@ class StoryImageProcessor(FrameProcessor):
if isinstance(frame, StoryPageFrame): if isinstance(frame, StoryPageFrame):
# Special syntax for the first page # Special syntax for the first page
print(f"!!! generating image for story page frame # {frame.metadata['story_page_id']}")
if self.pages == []: if self.pages == []:
prompt = FIRST_IMAGE_PROMPT % frame.text prompt = FIRST_IMAGE_PROMPT % frame.text
else: else:
@@ -98,10 +100,14 @@ class StoryImageProcessor(FrameProcessor):
async for i in self._image_gen_service.run_image_gen( async for i in self._image_gen_service.run_image_gen(
IMAGE_GEN_PROMPT % image_description IMAGE_GEN_PROMPT % image_description
): ):
print(
f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}"
)
await self.push_frame(i) await self.push_frame(i)
except TimeoutError: except TimeoutError:
logger.debug("Image gen timeout") logger.debug("Image gen timeout")
pass pass
print(f"### past image gen try block, source frame is {frame.name}")
await self.stop_ttfb_metrics() await self.stop_ttfb_metrics()
# Push the StoryPageFrame so it gets TTS # Push the StoryPageFrame so it gets TTS
await self.push_frame(frame) await self.push_frame(frame)
@@ -128,6 +134,7 @@ class StoryProcessor(FrameProcessor):
self._messages = messages self._messages = messages
self._text = "" self._text = ""
self._story = story self._story = story
self._current_page = 0
async def process_frame(self, frame: Frame, direction: FrameDirection): async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction) await super().process_frame(frame, direction)
@@ -148,6 +155,7 @@ class StoryProcessor(FrameProcessor):
# Driven by the prompt, the LLM should have asked the user for input # Driven by the prompt, the LLM should have asked the user for input
elif isinstance(frame, LLMFullResponseEndFrame): elif isinstance(frame, LLMFullResponseEndFrame):
# We use a different frame type, as to avoid image generation ingest # We use a different frame type, as to avoid image generation ingest
await self.push_frame(MetadataFrame())
await self.push_frame(StoryPromptFrame(self._text)) await self.push_frame(StoryPromptFrame(self._text))
self._text = "" self._text = ""
await self.push_frame(frame) await self.push_frame(frame)
@@ -187,7 +195,14 @@ class StoryProcessor(FrameProcessor):
if len(before_break) > 2: if len(before_break) > 2:
self._story.append(before_break) self._story.append(before_break)
await self.push_frame(StoryPageFrame(before_break)) mf = MetadataFrame()
mf.metadata = {"story_page_id": self._current_page}
await self.push_frame(mf)
spf = StoryPageFrame(before_break)
spf.metadata["story_page_id"] = self._current_page
self._current_page += 1
await self.push_frame(spf)
# await self.push_frame(sounds["ding"]) # await self.push_frame(sounds["ding"])
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))

View File

@@ -91,6 +91,13 @@ class ControlFrame(Frame):
pass pass
@dataclass
class MetadataFrame(ControlFrame):
"""Used to set default metadata for downstream processors to apply to newly
created frames such as a frame_group_id.
"""
# #
# Mixins # Mixins
# #

View File

@@ -16,6 +16,7 @@ from pipecat.frames.frames import (
CancelFrame, CancelFrame,
ErrorFrame, ErrorFrame,
Frame, Frame,
MetadataFrame,
StartFrame, StartFrame,
StartInterruptionFrame, StartInterruptionFrame,
StopInterruptionFrame, StopInterruptionFrame,
@@ -84,6 +85,9 @@ class FrameProcessor:
# exception to this rule. This create this task. # exception to this rule. This create this task.
self.__push_frame_task: Optional[asyncio.Task] = None self.__push_frame_task: Optional[asyncio.Task] = None
# This enables an input frame's metadata to be copied to output frames
self._input_frame_metadata = {}
@property @property
def id(self) -> int: def id(self) -> int:
return self._id return self._id
@@ -224,6 +228,12 @@ class FrameProcessor:
self.__input_event.set() self.__input_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection): async def process_frame(self, frame: Frame, direction: FrameDirection):
# System frames skip the queue and blow up determinism
if isinstance(frame, MetadataFrame):
self._input_frame_metadata = frame.metadata
# print(
# f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}"
# )
if isinstance(frame, StartFrame): if isinstance(frame, StartFrame):
self._clock = frame.clock self._clock = frame.clock
self._task_manager = frame.task_manager self._task_manager = frame.task_manager
@@ -248,6 +258,14 @@ class FrameProcessor:
if isinstance(frame, SystemFrame): if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction) await self.__internal_push_frame(frame, direction)
else: else:
# print(
# f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}"
# )
new_metadata = self._input_frame_metadata | frame.metadata
frame.metadata = new_metadata
# print(
# f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}"
# )
await self.__push_queue.put((frame, direction)) await self.__push_queue.put((frame, direction))
def event_handler(self, event_name: str): def event_handler(self, event_name: str):