services: add GladiaSTTService support

This commit is contained in:
Aleix Conchillo Flaqué
2024-07-01 17:49:57 -07:00
parent 974d9c33ed
commit a9a82c083b
6 changed files with 227 additions and 4 deletions

View File

@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `GladiaSTTService`.
See https://docs.gladia.io/chapters/speech-to-text-api/pages/live-speech-recognition
- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS
- Added `UserIdleProcessor`. This processor can be used to wait for any
interaction with the user. If the user doesn't say anything within a given
timeout a provided callback is called.
@@ -20,9 +26,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
upstream while the bot is talking.
- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.

View File

@@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples

View File

@@ -27,6 +27,9 @@ FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Gladia
GLADIA_API_KEY=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...

View File

@@ -0,0 +1,101 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -41,6 +41,7 @@ daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.2.7" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.5.3" ]
fireworks = [ "openai~=1.26.0" ]
langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1", "langchain-openai~=0.1.8" ]

View File

@@ -0,0 +1,115 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import base64
import json
import time
from typing import Optional
from pydantic.main import BaseModel
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
SystemFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncAIService
from loguru import logger
# See .env.example for Gladia configuration needed
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Gladia, you need to `pip install pipecat-ai[gladia]`. Also, set `GLADIA_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class GladiaSTTService(AsyncAIService):
class InputParams(BaseModel):
sample_rate: Optional[int] = 16000
language: Optional[str] = "english"
transcription_hint: Optional[str] = None
endpointing: Optional[int] = 200
prosody: Optional[bool] = None
def __init__(self,
*,
api_key: str,
url: str = "wss://api.gladia.io/audio/text/audio-transcription",
confidence: float = 0.5,
params: InputParams = InputParams(),
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._url = url
self._params = params
self._confidence = confidence
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
await self._send_audio(frame)
else:
await self.queue_frame(frame, direction)
async def start(self, frame: StartFrame):
self._websocket = await websockets.connect(self._url)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
await self._setup_gladia()
async def stop(self, frame: EndFrame):
await self._websocket.close()
async def cancel(self, frame: CancelFrame):
await self._websocket.close()
async def _setup_gladia(self):
configuration = {
"x_gladia_key": self._api_key,
"encoding": "WAV/PCM",
"model_type": "fast",
"language_behaviour": "manual",
**self._params.model_dump(exclude_none=True)
}
await self._websocket.send(json.dumps(configuration))
async def _send_audio(self, frame: AudioRawFrame):
message = {
'frames': base64.b64encode(frame.audio).decode("utf-8")
}
await self._websocket.send(json.dumps(message))
async def _receive_task_handler(self):
async for message in self._websocket:
utterance = json.loads(message)
if not utterance:
continue
if "error" in utterance:
message = utterance["message"]
logger.error(f"Gladia error: {message}")
elif "confidence" in utterance:
type = utterance["type"]
confidence = utterance["confidence"]
transcript = utterance["transcription"]
if confidence >= self._confidence:
if type == "final":
await self.queue_frame(TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))
else:
await self.queue_frame(InterimTranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))