From 1f8cc3d216b75d582f2ddc04b2ea968dc49d8b38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 6 Mar 2026 20:55:32 -0800 Subject: [PATCH 1/2] Expose on_summary_applied event on LLMAssistantAggregator Forward the on_summary_applied event from the internal summarizer to the aggregator so users can listen for it without accessing private members. Update summarization examples to use the new public event. --- .../54-context-summarization-openai.py | 19 ++++++-------- .../54a-context-summarization-google.py | 19 ++++++-------- ...54c-context-summarization-dedicated-llm.py | 19 ++++++-------- .../aggregators/llm_response_universal.py | 25 ++++++++++++++++++- 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/examples/foundational/54-context-summarization-openai.py b/examples/foundational/54-context-summarization-openai.py index 9c3f75fa5..4c3e10359 100644 --- a/examples/foundational/54-context-summarization-openai.py +++ b/examples/foundational/54-context-summarization-openai.py @@ -138,17 +138,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) # Listen for summarization events - summarizer = assistant_aggregator._summarizer - if summarizer: - - @summarizer.event_handler("on_summary_applied") - async def on_summary_applied(summarizer, event: SummaryAppliedEvent): - logger.info( - f"Context summarized: {event.original_message_count} messages -> " - f"{event.new_message_count} messages " - f"({event.summarized_message_count} summarized, " - f"{event.preserved_message_count} preserved)" - ) + @assistant_aggregator.event_handler("on_summary_applied") + async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent): + logger.info( + f"Context summarized: {event.original_message_count} messages -> " + f"{event.new_message_count} messages " + f"({event.summarized_message_count} summarized, " + f"{event.preserved_message_count} preserved)" + ) pipeline = Pipeline( [ diff --git a/examples/foundational/54a-context-summarization-google.py b/examples/foundational/54a-context-summarization-google.py index b9ce3d7c7..531708748 100644 --- a/examples/foundational/54a-context-summarization-google.py +++ b/examples/foundational/54a-context-summarization-google.py @@ -138,17 +138,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) # Listen for summarization events - summarizer = assistant_aggregator._summarizer - if summarizer: - - @summarizer.event_handler("on_summary_applied") - async def on_summary_applied(summarizer, event: SummaryAppliedEvent): - logger.info( - f"Context summarized: {event.original_message_count} messages -> " - f"{event.new_message_count} messages " - f"({event.summarized_message_count} summarized, " - f"{event.preserved_message_count} preserved)" - ) + @assistant_aggregator.event_handler("on_summary_applied") + async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent): + logger.info( + f"Context summarized: {event.original_message_count} messages -> " + f"{event.new_message_count} messages " + f"({event.summarized_message_count} summarized, " + f"{event.preserved_message_count} preserved)" + ) pipeline = Pipeline( [ diff --git a/examples/foundational/54c-context-summarization-dedicated-llm.py b/examples/foundational/54c-context-summarization-dedicated-llm.py index c3c4a0c2e..01b4e0862 100644 --- a/examples/foundational/54c-context-summarization-dedicated-llm.py +++ b/examples/foundational/54c-context-summarization-dedicated-llm.py @@ -177,17 +177,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) # Listen for summarization events - summarizer = assistant_aggregator._summarizer - if summarizer: - - @summarizer.event_handler("on_summary_applied") - async def on_summary_applied(summarizer, event: SummaryAppliedEvent): - logger.info( - f"Context summarized: {event.original_message_count} messages -> " - f"{event.new_message_count} messages " - f"({event.summarized_message_count} summarized, " - f"{event.preserved_message_count} preserved)" - ) + @assistant_aggregator.event_handler("on_summary_applied") + async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent): + logger.info( + f"Context summarized: {event.original_message_count} messages -> " + f"{event.new_message_count} messages " + f"({event.summarized_message_count} summarized, " + f"{event.preserved_message_count} preserved)" + ) pipeline = Pipeline( [ diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index cf6c81e5f..ca25650b4 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -70,7 +70,10 @@ from pipecat.processors.aggregators.llm_context import ( LLMSpecificMessage, NotGiven, ) -from pipecat.processors.aggregators.llm_context_summarizer import LLMContextSummarizer +from pipecat.processors.aggregators.llm_context_summarizer import ( + LLMContextSummarizer, + SummaryAppliedEvent, +) from pipecat.processors.frame_processor import FrameCallback, FrameDirection, FrameProcessor from pipecat.turns.user_idle_controller import UserIdleController from pipecat.turns.user_mute import BaseUserMuteStrategy @@ -796,6 +799,7 @@ class LLMAssistantAggregator(LLMContextAggregator): - on_assistant_turn_started: Called when the assistant turn starts - on_assistant_turn_stopped: Called when the assistant turn ends - on_assistant_thought: Called when an assistant thought is available + - on_summary_applied: Called when a context summarization is applied Example:: @@ -811,6 +815,10 @@ class LLMAssistantAggregator(LLMContextAggregator): async def on_assistant_thought(aggregator, message: AssistantThoughtMessage): ... + @aggregator.event_handler("on_summary_applied") + async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent): + ... + """ def __init__( @@ -873,10 +881,12 @@ class LLMAssistantAggregator(LLMContextAggregator): self._summarizer.add_event_handler( "on_request_summarization", self._on_request_summarization ) + self._summarizer.add_event_handler("on_summary_applied", self._on_summary_applied) self._register_event_handler("on_assistant_turn_started") self._register_event_handler("on_assistant_turn_stopped") self._register_event_handler("on_assistant_thought") + self._register_event_handler("on_summary_applied") @property def has_function_calls_in_progress(self) -> bool: @@ -1294,6 +1304,19 @@ class LLMAssistantAggregator(LLMContextAggregator): """ await self.push_frame(frame, FrameDirection.UPSTREAM) + async def _on_summary_applied( + self, summarizer: LLMContextSummarizer, event: SummaryAppliedEvent + ): + """Handle summary applied event from the summarizer. + + Forwards the event to any registered `on_summary_applied` handlers. + + Args: + summarizer: The summarizer that applied the summary. + event: The summary applied event. + """ + await self._call_event_handler("on_summary_applied", summarizer, event) + class LLMContextAggregatorPair: """Pair of LLM context aggregators for updating context with user and assistant messages.""" From 3b947b7844a110f09260d1f3b7185de3037b32e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 6 Mar 2026 20:56:02 -0800 Subject: [PATCH 2/2] Add changelog for #3947 --- changelog/3947.added.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/3947.added.md diff --git a/changelog/3947.added.md b/changelog/3947.added.md new file mode 100644 index 000000000..175cb87bd --- /dev/null +++ b/changelog/3947.added.md @@ -0,0 +1 @@ +- Exposed `on_summary_applied` event on `LLMAssistantAggregator`, allowing users to listen for context summarization events without accessing private members.