diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e5130d7a..1a6274667 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Observers `on_push_frame()` now take a single argument `FramePushed` instead + of multiple arguments. + - Updated the default voice for `DeepgramTTSService` to `aura-2-helena-en`. +### Deprecated + +- Observer `on_push_frame(src, dst, frame, direction, timestamp)` is now + deprecated; use `on_push_frame(data: FramePushed)` instead. + ### Fixed +- Fixed an `UltravoxSTTService` issue that would cause the service to generate + all tokens as one word. + - Fixed a `PipelineTask` issue that would cause tasks to not be cancelled if task was cancelled from outside of Pipecat. diff --git a/examples/foundational/30-observer.py b/examples/foundational/30-observer.py index d8c2ec100..46bd96e53 100644 --- a/examples/foundational/30-observer.py +++ b/examples/foundational/30-observer.py @@ -14,16 +14,15 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, - Frame, StartInterruptionFrame, ) -from pipecat.observers.base_observer import BaseObserver +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.observers.loggers.llm_log_observer import LLMLogObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import FrameDirection from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService @@ -46,14 +45,13 @@ class 
DebugObserver(BaseObserver): Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s """ - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): + src = data.source + dst = data.destination + frame = data.frame + direction = data.direction + timestamp = data.timestamp + # Convert timestamp to seconds for readability time_sec = timestamp / 1_000_000_000 diff --git a/src/pipecat/observers/base_observer.py b/src/pipecat/observers/base_observer.py index 46f746946..f1a0c2a1b 100644 --- a/src/pipecat/observers/base_observer.py +++ b/src/pipecat/observers/base_observer.py @@ -5,9 +5,38 @@ # from abc import ABC, abstractmethod +from dataclasses import dataclass + +from typing_extensions import TYPE_CHECKING from pipecat.frames.frames import Frame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +if TYPE_CHECKING: + from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +@dataclass +class FramePushed: + """Represents an event where a frame is pushed from one processor to another + within the pipeline. + + This data structure is typically used by observers to track the flow of + frames through the pipeline for logging, debugging, or analytics purposes. + + Attributes: + source (FrameProcessor): The processor sending the frame. + destination (FrameProcessor): The processor receiving the frame. + frame (Frame): The frame being transferred. + direction (FrameDirection): The direction of the transfer (e.g., downstream or upstream). + timestamp (int): The time when the frame was pushed, based on the pipeline clock. 
+ + """ + + source: "FrameProcessor" + destination: "FrameProcessor" + frame: Frame + direction: "FrameDirection" + timestamp: int class BaseObserver(ABC): @@ -19,26 +48,15 @@ class BaseObserver(ABC): """ @abstractmethod - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): - """Abstract method to handle the event when a frame is pushed from one - processor to another. + async def on_push_frame(self, data: FramePushed): + """Handle the event when a frame is pushed from one processor to another. + + This method should be implemented by subclasses to define specific + behavior (e.g., logging, monitoring, debugging) when a frame is + transferred through the pipeline. Args: - src (FrameProcessor): The source frame processor that is sending the frame. - dst (FrameProcessor): The destination frame processor that will receive the frame. - frame (Frame): The frame being transferred between processors. - direction (FrameDirection): The direction of the frame transfer. - timestamp (int): The timestamp when the frame was pushed (based on the pipeline clock). - - This method should be implemented by subclasses to define specific behavior - when a frame is pushed. + data (FramePushed): The event data containing details about the frame transfer. 
""" pass diff --git a/src/pipecat/observers/loggers/llm_log_observer.py b/src/pipecat/observers/loggers/llm_log_observer.py index dd270abf5..9e4d53b28 100644 --- a/src/pipecat/observers/loggers/llm_log_observer.py +++ b/src/pipecat/observers/loggers/llm_log_observer.py @@ -15,7 +15,7 @@ from pipecat.frames.frames import ( LLMMessagesFrame, LLMTextFrame, ) -from pipecat.observers.base_observer import BaseObserver +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.llm_service import LLMService @@ -38,14 +38,13 @@ class LLMLogObserver(BaseObserver): """ - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): + src = data.source + dst = data.destination + frame = data.frame + direction = data.direction + timestamp = data.timestamp + if not isinstance(src, LLMService) and not isinstance(dst, LLMService): return diff --git a/src/pipecat/observers/loggers/transcription_log_observer.py b/src/pipecat/observers/loggers/transcription_log_observer.py index 4547ee54f..57e38c952 100644 --- a/src/pipecat/observers/loggers/transcription_log_observer.py +++ b/src/pipecat/observers/loggers/transcription_log_observer.py @@ -11,7 +11,7 @@ from pipecat.frames.frames import ( InterimTranscriptionFrame, TranscriptionFrame, ) -from pipecat.observers.base_observer import BaseObserver +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.stt_service import STTService @@ -29,14 +29,11 @@ class TranscriptionLogObserver(BaseObserver): """ - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - 
direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): + src = data.source + frame = data.frame + timestamp = data.timestamp + if not isinstance(src, STTService): return diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index dd805032c..252708f8c 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -5,13 +5,12 @@ # import asyncio +import inspect from typing import List from attr import dataclass -from pipecat.frames.frames import Frame -from pipecat.observers.base_observer import BaseObserver -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.utils.asyncio import BaseTaskManager @@ -27,20 +26,6 @@ class Proxy: observer: BaseObserver -@dataclass -class ObserverData: - """This is the data we receive from the main observer and that we put into a - proxy queue for later processing. - - """ - - src: FrameProcessor - dst: FrameProcessor - frame: Frame - direction: FrameDirection - timestamp: int - - class TaskObserver(BaseObserver): """This is a pipeline frame observer that is meant to be used as a proxy to the user provided observers. 
That is, this is the observer that should be @@ -68,20 +53,9 @@ class TaskObserver(BaseObserver): for proxy in self._proxies: await self._task_manager.cancel_task(proxy.task) - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): for proxy in self._proxies: - await proxy.queue.put( - ObserverData( - src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp - ) - ) + await proxy.queue.put(data) def _create_proxies(self, observers) -> List[Proxy]: proxies = [] @@ -96,8 +70,27 @@ class TaskObserver(BaseObserver): return proxies async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver): + warning_reported = False while True: data = await queue.get() - await observer.on_push_frame( - data.src, data.dst, data.frame, data.direction, data.timestamp - ) + + signature = inspect.signature(observer.on_push_frame) + # More than one parameter means the observer still implements the deprecated multi-argument signature, so unpack the event for it. + if len(signature.parameters) > 1: + if not warning_reported: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, use `on_push_frame(data: FramePushed)` instead.", + DeprecationWarning, + ) + warning_reported = True + await observer.on_push_frame( + data.source, data.destination, data.frame, data.direction, data.timestamp + ) + else: + await observer.on_push_frame(data) + + queue.task_done() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 590698e7f..97cc24378 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -21,6 +21,7 @@ from pipecat.frames.frames import ( SystemFrame, ) from pipecat.metrics.metrics import LLMTokenUsage, MetricsData +from pipecat.observers.base_observer import FramePushed from 
pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.asyncio import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -294,17 +295,28 @@ class FrameProcessor(BaseObject): timestamp = self._clock.get_time() if self._clock else 0 if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") + if self._observer: - await self._observer.on_push_frame( - self, self._next, frame, direction, timestamp + data = FramePushed( + source=self, + destination=self._next, + frame=frame, + direction=direction, + timestamp=timestamp, ) + await self._observer.on_push_frame(data) await self._next.queue_frame(frame, direction) elif direction == FrameDirection.UPSTREAM and self._prev: logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}") if self._observer: - await self._observer.on_push_frame( - self, self._prev, frame, direction, timestamp + data = FramePushed( + source=self, + destination=self._prev, + frame=frame, + direction=direction, + timestamp=timestamp, ) + await self._observer.on_push_frame(data) await self._prev.queue_frame(frame, direction) except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 55e91d7ff..ee0cced87 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -55,7 +55,7 @@ from pipecat.metrics.metrics import ( TTFBMetricsData, TTSUsageMetricsData, ) -from pipecat.observers.base_observer import BaseObserver +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, OpenAILLMContextFrame, @@ -445,14 +445,7 @@ class RTVIObserver(BaseObserver): self._frames_seen = set() rtvi.set_errors_enabled(self._params.errors_enabled) - async def on_push_frame( - self, - src: 
FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): """Process a frame being pushed through the pipeline. Args: @@ -462,6 +455,10 @@ class RTVIObserver(BaseObserver): direction: Direction of frame flow in pipeline timestamp: Time when frame was pushed """ + src = data.source + frame = data.frame + direction = data.direction + # If we have already seen this frame, let's skip it. if frame.id in self._frames_seen: return diff --git a/src/pipecat/services/google/rtvi.py b/src/pipecat/services/google/rtvi.py index 88e67e6c6..cd60f6f1f 100644 --- a/src/pipecat/services/google/rtvi.py +++ b/src/pipecat/services/google/rtvi.py @@ -9,8 +9,9 @@ from typing import List, Literal, Optional from pydantic import BaseModel from pipecat.frames.frames import Frame +from pipecat.observers.base_observer import FramePushed from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.frameworks.rtvi import RTVIObserver +from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame @@ -27,18 +28,13 @@ class RTVIBotLLMSearchResponseMessage(BaseModel): class GoogleRTVIObserver(RTVIObserver): - def __init__(self, rtvi: FrameProcessor): + def __init__(self, rtvi: RTVIProcessor): super().__init__(rtvi) - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): - await super().on_push_frame(src, dst, frame, direction, timestamp) + async def on_push_frame(self, data: FramePushed): + await super().on_push_frame(data) + + frame = data.frame if isinstance(frame, LLMSearchResponseFrame): await self._handle_llm_search_response_frame(frame) diff --git a/src/pipecat/tests/utils.py b/src/pipecat/tests/utils.py index e2368ba09..b5dfc5de1 100644 --- 
a/src/pipecat/tests/utils.py +++ b/src/pipecat/tests/utils.py @@ -15,7 +15,7 @@ from pipecat.frames.frames import ( StartFrame, SystemFrame, ) -from pipecat.observers.base_observer import BaseObserver +from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -42,14 +42,10 @@ class HeartbeatsObserver(BaseObserver): self._target = target self._callback = heartbeat_callback - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): + async def on_push_frame(self, data: FramePushed): + src = data.source + frame = data.frame + if src == self._target and isinstance(frame, HeartbeatFrame): await self._callback(self._target, frame)