Progress on updating foundational examples to avoid using the newly-deprecated LLMMessagesFrame.

Skipping over 07b-interruptible-langchain.py for now, as it requires deeper changes involving `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator`.
This commit is contained in:
Paul Kompfner
2025-08-06 12:03:40 -04:00
parent 64e48e4660
commit f0391c3280
7 changed files with 47 additions and 60 deletions

View File

@@ -9,10 +9,14 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -59,7 +63,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await task.queue_frames([OpenAILLMContextFrame(OpenAILLMContext(messages)), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

View File

@@ -15,13 +15,16 @@ from pipecat.frames.frames import (
DataFrame,
Frame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
@@ -153,7 +156,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
}
]
frames.append(MonthFrame(month=month))
frames.append(LLMMessagesFrame(messages))
frames.append(OpenAILLMContextFrame(OpenAILLMContext(messages)))
task = PipelineTask(
pipeline,

View File

@@ -15,7 +15,6 @@ from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMMessagesFrame,
OutputAudioRawFrame,
TextFrame,
TTSAudioRawFrame,
@@ -25,6 +24,10 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
@@ -137,7 +140,7 @@ async def main():
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
await task.queue_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await task.stop_when_done()
await runner.run(task)

View File

@@ -6,9 +6,13 @@ from typing import Tuple
import aiohttp
from dotenv import load_dotenv
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.runner.daily import configure
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -79,7 +83,7 @@ async def main():
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await source_queue.put(EndFrame())
await pipeline.run_pipeline()

View File

@@ -11,7 +11,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -75,23 +75,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
message = {
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
message = {
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
else:
# Third attempt: End the conversation

View File

@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -60,10 +59,6 @@ classifier_statement = "Determine if the user's statement ends with a complete t
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
@@ -71,13 +66,8 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
# concatenated.
if isinstance(frame, OpenAILLMContextFrame):
logger.debug(f"Context Frame: {frame}")
@@ -96,7 +86,7 @@ class StatementJudgeContextFilter(FrameProcessor):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
# If we have any user text content, push a context frame with the simplified context.
if user_text_messages:
logger.debug(f"User text messages: {user_text_messages}")
user_message = " ".join(reversed(user_text_messages))
@@ -110,7 +100,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -296,7 +286,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
statement_judge_context_filter = StatementJudgeContextFilter()
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
@@ -316,7 +306,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
@@ -331,14 +320,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
ParallelPipeline(
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
# Block everything except frames that trigger LLM inference.
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.

View File

@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -266,10 +265,6 @@ Please be very concise in your responses. Unless you are explicitly asked to do
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
@@ -277,14 +272,8 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# concatenated.
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
if isinstance(frame, OpenAILLMContextFrame):
# Take text content from the most recent user messages.
messages = frame.context.messages
@@ -301,7 +290,7 @@ class StatementJudgeContextFilter(FrameProcessor):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
# If we have any user text content, push a context frame with the simplified context.
if user_text_messages:
user_message = " ".join(reversed(user_text_messages))
logger.debug(f"!!! {user_message}")
@@ -314,7 +303,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -499,7 +488,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
statement_judge_context_filter = StatementJudgeContextFilter()
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
@@ -522,7 +511,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
@@ -542,14 +530,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
# Block everything except frames that trigger LLM inference.
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.