FrameProcessor: fix push_interruption_task_frame_and_wait()

This commit is contained in:
Aleix Conchillo Flaqué
2025-09-16 14:47:12 -07:00
parent 615239b7d2
commit db5bcfaa51
2 changed files with 12 additions and 2 deletions

View File

@@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Update `livekit` to 1.0.13.
- Updated `livekit` to 1.0.13.
- `torch` and `torchaudio` are no longer required for running Smart Turn
locally. This avoids gigabytes of dependencies being installed.
@@ -44,6 +44,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that would cause `push_interruption_task_frame_and_wait()` to
not wait if a previous interruption had already happened.
- Fixed a couple of bugs in `ServiceSwitcher`:
- Using multiple `ServiceSwitcher`s in a pipeline would result in an error.
- `ServiceSwitcherFrame`s (such as `ManuallySwitchServiceFrame`s) were having

View File

@@ -220,6 +220,11 @@ class FrameProcessor(BaseObject):
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None
# To interrupt a pipeline, we push an `InterruptionTaskFrame` upstream.
# Then we wait for the corresponding `InterruptionFrame` to travel from
# the start of the pipeline back to the processor that sent the
# `InterruptionTaskFrame`. This wait is handled using the following
# event.
self._wait_for_interruption = False
self._wait_interruption_event = asyncio.Event()
@@ -632,7 +637,9 @@ class FrameProcessor(BaseObject):
await self.__internal_push_frame(frame, direction)
if isinstance(frame, InterruptionFrame):
# If we are waiting for an interruption and we get an interruption, then
# we can unblock `push_interruption_task_frame_and_wait()`.
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
self._wait_interruption_event.set()
async def push_interruption_task_frame_and_wait(self):