From 8044c4170db2a31792dee09d41c2551fcd936983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 21 Aug 2025 23:05:57 -0700 Subject: [PATCH 1/2] PipelineRunner/PipelineTask: fix asyncio task cancellation --- CHANGELOG.md | 3 +++ src/pipecat/pipeline/runner.py | 9 ++++++++- src/pipecat/pipeline/task.py | 8 +++----- tests/test_pipeline.py | 21 +++++++++++++++------ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8e536d09..841203ed3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause `PipelineRunner` and `PipelineTask` to not + handle external asyncio task cancellation properly. + - Added `SpeechmaticsSTTService` exception handling on connection and sending. - Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index 019007e32..e8c325521 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -71,7 +71,10 @@ class PipelineRunner(BaseObject): logger.debug(f"Runner {self} started running {task}") self._tasks[task.name] = task params = PipelineTaskParams(loop=self._loop) - await task.run(params) + try: + await task.run(params) + except asyncio.CancelledError: + await self._cancel() del self._tasks[task.name] # Cleanup base object. @@ -95,6 +98,10 @@ class PipelineRunner(BaseObject): async def cancel(self): """Cancel all running tasks immediately.""" logger.debug(f"Cancelling runner {self}") + await self._cancel() + + async def _cancel(self): + """Cancel all running tasks immediately.""" await asyncio.gather(*[t.cancel() for t in self._tasks.values()]) def _setup_sigint(self): diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 27e1fcd9a..45befca91 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -370,11 +370,9 @@ class PipelineTask(BasePipelineTask): # We have already cleaned up the pipeline inside the task. cleanup_pipeline = False except asyncio.CancelledError: - # We are awaiting on the push task and it might be cancelled - # (e.g. Ctrl-C). This means we will get a CancelledError here as - # well, because you get a CancelledError in every place you are - # awaiting a task. - pass + # Raise exception back to the pipeline runner so it can cancel this + # task properly. + raise finally: await self._cancel_tasks() await self._cleanup(cleanup_pipeline) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 4b2c34828..b79d96d1e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -298,8 +298,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): identity = IdentityFilter() pipeline = Pipeline([identity]) task = PipelineTask(pipeline, idle_timeout_secs=0.2) - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert True + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert True async def test_no_idle_task(self): identity = IdentityFilter() @@ -326,8 +329,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): ), idle_timeout_secs=0.3, ) - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert True + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert True async def test_idle_task_event_handler_no_frames(self): identity = IdentityFilter() @@ -342,8 +348,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): idle_timeout = True await task.cancel() - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert idle_timeout + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert idle_timeout async def test_idle_task_event_handler_quiet_user(self): identity = IdentityFilter() From a3ad31d0f6f0f31a25c7a74aa1c19f93024d3f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 21 Aug 2025 23:23:35 -0700 Subject: [PATCH 2/2] README: recommended python version is 3.12 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fb66a4f27..363aa1169 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ You can get started with Pipecat running on your local machine, then move your a ### Prerequisites **Minimum Python Version:** 3.10 -**Recommended Python Version:** 3.11-3.12 +**Recommended Python Version:** 3.12 ### Setup Steps