This commit is contained in:
vipyne
2025-10-21 11:42:06 -05:00
parent abf0150261
commit 8b24bae9c5

View File

@@ -16,20 +16,9 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
FunctionCallResultFrame,
InputAudioRawFrame,
InterruptionFrame,
LLMRunFrame,
LLMTextFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
MetricsFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -48,76 +37,42 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class CustomFrameProcessor(FrameProcessor):
"""CustomFrameProcessor does 4 things:
def format_metrics(metrics, indent=0):
lines = []
tab = "\t" * indent
1. keeps count of `InputAudioRawFrame` frames and logs count
when a `UserStoppedSpeakingFrame` is emitted.
for metric in metrics:
lines.append(tab + type(metric).__name__)
for field, value in vars(metric).items():
if hasattr(value, "__dict__") and not isinstance(
value, (str, int, float, bool, type(None))
):
lines.append(f"{tab}\t{field}={type(value).__name__}")
for k, v in vars(value).items():
lines.append(f"{tab}\t\t{k}={repr(v)}")
else:
lines.append(f"{tab}\t{field}={repr(value)}")
2. Filters `LLMTextFrame` frames and replaces "the" with "the pumpkin".
return "\n".join(lines)
3. Logs the following frames:
BotStartedSpeakingFrame
BotStoppedSpeakingFrame
CancelFrame
EndFrame
InterruptionFrame
StartFrame
UserStartedSpeakingFrame
VADUserStartedSpeakingFrame
4. Always pushes all frames
class MetricsFrameLogger(FrameProcessor):
"""MetricsFrameLogger logs all MetricsFrames.
AND it Always pushes all frames.
"""
def __init__(self):
super().__init__()
self._raw_audio_input_frame_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
#### 1.
# InputAudioRawFrames are noisy- probably don't want to log every instance
# keep a count and only log it when we see `UserStoppedSpeakingFrame`
if isinstance(frame, InputAudioRawFrame):
self._raw_audio_input_frame_count = self._raw_audio_input_frame_count + 1
if isinstance(frame, MetricsFrame):
logger.info(f"{frame.name}\n {format_metrics(frame.data)}")
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
logger.info(
f"* * frame: {frame}; number of `InputAudioRawFrame` frames so far: {self._raw_audio_input_frame_count}"
)
await self.push_frame(frame, direction)
#### 2.
# every time the LLM's response includes "the", replace it with "the pumpkin"
elif isinstance(frame, LLMTextFrame):
if "the" in frame.text:
text = re.sub(r" the\b", " the pumpkin", frame.text)
frame.text = text
await self.push_frame(frame, direction)
#### 3.
# frames types to log
elif isinstance(
frame,
(
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
InterruptionFrame,
StartFrame,
UserStartedSpeakingFrame,
VADUserStartedSpeakingFrame,
),
):
logger.info(f"* * frame: {frame}")
await self.push_frame(frame, direction)
#### 4.
# ALWAYS push all other frames
# ALWAYS push all frames
else:
# SUPER IMPORTANT: always push every frame!
await self.push_frame(frame, direction)
@@ -155,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
custom_frame_processor = CustomFrameProcessor()
metrics_frame_processor = MetricsFrameLogger()
messages = [
{
@@ -173,10 +128,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt,
context_aggregator.user(),
llm,
custom_frame_processor, # filter and log frames
tts,
transport.output(),
context_aggregator.assistant(),
metrics_frame_processor, # pretty print metrics frames
]
)
@@ -193,12 +148,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": "Please introduce yourself to the user and inform them that your responses illustrate use of a Custom Frame Processor.",
}
)
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")