Compare commits

...

21 Commits

Author SHA1 Message Date
James Hush
a39f8b4882 Remove extra code 2025-10-01 14:48:12 +08:00
James Hush
76fc36f621 Pre recorded message example 2025-10-01 14:43:36 +08:00
James Hush
c0878c5e09 Save progress 2025-10-01 13:58:32 +08:00
James Hush
c6a1013051 Pre-recorded message example 2025-10-01 13:52:58 +08:00
Aleix Conchillo Flaqué
feae3b6d2d Merge pull request #2742 from pipecat-ai/aleix/deprecate-daily-update-remote-participants-frame
DailyTransport: deprecated DailyUpdateRemoteParticipantsFrame
2025-09-30 16:27:34 -07:00
Aleix Conchillo Flaqué
92d3be8975 DailyTransport: deprecated DailyUpdateRemoteParticipantsFrame 2025-09-30 16:26:48 -07:00
Aleix Conchillo Flaqué
0f53e1db2c Merge pull request #2759 from pipecat-ai/aleix/dont-cancel-if-finished
PipelineTask: avoid cancellation if application is finished
2025-09-30 16:21:16 -07:00
Aleix Conchillo Flaqué
d398e8cc10 Merge pull request #2761 from pipecat-ai/aleix/rtvi-tail-updates
RTVI updates: audio levels and system logs
2025-09-30 13:55:17 -07:00
Aleix Conchillo Flaqué
e5f263d380 update CHANGELOG 2025-09-30 13:51:35 -07:00
Aleix Conchillo Flaqué
3a4c303c54 RTVIParams: add errors_enabled deprecation warnings 2025-09-30 13:49:51 -07:00
Mark Backman
54a1ef47d0 Merge pull request #2758 from pipecat-ai/mb/claude-sonnet-4.5
Update AnthropicLLMService to use claude-sonnet-4-5-20250929
2025-09-30 16:42:47 -04:00
Aleix Conchillo Flaqué
149ffa4f3c RTVIObserver: add support system logs 2025-09-30 13:42:40 -07:00
Aleix Conchillo Flaqué
e5465034d9 RTVIObserver: add support for user/bot audio levels 2025-09-30 13:41:26 -07:00
Aleix Conchillo Flaqué
568c7c782d rtvi: allow None RTVIProcessor and rename to send_rtvi_message() 2025-09-30 13:35:27 -07:00
Aleix Conchillo Flaqué
9851334221 rtvi: deprecate errors_enabled and always send errors 2025-09-30 13:31:30 -07:00
Aleix Conchillo Flaqué
e79c4fc99d PipelineTask: avoid cancellation if application is finished 2025-09-30 13:18:25 -07:00
Aleix Conchillo Flaqué
55c321f4ff Merge pull request #2751 from pipecat-ai/aleix/nova-sonic-disconnect-fix
AWSNovaSonicLLMService: add missing await
2025-09-30 13:12:22 -07:00
kompfner
a14a53a005 Merge pull request #2735 from pipecat-ai/pk/remove-openaillmcontext-usage
Remove remaining usage of `OpenAILLMContext` throughout the codebase …
2025-09-30 10:09:25 -04:00
Mark Backman
a71f937e8f Update AnthropicLLMService to use claude-sonnet-4-5-20250929 2025-09-30 08:49:30 -04:00
Aleix Conchillo Flaqué
8f8d8ae0d8 AWSNovaSonicLLMService: add missing await 2025-09-26 15:58:05 -07:00
Paul Kompfner
6faa50ae5b Remove remaining usage of OpenAILLMContext throughout the codebase in favor of LLMContext, except for:
- Usage in classes that are already deprecated
- Usage related to realtime LLMs, which don't yet support `LLMContext`
- Usage in (soon-to-be-deprecated) code paths related to `OpenAILLMContext` itself and associated machinery
2025-09-24 16:35:03 -04:00
13 changed files with 271 additions and 115 deletions

View File

@@ -9,10 +9,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added RTVI messages for user/bot audio levels and system logs.
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
### Changed
- Updated the default model for `AnthropicLLMService` to
`claude-sonnet-4-5-20250929`.
### Deprecated
- `DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a
future version. Instead, create your own custom frame and handle it in the
`@transport.output().event_handler("on_after_push_frame")` event handler or a
custom processor.
## Fixed
- Fixed a `PipelineTask` issue that could prevent the application to exit if
`task.cancel()` was called when the task was already finished.
- Fixed an issue where local SmartTurn was not being ran in a separate thread.
## [0.0.86] - 2025-09-24

View File

@@ -5,6 +5,7 @@
#
import os
import wave
from dotenv import load_dotenv
from loguru import logger
@@ -13,7 +14,14 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
LLMTextFrame,
OutputAudioRawFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -103,7 +111,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
audio_file_path = os.path.join(os.path.dirname(__file__), "assets", "pre-recorded.wav")
with wave.open(audio_file_path, "rb") as wav_file:
llm_text_frame = TextFrame(text="This is a pre-recorded message.")
llm_text_frame.skip_tts = True
audio_data = wav_file.readframes(wav_file.getnframes())
output_audio_raw_frame = OutputAudioRawFrame(
audio=audio_data, sample_rate=44100, num_channels=1
)
await task.queue_frames(
[
LLMRunFrame(),
LLMFullResponseStartFrame(),
llm_text_frame,
output_audio_raw_frame,
LLMFullResponseEndFrame(),
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

Binary file not shown.

View File

@@ -34,7 +34,8 @@ from pipecat.frames.frames import EndTaskFrame, LLMRunFrame, OutputImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
@@ -283,8 +284,8 @@ async def run_eval_pipeline(
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
audio_buffer = AudioBufferProcessor()

View File

@@ -13,8 +13,7 @@ including heartbeats, idle detection, and observer integration.
import asyncio
import time
from collections import deque
from typing import Any, AsyncIterable, Deque, Dict, Iterable, List, Optional, Tuple, Type
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
from loguru import logger
from pydantic import BaseModel, ConfigDict, Field
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
HeartbeatFrame,
InputAudioRawFrame,
InterruptionFrame,
InterruptionTaskFrame,
MetricsFrame,
@@ -395,7 +393,8 @@ class PipelineTask(BasePipelineTask):
Cancels all running tasks and stops frame processing without
waiting for completion.
"""
await self._cancel()
if not self._finished:
await self._cancel()
async def run(self, params: PipelineTaskParams):
"""Start and manage the pipeline execution until completion or cancellation.

View File

@@ -13,6 +13,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
import asyncio
import json
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Set
from loguru import logger
@@ -169,6 +170,11 @@ class LLMContextAggregator(FrameProcessor):
"""Reset the aggregation state."""
self._aggregation = ""
@abstractmethod
async def push_aggregation(self):
"""Push the current aggregation downstream."""
pass
class LLMUserAggregator(LLMContextAggregator):
"""User LLM aggregator that processes speech-to-text transcriptions.
@@ -301,7 +307,7 @@ class LLMUserAggregator(LLMContextAggregator):
frame = LLMContextFrame(self._context)
await self.push_frame(frame)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current aggregation based on interruption strategies and conditions."""
if len(self._aggregation) > 0:
if self.interruption_strategies and self._bot_speaking:
@@ -392,7 +398,7 @@ class LLMUserAggregator(LLMContextAggregator):
# pushing the aggregation as we will probably get a final transcription.
if len(self._aggregation) > 0:
if not self._seen_interim_results:
await self._push_aggregation()
await self.push_aggregation()
# Handles the case where both the user and the bot are not speaking,
# and the bot was previously speaking before the user interruption.
# So in this case we are resetting the aggregation timer
@@ -471,7 +477,7 @@ class LLMUserAggregator(LLMContextAggregator):
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:
await self._push_aggregation()
await self.push_aggregation()
# If we are emulating VAD we still need to send the user stopped
# speaking frame.
@@ -607,12 +613,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
await self._handle_user_image_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._push_aggregation()
await self.push_aggregation()
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current assistant aggregation with timestamp."""
if not self._aggregation:
return
@@ -644,7 +650,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: InterruptionFrame):
await self._push_aggregation()
await self.push_aggregation()
self._started = 0
await self.reset()
@@ -778,7 +784,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
text=frame.request.context,
)
await self._push_aggregation()
await self.push_aggregation()
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
@@ -786,7 +792,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
self._started -= 1
await self._push_aggregation()
await self.push_aggregation()
async def _handle_text(self, frame: TextFrame):
if not self._started:

View File

@@ -12,14 +12,14 @@ in conversational pipelines.
"""
from pipecat.frames.frames import TextFrame
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMUserAggregator
class UserResponseAggregator(LLMUserContextAggregator):
class UserResponseAggregator(LLMUserAggregator):
"""Aggregates user responses into TextFrame objects.
This aggregator extends LLMUserContextAggregator to specifically handle
This aggregator extends LLMUserAggregator to specifically handle
user input by collecting text responses and outputting them as TextFrame
objects when the aggregation is complete.
"""
@@ -28,9 +28,9 @@ class UserResponseAggregator(LLMUserContextAggregator):
"""Initialize the user response aggregator.
Args:
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
**kwargs: Additional arguments passed to parent LLMUserAggregator.
"""
super().__init__(context=OpenAILLMContext(), **kwargs)
super().__init__(context=LLMContext(), **kwargs)
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.

View File

@@ -13,6 +13,7 @@ and frame observation for the RTVI protocol.
import asyncio
import base64
import time
from dataclasses import dataclass
from typing import (
Any,
@@ -29,6 +30,7 @@ from typing import (
from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from pipecat.audio.utils import calculate_audio_volume
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
@@ -52,6 +54,7 @@ from pipecat.frames.frames import (
SystemFrame,
TranscriptionFrame,
TransportMessageUrgentFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
@@ -613,9 +616,9 @@ class RTVIAppendToContextData(BaseModel):
Contains the role, content, and whether to run the message immediately.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
"""
role: Literal["user", "assistant"] | str
@@ -839,6 +842,36 @@ class RTVIServerMessage(BaseModel):
data: Any
class RTVIAudioLevelMessageData(BaseModel):
"""Data format for sending audio levels."""
value: float
class RTVIUserAudioLevelMessage(BaseModel):
"""Message indicating user audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-audio-level"] = "user-audio-level"
data: RTVIAudioLevelMessageData
class RTVIBotAudioLevelMessage(BaseModel):
"""Message indicating bot audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-audio-level"] = "bot-audio-level"
data: RTVIAudioLevelMessageData
class RTVISystemLogMessage(BaseModel):
"""Message including a system log."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["system-log"] = "system-log"
data: RTVITextMessageData
@dataclass
class RTVIServerMessageFrame(SystemFrame):
"""A frame for sending server messages to the client.
@@ -858,25 +891,36 @@ class RTVIServerMessageFrame(SystemFrame):
class RTVIObserverParams:
"""Parameters for configuring RTVI Observer behavior.
.. deprecated:: 0.0.87
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
Parameters:
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
bot_audio_level_enabled: Indicates if bot's audio level messages should be sent.
user_llm_enabled: Indicates if the user's LLM input messages should be sent.
user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent.
user_transcription_enabled: Indicates if user's transcription messages should be sent.
user_audio_level_enabled: Indicates if user's audio level messages should be sent.
metrics_enabled: Indicates if metrics messages should be sent.
errors_enabled: Indicates if errors messages should be sent.
system_logs_enabled: Indicates if system logs should be sent.
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
audio_level_period_secs: How often audio levels should be sent if enabled.
"""
bot_llm_enabled: bool = True
bot_tts_enabled: bool = True
bot_speaking_enabled: bool = True
bot_audio_level_enabled: bool = False
user_llm_enabled: bool = True
user_speaking_enabled: bool = True
user_transcription_enabled: bool = True
user_audio_level_enabled: bool = False
metrics_enabled: bool = True
system_logs_enabled: bool = False
errors_enabled: bool = True
audio_level_period_secs: float = 0.15
class RTVIObserver(BaseObserver):
@@ -892,7 +936,11 @@ class RTVIObserver(BaseObserver):
"""
def __init__(
self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
self,
rtvi: Optional["RTVIProcessor"] = None,
*,
params: Optional[RTVIObserverParams] = None,
**kwargs,
):
"""Initialize the RTVI observer.
@@ -904,9 +952,50 @@ class RTVIObserver(BaseObserver):
super().__init__(**kwargs)
self._rtvi = rtvi
self._params = params or RTVIObserverParams()
self._bot_transcription = ""
self._frames_seen = set()
rtvi.set_errors_enabled(self._params.errors_enabled)
self._bot_transcription = ""
self._last_user_audio_level = 0
self._last_bot_audio_level = 0
if self._params.system_logs_enabled:
self._system_logger_id = logger.add(self._logger_sink)
if self._params.errors_enabled:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter `errors_enabled` is deprecated. Error messages are always enabled.",
DeprecationWarning,
)
async def _logger_sink(self, message):
"""Logger sink so we cna send system logs to RTVI clients."""
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
await self.send_rtvi_message(message)
async def cleanup(self):
"""Cleanup RTVI observer resources."""
await super().cleanup()
if self._params.system_logs_enabled:
logger.remove(self._system_logger_id)
async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True):
"""Send an RTVI message.
By default, we push a transport frame. But this function can be
overriden by subclass to send RTVI messages in different ways.
Args:
model: The message to send.
exclude_none: Whether to exclude None values from the model dump.
"""
if self._rtvi:
await self._rtvi.push_transport_message(model, exclude_none)
async def on_push_frame(self, data: FramePushed):
"""Process a frame being pushed through the pipeline.
@@ -948,52 +1037,58 @@ class RTVIObserver(BaseObserver):
):
await self._handle_context(frame)
elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
await self.send_rtvi_message(RTVIBotLLMStartedMessage())
elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
await self.send_rtvi_message(RTVIBotLLMStoppedMessage())
elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
await self._handle_llm_text_frame(frame)
elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
if isinstance(src, BaseOutputTransport):
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
else:
mark_as_seen = False
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
await self._handle_metrics(frame)
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVIServerMessage(data=frame.data)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIServerResponseFrame):
if frame.error is not None:
await self._send_error_response(frame)
else:
await self._send_server_response(frame)
elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_user_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_user_audio_level = curr_time
elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_bot_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_bot_audio_level = curr_time
if mark_as_seen:
self._frames_seen.add(frame.id)
async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
"""Push an urgent transport message to the RTVI processor.
Args:
model: The message model to send.
exclude_none: Whether to exclude None values from the model dump.
"""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self._rtvi.push_frame(frame)
async def _push_bot_transcription(self):
"""Push accumulated bot transcription as a message."""
if len(self._bot_transcription) > 0:
message = RTVIBotTranscriptionMessage(
data=RTVITextMessageData(text=self._bot_transcription)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription = ""
async def _handle_interruptions(self, frame: Frame):
@@ -1005,7 +1100,7 @@ class RTVIObserver(BaseObserver):
message = RTVIUserStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_bot_speaking(self, frame: Frame):
"""Handle bot speaking event frames."""
@@ -1016,12 +1111,12 @@ class RTVIObserver(BaseObserver):
message = RTVIBotStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
"""Handle LLM text output frames."""
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription += frame.text
if match_endofsentence(self._bot_transcription):
@@ -1044,7 +1139,7 @@ class RTVIObserver(BaseObserver):
)
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame):
"""Process LLM context frames to extract user messages for the RTVI client."""
@@ -1064,7 +1159,7 @@ class RTVIObserver(BaseObserver):
text = "".join(part.text for part in message.parts if hasattr(part, "text"))
if text:
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
# Handle OpenAI format (original implementation)
elif isinstance(message, dict):
@@ -1075,7 +1170,7 @@ class RTVIObserver(BaseObserver):
else:
text = content
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
except Exception as e:
logger.warning(f"Caught an error while trying to handle context: {e}")
@@ -1102,7 +1197,7 @@ class RTVIObserver(BaseObserver):
metrics["characters"].append(d.model_dump(exclude_none=True))
message = RTVIMetricsMessage(data=metrics)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_server_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1110,7 +1205,7 @@ class RTVIObserver(BaseObserver):
id=str(frame.client_msg.msg_id),
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_error_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1118,7 +1213,7 @@ class RTVIObserver(BaseObserver):
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
class RTVIProcessor(FrameProcessor):
@@ -1152,7 +1247,6 @@ class RTVIProcessor(FrameProcessor):
# Default to 0.3.0 which is the last version before actually having a
# "client-version".
self._client_version = [0, 3, 0]
self._errors_enabled = True
self._skip_tts: bool = False # Keep in sync with llm_service.py
self._registered_actions: Dict[str, RTVIAction] = {}
@@ -1222,14 +1316,6 @@ class RTVIProcessor(FrameProcessor):
await self._update_config(self._config, False)
await self._send_bot_ready()
def set_errors_enabled(self, enabled: bool):
"""Enable or disable error message sending.
Args:
enabled: Whether to send error messages.
"""
self._errors_enabled = enabled
async def interrupt_bot(self):
"""Send a bot interruption frame upstream."""
await self.push_interruption_task_frame_and_wait()
@@ -1258,6 +1344,11 @@ class RTVIProcessor(FrameProcessor):
"""
await self._send_error_frame(ErrorFrame(error=error))
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def handle_message(self, message: RTVIMessage):
"""Handle an incoming RTVI message.
@@ -1278,7 +1369,7 @@ class RTVIProcessor(FrameProcessor):
args=params.arguments,
)
message = RTVILLMFunctionCallMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def handle_function_call_start(
self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext
@@ -1305,7 +1396,7 @@ class RTVIProcessor(FrameProcessor):
fn = RTVILLMFunctionCallStartMessageData(function_name=function_name)
message = RTVILLMFunctionCallStartMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames through the RTVI processor.
@@ -1377,11 +1468,6 @@ class RTVIProcessor(FrameProcessor):
await self.cancel_task(self._message_task)
self._message_task = None
async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def _action_task_handler(self):
"""Handle incoming action frames."""
while True:
@@ -1518,7 +1604,7 @@ class RTVIProcessor(FrameProcessor):
services = list(self._registered_services.values())
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_describe_actions(self, request_id: str):
"""Handle a describe-actions request."""
@@ -1533,7 +1619,7 @@ class RTVIProcessor(FrameProcessor):
actions = list(self._registered_actions.values())
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_get_config(self, request_id: str):
"""Handle a get-config request."""
@@ -1547,7 +1633,7 @@ class RTVIProcessor(FrameProcessor):
)
message = RTVIConfigResponse(id=request_id, data=self._config)
await self._push_transport_message(message)
await self.push_transport_message(message)
def _update_config_option(self, service: str, config: RTVIServiceOptionConfig):
"""Update a specific configuration option."""
@@ -1672,7 +1758,7 @@ class RTVIProcessor(FrameProcessor):
# action responses (such as webhooks) don't set a request_id
if request_id:
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_bot_ready(self):
"""Send the bot-ready message to the client."""
@@ -1683,23 +1769,21 @@ class RTVIProcessor(FrameProcessor):
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
)
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
"""Send a message or response to the client."""
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_error_frame(self, frame: ErrorFrame):
"""Send an error frame as an RTVI error message."""
if self._errors_enabled:
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self._push_transport_message(message)
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self.push_transport_message(message)
async def _send_error_response(self, id: str, error: str):
"""Send an error response message."""
if self._errors_enabled:
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self._push_transport_message(message)
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self.push_transport_message(message)
def _action_id(self, service: str, action: str) -> str:
"""Generate an action ID from service and action names."""

View File

@@ -151,7 +151,7 @@ class AnthropicLLMService(LLMService):
self,
*,
api_key: str,
model: str = "claude-sonnet-4-20250514",
model: str = "claude-sonnet-4-5-20250929",
params: Optional[InputParams] = None,
client=None,
retry_timeout_secs: Optional[float] = 5.0,
@@ -162,7 +162,7 @@ class AnthropicLLMService(LLMService):
Args:
api_key: Anthropic API key for authentication.
model: Model name to use. Defaults to "claude-sonnet-4-20250514".
model: Model name to use. Defaults to "claude-sonnet-4-5-20250929".
params: Optional model parameters for inference.
client: Optional custom Anthropic client instance.
retry_timeout_secs: Request timeout in seconds for retry logic.

View File

@@ -429,7 +429,7 @@ class AWSNovaSonicLLMService(LLMService):
await self._finish_connecting_if_context_available()
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._disconnect()
await self._disconnect()
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to

View File

@@ -110,12 +110,32 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
.. deprecated:: 0.0.87
`DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a future version.
Create your own custom frame and use a custom processor to handle it or use, for example,
`on_after_push_frame` event instead in the output transport.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"DailyUpdateRemoteParticipantsFrame is deprecated and will be removed in a future version."
"Instead, create your own custom frame and handle it in the "
'`@transport.output().event_handler("on_after_push_frame")` event handler or a '
"custom processor.",
DeprecationWarning,
stacklevel=2,
)
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.

View File

@@ -12,14 +12,12 @@ from dotenv import load_dotenv
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import LLMService
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.tests.utils import run_test
@@ -48,8 +46,13 @@ def standard_tools() -> ToolsSchema:
async def _test_llm_function_calling(llm: LLMService):
# Create an AsyncMock for the function
mock_fetch_weather = AsyncMock()
# Create a mock weather function
call_count = 0
async def mock_fetch_weather(params: FunctionCallParams):
nonlocal call_count
call_count += 1
pass
llm.register_function(None, mock_fetch_weather)
@@ -60,21 +63,19 @@ async def _test_llm_function_calling(llm: LLMService):
},
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
]
context = OpenAILLMContext(messages, standard_tools())
# This is done by default inside the create_context_aggregator
context.set_llm_adapter(llm.get_llm_adapter())
context = LLMContext(messages, standard_tools())
pipeline = Pipeline([llm])
frames_to_send = [OpenAILLMContextFrame(context)]
frames_to_send = [LLMContextFrame(context)]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=None,
)
# Assert that the mock function was called
mock_fetch_weather.assert_called_once()
# Assert that the weather function was called once
assert call_count == 1
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")

View File

@@ -10,24 +10,21 @@ from langchain.prompts import ChatPromptTemplate
from langchain_core.language_models import FakeStreamingListLLM
from pipecat.frames.frames import (
LLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
OpenAILLMContextAssistantTimestampFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.tests.utils import SleepFrame, run_test
@@ -67,13 +64,14 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=LLMAssistantAggregatorParams(expect_stripped_words=False)
)
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
pipeline = Pipeline(
[context_aggregator.user(), proc, self.mock_proc, context_aggregator.assistant()]
)
frames_to_send = [
UserStartedSpeakingFrame(),
@@ -84,8 +82,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
OpenAILLMContextFrame,
OpenAILLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMContextAssistantTimestampFrame,
]
await run_test(
pipeline,
@@ -94,4 +92,6 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
)
self.assertEqual("".join(self.mock_proc.token), self.expected_response)
self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
self.assertEqual(
context_aggregator.assistant().messages[-1]["content"], self.expected_response
)