From 5bb64098e7c35f3c9ec2736148b331937a6ed278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 24 Jul 2024 14:33:09 -0700 Subject: [PATCH 1/2] transports(inputs): don't queue incoming system frames --- CHANGELOG.md | 7 +++++++ src/pipecat/transports/base_input.py | 11 +++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 902426bff..3ef4a75c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Fixed + +- Fixed a `BaseInputTransport` issue that was causing incoming system frames to + be queued instead of being pushed immediately. + ## [0.0.39] - 2024-07-23 ### Fixed diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index c0a13d1ce..9f2683e7b 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -18,6 +18,7 @@ from pipecat.frames.frames import ( Frame, StartInterruptionFrame, StopInterruptionFrame, + SystemFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) from pipecat.transports.base_transport import TransportParams @@ -69,18 +70,24 @@ class BaseInputTransport(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) + # Specific system frames if isinstance(frame, CancelFrame): await self.stop() # We don't queue a CancelFrame since we want to stop ASAP. await self.push_frame(frame, direction) + elif isinstance(frame, BotInterruptionFrame): + await self._handle_interruptions(frame, False) + # All other system frames + elif isinstance(frame, SystemFrame): + await self.push_frame(frame, direction) + # Control frames elif isinstance(frame, StartFrame): await self.start(frame) await self._internal_push_frame(frame, direction) elif isinstance(frame, EndFrame): await self._internal_push_frame(frame, direction) await self.stop() - elif isinstance(frame, BotInterruptionFrame): - await self._handle_interruptions(frame, False) + # Other frames else: await self._internal_push_frame(frame, direction) From 0fd2fca2311eefcd87ad1874ddde633680573aa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 24 Jul 2024 14:42:59 -0700 Subject: [PATCH 2/2] frames: StartFrame is now a control frame --- CHANGELOG.md | 4 ++++ src/pipecat/frames/frames.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ef4a75c3..b5567afbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `StartFrame` is now a control frame similar to `EndFrame`. + ### Fixed - Fixed a `BaseInputTransport` issue that was causing incoming system frames to diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 46e2408bc..f1b0fc7b1 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -212,14 +212,6 @@ class SystemFrame(Frame): pass -@dataclass -class StartFrame(SystemFrame): - """This is the first frame that should be pushed down a pipeline.""" - allow_interruptions: bool = False - enable_metrics: bool = False - report_only_initial_ttfb: bool = False - - @dataclass class CancelFrame(SystemFrame): """Indicates that a pipeline needs to stop right away.""" @@ -306,6 +298,14 @@ class ControlFrame(Frame): pass +@dataclass +class StartFrame(ControlFrame): + """This is the first frame that should be pushed down a pipeline.""" + allow_interruptions: bool = False + enable_metrics: bool = False + report_only_initial_ttfb: bool = False + + @dataclass class EndFrame(ControlFrame): """Indicates that a pipeline has ended and frame processors and pipelines