Compare commits

..

8 Commits

Author SHA1 Message Date
Chad Bailey
b8e2227a21 remove extra LLMFullResponseEndFrame 2025-02-03 22:41:04 +00:00
Aleix Conchillo Flaqué
6c7474e1a2 frames: add pass to DTMFFrames 2025-01-31 18:37:40 -08:00
Aleix Conchillo Flaqué
95f0dbf3f3 CHANGELOG.md: task.cancel() and EndFrame clarification 2025-01-31 18:35:35 -08:00
Aleix Conchillo Flaqué
11aeb68ddb frames: fix type s/OuputDTMFFrame/OutputDTMFFrame/ 2025-01-31 18:28:38 -08:00
Aleix Conchillo Flaqué
a43c102fc8 Merge pull request #1064 from jcbjoe/jg/additional_dtmf_frames
Added: Additional DTMF frames
2025-01-31 18:25:08 -08:00
Mark Backman
16b49bdce6 Merge pull request #1122 from pipecat-ai/mb/openai-org-id
Add organization and project level auth in OpenAILLMService
2025-01-31 14:35:26 -05:00
Mark Backman
41477c8f78 Add organization and project level auth in OpenAILLMService 2025-01-31 14:27:25 -05:00
Joe Garlick
b72504f1cb Added: Additional DTMF frames 2025-01-22 13:47:23 +00:00
8 changed files with 42 additions and 133 deletions

View File

@@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Added `organization` and `project` level authentication to
`OpenAILLMService`.
- Improved the language checking logic in `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService` to properly handle language codes based on model
compatibility, with appropriate warnings when language codes cannot be
@@ -50,6 +53,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
contain a combination of function calls, function call responses, system
messages, or just messages.
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
`OutputDTMFFrame` frame.
### Fixed
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
@@ -67,11 +73,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Other
- Updated all examples to use `task.cancel()` instead of pushing an `EndFrame`
when a participant leaves/disconnects. If you push an `EndFrame` this will
cause the bot to run through everything that is internally queued (which could
take seconds). Instead, if a participant disconnects there is nothing else to
be sent and therefore we should stop immediately.
- Updated examples to use `task.cancel()` to immediately exit the example when a
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
an `EndFrame` causes the bot to run through everything that is internally
queued (which could take several seconds). Note that using `task.cancel()` might
not always be the best option, and pushing an `EndFrame` could still be
desirable to make sure the whole pipeline is flushed.
## [0.0.54] - 2025-01-27

View File

@@ -12,33 +12,19 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from processors import StoryImageFrame, StoryImageProcessor, StoryPageFrame, StoryProcessor
from processors import StoryImageProcessor, StoryProcessor
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
from utils.helpers import load_images, load_sounds
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
AudioRawFrame,
EndFrame,
Frame,
ImageRawFrame,
MetadataFrame,
SystemFrame,
TextFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.google import GoogleImageGenService, GoogleLLMService
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
@@ -54,62 +40,6 @@ sounds = load_sounds(["listening.wav"])
images = load_images(["book1.png", "book2.png"])
class DebugObserver(BaseObserver):
    """Observer that logs selected frame types to the console as they are pushed.

    Logs every pushed frame that is an instance of one of:
    - ImageRawFrame
    - StoryPageFrame
    - StoryImageFrame
    - TextFrame
    - TTSStartedFrame
    - TTSStoppedFrame
    - MetadataFrame

    This allows you to see the frame flow from processor to processor through
    the pipeline for these frames. Frames of any other type are ignored.

    Log format:
        ⚡ [FRAME TYPE] FRAME: [source processor] → [destination processor] at [timestamp]s, metadata: [frame metadata]
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        """Log one line for `frame` if it is one of the watched frame types.

        Args:
            src: Processor that pushed the frame.
            dst: Processor that the frame is being pushed to.
            frame: The frame being pushed.
            direction: Direction the frame travels; selects the arrow glyph.
            timestamp: Time of the push; divided by 1e9 to obtain seconds
                (i.e. treated as nanoseconds).
        """
        # Watched types, checked in order; the first match wins. The order is
        # kept exactly as in the original if/elif chain so that a frame which
        # matches several entries keeps the same label.
        # NOTE(review): the Story* frames presumably derive from other pipecat
        # frame types -- confirm before reordering.
        watched = (
            (ImageRawFrame, "RAW IMAGE FRAME"),
            (StoryPageFrame, "STORY PAGE FRAME"),
            (StoryImageFrame, "STORY IMAGE FRAME"),
            (TextFrame, "TEXT FRAME"),
            (TTSStartedFrame, "TTS STARTED FRAME"),
            (TTSStoppedFrame, "TTS STOPPED FRAME"),
            (MetadataFrame, "METADATA FRAME"),
        )

        label = next((text for kind, text in watched if isinstance(frame, kind)), None)
        if label is None:
            return

        # Convert timestamp to seconds for readability
        time_sec = timestamp / 1_000_000_000

        # Direction arrow: points right for downstream, left otherwise.
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        logger.info(
            f"⚡ {label}: {src} {arrow} {dst} at {time_sec:.2f}s, metadata: {frame.metadata}"
        )
async def main(room_url, token=None):
async with aiohttp.ClientSession() as session:
# -------------- Transport --------------- #
@@ -160,18 +90,13 @@ async def main(room_url, token=None):
runner = PipelineRunner()
logger.debug("Waiting for participant...")
after = FrameLogger("After", "red", ignored_frame_types=[SystemFrame, AudioRawFrame])
before = FrameLogger("Before", "cyan", ignored_frame_types=[SystemFrame, AudioRawFrame])
main_pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm_service,
story_processor,
# SyncParallelPipeline([image_processor], [tts_service]),
before,
image_processor,
after,
tts_service,
transport.output(),
context_aggregator.assistant(),
@@ -184,7 +109,6 @@ async def main(room_url, token=None):
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[DebugObserver()],
),
)

View File

@@ -17,7 +17,6 @@ from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
MetadataFrame,
TextFrame,
UserStoppedSpeakingFrame,
)
@@ -76,7 +75,6 @@ class StoryImageProcessor(FrameProcessor):
if isinstance(frame, StoryPageFrame):
# Special syntax for the first page
print(f"!!! generating image for story page frame # {frame.metadata['story_page_id']}")
if self.pages == []:
prompt = FIRST_IMAGE_PROMPT % frame.text
else:
@@ -100,14 +98,10 @@ class StoryImageProcessor(FrameProcessor):
async for i in self._image_gen_service.run_image_gen(
IMAGE_GEN_PROMPT % image_description
):
print(
f"@@@ about to push a storyimageframe, input metadata is {self._input_frame_metadata}"
)
await self.push_frame(i)
except TimeoutError:
logger.debug("Image gen timeout")
pass
print(f"### past image gen try block, source frame is {frame.name}")
await self.stop_ttfb_metrics()
# Push the StoryPageFrame so it gets TTS
await self.push_frame(frame)
@@ -134,7 +128,6 @@ class StoryProcessor(FrameProcessor):
self._messages = messages
self._text = ""
self._story = story
self._current_page = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -155,7 +148,6 @@ class StoryProcessor(FrameProcessor):
# Driven by the prompt, the LLM should have asked the user for input
elif isinstance(frame, LLMFullResponseEndFrame):
# We use a different frame type so that image generation does not ingest it
await self.push_frame(MetadataFrame())
await self.push_frame(StoryPromptFrame(self._text))
self._text = ""
await self.push_frame(frame)
@@ -195,14 +187,7 @@ class StoryProcessor(FrameProcessor):
if len(before_break) > 2:
self._story.append(before_break)
mf = MetadataFrame()
mf.metadata = {"story_page_id": self._current_page}
await self.push_frame(mf)
spf = StoryPageFrame(before_break)
spf.metadata["story_page_id"] = self._current_page
self._current_page += 1
await self.push_frame(spf)
await self.push_frame(StoryPageFrame(before_break))
# await self.push_frame(sounds["ding"])
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))

View File

@@ -91,13 +91,6 @@ class ControlFrame(Frame):
pass
@dataclass
class MetadataFrame(ControlFrame):
"""Used to set default metadata for downstream processors to apply to newly
created frames such as a frame_group_id.
"""
#
# Mixins
#
@@ -404,12 +397,26 @@ class TransportMessageFrame(DataFrame):
@dataclass
class InputDTMFFrame(DataFrame):
"""A DTMF button input"""
class DTMFFrame(DataFrame):
"""A DTMF button frame"""
button: KeypadEntry
@dataclass
class InputDTMFFrame(DTMFFrame):
"""A DTMF button input"""
pass
@dataclass
class OutputDTMFFrame(DTMFFrame):
"""A DTMF button output"""
pass
#
# System frames
#

View File

@@ -16,7 +16,6 @@ from pipecat.frames.frames import (
CancelFrame,
ErrorFrame,
Frame,
MetadataFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -85,9 +84,6 @@ class FrameProcessor:
# exception to this rule. This creates this task.
self.__push_frame_task: Optional[asyncio.Task] = None
# This enables an input frame's metadata to be copied to output frames
self._input_frame_metadata = {}
@property
def id(self) -> int:
return self._id
@@ -228,12 +224,6 @@ class FrameProcessor:
self.__input_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
# System frames skip the queue and blow up determinism
if isinstance(frame, MetadataFrame):
self._input_frame_metadata = frame.metadata
# print(
# f"!!! PROCESS: I am {self._name}, input frame is a {frame.name}, metadata is {self._input_frame_metadata}"
# )
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._task_manager = frame.task_manager
@@ -258,14 +248,6 @@ class FrameProcessor:
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
# print(
# f"!!! PUSH: I am {self._name}, input frame is a {frame.name}, combining input frame metadata: {self._input_frame_metadata} with frame metadata: {frame.metadata}"
# )
new_metadata = self._input_frame_metadata | frame.metadata
frame.metadata = new_metadata
# print(
# f"!!! PUSH2: I am {self._name}, input frame is a {frame.name}, frame metadata is now {frame.metadata}"
# )
await self.__push_queue.put((frame, direction))
def event_handler(self, event_name: str):

View File

@@ -250,9 +250,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
# because we are likely still playing out audio and need the
# timestamp to set send context frames.
self._context_id = None
await self.add_word_timestamps(
[("TTSStoppedFrame", 0), ("LLMFullResponseEndFrame", 0), ("Reset", 0)]
)
await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)])
elif msg["type"] == "timestamps":
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))

View File

@@ -284,7 +284,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
await self.add_word_timestamps([("Reset", 0)])
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

View File

@@ -116,6 +116,8 @@ class BaseOpenAILLMService(LLMService):
model: str,
api_key=None,
base_url=None,
organization=None,
project=None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -131,12 +133,16 @@ class BaseOpenAILLMService(LLMService):
"extra": params.extra if isinstance(params.extra, dict) else {},
}
self.set_model_name(model)
self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs)
self._client = self.create_client(
api_key=api_key, base_url=base_url, organization=organization, project=project, **kwargs
)
def create_client(self, api_key=None, base_url=None, **kwargs):
def create_client(self, api_key=None, base_url=None, organization=None, project=None, **kwargs):
return AsyncOpenAI(
api_key=api_key,
base_url=base_url,
organization=organization,
project=project,
http_client=DefaultAsyncHttpxClient(
limits=httpx.Limits(
max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None