Compare commits
2 Commits
main
...
cb/frame-g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0369733f9c | ||
|
|
74b85a450f |
@@ -12,19 +12,33 @@ import sys
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from processors import StoryImageProcessor, StoryProcessor
|
from processors import StoryImageFrame, StoryImageProcessor, StoryPageFrame, StoryProcessor
|
||||||
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
|
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
|
||||||
from utils.helpers import load_images, load_sounds
|
from utils.helpers import load_images, load_sounds
|
||||||
|
|
||||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||||
from pipecat.frames.frames import EndFrame
|
from pipecat.frames.frames import (
|
||||||
|
AudioRawFrame,
|
||||||
|
EndFrame,
|
||||||
|
Frame,
|
||||||
|
ImageRawFrame,
|
||||||
|
MetadataFrame,
|
||||||
|
SystemFrame,
|
||||||
|
TextFrame,
|
||||||
|
TTSStartedFrame,
|
||||||
|
TTSStoppedFrame,
|
||||||
|
)
|
||||||
|
from pipecat.observers.base_observer import BaseObserver
|
||||||
from pipecat.pipeline.pipeline import Pipeline
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
from pipecat.pipeline.runner import PipelineRunner
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
|
||||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||||
|
from pipecat.processors.logger import FrameLogger
|
||||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||||
from pipecat.services.fal import FalImageGenService
|
from pipecat.services.fal import FalImageGenService
|
||||||
from pipecat.services.google import GoogleLLMService
|
from pipecat.services.google import GoogleImageGenService, GoogleLLMService
|
||||||
from pipecat.transports.services.daily import (
|
from pipecat.transports.services.daily import (
|
||||||
DailyParams,
|
DailyParams,
|
||||||
DailyTransport,
|
DailyTransport,
|
||||||
@@ -40,6 +54,62 @@ sounds = load_sounds(["listening.wav"])
|
|||||||
images = load_images(["book1.png", "book2.png"])
|
images = load_images(["book1.png", "book2.png"])
|
||||||
|
|
||||||
|
|
||||||
|
class DebugObserver(BaseObserver):
    """Observer that logs selected frames to the console as they are pushed.

    Logs every pushed frame that is an instance of one of the following
    types (checked in this order, first match wins):
    - ImageRawFrame
    - StoryPageFrame
    - StoryImageFrame
    - TextFrame
    - TTSStartedFrame
    - TTSStoppedFrame
    - MetadataFrame

    Frames of any other type are ignored. Every log line includes the frame's
    metadata, so you can follow how metadata travels from processor to
    processor through the pipeline.

    Log format:
    ⚡ [FRAME LABEL]: [source processor] [→ or ←] [destination processor] at [timestamp]s, metadata: [frame metadata]
    """

    # Ordered (frame type, log label) pairs. The first matching entry wins, so
    # the order must be preserved: the Story* frames are listed before
    # TextFrame so they keep their specific labels.
    # NOTE(review): presumably StoryPageFrame/StoryImageFrame derive from
    # TextFrame; confirm against processors.py.
    _LABELED_FRAME_TYPES = (
        (ImageRawFrame, "RAW IMAGE FRAME"),
        (StoryPageFrame, "STORY PAGE FRAME"),
        (StoryImageFrame, "STORY IMAGE FRAME"),
        (TextFrame, "TEXT FRAME"),
        (TTSStartedFrame, "TTS STARTED FRAME"),
        (TTSStoppedFrame, "TTS STOPPED FRAME"),
        (MetadataFrame, "METADATA FRAME"),
    )

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        """Log ``frame`` if it is one of the frame types this observer tracks.

        Args:
            src: Processor that pushed the frame.
            dst: Processor the frame is being pushed to.
            frame: The frame being pushed.
            direction: Direction of travel; DOWNSTREAM is rendered as "→",
                anything else as "←".
            timestamp: Push time; divided by 1e9 to display seconds.
        """
        label = next(
            (
                name
                for frame_type, name in self._LABELED_FRAME_TYPES
                if isinstance(frame, frame_type)
            ),
            None,
        )
        if label is None:
            # Not a frame type we are interested in.
            return

        # Convert timestamp to seconds for readability
        time_sec = timestamp / 1_000_000_000

        # Create direction arrow
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        logger.info(
            f"⚡ {label}: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
        )
|
||||||
|
|
||||||
|
|
||||||
async def main(room_url, token=None):
|
async def main(room_url, token=None):
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
# -------------- Transport --------------- #
|
# -------------- Transport --------------- #
|
||||||
@@ -90,13 +160,18 @@ async def main(room_url, token=None):
|
|||||||
runner = PipelineRunner()
|
runner = PipelineRunner()
|
||||||
|
|
||||||
logger.debug("Waiting for participant...")
|
logger.debug("Waiting for participant...")
|
||||||
|
after = FrameLogger("After", "red", ignored_frame_types=[SystemFrame, AudioRawFrame])
|
||||||
|
before = FrameLogger("Before", "cyan", ignored_frame_types=[SystemFrame, AudioRawFrame])
|
||||||
main_pipeline = Pipeline(
|
main_pipeline = Pipeline(
|
||||||
[
|
[
|
||||||
transport.input(),
|
transport.input(),
|
||||||
context_aggregator.user(),
|
context_aggregator.user(),
|
||||||
llm_service,
|
llm_service,
|
||||||
story_processor,
|
story_processor,
|
||||||
|
# SyncParallelPipeline([image_processor], [tts_service]),
|
||||||
|
before,
|
||||||
image_processor,
|
image_processor,
|
||||||
|
after,
|
||||||
tts_service,
|
tts_service,
|
||||||
transport.output(),
|
transport.output(),
|
||||||
context_aggregator.assistant(),
|
context_aggregator.assistant(),
|
||||||
@@ -109,6 +184,7 @@ async def main(room_url, token=None):
|
|||||||
allow_interruptions=True,
|
allow_interruptions=True,
|
||||||
enable_metrics=True,
|
enable_metrics=True,
|
||||||
enable_usage_metrics=True,
|
enable_usage_metrics=True,
|
||||||
|
observers=[DebugObserver()],
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from pipecat.frames.frames import (
|
|||||||
Frame,
|
Frame,
|
||||||
LLMFullResponseEndFrame,
|
LLMFullResponseEndFrame,
|
||||||
LLMMessagesFrame,
|
LLMMessagesFrame,
|
||||||
|
MetadataFrame,
|
||||||
TextFrame,
|
TextFrame,
|
||||||
UserStoppedSpeakingFrame,
|
UserStoppedSpeakingFrame,
|
||||||
)
|
)
|
||||||
@@ -75,6 +76,7 @@ class StoryImageProcessor(FrameProcessor):
|
|||||||
|
|
||||||
if isinstance(frame, StoryPageFrame):
|
if isinstance(frame, StoryPageFrame):
|
||||||
# Special syntax for the first page
|
# Special syntax for the first page
|
||||||
|
print(f"!!! generating image for story page frame # {frame.metadata['story_page_id']}")
|
||||||
if self.pages == []:
|
if self.pages == []:
|
||||||
prompt = FIRST_IMAGE_PROMPT % frame.text
|
prompt = FIRST_IMAGE_PROMPT % frame.text
|
||||||
else:
|
else:
|
||||||
@@ -98,10 +100,14 @@ class StoryImageProcessor(FrameProcessor):
|
|||||||
async for i in self._image_gen_service.run_image_gen(
|
async for i in self._image_gen_service.run_image_gen(
|
||||||
IMAGE_GEN_PROMPT % image_description
|
IMAGE_GEN_PROMPT % image_description
|
||||||
):
|
):
|
||||||
|
print(
|
||||||
|
f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}"
|
||||||
|
)
|
||||||
await self.push_frame(i)
|
await self.push_frame(i)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
logger.debug("Image gen timeout")
|
logger.debug("Image gen timeout")
|
||||||
pass
|
pass
|
||||||
|
print(f"### past image gen try block, source frame is {frame.name}")
|
||||||
await self.stop_ttfb_metrics()
|
await self.stop_ttfb_metrics()
|
||||||
# Push the StoryPageFrame so it gets TTS
|
# Push the StoryPageFrame so it gets TTS
|
||||||
await self.push_frame(frame)
|
await self.push_frame(frame)
|
||||||
@@ -128,6 +134,7 @@ class StoryProcessor(FrameProcessor):
|
|||||||
self._messages = messages
|
self._messages = messages
|
||||||
self._text = ""
|
self._text = ""
|
||||||
self._story = story
|
self._story = story
|
||||||
|
self._current_page = 0
|
||||||
|
|
||||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
await super().process_frame(frame, direction)
|
await super().process_frame(frame, direction)
|
||||||
@@ -148,6 +155,7 @@ class StoryProcessor(FrameProcessor):
|
|||||||
# Driven by the prompt, the LLM should have asked the user for input
|
# Driven by the prompt, the LLM should have asked the user for input
|
||||||
elif isinstance(frame, LLMFullResponseEndFrame):
|
elif isinstance(frame, LLMFullResponseEndFrame):
|
||||||
# We use a different frame type, as to avoid image generation ingest
|
# We use a different frame type, as to avoid image generation ingest
|
||||||
|
await self.push_frame(MetadataFrame())
|
||||||
await self.push_frame(StoryPromptFrame(self._text))
|
await self.push_frame(StoryPromptFrame(self._text))
|
||||||
self._text = ""
|
self._text = ""
|
||||||
await self.push_frame(frame)
|
await self.push_frame(frame)
|
||||||
@@ -187,7 +195,14 @@ class StoryProcessor(FrameProcessor):
|
|||||||
|
|
||||||
if len(before_break) > 2:
|
if len(before_break) > 2:
|
||||||
self._story.append(before_break)
|
self._story.append(before_break)
|
||||||
await self.push_frame(StoryPageFrame(before_break))
|
mf = MetadataFrame()
|
||||||
|
mf.metadata = {"story_page_id": self._current_page}
|
||||||
|
await self.push_frame(mf)
|
||||||
|
|
||||||
|
spf = StoryPageFrame(before_break)
|
||||||
|
spf.metadata["story_page_id"] = self._current_page
|
||||||
|
self._current_page += 1
|
||||||
|
await self.push_frame(spf)
|
||||||
# await self.push_frame(sounds["ding"])
|
# await self.push_frame(sounds["ding"])
|
||||||
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
|
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
|
||||||
|
|
||||||
|
|||||||
@@ -91,6 +91,13 @@ class ControlFrame(Frame):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class MetadataFrame(ControlFrame):
    """Control frame carrying default metadata for downstream processors.

    Downstream processors can apply this metadata (for example a
    frame_group_id) to the frames they newly create.
    """
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Mixins
|
# Mixins
|
||||||
#
|
#
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from pipecat.frames.frames import (
|
|||||||
CancelFrame,
|
CancelFrame,
|
||||||
ErrorFrame,
|
ErrorFrame,
|
||||||
Frame,
|
Frame,
|
||||||
|
MetadataFrame,
|
||||||
StartFrame,
|
StartFrame,
|
||||||
StartInterruptionFrame,
|
StartInterruptionFrame,
|
||||||
StopInterruptionFrame,
|
StopInterruptionFrame,
|
||||||
@@ -84,6 +85,9 @@ class FrameProcessor:
|
|||||||
# exception to this rule. This create this task.
|
# exception to this rule. This create this task.
|
||||||
self.__push_frame_task: Optional[asyncio.Task] = None
|
self.__push_frame_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
# This enables an input frame's metadata to be copied to output frames
|
||||||
|
self._input_frame_metadata = {}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def id(self) -> int:
|
def id(self) -> int:
|
||||||
return self._id
|
return self._id
|
||||||
@@ -224,6 +228,12 @@ class FrameProcessor:
|
|||||||
self.__input_event.set()
|
self.__input_event.set()
|
||||||
|
|
||||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
# System frames skip the queue and blow up determinism
|
||||||
|
if isinstance(frame, MetadataFrame):
|
||||||
|
self._input_frame_metadata = frame.metadata
|
||||||
|
# print(
|
||||||
|
# f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}"
|
||||||
|
# )
|
||||||
if isinstance(frame, StartFrame):
|
if isinstance(frame, StartFrame):
|
||||||
self._clock = frame.clock
|
self._clock = frame.clock
|
||||||
self._task_manager = frame.task_manager
|
self._task_manager = frame.task_manager
|
||||||
@@ -248,6 +258,14 @@ class FrameProcessor:
|
|||||||
if isinstance(frame, SystemFrame):
|
if isinstance(frame, SystemFrame):
|
||||||
await self.__internal_push_frame(frame, direction)
|
await self.__internal_push_frame(frame, direction)
|
||||||
else:
|
else:
|
||||||
|
# print(
|
||||||
|
# f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}"
|
||||||
|
# )
|
||||||
|
new_metadata = self._input_frame_metadata | frame.metadata
|
||||||
|
frame.metadata = new_metadata
|
||||||
|
# print(
|
||||||
|
# f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}"
|
||||||
|
# )
|
||||||
await self.__push_queue.put((frame, direction))
|
await self.__push_queue.put((frame, direction))
|
||||||
|
|
||||||
def event_handler(self, event_name: str):
|
def event_handler(self, event_name: str):
|
||||||
|
|||||||
Reference in New Issue
Block a user