Compare commits
1 Commits
v0.0.106
...
aleix/intr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3efc5fd4a9 |
@@ -73,8 +73,24 @@ class Frame:
|
||||
|
||||
@dataclass
|
||||
class SystemFrame(Frame):
|
||||
"""System frames are frames that are not internally queued by any of the
|
||||
frame processors and should be processed immediately.
|
||||
"""A frame that bypasses the internal processing queues and is handled
|
||||
immediately upon being pushed. Unlike other frames, system frames are not
|
||||
queued by any frame processor. Instead, they are processed directly in the
|
||||
task that pushed them, operating outside (i.e. out-of-band) the normal
|
||||
streaming workflow of frame processors. System frames are not affected by
|
||||
user interruptions.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class UrgentFrame(Frame):
|
||||
"""A high-priority frame that is internally queued but always processed
|
||||
before other frames. Unlike system frames, urgent frames are processed from
|
||||
the frame processors' streaming task but are prioritized over other
|
||||
frames. Urgent frames are not affected by user interruptions.
|
||||
|
||||
"""
|
||||
|
||||
@@ -83,8 +99,9 @@ class SystemFrame(Frame):
|
||||
|
||||
@dataclass
|
||||
class DataFrame(Frame):
|
||||
"""Data frames are frames that will be processed in order and usually
|
||||
contain data such as LLM context, text, audio or images.
|
||||
"""A frame that is processed in order and usually contains data such as LLM
|
||||
context, text, audio or images. They are processed from the frame
|
||||
processors' streaming task. Data frames are cancelled by user interruptions.
|
||||
|
||||
"""
|
||||
|
||||
@@ -93,9 +110,10 @@ class DataFrame(Frame):
|
||||
|
||||
@dataclass
|
||||
class ControlFrame(Frame):
|
||||
"""Control frames are frames that, similar to data frames, will be processed
|
||||
in order and usually contain control information such as frames to update
|
||||
settings or to end the pipeline.
|
||||
"""A frame that, as data frames, is processed in order and usually contains
|
||||
control information such as update settings or to end the pipeline after
|
||||
everything is flushed. They are processed from the frame processors'
|
||||
streaming task. Control frames are cancelled by user interruptions.
|
||||
|
||||
"""
|
||||
|
||||
@@ -478,38 +496,6 @@ class HeartbeatFrame(SystemFrame):
|
||||
timestamp: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class EndTaskFrame(SystemFrame):
|
||||
"""This is used to notify the pipeline task that the pipeline should be
|
||||
closed nicely (flushing all the queued frames) by pushing an EndFrame
|
||||
downstream.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class CancelTaskFrame(SystemFrame):
|
||||
"""This is used to notify the pipeline task that the pipeline should be
|
||||
stopped immediately by pushing a CancelFrame downstream.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class StopTaskFrame(SystemFrame):
|
||||
"""This is used to notify the pipeline task that it should be stopped as
|
||||
soon as possible (flushing all the queued frames) but that the pipeline
|
||||
processors should be kept in a running state.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class StartInterruptionFrame(SystemFrame):
|
||||
"""Emitted by VAD to indicate that a user has started speaking (i.e. is
|
||||
@@ -534,8 +520,56 @@ class StopInterruptionFrame(SystemFrame):
|
||||
pass
|
||||
|
||||
|
||||
#
|
||||
# Urgent frames
|
||||
#
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserStartedSpeakingFrame(SystemFrame):
|
||||
class EndTaskFrame(UrgentFrame):
|
||||
"""This is used to notify the pipeline task that the pipeline should be
|
||||
closed nicely (flushing all the queued frames) by pushing an EndFrame
|
||||
downstream.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class CancelTaskFrame(UrgentFrame):
|
||||
"""This is used to notify the pipeline task that the pipeline should be
|
||||
stopped immediately by pushing a CancelFrame downstream.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class StopTaskFrame(UrgentFrame):
|
||||
"""This is used to notify the pipeline task that it should be stopped as
|
||||
soon as possible (flushing all the queued frames) but that the pipeline
|
||||
processors should be kept in a running state.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotInterruptionFrame(UrgentFrame):
|
||||
"""Emitted by when the bot should be interrupted. This will mainly cause the
|
||||
same actions as if the user interrupted except that the
|
||||
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserStartedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted by VAD to indicate that a user has started speaking. This can be
|
||||
used for interruptions or other times when detecting that someone is
|
||||
speaking is more important than knowing what they're saying (as you will
|
||||
@@ -547,57 +581,28 @@ class UserStartedSpeakingFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserStoppedSpeakingFrame(SystemFrame):
|
||||
class UserStoppedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted by the VAD to indicate that a user stopped speaking."""
|
||||
|
||||
emulated: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmulateUserStartedSpeakingFrame(SystemFrame):
|
||||
"""Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user starts speaking.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmulateUserStoppedSpeakingFrame(SystemFrame):
|
||||
"""Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user stops speaking.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotInterruptionFrame(SystemFrame):
|
||||
"""Emitted by when the bot should be interrupted. This will mainly cause the
|
||||
same actions as if the user interrupted except that the
|
||||
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotStartedSpeakingFrame(SystemFrame):
|
||||
class BotStartedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted upstream by transport outputs to indicate the bot started speaking."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotStoppedSpeakingFrame(SystemFrame):
|
||||
class BotStoppedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted upstream by transport outputs to indicate the bot stopped speaking."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotSpeakingFrame(SystemFrame):
|
||||
class BotSpeakingFrame(UrgentFrame):
|
||||
"""Emitted upstream by transport outputs while the bot is still
|
||||
speaking. This can be used, for example, to detect when a user is idle. That
|
||||
is, while the bot is speaking we don't want to trigger any user idle timeout
|
||||
@@ -609,14 +614,32 @@ class BotSpeakingFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricsFrame(SystemFrame):
|
||||
class EmulateUserStartedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user starts speaking.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmulateUserStoppedSpeakingFrame(UrgentFrame):
|
||||
"""Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user stops speaking.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricsFrame(UrgentFrame):
|
||||
"""Emitted by processor that can compute metrics like latencies."""
|
||||
|
||||
data: List[MetricsData]
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionCallInProgressFrame(SystemFrame):
|
||||
class FunctionCallInProgressFrame(UrgentFrame):
|
||||
"""A frame signaling that a function call is in progress."""
|
||||
|
||||
function_name: str
|
||||
@@ -626,7 +649,7 @@ class FunctionCallInProgressFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionCallCancelFrame(SystemFrame):
|
||||
class FunctionCallCancelFrame(UrgentFrame):
|
||||
"""A frame to signal a function call has been cancelled."""
|
||||
|
||||
function_name: str
|
||||
@@ -642,7 +665,7 @@ class FunctionCallResultProperties:
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionCallResultFrame(SystemFrame):
|
||||
class FunctionCallResultFrame(UrgentFrame):
|
||||
"""A frame containing the result of an LLM function (tool) call."""
|
||||
|
||||
function_name: str
|
||||
@@ -653,14 +676,14 @@ class FunctionCallResultFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class STTMuteFrame(SystemFrame):
|
||||
"""System frame to mute/unmute the STT service."""
|
||||
class STTMuteFrame(UrgentFrame):
|
||||
"""A frame to mute/unmute the STT service."""
|
||||
|
||||
mute: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageUrgentFrame(SystemFrame):
|
||||
class TransportMessageUrgentFrame(UrgentFrame):
|
||||
message: Any
|
||||
|
||||
def __str__(self):
|
||||
@@ -668,7 +691,7 @@ class TransportMessageUrgentFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserImageRequestFrame(SystemFrame):
|
||||
class UserImageRequestFrame(UrgentFrame):
|
||||
"""A frame to request an image from the given user. The frame might be
|
||||
generated by a function call in which case the corresponding fields will be
|
||||
properly set.
|
||||
@@ -685,7 +708,7 @@ class UserImageRequestFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
class InputAudioRawFrame(UrgentFrame, AudioRawFrame):
|
||||
"""A chunk of audio usually coming from an input transport."""
|
||||
|
||||
def __post_init__(self):
|
||||
@@ -698,7 +721,7 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputImageRawFrame(SystemFrame, ImageRawFrame):
|
||||
class InputImageRawFrame(UrgentFrame, ImageRawFrame):
|
||||
"""An image usually coming from an input transport."""
|
||||
|
||||
def __str__(self):
|
||||
@@ -740,7 +763,7 @@ class EndFrame(ControlFrame):
|
||||
should be shut down. If the transport receives this frame, it will stop
|
||||
sending frames to its output channel(s) and close all its threads. Note,
|
||||
that this is a control frame, which means it will received in the order it
|
||||
was sent (unline system frames).
|
||||
was sent.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Coroutine, Optional
|
||||
|
||||
@@ -19,6 +20,7 @@ from pipecat.frames.frames import (
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
SystemFrame,
|
||||
UrgentFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
|
||||
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
|
||||
@@ -31,6 +33,51 @@ class FrameDirection(Enum):
|
||||
UPSTREAM = 2
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameProcessorQueueItem:
|
||||
frame: Frame
|
||||
direction: FrameDirection
|
||||
callback: Optional[Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]]
|
||||
|
||||
|
||||
class FrameProcessorQueue:
|
||||
def __init__(self):
|
||||
self._queue = asyncio.Queue()
|
||||
self._urgent_queue = asyncio.Queue()
|
||||
self._event = asyncio.Event()
|
||||
|
||||
async def put(self, item: FrameProcessorQueueItem):
|
||||
if isinstance(item.frame, UrgentFrame):
|
||||
await self._urgent_queue.put(item)
|
||||
else:
|
||||
await self._queue.put(item)
|
||||
self._event.set()
|
||||
|
||||
async def get(self) -> FrameProcessorQueueItem:
|
||||
# Wait for an item in any of the queues.
|
||||
await self._event.wait()
|
||||
|
||||
if self._urgent_queue.empty():
|
||||
item = await self._queue.get()
|
||||
self._queue.task_done()
|
||||
else:
|
||||
item = await self._urgent_queue.get()
|
||||
self._urgent_queue.task_done()
|
||||
|
||||
# Clear the event only if all queues are empty.
|
||||
if self._queue.empty() and self._urgent_queue.empty():
|
||||
self._event.clear()
|
||||
|
||||
return item
|
||||
|
||||
def clear(self):
|
||||
self._queue = asyncio.Queue()
|
||||
|
||||
# Clear the event only if all queues are empty.
|
||||
if self._queue.empty() and self._urgent_queue.empty():
|
||||
self._event.clear()
|
||||
|
||||
|
||||
class FrameProcessor(BaseObject):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -68,18 +115,21 @@ class FrameProcessor(BaseObject):
|
||||
self._metrics = metrics or FrameProcessorMetrics()
|
||||
self._metrics.set_processor_name(self.name)
|
||||
|
||||
# Processors have an input queue. The input queue will be processed
|
||||
# immediately (default) or it will block if `pause_processing_frames()`
|
||||
# Processors receive frames on a streaming queue which are then
|
||||
# processed by a streaming task. This guarantees that all frames are
|
||||
# processed in the same task. By default, the streaming queue is
|
||||
# processed immediately but it may block if `pause_processing_frames()`
|
||||
# is called. To resume processing frames we need to call
|
||||
# `resume_processing_frames()` which will wake up the event.
|
||||
self.__should_block_frames = False
|
||||
self.__input_event = asyncio.Event()
|
||||
self.__input_frame_task: Optional[asyncio.Task] = None
|
||||
self.__streaming_event = asyncio.Event()
|
||||
self.__streaming_queue = FrameProcessorQueue()
|
||||
self.__streaming_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
# Every processor in Pipecat should only output frames from a single
|
||||
# task. This avoid problems like audio overlapping. System frames are the
|
||||
# exception to this rule. This create this task.
|
||||
self.__push_frame_task: Optional[asyncio.Task] = None
|
||||
self.__process_queue = asyncio.Queue()
|
||||
self.__process_task: Optional[asyncio.Task] = None
|
||||
self.__process_urgent_queue = asyncio.Queue()
|
||||
self.__process_urgent_task: Optional[asyncio.Task] = None
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
@@ -169,7 +219,8 @@ class FrameProcessor(BaseObject):
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
await self.__cancel_process_task()
|
||||
await self.__cancel_process_urgent_task()
|
||||
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
self._next = processor
|
||||
@@ -214,7 +265,7 @@ class FrameProcessor(BaseObject):
|
||||
await self.process_frame(frame, direction)
|
||||
else:
|
||||
# We queue everything else.
|
||||
await self.__input_queue.put((frame, direction, callback))
|
||||
await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback))
|
||||
|
||||
async def pause_processing_frames(self):
|
||||
logger.trace(f"{self}: pausing frame processing")
|
||||
@@ -222,7 +273,7 @@ class FrameProcessor(BaseObject):
|
||||
|
||||
async def resume_processing_frames(self):
|
||||
logger.trace(f"{self}: resuming frame processing")
|
||||
self.__input_event.set()
|
||||
self.__streaming_event.set()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
@@ -249,47 +300,6 @@ class FrameProcessor(BaseObject):
|
||||
if not self._check_ready(frame):
|
||||
return
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
else:
|
||||
await self.__push_queue.put((frame, direction))
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
self.__create_input_task()
|
||||
self.__create_push_task()
|
||||
|
||||
async def __cancel(self, frame: CancelFrame):
|
||||
self._cancelling = True
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
|
||||
#
|
||||
# Handle interruptions
|
||||
#
|
||||
|
||||
async def _start_interruption(self):
|
||||
try:
|
||||
# Cancel the push frame task. This will stop pushing frames downstream.
|
||||
await self.__cancel_push_task()
|
||||
|
||||
# Cancel the input task. This will stop processing queued frames.
|
||||
await self.__cancel_input_task()
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
raise
|
||||
|
||||
# Create a new input queue and task.
|
||||
self.__create_input_task()
|
||||
|
||||
# Create a new output queue and task.
|
||||
self.__create_push_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
# Nothing to do right now.
|
||||
pass
|
||||
|
||||
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
|
||||
try:
|
||||
timestamp = self._clock.get_time() if self._clock else 0
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
@@ -311,6 +321,49 @@ class FrameProcessor(BaseObject):
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
raise
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
self.__create_process_task()
|
||||
self.__create_process_urgent_task()
|
||||
self.__create_input_task()
|
||||
|
||||
async def __cancel(self, frame: CancelFrame):
|
||||
self._cancelling = True
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_process_task()
|
||||
await self.__cancel_process_urgent_task()
|
||||
|
||||
#
|
||||
# Handle interruptions
|
||||
#
|
||||
|
||||
async def _start_interruption(self):
|
||||
try:
|
||||
# Cancel the streaming task.
|
||||
await self.__cancel_input_task()
|
||||
|
||||
# Cancel the task processing frames. We do not cancel the task that
|
||||
# is processing urgent frames.
|
||||
await self.__cancel_process_task()
|
||||
|
||||
# If there's an interruption we should not block frames anymore.
|
||||
self.__should_block_frames = False
|
||||
|
||||
# Clear the streaming queue, since we don't want to process its
|
||||
# frame anymore (except system and urgent frames).
|
||||
self.__streaming_queue.clear()
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
raise
|
||||
|
||||
# Create a new tasks.
|
||||
self.__create_process_task()
|
||||
self.__create_input_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
# Nothing to do right now.
|
||||
pass
|
||||
|
||||
def _check_ready(self, frame: Frame):
|
||||
# If we are trying to push a frame but we still have no clock, it means
|
||||
# we didn't process a StartFrame.
|
||||
@@ -322,49 +375,60 @@ class FrameProcessor(BaseObject):
|
||||
return True
|
||||
|
||||
def __create_input_task(self):
|
||||
if not self.__input_frame_task:
|
||||
self.__should_block_frames = False
|
||||
self.__input_event.clear()
|
||||
self.__input_queue = asyncio.Queue()
|
||||
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
|
||||
if not self.__streaming_frame_task:
|
||||
self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler())
|
||||
|
||||
async def __cancel_input_task(self):
|
||||
if self.__input_frame_task:
|
||||
await self.cancel_task(self.__input_frame_task)
|
||||
self.__input_frame_task = None
|
||||
if self.__streaming_frame_task:
|
||||
await self.cancel_task(self.__streaming_frame_task)
|
||||
self.__streaming_frame_task = None
|
||||
|
||||
async def __input_frame_task_handler(self):
|
||||
def __create_process_task(self):
|
||||
if not self.__process_task:
|
||||
self.__process_queue = asyncio.Queue()
|
||||
self.__process_task = self.create_task(
|
||||
self.__process_task_handler(self.__process_queue)
|
||||
)
|
||||
|
||||
async def __cancel_process_task(self):
|
||||
if self.__process_task:
|
||||
await self.cancel_task(self.__process_task)
|
||||
self.__process_task = None
|
||||
|
||||
def __create_process_urgent_task(self):
|
||||
if not self.__process_urgent_task:
|
||||
self.__process_urgent_task = self.create_task(
|
||||
self.__process_task_handler(self.__process_urgent_queue)
|
||||
)
|
||||
|
||||
async def __cancel_process_urgent_task(self):
|
||||
if self.__process_urgent_task:
|
||||
await self.cancel_task(self.__process_urgent_task)
|
||||
self.__process_urgent_task = None
|
||||
|
||||
async def __streaming_frame_task_handler(self):
|
||||
while True:
|
||||
if self.__should_block_frames:
|
||||
logger.trace(f"{self}: frame processing paused")
|
||||
await self.__input_event.wait()
|
||||
self.__input_event.clear()
|
||||
await self.__streaming_event.wait()
|
||||
self.__streaming_event.clear()
|
||||
self.__should_block_frames = False
|
||||
logger.trace(f"{self}: frame processing resumed")
|
||||
|
||||
(frame, direction, callback) = await self.__input_queue.get()
|
||||
item = await self.__streaming_queue.get()
|
||||
|
||||
if isinstance(item.frame, UrgentFrame):
|
||||
await self.__process_urgent_queue.put(item)
|
||||
else:
|
||||
await self.__process_queue.put(item)
|
||||
|
||||
async def __process_task_handler(self, queue: asyncio.Queue):
|
||||
while True:
|
||||
item = await queue.get()
|
||||
|
||||
# Process the frame.
|
||||
await self.process_frame(frame, direction)
|
||||
await self.process_frame(item.frame, item.direction)
|
||||
|
||||
# If this frame has an associated callback, call it now.
|
||||
if callback:
|
||||
await callback(self, frame, direction)
|
||||
|
||||
self.__input_queue.task_done()
|
||||
|
||||
def __create_push_task(self):
|
||||
if not self.__push_frame_task:
|
||||
self.__push_queue = asyncio.Queue()
|
||||
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
|
||||
|
||||
async def __cancel_push_task(self):
|
||||
if self.__push_frame_task:
|
||||
await self.cancel_task(self.__push_frame_task)
|
||||
self.__push_frame_task = None
|
||||
|
||||
async def __push_frame_task_handler(self):
|
||||
while True:
|
||||
(frame, direction) = await self.__push_queue.get()
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
self.__push_queue.task_done()
|
||||
if item.callback:
|
||||
await item.callback(self, item.frame, item.direction)
|
||||
|
||||
@@ -9,6 +9,7 @@ import unittest
|
||||
from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartInterruptionFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
@@ -57,8 +58,8 @@ class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def test_system_frame(self):
|
||||
filter = FrameFilter(types=())
|
||||
frames_to_send = [UserStartedSpeakingFrame()]
|
||||
expected_down_frames = [UserStartedSpeakingFrame]
|
||||
frames_to_send = [StartInterruptionFrame()]
|
||||
expected_down_frames = [StartInterruptionFrame]
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
|
||||
Reference in New Issue
Block a user