Merge pull request #2493 from pipecat-ai/aleix/runner-task-asyncio-cancellation
PipelineRunner/PipelineTask: fix asyncio task cancellation
This commit is contained in:
@@ -65,6 +65,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue that would cause `PipelineRunner` and `PipelineTask` to not
|
||||
handle external asyncio task cancellation properly.
|
||||
|
||||
- Added `SpeechmaticsSTTService` exception handling on connection and sending.
|
||||
|
||||
- Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python <
|
||||
|
||||
@@ -115,7 +115,7 @@ You can get started with Pipecat running on your local machine, then move your a
|
||||
### Prerequisites
|
||||
|
||||
**Minimum Python Version:** 3.10
|
||||
**Recommended Python Version:** 3.11-3.12
|
||||
**Recommended Python Version:** 3.12
|
||||
|
||||
### Setup Steps
|
||||
|
||||
|
||||
@@ -71,7 +71,10 @@ class PipelineRunner(BaseObject):
|
||||
logger.debug(f"Runner {self} started running {task}")
|
||||
self._tasks[task.name] = task
|
||||
params = PipelineTaskParams(loop=self._loop)
|
||||
await task.run(params)
|
||||
try:
|
||||
await task.run(params)
|
||||
except asyncio.CancelledError:
|
||||
await self._cancel()
|
||||
del self._tasks[task.name]
|
||||
|
||||
# Cleanup base object.
|
||||
@@ -95,6 +98,10 @@ class PipelineRunner(BaseObject):
|
||||
async def cancel(self):
|
||||
"""Cancel all running tasks immediately."""
|
||||
logger.debug(f"Cancelling runner {self}")
|
||||
await self._cancel()
|
||||
|
||||
async def _cancel(self):
|
||||
"""Cancel all running tasks immediately."""
|
||||
await asyncio.gather(*[t.cancel() for t in self._tasks.values()])
|
||||
|
||||
def _setup_sigint(self):
|
||||
|
||||
@@ -370,11 +370,9 @@ class PipelineTask(BasePipelineTask):
|
||||
# We have already cleaned up the pipeline inside the task.
|
||||
cleanup_pipeline = False
|
||||
except asyncio.CancelledError:
|
||||
# We are awaiting on the push task and it might be cancelled
|
||||
# (e.g. Ctrl-C). This means we will get a CancelledError here as
|
||||
# well, because you get a CancelledError in every place you are
|
||||
# awaiting a task.
|
||||
pass
|
||||
# Raise exception back to the pipeline runner so it can cancel this
|
||||
# task properly.
|
||||
raise
|
||||
finally:
|
||||
await self._cancel_tasks()
|
||||
await self._cleanup(cleanup_pipeline)
|
||||
|
||||
@@ -298,8 +298,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
identity = IdentityFilter()
|
||||
pipeline = Pipeline([identity])
|
||||
task = PipelineTask(pipeline, idle_timeout_secs=0.2)
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert True
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert True
|
||||
|
||||
async def test_no_idle_task(self):
|
||||
identity = IdentityFilter()
|
||||
@@ -326,8 +329,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
),
|
||||
idle_timeout_secs=0.3,
|
||||
)
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert True
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert True
|
||||
|
||||
async def test_idle_task_event_handler_no_frames(self):
|
||||
identity = IdentityFilter()
|
||||
@@ -342,8 +348,11 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
idle_timeout = True
|
||||
await task.cancel()
|
||||
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert idle_timeout
|
||||
try:
|
||||
await task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))
|
||||
assert False
|
||||
except asyncio.CancelledError:
|
||||
assert idle_timeout
|
||||
|
||||
async def test_idle_task_event_handler_quiet_user(self):
|
||||
identity = IdentityFilter()
|
||||
|
||||
Reference in New Issue
Block a user