Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
d175e5e5fc Hidden assistant demo 2025-07-07 11:58:03 +08:00
139 changed files with 4608 additions and 9912 deletions

View File

@@ -4,5 +4,5 @@ repos:
hooks:
- id: ruff
language_version: python3
args: [--fix]
args: [ --select, I, ]
- id: ruff-format

View File

@@ -9,186 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added a new field `handle_sigterm` to `PipelineRunner`. It defaults to `False`.
This field handles SIGTERM signals. The `handle_sigint` field still defaults
to `True`, but now it handles only SIGINT signals.
- Added foundational example `14u-function-calling-ollama.py` for Ollama
function calling.
- Added `LocalSmartTurnAnalyzerV2`, which supports local on-device inference
with the new `smart-turn-v2` turn detection model.
- Added `set_log_level` to `DailyTransport`, allowing setting the logging level
for Daily's internal logging system.
### Changed
- Play delayed messages from `ElevenLabsTTSService` if they still belong to the
current context.
- Dependency compatibility improvements: Relaxed version constraints for core
dependencies to support broader version ranges while maintaining stability:
- `aiohttp`, `Markdown`, `nltk`, `numpy`, `Pillow`, `pydantic`, `openai`,
`numba`: Now support up to the next major version (e.g. `numpy>=1.26.4,<3`)
- `pyht`: Relaxed to `>=0.1.6` to resolve `grpcio` conflicts with
`nvidia-riva-client`
- `fastapi`: Updated to support versions `>=0.115.6,<0.117.0`
- `torch`/`torchaudio`: Changed from exact pinning (`==2.5.0`) to compatible
range (`~=2.5.0`)
- `aws_sdk_bedrock_runtime`: Added Python 3.12+ constraint via environment
marker
- `numba`: Reduced minimum version to `0.60.0` for better compatibility
- Changed `NeuphonicHttpTTSService` to use a POST based request instead of the
`pyneuphonic` package. This removes a package requirement, allowing Neuphonic
to work with more services.
- Updated the `deepgram` optional dependency to 4.7.0, which downgrades the
`tasks cancelled error` to a debug log. This removes the log from appearing
in Pipecat logs upon leaving.
- Upgraded the `websockets` implementation to the new asyncio implementation.
Along with this change, we're updating support for versions >=13.1.0 and
<15.0.0. All services have been update to use the asyncio implementation.
- Updated `MiniMaxHttpTTSService` with a `base_url` arg where you can specify
the Global endpoint (default) or Mainland China.
- Replaced regex-based sentence detection in `match_endofsentence` with NLTK's
punkt_tab tokenizer for more reliable sentence boundary detection.
- Changed the `livekit` optional dependency for `tenacity` to
`tenacity>=8.2.3,<10.0.0` in order to support the `google-genai` package.
- For `LmntTTSService`, changed the default `model` to `blizzard`, LMNT's
recommended model.
### Fixed
- Fixed a dependency issue for uv users where an `llvmlite` version required python 3.9.
- Fixed an issue in `MiniMaxHttpTTSService` where the `pitch` param was the
incorrect type.
- Fixed an issue with OpenTelemetry tracing where the `enable_tracing` flag did
not disable the internal tracing decorator functions.
- Fixed an issue in `OLLamaLLMService` where kwargs were not passed correctly
to the parent class.
- Fixed an issue in `ElevenLabsTTSService` where the word/timestamp pairs were
calculating word boundaries incorrectly.
- Fixed an issue where, in some edge cases, the `EmulateUserStartedSpeakingFrame`
could be created even if we didn't have a transcription.
- Fixed an issue in `GoogleLLMContext` where it would inject the
`system_message` as a "user" message into cases where it was not meant to;
it was only meant to do that when there were no "regular" (non-function-call)
messages in the context, to ensure that inference would run properly.
- Fixed an issue in `LiveKitTransport` where the `on_audio_track_subscribed` was never emitted.
## [0.0.76] - 2025-07-11
### Added
- Added `SpeechControlParamsFrame`, a new `SystemFrame` that notifies
downstream processors of the VAD and Turn analyzer params. This frame is
pushed by the `BaseInputTransport` at Start and any time a
`VADParamsUpdateFrame` is received.
### Changed
- Two package dependencies have been updated:
- `numpy` now supports 1.26.0 and newer
- `transformers` now supports 4.48.0 and newer
### Fixed
- Fixed an issue with RTVI's handling of `append-to-context`.
- Fixed an issue where using audio input with a sample rate requiring resampling
could result in empty audio being passed to STT services, causing errors.
- Fixed the VAD analyzer to process the full audio buffer as long as it contains
more than the minimum required bytes per iteration, instead of only analyzing
the first chunk.
- Fixed an issue in ParallelPipeline that caused errors when attempting to drain
the queues.
- Fixed an issue with emulated VAD timeout inconsistency in
`LLMUserContextAggregator`. Previously, emulated VAD scenarios (where
transcription is received without VAD detection) used a hardcoded
`aggregation_timeout` (default 0.5s) instead of matching the VAD's
`stop_secs` parameter (default 0.8s). This created different user experiences
between real VAD and emulated VAD scenarios. Now, emulated VAD timeouts
automatically synchronize with the VAD's `stop_secs` parameter.
- Fix a pipeline freeze when using AWS Nova Sonic, which would occur if the
user started early, while the bot was still working through
`trigger_assistant_response()`.
## [0.0.75] - 2025-07-08
### Added
- Added an `aggregate_sentences` arg in `CartesiaTTSService`,
`ElevenLabsTTSService`, `NeuphonicTTSService` and `RimeTTSService`, where the
default value is True. When `aggregate_sentences` is True, the `TTSService`
aggregates the LLM streamed tokens into sentences by default. Note: setting
the value to False requires a custom processor before the `TTSService` to
aggregate LLM tokens.
- Added `kwargs` to the `OLLamaLLMService` to allow for configuration args to
be passed to Ollama.
- Added call hang-up error handling in `TwilioFrameSerializer`, which handles
the case where the user has hung up before the `TwilioFrameSerializer` hangs
up the call.
### Changed
- Updated `RTVIObserver` and `RTVIProcessor` to match the new RTVI 1.0.0 protocol.
This includes:
- Deprecating support for all messages related to service configuaration and
actions.
- Adding support for obtaining and logging data about client, including its
RTVI version and optionally included system information (OS/browser/etc.)
- Adding support for handling the new `client-message` RTVI message through
either a `on_client_message` event handler or listening for a new
`RTVIClientMessageFrame`
- Adding support for responding to a `client-message` with a `server-response`
via either a direct call on the `RTVIProcessor` or via pushing a new
`RTVIServerResponseFrame`
- Adding built-in support for handling the new `append-to-context` RTVI message
which allows a client to add to the user or assistant llm context. No extra
code is required for supporting this behavior.
- Updating all JavaScript and React client RTVI examples to use versions 1.0.0
of the clients.
Get started migrating to RTVI protocol 1.0.0 by following the migration guide:
https://docs.pipecat.ai/client/migration-guide
- Refactored `AWSBedrockLLMService` and `AWSPollyTTSService` to work
asynchronously using `aioboto3` instead of the `boto3` library.
- The `UserIdleProcessor` now handles the scenario where function calls take
longer than the idle timeout duration. This allows you to use the
`UserIdleProcessor` in conjunction with function calls that take a while to
return a result.
### Fixed
- Updated the `NeuphonicTTSService` to work with the updated websocket API.
- Fixed an issue with `RivaSTTService` where the watchdog feature was causing
an error on initialization.
### Performance
- Remove unncessary push task in each `FrameProcessor`.

View File

@@ -53,7 +53,7 @@ You can connect to Pipecat from any platform using our official SDKs:
| Category | Services |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |

View File

@@ -11,10 +11,3 @@ ruff~=0.12.1
setuptools~=78.1.1
setuptools_scm~=8.3.1
python-dotenv~=1.1.1
# For running examples
uvicorn
python-dotenv
fastapi
aiohttp
aiortc

View File

@@ -77,7 +77,6 @@ autodoc_mock_imports = [
"openpipe",
"simli",
"soundfile",
"soniox",
"pipecat_ai_krisp",
"pyaudio",
"_tkinter",

View File

@@ -46,7 +46,6 @@ pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]
pipecat-ai[soundfile]
pipecat-ai[soniox]
pipecat-ai[speechmatics]
pipecat-ai[tavus]
pipecat-ai[together]

View File

@@ -109,9 +109,6 @@ MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
# Soniox
SONIOX_API_KEY=
# Speechmatics
SPEECHMATICS_API_KEY=...

View File

@@ -1,60 +0,0 @@
# AWS Strands Examples
This folder contains two Python examples demonstrating how to use Pipecat with the AWS Strands agent.
## Overview
These examples show how to delegate complex, multi-step tasks to a Strands agent, which can reason step-by-step and call tools to accomplish user requests.
These examples are intentionally simplified for demonstration, using mock API calls. They work best if you ask it:
> What's the weather where the Golden Gate Bridge is?
## Example Scripts
### `black-box.py`
A minimal example that demonstrates how to use the Strands agent with Pipecat. The agent can handle multi-step queries by calling tools, but does not explain its reasoning out loud.
### `explain-thinking.py`
An enhanced example where the Strands agent explains each step of its reasoning in clear, simple language as it works through a multi-step task.
## Quick Start
1. **Clone the repository and navigate to this example:**
```bash
git clone https://github.com/pipecat-ai/pipecat.git
cd pipecat/examples/aws-strands
```
2. **Set up a virtual environment:**
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. **Install dependencies:**
```bash
pip install -r requirements.txt
```
4. **Configure environment variables:**
Copy the provided `env.example` file to `.env` and fill in the necessary credentials:
```bash
cp env.example .env
# Then edit .env with your preferred editor
```
5. **Run an example:**
```bash
python black-box.py
# or
python explain-thinking.py
```

View File

@@ -1,206 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from strands import Agent, tool
from strands.models import BedrockModel
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
"""This example demonstrates how to use the Strands agent with Pipecat.
You can delegate complex, multi-step tasks to the Strands agent, which can cycle through LLM-based reasoning and tool calls to accomplish the task.
Try asking: "What's the weather where the Golden Gate Bridge is?"
"""
# Strands agent tools
@tool
def get_location_name_from_landmark(landmark: str) -> str:
"""
Get the location name from a landmark.
Args:
landmark (str): The name of the landmark, e.g. "Golden Gate Bridge".
"""
# Simulate fetching location
return "San Francisco, CA"
@tool
def get_lat_long_from_location_name(location: str) -> dict:
"""
Get the latitude and longitude for a location name.
Args:
location (str): The city and state, e.g. "San Francisco, CA".
"""
# Simulate fetching lat/long from a geocoding service
return {"lat": 37.7749, "long": -122.4194}
@tool
def get_current_weather_from_lat_long(lat: float, long: float) -> dict:
"""
Get the current weather for a specific latitude and longitude.
Args:
lat (float): The latitude of the location.
long (float): The longitude of the location.
"""
# Simulate fetching weather data from a weather service
return {"conditions": "nice", "temperature": "75"}
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
strands_agent = Agent(
model=BedrockModel(
model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0", max_tokens=64000
),
tools=[
get_location_name_from_landmark,
get_lat_long_from_location_name,
get_current_weather_from_lat_long,
],
system_prompt="""
You are a helpful personal assistant who can look up information about places and weather.
Your key capabilities:
1. Look up where landmarks are located.
2. Find latitude and longitude for a location.
3. Look up the current weather for a specific latitude and longitude.
Explain each step of your reasoning in clear, simple, and concise language. Your responses will be converted to audio, so avoid special characters and numbered lists.
""",
)
async def handle_location_or_weather_related_queries(params: FunctionCallParams, query: str):
"""
Handle location or weather related queries.
Args:
query (str): The user's query, e.g. "What's the weather where the Golden Gate Bridge is?".
"""
# Run in a background thread
# (Otherwise the agent blocks the event loop; one effect of that is that we don't hear
# "let me check on that" until the agent finishes)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, strands_agent, query)
await params.result_callback(result.message)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_direct_function(handle_location_or_weather_related_queries)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
tools = ToolsSchema(standard_tools=[handle_location_or_weather_related_queries])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. Start by suggesting that the user ask about the weather where the Golden Gate Bridge is.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -1,8 +0,0 @@
OPENAI_API_KEY=
CARTESIA_API_KEY=
DEEPGRAM_API_KEY=
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=
AWS_REGION=

View File

@@ -1,249 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import threading
import time
from dotenv import load_dotenv
from loguru import logger
from strands import Agent, tool
from strands.models import BedrockModel
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
"""This example demonstrates how to use the Strands agent with Pipecat in a way where the agent explains its reasoning step-by-step.
You can delegate complex, multi-step tasks to the Strands agent, which can cycle through LLM-based reasoning and tool calls to accomplish the task.
Try asking: "What's the weather where the Golden Gate Bridge is?"
"""
# Strands agent tools
@tool
def get_location_name_from_landmark(landmark: str) -> str:
"""
Get the location name from a landmark.
Args:
landmark (str): The name of the landmark, e.g. "Golden Gate Bridge".
"""
# Simulate fetching location (slowly)
time.sleep(3)
return "San Francisco, CA"
@tool
def get_lat_long_from_location_name(location: str) -> dict:
"""
Get the latitude and longitude for a location name.
Args:
location (str): The city and state, e.g. "San Francisco, CA".
"""
# Simulate fetching lat/long from a geocoding service (slowly)
time.sleep(3)
return {"lat": 37.7749, "long": -122.4194}
@tool
def get_current_weather_from_lat_long(lat: float, long: float) -> dict:
"""
Get the current weather for a specific latitude and longitude.
Args:
lat (float): The latitude of the location.
long (float): The longitude of the location.
"""
# Simulate fetching weather data from a weather service (slowly)
time.sleep(3)
return {"conditions": "nice", "temperature": "75"}
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
next_strands_message_is_last = False
strands_messages_queue = asyncio.Queue()
def strands_callback_handler(**kwargs):
"""
Handle events from the Strands agent.
"""
nonlocal next_strands_message_is_last
if "event" in kwargs:
event_obj = kwargs["event"]
if event_obj and "messageStop" in event_obj:
message_stop = event_obj["messageStop"]
if message_stop and "stopReason" in message_stop:
stop_reason = message_stop["stopReason"]
if stop_reason == "end_turn":
next_strands_message_is_last = True
elif "message" in kwargs:
message_obj = kwargs["message"]
if message_obj and "content" in message_obj and "role" in message_obj:
role = message_obj["role"]
content = message_obj["content"]
if role == "assistant" and isinstance(content, list):
for content_obj in content:
if isinstance(content_obj, dict) and "text" in content_obj:
message = content_obj["text"]
if not next_strands_message_is_last:
strands_messages_queue.put_nowait(message)
async def process_strands_messages():
while True:
message = await strands_messages_queue.get()
await tts.queue_frame(TTSSpeakFrame(message))
strands_messages_queue.task_done()
asyncio.create_task(process_strands_messages())
strands_agent = Agent(
model=BedrockModel(
model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0", max_tokens=64000
),
tools=[
get_location_name_from_landmark,
get_lat_long_from_location_name,
get_current_weather_from_lat_long,
],
system_prompt="""
You are a helpful personal assistant who can look up information about places and weather.
Your key capabilities:
1. Look up where landmarks are located.
2. Find latitude and longitude for a location.
3. Look up the current weather for a specific latitude and longitude.
Explain each step of your reasoning in clear, simple, and concise language. Your responses will be converted to audio, so avoid special characters and numbered lists.
""",
callback_handler=strands_callback_handler,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
async def handle_location_or_weather_related_queries(params: FunctionCallParams, query: str):
"""
Handle location or weather related queries.
Args:
query (str): The user's query, e.g. "What's the weather where the Golden Gate Bridge is?".
"""
# Run in a background thread
# (Otherwise the agent blocks the event loop; one effect of that is that we don't hear
# the agent's "thinking" messages until the agent finishes)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, strands_agent, query)
await params.result_callback(result.message)
llm.register_direct_function(handle_location_or_weather_related_queries)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
tools = ToolsSchema(standard_tools=[handle_location_or_weather_related_queries])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. Start by suggesting that the user ask about the weather where the Golden Gate Bridge is.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.10"
}
}

View File

@@ -5,7 +5,7 @@
*/
/**
* Pipecat Client Implementation
* RTVI Client Implementation
*
* This client connects to an RTVI-compatible bot server using WebRTC (via Daily).
* It handles audio/video streaming and manages the connection lifecycle.
@@ -16,7 +16,7 @@
* - Browser with WebRTC support
*/
import { PipecatClient, RTVIEvent } from '@pipecat-ai/client-js';
import { RTVIClient, RTVIEvent } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
/**
@@ -26,7 +26,7 @@ import { DailyTransport } from '@pipecat-ai/daily-transport';
class ChatbotClient {
constructor() {
// Initialize client state
this.pcClient = null;
this.rtviClient = null;
this.setupDOMElements();
this.initializeClientAndTransport();
this.setupEventListeners();
@@ -59,7 +59,7 @@ class ChatbotClient {
this.disconnectBtn.addEventListener('click', () => this.disconnect());
// Populate device selector
this.pcClient.getAllMics().then((mics) => {
this.rtviClient.getAllMics().then((mics) => {
console.log('Available mics:', mics);
mics.forEach((device) => {
const option = document.createElement('option');
@@ -71,16 +71,16 @@ class ChatbotClient {
this.deviceSelector.addEventListener('change', (event) => {
const selectedDeviceId = event.target.value;
console.log('Selected device ID:', selectedDeviceId);
this.pcClient.updateMic(selectedDeviceId);
this.rtviClient.updateMic(selectedDeviceId);
});
// Handle mic mute/unmute toggle
const micToggleBtn = document.getElementById('mic-toggle-btn');
micToggleBtn.addEventListener('click', () => {
let micEnabled = this.pcClient.isMicEnabled;
let micEnabled = this.rtviClient.isMicEnabled;
micToggleBtn.textContent = micEnabled ? 'Unmute Mic' : 'Mute Mic';
this.pcClient.enableMic(!micEnabled);
this.rtviClient.enableMic(!micEnabled);
// Add logic to mute/unmute the mic
if (micEnabled) {
console.log('Mic muted');
@@ -93,12 +93,23 @@ class ChatbotClient {
}
/**
* Set up the Pipecat client and Daily transport
* Set up the RTVI client and Daily transport
*/
async initializeClientAndTransport() {
// Initialize the Pipecat client with a DailyTransport and our configuration
this.pcClient = new PipecatClient({
// Initialize the RTVI client with a DailyTransport and our configuration
this.rtviClient = new RTVIClient({
transport: new DailyTransport(),
params: {
// REPLACE WITH YOUR MODAL URL ENDPOINT
baseUrl:
'https://<Modal workspace>--pipecat-modal-bot-launcher.modal.run',
endpoints: {
connect: '/connect',
},
requestData: {
bot_name: 'openai',
},
},
enableMic: true, // Enable microphone for user input
enableCam: false,
callbacks: {
@@ -165,8 +176,8 @@ class ChatbotClient {
// Set up listeners for media track events
this.setupTrackListeners();
await this.pcClient.initDevices();
window.client = this.pcClient;
await this.rtviClient.initDevices();
window.client = this.rtviClient;
}
/**
@@ -201,10 +212,10 @@ class ChatbotClient {
* This is called when the bot is ready or when the transport state changes to ready
*/
setupMediaTracks() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Get current tracks from the client
const tracks = this.pcClient.tracks();
const tracks = this.rtviClient.tracks();
// Set up any available bot tracks
if (tracks.bot?.audio) {
@@ -220,10 +231,10 @@ class ChatbotClient {
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Listen for new tracks starting
this.pcClient.on(RTVIEvent.TrackStarted, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local) {
if (track.kind === 'audio') {
@@ -242,7 +253,7 @@ class ChatbotClient {
});
// Listen for tracks stopping
this.pcClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
if (participant.local) {
this.log('Local mic muted');
return;
@@ -300,27 +311,21 @@ class ChatbotClient {
/**
* Initialize and connect to the bot
* This sets up the Pipecat client, initializes devices, and establishes the connection
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
async connect() {
try {
const botSelector = document.getElementById('bot-selector');
const selectedBot = botSelector.value;
this.rtviClient.params.requestData.bot_name = selectedBot;
// Initialize audio/video devices
this.log('Initializing devices...');
await this.pcClient.initDevices();
await this.rtviClient.initDevices();
// Connect to the bot
this.log(`Connecting to bot: ${selectedBot}`);
await this.pcClient.connect({
// REPLACE WITH YOUR MODAL URL ENDPOINT
endpoint:
'https://<your-workspace>--pipecat-modal-fastapi-app.modal.run/connect',
requestData: {
bot_name: selectedBot,
},
});
await this.rtviClient.connect();
this.log('Connection complete');
} catch (error) {
@@ -331,9 +336,9 @@ class ChatbotClient {
this.updateStatus('Error');
// Clean up if there's an error
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError.message}`);
}
@@ -345,10 +350,10 @@ class ChatbotClient {
* Disconnect from the bot and clean up media resources
*/
async disconnect() {
if (this.pcClient) {
if (this.rtviClient) {
try {
// Disconnect the Pipecat client
await this.pcClient.disconnect();
// Disconnect the RTVI client
await this.rtviClient.disconnect();
// Clean up audio
if (this.botAudio.srcObject) {

View File

@@ -301,7 +301,7 @@ def fastapi_app():
allow_headers=["*"],
)
# Include the endpoints from this file
# Include the endpoints from endpoints.py
web_app.include_router(router)
return web_app

View File

@@ -1,3 +1,2 @@
python-dotenv==1.0.1
modal==1.0.5
fastapi[all]
modal==0.71.3

View File

@@ -8,7 +8,7 @@
"name": "my-daily-app",
"version": "0.1.0",
"dependencies": {
"axios": "^1.11.0",
"axios": "^1.6.0",
"next": "^14.0.0",
"pino": "^8.15.0",
"react": "^18.2.0",
@@ -1165,13 +1165,13 @@
}
},
"node_modules/axios": {
"version": "1.11.0",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.11.0.tgz",
"integrity": "sha512-1Lx3WLFQWm3ooKDYZD1eXmoGO9fxYQjrycfHFC8P0sCfQVXyROp0p9PFWBehewBOdCwHc+f/b8I0fMto5eSfwA==",
"version": "1.8.4",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.8.4.tgz",
"integrity": "sha512-eBSYY4Y68NNlHbHBMdeDmKNtDgXWhQsJcGqzO3iLUM0GraQFSS9cVgPX5I9b3lbdFKyYoAEGAZF1DwhTaljNAw==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.4",
"form-data": "^4.0.0",
"proxy-from-env": "^1.1.0"
}
},
@@ -2436,15 +2436,14 @@
}
},
"node_modules/form-data": {
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.4.tgz",
"integrity": "sha512-KrGhL9Q4zjj0kiUt5OO4Mr/A/jlI2jDYs5eHBpYHPcBEVSiipAvn2Ko2HnPe20rmcuuvMHNdZFp+4IlGTMF0Ow==",
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.2.tgz",
"integrity": "sha512-hGfm/slu0ZabnNt4oaRZ6uREyfCj6P4fT/n6A1rGV+Z0VdGXjfOhVUpkn6qVQONHGIFwmveGXyDs75+nr6FM8w==",
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"es-set-tostringtag": "^2.1.0",
"hasown": "^2.0.2",
"mime-types": "^2.1.12"
},
"engines": {

View File

@@ -9,7 +9,7 @@
"lint": "next lint"
},
"dependencies": {
"axios": "^1.11.0",
"axios": "^1.6.0",
"next": "^14.0.0",
"pino": "^8.15.0",
"react": "^18.2.0",

View File

@@ -103,7 +103,7 @@ export default async function handler(req, res) {
const sip_config = {
display_name: From,
sip_mode: 'dial-in',
num_endpoints: (call_transfer !== undefined && call_transfer !== null) ? 2 : 1,
num_endpoints: call_transfer !== null ? 2 : 1,
codecs: {"audio": ["OPUS"]},
};
daily_room_properties.sip = sip_config;

View File

@@ -90,7 +90,7 @@ async def main(transport: DailyTransport):
logger.info("Participant left: {}", participant)
await task.cancel()
runner = PipelineRunner(handle_sigint=False, force_gc=True)
runner = PipelineRunner()
await runner.run(task)

View File

@@ -44,7 +44,7 @@ Try the hosted version of the demo here: https://pcc-smart-turn.vercel.app/.
4. Run the server:
```bash
LOCAL_RUN=1 python server.py
LOCAL=1 python server.py
```
### Run the client

File diff suppressed because it is too large Load Diff

View File

@@ -9,9 +9,9 @@
"lint": "next lint"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/client-react": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0",
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/client-react": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.10",
"next": "15.3.1",
"react": "^19.0.0",
"react-dom": "^19.0.0"

View File

@@ -1,5 +1,5 @@
import './globals.css';
import { PipecatProvider } from '@/providers/PipecatProvider';
import { RTVIProvider } from '@/providers/RTVIProvider';
export const metadata = {
title: 'Pipecat React Client',
@@ -20,7 +20,7 @@ export default function RootLayout({
<link rel="icon" href="/favicon.svg" type="image/svg+xml" />
</head>
<body>
<PipecatProvider>{children}</PipecatProvider>
<RTVIProvider>{children}</RTVIProvider>
</body>
</html>
);

View File

@@ -1,22 +1,22 @@
'use client';
import {
PipecatClientAudio,
PipecatClientVideo,
usePipecatClientTransportState,
RTVIClientAudio,
RTVIClientVideo,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
import { ConnectButton } from '../components/ConnectButton';
import { StatusDisplay } from '../components/StatusDisplay';
import { DebugDisplay } from '../components/DebugDisplay';
function BotVideo() {
const transportState = usePipecatClientTransportState();
const transportState = useRTVIClientTransportState();
const isConnected = transportState !== 'disconnected';
return (
<div className="bot-container">
<div className="video-container">
{isConnected && <PipecatClientVideo participant="bot" fit="cover" />}
{isConnected && <RTVIClientVideo participant="bot" fit="cover" />}
</div>
</div>
);
@@ -35,7 +35,7 @@ export default function Home() {
</div>
<DebugDisplay />
<PipecatClientAudio />
<RTVIClientAudio />
</div>
);
}

View File

@@ -1,17 +1,11 @@
import {
usePipecatClient,
usePipecatClientTransportState,
useRTVIClient,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
// Get the API base URL from environment variables
// Default to "/api" if not specified
// "/api" is the default for Next.js API routes and used
// for the Pipecat Cloud deployed agent
const API_BASE_URL = process.env.NEXT_PUBLIC_API_BASE_URL || '/api';
export function ConnectButton() {
const client = usePipecatClient();
const transportState = usePipecatClientTransportState();
const client = useRTVIClient();
const transportState = useRTVIClientTransportState();
const isConnected = ['connected', 'ready'].includes(transportState);
const handleClick = async () => {
@@ -24,10 +18,7 @@ export function ConnectButton() {
if (isConnected) {
await client.disconnect();
} else {
await client.connect({
endpoint: `${API_BASE_URL}/connect`,
requestData: { foo: 'bar' },
});
await client.connect();
}
} catch (error) {
console.error('Connection error:', error);

View File

@@ -6,7 +6,7 @@ import {
TranscriptData,
BotLLMTextData,
} from '@pipecat-ai/client-js';
import { usePipecatClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import { useRTVIClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import './DebugDisplay.css';
interface SmartTurnResultData {
@@ -20,7 +20,7 @@ interface SmartTurnResultData {
export function DebugDisplay() {
const debugLogRef = useRef<HTMLDivElement>(null);
const client = usePipecatClient();
const client = useRTVIClient();
const log = useCallback((message: string) => {
if (!debugLogRef.current) return;

View File

@@ -1,7 +1,7 @@
import { usePipecatClientTransportState } from '@pipecat-ai/client-react';
import { useRTVIClientTransportState } from '@pipecat-ai/client-react';
export function StatusDisplay() {
const transportState = usePipecatClientTransportState();
const transportState = useRTVIClientTransportState();
return (
<div className="status">

View File

@@ -1,28 +0,0 @@
'use client';
import { PipecatClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { PipecatClientProvider } from '@pipecat-ai/client-react';
import { PropsWithChildren, useEffect, useState } from 'react';
export function PipecatProvider({ children }: PropsWithChildren) {
const [client, setClient] = useState<PipecatClient | null>(null);
useEffect(() => {
const pcClient = new PipecatClient({
transport: new DailyTransport(),
enableMic: true,
enableCam: false,
});
setClient(pcClient);
}, []);
if (!client) {
return null;
}
return (
<PipecatClientProvider client={client}>{children}</PipecatClientProvider>
);
}

View File

@@ -0,0 +1,43 @@
'use client';
import { RTVIClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { RTVIClientProvider } from '@pipecat-ai/client-react';
import { PropsWithChildren, useEffect, useState } from 'react';
// Get the API base URL from environment variables
// Default to "/api" if not specified
// "/api" is the default for Next.js API routes and used
// for the Pipecat Cloud deployed agent
const API_BASE_URL = process.env.NEXT_PUBLIC_API_BASE_URL || '/api';
console.log('Using API base URL:', API_BASE_URL);
export function RTVIProvider({ children }: PropsWithChildren) {
const [client, setClient] = useState<RTVIClient | null>(null);
useEffect(() => {
const transport = new DailyTransport();
const rtviClient = new RTVIClient({
transport,
params: {
baseUrl: API_BASE_URL,
endpoints: {
connect: '/connect',
},
requestData: { foo: 'bar' },
},
enableMic: true,
enableCam: false,
});
setClient(rtviClient);
}, []);
if (!client) {
return null;
}
return <RTVIClientProvider client={client}>{children}</RTVIClientProvider>;
}

View File

@@ -45,7 +45,7 @@ from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
# Check if we're in local development mode
LOCAL = os.getenv("LOCAL_RUN")
LOCAL = os.getenv("LOCAL")
logger.remove()
logger.add(sys.stderr, level="DEBUG")

View File

@@ -20,7 +20,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyLogLevel, DailyParams, DailyTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -43,7 +43,6 @@ async def main():
vad_analyzer=SileroVADAnalyzer(),
),
)
transport.set_log_level(DailyLogLevel.Info)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -1,109 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -7,7 +7,6 @@
import argparse
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
@@ -51,63 +50,60 @@ transport_params = {
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = NeuphonicHttpTTSService(
api_key=os.getenv("NEUPHONIC_API_KEY"),
voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb", # Emily
aiohttp_session=session,
)
tts = NeuphonicHttpTTSService(
api_key=os.getenv("NEUPHONIC_API_KEY"),
voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb", # Emily
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner(handle_sigint=handle_sigint)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

View File

@@ -1,81 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -1,162 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OLLamaLLMService(model="llama3.2") # Update to the model you're running locally
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -1,165 +0,0 @@
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import Frame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
}
SYSTEM_INSTRUCTION = """
You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information.
IMPORTANT: For ANY question about current events, news, recent developments, real-time information, or anything that might have changed recently, you MUST use the google_search tool to get the latest information.
You should use Google Search for:
- Current news and events
- Recent developments in any field
- Today's weather, stock prices, or other real-time data
- Any question that starts with "what's happening", "latest", "recent", "current", "today", etc.
- When you're not certain about recent information
Always be proactive about using search when the user asks about anything that could benefit from real-time information.
Your output will be converted to audio so don't include special characters in your answers.
Respond to what the user said in a creative and helpful way, always using search for current information.
"""
class GroundingMetadataProcessor(FrameProcessor):
"""Processor to capture and display grounding metadata from Gemini Live API."""
def __init__(self):
super().__init__()
self._grounding_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMSearchResponseFrame):
self._grounding_count += 1
logger.info(f"\n\n🔍 GROUNDING METADATA RECEIVED #{self._grounding_count}\n")
logger.info(f"📝 Search Result Text: {frame.search_result[:200]}...")
if frame.rendered_content:
logger.info(f"🔗 Rendered Content: {frame.rendered_content}")
if frame.origins:
logger.info(f"📍 Number of Origins: {len(frame.origins)}")
for i, origin in enumerate(frame.origins):
logger.info(f" Origin {i + 1}: {origin.site_title} - {origin.site_uri}")
if origin.results:
logger.info(f" Results: {len(origin.results)} items")
# Always push the frame downstream
await self.push_frame(frame, direction)
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting Gemini Live Grounding Metadata Test Bot")
# Create tools using ToolsSchema with custom tools for Gemini
tools = ToolsSchema(
standard_tools=[], # No standard function declarations needed
custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]},
)
llm = GeminiMultimodalLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=SYSTEM_INSTRUCTION,
voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck
transcribe_user_audio=True,
tools=tools,
)
# Create a processor to capture grounding metadata
grounding_processor = GroundingMetadataProcessor()
messages = [
{
"role": "user",
"content": "Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I'd like to know about.",
},
]
# Set up conversation context and management
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
grounding_processor, # Add our grounding processor here
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v2 import LocalSmartTurnAnalyzerV2
from pipecat.audio.turn.smart_turn.local_smart_turn import LocalSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
@@ -37,7 +37,7 @@ load_dotenv(override=True)
# # Hugging Face uses LFS to store large model files, including .mlpackage
# git lfs install
# # Clone the repo with the smart_turn_classifier.mlpackage
# git clone https://huggingface.co/pipecat-ai/smart-turn-v2
# git clone https://huggingface.co/pipecat-ai/smart-turn
#
# Then set the env variable:
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
@@ -52,7 +52,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
@@ -60,7 +60,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
@@ -68,7 +68,7 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV2(
turn_analyzer=LocalSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),

View File

@@ -2,5 +2,4 @@ fastapi
uvicorn
python-dotenv
pipecat-ai[webrtc,daily,deepgram,cartesia]
pipecat-ai-small-webrtc-prebuilt
strands-agents
pipecat-ai-small-webrtc-prebuilt

View File

@@ -20,10 +20,11 @@ import {
} from '@pipecat-ai/client-js';
import {
ProtobufFrameSerializer,
WebSocketTransport,
} from '@pipecat-ai/websocket-transport';
WebSocketTransport
} from "@pipecat-ai/websocket-transport";
class RecordingSerializer extends ProtobufFrameSerializer {
private lastTimestamp: number | null = null;
private recordingAudioToSend: boolean = false;
private _recordedAudio: { data: ArrayBuffer; delay: number }[] = [];
@@ -39,11 +40,7 @@ class RecordingSerializer extends ProtobufFrameSerializer {
}
// @ts-ignore
serializeAudio(
data: ArrayBuffer,
sampleRate: number,
numChannels: number
): Uint8Array | null {
serializeAudio(data: ArrayBuffer, sampleRate: number, numChannels: number): Uint8Array | null {
if (this.recordingAudioToSend) {
const now = Date.now();
// Compute delay since last packet
@@ -58,13 +55,13 @@ class RecordingSerializer extends ProtobufFrameSerializer {
}
public get recordedAudio() {
return this._recordedAudio;
return this._recordedAudio
}
}
class WebsocketClientApp {
private ENABLE_RECORDING_MODE = false;
private RECORDING_TIME_MS = 10000;
private ENABLE_RECORDING_MODE = false
private RECORDING_TIME_MS = 10000
private rtviClient: RTVIClient | null = null;
private connectBtn: HTMLButtonElement | null = null;
@@ -74,7 +71,7 @@ class WebsocketClientApp {
private botAudio: HTMLAudioElement;
private declare websocketTransport: WebSocketTransport;
private sendRecordedAudio: boolean = false;
private sendRecordedAudio: boolean = false
private declare recordingSerializer: RecordingSerializer;
private playBtn: HTMLButtonElement | null = null;
@@ -94,12 +91,8 @@ class WebsocketClientApp {
* Set up references to DOM elements and create necessary media elements
*/
private setupDOMElements(): void {
this.connectBtn = document.getElementById(
'connect-btn'
) as HTMLButtonElement;
this.disconnectBtn = document.getElementById(
'disconnect-btn'
) as HTMLButtonElement;
this.connectBtn = document.getElementById('connect-btn') as HTMLButtonElement;
this.disconnectBtn = document.getElementById('disconnect-btn') as HTMLButtonElement;
this.statusSpan = document.getElementById('connection-status');
this.debugLog = document.getElementById('debug-log');
this.playBtn = document.getElementById('play-btn') as HTMLButtonElement;
@@ -112,12 +105,8 @@ class WebsocketClientApp {
private setupEventListeners(): void {
this.connectBtn?.addEventListener('click', () => this.connect());
this.disconnectBtn?.addEventListener('click', () => this.disconnect());
this.playBtn?.addEventListener('click', () =>
this.startSendingRecordedAudio()
);
this.stopBtn?.addEventListener('click', () =>
this.stopSendingRecordedAudio()
);
this.playBtn?.addEventListener('click', () => this.startSendingRecordedAudio());
this.stopBtn?.addEventListener('click', () => this.stopSendingRecordedAudio());
}
/**
@@ -176,9 +165,7 @@ class WebsocketClientApp {
// Listen for tracks stopping
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
);
this.log(`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`);
});
}
@@ -188,10 +175,7 @@ class WebsocketClientApp {
*/
private setupAudioTrack(track: MediaStreamTrack): void {
this.log('Setting up audio track');
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
const oldTrack = this.botAudio.srcObject.getAudioTracks()[0];
if (oldTrack?.id === track.id) return;
}
@@ -206,17 +190,27 @@ class WebsocketClientApp {
try {
const startTime = Date.now();
this.recordingSerializer = new RecordingSerializer();
const ws_opts = {
serializer: this.ENABLE_RECORDING_MODE
? this.recordingSerializer
: new ProtobufFrameSerializer(),
recorderSampleRate: 8000,
playerSampleRate: 8000,
};
this.recordingSerializer = new RecordingSerializer()
const transport = this.ENABLE_RECORDING_MODE ?
new WebSocketTransport({
serializer: this.recordingSerializer,
recorderSampleRate: 8000,
playerSampleRate:8000
}) :
new WebSocketTransport({
serializer: new ProtobufFrameSerializer(),
recorderSampleRate: 8000,
playerSampleRate:8000
});
this.websocketTransport = transport
const RTVIConfig: RTVIClientOptions = {
transport: new WebSocketTransport(ws_opts),
transport,
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:7860',
endpoints: { connect: '/connect' },
},
enableMic: true,
enableCam: false,
callbacks: {
@@ -244,34 +238,27 @@ class WebsocketClientApp {
onMessageError: (error) => console.error('Message error:', error),
onError: (error) => console.error('Error:', error),
},
};
}
this.rtviClient = new RTVIClient(RTVIConfig);
this.websocketTransport = this.rtviClient.transport;
this.setupTrackListeners();
this.log('Initializing devices...');
await this.rtviClient.initDevices();
this.log('Connecting to bot...');
await this.rtviClient.connect({
endpoint: 'http://localhost:7860/connect',
});
await this.rtviClient.connect();
const timeTaken = Date.now() - startTime;
this.log(`Connection complete, timeTaken: ${timeTaken}`);
if (this.ENABLE_RECORDING_MODE) {
this.log(
`Starting to recording the next ${
this.RECORDING_TIME_MS / 1000
}s of audio`
);
this.recordingSerializer.startRecording();
await this.sleep(this.RECORDING_TIME_MS);
this.recordingSerializer.stopRecording();
this.log('Recording stopped');
this.rtviClient.enableMic(false);
this.startSendingRecordedAudio();
this.log(`Starting to recording the next ${(this.RECORDING_TIME_MS/1000)}s of audio`);
this.recordingSerializer.startRecording()
await this.sleep(this.RECORDING_TIME_MS)
this.recordingSerializer.stopRecording()
this.log("Recording stopped");
this.rtviClient.enableMic(false)
this.startSendingRecordedAudio()
}
} catch (error) {
this.log(`Error connecting: ${(error as Error).message}`);
@@ -293,16 +280,11 @@ class WebsocketClientApp {
public async disconnect(): Promise<void> {
if (this.rtviClient) {
try {
this.stopSendingRecordedAudio();
this.stopSendingRecordedAudio()
await this.rtviClient.disconnect();
this.rtviClient = null;
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
this.botAudio.srcObject
.getAudioTracks()
.forEach((track) => track.stop());
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
this.botAudio.srcObject.getAudioTracks().forEach((track) => track.stop());
this.botAudio.srcObject = null;
}
} catch (error) {
@@ -312,21 +294,21 @@ class WebsocketClientApp {
}
private startSendingRecordedAudio() {
this.sendRecordedAudio = true;
this.sendRecordedAudio = true
if (this.playBtn) this.playBtn.disabled = true;
if (this.stopBtn) this.stopBtn.disabled = false;
void this.replayAudio();
void this.replayAudio()
}
private stopSendingRecordedAudio() {
if (this.stopBtn) this.stopBtn.disabled = true;
if (this.playBtn) this.playBtn.disabled = false;
this.sendRecordedAudio = false;
this.sendRecordedAudio = false
}
private async replayAudio() {
if (this.sendRecordedAudio) {
this.log('Sending recorded audio');
this.log("Sending recorded audio")
for (const chunk of this.recordingSerializer.recordedAudio) {
await this.sleep(chunk.delay);
this.websocketTransport.handleUserAudioStream(chunk.data);
@@ -334,13 +316,14 @@ class WebsocketClientApp {
const randomDelay = 1000 + Math.random() * (10000 - 500);
await this.sleep(randomDelay);
void this.replayAudio();
void this.replayAudio()
}
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
return new Promise(resolve => setTimeout(resolve, ms));
}
}
declare global {

File diff suppressed because it is too large Load Diff

View File

@@ -18,7 +18,7 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.8"
}
}

View File

@@ -5,7 +5,7 @@
*/
/**
* Pipecat Client Implementation
* RTVI Client Implementation
*
* This client connects to an RTVI-compatible bot server using WebRTC (via Daily).
* It handles audio/video streaming and manages the connection lifecycle.
@@ -18,22 +18,20 @@
import {
Participant,
PipecatClient,
PipecatClientOptions,
RTVIClient,
RTVIClientOptions,
RTVIEvent,
} from '@pipecat-ai/client-js';
import {
DailyEventCallbacks,
DailyTransport,
} from '@pipecat-ai/daily-transport';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import SoundUtils from './util/soundUtils';
import { InstantVoiceHelper } from './util/instantVoiceHelper';
/**
* InstantVoiceClient handles the connection and media management for a real-time
* voice and video interaction with an AI bot.
*/
class InstantVoiceClient {
private declare pcClient: PipecatClient;
private declare rtviClient: RTVIClient;
private connectBtn: HTMLButtonElement | null = null;
private disconnectBtn: HTMLButtonElement | null = null;
private statusSpan: HTMLElement | null = null;
@@ -48,7 +46,7 @@ class InstantVoiceClient {
document.body.appendChild(this.botAudio);
this.setupDOMElements();
this.setupEventListeners();
this.initializePipecatClient();
this.initializeRTVIClient();
}
/**
@@ -74,11 +72,16 @@ class InstantVoiceClient {
this.disconnectBtn?.addEventListener('click', () => this.disconnect());
}
private initializePipecatClient(): void {
const PipecatConfig: PipecatClientOptions = {
private initializeRTVIClient(): void {
const RTVIConfig: RTVIClientOptions = {
transport: new DailyTransport({
bufferLocalAudioUntilBotReady: true,
}),
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:7860',
endpoints: { connect: '/connect' },
},
enableMic: true,
enableCam: false,
callbacks: {
@@ -110,23 +113,30 @@ class InstantVoiceClient {
onBotTranscript: (data) => this.log(`Bot: ${data.text}`),
onMessageError: (error) => console.error('Message error:', error),
onError: (error) => console.error('Error:', error),
onAudioBufferingStarted: () => {
SoundUtils.beep();
this.updateBufferingStatus('Yes');
this.log(
`onMicCaptureStarted, timeTaken: ${Date.now() - this.startTime}`
);
},
onAudioBufferingStopped: () => {
this.updateBufferingStatus('No');
this.log(
`onMicCaptureStopped, timeTaken: ${Date.now() - this.startTime}`
);
},
} as DailyEventCallbacks,
},
};
this.pcClient = new PipecatClient(PipecatConfig);
this.rtviClient = new RTVIClient(RTVIConfig);
this.rtviClient.registerHelper(
'transport',
new InstantVoiceHelper({
callbacks: {
onAudioBufferingStarted: () => {
SoundUtils.beep();
this.updateBufferingStatus('Yes');
this.log(
`onMicCaptureStarted, timeTaken: ${Date.now() - this.startTime}`
);
},
onAudioBufferingStopped: () => {
this.updateBufferingStatus('No');
this.log(
`onMicCaptureStopped, timeTaken: ${Date.now() - this.startTime}`
);
},
},
})
);
this.setupTrackListeners();
}
@@ -172,8 +182,8 @@ class InstantVoiceClient {
* This is called when the bot is ready or when the transport state changes to ready
*/
setupMediaTracks() {
if (!this.pcClient) return;
const tracks = this.pcClient.tracks();
if (!this.rtviClient) return;
const tracks = this.rtviClient.tracks();
if (tracks.bot?.audio) {
this.setupAudioTrack(tracks.bot.audio);
}
@@ -184,10 +194,10 @@ class InstantVoiceClient {
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Listen for new tracks starting
this.pcClient.on(RTVIEvent.TrackStarted, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local && track.kind === 'audio') {
this.setupAudioTrack(track);
@@ -195,7 +205,7 @@ class InstantVoiceClient {
});
// Listen for tracks stopping
this.pcClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
);
@@ -220,25 +230,22 @@ class InstantVoiceClient {
/**
* Initialize and connect to the bot
* This sets up the Pipecat client, initializes devices, and establishes the connection
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
public async connect(): Promise<void> {
try {
this.startTime = Date.now();
this.log('Connecting to bot...');
await this.pcClient.connect({
// The baseURL and endpoint of your bot server that the client will connect to
endpoint: 'http://localhost:7860/connect',
});
await this.rtviClient.connect();
} catch (error) {
this.log(`Error connecting: ${(error as Error).message}`);
this.updateStatus('Error');
this.updateBufferingStatus('No');
// Clean up if there's an error
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError}`);
}
@@ -251,7 +258,7 @@ class InstantVoiceClient {
*/
public async disconnect(): Promise<void> {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject

View File

@@ -0,0 +1,39 @@
import {RTVIClientHelper, RTVIClientHelperOptions, RTVIMessage} from "@pipecat-ai/client-js";
import {DailyRTVIMessageType} from '@pipecat-ai/daily-transport';
export type InstantVoiceHelperCallbacks = Partial<{
onAudioBufferingStarted: () => void;
onAudioBufferingStopped: () => void;
}>;
// --- Interface and class
export interface InstantVoiceHelperOptions extends RTVIClientHelperOptions {
callbacks?: InstantVoiceHelperCallbacks;
}
export class InstantVoiceHelper extends RTVIClientHelper {
protected declare _options: InstantVoiceHelperOptions;
constructor(options: InstantVoiceHelperOptions) {
super(options);
}
handleMessage(rtviMessage: RTVIMessage): void {
switch (rtviMessage.type) {
case DailyRTVIMessageType.AUDIO_BUFFERING_STARTED:
if (this._options.callbacks?.onAudioBufferingStarted) {
this._options.callbacks?.onAudioBufferingStarted()
}
break;
case DailyRTVIMessageType.AUDIO_BUFFERING_STOPPED:
if (this._options.callbacks?.onAudioBufferingStopped) {
this._options.callbacks?.onAudioBufferingStopped()
}
break;
}
}
getMessageTypes(): string[] {
return [DailyRTVIMessageType.AUDIO_BUFFERING_STARTED, DailyRTVIMessageType.AUDIO_BUFFERING_STOPPED];
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.8"
}
}

View File

@@ -5,7 +5,7 @@
*/
/**
* Pipecat Client Implementation
* RTVI Client Implementation
*
* This client connects to an RTVI-compatible bot server using WebRTC (via Daily).
* It handles audio/video streaming and manages the connection lifecycle.
@@ -16,9 +16,78 @@
* - Browser with WebRTC support
*/
import { LogLevel, PipecatClient, RTVIEvent } from '@pipecat-ai/client-js';
import {
LogLevel,
RTVIClient,
RTVIClientHelper,
RTVIEvent,
} from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
class SearchResponseHelper extends RTVIClientHelper {
constructor(contentPanel) {
super();
this.contentPanel = contentPanel;
}
handleMessage(rtviMessage) {
console.log('SearchResponseHelper, received message:', rtviMessage);
if (rtviMessage.data) {
// Clear existing content
this.contentPanel.innerHTML = '';
// Create a container for all content
const contentContainer = document.createElement('div');
contentContainer.className = 'content-container';
// Add the search_result
if (rtviMessage.data.search_result) {
const searchResultDiv = document.createElement('div');
searchResultDiv.className = 'search-result';
searchResultDiv.textContent = rtviMessage.data.search_result;
contentContainer.appendChild(searchResultDiv);
}
// Add the sources
if (rtviMessage.data.origins) {
const sourcesDiv = document.createElement('div');
sourcesDiv.className = 'sources';
const sourcesTitle = document.createElement('h3');
sourcesTitle.className = 'sources-title';
sourcesTitle.textContent = 'Sources:';
sourcesDiv.appendChild(sourcesTitle);
rtviMessage.data.origins.forEach((origin) => {
const sourceLink = document.createElement('a');
sourceLink.className = 'source-link';
sourceLink.href = origin.site_uri;
sourceLink.target = '_blank';
sourceLink.textContent = origin.site_title;
sourcesDiv.appendChild(sourceLink);
});
contentContainer.appendChild(sourcesDiv);
}
// Add the rendered_content in an iframe
if (rtviMessage.data.rendered_content) {
const iframe = document.createElement('iframe');
iframe.className = 'iframe-container';
iframe.srcdoc = rtviMessage.data.rendered_content;
contentContainer.appendChild(iframe);
}
// Append the content container to the content panel
this.contentPanel.appendChild(contentContainer);
}
}
getMessageTypes() {
return ['bot-llm-search-response'];
}
}
/**
* ChatbotClient handles the connection and media management for a real-time
* voice and video interaction with an AI bot.
@@ -26,7 +95,7 @@ import { DailyTransport } from '@pipecat-ai/daily-transport';
class ChatbotClient {
constructor() {
// Initialize client state
this.pcClient = null;
this.rtviClient = null;
this.setupDOMElements();
this.setupEventListeners();
}
@@ -91,10 +160,10 @@ class ChatbotClient {
* This is called when the bot is ready or when the transport state changes to ready
*/
setupMediaTracks() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Get current tracks from the client
const tracks = this.pcClient.tracks();
const tracks = this.rtviClient.tracks();
// Set up any available bot tracks
if (tracks.bot?.audio) {
@@ -107,10 +176,10 @@ class ChatbotClient {
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Listen for new tracks starting
this.pcClient.on(RTVIEvent.TrackStarted, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local && track.kind === 'audio') {
this.setupAudioTrack(track);
@@ -118,7 +187,7 @@ class ChatbotClient {
});
// Listen for tracks stopping
this.pcClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(
`Track stopped event: ${track.kind} from ${
participant?.name || 'unknown'
@@ -144,13 +213,20 @@ class ChatbotClient {
/**
* Initialize and connect to the bot
* This sets up the Pipecat client, initializes devices, and establishes the connection
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
async connect() {
try {
// Initialize the Pipecat client with a Daily WebRTC transport and our configuration
this.pcClient = new PipecatClient({
// Initialize the RTVI client with a Daily WebRTC transport and our configuration
this.rtviClient = new RTVIClient({
transport: new DailyTransport(),
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:7860',
endpoints: {
connect: '/connect',
},
},
enableMic: true, // Enable microphone for user input
enableCam: false,
callbacks: {
@@ -175,8 +251,6 @@ class ChatbotClient {
this.setupMediaTracks();
}
},
// Handle search response events
onBotLlmSearchResponse: this.handleSearchResponse.bind(this),
// Handle bot connection events
onBotConnected: (participant) => {
this.log(`Bot connected: ${JSON.stringify(participant)}`);
@@ -207,22 +281,22 @@ class ChatbotClient {
},
},
});
//this.pcClient.setLogLevel(LogLevel.DEBUG)
//this.rtviClient.setLogLevel(LogLevel.DEBUG)
this.rtviClient.registerHelper(
'llm',
new SearchResponseHelper(this.searchResultContainer)
);
// Set up listeners for media track events
this.setupTrackListeners();
// Initialize audio devices
this.log('Initializing devices...');
await this.pcClient.initDevices();
await this.rtviClient.initDevices();
// Connect to the bot
this.log('Connecting to bot...');
await this.pcClient.connect({
// The baseURL and endpoint of your bot server that the client will connect to
endpoint: 'http://localhost:7860/connect',
});
await this.rtviClient.connect();
this.log('Connection complete');
} catch (error) {
@@ -232,9 +306,9 @@ class ChatbotClient {
this.updateStatus('Error');
// Clean up if there's an error
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError.message}`);
}
@@ -246,11 +320,11 @@ class ChatbotClient {
* Disconnect from the bot and clean up media resources
*/
async disconnect() {
if (this.pcClient) {
if (this.rtviClient) {
try {
// Disconnect the Pipecat client
await this.pcClient.disconnect();
this.pcClient = null;
// Disconnect the RTVI client
await this.rtviClient.disconnect();
this.rtviClient = null;
// Clean up audio
if (this.botAudio.srcObject) {
@@ -265,57 +339,6 @@ class ChatbotClient {
}
}
}
handleSearchResponse(response) {
console.log('SearchResponseHelper, received message:', response);
// Clear existing content
this.searchResultContainer.innerHTML = '';
// Create a container for all content
const contentContainer = document.createElement('div');
contentContainer.className = 'content-container';
// Add the search_result
if (response.search_result) {
const searchResultDiv = document.createElement('div');
searchResultDiv.className = 'search-result';
searchResultDiv.textContent = response.search_result;
contentContainer.appendChild(searchResultDiv);
}
// Add the sources
if (response.origins) {
const sourcesDiv = document.createElement('div');
sourcesDiv.className = 'sources';
const sourcesTitle = document.createElement('h3');
sourcesTitle.className = 'sources-title';
sourcesTitle.textContent = 'Sources:';
sourcesDiv.appendChild(sourcesTitle);
response.origins.forEach((origin) => {
const sourceLink = document.createElement('a');
sourceLink.className = 'source-link';
sourceLink.href = origin.site_uri;
sourceLink.target = '_blank';
sourceLink.textContent = origin.site_title;
sourcesDiv.appendChild(sourceLink);
});
contentContainer.appendChild(sourcesDiv);
}
// Add the rendered_content in an iframe
if (response.rendered_content) {
const iframe = document.createElement('iframe');
iframe.className = 'iframe-container';
iframe.srcdoc = response.rendered_content;
contentContainer.appendChild(iframe);
}
// Append the content container to the content panel
this.searchResultContainer.appendChild(contentContainer);
}
}
// Initialize the client when the page loads

File diff suppressed because it is too large Load Diff

View File

@@ -18,7 +18,7 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/small-webrtc-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.3.2",
"@pipecat-ai/small-webrtc-transport": "^0.0.2"
}
}

View File

@@ -1,236 +1,217 @@
import { SmallWebRTCTransport } from '@pipecat-ai/small-webrtc-transport';
import {
BotLLMTextData,
Participant,
PipecatClient,
PipecatClientOptions,
TranscriptData,
TransportState,
} from '@pipecat-ai/client-js';
SmallWebRTCTransport
} from "@pipecat-ai/small-webrtc-transport";
import {Participant, RTVIClient, RTVIClientOptions, Transport} from "@pipecat-ai/client-js";
class WebRTCApp {
private declare connectBtn: HTMLButtonElement;
private declare disconnectBtn: HTMLButtonElement;
private declare muteBtn: HTMLButtonElement;
private declare audioInput: HTMLSelectElement;
private declare videoInput: HTMLSelectElement;
private declare audioCodec: HTMLSelectElement;
private declare videoCodec: HTMLSelectElement;
private declare connectBtn: HTMLButtonElement;
private declare disconnectBtn: HTMLButtonElement;
private declare muteBtn: HTMLButtonElement;
private declare videoElement: HTMLVideoElement;
private declare audioElement: HTMLAudioElement;
private declare audioInput: HTMLSelectElement;
private declare videoInput: HTMLSelectElement;
private declare audioCodec: HTMLSelectElement;
private declare videoCodec: HTMLSelectElement;
private debugLog: HTMLElement | null = null;
private statusSpan: HTMLElement | null = null;
private declare videoElement: HTMLVideoElement;
private declare audioElement: HTMLAudioElement;
private declare smallWebRTCTransport: SmallWebRTCTransport;
private declare pcClient: PipecatClient;
private debugLog: HTMLElement | null = null;
private statusSpan: HTMLElement | null = null;
constructor() {
this.setupDOMElements();
this.setupDOMEventListeners();
this.initializePipecatClient();
void this.populateDevices();
}
private declare smallWebRTCTransport: SmallWebRTCTransport;
private declare rtviClient: RTVIClient;
private initializePipecatClient(): void {
const opts: PipecatClientOptions = {
transport: new SmallWebRTCTransport({ connectionUrl: '/api/offer' }),
enableMic: true,
enableCam: true,
callbacks: {
onTransportStateChanged: (state: TransportState) => {
this.log(`Transport state: ${state}`);
},
onConnected: () => {
this.onConnectedHandler();
},
onBotReady: () => {
this.log('Bot is ready.');
},
onDisconnected: () => {
this.onDisconnectedHandler();
},
onUserStartedSpeaking: () => {
this.log('User started speaking.');
},
onUserStoppedSpeaking: () => {
this.log('User stopped speaking.');
},
onBotStartedSpeaking: () => {
this.log('Bot started speaking.');
},
onBotStoppedSpeaking: () => {
this.log('Bot stopped speaking.');
},
onUserTranscript: (transcript: TranscriptData) => {
if (transcript.final) {
this.log(`User transcript: ${transcript.text}`);
}
},
onBotTranscript: (data: BotLLMTextData) => {
this.log(`Bot transcript: ${data.text}`);
},
onTrackStarted: (
track: MediaStreamTrack,
participant?: Participant
) => {
if (participant?.local) {
return;
}
this.onBotTrackStarted(track);
},
onServerMessage: (msg: unknown) => {
this.log(`Server message: ${msg}`);
},
},
};
this.pcClient = new PipecatClient(opts);
this.smallWebRTCTransport = this.pcClient.transport as SmallWebRTCTransport;
}
private setupDOMElements(): void {
this.connectBtn = document.getElementById(
'connect-btn'
) as HTMLButtonElement;
this.disconnectBtn = document.getElementById(
'disconnect-btn'
) as HTMLButtonElement;
this.muteBtn = document.getElementById('mute-btn') as HTMLButtonElement;
this.audioInput = document.getElementById(
'audio-input'
) as HTMLSelectElement;
this.videoInput = document.getElementById(
'video-input'
) as HTMLSelectElement;
this.audioCodec = document.getElementById(
'audio-codec'
) as HTMLSelectElement;
this.videoCodec = document.getElementById(
'video-codec'
) as HTMLSelectElement;
this.videoElement = document.getElementById(
'bot-video'
) as HTMLVideoElement;
this.audioElement = document.getElementById(
'bot-audio'
) as HTMLAudioElement;
this.debugLog = document.getElementById('debug-log');
this.statusSpan = document.getElementById('connection-status');
}
private setupDOMEventListeners(): void {
this.connectBtn.addEventListener('click', () => this.start());
this.disconnectBtn.addEventListener('click', () => this.stop());
this.audioInput.addEventListener('change', (e) => {
// @ts-ignore
let audioDevice = e.target?.value;
this.pcClient.updateMic(audioDevice);
});
this.videoInput.addEventListener('change', (e) => {
// @ts-ignore
let videoDevice = e.target?.value;
this.pcClient.updateCam(videoDevice);
});
this.muteBtn.addEventListener('click', () => {
let isCamEnabled = this.pcClient.isCamEnabled;
this.pcClient.enableCam(!isCamEnabled);
this.muteBtn.textContent = isCamEnabled ? '📵' : '📷';
});
}
private log(message: string): void {
if (!this.debugLog) return;
const entry = document.createElement('div');
entry.textContent = `${new Date().toISOString()} - ${message}`;
if (message.startsWith('User: ')) {
entry.style.color = '#2196F3';
} else if (message.startsWith('Bot: ')) {
entry.style.color = '#4CAF50';
constructor() {
this.setupDOMElements();
this.setupDOMEventListeners();
this.initializeRTVIClient()
void this.populateDevices();
}
this.debugLog.appendChild(entry);
this.debugLog.scrollTop = this.debugLog.scrollHeight;
}
private clearAllLogs() {
this.debugLog!.innerText = '';
}
private updateStatus(status: string): void {
if (this.statusSpan) {
this.statusSpan.textContent = status;
private initializeRTVIClient(): void {
const transport = new SmallWebRTCTransport();
const RTVIConfig: RTVIClientOptions = {
params: {
baseUrl: "/api/offer"
},
transport: transport as Transport,
enableMic: true,
enableCam: true,
callbacks: {
onTransportStateChanged: (state) => {
this.log(`Transport state: ${state}`)
},
onConnected: () => {
this.onConnectedHandler()
},
onBotReady: () => {
this.log("Bot is ready.")
},
onDisconnected: () => {
this.onDisconnectedHandler()
},
onUserStartedSpeaking: () => {
this.log("User started speaking.")
},
onUserStoppedSpeaking: () => {
this.log("User stopped speaking.")
},
onBotStartedSpeaking: () => {
this.log("Bot started speaking.")
},
onBotStoppedSpeaking: () => {
this.log("Bot stopped speaking.")
},
onUserTranscript: (transcript) => {
if (transcript.final) {
this.log(`User transcript: ${transcript.text}`)
}
},
onBotTranscript: (transcript) => {
this.log(`Bot transcript: ${transcript.text}`)
},
onTrackStarted: (track: MediaStreamTrack, participant?: Participant) => {
if (participant?.local) {
return
}
this.onBotTrackStarted(track)
},
onServerMessage: (msg) => {
this.log(`Server message: ${msg}`)
}
},
}
RTVIConfig.customConnectHandler = () => Promise.resolve();
this.rtviClient = new RTVIClient(RTVIConfig);
this.smallWebRTCTransport = transport
}
this.log(`Status: ${status}`);
}
private onConnectedHandler() {
this.updateStatus('Connected');
if (this.connectBtn) this.connectBtn.disabled = true;
if (this.disconnectBtn) this.disconnectBtn.disabled = false;
}
private setupDOMElements(): void {
this.connectBtn = document.getElementById('connect-btn') as HTMLButtonElement;
this.disconnectBtn = document.getElementById('disconnect-btn') as HTMLButtonElement;
this.muteBtn = document.getElementById('mute-btn') as HTMLButtonElement;
private onDisconnectedHandler() {
this.updateStatus('Disconnected');
if (this.connectBtn) this.connectBtn.disabled = false;
if (this.disconnectBtn) this.disconnectBtn.disabled = true;
}
this.audioInput = document.getElementById('audio-input') as HTMLSelectElement;
this.videoInput = document.getElementById('video-input') as HTMLSelectElement;
this.audioCodec = document.getElementById('audio-codec') as HTMLSelectElement;
this.videoCodec = document.getElementById('video-codec') as HTMLSelectElement;
private onBotTrackStarted(track: MediaStreamTrack) {
if (track.kind === 'video') {
this.videoElement.srcObject = new MediaStream([track]);
} else {
this.audioElement.srcObject = new MediaStream([track]);
this.videoElement = document.getElementById('bot-video') as HTMLVideoElement;
this.audioElement = document.getElementById('bot-audio') as HTMLAudioElement;
this.debugLog = document.getElementById('debug-log');
this.statusSpan = document.getElementById('connection-status');
}
}
private async populateDevices(): Promise<void> {
const populateSelect = (
select: HTMLSelectElement,
devices: MediaDeviceInfo[]
): void => {
let counter = 1;
devices.forEach((device) => {
const option = document.createElement('option');
option.value = device.deviceId;
option.text = device.label || 'Device #' + counter;
select.appendChild(option);
counter += 1;
});
};
private setupDOMEventListeners(): void {
this.connectBtn.addEventListener("click", () => this.start());
this.disconnectBtn.addEventListener("click", () => this.stop());
this.audioInput.addEventListener("change", (e) => {
// @ts-ignore
let audioDevice = e.target?.value
this.rtviClient.updateMic(audioDevice)
})
this.videoInput.addEventListener("change", (e) => {
// @ts-ignore
let videoDevice = e.target?.value
this.rtviClient.updateCam(videoDevice)
})
this.muteBtn.addEventListener('click', () => {
let isCamEnabled = this.rtviClient.isCamEnabled
this.rtviClient.enableCam(!isCamEnabled)
this.muteBtn.textContent = isCamEnabled ? '📵' : '📷';
});
try {
const audioDevices = await this.pcClient.getAllMics();
populateSelect(this.audioInput, audioDevices);
const videoDevices = await this.pcClient.getAllCams();
populateSelect(this.videoInput, videoDevices);
} catch (e) {
alert(e);
}
}
private async start(): Promise<void> {
this.clearAllLogs();
this.connectBtn.disabled = true;
this.updateStatus('Connecting');
this.smallWebRTCTransport.setAudioCodec(this.audioCodec.value);
this.smallWebRTCTransport.setVideoCodec(this.videoCodec.value);
try {
await this.pcClient.connect();
} catch (e) {
console.log(`Failed to connect ${e}`);
this.stop();
private log(message: string): void {
if (!this.debugLog) return;
const entry = document.createElement('div');
entry.textContent = `${new Date().toISOString()} - ${message}`;
if (message.startsWith('User: ')) {
entry.style.color = '#2196F3';
} else if (message.startsWith('Bot: ')) {
entry.style.color = '#4CAF50';
}
this.debugLog.appendChild(entry);
this.debugLog.scrollTop = this.debugLog.scrollHeight;
}
}
private stop(): void {
void this.pcClient.disconnect();
}
private clearAllLogs() {
this.debugLog!.innerText = ''
}
private updateStatus(status: string): void {
if (this.statusSpan) {
this.statusSpan.textContent = status;
}
this.log(`Status: ${status}`);
}
private onConnectedHandler() {
this.updateStatus('Connected');
if (this.connectBtn) this.connectBtn.disabled = true;
if (this.disconnectBtn) this.disconnectBtn.disabled = false;
}
private onDisconnectedHandler() {
this.updateStatus('Disconnected');
if (this.connectBtn) this.connectBtn.disabled = false;
if (this.disconnectBtn) this.disconnectBtn.disabled = true;
}
private onBotTrackStarted(track: MediaStreamTrack) {
if (track.kind === 'video') {
this.videoElement.srcObject = new MediaStream([track]);
} else {
this.audioElement.srcObject = new MediaStream([track]);
}
}
private async populateDevices(): Promise<void> {
const populateSelect = (select: HTMLSelectElement, devices: MediaDeviceInfo[]): void => {
let counter = 1;
devices.forEach((device) => {
const option = document.createElement('option');
option.value = device.deviceId;
option.text = device.label || ('Device #' + counter);
select.appendChild(option);
counter += 1;
});
};
try {
const audioDevices = await this.rtviClient.getAllMics();
populateSelect(this.audioInput, audioDevices);
const videoDevices = await this.rtviClient.getAllCams();
populateSelect(this.videoInput, videoDevices);
} catch (e) {
alert(e);
}
}
private async start(): Promise<void> {
this.clearAllLogs()
this.connectBtn.disabled = true;
this.updateStatus("Connecting")
this.smallWebRTCTransport.setAudioCodec(this.audioCodec.value)
this.smallWebRTCTransport.setVideoCodec(this.videoCodec.value)
try {
await this.rtviClient.connect()
} catch (e) {
console.log(`Failed to connect ${e}`)
this.stop()
}
}
private stop(): void {
void this.rtviClient.disconnect()
}
}
// Create the WebRTCConnection instance

View File

@@ -1,51 +1,40 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>AI Chatbot</title>
</head>
<body>
<div class="container">
<div class="status-bar">
<div class="status">
Status: <span id="connection-status">Disconnected</span>
</div>
<div class="controls">
<button id="connect-btn">Connect</button>
<button id="disconnect-btn" disabled>Disconnect</button>
</div>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Chatbot</title>
</head>
<body>
<div class="container">
<div class="status-bar">
<div class="status">
Status: <span id="connection-status">Disconnected</span>
</div>
<div class="main-content">
<div class="bot-container">
<div id="bot-video-container"></div>
<audio id="bot-audio" autoplay></audio>
</div>
</div>
<div class="device-bar">
<div class="device-controls">
<select id="device-selector"></select>
<button id="mic-toggle-btn">Unmute Mic</button>
</div>
<div class="text-input-container">
<input
type="text"
id="text-input"
placeholder="Type your message..." />
<button id="send-text-btn" disabled>Send</button>
</div>
</div>
<div class="debug-panel">
<h3>Debug Info</h3>
<div id="debug-log"></div>
<div class="controls">
<button id="connect-btn">Connect</button>
<button id="disconnect-btn" disabled>Disconnect</button>
</div>
</div>
<script type="module" src="/src/app.js"></script>
<link rel="stylesheet" href="/src/style.css" />
</body>
</html>
<div class="main-content">
<div class="bot-container">
<div id="bot-video-container">
</div>
<audio id="bot-audio" autoplay></audio>
</div>
</div>
<div class="debug-panel">
<h3>Debug Info</h3>
<div id="debug-log"></div>
</div>
</div>
<script type="module" src="/src/app.js"></script>
<link rel="stylesheet" href="/src/style.css">
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.8"
}
}

View File

@@ -5,7 +5,7 @@
*/
/**
* Pipecat Client Implementation
* RTVI Client Implementation
*
* This client connects to an RTVI-compatible bot server using WebRTC (via Daily).
* It handles audio/video streaming and manages the connection lifecycle.
@@ -16,7 +16,7 @@
* - Browser with WebRTC support
*/
import { PipecatClient, RTVIEvent } from '@pipecat-ai/client-js';
import { RTVIClient, RTVIEvent } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
/**
@@ -26,8 +26,9 @@ import { DailyTransport } from '@pipecat-ai/daily-transport';
class ChatbotClient {
constructor() {
// Initialize client state
this.pcClient = null;
this.rtviClient = null;
this.setupDOMElements();
this.setupEventListeners();
this.initializeClientAndTransport();
}
@@ -41,9 +42,6 @@ class ChatbotClient {
this.statusSpan = document.getElementById('connection-status');
this.debugLog = document.getElementById('debug-log');
this.botVideoContainer = document.getElementById('bot-video-container');
this.deviceSelector = document.getElementById('device-selector');
this.micToggleBtn = document.getElementById('mic-toggle-btn');
this.sendTextBtn = document.getElementById('send-text-btn');
// Create an audio element for bot's voice output
this.botAudio = document.createElement('audio');
@@ -56,78 +54,25 @@ class ChatbotClient {
* Set up event listeners for connect/disconnect buttons
*/
setupEventListeners() {
this.connectBtn.addEventListener('click', () => {
console.log('click');
this.connect();
});
this.connectBtn.addEventListener('click', () => this.connect());
this.disconnectBtn.addEventListener('click', () => this.disconnect());
// Populate device selector
this.pcClient.getAllMics().then((mics) => {
console.log('Available mics:', mics);
mics.forEach((device) => {
const option = document.createElement('option');
option.value = device.deviceId;
option.textContent = device.label || `Microphone ${device.deviceId}`;
this.deviceSelector.appendChild(option);
});
});
this.deviceSelector.addEventListener('change', (event) => {
const selectedDeviceId = event.target.value;
console.log('Selected device ID:', selectedDeviceId);
this.pcClient.updateMic(selectedDeviceId);
});
// Handle mic mute/unmute toggle
const micToggleBtn = document.getElementById('mic-toggle-btn');
micToggleBtn.addEventListener('click', async () => {
if (this.pcClient.state === 'disconnected') {
await this.pcClient.initDevices();
} else {
this.pcClient.enableMic(!this.pcClient.isMicEnabled);
}
});
const textInput = document.getElementById('text-input');
const sendTextToLLM = () => {
this.sendTextBtn.disabled = true; // Disable button to prevent multiple clicks
const text = textInput.value.trim();
if (text) {
void this.pcClient.appendToContext({
role: 'user',
content: text,
run_immediately: true,
});
}
textInput.value = ''; // Clear the input
this.sendTextBtn.disabled = false; // Re-enable button after sending
};
this.sendTextBtn.addEventListener('click', sendTextToLLM);
// Also handle Enter key in the input
textInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendTextToLLM();
}
});
}
updateMicToggleButton(micEnabled) {
console.log('Mic enabled:', micEnabled, this.pcClient?.isMicEnabled);
this.micToggleBtn.textContent = micEnabled ? 'Mute Mic' : 'Unmute Mic';
}
/**
* Set up the Pipecat client and Daily transport
* Set up the RTVI client and Daily transport
*/
async initializeClientAndTransport() {
console.log('Initializing Pipecat client and transport...');
// Initialize the Pipecat client with a DailyTransport and our configuration
this.pcClient = new PipecatClient({
initializeClientAndTransport() {
// Initialize the RTVI client with a DailyTransport and our configuration
this.rtviClient = new RTVIClient({
transport: new DailyTransport(),
enableMic: true,
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:7860',
endpoints: {
connect: '/connect',
},
},
enableMic: true, // Enable microphone for user input
enableCam: false,
callbacks: {
// Handle connection state changes
@@ -141,9 +86,7 @@ class ChatbotClient {
this.updateStatus('Disconnected');
this.connectBtn.disabled = false;
this.disconnectBtn.disabled = true;
this.sendTextBtn.disabled = true;
this.log('Client disconnected');
this.updateMicToggleButton(false);
},
// Handle transport state changes
onTransportStateChanged: (state) => {
@@ -163,7 +106,6 @@ class ChatbotClient {
onBotReady: (data) => {
this.log(`Bot ready: ${JSON.stringify(data)}`);
this.setupMediaTracks();
this.sendTextBtn.disabled = false;
},
// Transcript events
onUserTranscript: (data) => {
@@ -179,20 +121,14 @@ class ChatbotClient {
onMessageError: (error) => {
console.log('Message error:', error);
},
onMicUpdated: (data) => {
console.log('Mic updated:', data);
this.deviceSelector.value = data.deviceId;
},
onError: (error) => {
console.log('Error:', JSON.stringify(error));
},
},
});
window.client = this; // Expose client globally for debugging
// Set up listeners for media track events
this.setupTrackListeners();
this.setupEventListeners();
}
/**
@@ -227,10 +163,10 @@ class ChatbotClient {
* This is called when the bot is ready or when the transport state changes to ready
*/
setupMediaTracks() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Get current tracks from the client
const tracks = this.pcClient.tracks();
const tracks = this.rtviClient.tracks();
// Set up any available bot tracks
if (tracks.bot?.audio) {
@@ -246,34 +182,27 @@ class ChatbotClient {
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Listen for new tracks starting
this.pcClient.on(RTVIEvent.TrackStarted, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local) {
if (track.kind === 'audio') {
this.setupAudioTrack(track);
} else if (track.kind === 'video') {
this.setupVideoTrack(track);
}
} else if (track.kind === 'audio') {
console.log(`Local audio track started: `, this.pcClient.tracks());
// If local audio track starts, update mic
this.updateMicToggleButton(true);
}
});
// Listen for tracks stopping
this.pcClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(
`Track stopped event: ${track.kind} from ${
participant ? (participant.local ? 'local' : 'bot') : 'unknown'
participant?.name || 'unknown'
}`
);
if (participant?.local && track.kind === 'audio') {
// If local audio track stops, update mic toggle button
this.updateMicToggleButton(false);
}
});
}
@@ -322,16 +251,17 @@ class ChatbotClient {
/**
* Initialize and connect to the bot
* This sets up the Pipecat client, initializes devices, and establishes the connection
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
async connect() {
try {
// Initialize audio/video devices
this.log('Initializing devices...');
await this.rtviClient.initDevices();
// Connect to the bot
this.log('Connecting to bot...');
await this.pcClient.connect({
endpoint: 'http://localhost:7860/connect',
timeout: 25000,
});
await this.rtviClient.connect();
this.log('Connection complete');
} catch (error) {
@@ -341,9 +271,9 @@ class ChatbotClient {
this.updateStatus('Error');
// Clean up if there's an error
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError.message}`);
}
@@ -355,10 +285,10 @@ class ChatbotClient {
* Disconnect from the bot and clean up media resources
*/
async disconnect() {
if (this.pcClient) {
if (this.rtviClient) {
try {
// Disconnect the Pipecat client
await this.pcClient.disconnect();
// Disconnect the RTVI client
await this.rtviClient.disconnect();
// Clean up audio
if (this.botAudio.srcObject) {

View File

@@ -10,8 +10,7 @@ body {
margin: 0 auto;
}
.status-bar,
.device-bar {
.status-bar {
display: flex;
justify-content: space-between;
align-items: center;
@@ -21,24 +20,7 @@ body {
margin-bottom: 20px;
}
.device-bar {
flex-direction: column;
gap: 10px;
}
.controls,
.device-controls {
display: flex;
align-items: center;
gap: 10px; /* Adds spacing between elements */
}
.device-controls {
margin-left: auto;
}
.controls button,
.device-controls button {
.controls button {
padding: 8px 16px;
margin-left: 10px;
border: none;
@@ -46,56 +28,6 @@ body {
cursor: pointer;
}
#bot-selector,
#device-selector {
padding: 8px 16px;
padding-right: 40px;
border: none;
border-radius: 4px;
background-color: #6c757d; /* Gray background */
color: white; /* White text */
cursor: pointer;
appearance: none; /* Removes default browser styling for dropdowns */
background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 24 24' fill='white'%3E%3Cpath d='M7 10l5 5 5-5z'/%3E%3C/svg%3E"); /* Custom arrow */
background-repeat: no-repeat;
background-position: right 8px center; /* Position the arrow */
}
#bot-selector:focus,
#device-selector:focus {
outline: none;
box-shadow: 0 0 4px rgba(0, 0, 0, 0.3); /* Add a subtle focus effect */
}
.text-input-container {
display: flex;
gap: 8px;
margin-left: 10px;
width: 100%;
flex: 1;
}
#text-input {
flex: 1;
padding: 8px 16px;
border: 1px solid #e0e0e0;
border-radius: 4px;
min-width: 200px;
}
#send-text-btn {
padding: 8px 16px;
border: none;
border-radius: 4px;
background-color: #007bff;
color: white;
flex-shrink: 0;
}
#send-text-btn:hover {
background-color: #0056b3;
}
#connect-btn {
background-color: #4caf50;
color: white;
@@ -106,9 +38,6 @@ body {
color: white;
}
#mic-toggle-btn {
}
button:disabled {
opacity: 0.5;
cursor: not-allowed;

File diff suppressed because it is too large Load Diff

View File

@@ -10,9 +10,9 @@
"preview": "vite preview"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/client-react": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0",
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/client-react": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.8",
"react": "^18.3.1",
"react-dom": "^18.3.1"
},

View File

@@ -1,22 +1,22 @@
import {
PipecatClientAudio,
PipecatClientVideo,
usePipecatClientTransportState,
RTVIClientAudio,
RTVIClientVideo,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
import { PipecatProvider } from './providers/PipecatProvider';
import { RTVIProvider } from './providers/RTVIProvider';
import { ConnectButton } from './components/ConnectButton';
import { StatusDisplay } from './components/StatusDisplay';
import { DebugDisplay } from './components/DebugDisplay';
import './App.css';
function BotVideo() {
const transportState = usePipecatClientTransportState();
const transportState = useRTVIClientTransportState();
const isConnected = transportState !== 'disconnected';
return (
<div className="bot-container">
<div className="video-container">
{isConnected && <PipecatClientVideo participant="bot" fit="cover" />}
{isConnected && <RTVIClientVideo participant="bot" fit="cover" />}
</div>
</div>
);
@@ -35,16 +35,16 @@ function AppContent() {
</div>
<DebugDisplay />
<PipecatClientAudio />
<RTVIClientAudio />
</div>
);
}
function App() {
return (
<PipecatProvider>
<RTVIProvider>
<AppContent />
</PipecatProvider>
</RTVIProvider>
);
}

View File

@@ -1,16 +1,16 @@
import {
usePipecatClient,
usePipecatClientTransportState,
useRTVIClient,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
export function ConnectButton() {
const client = usePipecatClient();
const transportState = usePipecatClientTransportState();
const client = useRTVIClient();
const transportState = useRTVIClientTransportState();
const isConnected = ['connected', 'ready'].includes(transportState);
const handleClick = async () => {
if (!client) {
console.error('Pipecat client is not initialized');
console.error('RTVI client is not initialized');
return;
}
@@ -18,7 +18,7 @@ export function ConnectButton() {
if (isConnected) {
await client.disconnect();
} else {
await client.connect({ endpoint: 'http://localhost:7860/connect' });
await client.connect();
}
} catch (error) {
console.error('Connection error:', error);

View File

@@ -6,12 +6,12 @@ import {
TranscriptData,
BotLLMTextData,
} from '@pipecat-ai/client-js';
import { usePipecatClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import { useRTVIClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import './DebugDisplay.css';
export function DebugDisplay() {
const debugLogRef = useRef<HTMLDivElement>(null);
const client = usePipecatClient();
const client = useRTVIClient();
const log = useCallback((message: string) => {
if (!debugLogRef.current) return;

View File

@@ -1,7 +1,7 @@
import { usePipecatClientTransportState } from '@pipecat-ai/client-react';
import { useRTVIClientTransportState } from '@pipecat-ai/client-react';
export function StatusDisplay() {
const transportState = usePipecatClientTransportState();
const transportState = useRTVIClientTransportState();
return (
<div className="status">

View File

@@ -1,16 +0,0 @@
import { type PropsWithChildren } from 'react';
import { PipecatClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { PipecatClientProvider } from '@pipecat-ai/client-react';
const client = new PipecatClient({
transport: new DailyTransport(),
enableMic: true,
enableCam: false,
});
export function PipecatProvider({ children }: PropsWithChildren) {
return (
<PipecatClientProvider client={client}>{children}</PipecatClientProvider>
);
}

View File

@@ -0,0 +1,22 @@
import { type PropsWithChildren } from 'react';
import { RTVIClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { RTVIClientProvider } from '@pipecat-ai/client-react';
const transport = new DailyTransport();
const client = new RTVIClient({
transport,
params: {
baseUrl: 'http://localhost:7860',
endpoints: {
connect: '/connect',
},
},
enableMic: true,
enableCam: false,
});
export function RTVIProvider({ children }: PropsWithChildren) {
return <RTVIClientProvider client={client}>{children}</RTVIClientProvider>;
}

View File

@@ -4,18 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Bot Implementation.
This module implements a chatbot using OpenAI's GPT-4 model for natural language
processing. It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Text-to-speech using ElevenLabs
- Support for both English and Spanish
The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow.
"""
import asyncio
import os
@@ -24,150 +12,72 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import (
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
DailyRESTHelper,
DailyRoomParams,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
sprites = []
script_dir = os.path.dirname(__file__)
# Load sequential animation frames
for i in range(1, 26):
# Build the full path to the image file
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
# Create a smooth animation by adding reversed frames
flipped = sprites[::-1]
sprites.extend(flipped)
# Define static and animated states
quiet_frame = sprites[0] # Static frame for when bot is listening
talking_frame = SpriteFrame(images=sprites) # Animation sequence for when bot is talking
class TalkingAnimation(FrameProcessor):
"""Manages the bot's visual animation states.
Switches between static (listening) and animated (talking) states based on
the bot's current speaking status.
"""
def __init__(self):
super().__init__()
self._is_talking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and update animation state.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
# Return to static frame when bot stops speaking
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(frame, direction)
async def main():
"""Main bot execution function.
Sets up and runs the bot pipeline including:
- Daily video transport
- Speech-to-text and text-to-speech services
- Language model integration
- Animation processing
- RTVI event handling
"""
"""Main bot execution function."""
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
room = await daily_rest_helper.create_room(
DailyRoomParams(properties={"enable_prejoin_ui": False})
)
token_params = DailyMeetingTokenParams(
properties=DailyMeetingTokenProperties(
is_owner=True,
permissions={
"hasPresence": False, # Example: join as a hidden participant
},
start_video_off=True,
start_audio_off=True,
)
)
token = await daily_rest_helper.get_token(room_url=room.url, params=token_params)
# Set up Daily transport with video/audio parameters
transport = DailyTransport(
room_url,
room.url,
token,
"Chatbot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=576,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
# Initialize text-to-speech service
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="pNInz6obpgDQGcFmaJgB",
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
# Initialize LLM service
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
"content": "Summerize the conversation so far in a single sentence.",
},
]
@@ -176,8 +86,6 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
ta = TalkingAnimation()
#
# RTVI events for Pipecat client UI
#
@@ -189,8 +97,6 @@ async def main():
rtvi,
context_aggregator.user(),
llm,
tts,
ta,
transport.output(),
context_aggregator.assistant(),
]
@@ -204,7 +110,6 @@ async def main():
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):

File diff suppressed because it is too large Load Diff

View File

@@ -19,7 +19,7 @@
"vite": "^6.0.2"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/websocket-transport": "^1.0.0"
"@pipecat-ai/client-js": "^0.4.0",
"@pipecat-ai/websocket-transport": "^0.4.2"
}
}

View File

@@ -5,22 +5,21 @@
*/
import {
BotLLMTextData,
Participant,
PipecatClient,
PipecatClientOptions,
RTVIEvent, RTVIMessage, TranscriptData,
RTVIClient,
RTVIClientOptions,
RTVIEvent,
} from '@pipecat-ai/client-js';
import {
WebSocketTransport,
TwilioSerializer,
} from '@pipecat-ai/websocket-transport';
} from "@pipecat-ai/websocket-transport";
class WebsocketClientApp {
private static STREAM_SID = 'ws_mock_stream_sid';
private static CALL_SID = 'ws_mock_call_sid';
private rtviClient: PipecatClient | null = null;
private static STREAM_SID = "ws_mock_stream_sid"
private static CALL_SID = "ws_mock_call_sid"
private rtviClient: RTVIClient | null = null;
private connectBtn: HTMLButtonElement | null = null;
private disconnectBtn: HTMLButtonElement | null = null;
private statusSpan: HTMLElement | null = null;
@@ -39,12 +38,8 @@ class WebsocketClientApp {
* Set up references to DOM elements and create necessary media elements
*/
private setupDOMElements(): void {
this.connectBtn = document.getElementById(
'connect-btn'
) as HTMLButtonElement;
this.disconnectBtn = document.getElementById(
'disconnect-btn'
) as HTMLButtonElement;
this.connectBtn = document.getElementById('connect-btn') as HTMLButtonElement;
this.disconnectBtn = document.getElementById('disconnect-btn') as HTMLButtonElement;
this.statusSpan = document.getElementById('connection-status');
this.debugLog = document.getElementById('debug-log');
}
@@ -85,23 +80,13 @@ class WebsocketClientApp {
}
private async emulateTwilioMessages() {
const connectedMessage = {
event: 'connected',
protocol: 'Call',
version: '1.0.0',
};
const connectedMessage={"event": "connected", "protocol": "Call", "version": "1.0.0"}
const websocketTransport = this.rtviClient?.transport as WebSocketTransport;
void websocketTransport?.sendRawMessage(connectedMessage);
const websocketTransport = this.rtviClient?.transport as WebSocketTransport
void websocketTransport?.sendRawMessage(connectedMessage)
const startMessage = {
event: 'start',
start: {
streamSid: WebsocketClientApp.STREAM_SID,
callSid: WebsocketClientApp.CALL_SID,
},
};
void websocketTransport?.sendRawMessage(startMessage);
const startMessage={"event": "start", "start": {"streamSid": WebsocketClientApp.STREAM_SID, "callSid": WebsocketClientApp.CALL_SID}}
void websocketTransport?.sendRawMessage(startMessage)
}
/**
@@ -124,7 +109,7 @@ class WebsocketClientApp {
if (!this.rtviClient) return;
// Listen for new tracks starting
this.rtviClient.on(RTVIEvent.TrackStarted, (track: MediaStreamTrack, participant?: Participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local && track.kind === 'audio') {
this.setupAudioTrack(track);
@@ -132,10 +117,8 @@ class WebsocketClientApp {
});
// Listen for tracks stopping
this.rtviClient.on(RTVIEvent.TrackStopped, (track: MediaStreamTrack, participant?: Participant) => {
this.log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
);
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`);
});
}
@@ -145,10 +128,7 @@ class WebsocketClientApp {
*/
private setupAudioTrack(track: MediaStreamTrack): void {
this.log('Setting up audio track');
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
const oldTrack = this.botAudio.srcObject.getAudioTracks()[0];
if (oldTrack?.id === track.id) return;
}
@@ -163,19 +143,23 @@ class WebsocketClientApp {
try {
const startTime = Date.now();
const ws_opts = {
const transport = new WebSocketTransport({
serializer: new TwilioSerializer(),
recorderSampleRate: 8000,
playerSampleRate: 8000,
ws_url: 'http://localhost:8765/ws',
};
const RTVIConfig: PipecatClientOptions = {
transport: new WebSocketTransport(ws_opts),
playerSampleRate: 8000
});
const RTVIConfig: RTVIClientOptions = {
transport,
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:8765',
endpoints: { connect: '/' },
},
enableMic: true,
enableCam: false,
callbacks: {
onConnected: () => {
this.emulateTwilioMessages();
this.emulateTwilioMessages()
this.updateStatus('Connected');
if (this.connectBtn) this.connectBtn.disabled = true;
if (this.disconnectBtn) this.disconnectBtn.disabled = false;
@@ -186,21 +170,27 @@ class WebsocketClientApp {
if (this.disconnectBtn) this.disconnectBtn.disabled = true;
this.log('Client disconnected');
},
onBotReady: (data: any) => {
onBotReady: (data) => {
this.log(`Bot ready: ${JSON.stringify(data)}`);
this.setupMediaTracks();
},
onUserTranscript: (data: TranscriptData) => {
onUserTranscript: (data) => {
if (data.final) {
this.log(`User: ${data.text}`);
}
},
onBotTranscript: (data: BotLLMTextData) => this.log(`Bot: ${data.text}`),
onMessageError: (error: RTVIMessage) => console.error('Message error:', error),
onError: (error: RTVIMessage) => console.error('Error:', error),
onBotTranscript: (data) => this.log(`Bot: ${data.text}`),
onMessageError: (error) => console.error('Message error:', error),
onError: (error) => console.error('Error:', error),
},
};
this.rtviClient = new PipecatClient(RTVIConfig);
}
// @ts-ignore
RTVIConfig.customConnectHandler = () => Promise.resolve(
{
ws_url: "/ws",
}
);
this.rtviClient = new RTVIClient(RTVIConfig);
this.setupTrackListeners();
this.log('Initializing devices...');
@@ -233,13 +223,8 @@ class WebsocketClientApp {
try {
await this.rtviClient.disconnect();
this.rtviClient = null;
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
this.botAudio.srcObject
.getAudioTracks()
.forEach((track) => track.stop());
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
this.botAudio.srcObject.getAudioTracks().forEach((track) => track.stop());
this.botAudio.srcObject = null;
}
} catch (error) {
@@ -247,6 +232,7 @@ class WebsocketClientApp {
}
}
}
}
declare global {

File diff suppressed because it is too large Load Diff

View File

@@ -19,8 +19,8 @@
"vite": "^6.3.5"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/websocket-transport": "^1.0.0",
"@pipecat-ai/client-js": "^0.4.0",
"@pipecat-ai/websocket-transport": "^0.4.2",
"protobufjs": "^7.4.0"
}
}

View File

@@ -5,7 +5,7 @@
*/
/**
* Pipecat Client Implementation
* RTVI Client Implementation
*
* This client connects to an RTVI-compatible bot server using WebSocket.
*
@@ -14,14 +14,16 @@
*/
import {
PipecatClient,
PipecatClientOptions,
RTVIClient,
RTVIClientOptions,
RTVIEvent,
} from '@pipecat-ai/client-js';
import { WebSocketTransport } from '@pipecat-ai/websocket-transport';
import {
WebSocketTransport
} from "@pipecat-ai/websocket-transport";
class WebsocketClientApp {
private pcClient: PipecatClient | null = null;
private rtviClient: RTVIClient | null = null;
private connectBtn: HTMLButtonElement | null = null;
private disconnectBtn: HTMLButtonElement | null = null;
private statusSpan: HTMLElement | null = null;
@@ -29,7 +31,7 @@ class WebsocketClientApp {
private botAudio: HTMLAudioElement;
constructor() {
console.log('WebsocketClientApp');
console.log("WebsocketClientApp");
this.botAudio = document.createElement('audio');
this.botAudio.autoplay = true;
//this.botAudio.playsInline = true;
@@ -43,12 +45,8 @@ class WebsocketClientApp {
* Set up references to DOM elements and create necessary media elements
*/
private setupDOMElements(): void {
this.connectBtn = document.getElementById(
'connect-btn'
) as HTMLButtonElement;
this.disconnectBtn = document.getElementById(
'disconnect-btn'
) as HTMLButtonElement;
this.connectBtn = document.getElementById('connect-btn') as HTMLButtonElement;
this.disconnectBtn = document.getElementById('disconnect-btn') as HTMLButtonElement;
this.statusSpan = document.getElementById('connection-status');
this.debugLog = document.getElementById('debug-log');
}
@@ -93,8 +91,8 @@ class WebsocketClientApp {
* This is called when the bot is ready or when the transport state changes to ready
*/
setupMediaTracks() {
if (!this.pcClient) return;
const tracks = this.pcClient.tracks();
if (!this.rtviClient) return;
const tracks = this.rtviClient.tracks();
if (tracks.bot?.audio) {
this.setupAudioTrack(tracks.bot.audio);
}
@@ -105,10 +103,10 @@ class WebsocketClientApp {
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.pcClient) return;
if (!this.rtviClient) return;
// Listen for new tracks starting
this.pcClient.on(RTVIEvent.TrackStarted, (track, participant) => {
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
// Only handle non-local (bot) tracks
if (!participant?.local && track.kind === 'audio') {
this.setupAudioTrack(track);
@@ -116,10 +114,8 @@ class WebsocketClientApp {
});
// Listen for tracks stopping
this.pcClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
);
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
this.log(`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`);
});
}
@@ -129,10 +125,7 @@ class WebsocketClientApp {
*/
private setupAudioTrack(track: MediaStreamTrack): void {
this.log('Setting up audio track');
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
const oldTrack = this.botAudio.srcObject.getAudioTracks()[0];
if (oldTrack?.id === track.id) return;
}
@@ -141,15 +134,21 @@ class WebsocketClientApp {
/**
* Initialize and connect to the bot
* This sets up the Pipecat client, initializes devices, and establishes the connection
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
public async connect(): Promise<void> {
try {
const startTime = Date.now();
//const transport = new DailyTransport();
const PipecatConfig: PipecatClientOptions = {
transport: new WebSocketTransport(),
const transport = new WebSocketTransport();
const RTVIConfig: RTVIClientOptions = {
transport,
params: {
// The baseURL and endpoint of your bot server that the client will connect to
baseUrl: 'http://localhost:7860',
endpoints: { connect: '/connect' },
},
enableMic: true,
enableCam: false,
callbacks: {
@@ -177,20 +176,15 @@ class WebsocketClientApp {
onMessageError: (error) => console.error('Message error:', error),
onError: (error) => console.error('Error:', error),
},
};
this.pcClient = new PipecatClient(PipecatConfig);
// @ts-ignore
window.pcClient = this.pcClient; // Expose for debugging
}
this.rtviClient = new RTVIClient(RTVIConfig);
this.setupTrackListeners();
this.log('Initializing devices...');
await this.pcClient.initDevices();
await this.rtviClient.initDevices();
this.log('Connecting to bot...');
await this.pcClient.connect({
// The baseURL and endpoint of your bot server that the client will connect to
endpoint: 'http://localhost:7860/connect',
});
await this.rtviClient.connect();
const timeTaken = Date.now() - startTime;
this.log(`Connection complete, timeTaken: ${timeTaken}`);
@@ -198,9 +192,9 @@ class WebsocketClientApp {
this.log(`Error connecting: ${(error as Error).message}`);
this.updateStatus('Error');
// Clean up if there's an error
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
await this.rtviClient.disconnect();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError}`);
}
@@ -212,17 +206,12 @@ class WebsocketClientApp {
* Disconnect from the bot and clean up media resources
*/
public async disconnect(): Promise<void> {
if (this.pcClient) {
if (this.rtviClient) {
try {
await this.pcClient.disconnect();
this.pcClient = null;
if (
this.botAudio.srcObject &&
'getAudioTracks' in this.botAudio.srcObject
) {
this.botAudio.srcObject
.getAudioTracks()
.forEach((track) => track.stop());
await this.rtviClient.disconnect();
this.rtviClient = null;
if (this.botAudio.srcObject && "getAudioTracks" in this.botAudio.srcObject) {
this.botAudio.srcObject.getAudioTracks().forEach((track) => track.stop());
this.botAudio.srcObject = null;
}
} catch (error) {
@@ -230,6 +219,7 @@ class WebsocketClientApp {
}
}
}
}
declare global {

View File

@@ -295,22 +295,6 @@ This project uses TypeScript, React, and Next.js, making it a perfect fit for [V
Again, we'll use Pipecat Cloud. Follow the steps from above. The only difference will be the secrets required; in addition to a GOOGLE_API_KEY, you'll need `GOOGLE_APPLICATION_CREDENTIALS` in the format of a .json file with your [Google Cloud service account](https://console.cloud.google.com/iam-admin/serviceaccounts) information.
You'll need to modify the Dockerfile so that the credentials.json and word_list.py are accessible. This Dockerfile will work:
```Dockerfile
FROM dailyco/pipecat-base:latest
COPY ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./word_list.py word_list.py
COPY ./credentials.json credentials.json
COPY ./bot_phone_twilio.py bot.py
```
Note: Your `credentials.json` file should have your Google service account credentials.
#### Buy and Configure a Twilio Number
Check out the [Twilio Websocket Telephony guide](https://docs.pipecat.daily.co/pipecat-in-production/telephony/twilio-mediastreams) for a step-by-step walkthrough on how to purchase a phone number, configure your TwiML, and make or receive calls.

File diff suppressed because it is too large Load Diff

View File

@@ -9,12 +9,11 @@
"lint": "next lint"
},
"dependencies": {
"@pipecat-ai/client-js": "^1.0.0",
"@pipecat-ai/client-react": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0",
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/client-react": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.10",
"@tabler/icons-react": "^3.31.0",
"@tailwindcss/postcss": "^4.1.3",
"jotai": "^2.12.5",
"js-confetti": "^0.12.0",
"next": "15.2.4",
"react": "^19.0.0",

View File

@@ -1,26 +1,16 @@
import { useEffect, useCallback } from 'react';
import {
usePipecatClient,
usePipecatClientTransportState,
useRTVIClient,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
import { CONNECTION_STATES } from '@/constants/gameConstants';
import { useConfigurationSettings } from '@/contexts/Configuration';
// Get the API base URL from environment variables
// Default to "/api" if not specified
// "/api" is the default for Next.js API routes and used
// for the Pipecat Cloud deployed agent
const API_BASE_URL = process.env.NEXT_PUBLIC_API_BASE_URL || '/api';
console.log('Using API base URL:', API_BASE_URL);
export function useConnectionState(
onConnected?: () => void,
onDisconnected?: () => void
) {
const client = usePipecatClient();
const transportState = usePipecatClientTransportState();
const config = useConfigurationSettings();
const client = useRTVIClient();
const transportState = useRTVIClientTransportState();
const isConnected = CONNECTION_STATES.ACTIVE.includes(transportState);
const isConnecting = CONNECTION_STATES.CONNECTING.includes(transportState);
@@ -45,17 +35,12 @@ export function useConnectionState(
if (isConnected) {
await client.disconnect();
} else {
await client.connect({
endpoint: `${API_BASE_URL}/connect`,
requestData: {
personality: config.personality,
},
});
await client.connect();
}
} catch (error) {
console.error('Connection error:', error);
}
}, [client, config, isConnected]);
}, [client, isConnected]);
return {
isConnected,

View File

@@ -1,15 +1,15 @@
import { ConfigurationProvider } from '@/contexts/Configuration';
import { PipecatProvider } from '@/providers/PipecatProvider';
import { PipecatClientAudio } from '@pipecat-ai/client-react';
import type { AppProps } from 'next/app';
import { Nunito } from 'next/font/google';
import Head from 'next/head';
import '../styles/globals.css';
import { ConfigurationProvider } from "@/contexts/Configuration";
import { RTVIProvider } from "@/providers/RTVIProvider";
import { RTVIClientAudio } from "@pipecat-ai/client-react";
import type { AppProps } from "next/app";
import { Nunito } from "next/font/google";
import Head from "next/head";
import "../styles/globals.css";
const nunito = Nunito({
subsets: ['latin'],
display: 'swap',
variable: '--font-sans',
subsets: ["latin"],
display: "swap",
variable: "--font-sans",
});
export default function App({ Component, pageProps }: AppProps) {
@@ -21,10 +21,10 @@ export default function App({ Component, pageProps }: AppProps) {
</Head>
<main className={`${nunito.variable}`}>
<ConfigurationProvider>
<PipecatProvider>
<PipecatClientAudio />
<RTVIProvider>
<RTVIClientAudio />
<Component {...pageProps} />
</PipecatProvider>
</RTVIProvider>
</ConfigurationProvider>
</main>
</>

View File

@@ -1,11 +1,11 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import type { NextApiRequest, NextApiResponse } from "next";
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method not allowed' });
if (req.method !== "POST") {
return res.status(405).json({ error: "Method not allowed" });
}
try {
@@ -15,16 +15,16 @@ export default async function handler(
if (!personality) {
return res
.status(400)
.json({ error: 'Missing required configuration parameters' });
.json({ error: "Missing required configuration parameters" });
}
const response = await fetch(
`https://api.pipecat.daily.co/v1/public/${process.env.AGENT_NAME}/start`,
{
method: 'POST',
method: "POST",
headers: {
Authorization: `Bearer ${process.env.PIPECAT_CLOUD_API_KEY}`,
'Content-Type': 'application/json',
"Content-Type": "application/json",
},
body: JSON.stringify({
createDailyRoom: true,
@@ -37,15 +37,15 @@ export default async function handler(
const data = await response.json();
console.log('Response from API:', JSON.stringify(data, null, 2));
console.log("Response from API:", JSON.stringify(data, null, 2));
// Transform the response to match what Pipecat client expects
// Transform the response to match what RTVI client expects
return res.status(200).json({
room_url: data.dailyRoom,
token: data.dailyToken,
});
} catch (error) {
console.error('Error starting agent:', error);
return res.status(500).json({ error: 'Failed to start agent' });
console.error("Error starting agent:", error);
return res.status(500).json({ error: "Failed to start agent" });
}
}

View File

@@ -1,43 +0,0 @@
'use client';
import { PipecatClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { PipecatClientProvider } from '@pipecat-ai/client-react';
import { PropsWithChildren, useEffect, useState, useRef } from 'react';
export function PipecatProvider({ children }: PropsWithChildren) {
const [client, setClient] = useState<PipecatClient | null>(null);
const clientCreated = useRef(false);
useEffect(() => {
// Only create the client once
if (clientCreated.current) return;
const pcClient = new PipecatClient({
transport: new DailyTransport(),
enableMic: true,
enableCam: false,
});
setClient(pcClient);
clientCreated.current = true;
// Cleanup when component unmounts
return () => {
if (pcClient) {
pcClient.disconnect().catch((err) => {
console.error('Error disconnecting client:', err);
});
}
clientCreated.current = false;
};
}, []);
if (!client) {
return null;
}
return (
<PipecatClientProvider client={client}>{children}</PipecatClientProvider>
);
}

View File

@@ -0,0 +1,72 @@
"use client";
import { RTVIClient } from "@pipecat-ai/client-js";
import { DailyTransport } from "@pipecat-ai/daily-transport";
import { RTVIClientProvider } from "@pipecat-ai/client-react";
import { PropsWithChildren, useEffect, useState, useRef } from "react";
import { useConfigurationSettings } from "@/contexts/Configuration";
// Get the API base URL from environment variables
// Default to "/api" if not specified
// "/api" is the default for Next.js API routes and used
// for the Pipecat Cloud deployed agent
const API_BASE_URL = process.env.NEXT_PUBLIC_API_BASE_URL || "/api";
console.log("Using API base URL:", API_BASE_URL);
export function RTVIProvider({ children }: PropsWithChildren) {
const [client, setClient] = useState<RTVIClient | null>(null);
const config = useConfigurationSettings();
const clientCreated = useRef(false);
useEffect(() => {
// Only create the client once
if (clientCreated.current) return;
const transport = new DailyTransport();
const rtviClient = new RTVIClient({
transport,
params: {
baseUrl: API_BASE_URL,
endpoints: {
connect: "/connect",
},
requestData: {
personality: config.personality,
},
},
enableMic: true,
enableCam: false,
});
setClient(rtviClient);
clientCreated.current = true;
// Cleanup when component unmounts
return () => {
if (rtviClient) {
rtviClient.disconnect().catch((err) => {
console.error("Error disconnecting client:", err);
});
}
clientCreated.current = false;
};
}, []);
// Update the connectParams when config changes
useEffect(() => {
if (!client) return;
// Update the connect params without recreating the client
client.params.requestData = {
personality: config.personality,
};
}, [client, config.personality]);
if (!client) {
return null;
}
return <RTVIClientProvider client={client}>{children}</RTVIClientProvider>;
}

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
@@ -199,15 +198,16 @@ async def bot(args: DailySessionArguments):
# Local development
async def local_daily(args: DailySessionArguments):
async def local_daily():
"""Daily transport for local development."""
# from runner import configure
from runner import configure
try:
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url=args.room_url,
token=args.token,
room_url,
token,
bot_name="Bot",
params=DailyParams(
audio_in_enabled=True,
@@ -217,7 +217,7 @@ async def local_daily(args: DailySessionArguments):
)
test_config = {
"personality": args.personality,
"personality": "witty",
}
await main(transport, test_config)
@@ -227,24 +227,7 @@ async def local_daily(args: DailySessionArguments):
# Local development entry point
if LOCAL_RUN and __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run the Word Wrangler bot in local development mode"
)
parser.add_argument(
"-u", "--room-url", type=str, default=os.getenv("DAILY_SAMPLE_ROOM_URL", "")
)
parser.add_argument(
"-t", "--token", type=str, default=os.getenv("DAILY_SAMPLE_ROOM_TOKEN", None)
)
parser.add_argument(
"-p",
"--personality",
default="witty",
choices=["friendly", "professional", "enthusiastic", "thoughtful", "witty"],
help="Personality preset for the bot (friendly, professional, enthusiastic, thoughtful, witty)",
)
args = parser.parse_args()
try:
asyncio.run(local_daily(args))
asyncio.run(local_daily())
except Exception as e:
logger.exception(f"Failed to run in local mode: {e}")

View File

@@ -160,15 +160,14 @@ async def rtvi_connect(request: Request) -> Dict[Any, Any]:
Raises:
HTTPException: If room creation, token generation, or bot startup fails
"""
body = await request.json()
print("Creating room for RTVI connection", body)
print("Creating room for RTVI connection")
room_url, token = await create_room_and_token()
print(f"Room URL: {room_url}")
# Start the bot process
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room_url} -t {token} -p {body.get('personality', 'witty')}"],
[f"python3 -m bot -u {room_url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),

View File

@@ -20,22 +20,19 @@ classifiers = [
"Topic :: Scientific/Engineering :: Artificial Intelligence"
]
dependencies = [
"aiohttp>=3.11.12,<4",
"aiohttp~=3.11.12",
"audioop-lts~=0.2.1; python_version>='3.13'",
"docstring_parser~=0.16",
"loguru~=0.7.3",
"Markdown>=3.7,<4",
"nltk>=3.9.1,<4",
"numpy>=1.26.4,<3",
"Pillow>=11.1.0,<12",
"Markdown~=3.7",
"numpy~=1.26.4",
"Pillow~=11.1.0",
"protobuf~=5.29.3",
"pydantic>=2.10.6,<3",
"pydantic~=2.10.6",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
"openai>=1.74.0,<2",
# Explicit dependency pins for Python 3.11+ compatibility
"numba>=0.60.0,<1",
"openai~=1.70.0",
]
[project.urls]
@@ -44,60 +41,59 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "websockets>=13.1,<15.0" ]
aws = [ "aioboto3~=15.0.0", "websockets>=13.1,<15.0" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2; python_version>='3.12'" ]
assemblyai = [ "websockets~=13.1" ]
aws = [ "boto3~=1.37.16", "websockets~=13.1" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "websockets>=13.1,<15.0" ]
cartesia = [ "cartesia~=2.0.3", "websockets~=13.1" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.19.4" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "websockets>=13.1,<15.0" ]
deepgram = [ "deepgram-sdk~=4.1.0" ]
elevenlabs = [ "websockets~=13.1" ]
fal = [ "fal-client~=0.5.9" ]
fireworks = []
fish = [ "ormsgpack~=1.7.0", "websockets>=13.1,<15.0" ]
gladia = [ "websockets>=13.1,<15.0" ]
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "websockets>=13.1,<15.0" ]
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "websockets~=13.1" ]
grok = []
groq = [ "groq~=0.23.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
koala = [ "pvkoala~=2.0.3" ]
langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-openai~=0.3.9" ]
livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity>=8.2.3,<10.0.0" ]
lmnt = [ "websockets>=13.1,<15.0" ]
livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ]
lmnt = [ "websockets~=13.1" ]
local = [ "pyaudio~=0.2.14" ]
mcp = [ "mcp[cli]~=1.9.4" ]
mem0 = [ "mem0ai~=0.1.94" ]
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers>=4.48.0" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
nim = []
neuphonic = [ "websockets>=13.1,<15.0" ]
neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "websockets>=13.1,<15.0" ]
openai = [ "websockets~=13.1" ]
openpipe = [ "openpipe~=4.50.0" ]
openrouter = []
perplexity = []
playht = [ "pyht>=0.1.6", "websockets>=13.1,<15.0" ]
playht = [ "pyht~=0.1.12", "websockets~=13.1" ]
qwen = []
rime = [ "websockets>=13.1,<15.0" ]
riva = [ "nvidia-riva-client~=2.21.1" ]
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.19.1" ]
sambanova = []
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch~=2.5.0", "torchaudio~=2.5.0" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "websockets>=13.1,<15.0" ]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.3.1" ]
tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm~=0.7.3" ]
ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
websocket = [ "websockets>=13.1,<15.0", "fastapi>=0.115.6,<0.117.0" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]
[tool.setuptools.packages.find]
@@ -152,6 +148,3 @@ convention = "google"
command_line = "--module pytest"
source = ["src"]
omit = ["*/tests/*"]
[project.scripts]
pipecat = "pipecat.__main__:main"

View File

@@ -1,10 +1,5 @@
#!/bin/bash
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
echo "Running ruff format..."
ruff format "$PROJECT_ROOT"
echo "Running ruff check..."
ruff check --fix "$PROJECT_ROOT"
ruff format src
ruff format examples
ruff format tests
ruff format scripts
ruff check --select I,D --fix

View File

@@ -1,101 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import importlib.util
import sys
from pathlib import Path
from typing import Any, Callable
from loguru import logger
def load_bot_module(file_path: str, function_name: str = "run_example"):
"""Load a bot module from a Python file and return the specified function.
Args:
file_path: Path to the Python file containing the bot
function_name: Name of the function to load (default: run_example)
Returns:
The callable function from the module
Raises:
SystemExit: If the file doesn't exist, isn't a Python file, or the function isn't found
"""
logger.info(f"Loading bot module from: {file_path}")
logger.info(f"Looking for function: {function_name}")
file_path_obj = Path(file_path)
if not file_path_obj.exists():
print(f"Error: File '{file_path}' not found", file=sys.stderr)
sys.exit(1)
if not file_path_obj.suffix == ".py":
print(f"Error: File '{file_path}' is not a Python file", file=sys.stderr)
sys.exit(1)
# Import the module
try:
logger.info(f"Importing module from: {file_path}")
spec = importlib.util.spec_from_file_location("bot_module", file_path_obj)
if spec is None or spec.loader is None:
print(f"Error: Could not load module from '{file_path}'", file=sys.stderr)
sys.exit(1)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
logger.info(f"Successfully imported module: {module.__name__}")
except Exception as e:
print(f"Error importing module from '{file_path}': {e}", file=sys.stderr)
sys.exit(1)
# Find the function to run
if not hasattr(module, function_name):
print(f"Error: Function '{function_name}' not found in '{file_path}'", file=sys.stderr)
print(
f"Available functions: {[name for name in dir(module) if not name.startswith('_')]}", file=sys.stderr)
sys.exit(1)
run_example = getattr(module, function_name)
if not callable(run_example):
print(f"Error: '{function_name}' is not a callable function", file=sys.stderr)
sys.exit(1)
logger.info(f"Successfully loaded function: {function_name}")
return run_example
def main():
"""Main entry point for the pipecat command line tool.
This function is called by the entry point script and handles argument parsing
and module loading before calling the actual main execution logic.
"""
# Set up argument parser for our specific arguments
parser = argparse.ArgumentParser(description="Run a Pipecat bot from a Python file")
parser.add_argument("file", help="Python file containing the bot to run")
parser.add_argument("--function", "-f", default="run_example",
help="Function name to run (default: run_example)")
# Parse our arguments first
args, remaining_args = parser.parse_known_args()
# Load the bot module and get the function
run_example = load_bot_module(args.file, args.function)
# Set sys.argv to the remaining arguments for the run_main function
sys.argv = [sys.argv[0]] + remaining_args
# Import run_main only when we need it
from pipecat.examples.run import main as run_main
# Call the main function from pipecat.examples.run
run_main(run_example)
if __name__ == "__main__":
main()

View File

@@ -76,16 +76,6 @@ class BaseTurnAnalyzer(ABC):
"""
pass
@property
@abstractmethod
def params(self):
"""Get the current turn analyzer parameters.
Returns:
Current turn analyzer configuration parameters.
"""
pass
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""Appends audio data for analysis.

View File

@@ -87,15 +87,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
"""
return self._speech_triggered
@property
def params(self) -> SmartTurnParams:
"""Get the current smart turn parameters.
Returns:
Current smart turn configuration parameters.
"""
return self._params
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""Append audio data for turn analysis.

View File

@@ -1,196 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Local PyTorch turn analyzer for on-device ML inference using the smart-turn-v2 model.
This module provides a smart turn analyzer that uses PyTorch models for
local end-of-turn detection without requiring network connectivity.
"""
from typing import Any, Dict
import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
try:
import torch
import torch.nn.functional as F
from torch import nn
from transformers import (
Wav2Vec2Config,
Wav2Vec2Model,
Wav2Vec2PreTrainedModel,
Wav2Vec2Processor,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use LocalSmartTurnAnalyzerV2, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
class LocalSmartTurnAnalyzerV2(BaseSmartTurn):
"""Local turn analyzer using the smart-turn-v2 PyTorch model.
Provides end-of-turn detection using locally-stored PyTorch models,
enabling offline operation without network dependencies. Uses
Wav2Vec2 architecture for audio sequence classification.
"""
def __init__(self, *, smart_turn_model_path: str, **kwargs):
"""Initialize the local PyTorch smart-turn-v2 analyzer.
Args:
smart_turn_model_path: Path to directory containing the PyTorch model
and feature extractor files. If empty, uses default HuggingFace model.
**kwargs: Additional arguments passed to BaseSmartTurn.
"""
super().__init__(**kwargs)
if not smart_turn_model_path:
# Define the path to the pretrained model on Hugging Face
smart_turn_model_path = "pipecat-ai/smart-turn-v2"
logger.debug("Loading Local Smart Turn v2 model...")
# Load the pretrained model for sequence classification
self._turn_model = _Wav2Vec2ForEndpointing.from_pretrained(smart_turn_model_path)
# Load the corresponding feature extractor for preprocessing audio
self._turn_processor = Wav2Vec2Processor.from_pretrained(smart_turn_model_path)
# Use platform-optimized backend if available (MPS for Apple silicon, CUDA for NVIDIA)
self._device = "cpu"
if torch.backends.mps.is_available():
self._device = "mps"
elif torch.cuda.is_available():
self._device = "cuda"
# Move model to selected device and set it to evaluation mode
self._turn_model = self._turn_model.to(self._device)
self._turn_model.eval()
logger.debug("Loaded Local Smart Turn v2")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local PyTorch model."""
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,
padding="max_length",
truncation=True,
max_length=16000 * 16, # 16 seconds at 16kHz
return_attention_mask=True,
return_tensors="pt",
)
# Move inputs to device
inputs = {k: v.to(self._device) for k, v in inputs.items()}
# Run inference
with torch.no_grad():
outputs = self._turn_model(**inputs)
# The model returns sigmoid probabilities directly in the logits field
probability = outputs["logits"][0].item()
# Make prediction (1 for Complete, 0 for Incomplete)
prediction = 1 if probability > 0.5 else 0
return {
"prediction": prediction,
"probability": probability,
}
class _Wav2Vec2ForEndpointing(Wav2Vec2PreTrainedModel):
def __init__(self, config: Wav2Vec2Config):
super().__init__(config)
self.wav2vec2 = Wav2Vec2Model(config)
self.pool_attention = nn.Sequential(
nn.Linear(config.hidden_size, 256), nn.Tanh(), nn.Linear(256, 1)
)
self.classifier = nn.Sequential(
nn.Linear(config.hidden_size, 256),
nn.LayerNorm(256),
nn.GELU(),
nn.Dropout(0.1),
nn.Linear(256, 64),
nn.GELU(),
nn.Linear(64, 1),
)
for module in self.classifier:
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=0.1)
if module.bias is not None:
module.bias.data.zero_()
for module in self.pool_attention:
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=0.1)
if module.bias is not None:
module.bias.data.zero_()
def attention_pool(self, hidden_states, attention_mask):
# Calculate attention weights
attention_weights = self.pool_attention(hidden_states)
if attention_mask is None:
raise ValueError("attention_mask must be provided for attention pooling")
attention_weights = attention_weights + (
(1.0 - attention_mask.unsqueeze(-1).to(attention_weights.dtype)) * -1e9
)
attention_weights = F.softmax(attention_weights, dim=1)
# Apply attention to hidden states
weighted_sum = torch.sum(hidden_states * attention_weights, dim=1)
return weighted_sum
def forward(self, input_values, attention_mask=None, labels=None):
outputs = self.wav2vec2(input_values, attention_mask=attention_mask)
hidden_states = outputs[0]
# Create transformer padding mask
if attention_mask is not None:
input_length = attention_mask.size(1)
hidden_length = hidden_states.size(1)
ratio = input_length / hidden_length
indices = (torch.arange(hidden_length, device=attention_mask.device) * ratio).long()
attention_mask = attention_mask[:, indices]
attention_mask = attention_mask.bool()
else:
attention_mask = None
pooled = self.attention_pool(hidden_states, attention_mask)
logits = self.classifier(pooled)
if torch.isnan(logits).any():
raise ValueError("NaN values detected in logits")
if labels is not None:
# Calculate positive sample weight based on batch statistics
pos_weight = ((labels == 0).sum() / (labels == 1).sum()).clamp(min=0.1, max=10.0)
loss_fct = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
labels = labels.float()
loss = loss_fct(logits.view(-1), labels.view(-1))
# Add L2 regularization for classifier layers
l2_lambda = 0.01
l2_reg = torch.tensor(0.0, device=logits.device)
for param in self.classifier.parameters():
l2_reg += torch.norm(param)
loss += l2_lambda * l2_reg
probs = torch.sigmoid(logits.detach())
return {"loss": loss, "logits": probs}
probs = torch.sigmoid(logits)
return {"logits": probs}

View File

@@ -183,37 +183,36 @@ class VADAnalyzer(ABC):
if len(self._vad_buffer) < num_required_bytes:
return self._vad_state
while len(self._vad_buffer) >= num_required_bytes:
audio_frames = self._vad_buffer[:num_required_bytes]
self._vad_buffer = self._vad_buffer[num_required_bytes:]
audio_frames = self._vad_buffer[:num_required_bytes]
self._vad_buffer = self._vad_buffer[num_required_bytes:]
confidence = self.voice_confidence(audio_frames)
confidence = self.voice_confidence(audio_frames)
volume = self._get_smoothed_volume(audio_frames)
self._prev_volume = volume
volume = self._get_smoothed_volume(audio_frames)
self._prev_volume = volume
speaking = confidence >= self._params.confidence and volume >= self._params.min_volume
speaking = confidence >= self._params.confidence and volume >= self._params.min_volume
if speaking:
match self._vad_state:
case VADState.QUIET:
self._vad_state = VADState.STARTING
self._vad_starting_count = 1
case VADState.STARTING:
self._vad_starting_count += 1
case VADState.STOPPING:
self._vad_state = VADState.SPEAKING
self._vad_stopping_count = 0
else:
match self._vad_state:
case VADState.STARTING:
self._vad_state = VADState.QUIET
self._vad_starting_count = 0
case VADState.SPEAKING:
self._vad_state = VADState.STOPPING
self._vad_stopping_count = 1
case VADState.STOPPING:
self._vad_stopping_count += 1
if speaking:
match self._vad_state:
case VADState.QUIET:
self._vad_state = VADState.STARTING
self._vad_starting_count = 1
case VADState.STARTING:
self._vad_starting_count += 1
case VADState.STOPPING:
self._vad_state = VADState.SPEAKING
self._vad_stopping_count = 0
else:
match self._vad_state:
case VADState.STARTING:
self._vad_state = VADState.QUIET
self._vad_starting_count = 0
case VADState.SPEAKING:
self._vad_state = VADState.STOPPING
self._vad_stopping_count = 1
case VADState.STOPPING:
self._vad_stopping_count += 1
if (
self._vad_state == VADState.STARTING

View File

@@ -9,21 +9,6 @@
This module provides a unified interface for running Pipecat examples across
different transport types including Daily.co, WebRTC, and Twilio. It handles
setup, configuration, and lifecycle management for each transport type.
Example usage:
SmallWebRTCTransport::
python bot.py --transport webrtc
DailyTransport::
python bot.py --transport daily
Twilio::
python bot.py --transport twilio --proxy username.ngrok.io
# Note: Concurrently, run an ngrok tunnel to your local server:
# ngrok http 7860
"""
import argparse

View File

@@ -28,7 +28,6 @@ from typing import (
)
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
@@ -614,7 +613,6 @@ class StartFrame(SystemFrame):
audio_out_sample_rate: Output audio sample rate in Hz.
allow_interruptions: Whether to allow user interruptions.
enable_metrics: Whether to enable performance metrics collection.
enable_tracing: Whether to enable OpenTelemetry tracing.
enable_usage_metrics: Whether to enable usage metrics collection.
interruption_strategies: List of interruption handling strategies.
report_only_initial_ttfb: Whether to report only initial time-to-first-byte.
@@ -624,7 +622,6 @@ class StartFrame(SystemFrame):
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_tracing: bool = False
enable_usage_metrics: bool = False
interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list)
report_only_initial_ttfb: bool = False
@@ -1148,23 +1145,6 @@ class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame):
pass
@dataclass
class SpeechControlParamsFrame(SystemFrame):
"""Frame for notifying processors of speech control parameter changes.
This includes parameters for both VAD (Voice Activity Detection) and
turn-taking analysis. It allows downstream processors to adjust their
behavior based on updated interaction control settings.
Parameters:
vad_params: Current VAD parameters.
turn_params: Current turn-taking analysis parameters.
"""
vad_params: Optional[VADParams] = None
turn_params: Optional[SmartTurnParams] = None
#
# Control frames
#

View File

@@ -273,17 +273,12 @@ class ParallelPipeline(BasePipeline):
if not self._down_task:
self._down_task = self.create_task(self._process_down_queue())
async def _drain_queue(self, queue: asyncio.Queue):
try:
while not queue.empty():
queue.get_nowait()
except asyncio.QueueEmpty:
logger.debug(f"Draining {self} queue already empty")
async def _drain_queues(self):
"""Drain all frames from upstream and downstream queues."""
await self._drain_queue(self._up_queue)
await self._drain_queue(self._down_queue)
while not self._up_queue.empty:
await self._up_queue.get()
while not self._down_queue.empty:
await self._down_queue.get()
async def _handle_interruption(self):
"""Handle interruption by cancelling tasks, draining queues, and restarting."""

View File

@@ -38,16 +38,14 @@ class PipelineRunner(BaseObject):
handle_sigint: bool = True,
force_gc: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
handle_sigterm: bool = False,
):
"""Initialize the pipeline runner.
Args:
name: Optional name for the runner instance.
handle_sigint: Whether to automatically handle SIGINT signals.
handle_sigint: Whether to automatically handle SIGINT/SIGTERM signals.
force_gc: Whether to force garbage collection after task completion.
loop: Event loop to use. If None, uses the current running loop.
handle_sigterm: Whether to automatically handle SIGTERM signals.
"""
super().__init__(name=name)
@@ -59,9 +57,6 @@ class PipelineRunner(BaseObject):
if handle_sigint:
self._setup_sigint()
if handle_sigterm:
self._setup_sigterm()
async def run(self, task: PipelineTask):
"""Run a pipeline task to completion.
@@ -101,10 +96,6 @@ class PipelineRunner(BaseObject):
"""Set up signal handlers for graceful shutdown."""
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, lambda *args: self._sig_handler())
def _setup_sigterm(self):
"""Set up signal handlers for graceful shutdown."""
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, lambda *args: self._sig_handler())
def _sig_handler(self):

View File

@@ -638,7 +638,6 @@ class PipelineTask(BasePipelineTask):
audio_in_sample_rate=self._params.audio_in_sample_rate,
audio_out_sample_rate=self._params.audio_out_sample_rate,
enable_metrics=self._params.enable_metrics,
enable_tracing=self._enable_tracing,
enable_usage_metrics=self._params.enable_usage_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
interruption_strategies=self._params.interruption_strategies,

View File

@@ -19,8 +19,6 @@ from typing import Dict, List, Literal, Optional, Set
from loguru import logger
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotInterruptionFrame,
BotStartedSpeakingFrame,
@@ -45,7 +43,6 @@ from pipecat.frames.frames import (
LLMSetToolsFrame,
LLMTextFrame,
OpenAILLMContextAssistantTimestampFrame,
SpeechControlParamsFrame,
StartFrame,
StartInterruptionFrame,
TextFrame,
@@ -70,13 +67,9 @@ class LLMUserAggregatorParams:
aggregation_timeout: Maximum time in seconds to wait for additional
transcription content before pushing aggregated result. This
timeout is used only when the transcription is slow to arrive.
turn_emulated_vad_timeout: Maximum time in seconds to wait for emulated
VAD when using turn-based analysis. Applied when transcription is
received but VAD didn't detect speech (e.g., whispered utterances).
"""
aggregation_timeout: float = 0.5
turn_emulated_vad_timeout: float = 0.8
@dataclass
@@ -397,9 +390,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
"""
super().__init__(context=context, role="user", **kwargs)
self._params = params or LLMUserAggregatorParams()
self._vad_params: Optional[VADParams] = None
self._turn_params: Optional[SmartTurnParams] = None
if "aggregation_timeout" in kwargs:
import warnings
@@ -487,10 +477,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
self.set_tools(frame.tools)
elif isinstance(frame, LLMSetToolChoiceFrame):
self.set_tool_choice(frame.tool_choice)
elif isinstance(frame, SpeechControlParamsFrame):
self._vad_params = frame.vad_params
self._turn_params = frame.turn_params
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
@@ -632,40 +618,9 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
async def _aggregation_task_handler(self):
while True:
try:
# The _aggregation_task_handler handles two distinct timeout scenarios:
#
# 1. When emulating_vad=True: Wait for emulated VAD timeout before
# pushing aggregation (simulating VAD behavior when no actual VAD
# detection occurred).
#
# 2. When emulating_vad=False: Use aggregation_timeout as a buffer
# to wait for potential late-arriving transcription frames after
# a real VAD event.
#
# For emulated VAD scenarios, the timeout strategy depends on whether
# a turn analyzer is configured:
#
# - WITH turn analyzer: Use turn_emulated_vad_timeout parameter because
# the VAD's stop_secs is set very low (e.g. 0.2s) for rapid speech
# chunking to feed the turn analyzer. This low value is too fast
# for emulated VAD scenarios where we need to allow users time to
# finish speaking (e.g. 0.8s).
#
# - WITHOUT turn analyzer: Use VAD's stop_secs directly to maintain
# consistent user experience between real VAD detection and
# emulated VAD scenarios.
if not self._emulating_vad:
timeout = self._params.aggregation_timeout
elif self._turn_params:
timeout = self._params.turn_emulated_vad_timeout
else:
# Use VAD stop_secs when no turn analyzer is present, fallback if no VAD params
timeout = (
self._vad_params.stop_secs
if self._vad_params
else self._params.turn_emulated_vad_timeout
)
await asyncio.wait_for(self._aggregation_event.wait(), timeout)
await asyncio.wait_for(
self._aggregation_event.wait(), self._params.aggregation_timeout
)
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:
@@ -693,11 +648,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
# to emulate VAD (i.e. user start/stopped speaking), but we do it only
# if the bot is not speaking. If the bot is speaking and we really have
# a short utterance we don't really want to interrupt the bot.
if (
not self._user_speaking
and not self._waiting_for_aggregation
and len(self._aggregation) > 0
):
if not self._user_speaking and not self._waiting_for_aggregation:
if self._bot_speaking:
# If we reached this case and the bot is speaking, let's ignore
# what the user said.

View File

@@ -44,7 +44,6 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMTextFrame,
MetricsFrame,
StartFrame,
@@ -72,14 +71,13 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import (
FunctionCallParams, # TODO(aleix): we shouldn't import `services` from `processors`
)
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.string import match_endofsentence
RTVI_PROTOCOL_VERSION = "1.0.0"
RTVI_PROTOCOL_VERSION = "0.3.0"
RTVI_MESSAGE_LABEL = "rtvi-ai"
RTVIMessageLiteral = Literal["rtvi-ai"]
@@ -92,10 +90,6 @@ class RTVIServiceOption(BaseModel):
Defines a configurable option that can be set for an RTVI service,
including its name, type, and handler function.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -110,10 +104,6 @@ class RTVIService(BaseModel):
Represents a service that can be configured and used within the RTVI protocol,
containing a name and list of configurable options.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -132,10 +122,6 @@ class RTVIActionArgumentData(BaseModel):
"""Data for an RTVI action argument.
Contains the name and value of an argument passed to an RTVI action.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -146,10 +132,6 @@ class RTVIActionArgument(BaseModel):
"""Definition of an RTVI action argument.
Specifies the name and expected type of an argument for an RTVI action.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -161,10 +143,6 @@ class RTVIAction(BaseModel):
Represents an action that can be executed within the RTVI protocol,
including its service, name, arguments, and handler function.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
service: str
@@ -188,10 +166,6 @@ class RTVIServiceOptionConfig(BaseModel):
"""Configuration value for an RTVI service option.
Contains the name and value to set for a specific service option.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -202,10 +176,6 @@ class RTVIServiceConfig(BaseModel):
"""Configuration for an RTVI service.
Contains the service name and list of option configurations to apply.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
service: str
@@ -216,10 +186,6 @@ class RTVIConfig(BaseModel):
"""Complete RTVI configuration.
Contains the full configuration for all RTVI services.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
config: List[RTVIServiceConfig]
@@ -230,15 +196,10 @@ class RTVIConfig(BaseModel):
#
# deprecated
class RTVIUpdateConfig(BaseModel):
"""Request to update RTVI configuration.
Contains new configuration settings and whether to interrupt the bot.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
config: List[RTVIServiceConfig]
@@ -249,10 +210,6 @@ class RTVIActionRunArgument(BaseModel):
"""Argument for running an RTVI action.
Contains the name and value of an argument to pass to an action.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
name: str
@@ -263,10 +220,6 @@ class RTVIActionRun(BaseModel):
"""Request to run an RTVI action.
Contains the service, action name, and optional arguments.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
service: str
@@ -281,80 +234,12 @@ class RTVIActionFrame(DataFrame):
Parameters:
rtvi_action_run: The action to execute.
message_id: Optional message ID for response correlation.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
rtvi_action_run: RTVIActionRun
message_id: Optional[str] = None
class RTVIRawClientMessageData(BaseModel):
"""Data structure expected from client messages sent to the RTVI server."""
t: str
d: Optional[Any] = None
class RTVIClientMessage(BaseModel):
"""Cleansed data structure for client messages for handling."""
msg_id: str
type: str
data: Optional[Any] = None
@dataclass
class RTVIClientMessageFrame(SystemFrame):
"""A frame for sending messages from the client to the RTVI server.
This frame is meant for custom messaging from the client to the server
and expects a server-response message.
"""
msg_id: str
type: str
data: Optional[Any] = None
@dataclass
class RTVIServerResponseFrame(SystemFrame):
"""A frame for responding to a client RTVI message.
This frame should be sent in response to an RTVIClientMessageFrame
and include the original RTVIClientMessageFrame to ensure the response
is properly attributed to the original request. To respond with an error,
set the `error` field to a string describing the error. This will result
in the client receiving a `response-error` message instead of a
`server-response` message.
"""
client_msg: RTVIClientMessageFrame
data: Optional[Any] = None
error: Optional[str] = None
class RTVIRawServerResponseData(BaseModel):
"""Data structure for server responses to client messages."""
t: str
d: Optional[Any] = None
class RTVIServerResponse(BaseModel):
"""The RTVI-formatted message response from the server to the client.
This message is used to respond to custom messages sent by the client.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["server-response"] = "server-response"
id: str
data: RTVIRawServerResponseData
class RTVIMessage(BaseModel):
"""Base RTVI message structure.
@@ -384,7 +269,7 @@ class RTVIErrorResponseData(BaseModel):
class RTVIErrorResponse(BaseModel):
"""RTVI error response message.
RTVI Formatted error response message for relaying failed client requests.
Sent in response to a client request that resulted in an error.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -400,13 +285,13 @@ class RTVIErrorData(BaseModel):
"""
error: str
fatal: bool # Indicates the pipeline has stopped due to this error
fatal: bool
class RTVIError(BaseModel):
"""RTVI error event message.
RTVI Formatted error message for relaying errors in the pipeline.
Sent when an error occurs that isn't in response to a specific request.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -418,10 +303,6 @@ class RTVIDescribeConfigData(BaseModel):
"""Data for describing available RTVI configuration.
Contains the list of available services and their options.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
config: List[RTVIService]
@@ -431,10 +312,6 @@ class RTVIDescribeConfig(BaseModel):
"""Message describing available RTVI configuration.
Sent in response to a describe-config request.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -447,10 +324,6 @@ class RTVIDescribeActionsData(BaseModel):
"""Data for describing available RTVI actions.
Contains the list of available actions that can be executed.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
actions: List[RTVIAction]
@@ -460,10 +333,6 @@ class RTVIDescribeActions(BaseModel):
"""Message describing available RTVI actions.
Sent in response to a describe-actions request.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -476,10 +345,6 @@ class RTVIConfigResponse(BaseModel):
"""Response containing current RTVI configuration.
Sent in response to a get-config request.
.. deprecated:: 0.0.75
Pipeline Configuration has been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -492,10 +357,6 @@ class RTVIActionResponseData(BaseModel):
"""Data for an RTVI action response.
Contains the result of executing an action.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
result: ActionResult
@@ -505,10 +366,6 @@ class RTVIActionResponse(BaseModel):
"""Response to an RTVI action execution.
Sent after successfully executing an action.
.. deprecated:: 0.0.75
Actions have been removed as part of the RTVI protocol 1.0.0.
Use custom client and server messages instead.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
@@ -517,30 +374,6 @@ class RTVIActionResponse(BaseModel):
data: RTVIActionResponseData
class AboutClientData(BaseModel):
"""Data about the RTVI client.
Contains information about the client, including which RTVI library it
is using, what platform it is on and any additional details, if available.
"""
library: str
library_version: Optional[str] = None
platform: Optional[str] = None
platform_version: Optional[str] = None
platform_details: Optional[Any] = None
class RTVIClientReadyData(BaseModel):
"""Data format of client ready messages.
Contains the RTVIprotocol version and client information.
"""
version: str
about: AboutClientData
class RTVIBotReadyData(BaseModel):
"""Data for bot ready notification.
@@ -548,10 +381,7 @@ class RTVIBotReadyData(BaseModel):
"""
version: str
# The config field is deprecated and will not be included if
# the client's rtvi version is 1.0.0 or higher.
config: Optional[List[RTVIServiceConfig]] = None
about: Optional[Mapping[str, Any]] = None
config: List[RTVIServiceConfig]
class RTVIBotReady(BaseModel):
@@ -588,25 +418,6 @@ class RTVILLMFunctionCallMessage(BaseModel):
data: RTVILLMFunctionCallMessageData
class RTVIAppendToContextData(BaseModel):
"""Data format for appending messages to the context.
Contains the role, content, and whether to run the message immediately.
"""
role: Literal["user", "assistant"] | str
content: Any
run_immediately: bool = False
class RTVIAppendToContext(BaseModel):
"""RTVI Message format to append content to the LLM context."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["append-to-context"] = "append-to-context"
data: RTVIAppendToContextData
class RTVILLMFunctionCallStartMessageData(BaseModel):
"""Data for LLM function call start notification.
@@ -941,11 +752,6 @@ class RTVIObserver(BaseObserver):
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVIServerMessage(data=frame.data)
await self.push_transport_message_urgent(message)
elif isinstance(frame, RTVIServerResponseFrame):
if frame.error is not None:
await self._send_error_response(frame)
else:
await self._send_server_response(frame)
if mark_as_seen:
self._frames_seen.add(frame.id)
@@ -1073,22 +879,6 @@ class RTVIObserver(BaseObserver):
message = RTVIMetricsMessage(data=metrics)
await self.push_transport_message_urgent(message)
async def _send_server_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
message = RTVIServerResponse(
id=str(frame.client_msg.msg_id),
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
)
await self.push_transport_message_urgent(message)
async def _send_error_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
if self._params.errors_enabled:
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.push_transport_message_urgent(message)
class RTVIProcessor(FrameProcessor):
"""Main processor for handling RTVI protocol messages and actions.
@@ -1118,7 +908,6 @@ class RTVIProcessor(FrameProcessor):
self._bot_ready = False
self._client_ready = False
self._client_ready_id = ""
self._client_version = []
self._errors_enabled = True
self._registered_actions: Dict[str, RTVIAction] = {}
@@ -1132,7 +921,6 @@ class RTVIProcessor(FrameProcessor):
self._register_event_handler("on_bot_started")
self._register_event_handler("on_client_ready")
self._register_event_handler("on_client_message")
self._input_transport = None
self._transport = transport
@@ -1148,15 +936,6 @@ class RTVIProcessor(FrameProcessor):
Args:
action: The action to register.
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The actions API is deprecated, use server and client messages instead.",
DeprecationWarning,
)
id = self._action_id(action.service, action.action)
self._registered_actions[id] = action
@@ -1166,15 +945,6 @@ class RTVIProcessor(FrameProcessor):
Args:
service: The service to register.
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The actions API is deprecated, use server and client messages instead.",
DeprecationWarning,
)
self._registered_services[service.name] = service
async def set_client_ready(self):
@@ -1200,22 +970,6 @@ class RTVIProcessor(FrameProcessor):
"""Send a bot interruption frame upstream."""
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
async def send_server_message(self, data: Any):
"""Send a server message to the client."""
message = RTVIServerMessage(data=data)
await self._send_server_message(message)
async def send_server_response(self, client_msg: RTVIClientMessage, data: Any):
"""Send a server response for a given client message."""
message = RTVIServerResponse(
id=client_msg.msg_id, data=RTVIRawServerResponseData(t=client_msg.type, d=data)
)
await self._send_server_message(message)
async def send_error_response(self, client_msg: RTVIClientMessage, error: str):
"""Send an error response for a given client message."""
await self._send_error_response(id=client_msg.msg_id, error=error)
async def send_error(self, error: str):
"""Send an error message to the client.
@@ -1259,6 +1013,9 @@ class RTVIProcessor(FrameProcessor):
function_name: Name of the function being called.
llm: The LLM processor making the call.
context: The LLM context.
Note:
This method is deprecated. Use handle_function_call() instead.
"""
import warnings
@@ -1379,15 +1136,7 @@ class RTVIProcessor(FrameProcessor):
try:
match message.type:
case "client-ready":
data = None
try:
data = RTVIClientReadyData.model_validate(message.data)
except ValidationError:
# Not all clients have been updated to RTVI 1.0.0.
# For now, that's okay, we just log their info as unknown.
data = None
pass
await self._handle_client_ready(message.id, data)
await self._handle_client_ready(message.id)
case "describe-actions":
await self._handle_describe_actions(message.id)
case "describe-config":
@@ -1399,9 +1148,6 @@ class RTVIProcessor(FrameProcessor):
await self._handle_update_config(message.id, update_config)
case "disconnect-bot":
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
case "client-message":
data = RTVIRawClientMessageData.model_validate(message.data)
await self._handle_client_message(message.id, data)
case "action":
action = RTVIActionRun.model_validate(message.data)
action_frame = RTVIActionFrame(message_id=message.id, rtvi_action_run=action)
@@ -1409,9 +1155,6 @@ class RTVIProcessor(FrameProcessor):
case "llm-function-call-result":
data = RTVILLMFunctionCallResultData.model_validate(message.data)
await self._handle_function_call_result(data)
case "append-to-context":
data = RTVIAppendToContextData.model_validate(message.data)
await self._handle_update_context(data)
case "raw-audio" | "raw-audio-batch":
await self._handle_audio_buffer(message.data)
@@ -1425,20 +1168,9 @@ class RTVIProcessor(FrameProcessor):
await self._send_error_response(message.id, f"Exception processing message: {e}")
logger.warning(f"Exception processing message: {e}")
async def _handle_client_ready(self, request_id: str, data: RTVIClientReadyData | None):
"""Handle the client-ready message from the client."""
version = data.version if data else "unknown"
logger.debug(f"Received client-ready: version {version}")
if version == "unknown":
self._client_version = [0, 3, 0] # Default to 0.3.0 if unknown
else:
try:
self._client_version = [int(v) for v in version.split(".")]
except ValueError:
logger.warning(f"Invalid client version format: {version}")
self._client_version = [0, 3, 0]
about = data.about if data else {"library": "unknown"}
logger.debug(f"Client Details: {about}")
async def _handle_client_ready(self, request_id: str):
"""Handle a client-ready message."""
logger.debug("Received client-ready")
if self._input_transport:
await self._input_transport.start_audio_in_streaming()
@@ -1469,45 +1201,18 @@ class RTVIProcessor(FrameProcessor):
async def _handle_describe_config(self, request_id: str):
"""Handle a describe-config request."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Configuration helpers are deprecated. If your application needs this behavior, use custom server and client messages.",
DeprecationWarning,
)
services = list(self._registered_services.values())
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
await self._push_transport_message(message)
async def _handle_describe_actions(self, request_id: str):
"""Handle a describe-actions request."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The Actions API is deprecated, use custom server and client messages instead.",
DeprecationWarning,
)
actions = list(self._registered_actions.values())
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
await self._push_transport_message(message)
async def _handle_get_config(self, request_id: str):
"""Handle a get-config request."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Configuration helpers are deprecated. If your application needs this behavior, use custom server and client messages.",
DeprecationWarning,
)
message = RTVIConfigResponse(id=request_id, data=self._config)
await self._push_transport_message(message)
@@ -1525,15 +1230,6 @@ class RTVIProcessor(FrameProcessor):
async def _update_service_config(self, config: RTVIServiceConfig):
"""Update configuration for a specific service."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Configuration helpers are deprecated. If your application needs this behavior, use custom server and client messages.",
DeprecationWarning,
)
service = self._registered_services[config.service]
for option in config.options:
handler = service._options_dict[option.name].handler
@@ -1542,15 +1238,6 @@ class RTVIProcessor(FrameProcessor):
async def _update_config(self, data: RTVIConfig, interrupt: bool):
"""Update the RTVI configuration."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Configuration helpers are deprecated. If your application needs this behavior, use custom server and client messages.",
DeprecationWarning,
)
if interrupt:
await self.interrupt_bot()
for service_config in data.config:
@@ -1561,33 +1248,6 @@ class RTVIProcessor(FrameProcessor):
await self._update_config(RTVIConfig(config=data.config), data.interrupt)
await self._handle_get_config(request_id)
async def _handle_update_context(self, data: RTVIAppendToContextData):
if data.run_immediately:
await self.interrupt_bot()
frame = LLMMessagesAppendFrame(
messages=[{"role": data.role, "content": data.content}],
run_llm=data.run_immediately,
)
await self.push_frame(frame)
async def _handle_client_message(self, msg_id: str, data: RTVIRawClientMessageData):
"""Handle a client message frame."""
if not data:
await self._send_error_response(msg_id, "Malformed client message")
return
# Create a RTVIClientMessageFrame to push the message
frame = RTVIClientMessageFrame(msg_id=msg_id, type=data.t, data=data.d)
await self.push_frame(frame)
await self._call_event_handler(
"on_client_message",
RTVIClientMessage(
msg_id=msg_id,
type=data.t,
data=data.d,
),
)
async def _handle_function_call_result(self, data):
"""Handle a function call result from the client."""
frame = FunctionCallResultFrame(
@@ -1618,19 +1278,12 @@ class RTVIProcessor(FrameProcessor):
async def _send_bot_ready(self):
"""Send the bot-ready message to the client."""
config = None
if self._client_version[0] < 1:
config = self._config.config
message = RTVIBotReady(
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=self._config.config),
)
await self._push_transport_message(message)
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
"""Send a message or response to the client."""
await self._push_transport_message(message)
async def _send_error_frame(self, frame: ErrorFrame):
"""Send an error frame as an RTVI error message."""
if self._errors_enabled:

View File

@@ -108,10 +108,6 @@ class ExotelFrameSerializer(FrameSerializer):
serialized_data = await self._output_resampler.resample(
data, frame.sample_rate, self._exotel_sample_rate
)
if serialized_data is None or len(serialized_data) == 0:
# Ignoring in case we don't have audio
return None
payload = base64.b64encode(serialized_data).decode("ascii")
answer = {
@@ -148,9 +144,6 @@ class ExotelFrameSerializer(FrameSerializer):
self._exotel_sample_rate,
self._sample_rate,
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None
# Input: Exotel takes PCM data, so just resample to match sample_rate
audio_frame = InputAudioRawFrame(

View File

@@ -132,10 +132,6 @@ class PlivoFrameSerializer(FrameSerializer):
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._plivo_sample_rate, self._output_resampler
)
if serialized_data is None or len(serialized_data) == 0:
# Ignoring in case we don't have audio
return None
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "playAudio",
@@ -231,10 +227,6 @@ class PlivoFrameSerializer(FrameSerializer):
deserialized_data = await ulaw_to_pcm(
payload, self._plivo_sample_rate, self._sample_rate, self._input_resampler
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)

View File

@@ -155,10 +155,6 @@ class TelnyxFrameSerializer(FrameSerializer):
else:
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
if serialized_data is None or len(serialized_data) == 0:
# Ignoring in case we don't have audio
return None
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
@@ -266,10 +262,6 @@ class TelnyxFrameSerializer(FrameSerializer):
else:
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)

View File

@@ -132,10 +132,6 @@ class TwilioFrameSerializer(FrameSerializer):
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._twilio_sample_rate, self._output_resampler
)
if serialized_data is None or len(serialized_data) == 0:
# Ignoring in case we don't have audio
return None
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
@@ -239,10 +235,6 @@ class TwilioFrameSerializer(FrameSerializer):
deserialized_data = await ulaw_to_pcm(
payload, self._twilio_sample_rate, self._sample_rate, self._input_resampler
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)

View File

@@ -44,7 +44,6 @@ from .models import (
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use AssemblyAI, you need to `pip install "pipecat-ai[assemblyai]"`.')
@@ -191,9 +190,9 @@ class AssemblyAISTTService(STTService):
"Authorization": self._api_key,
"User-Agent": f"AssemblyAI/1.0 (integration=Pipecat/{pipecat_version})",
}
self._websocket = await websocket_connect(
self._websocket = await websockets.connect(
ws_url,
additional_headers=headers,
extra_headers=headers,
)
self._connected = True
self._receive_task = self.create_task(self._receive_task_handler())

View File

@@ -55,7 +55,7 @@ from pipecat.services.llm_service import LLMService
from pipecat.utils.tracing.service_decorators import traced_llm
try:
import aioboto3
import boto3
import httpx
from botocore.config import Config
except ModuleNotFoundError as e:
@@ -749,17 +749,13 @@ class AWSBedrockLLMService(LLMService):
read_timeout=300, # 5 minutes
retries={"max_attempts": 3},
)
self._aws_session = aioboto3.Session()
# Store AWS session parameters for creating client in async context
self._aws_params = {
"aws_access_key_id": aws_access_key,
"aws_secret_access_key": aws_secret_key,
"aws_session_token": aws_session_token,
"region_name": aws_region,
"config": client_config,
}
session = boto3.Session(
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
aws_session_token=aws_session_token,
region_name=aws_region,
)
self._client = session.client(service_name="bedrock-runtime", config=client_config)
self.set_model_name(model)
self._settings = {
@@ -907,74 +903,70 @@ class AWSBedrockLLMService(LLMService):
logger.debug(f"Calling AWS Bedrock model with: {request_params}")
async with self._aws_session.client(
service_name="bedrock-runtime", **self._aws_params
) as client:
# Call AWS Bedrock with streaming
response = await client.converse_stream(**request_params)
# Call AWS Bedrock with streaming
response = self._client.converse_stream(**request_params)
await self.stop_ttfb_metrics()
await self.stop_ttfb_metrics()
# Process the streaming response
tool_use_block = None
json_accumulator = ""
# Process the streaming response
tool_use_block = None
json_accumulator = ""
function_calls = []
function_calls = []
for event in response["stream"]:
self.reset_watchdog()
async for event in response["stream"]:
self.reset_watchdog()
# Handle text content
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]
if "text" in delta:
await self.push_frame(LLMTextFrame(delta["text"]))
completion_tokens_estimate += self._estimate_tokens(delta["text"])
elif "toolUse" in delta and "input" in delta["toolUse"]:
# Handle partial JSON for tool use
json_accumulator += delta["toolUse"]["input"]
completion_tokens_estimate += self._estimate_tokens(
delta["toolUse"]["input"]
)
# Handle text content
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]
if "text" in delta:
await self.push_frame(LLMTextFrame(delta["text"]))
completion_tokens_estimate += self._estimate_tokens(delta["text"])
elif "toolUse" in delta and "input" in delta["toolUse"]:
# Handle partial JSON for tool use
json_accumulator += delta["toolUse"]["input"]
completion_tokens_estimate += self._estimate_tokens(
delta["toolUse"]["input"]
)
# Handle tool use start
elif "contentBlockStart" in event:
content_block_start = event["contentBlockStart"]["start"]
if "toolUse" in content_block_start:
tool_use_block = {
"id": content_block_start["toolUse"].get("toolUseId", ""),
"name": content_block_start["toolUse"].get("name", ""),
}
json_accumulator = ""
# Handle tool use start
elif "contentBlockStart" in event:
content_block_start = event["contentBlockStart"]["start"]
if "toolUse" in content_block_start:
tool_use_block = {
"id": content_block_start["toolUse"].get("toolUseId", ""),
"name": content_block_start["toolUse"].get("name", ""),
}
json_accumulator = ""
# Handle message completion with tool use
elif "messageStop" in event and "stopReason" in event["messageStop"]:
if event["messageStop"]["stopReason"] == "tool_use" and tool_use_block:
try:
arguments = json.loads(json_accumulator) if json_accumulator else {}
# Handle message completion with tool use
elif "messageStop" in event and "stopReason" in event["messageStop"]:
if event["messageStop"]["stopReason"] == "tool_use" and tool_use_block:
try:
arguments = json.loads(json_accumulator) if json_accumulator else {}
# Only call function if it's not the no_operation tool
if not using_noop_tool:
function_calls.append(
FunctionCallFromLLM(
context=context,
tool_call_id=tool_use_block["id"],
function_name=tool_use_block["name"],
arguments=arguments,
)
# Only call function if it's not the no_operation tool
if not using_noop_tool:
function_calls.append(
FunctionCallFromLLM(
context=context,
tool_call_id=tool_use_block["id"],
function_name=tool_use_block["name"],
arguments=arguments,
)
else:
logger.debug("Ignoring no_operation tool call")
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {json_accumulator}")
)
else:
logger.debug("Ignoring no_operation tool call")
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {json_accumulator}")
# Handle usage metrics if available
if "metadata" in event and "usage" in event["metadata"]:
usage = event["metadata"]["usage"]
prompt_tokens += usage.get("inputTokens", 0)
completion_tokens += usage.get("outputTokens", 0)
cache_read_input_tokens += usage.get("cacheReadInputTokens", 0)
cache_creation_input_tokens += usage.get("cacheWriteInputTokens", 0)
# Handle usage metrics if available
if "metadata" in event and "usage" in event["metadata"]:
usage = event["metadata"]["usage"]
prompt_tokens += usage.get("inputTokens", 0)
completion_tokens += usage.get("outputTokens", 0)
cache_read_input_tokens += usage.get("cacheReadInputTokens", 0)
cache_creation_input_tokens += usage.get("cacheWriteInputTokens", 0)
await self.run_function_calls(function_calls)
except asyncio.CancelledError:

Some files were not shown because too many files have changed in this diff Show More