Compare commits

...

1 Commits

Author SHA1 Message Date
mattie ruth backman
424c2a341b Introduce new helper type for acting as a liason between rtvi messages and the llm
This somewhat parallels the helper system the RTVI clients have.
2025-04-15 12:14:29 -04:00
4 changed files with 226 additions and 1 deletions

View File

@@ -16,7 +16,7 @@
* - Browser with WebRTC support
*/
import { RTVIClient, RTVIEvent } from '@pipecat-ai/client-js';
import { LLMHelper, RTVIClient, RTVIEvent } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
/**
@@ -27,6 +27,7 @@ class ChatbotClient {
constructor() {
// Initialize client state
this.rtviClient = null;
this.llmHelper = null;
this.setupDOMElements();
this.setupEventListeners();
this.initializeClientAndTransport();
@@ -127,6 +128,13 @@ class ChatbotClient {
},
});
this.llmHelper = new LLMHelper({});
rtviClient.registerHelper('llm', this.llmHelper);
// Uncomment for debugging
// window.rtviClient = this.rtviClient;
// window.llmHelper = this.llmHelper;
// Set up listeners for media track events
this.setupTrackListeners();
}

View File

@@ -41,6 +41,8 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.processors.frameworks.rtvi_helpers.llm import RTVILLMHelper
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -182,6 +184,11 @@ async def main():
# RTVI events for Pipecat client UI
#
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
llmHelper = RTVILLMHelper(
service="llm",
user_aggregator=context_aggregator.user(),
)
llmHelper.register_actions(rtvi)
pipeline = Pipeline(
[

View File

@@ -0,0 +1,210 @@
import sys
from enum import Enum
from typing import Any, Dict, List
from openai._types import NotGiven
from pydantic import BaseModel
from pipecat.frames.frames import (
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
)
from pipecat.processors.aggregators.llm_response import (
LLMUserContextAggregator,
)
from pipecat.processors.frameworks.rtvi import (
ActionResult,
RTVIAction,
RTVIActionArgument,
RTVIProcessor,
)
if sys.version_info < (3, 11):
class StrEnum(str, Enum):
"""A string-based Enum class for Python versions < 3.11."""
def __new__(cls, value):
"""Constructor for StrEnum."""
obj = str.__new__(cls, value)
obj._value_ = value
return obj
else:
from enum import StrEnum
class RTVILLMActionType(StrEnum):
"""Enum for RTVI LLM action types."""
APPEND_TO_MESSAGES = "append_to_messages"
GET_CONTEXT = "get_context"
SET_CONTEXT = "set_context"
RUN = "run"
class RTVIHelper(BaseModel):
"""Abstract class for helpers meant to handle various service related requests."""
def __init__(self, service: str):
super().__init__()
self._service = service
self._actions = []
def register_actions(self, rtvi: RTVIProcessor):
"""Register the actions for the RTVI LLM helper."""
for action in self._actions:
rtvi.register_action(action)
class RTVILLMHelper(RTVIHelper):
"""Helper class for handling RTVI LLM-related requests."""
def __init__(self, service: str, user_aggregator: LLMUserContextAggregator):
super().__init__(service)
self._user_aggregator = user_aggregator
self.setupActions()
def setupActions(self):
"""Set up the actions for the RTVI LLM helper."""
self._actions.append(RTVIAction(
service=self._service,
action=RTVILLMActionType.APPEND_TO_MESSAGES,
result="bool",
arguments=[
RTVIActionArgument(name="messages", type="array"),
RTVIActionArgument(name="run_immediately", type="bool"),
],
handler=self.append_to_messages_handler,
))
self._actions.append(RTVIAction(
service=self._service,
action=RTVILLMActionType.GET_CONTEXT,
result="array",
handler=self.get_context_handler
))
self._actions.append(RTVIAction(
service=self._service,
action=RTVILLMActionType.SET_CONTEXT,
result="bool",
arguments=[
RTVIActionArgument(name="messages", type="array"),
RTVIActionArgument(name="tools", type="array"),
],
handler=self.set_context_handler,
))
self._actions.append(RTVIAction(
service=self._service,
action=RTVILLMActionType.RUN,
result="bool",
arguments=[RTVIActionArgument(name="interrupt", type="bool")],
handler=self.run_handler,
))
async def append_to_messages_handler(
self, rtvi: RTVIProcessor, service: str, arguments: Dict[str, Any]) -> ActionResult:
"""Handle the LLM append-to-messages action.
Args:
rtvi: The RTVIProcessor instance managing the bot's real-time interaction.
service: The name of the service handling the action.
arguments: A dictionary of arguments for the action, including 'messages' and 'run_immediately'.
Returns:
ActionResult: A boolean indicating the success of the action.
"""
print('action_llm_append_to_messages_handler', arguments)
run_immediately = arguments["run_immediately"] if "run_immediately" in arguments else True
if run_immediately:
await rtvi.interrupt_bot()
# We just interrupted the bot so it should be fine to use the
# context directly instead of through frame.
if "messages" in arguments and arguments["messages"]:
frame = LLMMessagesAppendFrame(messages=arguments["messages"])
await rtvi.push_frame(frame)
if run_immediately:
frame = self._user_aggregator.get_context_frame()
await rtvi.push_frame(frame)
return True
async def get_context_handler(
self, rtvi: RTVIProcessor, service: str, arguments: Dict[str, Any]
) -> ActionResult:
"""Handle the RTVI get-context action.
Args:
rtvi: The RTVIProcessor instance managing the bot's real-time interaction.
service: The name of the service handling the action.
arguments: A dictionary of arguments for the action.
Returns:
ActionResult: A dictionary containing the context messages and tools.
"""
messages = self._user_aggregator.context.messages
tools = (
self._user_aggregator.context.tools
# TODO: Is it ok that we have to depend on an openai type here?
if not isinstance(self._user_aggregator.context.tools, NotGiven)
else []
)
result = {"messages": messages, "tools": tools}
return result
async def set_context_handler(
self, rtvi: RTVIProcessor, service: str, arguments: Dict[str, Any]
) -> ActionResult:
"""Handle the RTVI set-context action.
Args:
rtvi: The RTVIProcessor instance managing the bot's real-time interaction.
service: The name of the service handling the action.
arguments: A dictionary of arguments for the action, including 'messages' and 'tools'.
Returns:
ActionResult: A boolean indicating the success of the action.
"""
run_immediately = arguments["run_immediately"] if "run_immediately" in arguments else True
if run_immediately:
await rtvi.interrupt_bot()
# We just interrupted the bot so it should be find to use the
# context directly instead of through frame.
if "messages" in arguments and arguments["messages"]:
frame = LLMMessagesUpdateFrame(messages=arguments["messages"])
await rtvi.push_frame(frame)
if "tools" in arguments and arguments["tools"]:
frame = LLMSetToolsFrame(tools=arguments["tools"])
await rtvi.push_frame(frame)
if run_immediately:
frame = self._user_aggregator.get_context_frame()
await rtvi.push_frame(frame)
return True
async def run_handler(
self, rtvi: RTVIProcessor, service: str, arguments: Dict[str, Any]
) -> ActionResult:
"""Handle the RTVI run action.
Args:
rtvi: The RTVIProcessor instance managing the bot's real-time interaction.
service: The name of the service handling the action.
arguments: A dictionary of arguments for the action, including 'interrupt'.
Returns:
ActionResult: A boolean indicating the success of the action.
"""
interrupt = arguments["interrupt"] if "interrupt" in arguments else True
if interrupt:
await rtvi.interrupt_bot()
frame = self._user_aggregator.get_context_frame()
await rtvi.push_frame(frame)
return True