Compare commits
1 Commits
main
...
aleix/obse
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0bf61d9e58 |
18
CHANGELOG.md
18
CHANGELOG.md
@@ -11,8 +11,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Added support to `RimeHttpTTSService` for the `arcana` model.
|
||||
|
||||
### Changed
|
||||
|
||||
- Observers `on_push_frame()` now take a single argument `FramePushed` instead
|
||||
of multiple arguments.
|
||||
|
||||
### Removed
|
||||
|
||||
- `StartFrame.observer` has been replaced by `StartFrame.observers`. This allows
|
||||
a user to pass all observers instead of a single proxy one.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Observer `on_push_frame(src, dst, frame, direction, timestamp)` is now
|
||||
deprecated, use `on_push_frame(data: FramePushed)` instead.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `UltravoxSTTService` issue that would cause the service to generate
|
||||
all tokens as one word.
|
||||
|
||||
- Remove custom audio tracks from `DailyTransport` before leaving.
|
||||
|
||||
## [0.0.66] - 2025-05-02
|
||||
|
||||
@@ -14,16 +14,15 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
@@ -46,14 +45,13 @@ class DebugObserver(BaseObserver):
|
||||
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
src = data.source
|
||||
dst = data.destination
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
timestamp = data.timestamp
|
||||
|
||||
# Convert timestamp to seconds for readability
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ from typing import (
|
||||
Literal,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
@@ -449,12 +450,12 @@ class StartFrame(SystemFrame):
|
||||
|
||||
clock: BaseClock
|
||||
task_manager: BaseTaskManager
|
||||
observers: Sequence["BaseObserver"]
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
observer: Optional["BaseObserver"] = None
|
||||
report_only_initial_ttfb: bool = False
|
||||
|
||||
|
||||
|
||||
@@ -5,9 +5,38 @@
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
|
||||
from typing_extensions import TYPE_CHECKING
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
@dataclass
|
||||
class FramePushed:
|
||||
"""Represents an event where a frame is pushed from one processor to another
|
||||
within the pipeline.
|
||||
|
||||
This data structure is typically used by observers to track the flow of
|
||||
frames through the pipeline for logging, debugging, or analytics purposes.
|
||||
|
||||
Attributes:
|
||||
source (FrameProcessor): The processor sending the frame.
|
||||
destination (FrameProcessor): The processor receiving the frame.
|
||||
frame (Frame): The frame being transferred.
|
||||
direction (FrameDirection): The direction of the transfer (e.g., downstream or upstream).
|
||||
timestamp (int): The time when the frame was pushed, based on the pipeline clock.
|
||||
|
||||
"""
|
||||
|
||||
source: "FrameProcessor"
|
||||
destination: "FrameProcessor"
|
||||
frame: Frame
|
||||
direction: "FrameDirection"
|
||||
timestamp: int
|
||||
|
||||
|
||||
class BaseObserver(ABC):
|
||||
@@ -19,26 +48,15 @@ class BaseObserver(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
"""Abstract method to handle the event when a frame is pushed from one
|
||||
processor to another.
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Handle the event when a frame is pushed from one processor to another.
|
||||
|
||||
This method should be implemented by subclasses to define specific
|
||||
behavior (e.g., logging, monitoring, debugging) when a frame is
|
||||
transferred through the pipeline.
|
||||
|
||||
Args:
|
||||
src (FrameProcessor): The source frame processor that is sending the frame.
|
||||
dst (FrameProcessor): The destination frame processor that will receive the frame.
|
||||
frame (Frame): The frame being transferred between processors.
|
||||
direction (FrameDirection): The direction of the frame transfer.
|
||||
timestamp (int): The timestamp when the frame was pushed (based on the pipeline clock).
|
||||
|
||||
This method should be implemented by subclasses to define specific behavior
|
||||
when a frame is pushed.
|
||||
data (FramePushed): The event data containing details about the frame transfer.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.llm_service import LLMService
|
||||
@@ -38,14 +38,13 @@ class LLMLogObserver(BaseObserver):
|
||||
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
src = data.source
|
||||
dst = data.destination
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
timestamp = data.timestamp
|
||||
|
||||
if not isinstance(src, LLMService) and not isinstance(dst, LLMService):
|
||||
return
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.stt_service import STTService
|
||||
|
||||
@@ -29,14 +29,11 @@ class TranscriptionLogObserver(BaseObserver):
|
||||
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
src = data.source
|
||||
frame = data.frame
|
||||
timestamp = data.timestamp
|
||||
|
||||
if not isinstance(src, STTService):
|
||||
return
|
||||
|
||||
|
||||
@@ -32,7 +32,6 @@ from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.pipeline.base_task import BaseTask
|
||||
from pipecat.pipeline.task_observer import TaskObserver
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.asyncio import BaseTaskManager, TaskManager
|
||||
|
||||
@@ -184,6 +183,7 @@ class PipelineTask(BaseTask):
|
||||
self._idle_timeout_secs = idle_timeout_secs
|
||||
self._idle_timeout_frames = idle_timeout_frames
|
||||
self._cancel_on_idle_timeout = cancel_on_idle_timeout
|
||||
self._observers = observers
|
||||
if self._params.observers:
|
||||
import warnings
|
||||
|
||||
@@ -193,7 +193,7 @@ class PipelineTask(BaseTask):
|
||||
"Field 'observers' is deprecated, use the 'observers' parameter instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
observers = self._params.observers
|
||||
self._observers = self._params.observers
|
||||
self._finished = False
|
||||
|
||||
# This queue receives frames coming from the pipeline upstream.
|
||||
@@ -229,11 +229,6 @@ class PipelineTask(BaseTask):
|
||||
# PipelineTask and its frame processors.
|
||||
self._task_manager = task_manager or TaskManager()
|
||||
|
||||
# The task observer acts as a proxy to the provided observers. This way,
|
||||
# we only need to pass a single observer (using the StartFrame) which
|
||||
# then just acts as a proxy.
|
||||
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
|
||||
|
||||
# These events can be used to check which frames make it to the source
|
||||
# or sink processors. Instead of calling the event handlers for every
|
||||
# frame the user needs to specify which events they are interested
|
||||
@@ -347,8 +342,6 @@ class PipelineTask(BaseTask):
|
||||
self._process_push_queue(), f"{self}::_process_push_queue"
|
||||
)
|
||||
|
||||
await self._observer.start()
|
||||
|
||||
return self._process_push_task
|
||||
|
||||
def _maybe_start_heartbeat_tasks(self):
|
||||
@@ -367,8 +360,6 @@ class PipelineTask(BaseTask):
|
||||
)
|
||||
|
||||
async def _cancel_tasks(self):
|
||||
await self._observer.stop()
|
||||
|
||||
await self._task_manager.cancel_task(self._process_up_task)
|
||||
await self._task_manager.cancel_task(self._process_down_task)
|
||||
|
||||
@@ -425,7 +416,7 @@ class PipelineTask(BaseTask):
|
||||
audio_out_sample_rate=self._params.audio_out_sample_rate,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_usage_metrics,
|
||||
observer=self._observer,
|
||||
observers=self._observers,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
)
|
||||
start_frame.metadata = self._params.start_metadata
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import List
|
||||
|
||||
from attr import dataclass
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.asyncio import BaseTaskManager
|
||||
|
||||
|
||||
@dataclass
|
||||
class Proxy:
|
||||
"""This is the data we receive from the main observer and that we put into
|
||||
a queue for later processing.
|
||||
|
||||
"""
|
||||
|
||||
queue: asyncio.Queue
|
||||
task: asyncio.Task
|
||||
observer: BaseObserver
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObserverData:
|
||||
"""This is the data we receive from the main observer and that we put into a
|
||||
proxy queue for later processing.
|
||||
|
||||
"""
|
||||
|
||||
src: FrameProcessor
|
||||
dst: FrameProcessor
|
||||
frame: Frame
|
||||
direction: FrameDirection
|
||||
timestamp: int
|
||||
|
||||
|
||||
class TaskObserver(BaseObserver):
|
||||
"""This is a pipeline frame observer that is meant to be used as a proxy to
|
||||
the user provided observers. That is, this is the observer that should be
|
||||
passed to the frame processors. Then, every time a frame is pushed this
|
||||
observer will call all the observers registered to the pipeline task.
|
||||
|
||||
This observer makes sure that passing frames to observers doesn't block the
|
||||
pipeline by creating a queue and a task for each user observer. When a frame
|
||||
is received, it will be put in a queue for efficiency and later processed by
|
||||
each task.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *, observers: List[BaseObserver] = [], task_manager: BaseTaskManager):
|
||||
self._observers = observers
|
||||
self._task_manager = task_manager
|
||||
self._proxies: List[Proxy] = []
|
||||
|
||||
async def start(self):
|
||||
"""Starts all proxy observer tasks."""
|
||||
self._proxies = self._create_proxies(self._observers)
|
||||
|
||||
async def stop(self):
|
||||
"""Stops all proxy observer tasks."""
|
||||
for proxy in self._proxies:
|
||||
await self._task_manager.cancel_task(proxy.task)
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
for proxy in self._proxies:
|
||||
await proxy.queue.put(
|
||||
ObserverData(
|
||||
src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp
|
||||
)
|
||||
)
|
||||
|
||||
def _create_proxies(self, observers) -> List[Proxy]:
|
||||
proxies = []
|
||||
for observer in observers:
|
||||
queue = asyncio.Queue()
|
||||
task = self._task_manager.create_task(
|
||||
self._proxy_task_handler(queue, observer),
|
||||
f"TaskObserver::{observer.__class__.__name__}::_proxy_task_handler",
|
||||
)
|
||||
proxy = Proxy(queue=queue, task=task, observer=observer)
|
||||
proxies.append(proxy)
|
||||
return proxies
|
||||
|
||||
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
|
||||
while True:
|
||||
data = await queue.get()
|
||||
await observer.on_push_frame(
|
||||
data.src, data.dst, data.frame, data.direction, data.timestamp
|
||||
)
|
||||
@@ -5,8 +5,9 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Coroutine, Optional
|
||||
from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -21,8 +22,9 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
|
||||
from pipecat.utils.asyncio import BaseTaskManager
|
||||
from pipecat.utils.asyncio import BaseTaskManager, TaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
|
||||
@@ -31,6 +33,52 @@ class FrameDirection(Enum):
|
||||
UPSTREAM = 2
|
||||
|
||||
|
||||
class FrameObserverProxy:
|
||||
def __init__(self, task_manager: TaskManager, observer: BaseObserver) -> None:
|
||||
self._task_manager = task_manager
|
||||
self._observer = observer
|
||||
self._queue = asyncio.Queue()
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._warning_reported = False
|
||||
|
||||
def start(self):
|
||||
if not self._task:
|
||||
self._task = self._task_manager.create_task(
|
||||
self._observer_task_handler(), f"ObserverProxy::{self._observer.__class__.__name__}"
|
||||
)
|
||||
|
||||
async def stop(self):
|
||||
if self._task:
|
||||
await self._task_manager.cancel_task(self._task)
|
||||
self._task = None
|
||||
|
||||
async def observe(self, data: FramePushed):
|
||||
await self._queue.put(data)
|
||||
|
||||
async def _observer_task_handler(self):
|
||||
while True:
|
||||
data = await self._queue.get()
|
||||
|
||||
signature = inspect.signature(self._observer.on_push_frame)
|
||||
if len(signature.parameters) > 1:
|
||||
if not self._warning_reported:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, use `on_push_frame(data: FramePushed)` instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
self._warning_reported = True
|
||||
await self._observer.on_push_frame(
|
||||
data.source, data.destination, data.frame, data.direction, data.timestamp
|
||||
)
|
||||
else:
|
||||
await self._observer.on_push_frame(data)
|
||||
self._queue.task_done()
|
||||
|
||||
|
||||
class FrameProcessor(BaseObject):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -55,7 +103,7 @@ class FrameProcessor(BaseObject):
|
||||
self._enable_metrics = False
|
||||
self._enable_usage_metrics = False
|
||||
self._report_only_initial_ttfb = False
|
||||
self._observer = None
|
||||
self._observer_proxies: List[FrameObserverProxy] = []
|
||||
|
||||
# Cancellation is done through CancelFrame (a system frame). This could
|
||||
# cause other events being triggered (e.g. closing a transport) which
|
||||
@@ -77,10 +125,16 @@ class FrameProcessor(BaseObject):
|
||||
self.__input_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
# Every processor in Pipecat should only output frames from a single
|
||||
# task. This avoid problems like audio overlapping. System frames are the
|
||||
# exception to this rule. This create this task.
|
||||
# task. This avoids problems like audio overlapping. System frames are
|
||||
# the exception to this rule.
|
||||
self.__push_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
# The observers task will push observed frames to the observers'
|
||||
# proxies. This task avoids the pipeline to block. Then, there is one
|
||||
# proxy per observer and each proxy has its own task. This way each
|
||||
# observer is independent to the rest.
|
||||
self.__observers_task: Optional[asyncio.Task] = None
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._id
|
||||
@@ -170,6 +224,7 @@ class FrameProcessor(BaseObject):
|
||||
await super().cleanup()
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
await self.__cancel_observers_task()
|
||||
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
self._next = processor
|
||||
@@ -232,7 +287,9 @@ class FrameProcessor(BaseObject):
|
||||
self._enable_metrics = frame.enable_metrics
|
||||
self._enable_usage_metrics = frame.enable_usage_metrics
|
||||
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
|
||||
self._observer = frame.observer
|
||||
self._observer_proxies = self._create_observer_proxies(
|
||||
frame.task_manager, frame.observers
|
||||
)
|
||||
await self.__start(frame)
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self._start_interruption()
|
||||
@@ -255,6 +312,7 @@ class FrameProcessor(BaseObject):
|
||||
await self.__push_queue.put((frame, direction))
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
self.__create_observers_task()
|
||||
self.__create_input_task()
|
||||
self.__create_push_task()
|
||||
|
||||
@@ -262,6 +320,7 @@ class FrameProcessor(BaseObject):
|
||||
self._cancelling = True
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
await self.__cancel_observers_task()
|
||||
|
||||
#
|
||||
# Handle interruptions
|
||||
@@ -294,17 +353,25 @@ class FrameProcessor(BaseObject):
|
||||
timestamp = self._clock.get_time() if self._clock else 0
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
if self._observer:
|
||||
await self._observer.on_push_frame(
|
||||
self, self._next, frame, direction, timestamp
|
||||
)
|
||||
data = FramePushed(
|
||||
source=self,
|
||||
destination=self._next,
|
||||
frame=frame,
|
||||
direction=direction,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
await self.__observe_frame(data)
|
||||
await self._next.queue_frame(frame, direction)
|
||||
elif direction == FrameDirection.UPSTREAM and self._prev:
|
||||
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
|
||||
if self._observer:
|
||||
await self._observer.on_push_frame(
|
||||
self, self._prev, frame, direction, timestamp
|
||||
)
|
||||
data = FramePushed(
|
||||
source=self,
|
||||
destination=self._prev,
|
||||
frame=frame,
|
||||
direction=direction,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
await self.__observe_frame(data)
|
||||
await self._prev.queue_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
@@ -368,3 +435,36 @@ class FrameProcessor(BaseObject):
|
||||
(frame, direction) = await self.__push_queue.get()
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
self.__push_queue.task_done()
|
||||
|
||||
def _create_observer_proxies(
|
||||
self, task_manager: TaskManager, observers: Sequence[BaseObserver]
|
||||
) -> List[FrameObserverProxy]:
|
||||
result = []
|
||||
for observer in observers:
|
||||
proxy = FrameObserverProxy(task_manager, observer)
|
||||
proxy.start()
|
||||
result.append(proxy)
|
||||
return result
|
||||
|
||||
def __create_observers_task(self):
|
||||
if not self.__observers_task:
|
||||
self.__observers_queue = asyncio.Queue()
|
||||
self.__observers_task = self.create_task(self.__observers_task_handler())
|
||||
|
||||
async def __cancel_observers_task(self):
|
||||
if self.__observers_task:
|
||||
for proxy in self._observer_proxies:
|
||||
await proxy.stop()
|
||||
await self.cancel_task(self.__observers_task)
|
||||
self.__observers_task = None
|
||||
|
||||
async def __observe_frame(self, data: FramePushed):
|
||||
await self.__observers_queue.put(data)
|
||||
|
||||
async def __observers_task_handler(self):
|
||||
while True:
|
||||
data = await self.__observers_queue.get()
|
||||
# Proxy observation to all observers.
|
||||
for observer in self._observer_proxies:
|
||||
await observer.observe(data)
|
||||
self.__observers_queue.task_done()
|
||||
|
||||
@@ -55,7 +55,7 @@ from pipecat.metrics.metrics import (
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
@@ -445,14 +445,7 @@ class RTVIObserver(BaseObserver):
|
||||
self._frames_seen = set()
|
||||
rtvi.set_errors_enabled(self._params.errors_enabled)
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Process a frame being pushed through the pipeline.
|
||||
|
||||
Args:
|
||||
@@ -462,6 +455,10 @@ class RTVIObserver(BaseObserver):
|
||||
direction: Direction of frame flow in pipeline
|
||||
timestamp: Time when frame was pushed
|
||||
"""
|
||||
src = data.source
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
|
||||
# If we have already seen this frame, let's skip it.
|
||||
if frame.id in self._frames_seen:
|
||||
return
|
||||
|
||||
@@ -9,8 +9,9 @@ from typing import List, Literal, Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.observers.base_observer import FramePushed
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIObserver
|
||||
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
|
||||
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
|
||||
|
||||
|
||||
@@ -27,18 +28,13 @@ class RTVIBotLLMSearchResponseMessage(BaseModel):
|
||||
|
||||
|
||||
class GoogleRTVIObserver(RTVIObserver):
|
||||
def __init__(self, rtvi: FrameProcessor):
|
||||
def __init__(self, rtvi: RTVIProcessor):
|
||||
super().__init__(rtvi)
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
await super().on_push_frame(src, dst, frame, direction, timestamp)
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
await super().on_push_frame(data)
|
||||
|
||||
frame = data.frame
|
||||
|
||||
if isinstance(frame, LLMSearchResponseFrame):
|
||||
await self._handle_llm_search_response_frame(frame)
|
||||
|
||||
@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -42,14 +42,10 @@ class HeartbeatsObserver(BaseObserver):
|
||||
self._target = target
|
||||
self._callback = heartbeat_callback
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
src = data.source
|
||||
frame = data.frame
|
||||
|
||||
if src == self._target and isinstance(frame, HeartbeatFrame):
|
||||
await self._callback(self._target, frame)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user