Merge pull request #2761 from pipecat-ai/aleix/rtvi-tail-updates

RTVI updates: audio levels and system logs
This commit is contained in:
Aleix Conchillo Flaqué
2025-09-30 13:55:17 -07:00
committed by GitHub
2 changed files with 147 additions and 61 deletions

View File

@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added RTVI messages for user/bot audio levels and system logs.
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
### Changed

View File

@@ -13,6 +13,7 @@ and frame observation for the RTVI protocol.
import asyncio
import base64
import time
from dataclasses import dataclass
from typing import (
Any,
@@ -29,6 +30,7 @@ from typing import (
from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from pipecat.audio.utils import calculate_audio_volume
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
@@ -52,6 +54,7 @@ from pipecat.frames.frames import (
SystemFrame,
TranscriptionFrame,
TransportMessageUrgentFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
@@ -613,9 +616,9 @@ class RTVIAppendToContextData(BaseModel):
Contains the role, content, and whether to run the message immediately.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
"""
role: Literal["user", "assistant"] | str
@@ -839,6 +842,36 @@ class RTVIServerMessage(BaseModel):
data: Any
class RTVIAudioLevelMessageData(BaseModel):
"""Data format for sending audio levels."""
value: float
class RTVIUserAudioLevelMessage(BaseModel):
"""Message indicating user audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-audio-level"] = "user-audio-level"
data: RTVIAudioLevelMessageData
class RTVIBotAudioLevelMessage(BaseModel):
"""Message indicating bot audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-audio-level"] = "bot-audio-level"
data: RTVIAudioLevelMessageData
class RTVISystemLogMessage(BaseModel):
"""Message including a system log."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["system-log"] = "system-log"
data: RTVITextMessageData
@dataclass
class RTVIServerMessageFrame(SystemFrame):
"""A frame for sending server messages to the client.
@@ -858,25 +891,36 @@ class RTVIServerMessageFrame(SystemFrame):
class RTVIObserverParams:
"""Parameters for configuring RTVI Observer behavior.
.. deprecated:: 0.0.87
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
Parameters:
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
bot_audio_level_enabled: Indicates if bot's audio level messages should be sent.
user_llm_enabled: Indicates if the user's LLM input messages should be sent.
user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent.
user_transcription_enabled: Indicates if user's transcription messages should be sent.
user_audio_level_enabled: Indicates if user's audio level messages should be sent.
metrics_enabled: Indicates if metrics messages should be sent.
errors_enabled: Indicates if errors messages should be sent.
system_logs_enabled: Indicates if system logs should be sent.
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
audio_level_period_secs: How often audio levels should be sent if enabled.
"""
bot_llm_enabled: bool = True
bot_tts_enabled: bool = True
bot_speaking_enabled: bool = True
bot_audio_level_enabled: bool = False
user_llm_enabled: bool = True
user_speaking_enabled: bool = True
user_transcription_enabled: bool = True
user_audio_level_enabled: bool = False
metrics_enabled: bool = True
system_logs_enabled: bool = False
errors_enabled: bool = True
audio_level_period_secs: float = 0.15
class RTVIObserver(BaseObserver):
@@ -892,7 +936,11 @@ class RTVIObserver(BaseObserver):
"""
def __init__(
self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
self,
rtvi: Optional["RTVIProcessor"] = None,
*,
params: Optional[RTVIObserverParams] = None,
**kwargs,
):
"""Initialize the RTVI observer.
@@ -904,9 +952,50 @@ class RTVIObserver(BaseObserver):
super().__init__(**kwargs)
self._rtvi = rtvi
self._params = params or RTVIObserverParams()
self._bot_transcription = ""
self._frames_seen = set()
rtvi.set_errors_enabled(self._params.errors_enabled)
self._bot_transcription = ""
self._last_user_audio_level = 0
self._last_bot_audio_level = 0
if self._params.system_logs_enabled:
self._system_logger_id = logger.add(self._logger_sink)
if self._params.errors_enabled:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter `errors_enabled` is deprecated. Error messages are always enabled.",
DeprecationWarning,
)
async def _logger_sink(self, message):
"""Logger sink so we cna send system logs to RTVI clients."""
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
await self.send_rtvi_message(message)
async def cleanup(self):
"""Cleanup RTVI observer resources."""
await super().cleanup()
if self._params.system_logs_enabled:
logger.remove(self._system_logger_id)
async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True):
"""Send an RTVI message.
By default, we push a transport frame. But this function can be
overriden by subclass to send RTVI messages in different ways.
Args:
model: The message to send.
exclude_none: Whether to exclude None values from the model dump.
"""
if self._rtvi:
await self._rtvi.push_transport_message(model, exclude_none)
async def on_push_frame(self, data: FramePushed):
"""Process a frame being pushed through the pipeline.
@@ -948,52 +1037,58 @@ class RTVIObserver(BaseObserver):
):
await self._handle_context(frame)
elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
await self.send_rtvi_message(RTVIBotLLMStartedMessage())
elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
await self.send_rtvi_message(RTVIBotLLMStoppedMessage())
elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
await self._handle_llm_text_frame(frame)
elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
if isinstance(src, BaseOutputTransport):
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
else:
mark_as_seen = False
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
await self._handle_metrics(frame)
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVIServerMessage(data=frame.data)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIServerResponseFrame):
if frame.error is not None:
await self._send_error_response(frame)
else:
await self._send_server_response(frame)
elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_user_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_user_audio_level = curr_time
elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_bot_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_bot_audio_level = curr_time
if mark_as_seen:
self._frames_seen.add(frame.id)
async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
"""Push an urgent transport message to the RTVI processor.
Args:
model: The message model to send.
exclude_none: Whether to exclude None values from the model dump.
"""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self._rtvi.push_frame(frame)
async def _push_bot_transcription(self):
"""Push accumulated bot transcription as a message."""
if len(self._bot_transcription) > 0:
message = RTVIBotTranscriptionMessage(
data=RTVITextMessageData(text=self._bot_transcription)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription = ""
async def _handle_interruptions(self, frame: Frame):
@@ -1005,7 +1100,7 @@ class RTVIObserver(BaseObserver):
message = RTVIUserStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_bot_speaking(self, frame: Frame):
"""Handle bot speaking event frames."""
@@ -1016,12 +1111,12 @@ class RTVIObserver(BaseObserver):
message = RTVIBotStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
"""Handle LLM text output frames."""
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription += frame.text
if match_endofsentence(self._bot_transcription):
@@ -1044,7 +1139,7 @@ class RTVIObserver(BaseObserver):
)
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame):
"""Process LLM context frames to extract user messages for the RTVI client."""
@@ -1064,7 +1159,7 @@ class RTVIObserver(BaseObserver):
text = "".join(part.text for part in message.parts if hasattr(part, "text"))
if text:
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
# Handle OpenAI format (original implementation)
elif isinstance(message, dict):
@@ -1075,7 +1170,7 @@ class RTVIObserver(BaseObserver):
else:
text = content
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
except Exception as e:
logger.warning(f"Caught an error while trying to handle context: {e}")
@@ -1102,7 +1197,7 @@ class RTVIObserver(BaseObserver):
metrics["characters"].append(d.model_dump(exclude_none=True))
message = RTVIMetricsMessage(data=metrics)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_server_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1110,7 +1205,7 @@ class RTVIObserver(BaseObserver):
id=str(frame.client_msg.msg_id),
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_error_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1118,7 +1213,7 @@ class RTVIObserver(BaseObserver):
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
class RTVIProcessor(FrameProcessor):
@@ -1152,7 +1247,6 @@ class RTVIProcessor(FrameProcessor):
# Default to 0.3.0 which is the last version before actually having a
# "client-version".
self._client_version = [0, 3, 0]
self._errors_enabled = True
self._skip_tts: bool = False # Keep in sync with llm_service.py
self._registered_actions: Dict[str, RTVIAction] = {}
@@ -1222,14 +1316,6 @@ class RTVIProcessor(FrameProcessor):
await self._update_config(self._config, False)
await self._send_bot_ready()
def set_errors_enabled(self, enabled: bool):
"""Enable or disable error message sending.
Args:
enabled: Whether to send error messages.
"""
self._errors_enabled = enabled
async def interrupt_bot(self):
"""Send a bot interruption frame upstream."""
await self.push_interruption_task_frame_and_wait()
@@ -1258,6 +1344,11 @@ class RTVIProcessor(FrameProcessor):
"""
await self._send_error_frame(ErrorFrame(error=error))
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def handle_message(self, message: RTVIMessage):
"""Handle an incoming RTVI message.
@@ -1278,7 +1369,7 @@ class RTVIProcessor(FrameProcessor):
args=params.arguments,
)
message = RTVILLMFunctionCallMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def handle_function_call_start(
self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext
@@ -1305,7 +1396,7 @@ class RTVIProcessor(FrameProcessor):
fn = RTVILLMFunctionCallStartMessageData(function_name=function_name)
message = RTVILLMFunctionCallStartMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames through the RTVI processor.
@@ -1377,11 +1468,6 @@ class RTVIProcessor(FrameProcessor):
await self.cancel_task(self._message_task)
self._message_task = None
async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def _action_task_handler(self):
"""Handle incoming action frames."""
while True:
@@ -1518,7 +1604,7 @@ class RTVIProcessor(FrameProcessor):
services = list(self._registered_services.values())
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_describe_actions(self, request_id: str):
"""Handle a describe-actions request."""
@@ -1533,7 +1619,7 @@ class RTVIProcessor(FrameProcessor):
actions = list(self._registered_actions.values())
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_get_config(self, request_id: str):
"""Handle a get-config request."""
@@ -1547,7 +1633,7 @@ class RTVIProcessor(FrameProcessor):
)
message = RTVIConfigResponse(id=request_id, data=self._config)
await self._push_transport_message(message)
await self.push_transport_message(message)
def _update_config_option(self, service: str, config: RTVIServiceOptionConfig):
"""Update a specific configuration option."""
@@ -1672,7 +1758,7 @@ class RTVIProcessor(FrameProcessor):
# action responses (such as webhooks) don't set a request_id
if request_id:
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_bot_ready(self):
"""Send the bot-ready message to the client."""
@@ -1683,23 +1769,21 @@ class RTVIProcessor(FrameProcessor):
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
)
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
"""Send a message or response to the client."""
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_error_frame(self, frame: ErrorFrame):
"""Send an error frame as an RTVI error message."""
if self._errors_enabled:
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self._push_transport_message(message)
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self.push_transport_message(message)
async def _send_error_response(self, id: str, error: str):
"""Send an error response message."""
if self._errors_enabled:
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self._push_transport_message(message)
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self.push_transport_message(message)
def _action_id(self, service: str, action: str) -> str:
"""Generate an action ID from service and action names."""