From 62fc95300bc1e927b59843f453de7f8fd76cb5da Mon Sep 17 00:00:00 2001 From: Vaibhav159 Date: Fri, 13 Dec 2024 01:09:47 +0530 Subject: [PATCH 1/4] adding livekit audio and chat version --- .../foundational/28-livekit-audio-chat.py | 211 ++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 examples/foundational/28-livekit-audio-chat.py diff --git a/examples/foundational/28-livekit-audio-chat.py b/examples/foundational/28-livekit-audio-chat.py new file mode 100644 index 000000000..20be7becb --- /dev/null +++ b/examples/foundational/28-livekit-audio-chat.py @@ -0,0 +1,211 @@ +import argparse +import asyncio +import json +import os +import sys + +import aiohttp +from deepgram import LiveOptions + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + TextFrame, + BotInterruptionFrame, + UserStartedSpeakingFrame, + TranscriptionFrame, + UserStoppedSpeakingFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask, PipelineParams +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport + +from livekit import api + +from loguru import logger + +from dotenv import load_dotenv + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +DESIRED_SAMPLE_RATE = 16000 + + +def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str: + token = api.AccessToken(api_key, api_secret) + token.with_identity(participant_name).with_name(participant_name).with_grants( + api.VideoGrants( + room_join=True, + room=room_name, + ) + ) + + return token.to_jwt() + + +def generate_token_with_agent( + room_name: str, participant_name: str, api_key: str, api_secret: str +) -> str: + token = api.AccessToken(api_key, api_secret) + token.with_identity(participant_name).with_name(participant_name).with_grants( + api.VideoGrants( + room_join=True, + room=room_name, + agent=True, # This is the only difference, this makes livekit client know agent has joined + ) + ) + + return token.to_jwt() + + +async def configure_livekit(): + parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample") + parser.add_argument( + "-r", "--room", type=str, required=False, help="Name of the LiveKit room to join" + ) + parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server") + + args, unknown = parser.parse_known_args() + + room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME") + url = args.url or os.getenv("LIVEKIT_URL") + api_key = os.getenv("LIVEKIT_API_KEY") + api_secret = os.getenv("LIVEKIT_API_SECRET") + + if not room_name: + raise Exception( + "No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment." + ) + + if not url: + raise Exception( + "No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment." + ) + + if not api_key or not api_secret: + raise Exception( + "LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables." + ) + + token = generate_token_with_agent(room_name, "Say One Thing", api_key, api_secret) + + user_token = generate_token(room_name, "User", api_key, api_secret) + logger.info(f"User token: {user_token}") + + return (url, token, room_name) + + +async def main(): + async with aiohttp.ClientSession() as session: + (url, token, room_name) = await configure_livekit() + + transport = LiveKitTransport( + url=url, + token=token, + room_name=room_name, + params=LiveKitParams( + audio_in_channels=1, + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=DESIRED_SAMPLE_RATE, + audio_out_sample_rate=DESIRED_SAMPLE_RATE, + vad_analyzer=SileroVADAnalyzer(), + vad_enabled=True, + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService( + api_key=os.getenv("DEEPGRAM_API_KEY"), + live_options=LiveOptions( + sample_rate=DESIRED_SAMPLE_RATE, + vad_events=True, + ), + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + sample_rate=DESIRED_SAMPLE_RATE, + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. " + "Your goal is to demonstrate your capabilities in a succinct way. " + "Your output will be converted to audio so don't include special characters in your answers. " + "Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + runner = PipelineRunner() + + task = PipelineTask( + Pipeline( + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ), + params=PipelineParams( + allow_interruptions=True, enable_metrics=True, enable_usage_metrics=True + ), + ) + + # Register an event handler so we can play the audio when the + # participant joins. + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant_id): + await asyncio.sleep(1) + await task.queue_frame( + TextFrame( + "Hello there! How are you doing today? Would you like to talk about the weather?" + ) + ) + + # Register an event handler to receive data from the participant via text chat + # in the LiveKit room. This will be used to as transcription frames and + # interrupt the bot and pass it to llm for processing and + # then pass back to the participant as audio output. + @transport.event_handler("on_data_received") + async def on_data_received(transport, data, participant_id): + logger.info(f"Received data from participant {participant_id}: {data}") + # convert data from bytes to string + json_data = json.loads(data) + + await task.queue_frames( + [ + BotInterruptionFrame(), + UserStartedSpeakingFrame(), + TranscriptionFrame( + user_id=participant_id, + timestamp=json_data["timestamp"], + text=json_data["message"], + ), + UserStoppedSpeakingFrame(), + ], + ) + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) From 87670067d7ab38b2202060fb574d8688790e4891 Mon Sep 17 00:00:00 2001 From: Vaibhav159 Date: Mon, 6 Jan 2025 22:03:11 +0530 Subject: [PATCH 2/4] adding changelog --- CHANGELOG.md | 7 +++++++ .../{28-livekit-audio-chat.py => 29-livekit-audio-chat.py} | 0 2 files changed, 7 insertions(+) rename examples/foundational/{28-livekit-audio-chat.py => 29-livekit-audio-chat.py} (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5610525b5..b20c280b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +- Added new foundational examples for `LiveKitTransportLayer` - + + - `29-livekit-audio-chat.py` - Supports both Audio to Audio and Text to Audio + chat pipelines. + ## [0.0.52] - 2024-12-24 ### Added diff --git a/examples/foundational/28-livekit-audio-chat.py b/examples/foundational/29-livekit-audio-chat.py similarity index 100% rename from examples/foundational/28-livekit-audio-chat.py rename to examples/foundational/29-livekit-audio-chat.py From 5138017b57a4c8f01c5cedd1ee5141fc178924a7 Mon Sep 17 00:00:00 2001 From: Vaibhav159 Date: Mon, 6 Jan 2025 22:07:59 +0530 Subject: [PATCH 3/4] ruff changes --- examples/foundational/29-livekit-audio-chat.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/examples/foundational/29-livekit-audio-chat.py b/examples/foundational/29-livekit-audio-chat.py index 20be7becb..5acfff82e 100644 --- a/examples/foundational/29-livekit-audio-chat.py +++ b/examples/foundational/29-livekit-audio-chat.py @@ -6,30 +6,27 @@ import sys import aiohttp from deepgram import LiveOptions +from dotenv import load_dotenv +from livekit import api +from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - TextFrame, BotInterruptionFrame, - UserStartedSpeakingFrame, + TextFrame, TranscriptionFrame, + UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask, PipelineParams +from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.cartesia import CartesiaTTSService from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport -from livekit import api - -from loguru import logger - -from dotenv import load_dotenv - load_dotenv(override=True) logger.remove(0) @@ -99,7 +96,7 @@ async def configure_livekit(): user_token = generate_token(room_name, "User", api_key, api_secret) logger.info(f"User token: {user_token}") - return (url, token, room_name) + return url, token, room_name async def main(): From b3b7a5f023b3a48d104c9209db095153beee3034 Mon Sep 17 00:00:00 2001 From: Vaibhav159 Date: Mon, 6 Jan 2025 22:10:46 +0530 Subject: [PATCH 4/4] adding 2025 license --- examples/foundational/29-livekit-audio-chat.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/examples/foundational/29-livekit-audio-chat.py b/examples/foundational/29-livekit-audio-chat.py index 5acfff82e..522b03ede 100644 --- a/examples/foundational/29-livekit-audio-chat.py +++ b/examples/foundational/29-livekit-audio-chat.py @@ -1,3 +1,9 @@ +# +# Copyright (c) 2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import argparse import asyncio import json