task: start TaskObserver when tasks can be created

We have to start proxy observer tasks once we know the TaskManager has an event
loop.
This commit is contained in:
Aleix Conchillo Flaqué
2025-01-30 13:46:56 -08:00
parent 0131d0a531
commit b65f32e8e1
2 changed files with 10 additions and 3 deletions

View File

@@ -173,7 +173,7 @@ class PipelineTask(BaseTask):
if self.has_finished():
return
try:
push_task = self._create_tasks()
push_task = await self._create_tasks()
await self._task_manager.wait_for_task(push_task)
except asyncio.CancelledError:
# We are awaiting on the push task and it might be cancelled
@@ -203,7 +203,7 @@ class PipelineTask(BaseTask):
for frame in frames:
await self.queue_frame(frame)
def _create_tasks(self):
async def _create_tasks(self):
self._process_up_task = self._task_manager.create_task(
self._process_up_queue(), f"{self}::_process_up_queue"
)
@@ -214,6 +214,8 @@ class PipelineTask(BaseTask):
self._process_push_queue(), f"{self}::_process_push_queue"
)
await self._observer.start()
return self._process_push_task
def _maybe_start_heartbeat_tasks(self):

View File

@@ -58,8 +58,9 @@ class TaskObserver(BaseObserver):
def __init__(self, *, observers: List[BaseObserver] = [], task_manager: TaskManager):
self._id: int = obj_id()
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self._proxies: List[Proxy] = self._create_proxies(observers)
self._observers = observers
self._task_manager = task_manager
self._proxies: List[Proxy] = []
@property
def id(self) -> int:
@@ -69,6 +70,10 @@ class TaskObserver(BaseObserver):
def name(self) -> str:
return self._name
async def start(self):
"""Starts all proxy observer tasks."""
self._proxies = self._create_proxies(self._observers)
async def stop(self):
"""Stops all proxy observer tasks."""
for proxy in self._proxies: