From f0391c3280afcc49aa4414413f8b6f3496795c9d Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 6 Aug 2025 12:03:40 -0400 Subject: [PATCH] Progress on updating foundational examples to avoid using the newly-deprecated `LLMMessagesFrame`. Skipping over 07b-interruptible-langchain.py for now, as it requires deeper changes involving `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator`. --- examples/foundational/02-llm-say-one-thing.py | 8 ++++-- .../foundational/05-sync-speech-and-image.py | 7 +++-- .../05a-local-sync-speech-and-image.py | 7 +++-- examples/foundational/08-bots-arguing.py | 8 ++++-- examples/foundational/17-detect-user-idle.py | 26 ++++++++----------- .../22b-natural-conversation-proposal.py | 25 +++++------------- .../22c-natural-conversation-mixed-llms.py | 26 +++++-------------- 7 files changed, 47 insertions(+), 60 deletions(-) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index 3dc739059..1282f3788 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -9,10 +9,14 @@ import os from dotenv import load_dotenv from loguru import logger -from pipecat.frames.frames import EndFrame, LLMMessagesFrame +from pipecat.frames.frames import EndFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -59,7 +63,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Register an event handler so we can play the audio when the client joins @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): - await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) + await task.queue_frames([OpenAILLMContextFrame(OpenAILLMContext(messages)), EndFrame()]) runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 6adff936d..384ef617c 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -15,13 +15,16 @@ from pipecat.frames.frames import ( DataFrame, Frame, LLMFullResponseStartFrame, - LLMMessagesFrame, TextFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.processors.aggregators.sentence import SentenceAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments @@ -153,7 +156,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): } ] frames.append(MonthFrame(month=month)) - frames.append(LLMMessagesFrame(messages)) + frames.append(OpenAILLMContextFrame(OpenAILLMContext(messages))) task = PipelineTask( pipeline, diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index a01f07c0e..f40d2a163 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -15,7 +15,6 @@ from loguru import logger from pipecat.frames.frames import ( Frame, - LLMMessagesFrame, OutputAudioRawFrame, TextFrame, TTSAudioRawFrame, @@ -25,6 +24,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.processors.aggregators.sentence import SentenceAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.cartesia.tts import CartesiaHttpTTSService @@ -137,7 +140,7 @@ async def main(): ) task = PipelineTask(pipeline) - await task.queue_frame(LLMMessagesFrame(messages)) + await task.queue_frame(OpenAILLMContextFrame(OpenAILLMContext(messages))) await task.stop_when_done() await runner.run(task) diff --git a/examples/foundational/08-bots-arguing.py b/examples/foundational/08-bots-arguing.py index b052ef08a..a38bc273a 100644 --- a/examples/foundational/08-bots-arguing.py +++ b/examples/foundational/08-bots-arguing.py @@ -6,9 +6,13 @@ from typing import Tuple import aiohttp from dotenv import load_dotenv -from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame +from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators import SentenceAggregator +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.runner.daily import configure from pipecat.services.azure import AzureLLMService, AzureTTSService from pipecat.services.elevenlabs import ElevenLabsTTSService @@ -79,7 +83,7 @@ async def main(): sentence_aggregator = SentenceAggregator() pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue) - await source_queue.put(LLMMessagesFrame(messages)) + await source_queue.put(OpenAILLMContextFrame(OpenAILLMContext(messages))) await source_queue.put(EndFrame()) await pipeline.run_pipeline() diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index d40a9699c..2d97f10e2 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -11,7 +11,7 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame +from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -75,23 +75,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool: if retry_count == 1: # First attempt: Add a gentle prompt to the conversation - messages.append( - { - "role": "system", - "content": "The user has been quiet. Politely and briefly ask if they're still there.", - } - ) - await user_idle.push_frame(LLMMessagesFrame(messages)) + message = { + "role": "system", + "content": "The user has been quiet. Politely and briefly ask if they're still there.", + } + await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True)) return True elif retry_count == 2: # Second attempt: More direct prompt - messages.append( - { - "role": "system", - "content": "The user is still inactive. Ask if they'd like to continue our conversation.", - } - ) - await user_idle.push_frame(LLMMessagesFrame(messages)) + message = { + "role": "system", + "content": "The user is still inactive. Ask if they'd like to continue our conversation.", + } + await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True)) return True else: # Third attempt: End the conversation diff --git a/examples/foundational/22b-natural-conversation-proposal.py b/examples/foundational/22b-natural-conversation-proposal.py index 72c5ba310..90594cf09 100644 --- a/examples/foundational/22b-natural-conversation-proposal.py +++ b/examples/foundational/22b-natural-conversation-proposal.py @@ -19,7 +19,6 @@ from pipecat.frames.frames import ( Frame, FunctionCallInProgressFrame, FunctionCallResultFrame, - LLMMessagesFrame, StartFrame, StartInterruptionFrame, StopInterruptionFrame, @@ -60,10 +59,6 @@ classifier_statement = "Determine if the user's statement ends with a complete t class StatementJudgeContextFilter(FrameProcessor): - def __init__(self, notifier: BaseNotifier, **kwargs): - super().__init__(**kwargs) - self._notifier = notifier - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # We must not block system frames. @@ -71,13 +66,8 @@ class StatementJudgeContextFilter(FrameProcessor): await self.push_frame(frame, direction) return - # Just treat an LLMMessagesFrame as complete, no matter what. - if isinstance(frame, LLMMessagesFrame): - await self._notifier.notify() - return - - # Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple - # messages frame that contains a system prompt and the most recent user messages, + # We only want to handle OpenAILLMContextFrames, and only want to push through a simplified + # context frame that contains a system prompt and the most recent user messages, # concatenated. if isinstance(frame, OpenAILLMContextFrame): logger.debug(f"Context Frame: {frame}") @@ -96,7 +86,7 @@ class StatementJudgeContextFilter(FrameProcessor): for content in message["content"]: if content["type"] == "text": user_text_messages.insert(0, content["text"]) - # If we have any user text content, push an LLMMessagesFrame + # If we have any user text content, push a context frame with the simplified context. if user_text_messages: logger.debug(f"User text messages: {user_text_messages}") user_message = " ".join(reversed(user_text_messages)) @@ -110,7 +100,7 @@ class StatementJudgeContextFilter(FrameProcessor): if last_assistant_message: messages.append(last_assistant_message) messages.append({"role": "user", "content": user_message}) - await self.push_frame(LLMMessagesFrame(messages)) + await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages))) class CompletenessCheck(FrameProcessor): @@ -296,7 +286,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # This turns the LLM context into an inference request to classify the user's speech # as complete or incomplete. - statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier) + statement_judge_context_filter = StatementJudgeContextFilter() # This sends a UserStoppedSpeakingFrame and triggers the notifier event completeness_check = CompletenessCheck(notifier=notifier) @@ -316,7 +306,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): async def pass_only_llm_trigger_frames(frame): return ( isinstance(frame, OpenAILLMContextFrame) - or isinstance(frame, LLMMessagesFrame) or isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame) or isinstance(frame, FunctionCallInProgressFrame) @@ -331,14 +320,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ParallelPipeline( [ # Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed - # LLMMessagesFrame to the statement classifier LLM. The only frame this + # simplified context frame to the statement classifier LLM. The only frame this # sub-pipeline will output is a UserStoppedSpeakingFrame. statement_judge_context_filter, statement_llm, completeness_check, ], [ - # Block everything except OpenAILLMContextFrame and LLMMessagesFrame + # Block everything except frames that trigger LLM inference. FunctionFilter(filter=pass_only_llm_trigger_frames), llm, bot_output_gate, # Buffer all llm/tts output until notified. diff --git a/examples/foundational/22c-natural-conversation-mixed-llms.py b/examples/foundational/22c-natural-conversation-mixed-llms.py index c9ea66afb..c44034f1c 100644 --- a/examples/foundational/22c-natural-conversation-mixed-llms.py +++ b/examples/foundational/22c-natural-conversation-mixed-llms.py @@ -19,7 +19,6 @@ from pipecat.frames.frames import ( Frame, FunctionCallInProgressFrame, FunctionCallResultFrame, - LLMMessagesFrame, StartFrame, StartInterruptionFrame, StopInterruptionFrame, @@ -266,10 +265,6 @@ Please be very concise in your responses. Unless you are explicitly asked to do class StatementJudgeContextFilter(FrameProcessor): - def __init__(self, notifier: BaseNotifier, **kwargs): - super().__init__(**kwargs) - self._notifier = notifier - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # We must not block system frames. @@ -277,14 +272,8 @@ class StatementJudgeContextFilter(FrameProcessor): await self.push_frame(frame, direction) return - # Just treat an LLMMessagesFrame as complete, no matter what. - if isinstance(frame, LLMMessagesFrame): - await self._notifier.notify() - return - - # Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple - # messages frame that contains a system prompt and the most recent user messages, - # concatenated. + # We only want to handle OpenAILLMContextFrames, and only want to push through a simplified + # context frame that contains a system prompt and the most recent user messages, if isinstance(frame, OpenAILLMContextFrame): # Take text content from the most recent user messages. messages = frame.context.messages @@ -301,7 +290,7 @@ class StatementJudgeContextFilter(FrameProcessor): for content in message["content"]: if content["type"] == "text": user_text_messages.insert(0, content["text"]) - # If we have any user text content, push an LLMMessagesFrame + # If we have any user text content, push a context frame with the simplified context. if user_text_messages: user_message = " ".join(reversed(user_text_messages)) logger.debug(f"!!! {user_message}") @@ -314,7 +303,7 @@ class StatementJudgeContextFilter(FrameProcessor): if last_assistant_message: messages.append(last_assistant_message) messages.append({"role": "user", "content": user_message}) - await self.push_frame(LLMMessagesFrame(messages)) + await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages))) class CompletenessCheck(FrameProcessor): @@ -499,7 +488,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # This turns the LLM context into an inference request to classify the user's speech # as complete or incomplete. - statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier) + statement_judge_context_filter = StatementJudgeContextFilter() # This sends a UserStoppedSpeakingFrame and triggers the notifier event completeness_check = CompletenessCheck(notifier=notifier) @@ -522,7 +511,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): async def pass_only_llm_trigger_frames(frame): return ( isinstance(frame, OpenAILLMContextFrame) - or isinstance(frame, LLMMessagesFrame) or isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame) or isinstance(frame, FunctionCallInProgressFrame) @@ -542,14 +530,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ], [ # Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed - # LLMMessagesFrame to the statement classifier LLM. The only frame this + # simplified context frame to the statement classifier LLM. The only frame this # sub-pipeline will output is a UserStoppedSpeakingFrame. statement_judge_context_filter, statement_llm, completeness_check, ], [ - # Block everything except OpenAILLMContextFrame and LLMMessagesFrame + # Block everything except frames that trigger LLM inference. FunctionFilter(filter=pass_only_llm_trigger_frames), llm, bot_output_gate, # Buffer all llm/tts output until notified.