Compare commits

...

4 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
ba86fc2f08 LLMUserAggregator: use queue_task_frame() to push user speaking frames 2025-12-30 15:18:38 -08:00
Aleix Conchillo Flaqué
d459465eb6 FrameProcessor: add queue_task_frame() and queue_task_frames() 2025-12-30 15:18:38 -08:00
Aleix Conchillo Flaqué
74aea65f17 PipelineTask: use QueueTaskFrame 2025-12-30 15:16:52 -08:00
Aleix Conchillo Flaqué
bd7b24596e frames: add QueueTaskFrame 2025-12-30 15:16:52 -08:00
5 changed files with 37 additions and 6 deletions

1
changelog/3326.added.md Normal file
View File

@@ -0,0 +1 @@
- Frame processors can now push frames from the top of the pipeline using new methods `queue_task_frame()` and `queue_task_frames()`.

View File

@@ -1653,6 +1653,18 @@ class InterruptionTaskFrame(TaskFrame):
pass
@dataclass
class QueueTaskFrame(TaskFrame):
"""Frame to request the pipeline task to push the included frames.
This is useful when you want the push frames from upstream down to the
pipeline. Pushing this frame upstream guarantees upstream frame ordering.
"""
frames: Sequence[Frame]
@dataclass
class BotInterruptionFrame(InterruptionTaskFrame):
"""Frame indicating the bot should be interrupted.

View File

@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
InterruptionFrame,
InterruptionTaskFrame,
MetricsFrame,
QueueTaskFrame,
StartFrame,
StopFrame,
StopTaskFrame,
@@ -756,6 +757,9 @@ class PipelineTask(BasePipelineTask):
# pipeline-ending frame to finish traversing the pipeline.
logger.debug(f"{self}: received interruption task frame {frame}")
await self._pipeline.queue_frame(InterruptionFrame())
elif isinstance(frame, QueueTaskFrame):
logger.debug(f"{self}: received queue task frame {frame}")
await self.queue_frames(frame.frames)
elif isinstance(frame, ErrorFrame):
await self._call_event_handler("on_pipeline_error", frame)
if frame.fatal:

View File

@@ -561,12 +561,10 @@ class LLMUserAggregator(LLMContextAggregator):
await s.reset()
if params.enable_user_speaking_frames:
# TODO(aleix): This frame should really come from the top of the pipeline.
await self.broadcast_frame(UserStartedSpeakingFrame)
await self.queue_task_frame(UserStartedSpeakingFrame())
if params.enable_interruptions and self._allow_interruptions:
# TODO(aleix): This frame should really come from the top of the pipeline.
await self.broadcast_frame(InterruptionFrame)
await self.queue_task_frame(InterruptionFrame())
await self._call_event_handler("on_user_turn_started", strategy)
@@ -588,8 +586,7 @@ class LLMUserAggregator(LLMContextAggregator):
await s.reset()
if params.enable_user_speaking_frames:
# TODO(aleix): This frame should really come from the top of the pipeline.
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self.queue_task_frame(UserStoppedSpeakingFrame())
await self._call_event_handler("on_user_turn_stopped", strategy)

View File

@@ -41,6 +41,7 @@ from pipecat.frames.frames import (
FrameProcessorResumeUrgentFrame,
InterruptionFrame,
InterruptionTaskFrame,
QueueTaskFrame,
StartFrame,
SystemFrame,
UninterruptibleFrame,
@@ -612,6 +613,22 @@ class FrameProcessor(BaseObject):
else:
await self.__input_queue.put((frame, direction, callback))
async def queue_task_frame(self, frame: Frame):
"""Queue a single frame to be pushed from the pipeline task down to the pipeline.
Args:
frame: A single frame to push downstream from the top of the pipeline.
"""
await self.queue_task_frames([frame])
async def queue_task_frames(self, frames: Sequence[Frame]):
"""Queue multiple frames to be pushed from the pipeline task down to the pipeline.
Args:
frames: A sequence of frames to push downstream from the top of the pipeline.
"""
await self.push_frame(QueueTaskFrame(frames=frames), FrameDirection.UPSTREAM)
async def pause_processing_frames(self):
"""Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing")