From f411bf33fd738d2e8c948a84cbd9e0de4ed2533e Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Tue, 10 Sep 2024 10:15:48 -0700 Subject: [PATCH 1/9] adding a frame processor with the ability to save a conversation to a buffer and another frame processor to upload audio to Canonical for evaluation and metrics collection. Examples included --- examples/canonical-metrics/.gitignore | 161 ++++++++++++++ examples/canonical-metrics/Dockerfile | 16 ++ examples/canonical-metrics/README.md | 37 ++++ examples/canonical-metrics/bot.py | 146 +++++++++++++ examples/canonical-metrics/env.example | 5 + examples/canonical-metrics/requirements.txt | 5 + examples/canonical-metrics/runner.py | 54 +++++ examples/canonical-metrics/server.py | 147 +++++++++++++ examples/chatbot-audio-recording/.gitignore | 161 ++++++++++++++ examples/chatbot-audio-recording/Dockerfile | 16 ++ examples/chatbot-audio-recording/README.md | 37 ++++ examples/chatbot-audio-recording/bot.py | 136 ++++++++++++ examples/chatbot-audio-recording/env.example | 4 + .../chatbot-audio-recording/requirements.txt | 4 + examples/chatbot-audio-recording/runner.py | 54 +++++ examples/chatbot-audio-recording/server.py | 147 +++++++++++++ examples/simple-chatbot/requirements.txt | 2 +- pyproject.toml | 1 + src/pipecat/frames/frames.py | 14 +- .../processors/audio_buffer_processor.py | 75 +++++++ .../processors/canonical_metrics_processor.py | 202 ++++++++++++++++++ .../processors/user_marker_processor.py | 19 ++ 22 files changed, 1440 insertions(+), 3 deletions(-) create mode 100644 examples/canonical-metrics/.gitignore create mode 100644 examples/canonical-metrics/Dockerfile create mode 100644 examples/canonical-metrics/README.md create mode 100644 examples/canonical-metrics/bot.py create mode 100644 examples/canonical-metrics/env.example create mode 100644 examples/canonical-metrics/requirements.txt create mode 100644 examples/canonical-metrics/runner.py create mode 100644 examples/canonical-metrics/server.py create mode 100644 examples/chatbot-audio-recording/.gitignore create mode 100644 examples/chatbot-audio-recording/Dockerfile create mode 100644 examples/chatbot-audio-recording/README.md create mode 100644 examples/chatbot-audio-recording/bot.py create mode 100644 examples/chatbot-audio-recording/env.example create mode 100644 examples/chatbot-audio-recording/requirements.txt create mode 100644 examples/chatbot-audio-recording/runner.py create mode 100644 examples/chatbot-audio-recording/server.py create mode 100644 src/pipecat/processors/audio_buffer_processor.py create mode 100644 src/pipecat/processors/canonical_metrics_processor.py create mode 100644 src/pipecat/processors/user_marker_processor.py diff --git a/examples/canonical-metrics/.gitignore b/examples/canonical-metrics/.gitignore new file mode 100644 index 000000000..50d9d205e --- /dev/null +++ b/examples/canonical-metrics/.gitignore @@ -0,0 +1,161 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +recordings/ +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ +runpod.toml diff --git a/examples/canonical-metrics/Dockerfile b/examples/canonical-metrics/Dockerfile new file mode 100644 index 000000000..704080eec --- /dev/null +++ b/examples/canonical-metrics/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-bullseye + +RUN mkdir /app +RUN mkdir /app/assets +RUN mkdir /app/utils +COPY *.py /app/ +COPY requirements.txt /app/ +copy assets/* /app/assets/ +copy utils/* /app/utils/ + +WORKDIR /app +RUN pip3 install -r requirements.txt + +EXPOSE 7860 + +CMD ["python3", "server.py"] \ No newline at end of file diff --git a/examples/canonical-metrics/README.md b/examples/canonical-metrics/README.md new file mode 100644 index 000000000..13c0b31e0 --- /dev/null +++ b/examples/canonical-metrics/README.md @@ -0,0 +1,37 @@ +# Simple Chatbot + + + +This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion. + +See a video of it in action: https://x.com/kwindla/status/1778628911817183509 + +And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416 + +ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded. + +## Get started + +```python +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp env.example .env # and add your credentials + +``` + +## Run the server + +```bash +python server.py +``` + +Then, visit `http://localhost:7860/start` in your browser to start a chatbot session. + +## Build and test the Docker image + +``` +docker build -t chatbot . +docker run --env-file .env -p 7860:7860 chatbot +``` diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py new file mode 100644 index 000000000..b6da3fbd4 --- /dev/null +++ b/examples/canonical-metrics/bot.py @@ -0,0 +1,146 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +import uuid + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import EndFrame, LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.processors.canonical_metrics_processor import CanonicalMetrics +from pipecat.processors.user_marker_processor import UserMarkerProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Chatbot", + DailyParams( + audio_out_enabled=True, + audio_in_enabled=True, + camera_out_enabled=False, + vad_enabled=True, + vad_audio_passthrough=True, + vad_analyzer=SileroVADAnalyzer(), + transcription_enabled=True, + # + # Spanish + # + # transcription_settings=DailyTranscriptionSettings( + # language="es", + # tier="nova", + # model="2-general" + # ) + ) + ) + + tts = ElevenLabsTTSService( + api_key=os.getenv("ELEVENLABS_API_KEY"), + + # + # English + # + voice_id="cgSgspJ2msm6clMCkdW9", + aiohttp_session=session, + + # + # Spanish + # + # model="eleven_multilingual_v2", + # voice_id="gD1IexrzCvsXPHUuT0s3", + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + # + # English + # + "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your responses to 12 words or fewer.", + + # + # Spanish + # + # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", + }, + ] + + user_response = LLMUserResponseAggregator() + assistant_response = LLMAssistantResponseAggregator() + + """ + CanonicalMetrics uses AudioBufferProcessor under the hood to buffer the audio. On + call completion, CanonicalMetrics will send the audio buffer to Canonical for + analysis. Visit https://voice.canonical.chat to learn more. + """ + canonical = CanonicalMetrics( + call_id=str(uuid.uuid4()), + assistant="pipecat-chatbot", + assistant_speaks_first=True, + ) + usermarker = UserMarkerProcessor() + pipeline = Pipeline([ + transport.input(), # microphone + usermarker, # used to mark the user's audio in the pipeline + user_response, + llm, + tts, + canonical, # captures audio and uploads to Canonical AI for metrics + transport.output(), + assistant_response, + ]) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([LLMMessagesFrame(messages)]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + print(f"Participant left: {participant}") + await task.queue_frame(EndFrame()) + + @transport.event_handler("on_call_state_updated") + async def on_call_state_updated(transport, state): + if state == "left": + await task.queue_frame(EndFrame()) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/canonical-metrics/env.example b/examples/canonical-metrics/env.example new file mode 100644 index 000000000..39f209372 --- /dev/null +++ b/examples/canonical-metrics/env.example @@ -0,0 +1,5 @@ +DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev) +DAILY_API_KEY=7df... +OPENAI_API_KEY=sk-PL... +ELEVENLABS_API_KEY=aeb... +CANONICAL_API_KEY=can... \ No newline at end of file diff --git a/examples/canonical-metrics/requirements.txt b/examples/canonical-metrics/requirements.txt new file mode 100644 index 000000000..7e53edc6b --- /dev/null +++ b/examples/canonical-metrics/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv +fastapi[all] +uvicorn +pipecat-ai[daily,openai,silero,elevenlabs,canonical] + diff --git a/examples/canonical-metrics/runner.py b/examples/canonical-metrics/runner.py new file mode 100644 index 000000000..7507d28d6 --- /dev/null +++ b/examples/canonical-metrics/runner.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import aiohttp +import argparse +import os + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper + + +async def configure(aiohttp_session: aiohttp.ClientSession): + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", + "--url", + type=str, + required=False, + help="URL of the Daily room to join") + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.") + + if not key: + raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + + daily_rest_helper = DailyRESTHelper( + daily_api_key=key, + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session + ) + + # Create a meeting token for the given room with an expiration 1 hour in + # the future. + expiry_time: float = 60 * 60 + + token = await daily_rest_helper.get_token(url, expiry_time) + + return (url, token) diff --git a/examples/canonical-metrics/server.py b/examples/canonical-metrics/server.py new file mode 100644 index 000000000..2c717ffa2 --- /dev/null +++ b/examples/canonical-metrics/server.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os +import subprocess +from contextlib import asynccontextmanager + +import aiohttp +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, RedirectResponse + +from pipecat.transports.services.helpers.daily_rest import (DailyRESTHelper, + DailyRoomParams) + +MAX_BOTS_PER_ROOM = 1 + +# Bot sub-process dict for status reporting and concurrency control +bot_procs = {} + +daily_helpers = {} + +load_dotenv(override=True) + + +def cleanup(): + # Clean up function, just to be extra safe + for entry in bot_procs.values(): + proc = entry[0] + proc.terminate() + proc.wait() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + aiohttp_session = aiohttp.ClientSession() + daily_helpers["rest"] = DailyRESTHelper( + daily_api_key=os.getenv("DAILY_API_KEY", ""), + daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'), + aiohttp_session=aiohttp_session + ) + yield + await aiohttp_session.close() + cleanup() + +app = FastAPI(lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/start") +async def start_agent(request: Request): + print(f"!!! Creating room") + room = await daily_helpers["rest"].create_room(DailyRoomParams()) + print(f"!!! Room URL: {room.url}") + # Ensure the room property is present + if not room.url: + raise HTTPException( + status_code=500, + detail="Missing 'room' property in request data. Cannot start agent without a target room!") + + # Check if there is already an existing process running in this room + num_bots_in_room = sum( + 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None) + if num_bots_in_room >= MAX_BOTS_PER_ROOM: + raise HTTPException( + status_code=500, detail=f"Max bot limited reach for room: {room.url}") + + # Get the token for the room + token = await daily_helpers["rest"].get_token(room.url) + + if not token: + raise HTTPException( + status_code=500, detail=f"Failed to get token for room: {room.url}") + + # Spawn a new agent, and join the user session + # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) + try: + proc = subprocess.Popen( + [ + f"python3 -m bot -u {room.url} -t {token}" + ], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__)) + ) + bot_procs[proc.pid] = (proc, room.url) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to start subprocess: {e}") + + return RedirectResponse(room.url) + + +@app.get("/status/{pid}") +def get_status(pid: int): + # Look up the subprocess + proc = bot_procs.get(pid) + + # If the subprocess doesn't exist, return an error + if not proc: + raise HTTPException( + status_code=404, detail=f"Bot with process id: {pid} not found") + + # Check the status of the subprocess + if proc[0].poll() is None: + status = "running" + else: + status = "finished" + + return JSONResponse({"bot_id": pid, "status": status}) + + +if __name__ == "__main__": + import uvicorn + + default_host = os.getenv("HOST", "0.0.0.0") + default_port = int(os.getenv("FAST_API_PORT", "7860")) + + parser = argparse.ArgumentParser( + description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, + default=default_host, help="Host address") + parser.add_argument("--port", type=int, + default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", + help="Reload code on change") + + config = parser.parse_args() + + uvicorn.run( + "server:app", + host=config.host, + port=config.port, + reload=config.reload, + ) diff --git a/examples/chatbot-audio-recording/.gitignore b/examples/chatbot-audio-recording/.gitignore new file mode 100644 index 000000000..2bc1403d1 --- /dev/null +++ b/examples/chatbot-audio-recording/.gitignore @@ -0,0 +1,161 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ +runpod.toml diff --git a/examples/chatbot-audio-recording/Dockerfile b/examples/chatbot-audio-recording/Dockerfile new file mode 100644 index 000000000..704080eec --- /dev/null +++ b/examples/chatbot-audio-recording/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-bullseye + +RUN mkdir /app +RUN mkdir /app/assets +RUN mkdir /app/utils +COPY *.py /app/ +COPY requirements.txt /app/ +copy assets/* /app/assets/ +copy utils/* /app/utils/ + +WORKDIR /app +RUN pip3 install -r requirements.txt + +EXPOSE 7860 + +CMD ["python3", "server.py"] \ No newline at end of file diff --git a/examples/chatbot-audio-recording/README.md b/examples/chatbot-audio-recording/README.md new file mode 100644 index 000000000..13c0b31e0 --- /dev/null +++ b/examples/chatbot-audio-recording/README.md @@ -0,0 +1,37 @@ +# Simple Chatbot + + + +This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion. + +See a video of it in action: https://x.com/kwindla/status/1778628911817183509 + +And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416 + +ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded. + +## Get started + +```python +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp env.example .env # and add your credentials + +``` + +## Run the server + +```bash +python server.py +``` + +Then, visit `http://localhost:7860/start` in your browser to start a chatbot session. + +## Build and test the Docker image + +``` +docker build -t chatbot . +docker run --env-file .env -p 7860:7860 chatbot +``` diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py new file mode 100644 index 000000000..7297de215 --- /dev/null +++ b/examples/chatbot-audio-recording/bot.py @@ -0,0 +1,136 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import EndFrame, LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.processors.audio_buffer_processor import AudioBufferProcessor +from pipecat.processors.user_marker_processor import UserMarkerProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Chatbot", + DailyParams( + audio_out_enabled=True, + audio_in_enabled=True, + camera_out_enabled=False, + vad_enabled=True, + vad_audio_passthrough=True, + vad_analyzer=SileroVADAnalyzer(), + transcription_enabled=True, + # + # Spanish + # + # transcription_settings=DailyTranscriptionSettings( + # language="es", + # tier="nova", + # model="2-general" + # ) + ) + ) + + tts = ElevenLabsTTSService( + api_key=os.getenv("ELEVENLABS_API_KEY"), + + # + # English + # + voice_id="cgSgspJ2msm6clMCkdW9", + aiohttp_session=session, + + # + # Spanish + # + # model="eleven_multilingual_v2", + # voice_id="gD1IexrzCvsXPHUuT0s3", + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + # + # English + # + "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your response to 12 words or fewer.", + + # + # Spanish + # + # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", + }, + ] + + user_response = LLMUserResponseAggregator() + assistant_response = LLMAssistantResponseAggregator() + + audiobuffer = AudioBufferProcessor() + usermarker = UserMarkerProcessor() + pipeline = Pipeline([ + transport.input(), # microphone + usermarker, # used to mark the user's audio in the pipeline + user_response, + llm, + tts, + audiobuffer, # used to buffer the audio in the pipeline + transport.output(), + assistant_response, + ]) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([LLMMessagesFrame(messages)]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + print(f"Participant left: {participant}") + await task.queue_frame(EndFrame()) + + @transport.event_handler("on_call_state_updated") + async def on_call_state_updated(transport, state): + if state == "left": + await task.queue_frame(EndFrame()) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/chatbot-audio-recording/env.example b/examples/chatbot-audio-recording/env.example new file mode 100644 index 000000000..d368ae510 --- /dev/null +++ b/examples/chatbot-audio-recording/env.example @@ -0,0 +1,4 @@ +DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev) +DAILY_API_KEY=7df... +OPENAI_API_KEY=sk-PL... +ELEVENLABS_API_KEY=aeb... \ No newline at end of file diff --git a/examples/chatbot-audio-recording/requirements.txt b/examples/chatbot-audio-recording/requirements.txt new file mode 100644 index 000000000..9786b52de --- /dev/null +++ b/examples/chatbot-audio-recording/requirements.txt @@ -0,0 +1,4 @@ +python-dotenv +fastapi[all] +uvicorn +pipecat-ai[daily,openai,silero,elevenlabs] diff --git a/examples/chatbot-audio-recording/runner.py b/examples/chatbot-audio-recording/runner.py new file mode 100644 index 000000000..7507d28d6 --- /dev/null +++ b/examples/chatbot-audio-recording/runner.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import aiohttp +import argparse +import os + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper + + +async def configure(aiohttp_session: aiohttp.ClientSession): + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", + "--url", + type=str, + required=False, + help="URL of the Daily room to join") + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.") + + if not key: + raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + + daily_rest_helper = DailyRESTHelper( + daily_api_key=key, + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session + ) + + # Create a meeting token for the given room with an expiration 1 hour in + # the future. + expiry_time: float = 60 * 60 + + token = await daily_rest_helper.get_token(url, expiry_time) + + return (url, token) diff --git a/examples/chatbot-audio-recording/server.py b/examples/chatbot-audio-recording/server.py new file mode 100644 index 000000000..2c717ffa2 --- /dev/null +++ b/examples/chatbot-audio-recording/server.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os +import subprocess +from contextlib import asynccontextmanager + +import aiohttp +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, RedirectResponse + +from pipecat.transports.services.helpers.daily_rest import (DailyRESTHelper, + DailyRoomParams) + +MAX_BOTS_PER_ROOM = 1 + +# Bot sub-process dict for status reporting and concurrency control +bot_procs = {} + +daily_helpers = {} + +load_dotenv(override=True) + + +def cleanup(): + # Clean up function, just to be extra safe + for entry in bot_procs.values(): + proc = entry[0] + proc.terminate() + proc.wait() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + aiohttp_session = aiohttp.ClientSession() + daily_helpers["rest"] = DailyRESTHelper( + daily_api_key=os.getenv("DAILY_API_KEY", ""), + daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'), + aiohttp_session=aiohttp_session + ) + yield + await aiohttp_session.close() + cleanup() + +app = FastAPI(lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/start") +async def start_agent(request: Request): + print(f"!!! Creating room") + room = await daily_helpers["rest"].create_room(DailyRoomParams()) + print(f"!!! Room URL: {room.url}") + # Ensure the room property is present + if not room.url: + raise HTTPException( + status_code=500, + detail="Missing 'room' property in request data. Cannot start agent without a target room!") + + # Check if there is already an existing process running in this room + num_bots_in_room = sum( + 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None) + if num_bots_in_room >= MAX_BOTS_PER_ROOM: + raise HTTPException( + status_code=500, detail=f"Max bot limited reach for room: {room.url}") + + # Get the token for the room + token = await daily_helpers["rest"].get_token(room.url) + + if not token: + raise HTTPException( + status_code=500, detail=f"Failed to get token for room: {room.url}") + + # Spawn a new agent, and join the user session + # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) + try: + proc = subprocess.Popen( + [ + f"python3 -m bot -u {room.url} -t {token}" + ], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__)) + ) + bot_procs[proc.pid] = (proc, room.url) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to start subprocess: {e}") + + return RedirectResponse(room.url) + + +@app.get("/status/{pid}") +def get_status(pid: int): + # Look up the subprocess + proc = bot_procs.get(pid) + + # If the subprocess doesn't exist, return an error + if not proc: + raise HTTPException( + status_code=404, detail=f"Bot with process id: {pid} not found") + + # Check the status of the subprocess + if proc[0].poll() is None: + status = "running" + else: + status = "finished" + + return JSONResponse({"bot_id": pid, "status": status}) + + +if __name__ == "__main__": + import uvicorn + + default_host = os.getenv("HOST", "0.0.0.0") + default_port = int(os.getenv("FAST_API_PORT", "7860")) + + parser = argparse.ArgumentParser( + description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, + default=default_host, help="Host address") + parser.add_argument("--port", type=int, + default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", + help="Reload code on change") + + config = parser.parse_args() + + uvicorn.run( + "server:app", + host=config.host, + port=config.port, + reload=config.reload, + ) diff --git a/examples/simple-chatbot/requirements.txt b/examples/simple-chatbot/requirements.txt index a7a8729df..9786b52de 100644 --- a/examples/simple-chatbot/requirements.txt +++ b/examples/simple-chatbot/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,openai,silero,elevenlabs] diff --git a/pyproject.toml b/pyproject.toml index 73c643ddc..41e1af149 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ Website = "https://pipecat.ai" [project.optional-dependencies] anthropic = [ "anthropic~=0.34.0" ] azure = [ "azure-cognitiveservices-speech~=1.40.0" ] +canonical = [ "aiofiles~=24.1.0" ] cartesia = [ "websockets~=12.0" ] daily = [ "daily-python~=0.10.1" ] deepgram = [ "deepgram-sdk~=3.5.0" ] diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 13c2f53f1..4eee87cc9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -4,9 +4,9 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import Any, List, Mapping, Optional, Tuple - +import time from dataclasses import dataclass, field +from typing import Any, List, Mapping, Optional, Tuple from pipecat.transcriptions.language import Language from pipecat.utils.utils import obj_count, obj_id @@ -223,6 +223,16 @@ class TransportMessageFrame(DataFrame): class AppFrame(Frame): pass + +@dataclass +class UserAudioFrame(AudioRawFrame): + """ + Indicates user audio in the pipeline. + """ + + def __init__(self, frame: AudioRawFrame): + super().__init__(frame.audio, frame.sample_rate, frame.num_channels) + # # System frames # diff --git a/src/pipecat/processors/audio_buffer_processor.py b/src/pipecat/processors/audio_buffer_processor.py new file mode 100644 index 000000000..f724e7c5e --- /dev/null +++ b/src/pipecat/processors/audio_buffer_processor.py @@ -0,0 +1,75 @@ +from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, Frame, + UserAudioFrame, UserStoppedSpeakingFrame) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class AudioBufferProcessor(FrameProcessor): + + def __init__(self): + """ + Initialize the AudioBufferProcessor. + + This constructor sets up the initial state for audio processing: + - audio_buffer: A bytearray to store incoming audio data. + - num_channels: The number of audio channels (initialized as None). + - sample_rate: The sample rate of the audio (initialized as None). + - assistant_audio: A boolean flag to indicate if assistant audio is being processed. + - user_audio: A boolean flag to indicate if user audio is being processed. + + The num_channels and sample_rate are set to None initially and will be + populated when the first audio frame is processed. + """ + super().__init__() + self.audio_buffer = bytearray() + self.num_channels = None + self.sample_rate = None + self.assistant_audio = False + self.user_audio = False + + def has_audio(self): + return ( + self.audio_buffer and + len(self.audio_buffer) > 0 and + self.num_channels and + self.sample_rate + ) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, AudioRawFrame) or isinstance(frame, UserAudioFrame): + if self.num_channels is None: + self.num_channels = frame.num_channels + if self.sample_rate is None: + self.sample_rate = frame.sample_rate + + elif isinstance(frame, UserStoppedSpeakingFrame): + self.user_audio = False + + if isinstance(frame, BotStartedSpeakingFrame): + self.assistant_audio = True + self.user_audio = False # do not capture user audio if assistant is speaking + if isinstance(frame, BotStoppedSpeakingFrame): + self.assistant_audio = False + # Capture user audio if assistant is not speaking, even if it's silence, the point + # here is to capture so that the conversation is as close to reality as possible. + # This is important for evaluation and metrics capture. + self.user_audio = True + + # only include audio from the user if the user is speaking, this is because audio from the user's + # mic is always coming in. if we include all the user's audio there will be a long latency before + # the user starts speaking because all of the user's silence during the assistant's speech will have been + # added to the buffer. + if isinstance(frame, UserAudioFrame) and self.user_audio: + self.audio_buffer.extend(frame.audio) + + # include all audio from the assistant + if ( + isinstance(frame, AudioRawFrame) + and not isinstance(frame, UserAudioFrame) + ): + self.audio_buffer.extend(frame.audio) + + # do not push the user's audio frame, doing so will result in echo + if not isinstance(frame, UserAudioFrame): + await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/canonical_metrics_processor.py b/src/pipecat/processors/canonical_metrics_processor.py new file mode 100644 index 000000000..e7c0afb61 --- /dev/null +++ b/src/pipecat/processors/canonical_metrics_processor.py @@ -0,0 +1,202 @@ +import os +import uuid +import wave +from datetime import datetime +from io import BytesIO +from typing import Dict, List, Tuple + +import aiohttp +from loguru import logger + +try: + import aiofiles + import aiofiles.os +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Canonical Metrics, you need to `pip install pipecat-ai[canonical]`. " + + "Also, set the `CANONICAL_API_KEY` environment variable.") + raise Exception(f"Missing module: {e}") + + +from pipecat.frames.frames import CancelFrame, EndFrame, Frame +from pipecat.processors.audio_buffer_processor import AudioBufferProcessor +from pipecat.processors.frame_processor import FrameDirection + +""" +This class extends AudioBufferProcessor to handle audio processing and uploading +for the Canonical Voice API. +""" + + +class CanonicalMetrics(AudioBufferProcessor): + """ + Initialize a CanonicalAudioProcessor instance. + + This class extends AudioBufferProcessor to handle audio processing and uploading + for the Canonical Voice API. + + Args: + call_id (str): Your unique identifier for the call. This is used to match the call in the Canonical Voice system to the call in your system. + assistant (str): Identifier for the AI assistant. This can be whatever you want, it's intended for you convenience so you can distinguish + between different assistants and a grouping mechanism for calls. + assistant_speaks_first (bool, optional): Indicates if the assistant speaks first in the conversation. Defaults to True. + output_dir (str, optional): Directory to save temporary audio files. Defaults to "recordings". + default_part_size (int, optional): Default size for multipart upload parts in bytes. Defaults to 1MB (1024 * 1024 * 1). + + Attributes: + call_id (str): Stores the unique call identifier. + assistant (str): Stores the assistant identifier. + assistant_speaks_first (bool): Indicates whether the assistant speaks first. + output_dir (str): Directory path for saving temporary audio files. + partsize (int): Size of each part for multipart uploads. + + The constructor also ensures that the output directory exists. + This class requires a Canonical API key to be set in the CANONICAL_API_KEY environment variable. + """ + + def __init__( + self, + call_id: str, + assistant: str, + assistant_speaks_first: bool = True, + output_dir: str = "recordings", + default_part_size: int = 1024 * 1024 * 1): + super().__init__() + if not os.environ.get("CANONICAL_API_KEY"): + raise ValueError( + "CANONICAL_API_KEY is not set, a Canonical API key is required to use this class") + self.call_id = call_id + self.assistant = assistant + self.assistant_speaks_first = assistant_speaks_first + self.output_dir = output_dir + self.partsize = default_part_size + self.end_of_call = False + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self.end_of_call: + return + + if (isinstance(frame, EndFrame) or isinstance(frame, CancelFrame)): + self.end_of_call = True + if self.has_audio(): + os.makedirs(self.output_dir, exist_ok=True) + filename = self.get_output_filename() + with BytesIO() as buffer: + with wave.open(buffer, 'wb') as wf: + wf.setnchannels(self.num_channels) + wf.setsampwidth(self.sample_rate // 8000) + wf.setframerate(self.sample_rate) + wf.writeframes(self.audio_buffer) + wave_data = buffer.getvalue() + + async with aiofiles.open(filename, 'wb') as file: + await file.write(wave_data) + + try: + await self.multipart_upload(filename) + await aiofiles.os.remove(filename) + except FileNotFoundError: + pass + except Exception as e: + raise e + self.audio_buffer = bytearray() + + def get_output_filename(self): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"{self.output_dir}/{timestamp}-{uuid.uuid4().hex}.wav" + + def canonical_api_url(self): + return os.environ.get("CANONICAL_API_URL", "https://voiceapp.canonical.chat/api/v1") + + def request_headers(self): + return { + "Content-Type": "application/json", + "X-Canonical-Api-Key": os.environ.get("CANONICAL_API_KEY") + } + + async def multipart_upload(self, file_path: str): + upload_request, upload_response = await self.request_upload(file_path) + parts = await self.upload_parts(file_path, upload_request, upload_response) + await self.upload_complete(parts, upload_request, upload_response) + + async def request_upload(self, file_path: str) -> Tuple[Dict, Dict]: + filename = os.path.basename(file_path) + filename = f"{str(uuid.uuid4())}-{filename}" + filesize = os.path.getsize(file_path) + numparts = int((filesize + self.partsize - 1) / self.partsize) + + params = { + 'filename': filename, + 'parts': numparts, + 'assistant': self.assistant, + 'assistantSpeaksFirst': self.assistant_speaks_first + } + print(f"Requesting presigned URLs for {numparts} parts") + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self.canonical_api_url()}/recording/uploadRequest", + headers=self.request_headers(), + json=params + ) as response: + if not response.ok: + raise Exception(f"Failed to get presigned URLs: {await response.text()}") + response_json = await response.json() + return params, response_json + + async def upload_parts( + self, + file_path: str, + upload_request: Dict, + upload_response: Dict) -> List[Dict]: + + urls = upload_response['urls'] + parts = [] + try: + async with aiofiles.open(file_path, 'rb') as file: + async with aiohttp.ClientSession() as session: + for partnum, upload_url in enumerate(urls, start=1): + data = await file.read(self.partsize) + if not data: + break + + async with session.put(upload_url, data=data) as response: + if not response.ok: + logger.error(f"Failed to upload part {partnum}: {await response.text()}") + raise Exception(f"Failed to upload part {partnum}: {await response.text()}") + + etag = response.headers['ETag'] + parts.append({'partnum': str(partnum), 'etag': etag}) + + except Exception as e: + logger.error(f"Multipart upload aborted, an error occurred: {str(e)}") + return parts + + async def upload_complete( + self, + parts: List[Dict], + upload_request: Dict, + upload_response: Dict): + + params = { + 'filename': upload_request['filename'], + 'parts': parts, + 'slug': upload_response['slug'], + 'callId': self.call_id, + 'assistant': { + 'id': self.assistant, + 'speaksFirst': self.assistant_speaks_first + } + } + print(f"Completing upload for {params['filename']}") + print(f"Slug: {params['slug']}") + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self.canonical_api_url()}/recording/uploadComplete", + headers=self.request_headers(), + json=params + ) as response: + if not response.ok: + logger.error(f"Failed to complete upload: {await response.text()}") + raise Exception(f"Failed to complete upload: {await response.text()}") diff --git a/src/pipecat/processors/user_marker_processor.py b/src/pipecat/processors/user_marker_processor.py new file mode 100644 index 000000000..cadbfffeb --- /dev/null +++ b/src/pipecat/processors/user_marker_processor.py @@ -0,0 +1,19 @@ +from pipecat.frames.frames import AudioRawFrame, Frame, UserAudioFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class UserMarkerProcessor(FrameProcessor): + """ + This class extends FrameProcessor, used to mark the user's audio in the pipeline. + This FrameProcessor must be inserted after transport.input() so that the only + AudioRaw it receives are from the user. + """ + + def __init__(self): + super().__init__() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, AudioRawFrame): + frame = UserAudioFrame(frame) + await self.push_frame(frame, direction) From 2e02ab740d13d23964e1b7bc2c2735923f6307fa Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Sun, 15 Sep 2024 20:59:17 -0700 Subject: [PATCH 2/9] PR feedback --- examples/canonical-metrics/bot.py | 16 +- examples/chatbot-audio-recording/bot.py | 2 +- .../{ => audio}/audio_buffer_processor.py | 58 ++--- .../processors/canonical_metrics_processor.py | 202 ----------------- src/pipecat/services/canonical.py | 212 ++++++++++++++++++ 5 files changed, 258 insertions(+), 232 deletions(-) rename src/pipecat/processors/{ => audio}/audio_buffer_processor.py (60%) delete mode 100644 src/pipecat/processors/canonical_metrics_processor.py create mode 100644 src/pipecat/services/canonical.py diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py index b6da3fbd4..111c1666e 100644 --- a/examples/canonical-metrics/bot.py +++ b/examples/canonical-metrics/bot.py @@ -20,8 +20,10 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.canonical_metrics_processor import CanonicalMetrics +from pipecat.processors.audio.audio_buffer_processor import \ + AudioBufferProcessor from pipecat.processors.user_marker_processor import UserMarkerProcessor +from pipecat.services.canonical import CanonicalMetricsService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -103,7 +105,12 @@ async def main(): call completion, CanonicalMetrics will send the audio buffer to Canonical for analysis. Visit https://voice.canonical.chat to learn more. """ - canonical = CanonicalMetrics( + audio_buffer_processor = AudioBufferProcessor() + canonical = CanonicalMetricsService( + audio_buffer_processor=audio_buffer_processor, + aiohttp_session=session, + api_key=os.getenv("CANONICAL_API_KEY"), + api_url=os.getenv("CANONICAL_API_URL"), call_id=str(uuid.uuid4()), assistant="pipecat-chatbot", assistant_speaks_first=True, @@ -111,11 +118,12 @@ async def main(): usermarker = UserMarkerProcessor() pipeline = Pipeline([ transport.input(), # microphone - usermarker, # used to mark the user's audio in the pipeline + usermarker, user_response, llm, tts, - canonical, # captures audio and uploads to Canonical AI for metrics + audio_buffer_processor, # captures audio into a buffer + canonical, # uploads audio buffer to Canonical AI for metrics transport.output(), assistant_response, ]) diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py index 7297de215..b6da9b54d 100644 --- a/examples/chatbot-audio-recording/bot.py +++ b/examples/chatbot-audio-recording/bot.py @@ -19,7 +19,7 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.audio_buffer_processor import AudioBufferProcessor +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.processors.user_marker_processor import UserMarkerProcessor from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService diff --git a/src/pipecat/processors/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py similarity index 60% rename from src/pipecat/processors/audio_buffer_processor.py rename to src/pipecat/processors/audio/audio_buffer_processor.py index f724e7c5e..8edf197e4 100644 --- a/src/pipecat/processors/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -1,6 +1,7 @@ from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame, - UserAudioFrame, UserStoppedSpeakingFrame) + UserAudioFrame, UserStartedSpeakingFrame, + UserStoppedSpeakingFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -21,54 +22,61 @@ class AudioBufferProcessor(FrameProcessor): populated when the first audio frame is processed. """ super().__init__() - self.audio_buffer = bytearray() - self.num_channels = None - self.sample_rate = None - self.assistant_audio = False - self.user_audio = False + self._audio_buffer = bytearray() + self._num_channels = None + self._sample_rate = None + self._assistant_audio = False + self._user_audio = False + print(f"ctor::AudioBufferProcessor object memory address: {id(self)}") - def has_audio(self): + def _has_audio(self): return ( - self.audio_buffer and - len(self.audio_buffer) > 0 and - self.num_channels and - self.sample_rate + self._audio_buffer is not None and + len(self._audio_buffer) > 0 and + self._num_channels is not None and + self._sample_rate is not None ) + def _reset_audio_buffer(self): + self._audio_buffer = bytearray() + async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame) or isinstance(frame, UserAudioFrame): - if self.num_channels is None: - self.num_channels = frame.num_channels - if self.sample_rate is None: - self.sample_rate = frame.sample_rate + if isinstance(frame, AudioRawFrame): + if self._num_channels is None: + self._num_channels = frame.num_channels + if self._sample_rate is None: + self._sample_rate = frame.sample_rate - elif isinstance(frame, UserStoppedSpeakingFrame): - self.user_audio = False + if isinstance(frame, UserStoppedSpeakingFrame): + self._user_audio = False if isinstance(frame, BotStartedSpeakingFrame): - self.assistant_audio = True - self.user_audio = False # do not capture user audio if assistant is speaking + self._assistant_audio = True + self._user_audio = False # do not capture user audio if assistant is speaking + if isinstance(frame, BotStoppedSpeakingFrame): - self.assistant_audio = False + self._assistant_audio = False # Capture user audio if assistant is not speaking, even if it's silence, the point # here is to capture so that the conversation is as close to reality as possible. # This is important for evaluation and metrics capture. - self.user_audio = True + self._user_audio = True # only include audio from the user if the user is speaking, this is because audio from the user's # mic is always coming in. if we include all the user's audio there will be a long latency before # the user starts speaking because all of the user's silence during the assistant's speech will have been # added to the buffer. - if isinstance(frame, UserAudioFrame) and self.user_audio: - self.audio_buffer.extend(frame.audio) + # + # and include all audio from the assistant + if isinstance(frame, UserAudioFrame) and self._user_audio: + self._audio_buffer.extend(frame.audio) # include all audio from the assistant if ( isinstance(frame, AudioRawFrame) and not isinstance(frame, UserAudioFrame) ): - self.audio_buffer.extend(frame.audio) + self._audio_buffer.extend(frame.audio) # do not push the user's audio frame, doing so will result in echo if not isinstance(frame, UserAudioFrame): diff --git a/src/pipecat/processors/canonical_metrics_processor.py b/src/pipecat/processors/canonical_metrics_processor.py deleted file mode 100644 index e7c0afb61..000000000 --- a/src/pipecat/processors/canonical_metrics_processor.py +++ /dev/null @@ -1,202 +0,0 @@ -import os -import uuid -import wave -from datetime import datetime -from io import BytesIO -from typing import Dict, List, Tuple - -import aiohttp -from loguru import logger - -try: - import aiofiles - import aiofiles.os -except ModuleNotFoundError as e: - logger.error(f"Exception: {e}") - logger.error( - "In order to use Canonical Metrics, you need to `pip install pipecat-ai[canonical]`. " + - "Also, set the `CANONICAL_API_KEY` environment variable.") - raise Exception(f"Missing module: {e}") - - -from pipecat.frames.frames import CancelFrame, EndFrame, Frame -from pipecat.processors.audio_buffer_processor import AudioBufferProcessor -from pipecat.processors.frame_processor import FrameDirection - -""" -This class extends AudioBufferProcessor to handle audio processing and uploading -for the Canonical Voice API. -""" - - -class CanonicalMetrics(AudioBufferProcessor): - """ - Initialize a CanonicalAudioProcessor instance. - - This class extends AudioBufferProcessor to handle audio processing and uploading - for the Canonical Voice API. - - Args: - call_id (str): Your unique identifier for the call. This is used to match the call in the Canonical Voice system to the call in your system. - assistant (str): Identifier for the AI assistant. This can be whatever you want, it's intended for you convenience so you can distinguish - between different assistants and a grouping mechanism for calls. - assistant_speaks_first (bool, optional): Indicates if the assistant speaks first in the conversation. Defaults to True. - output_dir (str, optional): Directory to save temporary audio files. Defaults to "recordings". - default_part_size (int, optional): Default size for multipart upload parts in bytes. Defaults to 1MB (1024 * 1024 * 1). - - Attributes: - call_id (str): Stores the unique call identifier. - assistant (str): Stores the assistant identifier. - assistant_speaks_first (bool): Indicates whether the assistant speaks first. - output_dir (str): Directory path for saving temporary audio files. - partsize (int): Size of each part for multipart uploads. - - The constructor also ensures that the output directory exists. - This class requires a Canonical API key to be set in the CANONICAL_API_KEY environment variable. - """ - - def __init__( - self, - call_id: str, - assistant: str, - assistant_speaks_first: bool = True, - output_dir: str = "recordings", - default_part_size: int = 1024 * 1024 * 1): - super().__init__() - if not os.environ.get("CANONICAL_API_KEY"): - raise ValueError( - "CANONICAL_API_KEY is not set, a Canonical API key is required to use this class") - self.call_id = call_id - self.assistant = assistant - self.assistant_speaks_first = assistant_speaks_first - self.output_dir = output_dir - self.partsize = default_part_size - self.end_of_call = False - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self.end_of_call: - return - - if (isinstance(frame, EndFrame) or isinstance(frame, CancelFrame)): - self.end_of_call = True - if self.has_audio(): - os.makedirs(self.output_dir, exist_ok=True) - filename = self.get_output_filename() - with BytesIO() as buffer: - with wave.open(buffer, 'wb') as wf: - wf.setnchannels(self.num_channels) - wf.setsampwidth(self.sample_rate // 8000) - wf.setframerate(self.sample_rate) - wf.writeframes(self.audio_buffer) - wave_data = buffer.getvalue() - - async with aiofiles.open(filename, 'wb') as file: - await file.write(wave_data) - - try: - await self.multipart_upload(filename) - await aiofiles.os.remove(filename) - except FileNotFoundError: - pass - except Exception as e: - raise e - self.audio_buffer = bytearray() - - def get_output_filename(self): - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - return f"{self.output_dir}/{timestamp}-{uuid.uuid4().hex}.wav" - - def canonical_api_url(self): - return os.environ.get("CANONICAL_API_URL", "https://voiceapp.canonical.chat/api/v1") - - def request_headers(self): - return { - "Content-Type": "application/json", - "X-Canonical-Api-Key": os.environ.get("CANONICAL_API_KEY") - } - - async def multipart_upload(self, file_path: str): - upload_request, upload_response = await self.request_upload(file_path) - parts = await self.upload_parts(file_path, upload_request, upload_response) - await self.upload_complete(parts, upload_request, upload_response) - - async def request_upload(self, file_path: str) -> Tuple[Dict, Dict]: - filename = os.path.basename(file_path) - filename = f"{str(uuid.uuid4())}-{filename}" - filesize = os.path.getsize(file_path) - numparts = int((filesize + self.partsize - 1) / self.partsize) - - params = { - 'filename': filename, - 'parts': numparts, - 'assistant': self.assistant, - 'assistantSpeaksFirst': self.assistant_speaks_first - } - print(f"Requesting presigned URLs for {numparts} parts") - async with aiohttp.ClientSession() as session: - async with session.post( - f"{self.canonical_api_url()}/recording/uploadRequest", - headers=self.request_headers(), - json=params - ) as response: - if not response.ok: - raise Exception(f"Failed to get presigned URLs: {await response.text()}") - response_json = await response.json() - return params, response_json - - async def upload_parts( - self, - file_path: str, - upload_request: Dict, - upload_response: Dict) -> List[Dict]: - - urls = upload_response['urls'] - parts = [] - try: - async with aiofiles.open(file_path, 'rb') as file: - async with aiohttp.ClientSession() as session: - for partnum, upload_url in enumerate(urls, start=1): - data = await file.read(self.partsize) - if not data: - break - - async with session.put(upload_url, data=data) as response: - if not response.ok: - logger.error(f"Failed to upload part {partnum}: {await response.text()}") - raise Exception(f"Failed to upload part {partnum}: {await response.text()}") - - etag = response.headers['ETag'] - parts.append({'partnum': str(partnum), 'etag': etag}) - - except Exception as e: - logger.error(f"Multipart upload aborted, an error occurred: {str(e)}") - return parts - - async def upload_complete( - self, - parts: List[Dict], - upload_request: Dict, - upload_response: Dict): - - params = { - 'filename': upload_request['filename'], - 'parts': parts, - 'slug': upload_response['slug'], - 'callId': self.call_id, - 'assistant': { - 'id': self.assistant, - 'speaksFirst': self.assistant_speaks_first - } - } - print(f"Completing upload for {params['filename']}") - print(f"Slug: {params['slug']}") - async with aiohttp.ClientSession() as session: - async with session.post( - f"{self.canonical_api_url()}/recording/uploadComplete", - headers=self.request_headers(), - json=params - ) as response: - if not response.ok: - logger.error(f"Failed to complete upload: {await response.text()}") - raise Exception(f"Failed to complete upload: {await response.text()}") diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py new file mode 100644 index 000000000..a973d5481 --- /dev/null +++ b/src/pipecat/services/canonical.py @@ -0,0 +1,212 @@ +import os +import uuid +import wave +from datetime import datetime +from io import BytesIO +from typing import Dict, List, Tuple + +import aiohttp +from loguru import logger + +try: + import aiofiles + import aiofiles.os +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Canonical Metrics, you need to `pip install pipecat-ai[canonical]`. " + + "Also, set the `CANONICAL_API_KEY` environment variable.") + raise Exception(f"Missing module: {e}") + + +from pipecat.frames.frames import CancelFrame, EndFrame, Frame +from pipecat.processors.audio.audio_buffer_processor import \ + AudioBufferProcessor +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import AIService + +# Multipart upload part size in bytes, cannot be smaller than 5MB +PART_SIZE = 1024 * 1024 * 5 +""" +This class extends AudioBufferProcessor to handle audio processing and uploading +for the Canonical Voice API. +""" + + +class CanonicalMetricsService(AIService): + """ + Initialize a CanonicalAudioProcessor instance. + + This class extends AudioBufferProcessor to handle audio processing and uploading + for the Canonical Voice API. + + Args: + call_id (str): Your unique identifier for the call. This is used to match the call in the Canonical Voice system to the call in your system. + assistant (str): Identifier for the AI assistant. This can be whatever you want, it's intended for you convenience so you can distinguish + between different assistants and a grouping mechanism for calls. + assistant_speaks_first (bool, optional): Indicates if the assistant speaks first in the conversation. Defaults to True. + output_dir (str, optional): Directory to save temporary audio files. Defaults to "recordings". + + Attributes: + call_id (str): Stores the unique call identifier. + assistant (str): Stores the assistant identifier. + assistant_speaks_first (bool): Indicates whether the assistant speaks first. + output_dir (str): Directory path for saving temporary audio files. + + The constructor also ensures that the output directory exists. + This class requires a Canonical API key to be set in the CANONICAL_API_KEY environment variable. + """ + + def __init__( + self, + aiohttp_session: aiohttp.ClientSession, + audio_buffer_processor: AudioBufferProcessor, + call_id: str, + assistant: str, + api_key: str, + api_url: str = "https://voiceapp.canonical.chat/api/v1", + assistant_speaks_first: bool = True, + output_dir: str = "recordings"): + super().__init__() + self._aiohttp_session = aiohttp_session + self._audio_buffer_processor = audio_buffer_processor + self._api_key = api_key + self._api_url = api_url + self._call_id = call_id + self._assistant = assistant + self._assistant_speaks_first = assistant_speaks_first + self._output_dir = output_dir + + async def stop(self, frame: EndFrame): + await self._process_audio() + + async def cancel(self, frame: CancelFrame): + await self._process_audio() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) + + async def _process_audio(self): + pipeline = self._audio_buffer_processor + if pipeline._has_audio(): + os.makedirs(self._output_dir, exist_ok=True) + filename = self._get_output_filename() + with BytesIO() as buffer: + with wave.open(buffer, 'wb') as wf: + wf.setnchannels(pipeline._num_channels) + wf.setsampwidth(pipeline._sample_rate // 8000) + wf.setframerate(pipeline._sample_rate) + wf.writeframes(pipeline._audio_buffer) + wave_data = buffer.getvalue() + + async with aiofiles.open(filename, 'wb') as file: + await file.write(wave_data) + + try: + await self._multipart_upload(filename) + pipeline._reset_audio_buffer() + # await aiofiles.os.remove(filename) + except FileNotFoundError: + pass + except Exception as e: + logger.error(f"Failed to upload recording: {e}") + + def _get_output_filename(self): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"{self._output_dir}/{timestamp}-{uuid.uuid4().hex}.wav" + + def _request_headers(self): + return { + "Content-Type": "application/json", + "X-Canonical-Api-Key": self._api_key + } + + async def _multipart_upload(self, file_path: str): + upload_request, upload_response = await self._request_upload(file_path) + if upload_request is None or upload_response is None: + return + parts = await self._upload_parts(file_path, upload_response) + if parts is None: + return + await self._upload_complete(parts, upload_request, upload_response) + + async def _request_upload(self, file_path: str) -> Tuple[Dict, Dict]: + filename = os.path.basename(file_path) + filesize = os.path.getsize(file_path) + numparts = int((filesize + PART_SIZE - 1) / PART_SIZE) + + params = { + 'filename': filename, + 'parts': numparts, + 'callId': self._call_id, + 'assistant': { + 'id': self._assistant, + 'speaksFirst': self._assistant_speaks_first + } + } + logger.debug(f"Requesting presigned URLs for {numparts} parts") + response = await self._aiohttp_session.post( + f"{self._api_url}/recording/uploadRequest", + headers=self._request_headers(), + json=params + ) + if not response.ok: + logger.error(f"Failed to get presigned URLs: {await response.text()}") + return None, None + response_json = await response.json() + return params, response_json + + async def _upload_parts( + self, + file_path: str, + upload_response: Dict) -> List[Dict]: + + urls = upload_response['urls'] + parts = [] + try: + async with aiofiles.open(file_path, 'rb') as file: + for partnum, upload_url in enumerate(urls, start=1): + data = await file.read(PART_SIZE) + if not data: + break + + response = await self._aiohttp_session.put(upload_url, data=data) + if not response.ok: + logger.error(f"Failed to upload part {partnum}: {await response.text()}") + return None + + etag = response.headers['ETag'] + parts.append({'partnum': str(partnum), 'etag': etag}) + + except Exception as e: + logger.error(f"Multipart upload aborted, an error occurred: {str(e)}") + return parts + + async def _upload_complete( + self, + parts: List[Dict], + upload_request: Dict, + upload_response: Dict): + + params = { + 'filename': upload_request['filename'], + 'parts': parts, + 'slug': upload_response['slug'], + 'callId': self._call_id, + 'assistant': { + 'id': self._assistant, + 'speaksFirst': self._assistant_speaks_first + } + } + logger.debug(f"Completing upload for {params['filename']}") + logger.debug(f"Slug: {params['slug']}") + response = await self._aiohttp_session.post( + f"{self._api_url}/recording/uploadComplete", + headers=self._request_headers(), + json=params + ) + if not response.ok: + logger.error(f"Failed to complete upload: {await response.text()}") + return + From 387a36dd8a7f7c80be24ac8c234e9693155870e3 Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Mon, 16 Sep 2024 17:43:42 -0700 Subject: [PATCH 3/9] missed a debug print statement --- src/pipecat/processors/audio/audio_buffer_processor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 8edf197e4..27381dc82 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -27,7 +27,6 @@ class AudioBufferProcessor(FrameProcessor): self._sample_rate = None self._assistant_audio = False self._user_audio = False - print(f"ctor::AudioBufferProcessor object memory address: {id(self)}") def _has_audio(self): return ( From 4d81a2ebfe13003e11529bfc4c8dc7ab59fc62ee Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Thu, 3 Oct 2024 14:10:03 -0700 Subject: [PATCH 4/9] nuked the code that marks user audio in favor for InputAudioRawFrame. also moving to stereo instead of mono with the human and bot on their own channel. --- examples/canonical-metrics/bot.py | 5 +- examples/chatbot-audio-recording/bot.py | 6 +- src/pipecat/frames/frames.py | 9 -- .../audio/audio_buffer_processor.py | 103 ++++++++++++------ .../processors/user_marker_processor.py | 19 ---- src/pipecat/services/canonical.py | 13 +-- 6 files changed, 72 insertions(+), 83 deletions(-) delete mode 100644 src/pipecat/processors/user_marker_processor.py diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py index 111c1666e..626af5f62 100644 --- a/examples/canonical-metrics/bot.py +++ b/examples/canonical-metrics/bot.py @@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) from pipecat.processors.audio.audio_buffer_processor import \ AudioBufferProcessor -from pipecat.processors.user_marker_processor import UserMarkerProcessor from pipecat.services.canonical import CanonicalMetricsService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService @@ -115,14 +114,12 @@ async def main(): assistant="pipecat-chatbot", assistant_speaks_first=True, ) - usermarker = UserMarkerProcessor() pipeline = Pipeline([ transport.input(), # microphone - usermarker, user_response, llm, tts, - audio_buffer_processor, # captures audio into a buffer + audio_buffer_processor, # captures audio into a buffer canonical, # uploads audio buffer to Canonical AI for metrics transport.output(), assistant_response, diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py index b6da9b54d..b1955ddba 100644 --- a/examples/chatbot-audio-recording/bot.py +++ b/examples/chatbot-audio-recording/bot.py @@ -19,8 +19,8 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor -from pipecat.processors.user_marker_processor import UserMarkerProcessor +from pipecat.processors.audio.audio_buffer_processor import \ + AudioBufferProcessor from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -98,10 +98,8 @@ async def main(): assistant_response = LLMAssistantResponseAggregator() audiobuffer = AudioBufferProcessor() - usermarker = UserMarkerProcessor() pipeline = Pipeline([ transport.input(), # microphone - usermarker, # used to mark the user's audio in the pipeline user_response, llm, tts, diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index c39026c31..e2ef78df5 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -285,15 +285,6 @@ class AppFrame(Frame): pass -@dataclass -class UserAudioFrame(AudioRawFrame): - """ - Indicates user audio in the pipeline. - """ - - def __init__(self, frame: AudioRawFrame): - super().__init__(frame.audio, frame.sample_rate, frame.num_channels) - # # System frames # diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 27381dc82..8474466dc 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -1,6 +1,13 @@ -from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame, +import wave +from io import BytesIO + +from pipecat.frames.frames import (AudioRawFrame, BotInterruptionFrame, + BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame, - UserAudioFrame, UserStartedSpeakingFrame, + InputAudioRawFrame, OutputAudioRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, + UserStartedSpeakingFrame, UserStoppedSpeakingFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -22,61 +29,85 @@ class AudioBufferProcessor(FrameProcessor): populated when the first audio frame is processed. """ super().__init__() - self._audio_buffer = bytearray() + self._user_audio_buffer = bytearray() + self._assistant_audio_buffer = bytearray() self._num_channels = None self._sample_rate = None self._assistant_audio = False self._user_audio = False + def _buffer_has_audio(self, buffer: bytearray): + return ( + buffer is not None and + len(buffer) > 0 + ) + def _has_audio(self): return ( - self._audio_buffer is not None and - len(self._audio_buffer) > 0 and - self._num_channels is not None and + self._buffer_has_audio(self._user_audio_buffer) and + self._buffer_has_audio(self._assistant_audio_buffer) and self._sample_rate is not None ) def _reset_audio_buffer(self): - self._audio_buffer = bytearray() + self._user_audio_buffer = bytearray() + self._assistant_audio_buffer = bytearray() + + def _merge_audio_buffers(self): + with BytesIO() as buffer: + with wave.open(buffer, 'wb') as wf: + wf.setnchannels(2) + wf.setsampwidth(self._sample_rate // 8000) + wf.setframerate(self._sample_rate) + # Interleave the two audio streams + max_length = max(len(self._user_audio_buffer), + len(self._assistant_audio_buffer)) + interleaved = bytearray(max_length * 2) + + for i in range(0, max_length, 2): + if i < len(self._user_audio_buffer): + interleaved[i * 2] = self._user_audio_buffer[i] + interleaved[i * 2 + 1] = self._user_audio_buffer[i + 1] + else: + interleaved[i * 2] = 0 + interleaved[i * 2 + 1] = 0 + + if i < len(self._assistant_audio_buffer): + interleaved[i * 2 + 2] = self._assistant_audio_buffer[i] + interleaved[i * 2 + 3] = self._assistant_audio_buffer[i + 1] + else: + interleaved[i * 2 + 2] = 0 + interleaved[i * 2 + 3] = 0 + + wf.writeframes(interleaved) + return buffer.getvalue() async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): - if self._num_channels is None: - self._num_channels = frame.num_channels - if self._sample_rate is None: - self._sample_rate = frame.sample_rate - - if isinstance(frame, UserStoppedSpeakingFrame): - self._user_audio = False + if (isinstance(frame, AudioRawFrame) and self._sample_rate is None): + self._sample_rate = frame.sample_rate if isinstance(frame, BotStartedSpeakingFrame): self._assistant_audio = True - self._user_audio = False # do not capture user audio if assistant is speaking - if isinstance(frame, BotStoppedSpeakingFrame): + # this handles the case where the user starts speaking and interrupts the bot + if (isinstance(frame, BotStoppedSpeakingFrame) or + isinstance(frame, UserStartedSpeakingFrame)): self._assistant_audio = False - # Capture user audio if assistant is not speaking, even if it's silence, the point - # here is to capture so that the conversation is as close to reality as possible. - # This is important for evaluation and metrics capture. - self._user_audio = True - # only include audio from the user if the user is speaking, this is because audio from the user's - # mic is always coming in. if we include all the user's audio there will be a long latency before - # the user starts speaking because all of the user's silence during the assistant's speech will have been - # added to the buffer. - # - # and include all audio from the assistant - if isinstance(frame, UserAudioFrame) and self._user_audio: - self._audio_buffer.extend(frame.audio) + # include all audio from the user + if isinstance(frame, InputAudioRawFrame): + self._user_audio_buffer.extend(frame.audio) + # Sync the assistant's buffer to the user's buffer by adding silence if needed + if len(self._user_audio_buffer) > len(self._assistant_audio_buffer): + silence_length = len(self._user_audio_buffer) - len(self._assistant_audio_buffer) + silence = b'\x00' * silence_length + self._assistant_audio_buffer.extend(silence) - # include all audio from the assistant - if ( - isinstance(frame, AudioRawFrame) - and not isinstance(frame, UserAudioFrame) - ): - self._audio_buffer.extend(frame.audio) + # if the assistant is speaking, include all audio from the assistant, + if (isinstance(frame, OutputAudioRawFrame)) and self._assistant_audio: + self._assistant_audio_buffer.extend(frame.audio) # do not push the user's audio frame, doing so will result in echo - if not isinstance(frame, UserAudioFrame): + if not isinstance(frame, InputAudioRawFrame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/user_marker_processor.py b/src/pipecat/processors/user_marker_processor.py deleted file mode 100644 index cadbfffeb..000000000 --- a/src/pipecat/processors/user_marker_processor.py +++ /dev/null @@ -1,19 +0,0 @@ -from pipecat.frames.frames import AudioRawFrame, Frame, UserAudioFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class UserMarkerProcessor(FrameProcessor): - """ - This class extends FrameProcessor, used to mark the user's audio in the pipeline. - This FrameProcessor must be inserted after transport.input() so that the only - AudioRaw it receives are from the user. - """ - - def __init__(self): - super().__init__() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): - frame = UserAudioFrame(frame) - await self.push_frame(frame, direction) diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index a973d5481..00990ad16 100644 --- a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -1,8 +1,6 @@ import os import uuid -import wave from datetime import datetime -from io import BytesIO from typing import Dict, List, Tuple import aiohttp @@ -92,19 +90,13 @@ class CanonicalMetricsService(AIService): if pipeline._has_audio(): os.makedirs(self._output_dir, exist_ok=True) filename = self._get_output_filename() - with BytesIO() as buffer: - with wave.open(buffer, 'wb') as wf: - wf.setnchannels(pipeline._num_channels) - wf.setsampwidth(pipeline._sample_rate // 8000) - wf.setframerate(pipeline._sample_rate) - wf.writeframes(pipeline._audio_buffer) - wave_data = buffer.getvalue() + wave_data = pipeline._merge_audio_buffers() async with aiofiles.open(filename, 'wb') as file: await file.write(wave_data) try: - await self._multipart_upload(filename) + # await self._multipart_upload(filename) pipeline._reset_audio_buffer() # await aiofiles.os.remove(filename) except FileNotFoundError: @@ -209,4 +201,3 @@ class CanonicalMetricsService(AIService): if not response.ok: logger.error(f"Failed to complete upload: {await response.text()}") return - From 678e87fd3148b806c7f8db4499dbfef3547c8462 Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Thu, 3 Oct 2024 14:12:23 -0700 Subject: [PATCH 5/9] comment back in some code --- src/pipecat/services/canonical.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index 00990ad16..8fc415c95 100644 --- a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -96,9 +96,9 @@ class CanonicalMetricsService(AIService): await file.write(wave_data) try: - # await self._multipart_upload(filename) + await self._multipart_upload(filename) pipeline._reset_audio_buffer() - # await aiofiles.os.remove(filename) + await aiofiles.os.remove(filename) except FileNotFoundError: pass except Exception as e: From 522d9319506e2ab5e5b88fcfd875c5286d4a1ba3 Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Fri, 11 Oct 2024 10:33:12 -0700 Subject: [PATCH 6/9] better interruption handling by moving the processors after the transport output --- examples/canonical-metrics/bot.py | 2 +- examples/chatbot-audio-recording/bot.py | 2 +- .../processors/audio/audio_buffer_processor.py | 12 +----------- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py index 626af5f62..5b46b50fc 100644 --- a/examples/canonical-metrics/bot.py +++ b/examples/canonical-metrics/bot.py @@ -119,9 +119,9 @@ async def main(): user_response, llm, tts, + transport.output(), audio_buffer_processor, # captures audio into a buffer canonical, # uploads audio buffer to Canonical AI for metrics - transport.output(), assistant_response, ]) diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py index b1955ddba..2d572fb90 100644 --- a/examples/chatbot-audio-recording/bot.py +++ b/examples/chatbot-audio-recording/bot.py @@ -103,8 +103,8 @@ async def main(): user_response, llm, tts, - audiobuffer, # used to buffer the audio in the pipeline transport.output(), + audiobuffer, # used to buffer the audio in the pipeline assistant_response, ]) diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 8474466dc..1b6dc1856 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -33,8 +33,6 @@ class AudioBufferProcessor(FrameProcessor): self._assistant_audio_buffer = bytearray() self._num_channels = None self._sample_rate = None - self._assistant_audio = False - self._user_audio = False def _buffer_has_audio(self, buffer: bytearray): return ( @@ -87,14 +85,6 @@ class AudioBufferProcessor(FrameProcessor): if (isinstance(frame, AudioRawFrame) and self._sample_rate is None): self._sample_rate = frame.sample_rate - if isinstance(frame, BotStartedSpeakingFrame): - self._assistant_audio = True - - # this handles the case where the user starts speaking and interrupts the bot - if (isinstance(frame, BotStoppedSpeakingFrame) or - isinstance(frame, UserStartedSpeakingFrame)): - self._assistant_audio = False - # include all audio from the user if isinstance(frame, InputAudioRawFrame): self._user_audio_buffer.extend(frame.audio) @@ -105,7 +95,7 @@ class AudioBufferProcessor(FrameProcessor): self._assistant_audio_buffer.extend(silence) # if the assistant is speaking, include all audio from the assistant, - if (isinstance(frame, OutputAudioRawFrame)) and self._assistant_audio: + if (isinstance(frame, OutputAudioRawFrame)): self._assistant_audio_buffer.extend(frame.audio) # do not push the user's audio frame, doing so will result in echo From 083d221dd291b3c91cfc01c645e050abbfd2148a Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Fri, 11 Oct 2024 11:29:01 -0700 Subject: [PATCH 7/9] PR feedback --- .../processors/audio/audio_buffer_processor.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 1b6dc1856..dabc24e38 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -14,7 +14,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class AudioBufferProcessor(FrameProcessor): - def __init__(self): + def __init__(self, **kwargs): """ Initialize the AudioBufferProcessor. @@ -22,13 +22,11 @@ class AudioBufferProcessor(FrameProcessor): - audio_buffer: A bytearray to store incoming audio data. - num_channels: The number of audio channels (initialized as None). - sample_rate: The sample rate of the audio (initialized as None). - - assistant_audio: A boolean flag to indicate if assistant audio is being processed. - - user_audio: A boolean flag to indicate if user audio is being processed. The num_channels and sample_rate are set to None initially and will be populated when the first audio frame is processed. """ - super().__init__() + super().__init__(**kwargs) self._user_audio_buffer = bytearray() self._assistant_audio_buffer = bytearray() self._num_channels = None @@ -55,7 +53,7 @@ class AudioBufferProcessor(FrameProcessor): with BytesIO() as buffer: with wave.open(buffer, 'wb') as wf: wf.setnchannels(2) - wf.setsampwidth(self._sample_rate // 8000) + wf.setsampwidth(2) wf.setframerate(self._sample_rate) # Interleave the two audio streams max_length = max(len(self._user_audio_buffer), @@ -82,7 +80,7 @@ class AudioBufferProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if (isinstance(frame, AudioRawFrame) and self._sample_rate is None): + if isinstance(frame, AudioRawFrame) and self._sample_rate is None: self._sample_rate = frame.sample_rate # include all audio from the user @@ -95,7 +93,7 @@ class AudioBufferProcessor(FrameProcessor): self._assistant_audio_buffer.extend(silence) # if the assistant is speaking, include all audio from the assistant, - if (isinstance(frame, OutputAudioRawFrame)): + if isinstance(frame, OutputAudioRawFrame): self._assistant_audio_buffer.extend(frame.audio) # do not push the user's audio frame, doing so will result in echo From 79c8aa2c4a089fe3ad3ccbd9199b817467465ff2 Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Fri, 11 Oct 2024 11:35:02 -0700 Subject: [PATCH 8/9] ruff formatting --- examples/canonical-metrics/bot.py | 38 ++++----- examples/canonical-metrics/runner.py | 20 +++-- examples/canonical-metrics/server.py | 44 ++++------ examples/chatbot-audio-recording/bot.py | 36 ++++---- examples/chatbot-audio-recording/runner.py | 20 +++-- examples/chatbot-audio-recording/server.py | 44 ++++------ .../audio/audio_buffer_processor.py | 40 ++++----- src/pipecat/services/canonical.py | 85 ++++++++----------- 8 files changed, 147 insertions(+), 180 deletions(-) diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py index 5b46b50fc..efad7710c 100644 --- a/examples/canonical-metrics/bot.py +++ b/examples/canonical-metrics/bot.py @@ -19,9 +19,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( - LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.audio.audio_buffer_processor import \ - AudioBufferProcessor + LLMAssistantResponseAggregator, + LLMUserResponseAggregator, +) +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.services.canonical import CanonicalMetricsService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService @@ -58,18 +59,16 @@ async def main(): # tier="nova", # model="2-general" # ) - ) + ), ) tts = ElevenLabsTTSService( api_key=os.getenv("ELEVENLABS_API_KEY"), - # # English # voice_id="cgSgspJ2msm6clMCkdW9", aiohttp_session=session, - # # Spanish # @@ -77,9 +76,7 @@ async def main(): # voice_id="gD1IexrzCvsXPHUuT0s3", ) - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") messages = [ { @@ -88,7 +85,6 @@ async def main(): # English # "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your responses to 12 words or fewer.", - # # Spanish # @@ -114,16 +110,18 @@ async def main(): assistant="pipecat-chatbot", assistant_speaks_first=True, ) - pipeline = Pipeline([ - transport.input(), # microphone - user_response, - llm, - tts, - transport.output(), - audio_buffer_processor, # captures audio into a buffer - canonical, # uploads audio buffer to Canonical AI for metrics - assistant_response, - ]) + pipeline = Pipeline( + [ + transport.input(), # microphone + user_response, + llm, + tts, + transport.output(), + audio_buffer_processor, # captures audio into a buffer + canonical, # uploads audio buffer to Canonical AI for metrics + assistant_response, + ] + ) task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) diff --git a/examples/canonical-metrics/runner.py b/examples/canonical-metrics/runner.py index 7507d28d6..a0b46ca36 100644 --- a/examples/canonical-metrics/runner.py +++ b/examples/canonical-metrics/runner.py @@ -4,21 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import aiohttp import argparse import os +import aiohttp + from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper async def configure(aiohttp_session: aiohttp.ClientSession): parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") parser.add_argument( - "-u", - "--url", - type=str, - required=False, - help="URL of the Daily room to join") + "-u", "--url", type=str, required=False, help="URL of the Daily room to join" + ) parser.add_argument( "-k", "--apikey", @@ -34,15 +32,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession): if not url: raise Exception( - "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.") + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + ) if not key: - raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + raise Exception( + "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." + ) daily_rest_helper = DailyRESTHelper( daily_api_key=key, daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), - aiohttp_session=aiohttp_session + aiohttp_session=aiohttp_session, ) # Create a meeting token for the given room with an expiration 1 hour in @@ -52,3 +53,4 @@ async def configure(aiohttp_session: aiohttp.ClientSession): token = await daily_rest_helper.get_token(url, expiry_time) return (url, token) + return (url, token) diff --git a/examples/canonical-metrics/server.py b/examples/canonical-metrics/server.py index 2c717ffa2..62ce899be 100644 --- a/examples/canonical-metrics/server.py +++ b/examples/canonical-metrics/server.py @@ -15,8 +15,7 @@ from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, RedirectResponse -from pipecat.transports.services.helpers.daily_rest import (DailyRESTHelper, - DailyRoomParams) +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams MAX_BOTS_PER_ROOM = 1 @@ -41,13 +40,14 @@ async def lifespan(app: FastAPI): aiohttp_session = aiohttp.ClientSession() daily_helpers["rest"] = DailyRESTHelper( daily_api_key=os.getenv("DAILY_API_KEY", ""), - daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'), - aiohttp_session=aiohttp_session + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, ) yield await aiohttp_session.close() cleanup() + app = FastAPI(lifespan=lifespan) app.add_middleware( @@ -68,37 +68,34 @@ async def start_agent(request: Request): if not room.url: raise HTTPException( status_code=500, - detail="Missing 'room' property in request data. Cannot start agent without a target room!") + detail="Missing 'room' property in request data. Cannot start agent without a target room!", + ) # Check if there is already an existing process running in this room num_bots_in_room = sum( - 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None) + 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None + ) if num_bots_in_room >= MAX_BOTS_PER_ROOM: - raise HTTPException( - status_code=500, detail=f"Max bot limited reach for room: {room.url}") + raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}") # Get the token for the room token = await daily_helpers["rest"].get_token(room.url) if not token: - raise HTTPException( - status_code=500, detail=f"Failed to get token for room: {room.url}") + raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}") # Spawn a new agent, and join the user session # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) try: proc = subprocess.Popen( - [ - f"python3 -m bot -u {room.url} -t {token}" - ], + [f"python3 -m bot -u {room.url} -t {token}"], shell=True, bufsize=1, - cwd=os.path.dirname(os.path.abspath(__file__)) + cwd=os.path.dirname(os.path.abspath(__file__)), ) bot_procs[proc.pid] = (proc, room.url) except Exception as e: - raise HTTPException( - status_code=500, detail=f"Failed to start subprocess: {e}") + raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") return RedirectResponse(room.url) @@ -110,8 +107,7 @@ def get_status(pid: int): # If the subprocess doesn't exist, return an error if not proc: - raise HTTPException( - status_code=404, detail=f"Bot with process id: {pid} not found") + raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found") # Check the status of the subprocess if proc[0].poll() is None: @@ -128,14 +124,10 @@ if __name__ == "__main__": default_host = os.getenv("HOST", "0.0.0.0") default_port = int(os.getenv("FAST_API_PORT", "7860")) - parser = argparse.ArgumentParser( - description="Daily Storyteller FastAPI server") - parser.add_argument("--host", type=str, - default=default_host, help="Host address") - parser.add_argument("--port", type=int, - default=default_port, help="Port number") - parser.add_argument("--reload", action="store_true", - help="Reload code on change") + parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, default=default_host, help="Host address") + parser.add_argument("--port", type=int, default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", help="Reload code on change") config = parser.parse_args() diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py index 2d572fb90..6acdb08e6 100644 --- a/examples/chatbot-audio-recording/bot.py +++ b/examples/chatbot-audio-recording/bot.py @@ -18,9 +18,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( - LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.audio.audio_buffer_processor import \ - AudioBufferProcessor + LLMAssistantResponseAggregator, + LLMUserResponseAggregator, +) +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -56,18 +57,16 @@ async def main(): # tier="nova", # model="2-general" # ) - ) + ), ) tts = ElevenLabsTTSService( api_key=os.getenv("ELEVENLABS_API_KEY"), - # # English # voice_id="cgSgspJ2msm6clMCkdW9", aiohttp_session=session, - # # Spanish # @@ -75,9 +74,7 @@ async def main(): # voice_id="gD1IexrzCvsXPHUuT0s3", ) - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") messages = [ { @@ -86,7 +83,6 @@ async def main(): # English # "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your response to 12 words or fewer.", - # # Spanish # @@ -98,15 +94,17 @@ async def main(): assistant_response = LLMAssistantResponseAggregator() audiobuffer = AudioBufferProcessor() - pipeline = Pipeline([ - transport.input(), # microphone - user_response, - llm, - tts, - transport.output(), - audiobuffer, # used to buffer the audio in the pipeline - assistant_response, - ]) + pipeline = Pipeline( + [ + transport.input(), # microphone + user_response, + llm, + tts, + transport.output(), + audiobuffer, # used to buffer the audio in the pipeline + assistant_response, + ] + ) task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) diff --git a/examples/chatbot-audio-recording/runner.py b/examples/chatbot-audio-recording/runner.py index 7507d28d6..a0b46ca36 100644 --- a/examples/chatbot-audio-recording/runner.py +++ b/examples/chatbot-audio-recording/runner.py @@ -4,21 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import aiohttp import argparse import os +import aiohttp + from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper async def configure(aiohttp_session: aiohttp.ClientSession): parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") parser.add_argument( - "-u", - "--url", - type=str, - required=False, - help="URL of the Daily room to join") + "-u", "--url", type=str, required=False, help="URL of the Daily room to join" + ) parser.add_argument( "-k", "--apikey", @@ -34,15 +32,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession): if not url: raise Exception( - "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.") + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + ) if not key: - raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + raise Exception( + "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." + ) daily_rest_helper = DailyRESTHelper( daily_api_key=key, daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), - aiohttp_session=aiohttp_session + aiohttp_session=aiohttp_session, ) # Create a meeting token for the given room with an expiration 1 hour in @@ -52,3 +53,4 @@ async def configure(aiohttp_session: aiohttp.ClientSession): token = await daily_rest_helper.get_token(url, expiry_time) return (url, token) + return (url, token) diff --git a/examples/chatbot-audio-recording/server.py b/examples/chatbot-audio-recording/server.py index 2c717ffa2..62ce899be 100644 --- a/examples/chatbot-audio-recording/server.py +++ b/examples/chatbot-audio-recording/server.py @@ -15,8 +15,7 @@ from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, RedirectResponse -from pipecat.transports.services.helpers.daily_rest import (DailyRESTHelper, - DailyRoomParams) +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams MAX_BOTS_PER_ROOM = 1 @@ -41,13 +40,14 @@ async def lifespan(app: FastAPI): aiohttp_session = aiohttp.ClientSession() daily_helpers["rest"] = DailyRESTHelper( daily_api_key=os.getenv("DAILY_API_KEY", ""), - daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'), - aiohttp_session=aiohttp_session + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, ) yield await aiohttp_session.close() cleanup() + app = FastAPI(lifespan=lifespan) app.add_middleware( @@ -68,37 +68,34 @@ async def start_agent(request: Request): if not room.url: raise HTTPException( status_code=500, - detail="Missing 'room' property in request data. Cannot start agent without a target room!") + detail="Missing 'room' property in request data. Cannot start agent without a target room!", + ) # Check if there is already an existing process running in this room num_bots_in_room = sum( - 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None) + 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None + ) if num_bots_in_room >= MAX_BOTS_PER_ROOM: - raise HTTPException( - status_code=500, detail=f"Max bot limited reach for room: {room.url}") + raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}") # Get the token for the room token = await daily_helpers["rest"].get_token(room.url) if not token: - raise HTTPException( - status_code=500, detail=f"Failed to get token for room: {room.url}") + raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}") # Spawn a new agent, and join the user session # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) try: proc = subprocess.Popen( - [ - f"python3 -m bot -u {room.url} -t {token}" - ], + [f"python3 -m bot -u {room.url} -t {token}"], shell=True, bufsize=1, - cwd=os.path.dirname(os.path.abspath(__file__)) + cwd=os.path.dirname(os.path.abspath(__file__)), ) bot_procs[proc.pid] = (proc, room.url) except Exception as e: - raise HTTPException( - status_code=500, detail=f"Failed to start subprocess: {e}") + raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") return RedirectResponse(room.url) @@ -110,8 +107,7 @@ def get_status(pid: int): # If the subprocess doesn't exist, return an error if not proc: - raise HTTPException( - status_code=404, detail=f"Bot with process id: {pid} not found") + raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found") # Check the status of the subprocess if proc[0].poll() is None: @@ -128,14 +124,10 @@ if __name__ == "__main__": default_host = os.getenv("HOST", "0.0.0.0") default_port = int(os.getenv("FAST_API_PORT", "7860")) - parser = argparse.ArgumentParser( - description="Daily Storyteller FastAPI server") - parser.add_argument("--host", type=str, - default=default_host, help="Host address") - parser.add_argument("--port", type=int, - default=default_port, help="Port number") - parser.add_argument("--reload", action="store_true", - help="Reload code on change") + parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, default=default_host, help="Host address") + parser.add_argument("--port", type=int, default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", help="Reload code on change") config = parser.parse_args() diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index dabc24e38..0c07d6815 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -1,19 +1,23 @@ import wave from io import BytesIO -from pipecat.frames.frames import (AudioRawFrame, BotInterruptionFrame, - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, Frame, - InputAudioRawFrame, OutputAudioRawFrame, - StartInterruptionFrame, - StopInterruptionFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame) +from pipecat.frames.frames import ( + AudioRawFrame, + BotInterruptionFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + Frame, + InputAudioRawFrame, + OutputAudioRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class AudioBufferProcessor(FrameProcessor): - def __init__(self, **kwargs): """ Initialize the AudioBufferProcessor. @@ -33,16 +37,13 @@ class AudioBufferProcessor(FrameProcessor): self._sample_rate = None def _buffer_has_audio(self, buffer: bytearray): - return ( - buffer is not None and - len(buffer) > 0 - ) + return buffer is not None and len(buffer) > 0 def _has_audio(self): return ( - self._buffer_has_audio(self._user_audio_buffer) and - self._buffer_has_audio(self._assistant_audio_buffer) and - self._sample_rate is not None + self._buffer_has_audio(self._user_audio_buffer) + and self._buffer_has_audio(self._assistant_audio_buffer) + and self._sample_rate is not None ) def _reset_audio_buffer(self): @@ -51,13 +52,12 @@ class AudioBufferProcessor(FrameProcessor): def _merge_audio_buffers(self): with BytesIO() as buffer: - with wave.open(buffer, 'wb') as wf: + with wave.open(buffer, "wb") as wf: wf.setnchannels(2) wf.setsampwidth(2) wf.setframerate(self._sample_rate) # Interleave the two audio streams - max_length = max(len(self._user_audio_buffer), - len(self._assistant_audio_buffer)) + max_length = max(len(self._user_audio_buffer), len(self._assistant_audio_buffer)) interleaved = bytearray(max_length * 2) for i in range(0, max_length, 2): @@ -89,7 +89,7 @@ class AudioBufferProcessor(FrameProcessor): # Sync the assistant's buffer to the user's buffer by adding silence if needed if len(self._user_audio_buffer) > len(self._assistant_audio_buffer): silence_length = len(self._user_audio_buffer) - len(self._assistant_audio_buffer) - silence = b'\x00' * silence_length + silence = b"\x00" * silence_length self._assistant_audio_buffer.extend(silence) # if the assistant is speaking, include all audio from the assistant, diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index 8fc415c95..412e06dab 100644 --- a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -12,14 +12,14 @@ try: except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error( - "In order to use Canonical Metrics, you need to `pip install pipecat-ai[canonical]`. " + - "Also, set the `CANONICAL_API_KEY` environment variable.") + "In order to use Canonical Metrics, you need to `pip install pipecat-ai[canonical]`. " + + "Also, set the `CANONICAL_API_KEY` environment variable." + ) raise Exception(f"Missing module: {e}") from pipecat.frames.frames import CancelFrame, EndFrame, Frame -from pipecat.processors.audio.audio_buffer_processor import \ - AudioBufferProcessor +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import AIService @@ -56,15 +56,16 @@ class CanonicalMetricsService(AIService): """ def __init__( - self, - aiohttp_session: aiohttp.ClientSession, - audio_buffer_processor: AudioBufferProcessor, - call_id: str, - assistant: str, - api_key: str, - api_url: str = "https://voiceapp.canonical.chat/api/v1", - assistant_speaks_first: bool = True, - output_dir: str = "recordings"): + self, + aiohttp_session: aiohttp.ClientSession, + audio_buffer_processor: AudioBufferProcessor, + call_id: str, + assistant: str, + api_key: str, + api_url: str = "https://voiceapp.canonical.chat/api/v1", + assistant_speaks_first: bool = True, + output_dir: str = "recordings", + ): super().__init__() self._aiohttp_session = aiohttp_session self._audio_buffer_processor = audio_buffer_processor @@ -92,7 +93,7 @@ class CanonicalMetricsService(AIService): filename = self._get_output_filename() wave_data = pipeline._merge_audio_buffers() - async with aiofiles.open(filename, 'wb') as file: + async with aiofiles.open(filename, "wb") as file: await file.write(wave_data) try: @@ -109,10 +110,7 @@ class CanonicalMetricsService(AIService): return f"{self._output_dir}/{timestamp}-{uuid.uuid4().hex}.wav" def _request_headers(self): - return { - "Content-Type": "application/json", - "X-Canonical-Api-Key": self._api_key - } + return {"Content-Type": "application/json", "X-Canonical-Api-Key": self._api_key} async def _multipart_upload(self, file_path: str): upload_request, upload_response = await self._request_upload(file_path) @@ -129,19 +127,14 @@ class CanonicalMetricsService(AIService): numparts = int((filesize + PART_SIZE - 1) / PART_SIZE) params = { - 'filename': filename, - 'parts': numparts, - 'callId': self._call_id, - 'assistant': { - 'id': self._assistant, - 'speaksFirst': self._assistant_speaks_first - } + "filename": filename, + "parts": numparts, + "callId": self._call_id, + "assistant": {"id": self._assistant, "speaksFirst": self._assistant_speaks_first}, } logger.debug(f"Requesting presigned URLs for {numparts} parts") response = await self._aiohttp_session.post( - f"{self._api_url}/recording/uploadRequest", - headers=self._request_headers(), - json=params + f"{self._api_url}/recording/uploadRequest", headers=self._request_headers(), json=params ) if not response.ok: logger.error(f"Failed to get presigned URLs: {await response.text()}") @@ -149,15 +142,11 @@ class CanonicalMetricsService(AIService): response_json = await response.json() return params, response_json - async def _upload_parts( - self, - file_path: str, - upload_response: Dict) -> List[Dict]: - - urls = upload_response['urls'] + async def _upload_parts(self, file_path: str, upload_response: Dict) -> List[Dict]: + urls = upload_response["urls"] parts = [] try: - async with aiofiles.open(file_path, 'rb') as file: + async with aiofiles.open(file_path, "rb") as file: for partnum, upload_url in enumerate(urls, start=1): data = await file.read(PART_SIZE) if not data: @@ -168,35 +157,29 @@ class CanonicalMetricsService(AIService): logger.error(f"Failed to upload part {partnum}: {await response.text()}") return None - etag = response.headers['ETag'] - parts.append({'partnum': str(partnum), 'etag': etag}) + etag = response.headers["ETag"] + parts.append({"partnum": str(partnum), "etag": etag}) except Exception as e: logger.error(f"Multipart upload aborted, an error occurred: {str(e)}") return parts async def _upload_complete( - self, - parts: List[Dict], - upload_request: Dict, - upload_response: Dict): - + self, parts: List[Dict], upload_request: Dict, upload_response: Dict + ): params = { - 'filename': upload_request['filename'], - 'parts': parts, - 'slug': upload_response['slug'], - 'callId': self._call_id, - 'assistant': { - 'id': self._assistant, - 'speaksFirst': self._assistant_speaks_first - } + "filename": upload_request["filename"], + "parts": parts, + "slug": upload_response["slug"], + "callId": self._call_id, + "assistant": {"id": self._assistant, "speaksFirst": self._assistant_speaks_first}, } logger.debug(f"Completing upload for {params['filename']}") logger.debug(f"Slug: {params['slug']}") response = await self._aiohttp_session.post( f"{self._api_url}/recording/uploadComplete", headers=self._request_headers(), - json=params + json=params, ) if not response.ok: logger.error(f"Failed to complete upload: {await response.text()}") From 4330374ba41c9339aae184dafce990f541e584a0 Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Fri, 11 Oct 2024 12:01:51 -0700 Subject: [PATCH 9/9] passing kwargs and forcing keyword-only arguments --- src/pipecat/services/canonical.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index 412e06dab..0f4fb6a2f 100644 --- a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -57,6 +57,7 @@ class CanonicalMetricsService(AIService): def __init__( self, + *, aiohttp_session: aiohttp.ClientSession, audio_buffer_processor: AudioBufferProcessor, call_id: str, @@ -65,8 +66,9 @@ class CanonicalMetricsService(AIService): api_url: str = "https://voiceapp.canonical.chat/api/v1", assistant_speaks_first: bool = True, output_dir: str = "recordings", + **kwargs, ): - super().__init__() + super().__init__(**kwargs) self._aiohttp_session = aiohttp_session self._audio_buffer_processor = audio_buffer_processor self._api_key = api_key