Compare commits

...

32 Commits

Author SHA1 Message Date
James Hush
a39f8b4882 Remove extra code 2025-10-01 14:48:12 +08:00
James Hush
76fc36f621 Pre recorded message example 2025-10-01 14:43:36 +08:00
James Hush
c0878c5e09 Save progress 2025-10-01 13:58:32 +08:00
James Hush
c6a1013051 Pre-recorded message example 2025-10-01 13:52:58 +08:00
Aleix Conchillo Flaqué
feae3b6d2d Merge pull request #2742 from pipecat-ai/aleix/deprecate-daily-update-remote-participants-frame
DailyTransport: deprecated DailyUpdateRemoteParticipantsFrame
2025-09-30 16:27:34 -07:00
Aleix Conchillo Flaqué
92d3be8975 DailyTransport: deprecated DailyUpdateRemoteParticipantsFrame 2025-09-30 16:26:48 -07:00
Aleix Conchillo Flaqué
0f53e1db2c Merge pull request #2759 from pipecat-ai/aleix/dont-cancel-if-finished
PipelineTask: avoid cancellation if application is finished
2025-09-30 16:21:16 -07:00
Aleix Conchillo Flaqué
d398e8cc10 Merge pull request #2761 from pipecat-ai/aleix/rtvi-tail-updates
RTVI updates: audio levels and system logs
2025-09-30 13:55:17 -07:00
Aleix Conchillo Flaqué
e5f263d380 update CHANGELOG 2025-09-30 13:51:35 -07:00
Aleix Conchillo Flaqué
3a4c303c54 RTVIParams: add errors_enabled deprecation warnings 2025-09-30 13:49:51 -07:00
Mark Backman
54a1ef47d0 Merge pull request #2758 from pipecat-ai/mb/claude-sonnet-4.5
Update AnthropicLLMService to use claude-sonnet-4-5-20250929
2025-09-30 16:42:47 -04:00
Aleix Conchillo Flaqué
149ffa4f3c RTVIObserver: add support system logs 2025-09-30 13:42:40 -07:00
Aleix Conchillo Flaqué
e5465034d9 RTVIObserver: add support for user/bot audio levels 2025-09-30 13:41:26 -07:00
Aleix Conchillo Flaqué
568c7c782d rtvi: allow None RTVIProcessor and rename to send_rtvi_message() 2025-09-30 13:35:27 -07:00
Aleix Conchillo Flaqué
9851334221 rtvi: deprecate errors_enabled and always send errors 2025-09-30 13:31:30 -07:00
Aleix Conchillo Flaqué
e79c4fc99d PipelineTask: avoid cancellation if application is finished 2025-09-30 13:18:25 -07:00
Aleix Conchillo Flaqué
55c321f4ff Merge pull request #2751 from pipecat-ai/aleix/nova-sonic-disconnect-fix
AWSNovaSonicLLMService: add missing await
2025-09-30 13:12:22 -07:00
kompfner
a14a53a005 Merge pull request #2735 from pipecat-ai/pk/remove-openaillmcontext-usage
Remove remaining usage of `OpenAILLMContext` throughout the codebase …
2025-09-30 10:09:25 -04:00
Mark Backman
a71f937e8f Update AnthropicLLMService to use claude-sonnet-4-5-20250929 2025-09-30 08:49:30 -04:00
Mark Backman
d0178edad0 Merge pull request #2753 from pipecat-ai/mb/quickstart-0.0.86
Quickstart: Update to 0.0.86, removing pytorch requirements
2025-09-29 09:43:33 -04:00
Mark Backman
795c5e55d9 Quickstart: Update to 0.0.86, removing pytorch requirements 2025-09-27 08:30:37 -04:00
Aleix Conchillo Flaqué
8f8d8ae0d8 AWSNovaSonicLLMService: add missing await 2025-09-26 15:58:05 -07:00
Vanessa Pyne
741f192d04 Merge pull request #2096 from pipecat-ai/vp-mcp-ex-nit
mcp examples: check for env vars needed for examples
2025-09-26 10:21:22 -05:00
Aleix Conchillo Flaqué
ee00ee5c57 Merge pull request #2744 from pipecat-ai/aleix/vad-analyzer-thread-executor
BaseInputTransport: create VAD thread in VADAnalyzer
2025-09-25 13:43:34 -07:00
Aleix Conchillo Flaqué
f53fd880dc BaseInputTransport: create VAD thread in VADAnalyzer
We move the thread creation to the VADAnalyzer instead of the input
transport. This can potentially be useful if we need to analyze multiple audio
streams.
2025-09-25 13:41:20 -07:00
Aleix Conchillo Flaqué
de3461e4cc Merge pull request #2743 from pipecat-ai/aleix/turn-analyzer-fixes
turn analyzer fixes
2025-09-25 13:40:43 -07:00
Aleix Conchillo Flaqué
7bafc3a1bb BaseSmartTurn: process speech in a separate thread 2025-09-25 13:37:28 -07:00
Aleix Conchillo Flaqué
22ef61fe8d BaseTurnAnalyzer: add BaseTurnParams base class for parameters 2025-09-25 13:37:09 -07:00
Aleix Conchillo Flaqué
7078fb53bd Merge pull request #2738 from pipecat-ai/aleix/openai-cached-tokens-metrics
BaseOpenAILLMService: include cached tokens to metrics frame
2025-09-25 13:36:03 -07:00
Aleix Conchillo Flaqué
33447ad6f2 BaseOpenAILLMService: include cached tokens to metrics frame 2025-09-24 19:32:16 -07:00
Paul Kompfner
6faa50ae5b Remove remaining usage of OpenAILLMContext throughout the codebase in favor of LLMContext, except for:
- Usage in classes that are already deprecated
- Usage related to realtime LLMs, which don't yet support `LLMContext`
- Usage in (soon-to-be-deprecated) code paths related to `OpenAILLMContext` itself and associated machinery
2025-09-24 16:35:03 -04:00
vipyne
889dc19a27 mcp examples: check for env vars needed for examples 2025-09-19 12:09:50 -05:00
27 changed files with 366 additions and 139 deletions

View File

@@ -5,6 +5,33 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added RTVI messages for user/bot audio levels and system logs.
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
### Changed
- Updated the default model for `AnthropicLLMService` to
`claude-sonnet-4-5-20250929`.
### Deprecated
- `DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a
future version. Instead, create your own custom frame and handle it in the
`@transport.output().event_handler("on_after_push_frame")` event handler or a
custom processor.
## Fixed
- Fixed a `PipelineTask` issue that could prevent the application to exit if
`task.cancel()` was called when the task was already finished.
- Fixed an issue where local SmartTurn was not being ran in a separate thread.
## [0.0.86] - 2025-09-24
### Added

View File

@@ -5,6 +5,7 @@
#
import os
import wave
from dotenv import load_dotenv
from loguru import logger
@@ -13,7 +14,14 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
LLMTextFrame,
OutputAudioRawFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -103,7 +111,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
audio_file_path = os.path.join(os.path.dirname(__file__), "assets", "pre-recorded.wav")
with wave.open(audio_file_path, "rb") as wav_file:
llm_text_frame = TextFrame(text="This is a pre-recorded message.")
llm_text_frame.skip_tts = True
audio_data = wav_file.readframes(wav_file.getnframes())
output_audio_raw_frame = OutputAudioRawFrame(
audio=audio_data, sample_rate=44100, num_channels=1
)
await task.queue_frames(
[
LLMRunFrame(),
LLMFullResponseStartFrame(),
llm_text_frame,
output_audio_raw_frame,
LLMFullResponseEndFrame(),
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -206,6 +206,14 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("NASA_API_KEY"):
logger.error(
f"Please set NASA_API_KEY environment variable for this example. See https://api.nasa.gov"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -141,6 +141,14 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("MCP_RUN_SSE_URL"):
logger.error(
f"Please set MCP_RUN_SSE_URL environment variable for this example. See https://mcp.run"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -219,6 +219,14 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("NASA_API_KEY") or not os.getenv("MCP_RUN_SSE_URL"):
logger.error(
f"Please set NASA_API_KEY and MCP_RUN_SSE_URL environment variables. See https://api.nasa.gov and https://mcp.run"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -145,6 +145,14 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(
f"Please set GITHUB_PERSONAL_ACCESS_TOKEN environment variable for this example."
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

Binary file not shown.

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
description = "Quickstart example for building voice AI bots with Pipecat"
requires-python = ">=3.10"
dependencies = [
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.85",
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.86",
"pipecatcloud>=0.2.4"
]

View File

@@ -34,7 +34,8 @@ from pipecat.frames.frames import EndTaskFrame, LLMRunFrame, OutputImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
@@ -283,8 +284,8 @@ async def run_eval_pipeline(
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
audio_buffer = AudioBufferProcessor()

View File

@@ -14,6 +14,8 @@ from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Tuple
from pydantic import BaseModel
from pipecat.metrics.metrics import MetricsData
@@ -29,6 +31,12 @@ class EndOfTurnState(Enum):
INCOMPLETE = 2
class BaseTurnParams(BaseModel):
"""Base class for turn analyzer parameters."""
pass
class BaseTurnAnalyzer(ABC):
"""Abstract base class for analyzing user end of turn.
@@ -78,7 +86,7 @@ class BaseTurnAnalyzer(ABC):
@property
@abstractmethod
def params(self):
def params(self) -> BaseTurnParams:
"""Get the current turn analyzer parameters.
Returns:

View File

@@ -11,15 +11,17 @@ machine learning models to determine when a user has finished speaking, going
beyond simple silence-based detection.
"""
import asyncio
import time
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional, Tuple
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, BaseTurnParams, EndOfTurnState
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
# Default timing parameters
@@ -29,7 +31,7 @@ MAX_DURATION_SECONDS = 8 # Max allowed segment duration
USE_ONLY_LAST_VAD_SEGMENT = True
class SmartTurnParams(BaseModel):
class SmartTurnParams(BaseTurnParams):
"""Configuration parameters for smart turn analysis.
Parameters:
@@ -77,6 +79,9 @@ class BaseSmartTurn(BaseTurnAnalyzer):
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = 0
# Thread executor that will run the model. We only need one thread per
# analyzer because one analyzer just handles one audio stream.
self._executor = ThreadPoolExecutor(max_workers=1)
@property
def speech_triggered(self) -> bool:
@@ -151,7 +156,10 @@ class BaseSmartTurn(BaseTurnAnalyzer):
Tuple containing the end-of-turn state and optional metrics data
from the ML model analysis.
"""
state, result = await self._process_speech_segment(self._audio_buffer)
loop = asyncio.get_running_loop()
state, result = await loop.run_in_executor(
self._executor, self._process_speech_segment, self._audio_buffer
)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
@@ -169,9 +177,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
self._speech_start_time = 0
self._silence_ms = 0
async def _process_speech_segment(
self, audio_buffer
) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Process accumulated audio segment using ML model."""
state = EndOfTurnState.INCOMPLETE
@@ -203,7 +209,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
if len(segment_audio) > 0:
start_time = time.perf_counter()
try:
result = await self._predict_endpoint(segment_audio)
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE
if result["prediction"] == 1
@@ -249,6 +255,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
return state, result_data
@abstractmethod
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using ML model from audio data."""
pass

View File

@@ -104,11 +104,15 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using remote HTTP ML service."""
try:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)
loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(
self._send_raw_request(serialized_array), loop
)
return future.result()
except Exception as e:
logger.error(f"Smart turn prediction failed: {str(e)}")
# Return an incomplete prediction when a failure occurs

View File

@@ -64,7 +64,7 @@ class LocalSmartTurnAnalyzer(BaseSmartTurn):
self._turn_model.eval()
logger.debug("Loaded Local Smart Turn")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local PyTorch model."""
inputs = self._turn_processor(
audio_array,

View File

@@ -73,7 +73,7 @@ class LocalSmartTurnAnalyzerV2(BaseSmartTurn):
self._turn_model.eval()
logger.debug("Loaded Local Smart Turn v2")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local PyTorch model."""
inputs = self._turn_processor(
audio_array,

View File

@@ -77,7 +77,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
logger.debug("Loaded Local Smart Turn v3")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local ONNX model."""
def truncate_audio_to_last_n_seconds(audio_array, n_seconds=8, sample_rate=16000):

View File

@@ -11,7 +11,9 @@ data structures for voice activity detection in audio streams. Includes state
management, parameter configuration, and audio analysis framework.
"""
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Optional
@@ -84,6 +86,10 @@ class VADAnalyzer(ABC):
self._smoothing_factor = 0.2
self._prev_volume = 0
# Thread executor that will run the model. We only need one thread per
# analyzer because one analyzer just handles one audio stream.
self._executor = ThreadPoolExecutor(max_workers=1)
@property
def sample_rate(self) -> int:
"""Get the current sample rate.
@@ -165,7 +171,7 @@ class VADAnalyzer(ABC):
volume = calculate_audio_volume(audio, self.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
def analyze_audio(self, buffer) -> VADState:
async def analyze_audio(self, buffer: bytes) -> VADState:
"""Analyze audio buffer and return current VAD state.
Processes incoming audio data, maintains internal state, and determines
@@ -177,6 +183,12 @@ class VADAnalyzer(ABC):
Returns:
Current VAD state after processing the buffer.
"""
loop = asyncio.get_running_loop()
state = await loop.run_in_executor(self._executor, self._run_analyzer, buffer)
return state
def _run_analyzer(self, buffer: bytes) -> VADState:
"""Analyze audio buffer and return current VAD state."""
self._vad_buffer += buffer
num_required_bytes = self._vad_frames_num_bytes

View File

@@ -13,8 +13,7 @@ including heartbeats, idle detection, and observer integration.
import asyncio
import time
from collections import deque
from typing import Any, AsyncIterable, Deque, Dict, Iterable, List, Optional, Tuple, Type
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
from loguru import logger
from pydantic import BaseModel, ConfigDict, Field
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
HeartbeatFrame,
InputAudioRawFrame,
InterruptionFrame,
InterruptionTaskFrame,
MetricsFrame,
@@ -395,7 +393,8 @@ class PipelineTask(BasePipelineTask):
Cancels all running tasks and stops frame processing without
waiting for completion.
"""
await self._cancel()
if not self._finished:
await self._cancel()
async def run(self, params: PipelineTaskParams):
"""Start and manage the pipeline execution until completion or cancellation.

View File

@@ -13,6 +13,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
import asyncio
import json
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Set
from loguru import logger
@@ -169,6 +170,11 @@ class LLMContextAggregator(FrameProcessor):
"""Reset the aggregation state."""
self._aggregation = ""
@abstractmethod
async def push_aggregation(self):
"""Push the current aggregation downstream."""
pass
class LLMUserAggregator(LLMContextAggregator):
"""User LLM aggregator that processes speech-to-text transcriptions.
@@ -301,7 +307,7 @@ class LLMUserAggregator(LLMContextAggregator):
frame = LLMContextFrame(self._context)
await self.push_frame(frame)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current aggregation based on interruption strategies and conditions."""
if len(self._aggregation) > 0:
if self.interruption_strategies and self._bot_speaking:
@@ -392,7 +398,7 @@ class LLMUserAggregator(LLMContextAggregator):
# pushing the aggregation as we will probably get a final transcription.
if len(self._aggregation) > 0:
if not self._seen_interim_results:
await self._push_aggregation()
await self.push_aggregation()
# Handles the case where both the user and the bot are not speaking,
# and the bot was previously speaking before the user interruption.
# So in this case we are resetting the aggregation timer
@@ -471,7 +477,7 @@ class LLMUserAggregator(LLMContextAggregator):
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:
await self._push_aggregation()
await self.push_aggregation()
# If we are emulating VAD we still need to send the user stopped
# speaking frame.
@@ -607,12 +613,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
await self._handle_user_image_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._push_aggregation()
await self.push_aggregation()
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current assistant aggregation with timestamp."""
if not self._aggregation:
return
@@ -644,7 +650,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: InterruptionFrame):
await self._push_aggregation()
await self.push_aggregation()
self._started = 0
await self.reset()
@@ -778,7 +784,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
text=frame.request.context,
)
await self._push_aggregation()
await self.push_aggregation()
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
@@ -786,7 +792,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
self._started -= 1
await self._push_aggregation()
await self.push_aggregation()
async def _handle_text(self, frame: TextFrame):
if not self._started:

View File

@@ -12,14 +12,14 @@ in conversational pipelines.
"""
from pipecat.frames.frames import TextFrame
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMUserAggregator
class UserResponseAggregator(LLMUserContextAggregator):
class UserResponseAggregator(LLMUserAggregator):
"""Aggregates user responses into TextFrame objects.
This aggregator extends LLMUserContextAggregator to specifically handle
This aggregator extends LLMUserAggregator to specifically handle
user input by collecting text responses and outputting them as TextFrame
objects when the aggregation is complete.
"""
@@ -28,9 +28,9 @@ class UserResponseAggregator(LLMUserContextAggregator):
"""Initialize the user response aggregator.
Args:
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
**kwargs: Additional arguments passed to parent LLMUserAggregator.
"""
super().__init__(context=OpenAILLMContext(), **kwargs)
super().__init__(context=LLMContext(), **kwargs)
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.

View File

@@ -13,6 +13,7 @@ and frame observation for the RTVI protocol.
import asyncio
import base64
import time
from dataclasses import dataclass
from typing import (
Any,
@@ -29,6 +30,7 @@ from typing import (
from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from pipecat.audio.utils import calculate_audio_volume
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
@@ -52,6 +54,7 @@ from pipecat.frames.frames import (
SystemFrame,
TranscriptionFrame,
TransportMessageUrgentFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
@@ -613,9 +616,9 @@ class RTVIAppendToContextData(BaseModel):
Contains the role, content, and whether to run the message immediately.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
.. deprecated:: 0.0.85
The RTVI message, append-to-context, has been deprecated. Use send-text
or custom client and server messages instead.
"""
role: Literal["user", "assistant"] | str
@@ -839,6 +842,36 @@ class RTVIServerMessage(BaseModel):
data: Any
class RTVIAudioLevelMessageData(BaseModel):
"""Data format for sending audio levels."""
value: float
class RTVIUserAudioLevelMessage(BaseModel):
"""Message indicating user audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-audio-level"] = "user-audio-level"
data: RTVIAudioLevelMessageData
class RTVIBotAudioLevelMessage(BaseModel):
"""Message indicating bot audio level."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-audio-level"] = "bot-audio-level"
data: RTVIAudioLevelMessageData
class RTVISystemLogMessage(BaseModel):
"""Message including a system log."""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["system-log"] = "system-log"
data: RTVITextMessageData
@dataclass
class RTVIServerMessageFrame(SystemFrame):
"""A frame for sending server messages to the client.
@@ -858,25 +891,36 @@ class RTVIServerMessageFrame(SystemFrame):
class RTVIObserverParams:
"""Parameters for configuring RTVI Observer behavior.
.. deprecated:: 0.0.87
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
Parameters:
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
bot_audio_level_enabled: Indicates if bot's audio level messages should be sent.
user_llm_enabled: Indicates if the user's LLM input messages should be sent.
user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent.
user_transcription_enabled: Indicates if user's transcription messages should be sent.
user_audio_level_enabled: Indicates if user's audio level messages should be sent.
metrics_enabled: Indicates if metrics messages should be sent.
errors_enabled: Indicates if errors messages should be sent.
system_logs_enabled: Indicates if system logs should be sent.
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
audio_level_period_secs: How often audio levels should be sent if enabled.
"""
bot_llm_enabled: bool = True
bot_tts_enabled: bool = True
bot_speaking_enabled: bool = True
bot_audio_level_enabled: bool = False
user_llm_enabled: bool = True
user_speaking_enabled: bool = True
user_transcription_enabled: bool = True
user_audio_level_enabled: bool = False
metrics_enabled: bool = True
system_logs_enabled: bool = False
errors_enabled: bool = True
audio_level_period_secs: float = 0.15
class RTVIObserver(BaseObserver):
@@ -892,7 +936,11 @@ class RTVIObserver(BaseObserver):
"""
def __init__(
self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
self,
rtvi: Optional["RTVIProcessor"] = None,
*,
params: Optional[RTVIObserverParams] = None,
**kwargs,
):
"""Initialize the RTVI observer.
@@ -904,9 +952,50 @@ class RTVIObserver(BaseObserver):
super().__init__(**kwargs)
self._rtvi = rtvi
self._params = params or RTVIObserverParams()
self._bot_transcription = ""
self._frames_seen = set()
rtvi.set_errors_enabled(self._params.errors_enabled)
self._bot_transcription = ""
self._last_user_audio_level = 0
self._last_bot_audio_level = 0
if self._params.system_logs_enabled:
self._system_logger_id = logger.add(self._logger_sink)
if self._params.errors_enabled:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter `errors_enabled` is deprecated. Error messages are always enabled.",
DeprecationWarning,
)
async def _logger_sink(self, message):
"""Logger sink so we cna send system logs to RTVI clients."""
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
await self.send_rtvi_message(message)
async def cleanup(self):
"""Cleanup RTVI observer resources."""
await super().cleanup()
if self._params.system_logs_enabled:
logger.remove(self._system_logger_id)
async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True):
"""Send an RTVI message.
By default, we push a transport frame. But this function can be
overriden by subclass to send RTVI messages in different ways.
Args:
model: The message to send.
exclude_none: Whether to exclude None values from the model dump.
"""
if self._rtvi:
await self._rtvi.push_transport_message(model, exclude_none)
async def on_push_frame(self, data: FramePushed):
"""Process a frame being pushed through the pipeline.
@@ -948,52 +1037,58 @@ class RTVIObserver(BaseObserver):
):
await self._handle_context(frame)
elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
await self.send_rtvi_message(RTVIBotLLMStartedMessage())
elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
await self.send_rtvi_message(RTVIBotLLMStoppedMessage())
elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
await self._handle_llm_text_frame(frame)
elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
if isinstance(src, BaseOutputTransport):
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
else:
mark_as_seen = False
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
await self._handle_metrics(frame)
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVIServerMessage(data=frame.data)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIServerResponseFrame):
if frame.error is not None:
await self._send_error_response(frame)
else:
await self._send_server_response(frame)
elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_user_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_user_audio_level = curr_time
elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled:
curr_time = time.time()
diff_time = curr_time - self._last_bot_audio_level
if diff_time > self._params.audio_level_period_secs:
level = calculate_audio_volume(frame.audio, frame.sample_rate)
message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
await self.send_rtvi_message(message)
self._last_bot_audio_level = curr_time
if mark_as_seen:
self._frames_seen.add(frame.id)
async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
"""Push an urgent transport message to the RTVI processor.
Args:
model: The message model to send.
exclude_none: Whether to exclude None values from the model dump.
"""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self._rtvi.push_frame(frame)
async def _push_bot_transcription(self):
"""Push accumulated bot transcription as a message."""
if len(self._bot_transcription) > 0:
message = RTVIBotTranscriptionMessage(
data=RTVITextMessageData(text=self._bot_transcription)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription = ""
async def _handle_interruptions(self, frame: Frame):
@@ -1005,7 +1100,7 @@ class RTVIObserver(BaseObserver):
message = RTVIUserStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_bot_speaking(self, frame: Frame):
"""Handle bot speaking event frames."""
@@ -1016,12 +1111,12 @@ class RTVIObserver(BaseObserver):
message = RTVIBotStoppedSpeakingMessage()
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
"""Handle LLM text output frames."""
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
self._bot_transcription += frame.text
if match_endofsentence(self._bot_transcription):
@@ -1044,7 +1139,7 @@ class RTVIObserver(BaseObserver):
)
if message:
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame):
"""Process LLM context frames to extract user messages for the RTVI client."""
@@ -1064,7 +1159,7 @@ class RTVIObserver(BaseObserver):
text = "".join(part.text for part in message.parts if hasattr(part, "text"))
if text:
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
# Handle OpenAI format (original implementation)
elif isinstance(message, dict):
@@ -1075,7 +1170,7 @@ class RTVIObserver(BaseObserver):
else:
text = content
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
await self.push_transport_message_urgent(rtvi_message)
await self.send_rtvi_message(rtvi_message)
except Exception as e:
logger.warning(f"Caught an error while trying to handle context: {e}")
@@ -1102,7 +1197,7 @@ class RTVIObserver(BaseObserver):
metrics["characters"].append(d.model_dump(exclude_none=True))
message = RTVIMetricsMessage(data=metrics)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_server_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1110,7 +1205,7 @@ class RTVIObserver(BaseObserver):
id=str(frame.client_msg.msg_id),
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
async def _send_error_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
@@ -1118,7 +1213,7 @@ class RTVIObserver(BaseObserver):
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
class RTVIProcessor(FrameProcessor):
@@ -1152,7 +1247,6 @@ class RTVIProcessor(FrameProcessor):
# Default to 0.3.0 which is the last version before actually having a
# "client-version".
self._client_version = [0, 3, 0]
self._errors_enabled = True
self._skip_tts: bool = False # Keep in sync with llm_service.py
self._registered_actions: Dict[str, RTVIAction] = {}
@@ -1222,14 +1316,6 @@ class RTVIProcessor(FrameProcessor):
await self._update_config(self._config, False)
await self._send_bot_ready()
def set_errors_enabled(self, enabled: bool):
"""Enable or disable error message sending.
Args:
enabled: Whether to send error messages.
"""
self._errors_enabled = enabled
async def interrupt_bot(self):
"""Send a bot interruption frame upstream."""
await self.push_interruption_task_frame_and_wait()
@@ -1258,6 +1344,11 @@ class RTVIProcessor(FrameProcessor):
"""
await self._send_error_frame(ErrorFrame(error=error))
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def handle_message(self, message: RTVIMessage):
"""Handle an incoming RTVI message.
@@ -1278,7 +1369,7 @@ class RTVIProcessor(FrameProcessor):
args=params.arguments,
)
message = RTVILLMFunctionCallMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def handle_function_call_start(
self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext
@@ -1305,7 +1396,7 @@ class RTVIProcessor(FrameProcessor):
fn = RTVILLMFunctionCallStartMessageData(function_name=function_name)
message = RTVILLMFunctionCallStartMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)
await self.push_transport_message(message, exclude_none=False)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames through the RTVI processor.
@@ -1377,11 +1468,6 @@ class RTVIProcessor(FrameProcessor):
await self.cancel_task(self._message_task)
self._message_task = None
async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
await self.push_frame(frame)
async def _action_task_handler(self):
"""Handle incoming action frames."""
while True:
@@ -1518,7 +1604,7 @@ class RTVIProcessor(FrameProcessor):
services = list(self._registered_services.values())
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_describe_actions(self, request_id: str):
"""Handle a describe-actions request."""
@@ -1533,7 +1619,7 @@ class RTVIProcessor(FrameProcessor):
actions = list(self._registered_actions.values())
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _handle_get_config(self, request_id: str):
"""Handle a get-config request."""
@@ -1547,7 +1633,7 @@ class RTVIProcessor(FrameProcessor):
)
message = RTVIConfigResponse(id=request_id, data=self._config)
await self._push_transport_message(message)
await self.push_transport_message(message)
def _update_config_option(self, service: str, config: RTVIServiceOptionConfig):
"""Update a specific configuration option."""
@@ -1672,7 +1758,7 @@ class RTVIProcessor(FrameProcessor):
# action responses (such as webhooks) don't set a request_id
if request_id:
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_bot_ready(self):
"""Send the bot-ready message to the client."""
@@ -1683,23 +1769,21 @@ class RTVIProcessor(FrameProcessor):
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
)
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
"""Send a message or response to the client."""
await self._push_transport_message(message)
await self.push_transport_message(message)
async def _send_error_frame(self, frame: ErrorFrame):
"""Send an error frame as an RTVI error message."""
if self._errors_enabled:
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self._push_transport_message(message)
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
await self.push_transport_message(message)
async def _send_error_response(self, id: str, error: str):
"""Send an error response message."""
if self._errors_enabled:
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self._push_transport_message(message)
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
await self.push_transport_message(message)
def _action_id(self, service: str, action: str) -> str:
"""Generate an action ID from service and action names."""

View File

@@ -151,7 +151,7 @@ class AnthropicLLMService(LLMService):
self,
*,
api_key: str,
model: str = "claude-sonnet-4-20250514",
model: str = "claude-sonnet-4-5-20250929",
params: Optional[InputParams] = None,
client=None,
retry_timeout_secs: Optional[float] = 5.0,
@@ -162,7 +162,7 @@ class AnthropicLLMService(LLMService):
Args:
api_key: Anthropic API key for authentication.
model: Model name to use. Defaults to "claude-sonnet-4-20250514".
model: Model name to use. Defaults to "claude-sonnet-4-5-20250929".
params: Optional model parameters for inference.
client: Optional custom Anthropic client instance.
retry_timeout_secs: Request timeout in seconds for retry logic.

View File

@@ -429,7 +429,7 @@ class AWSNovaSonicLLMService(LLMService):
await self._finish_connecting_if_context_available()
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._disconnect()
await self._disconnect()
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to

View File

@@ -337,10 +337,16 @@ class BaseOpenAILLMService(LLMService):
async for chunk in chunk_stream:
if chunk.usage:
cached_tokens = (
chunk.usage.prompt_tokens_details.cached_tokens
if chunk.usage.prompt_tokens_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
)
await self.start_llm_usage_metrics(tokens)

View File

@@ -11,7 +11,6 @@ input processing, including VAD, turn analysis, and interruption management.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from loguru import logger
@@ -79,10 +78,6 @@ class BaseInputTransport(FrameProcessor):
# Track user speaking state for interruption logic
self._user_speaking = False
# We read audio from a single queue one at a time and we then run VAD in
# a thread. Therefore, only one thread should be necessary.
self._executor = ThreadPoolExecutor(max_workers=1)
# Task to process incoming audio (VAD) and push audio frames downstream
# if passthrough is enabled.
self._audio_task = None
@@ -398,9 +393,7 @@ class BaseInputTransport(FrameProcessor):
"""Analyze audio frame for voice activity."""
state = VADState.QUIET
if self.vad_analyzer:
state = await self.get_event_loop().run_in_executor(
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
)
state = await self.vad_analyzer.analyze_audio(audio_frame.audio)
return state
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState) -> VADState:

View File

@@ -110,12 +110,32 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
.. deprecated:: 0.0.87
`DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a future version.
Create your own custom frame and use a custom processor to handle it or use, for example,
`on_after_push_frame` event instead in the output transport.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"DailyUpdateRemoteParticipantsFrame is deprecated and will be removed in a future version."
"Instead, create your own custom frame and handle it in the "
'`@transport.output().event_handler("on_after_push_frame")` event handler or a '
"custom processor.",
DeprecationWarning,
stacklevel=2,
)
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.

View File

@@ -12,14 +12,12 @@ from dotenv import load_dotenv
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import LLMService
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.tests.utils import run_test
@@ -48,8 +46,13 @@ def standard_tools() -> ToolsSchema:
async def _test_llm_function_calling(llm: LLMService):
# Create an AsyncMock for the function
mock_fetch_weather = AsyncMock()
# Create a mock weather function
call_count = 0
async def mock_fetch_weather(params: FunctionCallParams):
nonlocal call_count
call_count += 1
pass
llm.register_function(None, mock_fetch_weather)
@@ -60,21 +63,19 @@ async def _test_llm_function_calling(llm: LLMService):
},
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
]
context = OpenAILLMContext(messages, standard_tools())
# This is done by default inside the create_context_aggregator
context.set_llm_adapter(llm.get_llm_adapter())
context = LLMContext(messages, standard_tools())
pipeline = Pipeline([llm])
frames_to_send = [OpenAILLMContextFrame(context)]
frames_to_send = [LLMContextFrame(context)]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=None,
)
# Assert that the mock function was called
mock_fetch_weather.assert_called_once()
# Assert that the weather function was called once
assert call_count == 1
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")

View File

@@ -10,24 +10,21 @@ from langchain.prompts import ChatPromptTemplate
from langchain_core.language_models import FakeStreamingListLLM
from pipecat.frames.frames import (
LLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
OpenAILLMContextAssistantTimestampFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.tests.utils import SleepFrame, run_test
@@ -67,13 +64,14 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=LLMAssistantAggregatorParams(expect_stripped_words=False)
)
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
pipeline = Pipeline(
[context_aggregator.user(), proc, self.mock_proc, context_aggregator.assistant()]
)
frames_to_send = [
UserStartedSpeakingFrame(),
@@ -84,8 +82,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
OpenAILLMContextFrame,
OpenAILLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMContextAssistantTimestampFrame,
]
await run_test(
pipeline,
@@ -94,4 +92,6 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
)
self.assertEqual("".join(self.mock_proc.token), self.expected_response)
self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
self.assertEqual(
context_aggregator.assistant().messages[-1]["content"], self.expected_response
)