diff --git a/CHANGELOG.md b/CHANGELOG.md index 05a53fc44..49ea56824 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added RTVI messages for user/bot audio levels and system logs. + - Include OpenAI-based LLM services cached tokens to `MetricsFrame`. ### Changed diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 3f0abdab0..ec330261a 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -13,6 +13,7 @@ and frame observation for the RTVI protocol. import asyncio import base64 +import time from dataclasses import dataclass from typing import ( Any, @@ -29,6 +30,7 @@ from typing import ( from loguru import logger from pydantic import BaseModel, Field, PrivateAttr, ValidationError +from pipecat.audio.utils import calculate_audio_volume from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, @@ -52,6 +54,7 @@ from pipecat.frames.frames import ( SystemFrame, TranscriptionFrame, TransportMessageUrgentFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TTSTextFrame, @@ -613,9 +616,9 @@ class RTVIAppendToContextData(BaseModel): Contains the role, content, and whether to run the message immediately. - .. deprecated:: 0.0.85 - The RTVI message, append-to-context, has been deprecated. Use send-text - or custom client and server messages instead. + .. deprecated:: 0.0.85 + The RTVI message, append-to-context, has been deprecated. Use send-text + or custom client and server messages instead. """ role: Literal["user", "assistant"] | str @@ -839,6 +842,36 @@ class RTVIServerMessage(BaseModel): data: Any +class RTVIAudioLevelMessageData(BaseModel): + """Data format for sending audio levels.""" + + value: float + + +class RTVIUserAudioLevelMessage(BaseModel): + """Message indicating user audio level.""" + + label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL + type: Literal["user-audio-level"] = "user-audio-level" + data: RTVIAudioLevelMessageData + + +class RTVIBotAudioLevelMessage(BaseModel): + """Message indicating bot audio level.""" + + label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL + type: Literal["bot-audio-level"] = "bot-audio-level" + data: RTVIAudioLevelMessageData + + +class RTVISystemLogMessage(BaseModel): + """Message including a system log.""" + + label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL + type: Literal["system-log"] = "system-log" + data: RTVITextMessageData + + @dataclass class RTVIServerMessageFrame(SystemFrame): """A frame for sending server messages to the client. @@ -858,25 +891,36 @@ class RTVIServerMessageFrame(SystemFrame): class RTVIObserverParams: """Parameters for configuring RTVI Observer behavior. + .. deprecated:: 0.0.87 + Parameter `errors_enabled` is deprecated. Error messages are always enabled. + Parameters: bot_llm_enabled: Indicates if the bot's LLM messages should be sent. bot_tts_enabled: Indicates if the bot's TTS messages should be sent. bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent. + bot_audio_level_enabled: Indicates if bot's audio level messages should be sent. user_llm_enabled: Indicates if the user's LLM input messages should be sent. user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent. user_transcription_enabled: Indicates if user's transcription messages should be sent. + user_audio_level_enabled: Indicates if user's audio level messages should be sent. metrics_enabled: Indicates if metrics messages should be sent. - errors_enabled: Indicates if errors messages should be sent. + system_logs_enabled: Indicates if system logs should be sent. + errors_enabled: [Deprecated] Indicates if errors messages should be sent. + audio_level_period_secs: How often audio levels should be sent if enabled. """ bot_llm_enabled: bool = True bot_tts_enabled: bool = True bot_speaking_enabled: bool = True + bot_audio_level_enabled: bool = False user_llm_enabled: bool = True user_speaking_enabled: bool = True user_transcription_enabled: bool = True + user_audio_level_enabled: bool = False metrics_enabled: bool = True + system_logs_enabled: bool = False errors_enabled: bool = True + audio_level_period_secs: float = 0.15 class RTVIObserver(BaseObserver): @@ -892,7 +936,11 @@ class RTVIObserver(BaseObserver): """ def __init__( - self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs + self, + rtvi: Optional["RTVIProcessor"] = None, + *, + params: Optional[RTVIObserverParams] = None, + **kwargs, ): """Initialize the RTVI observer. @@ -904,9 +952,50 @@ class RTVIObserver(BaseObserver): super().__init__(**kwargs) self._rtvi = rtvi self._params = params or RTVIObserverParams() - self._bot_transcription = "" + self._frames_seen = set() - rtvi.set_errors_enabled(self._params.errors_enabled) + + self._bot_transcription = "" + self._last_user_audio_level = 0 + self._last_bot_audio_level = 0 + + if self._params.system_logs_enabled: + self._system_logger_id = logger.add(self._logger_sink) + + if self._params.errors_enabled: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Parameter `errors_enabled` is deprecated. Error messages are always enabled.", + DeprecationWarning, + ) + + async def _logger_sink(self, message): + """Logger sink so we cna send system logs to RTVI clients.""" + message = RTVISystemLogMessage(data=RTVITextMessageData(text=message)) + await self.send_rtvi_message(message) + + async def cleanup(self): + """Cleanup RTVI observer resources.""" + await super().cleanup() + if self._params.system_logs_enabled: + logger.remove(self._system_logger_id) + + async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True): + """Send an RTVI message. + + By default, we push a transport frame. But this function can be + overriden by subclass to send RTVI messages in different ways. + + Args: + model: The message to send. + exclude_none: Whether to exclude None values from the model dump. + + """ + if self._rtvi: + await self._rtvi.push_transport_message(model, exclude_none) async def on_push_frame(self, data: FramePushed): """Process a frame being pushed through the pipeline. @@ -948,52 +1037,58 @@ class RTVIObserver(BaseObserver): ): await self._handle_context(frame) elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled: - await self.push_transport_message_urgent(RTVIBotLLMStartedMessage()) + await self.send_rtvi_message(RTVIBotLLMStartedMessage()) elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled: - await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage()) + await self.send_rtvi_message(RTVIBotLLMStoppedMessage()) elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled: await self._handle_llm_text_frame(frame) elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled: - await self.push_transport_message_urgent(RTVIBotTTSStartedMessage()) + await self.send_rtvi_message(RTVIBotTTSStartedMessage()) elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled: - await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage()) + await self.send_rtvi_message(RTVIBotTTSStoppedMessage()) elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled: if isinstance(src, BaseOutputTransport): message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text)) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) else: mark_as_seen = False elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled: await self._handle_metrics(frame) elif isinstance(frame, RTVIServerMessageFrame): message = RTVIServerMessage(data=frame.data) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) elif isinstance(frame, RTVIServerResponseFrame): if frame.error is not None: await self._send_error_response(frame) else: await self._send_server_response(frame) + elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled: + curr_time = time.time() + diff_time = curr_time - self._last_user_audio_level + if diff_time > self._params.audio_level_period_secs: + level = calculate_audio_volume(frame.audio, frame.sample_rate) + message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level)) + await self.send_rtvi_message(message) + self._last_user_audio_level = curr_time + elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled: + curr_time = time.time() + diff_time = curr_time - self._last_bot_audio_level + if diff_time > self._params.audio_level_period_secs: + level = calculate_audio_volume(frame.audio, frame.sample_rate) + message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level)) + await self.send_rtvi_message(message) + self._last_bot_audio_level = curr_time if mark_as_seen: self._frames_seen.add(frame.id) - async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True): - """Push an urgent transport message to the RTVI processor. - - Args: - model: The message model to send. - exclude_none: Whether to exclude None values from the model dump. - """ - frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none)) - await self._rtvi.push_frame(frame) - async def _push_bot_transcription(self): """Push accumulated bot transcription as a message.""" if len(self._bot_transcription) > 0: message = RTVIBotTranscriptionMessage( data=RTVITextMessageData(text=self._bot_transcription) ) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) self._bot_transcription = "" async def _handle_interruptions(self, frame: Frame): @@ -1005,7 +1100,7 @@ class RTVIObserver(BaseObserver): message = RTVIUserStoppedSpeakingMessage() if message: - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) async def _handle_bot_speaking(self, frame: Frame): """Handle bot speaking event frames.""" @@ -1016,12 +1111,12 @@ class RTVIObserver(BaseObserver): message = RTVIBotStoppedSpeakingMessage() if message: - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) async def _handle_llm_text_frame(self, frame: LLMTextFrame): """Handle LLM text output frames.""" message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text)) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) self._bot_transcription += frame.text if match_endofsentence(self._bot_transcription): @@ -1044,7 +1139,7 @@ class RTVIObserver(BaseObserver): ) if message: - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame): """Process LLM context frames to extract user messages for the RTVI client.""" @@ -1064,7 +1159,7 @@ class RTVIObserver(BaseObserver): text = "".join(part.text for part in message.parts if hasattr(part, "text")) if text: rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text)) - await self.push_transport_message_urgent(rtvi_message) + await self.send_rtvi_message(rtvi_message) # Handle OpenAI format (original implementation) elif isinstance(message, dict): @@ -1075,7 +1170,7 @@ class RTVIObserver(BaseObserver): else: text = content rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text)) - await self.push_transport_message_urgent(rtvi_message) + await self.send_rtvi_message(rtvi_message) except Exception as e: logger.warning(f"Caught an error while trying to handle context: {e}") @@ -1102,7 +1197,7 @@ class RTVIObserver(BaseObserver): metrics["characters"].append(d.model_dump(exclude_none=True)) message = RTVIMetricsMessage(data=metrics) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) async def _send_server_response(self, frame: RTVIServerResponseFrame): """Send a response to the client for a specific request.""" @@ -1110,7 +1205,7 @@ class RTVIObserver(BaseObserver): id=str(frame.client_msg.msg_id), data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data), ) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) async def _send_error_response(self, frame: RTVIServerResponseFrame): """Send a response to the client for a specific request.""" @@ -1118,7 +1213,7 @@ class RTVIObserver(BaseObserver): message = RTVIErrorResponse( id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error) ) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) class RTVIProcessor(FrameProcessor): @@ -1152,7 +1247,6 @@ class RTVIProcessor(FrameProcessor): # Default to 0.3.0 which is the last version before actually having a # "client-version". self._client_version = [0, 3, 0] - self._errors_enabled = True self._skip_tts: bool = False # Keep in sync with llm_service.py self._registered_actions: Dict[str, RTVIAction] = {} @@ -1222,14 +1316,6 @@ class RTVIProcessor(FrameProcessor): await self._update_config(self._config, False) await self._send_bot_ready() - def set_errors_enabled(self, enabled: bool): - """Enable or disable error message sending. - - Args: - enabled: Whether to send error messages. - """ - self._errors_enabled = enabled - async def interrupt_bot(self): """Send a bot interruption frame upstream.""" await self.push_interruption_task_frame_and_wait() @@ -1258,6 +1344,11 @@ class RTVIProcessor(FrameProcessor): """ await self._send_error_frame(ErrorFrame(error=error)) + async def push_transport_message(self, model: BaseModel, exclude_none: bool = True): + """Push a transport message frame.""" + frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none)) + await self.push_frame(frame) + async def handle_message(self, message: RTVIMessage): """Handle an incoming RTVI message. @@ -1278,7 +1369,7 @@ class RTVIProcessor(FrameProcessor): args=params.arguments, ) message = RTVILLMFunctionCallMessage(data=fn) - await self._push_transport_message(message, exclude_none=False) + await self.push_transport_message(message, exclude_none=False) async def handle_function_call_start( self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext @@ -1305,7 +1396,7 @@ class RTVIProcessor(FrameProcessor): fn = RTVILLMFunctionCallStartMessageData(function_name=function_name) message = RTVILLMFunctionCallStartMessage(data=fn) - await self._push_transport_message(message, exclude_none=False) + await self.push_transport_message(message, exclude_none=False) async def process_frame(self, frame: Frame, direction: FrameDirection): """Process incoming frames through the RTVI processor. @@ -1377,11 +1468,6 @@ class RTVIProcessor(FrameProcessor): await self.cancel_task(self._message_task) self._message_task = None - async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): - """Push a transport message frame.""" - frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none)) - await self.push_frame(frame) - async def _action_task_handler(self): """Handle incoming action frames.""" while True: @@ -1518,7 +1604,7 @@ class RTVIProcessor(FrameProcessor): services = list(self._registered_services.values()) message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services)) - await self._push_transport_message(message) + await self.push_transport_message(message) async def _handle_describe_actions(self, request_id: str): """Handle a describe-actions request.""" @@ -1533,7 +1619,7 @@ class RTVIProcessor(FrameProcessor): actions = list(self._registered_actions.values()) message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions)) - await self._push_transport_message(message) + await self.push_transport_message(message) async def _handle_get_config(self, request_id: str): """Handle a get-config request.""" @@ -1547,7 +1633,7 @@ class RTVIProcessor(FrameProcessor): ) message = RTVIConfigResponse(id=request_id, data=self._config) - await self._push_transport_message(message) + await self.push_transport_message(message) def _update_config_option(self, service: str, config: RTVIServiceOptionConfig): """Update a specific configuration option.""" @@ -1672,7 +1758,7 @@ class RTVIProcessor(FrameProcessor): # action responses (such as webhooks) don't set a request_id if request_id: message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result)) - await self._push_transport_message(message) + await self.push_transport_message(message) async def _send_bot_ready(self): """Send the bot-ready message to the client.""" @@ -1683,23 +1769,21 @@ class RTVIProcessor(FrameProcessor): id=self._client_ready_id, data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config), ) - await self._push_transport_message(message) + await self.push_transport_message(message) async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse): """Send a message or response to the client.""" - await self._push_transport_message(message) + await self.push_transport_message(message) async def _send_error_frame(self, frame: ErrorFrame): """Send an error frame as an RTVI error message.""" - if self._errors_enabled: - message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal)) - await self._push_transport_message(message) + message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal)) + await self.push_transport_message(message) async def _send_error_response(self, id: str, error: str): """Send an error response message.""" - if self._errors_enabled: - message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error)) - await self._push_transport_message(message) + message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error)) + await self.push_transport_message(message) def _action_id(self, service: str, action: str) -> str: """Generate an action ID from service and action names."""