Compare commits
1 Commits
fix/fastap
...
hush/aggre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a163201ea |
27
CHANGELOG.md
27
CHANGELOG.md
@@ -5,33 +5,6 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added RTVI messages for user/bot audio levels and system logs.
|
||||
|
||||
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated the default model for `AnthropicLLMService` to
|
||||
`claude-sonnet-4-5-20250929`.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a
|
||||
future version. Instead, create your own custom frame and handle it in the
|
||||
`@transport.output().event_handler("on_after_push_frame")` event handler or a
|
||||
custom processor.
|
||||
|
||||
## Fixed
|
||||
|
||||
- Fixed a `PipelineTask` issue that could prevent the application to exit if
|
||||
`task.cancel()` was called when the task was already finished.
|
||||
|
||||
- Fixed an issue where local SmartTurn was not being ran in a separate thread.
|
||||
|
||||
## [0.0.86] - 2025-09-24
|
||||
|
||||
### Added
|
||||
|
||||
@@ -29,6 +29,10 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
|
||||
logger.info("Loading Whisker debugger...")
|
||||
from pipecat_whisker import WhiskerObserver
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -52,6 +56,8 @@ class TranscriptHandler:
|
||||
"""
|
||||
self.messages: List[TranscriptionMessage] = []
|
||||
self.output_file: Optional[str] = output_file
|
||||
self._current_user_sentence = ""
|
||||
self._current_assistant_sentence = ""
|
||||
logger.debug(
|
||||
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
|
||||
)
|
||||
@@ -78,11 +84,29 @@ class TranscriptHandler:
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving transcript message to file: {e}")
|
||||
|
||||
async def _save_sentence(self, role: str, content: str, timestamp: Optional[str] = None):
|
||||
"""Save a complete sentence as a transcript message.
|
||||
|
||||
Args:
|
||||
role: The role (user/assistant)
|
||||
content: The complete sentence content
|
||||
timestamp: Optional timestamp
|
||||
"""
|
||||
# Cast role to the appropriate literal type
|
||||
message_role = "user" if role == "user" else "assistant"
|
||||
sentence_message = TranscriptionMessage(
|
||||
role=message_role, content=content.strip(), timestamp=timestamp
|
||||
)
|
||||
self.messages.append(sentence_message)
|
||||
await self.save_message(sentence_message)
|
||||
|
||||
async def on_transcript_update(
|
||||
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
|
||||
):
|
||||
"""Handle new transcript messages.
|
||||
|
||||
Aggregates messages into complete sentences before saving them using match_endofsentence.
|
||||
|
||||
Args:
|
||||
processor: The TranscriptProcessor that emitted the update
|
||||
frame: TranscriptionUpdateFrame containing new messages
|
||||
@@ -90,8 +114,31 @@ class TranscriptHandler:
|
||||
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
|
||||
|
||||
for msg in frame.messages:
|
||||
self.messages.append(msg)
|
||||
await self.save_message(msg)
|
||||
# Accumulate text for the appropriate role
|
||||
if msg.role == "user":
|
||||
self._current_user_sentence += msg.content + " "
|
||||
# Check if we have a complete sentence
|
||||
if match_endofsentence(self._current_user_sentence):
|
||||
await self._save_sentence("user", self._current_user_sentence, msg.timestamp)
|
||||
self._current_user_sentence = ""
|
||||
elif msg.role == "assistant":
|
||||
self._current_assistant_sentence += msg.content + " "
|
||||
# Check if we have a complete sentence
|
||||
if match_endofsentence(self._current_assistant_sentence):
|
||||
await self._save_sentence(
|
||||
"assistant", self._current_assistant_sentence, msg.timestamp
|
||||
)
|
||||
self._current_assistant_sentence = ""
|
||||
|
||||
async def finalize_partial_sentences(self):
|
||||
"""Save any remaining partial sentences when the conversation ends."""
|
||||
if self._current_user_sentence.strip():
|
||||
await self._save_sentence("user", self._current_user_sentence)
|
||||
self._current_user_sentence = ""
|
||||
|
||||
if self._current_assistant_sentence.strip():
|
||||
await self._save_sentence("assistant", self._current_assistant_sentence)
|
||||
self._current_assistant_sentence = ""
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -160,12 +207,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
]
|
||||
)
|
||||
|
||||
# Create Whisker debugger observer
|
||||
whisker = WhiskerObserver(pipeline)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[whisker],
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@@ -183,6 +234,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
# Finalize any partial sentences before canceling
|
||||
await transcript_handler.finalize_partial_sentences()
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
@@ -206,14 +206,6 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("NASA_API_KEY"):
|
||||
logger.error(
|
||||
f"Please set NASA_API_KEY environment variable for this example. See https://api.nasa.gov"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -141,14 +141,6 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("MCP_RUN_SSE_URL"):
|
||||
logger.error(
|
||||
f"Please set MCP_RUN_SSE_URL environment variable for this example. See https://mcp.run"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -219,14 +219,6 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("NASA_API_KEY") or not os.getenv("MCP_RUN_SSE_URL"):
|
||||
logger.error(
|
||||
f"Please set NASA_API_KEY and MCP_RUN_SSE_URL environment variables. See https://api.nasa.gov and https://mcp.run"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -145,14 +145,6 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
|
||||
logger.error(
|
||||
f"Please set GITHUB_PERSONAL_ACCESS_TOKEN environment variable for this example."
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
description = "Quickstart example for building voice AI bots with Pipecat"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.86",
|
||||
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.85",
|
||||
"pipecatcloud>=0.2.4"
|
||||
]
|
||||
|
||||
|
||||
@@ -34,8 +34,7 @@ from pipecat.frames.frames import EndTaskFrame, LLMRunFrame, OutputImageRawFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -284,8 +283,8 @@ async def run_eval_pipeline(
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
audio_buffer = AudioBufferProcessor()
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@ from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
|
||||
|
||||
@@ -31,12 +29,6 @@ class EndOfTurnState(Enum):
|
||||
INCOMPLETE = 2
|
||||
|
||||
|
||||
class BaseTurnParams(BaseModel):
|
||||
"""Base class for turn analyzer parameters."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BaseTurnAnalyzer(ABC):
|
||||
"""Abstract base class for analyzing user end of turn.
|
||||
|
||||
@@ -86,7 +78,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def params(self) -> BaseTurnParams:
|
||||
def params(self):
|
||||
"""Get the current turn analyzer parameters.
|
||||
|
||||
Returns:
|
||||
|
||||
@@ -11,17 +11,15 @@ machine learning models to determine when a user has finished speaking, going
|
||||
beyond simple silence-based detection.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, BaseTurnParams, EndOfTurnState
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
|
||||
# Default timing parameters
|
||||
@@ -31,7 +29,7 @@ MAX_DURATION_SECONDS = 8 # Max allowed segment duration
|
||||
USE_ONLY_LAST_VAD_SEGMENT = True
|
||||
|
||||
|
||||
class SmartTurnParams(BaseTurnParams):
|
||||
class SmartTurnParams(BaseModel):
|
||||
"""Configuration parameters for smart turn analysis.
|
||||
|
||||
Parameters:
|
||||
@@ -79,9 +77,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_triggered = False
|
||||
self._silence_ms = 0
|
||||
self._speech_start_time = 0
|
||||
# Thread executor that will run the model. We only need one thread per
|
||||
# analyzer because one analyzer just handles one audio stream.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
@property
|
||||
def speech_triggered(self) -> bool:
|
||||
@@ -156,10 +151,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
Tuple containing the end-of-turn state and optional metrics data
|
||||
from the ML model analysis.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
state, result = await loop.run_in_executor(
|
||||
self._executor, self._process_speech_segment, self._audio_buffer
|
||||
)
|
||||
state, result = await self._process_speech_segment(self._audio_buffer)
|
||||
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
|
||||
self._clear(state)
|
||||
logger.debug(f"End of Turn result: {state}")
|
||||
@@ -177,7 +169,9 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_start_time = 0
|
||||
self._silence_ms = 0
|
||||
|
||||
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
async def _process_speech_segment(
|
||||
self, audio_buffer
|
||||
) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
"""Process accumulated audio segment using ML model."""
|
||||
state = EndOfTurnState.INCOMPLETE
|
||||
|
||||
@@ -209,7 +203,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
if len(segment_audio) > 0:
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
result = self._predict_endpoint(segment_audio)
|
||||
result = await self._predict_endpoint(segment_audio)
|
||||
state = (
|
||||
EndOfTurnState.COMPLETE
|
||||
if result["prediction"] == 1
|
||||
@@ -255,6 +249,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
return state, result_data
|
||||
|
||||
@abstractmethod
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using ML model from audio data."""
|
||||
pass
|
||||
|
||||
@@ -104,15 +104,11 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
|
||||
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
|
||||
raise Exception("Failed to send raw request to Daily Smart Turn.")
|
||||
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using remote HTTP ML service."""
|
||||
try:
|
||||
serialized_array = self._serialize_array(audio_array)
|
||||
loop = asyncio.get_running_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._send_raw_request(serialized_array), loop
|
||||
)
|
||||
return future.result()
|
||||
return await self._send_raw_request(serialized_array)
|
||||
except Exception as e:
|
||||
logger.error(f"Smart turn prediction failed: {str(e)}")
|
||||
# Return an incomplete prediction when a failure occurs
|
||||
|
||||
@@ -64,7 +64,7 @@ class LocalSmartTurnAnalyzer(BaseSmartTurn):
|
||||
self._turn_model.eval()
|
||||
logger.debug("Loaded Local Smart Turn")
|
||||
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local PyTorch model."""
|
||||
inputs = self._turn_processor(
|
||||
audio_array,
|
||||
|
||||
@@ -73,7 +73,7 @@ class LocalSmartTurnAnalyzerV2(BaseSmartTurn):
|
||||
self._turn_model.eval()
|
||||
logger.debug("Loaded Local Smart Turn v2")
|
||||
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local PyTorch model."""
|
||||
inputs = self._turn_processor(
|
||||
audio_array,
|
||||
|
||||
@@ -77,7 +77,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
|
||||
|
||||
logger.debug("Loaded Local Smart Turn v3")
|
||||
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local ONNX model."""
|
||||
|
||||
def truncate_audio_to_last_n_seconds(audio_array, n_seconds=8, sample_rate=16000):
|
||||
|
||||
@@ -11,9 +11,7 @@ data structures for voice activity detection in audio streams. Includes state
|
||||
management, parameter configuration, and audio analysis framework.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
@@ -86,10 +84,6 @@ class VADAnalyzer(ABC):
|
||||
self._smoothing_factor = 0.2
|
||||
self._prev_volume = 0
|
||||
|
||||
# Thread executor that will run the model. We only need one thread per
|
||||
# analyzer because one analyzer just handles one audio stream.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""Get the current sample rate.
|
||||
@@ -171,7 +165,7 @@ class VADAnalyzer(ABC):
|
||||
volume = calculate_audio_volume(audio, self.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
async def analyze_audio(self, buffer: bytes) -> VADState:
|
||||
def analyze_audio(self, buffer) -> VADState:
|
||||
"""Analyze audio buffer and return current VAD state.
|
||||
|
||||
Processes incoming audio data, maintains internal state, and determines
|
||||
@@ -183,12 +177,6 @@ class VADAnalyzer(ABC):
|
||||
Returns:
|
||||
Current VAD state after processing the buffer.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
state = await loop.run_in_executor(self._executor, self._run_analyzer, buffer)
|
||||
return state
|
||||
|
||||
def _run_analyzer(self, buffer: bytes) -> VADState:
|
||||
"""Analyze audio buffer and return current VAD state."""
|
||||
self._vad_buffer += buffer
|
||||
|
||||
num_required_bytes = self._vad_frames_num_bytes
|
||||
|
||||
@@ -13,7 +13,8 @@ including heartbeats, idle detection, and observer integration.
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
|
||||
from collections import deque
|
||||
from typing import Any, AsyncIterable, Deque, Dict, Iterable, List, Optional, Tuple, Type
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
@@ -30,6 +31,7 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
HeartbeatFrame,
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
InterruptionTaskFrame,
|
||||
MetricsFrame,
|
||||
@@ -393,8 +395,7 @@ class PipelineTask(BasePipelineTask):
|
||||
Cancels all running tasks and stops frame processing without
|
||||
waiting for completion.
|
||||
"""
|
||||
if not self._finished:
|
||||
await self._cancel()
|
||||
await self._cancel()
|
||||
|
||||
async def run(self, params: PipelineTaskParams):
|
||||
"""Start and manage the pipeline execution until completion or cancellation.
|
||||
|
||||
@@ -13,7 +13,6 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from abc import abstractmethod
|
||||
from typing import Any, Dict, List, Literal, Optional, Set
|
||||
|
||||
from loguru import logger
|
||||
@@ -170,11 +169,6 @@ class LLMContextAggregator(FrameProcessor):
|
||||
"""Reset the aggregation state."""
|
||||
self._aggregation = ""
|
||||
|
||||
@abstractmethod
|
||||
async def push_aggregation(self):
|
||||
"""Push the current aggregation downstream."""
|
||||
pass
|
||||
|
||||
|
||||
class LLMUserAggregator(LLMContextAggregator):
|
||||
"""User LLM aggregator that processes speech-to-text transcriptions.
|
||||
@@ -307,7 +301,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
frame = LLMContextFrame(self._context)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def push_aggregation(self):
|
||||
async def _push_aggregation(self):
|
||||
"""Push the current aggregation based on interruption strategies and conditions."""
|
||||
if len(self._aggregation) > 0:
|
||||
if self.interruption_strategies and self._bot_speaking:
|
||||
@@ -398,7 +392,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# pushing the aggregation as we will probably get a final transcription.
|
||||
if len(self._aggregation) > 0:
|
||||
if not self._seen_interim_results:
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
# Handles the case where both the user and the bot are not speaking,
|
||||
# and the bot was previously speaking before the user interruption.
|
||||
# So in this case we are resetting the aggregation timer
|
||||
@@ -477,7 +471,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await self._maybe_emulate_user_speaking()
|
||||
except asyncio.TimeoutError:
|
||||
if not self._user_speaking:
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
|
||||
# If we are emulating VAD we still need to send the user stopped
|
||||
# speaking frame.
|
||||
@@ -613,12 +607,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
|
||||
await self._handle_user_image_frame(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def push_aggregation(self):
|
||||
async def _push_aggregation(self):
|
||||
"""Push the current assistant aggregation with timestamp."""
|
||||
if not self._aggregation:
|
||||
return
|
||||
@@ -650,7 +644,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_interruptions(self, frame: InterruptionFrame):
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
self._started = 0
|
||||
await self.reset()
|
||||
|
||||
@@ -784,7 +778,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
text=frame.request.context,
|
||||
)
|
||||
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
|
||||
@@ -792,7 +786,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
|
||||
self._started -= 1
|
||||
await self.push_aggregation()
|
||||
await self._push_aggregation()
|
||||
|
||||
async def _handle_text(self, frame: TextFrame):
|
||||
if not self._started:
|
||||
|
||||
@@ -12,14 +12,14 @@ in conversational pipelines.
|
||||
"""
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMUserAggregator
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class UserResponseAggregator(LLMUserAggregator):
|
||||
class UserResponseAggregator(LLMUserContextAggregator):
|
||||
"""Aggregates user responses into TextFrame objects.
|
||||
|
||||
This aggregator extends LLMUserAggregator to specifically handle
|
||||
This aggregator extends LLMUserContextAggregator to specifically handle
|
||||
user input by collecting text responses and outputting them as TextFrame
|
||||
objects when the aggregation is complete.
|
||||
"""
|
||||
@@ -28,9 +28,9 @@ class UserResponseAggregator(LLMUserAggregator):
|
||||
"""Initialize the user response aggregator.
|
||||
|
||||
Args:
|
||||
**kwargs: Additional arguments passed to parent LLMUserAggregator.
|
||||
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
|
||||
"""
|
||||
super().__init__(context=LLMContext(), **kwargs)
|
||||
super().__init__(context=OpenAILLMContext(), **kwargs)
|
||||
|
||||
async def push_aggregation(self):
|
||||
"""Push the aggregated user response as a TextFrame.
|
||||
|
||||
@@ -13,7 +13,6 @@ and frame observation for the RTVI protocol.
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import (
|
||||
Any,
|
||||
@@ -30,7 +29,6 @@ from typing import (
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
|
||||
|
||||
from pipecat.audio.utils import calculate_audio_volume
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
@@ -54,7 +52,6 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TranscriptionFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSTextFrame,
|
||||
@@ -616,9 +613,9 @@ class RTVIAppendToContextData(BaseModel):
|
||||
|
||||
Contains the role, content, and whether to run the message immediately.
|
||||
|
||||
.. deprecated:: 0.0.85
|
||||
The RTVI message, append-to-context, has been deprecated. Use send-text
|
||||
or custom client and server messages instead.
|
||||
.. deprecated:: 0.0.85
|
||||
The RTVI message, append-to-context, has been deprecated. Use send-text
|
||||
or custom client and server messages instead.
|
||||
"""
|
||||
|
||||
role: Literal["user", "assistant"] | str
|
||||
@@ -842,36 +839,6 @@ class RTVIServerMessage(BaseModel):
|
||||
data: Any
|
||||
|
||||
|
||||
class RTVIAudioLevelMessageData(BaseModel):
|
||||
"""Data format for sending audio levels."""
|
||||
|
||||
value: float
|
||||
|
||||
|
||||
class RTVIUserAudioLevelMessage(BaseModel):
|
||||
"""Message indicating user audio level."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["user-audio-level"] = "user-audio-level"
|
||||
data: RTVIAudioLevelMessageData
|
||||
|
||||
|
||||
class RTVIBotAudioLevelMessage(BaseModel):
|
||||
"""Message indicating bot audio level."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["bot-audio-level"] = "bot-audio-level"
|
||||
data: RTVIAudioLevelMessageData
|
||||
|
||||
|
||||
class RTVISystemLogMessage(BaseModel):
|
||||
"""Message including a system log."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["system-log"] = "system-log"
|
||||
data: RTVITextMessageData
|
||||
|
||||
|
||||
@dataclass
|
||||
class RTVIServerMessageFrame(SystemFrame):
|
||||
"""A frame for sending server messages to the client.
|
||||
@@ -891,36 +858,25 @@ class RTVIServerMessageFrame(SystemFrame):
|
||||
class RTVIObserverParams:
|
||||
"""Parameters for configuring RTVI Observer behavior.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
|
||||
|
||||
Parameters:
|
||||
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
|
||||
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
|
||||
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
|
||||
bot_audio_level_enabled: Indicates if bot's audio level messages should be sent.
|
||||
user_llm_enabled: Indicates if the user's LLM input messages should be sent.
|
||||
user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent.
|
||||
user_transcription_enabled: Indicates if user's transcription messages should be sent.
|
||||
user_audio_level_enabled: Indicates if user's audio level messages should be sent.
|
||||
metrics_enabled: Indicates if metrics messages should be sent.
|
||||
system_logs_enabled: Indicates if system logs should be sent.
|
||||
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
|
||||
audio_level_period_secs: How often audio levels should be sent if enabled.
|
||||
errors_enabled: Indicates if errors messages should be sent.
|
||||
"""
|
||||
|
||||
bot_llm_enabled: bool = True
|
||||
bot_tts_enabled: bool = True
|
||||
bot_speaking_enabled: bool = True
|
||||
bot_audio_level_enabled: bool = False
|
||||
user_llm_enabled: bool = True
|
||||
user_speaking_enabled: bool = True
|
||||
user_transcription_enabled: bool = True
|
||||
user_audio_level_enabled: bool = False
|
||||
metrics_enabled: bool = True
|
||||
system_logs_enabled: bool = False
|
||||
errors_enabled: bool = True
|
||||
audio_level_period_secs: float = 0.15
|
||||
|
||||
|
||||
class RTVIObserver(BaseObserver):
|
||||
@@ -936,11 +892,7 @@ class RTVIObserver(BaseObserver):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rtvi: Optional["RTVIProcessor"] = None,
|
||||
*,
|
||||
params: Optional[RTVIObserverParams] = None,
|
||||
**kwargs,
|
||||
self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
|
||||
):
|
||||
"""Initialize the RTVI observer.
|
||||
|
||||
@@ -952,50 +904,9 @@ class RTVIObserver(BaseObserver):
|
||||
super().__init__(**kwargs)
|
||||
self._rtvi = rtvi
|
||||
self._params = params or RTVIObserverParams()
|
||||
|
||||
self._frames_seen = set()
|
||||
|
||||
self._bot_transcription = ""
|
||||
self._last_user_audio_level = 0
|
||||
self._last_bot_audio_level = 0
|
||||
|
||||
if self._params.system_logs_enabled:
|
||||
self._system_logger_id = logger.add(self._logger_sink)
|
||||
|
||||
if self._params.errors_enabled:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Parameter `errors_enabled` is deprecated. Error messages are always enabled.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
async def _logger_sink(self, message):
|
||||
"""Logger sink so we cna send system logs to RTVI clients."""
|
||||
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup RTVI observer resources."""
|
||||
await super().cleanup()
|
||||
if self._params.system_logs_enabled:
|
||||
logger.remove(self._system_logger_id)
|
||||
|
||||
async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Send an RTVI message.
|
||||
|
||||
By default, we push a transport frame. But this function can be
|
||||
overriden by subclass to send RTVI messages in different ways.
|
||||
|
||||
Args:
|
||||
model: The message to send.
|
||||
exclude_none: Whether to exclude None values from the model dump.
|
||||
|
||||
"""
|
||||
if self._rtvi:
|
||||
await self._rtvi.push_transport_message(model, exclude_none)
|
||||
self._frames_seen = set()
|
||||
rtvi.set_errors_enabled(self._params.errors_enabled)
|
||||
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Process a frame being pushed through the pipeline.
|
||||
@@ -1037,58 +948,52 @@ class RTVIObserver(BaseObserver):
|
||||
):
|
||||
await self._handle_context(frame)
|
||||
elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
|
||||
await self.send_rtvi_message(RTVIBotLLMStartedMessage())
|
||||
await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
|
||||
await self.send_rtvi_message(RTVIBotLLMStoppedMessage())
|
||||
await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
|
||||
elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
|
||||
await self._handle_llm_text_frame(frame)
|
||||
elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
|
||||
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
|
||||
await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
|
||||
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
|
||||
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
|
||||
await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
|
||||
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
|
||||
if isinstance(src, BaseOutputTransport):
|
||||
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
else:
|
||||
mark_as_seen = False
|
||||
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
|
||||
await self._handle_metrics(frame)
|
||||
elif isinstance(frame, RTVIServerMessageFrame):
|
||||
message = RTVIServerMessage(data=frame.data)
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
elif isinstance(frame, RTVIServerResponseFrame):
|
||||
if frame.error is not None:
|
||||
await self._send_error_response(frame)
|
||||
else:
|
||||
await self._send_server_response(frame)
|
||||
elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled:
|
||||
curr_time = time.time()
|
||||
diff_time = curr_time - self._last_user_audio_level
|
||||
if diff_time > self._params.audio_level_period_secs:
|
||||
level = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
|
||||
await self.send_rtvi_message(message)
|
||||
self._last_user_audio_level = curr_time
|
||||
elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled:
|
||||
curr_time = time.time()
|
||||
diff_time = curr_time - self._last_bot_audio_level
|
||||
if diff_time > self._params.audio_level_period_secs:
|
||||
level = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
|
||||
await self.send_rtvi_message(message)
|
||||
self._last_bot_audio_level = curr_time
|
||||
|
||||
if mark_as_seen:
|
||||
self._frames_seen.add(frame.id)
|
||||
|
||||
async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push an urgent transport message to the RTVI processor.
|
||||
|
||||
Args:
|
||||
model: The message model to send.
|
||||
exclude_none: Whether to exclude None values from the model dump.
|
||||
"""
|
||||
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
|
||||
await self._rtvi.push_frame(frame)
|
||||
|
||||
async def _push_bot_transcription(self):
|
||||
"""Push accumulated bot transcription as a message."""
|
||||
if len(self._bot_transcription) > 0:
|
||||
message = RTVIBotTranscriptionMessage(
|
||||
data=RTVITextMessageData(text=self._bot_transcription)
|
||||
)
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
self._bot_transcription = ""
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
@@ -1100,7 +1005,7 @@ class RTVIObserver(BaseObserver):
|
||||
message = RTVIUserStoppedSpeakingMessage()
|
||||
|
||||
if message:
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
async def _handle_bot_speaking(self, frame: Frame):
|
||||
"""Handle bot speaking event frames."""
|
||||
@@ -1111,12 +1016,12 @@ class RTVIObserver(BaseObserver):
|
||||
message = RTVIBotStoppedSpeakingMessage()
|
||||
|
||||
if message:
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
|
||||
"""Handle LLM text output frames."""
|
||||
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
self._bot_transcription += frame.text
|
||||
if match_endofsentence(self._bot_transcription):
|
||||
@@ -1139,7 +1044,7 @@ class RTVIObserver(BaseObserver):
|
||||
)
|
||||
|
||||
if message:
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame):
|
||||
"""Process LLM context frames to extract user messages for the RTVI client."""
|
||||
@@ -1159,7 +1064,7 @@ class RTVIObserver(BaseObserver):
|
||||
text = "".join(part.text for part in message.parts if hasattr(part, "text"))
|
||||
if text:
|
||||
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
|
||||
await self.send_rtvi_message(rtvi_message)
|
||||
await self.push_transport_message_urgent(rtvi_message)
|
||||
|
||||
# Handle OpenAI format (original implementation)
|
||||
elif isinstance(message, dict):
|
||||
@@ -1170,7 +1075,7 @@ class RTVIObserver(BaseObserver):
|
||||
else:
|
||||
text = content
|
||||
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
|
||||
await self.send_rtvi_message(rtvi_message)
|
||||
await self.push_transport_message_urgent(rtvi_message)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Caught an error while trying to handle context: {e}")
|
||||
@@ -1197,7 +1102,7 @@ class RTVIObserver(BaseObserver):
|
||||
metrics["characters"].append(d.model_dump(exclude_none=True))
|
||||
|
||||
message = RTVIMetricsMessage(data=metrics)
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
async def _send_server_response(self, frame: RTVIServerResponseFrame):
|
||||
"""Send a response to the client for a specific request."""
|
||||
@@ -1205,7 +1110,7 @@ class RTVIObserver(BaseObserver):
|
||||
id=str(frame.client_msg.msg_id),
|
||||
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
|
||||
)
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
async def _send_error_response(self, frame: RTVIServerResponseFrame):
|
||||
"""Send a response to the client for a specific request."""
|
||||
@@ -1213,7 +1118,7 @@ class RTVIObserver(BaseObserver):
|
||||
message = RTVIErrorResponse(
|
||||
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
|
||||
)
|
||||
await self.send_rtvi_message(message)
|
||||
await self.push_transport_message_urgent(message)
|
||||
|
||||
|
||||
class RTVIProcessor(FrameProcessor):
|
||||
@@ -1247,6 +1152,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
# Default to 0.3.0 which is the last version before actually having a
|
||||
# "client-version".
|
||||
self._client_version = [0, 3, 0]
|
||||
self._errors_enabled = True
|
||||
self._skip_tts: bool = False # Keep in sync with llm_service.py
|
||||
|
||||
self._registered_actions: Dict[str, RTVIAction] = {}
|
||||
@@ -1316,6 +1222,14 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self._update_config(self._config, False)
|
||||
await self._send_bot_ready()
|
||||
|
||||
def set_errors_enabled(self, enabled: bool):
|
||||
"""Enable or disable error message sending.
|
||||
|
||||
Args:
|
||||
enabled: Whether to send error messages.
|
||||
"""
|
||||
self._errors_enabled = enabled
|
||||
|
||||
async def interrupt_bot(self):
|
||||
"""Send a bot interruption frame upstream."""
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
@@ -1344,11 +1258,6 @@ class RTVIProcessor(FrameProcessor):
|
||||
"""
|
||||
await self._send_error_frame(ErrorFrame(error=error))
|
||||
|
||||
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push a transport message frame."""
|
||||
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def handle_message(self, message: RTVIMessage):
|
||||
"""Handle an incoming RTVI message.
|
||||
|
||||
@@ -1369,7 +1278,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
args=params.arguments,
|
||||
)
|
||||
message = RTVILLMFunctionCallMessage(data=fn)
|
||||
await self.push_transport_message(message, exclude_none=False)
|
||||
await self._push_transport_message(message, exclude_none=False)
|
||||
|
||||
async def handle_function_call_start(
|
||||
self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext
|
||||
@@ -1396,7 +1305,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
fn = RTVILLMFunctionCallStartMessageData(function_name=function_name)
|
||||
message = RTVILLMFunctionCallStartMessage(data=fn)
|
||||
await self.push_transport_message(message, exclude_none=False)
|
||||
await self._push_transport_message(message, exclude_none=False)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames through the RTVI processor.
|
||||
@@ -1468,6 +1377,11 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self.cancel_task(self._message_task)
|
||||
self._message_task = None
|
||||
|
||||
async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push a transport message frame."""
|
||||
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _action_task_handler(self):
|
||||
"""Handle incoming action frames."""
|
||||
while True:
|
||||
@@ -1604,7 +1518,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
services = list(self._registered_services.values())
|
||||
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _handle_describe_actions(self, request_id: str):
|
||||
"""Handle a describe-actions request."""
|
||||
@@ -1619,7 +1533,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
actions = list(self._registered_actions.values())
|
||||
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _handle_get_config(self, request_id: str):
|
||||
"""Handle a get-config request."""
|
||||
@@ -1633,7 +1547,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
)
|
||||
|
||||
message = RTVIConfigResponse(id=request_id, data=self._config)
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
def _update_config_option(self, service: str, config: RTVIServiceOptionConfig):
|
||||
"""Update a specific configuration option."""
|
||||
@@ -1758,7 +1672,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
# action responses (such as webhooks) don't set a request_id
|
||||
if request_id:
|
||||
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _send_bot_ready(self):
|
||||
"""Send the bot-ready message to the client."""
|
||||
@@ -1769,21 +1683,23 @@ class RTVIProcessor(FrameProcessor):
|
||||
id=self._client_ready_id,
|
||||
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
|
||||
)
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
|
||||
"""Send a message or response to the client."""
|
||||
await self.push_transport_message(message)
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _send_error_frame(self, frame: ErrorFrame):
|
||||
"""Send an error frame as an RTVI error message."""
|
||||
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
|
||||
await self.push_transport_message(message)
|
||||
if self._errors_enabled:
|
||||
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _send_error_response(self, id: str, error: str):
|
||||
"""Send an error response message."""
|
||||
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
|
||||
await self.push_transport_message(message)
|
||||
if self._errors_enabled:
|
||||
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
|
||||
await self._push_transport_message(message)
|
||||
|
||||
def _action_id(self, service: str, action: str) -> str:
|
||||
"""Generate an action ID from service and action names."""
|
||||
|
||||
@@ -151,7 +151,7 @@ class AnthropicLLMService(LLMService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "claude-sonnet-4-5-20250929",
|
||||
model: str = "claude-sonnet-4-20250514",
|
||||
params: Optional[InputParams] = None,
|
||||
client=None,
|
||||
retry_timeout_secs: Optional[float] = 5.0,
|
||||
@@ -162,7 +162,7 @@ class AnthropicLLMService(LLMService):
|
||||
|
||||
Args:
|
||||
api_key: Anthropic API key for authentication.
|
||||
model: Model name to use. Defaults to "claude-sonnet-4-5-20250929".
|
||||
model: Model name to use. Defaults to "claude-sonnet-4-20250514".
|
||||
params: Optional model parameters for inference.
|
||||
client: Optional custom Anthropic client instance.
|
||||
retry_timeout_secs: Request timeout in seconds for retry logic.
|
||||
|
||||
@@ -429,7 +429,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
await self._finish_connecting_if_context_available()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self._disconnect()
|
||||
self._disconnect()
|
||||
|
||||
async def _finish_connecting_if_context_available(self):
|
||||
# We can only finish connecting once we've gotten our initial context and we're ready to
|
||||
|
||||
@@ -337,16 +337,10 @@ class BaseOpenAILLMService(LLMService):
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.usage:
|
||||
cached_tokens = (
|
||||
chunk.usage.prompt_tokens_details.cached_tokens
|
||||
if chunk.usage.prompt_tokens_details
|
||||
else None
|
||||
)
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=chunk.usage.prompt_tokens,
|
||||
completion_tokens=chunk.usage.completion_tokens,
|
||||
total_tokens=chunk.usage.total_tokens,
|
||||
cache_read_input_tokens=cached_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ input processing, including VAD, turn analysis, and interruption management.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
@@ -78,6 +79,10 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Track user speaking state for interruption logic
|
||||
self._user_speaking = False
|
||||
|
||||
# We read audio from a single queue one at a time and we then run VAD in
|
||||
# a thread. Therefore, only one thread should be necessary.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
# Task to process incoming audio (VAD) and push audio frames downstream
|
||||
# if passthrough is enabled.
|
||||
self._audio_task = None
|
||||
@@ -393,7 +398,9 @@ class BaseInputTransport(FrameProcessor):
|
||||
"""Analyze audio frame for voice activity."""
|
||||
state = VADState.QUIET
|
||||
if self.vad_analyzer:
|
||||
state = await self.vad_analyzer.analyze_audio(audio_frame.audio)
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
|
||||
)
|
||||
return state
|
||||
|
||||
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState) -> VADState:
|
||||
|
||||
@@ -110,32 +110,12 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
|
||||
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
|
||||
"""Frame to update remote participants in Daily calls.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
`DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a future version.
|
||||
Create your own custom frame and use a custom processor to handle it or use, for example,
|
||||
`on_after_push_frame` event instead in the output transport.
|
||||
|
||||
Parameters:
|
||||
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
|
||||
"""
|
||||
|
||||
remote_participants: Mapping[str, Any] = None
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"DailyUpdateRemoteParticipantsFrame is deprecated and will be removed in a future version."
|
||||
"Instead, create your own custom frame and handle it in the "
|
||||
'`@transport.output().event_handler("on_after_push_frame")` event handler or a '
|
||||
"custom processor.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
class WebRTCVADAnalyzer(VADAnalyzer):
|
||||
"""Voice Activity Detection analyzer using WebRTC.
|
||||
|
||||
@@ -278,13 +278,6 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Main message receiving loop for WebSocket messages."""
|
||||
|
||||
async def trigger_disconnect_if_needed():
|
||||
# Trigger `on_client_disconnected` if the client actually disconnects,
|
||||
# that is, we are not the ones disconnecting.
|
||||
if not self._client.is_closing:
|
||||
await self._client.trigger_client_disconnected()
|
||||
|
||||
try:
|
||||
async for message in self._client.receive():
|
||||
if not self._params.serializer:
|
||||
@@ -301,14 +294,11 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
|
||||
finally:
|
||||
# Use shield to prevent cancellation from stopping the disconnect callback
|
||||
try:
|
||||
await asyncio.shield(trigger_disconnect_if_needed())
|
||||
except asyncio.CancelledError:
|
||||
# Even if we're cancelled, try to trigger the disconnect
|
||||
await trigger_disconnect_if_needed()
|
||||
raise
|
||||
|
||||
# Trigger `on_client_disconnected` if the client actually disconnects,
|
||||
# that is, we are not the ones disconnecting.
|
||||
if not self._client.is_closing:
|
||||
await self._client.trigger_client_disconnected()
|
||||
|
||||
async def _monitor_websocket(self):
|
||||
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
|
||||
|
||||
@@ -12,12 +12,14 @@ from dotenv import load_dotenv
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import LLMContextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams, LLMService
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.tests.utils import run_test
|
||||
|
||||
@@ -46,13 +48,8 @@ def standard_tools() -> ToolsSchema:
|
||||
|
||||
|
||||
async def _test_llm_function_calling(llm: LLMService):
|
||||
# Create a mock weather function
|
||||
call_count = 0
|
||||
|
||||
async def mock_fetch_weather(params: FunctionCallParams):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
pass
|
||||
# Create an AsyncMock for the function
|
||||
mock_fetch_weather = AsyncMock()
|
||||
|
||||
llm.register_function(None, mock_fetch_weather)
|
||||
|
||||
@@ -63,19 +60,21 @@ async def _test_llm_function_calling(llm: LLMService):
|
||||
},
|
||||
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
|
||||
]
|
||||
context = LLMContext(messages, standard_tools())
|
||||
context = OpenAILLMContext(messages, standard_tools())
|
||||
# This is done by default inside the create_context_aggregator
|
||||
context.set_llm_adapter(llm.get_llm_adapter())
|
||||
|
||||
pipeline = Pipeline([llm])
|
||||
|
||||
frames_to_send = [LLMContextFrame(context)]
|
||||
frames_to_send = [OpenAILLMContextFrame(context)]
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=None,
|
||||
)
|
||||
|
||||
# Assert that the weather function was called once
|
||||
assert call_count == 1
|
||||
# Assert that the mock function was called
|
||||
mock_fetch_weather.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")
|
||||
|
||||
@@ -10,21 +10,24 @@ from langchain.prompts import ChatPromptTemplate
|
||||
from langchain_core.language_models import FakeStreamingListLLM
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
LLMContextAssistantTimestampFrame,
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
from pipecat.tests.utils import SleepFrame, run_test
|
||||
@@ -64,14 +67,13 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
proc = LangchainProcessor(chain=chain)
|
||||
self.mock_proc = self.MockProcessor("token_collector")
|
||||
|
||||
context = LLMContext()
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context, assistant_params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
context = OpenAILLMContext()
|
||||
tma_in = LLMUserContextAggregator(context)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[context_aggregator.user(), proc, self.mock_proc, context_aggregator.assistant()]
|
||||
)
|
||||
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
|
||||
|
||||
frames_to_send = [
|
||||
UserStartedSpeakingFrame(),
|
||||
@@ -82,8 +84,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
expected_down_frames = [
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
LLMContextFrame,
|
||||
LLMContextAssistantTimestampFrame,
|
||||
OpenAILLMContextFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
@@ -92,6 +94,4 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
)
|
||||
|
||||
self.assertEqual("".join(self.mock_proc.token), self.expected_response)
|
||||
self.assertEqual(
|
||||
context_aggregator.assistant().messages[-1]["content"], self.expected_response
|
||||
)
|
||||
self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
|
||||
|
||||
Reference in New Issue
Block a user