Compare commits
10 Commits
hush/realt
...
khk-deepgr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40f2ed41e9 | ||
|
|
0875fca15d | ||
|
|
cd67885d0b | ||
|
|
fa25234296 | ||
|
|
93a4c2cec6 | ||
|
|
5ea5d42317 | ||
|
|
af2982eeb3 | ||
|
|
24189b2591 | ||
|
|
cf87a3cac1 | ||
|
|
533b1f8b56 |
1
examples/fast-bot-metrics/bot.py
Symbolic link
1
examples/fast-bot-metrics/bot.py
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
classic-pipeline.py
|
||||||
170
examples/fast-bot-metrics/bot_runner.py
Normal file
170
examples/fast-bot-metrics/bot_runner.py
Normal file
@@ -0,0 +1,170 @@
|
|||||||
|
"""
|
||||||
|
bot_runner.py
|
||||||
|
|
||||||
|
HTTP service that listens for incoming calls from either Daily or Twilio,
|
||||||
|
provisioning a room and starting a Pipecat bot in response.
|
||||||
|
|
||||||
|
Refer to README for more information.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomSipParams, DailyRoomParams
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Request, HTTPException
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
from fastapi.responses import JSONResponse, PlainTextResponse
|
||||||
|
from twilio.twiml.voice_response import VoiceResponse
|
||||||
|
|
||||||
|
from bot import BotSettings
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
|
||||||
|
# ------------ Configuration ------------ #
|
||||||
|
|
||||||
|
MAX_SESSION_TIME = 5 * 60 # 5 minutes
|
||||||
|
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']
|
||||||
|
|
||||||
|
daily_rest_helper = DailyRESTHelper(
|
||||||
|
os.getenv("DAILY_API_KEY", ""),
|
||||||
|
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
|
||||||
|
|
||||||
|
|
||||||
|
class RunnerSettings(BaseModel):
|
||||||
|
prompt: Optional[str] = "You are a helpful assistant."
|
||||||
|
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
|
||||||
|
openai_model: Optional[str] = os.getenv("OPENAI_MODEL")
|
||||||
|
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
|
||||||
|
test: Optional[bool] = None
|
||||||
|
|
||||||
|
# ----------------- API ----------------- #
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=["*"],
|
||||||
|
allow_credentials=True,
|
||||||
|
allow_methods=["*"],
|
||||||
|
allow_headers=["*"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# ----------------- Main ----------------- #
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/start_bot")
|
||||||
|
async def start_bot(request: Request) -> JSONResponse:
|
||||||
|
runner_settings = RunnerSettings()
|
||||||
|
try:
|
||||||
|
request_body = await request.body()
|
||||||
|
if len(request_body) > 0:
|
||||||
|
runner_settings = RunnerSettings.model_validate_json(request_body)
|
||||||
|
except ValidationError as e:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail=f"Invalid request: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
# If no data in request, pass
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Is this a webhook creation request?
|
||||||
|
if runner_settings.test is not None:
|
||||||
|
return JSONResponse({"test": True})
|
||||||
|
|
||||||
|
# Use specified room URL, or create a new one if not specified
|
||||||
|
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
|
||||||
|
|
||||||
|
if not room_url:
|
||||||
|
params = DailyRoomParams(
|
||||||
|
properties=DailyRoomProperties()
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"Unable to provision room {e}")
|
||||||
|
else:
|
||||||
|
# Check passed room URL exists, we should assume that it already has a sip set up
|
||||||
|
try:
|
||||||
|
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
|
||||||
|
except Exception:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500, detail=f"Room not found: {room_url}")
|
||||||
|
|
||||||
|
# Give the agent a token to join the session
|
||||||
|
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||||
|
|
||||||
|
if not room or not token:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500, detail=f"Failed to get token for room: {room_url}")
|
||||||
|
|
||||||
|
# Spawn a new agent, and join the user session
|
||||||
|
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
|
||||||
|
try:
|
||||||
|
bot_settings = BotSettings(
|
||||||
|
room_url=room.url,
|
||||||
|
room_token=token,
|
||||||
|
prompt=runner_settings.prompt,
|
||||||
|
deepgram_voice=runner_settings.deepgram_voice,
|
||||||
|
deepgram_api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||||
|
# deepgram_base_url="http://0.0.0.0:8080/v1/speak",
|
||||||
|
openai_model=runner_settings.openai_model,
|
||||||
|
openai_api_key=runner_settings.openai_api_key,
|
||||||
|
# openai_base_url="http://0.0.0.0:8000/v1",
|
||||||
|
)
|
||||||
|
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
|
||||||
|
|
||||||
|
subprocess.Popen(
|
||||||
|
[f"python3 -m bot -s '{bot_settings_str}'"],
|
||||||
|
shell=True,
|
||||||
|
bufsize=1,
|
||||||
|
cwd=os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||||
|
|
||||||
|
# Grab a token for the user to join with
|
||||||
|
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||||
|
|
||||||
|
return JSONResponse({
|
||||||
|
"room_url": room.url,
|
||||||
|
"token": user_token,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Check environment variables
|
||||||
|
for env_var in REQUIRED_ENV_VARS:
|
||||||
|
if env_var not in os.environ:
|
||||||
|
raise Exception(f"Missing environment variable: {env_var}.")
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
|
||||||
|
parser.add_argument("--host", type=str,
|
||||||
|
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
|
||||||
|
parser.add_argument("--port", type=int,
|
||||||
|
default=os.getenv("PORT", 7860), help="Port number")
|
||||||
|
parser.add_argument("--reload", action="store_true",
|
||||||
|
default=True, help="Reload code on change")
|
||||||
|
|
||||||
|
config = parser.parse_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
uvicorn.run(
|
||||||
|
"bot_runner:app",
|
||||||
|
host=config.host,
|
||||||
|
port=config.port,
|
||||||
|
reload=config.reload
|
||||||
|
)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Pipecat runner shutting down...")
|
||||||
199
examples/fast-bot-metrics/classic-pipeline.py
Normal file
199
examples/fast-bot-metrics/classic-pipeline.py
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from runner import configure
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
from pipecat.vad.vad_analyzer import VADParams
|
||||||
|
from pipecat.vad.silero import SileroVADAnalyzer
|
||||||
|
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
|
||||||
|
from pipecat.services.openai import OpenAILLMService, OpenAILLMContext
|
||||||
|
from pipecat.services.deepgram import DeepgramSTTService
|
||||||
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.processors.logger import FrameLogger
|
||||||
|
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
|
||||||
|
|
||||||
|
from pipecat.processors.aggregators.llm_response import (
|
||||||
|
LLMAssistantResponseAggregator, LLMUserResponseAggregator
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
from fastbothelpers import (
|
||||||
|
GreedyLLMAggregator,
|
||||||
|
ClearableDeepgramTTSService,
|
||||||
|
VADGate,
|
||||||
|
AudioVolumeTimer,
|
||||||
|
TranscriptionTimingLogger
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
logger.remove(0)
|
||||||
|
logger.add(sys.stderr, level="DEBUG")
|
||||||
|
|
||||||
|
|
||||||
|
class BotSettings(BaseModel):
|
||||||
|
room_url: str
|
||||||
|
room_token: str
|
||||||
|
bot_name: str = "Pipecat"
|
||||||
|
prompt: Optional[str] = "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way in a few short sentences."
|
||||||
|
deepgram_api_key: Optional[str] = None
|
||||||
|
deepgram_voice: Optional[str] = "aura-asteria-en"
|
||||||
|
deepgram_base_url: Optional[str] = "https://api.deepgram.com/v1/speak"
|
||||||
|
openai_api_key: Optional[str] = None
|
||||||
|
openai_model: Optional[str] = "gpt-4o"
|
||||||
|
openai_base_url: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
async def main(settings: BotSettings):
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
transport = DailyTransport(
|
||||||
|
settings.room_url,
|
||||||
|
settings.room_token,
|
||||||
|
settings.bot_name,
|
||||||
|
DailyParams(
|
||||||
|
audio_out_enabled=True,
|
||||||
|
transcription_enabled=False,
|
||||||
|
vad_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.200)),
|
||||||
|
vad_audio_passthrough=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
stt = DeepgramSTTService(
|
||||||
|
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||||
|
**({'url': url} if (url := os.getenv("DEEPGRAM_STT_URL")) else {})
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
tts = ClearableDeepgramTTSService(
|
||||||
|
name="Voice",
|
||||||
|
aiohttp_session=session,
|
||||||
|
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||||
|
voice="aura-asteria-en",
|
||||||
|
**({'base_url': url} if (url := os.getenv("DEEPGRAM_TTS_BASE_URL")) else {})
|
||||||
|
)
|
||||||
|
|
||||||
|
llm = OpenAILLMService(
|
||||||
|
name="LLM",
|
||||||
|
# To use OpenAI
|
||||||
|
api_key=os.getenv("OPENAI_API_KEY"),
|
||||||
|
model=os.getenv("OPENAI_MODEL"),
|
||||||
|
base_url=os.getenv("OPENAI_BASE_URL")
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "system",
|
||||||
|
"content": """You are a helpful assistant in an audio conversation.
|
||||||
|
|
||||||
|
Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers.
|
||||||
|
|
||||||
|
Respond to what the user said in a creative and helpful way. Be concise in your answers to basic questions. If you are asked to elaborate or tell a story, provide a longer response.
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
avt = AudioVolumeTimer()
|
||||||
|
tl = TranscriptionTimingLogger(avt)
|
||||||
|
|
||||||
|
tma_in = LLMUserResponseAggregator(messages)
|
||||||
|
tma_out = LLMAssistantResponseAggregator(messages)
|
||||||
|
|
||||||
|
pipeline = Pipeline([
|
||||||
|
transport.input(), # Transport user input
|
||||||
|
avt,
|
||||||
|
stt,
|
||||||
|
tl,
|
||||||
|
tma_in, # User responses
|
||||||
|
llm, # LLM
|
||||||
|
tts, # TTS
|
||||||
|
transport.output(), # Transport bot output
|
||||||
|
tma_out, # Assistant spoken responses
|
||||||
|
])
|
||||||
|
|
||||||
|
task = PipelineTask(
|
||||||
|
pipeline,
|
||||||
|
PipelineParams(
|
||||||
|
allow_interruptions=True,
|
||||||
|
enable_metrics=True,
|
||||||
|
report_only_initial_ttfb=True
|
||||||
|
))
|
||||||
|
|
||||||
|
# When a participant joins, start transcription for that participant so the
|
||||||
|
# bot can "hear" and respond to them.
|
||||||
|
# @ transport.event_handler("on_participant_joined")
|
||||||
|
# async def on_participant_joined(transport, participant):
|
||||||
|
# transport.capture_participant_transcription(participant["id"])
|
||||||
|
|
||||||
|
# When the participant leaves, we exit the bot.
|
||||||
|
@transport.event_handler("on_participant_left")
|
||||||
|
async def on_participant_left(transport, participant, reason):
|
||||||
|
await task.queue_frame(EndFrame())
|
||||||
|
|
||||||
|
# When the first participant joins, the bot should introduce itself.
|
||||||
|
@ transport.event_handler("on_first_participant_joined")
|
||||||
|
async def on_first_participant_joined(transport, participant):
|
||||||
|
messages.append(
|
||||||
|
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||||
|
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||||
|
|
||||||
|
# Handle "latency-ping" messages. The client will send app messages that look like
|
||||||
|
# this:
|
||||||
|
# { "latency-ping": { ts: <client-side timestamp> }}
|
||||||
|
#
|
||||||
|
# We want to send an immediate pong back to the client from this handler function.
|
||||||
|
# Also, we will push a frame into the top of the pipeline and send it after the
|
||||||
|
#
|
||||||
|
@ transport.event_handler("on_app_message")
|
||||||
|
async def on_app_message(transport, message, sender):
|
||||||
|
try:
|
||||||
|
if "latency-ping" in message:
|
||||||
|
logger.debug(f"Received latency ping app message: {message}")
|
||||||
|
ts = message["latency-ping"]["ts"]
|
||||||
|
# Send immediately
|
||||||
|
transport.output().send_message(DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-msg-handler": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
# And push to the pipeline for the Daily transport.output to send
|
||||||
|
await tma_in.push_frame(
|
||||||
|
DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-pipeline-delivery": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"message handling error: {e} - {message}")
|
||||||
|
|
||||||
|
runner = PipelineRunner()
|
||||||
|
await runner.run(task)
|
||||||
|
|
||||||
|
|
||||||
|
# if __name__ == "__main__":
|
||||||
|
# (url, token) = configure()
|
||||||
|
# asyncio.run(main(url, token))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(description="Pipecat Bot")
|
||||||
|
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
|
||||||
|
|
||||||
|
args, unknown = parser.parse_known_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
settings = BotSettings.model_validate_json(args.settings)
|
||||||
|
asyncio.run(main(settings))
|
||||||
|
except ValidationError as e:
|
||||||
|
print(e)
|
||||||
15
examples/fast-bot-metrics/env-example
Normal file
15
examples/fast-bot-metrics/env-example
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
DAILY_SAMPLE_ROOM_URL=...
|
||||||
|
DAILY_API_KEY=...
|
||||||
|
|
||||||
|
DEEPGRAM_API_KEY=...
|
||||||
|
DEEPGRAM_VOICE=alpha-asteria-en-v2
|
||||||
|
# DEEPGRAM_STT_URL=ws://localhost:8080
|
||||||
|
# DEEPGRAM_TTS_BASE_URL=http://0.0.0.0:8080/v1/speak
|
||||||
|
|
||||||
|
OPENAI_API_KEY=...
|
||||||
|
# OPENAI_API_KEY=
|
||||||
|
OPENAI_MODEL=gpt-4o
|
||||||
|
# OPENAI_MODEL=meta-llama/Meta-Llama-3-70B-Instruct
|
||||||
|
# OPENAI_MODEL=meta-llama/Meta-Llama-3-8B-Instruct
|
||||||
|
# OPENAI_MODEL=neuralmagic/Meta-Llama-3-70B-Instruct-FP8
|
||||||
|
# OPENAI_BASE_URL=http://0.0.0.0:8000/v1
|
||||||
158
examples/fast-bot-metrics/example-bot-runner-bot.py
Normal file
158
examples/fast-bot-metrics/example-bot-runner-bot.py
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
|
from pipecat.processors.aggregators.llm_response import (
|
||||||
|
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||||
|
from pipecat.services.deepgram import DeepgramTTSService
|
||||||
|
from pipecat.services.openai import OpenAILLMService
|
||||||
|
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
|
||||||
|
from pipecat.vad.silero import SileroVADAnalyzer
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
logger.remove(0)
|
||||||
|
logger.add(sys.stderr, level="DEBUG")
|
||||||
|
|
||||||
|
|
||||||
|
class BotSettings(BaseModel):
|
||||||
|
room_url: str
|
||||||
|
room_token: str
|
||||||
|
bot_name: str = "Pipecat"
|
||||||
|
prompt: Optional[str] = "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way in a few short sentences."
|
||||||
|
deepgram_api_key: Optional[str] = None
|
||||||
|
deepgram_voice: Optional[str] = "aura-asteria-en"
|
||||||
|
deepgram_base_url: Optional[str] = "https://api.deepgram.com/v1/speak"
|
||||||
|
openai_api_key: Optional[str] = None
|
||||||
|
openai_model: Optional[str] = "gpt-4o"
|
||||||
|
openai_base_url: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
async def main(settings: BotSettings):
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
transport = DailyTransport(
|
||||||
|
settings.room_url,
|
||||||
|
settings.room_token,
|
||||||
|
settings.bot_name,
|
||||||
|
DailyParams(
|
||||||
|
audio_out_enabled=True,
|
||||||
|
transcription_enabled=True,
|
||||||
|
vad_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
tts = DeepgramTTSService(
|
||||||
|
aiohttp_session=session,
|
||||||
|
api_key=settings.deepgram_api_key,
|
||||||
|
voice=settings.deepgram_voice,
|
||||||
|
base_url=settings.deepgram_base_url
|
||||||
|
)
|
||||||
|
|
||||||
|
llm = OpenAILLMService(
|
||||||
|
api_key=settings.openai_api_key,
|
||||||
|
model=settings.openai_model,
|
||||||
|
base_url=settings.openai_base_url
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "system",
|
||||||
|
"content": settings.prompt,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
tma_in = LLMUserResponseAggregator(messages)
|
||||||
|
tma_out = LLMAssistantResponseAggregator(messages)
|
||||||
|
|
||||||
|
pipeline = Pipeline([
|
||||||
|
transport.input(), # Transport user input
|
||||||
|
tma_in, # User responses
|
||||||
|
llm, # LLM
|
||||||
|
tts, # TTS
|
||||||
|
transport.output(), # Transport bot output
|
||||||
|
tma_out, # Assistant spoken responses
|
||||||
|
])
|
||||||
|
|
||||||
|
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||||
|
|
||||||
|
# When the first participant joins, the bot should introduce itself.
|
||||||
|
@transport.event_handler("on_first_participant_joined")
|
||||||
|
async def on_first_participant_joined(transport, participant):
|
||||||
|
# Kick off the conversation.
|
||||||
|
messages.append(
|
||||||
|
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||||
|
await task.queue_frame(LLMMessagesFrame(messages))
|
||||||
|
|
||||||
|
# When a participant joins, start transcription for that participant so the
|
||||||
|
# bot can "hear" and respond to them.
|
||||||
|
@transport.event_handler("on_participant_joined")
|
||||||
|
async def on_participant_joined(transport, participant):
|
||||||
|
transport.capture_participant_transcription(participant["id"])
|
||||||
|
|
||||||
|
# When the participant leaves, we exit the bot.
|
||||||
|
@transport.event_handler("on_participant_left")
|
||||||
|
async def on_participant_left(transport, participant, reason):
|
||||||
|
await task.queue_frame(EndFrame())
|
||||||
|
|
||||||
|
# If the call is ended make sure we quit as well.
|
||||||
|
@transport.event_handler("on_call_state_updated")
|
||||||
|
async def on_call_state_updated(transport, state):
|
||||||
|
if state == "left":
|
||||||
|
await task.queue_frame(EndFrame())
|
||||||
|
|
||||||
|
# Handle "latency-ping" messages. The client will send app messages that look like
|
||||||
|
# this:
|
||||||
|
# { "latency-ping": { ts: <client-side timestamp> }}
|
||||||
|
#
|
||||||
|
# We want to send an immediate pong back to the client from this handler function.
|
||||||
|
# Also, we will push a frame into the top of the pipeline and send it after the
|
||||||
|
#
|
||||||
|
@transport.event_handler("on_app_message")
|
||||||
|
async def on_app_message(transport, message, sender):
|
||||||
|
try:
|
||||||
|
if "latency-ping" in message:
|
||||||
|
logger.debug(f"Received latency ping app message: {message}")
|
||||||
|
ts = message["latency-ping"]["ts"]
|
||||||
|
# Send immediately
|
||||||
|
transport.output().send_message(DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-msg-handler": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
# And push to the pipeline for the Daily transport.output to send
|
||||||
|
await tma_in.push_frame(
|
||||||
|
DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-pipeline-delivery": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"message handling error: {e} - {message}")
|
||||||
|
|
||||||
|
runner = PipelineRunner()
|
||||||
|
|
||||||
|
await runner.run(task)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(description="Pipecat Bot")
|
||||||
|
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
|
||||||
|
|
||||||
|
args, unknown = parser.parse_known_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
settings = BotSettings.model_validate_json(args.settings)
|
||||||
|
asyncio.run(main(settings))
|
||||||
|
except ValidationError as e:
|
||||||
|
print(e)
|
||||||
268
examples/fast-bot-metrics/fastbothelpers.py
Normal file
268
examples/fast-bot-metrics/fastbothelpers.py
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
from loguru import logger
|
||||||
|
import asyncio
|
||||||
|
import math
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||||
|
from pipecat.frames.frames import (
|
||||||
|
Frame,
|
||||||
|
AudioRawFrame,
|
||||||
|
InterimTranscriptionFrame,
|
||||||
|
TranscriptionFrame,
|
||||||
|
TextFrame,
|
||||||
|
StartInterruptionFrame,
|
||||||
|
StopInterruptionFrame,
|
||||||
|
LLMFullResponseStartFrame,
|
||||||
|
TTSStoppedFrame,
|
||||||
|
MetricsFrame
|
||||||
|
)
|
||||||
|
|
||||||
|
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
|
||||||
|
from pipecat.services.deepgram import DeepgramTTSService, DeepgramSTTService
|
||||||
|
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
|
||||||
|
|
||||||
|
|
||||||
|
class GreedyLLMAggregator(FrameProcessor):
|
||||||
|
def __init__(self, context: OpenAILLMContext = None, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
logger.debug(f"{frame}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
if isinstance(frame, InterimTranscriptionFrame):
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(frame, TranscriptionFrame):
|
||||||
|
# append transcribed text to last "user" frame
|
||||||
|
if self.context.messages and self.context.messages[-1]["role"] == "user":
|
||||||
|
last_frame = self.context.messages.pop()
|
||||||
|
else:
|
||||||
|
last_frame = {"role": "user", "content": ""}
|
||||||
|
|
||||||
|
last_frame["content"] += " " + frame.text
|
||||||
|
self.context.messages.append(last_frame)
|
||||||
|
|
||||||
|
oai_context_frame = OpenAILLMContextFrame(context=self.context)
|
||||||
|
logger.debug(f"pushing frame {oai_context_frame}")
|
||||||
|
await self.push_frame(oai_context_frame)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"error: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
class ClearableDeepgramTTSService(DeepgramTTSService):
|
||||||
|
def __init___(self, **kwargs):
|
||||||
|
super().__init(**kwargs)
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
if isinstance(frame, StartInterruptionFrame):
|
||||||
|
self._current_sentence = ""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BufferedSentence:
|
||||||
|
audio_frames: List[AudioRawFrame] = field(default_factory=list)
|
||||||
|
text_frame: TextFrame = None
|
||||||
|
|
||||||
|
|
||||||
|
class VADGate(FrameProcessor):
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
vad_analyzer: VADAnalyzer = None,
|
||||||
|
context: OpenAILLMContext = None,
|
||||||
|
**kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.vad_analyzer = vad_analyzer
|
||||||
|
self.context = context
|
||||||
|
|
||||||
|
self._audio_pusher_task = None
|
||||||
|
self._expect_text_frame_next = False
|
||||||
|
self._sentences: List[BufferedSentence] = []
|
||||||
|
|
||||||
|
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
|
||||||
|
# each text frame.
|
||||||
|
#
|
||||||
|
# start a coroutine to service the queue and send sentences down the pipeline when possible.
|
||||||
|
# 1. do not send anything when we are not in VADState.QUIET
|
||||||
|
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
|
||||||
|
# to output, sleep until it's time to send another sentence
|
||||||
|
# 3. each time we send a sentence, append it to the conversation context
|
||||||
|
# 3. when the sentence buffer becomes empty, cancel the coroutine
|
||||||
|
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
|
||||||
|
# then a TextFrame.
|
||||||
|
|
||||||
|
if self._expect_text_frame_next:
|
||||||
|
self._expect_text_frame_next = False
|
||||||
|
if isinstance(frame, TextFrame):
|
||||||
|
self._sentences[-1].text_frame = frame
|
||||||
|
else:
|
||||||
|
logger.debug(f"expected a text frame, but received {frame}")
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
if isinstance(frame, TextFrame):
|
||||||
|
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
|
||||||
|
|
||||||
|
if isinstance(frame, AudioRawFrame):
|
||||||
|
# if our buffer is empty or has a "finished" sentence at the end,
|
||||||
|
# then we need to start buffering a new sentence
|
||||||
|
if not self._sentences or self._sentences[-1].text_frame:
|
||||||
|
self._sentences.append(BufferedSentence())
|
||||||
|
self._sentences[-1].audio_frames.append(frame)
|
||||||
|
await self.maybe_start_audio_pusher_task()
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(frame, TTSStoppedFrame):
|
||||||
|
self._expect_text_frame_next = True
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
return
|
||||||
|
|
||||||
|
# There are two ways we can be interrupted. During greedy inference, a new
|
||||||
|
# LLM response can start. Or, during playout, we can get a traditional
|
||||||
|
# user interruption frame.
|
||||||
|
if (isinstance(frame, LLMFullResponseStartFrame) or
|
||||||
|
isinstance(frame, StartInterruptionFrame)):
|
||||||
|
logger.debug(f"{frame} - Handle interruption in VADGate")
|
||||||
|
self._sentences = []
|
||||||
|
if self._audio_pusher_task:
|
||||||
|
self._audio_pusher_task.cancel()
|
||||||
|
self._audio_pusher_task = None
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"error: {e}")
|
||||||
|
|
||||||
|
async def maybe_start_audio_pusher_task(self):
|
||||||
|
try:
|
||||||
|
if self._audio_pusher_task:
|
||||||
|
return
|
||||||
|
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Exception {e}")
|
||||||
|
|
||||||
|
async def push_audio(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
if not self._sentences:
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if self.vad_analyzer._vad_state != VADState.QUIET:
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# we only want to push completed sentence buffers
|
||||||
|
if not self._sentences[0].text_frame:
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
continue
|
||||||
|
|
||||||
|
s = self._sentences.pop(0)
|
||||||
|
if not s.audio_frames:
|
||||||
|
continue
|
||||||
|
sample_rate = s.audio_frames[0].sample_rate
|
||||||
|
duration = 0
|
||||||
|
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
|
||||||
|
for frame in s.audio_frames:
|
||||||
|
await self.push_frame(frame)
|
||||||
|
# assume linear16 encoding (2 bytes per sample). todo: add some more
|
||||||
|
# metadata to AudioRawFrame, maybe
|
||||||
|
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
|
||||||
|
await asyncio.sleep(duration - 20 / 1000)
|
||||||
|
if self.context:
|
||||||
|
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
|
||||||
|
self.context.messages.append(
|
||||||
|
{"role": "assistant", "content": s.text_frame.text}
|
||||||
|
)
|
||||||
|
await self.push_frame(s.text_frame)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Exception {e}")
|
||||||
|
|
||||||
|
|
||||||
|
class TranscriptionTimingLogger(FrameProcessor):
|
||||||
|
def __init__(self, avt):
|
||||||
|
super().__init__()
|
||||||
|
self.name = "Transcription"
|
||||||
|
self._avt = avt
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
try:
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
if isinstance(frame, TranscriptionFrame):
|
||||||
|
elapsed = time.time() - self._avt.last_transition_ts
|
||||||
|
logger.debug(f"Transcription TTF: {elapsed}")
|
||||||
|
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
|
||||||
|
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Exception {e}")
|
||||||
|
|
||||||
|
|
||||||
|
class AudioVolumeTimer(FrameProcessor):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.last_transition_ts = 0
|
||||||
|
self._prev_volume = -80
|
||||||
|
self._speech_volume_threshold = -50
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
if isinstance(frame, AudioRawFrame):
|
||||||
|
volume = self.calculate_volume(frame)
|
||||||
|
# print(f"Audio volume: {volume:.2f} dB")
|
||||||
|
if (volume >= self._speech_volume_threshold and
|
||||||
|
self._prev_volume < self._speech_volume_threshold):
|
||||||
|
# logger.debug("transition above speech volume threshold")
|
||||||
|
self.last_transition_ts = time.time()
|
||||||
|
elif (volume < self._speech_volume_threshold and
|
||||||
|
self._prev_volume >= self._speech_volume_threshold):
|
||||||
|
# logger.debug("transition below non-speech volume threshold")
|
||||||
|
self.last_transition_ts = time.time()
|
||||||
|
self._prev_volume = volume
|
||||||
|
|
||||||
|
await self.push_frame(frame, direction)
|
||||||
|
|
||||||
|
def calculate_volume(self, frame: AudioRawFrame) -> float:
|
||||||
|
if frame.num_channels != 1:
|
||||||
|
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
|
||||||
|
|
||||||
|
# Unpack audio data into 16-bit integers
|
||||||
|
fmt = f"{len(frame.audio)//2}h"
|
||||||
|
audio_samples = struct.unpack(fmt, frame.audio)
|
||||||
|
|
||||||
|
# Calculate RMS
|
||||||
|
sum_squares = sum(sample**2 for sample in audio_samples)
|
||||||
|
rms = math.sqrt(sum_squares / len(audio_samples))
|
||||||
|
|
||||||
|
# Convert RMS to decibels (dB)
|
||||||
|
# Reference: maximum value for 16-bit audio is 32767
|
||||||
|
if rms > 0:
|
||||||
|
db = 20 * math.log10(rms / 32767)
|
||||||
|
else:
|
||||||
|
db = -96 # Minimum value (almost silent)
|
||||||
|
|
||||||
|
return db
|
||||||
58
examples/fast-bot-metrics/runner.py
Normal file
58
examples/fast-bot-metrics/runner.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import urllib
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
def configure():
|
||||||
|
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||||
|
parser.add_argument(
|
||||||
|
"-u",
|
||||||
|
"--url",
|
||||||
|
type=str,
|
||||||
|
required=False,
|
||||||
|
help="URL of the Daily room to join")
|
||||||
|
parser.add_argument(
|
||||||
|
"-k",
|
||||||
|
"--apikey",
|
||||||
|
type=str,
|
||||||
|
required=False,
|
||||||
|
help="Daily API Key (needed to create an owner token for the room)",
|
||||||
|
)
|
||||||
|
|
||||||
|
args, unknown = parser.parse_known_args()
|
||||||
|
|
||||||
|
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||||
|
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||||
|
|
||||||
|
if not url:
|
||||||
|
raise Exception(
|
||||||
|
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
|
||||||
|
|
||||||
|
if not key:
|
||||||
|
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
|
||||||
|
|
||||||
|
# Create a meeting token for the given room with an expiration 1 hour in
|
||||||
|
# the future.
|
||||||
|
room_name: str = urllib.parse.urlparse(url).path[1:]
|
||||||
|
expiration: float = time.time() + 60 * 60
|
||||||
|
|
||||||
|
res: requests.Response = requests.post(
|
||||||
|
f"https://api.daily.co/v1/meeting-tokens",
|
||||||
|
headers={
|
||||||
|
"Authorization": f"Bearer {key}"},
|
||||||
|
json={
|
||||||
|
"properties": {
|
||||||
|
"room_name": room_name,
|
||||||
|
"is_owner": True,
|
||||||
|
"exp": expiration}},
|
||||||
|
)
|
||||||
|
|
||||||
|
if res.status_code != 200:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||||
|
|
||||||
|
token: str = res.json()["token"]
|
||||||
|
|
||||||
|
return (url, token)
|
||||||
165
examples/fast-bot-metrics/vad-gated-pipeline.py
Normal file
165
examples/fast-bot-metrics/vad-gated-pipeline.py
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from runner import configure
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
|
from pipecat.vad.vad_analyzer import VADParams
|
||||||
|
from pipecat.vad.silero import SileroVADAnalyzer
|
||||||
|
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
|
||||||
|
from pipecat.services.openai import OpenAILLMService, OpenAILLMContext
|
||||||
|
from pipecat.services.deepgram import DeepgramSTTService
|
||||||
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.processors.logger import FrameLogger
|
||||||
|
from pipecat.frames.frames import LLMMessagesFrame
|
||||||
|
|
||||||
|
|
||||||
|
from fastbothelpers import (
|
||||||
|
GreedyLLMAggregator,
|
||||||
|
ClearableDeepgramTTSService,
|
||||||
|
VADGate,
|
||||||
|
AudioVolumeTimer,
|
||||||
|
TranscriptionTimingLogger
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
|
||||||
|
logger.remove(0)
|
||||||
|
logger.add(sys.stderr, level="DEBUG")
|
||||||
|
|
||||||
|
|
||||||
|
async def main(room_url: str, token):
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
transport = DailyTransport(
|
||||||
|
room_url,
|
||||||
|
token,
|
||||||
|
"Respond bot",
|
||||||
|
DailyParams(
|
||||||
|
audio_out_enabled=True,
|
||||||
|
transcription_enabled=False,
|
||||||
|
vad_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.400)),
|
||||||
|
vad_audio_passthrough=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
stt = DeepgramSTTService(
|
||||||
|
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||||
|
**({'url': url} if (url := os.getenv("DEEPGRAM_STT_URL")) else {})
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
tts = ClearableDeepgramTTSService(
|
||||||
|
name="Voice",
|
||||||
|
aiohttp_session=session,
|
||||||
|
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||||
|
voice="aura-asteria-en",
|
||||||
|
**({'base_url': url} if (url := os.getenv("DEEPGRAM_TTS_BASE_URL")) else {})
|
||||||
|
)
|
||||||
|
|
||||||
|
llm = OpenAILLMService(
|
||||||
|
name="LLM",
|
||||||
|
# To use OpenAI
|
||||||
|
api_key=os.getenv("OPENAI_API_KEY"),
|
||||||
|
model=os.getenv("OPENAI_MODEL"),
|
||||||
|
base_url=os.getenv("OPENAI_BASE_URL")
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "system",
|
||||||
|
"content": """You are a helpful assistant in an audio conversation.
|
||||||
|
|
||||||
|
Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers.
|
||||||
|
|
||||||
|
Respond to what the user said in a creative and helpful way. Be concise in your answers to basic questions. If you are asked to elaborate or tell a story, provide a longer response.
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
ctx = OpenAILLMContext()
|
||||||
|
greedy = GreedyLLMAggregator(name="greedy", context=ctx)
|
||||||
|
gate = VADGate(name="gate", vad_analyzer=transport.input().vad_analyzer(), context=ctx)
|
||||||
|
avt = AudioVolumeTimer()
|
||||||
|
tl = TranscriptionTimingLogger(avt)
|
||||||
|
|
||||||
|
pipeline = Pipeline([
|
||||||
|
transport.input(), # Transport user input
|
||||||
|
avt,
|
||||||
|
stt,
|
||||||
|
tl,
|
||||||
|
greedy,
|
||||||
|
llm, # LLM
|
||||||
|
tts, # TTS
|
||||||
|
gate,
|
||||||
|
transport.output(), # Transport bot output
|
||||||
|
# FrameLogger()
|
||||||
|
])
|
||||||
|
|
||||||
|
task = PipelineTask(
|
||||||
|
pipeline,
|
||||||
|
PipelineParams(
|
||||||
|
allow_interruptions=True,
|
||||||
|
enable_metrics=True,
|
||||||
|
report_only_initial_ttfb=True
|
||||||
|
))
|
||||||
|
|
||||||
|
# When a participant joins, start transcription for that participant so the
|
||||||
|
# bot can "hear" and respond to them.
|
||||||
|
# @ transport.event_handler("on_participant_joined")
|
||||||
|
# async def on_participant_joined(transport, participant):
|
||||||
|
# transport.capture_participant_transcription(participant["id"])
|
||||||
|
|
||||||
|
# When the first participant joins, the bot should introduce itself.
|
||||||
|
@ transport.event_handler("on_first_participant_joined")
|
||||||
|
async def on_first_participant_joined(transport, participant):
|
||||||
|
messages.append(
|
||||||
|
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||||
|
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||||
|
|
||||||
|
# Handle "latency-ping" messages. The client will send app messages that look like
|
||||||
|
# this:
|
||||||
|
# { "latency-ping": { ts: <client-side timestamp> }}
|
||||||
|
#
|
||||||
|
# We want to send an immediate pong back to the client from this handler function.
|
||||||
|
# Also, we will push a frame into the top of the pipeline and send it after the
|
||||||
|
#
|
||||||
|
@ transport.event_handler("on_app_message")
|
||||||
|
async def on_app_message(transport, message, sender):
|
||||||
|
try:
|
||||||
|
if "latency-ping" in message:
|
||||||
|
logger.debug(f"Received latency ping app message: {message}")
|
||||||
|
ts = message["latency-ping"]["ts"]
|
||||||
|
# Send immediately
|
||||||
|
transport.output().send_message(DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-msg-handler": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
# And push to the pipeline for the Daily transport.output to send
|
||||||
|
await tma_in.push_frame(
|
||||||
|
DailyTransportMessageFrame(
|
||||||
|
message={"latency-pong-pipeline-delivery": {"ts": ts}},
|
||||||
|
participant_id=sender))
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"message handling error: {e} - {message}")
|
||||||
|
|
||||||
|
runner = PipelineRunner()
|
||||||
|
await runner.run(task)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
(url, token) = configure()
|
||||||
|
asyncio.run(main(url, token))
|
||||||
Reference in New Issue
Block a user