diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a5feb0b..b3404d1ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Update `livekit` to 1.0.13. +- Updated `livekit` to 1.0.13. - `torch` and `torchaudio` are no longer required for running Smart Turn locally. This avoids gigabytes of dependencies being installed. @@ -44,6 +44,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause `push_interruption_task_frame_and_wait()` to + not wait if a previous interruption had already happened. + - Fixed a couple of bugs in `ServiceSwitcher`: - Using multiple `ServiceSwitcher`s in a pipeline would result in an error. - `ServiceSwitcherFrame`s (such as `ManuallySwitchServiceFrame`s) were having diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 76400fc22..85ce18548 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -220,6 +220,11 @@ class FrameProcessor(BaseObject): self.__process_event: Optional[asyncio.Event] = None self.__process_frame_task: Optional[asyncio.Task] = None + # To interrupt a pipeline, we push an `InterruptionTaskFrame` upstream. + # Then we wait for the corresponding `InterruptionFrame` to travel from + # the start of the pipeline back to the processor that sent the + # `InterruptionTaskFrame`. This wait is handled using the following + # event. self._wait_for_interruption = False self._wait_interruption_event = asyncio.Event() @@ -632,7 +637,9 @@ class FrameProcessor(BaseObject): await self.__internal_push_frame(frame, direction) - if isinstance(frame, InterruptionFrame): + # If we are waiting for an interruption and we get an interruption, then + # we can unblock `push_interruption_task_frame_and_wait()`. + if self._wait_for_interruption and isinstance(frame, InterruptionFrame): self._wait_interruption_event.set() async def push_interruption_task_frame_and_wait(self):