Compare commits
1 Commits
fix/event-
...
mb/tts-spe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8dbf9728b |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -51,7 +51,4 @@ docs/api/_build/
|
||||
docs/api/api
|
||||
|
||||
# uv
|
||||
.python-version
|
||||
|
||||
# Pipecat
|
||||
whisker_setup.py
|
||||
.python-version
|
||||
@@ -73,9 +73,9 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
|
||||
|
||||
| Category | Services |
|
||||
| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [Hathora](https://docs.pipecat.ai/server/services/stt/hathora), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hathora](https://docs.pipecat.ai/server/services/tts/hathora), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/server/services/s2s/ultravox), |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) |
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
- Added Hathora service to support Hathora-hosted TTS and STT models (only non-streaming)
|
||||
@@ -1,8 +0,0 @@
|
||||
- Fixed an issue where the "bot-llm-text" RTVI event would not fire for realtime (speech-to-speech) services:
|
||||
|
||||
- `AWSNovaSonicLLMService`
|
||||
- `GeminiLiveLLMService`
|
||||
- `OpenAIRealtimeLLMService`
|
||||
- `GrokRealtimeLLMService`
|
||||
|
||||
The issue was that these services weren't pushing `LLMTextFrame`s. Now they do.
|
||||
@@ -1 +0,0 @@
|
||||
- Added the `additional_headers` param to `WebsocketClientParams`, allowing `WebsocketClientTransport` to send custom headers on connect, for cases such as authentication.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `MinWordsUserTurnStartStrategy` to not aggregate transcriptions, preventing incorrect turn starts when words are spoken with pauses between them.
|
||||
1
changelog/3465.changed.md
Normal file
1
changelog/3465.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added a parameter called `append_to_context` to the `TTSSpeakFrame`, which controls whether or not the `TTSSpeakFrame` should be added to the context. By default this value is False.
|
||||
@@ -1 +0,0 @@
|
||||
- For consistency with other package names, we just deprecated `pipecat.turns.mute` (introduced in Pipecat 0.0.99) in favor of `pipecat.turns.user_mute`.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed an issue where Grok Realtime would error out when running with SmallWebRTC transport.
|
||||
@@ -1 +0,0 @@
|
||||
- Added `UserIdleController` for detecting user idle state, integrated into `LLMUserAggregator` and `UserTurnProcessor` via optional `user_idle_timeout` parameter. Emits `on_user_turn_idle` event for application-level handling. Deprecated `UserIdleProcessor` in favor of the new compositional approach.
|
||||
@@ -1 +0,0 @@
|
||||
- Throttle `UserSpeakingFrame` to broadcast at most every 200ms instead of on every audio chunk, reducing frame processing overhead during user speech.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed a `Mem0MemoryService` issue where passing `async_mode: true` was causing an error. See https://docs.mem0.ai/platform/features/async-mode-default-change.
|
||||
@@ -1,3 +0,0 @@
|
||||
- Fixed `AzureTTSService` transcript formatting issues:
|
||||
- Punctuation now appears without extra spaces (e.g., "Hello!" instead of "Hello !")
|
||||
- CJK languages (Chinese, Japanese, Korean) no longer have unwanted spaces between characters
|
||||
@@ -1 +0,0 @@
|
||||
- Added `on_user_mute_started` and `on_user_mute_stopped` event handlers to `LLMUserAggregator` for tracking user mute state changes.
|
||||
@@ -85,9 +85,6 @@ GROK_API_KEY=...
|
||||
# Groq
|
||||
GROQ_API_KEY=...
|
||||
|
||||
# Hathora
|
||||
HATHORA_API_KEY=...
|
||||
|
||||
# Heygen
|
||||
HEYGEN_API_KEY=...
|
||||
HEYGEN_LIVE_AVATAR_API_KEY=...
|
||||
|
||||
@@ -13,12 +13,7 @@ from loguru import logger
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import (
|
||||
EndTaskFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, LLMRunFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -27,7 +22,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
@@ -41,43 +36,6 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class IdleHandler:
|
||||
"""Helper class to manage user idle retry logic."""
|
||||
|
||||
def __init__(self):
|
||||
self._retry_count = 0
|
||||
|
||||
def reset(self):
|
||||
"""Reset the retry count when user becomes active."""
|
||||
self._retry_count = 0
|
||||
|
||||
async def handle_idle(self, aggregator):
|
||||
"""Handle user idle event with escalating prompts."""
|
||||
self._retry_count += 1
|
||||
|
||||
if self._retry_count == 1:
|
||||
# First attempt: Add a gentle prompt to the conversation
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
|
||||
}
|
||||
await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
elif self._retry_count == 2:
|
||||
# Second attempt: More direct prompt
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
|
||||
}
|
||||
await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
else:
|
||||
# Third attempt: End the conversation
|
||||
await aggregator.push_frame(
|
||||
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
|
||||
)
|
||||
await aggregator.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
@@ -126,15 +84,42 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
|
||||
),
|
||||
user_idle_timeout=5.0, # Detect user idle after 5 seconds
|
||||
),
|
||||
)
|
||||
|
||||
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
|
||||
if retry_count == 1:
|
||||
# First attempt: Add a gentle prompt to the conversation
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
|
||||
}
|
||||
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
return True
|
||||
elif retry_count == 2:
|
||||
# Second attempt: More direct prompt
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
|
||||
}
|
||||
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
return True
|
||||
else:
|
||||
# Third attempt: End the conversation
|
||||
await user_idle.push_frame(
|
||||
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
|
||||
)
|
||||
await task.queue_frame(EndFrame())
|
||||
return False
|
||||
|
||||
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User aggregator with built-in idle detection
|
||||
user_idle, # Idle user check-in
|
||||
user_aggregator,
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
@@ -151,17 +136,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
# Set up idle handling with retry logic
|
||||
idle_handler = IdleHandler()
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(aggregator):
|
||||
await idle_handler.handle_idle(aggregator)
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(aggregator, strategy):
|
||||
idle_handler.reset()
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
@@ -21,10 +25,12 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.hathora.stt import HathoraSTTService
|
||||
from pipecat.services.hathora.tts import HathoraTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.deepgram.tts import DeepgramTTSService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
@@ -34,6 +40,15 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Add a delay to test interruption during function calls
|
||||
logger.info("Weather API call starting...")
|
||||
await asyncio.sleep(5) # 5-second delay
|
||||
logger.info("Weather API call completed")
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
@@ -59,30 +74,50 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = HathoraSTTService(
|
||||
model="nvidia-parakeet-tdt-0.6b-v3",
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
# Configure the mute processor with both strategies
|
||||
stt_mute_processor = STTMuteFilter(
|
||||
config=STTMuteConfig(
|
||||
strategies={
|
||||
STTMuteStrategy.MUTE_UNTIL_FIRST_BOT_COMPLETE,
|
||||
STTMuteStrategy.FUNCTION_CALL,
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
tts = HathoraTTSService(
|
||||
model="hexgrad-kokoro-82m",
|
||||
)
|
||||
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
|
||||
|
||||
# See https://models.hathora.dev/model/qwen3-30b-a3b
|
||||
llm = OpenAILLMService(
|
||||
base_url="https://app-362f7ca1-6975-4e18-a605-ab202bf2c315.app.hathora.dev/v1",
|
||||
api_key=os.getenv("HATHORA_API_KEY"),
|
||||
model=None,
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function])
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
|
||||
"content": "You are a helpful assistant who can check the weather. Always check the weather when a location is mentioned. Respond concisely and naturally. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context = LLMContext(messages, tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
@@ -94,12 +129,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
stt, # STT
|
||||
stt_mute_processor, # Add the mute processor between STT and context aggregator
|
||||
user_aggregator, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
assistant_aggregator, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
@@ -115,8 +151,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
# Kick off the conversation with a weather-related prompt
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Ask the user what city they'd like to know the weather for.",
|
||||
}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
@@ -34,7 +34,7 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.user_mute import (
|
||||
from pipecat.turns.mute import (
|
||||
FunctionCallUserMuteStrategy,
|
||||
MuteUntilFirstBotCompleteUserMuteStrategy,
|
||||
)
|
||||
@@ -161,14 +161,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
@user_aggregator.event_handler("on_user_mute_started")
|
||||
async def on_user_mute_started(aggregator):
|
||||
logger.info(f"User mute started")
|
||||
|
||||
@user_aggregator.event_handler("on_user_mute_stopped")
|
||||
async def on_user_mute_stopped(aggregator):
|
||||
logger.info(f"User mute stopped")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -41,11 +41,8 @@ dependencies = [
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://pipecat.ai"
|
||||
Documentation = "https://docs.pipecat.ai/"
|
||||
Source = "https://github.com/pipecat-ai/pipecat"
|
||||
Issues = "https://github.com/pipecat-ai/pipecat/issues"
|
||||
Changelog = "https://github.com/pipecat-ai/pipecat/blob/main/CHANGELOG.md"
|
||||
Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
aic = [ "aic-sdk~=1.2.0" ]
|
||||
|
||||
@@ -137,7 +137,6 @@ TESTS_07 = [
|
||||
# ("07zd-interruptible-aicoustics.py", EVAL_SIMPLE_MATH),
|
||||
("07ze-interruptible-hume.py", EVAL_SIMPLE_MATH),
|
||||
("07zf-interruptible-gradium.py", EVAL_SIMPLE_MATH),
|
||||
("07zh-interruptible-hathora.py", EVAL_SIMPLE_MATH),
|
||||
# Needs a local XTTS docker instance running.
|
||||
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
|
||||
# Needs a Krisp license.
|
||||
|
||||
@@ -341,6 +341,11 @@ class TextFrame(DataFrame):
|
||||
|
||||
Parameters:
|
||||
text: The text content.
|
||||
skip_tts: Whether this text should be skipped by the TTS service.
|
||||
includes_inter_frame_spaces: Whether any necessary inter-frame (leading/trailing) spaces are already
|
||||
included in the text.
|
||||
append_to_context: Whether this text should be appended to the LLM context.
|
||||
Defaults to True.
|
||||
"""
|
||||
|
||||
text: str
|
||||
@@ -918,9 +923,12 @@ class TTSSpeakFrame(DataFrame):
|
||||
|
||||
Parameters:
|
||||
text: The text to be spoken.
|
||||
append_to_context: Whether this text should be appended to the LLM context.
|
||||
Defaults to False.
|
||||
"""
|
||||
|
||||
text: str
|
||||
append_to_context: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -1024,8 +1024,10 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
|
||||
logger.debug(
|
||||
f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
|
||||
)
|
||||
function_call = self._function_calls_in_progress.get(frame.tool_call_id)
|
||||
if function_call and function_call.cancel_on_interruption:
|
||||
if frame.tool_call_id not in self._function_calls_in_progress:
|
||||
return
|
||||
|
||||
if self._function_calls_in_progress[frame.tool_call_id].cancel_on_interruption:
|
||||
await self.handle_function_call_cancel(frame)
|
||||
del self._function_calls_in_progress[frame.tool_call_id]
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ from pipecat.frames.frames import (
|
||||
StartFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TranslationFrame,
|
||||
UserImageRawFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
@@ -62,8 +63,7 @@ from pipecat.processors.aggregators.llm_context import (
|
||||
NotGiven,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.turns.user_mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from pipecat.turns.user_turn_controller import UserTurnController
|
||||
@@ -81,16 +81,11 @@ class LLMUserAggregatorParams:
|
||||
user_mute_strategies: List of user mute strategies.
|
||||
user_turn_stop_timeout: Time in seconds to wait before considering the
|
||||
user's turn finished.
|
||||
user_idle_timeout: Optional timeout in seconds for detecting user idle state.
|
||||
If set, the aggregator will emit an `on_user_turn_idle` event when the user
|
||||
has been idle (not speaking) for this duration. Set to None to disable
|
||||
idle detection.
|
||||
"""
|
||||
|
||||
user_turn_strategies: Optional[UserTurnStrategies] = None
|
||||
user_mute_strategies: List[BaseUserMuteStrategy] = field(default_factory=list)
|
||||
user_turn_stop_timeout: float = 5.0
|
||||
user_idle_timeout: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -297,14 +292,11 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
- on_user_turn_started: Called when the user turn starts
|
||||
- on_user_turn_stopped: Called when the user turn ends
|
||||
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
|
||||
- on_user_turn_idle: Called when the user has been idle for the configured timeout
|
||||
- on_user_mute_started: Called when the user becomes muted
|
||||
- on_user_mute_stopped: Called when the user becomes unmuted
|
||||
|
||||
Example::
|
||||
|
||||
@aggregator.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy):
|
||||
async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy]):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_stopped")
|
||||
@@ -315,18 +307,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def on_user_turn_stop_timeout(aggregator):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(aggregator):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_mute_started")
|
||||
async def on_user_mute_started(aggregator):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_mute_stopped")
|
||||
async def on_user_mute_stopped(aggregator):
|
||||
...
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -349,9 +329,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._register_event_handler("on_user_turn_started")
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
self._register_event_handler("on_user_mute_started")
|
||||
self._register_event_handler("on_user_mute_stopped")
|
||||
|
||||
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
|
||||
|
||||
@@ -374,16 +351,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
|
||||
)
|
||||
|
||||
# Optional user idle controller
|
||||
self._user_idle_controller: Optional[UserIdleController] = None
|
||||
if self._params.user_idle_timeout:
|
||||
self._user_idle_controller = UserIdleController(
|
||||
user_idle_timeout=self._params.user_idle_timeout
|
||||
)
|
||||
self._user_idle_controller.add_event_handler(
|
||||
"on_user_turn_idle", self._on_user_turn_idle
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up processor resources."""
|
||||
await super().cleanup()
|
||||
@@ -439,9 +406,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
await self._user_turn_controller.process_frame(frame)
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.process_frame(frame)
|
||||
|
||||
async def push_aggregation(self) -> str:
|
||||
"""Push the current aggregation."""
|
||||
if len(self._aggregation) == 0:
|
||||
@@ -457,9 +421,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def _start(self, frame: StartFrame):
|
||||
await self._user_turn_controller.setup(self.task_manager)
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.setup(self.task_manager)
|
||||
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.setup(self.task_manager)
|
||||
|
||||
@@ -472,9 +433,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def _cleanup(self):
|
||||
await self._user_turn_controller.cleanup()
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.cleanup()
|
||||
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.cleanup()
|
||||
|
||||
@@ -504,12 +462,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
logger.debug(f"{self}: user is now {'muted' if should_mute_next_time else 'unmuted'}")
|
||||
self._user_is_muted = should_mute_next_time
|
||||
|
||||
# Emit mute state change events
|
||||
if self._user_is_muted:
|
||||
await self._call_event_handler("on_user_mute_started")
|
||||
else:
|
||||
await self._call_event_handler("on_user_mute_stopped")
|
||||
|
||||
return should_mute_frame
|
||||
|
||||
async def _handle_llm_run(self, frame: LLMRunFrame):
|
||||
@@ -614,9 +566,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def _on_user_turn_stop_timeout(self, controller):
|
||||
await self._call_event_handler("on_user_turn_stop_timeout")
|
||||
|
||||
async def _on_user_turn_idle(self, controller):
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
|
||||
|
||||
class LLMAssistantAggregator(LLMContextAggregator):
|
||||
"""Assistant LLM aggregator that processes bot responses and function calls.
|
||||
@@ -691,7 +640,6 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
self._started = 0
|
||||
self._function_calls_in_progress: Dict[str, Optional[FunctionCallInProgressFrame]] = {}
|
||||
self._function_calls_image_results: Dict[str, UserImageRawFrame] = {}
|
||||
self._context_updated_tasks: Set[asyncio.Task] = set()
|
||||
@@ -810,7 +758,6 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
async def _handle_interruptions(self, frame: InterruptionFrame):
|
||||
await self._trigger_assistant_turn_stopped()
|
||||
self._started = 0
|
||||
await self.reset()
|
||||
|
||||
async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
|
||||
@@ -910,8 +857,10 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
logger.debug(
|
||||
f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
|
||||
)
|
||||
function_call = self._function_calls_in_progress.get(frame.tool_call_id)
|
||||
if function_call and function_call.cancel_on_interruption:
|
||||
if frame.tool_call_id not in self._function_calls_in_progress:
|
||||
return
|
||||
|
||||
if self._function_calls_in_progress[frame.tool_call_id].cancel_on_interruption:
|
||||
# Update context with the function call cancellation
|
||||
self._update_function_call_result(frame.function_name, frame.tool_call_id, "CANCELLED")
|
||||
del self._function_calls_in_progress[frame.tool_call_id]
|
||||
@@ -954,15 +903,17 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
)
|
||||
|
||||
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
|
||||
self._started += 1
|
||||
await self._trigger_assistant_turn_started()
|
||||
|
||||
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
|
||||
self._started -= 1
|
||||
await self._trigger_assistant_turn_stopped()
|
||||
|
||||
async def _handle_text(self, frame: TextFrame):
|
||||
if not self._started or not frame.append_to_context:
|
||||
# Skip TextFrame types not intended to build the assistant context
|
||||
if isinstance(frame, (TranscriptionFrame, TranslationFrame, InterimTranscriptionFrame)):
|
||||
return
|
||||
|
||||
if not frame.append_to_context:
|
||||
return
|
||||
|
||||
# Make sure we really have text (spaces count, too!)
|
||||
@@ -976,18 +927,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
)
|
||||
|
||||
async def _handle_thought_start(self, frame: LLMThoughtStartFrame):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
await self._reset_thought_aggregation()
|
||||
self._thought_append_to_context = frame.append_to_context
|
||||
self._thought_llm = frame.llm
|
||||
self._thought_start_time = time_now_iso8601()
|
||||
|
||||
async def _handle_thought_text(self, frame: LLMThoughtTextFrame):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
# Make sure we really have text (spaces count, too!)
|
||||
if len(frame.text) == 0:
|
||||
return
|
||||
@@ -999,9 +944,6 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
)
|
||||
|
||||
async def _handle_thought_end(self, frame: LLMThoughtEndFrame):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
thought = concatenate_aggregated_text(self._thought_aggregation)
|
||||
|
||||
if self._thought_append_to_context:
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import warnings
|
||||
from typing import Awaitable, Callable, Union
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
@@ -27,10 +26,6 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
class UserIdleProcessor(FrameProcessor):
|
||||
"""Monitors user inactivity and triggers callbacks after timeout periods.
|
||||
|
||||
.. deprecated::
|
||||
UserIdleProcessor is deprecated in 0.0.100 and will be removed in a future version.
|
||||
Use LLMUserAggregator with user_idle_timeout parameter instead.
|
||||
|
||||
This processor tracks user activity and triggers configurable callbacks when
|
||||
users become idle. It starts monitoring only after the first conversation
|
||||
activity and supports both basic and retry-based callback patterns.
|
||||
@@ -75,14 +70,6 @@ class UserIdleProcessor(FrameProcessor):
|
||||
**kwargs: Additional arguments passed to FrameProcessor.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
warnings.warn(
|
||||
"UserIdleProcessor is deprecated in 0.0.100 and will be removed in a "
|
||||
"future version. Use LLMUserAggregator with user_idle_timeout parameter "
|
||||
"instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
self._callback = self._wrap_callback(callback)
|
||||
self._timeout = timeout
|
||||
self._retry_count = 0
|
||||
|
||||
@@ -38,7 +38,6 @@ from pipecat.frames.frames import (
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
@@ -1078,7 +1077,9 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
logger.debug(f"Assistant response text added: {text}")
|
||||
|
||||
# Report the text of the assistant response.
|
||||
await self._push_assistant_response_text_frames(text)
|
||||
frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(frame)
|
||||
|
||||
# HACK: here we're also buffering the assistant text ourselves as a
|
||||
# backup rather than relying solely on the assistant context aggregator
|
||||
@@ -1111,7 +1112,11 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
# TTSTextFrame would be ignored otherwise (the interruption frame
|
||||
# would have cleared the assistant aggregator state).
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self._push_assistant_response_text_frames(self._assistant_text_buffer)
|
||||
frame = TTSTextFrame(
|
||||
self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE
|
||||
)
|
||||
frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(frame)
|
||||
self._may_need_repush_assistant_text = False
|
||||
|
||||
# Report the end of the assistant response.
|
||||
@@ -1123,25 +1128,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
# Clear out the buffered assistant text
|
||||
self._assistant_text_buffer = ""
|
||||
|
||||
async def _push_assistant_response_text_frames(self, text: str):
|
||||
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
|
||||
# proceed beyond the TTS service. Therefore, since a speech-to-speech
|
||||
# service like Nova Sonic combines both LLM and TTS functionality, you
|
||||
# would think we wouldn't need to push LLMTextFrames at all. However,
|
||||
# RTVI relies on LLMTextFrames being pushed to trigger its
|
||||
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
|
||||
# appending it to context to avoid context message duplication.
|
||||
|
||||
# Push LLMTextFrame
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# Push TTSTextFrame
|
||||
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
|
||||
#
|
||||
# user transcription reporting
|
||||
#
|
||||
|
||||
@@ -277,8 +277,6 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
|
||||
self._started = False
|
||||
self._first_chunk = True
|
||||
self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds
|
||||
self._last_word: Optional[str] = None # Track last word for punctuation merging
|
||||
self._last_timestamp: Optional[float] = None # Track last timestamp
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
@@ -348,34 +346,9 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
|
||||
await self.cancel_task(self._word_processor_task)
|
||||
self._word_processor_task = None
|
||||
|
||||
def _is_cjk_language(self) -> bool:
|
||||
"""Check if the configured language is CJK (Chinese, Japanese, Korean).
|
||||
|
||||
Returns:
|
||||
True if the language is CJK, False otherwise.
|
||||
"""
|
||||
language = self._settings.get("language", "").lower()
|
||||
# Check if language starts with CJK language codes
|
||||
return language.startswith(("zh", "ja", "ko", "cmn", "yue", "wuu"))
|
||||
|
||||
def _is_punctuation_only(self, text: str) -> bool:
|
||||
"""Check if text consists only of punctuation and whitespace.
|
||||
|
||||
Args:
|
||||
text: Text to check.
|
||||
|
||||
Returns:
|
||||
True if text is only punctuation/whitespace, False otherwise.
|
||||
"""
|
||||
return text and all(not c.isalnum() for c in text)
|
||||
|
||||
def _handle_word_boundary(self, evt):
|
||||
"""Handle word boundary events from Azure SDK.
|
||||
|
||||
Azure sends punctuation as separate word boundaries, and breaks CJK text
|
||||
into individual characters/particles. This method routes to language-specific
|
||||
handlers to properly merge and emit word boundaries.
|
||||
|
||||
Args:
|
||||
evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK
|
||||
containing word text and audio offset timing.
|
||||
@@ -389,75 +362,13 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
|
||||
# Add cumulative offset to get absolute timestamp across sentences
|
||||
absolute_seconds = self._cumulative_audio_offset + sentence_relative_seconds
|
||||
|
||||
if not word:
|
||||
return
|
||||
|
||||
# Route to language-specific handler
|
||||
if self._is_cjk_language():
|
||||
self._handle_cjk_word_boundary(word, absolute_seconds)
|
||||
else:
|
||||
self._handle_non_cjk_word_boundary(word, absolute_seconds)
|
||||
|
||||
def _emit_pending_word(self):
|
||||
"""Emit the currently buffered word if one exists."""
|
||||
if self._last_word is not None:
|
||||
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
|
||||
self._last_word = None
|
||||
self._last_timestamp = None
|
||||
|
||||
def _handle_cjk_word_boundary(self, word: str, timestamp: float):
|
||||
"""Handle word boundaries for CJK languages (Chinese, Japanese, Korean).
|
||||
|
||||
CJK languages don't use spaces between words, so we merge characters together
|
||||
and only emit at natural break points (punctuation or whitespace boundaries).
|
||||
Without this logic, we don't get word output for CJK languages.
|
||||
|
||||
Args:
|
||||
word: The word/character from Azure.
|
||||
timestamp: Timestamp in seconds.
|
||||
"""
|
||||
# First word: just store it
|
||||
if self._last_word is None:
|
||||
self._last_word = word
|
||||
self._last_timestamp = timestamp
|
||||
return
|
||||
|
||||
# Punctuation: merge and emit (natural break)
|
||||
if self._is_punctuation_only(word):
|
||||
self._last_word += word
|
||||
self._emit_pending_word()
|
||||
return
|
||||
|
||||
# Whitespace: emit before boundary, start new segment
|
||||
if word.strip() != word:
|
||||
self._emit_pending_word()
|
||||
self._last_word = word
|
||||
self._last_timestamp = timestamp
|
||||
return
|
||||
|
||||
# Default: continue merging CJK characters
|
||||
self._last_word += word
|
||||
|
||||
def _handle_non_cjk_word_boundary(self, word: str, timestamp: float):
|
||||
"""Handle word boundaries for non-CJK languages.
|
||||
|
||||
Non-CJK languages use spaces between words, so we emit each word separately
|
||||
after merging any trailing punctuation.
|
||||
|
||||
Args:
|
||||
word: The word from Azure.
|
||||
timestamp: Timestamp in seconds.
|
||||
"""
|
||||
# Punctuation: merge with previous word (don't emit yet)
|
||||
if self._is_punctuation_only(word) and self._last_word is not None:
|
||||
self._last_word += word
|
||||
return
|
||||
|
||||
# Regular word: emit previous, store current
|
||||
if self._last_word is not None:
|
||||
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
|
||||
self._last_word = word
|
||||
self._last_timestamp = timestamp
|
||||
# Queue word timestamp for async processing
|
||||
# Use thread-safe queue since this is called from Azure SDK thread
|
||||
if word:
|
||||
logger.trace(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s")
|
||||
# Put in temporary queue - will be processed by async task
|
||||
# Store as (word, timestamp_in_seconds) tuple
|
||||
self._word_boundary_queue.put_nowait((word, absolute_seconds))
|
||||
|
||||
async def _word_processor_task_handler(self):
|
||||
"""Process word timestamps from the queue and call add_word_timestamps."""
|
||||
@@ -486,12 +397,6 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
|
||||
Args:
|
||||
evt: Completion event from Azure Speech SDK.
|
||||
"""
|
||||
# Flush any pending word before completing
|
||||
if self._last_word is not None:
|
||||
self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
|
||||
self._last_word = None
|
||||
self._last_timestamp = None
|
||||
|
||||
# Update cumulative audio offset for next sentence
|
||||
if evt.result and evt.result.audio_duration:
|
||||
self._cumulative_audio_offset += evt.result.audio_duration.total_seconds()
|
||||
@@ -530,8 +435,6 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService):
|
||||
self._started = False
|
||||
self._first_chunk = True
|
||||
self._cumulative_audio_offset = 0.0
|
||||
self._last_word = None
|
||||
self._last_timestamp = None
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any pending audio data."""
|
||||
|
||||
@@ -1710,26 +1710,11 @@ class GeminiLiveLLMService(LLMService):
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
await self._push_output_transcription_text_frames(text)
|
||||
frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE)
|
||||
# Gemini Live text already includes any necessary inter-chunk spaces
|
||||
frame.includes_inter_frame_spaces = True
|
||||
|
||||
async def _push_output_transcription_text_frames(self, text: str):
|
||||
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
|
||||
# proceed beyond the TTS service. Therefore, since a speech-to-speech
|
||||
# service like Gemini Live combines both LLM and TTS functionality, you
|
||||
# might think we wouldn't need to push LLMTextFrames at all. However,
|
||||
# RTVI relies on LLMTextFrames being pushed to trigger its
|
||||
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
|
||||
# appending it to context to avoid context message duplication.
|
||||
|
||||
# Push LLMTextFrame
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# Push TTSTextFrame
|
||||
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
|
||||
"""Handle dedicated grounding metadata messages."""
|
||||
|
||||
@@ -33,7 +33,6 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
@@ -620,26 +619,9 @@ class GrokRealtimeLLMService(LLMService):
|
||||
async def _handle_evt_audio_transcript_delta(self, evt):
|
||||
"""Handle audio transcript delta event."""
|
||||
if evt.delta:
|
||||
await self._push_output_transcript_text_frames(evt.delta)
|
||||
|
||||
async def _push_output_transcript_text_frames(self, text: str):
|
||||
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
|
||||
# proceed beyond the TTS service. Therefore, since a speech-to-speech
|
||||
# service like Grok Realtime combines both LLM and TTS functionality,
|
||||
# you might think we wouldn't need to push LLMTextFrames at all.
|
||||
# However, RTVI relies on LLMTextFrames being pushed to trigger its
|
||||
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
|
||||
# appending it to context to avoid context message duplication.
|
||||
|
||||
# Push LLMTextFrame
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# Push TTSTextFrame
|
||||
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
|
||||
frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_evt_function_call_arguments_done(self, evt):
|
||||
"""Handle function call arguments done event."""
|
||||
@@ -752,14 +734,6 @@ class GrokRealtimeLLMService(LLMService):
|
||||
|
||||
async def _send_user_audio(self, frame):
|
||||
"""Send user audio to Grok."""
|
||||
# Don't send audio if conversation setup is still pending, as it can
|
||||
# lead to errors. For example: audio sent before conversation setup
|
||||
# will be interpreted as having Grok's default sample rate (24000),
|
||||
# and if that differs from the sample rate we eventually set through
|
||||
# the conversation setup, Grok will error out.
|
||||
if self._llm_needs_conversation_setup:
|
||||
return
|
||||
|
||||
payload = base64.b64encode(frame.audio).decode("utf-8")
|
||||
await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload))
|
||||
|
||||
|
||||
@@ -1,160 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""[Hathora-hosted](https://models.hathora.dev) speech-to-text services."""
|
||||
|
||||
import base64
|
||||
import os
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.services.stt_service import SegmentedSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
from .utils import ConfigOption
|
||||
|
||||
|
||||
class HathoraSTTService(SegmentedSTTService):
|
||||
"""This service supports several different speech-to-text models hosted by Hathora.
|
||||
|
||||
[Documentation](https://models.hathora.dev)
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Optional input parameters for Hathora STT configuration.
|
||||
|
||||
Parameters:
|
||||
language: Language code (if supported by model).
|
||||
config: Some models support additional config, refer to
|
||||
[docs](https://models.hathora.dev) for each model to see
|
||||
what is supported.
|
||||
"""
|
||||
|
||||
language: Optional[str] = None
|
||||
config: Optional[list[ConfigOption]] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model: str,
|
||||
sample_rate: Optional[int] = None,
|
||||
api_key: Optional[str] = None,
|
||||
base_url: str = "https://api.models.hathora.dev/inference/v1/stt",
|
||||
params: Optional[InputParams] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the Hathora STT service.
|
||||
|
||||
Args:
|
||||
model: Model to use; find available models
|
||||
[here](https://models.hathora.dev).
|
||||
sample_rate: The sample rate for audio input. If None, will be determined
|
||||
from the start frame.
|
||||
api_key: API key for authentication with the Hathora service;
|
||||
provision one [here](https://models.hathora.dev/tokens).
|
||||
base_url: Base API URL for the Hathora STT service.
|
||||
params: Configuration parameters.
|
||||
**kwargs: Additional arguments passed to the parent class.
|
||||
"""
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
self._model = model
|
||||
self._api_key = api_key or os.getenv("HATHORA_API_KEY")
|
||||
self._base_url = base_url
|
||||
|
||||
params = params or HathoraSTTService.InputParams()
|
||||
|
||||
self._settings = {
|
||||
"language": params.language,
|
||||
"config": params.config,
|
||||
}
|
||||
|
||||
self.set_model_name(model)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
|
||||
Returns:
|
||||
True
|
||||
"""
|
||||
return True
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
pass
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Run speech-to-text on the provided audio data.
|
||||
|
||||
Args:
|
||||
audio: Raw audio bytes to transcribe.
|
||||
|
||||
Yields:
|
||||
Frame: Frames containing transcription results (typically TextFrame).
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
url = f"{self._base_url}"
|
||||
|
||||
payload = {
|
||||
"model": self._model,
|
||||
}
|
||||
|
||||
if self._settings["language"] is not None:
|
||||
payload["language"] = self._settings["language"]
|
||||
if self._settings["config"] is not None:
|
||||
payload["model_config"] = [
|
||||
{"name": option.name, "value": option.value}
|
||||
for option in self._settings["config"]
|
||||
]
|
||||
|
||||
base64_audio = base64.b64encode(audio).decode("utf-8")
|
||||
payload["audio"] = base64_audio
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
url,
|
||||
headers={"Authorization": f"Bearer {self._api_key}"},
|
||||
json=payload,
|
||||
) as resp:
|
||||
response = await resp.json()
|
||||
|
||||
if response and "text" in response:
|
||||
text = response["text"].strip()
|
||||
if text: # Only yield non-empty text
|
||||
# Hathora's API currently doesn't return language info
|
||||
# so we default to the requested language or "en"
|
||||
response_language = self._settings["language"] or "en"
|
||||
await self._handle_transcription(text, True, response_language)
|
||||
yield TranscriptionFrame(
|
||||
text,
|
||||
self._user_id,
|
||||
time_now_iso8601(),
|
||||
Language(response_language),
|
||||
result=response,
|
||||
)
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
except Exception as e:
|
||||
yield ErrorFrame(error=f"Unknown error occurred: {e}")
|
||||
@@ -1,173 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""[Hathora-hosted](https://models.hathora.dev) text-to-speech services."""
|
||||
|
||||
import io
|
||||
import os
|
||||
import wave
|
||||
from typing import AsyncGenerator, Optional, Tuple
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.services.tts_service import TTSService
|
||||
from pipecat.utils.tracing.service_decorators import traced_tts
|
||||
|
||||
from .utils import ConfigOption
|
||||
|
||||
|
||||
def _decode_audio_payload(
|
||||
audio_bytes: bytes,
|
||||
*,
|
||||
fallback_sample_rate: int = 24000,
|
||||
fallback_channels: int = 1,
|
||||
) -> Tuple[bytes, int, int]:
|
||||
"""Convert a WAV/PCM payload into raw PCM samples for TTSAudioRawFrame."""
|
||||
try:
|
||||
with wave.open(io.BytesIO(audio_bytes), "rb") as wav_reader:
|
||||
channels = wav_reader.getnchannels()
|
||||
sample_rate = wav_reader.getframerate()
|
||||
frames = wav_reader.readframes(wav_reader.getnframes())
|
||||
return frames, sample_rate, channels
|
||||
except (wave.Error, EOFError):
|
||||
# If the payload is already raw PCM, just pass it through.
|
||||
return audio_bytes, fallback_sample_rate, fallback_channels
|
||||
|
||||
|
||||
class HathoraTTSService(TTSService):
|
||||
"""This service supports several different text-to-speech models hosted by Hathora.
|
||||
|
||||
[Documentation](https://models.hathora.dev)
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Optional input parameters for Hathora TTS configuration.
|
||||
|
||||
Parameters:
|
||||
speed: Speech speed multiplier (if supported by model).
|
||||
config: Some models support additional config, refer to
|
||||
[docs](https://models.hathora.dev) for each model to see
|
||||
what is supported.
|
||||
"""
|
||||
|
||||
speed: Optional[float] = None
|
||||
config: Optional[list[ConfigOption]] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model: str,
|
||||
voice_id: Optional[str] = None,
|
||||
sample_rate: Optional[int] = None,
|
||||
api_key: Optional[str] = None,
|
||||
base_url: str = "https://api.models.hathora.dev/inference/v1/tts",
|
||||
params: Optional[InputParams] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the Hathora TTS service.
|
||||
|
||||
Args:
|
||||
model: Model to use; find available models
|
||||
[here](https://models.hathora.dev).
|
||||
voice_id: Voice to use for synthesis (if supported by model).
|
||||
sample_rate: Output sample rate for generated audio.
|
||||
api_key: API key for authentication with the Hathora service;
|
||||
provision one [here](https://models.hathora.dev/tokens).
|
||||
base_url: Base API URL for the Hathora TTS service.
|
||||
params: Configuration parameters.
|
||||
**kwargs: Additional arguments passed to the parent class.
|
||||
"""
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
self._model = model
|
||||
self._api_key = api_key or os.getenv("HATHORA_API_KEY")
|
||||
self._base_url = base_url
|
||||
|
||||
params = params or HathoraTTSService.InputParams()
|
||||
|
||||
self._settings = {
|
||||
"speed": params.speed,
|
||||
"config": params.config,
|
||||
}
|
||||
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
|
||||
Returns:
|
||||
True
|
||||
"""
|
||||
return True
|
||||
|
||||
@traced_tts
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
"""Run text-to-speech synthesis on the provided text.
|
||||
|
||||
Args:
|
||||
text: The text to synthesize into speech.
|
||||
|
||||
Yields:
|
||||
Frame: Audio frames containing the synthesized speech.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
url = f"{self._base_url}"
|
||||
|
||||
payload = {"model": self._model, "text": text}
|
||||
|
||||
if self._voice_id is not None:
|
||||
payload["voice"] = self._voice_id
|
||||
if self._settings["speed"] is not None:
|
||||
payload["speed"] = self._settings["speed"]
|
||||
if self._settings["config"] is not None:
|
||||
payload["model_config"] = [
|
||||
{"name": option.name, "value": option.value}
|
||||
for option in self._settings["config"]
|
||||
]
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
url,
|
||||
headers={"Authorization": f"Bearer {self._api_key}"},
|
||||
json=payload,
|
||||
) as resp:
|
||||
audio_data = await resp.read()
|
||||
|
||||
pcm_audio, sample_rate, num_channels = _decode_audio_payload(
|
||||
audio_data,
|
||||
fallback_sample_rate=self.sample_rate,
|
||||
)
|
||||
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=pcm_audio,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=num_channels,
|
||||
)
|
||||
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
yield ErrorFrame(error=f"Unknown error occurred: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
@@ -1,22 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Utilities and types for [Hathora-hosted](https://models.hathora.dev) voice services."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfigOption:
|
||||
"""Extra configuration option passed into model_config for Hathora (if supported by model).
|
||||
|
||||
Args:
|
||||
name: Name of the configuration option.
|
||||
value: Value of the configuration option.
|
||||
"""
|
||||
|
||||
name: str
|
||||
value: str
|
||||
@@ -121,6 +121,7 @@ class Mem0MemoryService(FrameProcessor):
|
||||
try:
|
||||
logger.debug(f"Storing {len(messages)} messages in Mem0")
|
||||
params = {
|
||||
"async_mode": True,
|
||||
"messages": messages,
|
||||
"metadata": {"platform": "pipecat"},
|
||||
"output_format": "v1.1",
|
||||
|
||||
@@ -1003,6 +1003,7 @@ class TokenDetails(BaseModel):
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="allow")
|
||||
__pydantic_extra__: dict[str, Any]
|
||||
|
||||
cached_tokens: Optional[int] = 0
|
||||
text_tokens: Optional[int] = 0
|
||||
|
||||
@@ -724,26 +724,10 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
# We receive audio transcript deltas (as opposed to text deltas) when
|
||||
# the output modality is "audio" (the default)
|
||||
if evt.delta:
|
||||
await self._push_output_transcript_text_frames(evt.delta)
|
||||
|
||||
async def _push_output_transcript_text_frames(self, text: str):
|
||||
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
|
||||
# proceed beyond the TTS service. Therefore, since a speech-to-speech
|
||||
# service like OpenAI Realtime combines both LLM and TTS functionality,
|
||||
# you might think we wouldn't need to push LLMTextFrames at all.
|
||||
# However, RTVI relies on LLMTextFrames being pushed to trigger its
|
||||
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
|
||||
# appending it to context to avoid context message duplication.
|
||||
|
||||
# Push LLMTextFrame
|
||||
llm_text_frame = LLMTextFrame(text)
|
||||
llm_text_frame.append_to_context = False
|
||||
await self.push_frame(llm_text_frame)
|
||||
|
||||
# Push TTSTextFrame
|
||||
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
|
||||
tts_text_frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(tts_text_frame)
|
||||
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
|
||||
# OpenAI Realtime text already includes any necessary inter-chunk spaces
|
||||
frame.includes_inter_frame_spaces = True
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_evt_function_call_arguments_done(self, evt):
|
||||
"""Handle completion of function call arguments.
|
||||
|
||||
@@ -878,6 +878,7 @@ class TokenDetails(BaseModel):
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="allow")
|
||||
__pydantic_extra__: dict[str, Any]
|
||||
|
||||
cached_tokens: Optional[int] = 0
|
||||
text_tokens: Optional[int] = 0
|
||||
|
||||
@@ -207,6 +207,9 @@ class TTSService(AIService):
|
||||
|
||||
self._processing_text: bool = False
|
||||
|
||||
# Track append_to_context for the current TTS generation (used by WordTTSService subclasses)
|
||||
self._current_append_to_context: Optional[bool] = None
|
||||
|
||||
self._register_event_handler("on_connected")
|
||||
self._register_event_handler("on_disconnected")
|
||||
self._register_event_handler("on_connection_error")
|
||||
@@ -460,7 +463,10 @@ class TTSService(AIService):
|
||||
# Store if we were processing text or not so we can set it back.
|
||||
processing_text = self._processing_text
|
||||
# Assumption: text in TTSSpeakFrame does not include inter-frame spaces
|
||||
await self._push_tts_frames(AggregatedTextFrame(frame.text, AggregationType.SENTENCE))
|
||||
await self._push_tts_frames(
|
||||
AggregatedTextFrame(frame.text, AggregationType.SENTENCE),
|
||||
append_to_context=frame.append_to_context,
|
||||
)
|
||||
# We pause processing incoming frames because we are sending data to
|
||||
# the TTS. We pause to avoid audio overlapping.
|
||||
await self._maybe_pause_frame_processing()
|
||||
@@ -571,7 +577,10 @@ class TTSService(AIService):
|
||||
)
|
||||
|
||||
async def _push_tts_frames(
|
||||
self, src_frame: AggregatedTextFrame, includes_inter_frame_spaces: Optional[bool] = False
|
||||
self,
|
||||
src_frame: AggregatedTextFrame,
|
||||
includes_inter_frame_spaces: Optional[bool] = False,
|
||||
append_to_context: Optional[bool] = None,
|
||||
):
|
||||
type = src_frame.aggregated_by
|
||||
text = src_frame.text
|
||||
@@ -623,6 +632,10 @@ class TTSService(AIService):
|
||||
if aggregation_type == type or aggregation_type == "*":
|
||||
transformed_text = await transform(transformed_text, type)
|
||||
|
||||
# Store append_to_context for use by WordTTSService subclasses
|
||||
if append_to_context is not None:
|
||||
self._current_append_to_context = append_to_context
|
||||
|
||||
# Apply any final text preparation (e.g., trailing space)
|
||||
prepared_text = self._prepare_text_for_tts(transformed_text)
|
||||
await self.process_generator(self.run_tts(prepared_text))
|
||||
@@ -639,7 +652,15 @@ class TTSService(AIService):
|
||||
# or transformations.
|
||||
frame = TTSTextFrame(text, aggregated_by=type)
|
||||
frame.includes_inter_frame_spaces = includes_inter_frame_spaces
|
||||
# If append_to_context was explicitly specified (e.g., from TTSSpeakFrame),
|
||||
# use that value; otherwise, use TTSTextFrame's default (True).
|
||||
if self._current_append_to_context is not None:
|
||||
frame.append_to_context = self._current_append_to_context
|
||||
await self.push_frame(frame)
|
||||
# Reset after pushing the frame to avoid affecting subsequent TTS operations.
|
||||
# Note: WordTTSService subclasses don't use _push_text_frames so this reset
|
||||
# does not affect that class.
|
||||
self._current_append_to_context = None
|
||||
|
||||
async def _stop_frame_handler(self):
|
||||
has_started = False
|
||||
@@ -690,6 +711,7 @@ class WordTTSService(TTSService):
|
||||
async def reset_word_timestamps(self):
|
||||
"""Reset word timestamp tracking."""
|
||||
self._initial_word_timestamp = -1
|
||||
self._current_append_to_context = None
|
||||
|
||||
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
|
||||
"""Add word timestamps to the processing queue.
|
||||
@@ -783,6 +805,10 @@ class WordTTSService(TTSService):
|
||||
# we can rely on the default includes_inter_frame_spaces=False
|
||||
frame = TTSTextFrame(word, aggregated_by=AggregationType.WORD)
|
||||
frame.pts = self._initial_word_timestamp + timestamp
|
||||
# Apply the append_to_context setting from the original TTSSpeakFrame
|
||||
# if it was explicitly set; otherwise rely on TTSTextFrame's default (True).
|
||||
if self._current_append_to_context is not None:
|
||||
frame.append_to_context = self._current_append_to_context
|
||||
if frame:
|
||||
last_pts = frame.pts
|
||||
await self.push_frame(frame)
|
||||
|
||||
@@ -11,7 +11,6 @@ input processing, including VAD, turn analysis, and interruption management.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
@@ -78,11 +77,6 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
# Track user speaking state for interruption logic
|
||||
self._user_speaking = False
|
||||
# Last time a UserSpeakingFrame was pushed.
|
||||
self._user_speaking_frame_time = 0
|
||||
# How often a UserSpeakingFrame should be pushed (value should be
|
||||
# greater than the audio chunks to have any effect).
|
||||
self._user_speaking_frame_period = 0.2
|
||||
|
||||
# Task to process incoming audio (VAD) and push audio frames downstream
|
||||
# if passthrough is enabled.
|
||||
@@ -429,7 +423,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self._deprecated_run_turn_analyzer(frame, vad_state, previous_vad_state)
|
||||
|
||||
if vad_state == VADState.SPEAKING:
|
||||
await self._user_currently_speaking()
|
||||
await self.broadcast_frame(UserSpeakingFrame)
|
||||
|
||||
# Push audio downstream if passthrough is set.
|
||||
if self._params.audio_in_passthrough:
|
||||
@@ -450,13 +444,6 @@ class BaseInputTransport(FrameProcessor):
|
||||
else:
|
||||
await self.push_frame(VADUserStoppedSpeakingFrame())
|
||||
|
||||
async def _user_currently_speaking(self):
|
||||
"""Handle user speaking frame."""
|
||||
diff_time = time.time() - self._user_speaking_frame_time
|
||||
if diff_time >= self._user_speaking_frame_period:
|
||||
await self.broadcast_frame(UserSpeakingFrame)
|
||||
self._user_speaking_frame_time = time.time()
|
||||
|
||||
#
|
||||
# DEPRECATED.
|
||||
#
|
||||
|
||||
@@ -403,7 +403,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Last time a BotSpeakingFrame was pushed.
|
||||
self._bot_speaking_frame_time = 0
|
||||
# How often a BotSpeakingFrame should be pushed (value should be
|
||||
# greater than the audio chunks to have any effect).
|
||||
# lower than the audio chunks).
|
||||
self._bot_speaking_frame_period = 0.2
|
||||
# Last time the bot actually spoke.
|
||||
self._bot_speech_last_time = 0
|
||||
@@ -644,7 +644,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
diff_time = time.time() - self._bot_speaking_frame_time
|
||||
if diff_time >= self._bot_speaking_frame_period:
|
||||
await self._transport.broadcast_frame(BotSpeakingFrame)
|
||||
await self._transport.push_frame(BotSpeakingFrame())
|
||||
await self._transport.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking_frame_time = time.time()
|
||||
|
||||
self._bot_speech_last_time = time.time()
|
||||
|
||||
@@ -102,6 +102,9 @@ class DailyRoomProperties(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="allow")
|
||||
|
||||
# Pydantic v2.12+ requires explicit annotation for extra fields
|
||||
__pydantic_extra__: dict[str, Any]
|
||||
|
||||
exp: Optional[float] = None
|
||||
enable_chat: bool = False
|
||||
enable_prejoin_ui: bool = False
|
||||
|
||||
@@ -50,7 +50,6 @@ class WebsocketClientParams(TransportParams):
|
||||
"""
|
||||
|
||||
add_wav_header: bool = True
|
||||
additional_headers: Optional[dict[str, str]] = None
|
||||
serializer: Optional[FrameSerializer] = None
|
||||
|
||||
|
||||
@@ -131,11 +130,7 @@ class WebsocketClientSession:
|
||||
return
|
||||
|
||||
try:
|
||||
self._websocket = await websocket_connect(
|
||||
uri=self._uri,
|
||||
open_timeout=10,
|
||||
additional_headers=self._params.additional_headers,
|
||||
)
|
||||
self._websocket = await websocket_connect(uri=self._uri, open_timeout=10)
|
||||
self._client_task = self.task_manager.create_task(
|
||||
self._client_task_handler(),
|
||||
f"{self._transport_name}::WebsocketClientSession::_client_task_handler",
|
||||
|
||||
@@ -4,21 +4,10 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import warnings
|
||||
|
||||
from pipecat.turns.user_mute.always_user_mute_strategy import AlwaysUserMuteStrategy
|
||||
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.user_mute.first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
|
||||
from pipecat.turns.user_mute.function_call_user_mute_strategy import FunctionCallUserMuteStrategy
|
||||
from pipecat.turns.user_mute.mute_until_first_bot_complete_user_mute_strategy import (
|
||||
from pipecat.turns.mute.always_user_mute_strategy import AlwaysUserMuteStrategy
|
||||
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute.first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
|
||||
from pipecat.turns.mute.function_call_user_mute_strategy import FunctionCallUserMuteStrategy
|
||||
from pipecat.turns.mute.mute_until_first_bot_complete_user_mute_strategy import (
|
||||
MuteUntilFirstBotCompleteUserMuteStrategy,
|
||||
)
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.turns.mute are deprecated. "
|
||||
"Please use the equivalent types from pipecat.turns.user_mute instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
"""User mute strategy that always mutes the user while the bot is speaking."""
|
||||
|
||||
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
|
||||
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
|
||||
|
||||
class AlwaysUserMuteStrategy(BaseUserMuteStrategy):
|
||||
@@ -7,7 +7,7 @@
|
||||
"""User mute strategy that mutes the user only during the bot’s first speech."""
|
||||
|
||||
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
|
||||
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
|
||||
|
||||
class FirstSpeechUserMuteStrategy(BaseUserMuteStrategy):
|
||||
@@ -14,7 +14,7 @@ from pipecat.frames.frames import (
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallsStartedFrame,
|
||||
)
|
||||
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
|
||||
|
||||
class FunctionCallUserMuteStrategy(BaseUserMuteStrategy):
|
||||
@@ -7,7 +7,7 @@
|
||||
"""User mute strategy that mutes the user until the bot completes its first speech."""
|
||||
|
||||
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame
|
||||
from pipecat.turns.user_mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
|
||||
|
||||
|
||||
class MuteUntilFirstBotCompleteUserMuteStrategy(BaseUserMuteStrategy):
|
||||
@@ -1,173 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""This module defines a controller for managing user idle detection."""
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallsStartedFrame,
|
||||
UserSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
)
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
|
||||
class UserIdleController(BaseObject):
|
||||
"""Controller for managing user idle detection.
|
||||
|
||||
This class monitors user activity and triggers an event when the user has been
|
||||
idle (not speaking) for a configured timeout period. It only starts monitoring
|
||||
after the first conversation activity and does not trigger while the bot is
|
||||
speaking or function calls are in progress.
|
||||
|
||||
The controller tracks activity using continuous frames (UserSpeakingFrame and
|
||||
BotSpeakingFrame) which are emitted repeatedly while speaking is happening, and
|
||||
state-based tracking for function calls (FunctionCallsStartedFrame and
|
||||
FunctionCallResultFrame) which are only sent at start and end.
|
||||
|
||||
Event handlers available:
|
||||
|
||||
- on_user_turn_idle: Emitted when the user has been idle for the timeout period.
|
||||
|
||||
Example::
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
# Handle user idle - send reminder, prompt, etc.
|
||||
...
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
user_idle_timeout: float,
|
||||
):
|
||||
"""Initialize the user idle controller.
|
||||
|
||||
Args:
|
||||
user_idle_timeout: Timeout in seconds before considering the user idle.
|
||||
"""
|
||||
super().__init__()
|
||||
|
||||
self._user_idle_timeout = user_idle_timeout
|
||||
|
||||
self._task_manager: Optional[BaseTaskManager] = None
|
||||
|
||||
self._conversation_started = False
|
||||
self._function_call_in_progress = False
|
||||
|
||||
self.user_idle_event = asyncio.Event()
|
||||
self.user_idle_task: Optional[asyncio.Task] = None
|
||||
|
||||
self._register_event_handler("on_user_turn_idle", sync=True)
|
||||
|
||||
@property
|
||||
def task_manager(self) -> BaseTaskManager:
|
||||
"""Returns the configured task manager."""
|
||||
if not self._task_manager:
|
||||
raise RuntimeError(f"{self} user idle controller was not properly setup")
|
||||
return self._task_manager
|
||||
|
||||
async def setup(self, task_manager: BaseTaskManager):
|
||||
"""Initialize the controller with the given task manager.
|
||||
|
||||
Args:
|
||||
task_manager: The task manager to be associated with this instance.
|
||||
"""
|
||||
self._task_manager = task_manager
|
||||
|
||||
if not self.user_idle_task:
|
||||
self.user_idle_task = self.task_manager.create_task(
|
||||
self.user_idle_task_handler(),
|
||||
f"{self}::user_idle_task_handler",
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup the controller."""
|
||||
await super().cleanup()
|
||||
|
||||
if self.user_idle_task:
|
||||
await self.task_manager.cancel_task(self.user_idle_task)
|
||||
self.user_idle_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
"""Process an incoming frame to track user activity state.
|
||||
|
||||
Args:
|
||||
frame: The frame to be processed.
|
||||
"""
|
||||
# Start monitoring on first conversation activity
|
||||
if not self._conversation_started:
|
||||
if isinstance(frame, (UserStartedSpeakingFrame, BotSpeakingFrame)):
|
||||
self._conversation_started = True
|
||||
self.user_idle_event.set()
|
||||
else:
|
||||
return
|
||||
|
||||
# Reset idle timer on continuous activity frames
|
||||
if isinstance(frame, (UserSpeakingFrame, BotSpeakingFrame)):
|
||||
await self._handle_activity(frame)
|
||||
# Track function call state (start/end frames, not continuous)
|
||||
elif isinstance(frame, FunctionCallsStartedFrame):
|
||||
await self._handle_function_calls_started(frame)
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
await self._handle_function_call_result(frame)
|
||||
|
||||
async def _handle_activity(self, _: UserSpeakingFrame | BotSpeakingFrame):
|
||||
"""Handle continuous activity frames that should reset the idle timer.
|
||||
|
||||
These frames are emitted continuously while the user or bot is speaking,
|
||||
so we simply reset the timer whenever we receive them.
|
||||
|
||||
Args:
|
||||
frame: The activity frame to process.
|
||||
"""
|
||||
self.user_idle_event.set()
|
||||
|
||||
async def _handle_function_calls_started(self, _: FunctionCallsStartedFrame):
|
||||
"""Handle function calls started event.
|
||||
|
||||
Function calls can take longer than the timeout, so we track their state
|
||||
to prevent idle callbacks while they're in progress.
|
||||
|
||||
Args:
|
||||
frame: The FunctionCallsStartedFrame to process.
|
||||
"""
|
||||
self._function_call_in_progress = True
|
||||
self.user_idle_event.set()
|
||||
|
||||
async def _handle_function_call_result(self, _: FunctionCallResultFrame):
|
||||
"""Handle function call result event.
|
||||
|
||||
Args:
|
||||
frame: The FunctionCallResultFrame to process.
|
||||
"""
|
||||
self._function_call_in_progress = False
|
||||
self.user_idle_event.set()
|
||||
|
||||
async def user_idle_task_handler(self):
|
||||
"""Monitors for idle timeout and triggers events.
|
||||
|
||||
Runs in a loop until cancelled. The idle timer is reset whenever activity
|
||||
frames are received (UserSpeakingFrame or BotSpeakingFrame). Function calls
|
||||
are tracked via state since they only send start/end frames. If no activity
|
||||
is detected for the configured timeout period and no function call is in
|
||||
progress, the on_user_turn_idle event is triggered.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self.user_idle_event.wait(), timeout=self._user_idle_timeout)
|
||||
self.user_idle_event.clear()
|
||||
except asyncio.TimeoutError:
|
||||
# Only trigger if conversation has started and no function call is in progress
|
||||
if self._conversation_started and not self._function_call_in_progress:
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
@@ -1,21 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from .always_user_mute_strategy import AlwaysUserMuteStrategy
|
||||
from .base_user_mute_strategy import BaseUserMuteStrategy
|
||||
from .first_speech_user_mute_strategy import FirstSpeechUserMuteStrategy
|
||||
from .function_call_user_mute_strategy import FunctionCallUserMuteStrategy
|
||||
from .mute_until_first_bot_complete_user_mute_strategy import (
|
||||
MuteUntilFirstBotCompleteUserMuteStrategy,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"AlwaysUserMuteStrategy",
|
||||
"BaseUserMuteStrategy",
|
||||
"FirstSpeechUserMuteStrategy",
|
||||
"FunctionCallUserMuteStrategy",
|
||||
"MuteUntilFirstBotCompleteUserMuteStrategy",
|
||||
]
|
||||
@@ -4,17 +4,15 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from .base_user_turn_start_strategy import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
from .external_user_turn_start_strategy import ExternalUserTurnStartStrategy
|
||||
from .min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
|
||||
from .transcription_user_turn_start_strategy import TranscriptionUserTurnStartStrategy
|
||||
from .vad_user_turn_start_strategy import VADUserTurnStartStrategy
|
||||
|
||||
__all__ = [
|
||||
"BaseUserTurnStartStrategy",
|
||||
"ExternalUserTurnStartStrategy",
|
||||
"MinWordsUserTurnStartStrategy",
|
||||
"TranscriptionUserTurnStartStrategy",
|
||||
"UserTurnStartedParams",
|
||||
"VADUserTurnStartStrategy",
|
||||
]
|
||||
from pipecat.turns.user_start.base_user_turn_start_strategy import (
|
||||
BaseUserTurnStartStrategy,
|
||||
UserTurnStartedParams,
|
||||
)
|
||||
from pipecat.turns.user_start.external_user_turn_start_strategy import ExternalUserTurnStartStrategy
|
||||
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
|
||||
MinWordsUserTurnStartStrategy,
|
||||
)
|
||||
from pipecat.turns.user_start.transcription_user_turn_start_strategy import (
|
||||
TranscriptionUserTurnStartStrategy,
|
||||
)
|
||||
from pipecat.turns.user_start.vad_user_turn_start_strategy import VADUserTurnStartStrategy
|
||||
|
||||
@@ -41,10 +41,12 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
self._min_words = min_words
|
||||
self._use_interim = use_interim
|
||||
self._bot_speaking = False
|
||||
self._text = ""
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the strategy to its initial state."""
|
||||
await super().reset()
|
||||
self._text = ""
|
||||
self._bot_speaking = False
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
@@ -65,7 +67,7 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
elif isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
|
||||
await self._handle_transcription(frame)
|
||||
await self._handle_interim_transcription(frame)
|
||||
|
||||
async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
|
||||
"""Handle bot started speaking frame.
|
||||
@@ -87,21 +89,41 @@ class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
"""
|
||||
self._bot_speaking = False
|
||||
|
||||
async def _handle_transcription(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
|
||||
async def _handle_transcription(self, frame: TranscriptionFrame):
|
||||
"""Handle a completed transcription frame and check word count.
|
||||
|
||||
Args:
|
||||
frame: The transcription frame to be processed.
|
||||
"""
|
||||
self._text += frame.text
|
||||
|
||||
min_words = self._min_words if self._bot_speaking else 1
|
||||
|
||||
word_count = len(frame.text.split())
|
||||
word_count = len(self._text.split())
|
||||
should_trigger = word_count >= min_words
|
||||
is_interim = isinstance(frame, InterimTranscriptionFrame)
|
||||
|
||||
logger.debug(
|
||||
f"{self} should_trigger={should_trigger} num_spoken_words={word_count} "
|
||||
f"min_words={min_words} bot_speaking={self._bot_speaking} interim_transcription={is_interim}"
|
||||
f"min_words={min_words} bot_speaking={self._bot_speaking}"
|
||||
)
|
||||
|
||||
if should_trigger:
|
||||
await self.trigger_user_turn_started()
|
||||
|
||||
async def _handle_interim_transcription(self, frame: InterimTranscriptionFrame):
|
||||
"""Handle an interim transcription frame and check word count.
|
||||
|
||||
Args:
|
||||
frame: The interim transcription frame to be processed.
|
||||
"""
|
||||
min_words = self._min_words if self._bot_speaking else 1
|
||||
|
||||
word_count = len(frame.text.split())
|
||||
should_trigger = word_count >= min_words
|
||||
|
||||
logger.debug(
|
||||
f"{self} interim=True should_trigger={should_trigger} num_spoken_words={word_count} "
|
||||
f"min_words={min_words} bot_speaking={self._bot_speaking}"
|
||||
)
|
||||
|
||||
if should_trigger:
|
||||
|
||||
@@ -4,15 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
|
||||
from .transcription_user_turn_stop_strategy import TranscriptionUserTurnStopStrategy
|
||||
from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy
|
||||
|
||||
__all__ = [
|
||||
"BaseUserTurnStopStrategy",
|
||||
"ExternalUserTurnStopStrategy",
|
||||
"UserTurnStoppedParams",
|
||||
"TranscriptionUserTurnStopStrategy",
|
||||
"TurnAnalyzerUserTurnStopStrategy",
|
||||
]
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import (
|
||||
BaseUserTurnStopStrategy,
|
||||
UserTurnStoppedParams,
|
||||
)
|
||||
from pipecat.turns.user_stop.external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
|
||||
from pipecat.turns.user_stop.transcription_user_turn_stop_strategy import (
|
||||
TranscriptionUserTurnStopStrategy,
|
||||
)
|
||||
from pipecat.turns.user_stop.turn_analyzer_user_turn_stop_strategy import (
|
||||
TurnAnalyzerUserTurnStopStrategy,
|
||||
)
|
||||
|
||||
@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from pipecat.turns.user_turn_controller import UserTurnController
|
||||
@@ -39,7 +38,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
- on_user_turn_started: Emitted when a user turn starts.
|
||||
- on_user_turn_stopped: Emitted when a user turn stops.
|
||||
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
|
||||
- on_user_turn_idle: Emitted when the user has been idle for the configured timeout.
|
||||
|
||||
Example::
|
||||
|
||||
@@ -55,10 +53,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
async def on_user_turn_stop_timeout(processor):
|
||||
...
|
||||
|
||||
@processor.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(processor):
|
||||
...
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -66,7 +60,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
*,
|
||||
user_turn_strategies: Optional[UserTurnStrategies] = None,
|
||||
user_turn_stop_timeout: float = 5.0,
|
||||
user_idle_timeout: Optional[float] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the user turn processor.
|
||||
@@ -75,10 +68,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
user_turn_strategies: Configured strategies for starting and stopping user turns.
|
||||
user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
|
||||
if no activity is detected.
|
||||
user_idle_timeout: Optional timeout in seconds for detecting user idle state.
|
||||
If set, the processor will emit an `on_user_turn_idle` event when the user
|
||||
has been idle (not speaking) for this duration. Set to None to disable
|
||||
idle detection.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -86,7 +75,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
self._register_event_handler("on_user_turn_started")
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
|
||||
self._user_turn_controller = UserTurnController(
|
||||
user_turn_strategies=user_turn_strategies or UserTurnStrategies(),
|
||||
@@ -104,14 +92,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
|
||||
)
|
||||
|
||||
# Optional user idle controller
|
||||
self._user_idle_controller: Optional[UserIdleController] = None
|
||||
if user_idle_timeout:
|
||||
self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout)
|
||||
self._user_idle_controller.add_event_handler(
|
||||
"on_user_turn_idle", self._on_user_turn_idle
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up processor resources."""
|
||||
await super().cleanup()
|
||||
@@ -149,15 +129,9 @@ class UserTurnProcessor(FrameProcessor):
|
||||
|
||||
await self._user_turn_controller.process_frame(frame)
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.process_frame(frame)
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
await self._user_turn_controller.setup(self.task_manager)
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.setup(self.task_manager)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
await self._cleanup()
|
||||
|
||||
@@ -167,9 +141,6 @@ class UserTurnProcessor(FrameProcessor):
|
||||
async def _cleanup(self):
|
||||
await self._user_turn_controller.cleanup()
|
||||
|
||||
if self._user_idle_controller:
|
||||
await self._user_idle_controller.cleanup()
|
||||
|
||||
async def _on_push_frame(
|
||||
self, controller, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
|
||||
):
|
||||
@@ -209,6 +180,3 @@ class UserTurnProcessor(FrameProcessor):
|
||||
|
||||
async def _on_user_turn_stop_timeout(self, controller):
|
||||
await self._call_event_handler("on_user_turn_stop_timeout")
|
||||
|
||||
async def _on_user_turn_idle(self, controller):
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
|
||||
@@ -16,15 +16,12 @@ import inspect
|
||||
import traceback
|
||||
from abc import ABC
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
# TypeVar for preserving function signatures in decorators
|
||||
F = TypeVar("F", bound=Callable[..., Any])
|
||||
|
||||
|
||||
@dataclass
|
||||
class EventHandler:
|
||||
@@ -102,7 +99,7 @@ class BaseObject(ABC):
|
||||
logger.debug(f"{self}: waiting on event handlers to finish {list(event_names)}...")
|
||||
await asyncio.wait(tasks)
|
||||
|
||||
def event_handler(self, event_name: str) -> Callable[[F], F]:
|
||||
def event_handler(self, event_name: str):
|
||||
"""Decorator for registering event handlers.
|
||||
|
||||
Args:
|
||||
@@ -112,7 +109,7 @@ class BaseObject(ABC):
|
||||
The decorator function that registers the handler.
|
||||
"""
|
||||
|
||||
def decorator(handler: F) -> F:
|
||||
def decorator(handler):
|
||||
self.add_event_handler(event_name, handler)
|
||||
return handler
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.tests.utils import SleepFrame, run_test
|
||||
from pipecat.turns.user_mute import FirstSpeechUserMuteStrategy, FunctionCallUserMuteStrategy
|
||||
from pipecat.turns.mute import FirstSpeechUserMuteStrategy, FunctionCallUserMuteStrategy
|
||||
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
|
||||
from pipecat.turns.user_turn_strategies import UserTurnStrategies
|
||||
|
||||
|
||||
@@ -1,216 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallsStartedFrame,
|
||||
UserSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
||||
|
||||
USER_IDLE_TIMEOUT = 0.2
|
||||
|
||||
|
||||
class TestUserIdleController(unittest.IsolatedAsyncioTestCase):
|
||||
async def asyncSetUp(self):
|
||||
self.task_manager = TaskManager()
|
||||
self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
|
||||
|
||||
async def test_basic_idle_detection(self):
|
||||
"""Test that idle event is triggered after timeout when no activity."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Start conversation
|
||||
await controller.process_frame(UserStartedSpeakingFrame())
|
||||
|
||||
# Wait for idle timeout
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
|
||||
self.assertTrue(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_user_speaking_resets_idle_timer(self):
|
||||
"""Test that continuous UserSpeakingFrame frames reset the idle timer."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Start conversation
|
||||
await controller.process_frame(UserStartedSpeakingFrame())
|
||||
|
||||
# Send UserSpeakingFrame continuously to reset timer
|
||||
for _ in range(5):
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT * 0.5) # 50% of timeout period
|
||||
await controller.process_frame(UserSpeakingFrame())
|
||||
|
||||
self.assertFalse(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_bot_speaking_resets_idle_timer(self):
|
||||
"""Test that BotSpeakingFrame frames reset the idle timer."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Start conversation
|
||||
await controller.process_frame(UserStartedSpeakingFrame())
|
||||
|
||||
# Bot speaking should reset timer
|
||||
for _ in range(5):
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT * 0.6) # 60% of timeout
|
||||
await controller.process_frame(BotSpeakingFrame())
|
||||
|
||||
self.assertFalse(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_function_call_prevents_idle(self):
|
||||
"""Test that function calls in progress prevent idle event."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Start conversation
|
||||
await controller.process_frame(UserStartedSpeakingFrame())
|
||||
|
||||
# Start function call
|
||||
await controller.process_frame(FunctionCallsStartedFrame(function_calls=[]))
|
||||
|
||||
# Wait longer than idle timeout
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
|
||||
# Should not trigger idle because function call is in progress
|
||||
self.assertFalse(idle_triggered)
|
||||
|
||||
# Complete function call
|
||||
await controller.process_frame(
|
||||
FunctionCallResultFrame(
|
||||
function_name="test",
|
||||
tool_call_id="123",
|
||||
arguments={},
|
||||
result=None,
|
||||
run_llm=False,
|
||||
)
|
||||
)
|
||||
|
||||
# Now idle should trigger
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
self.assertTrue(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_no_idle_before_conversation_starts(self):
|
||||
"""Test that idle monitoring doesn't start before first conversation activity."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Wait without starting conversation
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
|
||||
self.assertFalse(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_idle_starts_with_bot_speaking(self):
|
||||
"""Test that monitoring starts with BotSpeakingFrame, not just user speech."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_triggered = False
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_triggered
|
||||
idle_triggered = True
|
||||
|
||||
# Start conversation with bot speaking
|
||||
await controller.process_frame(BotSpeakingFrame())
|
||||
|
||||
# Wait for idle timeout
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
|
||||
self.assertTrue(idle_triggered)
|
||||
|
||||
await controller.cleanup()
|
||||
|
||||
async def test_multiple_idle_events(self):
|
||||
"""Test that idle event can trigger multiple times."""
|
||||
controller = UserIdleController(user_idle_timeout=USER_IDLE_TIMEOUT)
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
idle_count = 0
|
||||
|
||||
@controller.event_handler("on_user_turn_idle")
|
||||
async def on_user_turn_idle(controller):
|
||||
nonlocal idle_count
|
||||
idle_count += 1
|
||||
|
||||
# Start conversation
|
||||
await controller.process_frame(UserStartedSpeakingFrame())
|
||||
|
||||
# First idle
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
first_count = idle_count
|
||||
self.assertGreaterEqual(first_count, 1)
|
||||
|
||||
# Second idle
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
second_count = idle_count
|
||||
self.assertGreater(second_count, first_count)
|
||||
|
||||
# User activity resets timer
|
||||
await controller.process_frame(UserSpeakingFrame())
|
||||
|
||||
# Give a moment for the timer to reset
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Third idle
|
||||
await asyncio.sleep(USER_IDLE_TIMEOUT + 0.1)
|
||||
third_count = idle_count
|
||||
self.assertGreater(third_count, second_count)
|
||||
|
||||
await controller.cleanup()
|
||||
@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
|
||||
FunctionCallsStartedFrame,
|
||||
InterruptionFrame,
|
||||
)
|
||||
from pipecat.turns.user_mute import (
|
||||
from pipecat.turns.mute import (
|
||||
AlwaysUserMuteStrategy,
|
||||
FirstSpeechUserMuteStrategy,
|
||||
FunctionCallUserMuteStrategy,
|
||||
|
||||
@@ -84,7 +84,7 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
|
||||
self.assertEqual(should_start, 0)
|
||||
|
||||
await controller.process_frame(
|
||||
TranscriptionFrame(text="One two three!", user_id="cat", timestamp="")
|
||||
TranscriptionFrame(text=" two three!", user_id="cat", timestamp="")
|
||||
)
|
||||
self.assertEqual(should_start, 1)
|
||||
|
||||
@@ -92,11 +92,13 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
|
||||
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
|
||||
|
||||
await controller.process_frame(BotStartedSpeakingFrame())
|
||||
await controller.process_frame(TranscriptionFrame(text="Hi!", user_id="cat", timestamp=""))
|
||||
await controller.process_frame(
|
||||
TranscriptionFrame(text="Hello", user_id="cat", timestamp="")
|
||||
)
|
||||
self.assertEqual(should_start, 1)
|
||||
|
||||
await controller.process_frame(
|
||||
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
|
||||
TranscriptionFrame(text=" there friend!", user_id="cat", timestamp="")
|
||||
)
|
||||
self.assertEqual(should_start, 2)
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="Hello there!", user_id="cat", timestamp="")
|
||||
TranscriptionFrame(text=" there!", user_id="cat", timestamp="")
|
||||
)
|
||||
self.assertTrue(should_start)
|
||||
|
||||
@@ -55,26 +55,6 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
)
|
||||
self.assertTrue(should_start)
|
||||
|
||||
async def test_bot_speaking_singlw_words(self):
|
||||
strategy = MinWordsUserTurnStartStrategy(min_words=3)
|
||||
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy, params):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
await strategy.process_frame(BotStartedSpeakingFrame())
|
||||
await strategy.process_frame(TranscriptionFrame(text="One", user_id="cat", timestamp=""))
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(TranscriptionFrame(text="Two", user_id="cat", timestamp=""))
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(TranscriptionFrame(text="Three", user_id="cat", timestamp=""))
|
||||
self.assertFalse(should_start)
|
||||
|
||||
async def test_bot_speaking_interim_transcriptions(self):
|
||||
strategy = MinWordsUserTurnStartStrategy(min_words=2)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user