Compare commits
8 Commits
pk/decoupl
...
rtvi-send-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7742d1a83b | ||
|
|
d9cebe602f | ||
|
|
96e06d2401 | ||
|
|
267c86e596 | ||
|
|
9fb06c3e4b | ||
|
|
71197fbc2c | ||
|
|
9cd4e5faca | ||
|
|
4f290be834 |
@@ -96,6 +96,8 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
|
||||
item["source"]["data"] = "..."
|
||||
if item["type"] == "thinking" and item.get("signature"):
|
||||
item["signature"] = "..."
|
||||
if item["type"] == "file":
|
||||
item["file"]["file_data"] = "data:..."
|
||||
messages_for_logging.append(msg)
|
||||
return messages_for_logging
|
||||
|
||||
|
||||
@@ -91,6 +91,8 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
|
||||
for item in msg["content"]:
|
||||
if item.get("image"):
|
||||
item["image"]["source"]["bytes"] = "..."
|
||||
if item.get("type") == "file":
|
||||
item["file"]["file_data"] = "data:..."
|
||||
messages_for_logging.append(msg)
|
||||
return messages_for_logging
|
||||
|
||||
|
||||
@@ -87,6 +87,9 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
item["audio"] = "..."
|
||||
if item.get("type") == "audio":
|
||||
item["audio"] = "..."
|
||||
if item.get("type") == "file":
|
||||
if item["file"]["file_data"].startswith("data:"):
|
||||
item["file"]["file_data"] = "data:..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
|
||||
|
||||
@@ -105,6 +105,9 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item["type"] == "input_audio":
|
||||
item["input_audio"]["data"] = "..."
|
||||
if item["type"] == "file":
|
||||
if item["file"]["file_data"].startswith("data:"):
|
||||
item["file"]["file_data"] = "data:..."
|
||||
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
|
||||
msg["data"] = "..."
|
||||
msgs.append(msg)
|
||||
|
||||
@@ -90,6 +90,9 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item["type"] == "input_audio":
|
||||
item["input_audio"]["data"] = "..."
|
||||
if item["type"] == "file":
|
||||
if item["file"]["file_data"].startswith("data:"):
|
||||
item["file"]["file_data"] = "data:..."
|
||||
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
|
||||
msg["data"] = "..."
|
||||
msgs.append(msg)
|
||||
|
||||
@@ -234,7 +234,7 @@ class ImageRawFrame:
|
||||
"""A frame containing a raw image.
|
||||
|
||||
Parameters:
|
||||
image: Raw image bytes.
|
||||
image: Raw image bytes or a base64-encoded string.
|
||||
size: Image dimensions as (width, height) tuple.
|
||||
format: Image format (e.g., 'RGB', 'RGBA').
|
||||
"""
|
||||
@@ -244,6 +244,26 @@ class ImageRawFrame:
|
||||
format: Optional[str]
|
||||
|
||||
|
||||
FileSourceType = Literal["bytes", "url", "id"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileRawFrame:
|
||||
"""A frame containing a raw file.
|
||||
|
||||
Parameters:
|
||||
file: Raw file bytes.
|
||||
type: Type of the file ('bytes', 'url', or 'id'),
|
||||
name: Optional name of the file.
|
||||
format: File format (expected in Mime Format).
|
||||
"""
|
||||
|
||||
file: bytes | str
|
||||
type: FileSourceType
|
||||
name: Optional[str]
|
||||
format: Optional[str]
|
||||
|
||||
|
||||
#
|
||||
# Data frames.
|
||||
#
|
||||
@@ -1584,6 +1604,18 @@ class InputImageRawFrame(SystemFrame, ImageRawFrame):
|
||||
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputFileRawFrame(SystemFrame, FileRawFrame):
|
||||
"""Raw file input frame.
|
||||
|
||||
A file usually coming from RTVI.
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, type: {self.type})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputTextRawFrame(SystemFrame, TextFrame):
|
||||
"""Raw text input frame from transport.
|
||||
@@ -1638,6 +1670,28 @@ class UserImageRawFrame(InputImageRawFrame):
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, append_to_context: {self.append_to_context})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserFileRawFrame(InputFileRawFrame):
|
||||
"""Raw file input frame associated with a specific user.
|
||||
|
||||
A file associated to a user.
|
||||
|
||||
Parameters:
|
||||
user_id: Identifier of the user who provided this file.
|
||||
text: Text associated to this file.
|
||||
append_to_context: Whether the requested file should be appended to the LLM context.
|
||||
"""
|
||||
|
||||
user_id: str = ""
|
||||
text: str = ""
|
||||
append_to_context: Optional[bool] = None
|
||||
custom_options: Optional[dict] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, format: {self.format}, type: {self.type}, text: {self.text}, append_to_context: {self.append_to_context})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AssistantImageRawFrame(OutputImageRawFrame):
|
||||
"""Frame containing an image generated by the assistant.
|
||||
|
||||
@@ -19,8 +19,9 @@ import base64
|
||||
import io
|
||||
import wave
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
|
||||
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union, get_args
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
|
||||
from openai._types import NotGiven as OpenAINotGiven
|
||||
@@ -173,6 +174,11 @@ class LLMContext:
|
||||
image: Raw image bytes.
|
||||
text: Optional text to include with the image.
|
||||
"""
|
||||
# Format is a data URL: data:<mime type>;base64,<data> already provided
|
||||
if format.startswith("url/"):
|
||||
url = image
|
||||
return LLMContext.create_image_url_message(role=role, url=url, text=text)
|
||||
|
||||
# Format is a mime type: image is already encoded
|
||||
image_already_encoded = format.startswith("image/")
|
||||
|
||||
@@ -193,6 +199,34 @@ class LLMContext:
|
||||
|
||||
return LLMContext.create_image_url_message(role=role, url=url, text=text)
|
||||
|
||||
@staticmethod
|
||||
async def create_file_message(
|
||||
*,
|
||||
role: str = "user",
|
||||
file: bytes,
|
||||
name: Optional[str] = None,
|
||||
text: Optional[str] = None,
|
||||
) -> LLMContextMessage:
|
||||
"""Create a context message containing a file.
|
||||
|
||||
Args:
|
||||
role: The role of this message (defaults to "user").
|
||||
file: Raw file bytes.
|
||||
name: Optional name of the file.
|
||||
text: Optional text to include with the file.
|
||||
"""
|
||||
# Right now: assumes file is already encoded properly as a data URL:
|
||||
# data:<mime type>;base64,<data>
|
||||
# TODO: support not already encoded?
|
||||
content = []
|
||||
if text:
|
||||
content.append({"type": "text", "text": text})
|
||||
|
||||
file = {"file_data": file, "filename": name if name else ""}
|
||||
content.append({"type": "file", "file": file})
|
||||
|
||||
return {"role": role, "content": content}
|
||||
|
||||
@staticmethod
|
||||
async def create_audio_message(
|
||||
*, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows"
|
||||
@@ -357,6 +391,45 @@ class LLMContext:
|
||||
"""
|
||||
self._tool_choice = tool_choice
|
||||
|
||||
async def add_file_frame_message(
|
||||
self,
|
||||
*,
|
||||
type: str,
|
||||
format: str,
|
||||
file: bytes | str,
|
||||
text: Optional[str] = None,
|
||||
name: Optional[str] = None,
|
||||
role: str = "user",
|
||||
):
|
||||
"""Add a message containing a file frame.
|
||||
|
||||
Args:
|
||||
type: File type (e.g., 'bytes' or 'url').
|
||||
format: File format (the MIME type like 'image/jpeg').
|
||||
file: Raw file bytes or URL string.
|
||||
text: Optional text to include with the file.
|
||||
name: Optional name of the file.
|
||||
role: The role of this message (defaults to "user").
|
||||
"""
|
||||
bytes = file
|
||||
is_image = format.startswith("image/")
|
||||
if not is_image and format != "application/pdf":
|
||||
logger.warning(f"Unsupported file format for LLM context: {format}")
|
||||
raise ValueError(f"Unsupported file format for LLM context: {format}")
|
||||
if type == "url":
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(file) as response:
|
||||
content = io.BytesIO(await response.content.read())
|
||||
base64_string = base64.b64encode(content.getvalue()).decode("utf-8")
|
||||
bytes = f"data:{format};base64,{base64_string}"
|
||||
if is_image:
|
||||
message = LLMContext.create_image_url_message(role=role, url=bytes, text=text)
|
||||
self.add_message(message)
|
||||
return
|
||||
|
||||
message = await LLMContext.create_file_message(role=role, file=bytes, name=name, text=text)
|
||||
self.add_message(message)
|
||||
|
||||
async def add_image_frame_message(
|
||||
self,
|
||||
*,
|
||||
@@ -393,6 +466,33 @@ class LLMContext:
|
||||
message = await LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
|
||||
self.add_message(message)
|
||||
|
||||
def maybe_remove_invalid_message(self, exception: Exception):
|
||||
"""Remove messages from context if an exception indicates they were invalid.
|
||||
|
||||
This is a best-effort method to handle cases where an LLM service returns
|
||||
an error indicating that a particular message in the context was invalid
|
||||
(e.g., due to unsupported content). The method inspects the exception,
|
||||
and if it can identify a specific message that caused the issue, it removes
|
||||
that message from the context.
|
||||
|
||||
Args:
|
||||
exception: The exception raised during LLM processing, which may contain
|
||||
information about invalid messages.
|
||||
"""
|
||||
# Check for OpenAI-specific error indicating an invalid file message, and remove it if found.
|
||||
if exception.type == "invalid_request_error" and exception.param == "file_id":
|
||||
for i in range(len(self._messages) - 1, -1, -1):
|
||||
message = self._messages[i]
|
||||
content = message.get("content", [])
|
||||
if isinstance(content, list) and any(
|
||||
item.get("type") == "file" for item in content
|
||||
):
|
||||
logger.warning(
|
||||
"Removing message with file content from context due to invalid response."
|
||||
)
|
||||
self._messages.pop(i)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def _normalize_and_validate_tools(tools: ToolsSchema | NotGiven) -> ToolsSchema | NotGiven:
|
||||
"""Normalize and validate the given tools.
|
||||
|
||||
@@ -55,6 +55,7 @@ from pipecat.frames.frames import (
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TranslationFrame,
|
||||
UserFileRawFrame,
|
||||
UserImageRawFrame,
|
||||
UserMuteStartedFrame,
|
||||
UserMuteStoppedFrame,
|
||||
@@ -957,6 +958,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
await self._handle_function_call_cancel(frame)
|
||||
elif isinstance(frame, UserImageRawFrame):
|
||||
await self._handle_user_image_frame(frame)
|
||||
elif isinstance(frame, UserFileRawFrame):
|
||||
await self._handle_user_file_frame(frame)
|
||||
elif isinstance(frame, AssistantImageRawFrame):
|
||||
await self._handle_assistant_image_frame(frame)
|
||||
else:
|
||||
@@ -1135,6 +1138,22 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
if image_appended:
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_user_file_frame(self, frame: UserFileRawFrame):
|
||||
if not frame.append_to_context:
|
||||
return
|
||||
|
||||
logger.debug(f"{self} Appending UserFileRawFrame to LLM context (format: {frame.format})")
|
||||
await self._context.add_file_frame_message(
|
||||
type=frame.type,
|
||||
format=frame.format,
|
||||
text=frame.text,
|
||||
file=frame.file,
|
||||
# options=frame.custom_options,
|
||||
)
|
||||
|
||||
await self.push_aggregation()
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_assistant_image_frame(self, frame: AssistantImageRawFrame):
|
||||
logger.debug(f"{self} Appending AssistantImageRawFrame to LLM context (size: {frame.size})")
|
||||
|
||||
|
||||
@@ -233,6 +233,9 @@ class OpenAILLMContext:
|
||||
if item["type"] == "image_url":
|
||||
if item["image_url"]["url"].startswith("data:image/"):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item["type"] == "file":
|
||||
if item["file"]["file_data"].startswith("data:"):
|
||||
item["file"]["file_data"] = "data:..."
|
||||
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
|
||||
msg["data"] = "..."
|
||||
msgs.append(msg)
|
||||
|
||||
@@ -26,10 +26,11 @@ from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AggregationType,
|
||||
FileSourceType,
|
||||
)
|
||||
|
||||
# -- Constants --
|
||||
PROTOCOL_VERSION = "1.2.0"
|
||||
PROTOCOL_VERSION = "1.3.0"
|
||||
|
||||
MESSAGE_LABEL = "rtvi-ai"
|
||||
MessageLiteral = Literal["rtvi-ai"]
|
||||
@@ -229,6 +230,66 @@ class SendTextData(BaseModel):
|
||||
options: Optional[SendTextOptions] = None
|
||||
|
||||
|
||||
class FileSource(BaseModel):
|
||||
"""Base class for RTVI file sources."""
|
||||
|
||||
type: FileSourceType
|
||||
|
||||
|
||||
class FileBytes(FileSource):
|
||||
"""File source as base64-encoded bytes."""
|
||||
|
||||
type: FileSourceType = "bytes"
|
||||
bytes: str # base64-encoded string
|
||||
width: Optional[int] = None
|
||||
height: Optional[int] = None
|
||||
|
||||
|
||||
class FileUrl(FileSource):
|
||||
"""File source as a URL."""
|
||||
|
||||
type: FileSourceType = "url"
|
||||
url: str
|
||||
public: bool = True
|
||||
|
||||
|
||||
class FileId(FileSource):
|
||||
"""File source as a file ID."""
|
||||
|
||||
type: FileSourceType = "id"
|
||||
id: str
|
||||
|
||||
|
||||
class File(BaseModel):
|
||||
"""File data structure for RTVI file sending."""
|
||||
|
||||
format: str # Mime format of the file, e.g., 'application/pdf'
|
||||
name: Optional[str] = None
|
||||
source: FileBytes | FileUrl | FileId
|
||||
|
||||
|
||||
class SendFileOptions(BaseModel):
|
||||
"""Options for sending text input to the LLM.
|
||||
|
||||
Contains options for how the pipeline should process the text input.
|
||||
"""
|
||||
|
||||
run_immediately: bool = True
|
||||
audio_response: bool = True
|
||||
custom_options: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock
|
||||
|
||||
|
||||
class SendFileData(BaseModel):
|
||||
"""Data format for sending a file to the LLM.
|
||||
|
||||
Contains the information of the file to send and any options for how the pipeline should process it.
|
||||
"""
|
||||
|
||||
content: str # Text to accompany the file
|
||||
file: File
|
||||
options: Optional[SendFileOptions] = None
|
||||
|
||||
|
||||
class AppendToContextData(BaseModel):
|
||||
"""Data format for appending messages to the context.
|
||||
|
||||
|
||||
@@ -8,8 +8,11 @@
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
from typing import Any, Dict, Mapping, Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
@@ -29,6 +32,8 @@ from pipecat.frames.frames import (
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
UserFileRawFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi.frames import RTVIActionFrame, RTVIClientMessageFrame
|
||||
@@ -70,6 +75,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
*,
|
||||
config: Optional[RTVIConfig] = None,
|
||||
transport: Optional[BaseTransport] = None,
|
||||
uploads_folder: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the RTVI processor.
|
||||
@@ -77,10 +83,14 @@ class RTVIProcessor(FrameProcessor):
|
||||
Args:
|
||||
config: Initial RTVI configuration.
|
||||
transport: Transport layer for communication.
|
||||
uploads_folder: Path to folder where client uploads (e.g. POST /files) are
|
||||
stored; required for send-file with /files/ URLs. Use
|
||||
runner_uploads_folder() when using the development runner.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._config = config or RTVIConfig(config=[])
|
||||
self._folder = uploads_folder or ""
|
||||
|
||||
self._bot_ready = False
|
||||
self._client_ready = False
|
||||
@@ -383,6 +393,9 @@ class RTVIProcessor(FrameProcessor):
|
||||
case "send-text":
|
||||
data = RTVI.SendTextData.model_validate(message.data)
|
||||
await self._handle_send_text(data)
|
||||
case "send-file":
|
||||
data = RTVI.SendFileData.model_validate(message.data)
|
||||
await self._handle_send_file(data)
|
||||
case "append-to-context":
|
||||
logger.warning(
|
||||
f"The append-to-context message is deprecated, use send-text instead."
|
||||
@@ -393,6 +406,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self._handle_audio_buffer(message.data)
|
||||
|
||||
case _:
|
||||
logger.warning(f"Unsupported RTVI message type: {message.type}")
|
||||
await self._send_error_response(message.id, f"Unsupported type {message.type}")
|
||||
|
||||
except ValidationError as e:
|
||||
@@ -555,6 +569,91 @@ class RTVIProcessor(FrameProcessor):
|
||||
output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts)
|
||||
await self.push_frame(output_frame)
|
||||
|
||||
async def _handle_send_file(self, data: RTVI.SendFileData):
|
||||
"""Handle a send-file message from the client."""
|
||||
file = data.file
|
||||
source = None
|
||||
type = file.source.type
|
||||
opts = data.options if data.options is not None else RTVI.SendFileOptions()
|
||||
|
||||
match type:
|
||||
case "bytes":
|
||||
source = file.source.bytes
|
||||
case "url":
|
||||
if not file.source.public:
|
||||
# read bytes from URL and encode to base64
|
||||
type = "bytes"
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(file.source.url) as response:
|
||||
content = io.BytesIO(await response.content.read())
|
||||
base64_string = base64.b64encode(content.getvalue()).decode("utf-8")
|
||||
source = f"data:{file.format};base64,{base64_string}"
|
||||
else:
|
||||
source = file.source.url
|
||||
case "id":
|
||||
if not file.source.id.startswith("pipecat:"):
|
||||
logger.warning(f"Unsupported file ID: {file.source.id}")
|
||||
self.send_error_response(data.id, f"Unsupported file ID: {file.source.id}")
|
||||
return
|
||||
if not self._folder:
|
||||
logger.warning(
|
||||
"Send-file with a pipecat id requires uploads_folder on RTVIProcessor "
|
||||
"(e.g. uploads_folder=runner_uploads_folder())."
|
||||
)
|
||||
self.send_error_response(data.id, "Uploads folder not set")
|
||||
return
|
||||
# read bytes from file system, encode to base64, then delete the file
|
||||
type = "bytes"
|
||||
file_path = os.path.join(self._folder, file.source.id.removeprefix("pipecat:"))
|
||||
with open(file_path, "rb") as f:
|
||||
raw_bytes = f.read()
|
||||
encoded_file = base64.b64encode(raw_bytes).decode("utf-8")
|
||||
source = f"data:{file.format};base64,{encoded_file}"
|
||||
try:
|
||||
os.remove(file_path)
|
||||
except OSError as e:
|
||||
logger.warning(f"Failed to remove uploaded file {file_path}: {e}")
|
||||
case _:
|
||||
logger.warning(f"Unsupported file source type: {type}")
|
||||
return
|
||||
|
||||
if type == "bytes" and file.format.startswith("image/"):
|
||||
# Only access width/height if the original source is RTVIFileBytes (not RTVIFileUrl)
|
||||
if file.source.type == "bytes":
|
||||
size = [file.source.width or 0, file.source.height or 0]
|
||||
else:
|
||||
size = [0, 0]
|
||||
file_frame = UserImageRawFrame(
|
||||
text=data.content,
|
||||
image=source,
|
||||
size=size,
|
||||
format=f"url/{file.format}",
|
||||
append_to_context=True,
|
||||
)
|
||||
else:
|
||||
file_frame = UserFileRawFrame(
|
||||
text=data.content,
|
||||
file=source,
|
||||
type=type,
|
||||
format=file.format,
|
||||
custom_options=opts.custom_options,
|
||||
append_to_context=True,
|
||||
)
|
||||
|
||||
if opts.run_immediately:
|
||||
await self.interrupt_bot()
|
||||
|
||||
cur_llm_skip_tts = self._llm_skip_tts
|
||||
should_skip_tts = not opts.audio_response
|
||||
toggle_skip_tts = cur_llm_skip_tts != should_skip_tts
|
||||
if toggle_skip_tts:
|
||||
output_frame = LLMConfigureOutputFrame(skip_tts=should_skip_tts)
|
||||
await self.push_frame(output_frame)
|
||||
await self.push_frame(file_frame)
|
||||
if toggle_skip_tts:
|
||||
output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts)
|
||||
await self.push_frame(output_frame)
|
||||
|
||||
async def _handle_update_context(self, data: RTVI.AppendToContextData):
|
||||
if data.run_immediately:
|
||||
await self.interrupt_bot()
|
||||
|
||||
@@ -90,7 +90,16 @@ from pipecat.runner.types import (
|
||||
try:
|
||||
import uvicorn
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket
|
||||
from fastapi import (
|
||||
BackgroundTasks,
|
||||
FastAPI,
|
||||
File,
|
||||
Header,
|
||||
HTTPException,
|
||||
Request,
|
||||
UploadFile,
|
||||
WebSocket,
|
||||
)
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
except ImportError as e:
|
||||
@@ -107,6 +116,7 @@ os.environ["ENV"] = "local"
|
||||
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
|
||||
|
||||
RUNNER_DOWNLOADS_FOLDER: Optional[str] = None
|
||||
RUNNER_UPLOADS_FOLDER: Optional[str] = None
|
||||
RUNNER_HOST: str = "localhost"
|
||||
RUNNER_PORT: int = 7860
|
||||
|
||||
@@ -164,6 +174,25 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
|
||||
await bot_module.bot(runner_args)
|
||||
|
||||
|
||||
def _trim_uploads_folder(folder: Path, max_files: int):
|
||||
"""Keep only the most recent max_files files in folder; delete oldest by mtime."""
|
||||
if max_files <= 0:
|
||||
return
|
||||
try:
|
||||
files = [p for p in folder.iterdir() if p.is_file()]
|
||||
if len(files) <= max_files:
|
||||
return
|
||||
by_mtime = sorted(files, key=lambda p: p.stat().st_mtime)
|
||||
for p in by_mtime[: len(files) - max_files]:
|
||||
try:
|
||||
p.unlink()
|
||||
logger.debug(f"Trimmed upload {p.name} from {folder}")
|
||||
except OSError as e:
|
||||
logger.warning(f"Failed to trim upload {p}: {e}")
|
||||
except OSError as e:
|
||||
logger.warning(f"Failed to list uploads folder {folder}: {e}")
|
||||
|
||||
|
||||
def _create_server_app(args: argparse.Namespace):
|
||||
"""Create FastAPI app with transport-specific routes."""
|
||||
app = FastAPI()
|
||||
@@ -188,6 +217,9 @@ def _create_server_app(args: argparse.Namespace):
|
||||
else:
|
||||
logger.warning(f"Unknown transport type: {args.transport}")
|
||||
|
||||
if args.uploads_folder:
|
||||
_setup_file_uploads_route(app, args)
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@@ -230,18 +262,30 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
|
||||
|
||||
@app.get("/files/{filename:path}")
|
||||
async def download_file(filename: str):
|
||||
"""Handle file downloads."""
|
||||
if not args.folder:
|
||||
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
|
||||
return
|
||||
|
||||
file_path = Path(args.folder) / filename
|
||||
if not os.path.exists(file_path):
|
||||
"""Serve a file from uploads or downloads folder (uploads checked first)."""
|
||||
# Sanitize: no path traversal
|
||||
safe_name = Path(filename).name
|
||||
if not safe_name or safe_name != filename:
|
||||
raise HTTPException(404)
|
||||
file_path = None
|
||||
if args.uploads_folder:
|
||||
candidate = Path(args.uploads_folder) / safe_name
|
||||
if candidate.is_file():
|
||||
file_path = candidate
|
||||
if file_path is None and args.downloads_folder:
|
||||
candidate = Path(args.downloads_folder) / safe_name
|
||||
if candidate.is_file():
|
||||
file_path = candidate
|
||||
if file_path is None:
|
||||
if not args.uploads_folder and not args.downloads_folder:
|
||||
logger.warning(
|
||||
f"Attempting to get {filename}, but no uploads or downloads folder set."
|
||||
)
|
||||
raise HTTPException(404)
|
||||
|
||||
media_type, _ = mimetypes.guess_type(file_path)
|
||||
|
||||
return FileResponse(path=file_path, media_type=media_type, filename=filename)
|
||||
return FileResponse(path=file_path, media_type=media_type, filename=safe_name)
|
||||
|
||||
# Initialize the SmallWebRTC request handler
|
||||
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
|
||||
@@ -576,6 +620,7 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
|
||||
|
||||
result = None
|
||||
|
||||
print(f"create_daily_room: {create_daily_room}, existing_room_url: {existing_room_url}")
|
||||
# Configure room if:
|
||||
# 1. Explicitly requested via createDailyRoom in payload
|
||||
# 2. Using pre-configured room from DAILY_ROOM_URL env var
|
||||
@@ -796,6 +841,39 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
|
||||
return {"status": f"Bot started with {args.transport}"}
|
||||
|
||||
|
||||
def _setup_file_uploads_route(app: FastAPI, args: argparse.Namespace):
|
||||
@app.post("/files")
|
||||
async def upload_file(file: UploadFile = File(...)):
|
||||
"""Handle file uploads from clients. Requires --uploads-folder to be set."""
|
||||
if not args.uploads_folder:
|
||||
raise HTTPException(
|
||||
503,
|
||||
"File upload is disabled: start the runner with -u/--uploads-folder to set the uploads directory.",
|
||||
)
|
||||
folder = Path(args.uploads_folder)
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
# Always save as UUID so uploads are not discoverable by guessing filenames
|
||||
safe_name = uuid.uuid4().hex
|
||||
file_path = folder / safe_name
|
||||
try:
|
||||
contents = await file.read()
|
||||
file_path.write_bytes(contents)
|
||||
logger.debug(f"Uploaded file to {file_path}")
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to save upload: {e}")
|
||||
raise HTTPException(500, "Failed to save file") from e
|
||||
_trim_uploads_folder(folder, args.uploads_folder_max_files)
|
||||
# Use original filename only for format/mime in response
|
||||
original_name = Path(file.filename or "").name
|
||||
media_type, _ = mimetypes.guess_type(original_name, strict=False)
|
||||
# Return the file as a URL that can be used in the client, matching the format of RTVIFile
|
||||
return {
|
||||
"name": original_name,
|
||||
"source": {"type": "id", "id": f"pipecat:{safe_name}"},
|
||||
"format": media_type,
|
||||
}
|
||||
|
||||
|
||||
async def _run_daily_direct(args: argparse.Namespace):
|
||||
"""Run Daily bot with direct connection (no FastAPI server)."""
|
||||
try:
|
||||
@@ -850,6 +928,11 @@ def runner_downloads_folder() -> Optional[str]:
|
||||
return RUNNER_DOWNLOADS_FOLDER
|
||||
|
||||
|
||||
def runner_uploads_folder() -> Optional[str]:
|
||||
"""Returns the folder where client uploads are stored (short-lived)."""
|
||||
return RUNNER_UPLOADS_FOLDER
|
||||
|
||||
|
||||
def runner_host() -> str:
|
||||
"""Returns the host name of this runner."""
|
||||
return RUNNER_HOST
|
||||
@@ -875,7 +958,9 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
|
||||
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
|
||||
- -x/--proxy: Public proxy hostname for telephony webhooks
|
||||
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
|
||||
- -f/--folder: Path to downloads folder
|
||||
- -f/--downloads-folder: Path to folder for files available for download
|
||||
- -u/--uploads-folder: Path to folder for client uploads (short-lived)
|
||||
- --uploads-folder-max-files: Max files in uploads folder (default: 10)
|
||||
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
|
||||
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
|
||||
- --whatsapp: Ensure requried WhatsApp environment variables are present
|
||||
@@ -888,7 +973,7 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
|
||||
ones. Custom args are accessible via `runner_args.cli_args`.
|
||||
|
||||
"""
|
||||
global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
|
||||
global RUNNER_DOWNLOADS_FOLDER, RUNNER_UPLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
|
||||
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
|
||||
@@ -910,7 +995,27 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
|
||||
default=False,
|
||||
help="Connect directly to Daily room (automatically sets transport to daily)",
|
||||
)
|
||||
parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder")
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--downloads-folder",
|
||||
type=str,
|
||||
dest="downloads_folder",
|
||||
help="Path to folder for files available for download",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--uploads-folder",
|
||||
type=str,
|
||||
dest="uploads_folder",
|
||||
help="Path to folder for client uploads (short-lived; max files enforced)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--uploads-folder-max-files",
|
||||
type=int,
|
||||
default=10,
|
||||
dest="uploads_folder_max_files",
|
||||
help="Max files to keep in uploads folder; oldest removed (default: 10)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
|
||||
)
|
||||
@@ -993,7 +1098,8 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
|
||||
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
|
||||
print()
|
||||
|
||||
RUNNER_DOWNLOADS_FOLDER = args.folder
|
||||
RUNNER_DOWNLOADS_FOLDER = args.downloads_folder
|
||||
RUNNER_UPLOADS_FOLDER = args.uploads_folder
|
||||
RUNNER_HOST = args.host
|
||||
RUNNER_PORT = args.port
|
||||
|
||||
|
||||
@@ -1162,6 +1162,8 @@ class AnthropicLLMContext(OpenAILLMContext):
|
||||
for item in msg["content"]:
|
||||
if item["type"] == "image":
|
||||
item["source"]["data"] = "..."
|
||||
if item["type"] == "file":
|
||||
item["file"]["file_data"] = "data:..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
|
||||
|
||||
@@ -607,6 +607,8 @@ class AWSBedrockLLMContext(OpenAILLMContext):
|
||||
for item in msg["content"]:
|
||||
if item.get("image"):
|
||||
item["image"]["source"]["bytes"] = "..."
|
||||
if item.get("type") == "file":
|
||||
item["file"]["file_data"] = "data:..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
|
||||
|
||||
@@ -618,6 +618,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
await self.push_error(error_msg="LLM completion timeout", exception=e)
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
|
||||
context.maybe_remove_invalid_message(e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -691,7 +691,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
|
||||
Args:
|
||||
message: The application message to process.
|
||||
"""
|
||||
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
|
||||
logger.trace(f"Received app message inside SmallWebRTCInputTransport {message}")
|
||||
await self.broadcast_frame(InputTransportMessageFrame, message=message)
|
||||
|
||||
# Add this method similar to DailyInputTransport.request_participant_image
|
||||
|
||||
Reference in New Issue
Block a user