From 6cf0d53d0047d75eb164fa22564d8d0c84bfa2d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 19 Jan 2026 20:22:53 -0800 Subject: [PATCH] AIService: handle StartFrame/EndFrame/CancelFrame exceptions If AIService subclasses implement start()/stop()/cancel() and exception are not handled, execution will not continue and therefore the originator frames will not be pushed. This would cause the pipeline to not be started (i.e. StartFrame would not be pushed downstream) or stopped properly. --- changelog/3503.fixed.md | 1 + src/pipecat/services/ai_service.py | 26 ++++++++++++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) create mode 100644 changelog/3503.fixed.md diff --git a/changelog/3503.fixed.md b/changelog/3503.fixed.md new file mode 100644 index 000000000..4218f8781 --- /dev/null +++ b/changelog/3503.fixed.md @@ -0,0 +1 @@ +- Fixed an issue in `AIService` where unhandled exceptions in `start()`, `stop()`, or `cancel()` implementations would prevent `process_frame()` to continue and therefore `StartFrame`, `EndFrame`, or `CancelFrame` from being pushed downstream, causing the pipeline to not start or stop properly. diff --git a/src/pipecat/services/ai_service.py b/src/pipecat/services/ai_service.py index a9952fa00..c03ab9d0e 100644 --- a/src/pipecat/services/ai_service.py +++ b/src/pipecat/services/ai_service.py @@ -148,11 +148,11 @@ class AIService(FrameProcessor): await super().process_frame(frame, direction) if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) + await self._start(frame) elif isinstance(frame, EndFrame): - await self.stop(frame) + await self._stop(frame) + elif isinstance(frame, CancelFrame): + await self._cancel(frame) async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): """Process frames from an async generator. @@ -169,3 +169,21 @@ class AIService(FrameProcessor): await self.push_error_frame(f) else: await self.push_frame(f) + + async def _start(self, frame: StartFrame): + try: + await self.start(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}") + + async def _stop(self, frame: EndFrame): + try: + await self.stop(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}") + + async def _cancel(self, frame: CancelFrame): + try: + await self.cancel(frame) + except Exception as e: + logger.error(f"{self}: exception processing {frame}: {e}")