From 318d6f042b57f8b47cd426df454476d266220dd0 Mon Sep 17 00:00:00 2001 From: vipyne Date: Mon, 12 May 2025 12:19:26 -0500 Subject: [PATCH 1/4] examples: add text chatbot example using small webrtc transport examples: send small webrtc UI updates in RTVIServerMessageFrame add explanation about RTVI server messages being specific to small-webrtc-prebuilt UI --- examples/foundational/41-text-only-webrtc.py | 167 +++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 examples/foundational/41-text-only-webrtc.py diff --git a/examples/foundational/41-text-only-webrtc.py b/examples/foundational/41-text-only-webrtc.py new file mode 100644 index 000000000..4f10135b6 --- /dev/null +++ b/examples/foundational/41-text-only-webrtc.py @@ -0,0 +1,167 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import asyncio +import io +import os +import re +import shutil +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from PIL import Image + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + LLMMessagesAppendFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frameworks.rtvi import ( + ActionResult, + RTVIAction, + RTVIActionArgument, + RTVIConfig, + RTVIObserver, + RTVIProcessor, + RTVIServerMessageFrame, +) +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams(), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + async def action_llm_append_to_messages_handler( + rtvi: RTVIProcessor, service: str, arguments: dict[str, any] + ) -> ActionResult: + run_immediately = ( + arguments["run_immediately"] if "run_immediately" in arguments else True + ) + + if run_immediately: + await rtvi.interrupt_bot() + + # We just interrupted the bot so it should be fine to use the + # context directly instead of through frame. + if "messages" in arguments and arguments["messages"]: + mess = arguments["messages"] + frame = LLMMessagesAppendFrame(messages=arguments["messages"]) + await rtvi.push_frame(frame) + + if run_immediately: + frame = context_aggregator.user().get_context_frame() + await rtvi.push_frame(frame) + + return True + + action_llm_append_to_messages = RTVIAction( + service="llm", + action="append_to_messages", + result="bool", + arguments=[ + RTVIActionArgument(name="messages", type="array"), + RTVIActionArgument(name="run_immediately", type="bool"), + ], + handler=action_llm_append_to_messages_handler, + ) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + context_aggregator.user(), + llm, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() + + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_video_container": False, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() From cc0819b70906cfbb1861f550db161797ea15a202 Mon Sep 17 00:00:00 2001 From: vipyne Date: Wed, 21 May 2025 14:40:19 -0500 Subject: [PATCH 2/4] examples: add text and audio over webrtc transport update filenames add high level comments to 41* examples --- ...only-webrtc.py => 41a-text-only-webrtc.py} | 16 +- .../foundational/41b-text-and-audio-webrtc.py | 184 ++++++++++++++++++ 2 files changed, 194 insertions(+), 6 deletions(-) rename examples/foundational/{41-text-only-webrtc.py => 41a-text-only-webrtc.py} (89%) create mode 100644 examples/foundational/41b-text-and-audio-webrtc.py diff --git a/examples/foundational/41-text-only-webrtc.py b/examples/foundational/41a-text-only-webrtc.py similarity index 89% rename from examples/foundational/41-text-only-webrtc.py rename to examples/foundational/41a-text-only-webrtc.py index 4f10135b6..1bc3389fb 100644 --- a/examples/foundational/41-text-only-webrtc.py +++ b/examples/foundational/41a-text-only-webrtc.py @@ -45,6 +45,10 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) +# This is an example of a text-only chatbot using small webrtc tranport. +# It uses the small webrtc transport prebuilt web UI. +# https://github.com/pipecat-ai/small-webrtc-prebuilt + async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): logger.info(f"Starting bot") @@ -78,12 +82,12 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac if run_immediately: await rtvi.interrupt_bot() - # We just interrupted the bot so it should be fine to use the - # context directly instead of through frame. - if "messages" in arguments and arguments["messages"]: - mess = arguments["messages"] - frame = LLMMessagesAppendFrame(messages=arguments["messages"]) - await rtvi.push_frame(frame) + # We just interrupted the bot so it should be fine to use the + # context directly instead of through frame. + if "messages" in arguments and arguments["messages"]: + mess = arguments["messages"] + frame = LLMMessagesAppendFrame(messages=arguments["messages"]) + await rtvi.push_frame(frame) if run_immediately: frame = context_aggregator.user().get_context_frame() diff --git a/examples/foundational/41b-text-and-audio-webrtc.py b/examples/foundational/41b-text-and-audio-webrtc.py new file mode 100644 index 000000000..2629198e3 --- /dev/null +++ b/examples/foundational/41b-text-and-audio-webrtc.py @@ -0,0 +1,184 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import asyncio +import io +import os +import re +import shutil +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from PIL import Image + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + LLMMessagesAppendFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frameworks.rtvi import ( + ActionResult, + RTVIAction, + RTVIActionArgument, + RTVIConfig, + RTVIObserver, + RTVIProcessor, + RTVIServerMessageFrame, +) +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + +# This is an example of a chatbot in which a user can speak and/or type text to communicate with the bot. +# It uses the small webrtc transport prebuilt web UI. +# https://github.com/pipecat-ai/small-webrtc-prebuilt + + +async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121" + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user says in a creative and helpful way. Explain to the User they can speak or type text to communicate with you.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + async def action_llm_append_to_messages_handler( + rtvi: RTVIProcessor, service: str, arguments: dict[str, any] + ) -> ActionResult: + run_immediately = ( + arguments["run_immediately"] if "run_immediately" in arguments else True + ) + + if run_immediately: + await rtvi.interrupt_bot() + + # We just interrupted the bot so it should be fine to use the + # context directly instead of through frame. + if "messages" in arguments and arguments["messages"]: + mess = arguments["messages"] + frame = LLMMessagesAppendFrame(messages=arguments["messages"]) + await rtvi.push_frame(frame) + + if run_immediately: + frame = context_aggregator.user().get_context_frame() + await rtvi.push_frame(frame) + + return True + + action_llm_append_to_messages = RTVIAction( + service="llm", + action="append_to_messages", + result="bool", + arguments=[ + RTVIActionArgument(name="messages", type="array"), + RTVIActionArgument(name="run_immediately", type="bool"), + ], + handler=action_llm_append_to_messages_handler, + ) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() + + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() From 575b97ba60ce87bd012c4f25f2e622a244a3c655 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 23 May 2025 11:14:56 -0300 Subject: [PATCH 3/4] Some improvements and cleanups in the SmallWebRTCTransport text examples. --- examples/foundational/41a-text-only-webrtc.py | 200 ++++++++--------- .../foundational/41b-text-and-audio-webrtc.py | 212 +++++++++--------- 2 files changed, 194 insertions(+), 218 deletions(-) diff --git a/examples/foundational/41a-text-only-webrtc.py b/examples/foundational/41a-text-only-webrtc.py index 1bc3389fb..e154126a8 100644 --- a/examples/foundational/41a-text-only-webrtc.py +++ b/examples/foundational/41a-text-only-webrtc.py @@ -5,30 +5,18 @@ # import argparse -import asyncio -import io import os -import re -import shutil -import sys -import aiohttp from dotenv import load_dotenv from loguru import logger -from PIL import Image -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - Frame, - FunctionCallResultFrame, LLMMessagesAppendFrame, - URLImageRawFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import ( ActionResult, RTVIAction, @@ -38,6 +26,7 @@ from pipecat.processors.frameworks.rtvi import ( RTVIProcessor, RTVIServerMessageFrame, ) +from pipecat.services.openai import OpenAIContextAggregatorPair from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import TransportParams from pipecat.transports.network.small_webrtc import SmallWebRTCTransport @@ -45,11 +34,43 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) + # This is an example of a text-only chatbot using small webrtc tranport. # It uses the small webrtc transport prebuilt web UI. # https://github.com/pipecat-ai/small-webrtc-prebuilt +def create_action_llm_append_to_messages(context_aggregator: OpenAIContextAggregatorPair): + async def action_llm_append_to_messages_handler( + rtvi: RTVIProcessor, service: str, arguments: dict[str, any] + ) -> ActionResult: + run_immediately = arguments["run_immediately"] if "run_immediately" in arguments else True + logger.info(f"run_immediately: {run_immediately}") + if run_immediately: + await rtvi.interrupt_bot() + # We just interrupted the bot so it should be fine to use the + # context directly instead of through frame. + if "messages" in arguments and arguments["messages"]: + frame = LLMMessagesAppendFrame(messages=arguments["messages"]) + await rtvi.push_frame(frame) + + frame = context_aggregator.user().get_context_frame() + await rtvi.push_frame(frame) + return True + + action_llm_append_to_messages = RTVIAction( + service="llm", + action="append_to_messages", + result="bool", + arguments=[ + RTVIActionArgument(name="messages", type="array"), + RTVIActionArgument(name="run_immediately", type="bool"), + ], + handler=action_llm_append_to_messages_handler, + ) + return action_llm_append_to_messages + + async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): logger.info(f"Starting bot") @@ -58,111 +79,76 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac params=TransportParams(), ) - # Create an HTTP session for API calls - async with aiohttp.ClientSession() as session: - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way.", - }, + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + context_aggregator.user(), + llm, + transport.output(), + context_aggregator.assistant(), ] + ) - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) - async def action_llm_append_to_messages_handler( - rtvi: RTVIProcessor, service: str, arguments: dict[str, any] - ) -> ActionResult: - run_immediately = ( - arguments["run_immediately"] if "run_immediately" in arguments else True - ) + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() - if run_immediately: - await rtvi.interrupt_bot() + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_video_container": False, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) - # We just interrupted the bot so it should be fine to use the - # context directly instead of through frame. - if "messages" in arguments and arguments["messages"]: - mess = arguments["messages"] - frame = LLMMessagesAppendFrame(messages=arguments["messages"]) - await rtvi.push_frame(frame) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) - if run_immediately: - frame = context_aggregator.user().get_context_frame() - await rtvi.push_frame(frame) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") - return True + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() - action_llm_append_to_messages = RTVIAction( - service="llm", - action="append_to_messages", - result="bool", - arguments=[ - RTVIActionArgument(name="messages", type="array"), - RTVIActionArgument(name="run_immediately", type="bool"), - ], - handler=action_llm_append_to_messages_handler, - ) + runner = PipelineRunner(handle_sigint=False) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - rtvi.register_action(action_llm_append_to_messages) - - pipeline = Pipeline( - [ - transport.input(), - rtvi, - context_aggregator.user(), - llm, - transport.output(), - context_aggregator.assistant(), - ] - ) - - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - ), - observers=[RTVIObserver(rtvi)], - ) - - @rtvi.event_handler("on_client_ready") - async def on_client_ready(rtvi): - logger.info("Pipecat client ready.") - await rtvi.set_bot_ready() - - # This block is frontend UI specific - # These messages are intended for small webrtc UI to only handle text - # https://github.com/pipecat-ai/small-webrtc-prebuilt - messages = { - "show_text_container": True, - "show_video_container": False, - "show_debug_container": False, - } - rtvi_frame = RTVIServerMessageFrame(data=messages) - await task.queue_frames([rtvi_frame]) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - - @transport.event_handler("on_client_closed") - async def on_client_closed(transport, client): - logger.info(f"Client closed connection") - await task.cancel() - - runner = PipelineRunner(handle_sigint=False) - - await runner.run(task) + await runner.run(task) if __name__ == "__main__": diff --git a/examples/foundational/41b-text-and-audio-webrtc.py b/examples/foundational/41b-text-and-audio-webrtc.py index 2629198e3..ae701fe4d 100644 --- a/examples/foundational/41b-text-and-audio-webrtc.py +++ b/examples/foundational/41b-text-and-audio-webrtc.py @@ -5,30 +5,19 @@ # import argparse -import asyncio -import io import os -import re -import shutil -import sys -import aiohttp from dotenv import load_dotenv from loguru import logger -from PIL import Image from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - Frame, - FunctionCallResultFrame, LLMMessagesAppendFrame, - URLImageRawFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import ( ActionResult, RTVIAction, @@ -40,6 +29,7 @@ from pipecat.processors.frameworks.rtvi import ( ) from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai import OpenAIContextAggregatorPair from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import TransportParams from pipecat.transports.network.small_webrtc import SmallWebRTCTransport @@ -52,6 +42,41 @@ load_dotenv(override=True) # https://github.com/pipecat-ai/small-webrtc-prebuilt +def create_action_llm_append_to_messages(context_aggregator: OpenAIContextAggregatorPair): + async def action_llm_append_to_messages_handler( + rtvi: RTVIProcessor, service: str, arguments: dict[str, any] + ) -> ActionResult: + run_immediately = arguments["run_immediately"] if "run_immediately" in arguments else True + + if run_immediately: + await rtvi.interrupt_bot() + + # We just interrupted the bot so it should be fine to use the + # context directly instead of through frame. + if "messages" in arguments and arguments["messages"]: + mess = arguments["messages"] + frame = LLMMessagesAppendFrame(messages=arguments["messages"]) + await rtvi.push_frame(frame) + + if run_immediately: + frame = context_aggregator.user().get_context_frame() + await rtvi.push_frame(frame) + + return True + + action_llm_append_to_messages = RTVIAction( + service="llm", + action="append_to_messages", + result="bool", + arguments=[ + RTVIActionArgument(name="messages", type="array"), + RTVIActionArgument(name="run_immediately", type="bool"), + ], + handler=action_llm_append_to_messages_handler, + ) + return action_llm_append_to_messages + + async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): logger.info(f"Starting bot") @@ -64,118 +89,83 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac ), ) - # Create an HTTP session for API calls - async with aiohttp.ClientSession() as session: - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121" - ) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121" + ) - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user says in a creative and helpful way. Explain to the User they can speak or type text to communicate with you.", - }, + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user says in a creative and helpful way. Explain to the User they can speak or type text to communicate with you.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), ] + ) - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) - async def action_llm_append_to_messages_handler( - rtvi: RTVIProcessor, service: str, arguments: dict[str, any] - ) -> ActionResult: - run_immediately = ( - arguments["run_immediately"] if "run_immediately" in arguments else True - ) + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() - if run_immediately: - await rtvi.interrupt_bot() + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) - # We just interrupted the bot so it should be fine to use the - # context directly instead of through frame. - if "messages" in arguments and arguments["messages"]: - mess = arguments["messages"] - frame = LLMMessagesAppendFrame(messages=arguments["messages"]) - await rtvi.push_frame(frame) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) - if run_immediately: - frame = context_aggregator.user().get_context_frame() - await rtvi.push_frame(frame) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") - return True + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() - action_llm_append_to_messages = RTVIAction( - service="llm", - action="append_to_messages", - result="bool", - arguments=[ - RTVIActionArgument(name="messages", type="array"), - RTVIActionArgument(name="run_immediately", type="bool"), - ], - handler=action_llm_append_to_messages_handler, - ) + runner = PipelineRunner(handle_sigint=False) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - rtvi.register_action(action_llm_append_to_messages) - - pipeline = Pipeline( - [ - transport.input(), - rtvi, - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] - ) - - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - ), - observers=[RTVIObserver(rtvi)], - ) - - @rtvi.event_handler("on_client_ready") - async def on_client_ready(rtvi): - logger.info("Pipecat client ready.") - await rtvi.set_bot_ready() - - # This block is frontend UI specific - # These messages are intended for small webrtc UI to only handle text - # https://github.com/pipecat-ai/small-webrtc-prebuilt - messages = { - "show_text_container": True, - "show_debug_container": False, - } - rtvi_frame = RTVIServerMessageFrame(data=messages) - await task.queue_frames([rtvi_frame]) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - - @transport.event_handler("on_client_closed") - async def on_client_closed(transport, client): - logger.info(f"Client closed connection") - await task.cancel() - - runner = PipelineRunner(handle_sigint=False) - - await runner.run(task) + await runner.run(task) if __name__ == "__main__": From 821da723c056bc5a0a46458a32151d92920e955e Mon Sep 17 00:00:00 2001 From: vipyne Date: Wed, 28 May 2025 10:16:27 -0500 Subject: [PATCH 4/4] update SmallWebRTCTransport text examples with new run_example --- examples/foundational/41a-text-only-webrtc.py | 145 ++++++++-------- .../foundational/41b-text-and-audio-webrtc.py | 163 +++++++++--------- 2 files changed, 159 insertions(+), 149 deletions(-) diff --git a/examples/foundational/41a-text-only-webrtc.py b/examples/foundational/41a-text-only-webrtc.py index e154126a8..f27bcca76 100644 --- a/examples/foundational/41a-text-only-webrtc.py +++ b/examples/foundational/41a-text-only-webrtc.py @@ -7,6 +7,7 @@ import argparse import os +import aiohttp from dotenv import load_dotenv from loguru import logger @@ -28,9 +29,8 @@ from pipecat.processors.frameworks.rtvi import ( ) from pipecat.services.openai import OpenAIContextAggregatorPair from pipecat.services.openai.llm import OpenAILLMService -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) @@ -71,87 +71,92 @@ def create_action_llm_append_to_messages(context_aggregator: OpenAIContextAggreg return action_llm_append_to_messages -async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "webrtc": lambda: TransportParams(), +} + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): logger.info(f"Starting bot") - transport = SmallWebRTCTransport( - webrtc_connection=webrtc_connection, - params=TransportParams(), - ) + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way.", - }, - ] - - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) - - action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - rtvi.register_action(action_llm_append_to_messages) - - pipeline = Pipeline( - [ - transport.input(), - rtvi, - context_aggregator.user(), - llm, - transport.output(), - context_aggregator.assistant(), + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user said in a creative and helpful way.", + }, ] - ) - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - ), - observers=[RTVIObserver(rtvi)], - ) + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) - @rtvi.event_handler("on_client_ready") - async def on_client_ready(rtvi): - logger.info("Pipecat client ready.") - await rtvi.set_bot_ready() + action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) - # This block is frontend UI specific - # These messages are intended for small webrtc UI to only handle text - # https://github.com/pipecat-ai/small-webrtc-prebuilt - messages = { - "show_text_container": True, - "show_video_container": False, - "show_debug_container": False, - } - rtvi_frame = RTVIServerMessageFrame(data=messages) - await task.queue_frames([rtvi_frame]) + pipeline = Pipeline( + [ + transport.input(), + rtvi, + context_aggregator.user(), + llm, + transport.output(), + context_aggregator.assistant(), + ] + ) - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([context_aggregator.user().get_context_frame()]) + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() - @transport.event_handler("on_client_closed") - async def on_client_closed(transport, client): - logger.info(f"Client closed connection") - await task.cancel() + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_video_container": False, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) - runner = PipelineRunner(handle_sigint=False) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) - await runner.run(task) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) if __name__ == "__main__": from run import main - main() + main(run_example, transport_params=transport_params) diff --git a/examples/foundational/41b-text-and-audio-webrtc.py b/examples/foundational/41b-text-and-audio-webrtc.py index ae701fe4d..1d7e4cfa5 100644 --- a/examples/foundational/41b-text-and-audio-webrtc.py +++ b/examples/foundational/41b-text-and-audio-webrtc.py @@ -7,6 +7,7 @@ import argparse import os +import aiohttp from dotenv import load_dotenv from loguru import logger @@ -31,9 +32,8 @@ from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai import OpenAIContextAggregatorPair from pipecat.services.openai.llm import OpenAILLMService -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) @@ -77,98 +77,103 @@ def create_action_llm_append_to_messages(context_aggregator: OpenAIContextAggreg return action_llm_append_to_messages -async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): logger.info(f"Starting bot") - transport = SmallWebRTCTransport( - webrtc_connection=webrtc_connection, - params=TransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - ), - ) + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121" + ) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121" - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user says in a creative and helpful way. Explain to the User they can speak or type text to communicate with you.", - }, - ] - - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) - - action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - rtvi.register_action(action_llm_append_to_messages) - - pipeline = Pipeline( - [ - transport.input(), - rtvi, - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Respond to what the user says in a creative and helpful way. Explain to the User they can speak or type text to communicate with you.", + }, ] - ) - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - ), - observers=[RTVIObserver(rtvi)], - ) + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) - @rtvi.event_handler("on_client_ready") - async def on_client_ready(rtvi): - logger.info("Pipecat client ready.") - await rtvi.set_bot_ready() + action_llm_append_to_messages = create_action_llm_append_to_messages(context_aggregator) + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + rtvi.register_action(action_llm_append_to_messages) - # This block is frontend UI specific - # These messages are intended for small webrtc UI to only handle text - # https://github.com/pipecat-ai/small-webrtc-prebuilt - messages = { - "show_text_container": True, - "show_debug_container": False, - } - rtvi_frame = RTVIServerMessageFrame(data=messages) - await task.queue_frames([rtvi_frame]) + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([context_aggregator.user().get_context_frame()]) + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info("Pipecat client ready.") + await rtvi.set_bot_ready() - @transport.event_handler("on_client_closed") - async def on_client_closed(transport, client): - logger.info(f"Client closed connection") - await task.cancel() + # This block is frontend UI specific + # These messages are intended for small webrtc UI to only handle text + # https://github.com/pipecat-ai/small-webrtc-prebuilt + messages = { + "show_text_container": True, + "show_debug_container": False, + } + rtvi_frame = RTVIServerMessageFrame(data=messages) + await task.queue_frames([rtvi_frame]) - runner = PipelineRunner(handle_sigint=False) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) - await runner.run(task) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) if __name__ == "__main__": from run import main - main() + main(run_example, transport_params=transport_params)