Merge pull request #3947 from pipecat-ai/aleix/summary-applied-event

Expose on_summary_applied event on LLMAssistantAggregator
This commit is contained in:
Aleix Conchillo Flaqué
2026-03-08 19:05:50 -07:00
committed by GitHub
5 changed files with 49 additions and 34 deletions

1
changelog/3947.added.md Normal file
View File

@@ -0,0 +1 @@
- Exposed `on_summary_applied` event on `LLMAssistantAggregator`, allowing users to listen for context summarization events without accessing private members.

View File

@@ -138,17 +138,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
# Listen for summarization events
summarizer = assistant_aggregator._summarizer
if summarizer:
@summarizer.event_handler("on_summary_applied")
async def on_summary_applied(summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
@assistant_aggregator.event_handler("on_summary_applied")
async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
pipeline = Pipeline(
[

View File

@@ -138,17 +138,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
# Listen for summarization events
summarizer = assistant_aggregator._summarizer
if summarizer:
@summarizer.event_handler("on_summary_applied")
async def on_summary_applied(summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
@assistant_aggregator.event_handler("on_summary_applied")
async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
pipeline = Pipeline(
[

View File

@@ -177,17 +177,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
# Listen for summarization events
summarizer = assistant_aggregator._summarizer
if summarizer:
@summarizer.event_handler("on_summary_applied")
async def on_summary_applied(summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
@assistant_aggregator.event_handler("on_summary_applied")
async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent):
logger.info(
f"Context summarized: {event.original_message_count} messages -> "
f"{event.new_message_count} messages "
f"({event.summarized_message_count} summarized, "
f"{event.preserved_message_count} preserved)"
)
pipeline = Pipeline(
[

View File

@@ -70,7 +70,10 @@ from pipecat.processors.aggregators.llm_context import (
LLMSpecificMessage,
NotGiven,
)
from pipecat.processors.aggregators.llm_context_summarizer import LLMContextSummarizer
from pipecat.processors.aggregators.llm_context_summarizer import (
LLMContextSummarizer,
SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameCallback, FrameDirection, FrameProcessor
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
@@ -796,6 +799,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
- on_assistant_turn_started: Called when the assistant turn starts
- on_assistant_turn_stopped: Called when the assistant turn ends
- on_assistant_thought: Called when an assistant thought is available
- on_summary_applied: Called when a context summarization is applied
Example::
@@ -811,6 +815,10 @@ class LLMAssistantAggregator(LLMContextAggregator):
async def on_assistant_thought(aggregator, message: AssistantThoughtMessage):
...
@aggregator.event_handler("on_summary_applied")
async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent):
...
"""
def __init__(
@@ -873,10 +881,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
self._summarizer.add_event_handler(
"on_request_summarization", self._on_request_summarization
)
self._summarizer.add_event_handler("on_summary_applied", self._on_summary_applied)
self._register_event_handler("on_assistant_turn_started")
self._register_event_handler("on_assistant_turn_stopped")
self._register_event_handler("on_assistant_thought")
self._register_event_handler("on_summary_applied")
@property
def has_function_calls_in_progress(self) -> bool:
@@ -1294,6 +1304,19 @@ class LLMAssistantAggregator(LLMContextAggregator):
"""
await self.push_frame(frame, FrameDirection.UPSTREAM)
async def _on_summary_applied(
self, summarizer: LLMContextSummarizer, event: SummaryAppliedEvent
):
"""Handle summary applied event from the summarizer.
Forwards the event to any registered `on_summary_applied` handlers.
Args:
summarizer: The summarizer that applied the summary.
event: The summary applied event.
"""
await self._call_event_handler("on_summary_applied", summarizer, event)
class LLMContextAggregatorPair:
"""Pair of LLM context aggregators for updating context with user and assistant messages."""