metrics: allow sending only initial TTFB metrics

This commit is contained in:
Aleix Conchillo Flaqué
2024-06-13 13:28:10 -07:00
parent 77a3b2ea5c
commit cb27e86266
7 changed files with 25 additions and 3 deletions

View File

@@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Allow specifying frame processors' name through a new `name` constructor
argument.
- Added `report_only_initial_ttfb` to `PipelineParams`. This will make it so
only the initial TTFB metrics after the user stops talking are reported.
### Changed
- `FrameSerializer.deserialize()` can now return `None` in case it is not

View File

@@ -76,7 +76,8 @@ async def main(room_url: str, token):
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True
enable_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")

View File

@@ -189,6 +189,7 @@ class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass

View File

@@ -21,6 +21,7 @@ from loguru import logger
class PipelineParams(BaseModel):
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False
class Source(FrameProcessor):
@@ -99,6 +100,7 @@ class PipelineTask:
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)

View File

@@ -73,6 +73,7 @@ class LLMResponseAggregator(FrameProcessor):
# S I E I T -> X
# S E T -> X
# S E I T -> X
#
# The following case would not be supported:
#
# S I E T1 I T2 -> X
@@ -90,6 +91,7 @@ class LLMResponseAggregator(FrameProcessor):
self._seen_start_frame = True
self._seen_end_frame = False
self._seen_interim_results = False
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
self._seen_start_frame = False
@@ -102,6 +104,7 @@ class LLMResponseAggregator(FrameProcessor):
# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"

View File

@@ -91,6 +91,7 @@ class ResponseAggregator(FrameProcessor):
self._seen_start_frame = True
self._seen_end_frame = False
self._seen_interim_results = False
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
self._seen_start_frame = False
@@ -103,6 +104,7 @@ class ResponseAggregator(FrameProcessor):
# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"

View File

@@ -9,7 +9,7 @@ import time
from enum import Enum
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, UserStoppedSpeakingFrame
from pipecat.utils.utils import obj_count, obj_id
from loguru import logger
@@ -36,9 +36,11 @@ class FrameProcessor:
# Properties
self._allow_interruptions = False
self._enable_metrics = False
self._report_only_initial_ttfb = False
# Metrics
self._start_ttfb_time = 0
self._should_report_ttfb = True
@property
def interruptions_allowed(self):
@@ -48,12 +50,17 @@ class FrameProcessor:
def metrics_enabled(self):
return self._enable_metrics
@property
def report_only_initial_ttfb(self):
return self._report_only_initial_ttfb
def can_generate_metrics(self) -> bool:
return False
async def start_ttfb_metrics(self):
if self.metrics_enabled:
if self.metrics_enabled and self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not self._report_only_initial_ttfb
async def stop_ttfb_metrics(self):
if self.metrics_enabled and self._start_ttfb_time > 0:
@@ -77,6 +84,9 @@ class FrameProcessor:
if isinstance(frame, StartFrame):
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, UserStoppedSpeakingFrame):
self._should_report_ttfb = True
async def push_error(self, error: ErrorFrame):
await self.push_frame(error, FrameDirection.UPSTREAM)