Compare commits

..

7 Commits

Author SHA1 Message Date
mattie ruth backman
50b19a9e77 minor updates to get started and working on latest modal 2025-04-23 21:25:45 -04:00
Aleix Conchillo Flaqué
f9d1a53e28 Merge pull request #1609 from pipecat-ai/aleix/pyproject-py-typed
pyproject: fix license fields
2025-04-21 16:14:22 -07:00
Mark Backman
3f3010af79 Add a SmartTurnMetricsData class, emitted by Metrics Frame in response to smart turn responses 2025-04-21 18:56:14 -04:00
Aleix Conchillo Flaqué
a02d47ddbd Merge pull request #1625 from 0xPatryk/patch-1
Fixed AttributeError: object has no attribute '_sample_rate"
2025-04-21 15:40:54 -07:00
Patryk
a649aff3e7 Fixed AttributeError: 'OpenAITTSService' object has no attribute '_sample_rate' 2025-04-21 11:03:45 +02:00
Aleix Conchillo Flaqué
d77c37ff14 pyproject: add py.typed (PEP 561) 2025-04-17 17:29:04 -07:00
Aleix Conchillo Flaqué
b4916f9dae pyproject: fix license fields 2025-04-17 17:28:14 -07:00
11 changed files with 97 additions and 267 deletions

View File

@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
confidence scores and processing time metrics from the smart turn analyzers.
- Added support for Application Default Credentials in Google services,
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.

View File

@@ -10,24 +10,27 @@ import aiohttp
import modal
from bot import _voice_bot_process
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from fastapi.responses import RedirectResponse
from loguru import logger
MAX_SESSION_TIME = 15 * 60 # 15 minutes
app = modal.App("pipecat-modal")
image = modal.Image.debian_slim(python_version="3.12").pip_install_from_requirements(
"requirements.txt"
image = (
modal.Image.debian_slim(python_version="3.13")
.apt_install("ffmpeg")
.pip_install_from_requirements("requirements.txt")
.pip_install("pipecat-ai[daily,silero,cartesia,openai]")
.add_local_python_source("bot")
)
app = modal.App("pipecat-modal", image=image)
@app.function(
image=image,
cpu=1.0,
secrets=[modal.Secret.from_dotenv()],
keep_warm=1,
min_containers=1,
enable_memory_snapshot=True,
max_inputs=1, # Do not reuse instances across requests
retries=0,
@@ -40,7 +43,7 @@ def launch_bot_process(room_url: str, token: str):
image=image,
secrets=[modal.Secret.from_dotenv()],
)
@modal.web_endpoint(method="POST")
@modal.fastapi_endpoint(method="GET")
async def start():
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
@@ -77,4 +80,4 @@ async def start():
# Return room URL to the user to join
# Note: in production, you would want to return a token to the user
return JSONResponse(content={"room_url": room.url, token: token})
return RedirectResponse(room.url)

View File

@@ -1,5 +1,4 @@
python-dotenv==1.0.1
modal==0.71.3
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
fastapi==0.115.6
aiohttp==3.11.11

View File

@@ -1,211 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, EndTaskFrame, Frame, TextFrame, TTSSpeakFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.null_filter import NullFilter
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
class KickParticipantProcessor(FrameProcessor):
"""This processor will kick the participant if they say something inappropriate.
This is a simple example of how to use the LLM to moderate
the conversation. In this case we are using the OpenAI LLM to determine if
the user is saying something inappropriate.
"""
def __init__(self):
"""Initialize the processor."""
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
await self.push_frame(
TTSSpeakFrame(
"You are being kicked from the call because of content moderation. Have a nice day!"
)
)
# Signal that the task should end after processing this frame
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
else:
await self.push_frame(frame, direction)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
# This is the LLM that will be used to detect if the user has said
# something inappropriate.
moderator_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
moderator_messages = [
{
"role": "system",
"content": """
You are a helpful LLM that will be used to moderate a conversation
between a user and an assistant. Your goal is to determine if the user
is saying something inappropriate. You will be given the user
transcript and you will have to determine if the user is saying
something inappropriate. If you think the user is saying something
inappropriate please respond with "YES". If you think the user is
saying something appropriate please respond with "NO". Examples of inappropriate
content are: hate speech, racism, sexism, bullying, harassment,
violence, self-harm, and any other content that violates the
community guidelines.
""",
},
]
moderator_context = OpenAILLMContext(moderator_messages)
moderator_context_aggregator = moderator_llm.create_context_aggregator(moderator_context)
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This a filter that will wake up the notifier if the given predicate
# (wake_check_filter) returns true.
completeness_check = WakeNotifierFilter(notifier, types=(TextFrame,), filter=wake_check_filter)
# This processor keeps the last context and will let it through once the
# notifier is woken up. We start with the gate open because we send an
# initial context frame to start the conversation.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier=notifier, start_open=True)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user should be
# moderated, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
kick_participant = KickParticipantProcessor()
# The ParallePipeline input are the user transcripts. We have two
# contexts. The first one will be used to determine if the user is
# moderated and if so the notifier will be woken up. The second
# context is simply the regular context but it's gated waiting for the
# notifier to be woken up.
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
ParallelPipeline(
[
moderator_context_aggregator.user(),
moderator_llm,
kick_participant,
# completeness_check,
NullFilter(),
],
[context_aggregator.user(), gated_context_aggregator, llm],
),
user_idle,
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -6,14 +6,14 @@ build-backend = "setuptools.build_meta"
name = "pipecat-ai"
dynamic = ["version"]
description = "An open source framework for voice (and multimodal) assistants"
license = { text = "BSD 2-Clause License" }
license = "BSD-2-Clause"
license-files = ["LICENSE"]
readme = "README.md"
requires-python = ">=3.10"
keywords = ["webrtc", "audio", "video", "ai"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Topic :: Communications :: Conferencing",
"Topic :: Multimedia :: Sound/Audio",
"Topic :: Multimedia :: Video",
@@ -92,9 +92,11 @@ websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]
[tool.setuptools.packages.find]
# All the following settings are optional:
where = ["src"]
[tool.setuptools.package-data]
"pipecat" = ["py.typed"]
[tool.pytest.ini_options]
addopts = "--verbose"
testpaths = ["tests"]

View File

@@ -6,13 +6,14 @@
import time
from abc import abstractmethod
from typing import Dict, Optional
from typing import Any, Dict, Optional, Tuple
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
# Default timing parameters
STOP_SECS = 3
@@ -61,7 +62,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
self._speech_triggered = True
if self._speech_start_time is None:
self._speech_start_time = time.time()
logger.debug(f"Speech started at {self._speech_start_time}")
else:
if self._speech_triggered:
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
@@ -87,28 +87,25 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state
def analyze_end_of_turn(self) -> EndOfTurnState:
logger.debug("Analyzing End of Turn...")
state = self._process_speech_segment(self._audio_buffer)
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
return state
return state, result
def _clear(self, turn_state: EndOfTurnState):
# Reset internal state for next turn
logger.debug("Clearing audio buffer...")
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = None
self._silence_ms = 0
def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
return state
return state, None
# Extract recent audio segment for prediction
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
@@ -124,15 +121,13 @@ class BaseSmartTurn(BaseTurnAnalyzer):
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
segment_audio = np.concatenate(segment_audio_chunks)
logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")
# Limit maximum duration
max_samples = int(self._params.max_duration_secs * self.sample_rate)
if len(segment_audio) > max_samples:
# slices the array to keep the last max_samples samples, discarding the earlier part.
segment_audio = segment_audio[-max_samples:]
logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")
result_data = None
if len(segment_audio) > 0:
start_time = time.perf_counter()
@@ -142,20 +137,33 @@ class BaseSmartTurn(BaseTurnAnalyzer):
)
end_time = time.perf_counter()
logger.debug("--------")
logger.debug(f"Prediction: {'Complete' if result['prediction'] == 1 else 'Incomplete'}")
logger.debug(f"Probability of complete: {result['probability']:.4f}")
logger.debug(f"Prediction took {(end_time - start_time) * 1000:.2f}ms seconds")
else:
logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.debug("Captured empty audio segment, skipping prediction.")
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
return state
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
else:
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.trace("Captured empty audio segment, skipping prediction.")
return state, result_data
@abstractmethod
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, any]:
"""
Abstract method to predict if a turn has ended based on audio.
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
"""Abstract method to predict if a turn has ended based on audio.
Args:
buffer: Float32 numpy array of audio samples at 16kHz.

View File

@@ -6,7 +6,9 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional
from typing import Optional, Tuple
from pipecat.metrics.metrics import MetricsData
class EndOfTurnState(Enum):
@@ -15,8 +17,10 @@ class EndOfTurnState(Enum):
class BaseTurnAnalyzer(ABC):
"""
Abstract base class for analyzing user end of turn.
"""Abstract base class for analyzing user end of turn.
This class inherits from BaseObject to leverage its event handling system
while still defining an abstract interface through abstract methods.
"""
def __init__(self, *, sample_rate: Optional[int] = None):
@@ -25,8 +29,7 @@ class BaseTurnAnalyzer(ABC):
@property
def sample_rate(self) -> int:
"""
Returns the current sample rate.
"""Returns the current sample rate.
Returns:
int: The effective sample rate for audio processing.
@@ -34,8 +37,7 @@ class BaseTurnAnalyzer(ABC):
return self._sample_rate
def set_sample_rate(self, sample_rate: int):
"""
Sets the sample rate for audio processing.
"""Sets the sample rate for audio processing.
If the initial sample rate was provided, it will use that; otherwise, it sets to
the provided sample rate.
@@ -48,8 +50,7 @@ class BaseTurnAnalyzer(ABC):
@property
@abstractmethod
def speech_triggered(self) -> bool:
"""
Determines if speech has been detected.
"""Determines if speech has been detected.
Returns:
bool: True if speech is triggered, otherwise False.
@@ -58,8 +59,7 @@ class BaseTurnAnalyzer(ABC):
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""
Appends audio data for analysis.
"""Appends audio data for analysis.
Args:
buffer (bytes): The audio data to append.
@@ -71,9 +71,8 @@ class BaseTurnAnalyzer(ABC):
pass
@abstractmethod
def analyze_end_of_turn(self) -> EndOfTurnState:
"""
Analyzes if an end of turn has occurred based on the audio input.
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Analyzes if an end of turn has occurred based on the audio input.
Returns:
EndOfTurnState: The result of the end of turn analysis.

View File

@@ -30,3 +30,13 @@ class LLMUsageMetricsData(MetricsData):
class TTSUsageMetricsData(MetricsData):
value: int
class SmartTurnMetricsData(MetricsData):
"""Metrics data for smart turn predictions."""
is_complete: bool
probability: float
inference_time_ms: float
server_total_time_ms: float
e2e_processing_time_ms: float

View File

@@ -1 +0,0 @@

View File

@@ -70,7 +70,7 @@ class OpenAITTSService(TTSService):
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
f"OpenAI TTS only supports {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
f"Current rate of {self.sample_rate}Hz may cause issues."
f"Current rate of {sample_rate}Hz may cause issues."
)
super().__init__(sample_rate=sample_rate, **kwargs)

View File

@@ -6,11 +6,14 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from typing import Mapping, Optional
from loguru import logger
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.audio.turn.base_turn_analyzer import (
BaseTurnAnalyzer,
EndOfTurnState,
)
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
BotInterruptionFrame,
@@ -21,6 +24,7 @@ from pipecat.frames.frames import (
FilterUpdateSettingsFrame,
Frame,
InputAudioRawFrame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -29,6 +33,7 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
VADParamsUpdateFrame,
)
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
@@ -78,6 +83,7 @@ class BaseInputTransport(FrameProcessor):
# Configure End of turn analyzer.
if self._params.turn_analyzer:
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._sample_rate)
@@ -216,9 +222,12 @@ class BaseInputTransport(FrameProcessor):
async def _handle_end_of_turn(self):
if self.turn_analyzer:
state = await self.get_event_loop().run_in_executor(
state, prediction = await self.get_event_loop().run_in_executor(
self._executor, self.turn_analyzer.analyze_end_of_turn
)
await self._handle_prediction_result(prediction)
await self._handle_end_of_turn_complete(state)
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
@@ -263,3 +272,11 @@ class BaseInputTransport(FrameProcessor):
await self.push_frame(frame)
self._audio_in_queue.task_done()
async def _handle_prediction_result(self, result: MetricsData):
"""Handle a prediction result event from the turn analyzer.
Args:
result: The prediction result MetricsData.
"""
await self.push_frame(MetricsFrame(data=[result]))