From be8ea818c877a2c4319c65749ae719ff876b895b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Feb 2026 21:34:01 -0500 Subject: [PATCH] Add on_summary_applied event for observability Emits a SummaryAppliedEvent after context summarization completes, providing message counts so applications can track compression metrics. --- .../aggregators/llm_context_summarizer.py | 47 ++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_context_summarizer.py b/src/pipecat/processors/aggregators/llm_context_summarizer.py index 2618b558b..85da9bfa0 100644 --- a/src/pipecat/processors/aggregators/llm_context_summarizer.py +++ b/src/pipecat/processors/aggregators/llm_context_summarizer.py @@ -7,6 +7,7 @@ """This module defines a summarizer for managing LLM context summarization.""" import uuid +from dataclasses import dataclass from typing import Optional from loguru import logger @@ -27,6 +28,25 @@ from pipecat.utils.context.llm_context_summarization import ( ) +@dataclass +class SummaryAppliedEvent: + """Event data emitted when context summarization completes successfully. + + Parameters: + original_message_count: Number of messages before summarization. + new_message_count: Number of messages after summarization. + summarized_message_count: Number of messages that were compressed + into the summary. + preserved_message_count: Number of recent messages preserved + uncompressed. + """ + + original_message_count: int + new_message_count: int + summarized_message_count: int + preserved_message_count: int + + class LLMContextSummarizer(BaseObject): """Summarizer for managing LLM context summarization. @@ -39,6 +59,10 @@ class LLMContextSummarizer(BaseObject): - on_request_summarization: Emitted when summarization should be triggered. The aggregator should broadcast this frame to the LLM service. + - on_summary_applied: Emitted after a summary has been successfully applied + to the context. Receives a SummaryAppliedEvent with metrics about the + compression. + Example:: @summarizer.event_handler("on_request_summarization") @@ -49,6 +73,10 @@ class LLMContextSummarizer(BaseObject): context=frame.context, ... ) + + @summarizer.event_handler("on_summary_applied") + async def on_summary_applied(summarizer, event: SummaryAppliedEvent): + logger.info(f"Compressed {event.original_message_count} -> {event.new_message_count} messages") """ def __init__( @@ -74,6 +102,7 @@ class LLMContextSummarizer(BaseObject): self._pending_summary_request_id: Optional[str] = None self._register_event_handler("on_request_summarization", sync=True) + self._register_event_handler("on_summary_applied") @property def task_manager(self) -> BaseTaskManager: @@ -320,9 +349,23 @@ class LLMContextSummarizer(BaseObject): new_messages.extend(recent_messages) # Update context + original_message_count = len(messages) + num_system_preserved = 1 if first_system_msg else 0 self._context.set_messages(new_messages) + # Messages actually summarized = index range minus the preserved system message + summarized_count = last_summarized_index + 1 - num_system_preserved + logger.info( - f"{self}: Applied context summary, compressed {last_summarized_index + 1} messages " - f"into summary. Context now has {len(new_messages)} messages (was {len(messages)})" + f"{self}: Applied context summary, compressed {summarized_count} messages " + f"into summary. Context now has {len(new_messages)} messages (was {original_message_count})" ) + + # Emit event for observability + event = SummaryAppliedEvent( + original_message_count=original_message_count, + new_message_count=len(new_messages), + summarized_message_count=summarized_count, + preserved_message_count=len(recent_messages) + num_system_preserved, + ) + await self._call_event_handler("on_summary_applied", event)