Compare commits

..

1 Commits

Author SHA1 Message Date
Mark Backman
d331649736 Add context aggregation to Google Gemini LLM 2024-09-30 09:29:54 -04:00
30 changed files with 582 additions and 811 deletions

View File

@@ -9,8 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added Google TTS service and corresponding foundational example `07n-interruptible-google.py`
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
- Added InputParams to Azure TTS service.
@@ -48,10 +46,15 @@ async def on_connected(processor):
frames. To achieve that, each frame processor should only output frames from a
single task.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.
In this version we introduce synchronous and asynchronous frame
processors. The synchronous processors push output frames from the same task
that they receive input frames, and therefore only pushing frames from one
task. Asynchronous frame processors can have internal tasks to perform things
asynchronously (e.g. receiving data from a websocket) but they also have a
single task where they push frames from.
By default, frame processors are synchronous. To change a frame processor to
asynchronous you only need to pass `sync=False` to the base class constructor.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
@@ -63,7 +66,9 @@ async def on_connected(processor):
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added `CartesiaHttpTTSService`.
- Added `CartesiaHttpTTSService`. This is a synchronous frame processor
(i.e. given an input text frame it will wait for the whole output before
returning).
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
@@ -86,9 +91,6 @@ async def on_connected(processor):
### Changed
- Updated individual update settings frame classes into a single UpdateSettingsFrame
class for STT, LLM, and TTS.
- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
@@ -103,9 +105,8 @@ async def on_connected(processor):
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
complete. This is achieved by ensuring all the processors in each of the
internal pipelines are synchronous.
- `StartFrame` is back a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be

View File

@@ -86,13 +86,13 @@ async def main():
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
@@ -107,10 +107,8 @@ async def main():
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
# Note that `SyncParallelPipeline` requires all processors in it to be
# synchronous (which is the default for most processors).
pipeline = Pipeline(
[
llm, # LLM

View File

@@ -121,10 +121,8 @@ async def main():
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
# Note that `SyncParallelPipeline` requires all processors in it to
# be synchronous (which is the default for most processors).
pipeline = Pipeline(
[
llm, # LLM

View File

@@ -1,100 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = GoogleTTSService(
credentials=os.getenv("GOOGLE_CREDENTIALS"),
voice_id="en-US-Neural2-J",
params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -5,10 +5,14 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -21,12 +25,6 @@ from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)

View File

@@ -5,15 +5,10 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -24,6 +19,14 @@ from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -31,12 +34,7 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
await llm.push_frame(TextFrame("Let me check on that. "))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
await llm.push_frame(TextFrame("Let me check on that."))
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
@@ -108,11 +106,11 @@ async def main():
pipeline = Pipeline(
[
# fl_in,
fl_in,
transport.input(),
context_aggregator.user(),
llm,
# fl_out,
fl_out,
tts,
transport.output(),
context_aggregator.assistant(),

View File

@@ -44,7 +44,7 @@ elevenlabs = [ "websockets~=12.0" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
google = [ "google-generativeai~=0.7.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]

View File

@@ -4,8 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple, Union
from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
@@ -527,45 +528,113 @@ class UserImageRequestFrame(ControlFrame):
@dataclass
class LLMUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update LLM settings."""
class LLMModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM model."""
model: Optional[str] = None
temperature: Optional[float] = None
top_k: Optional[int] = None
top_p: Optional[float] = None
frequency_penalty: Optional[float] = None
presence_penalty: Optional[float] = None
max_tokens: Optional[int] = None
seed: Optional[int] = None
extra: dict = field(default_factory=dict)
model: str
@dataclass
class TTSUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update TTS settings."""
class LLMTemperatureUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM temperature."""
model: Optional[str] = None
voice: Optional[str] = None
language: Optional[Language] = None
speed: Optional[Union[str, float]] = None
emotion: Optional[List[str]] = None
engine: Optional[str] = None
pitch: Optional[str] = None
rate: Optional[str] = None
volume: Optional[str] = None
emphasis: Optional[str] = None
style: Optional[str] = None
style_degree: Optional[str] = None
role: Optional[str] = None
temperature: float
@dataclass
class STTUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update STT settings."""
class LLMTopKUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM top_k."""
model: Optional[str] = None
language: Optional[Language] = None
top_k: int
@dataclass
class LLMTopPUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM top_p."""
top_p: float
@dataclass
class LLMFrequencyPenaltyUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM frequency
penalty.
"""
frequency_penalty: float
@dataclass
class LLMPresencePenaltyUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM presence
penalty.
"""
presence_penalty: float
@dataclass
class LLMMaxTokensUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM max tokens."""
max_tokens: int
@dataclass
class LLMSeedUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM seed."""
seed: int
@dataclass
class LLMExtraUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM extra params."""
extra: dict
@dataclass
class TTSModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the TTS model."""
model: str
@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice."""
voice: str
@dataclass
class TTSLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS language and
optional voice.
"""
language: Language
@dataclass
class STTModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the STT model and optional
language.
"""
model: str
@dataclass
class STTLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to STT language."""
language: Language
@dataclass
@@ -585,7 +654,6 @@ class FunctionCallResultFrame(DataFrame):
tool_call_id: str
arguments: str
result: Any
run_llm: bool = True
@dataclass

View File

@@ -9,20 +9,14 @@ import asyncio
from itertools import chain
from typing import List
from pipecat.frames.frames import ControlFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import Frame
from loguru import logger
class SyncFrame(ControlFrame):
"""This frame is used to know when the internal pipelines have finished."""
pass
class Source(FrameProcessor):
def __init__(self, upstream_queue: asyncio.Queue):
super().__init__()
@@ -73,16 +67,13 @@ class SyncParallelPipeline(BasePipeline):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
# We add a source at the beginning of the pipeline and a sink at the end.
up_queue = asyncio.Queue()
down_queue = asyncio.Queue()
source = Source(up_queue)
sink = Sink(down_queue)
source = Source(self._up_queue)
sink = Sink(self._down_queue)
processors: List[FrameProcessor] = [source] + processors + [sink]
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Keep track of sources and sinks.
self._sources.append(source)
self._sinks.append(sink)
# Create pipeline
pipeline = Pipeline(processors)
@@ -103,46 +94,17 @@ class SyncParallelPipeline(BasePipeline):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# The last processor of each pipeline needs to be synchronous otherwise
# this element won't work. Since, we know it should be synchronous we
# push a SyncFrame. Since frames are ordered we know this frame will be
# pushed after the synchronous processor has pushed its data allowing us
# to synchrnonize all the internal pipelines by waiting for the
# SyncFrame in all of them.
async def wait_for_sync(
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
):
processor = obj["processor"]
queue = obj["queue"]
await processor.process_frame(frame, direction)
# If we have a system frame we don't need to synchrnonize anything.
if isinstance(frame, SystemFrame):
await main_queue.put(frame)
else:
await processor.process_frame(SyncFrame(), direction)
frame = await queue.get()
while not isinstance(frame, SyncFrame):
await main_queue.put(frame)
queue.task_done()
frame = await queue.get()
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
)
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
await asyncio.gather(
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
)
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
seen_ids = set()
while not self._up_queue.empty():
frame = await self._up_queue.get()
if frame.id not in seen_ids:
if frame and frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id)
self._up_queue.task_done()
@@ -150,7 +112,7 @@ class SyncParallelPipeline(BasePipeline):
seen_ids = set()
while not self._down_queue.empty():
frame = await self._down_queue.get()
if frame.id not in seen_ids:
if frame and frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id)
self._down_queue.task_done()

View File

@@ -69,19 +69,6 @@ class Source(FrameProcessor):
await self._up_queue.put(StopTaskFrame())
class Sink(FrameProcessor):
def __init__(self, down_queue: asyncio.Queue):
super().__init__()
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)
class PipelineTask:
def __init__(
self,
@@ -97,16 +84,12 @@ class PipelineTask:
self._params = params
self._finished = False
self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
self._push_queue = asyncio.Queue()
self._up_queue = asyncio.Queue()
self._source = Source(self._up_queue)
self._source.link(pipeline)
self._sink = Sink(self._down_queue)
pipeline.link(self._sink)
def has_finished(self):
return self._finished
@@ -120,19 +103,19 @@ class PipelineTask:
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
self._process_push_task.cancel()
self._process_down_task.cancel()
self._process_up_task.cancel()
await self._process_push_task
await self._process_down_task
await self._process_up_task
async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_push_task = asyncio.create_task(self._process_push_queue())
await asyncio.gather(self._process_up_task, self._process_push_task)
self._process_down_task = asyncio.create_task(self._process_down_queue())
await asyncio.gather(self._process_up_task, self._process_down_task)
self._finished = True
async def queue_frame(self, frame: Frame):
await self._push_queue.put(frame)
await self._down_queue.put(frame)
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
if isinstance(frames, AsyncIterable):
@@ -150,7 +133,7 @@ class PipelineTask:
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)
async def _process_push_queue(self):
async def _process_down_queue(self):
self._clock.start()
start_frame = StartFrame(
@@ -171,13 +154,11 @@ class PipelineTask:
should_cleanup = True
while running:
try:
frame = await self._push_queue.get()
frame = await self._down_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame))
should_cleanup = not isinstance(frame, StopTaskFrame)
self._push_queue.task_done()
self._down_queue.task_done()
except asyncio.CancelledError:
break
# Cleanup only if we need to.
@@ -188,12 +169,6 @@ class PipelineTask:
self._process_up_task.cancel()
await self._process_up_task
async def _wait_for_endframe(self):
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
# so just wait for it. In the future we might do something else here,
# but for now this is fine.
await self._down_queue.get()
async def _process_up_queue(self):
while True:
try:

View File

@@ -133,7 +133,6 @@ class OpenAILLMContext:
tool_call_id: str,
arguments: str,
llm: FrameProcessor,
run_llm: bool = True,
) -> None:
# Push a SystemFrame downstream. This frame will let our assistant context aggregator
# know that we are in the middle of a function call. Some contexts/aggregators may
@@ -154,7 +153,6 @@ class OpenAILLMContext:
tool_call_id=tool_call_id,
arguments=arguments,
result=result,
run_llm=run_llm,
)
)

View File

@@ -37,6 +37,7 @@ class FrameProcessor:
*,
name: str | None = None,
metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs,
):
@@ -46,6 +47,7 @@ class FrameProcessor:
self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
self._sync = sync
self._event_handlers: dict = {}
@@ -64,8 +66,11 @@ class FrameProcessor:
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
# the exception to this rule. This create this task.
self.__create_push_task()
# the exception to this rule.
#
# This create this task.
if not self._sync:
self.__create_push_task()
@property
def interruptions_allowed(self):
@@ -162,7 +167,7 @@ class FrameProcessor:
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if isinstance(frame, SystemFrame):
if self._sync or isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
@@ -189,12 +194,13 @@ class FrameProcessor:
#
async def _start_interruption(self):
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task
if not self._sync:
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task
# Create a new queue and task.
self.__create_push_task()
# Create a new queue and task.
self.__create_push_task()
async def _stop_interruption(self):
# Nothing to do right now.

View File

@@ -516,7 +516,7 @@ class RTVIProcessor(FrameProcessor):
params: RTVIProcessorParams = RTVIProcessorParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._config = config
self._params = params

View File

@@ -44,7 +44,7 @@ class GStreamerPipelineSource(FrameProcessor):
clock_sync: bool = True
def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._out_params = out_params

View File

@@ -26,7 +26,7 @@ class IdleFrameProcessor(FrameProcessor):
types: List[type] = [],
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._callback = callback
self._timeout = timeout

View File

@@ -31,7 +31,7 @@ class UserIdleProcessor(FrameProcessor):
timeout: float,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._callback = callback
self._timeout = timeout

View File

@@ -7,10 +7,9 @@
import asyncio
import io
import wave
from abc import abstractmethod
from typing import AsyncGenerator, List, Optional, Tuple, Union
from loguru import logger
from abc import abstractmethod
from typing import AsyncGenerator, List, Optional, Tuple
from pipecat.frames.frames import (
AudioRawFrame,
@@ -19,26 +18,31 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
STTLanguageUpdateFrame,
STTModelUpdateFrame,
StartFrame,
StartInterruptionFrame,
STTUpdateSettingsFrame,
TextFrame,
TTSAudioRawFrame,
TTSLanguageUpdateFrame,
TTSModelUpdateFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSUpdateSettingsFrame,
TTSVoiceUpdateFrame,
TextFrame,
UserImageRequestFrame,
VisionImageRawFrame,
)
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.utils.audio import calculate_audio_volume
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import seconds_to_nanoseconds
from pipecat.utils.utils import exp_smoothing
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from loguru import logger
class AIService(FrameProcessor):
@@ -110,13 +114,7 @@ class LLMService(AIService):
return function_name in self._callbacks.keys()
async def call_function(
self,
*,
context: OpenAILLMContext,
tool_call_id: str,
function_name: str,
arguments: str,
run_llm: bool,
self, *, context: OpenAILLMContext, tool_call_id: str, function_name: str, arguments: str
) -> None:
f = None
if function_name in self._callbacks.keys():
@@ -126,12 +124,7 @@ class LLMService(AIService):
else:
return None
await context.call_function(
f,
function_name=function_name,
tool_call_id=tool_call_id,
arguments=arguments,
llm=self,
run_llm=run_llm,
f, function_name=function_name, tool_call_id=tool_call_id, arguments=arguments, llm=self
)
# QUESTION FOR CB: maybe this isn't needed anymore?
@@ -155,10 +148,6 @@ class TTSService(AIService):
# if True, TTSService will push TextFrames and LLMFullResponseEndFrames,
# otherwise subclass must do it
push_text_frames: bool = True,
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
# TTS output sample rate
sample_rate: int = 16000,
**kwargs,
@@ -166,14 +155,8 @@ class TTSService(AIService):
super().__init__(**kwargs)
self._aggregate_sentences: bool = aggregate_sentences
self._push_text_frames: bool = push_text_frames
self._push_stop_frames: bool = push_stop_frames
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._sample_rate: int = sample_rate
self._stop_frame_task: Optional[asyncio.Task] = None
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
self._current_sentence: str = ""
self._sample_rate: int = sample_rate
@property
def sample_rate(self) -> int:
@@ -191,112 +174,13 @@ class TTSService(AIService):
async def set_language(self, language: Language):
pass
@abstractmethod
async def set_speed(self, speed: Union[str, float]):
pass
@abstractmethod
async def set_emotion(self, emotion: List[str]):
pass
@abstractmethod
async def set_engine(self, engine: str):
pass
@abstractmethod
async def set_pitch(self, pitch: str):
pass
@abstractmethod
async def set_rate(self, rate: str):
pass
@abstractmethod
async def set_volume(self, volume: str):
pass
@abstractmethod
async def set_emphasis(self, emphasis: str):
pass
@abstractmethod
async def set_style(self, style: str):
pass
@abstractmethod
async def set_style_degree(self, style_degree: str):
pass
@abstractmethod
async def set_role(self, role: str):
pass
@abstractmethod
async def flush_audio(self):
pass
# Converts the text to audio.
@abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
pass
async def start(self, frame: StartFrame):
await super().start(frame)
if self._push_stop_frames:
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def say(self, text: str):
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
await self.flush_audio()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, StartInterruptionFrame):
await self._handle_interruption(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
sentence = self._current_sentence
self._current_sentence = ""
await self._push_tts_frames(sentence)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
elif isinstance(frame, TTSSpeakFrame):
await self._push_tts_frames(frame.text)
await self.flush_audio()
elif isinstance(frame, TTSUpdateSettingsFrame):
await self._update_tts_settings(frame)
else:
await self.push_frame(frame, direction)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame)
or isinstance(frame, TTSStartedFrame)
or isinstance(frame, TTSAudioRawFrame)
or isinstance(frame, TTSStoppedFrame)
):
await self._stop_frame_queue.put(frame)
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
self._current_sentence = ""
@@ -328,33 +212,91 @@ class TTSService(AIService):
# interrupted, the text is not added to the assistant context.
await self.push_frame(TextFrame(text))
async def _update_tts_settings(self, frame: TTSUpdateSettingsFrame):
if frame.model is not None:
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, StartInterruptionFrame):
await self._handle_interruption(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
sentence = self._current_sentence
self._current_sentence = ""
await self._push_tts_frames(sentence)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
elif isinstance(frame, TTSSpeakFrame):
await self._push_tts_frames(frame.text)
elif isinstance(frame, TTSModelUpdateFrame):
await self.set_model(frame.model)
if frame.voice is not None:
elif isinstance(frame, TTSVoiceUpdateFrame):
await self.set_voice(frame.voice)
if frame.language is not None:
elif isinstance(frame, TTSLanguageUpdateFrame):
await self.set_language(frame.language)
if frame.speed is not None:
await self.set_speed(frame.speed)
if frame.emotion is not None:
await self.set_emotion(frame.emotion)
if frame.engine is not None:
await self.set_engine(frame.engine)
if frame.pitch is not None:
await self.set_pitch(frame.pitch)
if frame.rate is not None:
await self.set_rate(frame.rate)
if frame.volume is not None:
await self.set_volume(frame.volume)
if frame.emphasis is not None:
await self.set_emphasis(frame.emphasis)
if frame.style is not None:
await self.set_style(frame.style)
if frame.style_degree is not None:
await self.set_style_degree(frame.style_degree)
if frame.role is not None:
await self.set_role(frame.role)
else:
await self.push_frame(frame, direction)
class AsyncTTSService(TTSService):
def __init__(
self,
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
**kwargs,
):
super().__init__(sync=False, **kwargs)
self._push_stop_frames: bool = push_stop_frames
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._stop_frame_task: Optional[asyncio.Task] = None
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
@abstractmethod
async def flush_audio(self):
pass
async def say(self, text: str):
await super().say(text)
await self.flush_audio()
async def start(self, frame: StartFrame):
await super().start(frame)
if self._push_stop_frames:
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSSpeakFrame):
await self.flush_audio()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame)
or isinstance(frame, TTSStartedFrame)
or isinstance(frame, TTSAudioRawFrame)
or isinstance(frame, TTSStoppedFrame)
):
await self._stop_frame_queue.put(frame)
async def _stop_frame_handler(self):
try:
@@ -376,7 +318,7 @@ class TTSService(AIService):
pass
class WordTTSService(TTSService):
class AsyncWordTTSService(AsyncTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._initial_word_timestamp = -1
@@ -455,12 +397,6 @@ class STTService(AIService):
"""Returns transcript as a string"""
pass
async def _update_stt_settings(self, frame: STTUpdateSettingsFrame):
if frame.model is not None:
await self.set_model(frame.model)
if frame.language is not None:
await self.set_language(frame.language)
async def process_audio_frame(self, frame: AudioRawFrame):
await self.process_generator(self.run_stt(frame.audio))
@@ -472,8 +408,10 @@ class STTService(AIService):
# In this service we accumulate audio internally and at the end we
# push a TextFrame. We don't really want to push audio frames down.
await self.process_audio_frame(frame)
elif isinstance(frame, STTUpdateSettingsFrame):
await self._update_stt_settings(frame)
elif isinstance(frame, STTModelUpdateFrame):
await self.set_model(frame.model)
elif isinstance(frame, STTLanguageUpdateFrame):
await self.set_language(frame.language)
else:
await self.push_frame(frame, direction)

View File

@@ -5,47 +5,47 @@
#
import base64
import copy
import io
import json
import re
from asyncio import CancelledError
from dataclasses import dataclass
import io
import copy
from typing import Any, Dict, List, Optional
from loguru import logger
from dataclasses import dataclass
from PIL import Image
from asyncio import CancelledError
import re
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMEnablePromptCachingFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMUpdateSettingsFrame,
StartInterruptionFrame,
LLMModelUpdateFrame,
TextFrame,
UserImageRawFrame,
UserImageRequestFrame,
VisionImageRawFrame,
UserImageRequestFrame,
UserImageRawFrame,
LLMMessagesFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
FunctionCallResultFrame,
FunctionCallInProgressFrame,
StartInterruptionFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.llm_response import (
LLMUserContextAggregator,
LLMAssistantContextAggregator,
)
from loguru import logger
try:
from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven
from anthropic import AsyncAnthropic, NOT_GIVEN, NotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -279,21 +279,6 @@ class AnthropicLLMService(LLMService):
cache_read_input_tokens=cache_read_input_tokens,
)
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
if frame.model is not None:
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
if frame.max_tokens is not None:
await self.set_max_tokens(frame.max_tokens)
if frame.temperature is not None:
await self.set_temperature(frame.temperature)
if frame.top_k is not None:
await self.set_top_k(frame.top_k)
if frame.top_p is not None:
await self.set_top_p(frame.top_p)
if frame.extra:
await self.set_extra(frame.extra)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -308,8 +293,9 @@ class AnthropicLLMService(LLMService):
# UserImageRawFrames coming through the pipeline and add them
# to the context.
context = AnthropicLLMContext.from_image_frame(frame)
elif isinstance(frame, LLMUpdateSettingsFrame):
await self._update_settings(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
elif isinstance(frame, LLMEnablePromptCachingFrame):
logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
self._enable_prompt_caching_beta = frame.enable

View File

@@ -41,10 +41,7 @@ try:
SpeechRecognizer,
SpeechSynthesizer,
)
from azure.cognitiveservices.speech.audio import (
AudioStreamFormat,
PushAudioInputStream,
)
from azure.cognitiveservices.speech.audio import AudioStreamFormat, PushAudioInputStream
from azure.cognitiveservices.speech.dialog import AudioConfig
from openai import AsyncAzureOpenAI
except ModuleNotFoundError as e:
@@ -76,7 +73,7 @@ class AzureLLMService(BaseOpenAILLMService):
class AzureTTSService(TTSService):
class InputParams(BaseModel):
emphasis: Optional[str] = None
language: Optional[str] = "en-US"
language_code: Optional[str] = "en-US"
pitch: Optional[str] = None
rate: Optional[str] = "1.05"
role: Optional[str] = None
@@ -108,7 +105,7 @@ class AzureTTSService(TTSService):
def _construct_ssml(self, text: str) -> str:
ssml = (
f"<speak version='1.0' xml:lang='{self._params.language}' "
f"<speak version='1.0' xml:lang='{self._params.language_code}' "
"xmlns='http://www.w3.org/2001/10/synthesis' "
"xmlns:mstts='http://www.w3.org/2001/mstts'>"
f"<voice name='{self._voice}'>"
@@ -158,9 +155,9 @@ class AzureTTSService(TTSService):
logger.debug(f"Setting TTS emphasis to: [{emphasis}]")
self._params.emphasis = emphasis
async def set_language(self, language: str):
logger.debug(f"Setting TTS language code to: [{language}]")
self._params.language = language
async def set_language_code(self, language_code: str):
logger.debug(f"Setting TTS language code to: [{language_code}]")
self._params.language_code = language_code
async def set_pitch(self, pitch: str):
logger.debug(f"Setting TTS pitch to: [{pitch}]")
@@ -190,7 +187,7 @@ class AzureTTSService(TTSService):
valid_params = {
"voice": self.set_voice,
"emphasis": self.set_emphasis,
"language_code": self.set_language,
"language_code": self.set_language_code,
"pitch": self.set_pitch,
"rate": self.set_rate,
"role": self.set_role,

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language
from pipecat.services.ai_services import WordTTSService, TTSService
from pipecat.services.ai_services import AsyncWordTTSService, TTSService
from loguru import logger
@@ -61,7 +61,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
return None
class CartesiaTTSService(WordTTSService):
class CartesiaTTSService(AsyncWordTTSService):
class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000

View File

@@ -23,7 +23,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import WordTTSService
from pipecat.services.ai_services import AsyncWordTTSService
# See .env.example for ElevenLabs configuration needed
try:
@@ -70,9 +70,9 @@ def calculate_word_times(
return word_times
class ElevenLabsTTSService(WordTTSService):
class ElevenLabsTTSService(AsyncWordTTSService):
class InputParams(BaseModel):
language: Optional[str] = None
language_code: Optional[str] = None
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
optimize_streaming_latency: Optional[str] = None
stability: Optional[float] = None
@@ -229,13 +229,13 @@ class ElevenLabsTTSService(WordTTSService):
if self._params.optimize_streaming_latency:
url += f"&optimize_streaming_latency={self._params.optimize_streaming_latency}"
# language can only be used with the 'eleven_turbo_v2_5' model
if self._params.language:
# language_code can only be used with the 'eleven_turbo_v2_5' model
if self._params.language_code:
if model == "eleven_turbo_v2_5":
url += f"&language_code={self._params.language}"
url += f"&language_code={self._params.language_code}"
else:
logger.debug(
f"Language code [{self._params.language}] not applied. Language codes can only be used with the 'eleven_turbo_v2_5' model."
f"Language code [{self._params.language_code}] not applied. Language codes can only be used with the 'eleven_turbo_v2_5' model."
)
self._websocket = await websockets.connect(url)

View File

@@ -51,7 +51,7 @@ class GladiaSTTService(STTService):
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._api_key = api_key
self._url = url

View File

@@ -5,37 +5,43 @@
#
import asyncio
import base64
import io
import json
from typing import AsyncGenerator, List, Literal, Optional
from dataclasses import dataclass
from typing import List
from loguru import logger
from pydantic import BaseModel
from PIL import Image
from pipecat.frames.frames import (
ErrorFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMUpdateSettingsFrame,
LLMModelUpdateFrame,
StartInterruptionFrame,
TextFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
UserImageRawFrame,
UserImageRequestFrame,
VisionImageRawFrame,
)
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService, TTSService
from pipecat.services.ai_services import LLMService
try:
import google.ai.generativelanguage as glm
import google.generativeai as gai
from google.cloud import texttospeech_v1
from google.oauth2 import service_account
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -44,6 +50,18 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
@dataclass
class GoogleContextAggregatorPair:
_user: "GoogleUserContextAggregator"
_assistant: "GoogleAssistantContextAggregator"
def user(self) -> "GoogleUserContextAggregator":
return self._user
def assistant(self) -> "GoogleAssistantContextAggregator":
return self._assistant
class GoogleLLMService(LLMService):
"""This class implements inference with Google's AI models
@@ -60,6 +78,12 @@ class GoogleLLMService(LLMService):
def can_generate_metrics(self) -> bool:
return True
@staticmethod
def create_context_aggregator(context: OpenAILLMContext) -> GoogleContextAggregatorPair:
user = GoogleUserContextAggregator(context)
assistant = GoogleAssistantContextAggregator(user)
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
def _create_client(self, model: str):
self.set_model_name(model)
self._client = gai.GenerativeModel(model)
@@ -76,16 +100,24 @@ class GoogleLLMService(LLMService):
elif role == "assistant":
role = "model"
parts = [glm.Part(text=content)]
if "mime_type" in message:
parts.append(
glm.Part(
inline_data=glm.Blob(
mime_type=message["mime_type"], data=message["data"].getvalue()
if isinstance(content, list):
parts = []
for item in content:
if item["type"] == "text":
parts.append(glm.Part(text=item["text"]))
elif item["type"] == "image_url":
image_data = item["image_url"]["url"].split(",")[1]
parts.append(
glm.Part(
inline_data=glm.Blob(
mime_type="image/jpeg", data=base64.b64decode(image_data)
)
)
)
)
)
google_messages.append({"role": role, "parts": parts})
else:
parts = [glm.Part(text=content)]
google_messages.append(glm.Content(role=role, parts=parts))
return google_messages
@@ -95,8 +127,10 @@ class GoogleLLMService(LLMService):
await asyncio.sleep(0)
async def _process_context(self, context: OpenAILLMContext):
await self.push_frame(LLMFullResponseStartFrame())
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
logger.debug(f"Generating chat: {context.get_messages_json()}")
messages = self._get_messages_from_openai_context(context)
@@ -123,23 +157,22 @@ class GoogleLLMService(LLMService):
except Exception as e:
logger.exception(f"{self} exception: {e}")
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
context = None
if isinstance(frame, OpenAILLMContextFrame):
context: OpenAILLMContext = frame.context
context = GoogleLLMContext.from_openai_context(frame.context)
elif isinstance(frame, LLMMessagesFrame):
context = OpenAILLMContext.from_messages(frame.messages)
context = GoogleLLMContext.from_messages(frame.messages)
elif isinstance(frame, VisionImageRawFrame):
context = OpenAILLMContext.from_image_frame(frame)
elif isinstance(frame, LLMUpdateSettingsFrame):
if frame.model is not None:
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
context = GoogleLLMContext.from_image_frame(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self._create_client(frame.model)
else:
await self.push_frame(frame, direction)
@@ -147,186 +180,155 @@ class GoogleLLMService(LLMService):
await self._process_context(context)
class GoogleTTSService(TTSService):
class InputParams(BaseModel):
pitch: Optional[str] = None
rate: Optional[str] = None
volume: Optional[str] = None
emphasis: Optional[Literal["strong", "moderate", "reduced", "none"]] = None
language: Optional[str] = None
gender: Optional[Literal["male", "female", "neutral"]] = None
google_style: Optional[Literal["apologetic", "calm", "empathetic", "firm", "lively"]] = None
class GoogleLLMContext(OpenAILLMContext):
def __init__(
self,
*,
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
voice_id: str = "en-US-Neural2-A",
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
messages: list[dict] | None = None,
tools: list[dict] | None = None,
tool_choice: dict | None = None,
):
super().__init__(sample_rate=sample_rate, **kwargs)
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
self._user_image_request_context = {}
self._voice_id: str = voice_id
self._params = params
self._client: texttospeech_v1.TextToSpeechAsyncClient = self._create_client(
credentials, credentials_path
@classmethod
def from_openai_context(cls, openai_context: OpenAILLMContext):
return cls(
messages=openai_context.messages,
tools=openai_context.tools,
tool_choice=openai_context.tool_choice,
)
def _create_client(
self, credentials: Optional[str], credentials_path: Optional[str]
) -> texttospeech_v1.TextToSpeechAsyncClient:
creds: Optional[service_account.Credentials] = None
@classmethod
def from_messages(cls, messages: List[dict]) -> "GoogleLLMContext":
return cls(messages=messages)
# Create a Google Cloud service account for the Cloud Text-to-Speech API
# Using either the provided credentials JSON string or the path to a service account JSON
# file, create a Google Cloud service account and use it to authenticate with the API.
if credentials:
# Use provided credentials JSON string
json_account_info = json.loads(credentials)
creds = service_account.Credentials.from_service_account_info(json_account_info)
elif credentials_path:
# Use service account JSON file if provided
creds = service_account.Credentials.from_service_account_file(credentials_path)
else:
raise ValueError("Either 'credentials' or 'credentials_path' must be provided.")
@classmethod
def from_image_frame(cls, frame: VisionImageRawFrame) -> "GoogleLLMContext":
context = cls()
context.add_image_frame_message(
format=frame.format, size=frame.size, image=frame.image, text=frame.text
)
return context
return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds)
def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
):
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
def can_generate_metrics(self) -> bool:
return True
content = [
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}}
]
if text:
content.append({"type": "text", "text": text})
self.add_message({"role": "user", "content": content})
def _construct_ssml(self, text: str) -> str:
ssml = "<speak>"
# Voice tag
voice_attrs = [f"name='{self._voice_id}'"]
if self._params.language:
voice_attrs.append(f"language='{self._params.language}'")
if self._params.gender:
voice_attrs.append(f"gender='{self._params.gender}'")
ssml += f"<voice {' '.join(voice_attrs)}>"
class GoogleUserContextAggregator(LLMUserContextAggregator):
def __init__(self, context: OpenAILLMContext | GoogleLLMContext):
super().__init__(context=context)
# Prosody tag
prosody_attrs = []
if self._params.pitch:
prosody_attrs.append(f"pitch='{self._params.pitch}'")
if self._params.rate:
prosody_attrs.append(f"rate='{self._params.rate}'")
if self._params.volume:
prosody_attrs.append(f"volume='{self._params.volume}'")
if isinstance(context, OpenAILLMContext):
self._context = GoogleLLMContext.from_openai_context(context)
if prosody_attrs:
ssml += f"<prosody {' '.join(prosody_attrs)}>"
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
try:
if isinstance(frame, UserImageRequestFrame):
if frame.context:
if isinstance(frame.context, str):
self._context._user_image_request_context[frame.user_id] = frame.context
else:
logger.error(
f"Unexpected UserImageRequestFrame context type: {type(frame.context)}"
)
del self._context._user_image_request_context[frame.user_id]
else:
if frame.user_id in self._context._user_image_request_context:
del self._context._user_image_request_context[frame.user_id]
elif isinstance(frame, UserImageRawFrame):
text = self._context._user_image_request_context.get(frame.user_id) or ""
if text:
del self._context._user_image_request_context[frame.user_id]
# Emphasis tag
if self._params.emphasis:
ssml += f"<emphasis level='{self._params.emphasis}'>"
# Handle the case where frame.format might be None
image_format = frame.format or "JPEG" # Default to JPEG if format is None
# Google style tag
if self._params.google_style:
ssml += f"<google:style name='{self._params.google_style}'>"
self._context.add_image_frame_message(
format=image_format, size=frame.size, image=frame.image, text=text
)
await self.push_context_frame()
except Exception as e:
logger.error(f"Error processing frame: {e}")
ssml += text
# Close tags
if self._params.google_style:
ssml += "</google:style>"
if self._params.emphasis:
ssml += "</emphasis>"
if prosody_attrs:
ssml += "</prosody>"
ssml += "</voice></speak>"
class GoogleAssistantContextAggregator(LLMAssistantContextAggregator):
def __init__(self, user_context_aggregator: GoogleUserContextAggregator):
super().__init__(context=user_context_aggregator._context)
self._user_context_aggregator = user_context_aggregator
self._function_call_in_progress = None
self._function_call_result = None
return ssml
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._function_call_in_progress = None
self._function_call_result = None
elif isinstance(frame, FunctionCallInProgressFrame):
self._function_call_in_progress = frame
elif isinstance(frame, FunctionCallResultFrame):
if (
self._function_call_in_progress
and self._function_call_in_progress.tool_call_id == frame.tool_call_id
):
self._function_call_in_progress = None
self._function_call_result = frame
await self._push_aggregation()
else:
logger.warning(
"FunctionCallResultFrame tool_call_id != InProgressFrame tool_call_id"
)
self._function_call_in_progress = None
self._function_call_result = None
async def set_voice(self, voice: str) -> None:
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice
async def _push_aggregation(self):
if not (self._aggregation or self._function_call_result):
return
async def set_language(self, language: str) -> None:
logger.debug(f"Switching TTS language to: [{language}]")
self._params.language = language
run_llm = False
async def set_pitch(self, pitch: str) -> None:
logger.debug(f"Switching TTS pitch to: [{pitch}]")
self._params.pitch = pitch
async def set_rate(self, rate: str) -> None:
logger.debug(f"Switching TTS rate to: [{rate}]")
self._params.rate = rate
async def set_volume(self, volume: str) -> None:
logger.debug(f"Switching TTS volume to: [{volume}]")
self._params.volume = volume
async def set_emphasis(
self, emphasis: Literal["strong", "moderate", "reduced", "none"]
) -> None:
logger.debug(f"Switching TTS emphasis to: [{emphasis}]")
self._params.emphasis = emphasis
async def set_gender(self, gender: Literal["male", "female", "neutral"]) -> None:
logger.debug(f"Switch TTS gender to [{gender}]")
self._params.gender = gender
async def google_style(
self, google_style: Literal["apologetic", "calm", "empathetic", "firm", "lively"]
) -> None:
logger.debug(f"Switching TTS google style to: [{google_style}]")
self._params.google_style = google_style
async def set_params(self, params: InputParams) -> None:
logger.debug(f"Switching TTS params to: [{params}]")
self._params = params
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
aggregation = self._aggregation
self._aggregation = ""
try:
await self.start_ttfb_metrics()
if self._function_call_result:
frame = self._function_call_result
self._function_call_result = None
if frame.result:
self._context.add_message(
{
"role": "assistant",
"content": aggregation,
"function_call": {
"name": frame.function_name,
"arguments": json.dumps(frame.arguments),
},
}
)
self._context.add_message(
{
"role": "function",
"content": json.dumps(frame.result),
"name": frame.function_name,
}
)
run_llm = True
else:
self._context.add_message({"role": "assistant", "content": aggregation})
ssml = self._construct_ssml(text)
synthesis_input = texttospeech_v1.SynthesisInput(ssml=ssml)
voice = texttospeech_v1.VoiceSelectionParams(
language_code=self._params.language, name=self._voice_id
)
audio_config = texttospeech_v1.AudioConfig(
audio_encoding=texttospeech_v1.AudioEncoding.LINEAR16,
sample_rate_hertz=self.sample_rate,
)
request = texttospeech_v1.SynthesizeSpeechRequest(
input=synthesis_input, voice=voice, audio_config=audio_config
)
response = await self._client.synthesize_speech(request=request)
await self.start_tts_usage_metrics(text)
await self.push_frame(TTSStartedFrame())
# Skip the first 44 bytes to remove the WAV header
audio_content = response.audio_content[44:]
# Read and yield audio data in chunks
chunk_size = 8192
for i in range(0, len(audio_content), chunk_size):
chunk = audio_content[i : i + chunk_size]
if not chunk:
break
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
await asyncio.sleep(0) # Allow other tasks to run
await self.push_frame(TTSStoppedFrame())
if run_llm:
await self._user_context_aggregator.push_context_frame()
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
finally:
await self.push_frame(TTSStoppedFrame())
logger.error(f"Error processing frame: {e}")

View File

@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.ai_services import TTSService
from pipecat.services.ai_services import AsyncTTSService
from loguru import logger
@@ -35,7 +35,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class LmntTTSService(TTSService):
class LmntTTSService(AsyncTTSService):
def __init__(
self,
*,
@@ -47,7 +47,7 @@ class LmntTTSService(TTSService):
):
# Let TTSService produce TTSStoppedFrames after a short delay of
# no activity.
super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs)
super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._voice_id = voice_id

View File

@@ -4,39 +4,38 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import base64
import io
import json
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
import aiohttp
import httpx
from loguru import logger
from PIL import Image
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
ErrorFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMUpdateSettingsFrame,
StartInterruptionFrame,
TextFrame,
LLMModelUpdateFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
URLImageRawFrame,
VisionImageRawFrame,
FunctionCallResultFrame,
FunctionCallInProgressFrame,
StartInterruptionFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
LLMAssistantContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
@@ -45,14 +44,12 @@ from pipecat.processors.aggregators.openai_llm_context import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import ImageGenService, LLMService, TTSService
from PIL import Image
from loguru import logger
try:
from openai import (
NOT_GIVEN,
AsyncOpenAI,
AsyncStream,
BadRequestError,
DefaultAsyncHttpxClient,
)
from openai import AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient, BadRequestError, NOT_GIVEN
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -205,10 +202,6 @@ class BaseOpenAILLMService(LLMService):
return chunks
async def _process_context(self, context: OpenAILLMContext):
functions_list = []
arguments_list = []
tool_id_list = []
func_idx = 0
function_name = ""
arguments = ""
tool_call_id = ""
@@ -246,14 +239,6 @@ class BaseOpenAILLMService(LLMService):
# yield a frame containing the function name and the arguments.
tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.index != func_idx:
functions_list.append(function_name)
arguments_list.append(arguments)
tool_id_list.append(tool_call_id)
function_name = ""
arguments = ""
tool_call_id = ""
func_idx += 1
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
@@ -269,46 +254,21 @@ class BaseOpenAILLMService(LLMService):
# the context, and re-prompt to get a chat answer. If we don't have a registered
# handler, raise an exception.
if function_name and arguments:
# added to the list as last function name and arguments not added to the list
functions_list.append(function_name)
arguments_list.append(arguments)
tool_id_list.append(tool_call_id)
if self.has_function(function_name):
await self._handle_function_call(context, tool_call_id, function_name, arguments)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
total_items = len(functions_list)
for index, (function_name, arguments, tool_id) in enumerate(
zip(functions_list, arguments_list, tool_id_list), start=1
):
if self.has_function(function_name):
run_llm = index == total_items
arguments = json.loads(arguments)
await self.call_function(
context=context,
function_name=function_name,
arguments=arguments,
tool_call_id=tool_id,
run_llm=run_llm,
)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
if frame.model is not None:
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
if frame.frequency_penalty is not None:
await self.set_frequency_penalty(frame.frequency_penalty)
if frame.presence_penalty is not None:
await self.set_presence_penalty(frame.presence_penalty)
if frame.seed is not None:
await self.set_seed(frame.seed)
if frame.temperature is not None:
await self.set_temperature(frame.temperature)
if frame.top_p is not None:
await self.set_top_p(frame.top_p)
if frame.extra:
await self.set_extra(frame.extra)
async def _handle_function_call(self, context, tool_call_id, function_name, arguments):
arguments = json.loads(arguments)
await self.call_function(
context=context,
tool_call_id=tool_call_id,
function_name=function_name,
arguments=arguments,
)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -320,8 +280,9 @@ class BaseOpenAILLMService(LLMService):
context = OpenAILLMContext.from_messages(frame.messages)
elif isinstance(frame, VisionImageRawFrame):
context = OpenAILLMContext.from_image_frame(frame)
elif isinstance(frame, LLMUpdateSettingsFrame):
await self._update_settings(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
else:
await self.push_frame(frame, direction)
@@ -481,27 +442,31 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
def __init__(self, user_context_aggregator: OpenAIUserContextAggregator):
super().__init__(context=user_context_aggregator._context)
self._user_context_aggregator = user_context_aggregator
self._function_calls_in_progress = {}
self._function_call_in_progress = None
self._function_call_result = None
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
# See note above about not calling push_frame() here.
if isinstance(frame, StartInterruptionFrame):
self._function_calls_in_progress.clear()
self._function_call_in_progress = None
self._function_call_finished = None
elif isinstance(frame, FunctionCallInProgressFrame):
self._function_calls_in_progress[frame.tool_call_id] = frame
self._function_call_in_progress = frame
elif isinstance(frame, FunctionCallResultFrame):
if frame.tool_call_id in self._function_calls_in_progress:
del self._function_calls_in_progress[frame.tool_call_id]
if (
self._function_call_in_progress
and self._function_call_in_progress.tool_call_id == frame.tool_call_id
):
self._function_call_in_progress = None
self._function_call_result = frame
# TODO-CB: Kwin wants us to refactor this out of here but I REFUSE
await self._push_aggregation()
else:
logger.warning(
"FunctionCallResultFrame tool_call_id does not match any function call in progress"
f"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
)
self._function_call_in_progress = None
self._function_call_result = None
async def _push_aggregation(self):
@@ -540,7 +505,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
"tool_call_id": frame.tool_call_id,
}
)
run_llm = frame.run_llm
run_llm = True
else:
self._context.add_message({"role": "assistant", "content": aggregation})

View File

@@ -7,36 +7,37 @@
import json
import re
import uuid
from asyncio import CancelledError
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from asyncio import CancelledError
from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMUpdateSettingsFrame,
StartInterruptionFrame,
LLMModelUpdateFrame,
TextFrame,
UserImageRequestFrame,
LLMMessagesFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
FunctionCallResultFrame,
FunctionCallInProgressFrame,
StartInterruptionFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.llm_response import (
LLMUserContextAggregator,
LLMAssistantContextAggregator,
)
from loguru import logger
try:
from together import AsyncTogether
@@ -128,25 +129,6 @@ class TogetherLLMService(LLMService):
logger.debug(f"Switching LLM extra to: [{extra}]")
self._extra = extra
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
if frame.model is not None:
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
if frame.frequency_penalty is not None:
await self.set_frequency_penalty(frame.frequency_penalty)
if frame.max_tokens is not None:
await self.set_max_tokens(frame.max_tokens)
if frame.presence_penalty is not None:
await self.set_presence_penalty(frame.presence_penalty)
if frame.temperature is not None:
await self.set_temperature(frame.temperature)
if frame.top_k is not None:
await self.set_top_k(frame.top_k)
if frame.top_p is not None:
await self.set_top_p(frame.top_p)
if frame.extra:
await self.set_extra(frame.extra)
async def _process_context(self, context: OpenAILLMContext):
try:
await self.push_frame(LLMFullResponseStartFrame())
@@ -206,7 +188,7 @@ class TogetherLLMService(LLMService):
if chunk.choices[0].finish_reason == "eos" and accumulating_function_call:
await self._extract_function_call(context, function_call_accumulator)
except CancelledError:
except CancelledError as e:
# todo: implement token counting estimates for use when the user interrupts a long generation
# we do this in the anthropic.py service
raise
@@ -224,8 +206,9 @@ class TogetherLLMService(LLMService):
context = frame.context
elif isinstance(frame, LLMMessagesFrame):
context = TogetherLLMContext.from_messages(frame.messages)
elif isinstance(frame, LLMUpdateSettingsFrame):
await self._update_settings(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self.set_model_name(frame.model)
else:
await self.push_frame(frame, direction)
@@ -355,7 +338,7 @@ class TogetherAssistantContextAggregator(LLMAssistantContextAggregator):
await self._push_aggregation()
else:
logger.warning(
"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
f"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
)
self._function_call_in_progress = None
self._function_call_result = None

View File

@@ -31,7 +31,7 @@ from loguru import logger
class BaseInputTransport(FrameProcessor):
def __init__(self, params: TransportParams, **kwargs):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._params = params

View File

@@ -43,7 +43,7 @@ from pipecat.utils.time import nanoseconds_to_seconds
class BaseOutputTransport(FrameProcessor):
def __init__(self, params: TransportParams, **kwargs):
super().__init__(**kwargs)
super().__init__(sync=False, **kwargs)
self._params = params

View File

@@ -7,7 +7,6 @@ deepgram-sdk~=3.5.0
fal-client~=0.4.1
fastapi~=0.112.1
faster-whisper~=1.0.3
google-cloud-texttospeech~=2.17.2
google-generativeai~=0.7.2
langchain~=0.2.14
livekit~=0.13.1

View File

@@ -7,9 +7,9 @@
import unittest
from pipecat.frames.frames import (
EndFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
StopTaskFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -32,7 +32,6 @@ from langchain_core.language_models import FakeStreamingListLLM
class TestLangchain(unittest.IsolatedAsyncioTestCase):
class MockProcessor(FrameProcessor):
def __init__(self, name):
super().__init__()
self.name = name
self.token: list[str] = []
# Start collecting tokens when we see the start frame
@@ -56,13 +55,13 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.expected_response = "Hello dear human"
self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response])
self.mock_proc = self.MockProcessor("token_collector")
async def test_langchain(self):
messages = [("system", "Say hello to {name}"), ("human", "{input}")]
prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas")
chain = prompt | self.fake_llm
proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
@@ -82,7 +81,7 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"),
UserStoppedSpeakingFrame(),
EndFrame(),
StopTaskFrame(),
]
)