diff --git a/CHANGELOG.md b/CHANGELOG.md index 1267263df..ff46e3430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause `PipelineRunner` and `PipelineTask` to not + handle external asyncio task cancellation properly. + - Added `SpeechmaticsSTTService` exception handling on connection and sending. - Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < diff --git a/README.md b/README.md index fb66a4f27..363aa1169 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ You can get started with Pipecat running on your local machine, then move your a ### Prerequisites **Minimum Python Version:** 3.10 -**Recommended Python Version:** 3.11-3.12 +**Recommended Python Version:** 3.12 ### Setup Steps diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index 019007e32..e8c325521 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -71,7 +71,10 @@ class PipelineRunner(BaseObject): logger.debug(f"Runner {self} started running {task}") self._tasks[task.name] = task params = PipelineTaskParams(loop=self._loop) - await task.run(params) + try: + await task.run(params) + except asyncio.CancelledError: + await self._cancel() del self._tasks[task.name] # Cleanup base object. @@ -95,6 +98,10 @@ class PipelineRunner(BaseObject): async def cancel(self): """Cancel all running tasks immediately.""" logger.debug(f"Cancelling runner {self}") + await self._cancel() + + async def _cancel(self): + """Cancel all running tasks immediately.""" await asyncio.gather(*[t.cancel() for t in self._tasks.values()]) def _setup_sigint(self): diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 487cf248f..e809a690b 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -370,11 +370,9 @@ class PipelineTask(BasePipelineTask): # We have already cleaned up the pipeline inside the task. cleanup_pipeline = False except asyncio.CancelledError: - # We are awaiting on the push task and it might be cancelled - # (e.g. Ctrl-C). This means we will get a CancelledError here as - # well, because you get a CancelledError in every place you are - # awaiting a task. - pass + # Raise exception back to the pipeline runner so it can cancel this + # task properly. + raise finally: await self._cancel_tasks() await self._cleanup(cleanup_pipeline) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 4b2c34828..b79d96d1e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -298,8 +298,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): identity = IdentityFilter() pipeline = Pipeline([identity]) task = PipelineTask(pipeline, idle_timeout_secs=0.2) - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert True + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert True async def test_no_idle_task(self): identity = IdentityFilter() @@ -326,8 +329,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): ), idle_timeout_secs=0.3, ) - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert True + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert True async def test_idle_task_event_handler_no_frames(self): identity = IdentityFilter() @@ -342,8 +348,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): idle_timeout = True await task.cancel() - await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) - assert idle_timeout + try: + await task.run(PipelineTaskParams(loop=asyncio.get_event_loop())) + assert False + except asyncio.CancelledError: + assert idle_timeout async def test_idle_task_event_handler_quiet_user(self): identity = IdentityFilter()