diff --git a/CHANGELOG.md b/CHANGELOG.md index ee8a60b22..b42244ea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,15 +12,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added a new `FunctionFilter`. This filter will let you filter frames based on a given function, except system messages which should never be filtered. +- Added `FrameProcessor.can_generate_metrics()` method to indicate if a + processor can generate metrics. In the future this might get an extra argument + to ask for a specific type of metric. + +- Added `BasePipeline`. All pipeline classes should be based on this class. All + subclasses should implement a `processors_with_metrics()` method that returns + a list of all `FrameProcessor`s in the pipeline that can generate metrics. + - Added `enable_metrics` to `PipelineParams`. - Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the system. Right now, it can report TTFB (Time To First Byte) values for different services, that is the time spent between the arrival of a `Frame` to - the processor/service until the first `DataFrame` is pushed downstream. + the processor/service until the first `DataFrame` is pushed downstream. If + metrics are enabled an intial `MetricsFrame` with all the services in the + pipeline will be sent. - Added TTFB metrics and debug logging for TTS services. +### Changed + +- Moved `ParallelTask` to `pipecat.pipeline.parallel_task`. + ### Fixed - Fixed PlayHT TTS service to work properly async. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 1c2e97e8b..8ac660e15 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -23,11 +23,11 @@ from pipecat.frames.frames import ( from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.parallel_task import ParallelTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.aggregators.gated import GatedAggregator from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator from pipecat.processors.aggregators.sentence import SentenceAggregator -from pipecat.processors.aggregators.parallel_task import ParallelTask from pipecat.services.openai import OpenAILLMService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService diff --git a/src/pipecat/pipeline/base_pipeline.py b/src/pipecat/pipeline/base_pipeline.py new file mode 100644 index 000000000..54f6499a9 --- /dev/null +++ b/src/pipecat/pipeline/base_pipeline.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from abc import abstractmethod + +from typing import List + +from pipecat.processors.frame_processor import FrameProcessor + + +class BasePipeline(FrameProcessor): + + def __init__(self): + super().__init__() + + @abstractmethod + def processors_with_metrics(self) -> List[FrameProcessor]: + pass diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index ccf72bd90..d045c3493 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -6,6 +6,10 @@ import asyncio +from itertools import chain +from typing import List + +from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame @@ -45,7 +49,7 @@ class Sink(FrameProcessor): await self._down_queue.put(frame) -class ParallelPipeline(FrameProcessor): +class ParallelPipeline(BasePipeline): def __init__(self, *args): super().__init__() @@ -81,6 +85,13 @@ class ParallelPipeline(FrameProcessor): logger.debug(f"Finished creating {self} pipelines") + # + # BasePipeline + # + + def processors_with_metrics(self) -> List[FrameProcessor]: + return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines)) + # # Frame processor # diff --git a/src/pipecat/processors/aggregators/parallel_task.py b/src/pipecat/pipeline/parallel_task.py similarity index 92% rename from src/pipecat/processors/aggregators/parallel_task.py rename to src/pipecat/pipeline/parallel_task.py index ce341bde5..306a772b7 100644 --- a/src/pipecat/processors/aggregators/parallel_task.py +++ b/src/pipecat/pipeline/parallel_task.py @@ -6,8 +6,10 @@ import asyncio +from itertools import chain from typing import List +from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import Frame @@ -47,7 +49,7 @@ class Sink(FrameProcessor): await self._down_queue.put(frame) -class ParallelTask(FrameProcessor): +class ParallelTask(BasePipeline): def __init__(self, *args): super().__init__() @@ -79,6 +81,13 @@ class ParallelTask(FrameProcessor): self._pipelines.append(pipeline) logger.debug(f"Finished creating {self} pipelines") + # + # BasePipeline + # + + def processors_with_metrics(self) -> List[FrameProcessor]: + return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines)) + # # Frame processor # diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 2cb5b45d4..eb6bd78d1 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -4,11 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import asyncio +from itertools import chain from typing import Callable, Coroutine, List -from pipecat.frames.frames import Frame +from pipecat.frames.frames import Frame, MetricsFrame, StartFrame +from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -44,7 +45,7 @@ class PipelineSink(FrameProcessor): await self._downstream_push_frame(frame, direction) -class Pipeline(FrameProcessor): +class Pipeline(BasePipeline): def __init__(self, processors: List[FrameProcessor]): super().__init__() @@ -57,6 +58,19 @@ class Pipeline(FrameProcessor): self._link_processors() + # + # BasePipeline + # + + def processors_with_metrics(self): + services = [] + for p in self._processors: + if isinstance(p, BasePipeline): + services += p.processors_with_metrics() + elif p.can_generate_metrics(): + services.append(p) + return services + # # Frame processor # @@ -67,6 +81,9 @@ class Pipeline(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) + if isinstance(frame, StartFrame) and self.metrics_enabled: + await self._send_initial_metrics() + if direction == FrameDirection.DOWNSTREAM: await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) elif direction == FrameDirection.UPSTREAM: @@ -81,3 +98,9 @@ class Pipeline(FrameProcessor): for curr in self._processors[1:]: prev.link(curr) prev = curr + + async def _send_initial_metrics(self): + processors = self.processors_with_metrics() + ttfb = dict(zip([p.name for p in processors], [0] * len(processors))) + frame = MetricsFrame(ttfb=ttfb) + await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index e0a072477..49651c7a2 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -44,6 +44,9 @@ class FrameProcessor: def metrics_enabled(self): return self._enable_metrics + def can_generate_metrics(self) -> bool: + return False + async def start_ttfb_metrics(self): if self.metrics_enabled: self._start_ttfb_time = time.time() diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 95b31f336..0f3d7d241 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -49,6 +49,9 @@ class AnthropicLLMService(LLMService): self._model = model self._max_tokens = max_tokens + def can_generate_metrics(self) -> bool: + return True + def _get_messages_from_openai_context( self, context: OpenAILLMContext): openai_messages = context.get_messages() @@ -101,7 +104,7 @@ class AnthropicLLMService(LLMService): messages = self._get_messages_from_openai_context(context) - await self.start_ttfb_metric() + await self.start_ttfb_metrics() response = await self._client.messages.create( messages=messages, @@ -109,7 +112,7 @@ class AnthropicLLMService(LLMService): max_tokens=self._max_tokens, stream=True) - await self.stop_ttfb_metric() + await self.stop_ttfb_metrics() async for event in response: # logger.debug(f"Anthropic LLM event: {event}") @@ -119,7 +122,7 @@ class AnthropicLLMService(LLMService): await self.push_frame(LLMResponseEndFrame()) except Exception as e: - logger.error(f"Anthrophic exception: {e}") + logger.error(f"Anthropic exception: {e}") finally: await self.push_frame(LLMFullResponseEndFrame()) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index d35584303..f4184f647 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -44,6 +44,9 @@ class AzureTTSService(TTSService): ) self._voice = voice + def can_generate_metrics(self) -> bool: + return True + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: {text}") diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 474600ffa..161f0589b 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -39,6 +39,9 @@ class CartesiaTTSService(TTSService): except Exception as e: logger.error(f"Cartesia initialization error: {e}") + def can_generate_metrics(self) -> bool: + return True + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index aaf0c01ee..ce418916b 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -29,6 +29,9 @@ class DeepgramTTSService(TTSService): self._api_key = api_key self._aiohttp_session = aiohttp_session + def can_generate_metrics(self) -> bool: + return True + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 717460fd3..256f077c3 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -31,6 +31,9 @@ class ElevenLabsTTSService(TTSService): self._aiohttp_session = aiohttp_session self._model = model + def can_generate_metrics(self) -> bool: + return True + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index ffe9a9fee..b1da9ddf6 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -47,6 +47,9 @@ class GoogleLLMService(LLMService): gai.configure(api_key=api_key) self._client = gai.GenerativeModel(model) + def can_generate_metrics(self) -> bool: + return True + def _get_messages_from_openai_context( self, context: OpenAILLMContext) -> List[glm.Content]: openai_messages = context.get_messages() diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index aaed7d8e7..daa6125b3 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -76,6 +76,9 @@ class BaseOpenAILLMService(LLMService): def create_client(self, api_key=None, base_url=None): return AsyncOpenAI(api_key=api_key, base_url=base_url) + def can_generate_metrics(self) -> bool: + return True + async def _stream_chat_completions( self, context: OpenAILLMContext ) -> AsyncStream[ChatCompletionChunk]: @@ -107,15 +110,6 @@ class BaseOpenAILLMService(LLMService): return chunks - async def _chat_completions(self, messages) -> str | None: - response: ChatCompletion = await self._client.chat.completions.create( - model=self._model, stream=False, messages=messages - ) - if response and len(response.choices) > 0: - return response.choices[0].message.content - else: - return None - async def _process_context(self, context: OpenAILLMContext): function_name = "" arguments = "" @@ -307,6 +301,9 @@ class OpenAITTSService(TTSService): self._client = AsyncOpenAI(api_key=api_key) + def can_generate_metrics(self) -> bool: + return True + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index cf38e125d..c44719047 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -43,8 +43,8 @@ class PlayHTTTSService(TTSService): quality="higher", format=Format.FORMAT_WAV) - def __del__(self): - self._client.close() + def can_generate_metrics(self) -> bool: + return True async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py index f1b37712e..6dfc5080e 100644 --- a/src/pipecat/services/whisper.py +++ b/src/pipecat/services/whisper.py @@ -56,6 +56,9 @@ class WhisperSTTService(STTService): self._model: WhisperModel | None = None self._load() + def can_generate_metrics(self) -> bool: + return True + def _load(self): """Loads the Whisper model. Note that if this is the first time this model is being run, it will take time to download.""" diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 0b440181c..30e857c2d 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -527,8 +527,6 @@ class DailyInputTransport(BaseInputTransport): if isinstance(frame, UserImageRequestFrame): self.request_participant_image(frame.user_id) - await super().process_frame(frame, direction) - # # Frames #