Compare commits

...

8 Commits

Author SHA1 Message Date
mattie ruth backman
7742d1a83b Add error handling for unsupported files 2026-03-18 15:49:48 -04:00
mattie ruth backman
d9cebe602f Add new FileSourceType for 'id' and use that for local uploads, prefixed with 'pipecat:' 2026-03-18 15:49:48 -04:00
mattie ruth backman
96e06d2401 Update /files/ upload response to match RTVI format, rather than inventing a new one 2026-03-18 15:49:48 -04:00
mattie ruth backman
267c86e596 support RTVI files uploads larger than the transport can handle
This PR introduces:
1. a new /files/ POST endpoint in the local runner that supports
   uploading a file to a folder that must be provided at runtime
2. By default, the runner will allow a maximum 10 files to be
   saved
3. Added logic to the send-file handler in RTVI to read a file
   from disk if the file provide is a url starting with '/files/'
2026-03-18 15:49:48 -04:00
mattie ruth backman
9fb06c3e4b Update File upload RTVI messages and frames to use mime-type as the format 2026-03-18 15:49:48 -04:00
mattie ruth backman
71197fbc2c Support files provided via url 2026-03-18 15:49:48 -04:00
mattie ruth backman
9cd4e5faca Support generic files (openai so far) 2026-03-18 15:49:48 -04:00
mattie ruth backman
4f290be834 Initial commit: Introducing RTVI support for files
This commit introduces the types for all RTVI file messaging and full
support for sending images as byte strings
2026-03-18 15:49:48 -04:00
16 changed files with 477 additions and 17 deletions

View File

@@ -96,6 +96,8 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
item["source"]["data"] = "..."
if item["type"] == "thinking" and item.get("signature"):
item["signature"] = "..."
if item["type"] == "file":
item["file"]["file_data"] = "data:..."
messages_for_logging.append(msg)
return messages_for_logging

View File

@@ -91,6 +91,8 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
for item in msg["content"]:
if item.get("image"):
item["image"]["source"]["bytes"] = "..."
if item.get("type") == "file":
item["file"]["file_data"] = "data:..."
messages_for_logging.append(msg)
return messages_for_logging

View File

@@ -87,6 +87,9 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
item["audio"] = "..."
if item.get("type") == "audio":
item["audio"] = "..."
if item.get("type") == "file":
if item["file"]["file_data"].startswith("data:"):
item["file"]["file_data"] = "data:..."
msgs.append(msg)
return msgs

View File

@@ -105,6 +105,9 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "input_audio":
item["input_audio"]["data"] = "..."
if item["type"] == "file":
if item["file"]["file_data"].startswith("data:"):
item["file"]["file_data"] = "data:..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)

View File

@@ -90,6 +90,9 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "input_audio":
item["input_audio"]["data"] = "..."
if item["type"] == "file":
if item["file"]["file_data"].startswith("data:"):
item["file"]["file_data"] = "data:..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)

View File

@@ -234,7 +234,7 @@ class ImageRawFrame:
"""A frame containing a raw image.
Parameters:
image: Raw image bytes.
image: Raw image bytes or a base64-encoded string.
size: Image dimensions as (width, height) tuple.
format: Image format (e.g., 'RGB', 'RGBA').
"""
@@ -244,6 +244,26 @@ class ImageRawFrame:
format: Optional[str]
FileSourceType = Literal["bytes", "url", "id"]
@dataclass
class FileRawFrame:
"""A frame containing a raw file.
Parameters:
file: Raw file bytes.
type: Type of the file ('bytes', 'url', or 'id'),
name: Optional name of the file.
format: File format (expected in Mime Format).
"""
file: bytes | str
type: FileSourceType
name: Optional[str]
format: Optional[str]
#
# Data frames.
#
@@ -1584,6 +1604,18 @@ class InputImageRawFrame(SystemFrame, ImageRawFrame):
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
@dataclass
class InputFileRawFrame(SystemFrame, FileRawFrame):
"""Raw file input frame.
A file usually coming from RTVI.
"""
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, type: {self.type})"
@dataclass
class InputTextRawFrame(SystemFrame, TextFrame):
"""Raw text input frame from transport.
@@ -1638,6 +1670,28 @@ class UserImageRawFrame(InputImageRawFrame):
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, append_to_context: {self.append_to_context})"
@dataclass
class UserFileRawFrame(InputFileRawFrame):
"""Raw file input frame associated with a specific user.
A file associated to a user.
Parameters:
user_id: Identifier of the user who provided this file.
text: Text associated to this file.
append_to_context: Whether the requested file should be appended to the LLM context.
"""
user_id: str = ""
text: str = ""
append_to_context: Optional[bool] = None
custom_options: Optional[dict] = None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, format: {self.format}, type: {self.type}, text: {self.text}, append_to_context: {self.append_to_context})"
@dataclass
class AssistantImageRawFrame(OutputImageRawFrame):
"""Frame containing an image generated by the assistant.

View File

@@ -19,8 +19,9 @@ import base64
import io
import wave
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union, get_args
import aiohttp
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
from openai._types import NotGiven as OpenAINotGiven
@@ -173,6 +174,11 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
# Format is a data URL: data:<mime type>;base64,<data> already provided
if format.startswith("url/"):
url = image
return LLMContext.create_image_url_message(role=role, url=url, text=text)
# Format is a mime type: image is already encoded
image_already_encoded = format.startswith("image/")
@@ -193,6 +199,34 @@ class LLMContext:
return LLMContext.create_image_url_message(role=role, url=url, text=text)
@staticmethod
async def create_file_message(
*,
role: str = "user",
file: bytes,
name: Optional[str] = None,
text: Optional[str] = None,
) -> LLMContextMessage:
"""Create a context message containing a file.
Args:
role: The role of this message (defaults to "user").
file: Raw file bytes.
name: Optional name of the file.
text: Optional text to include with the file.
"""
# Right now: assumes file is already encoded properly as a data URL:
# data:<mime type>;base64,<data>
# TODO: support not already encoded?
content = []
if text:
content.append({"type": "text", "text": text})
file = {"file_data": file, "filename": name if name else ""}
content.append({"type": "file", "file": file})
return {"role": role, "content": content}
@staticmethod
async def create_audio_message(
*, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows"
@@ -357,6 +391,45 @@ class LLMContext:
"""
self._tool_choice = tool_choice
async def add_file_frame_message(
self,
*,
type: str,
format: str,
file: bytes | str,
text: Optional[str] = None,
name: Optional[str] = None,
role: str = "user",
):
"""Add a message containing a file frame.
Args:
type: File type (e.g., 'bytes' or 'url').
format: File format (the MIME type like 'image/jpeg').
file: Raw file bytes or URL string.
text: Optional text to include with the file.
name: Optional name of the file.
role: The role of this message (defaults to "user").
"""
bytes = file
is_image = format.startswith("image/")
if not is_image and format != "application/pdf":
logger.warning(f"Unsupported file format for LLM context: {format}")
raise ValueError(f"Unsupported file format for LLM context: {format}")
if type == "url":
async with aiohttp.ClientSession() as session:
async with session.get(file) as response:
content = io.BytesIO(await response.content.read())
base64_string = base64.b64encode(content.getvalue()).decode("utf-8")
bytes = f"data:{format};base64,{base64_string}"
if is_image:
message = LLMContext.create_image_url_message(role=role, url=bytes, text=text)
self.add_message(message)
return
message = await LLMContext.create_file_message(role=role, file=bytes, name=name, text=text)
self.add_message(message)
async def add_image_frame_message(
self,
*,
@@ -393,6 +466,33 @@ class LLMContext:
message = await LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
self.add_message(message)
def maybe_remove_invalid_message(self, exception: Exception):
"""Remove messages from context if an exception indicates they were invalid.
This is a best-effort method to handle cases where an LLM service returns
an error indicating that a particular message in the context was invalid
(e.g., due to unsupported content). The method inspects the exception,
and if it can identify a specific message that caused the issue, it removes
that message from the context.
Args:
exception: The exception raised during LLM processing, which may contain
information about invalid messages.
"""
# Check for OpenAI-specific error indicating an invalid file message, and remove it if found.
if exception.type == "invalid_request_error" and exception.param == "file_id":
for i in range(len(self._messages) - 1, -1, -1):
message = self._messages[i]
content = message.get("content", [])
if isinstance(content, list) and any(
item.get("type") == "file" for item in content
):
logger.warning(
"Removing message with file content from context due to invalid response."
)
self._messages.pop(i)
break
@staticmethod
def _normalize_and_validate_tools(tools: ToolsSchema | NotGiven) -> ToolsSchema | NotGiven:
"""Normalize and validate the given tools.

View File

@@ -55,6 +55,7 @@ from pipecat.frames.frames import (
TextFrame,
TranscriptionFrame,
TranslationFrame,
UserFileRawFrame,
UserImageRawFrame,
UserMuteStartedFrame,
UserMuteStoppedFrame,
@@ -957,6 +958,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_function_call_cancel(frame)
elif isinstance(frame, UserImageRawFrame):
await self._handle_user_image_frame(frame)
elif isinstance(frame, UserFileRawFrame):
await self._handle_user_file_frame(frame)
elif isinstance(frame, AssistantImageRawFrame):
await self._handle_assistant_image_frame(frame)
else:
@@ -1135,6 +1138,22 @@ class LLMAssistantAggregator(LLMContextAggregator):
if image_appended:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_user_file_frame(self, frame: UserFileRawFrame):
if not frame.append_to_context:
return
logger.debug(f"{self} Appending UserFileRawFrame to LLM context (format: {frame.format})")
await self._context.add_file_frame_message(
type=frame.type,
format=frame.format,
text=frame.text,
file=frame.file,
# options=frame.custom_options,
)
await self.push_aggregation()
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_assistant_image_frame(self, frame: AssistantImageRawFrame):
logger.debug(f"{self} Appending AssistantImageRawFrame to LLM context (size: {frame.size})")

View File

@@ -233,6 +233,9 @@ class OpenAILLMContext:
if item["type"] == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "file":
if item["file"]["file_data"].startswith("data:"):
item["file"]["file_data"] = "data:..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)

View File

@@ -26,10 +26,11 @@ from pydantic import BaseModel
from pipecat.frames.frames import (
AggregationType,
FileSourceType,
)
# -- Constants --
PROTOCOL_VERSION = "1.2.0"
PROTOCOL_VERSION = "1.3.0"
MESSAGE_LABEL = "rtvi-ai"
MessageLiteral = Literal["rtvi-ai"]
@@ -229,6 +230,66 @@ class SendTextData(BaseModel):
options: Optional[SendTextOptions] = None
class FileSource(BaseModel):
"""Base class for RTVI file sources."""
type: FileSourceType
class FileBytes(FileSource):
"""File source as base64-encoded bytes."""
type: FileSourceType = "bytes"
bytes: str # base64-encoded string
width: Optional[int] = None
height: Optional[int] = None
class FileUrl(FileSource):
"""File source as a URL."""
type: FileSourceType = "url"
url: str
public: bool = True
class FileId(FileSource):
"""File source as a file ID."""
type: FileSourceType = "id"
id: str
class File(BaseModel):
"""File data structure for RTVI file sending."""
format: str # Mime format of the file, e.g., 'application/pdf'
name: Optional[str] = None
source: FileBytes | FileUrl | FileId
class SendFileOptions(BaseModel):
"""Options for sending text input to the LLM.
Contains options for how the pipeline should process the text input.
"""
run_immediately: bool = True
audio_response: bool = True
custom_options: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock
class SendFileData(BaseModel):
"""Data format for sending a file to the LLM.
Contains the information of the file to send and any options for how the pipeline should process it.
"""
content: str # Text to accompany the file
file: File
options: Optional[SendFileOptions] = None
class AppendToContextData(BaseModel):
"""Data format for appending messages to the context.

View File

@@ -8,8 +8,11 @@
import asyncio
import base64
import io
import os
from typing import Any, Dict, Mapping, Optional
import aiohttp
from loguru import logger
from pydantic import BaseModel, ValidationError
@@ -29,6 +32,8 @@ from pipecat.frames.frames import (
OutputTransportMessageUrgentFrame,
StartFrame,
SystemFrame,
UserFileRawFrame,
UserImageRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi.frames import RTVIActionFrame, RTVIClientMessageFrame
@@ -70,6 +75,7 @@ class RTVIProcessor(FrameProcessor):
*,
config: Optional[RTVIConfig] = None,
transport: Optional[BaseTransport] = None,
uploads_folder: Optional[str] = None,
**kwargs,
):
"""Initialize the RTVI processor.
@@ -77,10 +83,14 @@ class RTVIProcessor(FrameProcessor):
Args:
config: Initial RTVI configuration.
transport: Transport layer for communication.
uploads_folder: Path to folder where client uploads (e.g. POST /files) are
stored; required for send-file with /files/ URLs. Use
runner_uploads_folder() when using the development runner.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._config = config or RTVIConfig(config=[])
self._folder = uploads_folder or ""
self._bot_ready = False
self._client_ready = False
@@ -383,6 +393,9 @@ class RTVIProcessor(FrameProcessor):
case "send-text":
data = RTVI.SendTextData.model_validate(message.data)
await self._handle_send_text(data)
case "send-file":
data = RTVI.SendFileData.model_validate(message.data)
await self._handle_send_file(data)
case "append-to-context":
logger.warning(
f"The append-to-context message is deprecated, use send-text instead."
@@ -393,6 +406,7 @@ class RTVIProcessor(FrameProcessor):
await self._handle_audio_buffer(message.data)
case _:
logger.warning(f"Unsupported RTVI message type: {message.type}")
await self._send_error_response(message.id, f"Unsupported type {message.type}")
except ValidationError as e:
@@ -555,6 +569,91 @@ class RTVIProcessor(FrameProcessor):
output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts)
await self.push_frame(output_frame)
async def _handle_send_file(self, data: RTVI.SendFileData):
"""Handle a send-file message from the client."""
file = data.file
source = None
type = file.source.type
opts = data.options if data.options is not None else RTVI.SendFileOptions()
match type:
case "bytes":
source = file.source.bytes
case "url":
if not file.source.public:
# read bytes from URL and encode to base64
type = "bytes"
async with aiohttp.ClientSession() as session:
async with session.get(file.source.url) as response:
content = io.BytesIO(await response.content.read())
base64_string = base64.b64encode(content.getvalue()).decode("utf-8")
source = f"data:{file.format};base64,{base64_string}"
else:
source = file.source.url
case "id":
if not file.source.id.startswith("pipecat:"):
logger.warning(f"Unsupported file ID: {file.source.id}")
self.send_error_response(data.id, f"Unsupported file ID: {file.source.id}")
return
if not self._folder:
logger.warning(
"Send-file with a pipecat id requires uploads_folder on RTVIProcessor "
"(e.g. uploads_folder=runner_uploads_folder())."
)
self.send_error_response(data.id, "Uploads folder not set")
return
# read bytes from file system, encode to base64, then delete the file
type = "bytes"
file_path = os.path.join(self._folder, file.source.id.removeprefix("pipecat:"))
with open(file_path, "rb") as f:
raw_bytes = f.read()
encoded_file = base64.b64encode(raw_bytes).decode("utf-8")
source = f"data:{file.format};base64,{encoded_file}"
try:
os.remove(file_path)
except OSError as e:
logger.warning(f"Failed to remove uploaded file {file_path}: {e}")
case _:
logger.warning(f"Unsupported file source type: {type}")
return
if type == "bytes" and file.format.startswith("image/"):
# Only access width/height if the original source is RTVIFileBytes (not RTVIFileUrl)
if file.source.type == "bytes":
size = [file.source.width or 0, file.source.height or 0]
else:
size = [0, 0]
file_frame = UserImageRawFrame(
text=data.content,
image=source,
size=size,
format=f"url/{file.format}",
append_to_context=True,
)
else:
file_frame = UserFileRawFrame(
text=data.content,
file=source,
type=type,
format=file.format,
custom_options=opts.custom_options,
append_to_context=True,
)
if opts.run_immediately:
await self.interrupt_bot()
cur_llm_skip_tts = self._llm_skip_tts
should_skip_tts = not opts.audio_response
toggle_skip_tts = cur_llm_skip_tts != should_skip_tts
if toggle_skip_tts:
output_frame = LLMConfigureOutputFrame(skip_tts=should_skip_tts)
await self.push_frame(output_frame)
await self.push_frame(file_frame)
if toggle_skip_tts:
output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts)
await self.push_frame(output_frame)
async def _handle_update_context(self, data: RTVI.AppendToContextData):
if data.run_immediately:
await self.interrupt_bot()

View File

@@ -90,7 +90,16 @@ from pipecat.runner.types import (
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket
from fastapi import (
BackgroundTasks,
FastAPI,
File,
Header,
HTTPException,
Request,
UploadFile,
WebSocket,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
@@ -107,6 +116,7 @@ os.environ["ENV"] = "local"
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
RUNNER_DOWNLOADS_FOLDER: Optional[str] = None
RUNNER_UPLOADS_FOLDER: Optional[str] = None
RUNNER_HOST: str = "localhost"
RUNNER_PORT: int = 7860
@@ -164,6 +174,25 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
await bot_module.bot(runner_args)
def _trim_uploads_folder(folder: Path, max_files: int):
"""Keep only the most recent max_files files in folder; delete oldest by mtime."""
if max_files <= 0:
return
try:
files = [p for p in folder.iterdir() if p.is_file()]
if len(files) <= max_files:
return
by_mtime = sorted(files, key=lambda p: p.stat().st_mtime)
for p in by_mtime[: len(files) - max_files]:
try:
p.unlink()
logger.debug(f"Trimmed upload {p.name} from {folder}")
except OSError as e:
logger.warning(f"Failed to trim upload {p}: {e}")
except OSError as e:
logger.warning(f"Failed to list uploads folder {folder}: {e}")
def _create_server_app(args: argparse.Namespace):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
@@ -188,6 +217,9 @@ def _create_server_app(args: argparse.Namespace):
else:
logger.warning(f"Unknown transport type: {args.transport}")
if args.uploads_folder:
_setup_file_uploads_route(app, args)
return app
@@ -230,18 +262,30 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
if not args.folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
file_path = Path(args.folder) / filename
if not os.path.exists(file_path):
"""Serve a file from uploads or downloads folder (uploads checked first)."""
# Sanitize: no path traversal
safe_name = Path(filename).name
if not safe_name or safe_name != filename:
raise HTTPException(404)
file_path = None
if args.uploads_folder:
candidate = Path(args.uploads_folder) / safe_name
if candidate.is_file():
file_path = candidate
if file_path is None and args.downloads_folder:
candidate = Path(args.downloads_folder) / safe_name
if candidate.is_file():
file_path = candidate
if file_path is None:
if not args.uploads_folder and not args.downloads_folder:
logger.warning(
f"Attempting to get {filename}, but no uploads or downloads folder set."
)
raise HTTPException(404)
media_type, _ = mimetypes.guess_type(file_path)
return FileResponse(path=file_path, media_type=media_type, filename=filename)
return FileResponse(path=file_path, media_type=media_type, filename=safe_name)
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
@@ -576,6 +620,7 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
result = None
print(f"create_daily_room: {create_daily_room}, existing_room_url: {existing_room_url}")
# Configure room if:
# 1. Explicitly requested via createDailyRoom in payload
# 2. Using pre-configured room from DAILY_ROOM_URL env var
@@ -796,6 +841,39 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
return {"status": f"Bot started with {args.transport}"}
def _setup_file_uploads_route(app: FastAPI, args: argparse.Namespace):
@app.post("/files")
async def upload_file(file: UploadFile = File(...)):
"""Handle file uploads from clients. Requires --uploads-folder to be set."""
if not args.uploads_folder:
raise HTTPException(
503,
"File upload is disabled: start the runner with -u/--uploads-folder to set the uploads directory.",
)
folder = Path(args.uploads_folder)
folder.mkdir(parents=True, exist_ok=True)
# Always save as UUID so uploads are not discoverable by guessing filenames
safe_name = uuid.uuid4().hex
file_path = folder / safe_name
try:
contents = await file.read()
file_path.write_bytes(contents)
logger.debug(f"Uploaded file to {file_path}")
except OSError as e:
logger.error(f"Failed to save upload: {e}")
raise HTTPException(500, "Failed to save file") from e
_trim_uploads_folder(folder, args.uploads_folder_max_files)
# Use original filename only for format/mime in response
original_name = Path(file.filename or "").name
media_type, _ = mimetypes.guess_type(original_name, strict=False)
# Return the file as a URL that can be used in the client, matching the format of RTVIFile
return {
"name": original_name,
"source": {"type": "id", "id": f"pipecat:{safe_name}"},
"format": media_type,
}
async def _run_daily_direct(args: argparse.Namespace):
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
@@ -850,6 +928,11 @@ def runner_downloads_folder() -> Optional[str]:
return RUNNER_DOWNLOADS_FOLDER
def runner_uploads_folder() -> Optional[str]:
"""Returns the folder where client uploads are stored (short-lived)."""
return RUNNER_UPLOADS_FOLDER
def runner_host() -> str:
"""Returns the host name of this runner."""
return RUNNER_HOST
@@ -875,7 +958,9 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
- -x/--proxy: Public proxy hostname for telephony webhooks
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
- -f/--folder: Path to downloads folder
- -f/--downloads-folder: Path to folder for files available for download
- -u/--uploads-folder: Path to folder for client uploads (short-lived)
- --uploads-folder-max-files: Max files in uploads folder (default: 10)
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
- --whatsapp: Ensure requried WhatsApp environment variables are present
@@ -888,7 +973,7 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
ones. Custom args are accessible via `runner_args.cli_args`.
"""
global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
global RUNNER_DOWNLOADS_FOLDER, RUNNER_UPLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
if not parser:
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
@@ -910,7 +995,27 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
default=False,
help="Connect directly to Daily room (automatically sets transport to daily)",
)
parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder")
parser.add_argument(
"-f",
"--downloads-folder",
type=str,
dest="downloads_folder",
help="Path to folder for files available for download",
)
parser.add_argument(
"-u",
"--uploads-folder",
type=str,
dest="uploads_folder",
help="Path to folder for client uploads (short-lived; max files enforced)",
)
parser.add_argument(
"--uploads-folder-max-files",
type=int,
default=10,
dest="uploads_folder_max_files",
help="Max files to keep in uploads folder; oldest removed (default: 10)",
)
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
@@ -993,7 +1098,8 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
print()
RUNNER_DOWNLOADS_FOLDER = args.folder
RUNNER_DOWNLOADS_FOLDER = args.downloads_folder
RUNNER_UPLOADS_FOLDER = args.uploads_folder
RUNNER_HOST = args.host
RUNNER_PORT = args.port

View File

@@ -1162,6 +1162,8 @@ class AnthropicLLMContext(OpenAILLMContext):
for item in msg["content"]:
if item["type"] == "image":
item["source"]["data"] = "..."
if item["type"] == "file":
item["file"]["file_data"] = "data:..."
msgs.append(msg)
return msgs

View File

@@ -607,6 +607,8 @@ class AWSBedrockLLMContext(OpenAILLMContext):
for item in msg["content"]:
if item.get("image"):
item["image"]["source"]["bytes"] = "..."
if item.get("type") == "file":
item["file"]["file_data"] = "data:..."
msgs.append(msg)
return msgs

View File

@@ -618,6 +618,7 @@ class BaseOpenAILLMService(LLMService):
await self.push_error(error_msg="LLM completion timeout", exception=e)
except Exception as e:
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
context.maybe_remove_invalid_message(e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -691,7 +691,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
Args:
message: The application message to process.
"""
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
logger.trace(f"Received app message inside SmallWebRTCInputTransport {message}")
await self.broadcast_frame(InputTransportMessageFrame, message=message)
# Add this method similar to DailyInputTransport.request_participant_image