Compare commits

...

27 Commits

Author SHA1 Message Date
Chad Bailey
40605aef38 added logging for debugging 2025-08-16 21:52:18 +00:00
Chad Bailey
c246b54d4d Added ElevenLabs debug logging 2025-08-14 14:53:07 +00:00
Mark Backman
0502ee2b5a Merge pull request #2394 from pipecat-ai/mb/uv-lock
Update uv.lock
2025-08-07 15:25:38 -07:00
Mark Backman
9ec047094b Update uv.lock 2025-08-07 18:24:47 -04:00
Mark Backman
d991c106c8 Merge pull request #2393 from pipecat-ai/mb/openai-dep
fix: pin openai package upper bound to <=1.99.1
2025-08-07 15:19:05 -07:00
Mark Backman
312fb23c89 fix: pin openai package upper bound to <=1.99.1 2025-08-07 18:00:25 -04:00
Aleix Conchillo Flaqué
4d7f21d44e Merge pull request #2392 from pipecat-ai/aleix/avoid-using-tts-say
deprecate TTSService.say() method
2025-08-07 13:55:49 -07:00
Aleix Conchillo Flaqué
ec25d0a7c9 examples(foundational): fix 20a-persistent-context-openai 2025-08-07 13:48:32 -07:00
Aleix Conchillo Flaqué
2b8218deaa examples(foundational): use TTSSpeakFrame instead of TTSService.say() 2025-08-07 13:48:32 -07:00
Aleix Conchillo Flaqué
11119430cd TTSService: deprecate say() method 2025-08-07 13:48:32 -07:00
kompfner
9ca79232c1 Merge pull request #2380 from pipecat-ai/pk/deprecate-llm-messages-frame
Deprecate `LLMMessagesFrame`, `LLMUserResponseAggregator`, and `LLMAssistantResponseAggregator`
2025-08-07 15:13:01 -04:00
Paul Kompfner
9ea06c33f7 Bump deprecation version of LLMMessagesFrame, LLMUserResponseAggregator, and LLMAssistantResponseAggregator (the deprecation slipped past the 0.0.78 release) 2025-08-07 14:56:50 -04:00
Paul Kompfner
30a1dd202e Move deprecation of LLMMessagesFrame, LLMUserResponseAggregator, and LLMAssistantResponseAggregator into the next release in the changelog 2025-08-07 14:55:11 -04:00
Paul Kompfner
809ab0b7b6 Improve printed deprecation warning 2025-08-07 14:45:35 -04:00
Paul Kompfner
2b5db9c562 Remove redundant deprecation warning in docstring 2025-08-07 14:45:35 -04:00
Paul Kompfner
b4a886b59f Remove redundant deprecation warning in docstring 2025-08-07 14:45:35 -04:00
Paul Kompfner
07eb00722b Fix langchain unit test 2025-08-07 14:45:35 -04:00
Paul Kompfner
96652b8fba Add new deprecations to changelog 2025-08-07 14:45:30 -04:00
Paul Kompfner
df1fcf0c68 Remove unused import 2025-08-07 14:43:37 -04:00
Paul Kompfner
711f740d9e Update UserResponseAggregator to avoid using the now-deprecated LLMUserResponseAggregator 2025-08-07 14:43:37 -04:00
Paul Kompfner
a0bda98c20 Update langchain to avoid using the now-deprecated LLMMessagesFrame, LLMUserResponseAggregator, and LLMAssistantResponseAggregator 2025-08-07 14:43:37 -04:00
Paul Kompfner
1c1bae35ab Mention deprecation in docstring for LLMMessagesFrame 2025-08-07 14:43:37 -04:00
Paul Kompfner
56c52c2cf2 Deprecate LLMUserResponseAggregator and LLMAssistantResponseAggregator, which depend on the now-deprecated LLMMessagesFrame. 2025-08-07 14:43:37 -04:00
Paul Kompfner
740aee1a1a Fix an issue in AnthropicLLMContext where we would never initialize turns_above_cache_threshold if we were upgrading from an OpenAILLMContext.
I noticed this when working on 22c-natural-conversation-mixed-llms.py
2025-08-07 14:43:37 -04:00
Paul Kompfner
f0391c3280 Progress on updating foundational examples to avoid using the newly-deprecated LLMMessagesFrame.
Skipping over 07b-interruptible-langchain.py for now, as it requires deeper changes involving `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator`.
2025-08-07 14:43:37 -04:00
Paul Kompfner
64e48e4660 Deprecate LLMMessagesFrame.
The same functionality can be achieved using either:
- `LLMMessagesUpdateFrame` with the desired messages, with `run_llm` set to `True`
- `OpenAILLMContextFrame` with a new context initialized with the desired messages
2025-08-07 14:43:37 -04:00
Paul Kompfner
b8147bdbbd Add missing Deepgram key to env.example 2025-08-07 14:43:37 -04:00
27 changed files with 271 additions and 103 deletions

View File

@@ -5,6 +5,29 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.79] - 2025-08-07
### Changed
- Changed `pipecat-ai`'s `openai` dependency to `>=1.74.0,<=1.99.1` due to a
breaking change in `openai` 1.99.2 ([commit](https://github.com/openai/openai-python/commit/657f551dbe583ffb259d987dafae12c6211fba06))
### Deprecated
- `TTSService.say()` is deprecated, push a `TTSSpeakFrame` instead. Calling
functions directly is a discouraged pattern in Pipecat because, for example,
it might cause issues with frame ordering.
- `LLMMessagesFrame` is deprecated, in favor of either:
- `LLMMessagesUpdateFrame` with `run_llm=True`
- `OpenAILLMContextFrame` with desired messages in a new context
- `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator` are
deprecated, as they depended on the now-deprecated `LLMMessagesFrame`. Use
`LLMUserContextAggregator` and `LLMAssistantResponseAggregator` (or
LLM-specific subclasses thereof) instead.
## [0.0.78] - 2025-08-07
### Added
@@ -53,7 +76,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added Chinese, Japanese, Korean word timestamp support to
`CartesiaTTSService`.
- Added `region` parameter to `GladiaSTTService`. Accepted values: eu-west (default), us-west.
- Added `region` parameter to `GladiaSTTService`. Accepted values: eu-west
(default), us-west.
### Changed

View File

@@ -29,6 +29,9 @@ CARTESIA_API_KEY=...
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...
# Deepgram
DEEPGRAM_API_KEY=...
# ElevenLabs
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...

View File

@@ -9,10 +9,14 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -59,7 +63,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await task.queue_frames([OpenAILLMContextFrame(OpenAILLMContext(messages)), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

View File

@@ -15,13 +15,16 @@ from pipecat.frames.frames import (
DataFrame,
Frame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
@@ -153,7 +156,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
}
]
frames.append(MonthFrame(month=month))
frames.append(LLMMessagesFrame(messages))
frames.append(OpenAILLMContextFrame(OpenAILLMContext(messages)))
task = PipelineTask(
pipeline,

View File

@@ -15,7 +15,6 @@ from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMMessagesFrame,
OutputAudioRawFrame,
TextFrame,
TTSAudioRawFrame,
@@ -25,6 +24,10 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
@@ -137,7 +140,7 @@ async def main():
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
await task.queue_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await task.stop_when_done()
await runner.run(task)

View File

@@ -16,13 +16,16 @@ from langchain_openai import ChatOpenAI
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import LLMMessagesUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.runner.types import RunnerArguments
@@ -97,8 +100,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
lc = LangchainProcessor(history_chain)
tma_in = LLMUserResponseAggregator()
tma_out = LLMAssistantResponseAggregator()
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context=context)
tma_out = LLMAssistantContextAggregator(context=context)
pipeline = Pipeline(
[
@@ -125,11 +129,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# An `OpenAILLMContextFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [({"content": "Please briefly introduce yourself to the user."})]
await task.queue_frames([LLMMessagesFrame(messages)])
await task.queue_frames([LLMMessagesUpdateFrame(messages, run_llm=True)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -6,9 +6,13 @@ from typing import Tuple
import aiohttp
from dotenv import load_dotenv
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.runner.daily import configure
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -79,7 +83,7 @@ async def main():
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await source_queue.put(EndFrame())
await pipeline.run_pipeline()

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -119,7 +119,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
image_requester.set_participant_id(client_id)
# Welcome message
await tts.say("Hi there! Feel free to ask me what I see.")
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
image_requester.set_participant_id(client_id)
# Welcome message
await tts.say("Hi there! Feel free to ask me what I see.")
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
image_requester.set_participant_id(client_id)
# Welcome message
await tts.say("Hi there! Feel free to ask me what I see.")
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
image_requester.set_participant_id(client_id)
# Welcome message
await tts.say("Hi there! Feel free to ask me what I see.")
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -75,23 +75,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
message = {
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
message = {
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
else:
# Third attempt: End the conversation

View File

@@ -14,6 +14,7 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -34,7 +35,6 @@ load_dotenv(override=True)
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(params: FunctionCallParams):
@@ -87,7 +87,7 @@ async def load_conversation(params: FunctionCallParams):
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})
@@ -190,7 +190,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
global tts
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -14,6 +14,7 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -88,7 +89,7 @@ async def load_conversation(params: FunctionCallParams):
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})

View File

@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -60,10 +59,6 @@ classifier_statement = "Determine if the user's statement ends with a complete t
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
@@ -71,13 +66,8 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
# concatenated.
if isinstance(frame, OpenAILLMContextFrame):
logger.debug(f"Context Frame: {frame}")
@@ -96,7 +86,7 @@ class StatementJudgeContextFilter(FrameProcessor):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
# If we have any user text content, push a context frame with the simplified context.
if user_text_messages:
logger.debug(f"User text messages: {user_text_messages}")
user_message = " ".join(reversed(user_text_messages))
@@ -110,7 +100,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -296,7 +286,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
statement_judge_context_filter = StatementJudgeContextFilter()
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
@@ -316,7 +306,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
@@ -331,14 +320,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
ParallelPipeline(
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
# Block everything except frames that trigger LLM inference.
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.

View File

@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -266,10 +265,6 @@ Please be very concise in your responses. Unless you are explicitly asked to do
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
@@ -277,14 +272,8 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# concatenated.
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
if isinstance(frame, OpenAILLMContextFrame):
# Take text content from the most recent user messages.
messages = frame.context.messages
@@ -301,7 +290,7 @@ class StatementJudgeContextFilter(FrameProcessor):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
# If we have any user text content, push a context frame with the simplified context.
if user_text_messages:
user_message = " ".join(reversed(user_text_messages))
logger.debug(f"!!! {user_message}")
@@ -314,7 +303,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -499,7 +488,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
statement_judge_context_filter = StatementJudgeContextFilter()
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
@@ -522,7 +511,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
@@ -542,14 +530,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
# Block everything except frames that trigger LLM inference.
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.

View File

@@ -33,7 +33,7 @@ dependencies = [
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
"openai>=1.74.0,<2",
"openai>=1.74.0,<=1.99.1",
]
[project.urls]

View File

@@ -478,6 +478,12 @@ class TranscriptionUpdateFrame(DataFrame):
class LLMMessagesFrame(DataFrame):
"""Frame containing LLM messages for chat completion.
.. deprecated:: 0.0.79
This class is deprecated and will be removed in a future version.
Instead, use either:
- `LLMMessagesUpdateFrame` with `run_llm=True`
- `OpenAILLMContextFrame` with desired messages in a new context
A frame containing a list of LLM messages. Used to signal that an LLM
service should run a chat completion and emit an LLMFullResponseStartFrame,
TextFrames and an LLMFullResponseEndFrame. Note that the `messages`
@@ -490,6 +496,20 @@ class LLMMessagesFrame(DataFrame):
messages: List[dict]
def __post_init__(self):
super().__post_init__()
import warnings
warnings.simplefilter("always")
warnings.warn(
"LLMMessagesFrame is deprecated and will be removed in a future version. "
"Instead, use either "
"`LLMMessagesUpdateFrame` with `run_llm=True`, or "
"`OpenAILLMContextFrame` with desired messages in a new context",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class LLMMessagesAppendFrame(DataFrame):

View File

@@ -12,6 +12,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
"""
import asyncio
import warnings
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Literal, Optional, Set
@@ -994,6 +995,10 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
class LLMUserResponseAggregator(LLMUserContextAggregator):
"""User response aggregator that outputs LLMMessagesFrame instead of context frames.
.. deprecated:: 0.0.79
This class is deprecated and will be removed in a future version.
Use `LLMUserContextAggregator` or another LLM-specific subclass instead.
This aggregator extends LLMUserContextAggregator but pushes LLMMessagesFrame
objects downstream instead of OpenAILLMContextFrame objects. This is useful
when you need message-based output rather than context-based output.
@@ -1013,6 +1018,12 @@ class LLMUserResponseAggregator(LLMUserContextAggregator):
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments passed to parent class.
"""
warnings.warn(
"LLMUserResponseAggregator is deprecated and will be removed in a future version. "
"Use LLMUserContextAggregator or another LLM-specific subclass instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
async def _process_aggregation(self):
@@ -1027,6 +1038,10 @@ class LLMUserResponseAggregator(LLMUserContextAggregator):
class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
"""Assistant response aggregator that outputs LLMMessagesFrame instead of context frames.
.. deprecated:: 0.0.79
This class is deprecated and will be removed in a future version.
Use `LLMAssistantContextAggregator` or another LLM-specific subclass instead.
This aggregator extends LLMAssistantContextAggregator but pushes LLMMessagesFrame
objects downstream instead of OpenAILLMContextFrame objects. This is useful
when you need message-based output rather than context-based output.
@@ -1046,6 +1061,12 @@ class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments passed to parent class.
"""
warnings.warn(
"LLMAssistantResponseAggregator is deprecated and will be removed in a future version. "
"Use LLMAssistantContextAggregator or another LLM-specific subclass instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
async def push_aggregation(self):

View File

@@ -12,13 +12,14 @@ in conversational pipelines.
"""
from pipecat.frames.frames import TextFrame
from pipecat.processors.aggregators.llm_response import LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
class UserResponseAggregator(LLMUserResponseAggregator):
class UserResponseAggregator(LLMUserContextAggregator):
"""Aggregates user responses into TextFrame objects.
This aggregator extends LLMUserResponseAggregator to specifically handle
This aggregator extends LLMUserContextAggregator to specifically handle
user input by collecting text responses and outputting them as TextFrame
objects when the aggregation is complete.
"""
@@ -27,9 +28,9 @@ class UserResponseAggregator(LLMUserResponseAggregator):
"""Initialize the user response aggregator.
Args:
**kwargs: Additional arguments passed to parent LLMUserResponseAggregator.
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
"""
super().__init__(**kwargs)
super().__init__(context=OpenAILLMContext(), **kwargs)
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.

View File

@@ -14,9 +14,9 @@ from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
try:
@@ -64,11 +64,11 @@ class LangchainProcessor(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMMessagesFrame):
if isinstance(frame, OpenAILLMContextFrame):
# Messages are accumulated on the context as a list of messages.
# The last one by the human is the one we want to send to the LLM.
logger.debug(f"Got transcription frame {frame}")
text: str = frame.messages[-1]["content"]
text: str = frame.context.messages[-1]["content"]
await self._ainvoke(text.strip())
else:

View File

@@ -448,14 +448,16 @@ class AnthropicLLMContext(OpenAILLMContext):
system: System message content.
"""
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
self.__setup_local()
self.system = system
def __setup_local(self):
# For beta prompt caching. This is a counter that tracks the number of turns
# we've seen above the cache threshold. We reset this when we reset the
# messages list. We only care about this number being 0, 1, or 2. But
# it's easiest just to treat it as a counter.
self.turns_above_cache_threshold = 0
self.system = system
return
@staticmethod
def upgrade_to_anthropic(obj: OpenAILLMContext) -> "AnthropicLLMContext":
@@ -472,6 +474,7 @@ class AnthropicLLMContext(OpenAILLMContext):
logger.debug(f"Upgrading to Anthropic: {obj}")
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AnthropicLLMContext):
obj.__class__ = AnthropicLLMContext
obj.__setup_local()
obj._restructure_from_openai_messages()
return obj

View File

@@ -479,6 +479,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self._websocket = await websocket_connect(
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
logger.debug(f"{self}: WebSocket connected to ElevenLabs - ready to receive audio")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
@@ -513,7 +514,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
# Close the current context when interrupted without closing the websocket
if self._context_id and self._websocket:
logger.trace(f"Closing context {self._context_id} due to interruption")
logger.debug(
f"{self}: Closing context {self._context_id} due to interruption - this will stop audio stream"
)
try:
# ElevenLabs requires that Pipecat manages the contexts and closes them
# when they're not longer in use. Since a StartInterruptionFrame is pushed
@@ -524,6 +527,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.send(
json.dumps({"context_id": self._context_id, "close_context": True})
)
logger.debug(f"{self}: Sent close_context message for {self._context_id}")
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
@@ -534,15 +538,28 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
# Log raw message structure for debugging (truncated for readability)
message_preview = message[:500] + "..." if len(message) > 500 else message
logger.debug(f"{self}: Raw WebSocket message preview: {message_preview}")
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
# Log the message structure without the large audio data
msg_structure = {
k: (f"<{len(v)} chars>" if k == "audio" and isinstance(v, str) else v)
for k, v in msg.items()
}
logger.debug(f"{self}: Parsed message structure: {msg_structure}")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("isFinal") is True:
logger.trace(f"Received final message for context {received_ctx_id}")
logger.debug(
f"{self}: Received final message for context {received_ctx_id} - audio stream ended"
)
continue
# Check if this message belongs to the current context.
@@ -564,7 +581,44 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
# Targeted logging for audio debugging
audio_size = len(audio)
# More comprehensive audio analysis
if audio_size > 0:
# Sample first and last 8 bytes
first_8 = audio[:8].hex() if audio_size >= 8 else audio.hex()
last_8 = audio[-8:].hex() if audio_size >= 8 else ""
# Statistical analysis
non_zero_count = sum(1 for b in audio if b != 0)
non_zero_percent = (non_zero_count / audio_size * 100) if audio_size > 0 else 0
# Check for common silence patterns
all_zeros = all(b == 0 for b in audio)
mostly_zeros = non_zero_percent < 1.0 # Less than 1% non-zero
# Sample some values to check amplitude range
max_val = max(audio) if audio else 0
min_val = min(audio) if audio else 0
logger.debug(
f"{self}: Audio received - size: {audio_size} bytes, "
f"first_8: {first_8}, last_8: {last_8}, "
f"non_zero: {non_zero_percent:.1f}%, all_zeros: {all_zeros}, "
f"mostly_zeros: {mostly_zeros}, range: {min_val}-{max_val}, "
f"context: {received_ctx_id}, cumulative_time: {self._cumulative_time:.3f}s"
)
else:
logger.warning(
f"{self}: Received empty audio data for context {received_ctx_id}"
)
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
logger.debug(
f"[ELEVENLABS_AUDIO] Creating TTSAudioRawFrame with {len(audio)} bytes for context {received_ctx_id}"
)
await self.append_to_audio_context(received_ctx_id, frame)
if msg.get("alignment"):
@@ -620,6 +674,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
"""Send text to the WebSocket for synthesis."""
if self._websocket and self._context_id:
msg = {"text": text, "context_id": self._context_id}
logger.debug(
f"{self}: Sending text to ElevenLabs - length: {len(text)}, context: {self._context_id}"
)
await self._websocket.send(json.dumps(msg))
@traced_tts
@@ -973,6 +1030,9 @@ class ElevenLabsHttpTTSService(WordTTSService):
if data and "audio_base64" in data:
await self.stop_ttfb_metrics()
audio = base64.b64decode(data["audio_base64"])
logger.debug(
f"[ELEVENLABS_AUDIO] Yielding TTSAudioRawFrame with {len(audio)} bytes"
)
yield TTSAudioRawFrame(audio, self.sample_rate, 1)
# Process alignment if present

View File

@@ -269,9 +269,20 @@ class TTSService(AIService):
async def say(self, text: str):
"""Immediately speak the provided text.
.. deprecated:: 0.0.79
Push a `TTSSpeakFrame` instead to ensure frame ordering is maintained.
Args:
text: The text to speak.
"""
import warnings
warnings.warn(
"`TTSService.say()` is deprecated. Push a `TTSSpeakFrame` instead.",
DeprecationWarning,
stacklevel=2,
)
await self.queue_frame(TTSSpeakFrame(text))
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -322,6 +333,9 @@ class TTSService(AIService):
elif isinstance(frame, TTSUpdateSettingsFrame):
await self._update_settings(frame.settings)
elif isinstance(frame, BotStoppedSpeakingFrame):
logger.warning(
f"[TTS_RESUME] {self.__class__.__name__} received BotStoppedSpeakingFrame"
)
await self._maybe_resume_frame_processing()
await self.push_frame(frame, direction)
else:
@@ -365,11 +379,15 @@ class TTSService(AIService):
async def _maybe_pause_frame_processing(self):
if self._processing_text and self._pause_frame_processing:
logger.warning(f"[TTS_PAUSE] {self.__class__.__name__} pausing frame processing")
await self.pause_processing_frames()
async def _maybe_resume_frame_processing(self):
if self._pause_frame_processing:
logger.warning(f"[TTS_RESUME] {self.__class__.__name__} resuming frame processing")
await self.resume_processing_frames()
else:
logger.debug(f"[TTS_RESUME] {self.__class__.__name__} resume called but not paused")
async def _process_text_frame(self, frame: TextFrame):
text: Optional[str] = None

View File

@@ -503,6 +503,9 @@ class BaseOutputTransport(FrameProcessor):
num_channels=frame.num_channels,
)
chunk.transport_destination = self._destination
logger.debug(
f"[AUDIO_QUEUE] Putting {chunk.__class__.__name__} with {len(chunk.audio)} bytes into audio queue"
)
await self._audio_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
@@ -582,15 +585,18 @@ class BaseOutputTransport(FrameProcessor):
async def _bot_stopped_speaking(self):
"""Handle bot stopped speaking event."""
if self._bot_speaking:
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
logger.warning(
f"[BOT_STOPPED] Bot stopped speaking - sending BotStoppedSpeakingFrame upstream"
)
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
logger.debug(f"[BOT_STOPPED] Pushing downstream BotStoppedSpeakingFrame")
await self._transport.push_frame(downstream_frame)
logger.debug(f"[BOT_STOPPED] Pushing upstream BotStoppedSpeakingFrame")
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = False
@@ -598,6 +604,8 @@ class BaseOutputTransport(FrameProcessor):
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
else:
logger.debug(f"[BOT_STOPPED] _bot_stopped_speaking called but bot was not speaking")
async def _handle_frame(self, frame: Frame):
"""Handle various frame types with appropriate processing.
@@ -624,12 +632,24 @@ class BaseOutputTransport(FrameProcessor):
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
logger.debug(
f"[AUDIO_QUEUE] Waiting for audio frame (timeout={vad_stop_secs}s)"
)
start_time = time.time()
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
wait_time = time.time() - start_time
logger.debug(
f"[AUDIO_QUEUE] Got frame {frame.__class__.__name__} after {wait_time:.3f}s"
)
self._transport.reset_watchdog()
yield frame
except asyncio.TimeoutError:
wait_time = time.time() - start_time
logger.warning(
f"[AUDIO_QUEUE] TIMEOUT after {wait_time:.3f}s - triggering bot_stopped_speaking"
)
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()

View File

@@ -12,7 +12,7 @@ from langchain_core.language_models import FakeStreamingListLLM
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
OpenAILLMContextAssistantTimestampFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -21,8 +21,12 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.frameworks.langchain import LangchainProcessor
@@ -63,9 +67,10 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(
messages, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
)
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
@@ -79,7 +84,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
LLMMessagesFrame,
OpenAILLMContextFrame,
OpenAILLMContextAssistantTimestampFrame,
]
await run_test(
pipeline,

2
uv.lock generated
View File

@@ -4364,7 +4364,7 @@ requires-dist = [
{ name = "numpy", specifier = ">=1.26.4,<3" },
{ name = "nvidia-riva-client", marker = "extra == 'riva'", specifier = "~=2.21.1" },
{ name = "onnxruntime", marker = "extra == 'silero'", specifier = "~=1.20.1" },
{ name = "openai", specifier = ">=1.74.0,<2" },
{ name = "openai", specifier = ">=1.74.0,<=1.99.1" },
{ name = "opencv-python", marker = "extra == 'webrtc'", specifier = "~=4.11.0.86" },
{ name = "openpipe", marker = "extra == 'openpipe'", specifier = "~=4.50.0" },
{ name = "opentelemetry-api", marker = "extra == 'tracing'", specifier = ">=1.33.0" },