Compare commits
26 Commits
khk/togeth
...
khk/http
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81c2c5adfa | ||
|
|
d9d6571c73 | ||
|
|
540cad4844 | ||
|
|
0a26b650c0 | ||
|
|
adaac003e5 | ||
|
|
3d4f125071 | ||
|
|
bce87f8717 | ||
|
|
1fe940bd6b | ||
|
|
cb36a71381 | ||
|
|
5acc4928fe | ||
|
|
434493b8aa | ||
|
|
f08b25dbb2 | ||
|
|
3665734972 | ||
|
|
a98d78cdea | ||
|
|
80f6d74e80 | ||
|
|
02d926e9bd | ||
|
|
7749692f72 | ||
|
|
7807cbeb39 | ||
|
|
72f231b327 | ||
|
|
3cbe97d346 | ||
|
|
b880e1a60e | ||
|
|
886046e696 | ||
|
|
9106a5f8ae | ||
|
|
98286336bf | ||
|
|
081b001c8b | ||
|
|
c92531a02f |
23
CHANGELOG.md
23
CHANGELOG.md
@@ -9,6 +9,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- A clock can now be specified to `PipelineTask` (defaults to
|
||||
`SystemClock`). This clock will be passed to each frame processor via the
|
||||
`StartFrame`.
|
||||
|
||||
- Added pipeline clocks. A pipeline clock is used by the output transport to
|
||||
know when a frame needs to be presented. For that, all frames now have an
|
||||
optional `pts` field (prensentation timestamp). There's currently just one
|
||||
clock implementation `SystemClock` and the `pts` field is currently only used
|
||||
for `TextFrame`s (audio and image frames will be next).
|
||||
|
||||
- `DailyTransport` now supports setting the audio bitrate to improve audio
|
||||
quality through the `DailyParams.audio_out_bitrate` parameter. The new
|
||||
default is 96kbps.
|
||||
|
||||
- `DailyTransport` now uses the number of audio output channels (1 or 2) to set
|
||||
mono or stereo audio when needed.
|
||||
|
||||
- Interruptions support has been added to `TwilioFrameSerializer` when using
|
||||
`FastAPIWebsocketTransport`.
|
||||
|
||||
@@ -23,6 +40,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
|
||||
timestamps to their text output. This allows the output transport to push the
|
||||
text frames downstream at almost the same time the words are spoken. We say
|
||||
"almost" because currently the audio frames don't have presentation timestamp
|
||||
but they should be played at roughly the same time.
|
||||
|
||||
- `DailyTransport.on_joined` event now returns the full session data instead of
|
||||
just the participant.
|
||||
|
||||
|
||||
@@ -3,3 +3,4 @@ fastapi
|
||||
uvicorn
|
||||
python-dotenv
|
||||
twilio
|
||||
python-multipart
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -15,12 +15,11 @@ from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
@@ -41,7 +40,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
@@ -49,10 +47,9 @@ async def main():
|
||||
)
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
|
||||
sample_rate=44100,
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
@@ -74,11 +71,16 @@ async def main():
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tma_out, # Goes before the transport because cartesia has word-level timestamps!
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
@@ -147,8 +147,8 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
tma_out,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@@ -39,7 +39,7 @@ azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
|
||||
cartesia = [ "websockets~=12.0" ]
|
||||
daily = [ "daily-python~=0.10.1" ]
|
||||
deepgram = [ "deepgram-sdk~=3.5.0" ]
|
||||
elevenlabs = [ "elevenlabs~=1.7.0" ]
|
||||
elevenlabs = [ "websockets~=12.0" ]
|
||||
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=12.0" ]
|
||||
|
||||
0
src/pipecat/clocks/__init__.py
Normal file
0
src/pipecat/clocks/__init__.py
Normal file
18
src/pipecat/clocks/base_clock.py
Normal file
18
src/pipecat/clocks/base_clock.py
Normal file
@@ -0,0 +1,18 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseClock(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def get_time(self) -> int:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start(self):
|
||||
pass
|
||||
21
src/pipecat/clocks/system_clock.py
Normal file
21
src/pipecat/clocks/system_clock.py
Normal file
@@ -0,0 +1,21 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
|
||||
|
||||
class SystemClock(BaseClock):
|
||||
|
||||
def __init__(self):
|
||||
self._time = 0
|
||||
|
||||
def get_time(self) -> int:
|
||||
return time.monotonic_ns() - self._time if self._time > 0 else 0
|
||||
|
||||
def start(self):
|
||||
self._time = time.monotonic_ns()
|
||||
@@ -8,19 +8,27 @@ from typing import Any, List, Mapping, Optional, Tuple
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import nanoseconds_to_str
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
from pipecat.vad.vad_analyzer import VADParams
|
||||
|
||||
|
||||
def format_pts(pts: int | None):
|
||||
return nanoseconds_to_str(pts) if pts else None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Frame:
|
||||
id: int = field(init=False)
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
@@ -46,7 +54,8 @@ class AudioRawFrame(DataFrame):
|
||||
self.num_frames = int(len(self.audio) / (self.num_channels * 2))
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -60,7 +69,8 @@ class ImageRawFrame(DataFrame):
|
||||
format: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -72,7 +82,8 @@ class URLImageRawFrame(ImageRawFrame):
|
||||
url: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(url: {self.url}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -84,7 +95,8 @@ class VisionImageRawFrame(ImageRawFrame):
|
||||
text: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(text: {self.text}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -96,7 +108,8 @@ class UserImageRawFrame(ImageRawFrame):
|
||||
user_id: str
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -109,7 +122,8 @@ class SpriteFrame(Frame):
|
||||
images: List[ImageRawFrame]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {len(self.images)})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.images)})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -121,7 +135,8 @@ class TextFrame(DataFrame):
|
||||
text: str
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(text: {self.text})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, text: {self.text})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -326,6 +341,7 @@ class ControlFrame(Frame):
|
||||
@dataclass
|
||||
class StartFrame(ControlFrame):
|
||||
"""This is the first frame that should be pushed down a pipeline."""
|
||||
clock: BaseClock
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
|
||||
@@ -10,6 +10,8 @@ from typing import AsyncIterable, Iterable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.clocks.system_clock import SystemClock
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
@@ -60,11 +62,16 @@ class Source(FrameProcessor):
|
||||
|
||||
class PipelineTask:
|
||||
|
||||
def __init__(self, pipeline: BasePipeline, params: PipelineParams = PipelineParams()):
|
||||
def __init__(
|
||||
self,
|
||||
pipeline: BasePipeline,
|
||||
params: PipelineParams = PipelineParams(),
|
||||
clock: BaseClock = SystemClock()):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
|
||||
self._pipeline = pipeline
|
||||
self._clock = clock
|
||||
self._params = params
|
||||
self._finished = False
|
||||
|
||||
@@ -116,11 +123,14 @@ class PipelineTask:
|
||||
return MetricsFrame(ttfb=ttfb, processing=processing)
|
||||
|
||||
async def _process_down_queue(self):
|
||||
self._clock.start()
|
||||
|
||||
start_frame = StartFrame(
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_metrics,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
clock=self._clock
|
||||
)
|
||||
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ class LLMResponseAggregator(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, self._accumulator_frame):
|
||||
if self._aggregating:
|
||||
self._aggregation += f" {frame.text}"
|
||||
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
|
||||
# We have recevied a complete sentence, so if we have seen the
|
||||
# end frame and we were still aggregating, it means we should
|
||||
# send the aggregation.
|
||||
|
||||
@@ -9,6 +9,7 @@ import time
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -96,6 +97,9 @@ class FrameProcessor:
|
||||
self._next: "FrameProcessor" | None = None
|
||||
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
|
||||
|
||||
# Clock
|
||||
self._clock: BaseClock | None = None
|
||||
|
||||
# Properties
|
||||
self._allow_interruptions = False
|
||||
self._enable_metrics = False
|
||||
@@ -177,8 +181,12 @@ class FrameProcessor:
|
||||
def get_parent(self) -> "FrameProcessor":
|
||||
return self._parent
|
||||
|
||||
def get_clock(self) -> BaseClock:
|
||||
return self._clock
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._clock = frame.clock
|
||||
self._allow_interruptions = frame.allow_interruptions
|
||||
self._enable_metrics = frame.enable_metrics
|
||||
self._enable_usage_metrics = frame.enable_usage_metrics
|
||||
|
||||
@@ -9,7 +9,7 @@ import io
|
||||
import wave
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, Optional
|
||||
from typing import AsyncGenerator, List, Optional, Tuple
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
@@ -37,9 +37,12 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.audio import calculate_audio_volume
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
from pipecat.utils.time import seconds_to_nanoseconds
|
||||
from pipecat.utils.utils import exp_smoothing
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -167,7 +170,7 @@ class TTSService(AIService):
|
||||
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
|
||||
push_stop_frames: bool = False,
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 0.8,
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aggregate_sentences: bool = aggregate_sentences
|
||||
@@ -303,6 +306,74 @@ class TTSService(AIService):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncTTSService(TTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncWordTTSService(AsyncTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._initial_word_timestamp = -1
|
||||
self._words_queue = asyncio.Queue()
|
||||
self._words_task = self.get_event_loop().create_task(self._words_task_handler())
|
||||
|
||||
def start_word_timestamps(self):
|
||||
if self._initial_word_timestamp == -1:
|
||||
self._initial_word_timestamp = self.get_clock().get_time()
|
||||
|
||||
def reset_word_timestamps(self):
|
||||
self._initial_word_timestamp = -1
|
||||
self._word_timestamps = []
|
||||
|
||||
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
|
||||
for (word, timestamp) in word_times:
|
||||
await self._words_queue.put((word, seconds_to_nanoseconds(timestamp)))
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._stop_words_task()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._stop_words_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
|
||||
await self.flush_audio()
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
self.reset_word_timestamps()
|
||||
|
||||
async def _stop_words_task(self):
|
||||
if self._words_task:
|
||||
self._words_task.cancel()
|
||||
await self._words_task
|
||||
|
||||
async def _words_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
(word, timestamp) = await self._words_queue.get()
|
||||
if word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
else:
|
||||
frame = TextFrame(word)
|
||||
frame.pts = self._initial_word_timestamp + timestamp
|
||||
await self.push_frame(frame)
|
||||
self._words_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
|
||||
class STTService(AIService):
|
||||
"""STTService is a base class for speech-to-text services."""
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.ai_services import AsyncWordTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -60,7 +60,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
class CartesiaTTSService(TTSService):
|
||||
class CartesiaTTSService(AsyncWordTTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -74,19 +74,17 @@ class CartesiaTTSService(TTSService):
|
||||
sample_rate: int = 16000,
|
||||
language: str = "en",
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Aggregating sentences still gives cleaner-sounding results and fewer
|
||||
# artifacts than streaming one word at a time. On average, waiting for
|
||||
# a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3
|
||||
# model, and it's worth it for the better audio quality.
|
||||
self._aggregate_sentences = True
|
||||
|
||||
# we don't want to automatically push LLM response text frames, because the
|
||||
# context aggregators will add them to the LLM context even if we're
|
||||
# interrupted. cartesia gives us word-by-word timestamps. we can use those
|
||||
# to generate text frames ourselves aligned with the playout timing of the audio!
|
||||
self._push_text_frames = False
|
||||
# artifacts than streaming one word at a time. On average, waiting for a
|
||||
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
|
||||
# 3 model, and it's worth it for the better audio quality.
|
||||
#
|
||||
# We also don't want to automatically push LLM response text frames,
|
||||
# because the context aggregators will add them to the LLM context even
|
||||
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
|
||||
# can use those to generate text frames ourselves aligned with the
|
||||
# playout timing of the audio!
|
||||
super().__init__(aggregate_sentences=True, push_text_frames=False, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._cartesia_version = cartesia_version
|
||||
@@ -102,10 +100,7 @@ class CartesiaTTSService(TTSService):
|
||||
|
||||
self._websocket = None
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
self._receive_task = None
|
||||
self._context_appending_task = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
@@ -140,7 +135,6 @@ class CartesiaTTSService(TTSService):
|
||||
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
|
||||
)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler())
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -149,10 +143,6 @@ class CartesiaTTSService(TTSService):
|
||||
try:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
if self._context_appending_task:
|
||||
self._context_appending_task.cancel()
|
||||
await self._context_appending_task
|
||||
self._context_appending_task = None
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
await self._receive_task
|
||||
@@ -162,18 +152,33 @@ class CartesiaTTSService(TTSService):
|
||||
self._websocket = None
|
||||
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
await self.stop_all_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._context_id = None
|
||||
|
||||
async def flush_audio(self):
|
||||
if not self._context_id or not self._websocket:
|
||||
return
|
||||
logger.debug("Flushing audio")
|
||||
msg = {
|
||||
"transcript": "",
|
||||
"continue": False,
|
||||
"context_id": self._context_id,
|
||||
"model_id": self._model_id,
|
||||
"voice": {
|
||||
"mode": "id",
|
||||
"id": self._voice_id
|
||||
},
|
||||
"output_format": self._output_format,
|
||||
"language": self._language,
|
||||
"add_timestamps": True,
|
||||
}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
@@ -188,16 +193,14 @@ class CartesiaTTSService(TTSService):
|
||||
# because we are likely still playing out audio and need the
|
||||
# timestamp to set send context frames.
|
||||
self._context_id = None
|
||||
self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0))
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
|
||||
elif msg["type"] == "timestamps":
|
||||
# logger.debug(f"TIMESTAMPS: {msg}")
|
||||
self._timestamped_words_buffer.extend(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"]))
|
||||
await self.add_word_timestamps(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
|
||||
)
|
||||
elif msg["type"] == "chunk":
|
||||
await self.stop_ttfb_metrics()
|
||||
if not self._context_id_start_timestamp:
|
||||
self._context_id_start_timestamp = time.time()
|
||||
self.start_word_timestamps()
|
||||
frame = AudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
sample_rate=self._output_format["sample_rate"],
|
||||
@@ -216,27 +219,6 @@ class CartesiaTTSService(TTSService):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _context_appending_task_handler(self):
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
if not self._context_id_start_timestamp:
|
||||
continue
|
||||
elapsed_seconds = time.time() - self._context_id_start_timestamp
|
||||
# Pop all words from self._timestamped_words_buffer that are
|
||||
# older than the elapsed time and print a message about them to
|
||||
# the console.
|
||||
while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds:
|
||||
word, timestamp = self._timestamped_words_buffer.pop(0)
|
||||
if word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
continue
|
||||
await self.push_frame(TextFrame(word))
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
|
||||
@@ -4,17 +4,30 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import AsyncGenerator, Literal
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
|
||||
from typing import Any, AsyncGenerator, List, Literal, Mapping, Tuple
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AsyncWordTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# See .env.example for ElevenLabs configuration needed
|
||||
try:
|
||||
from elevenlabs.client import AsyncElevenLabs
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
@@ -35,7 +48,28 @@ def sample_rate_from_output_format(output_format: str) -> int:
|
||||
return 16000
|
||||
|
||||
|
||||
class ElevenLabsTTSService(TTSService):
|
||||
def calculate_word_times(
|
||||
alignment_info: Mapping[str, Any], cumulative_time: float
|
||||
) -> List[Tuple[str, float]]:
|
||||
zipped_times = list(zip(alignment_info["chars"], alignment_info["charStartTimesMs"]))
|
||||
|
||||
words = "".join(alignment_info["chars"]).split(" ")
|
||||
|
||||
# Calculate start time for each word. We do this by finding a space character
|
||||
# and using the previous word time, also taking into account there might not
|
||||
# be a space at the end.
|
||||
times = []
|
||||
for (i, (a, b)) in enumerate(zipped_times):
|
||||
if a == " " or i == len(zipped_times) - 1:
|
||||
t = cumulative_time + (zipped_times[i - 1][1] / 1000.0)
|
||||
times.append(t)
|
||||
|
||||
word_times = list(zip(words, times))
|
||||
|
||||
return word_times
|
||||
|
||||
|
||||
class ElevenLabsTTSService(AsyncWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
|
||||
|
||||
@@ -45,49 +79,185 @@ class ElevenLabsTTSService(TTSService):
|
||||
api_key: str,
|
||||
voice_id: str,
|
||||
model: str = "eleven_turbo_v2_5",
|
||||
url: str = "wss://api.elevenlabs.io",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
# Aggregating sentences still gives cleaner-sounding results and fewer
|
||||
# artifacts than streaming one word at a time. On average, waiting for a
|
||||
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
|
||||
# 3 model, and it's worth it for the better audio quality.
|
||||
#
|
||||
# We also don't want to automatically push LLM response text frames,
|
||||
# because the context aggregators will add them to the LLM context even
|
||||
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
|
||||
# can use those to generate text frames ourselves aligned with the
|
||||
# playout timing of the audio!
|
||||
#
|
||||
# Finally, ElevenLabs doesn't provide information on when the bot stops
|
||||
# speaking for a while, so we want the parent class to send TTSStopFrame
|
||||
# after a short period not receiving any audio.
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
self._model = model
|
||||
self._url = url
|
||||
self._params = params
|
||||
self._client = AsyncElevenLabs(api_key=api_key)
|
||||
self._sample_rate = sample_rate_from_output_format(params.output_format)
|
||||
|
||||
# Websocket connection to ElevenLabs.
|
||||
self._websocket = None
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
# there's an interruption or TTSStoppedFrame.
|
||||
self._started = False
|
||||
self._cumulative_time = 0
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.debug(f"Switching TTS model to: [{model}]")
|
||||
self._model = model
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def flush_audio(self):
|
||||
if self._websocket:
|
||||
msg = {"text": " ", "flush": True}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
await super().push_frame(frame, direction)
|
||||
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
|
||||
self._started = False
|
||||
if isinstance(frame, TTSStoppedFrame):
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
voice_id = self._voice_id
|
||||
model = self._model
|
||||
output_format = self._params.output_format
|
||||
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
|
||||
self._websocket = await websockets.connect(url)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
|
||||
|
||||
# According to ElevenLabs, we should always start with a single space.
|
||||
msg = {
|
||||
"text": " ",
|
||||
"xi_api_key": self._api_key,
|
||||
}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
try:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
await self._receive_task
|
||||
self._receive_task = None
|
||||
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
await self._keepalive_task
|
||||
self._keepalive_task = None
|
||||
|
||||
if self._websocket:
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
async for message in self._websocket:
|
||||
msg = json.loads(message)
|
||||
if msg.get("audio"):
|
||||
await self.stop_ttfb_metrics()
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
frame = AudioRawFrame(audio, self._sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
|
||||
if msg.get("alignment"):
|
||||
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
|
||||
await self.add_word_timestamps(word_times)
|
||||
self._cumulative_time = word_times[-1][1]
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _keepalive_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
await self._send_text("")
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _send_text(self, text: str):
|
||||
if self._websocket:
|
||||
msg = {"text": text + " "}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
await self.start_ttfb_metrics()
|
||||
try:
|
||||
if not self._websocket:
|
||||
await self._connect()
|
||||
|
||||
results = await self._client.generate(
|
||||
text=text,
|
||||
voice=self._voice_id,
|
||||
model=self._model,
|
||||
output_format=self._params.output_format
|
||||
)
|
||||
try:
|
||||
if not self._started:
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
await self.start_ttfb_metrics()
|
||||
self._started = True
|
||||
self._cumulative_time = 0
|
||||
|
||||
tts_started = False
|
||||
async for audio in results:
|
||||
# This is so we send TTSStartedFrame when we have the first audio
|
||||
# bytes.
|
||||
if not tts_started:
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
tts_started = True
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = AudioRawFrame(audio, self._sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self._send_text(text)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error sending message: {e}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.ai_services import AsyncTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -34,7 +34,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class LmntTTSService(TTSService):
|
||||
class LmntTTSService(AsyncTTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -44,11 +44,9 @@ class LmntTTSService(TTSService):
|
||||
sample_rate: int = 24000,
|
||||
language: str = "en",
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Let TTSService produce TTSStoppedFrames after a short delay of
|
||||
# no activity.
|
||||
self._push_stop_frames = True
|
||||
super().__init__(push_stop_frames=True, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
@@ -62,6 +60,8 @@ class LmntTTSService(TTSService):
|
||||
self._speech = None
|
||||
self._connection = None
|
||||
self._receive_task = None
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
# there's an interruption or TTSStoppedFrame.
|
||||
self._started = False
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
|
||||
0
src/pipecat/transcriptions/__init__.py
Normal file
0
src/pipecat/transcriptions/__init__.py
Normal file
@@ -8,6 +8,7 @@
|
||||
import asyncio
|
||||
import itertools
|
||||
import time
|
||||
import sys
|
||||
|
||||
from PIL import Image
|
||||
from typing import List
|
||||
@@ -30,11 +31,14 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TextFrame,
|
||||
TransportMessageFrame)
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.utils.time import nanoseconds_to_seconds
|
||||
|
||||
|
||||
class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
@@ -64,7 +68,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Create sink frame task. This is the task that will actually write
|
||||
# audio or video frames. We write audio/video in a task so we can keep
|
||||
# generating frames upstream while, for example, the audio is playing.
|
||||
self._create_sink_task()
|
||||
self._create_sink_tasks()
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
@@ -149,6 +153,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._sink_queue.put(frame)
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
await self.stop(frame)
|
||||
# Other frames.
|
||||
@@ -158,6 +163,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._handle_image(frame)
|
||||
elif isinstance(frame, TransportMessageFrame) and frame.urgent:
|
||||
await self.send_message(frame)
|
||||
# TODO(aleix): Images and audio should support presentation timestamps.
|
||||
elif frame.pts:
|
||||
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
@@ -166,10 +174,14 @@ class BaseOutputTransport(FrameProcessor):
|
||||
return
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Stop sink task.
|
||||
# Stop sink tasks.
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
self._create_sink_task()
|
||||
# Stop sink clock tasks.
|
||||
self._sink_clock_task.cancel()
|
||||
await self._sink_clock_task
|
||||
# Create sink tasks.
|
||||
self._create_sink_tasks()
|
||||
# Stop push task.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
@@ -201,43 +213,83 @@ class BaseOutputTransport(FrameProcessor):
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
def _create_sink_task(self):
|
||||
#
|
||||
# Sink tasks
|
||||
#
|
||||
|
||||
def _create_sink_tasks(self):
|
||||
loop = self.get_event_loop()
|
||||
self._sink_queue = asyncio.Queue()
|
||||
self._sink_task = loop.create_task(self._sink_task_handler())
|
||||
self._sink_clock_queue = asyncio.PriorityQueue()
|
||||
self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self._internal_push_frame(frame)
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
elif isinstance(frame, ImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._bot_stopped_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
else:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._sink_queue.get()
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self._internal_push_frame(frame)
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
elif isinstance(frame, ImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._bot_stopped_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
else:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
await self._sink_frame_handler(frame)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
|
||||
self._sink_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink queue: {e}")
|
||||
|
||||
async def _sink_clock_frame_handler(self, frame: Frame):
|
||||
# TODO(aleix): For now we just process TextFrame. But we should process
|
||||
# audio and video as well.
|
||||
if isinstance(frame, TextFrame):
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
async def _sink_clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
timestamp, _, frame = await self._sink_clock_queue.get()
|
||||
|
||||
# If we hit an EndFrame, we cna finish right away.
|
||||
running = not isinstance(frame, EndFrame)
|
||||
|
||||
# If we have a frame we check it's presentation timestamp. If it
|
||||
# has already passed we process it, otherwise we wait until it's
|
||||
# time to process it.
|
||||
if running:
|
||||
current_time = self.get_clock().get_time()
|
||||
if timestamp <= current_time:
|
||||
await self._sink_clock_frame_handler(frame)
|
||||
else:
|
||||
wait_time = nanoseconds_to_seconds(timestamp - current_time)
|
||||
await asyncio.sleep(wait_time)
|
||||
await self._sink_frame_handler(frame)
|
||||
|
||||
self._sink_clock_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
logger.debug("Bot started speaking")
|
||||
self._bot_speaking = True
|
||||
|
||||
@@ -32,6 +32,7 @@ class TransportParams(BaseModel):
|
||||
audio_out_is_live: bool = False
|
||||
audio_out_sample_rate: int = 16000
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_in_channels: int = 1
|
||||
|
||||
117
src/pipecat/transports/network/fastapi_http.py
Normal file
117
src/pipecat/transports/network/fastapi_http.py
Normal file
@@ -0,0 +1,117 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import io
|
||||
import wave
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from fastapi import Request, Response
|
||||
from starlette.background import BackgroundTask
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use FastAPI HTTP SSE, you need to `pip install pipecat-ai[http]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FastAPIHTTPParams(TransportParams):
|
||||
serializer: FrameSerializer
|
||||
|
||||
|
||||
class FastAPIHTTPInputTransport(BaseInputTransport):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
params: FastAPIHTTPParams,
|
||||
**kwargs):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._params = params
|
||||
self._request = None
|
||||
|
||||
# todo: this should probably expect a list of frames, not just one frame
|
||||
async def handle_request(self, request: Request):
|
||||
self._request = request
|
||||
frames_list = await request.json()
|
||||
logger.debug(f"Received frames: {frames_list}")
|
||||
for frame in frames_list:
|
||||
logger.debug(f"Received frame: {frame}")
|
||||
frame = self._params.serializer.deserialize(frame)
|
||||
if frame and isinstance(frame, AudioRawFrame):
|
||||
await self.push_audio_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame)
|
||||
|
||||
|
||||
class FastAPIHTTPOutputTransport(BaseOutputTransport):
|
||||
|
||||
def __init__(self, params: FastAPIHTTPParams, **kwargs):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._params = params
|
||||
self._event_queue = asyncio.Queue()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
pass
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
await self._event_queue.put(payload)
|
||||
|
||||
async def event_generator(self):
|
||||
while True:
|
||||
event = await self._event_queue.get()
|
||||
logger.debug(f"Sending event {event}")
|
||||
yield event
|
||||
|
||||
|
||||
class FastAPIHTTPTransport(BaseTransport):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
params: FastAPIHTTPParams,
|
||||
input_name: str | None = None,
|
||||
output_name: str | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None):
|
||||
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
|
||||
self._params = params
|
||||
self._request = None
|
||||
|
||||
self._input = FastAPIHTTPInputTransport(
|
||||
self._params, name=self._input_name)
|
||||
self._output = FastAPIHTTPOutputTransport(
|
||||
self._params, name=self._output_name)
|
||||
|
||||
def input(self) -> FrameProcessor:
|
||||
return self._input
|
||||
|
||||
def output(self) -> FrameProcessor:
|
||||
return self._output
|
||||
|
||||
async def handle_request(self, request: Request):
|
||||
self._request = request
|
||||
await self._input.handle_request(request)
|
||||
return EventSourceResponse(self._output.event_generator())
|
||||
@@ -101,7 +101,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
self._websocket_audio_buffer += frames
|
||||
while len(self._websocket_audio_buffer) >= self._params.audio_frame_size:
|
||||
while len(self._websocket_audio_buffer):
|
||||
frame = AudioRawFrame(
|
||||
audio=self._websocket_audio_buffer[:
|
||||
self._params.audio_frame_size],
|
||||
|
||||
@@ -366,6 +366,12 @@ class DailyTransportClient(EventHandler):
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"microphone": {
|
||||
"sendSettings": {
|
||||
"channelConfig": "stereo" if self._params.audio_out_channels == 2 else "mono",
|
||||
"bitrate": self._params.audio_out_bitrate,
|
||||
}
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
@@ -9,3 +9,20 @@ import datetime
|
||||
|
||||
def time_now_iso8601() -> str:
|
||||
return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
|
||||
|
||||
|
||||
def seconds_to_nanoseconds(seconds: float) -> int:
|
||||
return int(seconds * 1_000_000_000)
|
||||
|
||||
|
||||
def nanoseconds_to_seconds(nanoseconds: int) -> float:
|
||||
return nanoseconds / 1_000_000_000
|
||||
|
||||
|
||||
def nanoseconds_to_str(nanoseconds: int) -> str:
|
||||
total_seconds = nanoseconds_to_seconds(nanoseconds)
|
||||
hours = int(total_seconds // 3600)
|
||||
minutes = int((total_seconds % 3600) // 60)
|
||||
seconds = int(total_seconds % 60)
|
||||
microseconds = int((total_seconds - int(total_seconds)) * 1_000_000)
|
||||
return f"{hours}:{minutes:02}:{seconds:02}.{microseconds:06}"
|
||||
|
||||
Reference in New Issue
Block a user