Compare commits
4 Commits
pk/optiona
...
aleix/fram
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ba86fc2f08 | ||
|
|
d459465eb6 | ||
|
|
74aea65f17 | ||
|
|
bd7b24596e |
1
changelog/3326.added.md
Normal file
1
changelog/3326.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Frame processors can now push frames from the top of the pipeline using new methods `queue_task_frame()` and `queue_task_frames()`.
|
||||
@@ -1653,6 +1653,18 @@ class InterruptionTaskFrame(TaskFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueueTaskFrame(TaskFrame):
|
||||
"""Frame to request the pipeline task to push the included frames.
|
||||
|
||||
This is useful when you want the push frames from upstream down to the
|
||||
pipeline. Pushing this frame upstream guarantees upstream frame ordering.
|
||||
|
||||
"""
|
||||
|
||||
frames: Sequence[Frame]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotInterruptionFrame(InterruptionTaskFrame):
|
||||
"""Frame indicating the bot should be interrupted.
|
||||
|
||||
@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
|
||||
InterruptionFrame,
|
||||
InterruptionTaskFrame,
|
||||
MetricsFrame,
|
||||
QueueTaskFrame,
|
||||
StartFrame,
|
||||
StopFrame,
|
||||
StopTaskFrame,
|
||||
@@ -756,6 +757,9 @@ class PipelineTask(BasePipelineTask):
|
||||
# pipeline-ending frame to finish traversing the pipeline.
|
||||
logger.debug(f"{self}: received interruption task frame {frame}")
|
||||
await self._pipeline.queue_frame(InterruptionFrame())
|
||||
elif isinstance(frame, QueueTaskFrame):
|
||||
logger.debug(f"{self}: received queue task frame {frame}")
|
||||
await self.queue_frames(frame.frames)
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
await self._call_event_handler("on_pipeline_error", frame)
|
||||
if frame.fatal:
|
||||
|
||||
@@ -561,12 +561,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await s.reset()
|
||||
|
||||
if params.enable_user_speaking_frames:
|
||||
# TODO(aleix): This frame should really come from the top of the pipeline.
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
await self.queue_task_frame(UserStartedSpeakingFrame())
|
||||
|
||||
if params.enable_interruptions and self._allow_interruptions:
|
||||
# TODO(aleix): This frame should really come from the top of the pipeline.
|
||||
await self.broadcast_frame(InterruptionFrame)
|
||||
await self.queue_task_frame(InterruptionFrame())
|
||||
|
||||
await self._call_event_handler("on_user_turn_started", strategy)
|
||||
|
||||
@@ -588,8 +586,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await s.reset()
|
||||
|
||||
if params.enable_user_speaking_frames:
|
||||
# TODO(aleix): This frame should really come from the top of the pipeline.
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
await self.queue_task_frame(UserStoppedSpeakingFrame())
|
||||
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy)
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ from pipecat.frames.frames import (
|
||||
FrameProcessorResumeUrgentFrame,
|
||||
InterruptionFrame,
|
||||
InterruptionTaskFrame,
|
||||
QueueTaskFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
UninterruptibleFrame,
|
||||
@@ -612,6 +613,22 @@ class FrameProcessor(BaseObject):
|
||||
else:
|
||||
await self.__input_queue.put((frame, direction, callback))
|
||||
|
||||
async def queue_task_frame(self, frame: Frame):
|
||||
"""Queue a single frame to be pushed from the pipeline task down to the pipeline.
|
||||
|
||||
Args:
|
||||
frame: A single frame to push downstream from the top of the pipeline.
|
||||
"""
|
||||
await self.queue_task_frames([frame])
|
||||
|
||||
async def queue_task_frames(self, frames: Sequence[Frame]):
|
||||
"""Queue multiple frames to be pushed from the pipeline task down to the pipeline.
|
||||
|
||||
Args:
|
||||
frames: A sequence of frames to push downstream from the top of the pipeline.
|
||||
"""
|
||||
await self.push_frame(QueueTaskFrame(frames=frames), FrameDirection.UPSTREAM)
|
||||
|
||||
async def pause_processing_frames(self):
|
||||
"""Pause processing of queued frames."""
|
||||
logger.trace(f"{self}: pausing frame processing")
|
||||
|
||||
Reference in New Issue
Block a user