Merge pull request #322 from pipecat-ai/aleix/base-input-transport-system-frames-fix

transports(inputs): don't queue incoming system frames
This commit is contained in:
Aleix Conchillo Flaqué
2024-07-24 14:44:18 -07:00
committed by GitHub
3 changed files with 28 additions and 10 deletions

View File

@@ -5,6 +5,17 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Changed
- `StartFrame` is now a control frame similar to `EndFrame`.
### Fixed
- Fixed a `BaseInputTransport` issue that was causing incoming system frames to
be queued instead of being pushed immediately.
## [0.0.39] - 2024-07-23
### Fixed

View File

@@ -212,14 +212,6 @@ class SystemFrame(Frame):
pass
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class CancelFrame(SystemFrame):
"""Indicates that a pipeline needs to stop right away."""
@@ -306,6 +298,14 @@ class ControlFrame(Frame):
pass
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class EndFrame(ControlFrame):
"""Indicates that a pipeline has ended and frame processors and pipelines

View File

@@ -18,6 +18,7 @@ from pipecat.frames.frames import (
Frame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.transports.base_transport import TransportParams
@@ -69,18 +70,24 @@ class BaseInputTransport(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, CancelFrame):
await self.stop()
# We don't queue a CancelFrame since we want to stop ASAP.
await self.push_frame(frame, direction)
elif isinstance(frame, BotInterruptionFrame):
await self._handle_interruptions(frame, False)
# All other system frames
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
await self.start(frame)
await self._internal_push_frame(frame, direction)
elif isinstance(frame, EndFrame):
await self._internal_push_frame(frame, direction)
await self.stop()
elif isinstance(frame, BotInterruptionFrame):
await self._handle_interruptions(frame, False)
# Other frames
else:
await self._internal_push_frame(frame, direction)