Revert "LLMAssistantContextAggregator: create a task to run on_context_updated"

This reverts commit 397bae29f7.
This commit is contained in:
Aleix Conchillo Flaqué
2025-03-24 15:40:26 -07:00
parent 397bae29f7
commit f3b50bc3c4
3 changed files with 5 additions and 19 deletions

View File

@@ -13,9 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that would cause `LLMAssistantContextAggregator` to block
processing more frames while processing a function call result.
- Fixed an issue where the `RTVIObserver` would report two bot started and
stopped speaking events for each bot turn.

View File

@@ -6,7 +6,7 @@
import asyncio
from abc import abstractmethod
from typing import Dict, List, Set
from typing import Dict, List
from loguru import logger
@@ -380,7 +380,6 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
self._started = 0
self._function_calls_in_progress: Dict[str, FunctionCallInProgressFrame] = {}
self._context_updated_tasks: Set[asyncio.Task] = ()
async def handle_aggregation(self, aggregation: str):
self._context.add_message({"role": "assistant", "content": aggregation})
@@ -487,13 +486,10 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
if run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
# Call the `on_context_updated` callback once the function call result
# is added to the context. Also, run this in a separate task to make
# sure we don't block the pipeline.
# Emit the on_context_updated callback once the function call
# result is added to the context
if properties and properties.on_context_updated:
task = await self.create_task(properties.on_context_updated())
self._context_updated_tasks.add(task)
task.add_done_callback(self._context_updated_task_finished)
await properties.on_context_updated()
async def _handle_function_call_cancel(self, frame: FunctionCallCancelFrame):
logger.debug(
@@ -539,13 +535,6 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
else:
self._aggregation += frame.text
def _context_updated_task_finished(self, task: asyncio.Task):
self._context_updated_tasks.discard(task)
# The task is finished so this should exit immediately. We need to do
# this because otherwise the task manager would report a dangling task
# if we don't remove it.
asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop())
class LLMUserResponseAggregator(LLMUserContextAggregator):
def __init__(self, messages: List[dict] = [], **kwargs):

View File

@@ -369,7 +369,7 @@ class LLMService(AIService):
if tuple_to_remove:
self._function_call_tasks.discard(tuple_to_remove)
# The task is finished so this should exit immediately. We need to
# do this because otherwise the task manager would report a dangling
# do this because otherwise the task manager would have a dangling
# task if we don't remove it.
asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop())