Compare commits
1 Commits
async-reba
...
mb/google-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d331649736 |
27
CHANGELOG.md
27
CHANGELOG.md
@@ -9,8 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added Google TTS service and corresponding foundational example `07n-interruptible-google.py`
|
||||
|
||||
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
|
||||
|
||||
- Added InputParams to Azure TTS service.
|
||||
@@ -48,10 +46,15 @@ async def on_connected(processor):
|
||||
frames. To achieve that, each frame processor should only output frames from a
|
||||
single task.
|
||||
|
||||
In this version all the frame processors have their own task to push
|
||||
frames. That is, when `push_frame()` is called the given frame will be put
|
||||
into an internal queue (with the exception of system frames) and a frame
|
||||
processor task will push it out.
|
||||
In this version we introduce synchronous and asynchronous frame
|
||||
processors. The synchronous processors push output frames from the same task
|
||||
that they receive input frames, and therefore only push frames from one
|
||||
task. Asynchronous frame processors can have internal tasks to perform things
|
||||
asynchronously (e.g. receiving data from a websocket) but they also have a
|
||||
single task where they push frames from.
|
||||
|
||||
By default, frame processors are synchronous. To change a frame processor to
|
||||
asynchronous you only need to pass `sync=False` to the base class constructor.
|
||||
|
||||
- Added pipeline clocks. A pipeline clock is used by the output transport to
|
||||
know when a frame needs to be presented. For that, all frames now have an
|
||||
@@ -63,7 +66,9 @@ async def on_connected(processor):
|
||||
`SystemClock`). This clock will be passed to each frame processor via the
|
||||
`StartFrame`.
|
||||
|
||||
- Added `CartesiaHttpTTSService`.
|
||||
- Added `CartesiaHttpTTSService`. This is a synchronous frame processor
|
||||
(i.e. given an input text frame it will wait for the whole output before
|
||||
returning).
|
||||
|
||||
- `DailyTransport` now supports setting the audio bitrate to improve audio
|
||||
quality through the `DailyParams.audio_out_bitrate` parameter. The new
|
||||
@@ -86,9 +91,6 @@ async def on_connected(processor):
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated individual update settings frame classes into a single UpdateSettingsFrame
|
||||
class for STT, LLM, and TTS.
|
||||
|
||||
- We now distinguish between input and output audio and image frames. We
|
||||
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
|
||||
and `OutputImageRawFrame` (and other subclasses of those). The input frames
|
||||
@@ -103,9 +105,8 @@ async def on_connected(processor):
|
||||
pipelines to be executed concurrently. The difference between a
|
||||
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
|
||||
the `SyncParallelPipeline` will wait for all the internal pipelines to
|
||||
complete. This is achieved by making sure the last processor in each of the
|
||||
pipelines is synchronous (e.g. an HTTP-based service that waits for the
|
||||
response).
|
||||
complete. This is achieved by ensuring all the processors in each of the
|
||||
internal pipelines are synchronous.
|
||||
|
||||
- `StartFrame` is back to being a system frame so we make sure it's processed immediately
|
||||
by all processors. `EndFrame` stays a control frame since it needs to be
|
||||
|
||||
@@ -86,13 +86,13 @@ async def main():
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
tts = CartesiaHttpTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
params=FalImageGenService.InputParams(image_size="square_hd"),
|
||||
aiohttp_session=session,
|
||||
@@ -107,10 +107,8 @@ async def main():
|
||||
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
|
||||
# wait for the input frame to be processed.
|
||||
#
|
||||
# Note that `SyncParallelPipeline` requires the last processor in each
|
||||
# of the pipelines to be synchronous. In this case, we use
|
||||
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
|
||||
# requests and wait for the response.
|
||||
# Note that `SyncParallelPipeline` requires all processors in it to be
|
||||
# synchronous (which is the default for most processors).
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
llm, # LLM
|
||||
|
||||
@@ -121,10 +121,8 @@ async def main():
|
||||
# `SyncParallelPipeline` will wait for the input frame to be
|
||||
# processed.
|
||||
#
|
||||
# Note that `SyncParallelPipeline` requires the last processor in
|
||||
# each of the pipelines to be synchronous. In this case, we use
|
||||
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
|
||||
# requests and wait for the response.
|
||||
# Note that `SyncParallelPipeline` requires all processors in it to
|
||||
# be synchronous (which is the default for most processors).
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
llm, # LLM
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
)
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.google import GoogleTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = GoogleTTSService(
|
||||
credentials=os.getenv("GOOGLE_CREDENTIALS"),
|
||||
voice_id="en-US-Neural2-J",
|
||||
params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -5,10 +5,14 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
@@ -21,12 +25,6 @@ from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
|
||||
@@ -5,15 +5,10 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
@@ -24,6 +19,14 @@ from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
@@ -31,12 +34,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
await llm.push_frame(TextFrame("Let me check on that. "))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
await llm.push_frame(TextFrame("Let me check on that."))
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
@@ -108,11 +106,11 @@ async def main():
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
# fl_in,
|
||||
fl_in,
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
# fl_out,
|
||||
fl_out,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
|
||||
@@ -44,7 +44,7 @@ elevenlabs = [ "websockets~=12.0" ]
|
||||
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=12.0" ]
|
||||
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
|
||||
google = [ "google-generativeai~=0.7.2" ]
|
||||
gstreamer = [ "pygobject~=3.48.2" ]
|
||||
fireworks = [ "openai~=1.37.2" ]
|
||||
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
|
||||
|
||||
@@ -4,8 +4,9 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, List, Optional, Tuple, Union
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
@@ -527,45 +528,113 @@ class UserImageRequestFrame(ControlFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMUpdateSettingsFrame(ControlFrame):
|
||||
"""A control frame containing a request to update LLM settings."""
|
||||
class LLMModelUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM model."""
|
||||
|
||||
model: Optional[str] = None
|
||||
temperature: Optional[float] = None
|
||||
top_k: Optional[int] = None
|
||||
top_p: Optional[float] = None
|
||||
frequency_penalty: Optional[float] = None
|
||||
presence_penalty: Optional[float] = None
|
||||
max_tokens: Optional[int] = None
|
||||
seed: Optional[int] = None
|
||||
extra: dict = field(default_factory=dict)
|
||||
model: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSUpdateSettingsFrame(ControlFrame):
|
||||
"""A control frame containing a request to update TTS settings."""
|
||||
class LLMTemperatureUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM temperature."""
|
||||
|
||||
model: Optional[str] = None
|
||||
voice: Optional[str] = None
|
||||
language: Optional[Language] = None
|
||||
speed: Optional[Union[str, float]] = None
|
||||
emotion: Optional[List[str]] = None
|
||||
engine: Optional[str] = None
|
||||
pitch: Optional[str] = None
|
||||
rate: Optional[str] = None
|
||||
volume: Optional[str] = None
|
||||
emphasis: Optional[str] = None
|
||||
style: Optional[str] = None
|
||||
style_degree: Optional[str] = None
|
||||
role: Optional[str] = None
|
||||
temperature: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class STTUpdateSettingsFrame(ControlFrame):
|
||||
"""A control frame containing a request to update STT settings."""
|
||||
class LLMTopKUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM top_k."""
|
||||
|
||||
model: Optional[str] = None
|
||||
language: Optional[Language] = None
|
||||
top_k: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMTopPUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM top_p."""
|
||||
|
||||
top_p: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMFrequencyPenaltyUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM frequency
|
||||
penalty.
|
||||
|
||||
"""
|
||||
|
||||
frequency_penalty: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMPresencePenaltyUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM presence
|
||||
penalty.
|
||||
|
||||
"""
|
||||
|
||||
presence_penalty: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMaxTokensUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM max tokens."""
|
||||
|
||||
max_tokens: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMSeedUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM seed."""
|
||||
|
||||
seed: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMExtraUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM extra params."""
|
||||
|
||||
extra: dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSModelUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update the TTS model."""
|
||||
|
||||
model: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSVoiceUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new TTS voice."""
|
||||
|
||||
voice: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSLanguageUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new TTS language and
|
||||
optional voice.
|
||||
|
||||
"""
|
||||
|
||||
language: Language
|
||||
|
||||
|
||||
@dataclass
|
||||
class STTModelUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update the STT model and optional
|
||||
language.
|
||||
|
||||
"""
|
||||
|
||||
model: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class STTLanguageUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to STT language."""
|
||||
|
||||
language: Language
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -585,7 +654,6 @@ class FunctionCallResultFrame(DataFrame):
|
||||
tool_call_id: str
|
||||
arguments: str
|
||||
result: Any
|
||||
run_llm: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -9,20 +9,14 @@ import asyncio
|
||||
from itertools import chain
|
||||
from typing import List
|
||||
|
||||
from pipecat.frames.frames import ControlFrame, Frame, SystemFrame
|
||||
from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import Frame
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class SyncFrame(ControlFrame):
|
||||
"""This frame is used to know when the internal pipelines have finished."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class Source(FrameProcessor):
|
||||
def __init__(self, upstream_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
@@ -73,16 +67,13 @@ class SyncParallelPipeline(BasePipeline):
|
||||
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
|
||||
|
||||
# We add a source at the beginning of the pipeline and a sink at the end.
|
||||
up_queue = asyncio.Queue()
|
||||
down_queue = asyncio.Queue()
|
||||
source = Source(up_queue)
|
||||
sink = Sink(down_queue)
|
||||
source = Source(self._up_queue)
|
||||
sink = Sink(self._down_queue)
|
||||
processors: List[FrameProcessor] = [source] + processors + [sink]
|
||||
|
||||
# Keep track of sources and sinks. We also keep the output queue of
|
||||
# the source and the sinks so we can use it later.
|
||||
self._sources.append({"processor": source, "queue": down_queue})
|
||||
self._sinks.append({"processor": sink, "queue": up_queue})
|
||||
# Keep track of sources and sinks.
|
||||
self._sources.append(source)
|
||||
self._sinks.append(sink)
|
||||
|
||||
# Create pipeline
|
||||
pipeline = Pipeline(processors)
|
||||
@@ -103,46 +94,17 @@ class SyncParallelPipeline(BasePipeline):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# The last processor of each pipeline needs to be synchronous otherwise
|
||||
# this element won't work. Since, we know it should be synchronous we
|
||||
# push a SyncFrame. Since frames are ordered we know this frame will be
|
||||
# pushed after the synchronous processor has pushed its data allowing us
|
||||
# to synchronize all the internal pipelines by waiting for the
|
||||
# SyncFrame in all of them.
|
||||
async def wait_for_sync(
|
||||
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
|
||||
):
|
||||
processor = obj["processor"]
|
||||
queue = obj["queue"]
|
||||
await processor.process_frame(frame, direction)
|
||||
|
||||
# If we have a system frame we don't need to synchronize anything.
|
||||
if isinstance(frame, SystemFrame):
|
||||
await main_queue.put(frame)
|
||||
else:
|
||||
await processor.process_frame(SyncFrame(), direction)
|
||||
|
||||
frame = await queue.get()
|
||||
while not isinstance(frame, SyncFrame):
|
||||
await main_queue.put(frame)
|
||||
queue.task_done()
|
||||
frame = await queue.get()
|
||||
|
||||
if direction == FrameDirection.UPSTREAM:
|
||||
# If we get an upstream frame we process it in each sink.
|
||||
await asyncio.gather(
|
||||
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
|
||||
)
|
||||
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
|
||||
elif direction == FrameDirection.DOWNSTREAM:
|
||||
# If we get a downstream frame we process it in each source.
|
||||
await asyncio.gather(
|
||||
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
|
||||
)
|
||||
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
|
||||
|
||||
seen_ids = set()
|
||||
while not self._up_queue.empty():
|
||||
frame = await self._up_queue.get()
|
||||
if frame.id not in seen_ids:
|
||||
if frame and frame.id not in seen_ids:
|
||||
await self.push_frame(frame, FrameDirection.UPSTREAM)
|
||||
seen_ids.add(frame.id)
|
||||
self._up_queue.task_done()
|
||||
@@ -150,7 +112,7 @@ class SyncParallelPipeline(BasePipeline):
|
||||
seen_ids = set()
|
||||
while not self._down_queue.empty():
|
||||
frame = await self._down_queue.get()
|
||||
if frame.id not in seen_ids:
|
||||
if frame and frame.id not in seen_ids:
|
||||
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
seen_ids.add(frame.id)
|
||||
self._down_queue.task_done()
|
||||
|
||||
@@ -69,19 +69,6 @@ class Source(FrameProcessor):
|
||||
await self._up_queue.put(StopTaskFrame())
|
||||
|
||||
|
||||
class Sink(FrameProcessor):
|
||||
def __init__(self, down_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
self._down_queue = down_queue
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# We really just want to know when the EndFrame reached the sink.
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._down_queue.put(frame)
|
||||
|
||||
|
||||
class PipelineTask:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -97,16 +84,12 @@ class PipelineTask:
|
||||
self._params = params
|
||||
self._finished = False
|
||||
|
||||
self._up_queue = asyncio.Queue()
|
||||
self._down_queue = asyncio.Queue()
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._up_queue = asyncio.Queue()
|
||||
|
||||
self._source = Source(self._up_queue)
|
||||
self._source.link(pipeline)
|
||||
|
||||
self._sink = Sink(self._down_queue)
|
||||
pipeline.link(self._sink)
|
||||
|
||||
def has_finished(self):
|
||||
return self._finished
|
||||
|
||||
@@ -120,19 +103,19 @@ class PipelineTask:
|
||||
# out-of-band from the main streaming task which is what we want since
|
||||
# we want to cancel right away.
|
||||
await self._source.push_frame(CancelFrame())
|
||||
self._process_push_task.cancel()
|
||||
self._process_down_task.cancel()
|
||||
self._process_up_task.cancel()
|
||||
await self._process_push_task
|
||||
await self._process_down_task
|
||||
await self._process_up_task
|
||||
|
||||
async def run(self):
|
||||
self._process_up_task = asyncio.create_task(self._process_up_queue())
|
||||
self._process_push_task = asyncio.create_task(self._process_push_queue())
|
||||
await asyncio.gather(self._process_up_task, self._process_push_task)
|
||||
self._process_down_task = asyncio.create_task(self._process_down_queue())
|
||||
await asyncio.gather(self._process_up_task, self._process_down_task)
|
||||
self._finished = True
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
await self._push_queue.put(frame)
|
||||
await self._down_queue.put(frame)
|
||||
|
||||
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
|
||||
if isinstance(frames, AsyncIterable):
|
||||
@@ -150,7 +133,7 @@ class PipelineTask:
|
||||
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
|
||||
return MetricsFrame(data=data)
|
||||
|
||||
async def _process_push_queue(self):
|
||||
async def _process_down_queue(self):
|
||||
self._clock.start()
|
||||
|
||||
start_frame = StartFrame(
|
||||
@@ -171,13 +154,11 @@ class PipelineTask:
|
||||
should_cleanup = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._push_queue.get()
|
||||
frame = await self._down_queue.get()
|
||||
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._wait_for_endframe()
|
||||
running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame))
|
||||
should_cleanup = not isinstance(frame, StopTaskFrame)
|
||||
self._push_queue.task_done()
|
||||
self._down_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
# Cleanup only if we need to.
|
||||
@@ -188,12 +169,6 @@ class PipelineTask:
|
||||
self._process_up_task.cancel()
|
||||
await self._process_up_task
|
||||
|
||||
async def _wait_for_endframe(self):
|
||||
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
|
||||
# so just wait for it. In the future we might do something else here,
|
||||
# but for now this is fine.
|
||||
await self._down_queue.get()
|
||||
|
||||
async def _process_up_queue(self):
|
||||
while True:
|
||||
try:
|
||||
|
||||
@@ -133,7 +133,6 @@ class OpenAILLMContext:
|
||||
tool_call_id: str,
|
||||
arguments: str,
|
||||
llm: FrameProcessor,
|
||||
run_llm: bool = True,
|
||||
) -> None:
|
||||
# Push a SystemFrame downstream. This frame will let our assistant context aggregator
|
||||
# know that we are in the middle of a function call. Some contexts/aggregators may
|
||||
@@ -154,7 +153,6 @@ class OpenAILLMContext:
|
||||
tool_call_id=tool_call_id,
|
||||
arguments=arguments,
|
||||
result=result,
|
||||
run_llm=run_llm,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ class FrameProcessor:
|
||||
*,
|
||||
name: str | None = None,
|
||||
metrics: FrameProcessorMetrics | None = None,
|
||||
sync: bool = True,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -46,6 +47,7 @@ class FrameProcessor:
|
||||
self._prev: "FrameProcessor" | None = None
|
||||
self._next: "FrameProcessor" | None = None
|
||||
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
|
||||
self._sync = sync
|
||||
|
||||
self._event_handlers: dict = {}
|
||||
|
||||
@@ -64,8 +66,11 @@ class FrameProcessor:
|
||||
|
||||
# Every processor in Pipecat should only output frames from a single
|
||||
# task. This avoids problems like audio overlapping. System frames are
|
||||
# the exception to this rule. This creates that task.
|
||||
self.__create_push_task()
|
||||
# the exception to this rule.
|
||||
#
|
||||
# This creates that task.
|
||||
if not self._sync:
|
||||
self.__create_push_task()
|
||||
|
||||
@property
|
||||
def interruptions_allowed(self):
|
||||
@@ -162,7 +167,7 @@ class FrameProcessor:
|
||||
await self.push_frame(error, FrameDirection.UPSTREAM)
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
if isinstance(frame, SystemFrame):
|
||||
if self._sync or isinstance(frame, SystemFrame):
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
else:
|
||||
await self.__push_queue.put((frame, direction))
|
||||
@@ -189,12 +194,13 @@ class FrameProcessor:
|
||||
#
|
||||
|
||||
async def _start_interruption(self):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self.__push_frame_task.cancel()
|
||||
await self.__push_frame_task
|
||||
if not self._sync:
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self.__push_frame_task.cancel()
|
||||
await self.__push_frame_task
|
||||
|
||||
# Create a new queue and task.
|
||||
self.__create_push_task()
|
||||
# Create a new queue and task.
|
||||
self.__create_push_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
# Nothing to do right now.
|
||||
|
||||
@@ -516,7 +516,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
params: RTVIProcessorParams = RTVIProcessorParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
self._config = config
|
||||
self._params = params
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
clock_sync: bool = True
|
||||
|
||||
def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._out_params = out_params
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ class IdleFrameProcessor(FrameProcessor):
|
||||
types: List[type] = [],
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._callback = callback
|
||||
self._timeout = timeout
|
||||
|
||||
@@ -31,7 +31,7 @@ class UserIdleProcessor(FrameProcessor):
|
||||
timeout: float,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._callback = callback
|
||||
self._timeout = timeout
|
||||
|
||||
@@ -7,10 +7,9 @@
|
||||
import asyncio
|
||||
import io
|
||||
import wave
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, List, Optional, Tuple, Union
|
||||
|
||||
from loguru import logger
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, List, Optional, Tuple
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
@@ -19,26 +18,31 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
STTLanguageUpdateFrame,
|
||||
STTModelUpdateFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
STTUpdateSettingsFrame,
|
||||
TextFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSLanguageUpdateFrame,
|
||||
TTSModelUpdateFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSUpdateSettingsFrame,
|
||||
TTSVoiceUpdateFrame,
|
||||
TextFrame,
|
||||
UserImageRequestFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.audio import calculate_audio_volume
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
from pipecat.utils.time import seconds_to_nanoseconds
|
||||
from pipecat.utils.utils import exp_smoothing
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
@@ -110,13 +114,7 @@ class LLMService(AIService):
|
||||
return function_name in self._callbacks.keys()
|
||||
|
||||
async def call_function(
|
||||
self,
|
||||
*,
|
||||
context: OpenAILLMContext,
|
||||
tool_call_id: str,
|
||||
function_name: str,
|
||||
arguments: str,
|
||||
run_llm: bool,
|
||||
self, *, context: OpenAILLMContext, tool_call_id: str, function_name: str, arguments: str
|
||||
) -> None:
|
||||
f = None
|
||||
if function_name in self._callbacks.keys():
|
||||
@@ -126,12 +124,7 @@ class LLMService(AIService):
|
||||
else:
|
||||
return None
|
||||
await context.call_function(
|
||||
f,
|
||||
function_name=function_name,
|
||||
tool_call_id=tool_call_id,
|
||||
arguments=arguments,
|
||||
llm=self,
|
||||
run_llm=run_llm,
|
||||
f, function_name=function_name, tool_call_id=tool_call_id, arguments=arguments, llm=self
|
||||
)
|
||||
|
||||
# QUESTION FOR CB: maybe this isn't needed anymore?
|
||||
@@ -155,10 +148,6 @@ class TTSService(AIService):
|
||||
# if True, TTSService will push TextFrames and LLMFullResponseEndFrames,
|
||||
# otherwise subclass must do it
|
||||
push_text_frames: bool = True,
|
||||
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
|
||||
push_stop_frames: bool = False,
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
# TTS output sample rate
|
||||
sample_rate: int = 16000,
|
||||
**kwargs,
|
||||
@@ -166,14 +155,8 @@ class TTSService(AIService):
|
||||
super().__init__(**kwargs)
|
||||
self._aggregate_sentences: bool = aggregate_sentences
|
||||
self._push_text_frames: bool = push_text_frames
|
||||
self._push_stop_frames: bool = push_stop_frames
|
||||
self._stop_frame_timeout_s: float = stop_frame_timeout_s
|
||||
self._sample_rate: int = sample_rate
|
||||
|
||||
self._stop_frame_task: Optional[asyncio.Task] = None
|
||||
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
|
||||
|
||||
self._current_sentence: str = ""
|
||||
self._sample_rate: int = sample_rate
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
@@ -191,112 +174,13 @@ class TTSService(AIService):
|
||||
async def set_language(self, language: Language):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_speed(self, speed: Union[str, float]):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_emotion(self, emotion: List[str]):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_engine(self, engine: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_pitch(self, pitch: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_rate(self, rate: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_volume(self, volume: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_emphasis(self, emphasis: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_style(self, style: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_style_degree(self, style_degree: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_role(self, role: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
pass
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
if self._push_stop_frames:
|
||||
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
if self._stop_frame_task:
|
||||
self._stop_frame_task.cancel()
|
||||
await self._stop_frame_task
|
||||
self._stop_frame_task = None
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
if self._stop_frame_task:
|
||||
self._stop_frame_task.cancel()
|
||||
await self._stop_frame_task
|
||||
self._stop_frame_task = None
|
||||
|
||||
async def say(self, text: str):
|
||||
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
|
||||
await self.flush_audio()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
await self._process_text_frame(frame)
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruption(frame, direction)
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
|
||||
sentence = self._current_sentence
|
||||
self._current_sentence = ""
|
||||
await self._push_tts_frames(sentence)
|
||||
if isinstance(frame, LLMFullResponseEndFrame):
|
||||
if self._push_text_frames:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TTSSpeakFrame):
|
||||
await self._push_tts_frames(frame.text)
|
||||
await self.flush_audio()
|
||||
elif isinstance(frame, TTSUpdateSettingsFrame):
|
||||
await self._update_tts_settings(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
await super().push_frame(frame, direction)
|
||||
|
||||
if self._push_stop_frames and (
|
||||
isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, TTSStartedFrame)
|
||||
or isinstance(frame, TTSAudioRawFrame)
|
||||
or isinstance(frame, TTSStoppedFrame)
|
||||
):
|
||||
await self._stop_frame_queue.put(frame)
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
self._current_sentence = ""
|
||||
@@ -328,33 +212,91 @@ class TTSService(AIService):
|
||||
# interrupted, the text is not added to the assistant context.
|
||||
await self.push_frame(TextFrame(text))
|
||||
|
||||
async def _update_tts_settings(self, frame: TTSUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
await self._process_text_frame(frame)
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruption(frame, direction)
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
|
||||
sentence = self._current_sentence
|
||||
self._current_sentence = ""
|
||||
await self._push_tts_frames(sentence)
|
||||
if isinstance(frame, LLMFullResponseEndFrame):
|
||||
if self._push_text_frames:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TTSSpeakFrame):
|
||||
await self._push_tts_frames(frame.text)
|
||||
elif isinstance(frame, TTSModelUpdateFrame):
|
||||
await self.set_model(frame.model)
|
||||
if frame.voice is not None:
|
||||
elif isinstance(frame, TTSVoiceUpdateFrame):
|
||||
await self.set_voice(frame.voice)
|
||||
if frame.language is not None:
|
||||
elif isinstance(frame, TTSLanguageUpdateFrame):
|
||||
await self.set_language(frame.language)
|
||||
if frame.speed is not None:
|
||||
await self.set_speed(frame.speed)
|
||||
if frame.emotion is not None:
|
||||
await self.set_emotion(frame.emotion)
|
||||
if frame.engine is not None:
|
||||
await self.set_engine(frame.engine)
|
||||
if frame.pitch is not None:
|
||||
await self.set_pitch(frame.pitch)
|
||||
if frame.rate is not None:
|
||||
await self.set_rate(frame.rate)
|
||||
if frame.volume is not None:
|
||||
await self.set_volume(frame.volume)
|
||||
if frame.emphasis is not None:
|
||||
await self.set_emphasis(frame.emphasis)
|
||||
if frame.style is not None:
|
||||
await self.set_style(frame.style)
|
||||
if frame.style_degree is not None:
|
||||
await self.set_style_degree(frame.style_degree)
|
||||
if frame.role is not None:
|
||||
await self.set_role(frame.role)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class AsyncTTSService(TTSService):
|
||||
def __init__(
|
||||
self,
|
||||
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
|
||||
push_stop_frames: bool = False,
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sync=False, **kwargs)
|
||||
self._push_stop_frames: bool = push_stop_frames
|
||||
self._stop_frame_timeout_s: float = stop_frame_timeout_s
|
||||
self._stop_frame_task: Optional[asyncio.Task] = None
|
||||
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
|
||||
|
||||
@abstractmethod
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
async def say(self, text: str):
|
||||
await super().say(text)
|
||||
await self.flush_audio()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
if self._push_stop_frames:
|
||||
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
if self._stop_frame_task:
|
||||
self._stop_frame_task.cancel()
|
||||
await self._stop_frame_task
|
||||
self._stop_frame_task = None
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
if self._stop_frame_task:
|
||||
self._stop_frame_task.cancel()
|
||||
await self._stop_frame_task
|
||||
self._stop_frame_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TTSSpeakFrame):
|
||||
await self.flush_audio()
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
await super().push_frame(frame, direction)
|
||||
|
||||
if self._push_stop_frames and (
|
||||
isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, TTSStartedFrame)
|
||||
or isinstance(frame, TTSAudioRawFrame)
|
||||
or isinstance(frame, TTSStoppedFrame)
|
||||
):
|
||||
await self._stop_frame_queue.put(frame)
|
||||
|
||||
async def _stop_frame_handler(self):
|
||||
try:
|
||||
@@ -376,7 +318,7 @@ class TTSService(AIService):
|
||||
pass
|
||||
|
||||
|
||||
class WordTTSService(TTSService):
|
||||
class AsyncWordTTSService(AsyncTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._initial_word_timestamp = -1
|
||||
@@ -455,12 +397,6 @@ class STTService(AIService):
|
||||
"""Returns transcript as a string"""
|
||||
pass
|
||||
|
||||
async def _update_stt_settings(self, frame: STTUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
await self.set_model(frame.model)
|
||||
if frame.language is not None:
|
||||
await self.set_language(frame.language)
|
||||
|
||||
async def process_audio_frame(self, frame: AudioRawFrame):
|
||||
await self.process_generator(self.run_stt(frame.audio))
|
||||
|
||||
@@ -472,8 +408,10 @@ class STTService(AIService):
|
||||
# In this service we accumulate audio internally and at the end we
|
||||
# push a TextFrame. We don't really want to push audio frames down.
|
||||
await self.process_audio_frame(frame)
|
||||
elif isinstance(frame, STTUpdateSettingsFrame):
|
||||
await self._update_stt_settings(frame)
|
||||
elif isinstance(frame, STTModelUpdateFrame):
|
||||
await self.set_model(frame.model)
|
||||
elif isinstance(frame, STTLanguageUpdateFrame):
|
||||
await self.set_language(frame.language)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -5,47 +5,47 @@
|
||||
#
|
||||
|
||||
import base64
|
||||
import copy
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
from asyncio import CancelledError
|
||||
from dataclasses import dataclass
|
||||
import io
|
||||
import copy
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from dataclasses import dataclass
|
||||
from PIL import Image
|
||||
from asyncio import CancelledError
|
||||
import re
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMEnablePromptCachingFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartInterruptionFrame,
|
||||
LLMModelUpdateFrame,
|
||||
TextFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
VisionImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
UserImageRawFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallInProgressFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMUserContextAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
)
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven
|
||||
from anthropic import AsyncAnthropic, NOT_GIVEN, NotGiven
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
@@ -279,21 +279,6 @@ class AnthropicLLMService(LLMService):
|
||||
cache_read_input_tokens=cache_read_input_tokens,
|
||||
)
|
||||
|
||||
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
if frame.max_tokens is not None:
|
||||
await self.set_max_tokens(frame.max_tokens)
|
||||
if frame.temperature is not None:
|
||||
await self.set_temperature(frame.temperature)
|
||||
if frame.top_k is not None:
|
||||
await self.set_top_k(frame.top_k)
|
||||
if frame.top_p is not None:
|
||||
await self.set_top_p(frame.top_p)
|
||||
if frame.extra:
|
||||
await self.set_extra(frame.extra)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -308,8 +293,9 @@ class AnthropicLLMService(LLMService):
|
||||
# UserImageRawFrames coming through the pipeline and add them
|
||||
# to the context.
|
||||
context = AnthropicLLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
elif isinstance(frame, LLMEnablePromptCachingFrame):
|
||||
logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
|
||||
self._enable_prompt_caching_beta = frame.enable
|
||||
|
||||
@@ -41,10 +41,7 @@ try:
|
||||
SpeechRecognizer,
|
||||
SpeechSynthesizer,
|
||||
)
|
||||
from azure.cognitiveservices.speech.audio import (
|
||||
AudioStreamFormat,
|
||||
PushAudioInputStream,
|
||||
)
|
||||
from azure.cognitiveservices.speech.audio import AudioStreamFormat, PushAudioInputStream
|
||||
from azure.cognitiveservices.speech.dialog import AudioConfig
|
||||
from openai import AsyncAzureOpenAI
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -76,7 +73,7 @@ class AzureLLMService(BaseOpenAILLMService):
|
||||
class AzureTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
emphasis: Optional[str] = None
|
||||
language: Optional[str] = "en-US"
|
||||
language_code: Optional[str] = "en-US"
|
||||
pitch: Optional[str] = None
|
||||
rate: Optional[str] = "1.05"
|
||||
role: Optional[str] = None
|
||||
@@ -108,7 +105,7 @@ class AzureTTSService(TTSService):
|
||||
|
||||
def _construct_ssml(self, text: str) -> str:
|
||||
ssml = (
|
||||
f"<speak version='1.0' xml:lang='{self._params.language}' "
|
||||
f"<speak version='1.0' xml:lang='{self._params.language_code}' "
|
||||
"xmlns='http://www.w3.org/2001/10/synthesis' "
|
||||
"xmlns:mstts='http://www.w3.org/2001/mstts'>"
|
||||
f"<voice name='{self._voice}'>"
|
||||
@@ -158,9 +155,9 @@ class AzureTTSService(TTSService):
|
||||
logger.debug(f"Setting TTS emphasis to: [{emphasis}]")
|
||||
self._params.emphasis = emphasis
|
||||
|
||||
async def set_language(self, language: str):
|
||||
logger.debug(f"Setting TTS language code to: [{language}]")
|
||||
self._params.language = language
|
||||
async def set_language_code(self, language_code: str):
|
||||
logger.debug(f"Setting TTS language code to: [{language_code}]")
|
||||
self._params.language_code = language_code
|
||||
|
||||
async def set_pitch(self, pitch: str):
|
||||
logger.debug(f"Setting TTS pitch to: [{pitch}]")
|
||||
@@ -190,7 +187,7 @@ class AzureTTSService(TTSService):
|
||||
valid_params = {
|
||||
"voice": self.set_voice,
|
||||
"emphasis": self.set_emphasis,
|
||||
"language_code": self.set_language,
|
||||
"language_code": self.set_language_code,
|
||||
"pitch": self.set_pitch,
|
||||
"rate": self.set_rate,
|
||||
"role": self.set_role,
|
||||
|
||||
@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.services.ai_services import WordTTSService, TTSService
|
||||
from pipecat.services.ai_services import AsyncWordTTSService, TTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -61,7 +61,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
class CartesiaTTSService(WordTTSService):
|
||||
class CartesiaTTSService(AsyncWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
encoding: Optional[str] = "pcm_s16le"
|
||||
sample_rate: Optional[int] = 16000
|
||||
|
||||
@@ -23,7 +23,7 @@ from pipecat.frames.frames import (
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import WordTTSService
|
||||
from pipecat.services.ai_services import AsyncWordTTSService
|
||||
|
||||
# See .env.example for ElevenLabs configuration needed
|
||||
try:
|
||||
@@ -70,9 +70,9 @@ def calculate_word_times(
|
||||
return word_times
|
||||
|
||||
|
||||
class ElevenLabsTTSService(WordTTSService):
|
||||
class ElevenLabsTTSService(AsyncWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[str] = None
|
||||
language_code: Optional[str] = None
|
||||
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
|
||||
optimize_streaming_latency: Optional[str] = None
|
||||
stability: Optional[float] = None
|
||||
@@ -229,13 +229,13 @@ class ElevenLabsTTSService(WordTTSService):
|
||||
if self._params.optimize_streaming_latency:
|
||||
url += f"&optimize_streaming_latency={self._params.optimize_streaming_latency}"
|
||||
|
||||
# language can only be used with the 'eleven_turbo_v2_5' model
|
||||
if self._params.language:
|
||||
# language_code can only be used with the 'eleven_turbo_v2_5' model
|
||||
if self._params.language_code:
|
||||
if model == "eleven_turbo_v2_5":
|
||||
url += f"&language_code={self._params.language}"
|
||||
url += f"&language_code={self._params.language_code}"
|
||||
else:
|
||||
logger.debug(
|
||||
f"Language code [{self._params.language}] not applied. Language codes can only be used with the 'eleven_turbo_v2_5' model."
|
||||
f"Language code [{self._params.language_code}] not applied. Language codes can only be used with the 'eleven_turbo_v2_5' model."
|
||||
)
|
||||
|
||||
self._websocket = await websockets.connect(url)
|
||||
|
||||
@@ -51,7 +51,7 @@ class GladiaSTTService(STTService):
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
|
||||
@@ -5,37 +5,43 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
from typing import AsyncGenerator, List, Literal, Optional
|
||||
from dataclasses import dataclass
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
LLMModelUpdateFrame,
|
||||
StartInterruptionFrame,
|
||||
TextFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService, TTSService
|
||||
from pipecat.services.ai_services import LLMService
|
||||
|
||||
try:
|
||||
import google.ai.generativelanguage as glm
|
||||
import google.generativeai as gai
|
||||
from google.cloud import texttospeech_v1
|
||||
from google.oauth2 import service_account
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
@@ -44,6 +50,18 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class GoogleContextAggregatorPair:
|
||||
_user: "GoogleUserContextAggregator"
|
||||
_assistant: "GoogleAssistantContextAggregator"
|
||||
|
||||
def user(self) -> "GoogleUserContextAggregator":
|
||||
return self._user
|
||||
|
||||
def assistant(self) -> "GoogleAssistantContextAggregator":
|
||||
return self._assistant
|
||||
|
||||
|
||||
class GoogleLLMService(LLMService):
|
||||
"""This class implements inference with Google's AI models
|
||||
|
||||
@@ -60,6 +78,12 @@ class GoogleLLMService(LLMService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def create_context_aggregator(context: OpenAILLMContext) -> GoogleContextAggregatorPair:
|
||||
user = GoogleUserContextAggregator(context)
|
||||
assistant = GoogleAssistantContextAggregator(user)
|
||||
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
def _create_client(self, model: str):
|
||||
self.set_model_name(model)
|
||||
self._client = gai.GenerativeModel(model)
|
||||
@@ -76,16 +100,24 @@ class GoogleLLMService(LLMService):
|
||||
elif role == "assistant":
|
||||
role = "model"
|
||||
|
||||
parts = [glm.Part(text=content)]
|
||||
if "mime_type" in message:
|
||||
parts.append(
|
||||
glm.Part(
|
||||
inline_data=glm.Blob(
|
||||
mime_type=message["mime_type"], data=message["data"].getvalue()
|
||||
if isinstance(content, list):
|
||||
parts = []
|
||||
for item in content:
|
||||
if item["type"] == "text":
|
||||
parts.append(glm.Part(text=item["text"]))
|
||||
elif item["type"] == "image_url":
|
||||
image_data = item["image_url"]["url"].split(",")[1]
|
||||
parts.append(
|
||||
glm.Part(
|
||||
inline_data=glm.Blob(
|
||||
mime_type="image/jpeg", data=base64.b64decode(image_data)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
google_messages.append({"role": role, "parts": parts})
|
||||
else:
|
||||
parts = [glm.Part(text=content)]
|
||||
|
||||
google_messages.append(glm.Content(role=role, parts=parts))
|
||||
|
||||
return google_messages
|
||||
|
||||
@@ -95,8 +127,10 @@ class GoogleLLMService(LLMService):
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
|
||||
logger.debug(f"Generating chat: {context.get_messages_json()}")
|
||||
|
||||
messages = self._get_messages_from_openai_context(context)
|
||||
@@ -123,23 +157,22 @@ class GoogleLLMService(LLMService):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
context = None
|
||||
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
context: OpenAILLMContext = frame.context
|
||||
context = GoogleLLMContext.from_openai_context(frame.context)
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
context = GoogleLLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
context = GoogleLLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self._create_client(frame.model)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -147,186 +180,155 @@ class GoogleLLMService(LLMService):
|
||||
await self._process_context(context)
|
||||
|
||||
|
||||
class GoogleTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
pitch: Optional[str] = None
|
||||
rate: Optional[str] = None
|
||||
volume: Optional[str] = None
|
||||
emphasis: Optional[Literal["strong", "moderate", "reduced", "none"]] = None
|
||||
language: Optional[str] = None
|
||||
gender: Optional[Literal["male", "female", "neutral"]] = None
|
||||
google_style: Optional[Literal["apologetic", "calm", "empathetic", "firm", "lively"]] = None
|
||||
|
||||
class GoogleLLMContext(OpenAILLMContext):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
credentials: Optional[str] = None,
|
||||
credentials_path: Optional[str] = None,
|
||||
voice_id: str = "en-US-Neural2-A",
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
messages: list[dict] | None = None,
|
||||
tools: list[dict] | None = None,
|
||||
tool_choice: dict | None = None,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
|
||||
self._user_image_request_context = {}
|
||||
|
||||
self._voice_id: str = voice_id
|
||||
self._params = params
|
||||
self._client: texttospeech_v1.TextToSpeechAsyncClient = self._create_client(
|
||||
credentials, credentials_path
|
||||
@classmethod
|
||||
def from_openai_context(cls, openai_context: OpenAILLMContext):
|
||||
return cls(
|
||||
messages=openai_context.messages,
|
||||
tools=openai_context.tools,
|
||||
tool_choice=openai_context.tool_choice,
|
||||
)
|
||||
|
||||
def _create_client(
|
||||
self, credentials: Optional[str], credentials_path: Optional[str]
|
||||
) -> texttospeech_v1.TextToSpeechAsyncClient:
|
||||
creds: Optional[service_account.Credentials] = None
|
||||
@classmethod
|
||||
def from_messages(cls, messages: List[dict]) -> "GoogleLLMContext":
|
||||
return cls(messages=messages)
|
||||
|
||||
# Create a Google Cloud service account for the Cloud Text-to-Speech API
|
||||
# Using either the provided credentials JSON string or the path to a service account JSON
|
||||
# file, create a Google Cloud service account and use it to authenticate with the API.
|
||||
if credentials:
|
||||
# Use provided credentials JSON string
|
||||
json_account_info = json.loads(credentials)
|
||||
creds = service_account.Credentials.from_service_account_info(json_account_info)
|
||||
elif credentials_path:
|
||||
# Use service account JSON file if provided
|
||||
creds = service_account.Credentials.from_service_account_file(credentials_path)
|
||||
else:
|
||||
raise ValueError("Either 'credentials' or 'credentials_path' must be provided.")
|
||||
@classmethod
|
||||
def from_image_frame(cls, frame: VisionImageRawFrame) -> "GoogleLLMContext":
|
||||
context = cls()
|
||||
context.add_image_frame_message(
|
||||
format=frame.format, size=frame.size, image=frame.image, text=frame.text
|
||||
)
|
||||
return context
|
||||
|
||||
return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds)
|
||||
def add_image_frame_message(
|
||||
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
|
||||
):
|
||||
buffer = io.BytesIO()
|
||||
Image.frombytes(format, size, image).save(buffer, format="JPEG")
|
||||
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
content = [
|
||||
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}}
|
||||
]
|
||||
if text:
|
||||
content.append({"type": "text", "text": text})
|
||||
self.add_message({"role": "user", "content": content})
|
||||
|
||||
def _construct_ssml(self, text: str) -> str:
|
||||
ssml = "<speak>"
|
||||
|
||||
# Voice tag
|
||||
voice_attrs = [f"name='{self._voice_id}'"]
|
||||
if self._params.language:
|
||||
voice_attrs.append(f"language='{self._params.language}'")
|
||||
if self._params.gender:
|
||||
voice_attrs.append(f"gender='{self._params.gender}'")
|
||||
ssml += f"<voice {' '.join(voice_attrs)}>"
|
||||
class GoogleUserContextAggregator(LLMUserContextAggregator):
|
||||
def __init__(self, context: OpenAILLMContext | GoogleLLMContext):
|
||||
super().__init__(context=context)
|
||||
|
||||
# Prosody tag
|
||||
prosody_attrs = []
|
||||
if self._params.pitch:
|
||||
prosody_attrs.append(f"pitch='{self._params.pitch}'")
|
||||
if self._params.rate:
|
||||
prosody_attrs.append(f"rate='{self._params.rate}'")
|
||||
if self._params.volume:
|
||||
prosody_attrs.append(f"volume='{self._params.volume}'")
|
||||
if isinstance(context, OpenAILLMContext):
|
||||
self._context = GoogleLLMContext.from_openai_context(context)
|
||||
|
||||
if prosody_attrs:
|
||||
ssml += f"<prosody {' '.join(prosody_attrs)}>"
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
try:
|
||||
if isinstance(frame, UserImageRequestFrame):
|
||||
if frame.context:
|
||||
if isinstance(frame.context, str):
|
||||
self._context._user_image_request_context[frame.user_id] = frame.context
|
||||
else:
|
||||
logger.error(
|
||||
f"Unexpected UserImageRequestFrame context type: {type(frame.context)}"
|
||||
)
|
||||
del self._context._user_image_request_context[frame.user_id]
|
||||
else:
|
||||
if frame.user_id in self._context._user_image_request_context:
|
||||
del self._context._user_image_request_context[frame.user_id]
|
||||
elif isinstance(frame, UserImageRawFrame):
|
||||
text = self._context._user_image_request_context.get(frame.user_id) or ""
|
||||
if text:
|
||||
del self._context._user_image_request_context[frame.user_id]
|
||||
|
||||
# Emphasis tag
|
||||
if self._params.emphasis:
|
||||
ssml += f"<emphasis level='{self._params.emphasis}'>"
|
||||
# Handle the case where frame.format might be None
|
||||
image_format = frame.format or "JPEG" # Default to JPEG if format is None
|
||||
|
||||
# Google style tag
|
||||
if self._params.google_style:
|
||||
ssml += f"<google:style name='{self._params.google_style}'>"
|
||||
self._context.add_image_frame_message(
|
||||
format=image_format, size=frame.size, image=frame.image, text=text
|
||||
)
|
||||
await self.push_context_frame()
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing frame: {e}")
|
||||
|
||||
ssml += text
|
||||
|
||||
# Close tags
|
||||
if self._params.google_style:
|
||||
ssml += "</google:style>"
|
||||
if self._params.emphasis:
|
||||
ssml += "</emphasis>"
|
||||
if prosody_attrs:
|
||||
ssml += "</prosody>"
|
||||
ssml += "</voice></speak>"
|
||||
class GoogleAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
def __init__(self, user_context_aggregator: GoogleUserContextAggregator):
|
||||
super().__init__(context=user_context_aggregator._context)
|
||||
self._user_context_aggregator = user_context_aggregator
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
|
||||
return ssml
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
elif isinstance(frame, FunctionCallInProgressFrame):
|
||||
self._function_call_in_progress = frame
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
if (
|
||||
self._function_call_in_progress
|
||||
and self._function_call_in_progress.tool_call_id == frame.tool_call_id
|
||||
):
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = frame
|
||||
await self._push_aggregation()
|
||||
else:
|
||||
logger.warning(
|
||||
"FunctionCallResultFrame tool_call_id != InProgressFrame tool_call_id"
|
||||
)
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
|
||||
async def set_voice(self, voice: str) -> None:
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
async def _push_aggregation(self):
|
||||
if not (self._aggregation or self._function_call_result):
|
||||
return
|
||||
|
||||
async def set_language(self, language: str) -> None:
|
||||
logger.debug(f"Switching TTS language to: [{language}]")
|
||||
self._params.language = language
|
||||
run_llm = False
|
||||
|
||||
async def set_pitch(self, pitch: str) -> None:
|
||||
logger.debug(f"Switching TTS pitch to: [{pitch}]")
|
||||
self._params.pitch = pitch
|
||||
|
||||
async def set_rate(self, rate: str) -> None:
|
||||
logger.debug(f"Switching TTS rate to: [{rate}]")
|
||||
self._params.rate = rate
|
||||
|
||||
async def set_volume(self, volume: str) -> None:
|
||||
logger.debug(f"Switching TTS volume to: [{volume}]")
|
||||
self._params.volume = volume
|
||||
|
||||
async def set_emphasis(
|
||||
self, emphasis: Literal["strong", "moderate", "reduced", "none"]
|
||||
) -> None:
|
||||
logger.debug(f"Switching TTS emphasis to: [{emphasis}]")
|
||||
self._params.emphasis = emphasis
|
||||
|
||||
async def set_gender(self, gender: Literal["male", "female", "neutral"]) -> None:
|
||||
logger.debug(f"Switch TTS gender to [{gender}]")
|
||||
self._params.gender = gender
|
||||
|
||||
async def google_style(
|
||||
self, google_style: Literal["apologetic", "calm", "empathetic", "firm", "lively"]
|
||||
) -> None:
|
||||
logger.debug(f"Switching TTS google style to: [{google_style}]")
|
||||
self._params.google_style = google_style
|
||||
|
||||
async def set_params(self, params: InputParams) -> None:
|
||||
logger.debug(f"Switching TTS params to: [{params}]")
|
||||
self._params = params
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
aggregation = self._aggregation
|
||||
self._aggregation = ""
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
self._function_call_result = None
|
||||
if frame.result:
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": aggregation,
|
||||
"function_call": {
|
||||
"name": frame.function_name,
|
||||
"arguments": json.dumps(frame.arguments),
|
||||
},
|
||||
}
|
||||
)
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "function",
|
||||
"content": json.dumps(frame.result),
|
||||
"name": frame.function_name,
|
||||
}
|
||||
)
|
||||
run_llm = True
|
||||
else:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
ssml = self._construct_ssml(text)
|
||||
synthesis_input = texttospeech_v1.SynthesisInput(ssml=ssml)
|
||||
voice = texttospeech_v1.VoiceSelectionParams(
|
||||
language_code=self._params.language, name=self._voice_id
|
||||
)
|
||||
audio_config = texttospeech_v1.AudioConfig(
|
||||
audio_encoding=texttospeech_v1.AudioEncoding.LINEAR16,
|
||||
sample_rate_hertz=self.sample_rate,
|
||||
)
|
||||
|
||||
request = texttospeech_v1.SynthesizeSpeechRequest(
|
||||
input=synthesis_input, voice=voice, audio_config=audio_config
|
||||
)
|
||||
|
||||
response = await self._client.synthesize_speech(request=request)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
|
||||
# Skip the first 44 bytes to remove the WAV header
|
||||
audio_content = response.audio_content[44:]
|
||||
|
||||
# Read and yield audio data in chunks
|
||||
chunk_size = 8192
|
||||
for i in range(0, len(audio_content), chunk_size):
|
||||
chunk = audio_content[i : i + chunk_size]
|
||||
if not chunk:
|
||||
break
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
await asyncio.sleep(0) # Allow other tasks to run
|
||||
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
if run_llm:
|
||||
await self._user_context_aggregator.push_context_frame()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error generating TTS: {e}")
|
||||
error_message = f"TTS generation error: {str(e)}"
|
||||
yield ErrorFrame(error=error_message)
|
||||
finally:
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
logger.error(f"Error processing frame: {e}")
|
||||
|
||||
@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.ai_services import AsyncTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -35,7 +35,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class LmntTTSService(TTSService):
|
||||
class LmntTTSService(AsyncTTSService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
@@ -47,7 +47,7 @@ class LmntTTSService(TTSService):
|
||||
):
|
||||
# Let TTSService produce TTSStoppedFrames after a short delay of
|
||||
# no activity.
|
||||
super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs)
|
||||
super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
|
||||
@@ -4,39 +4,38 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
|
||||
|
||||
import aiohttp
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartInterruptionFrame,
|
||||
TextFrame,
|
||||
LLMModelUpdateFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TextFrame,
|
||||
URLImageRawFrame,
|
||||
VisionImageRawFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallInProgressFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
@@ -45,14 +44,12 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import ImageGenService, LLMService, TTSService
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from openai import (
|
||||
NOT_GIVEN,
|
||||
AsyncOpenAI,
|
||||
AsyncStream,
|
||||
BadRequestError,
|
||||
DefaultAsyncHttpxClient,
|
||||
)
|
||||
from openai import AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient, BadRequestError, NOT_GIVEN
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
@@ -205,10 +202,6 @@ class BaseOpenAILLMService(LLMService):
|
||||
return chunks
|
||||
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
functions_list = []
|
||||
arguments_list = []
|
||||
tool_id_list = []
|
||||
func_idx = 0
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
tool_call_id = ""
|
||||
@@ -246,14 +239,6 @@ class BaseOpenAILLMService(LLMService):
|
||||
# yield a frame containing the function name and the arguments.
|
||||
|
||||
tool_call = chunk.choices[0].delta.tool_calls[0]
|
||||
if tool_call.index != func_idx:
|
||||
functions_list.append(function_name)
|
||||
arguments_list.append(arguments)
|
||||
tool_id_list.append(tool_call_id)
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
tool_call_id = ""
|
||||
func_idx += 1
|
||||
if tool_call.function and tool_call.function.name:
|
||||
function_name += tool_call.function.name
|
||||
tool_call_id = tool_call.id
|
||||
@@ -269,46 +254,21 @@ class BaseOpenAILLMService(LLMService):
|
||||
# the context, and re-prompt to get a chat answer. If we don't have a registered
|
||||
# handler, raise an exception.
|
||||
if function_name and arguments:
|
||||
# added to the list as last function name and arguments not added to the list
|
||||
functions_list.append(function_name)
|
||||
arguments_list.append(arguments)
|
||||
tool_id_list.append(tool_call_id)
|
||||
if self.has_function(function_name):
|
||||
await self._handle_function_call(context, tool_call_id, function_name, arguments)
|
||||
else:
|
||||
raise OpenAIUnhandledFunctionException(
|
||||
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
|
||||
)
|
||||
|
||||
total_items = len(functions_list)
|
||||
for index, (function_name, arguments, tool_id) in enumerate(
|
||||
zip(functions_list, arguments_list, tool_id_list), start=1
|
||||
):
|
||||
if self.has_function(function_name):
|
||||
run_llm = index == total_items
|
||||
arguments = json.loads(arguments)
|
||||
await self.call_function(
|
||||
context=context,
|
||||
function_name=function_name,
|
||||
arguments=arguments,
|
||||
tool_call_id=tool_id,
|
||||
run_llm=run_llm,
|
||||
)
|
||||
else:
|
||||
raise OpenAIUnhandledFunctionException(
|
||||
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
|
||||
)
|
||||
|
||||
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
if frame.frequency_penalty is not None:
|
||||
await self.set_frequency_penalty(frame.frequency_penalty)
|
||||
if frame.presence_penalty is not None:
|
||||
await self.set_presence_penalty(frame.presence_penalty)
|
||||
if frame.seed is not None:
|
||||
await self.set_seed(frame.seed)
|
||||
if frame.temperature is not None:
|
||||
await self.set_temperature(frame.temperature)
|
||||
if frame.top_p is not None:
|
||||
await self.set_top_p(frame.top_p)
|
||||
if frame.extra:
|
||||
await self.set_extra(frame.extra)
|
||||
async def _handle_function_call(self, context, tool_call_id, function_name, arguments):
|
||||
arguments = json.loads(arguments)
|
||||
await self.call_function(
|
||||
context=context,
|
||||
tool_call_id=tool_call_id,
|
||||
function_name=function_name,
|
||||
arguments=arguments,
|
||||
)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -320,8 +280,9 @@ class BaseOpenAILLMService(LLMService):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -481,27 +442,31 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
def __init__(self, user_context_aggregator: OpenAIUserContextAggregator):
|
||||
super().__init__(context=user_context_aggregator._context)
|
||||
self._user_context_aggregator = user_context_aggregator
|
||||
self._function_calls_in_progress = {}
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
# See note above about not calling push_frame() here.
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
self._function_calls_in_progress.clear()
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_finished = None
|
||||
elif isinstance(frame, FunctionCallInProgressFrame):
|
||||
self._function_calls_in_progress[frame.tool_call_id] = frame
|
||||
self._function_call_in_progress = frame
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
if frame.tool_call_id in self._function_calls_in_progress:
|
||||
del self._function_calls_in_progress[frame.tool_call_id]
|
||||
if (
|
||||
self._function_call_in_progress
|
||||
and self._function_call_in_progress.tool_call_id == frame.tool_call_id
|
||||
):
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = frame
|
||||
# TODO-CB: Kwin wants us to refactor this out of here but I REFUSE
|
||||
await self._push_aggregation()
|
||||
else:
|
||||
logger.warning(
|
||||
"FunctionCallResultFrame tool_call_id does not match any function call in progress"
|
||||
f"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
|
||||
)
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
|
||||
async def _push_aggregation(self):
|
||||
@@ -540,7 +505,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
}
|
||||
)
|
||||
run_llm = frame.run_llm
|
||||
run_llm = True
|
||||
else:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
|
||||
@@ -7,36 +7,37 @@
|
||||
import json
|
||||
import re
|
||||
import uuid
|
||||
from asyncio import CancelledError
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from asyncio import CancelledError
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartInterruptionFrame,
|
||||
LLMModelUpdateFrame,
|
||||
TextFrame,
|
||||
UserImageRequestFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallInProgressFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMUserContextAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
)
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from together import AsyncTogether
|
||||
@@ -128,25 +129,6 @@ class TogetherLLMService(LLMService):
|
||||
logger.debug(f"Switching LLM extra to: [{extra}]")
|
||||
self._extra = extra
|
||||
|
||||
async def _update_settings(self, frame: LLMUpdateSettingsFrame):
|
||||
if frame.model is not None:
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
if frame.frequency_penalty is not None:
|
||||
await self.set_frequency_penalty(frame.frequency_penalty)
|
||||
if frame.max_tokens is not None:
|
||||
await self.set_max_tokens(frame.max_tokens)
|
||||
if frame.presence_penalty is not None:
|
||||
await self.set_presence_penalty(frame.presence_penalty)
|
||||
if frame.temperature is not None:
|
||||
await self.set_temperature(frame.temperature)
|
||||
if frame.top_k is not None:
|
||||
await self.set_top_k(frame.top_k)
|
||||
if frame.top_p is not None:
|
||||
await self.set_top_p(frame.top_p)
|
||||
if frame.extra:
|
||||
await self.set_extra(frame.extra)
|
||||
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
@@ -206,7 +188,7 @@ class TogetherLLMService(LLMService):
|
||||
if chunk.choices[0].finish_reason == "eos" and accumulating_function_call:
|
||||
await self._extract_function_call(context, function_call_accumulator)
|
||||
|
||||
except CancelledError:
|
||||
except CancelledError as e:
|
||||
# todo: implement token counting estimates for use when the user interrupts a long generation
|
||||
# we do this in the anthropic.py service
|
||||
raise
|
||||
@@ -224,8 +206,9 @@ class TogetherLLMService(LLMService):
|
||||
context = frame.context
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = TogetherLLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self.set_model_name(frame.model)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -355,7 +338,7 @@ class TogetherAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
await self._push_aggregation()
|
||||
else:
|
||||
logger.warning(
|
||||
"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
|
||||
f"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id"
|
||||
)
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = None
|
||||
|
||||
@@ -31,7 +31,7 @@ from loguru import logger
|
||||
|
||||
class BaseInputTransport(FrameProcessor):
|
||||
def __init__(self, params: TransportParams, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._params = params
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ from pipecat.utils.time import nanoseconds_to_seconds
|
||||
|
||||
class BaseOutputTransport(FrameProcessor):
|
||||
def __init__(self, params: TransportParams, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sync=False, **kwargs)
|
||||
|
||||
self._params = params
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ deepgram-sdk~=3.5.0
|
||||
fal-client~=0.4.1
|
||||
fastapi~=0.112.1
|
||||
faster-whisper~=1.0.3
|
||||
google-cloud-texttospeech~=2.17.2
|
||||
google-generativeai~=0.7.2
|
||||
langchain~=0.2.14
|
||||
livekit~=0.13.1
|
||||
|
||||
@@ -7,9 +7,9 @@
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
StopTaskFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
@@ -32,7 +32,6 @@ from langchain_core.language_models import FakeStreamingListLLM
|
||||
class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
class MockProcessor(FrameProcessor):
|
||||
def __init__(self, name):
|
||||
super().__init__()
|
||||
self.name = name
|
||||
self.token: list[str] = []
|
||||
# Start collecting tokens when we see the start frame
|
||||
@@ -56,13 +55,13 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
def setUp(self):
|
||||
self.expected_response = "Hello dear human"
|
||||
self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response])
|
||||
self.mock_proc = self.MockProcessor("token_collector")
|
||||
|
||||
async def test_langchain(self):
|
||||
messages = [("system", "Say hello to {name}"), ("human", "{input}")]
|
||||
prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas")
|
||||
chain = prompt | self.fake_llm
|
||||
proc = LangchainProcessor(chain=chain)
|
||||
self.mock_proc = self.MockProcessor("token_collector")
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
@@ -82,7 +81,7 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
UserStartedSpeakingFrame(),
|
||||
TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"),
|
||||
UserStoppedSpeakingFrame(),
|
||||
EndFrame(),
|
||||
StopTaskFrame(),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user