Compare commits

...

22 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
b91780ced2 Merge pull request #1638 from pipecat-ai/aleix/pipecat-0.0.64
update CHANGELOG for 0.0.64
2025-04-22 17:35:25 -07:00
Aleix Conchillo Flaqué
8ded666958 update CHANGELOG for 0.0.64 2025-04-22 17:32:06 -07:00
Filipi da Silva Fuchter
2490c804a5 Merge pull request #1631 from pipecat-ai/smart_turn_timeout
Returning the turn as complete if the request don’t return a result within SmartTurnParams stop_secs
2025-04-22 19:51:10 -03:00
Filipi Fuchter
dd8856a673 Merge branch 'main' into smart_turn_timeout
# Conflicts:
#	dot-env.template
2025-04-22 19:49:32 -03:00
Aleix Conchillo Flaqué
e7da08dab1 move smart turn files to audio.turn.smart_turn package 2025-04-22 15:29:31 -07:00
Aleix Conchillo Flaqué
ae60d42016 s/SmartTurnAnalyzer/HttpSmartTurnAnalyzer/ and add FalSmartTurnAnalyzer 2025-04-22 15:13:12 -07:00
Aleix Conchillo Flaqué
50e8d82ece SmartTurn: some linting cleanup 2025-04-22 14:39:02 -07:00
Mark Backman
cc9901a82f Replace httpx with aiohttp 2025-04-22 17:14:19 -04:00
Aleix Conchillo Flaqué
1fd43e8a3f Merge pull request #1636 from pipecat-ai/aleix/examples-logging
examples: always use loguru for logging
2025-04-22 13:06:40 -07:00
Aleix Conchillo Flaqué
fdc508a1a5 examples: always use loguru for logging 2025-04-22 11:51:49 -07:00
Mark Backman
37269db247 Merge pull request #1634 from pipecat-ai/mb/twilio-end-call
Automatically hangup Twilio calls
2025-04-22 14:05:10 -04:00
Mark Backman
51269aabbd Added cancel method to WebsocketServerOutputTransport 2025-04-22 13:58:39 -04:00
Mark Backman
74ecc19e09 Code review feedback 2025-04-22 13:54:12 -04:00
Mark Backman
c6d48c16df Add twilio to pyproject.toml, update demo to use twilio option 2025-04-22 13:01:56 -04:00
Mark Backman
873d84aa09 Twilio serializer to return None 2025-04-22 12:50:11 -04:00
Mark Backman
7360866c97 Add docstrings 2025-04-22 12:49:17 -04:00
Mark Backman
81f4768661 Automatically hangup Twilio calls 2025-04-22 12:45:34 -04:00
Vanessa Pyne
972d65f61b Merge pull request #1628 from pipecat-ai/vp-typo-fixes
typo fixes in phone-chatbot example
2025-04-22 10:05:56 -05:00
Mark Backman
1da9d398e3 Merge pull request #1619 from pipecat-ai/mb/grok-3-beta
GrokLLMService uses grok-3-beta as default model
2025-04-22 10:33:32 -04:00
Filipi Fuchter
7358bc6428 Returning the turn as complete if the request don’t return a result within SmartTurnParams stop_secs 2025-04-22 10:35:14 -03:00
vipyne
a6af499f84 typo fixes in phone-chatbot example 2025-04-21 23:49:13 -05:00
Mark Backman
a9b551d73e GrokLLMService uses grok-3-beta as default model 2025-04-19 08:05:59 -04:00
28 changed files with 492 additions and 284 deletions

View File

@@ -5,10 +5,14 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.64] - 2025-04-22
### Added
- Added automatic hangup logic to the Twilio serializer. This feature hangs up
the Twilio call when an `EndFrame` or `CancelFrame` is received. It is
enabled by default and is configurable via the `auto_hang_up` `InputParam`.
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
confidence scores and processing time metrics from the smart turn analyzers.
@@ -17,9 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.
- Added support for Smart Turn Detection via the `turn_analyzer` transport
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
Core ML.
parameter. You can now choose between `HttpSmartTurnAnalyzer()` or
`FalSmartTurnAnalyzer()` for remote inference or
`LocalCoreMLSmartTurnAnalyzer()` for on-device inference using Core ML.
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
connect to an on-prem service.
@@ -44,6 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `GrokLLMService` now uses `grok-3-beta` as its default model.
- Daily's REST helpers now include an `eject_at_token_exp` param, which ejects
the user when their token expires. This new parameter defaults to False.
Also, the default value for `enable_prejoin_ui` changed to False and
@@ -78,6 +84,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue where LLM input parameters were not being applied correctly in `GoogleVertexLLMService`, causing
unexpected behavior during inference.
### Other
- Updated the `twilio-chatbot` example to use the auto-hangup feature.
## [0.0.63] - 2025-04-11
### Added

View File

@@ -96,4 +96,8 @@ PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=
REMOTE_SMART_TURN_URL=
FAL_SMART_TURN_API_KEY=...
# Twilio
TWILIO_ACCOUNT_SID=
TWILIO_AUTH_TOKEN=

View File

@@ -0,0 +1,113 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.fal_smart_turn import FalSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
    """Run the example voice bot over a SmallWebRTC connection using Fal smart turn.

    Builds a transport with Silero VAD and a ``FalSmartTurnAnalyzer``, wires
    Deepgram STT, an OpenAI LLM and Cartesia TTS into a pipeline, registers the
    client connection event handlers and runs the pipeline task until it ends.

    Args:
        webrtc_connection: The WebRTC connection the transport is built on.
    """
    logger.info("Starting bot")

    # The aiohttp session is handed to the turn analyzer, so it has to stay
    # open for as long as the pipeline is running.
    async with aiohttp.ClientSession() as session:
        transport = SmallWebRTCTransport(
            webrtc_connection=webrtc_connection,
            params=TransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
                vad_audio_passthrough=True,
                turn_analyzer=FalSmartTurnAnalyzer(
                    api_key=os.getenv("FAL_SMART_TURN_API_KEY"), aiohttp_session=session
                ),
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        context = OpenAILLMContext(messages)
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,
                context_aggregator.user(),  # User responses
                llm,  # LLM
                tts,  # TTS
                transport.output(),  # Transport bot output
                context_aggregator.assistant(),  # Assistant spoken responses
            ]
        )

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
                report_only_initial_ttfb=True,
            ),
        )

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info("Client connected")
            # Kick off the conversation.
            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([context_aggregator.user().get_context_frame()])

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            logger.info("Client disconnected")

        @transport.event_handler("on_client_closed")
        async def on_client_closed(transport, client):
            logger.info("Client closed connection")
            # Cancel the pipeline task once the client has closed the connection.
            await task.cancel()

        runner = PipelineRunner(handle_sigint=False)

        await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -1,111 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
    """Run the example voice bot over a SmallWebRTC connection using remote smart turn.

    Builds a transport with Silero VAD and a ``SmartTurnAnalyzer`` pointed at the
    URL in ``REMOTE_SMART_TURN_URL``, wires Deepgram STT, an OpenAI LLM and
    Cartesia TTS into a pipeline, registers the client connection event handlers
    and runs the pipeline task until it ends.

    Args:
        webrtc_connection: The WebRTC connection the transport is built on.
    """
    logger.info("Starting bot")

    remote_smart_turn_url = os.getenv("REMOTE_SMART_TURN_URL")

    transport = SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
            vad_audio_passthrough=True,
            turn_analyzer=SmartTurnAnalyzer(url=remote_smart_turn_url),
        ),
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
            report_only_initial_ttfb=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        # Cancel the pipeline task once the client has closed the connection.
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -9,8 +9,8 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_coreml_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline

View File

@@ -7,7 +7,6 @@
import argparse
import asyncio
import importlib.util
import logging
import os
import sys
from contextlib import asynccontextmanager
@@ -18,6 +17,7 @@ import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -25,14 +25,6 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
# Configure logger
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("pipecat-server")
app = FastAPI()
# Store connections by pc_id
@@ -162,10 +154,11 @@ def main():
parser.add_argument("--verbose", "-v", action="count", default=0)
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logger.add(sys.stderr, level="TRACE")
else:
logging.basicConfig(level=logging.INFO)
logger.add(sys.stderr, level="DEBUG")
# Infer the bot file from the caller if not provided explicitly
bot_file = args.bot_file

View File

@@ -26,9 +26,6 @@ from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):

View File

@@ -1,6 +1,12 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from typing import Dict
@@ -9,6 +15,7 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -16,8 +23,6 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -81,9 +86,10 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logger.add(sys.stderr, level="TRACE")
else:
logging.basicConfig(level=logging.INFO)
logger.add(sys.stderr, level="DEBUG")
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -25,9 +25,6 @@ from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class EdgeDetectionProcessor(FrameProcessor):
def __init__(self, camera_out_width, camera_out_height: int):

View File

@@ -1,6 +1,12 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from typing import Dict
@@ -9,6 +15,7 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -16,8 +23,6 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -81,9 +86,10 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logger.add(sys.stderr, level="TRACE")
else:
logging.basicConfig(level=logging.INFO)
logger.add(sys.stderr, level="DEBUG")
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -20,10 +20,6 @@ from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
SYSTEM_INSTRUCTION = f"""
"You are Gemini Chatbot, a friendly, helpful robot.

View File

@@ -1,6 +1,12 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from typing import Dict
@@ -9,14 +15,13 @@ from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
@@ -73,9 +78,10 @@ if __name__ == "__main__":
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logger.add(sys.stderr, level="TRACE")
else:
logging.basicConfig(level=logging.INFO)
logger.add(sys.stderr, level="DEBUG")
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -1,6 +1,7 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (optional: for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=
DAILY_API_URL=api.daily.co/v1
DAILY_API_URL=https://api.daily.co/v1
DEEPGRAM_API_KEY=
OPENAI_API_KEY=
GOOGLE_API_KEY=
CARTESIA_API_KEY=

View File

@@ -1,5 +1,5 @@
pipecat-ai[daily,cartesia,openai,google,silero]
fastapi==3.11.12
pipecat-ai[daily,cartesia,deepgram,openai,google,silero]
fastapi==0.115.6
uvicorn
python-dotenv
twilio

View File

@@ -54,7 +54,14 @@ async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_chann
logger.info("No audio data to save")
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str, testing: bool):
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
@@ -64,7 +71,7 @@ async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid),
serializer=serializer,
),
)

View File

@@ -38,8 +38,9 @@ async def websocket_endpoint(websocket: WebSocket):
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data["start"]["streamSid"]
call_sid = call_data["start"]["callSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid, app.state.testing)
await run_bot(websocket, stream_sid, call_sid, app.state.testing)
if __name__ == "__main__":

View File

@@ -71,7 +71,7 @@ class BaseTurnAnalyzer(ABC):
pass
@abstractmethod
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Analyzes if an end of turn has occurred based on the audio input.
Returns:

View File

@@ -1,75 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
from typing import Dict
import numpy as np
import requests
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
class SmartTurnAnalyzer(BaseSmartTurn):
    """Smart turn analyzer that sends audio to a remote HTTP endpoint.

    The audio segment is serialized with ``np.save`` and POSTed as a raw
    ``application/octet-stream`` body through a reusable ``requests`` session.
    """

    def __init__(self, url: str, **kwargs):
        """Initialize the analyzer.

        Args:
            url: URL of the remote smart turn endpoint. Must be non-empty.
            **kwargs: Forwarded to ``BaseSmartTurn``.

        Raises:
            Exception: If ``url`` is empty or ``None``.
        """
        super().__init__(**kwargs)
        self.remote_smart_turn_url = url

        if not self.remote_smart_turn_url:
            logger.error("remote_smart_turn_url is not set.")
            raise Exception("remote_smart_turn_url must be provided.")

        # Use a session to reuse connections (keep-alive)
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def _serialize_array(self, audio_array: np.ndarray) -> bytes:
        """Serialize ``audio_array`` to bytes in NumPy ``.npy`` format."""
        logger.trace("Serializing NumPy array to bytes...")
        buffer = io.BytesIO()
        np.save(buffer, audio_array)
        serialized_bytes = buffer.getvalue()
        logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
        return serialized_bytes

    def _send_raw_request(self, data_bytes: bytes):
        """POST ``data_bytes`` to the remote endpoint and return the decoded JSON.

        Args:
            data_bytes: Raw request body.

        Returns:
            The decoded JSON body of a successful response.

        Raises:
            Exception: If the request fails, the server answers with an error
                status, or a successful response does not contain JSON.
        """
        headers = {"Content-Type": "application/octet-stream"}
        logger.trace(
            f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
        )
        try:
            response = self.session.post(
                self.remote_smart_turn_url,
                data=data_bytes,
                headers=headers,
                timeout=60,
            )

            logger.trace("\n--- Response ---")
            logger.trace(f"Status Code: {response.status_code}")

            if response.ok:
                try:
                    logger.trace("Response JSON:")
                    # Decode once and reuse the result for logging and returning.
                    json_data = response.json()
                    logger.trace(json_data)
                    return json_data
                except requests.exceptions.JSONDecodeError:
                    logger.trace("Response Content (non-JSON):")
                    logger.trace(response.text)
                    # Fail here instead of returning None, which the caller
                    # would only trip over later when indexing the result.
                    raise Exception(f"Non-JSON response: {response.text}")
            else:
                logger.trace("Response Content (Error):")
                logger.trace(response.text)
                response.raise_for_status()

        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
            raise Exception("Failed to send raw request to Daily Smart Turn.") from e

    # NOTE(review): `any` in the annotation below is the builtin function;
    # `typing.Any` was presumably intended but is not imported in this file.
    def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
        """Serialize ``audio_array`` and return the remote endpoint's prediction."""
        serialized_array = self._serialize_array(audio_array)
        return self._send_raw_request(serialized_array)

View File

@@ -30,6 +30,10 @@ class SmartTurnParams(BaseModel):
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class SmartTurnTimeoutException(Exception):
pass
class BaseSmartTurn(BaseTurnAnalyzer):
def __init__(
self, *, sample_rate: Optional[int] = None, params: SmartTurnParams = SmartTurnParams()
@@ -42,7 +46,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
self._audio_buffer = []
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = None
self._speech_start_time = 0
@property
def speech_triggered(self) -> bool:
@@ -60,7 +64,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
# Reset silence tracking on speech
self._silence_ms = 0
self._speech_triggered = True
if self._speech_start_time is None:
if self._speech_start_time == 0:
self._speech_start_time = time.time()
else:
if self._speech_triggered:
@@ -87,8 +91,8 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = self._process_speech_segment(self._audio_buffer)
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = await self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
@@ -98,10 +102,12 @@ class BaseSmartTurn(BaseTurnAnalyzer):
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = None
self._speech_start_time = 0
self._silence_ms = 0
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
async def _process_speech_segment(
self, audio_buffer
) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
@@ -131,30 +137,41 @@ class BaseSmartTurn(BaseTurnAnalyzer):
if len(segment_audio) > 0:
start_time = time.perf_counter()
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE if result["prediction"] == 1 else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
try:
result = await self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE
if result["prediction"] == 1
else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
logger.trace(
f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}"
)
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
except SmartTurnTimeoutException:
logger.debug(
f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
)
state = EndOfTurnState.COMPLETE
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
else:
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.trace("Captured empty audio segment, skipping prediction.")
@@ -162,11 +179,11 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state, result_data
@abstractmethod
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Abstract method to predict if a turn has ended based on audio.
Args:
buffer: Float32 numpy array of audio samples at 16kHz.
audio_array: Float32 numpy array of audio samples at 16kHz.
Returns:
Dictionary with:

View File

@@ -0,0 +1,26 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional
import aiohttp
from pipecat.audio.turn.smart_turn.http_smart_turn import HttpSmartTurnAnalyzer
class FalSmartTurnAnalyzer(HttpSmartTurnAnalyzer):
    """HTTP smart turn analyzer whose URL defaults to the Fal smart-turn endpoint."""

    def __init__(
        self,
        *,
        aiohttp_session: aiohttp.ClientSession,
        url: str = "https://fal.run/fal-ai/smart-turn/raw",
        api_key: Optional[str] = None,
        **kwargs,
    ):
        """Initialize the analyzer.

        Args:
            aiohttp_session: Session passed through to ``HttpSmartTurnAnalyzer``.
            url: Endpoint URL passed through to ``HttpSmartTurnAnalyzer``.
            api_key: Optional API key. When truthy it is sent as an
                ``Authorization: Key <api_key>`` header; otherwise no extra
                headers are added.
            **kwargs: Forwarded to ``HttpSmartTurnAnalyzer``.
        """
        auth_headers = {"Authorization": f"Key {api_key}"} if api_key else {}
        super().__init__(
            url=url,
            aiohttp_session=aiohttp_session,
            headers=auth_headers,
            **kwargs,
        )

View File

@@ -0,0 +1,80 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
from typing import Any, Dict
import aiohttp
import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn, SmartTurnTimeoutException
class HttpSmartTurnAnalyzer(BaseSmartTurn):
    """Smart turn analyzer that sends audio to an HTTP endpoint.

    The audio segment is serialized with ``np.save`` and POSTed as a raw
    ``application/octet-stream`` body through the provided aiohttp session.
    The JSON body of a 200 response is returned as the prediction result.
    """

    def __init__(
        self,
        *,
        url: str,
        aiohttp_session: aiohttp.ClientSession,
        headers: Dict[str, str] = {},
        **kwargs,
    ):
        """Initialize the analyzer.

        Args:
            url: URL the serialized audio is POSTed to.
            aiohttp_session: Session used to send the requests.
            headers: Extra headers merged into every request.
            **kwargs: Forwarded to ``BaseSmartTurn``.
        """
        super().__init__(**kwargs)
        self._url = url
        # Store a copy so the shared default dict and the caller's dict are
        # never aliased by this instance.
        self._headers = dict(headers)
        self._aiohttp_session = aiohttp_session

    def _serialize_array(self, audio_array: np.ndarray) -> bytes:
        """Serialize ``audio_array`` to bytes in NumPy ``.npy`` format."""
        logger.trace("Serializing NumPy array to bytes...")
        buffer = io.BytesIO()
        np.save(buffer, audio_array)
        serialized_bytes = buffer.getvalue()
        logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
        return serialized_bytes

    async def _send_raw_request(self, data_bytes: bytes) -> Dict[str, Any]:
        """POST ``data_bytes`` to the endpoint and return the decoded JSON body.

        The total request timeout is ``self._params.stop_secs``.

        Args:
            data_bytes: Raw request body.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            SmartTurnTimeoutException: If the request times out.
            Exception: If the request fails, the response status is not 200,
                or a 200 response does not contain JSON.
        """
        headers = {"Content-Type": "application/octet-stream"}
        headers.update(self._headers)

        logger.trace(f"Sending {len(data_bytes)} bytes as raw body to {self._url}...")
        try:
            timeout = aiohttp.ClientTimeout(total=self._params.stop_secs)

            async with self._aiohttp_session.post(
                self._url, data=data_bytes, headers=headers, timeout=timeout
            ) as response:
                logger.trace("\n--- Response ---")
                logger.trace(f"Status Code: {response.status}")

                if response.status == 200:
                    try:
                        json_data = await response.json()
                        logger.trace("Response JSON:")
                        logger.trace(json_data)
                        return json_data
                    except aiohttp.ContentTypeError:
                        # Non-JSON response
                        text = await response.text()
                        logger.trace("Response Content (non-JSON):")
                        logger.trace(text)
                        raise Exception(f"Non-JSON response: {text}")
                else:
                    error_text = await response.text()
                    logger.trace("Response Content (Error):")
                    logger.trace(error_text)
                    response.raise_for_status()
                    # raise_for_status() only raises for status >= 400. Any
                    # other non-200 status would otherwise fall through and
                    # return None despite the declared return type.
                    raise Exception(f"Unexpected response status: {response.status}")

        except asyncio.TimeoutError:
            logger.error(f"Request timed out after {self._params.stop_secs} seconds")
            raise SmartTurnTimeoutException(f"Request exceeded {self._params.stop_secs} seconds.")
        except aiohttp.ClientError as e:
            logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
            raise Exception("Failed to send raw request to Daily Smart Turn.") from e

    async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
        """Serialize ``audio_array`` and return the endpoint's prediction."""
        serialized_array = self._serialize_array(audio_array)
        return await self._send_raw_request(serialized_array)

View File

@@ -5,17 +5,16 @@
#
import os
from typing import Dict
from typing import Any, Dict
import numpy as np
import torch
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
try:
import coremltools as ct
import torch
from transformers import AutoFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -26,7 +25,7 @@ except ModuleNotFoundError as e:
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, smart_turn_model_path: str, **kwargs):
def __init__(self, *, smart_turn_model_path: str, **kwargs):
super().__init__(**kwargs)
if not smart_turn_model_path:
@@ -41,7 +40,7 @@ class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
self._turn_model = ct.models.MLModel(core_ml_model_path)
logger.debug("Loaded Local Smart Turn")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,

View File

@@ -8,11 +8,14 @@ import base64
import json
from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
@@ -26,12 +29,61 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TwilioFrameSerializer(FrameSerializer):
class InputParams(BaseModel):
twilio_sample_rate: int = 8000 # Default Twilio rate (8kHz)
sample_rate: Optional[int] = None # Pipeline input rate
"""Serializer for Twilio Media Streams WebSocket protocol.
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
This serializer handles converting between Pipecat frames and Twilio's WebSocket
media streams protocol. It supports audio conversion, DTMF events, and automatic
call termination.
When auto_hang_up is enabled (default), the serializer will automatically terminate
the Twilio call when an EndFrame or CancelFrame is processed, but requires Twilio
credentials to be provided.
Attributes:
_stream_sid: The Twilio Media Stream SID.
_call_sid: The associated Twilio Call SID.
_account_sid: Twilio account SID for API access.
_auth_token: Twilio authentication token for API access.
_params: Configuration parameters.
_twilio_sample_rate: Sample rate used by Twilio (typically 8kHz).
_sample_rate: Input sample rate for the pipeline.
_resampler: Audio resampler for format conversion.
"""
class InputParams(BaseModel):
    """Configuration parameters for TwilioFrameSerializer.

    Attributes:
        twilio_sample_rate: Sample rate used on the Twilio side, 8000 Hz by default.
        sample_rate: Optional pipeline input sample rate. When left unset, the
            rate is taken from the StartFrame during setup.
        auto_hang_up: Whether the call is terminated automatically when an
            EndFrame or CancelFrame is serialized. Enabled by default.
    """

    twilio_sample_rate: int = 8000
    sample_rate: Optional[int] = None
    auto_hang_up: bool = True
def __init__(
self,
stream_sid: str,
call_sid: str,
account_sid: Optional[str] = None,
auth_token: Optional[str] = None,
params: InputParams = InputParams(),
):
"""Initialize the TwilioFrameSerializer.
Args:
stream_sid: The Twilio Media Stream SID.
call_sid: The associated Twilio Call SID.
account_sid: Twilio account SID.
auth_token: Twilio auth token.
params: Configuration parameters.
"""
self._stream_sid = stream_sid
self._call_sid = call_sid
self._account_sid = account_sid
self._auth_token = auth_token
self._params = params
self._twilio_sample_rate = self._params.twilio_sample_rate
@@ -41,13 +93,37 @@ class TwilioFrameSerializer(FrameSerializer):
@property
def type(self) -> FrameSerializerType:
    """Report the serializer type.

    Returns:
        Always ``FrameSerializerType.TEXT`` for this serializer.
    """
    return FrameSerializerType.TEXT
async def setup(self, frame: StartFrame):
"""Sets up the serializer with pipeline configuration.
Args:
frame: The StartFrame containing pipeline configuration.
"""
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, StartInterruptionFrame):
"""Serializes a Pipecat frame to Twilio WebSocket format.
Handles conversion of various frame types to Twilio WebSocket messages.
For EndFrames and CancelFrames, initiates call termination if auto_hang_up is enabled.
Args:
frame: The Pipecat frame to serialize.
Returns:
Serialized data as string or bytes, or None if the frame isn't handled.
"""
if self._params.auto_hang_up and isinstance(frame, (EndFrame, CancelFrame)):
await self._hang_up_call()
return None
elif isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):
@@ -68,7 +144,59 @@ class TwilioFrameSerializer(FrameSerializer):
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
return json.dumps(frame.message)
# Return None for unhandled frames
return None
async def _hang_up_call(self):
    """Terminate the current Twilio call through Twilio's REST API.

    This is best effort: missing credentials are logged as a warning, a
    non-200 response is logged as an error, and any exception raised along
    the way (including a failing ``aiohttp`` import) is logged and swallowed.
    """
    try:
        # Imported inside the ``try`` so that an import failure is only logged.
        import aiohttp

        sid, token = self._account_sid, self._auth_token

        # Both credentials are required to authenticate against the REST API.
        if not (sid and token):
            logger.warning(
                "Cannot hang up Twilio call: account_sid and auth_token must be provided"
            )
            return

        # Call resource of this account; posting Status=completed ends the call.
        url = f"https://api.twilio.com/2010-04-01/Accounts/{sid}/Calls/{self._call_sid}.json"
        credentials = aiohttp.BasicAuth(sid, token)
        payload = {"Status": "completed"}

        async with (
            aiohttp.ClientSession() as session,
            session.post(url, auth=credentials, data=payload) as response,
        ):
            if response.status != 200:
                # Include the response body to make failures easier to debug.
                error_text = await response.text()
                logger.error(
                    f"Failed to terminate Twilio call {self._call_sid}: "
                    f"Status {response.status}, Response: {error_text}"
                )
            else:
                logger.info(f"Successfully terminated Twilio call {self._call_sid}")

    except Exception as e:
        logger.exception(f"Failed to hang up Twilio call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Twilio WebSocket data to Pipecat frames.
Handles conversion of Twilio media events to appropriate Pipecat frames.
Args:
data: The raw WebSocket data from Twilio.
Returns:
A Pipecat frame corresponding to the Twilio event, or None if unhandled.
"""
message = json.loads(data)
if message["event"] == "media":

View File

@@ -42,7 +42,7 @@ class GrokLLMService(OpenAILLMService):
Args:
api_key (str): The API key for accessing Grok's API
base_url (str, optional): The base URL for Grok API. Defaults to "https://api.x.ai/v1"
model (str, optional): The model identifier to use. Defaults to "grok-2"
model (str, optional): The model identifier to use. Defaults to "grok-3-beta"
**kwargs: Additional keyword arguments passed to OpenAILLMService
"""
@@ -51,7 +51,7 @@ class GrokLLMService(OpenAILLMService):
*,
api_key: str,
base_url: str = "https://api.x.ai/v1",
model: str = "grok-2",
model: str = "grok-3-beta",
**kwargs,
):
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)

View File

@@ -222,12 +222,8 @@ class BaseInputTransport(FrameProcessor):
async def _handle_end_of_turn(self):
if self.turn_analyzer:
state, prediction = await self.get_event_loop().run_in_executor(
self._executor, self.turn_analyzer.analyze_end_of_turn
)
state, prediction = await self.turn_analyzer.analyze_end_of_turn()
await self._handle_prediction_result(prediction)
await self._handle_end_of_turn_complete(state)
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):

View File

@@ -207,10 +207,12 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def stop(self, frame: EndFrame):
    """Stop the output transport and disconnect the client.

    Args:
        frame: The EndFrame that triggered the stop.
    """
    await super().stop(frame)

    # The frame is passed to the write path before the client is disconnected.
    # NOTE(review): presumably this lets the serializer react to the EndFrame
    # (e.g. hang up a call) while the connection still exists - confirm.
    await self._write_frame(frame)

    await self._client.disconnect()
async def cancel(self, frame: CancelFrame):
    """Cancel the output transport and disconnect the client.

    Args:
        frame: The CancelFrame that triggered the cancellation.
    """
    await super().cancel(frame)

    # The frame is passed to the write path before the client is disconnected.
    # NOTE(review): presumably this lets the serializer react to the
    # CancelFrame (e.g. hang up a call) before the connection goes away - confirm.
    await self._write_frame(frame)

    await self._client.disconnect()
async def cleanup(self):

View File

@@ -157,7 +157,8 @@ class WebsocketServerInputTransport(BaseInputTransport):
self, websocket: websockets.WebSocketServerProtocol, session_timeout: int
):
"""Wait for session_timeout seconds, if the websocket is still open,
trigger timeout event."""
trigger timeout event.
"""
try:
await asyncio.sleep(session_timeout)
if not websocket.closed:
@@ -195,6 +196,14 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
    """Stop the output transport, then pass the EndFrame to the write path.

    Args:
        frame: The EndFrame that triggered the stop.
    """
    await super().stop(frame)

    # NOTE(review): presumably written so the configured serializer gets to
    # see the EndFrame (e.g. to end a telephony call) - confirm.
    await self._write_frame(frame)
async def cancel(self, frame: CancelFrame):
    """Cancel the output transport, then pass the CancelFrame to the write path.

    Args:
        frame: The CancelFrame that triggered the cancellation.
    """
    await super().cancel(frame)

    # NOTE(review): presumably written so the configured serializer gets to
    # see the CancelFrame (e.g. to end a telephony call) - confirm.
    await self._write_frame(frame)
async def cleanup(self):
await super().cleanup()
await self._transport.cleanup()