Compare commits

..

1 Commits

Author SHA1 Message Date
mattie ruth backman
50b19a9e77 minor updates to get started and working on latest modal 2025-04-23 21:25:45 -04:00
30 changed files with 295 additions and 501 deletions

View File

@@ -5,14 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.64] - 2025-04-22
## [Unreleased]
### Added
- Added automatic hangup logic to the Twilio serializer. This feature hangs up
the Twilio call when an `EndFrame` or `CancelFrame` is received. It is
enabled by default and is configurable via the `auto_hang_up` `InputParam`.
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
confidence scores and processing time metrics from the smart turn analyzers.
@@ -21,9 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.
- Added support for Smart Turn Detection via the `turn_analyzer` transport
parameter. You can now choose between `HttpSmartTurnAnalyzer()` or
`FalSmartTurnAnalyzer()` for remote inference or
`LocalCoreMLSmartTurnAnalyzer()` for on-device inference using Core ML.
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
Core ML.
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
connect to an on-prem service.
@@ -48,8 +44,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `GrokLLMService` now uses `grok-3-beta` as its default model.
- Daily's REST helpers now include an `eject_at_token_exp` param, which ejects
the user when their token expires. This new parameter defaults to False.
Also, the default value for `enable_prejoin_ui` changed to False and
@@ -84,10 +78,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue where LLM input parameters were not working and applied correctly in `GoogleVertexLLMService`, causing
unexpected behavior during inference.
### Other
- Updated the `twilio-chatbot` example to use the auto-hangup feature.
## [0.0.63] - 2025-04-11
### Added

View File

@@ -96,8 +96,4 @@ PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=
FAL_SMART_TURN_API_KEY=...
# Twilio
TWILIO_ACCOUNT_SID=
TWILIO_AUTH_TOKEN=
REMOTE_SMART_TURN_URL=

View File

@@ -10,24 +10,27 @@ import aiohttp
import modal
from bot import _voice_bot_process
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from fastapi.responses import RedirectResponse
from loguru import logger
MAX_SESSION_TIME = 15 * 60 # 15 minutes
app = modal.App("pipecat-modal")
image = modal.Image.debian_slim(python_version="3.12").pip_install_from_requirements(
"requirements.txt"
image = (
modal.Image.debian_slim(python_version="3.13")
.apt_install("ffmpeg")
.pip_install_from_requirements("requirements.txt")
.pip_install("pipecat-ai[daily,silero,cartesia,openai]")
.add_local_python_source("bot")
)
app = modal.App("pipecat-modal", image=image)
@app.function(
image=image,
cpu=1.0,
secrets=[modal.Secret.from_dotenv()],
keep_warm=1,
min_containers=1,
enable_memory_snapshot=True,
max_inputs=1, # Do not reuse instances across requests
retries=0,
@@ -40,7 +43,7 @@ def launch_bot_process(room_url: str, token: str):
image=image,
secrets=[modal.Secret.from_dotenv()],
)
@modal.web_endpoint(method="POST")
@modal.fastapi_endpoint(method="GET")
async def start():
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
@@ -77,4 +80,4 @@ async def start():
# Return room URL to the user to join
# Note: in production, you would want to return a token to the user
return JSONResponse(content={"room_url": room.url, token: token})
return RedirectResponse(room.url)

View File

@@ -1,5 +1,4 @@
python-dotenv==1.0.1
modal==0.71.3
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
fastapi==0.115.6
aiohttp==3.11.11

View File

@@ -1,113 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.fal_smart_turn import FalSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=FalSmartTurnAnalyzer(
api_key=os.getenv("FAL_SMART_TURN_API_KEY"), aiohttp_session=session
),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,111 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
remote_smart_turn_url = os.getenv("REMOTE_SMART_TURN_URL")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=SmartTurnAnalyzer(url=remote_smart_turn_url),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -9,8 +9,8 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_coreml_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline

View File

@@ -7,6 +7,7 @@
import argparse
import asyncio
import importlib.util
import logging
import os
import sys
from contextlib import asynccontextmanager
@@ -17,7 +18,6 @@ import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -25,6 +25,14 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
# Configure logger
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("pipecat-server")
app = FastAPI()
# Store connections by pc_id
@@ -154,11 +162,10 @@ def main():
parser.add_argument("--verbose", "-v", action="count", default=0)
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
logging.basicConfig(level=logging.DEBUG)
else:
logger.add(sys.stderr, level="DEBUG")
logging.basicConfig(level=logging.INFO)
# Infer the bot file from the caller if not provided explicitly
bot_file = args.bot_file

View File

@@ -26,6 +26,9 @@ from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):

View File

@@ -1,12 +1,6 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import sys
import logging
from contextlib import asynccontextmanager
from typing import Dict
@@ -15,7 +9,6 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -23,6 +16,8 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -86,10 +81,9 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
logging.basicConfig(level=logging.DEBUG)
else:
logger.add(sys.stderr, level="DEBUG")
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -25,6 +25,9 @@ from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class EdgeDetectionProcessor(FrameProcessor):
def __init__(self, camera_out_width, camera_out_height: int):

View File

@@ -1,12 +1,6 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import sys
import logging
from contextlib import asynccontextmanager
from typing import Dict
@@ -15,7 +9,6 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -23,6 +16,8 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -86,10 +81,9 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
logging.basicConfig(level=logging.DEBUG)
else:
logger.add(sys.stderr, level="DEBUG")
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -20,6 +20,10 @@ from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
SYSTEM_INSTRUCTION = f"""
"You are Gemini Chatbot, a friendly, helpful robot.

View File

@@ -1,12 +1,6 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import sys
import logging
from contextlib import asynccontextmanager
from typing import Dict
@@ -15,13 +9,14 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -78,10 +73,9 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
logging.basicConfig(level=logging.DEBUG)
else:
logger.add(sys.stderr, level="DEBUG")
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -1,7 +1,6 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (optional: for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=
DAILY_API_URL=https://api.daily.co/v1
DEEPGRAM_API_KEY=
DAILY_API_URL=api.daily.co/v1
OPENAI_API_KEY=
GOOGLE_API_KEY
CARTESIA_API_KEY=

View File

@@ -1,5 +1,5 @@
pipecat-ai[daily,cartesia,deepgram,openai,google,silero]
fastapi==0.115.6
pipecat-ai[daily,cartesia,openai,google,silero]
fastapi==3.11.12
uvicorn
python-dotenv
twilio

View File

@@ -54,14 +54,7 @@ async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_chann
logger.info("No audio data to save")
async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str, testing: bool):
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
@@ -71,7 +64,7 @@ async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str, t
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=serializer,
serializer=TwilioFrameSerializer(stream_sid),
),
)

View File

@@ -38,9 +38,8 @@ async def websocket_endpoint(websocket: WebSocket):
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data["start"]["streamSid"]
call_sid = call_data["start"]["callSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid, call_sid, app.state.testing)
await run_bot(websocket, stream_sid, app.state.testing)
if __name__ == "__main__":

View File

@@ -30,10 +30,6 @@ class SmartTurnParams(BaseModel):
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class SmartTurnTimeoutException(Exception):
pass
class BaseSmartTurn(BaseTurnAnalyzer):
def __init__(
self, *, sample_rate: Optional[int] = None, params: SmartTurnParams = SmartTurnParams()
@@ -46,7 +42,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
self._audio_buffer = []
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = 0
self._speech_start_time = None
@property
def speech_triggered(self) -> bool:
@@ -64,7 +60,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
# Reset silence tracking on speech
self._silence_ms = 0
self._speech_triggered = True
if self._speech_start_time == 0:
if self._speech_start_time is None:
self._speech_start_time = time.time()
else:
if self._speech_triggered:
@@ -91,8 +87,8 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = await self._process_speech_segment(self._audio_buffer)
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
@@ -102,12 +98,10 @@ class BaseSmartTurn(BaseTurnAnalyzer):
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = 0
self._speech_start_time = None
self._silence_ms = 0
async def _process_speech_segment(
self, audio_buffer
) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
@@ -137,41 +131,30 @@ class BaseSmartTurn(BaseTurnAnalyzer):
if len(segment_audio) > 0:
start_time = time.perf_counter()
try:
result = await self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE
if result["prediction"] == 1
else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE if result["prediction"] == 1 else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
logger.trace(
f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}"
)
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
except SmartTurnTimeoutException:
logger.debug(
f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
)
state = EndOfTurnState.COMPLETE
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
else:
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.trace("Captured empty audio segment, skipping prediction.")
@@ -179,11 +162,11 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state, result_data
@abstractmethod
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
"""Abstract method to predict if a turn has ended based on audio.
Args:
audio_array: Float32 numpy array of audio samples at 16kHz.
buffer: Float32 numpy array of audio samples at 16kHz.
Returns:
Dictionary with:

View File

@@ -71,7 +71,7 @@ class BaseTurnAnalyzer(ABC):
pass
@abstractmethod
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Analyzes if an end of turn has occurred based on the audio input.
Returns:

View File

@@ -5,16 +5,17 @@
#
from typing import Any, Dict
import os
from typing import Dict
import numpy as np
import torch
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
try:
import coremltools as ct
import torch
from transformers import AutoFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -25,7 +26,7 @@ except ModuleNotFoundError as e:
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, *, smart_turn_model_path: str, **kwargs):
def __init__(self, smart_turn_model_path: str, **kwargs):
super().__init__(**kwargs)
if not smart_turn_model_path:
@@ -40,7 +41,7 @@ class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
self._turn_model = ct.models.MLModel(core_ml_model_path)
logger.debug("Loaded Local Smart Turn")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,

View File

@@ -0,0 +1,75 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
from typing import Dict
import numpy as np
import requests
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
class SmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, url: str, **kwargs):
super().__init__(**kwargs)
self.remote_smart_turn_url = url
if not self.remote_smart_turn_url:
logger.error("remote_smart_turn_url is not set.")
raise Exception("remote_smart_turn_url must be provided.")
# Use a session to reuse connections (keep-alive)
self.session = requests.Session()
self.session.headers.update({"Connection": "keep-alive"})
def _serialize_array(self, audio_array: np.ndarray) -> bytes:
logger.trace("Serializing NumPy array to bytes...")
buffer = io.BytesIO()
np.save(buffer, audio_array)
serialized_bytes = buffer.getvalue()
logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
return serialized_bytes
def _send_raw_request(self, data_bytes: bytes):
headers = {"Content-Type": "application/octet-stream"}
logger.trace(
f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
)
try:
response = self.session.post(
self.remote_smart_turn_url,
data=data_bytes,
headers=headers,
timeout=60,
)
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status_code}")
if response.ok:
try:
logger.trace("Response JSON:")
logger.trace(response.json())
return response.json()
except requests.exceptions.JSONDecodeError:
logger.trace("Response Content (non-JSON):")
logger.trace(response.text)
else:
logger.trace("Response Content (Error):")
logger.trace(response.text)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
serialized_array = self._serialize_array(audio_array)
return self._send_raw_request(serialized_array)

View File

@@ -1,26 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional
import aiohttp
from pipecat.audio.turn.smart_turn.http_smart_turn import HttpSmartTurnAnalyzer
class FalSmartTurnAnalyzer(HttpSmartTurnAnalyzer):
def __init__(
self,
*,
aiohttp_session: aiohttp.ClientSession,
url: str = "https://fal.run/fal-ai/smart-turn/raw",
api_key: Optional[str] = None,
**kwargs,
):
headers = {}
if api_key:
headers = {"Authorization": f"Key {api_key}"}
super().__init__(url=url, aiohttp_session=aiohttp_session, headers=headers, **kwargs)

View File

@@ -1,80 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
from typing import Any, Dict
import aiohttp
import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn, SmartTurnTimeoutException
class HttpSmartTurnAnalyzer(BaseSmartTurn):
def __init__(
self,
*,
url: str,
aiohttp_session: aiohttp.ClientSession,
headers: Dict[str, str] = {},
**kwargs,
):
super().__init__(**kwargs)
self._url = url
self._headers = headers
self._aiohttp_session = aiohttp_session
def _serialize_array(self, audio_array: np.ndarray) -> bytes:
logger.trace("Serializing NumPy array to bytes...")
buffer = io.BytesIO()
np.save(buffer, audio_array)
serialized_bytes = buffer.getvalue()
logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
return serialized_bytes
async def _send_raw_request(self, data_bytes: bytes) -> Dict[str, Any]:
headers = {"Content-Type": "application/octet-stream"}
headers.update(self._headers)
logger.trace(f"Sending {len(data_bytes)} bytes as raw body to {self._url}...")
try:
timeout = aiohttp.ClientTimeout(total=self._params.stop_secs)
async with self._aiohttp_session.post(
self._url, data=data_bytes, headers=headers, timeout=timeout
) as response:
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status}")
if response.status == 200:
try:
json_data = await response.json()
logger.trace("Response JSON:")
logger.trace(json_data)
return json_data
except aiohttp.ContentTypeError:
# Non-JSON response
text = await response.text()
logger.trace("Response Content (non-JSON):")
logger.trace(text)
raise Exception(f"Non-JSON response: {text}")
else:
error_text = await response.text()
logger.trace("Response Content (Error):")
logger.trace(error_text)
response.raise_for_status()
except asyncio.TimeoutError:
logger.error(f"Request timed out after {self._params.stop_secs} seconds")
raise SmartTurnTimeoutException(f"Request exceeded {self._params.stop_secs} seconds.")
except aiohttp.ClientError as e:
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)

View File

@@ -8,14 +8,11 @@ import base64
import json
from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
@@ -29,61 +26,12 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TwilioFrameSerializer(FrameSerializer):
"""Serializer for Twilio Media Streams WebSocket protocol.
This serializer handles converting between Pipecat frames and Twilio's WebSocket
media streams protocol. It supports audio conversion, DTMF events, and automatic
call termination.
When auto_hang_up is enabled (default), the serializer will automatically terminate
the Twilio call when an EndFrame or CancelFrame is processed, but requires Twilio
credentials to be provided.
Attributes:
_stream_sid: The Twilio Media Stream SID.
_call_sid: The associated Twilio Call SID.
_account_sid: Twilio account SID for API access.
_auth_token: Twilio authentication token for API access.
_params: Configuration parameters.
_twilio_sample_rate: Sample rate used by Twilio (typically 8kHz).
_sample_rate: Input sample rate for the pipeline.
_resampler: Audio resampler for format conversion.
"""
class InputParams(BaseModel):
"""Configuration parameters for TwilioFrameSerializer.
twilio_sample_rate: int = 8000 # Default Twilio rate (8kHz)
sample_rate: Optional[int] = None # Pipeline input rate
Attributes:
twilio_sample_rate: Sample rate used by Twilio, defaults to 8000 Hz.
sample_rate: Optional override for pipeline input sample rate.
auto_hang_up: Whether to automatically terminate call on EndFrame.
"""
twilio_sample_rate: int = 8000
sample_rate: Optional[int] = None
auto_hang_up: bool = True
def __init__(
self,
stream_sid: str,
call_sid: str,
account_sid: Optional[str] = None,
auth_token: Optional[str] = None,
params: InputParams = InputParams(),
):
"""Initialize the TwilioFrameSerializer.
Args:
stream_sid: The Twilio Media Stream SID.
call_sid: The associated Twilio Call SID.
account_sid: Twilio account SID.
auth_token: Twilio auth token.
params: Configuration parameters.
"""
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
self._stream_sid = stream_sid
self._call_sid = call_sid
self._account_sid = account_sid
self._auth_token = auth_token
self._params = params
self._twilio_sample_rate = self._params.twilio_sample_rate
@@ -93,37 +41,13 @@ class TwilioFrameSerializer(FrameSerializer):
@property
def type(self) -> FrameSerializerType:
"""Gets the serializer type.
Returns:
The serializer type, either TEXT or BINARY.
"""
return FrameSerializerType.TEXT
async def setup(self, frame: StartFrame):
"""Sets up the serializer with pipeline configuration.
Args:
frame: The StartFrame containing pipeline configuration.
"""
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
"""Serializes a Pipecat frame to Twilio WebSocket format.
Handles conversion of various frame types to Twilio WebSocket messages.
For EndFrames, initiates call termination if auto_hang_up is enabled.
Args:
frame: The Pipecat frame to serialize.
Returns:
Serialized data as string or bytes, or None if the frame isn't handled.
"""
if self._params.auto_hang_up and isinstance(frame, (EndFrame, CancelFrame)):
await self._hang_up_call()
return None
elif isinstance(frame, StartInterruptionFrame):
if isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):
@@ -144,59 +68,7 @@ class TwilioFrameSerializer(FrameSerializer):
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
return json.dumps(frame.message)
# Return None for unhandled frames
return None
async def _hang_up_call(self):
"""Hang up the Twilio call using Twilio's REST API."""
try:
import aiohttp
account_sid = self._account_sid
auth_token = self._auth_token
if not account_sid or not auth_token:
logger.warning(
"Cannot hang up Twilio call: account_sid and auth_token must be provided"
)
return
# Twilio API endpoint for updating calls
endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{self._call_sid}.json"
# Create basic auth from account_sid and auth_token
auth = aiohttp.BasicAuth(account_sid, auth_token)
# Parameters to set the call status to "completed" (hang up)
params = {"Status": "completed"}
# Make the POST request to update the call
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, auth=auth, data=params) as response:
if response.status == 200:
logger.info(f"Successfully terminated Twilio call {self._call_sid}")
else:
# Get the error details for better debugging
error_text = await response.text()
logger.error(
f"Failed to terminate Twilio call {self._call_sid}: "
f"Status {response.status}, Response: {error_text}"
)
except Exception as e:
logger.exception(f"Failed to hang up Twilio call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Twilio WebSocket data to Pipecat frames.
Handles conversion of Twilio media events to appropriate Pipecat frames.
Args:
data: The raw WebSocket data from Twilio.
Returns:
A Pipecat frame corresponding to the Twilio event, or None if unhandled.
"""
message = json.loads(data)
if message["event"] == "media":

View File

@@ -42,7 +42,7 @@ class GrokLLMService(OpenAILLMService):
Args:
api_key (str): The API key for accessing Grok's API
base_url (str, optional): The base URL for Grok API. Defaults to "https://api.x.ai/v1"
model (str, optional): The model identifier to use. Defaults to "grok-3-beta"
model (str, optional): The model identifier to use. Defaults to "grok-2"
**kwargs: Additional keyword arguments passed to OpenAILLMService
"""
@@ -51,7 +51,7 @@ class GrokLLMService(OpenAILLMService):
*,
api_key: str,
base_url: str = "https://api.x.ai/v1",
model: str = "grok-3-beta",
model: str = "grok-2",
**kwargs,
):
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)

View File

@@ -222,8 +222,12 @@ class BaseInputTransport(FrameProcessor):
async def _handle_end_of_turn(self):
if self.turn_analyzer:
state, prediction = await self.turn_analyzer.analyze_end_of_turn()
state, prediction = await self.get_event_loop().run_in_executor(
self._executor, self.turn_analyzer.analyze_end_of_turn
)
await self._handle_prediction_result(prediction)
await self._handle_end_of_turn_complete(state)
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):

View File

@@ -207,12 +207,10 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._write_frame(frame)
await self._client.disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._write_frame(frame)
await self._client.disconnect()
async def cleanup(self):

View File

@@ -157,8 +157,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
self, websocket: websockets.WebSocketServerProtocol, session_timeout: int
):
"""Wait for session_timeout seconds, if the websocket is still open,
trigger timeout event.
"""
trigger timeout event."""
try:
await asyncio.sleep(session_timeout)
if not websocket.closed:
@@ -196,14 +195,6 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._write_frame(frame)
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._write_frame(frame)
async def cleanup(self):
await super().cleanup()
await self._transport.cleanup()