Compare commits

...

54 Commits

Author SHA1 Message Date
James Hush
dddfd791e1 Replace hello with banana 2025-04-18 14:18:32 +08:00
James Hush
e721c2086c Add banana processor 2025-04-18 14:14:22 +08:00
Filipi da Silva Fuchter
f77d8f0b6f Merge pull request #1611 from pipecat-ai/smart_turn_changelog
Mentioning the Smart Turn Detection into the changelog.
2025-04-17 23:02:57 -03:00
Varun Singh
9c0beb05cf Merge pull request #1597 from pipecat-ai/vr000m-opus-added
Changing default codec to OPUS for telephony
2025-04-17 18:42:12 -07:00
Aleix Conchillo Flaqué
858981c404 Merge pull request #1610 from pipecat-ai/aleix/add-base-turn-analyzer
audio: add BaseTurnAnalyzer class
2025-04-17 18:38:08 -07:00
Aleix Conchillo Flaqué
9eed225aa2 audio: add BaseTurnAnalyzer class 2025-04-17 18:37:52 -07:00
Filipi Fuchter
9f7371e485 Mentioning the Smart Turn Detection into the changelog. 2025-04-17 22:31:40 -03:00
Aleix Conchillo Flaqué
004a920920 Merge pull request #1563 from Bnowako/packaging-type-information
Add marker file for static type checkers
2025-04-17 17:26:15 -07:00
Filipi da Silva Fuchter
203c5a3a60 Merge pull request #1592 from pipecat-ai/smart_turn
Smart turn
2025-04-17 18:21:47 -03:00
Filipi Fuchter
7f6fb1754b Merge remote-tracking branch 'origin/smart_turn' into smart_turn 2025-04-17 17:53:53 -03:00
Filipi Fuchter
a390ce13a4 Removing the UserEndOfTurnFrame 2025-04-17 17:53:31 -03:00
Filipi da Silva Fuchter
61d31d1c40 Restoring stop_secs to default value.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:44:47 -03:00
Filipi da Silva Fuchter
e872ff943a Using the default model for OpenAi.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:43:39 -03:00
Filipi da Silva Fuchter
c71005e249 Using the default model for OpenAi.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:43:23 -03:00
Filipi Fuchter
6e06bf97c0 Preventing emitting the UserStartedSpeaking event multiple times. 2025-04-17 17:21:29 -03:00
Filipi Fuchter
a80dc94e91 Fixing ruff format. 2025-04-17 16:47:17 -03:00
Filipi Fuchter
3ea9cfd251 Keeping the _speech_triggered as true if the state is incomplete. 2025-04-17 16:46:15 -03:00
Filipi Fuchter
a80f82cdb6 Moving the environment variables to inside the demo. 2025-04-17 16:28:50 -03:00
Aleix Conchillo Flaqué
d24bab354f Merge pull request #1607 from pipecat-ai/aleix/fix-websocket-disconnects
services: fix TTS websocket services disconnections
2025-04-17 12:27:52 -07:00
Filipi Fuchter
53ee3fb64c Changing the log levels used in smart_turn 2025-04-17 16:14:13 -03:00
Filipi Fuchter
3599761e4e Changing the default behavior to only use the last vad segment, and increasing the default stop_secs to 3 2025-04-17 16:07:03 -03:00
Aleix Conchillo Flaqué
c0b3fe3985 services: only read from TTS websocket if websocket connection established 2025-04-17 11:54:07 -07:00
Aleix Conchillo Flaqué
497d48b6c8 services: fix TTS websocket services disconnections
Fixes #1467
2025-04-17 11:29:49 -07:00
Filipi Fuchter
e179916c9c Creating a new param use_only_last_vad_segment 2025-04-17 11:49:51 -03:00
Filipi Fuchter
b0b38beb19 Returning the max duration back to 8 seconds. 2025-04-17 11:39:48 -03:00
Filipi Fuchter
8577139d21 Fixing to keep the last max samples. 2025-04-17 11:39:06 -03:00
Filipi Fuchter
e2fbbb4b40 Renaming the smart turn classes. 2025-04-17 10:43:21 -03:00
Filipi Fuchter
88ce117e84 Changing the max duration default value to 16 seconds. 2025-04-17 10:35:13 -03:00
Filipi Fuchter
266537c3f4 Fixing to respect the stop_secs. 2025-04-17 10:07:08 -03:00
Filipi Fuchter
230d2f80fa Merge branch 'main' into smart_turn 2025-04-17 09:36:30 -03:00
Filipi Fuchter
3f0688aefa Testing smart turn using stop_secs as 5 seconds 2025-04-17 09:36:03 -03:00
Filipi da Silva Fuchter
5be3e6979e Merge pull request #1533 from pipecat-ai/daily_small_webrtc
Example interoping between SmallWebRTC and Daily
2025-04-17 09:19:23 -03:00
Filipi Fuchter
a458c1e92b Improving the README and fixing the env.example 2025-04-16 18:38:48 -03:00
Filipi Fuchter
5bbf1d0209 Example interoping between SmallWebRTC and Daily. 2025-04-16 17:14:12 -03:00
Filipi Fuchter
8e36bdbed7 Adding some comments to the code. 2025-04-16 09:11:27 -03:00
Filipi Fuchter
cd8bd7f487 Adding some comments to the code. 2025-04-16 08:58:40 -03:00
Filipi Fuchter
5fa47b7a5c Adding the dependencies for the remote smart turn 2025-04-16 08:45:01 -03:00
Filipi Fuchter
616961b487 Stop removing segments from the end 2025-04-16 08:04:38 -03:00
Filipi Fuchter
650d4d9ee2 Changing the start speech time and adding logs. 2025-04-16 07:55:20 -03:00
Filipi Fuchter
2627cb6bf2 Allowing to define SmartTurnParams 2025-04-16 07:13:13 -03:00
Filipi Fuchter
0e4115049b Refactoring to use keep alive sessions. 2025-04-16 06:44:57 -03:00
Filipi Fuchter
3ebef9346f Adding support for RemoteSmartTurn 2025-04-16 06:33:42 -03:00
Filipi Fuchter
3e2d21779f Refactoring the BaseEndOfTurnAnalyzer to include most of the logic 2025-04-16 06:11:56 -03:00
Filipi Fuchter
cfefcac35f Resetting the silence frames when the user speaks. 2025-04-15 20:51:36 -03:00
Filipi Fuchter
57b39c084f Triggering to check if the turn is complete based on the maximum timeout 2025-04-15 20:42:41 -03:00
Filipi Fuchter
11b6de0900 Triggering to check if the turn is complete each time the user stops speaking based on the vad 2025-04-15 17:28:00 -03:00
Varun Singh
824bc9bf16 Update dial.js 2025-04-15 12:48:33 -07:00
Varun Singh
d0ddef6c12 Update server.py 2025-04-15 12:37:33 -07:00
Filipi Fuchter
e6325a8229 Integrating with the smart turn model to predict 2025-04-15 16:01:09 -03:00
Filipi Fuchter
3588b06718 Adding missing torch dependency. 2025-04-15 12:28:36 -03:00
Filipi Fuchter
73874f6ec0 Loading the smart turn model. 2025-04-15 12:11:06 -03:00
Filipi Fuchter
6ab9a8ad7f Starting to create a local smart turn 2025-04-15 11:24:39 -03:00
Filipi Fuchter
821e303249 Bringing Aleix initial implementation for the smart turn. 2025-04-15 10:21:40 -03:00
Bnowako
61cba0136f Add marker file for static type checkers 2025-04-11 11:00:57 +02:00
32 changed files with 1072 additions and 54 deletions

View File

@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for Smart Turn Detection via the `turn_analyzer` transport
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
Core ML.
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
connect to an on-prem service.
@@ -55,6 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that would cause TTS websocket-based services to not cleanup
resources properly when disconnecting.
- Fixed a `TavusVideoService` issue that was causing audio choppiness.
- Fixed an issue in `SmallWebRTCTransport` where an error was thrown if the

View File

@@ -92,4 +92,8 @@ ASSEMBLYAI_API_KEY=...
OPENROUTER_API_KEY=...
# Piper
PIPER_BASE_URL=...
PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=
REMOTE_SMART_TURN_URL=

View File

@@ -141,6 +141,7 @@ async def dial(request: RoomRequest, raw_request: Request):
"display_name": request.From,
"sip_mode": "dial-in",
"num_endpoints": 2 if request.call_transfer is not None else 1,
"codecs": {"audio": ["OPUS"]},
}
daily_room_properties["sip"] = sip_config

View File

@@ -103,6 +103,7 @@ export default async function handler(req, res) {
display_name: From,
sip_mode: 'dial-in',
num_endpoints: call_transfer !== null ? 2 : 1,
codecs: {"audio": ["OPUS"]},
};
daily_room_properties.sip = sip_config;
}
@@ -172,4 +173,4 @@ export const config = {
sizeLimit: '1mb',
},
},
};
};

View File

@@ -9,10 +9,11 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport

View File

@@ -10,7 +10,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.frames.frames import Frame, MetricsFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
@@ -32,7 +32,30 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
# Custom processor that prints a message if it receives a TranscriptionFrame that says "banana"
class BananaProcessor(FrameProcessor):
"""A custom processor that listens for transcription frames containing the word 'banana'."""
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
# Ensure the super method is called first
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
logger.debug(f"Received transcription frame: {frame.text}")
if "banana" in frame.text.lower():
logger.info("---- Received 'banana' in transcription frame")
# Push the frame after processing
await self.push_frame(frame)
class MetricsLogger(FrameProcessor):
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -87,10 +110,13 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
banana = BananaProcessor()
pipeline = Pipeline(
[
transport.input(),
stt,
banana,
context_aggregator.user(),
llm,
tts,

View File

@@ -0,0 +1,111 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
remote_smart_turn_url = os.getenv("REMOTE_SMART_TURN_URL")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=SmartTurnAnalyzer(url=remote_smart_turn_url),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,129 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
# to the path where the smart-turn repo is cloned.
#
# Example setup:
#
# # Git LFS (Large File Storage)
# brew install git-lfs
# # Hugging Face uses LFS to store large model files, including .mlpackage
# git lfs install
# # Clone the repo with the smart_turn_classifier.mlpackage
# git clone https://huggingface.co/pipecat-ai/smart-turn
#
# Then set the env variable:
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
# or add it to your .env file
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=LocalCoreMLSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,61 @@
# SmallWebRTC and Daily
A Pipecat example demonstrating how to interoperate audio and video between `SmallWebRTCTransport` and `DailyTransport`.
## 🚀 Quick Start
### 1⃣ Start the Bot Server
#### 🔧 Set Up the Environment
1. Create and activate a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure environment variables:
- Copy `env.example` to `.env`
```bash
cp env.example .env
```
- Add your API keys
#### ▶️ Run the Server
```bash
python server.py
```
### 1⃣ Connect the first client using Daily Prebuilt
- Open your browser and navigate to the same URL that you configured inside your `.env` file:
- `DAILY_SAMPLE_ROOM_URL`
### 2⃣ Connect the second client using SmallWebRTC Prebuilt UI
- Open your browser and navigate to:
👉 http://localhost:7860
- (Or use your custom port, if configured)
## ⚠️ Important Note
Ensure the bot server is running before using any client implementations.
## 📌 Requirements
- Python **3.10+**
- Node.js **16+** (for JavaScript components)
- Google API Key
- Modern web browser with WebRTC support
---
### 💡 Notes
- Ensure all dependencies are installed before running the server.
- Check the `.env` file for missing configurations.
- WebRTC requires a secure environment (HTTPS) for full functionality in production.
Happy coding! 🎉

View File

@@ -0,0 +1,128 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import Frame, FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def run_bot(webrtc_connection):
pipecat_transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
daily_transport = DailyTransport(
room_url,
None,
"SmallWebRTC",
params=DailyParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
pipeline = Pipeline(
[
ParallelPipeline(
[
daily_transport.input(),
MirrorProcessor(),
pipecat_transport.output(),
],
[
pipecat_transport.input(),
MirrorProcessor(),
daily_transport.output(),
],
)
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=False,
),
)
@daily_transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await transport.capture_participant_video(participant["id"])
@pipecat_transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Pipecat Client connected")
@pipecat_transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Pipecat Client disconnected")
@pipecat_transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info("Pipecat Client closed")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -0,0 +1,2 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL=

View File

@@ -0,0 +1,5 @@
python-dotenv
fastapi[all]
uvicorn
aiortc
pipecat-ai[silero, webrtc, daily]

View File

@@ -0,0 +1,89 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/prebuilt/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False)
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(run_bot, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -79,6 +79,8 @@ qwen = []
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.19.0" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]

View File

View File

@@ -0,0 +1,168 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from abc import abstractmethod
from typing import Dict, Optional
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
# Default timing parameters
STOP_SECS = 3
PRE_SPEECH_MS = 0
MAX_DURATION_SECONDS = 8 # Max allowed segment duration
USE_ONLY_LAST_VAD_SEGMENT = True
class SmartTurnParams(BaseModel):
stop_secs: float = STOP_SECS
pre_speech_ms: float = PRE_SPEECH_MS
max_duration_secs: float = MAX_DURATION_SECONDS
# not exposing this for now yet until the model can handle it.
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class BaseSmartTurn(BaseTurnAnalyzer):
def __init__(
self, *, sample_rate: Optional[int] = None, params: SmartTurnParams = SmartTurnParams()
):
super().__init__(sample_rate=sample_rate)
self._params = params
# Configuration
self._stop_ms = self._params.stop_secs * 1000 # silence threshold in ms
# Inference state
self._audio_buffer = []
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = None
@property
def speech_triggered(self) -> bool:
return self._speech_triggered
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
# Convert raw audio to float32 format and append to the buffer
audio_int16 = np.frombuffer(buffer, dtype=np.int16)
audio_float32 = np.frombuffer(audio_int16, dtype=np.int16).astype(np.float32) / 32768.0
self._audio_buffer.append((time.time(), audio_float32))
state = EndOfTurnState.INCOMPLETE
if is_speech:
# Reset silence tracking on speech
self._silence_ms = 0
self._speech_triggered = True
if self._speech_start_time is None:
self._speech_start_time = time.time()
logger.debug(f"Speech started at {self._speech_start_time}")
else:
if self._speech_triggered:
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
self._silence_ms += chunk_duration_ms
# If silence exceeds threshold, mark end of turn
if self._silence_ms >= self._stop_ms:
logger.debug(
f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
)
state = EndOfTurnState.COMPLETE
self._clear(state)
else:
# Trim buffer to prevent unbounded growth before speech
max_buffer_time = (
(self._params.pre_speech_ms / 1000)
+ self._params.stop_secs
+ self._params.max_duration_secs
)
while (
self._audio_buffer and self._audio_buffer[0][0] < time.time() - max_buffer_time
):
self._audio_buffer.pop(0)
return state
def analyze_end_of_turn(self) -> EndOfTurnState:
logger.debug("Analyzing End of Turn...")
state = self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
return state
def _clear(self, turn_state: EndOfTurnState):
# Reset internal state for next turn
logger.debug("Clearing audio buffer...")
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = None
self._silence_ms = 0
def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
return state
# Extract recent audio segment for prediction
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
start_index = 0
for i, (t, _) in enumerate(audio_buffer):
if t >= start_time:
start_index = i
break
end_index = len(audio_buffer) - 1
# Extract the audio segment
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
segment_audio = np.concatenate(segment_audio_chunks)
logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")
# Limit maximum duration
max_samples = int(self._params.max_duration_secs * self.sample_rate)
if len(segment_audio) > max_samples:
# slices the array to keep the last max_samples samples, discarding the earlier part.
segment_audio = segment_audio[-max_samples:]
logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")
if len(segment_audio) > 0:
start_time = time.perf_counter()
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE if result["prediction"] == 1 else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
logger.debug("--------")
logger.debug(f"Prediction: {'Complete' if result['prediction'] == 1 else 'Incomplete'}")
logger.debug(f"Probability of complete: {result['probability']:.4f}")
logger.debug(f"Prediction took {(end_time - start_time) * 1000:.2f}ms seconds")
else:
logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.debug("Captured empty audio segment, skipping prediction.")
return state
@abstractmethod
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, any]:
"""
Abstract method to predict if a turn has ended based on audio.
Args:
buffer: Float32 numpy array of audio samples at 16kHz.
Returns:
Dictionary with:
- prediction: 1 if turn is complete, else 0
- probability: Confidence of the prediction
"""
pass

View File

@@ -0,0 +1,81 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional
class EndOfTurnState(Enum):
COMPLETE = 1
INCOMPLETE = 2
class BaseTurnAnalyzer(ABC):
"""
Abstract base class for analyzing user end of turn.
"""
def __init__(self, *, sample_rate: Optional[int] = None):
self._init_sample_rate = sample_rate
self._sample_rate = 0
@property
def sample_rate(self) -> int:
"""
Returns the current sample rate.
Returns:
int: The effective sample rate for audio processing.
"""
return self._sample_rate
def set_sample_rate(self, sample_rate: int):
"""
Sets the sample rate for audio processing.
If the initial sample rate was provided, it will use that; otherwise, it sets to
the provided sample rate.
Args:
sample_rate (int): The sample rate to set.
"""
self._sample_rate = self._init_sample_rate or sample_rate
@property
@abstractmethod
def speech_triggered(self) -> bool:
"""
Determines if speech has been detected.
Returns:
bool: True if speech is triggered, otherwise False.
"""
pass
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""
Appends audio data for analysis.
Args:
buffer (bytes): The audio data to append.
is_speech (bool): Indicates whether the appended audio is speech or not.
Returns:
EndOfTurnState: The resulting state after appending the audio.
"""
pass
@abstractmethod
def analyze_end_of_turn(self) -> EndOfTurnState:
"""
Analyzes if an end of turn has occurred based on the audio input.
Returns:
EndOfTurnState: The result of the end of turn analysis.
"""
pass

View File

@@ -0,0 +1,65 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Dict
import numpy as np
import torch
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
try:
import coremltools as ct
from transformers import AutoFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, smart_turn_model_path: str, **kwargs):
super().__init__(**kwargs)
if not smart_turn_model_path:
logger.error("smart_turn_model_path is not set.")
raise Exception("smart_turn_model_path must be provided.")
core_ml_model_path = f"{smart_turn_model_path}/coreml/smart_turn_classifier.mlpackage"
logger.debug("Loading Local Smart Turn model...")
# Only load the processor, not the torch model
self._turn_processor = AutoFeatureExtractor.from_pretrained(smart_turn_model_path)
self._turn_model = ct.models.MLModel(core_ml_model_path)
logger.debug("Loaded Local Smart Turn")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,
padding="max_length",
truncation=True,
max_length=800, # Maximum length as specified in training
return_attention_mask=True,
return_tensors="pt",
)
output = self._turn_model.predict(dict(inputs))
logits = output["logits"] # Core ML returns numpy array
logits_tensor = torch.tensor(logits)
probabilities = torch.nn.functional.softmax(logits_tensor, dim=1)
completion_prob = probabilities[0, 1].item() # Probability of class 1 (Complete)
prediction = 1 if completion_prob > 0.5 else 0
return {
"prediction": prediction,
"probability": completion_prob,
}

View File

@@ -0,0 +1,75 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
from typing import Dict
import numpy as np
import requests
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
class SmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, url: str, **kwargs):
super().__init__(**kwargs)
self.remote_smart_turn_url = url
if not self.remote_smart_turn_url:
logger.error("remote_smart_turn_url is not set.")
raise Exception("remote_smart_turn_url must be provided.")
# Use a session to reuse connections (keep-alive)
self.session = requests.Session()
self.session.headers.update({"Connection": "keep-alive"})
def _serialize_array(self, audio_array: np.ndarray) -> bytes:
logger.trace("Serializing NumPy array to bytes...")
buffer = io.BytesIO()
np.save(buffer, audio_array)
serialized_bytes = buffer.getvalue()
logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
return serialized_bytes
def _send_raw_request(self, data_bytes: bytes):
headers = {"Content-Type": "application/octet-stream"}
logger.trace(
f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
)
try:
response = self.session.post(
self.remote_smart_turn_url,
data=data_bytes,
headers=headers,
timeout=60,
)
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status_code}")
if response.ok:
try:
logger.trace("Response JSON:")
logger.trace(response.json())
return response.json()
except requests.exceptions.JSONDecodeError:
logger.trace("Response Content (non-JSON):")
logger.trace(response.text)
else:
logger.trace("Response Content (Error):")
logger.trace(response.text)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
serialized_array = self._serialize_array(audio_array)
return self._send_raw_request(serialized_array)

1
src/pipecat/py.typed Normal file
View File

@@ -0,0 +1 @@

View File

@@ -185,7 +185,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -197,7 +198,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Cartesia")
self._websocket = await websockets.connect(
@@ -215,11 +216,11 @@ class CartesiaTTSService(AudioContextWordTTSService):
if self._websocket:
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
self._websocket = None
self._context_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._context_id = None
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -279,7 +280,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
if not self._context_id:

View File

@@ -309,10 +309,10 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if not self._keepalive_task:
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
@@ -328,7 +328,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to ElevenLabs")
@@ -375,11 +375,11 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
logger.debug("Disconnecting from ElevenLabs")
await self._websocket.send(json.dumps({"text": ""}))
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -419,7 +419,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -104,7 +104,8 @@ class FishAudioTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -116,7 +117,7 @@ class FishAudioTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Fish Audio")
@@ -141,16 +142,17 @@ class FishAudioTTSService(InterruptibleTTSService):
stop_message = {"event": "stop"}
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
self._websocket = None
self._request_id = None
self._started = False
except Exception as e:
logger.error(f"Error closing websocket: {e}")
finally:
self._request_id = None
self._started = False
self._websocket = None
async def flush_audio(self):
"""Flush any buffered audio by sending a flush event to Fish Audio."""
logger.trace(f"{self}: Flushing audio buffers")
if not self._websocket:
if not self._websocket or self._websocket.closed:
return
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))

View File

@@ -285,7 +285,7 @@ class GladiaSTTService(STTService):
settings = self._prepare_settings()
response = await self._setup_gladia(settings)
self._websocket = await websockets.connect(response["url"])
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler())
async def stop(self, frame: EndFrame):

View File

@@ -109,7 +109,7 @@ class LmntTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -122,7 +122,7 @@ class LmntTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
"""Connect to LMNT websocket."""
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to LMNT")
@@ -158,11 +158,11 @@ class LmntTTSService(InterruptibleTTSService):
# errors on the websocket, so we just skip it for now.
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -170,7 +170,7 @@ class LmntTTSService(InterruptibleTTSService):
raise Exception("Websocket not connected")
async def flush_audio(self):
if not self._websocket:
if not self._websocket or self._websocket.closed:
return
await self._get_websocket().send(json.dumps({"flush": True}))
@@ -203,7 +203,7 @@ class LmntTTSService(InterruptibleTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -106,6 +106,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
self._started = False
self._cumulative_time = 0
self._receive_task = None
self._keepalive_task = None
def can_generate_metrics(self) -> bool:
return True
@@ -159,8 +162,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
self._keepalive_task = self.create_task(self._keepalive_task_handler())
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
if self._receive_task:
@@ -175,6 +181,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Neuphonic")
tts_config = {
@@ -190,7 +199,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
url = f"{self._url}/speak/{self._settings['lang_code']}?{'&'.join(query_params)}"
self._websocket = await websockets.connect(url)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -203,11 +211,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
if self._websocket:
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
async def _receive_messages(self):
async for message in self._websocket:
@@ -235,7 +243,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -157,7 +157,7 @@ class PlayHTTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -169,7 +169,7 @@ class PlayHTTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to PlayHT")
@@ -197,11 +197,11 @@ class PlayHTTTSService(InterruptibleTTSService):
if self._websocket:
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
self._websocket = None
self._request_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._request_id = None
self._websocket = None
async def _get_websocket_url(self):
async with aiohttp.ClientSession() as session:

View File

@@ -168,7 +168,7 @@ class RimeTTSService(AudioContextWordTTSService):
"""Establish websocket connection and start receive task."""
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -182,7 +182,7 @@ class RimeTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
"""Connect to Rime websocket API with configured settings."""
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
params = "&".join(f"{k}={v}" for k, v in self._settings.items())
@@ -201,10 +201,11 @@ class RimeTTSService(AudioContextWordTTSService):
if self._websocket:
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
self._websocket = None
self._context_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._context_id = None
self._websocket = None
def _get_websocket(self):
"""Get active websocket connection or raise exception."""
@@ -316,7 +317,7 @@ class RimeTTSService(AudioContextWordTTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -31,7 +31,7 @@ class WebsocketService(ABC):
bool: True if connection is verified working, False otherwise
"""
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
return False
await self._websocket.ping()
return True

View File

@@ -10,6 +10,7 @@ from typing import Optional
from loguru import logger
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
BotInterruptionFrame,
@@ -64,12 +65,19 @@ class BaseInputTransport(FrameProcessor):
def vad_analyzer(self) -> Optional[VADAnalyzer]:
return self._params.vad_analyzer
@property
def turn_analyzer(self) -> Optional[BaseTurnAnalyzer]:
return self._params.turn_analyzer
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
# Configure VAD analyzer.
if self._params.vad_enabled and self._params.vad_analyzer:
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
# Configure End of turn analyzer.
if self._params.turn_analyzer:
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._sample_rate)
@@ -187,10 +195,18 @@ class BaseInputTransport(FrameProcessor):
and new_vad_state != VADState.STOPPING
):
frame = None
if new_vad_state == VADState.SPEAKING:
frame = UserStartedSpeakingFrame()
elif new_vad_state == VADState.QUIET:
frame = UserStoppedSpeakingFrame()
# If the turn analyser is enabled, this will prevent:
# - Creating the UserStoppedSpeakingFrame
# - Creating the UserStartedSpeakingFrame multiple times
can_create_user_frames = (
self._params.turn_analyzer is None
or not self._params.turn_analyzer.speech_triggered
)
if can_create_user_frames:
if new_vad_state == VADState.SPEAKING:
frame = UserStartedSpeakingFrame()
elif new_vad_state == VADState.QUIET:
frame = UserStoppedSpeakingFrame()
if frame:
await self._handle_user_interruption(frame)
@@ -198,6 +214,29 @@ class BaseInputTransport(FrameProcessor):
vad_state = new_vad_state
return vad_state
async def _handle_end_of_turn(self):
if self.turn_analyzer:
state = await self.get_event_loop().run_in_executor(
self._executor, self.turn_analyzer.analyze_end_of_turn
)
await self._handle_end_of_turn_complete(state)
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
if state == EndOfTurnState.COMPLETE:
await self._handle_user_interruption(UserStoppedSpeakingFrame())
async def _run_turn_analyzer(
self, frame: InputAudioRawFrame, vad_state: VADState, previous_vad_state: VADState
):
is_speech = vad_state == VADState.SPEAKING or vad_state == VADState.STARTING
# If silence exceeds threshold, we are going to receive EndOfTurnState.COMPLETE
end_of_turn_state = self._params.turn_analyzer.append_audio(frame.audio, is_speech)
if end_of_turn_state == EndOfTurnState.COMPLETE:
await self._handle_end_of_turn_complete(end_of_turn_state)
# Otherwise we are going to trigger to check if the turn is completed based on the VAD
elif vad_state == VADState.QUIET and vad_state != previous_vad_state:
await self._handle_end_of_turn()
async def _audio_task_handler(self):
vad_state: VADState = VADState.QUIET
while True:
@@ -211,10 +250,14 @@ class BaseInputTransport(FrameProcessor):
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
previous_vad_state = vad_state
if self._params.vad_enabled:
vad_state = await self._handle_vad(frame, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
if self._params.turn_analyzer:
await self._run_turn_analyzer(frame, vad_state, previous_vad_state)
# Push audio downstream if passthrough.
if audio_passthrough:
await self.push_frame(frame)

View File

@@ -386,10 +386,13 @@ class BaseOutputTransport(FrameProcessor):
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
# TODO: we should refactor in the future to support dynamic resolutions
# which is kind of what happens in P2P connections.
# We need to add support for that inside the DailyTransport
if frame.size != desired_size:
image = Image.frombytes(frame.format, frame.size, frame.image)
resized_image = image.resize(desired_size)
logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
frame = OutputImageRawFrame(
resized_image.tobytes(), resized_image.size, resized_image.format
)

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.utils.base_object import BaseObject
@@ -41,6 +42,7 @@ class TransportParams(BaseModel):
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None
turn_analyzer: Optional[BaseTurnAnalyzer] = None
class BaseTransport(BaseObject):