Compare commits
8 Commits
cb/frame-g
...
cb/extra-l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8e2227a21 | ||
|
|
6c7474e1a2 | ||
|
|
95f0dbf3f3 | ||
|
|
11aeb68ddb | ||
|
|
a43c102fc8 | ||
|
|
16b49bdce6 | ||
|
|
41477c8f78 | ||
|
|
b72504f1cb |
17
CHANGELOG.md
17
CHANGELOG.md
@@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- Added `organization` and `project` level authentication to
|
||||
`OpenAILLMService`.
|
||||
|
||||
- Improved the language checking logic in `ElevenLabsTTSService` and
|
||||
`ElevenLabsHttpTTSService` to properly handle language codes based on model
|
||||
compatibility, with appropriate warnings when language codes cannot be
|
||||
@@ -50,6 +53,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
contain a combination of function calls, function call responses, system
|
||||
messages, or just messages.
|
||||
|
||||
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
|
||||
`OutputDTMFFrame` frame.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
|
||||
@@ -67,11 +73,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Other
|
||||
|
||||
- Updated all examples to use `task.cancel()` instead of pushing an `EndFrame`
|
||||
when a participant leaves/disconnects. If you push an `EndFrame` this will
|
||||
cause the bot to run through everything that is internally queued (which could
|
||||
take seconds). Instead, if a participant disconnects there is nothing else to
|
||||
be sent and therefore we should stop immediately.
|
||||
- Updated examples to use `task.cancel()` to immediately exit the example when a
|
||||
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
|
||||
an `EndFrame` causes the bot to run through everything that is internally
|
||||
queued (which could take some seconds). Note that using `task.cancel()` might
|
||||
not always be the best option and pushing an `EndFrame` could still be
|
||||
desirable to make sure the whole pipeline is flushed.
|
||||
|
||||
## [0.0.54] - 2025-01-27
|
||||
|
||||
|
||||
@@ -12,33 +12,19 @@ import sys
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from processors import StoryImageFrame, StoryImageProcessor, StoryPageFrame, StoryProcessor
|
||||
from processors import StoryImageProcessor, StoryProcessor
|
||||
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
|
||||
from utils.helpers import load_images, load_sounds
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
ImageRawFrame,
|
||||
MetadataFrame,
|
||||
SystemFrame,
|
||||
TextFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.frames.frames import EndFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.logger import FrameLogger
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.fal import FalImageGenService
|
||||
from pipecat.services.google import GoogleImageGenService, GoogleLLMService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.transports.services.daily import (
|
||||
DailyParams,
|
||||
DailyTransport,
|
||||
@@ -54,62 +40,6 @@ sounds = load_sounds(["listening.wav"])
|
||||
images = load_images(["book1.png", "book2.png"])
|
||||
|
||||
|
||||
class DebugObserver(BaseObserver):
|
||||
"""Observer to log interruptions and bot speaking events to the console.
|
||||
|
||||
Logs all frame instances of:
|
||||
- StartInterruptionFrame
|
||||
- BotStartedSpeakingFrame
|
||||
- BotStoppedSpeakingFrame
|
||||
|
||||
This allows you to see the frame flow from processor to processor through the pipeline for these frames.
|
||||
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
# Convert timestamp to seconds for readability
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# Create direction arrow
|
||||
arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"
|
||||
|
||||
if isinstance(frame, ImageRawFrame):
|
||||
logger.info(
|
||||
f"⚡ RAW IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, StoryPageFrame):
|
||||
logger.info(
|
||||
f"⚡ STORY PAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, StoryImageFrame):
|
||||
logger.info(
|
||||
f"⚡ STORY IMAGE FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, TextFrame):
|
||||
logger.info(
|
||||
f"⚡ TEXT FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
logger.info(
|
||||
f"⚡ TTS STARTED FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
logger.info(
|
||||
f"⚡ TTS STOPPED FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
elif isinstance(frame, MetadataFrame):
|
||||
logger.info(
|
||||
f"⚡ METADATA FRAME: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
|
||||
)
|
||||
|
||||
|
||||
async def main(room_url, token=None):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# -------------- Transport --------------- #
|
||||
@@ -160,18 +90,13 @@ async def main(room_url, token=None):
|
||||
runner = PipelineRunner()
|
||||
|
||||
logger.debug("Waiting for participant...")
|
||||
after = FrameLogger("After", "red", ignored_frame_types=[SystemFrame, AudioRawFrame])
|
||||
before = FrameLogger("Before", "cyan", ignored_frame_types=[SystemFrame, AudioRawFrame])
|
||||
main_pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm_service,
|
||||
story_processor,
|
||||
# SyncParallelPipeline([image_processor], [tts_service]),
|
||||
before,
|
||||
image_processor,
|
||||
after,
|
||||
tts_service,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
@@ -184,7 +109,6 @@ async def main(room_url, token=None):
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
observers=[DebugObserver()],
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMMessagesFrame,
|
||||
MetadataFrame,
|
||||
TextFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -76,7 +75,6 @@ class StoryImageProcessor(FrameProcessor):
|
||||
|
||||
if isinstance(frame, StoryPageFrame):
|
||||
# Special syntax for the first page
|
||||
print(f"!!! generating image for story page frame # {frame.metadata['story_page_id']}")
|
||||
if self.pages == []:
|
||||
prompt = FIRST_IMAGE_PROMPT % frame.text
|
||||
else:
|
||||
@@ -100,14 +98,10 @@ class StoryImageProcessor(FrameProcessor):
|
||||
async for i in self._image_gen_service.run_image_gen(
|
||||
IMAGE_GEN_PROMPT % image_description
|
||||
):
|
||||
print(
|
||||
f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}"
|
||||
)
|
||||
await self.push_frame(i)
|
||||
except TimeoutError:
|
||||
logger.debug("Image gen timeout")
|
||||
pass
|
||||
print(f"### past image gen try block, source frame is {frame.name}")
|
||||
await self.stop_ttfb_metrics()
|
||||
# Push the StoryPageFrame so it gets TTS
|
||||
await self.push_frame(frame)
|
||||
@@ -134,7 +128,6 @@ class StoryProcessor(FrameProcessor):
|
||||
self._messages = messages
|
||||
self._text = ""
|
||||
self._story = story
|
||||
self._current_page = 0
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -155,7 +148,6 @@ class StoryProcessor(FrameProcessor):
|
||||
# Driven by the prompt, the LLM should have asked the user for input
|
||||
elif isinstance(frame, LLMFullResponseEndFrame):
|
||||
# We use a different frame type, so as to avoid image generation ingest
|
||||
await self.push_frame(MetadataFrame())
|
||||
await self.push_frame(StoryPromptFrame(self._text))
|
||||
self._text = ""
|
||||
await self.push_frame(frame)
|
||||
@@ -195,14 +187,7 @@ class StoryProcessor(FrameProcessor):
|
||||
|
||||
if len(before_break) > 2:
|
||||
self._story.append(before_break)
|
||||
mf = MetadataFrame()
|
||||
mf.metadata = {"story_page_id": self._current_page}
|
||||
await self.push_frame(mf)
|
||||
|
||||
spf = StoryPageFrame(before_break)
|
||||
spf.metadata["story_page_id"] = self._current_page
|
||||
self._current_page += 1
|
||||
await self.push_frame(spf)
|
||||
await self.push_frame(StoryPageFrame(before_break))
|
||||
# await self.push_frame(sounds["ding"])
|
||||
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
|
||||
|
||||
|
||||
@@ -91,13 +91,6 @@ class ControlFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetadataFrame(ControlFrame):
|
||||
"""Used to set default metadata for downstream processors to apply to newly
|
||||
created frames such as a frame_group_id.
|
||||
"""
|
||||
|
||||
|
||||
#
|
||||
# Mixins
|
||||
#
|
||||
@@ -404,12 +397,26 @@ class TransportMessageFrame(DataFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputDTMFFrame(DataFrame):
|
||||
"""A DTMF button input"""
|
||||
class DTMFFrame(DataFrame):
|
||||
"""A DTMF button frame"""
|
||||
|
||||
button: KeypadEntry
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputDTMFFrame(DTMFFrame):
|
||||
"""A DTMF button input"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputDTMFFrame(DTMFFrame):
|
||||
"""A DTMF button output"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
#
|
||||
# System frames
|
||||
#
|
||||
|
||||
@@ -16,7 +16,6 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
MetadataFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -85,9 +84,6 @@ class FrameProcessor:
|
||||
# exception to this rule. This creates this task.
|
||||
self.__push_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
# This enables an input frame's metadata to be copied to output frames
|
||||
self._input_frame_metadata = {}
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._id
|
||||
@@ -228,12 +224,6 @@ class FrameProcessor:
|
||||
self.__input_event.set()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
# System frames skip the queue and blow up determinism
|
||||
if isinstance(frame, MetadataFrame):
|
||||
self._input_frame_metadata = frame.metadata
|
||||
# print(
|
||||
# f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}"
|
||||
# )
|
||||
if isinstance(frame, StartFrame):
|
||||
self._clock = frame.clock
|
||||
self._task_manager = frame.task_manager
|
||||
@@ -258,14 +248,6 @@ class FrameProcessor:
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
else:
|
||||
# print(
|
||||
# f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}"
|
||||
# )
|
||||
new_metadata = self._input_frame_metadata | frame.metadata
|
||||
frame.metadata = new_metadata
|
||||
# print(
|
||||
# f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}"
|
||||
# )
|
||||
await self.__push_queue.put((frame, direction))
|
||||
|
||||
def event_handler(self, event_name: str):
|
||||
|
||||
@@ -250,9 +250,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
# because we are likely still playing out audio and need the
|
||||
# timestamp to set send context frames.
|
||||
self._context_id = None
|
||||
await self.add_word_timestamps(
|
||||
[("TTSStoppedFrame", 0), ("LLMFullResponseEndFrame", 0), ("Reset", 0)]
|
||||
)
|
||||
await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)])
|
||||
elif msg["type"] == "timestamps":
|
||||
await self.add_word_timestamps(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
|
||||
|
||||
@@ -284,7 +284,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
|
||||
self._started = False
|
||||
if isinstance(frame, TTSStoppedFrame):
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
|
||||
await self.add_word_timestamps([("Reset", 0)])
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -116,6 +116,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
model: str,
|
||||
api_key=None,
|
||||
base_url=None,
|
||||
organization=None,
|
||||
project=None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -131,12 +133,16 @@ class BaseOpenAILLMService(LLMService):
|
||||
"extra": params.extra if isinstance(params.extra, dict) else {},
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs)
|
||||
self._client = self.create_client(
|
||||
api_key=api_key, base_url=base_url, organization=organization, project=project, **kwargs
|
||||
)
|
||||
|
||||
def create_client(self, api_key=None, base_url=None, **kwargs):
|
||||
def create_client(self, api_key=None, base_url=None, organization=None, project=None, **kwargs):
|
||||
return AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url=base_url,
|
||||
organization=organization,
|
||||
project=project,
|
||||
http_client=DefaultAsyncHttpxClient(
|
||||
limits=httpx.Limits(
|
||||
max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None
|
||||
|
||||
Reference in New Issue
Block a user