AIService: handle StartFrame/EndFrame/CancelFrame exceptions
If AIService subclasses implement start()/stop()/cancel() and exception are not handled, execution will not continue and therefore the originator frames will not be pushed. This would cause the pipeline to not be started (i.e. StartFrame would not be pushed downstream) or stopped properly.
This commit is contained in:
1
changelog/3503.fixed.md
Normal file
1
changelog/3503.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed an issue in `AIService` where unhandled exceptions in `start()`, `stop()`, or `cancel()` implementations would prevent `process_frame()` to continue and therefore `StartFrame`, `EndFrame`, or `CancelFrame` from being pushed downstream, causing the pipeline to not start or stop properly.
|
||||
@@ -148,11 +148,11 @@ class AIService(FrameProcessor):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self.cancel(frame)
|
||||
await self._start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self.stop(frame)
|
||||
await self._stop(frame)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self._cancel(frame)
|
||||
|
||||
async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):
|
||||
"""Process frames from an async generator.
|
||||
@@ -169,3 +169,21 @@ class AIService(FrameProcessor):
|
||||
await self.push_error_frame(f)
|
||||
else:
|
||||
await self.push_frame(f)
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
try:
|
||||
await self.start(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: exception processing {frame}: {e}")
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
try:
|
||||
await self.stop(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: exception processing {frame}: {e}")
|
||||
|
||||
async def _cancel(self, frame: CancelFrame):
|
||||
try:
|
||||
await self.cancel(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: exception processing {frame}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user