From f3b50bc3c4e243f2c39e39a51facc345e68259fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 24 Mar 2025 15:40:26 -0700 Subject: [PATCH] Revert "LLMAssistantContextAggregator: create a task to run on_context_updated" This reverts commit 397bae29f70f8851ed528610edd465be577dceab. --- CHANGELOG.md | 3 --- .../processors/aggregators/llm_response.py | 19 ++++--------------- src/pipecat/services/ai_services.py | 2 +- 3 files changed, 5 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7c8c5dd8..1c06240bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,9 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Fixed an issue that would cause `LLMAssistantContextAggregator` to block - processing more frames while processing a function call result. - - Fixed an issue where the `RTVIObserver` would report two bot started and stopped speaking events for each bot turn. diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 94be8b25b..75435a214 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -6,7 +6,7 @@ import asyncio from abc import abstractmethod -from typing import Dict, List, Set +from typing import Dict, List from loguru import logger @@ -380,7 +380,6 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): self._started = 0 self._function_calls_in_progress: Dict[str, FunctionCallInProgressFrame] = {} - self._context_updated_tasks: Set[asyncio.Task] = () async def handle_aggregation(self, aggregation: str): self._context.add_message({"role": "assistant", "content": aggregation}) @@ -487,13 +486,10 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): if run_llm: await self.push_context_frame(FrameDirection.UPSTREAM) - # Call the `on_context_updated` callback once the function call result - # is added to the context. Also, run this in a separate task to make - # sure we don't block the pipeline. + # Emit the on_context_updated callback once the function call + # result is added to the context if properties and properties.on_context_updated: - task = await self.create_task(properties.on_context_updated()) - self._context_updated_tasks.add(task) - task.add_done_callback(self._context_updated_task_finished) + await properties.on_context_updated() async def _handle_function_call_cancel(self, frame: FunctionCallCancelFrame): logger.debug( @@ -539,13 +535,6 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): else: self._aggregation += frame.text - def _context_updated_task_finished(self, task: asyncio.Task): - self._context_updated_tasks.discard(task) - # The task is finished so this should exit immediately. We need to do - # this because otherwise the task manager would report a dangling task - # if we don't remove it. - asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop()) - class LLMUserResponseAggregator(LLMUserContextAggregator): def __init__(self, messages: List[dict] = [], **kwargs): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index a78c268dd..9f9804e65 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -369,7 +369,7 @@ class LLMService(AIService): if tuple_to_remove: self._function_call_tasks.discard(tuple_to_remove) # The task is finished so this should exit immediately. We need to - # do this because otherwise the task manager would report a dangling + # do this because otherwise the task manager would have a dangling # task if we don't remove it. asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop())