Add on_summary_applied event for observability

Emits a SummaryAppliedEvent after context summarization completes,
  providing message counts so applications can track compression
  metrics.
This commit is contained in:
Mark Backman
2026-02-26 21:34:01 -05:00
parent 50710e9c3f
commit be8ea818c8

View File

@@ -7,6 +7,7 @@
"""This module defines a summarizer for managing LLM context summarization."""
import uuid
from dataclasses import dataclass
from typing import Optional
from loguru import logger
@@ -27,6 +28,25 @@ from pipecat.utils.context.llm_context_summarization import (
)
@dataclass
class SummaryAppliedEvent:
"""Event data emitted when context summarization completes successfully.
Parameters:
original_message_count: Number of messages before summarization.
new_message_count: Number of messages after summarization.
summarized_message_count: Number of messages that were compressed
into the summary.
preserved_message_count: Number of recent messages preserved
uncompressed.
"""
original_message_count: int
new_message_count: int
summarized_message_count: int
preserved_message_count: int
class LLMContextSummarizer(BaseObject):
"""Summarizer for managing LLM context summarization.
@@ -39,6 +59,10 @@ class LLMContextSummarizer(BaseObject):
- on_request_summarization: Emitted when summarization should be triggered.
The aggregator should broadcast this frame to the LLM service.
- on_summary_applied: Emitted after a summary has been successfully applied
to the context. Receives a SummaryAppliedEvent with metrics about the
compression.
Example::
@summarizer.event_handler("on_request_summarization")
@@ -49,6 +73,10 @@ class LLMContextSummarizer(BaseObject):
context=frame.context,
...
)
@summarizer.event_handler("on_summary_applied")
async def on_summary_applied(summarizer, event: SummaryAppliedEvent):
logger.info(f"Compressed {event.original_message_count} -> {event.new_message_count} messages")
"""
def __init__(
@@ -74,6 +102,7 @@ class LLMContextSummarizer(BaseObject):
self._pending_summary_request_id: Optional[str] = None
self._register_event_handler("on_request_summarization", sync=True)
self._register_event_handler("on_summary_applied")
@property
def task_manager(self) -> BaseTaskManager:
@@ -320,9 +349,23 @@ class LLMContextSummarizer(BaseObject):
new_messages.extend(recent_messages)
# Update context
original_message_count = len(messages)
num_system_preserved = 1 if first_system_msg else 0
self._context.set_messages(new_messages)
# Messages actually summarized = index range minus the preserved system message
summarized_count = last_summarized_index + 1 - num_system_preserved
logger.info(
f"{self}: Applied context summary, compressed {last_summarized_index + 1} messages "
f"into summary. Context now has {len(new_messages)} messages (was {len(messages)})"
f"{self}: Applied context summary, compressed {summarized_count} messages "
f"into summary. Context now has {len(new_messages)} messages (was {original_message_count})"
)
# Emit event for observability
event = SummaryAppliedEvent(
original_message_count=original_message_count,
new_message_count=len(new_messages),
summarized_message_count=summarized_count,
preserved_message_count=len(recent_messages) + num_system_preserved,
)
await self._call_event_handler("on_summary_applied", event)