Compare commits

...

26 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
81c2c5adfa working on a fastapi http transport 2024-09-17 09:52:19 -07:00
Aleix Conchillo Flaqué
d9d6571c73 Merge pull request #465 from kunal-cai/ks--fix-ws
[Cartesia] Fix streaming truncation bug with Twilio Fast API WS
2024-09-16 17:17:13 -07:00
Kunal Shah
540cad4844 Undo sorting 2024-09-16 16:07:19 -07:00
Kunal Shah
0a26b650c0 Undo sorting 2024-09-16 16:06:25 -07:00
Kunal Shah
adaac003e5 [Cartesia] Fix streaming truncation bug with Twilio Fast API WS 2024-09-16 15:59:06 -07:00
Aleix Conchillo Flaqué
3d4f125071 Merge pull request #454 from pipecat-ai/aleix/initial-pipeline-clock-support
initial pipeline clock support
2024-09-13 13:51:04 -07:00
Aleix Conchillo Flaqué
bce87f8717 update CHANGELOG.md 2024-09-13 13:50:03 -07:00
Aleix Conchillo Flaqué
1fe940bd6b servceis(cartesia,elevenlabs): use word start times instead 2024-09-13 13:10:44 -07:00
Aleix Conchillo Flaqué
cb36a71381 fix some linting 2024-09-13 09:56:12 -07:00
Aleix Conchillo Flaqué
5acc4928fe examples: add 07d-interruptible-elevenlabs.py 2024-09-13 09:43:18 -07:00
Aleix Conchillo Flaqué
434493b8aa services(elevenlabs): implement word-by-word support through websockets 2024-09-13 09:31:35 -07:00
Aleix Conchillo Flaqué
f08b25dbb2 examples: assistant aggregator should always goes after transport 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
3665734972 transports(output): initial sink clock synchronization 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
a98d78cdea services(lmnt): change to subclass of AsyncTTSService 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
80f6d74e80 services(cartesia): change to subclass of AsyncWordTTSService 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
02d926e9bd services: create AsyncTTSService and AsyncWordTTSService 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
7749692f72 processors: get pipeline clock from StartFrame 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
7807cbeb39 pipeline(task): add a clock to the pipeline task 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
72f231b327 frames: add a presentation timestamp (pts) to each frame 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
3cbe97d346 clocks: added new BaseClock and SystemClock 2024-09-12 00:31:48 -07:00
Kwindla Hultman Kramer
b880e1a60e Merge pull request #448 from pipecat-ai/khk/aggregation-leading-space
fix for leading space in context aggregator strings
2024-09-10 09:57:35 -07:00
Aleix Conchillo Flaqué
886046e696 Merge pull request #445 from dleybz/patch-1
Update requirements.txt
2024-09-09 17:54:33 -07:00
Aleix Conchillo Flaqué
9106a5f8ae Merge pull request #449 from pipecat-ai/aleix/audio-out-bitrate
transports(daily): allow setting audio output bitrate (default 96kpbs)
2024-09-09 08:39:06 -07:00
Aleix Conchillo Flaqué
98286336bf transports(daily): allow setting audio output bitrate (default 96kpbs)
Fixes #388
2024-09-08 19:39:17 -07:00
Kwindla Hultman Kramer
081b001c8b fix for leading space in context aggregator strings 2024-09-07 16:42:52 -07:00
Danny D. Leybzon
c92531a02f Update requirements.txt
request.form() throws an error if you don't have python-multipart installed
2024-09-06 20:22:18 +02:00
23 changed files with 650 additions and 135 deletions

View File

@@ -9,6 +9,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
optional `pts` field (presentation timestamp). There's currently just one
clock implementation, `SystemClock`, and the `pts` field is currently only used
for `TextFrame`s (audio and image frames will be next).
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
- `DailyTransport` now uses the number of audio output channels (1 or 2) to set
mono or stereo audio when needed.
- Interruptions support has been added to `TwilioFrameSerializer` when using
`FastAPIWebsocketTransport`.
@@ -23,6 +40,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
"almost" because currently the audio frames don't have presentation timestamps,
but they should be played at roughly the same time.
- `DailyTransport.on_joined` event now returns the full session data instead of
just the participant.

View File

@@ -3,3 +3,4 @@ fastapi
uvicorn
python-dotenv
twilio
python-multipart

View File

@@ -4,8 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import aiohttp
import os
import sys
@@ -15,12 +15,11 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
@@ -41,7 +40,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
@@ -49,10 +47,9 @@ async def main():
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
sample_rate=44100,
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
@@ -74,11 +71,16 @@ async def main():
tma_in, # User responses
llm, # LLM
tts, # TTS
tma_out, # Goes before the transport because cartesia has word-level timestamps!
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -147,8 +147,8 @@ Your task is to help the user understand and learn from this article in 2 senten
tma_in,
llm,
tts,
tma_out,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))

View File

@@ -39,7 +39,7 @@ azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.5.0" ]
elevenlabs = [ "elevenlabs~=1.7.0" ]
elevenlabs = [ "websockets~=12.0" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]

View File

View File

@@ -0,0 +1,18 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseClock(ABC):
    """Abstract interface for a clock.

    Concrete clocks must implement both `start()` and `get_time()`; the class
    cannot be instantiated until they do.
    """

    @abstractmethod
    def start(self):
        """Start the clock."""

    @abstractmethod
    def get_time(self) -> int:
        """Return the clock's current time as an integer.

        The unit and reference point are defined by the implementation.
        """

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from pipecat.clocks.base_clock import BaseClock
class SystemClock(BaseClock):
    """Clock based on `time.monotonic_ns()`.

    Time is reported in nanoseconds elapsed since `start()` was called.
    """

    def __init__(self):
        # Monotonic reading captured by start(); 0 means "not started yet".
        self._time = 0

    def get_time(self) -> int:
        """Return nanoseconds elapsed since `start()`, or 0 if not started."""
        if self._time > 0:
            return time.monotonic_ns() - self._time
        return 0

    def start(self):
        """Record the current monotonic time as the clock's origin."""
        self._time = time.monotonic_ns()

View File

@@ -8,19 +8,27 @@ from typing import Any, List, Mapping, Optional, Tuple
from dataclasses import dataclass, field
from pipecat.clocks.base_clock import BaseClock
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
from pipecat.vad.vad_analyzer import VADParams
def format_pts(pts: int | None):
return nanoseconds_to_str(pts) if pts else None
@dataclass
class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
def __str__(self):
return self.name
@@ -46,7 +54,8 @@ class AudioRawFrame(DataFrame):
self.num_frames = int(len(self.audio) / (self.num_channels * 2))
def __str__(self):
return f"{self.name}(size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
@@ -60,7 +69,8 @@ class ImageRawFrame(DataFrame):
format: str | None
def __str__(self):
return f"{self.name}(size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
@dataclass
@@ -72,7 +82,8 @@ class URLImageRawFrame(ImageRawFrame):
url: str | None
def __str__(self):
return f"{self.name}(url: {self.url}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
@dataclass
@@ -84,7 +95,8 @@ class VisionImageRawFrame(ImageRawFrame):
text: str | None
def __str__(self):
return f"{self.name}(text: {self.text}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
@dataclass
@@ -96,7 +108,8 @@ class UserImageRawFrame(ImageRawFrame):
user_id: str
def __str__(self):
return f"{self.name}(user: {self.user_id}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
@dataclass
@@ -109,7 +122,8 @@ class SpriteFrame(Frame):
images: List[ImageRawFrame]
def __str__(self):
return f"{self.name}(size: {len(self.images)})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.images)})"
@dataclass
@@ -121,7 +135,8 @@ class TextFrame(DataFrame):
text: str
def __str__(self):
return f"{self.name}(text: {self.text})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text})"
@dataclass
@@ -326,6 +341,7 @@ class ControlFrame(Frame):
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False

View File

@@ -10,6 +10,8 @@ from typing import AsyncIterable, Iterable
from pydantic import BaseModel
from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -60,11 +62,16 @@ class Source(FrameProcessor):
class PipelineTask:
def __init__(self, pipeline: BasePipeline, params: PipelineParams = PipelineParams()):
def __init__(
self,
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
clock: BaseClock = SystemClock()):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self._pipeline = pipeline
self._clock = clock
self._params = params
self._finished = False
@@ -116,11 +123,14 @@ class PipelineTask:
return MetricsFrame(ttfb=ttfb, processing=processing)
async def _process_down_queue(self):
self._clock.start()
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)

View File

@@ -109,7 +109,7 @@ class LLMResponseAggregator(FrameProcessor):
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
# We have received a complete sentence, so if we have seen the
# end frame and we were still aggregating, it means we should
# send the aggregation.

View File

@@ -9,6 +9,7 @@ import time
from enum import Enum
from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -96,6 +97,9 @@ class FrameProcessor:
self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
# Clock
self._clock: BaseClock | None = None
# Properties
self._allow_interruptions = False
self._enable_metrics = False
@@ -177,8 +181,12 @@ class FrameProcessor:
def get_parent(self) -> "FrameProcessor":
return self._parent
def get_clock(self) -> BaseClock:
return self._clock
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics

View File

@@ -9,7 +9,7 @@ import io
import wave
from abc import abstractmethod
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, List, Optional, Tuple
from pipecat.frames.frames import (
AudioRawFrame,
@@ -37,9 +37,12 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.utils.audio import calculate_audio_volume
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import seconds_to_nanoseconds
from pipecat.utils.utils import exp_smoothing
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from loguru import logger
class AIService(FrameProcessor):
def __init__(self, **kwargs):
@@ -167,7 +170,7 @@ class TTSService(AIService):
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 0.8,
stop_frame_timeout_s: float = 1.0,
**kwargs):
super().__init__(**kwargs)
self._aggregate_sentences: bool = aggregate_sentences
@@ -303,6 +306,74 @@ class TTSService(AIService):
pass
class AsyncTTSService(TTSService):
    """TTS service base class whose subclasses must provide `flush_audio()`."""

    def __init__(self, **kwargs):
        # All keyword arguments are forwarded untouched to TTSService.
        super().__init__(**kwargs)

    @abstractmethod
    async def flush_audio(self):
        """Flush any pending audio. Must be implemented by subclasses."""
class AsyncWordTTSService(AsyncTTSService):
    """Async TTS service that emits one timestamped `TextFrame` per word.

    Subclasses feed `(word, seconds)` pairs through `add_word_timestamps()`.
    A background task turns each pair into a `TextFrame` whose `pts` is the
    initial word timestamp plus the word's offset in nanoseconds, and pushes
    it. The special pair `("LLMFullResponseEndFrame", 0)` is pushed as an
    `LLMFullResponseEndFrame` instead.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # -1 marks "no initial timestamp captured yet".
        self._initial_word_timestamp = -1
        self._words_queue = asyncio.Queue()
        self._words_task = self.get_event_loop().create_task(self._words_task_handler())

    def start_word_timestamps(self):
        """Capture the clock time as the base for word timestamps.

        Only the first call after construction or a reset has any effect.
        """
        if self._initial_word_timestamp != -1:
            return
        self._initial_word_timestamp = self.get_clock().get_time()

    def reset_word_timestamps(self):
        """Forget the captured base timestamp so the next start captures anew."""
        self._initial_word_timestamp = -1
        self._word_timestamps = []

    async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
        """Queue `(word, seconds)` pairs, converting seconds to nanoseconds."""
        for word, seconds in word_times:
            queued = (word, seconds_to_nanoseconds(seconds))
            await self._words_queue.put(queued)

    async def stop(self, frame: EndFrame):
        """Stop the service, then stop the words task."""
        await super().stop(frame)
        await self._stop_words_task()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service, then stop the words task."""
        await super().cancel(frame)
        await self._stop_words_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process the frame normally and flush audio at response/pipeline end."""
        await super().process_frame(frame, direction)

        if isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
            await self.flush_audio()

    async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
        """Run the parent interruption handling and reset the word timestamps."""
        await super()._handle_interruption(frame, direction)
        self.reset_word_timestamps()

    async def _stop_words_task(self):
        """Cancel the words task, if any, and wait for it to finish."""
        if not self._words_task:
            return
        self._words_task.cancel()
        await self._words_task

    async def _words_task_handler(self):
        """Consume queued words forever and push the matching frames."""
        while True:
            try:
                word, offset_ns = await self._words_queue.get()
                is_end_marker = word == "LLMFullResponseEndFrame" and offset_ns == 0
                if not is_end_marker:
                    text_frame = TextFrame(word)
                    text_frame.pts = self._initial_word_timestamp + offset_ns
                    await self.push_frame(text_frame)
                else:
                    await self.push_frame(LLMFullResponseEndFrame())
                self._words_queue.task_done()
            except asyncio.CancelledError:
                # Cancellation is how _stop_words_task() ends this loop.
                break
            except Exception as e:
                logger.exception(f"{self} exception: {e}")
class STTService(AIService):
"""STTService is a base class for speech-to-text services."""

View File

@@ -27,7 +27,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language
from pipecat.services.ai_services import TTSService
from pipecat.services.ai_services import AsyncWordTTSService
from loguru import logger
@@ -60,7 +60,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
return None
class CartesiaTTSService(TTSService):
class CartesiaTTSService(AsyncWordTTSService):
def __init__(
self,
@@ -74,19 +74,17 @@ class CartesiaTTSService(TTSService):
sample_rate: int = 16000,
language: str = "en",
**kwargs):
super().__init__(**kwargs)
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for
# a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3
# model, and it's worth it for the better audio quality.
self._aggregate_sentences = True
# we don't want to automatically push LLM response text frames, because the
# context aggregators will add them to the LLM context even if we're
# interrupted. cartesia gives us word-by-word timestamps. we can use those
# to generate text frames ourselves aligned with the playout timing of the audio!
self._push_text_frames = False
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
super().__init__(aggregate_sentences=True, push_text_frames=False, **kwargs)
self._api_key = api_key
self._cartesia_version = cartesia_version
@@ -102,10 +100,7 @@ class CartesiaTTSService(TTSService):
self._websocket = None
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
self._receive_task = None
self._context_appending_task = None
def can_generate_metrics(self) -> bool:
return True
@@ -140,7 +135,6 @@ class CartesiaTTSService(TTSService):
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler())
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
self._websocket = None
@@ -149,10 +143,6 @@ class CartesiaTTSService(TTSService):
try:
await self.stop_all_metrics()
if self._context_appending_task:
self._context_appending_task.cancel()
await self._context_appending_task
self._context_appending_task = None
if self._receive_task:
self._receive_task.cancel()
await self._receive_task
@@ -162,18 +152,33 @@ class CartesiaTTSService(TTSService):
self._websocket = None
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
except Exception as e:
logger.exception(f"{self} error closing websocket: {e}")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
await self.stop_all_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._context_id = None
async def flush_audio(self):
if not self._context_id or not self._websocket:
return
logger.debug("Flushing audio")
msg = {
"transcript": "",
"continue": False,
"context_id": self._context_id,
"model_id": self._model_id,
"voice": {
"mode": "id",
"id": self._voice_id
},
"output_format": self._output_format,
"language": self._language,
"add_timestamps": True,
}
await self._websocket.send(json.dumps(msg))
async def _receive_task_handler(self):
try:
@@ -188,16 +193,14 @@ class CartesiaTTSService(TTSService):
# because we are likely still playing out audio and need the
# timestamp to set send context frames.
self._context_id = None
self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0))
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
elif msg["type"] == "timestamps":
# logger.debug(f"TIMESTAMPS: {msg}")
self._timestamped_words_buffer.extend(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"]))
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
)
elif msg["type"] == "chunk":
await self.stop_ttfb_metrics()
if not self._context_id_start_timestamp:
self._context_id_start_timestamp = time.time()
self.start_word_timestamps()
frame = AudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self._output_format["sample_rate"],
@@ -216,27 +219,6 @@ class CartesiaTTSService(TTSService):
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def _context_appending_task_handler(self):
try:
while True:
await asyncio.sleep(0.1)
if not self._context_id_start_timestamp:
continue
elapsed_seconds = time.time() - self._context_id_start_timestamp
# Pop all words from self._timestamped_words_buffer that are
# older than the elapsed time and print a message about them to
# the console.
while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds:
word, timestamp = self._timestamped_words_buffer.pop(0)
if word == "LLMFullResponseEndFrame" and timestamp == 0:
await self.push_frame(LLMFullResponseEndFrame())
continue
await self.push_frame(TextFrame(word))
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -4,17 +4,30 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import AsyncGenerator, Literal
import asyncio
import base64
import json
from typing import Any, AsyncGenerator, List, Literal, Mapping, Tuple
from pydantic import BaseModel
from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame
from pipecat.services.ai_services import TTSService
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
StartFrame,
StartInterruptionFrame,
TTSStartedFrame,
TTSStoppedFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncWordTTSService
from loguru import logger
# See .env.example for ElevenLabs configuration needed
try:
from elevenlabs.client import AsyncElevenLabs
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -35,7 +48,28 @@ def sample_rate_from_output_format(output_format: str) -> int:
return 16000
class ElevenLabsTTSService(TTSService):
def calculate_word_times(
    alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
    """Pair each space-separated word of an alignment chunk with a time.

    Args:
        alignment_info: Mapping with two parallel sequences: `chars` (one
            string per character) and `charStartTimesMs` (one number per
            character, divided by 1000 here).
        cumulative_time: Offset added to every computed time.

    Returns:
        A list of `(word, time)` tuples. One time is produced at every space
        and at the final character; it is `cumulative_time` plus the start
        time of the character just before that position. Words and times are
        paired positionally, so any surplus on either side is dropped.
    """
    chars = alignment_info["chars"]
    zipped_times = list(zip(chars, alignment_info["charStartTimesMs"]))

    words = "".join(chars).split(" ")

    last_index = len(zipped_times) - 1

    times = []
    for i, (char, _start_ms) in enumerate(zipped_times):
        if char == " " or i == last_index:
            # Clamp to 0: without it, a boundary at index 0 (a chunk that
            # starts with a space) would index -1 and pick up the start time
            # of the chunk's *last* character, producing a timestamp later
            # than the words that follow.
            previous = max(i - 1, 0)
            times.append(cumulative_time + (zipped_times[previous][1] / 1000.0))

    return list(zip(words, times))
class ElevenLabsTTSService(AsyncWordTTSService):
class InputParams(BaseModel):
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
@@ -45,49 +79,185 @@ class ElevenLabsTTSService(TTSService):
api_key: str,
voice_id: str,
model: str = "eleven_turbo_v2_5",
url: str = "wss://api.elevenlabs.io",
params: InputParams = InputParams(),
**kwargs):
super().__init__(**kwargs)
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
#
# Finally, ElevenLabs doesn't provide information on when the bot stops
# speaking for a while, so we want the parent class to send TTSStoppedFrame
# after a short period of not receiving any audio.
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
**kwargs
)
self._api_key = api_key
self._voice_id = voice_id
self._model = model
self._url = url
self._params = params
self._client = AsyncElevenLabs(api_key=api_key)
self._sample_rate = sample_rate_from_output_format(params.output_format)
# Websocket connection to ElevenLabs.
self._websocket = None
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
self._cumulative_time = 0
def can_generate_metrics(self) -> bool:
return True
async def set_model(self, model: str):
logger.debug(f"Switching TTS model to: [{model}]")
self._model = model
await self._disconnect()
await self._connect()
async def set_voice(self, voice: str):
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._disconnect()
async def flush_audio(self):
if self._websocket:
msg = {"text": " ", "flush": True}
await self._websocket.send(json.dumps(msg))
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
async def _connect(self):
try:
voice_id = self._voice_id
model = self._model
output_format = self._params.output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
self._websocket = await websockets.connect(url)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
# According to ElevenLabs, we should always start with a single space.
msg = {
"text": " ",
"xi_api_key": self._api_key,
}
await self._websocket.send(json.dumps(msg))
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
self._websocket = None
async def _disconnect(self):
    """Stop metrics, cancel the background tasks and close the websocket.

    Any exception raised while doing so is logged, not propagated.
    """
    try:
        await self.stop_all_metrics()

        # The task attributes are only assigned by _connect() after the
        # websocket has opened, so they may not exist yet (disconnect before
        # connect, or a failed connect). Read them defensively instead of
        # raising AttributeError and skipping the rest of the cleanup.
        receive_task = getattr(self, "_receive_task", None)
        if receive_task:
            receive_task.cancel()
            await receive_task
        self._receive_task = None

        keepalive_task = getattr(self, "_keepalive_task", None)
        if keepalive_task:
            keepalive_task.cancel()
            await keepalive_task
        self._keepalive_task = None

        if self._websocket:
            await self._websocket.close()
            self._websocket = None

        self._started = False
    except Exception as e:
        logger.exception(f"{self} error closing websocket: {e}")
async def _receive_task_handler(self):
    """Read websocket messages until the socket closes or the task is cancelled.

    Each message is JSON. A truthy `audio` field is base64-decoded and pushed
    as a mono `AudioRawFrame`. A truthy `alignment` field is converted to word
    times, queued through `add_word_timestamps()`, and the last word's time
    becomes the cumulative offset for the next alignment.
    """
    try:
        async for message in self._websocket:
            msg = json.loads(message)
            if msg.get("audio"):
                await self.stop_ttfb_metrics()
                self.start_word_timestamps()

                audio = base64.b64decode(msg["audio"])
                frame = AudioRawFrame(audio, self._sample_rate, 1)
                await self.push_frame(frame)

            if msg.get("alignment"):
                word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
                # An alignment without characters yields no words. Indexing
                # [-1] on it would raise IndexError, which the outer handler
                # catches *outside* the loop -- ending the receive task and
                # dropping all further audio. Skip such messages instead.
                if word_times:
                    await self.add_word_timestamps(word_times)
                    self._cumulative_time = word_times[-1][1]
    except asyncio.CancelledError:
        # Cancellation is the normal way _disconnect() stops this task.
        pass
    except Exception as e:
        logger.exception(f"{self} exception: {e}")
async def _keepalive_task_handler(self):
while True:
try:
await asyncio.sleep(10)
await self._send_text("")
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def _send_text(self, text: str):
if self._websocket:
msg = {"text": text + " "}
await self._websocket.send(json.dumps(msg))
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
await self.start_tts_usage_metrics(text)
await self.start_ttfb_metrics()
try:
if not self._websocket:
await self._connect()
results = await self._client.generate(
text=text,
voice=self._voice_id,
model=self._model,
output_format=self._params.output_format
)
try:
if not self._started:
await self.push_frame(TTSStartedFrame())
await self.start_ttfb_metrics()
self._started = True
self._cumulative_time = 0
tts_started = False
async for audio in results:
# This is so we send TTSStartedFrame when we have the first audio
# bytes.
if not tts_started:
await self.push_frame(TTSStartedFrame())
tts_started = True
await self.stop_ttfb_metrics()
frame = AudioRawFrame(audio, self._sample_rate, 1)
yield frame
await self.push_frame(TTSStoppedFrame())
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
await self.push_frame(TTSStoppedFrame())
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.exception(f"{self} exception: {e}")

View File

@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.ai_services import TTSService
from pipecat.services.ai_services import AsyncTTSService
from loguru import logger
@@ -34,7 +34,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class LmntTTSService(TTSService):
class LmntTTSService(AsyncTTSService):
def __init__(
self,
@@ -44,11 +44,9 @@ class LmntTTSService(TTSService):
sample_rate: int = 24000,
language: str = "en",
**kwargs):
super().__init__(**kwargs)
# Let TTSService produce TTSStoppedFrames after a short delay of
# no activity.
self._push_stop_frames = True
super().__init__(push_stop_frames=True, **kwargs)
self._api_key = api_key
self._voice_id = voice_id
@@ -62,6 +60,8 @@ class LmntTTSService(TTSService):
self._speech = None
self._connection = None
self._receive_task = None
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
def can_generate_metrics(self) -> bool:

View File

View File

@@ -8,6 +8,7 @@
import asyncio
import itertools
import time
import sys
from PIL import Image
from typing import List
@@ -30,11 +31,14 @@ from pipecat.frames.frames import (
SystemFrame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
TransportMessageFrame)
from pipecat.transports.base_transport import TransportParams
from loguru import logger
from pipecat.utils.time import nanoseconds_to_seconds
class BaseOutputTransport(FrameProcessor):
@@ -64,7 +68,7 @@ class BaseOutputTransport(FrameProcessor):
# Create sink frame task. This is the task that will actually write
# audio or video frames. We write audio/video in a task so we can keep
# generating frames upstream while, for example, the audio is playing.
self._create_sink_task()
self._create_sink_tasks()
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
@@ -149,6 +153,7 @@ class BaseOutputTransport(FrameProcessor):
await self._sink_queue.put(frame)
await self.start(frame)
elif isinstance(frame, EndFrame):
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)
await self.stop(frame)
# Other frames.
@@ -158,6 +163,9 @@ class BaseOutputTransport(FrameProcessor):
await self._handle_image(frame)
elif isinstance(frame, TransportMessageFrame) and frame.urgent:
await self.send_message(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
else:
await self._sink_queue.put(frame)
@@ -166,10 +174,14 @@ class BaseOutputTransport(FrameProcessor):
return
if isinstance(frame, StartInterruptionFrame):
# Stop sink task.
# Stop sink tasks.
self._sink_task.cancel()
await self._sink_task
self._create_sink_task()
# Stop sink clock tasks.
self._sink_clock_task.cancel()
await self._sink_clock_task
# Create sink tasks.
self._create_sink_tasks()
# Stop push task.
self._push_frame_task.cancel()
await self._push_frame_task
@@ -201,43 +213,83 @@ class BaseOutputTransport(FrameProcessor):
else:
await self._sink_queue.put(frame)
def _create_sink_task(self):
#
# Sink tasks
#
def _create_sink_tasks(self):
    """Create fresh sink queues and start the tasks that drain them.

    Sets up a FIFO queue with its handler task, and a priority queue with
    its own handler task, both scheduled on this processor's event loop.
    Whatever queues and tasks these attributes previously referenced are
    replaced.
    """
    event_loop = self.get_event_loop()

    # Regular sink: frames are consumed in arrival order.
    sink_queue = asyncio.Queue()
    self._sink_queue = sink_queue
    self._sink_task = event_loop.create_task(self._sink_task_handler())

    # Clock sink: entries are consumed in priority order.
    clock_queue = asyncio.PriorityQueue()
    self._sink_clock_queue = clock_queue
    self._sink_clock_task = event_loop.create_task(self._sink_clock_task_handler())
async def _sink_frame_handler(self, frame: Frame):
    """Dispatch a single frame taken from the sink queue.

    Audio is written out, pushed, and followed by an upstream
    BotSpeakingFrame. Images, sprites and transport messages are consumed
    here and not pushed. TTS start/stop frames update the bot speaking state
    before being pushed. Every other frame is pushed unchanged.
    """
    if isinstance(frame, AudioRawFrame):
        await self.write_raw_audio_frames(frame.audio)
        await self._internal_push_frame(frame)
        await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
        return

    # Frames that are consumed by the transport and not forwarded.
    if isinstance(frame, ImageRawFrame):
        await self._set_camera_image(frame)
        return
    if isinstance(frame, SpriteFrame):
        await self._set_camera_images(frame.images)
        return
    if isinstance(frame, TransportMessageFrame):
        await self.send_message(frame)
        return

    # TTS boundary frames update the speaking state first; they are then
    # forwarded like any other remaining frame.
    if isinstance(frame, TTSStartedFrame):
        await self._bot_started_speaking()
    elif isinstance(frame, TTSStoppedFrame):
        await self._bot_stopped_speaking()
    await self._internal_push_frame(frame)
async def _sink_task_handler(self):
    """Drain the sink queue, handling every frame exactly once.

    Loops until an EndFrame has been handled or the task is cancelled.
    Errors raised while handling a frame are logged and the loop continues
    with the next frame.
    """
    running = True
    while running:
        try:
            frame = await self._sink_queue.get()
            # All per-type handling lives in _sink_frame_handler. Dispatching
            # on the frame type here as well would write audio and push each
            # frame twice.
            await self._sink_frame_handler(frame)
            # The EndFrame itself is handled above; it only ends the loop.
            running = not isinstance(frame, EndFrame)
            self._sink_queue.task_done()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception(f"{self} error processing sink queue: {e}")
async def _sink_clock_frame_handler(self, frame: Frame):
    """Handle a frame taken from the clock queue.

    Only TextFrame instances are pushed; any other frame type is ignored.
    """
    # TODO(aleix): For now we just process TextFrame. But we should process
    # audio and video as well.
    if not isinstance(frame, TextFrame):
        return
    await self._internal_push_frame(frame)
async def _sink_clock_task_handler(self):
    """Drain the clock queue, releasing frames at their timestamp.

    Each queue entry is a ``(timestamp, id, frame)`` tuple. The timestamp is
    compared against the pipeline clock: if it is still in the future the
    task sleeps for the difference, then the frame is passed to
    ``_sink_clock_frame_handler``. Loops until an EndFrame is dequeued or
    the task is cancelled; other errors are logged and the loop continues.
    """
    running = True
    while running:
        try:
            timestamp, _, frame = await self._sink_clock_queue.get()

            # If we hit an EndFrame, we can finish right away.
            running = not isinstance(frame, EndFrame)

            if running:
                # If the presentation timestamp has not been reached yet,
                # wait until it is time to process the frame.
                current_time = self.get_clock().get_time()
                if timestamp > current_time:
                    wait_time = nanoseconds_to_seconds(timestamp - current_time)
                    await asyncio.sleep(wait_time)

                # Late and on-time frames go through the same handler, so
                # the outcome does not depend on how early a frame arrived.
                await self._sink_clock_frame_handler(frame)

            self._sink_clock_queue.task_done()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception(f"{self} error processing sink clock queue: {e}")
async def _bot_started_speaking(self):
logger.debug("Bot started speaking")
self._bot_speaking = True

View File

@@ -32,6 +32,7 @@ class TransportParams(BaseModel):
audio_out_is_live: bool = False
audio_out_sample_rate: int = 16000
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_in_enabled: bool = False
audio_in_sample_rate: int = 16000
audio_in_channels: int = 1

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import json
import io
import wave
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
try:
from fastapi import Request, Response
from starlette.background import BackgroundTask
from sse_starlette.sse import EventSourceResponse
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use FastAPI HTTP SSE, you need to `pip install pipecat-ai[http]`.")
raise Exception(f"Missing module: {e}")
class FastAPIHTTPParams(TransportParams):
    """Parameters for the FastAPI HTTP transport.

    Adds a required frame serializer to the base transport parameters.
    """

    # Used by the input side to deserialize request payloads and by the
    # output side to serialize frames.
    serializer: FrameSerializer
class FastAPIHTTPInputTransport(BaseInputTransport):
    """Input side of the FastAPI HTTP transport.

    Turns the JSON body of an HTTP request into frames and pushes them into
    the pipeline.
    """

    def __init__(
            self,
            params: FastAPIHTTPParams,
            **kwargs):
        super().__init__(params, **kwargs)
        self._params = params
        # Most recent request passed to handle_request().
        self._request = None

    async def handle_request(self, request: Request):
        """Deserialize and push every frame contained in the request body.

        The body is read as JSON and iterated; each item is passed to the
        configured serializer. Audio frames go through ``push_audio_frame``,
        all other frames through ``push_frame``. Items for which the
        serializer returns ``None`` are skipped instead of being pushed.
        """
        self._request = request
        frames_list = await request.json()
        logger.debug(f"Received frames: {frames_list}")
        for payload in frames_list:
            logger.debug(f"Received frame: {payload}")
            frame = self._params.serializer.deserialize(payload)
            if frame is None:
                # Nothing to push: the serializer produced no frame.
                logger.warning(f"{self} unable to deserialize frame: {payload}")
                continue
            if isinstance(frame, AudioRawFrame):
                await self.push_audio_frame(frame)
            else:
                await self.push_frame(frame)
class FastAPIHTTPOutputTransport(BaseOutputTransport):
    """Output side of the FastAPI HTTP transport.

    Every frame given to ``process_frame`` is serialized and queued;
    ``event_generator`` yields the queued items one by one.
    """

    def __init__(self, params: FastAPIHTTPParams, **kwargs):
        super().__init__(params, **kwargs)
        self._params = params
        # Serialized frames waiting to be yielded by event_generator().
        self._event_queue = asyncio.Queue()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base processing, then queue the serialized frame."""
        await super().process_frame(frame, direction)
        await self._write_frame(frame)

    async def write_raw_audio_frames(self, frames: bytes):
        """Do nothing with the given raw audio bytes."""
        pass

    async def _write_frame(self, frame: Frame):
        """Serialize ``frame`` and append the result to the event queue."""
        serializer = self._params.serializer
        await self._event_queue.put(serializer.serialize(frame))

    async def event_generator(self):
        """Yield queued events forever, waiting whenever the queue is empty."""
        while True:
            next_event = await self._event_queue.get()
            logger.debug(f"Sending event {next_event}")
            yield next_event
class FastAPIHTTPTransport(BaseTransport):
    """Transport pairing an HTTP request input with an event-stream output.

    Owns one FastAPIHTTPInputTransport and one FastAPIHTTPOutputTransport
    built from the same parameters.
    """

    def __init__(
            self,
            params: FastAPIHTTPParams,
            input_name: str | None = None,
            output_name: str | None = None,
            loop: asyncio.AbstractEventLoop | None = None):
        super().__init__(input_name=input_name, output_name=output_name, loop=loop)
        self._params = params
        # Most recent request passed to handle_request().
        self._request = None

        self._input = FastAPIHTTPInputTransport(params, name=self._input_name)
        self._output = FastAPIHTTPOutputTransport(params, name=self._output_name)

    def input(self) -> FrameProcessor:
        """Return the input processor of this transport."""
        return self._input

    def output(self) -> FrameProcessor:
        """Return the output processor of this transport."""
        return self._output

    async def handle_request(self, request: Request):
        """Feed the request to the input side and return an event stream.

        The request is handed to the input transport first; the returned
        ``EventSourceResponse`` wraps the output transport's event generator.
        """
        self._request = request
        await self._input.handle_request(request)
        event_stream = self._output.event_generator()
        return EventSourceResponse(event_stream)

View File

@@ -101,7 +101,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def write_raw_audio_frames(self, frames: bytes):
self._websocket_audio_buffer += frames
while len(self._websocket_audio_buffer) >= self._params.audio_frame_size:
while len(self._websocket_audio_buffer):
frame = AudioRawFrame(
audio=self._websocket_audio_buffer[:
self._params.audio_frame_size],

View File

@@ -366,6 +366,12 @@ class DailyTransportClient(EventHandler):
}
},
}
},
"microphone": {
"sendSettings": {
"channelConfig": "stereo" if self._params.audio_out_channels == 2 else "mono",
"bitrate": self._params.audio_out_bitrate,
}
}
},
})

View File

@@ -9,3 +9,20 @@ import datetime
def time_now_iso8601() -> str:
    """Return the current UTC time as an ISO 8601 string.

    The string has millisecond precision and carries the UTC offset.
    """
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return utc_now.isoformat(timespec="milliseconds")
def seconds_to_nanoseconds(seconds: float) -> int:
    """Convert seconds to nanoseconds, truncating any fractional nanosecond."""
    nanoseconds = seconds * 1_000_000_000
    return int(nanoseconds)
def nanoseconds_to_seconds(nanoseconds: int) -> float:
    """Convert a nanosecond count to (possibly fractional) seconds."""
    nanoseconds_per_second = 1_000_000_000
    return nanoseconds / nanoseconds_per_second
def nanoseconds_to_str(nanoseconds: int) -> str:
    """Format a nanosecond duration as ``H:MM:SS.ffffff``.

    Hours are not zero-padded and may exceed 24. Sub-microsecond precision
    is truncated. Negative durations are rendered as the absolute value
    with a leading ``-``.

    All fields are derived with integer arithmetic: going through a float
    number of seconds can lose the last microsecond to rounding (for
    example 1_000_001_000 ns would be shown with ``.000000``).
    """
    total_microseconds = abs(int(nanoseconds)) // 1_000
    # No sign for values that truncate to zero, to avoid printing "-0:00:00".
    sign = "-" if nanoseconds < 0 and total_microseconds else ""
    total_seconds, microseconds = divmod(total_microseconds, 1_000_000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{sign}{hours}:{minutes:02}:{seconds:02}.{microseconds:06}"