diff --git a/changelog/3503.fixed.md b/changelog/3503.fixed.md new file mode 100644 index 000000000..4218f8781 --- /dev/null +++ b/changelog/3503.fixed.md @@ -0,0 +1 @@ +- Fixed an issue in `AIService` where unhandled exceptions in `start()`, `stop()`, or `cancel()` implementations would prevent `process_frame()` to continue and therefore `StartFrame`, `EndFrame`, or `CancelFrame` from being pushed downstream, causing the pipeline to not start or stop properly. diff --git a/src/pipecat/services/ai_service.py b/src/pipecat/services/ai_service.py index a9952fa00..c03ab9d0e 100644 --- a/src/pipecat/services/ai_service.py +++ b/src/pipecat/services/ai_service.py @@ -148,11 +148,11 @@ class AIService(FrameProcessor): await super().process_frame(frame, direction) if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) + await self._start(frame) elif isinstance(frame, EndFrame): - await self.stop(frame) + await self._stop(frame) + elif isinstance(frame, CancelFrame): + await self._cancel(frame) async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): """Process frames from an async generator. @@ -169,3 +169,21 @@ class AIService(FrameProcessor): await self.push_error_frame(f) else: await self.push_frame(f) + + async def _start(self, frame: StartFrame): + try: + await self.start(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}") + + async def _stop(self, frame: EndFrame): + try: + await self.stop(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}") + + async def _cancel(self, frame: CancelFrame): + try: + await self.cancel(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}")