diff --git a/CHANGELOG.md b/CHANGELOG.md index 950668ca2..666a87c2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Allow specifying frame processors' name through a new `name` constructor argument. +- Added `report_only_initial_ttfb` to `PipelineParams`. This will make it so + only the initial TTFB metrics after the user stops talking are reported. + ### Changed - `FrameSerializer.deserialize()` can now return `None` in case it is not diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index b37b5decb..531dc75c4 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -76,7 +76,8 @@ async def main(room_url: str, token): task = PipelineTask(pipeline, PipelineParams( allow_interruptions=True, - enable_metrics=True + enable_metrics=True, + report_only_initial_ttfb=True, )) @transport.event_handler("on_first_participant_joined") diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 572246c6d..49c3eaaaf 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -189,6 +189,7 @@ class StartFrame(SystemFrame): """This is the first frame that should be pushed down a pipeline.""" allow_interruptions: bool = False enable_metrics: bool = False + report_only_initial_ttfb: bool = False @dataclass diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 32aeea61f..cf9a9404e 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -21,6 +21,7 @@ from loguru import logger class PipelineParams(BaseModel): allow_interruptions: bool = False enable_metrics: bool = False + report_only_initial_ttfb: bool = False class Source(FrameProcessor): @@ -99,6 +100,7 @@ class PipelineTask: start_frame = StartFrame( allow_interruptions=self._params.allow_interruptions, enable_metrics=self._params.enable_metrics, + report_only_initial_ttfb=self._params.report_only_initial_ttfb ) await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM) await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 4f2882e9d..4c60949bd 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -73,6 +73,7 @@ class LLMResponseAggregator(FrameProcessor): # S I E I T -> X # S E T -> X # S E I T -> X + # # The following case would not be supported: # # S I E T1 I T2 -> X @@ -90,6 +91,7 @@ class LLMResponseAggregator(FrameProcessor): self._seen_start_frame = True self._seen_end_frame = False self._seen_interim_results = False + await self.push_frame(frame, direction) elif isinstance(frame, self._end_frame): self._seen_end_frame = True self._seen_start_frame = False @@ -102,6 +104,7 @@ class LLMResponseAggregator(FrameProcessor): # Send the aggregation if we are not aggregating anymore (i.e. no # more interim results received). send_aggregation = not self._aggregating + await self.push_frame(frame, direction) elif isinstance(frame, self._accumulator_frame): if self._aggregating: self._aggregation += f" {frame.text}" diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index 12f3bcb93..af87e1b0f 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -91,6 +91,7 @@ class ResponseAggregator(FrameProcessor): self._seen_start_frame = True self._seen_end_frame = False self._seen_interim_results = False + await self.push_frame(frame, direction) elif isinstance(frame, self._end_frame): self._seen_end_frame = True self._seen_start_frame = False @@ -103,6 +104,7 @@ class ResponseAggregator(FrameProcessor): # Send the aggregation if we are not aggregating anymore (i.e. no # more interim results received). send_aggregation = not self._aggregating + await self.push_frame(frame, direction) elif isinstance(frame, self._accumulator_frame): if self._aggregating: self._aggregation += f" {frame.text}" diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index a1dfebc9e..db873ca26 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -9,7 +9,7 @@ import time from enum import Enum -from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame +from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, UserStoppedSpeakingFrame from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -36,9 +36,11 @@ class FrameProcessor: # Properties self._allow_interruptions = False self._enable_metrics = False + self._report_only_initial_ttfb = False # Metrics self._start_ttfb_time = 0 + self._should_report_ttfb = True @property def interruptions_allowed(self): @@ -48,12 +50,17 @@ class FrameProcessor: def metrics_enabled(self): return self._enable_metrics + @property + def report_only_initial_ttfb(self): + return self._report_only_initial_ttfb + def can_generate_metrics(self) -> bool: return False async def start_ttfb_metrics(self): - if self.metrics_enabled: + if self.metrics_enabled and self._should_report_ttfb: self._start_ttfb_time = time.time() + self._should_report_ttfb = not self._report_only_initial_ttfb async def stop_ttfb_metrics(self): if self.metrics_enabled and self._start_ttfb_time > 0: @@ -77,6 +84,9 @@ class FrameProcessor: if isinstance(frame, StartFrame): self._allow_interruptions = frame.allow_interruptions self._enable_metrics = frame.enable_metrics + self._report_only_initial_ttfb = frame.report_only_initial_ttfb + elif isinstance(frame, UserStoppedSpeakingFrame): + self._should_report_ttfb = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM)