Merge pull request #218 from pipecat-ai/aleix/send-inital-metrics-mapping

send inital metrics mapping
This commit is contained in:
Aleix Conchillo Flaqué
2024-06-08 04:41:59 +08:00
committed by GitHub
17 changed files with 120 additions and 23 deletions

View File

@@ -12,15 +12,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added a new `FunctionFilter`. This filter will let you filter frames based on
a given function, except system messages which should never be filtered.
- Added `FrameProcessor.can_generate_metrics()` method to indicate if a
processor can generate metrics. In the future this might get an extra argument
to ask for a specific type of metric.
- Added `BasePipeline`. All pipeline classes should be based on this class. All
subclasses should implement a `processors_with_metrics()` method that returns
a list of all `FrameProcessor`s in the pipeline that can generate metrics.
- Added `enable_metrics` to `PipelineParams`.
- Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the
system. Right now, it can report TTFB (Time To First Byte) values for
different services, that is the time spent between the arrival of a `Frame` to
the processor/service until the first `DataFrame` is pushed downstream.
the processor/service until the first `DataFrame` is pushed downstream. If
metrics are enabled an intial `MetricsFrame` with all the services in the
pipeline will be sent.
- Added TTFB metrics and debug logging for TTS services.
### Changed
- Moved `ParallelTask` to `pipecat.pipeline.parallel_task`.
### Fixed
- Fixed PlayHT TTS service to work properly async.

View File

@@ -23,11 +23,11 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.aggregators.parallel_task import ParallelTask
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import abstractmethod
from typing import List
from pipecat.processors.frame_processor import FrameProcessor
class BasePipeline(FrameProcessor):
def __init__(self):
super().__init__()
@abstractmethod
def processors_with_metrics(self) -> List[FrameProcessor]:
pass

View File

@@ -6,6 +6,10 @@
import asyncio
from itertools import chain
from typing import List
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
@@ -45,7 +49,7 @@ class Sink(FrameProcessor):
await self._down_queue.put(frame)
class ParallelPipeline(FrameProcessor):
class ParallelPipeline(BasePipeline):
def __init__(self, *args):
super().__init__()
@@ -81,6 +85,13 @@ class ParallelPipeline(FrameProcessor):
logger.debug(f"Finished creating {self} pipelines")
#
# BasePipeline
#
def processors_with_metrics(self) -> List[FrameProcessor]:
return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines))
#
# Frame processor
#

View File

@@ -6,8 +6,10 @@
import asyncio
from itertools import chain
from typing import List
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import Frame
@@ -47,7 +49,7 @@ class Sink(FrameProcessor):
await self._down_queue.put(frame)
class ParallelTask(FrameProcessor):
class ParallelTask(BasePipeline):
def __init__(self, *args):
super().__init__()
@@ -79,6 +81,13 @@ class ParallelTask(FrameProcessor):
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
#
# BasePipeline
#
def processors_with_metrics(self) -> List[FrameProcessor]:
return list(chain.from_iterable(p.processors_with_metrics() for p in self._pipelines))
#
# Frame processor
#

View File

@@ -4,11 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from itertools import chain
from typing import Callable, Coroutine, List
from pipecat.frames.frames import Frame
from pipecat.frames.frames import Frame, MetricsFrame, StartFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -44,7 +45,7 @@ class PipelineSink(FrameProcessor):
await self._downstream_push_frame(frame, direction)
class Pipeline(FrameProcessor):
class Pipeline(BasePipeline):
def __init__(self, processors: List[FrameProcessor]):
super().__init__()
@@ -57,6 +58,19 @@ class Pipeline(FrameProcessor):
self._link_processors()
#
# BasePipeline
#
def processors_with_metrics(self):
services = []
for p in self._processors:
if isinstance(p, BasePipeline):
services += p.processors_with_metrics()
elif p.can_generate_metrics():
services.append(p)
return services
#
# Frame processor
#
@@ -67,6 +81,9 @@ class Pipeline(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame) and self.metrics_enabled:
await self._send_initial_metrics()
if direction == FrameDirection.DOWNSTREAM:
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
elif direction == FrameDirection.UPSTREAM:
@@ -81,3 +98,9 @@ class Pipeline(FrameProcessor):
for curr in self._processors[1:]:
prev.link(curr)
prev = curr
async def _send_initial_metrics(self):
processors = self.processors_with_metrics()
ttfb = dict(zip([p.name for p in processors], [0] * len(processors)))
frame = MetricsFrame(ttfb=ttfb)
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)

View File

@@ -44,6 +44,9 @@ class FrameProcessor:
def metrics_enabled(self):
return self._enable_metrics
def can_generate_metrics(self) -> bool:
return False
async def start_ttfb_metrics(self):
if self.metrics_enabled:
self._start_ttfb_time = time.time()

View File

@@ -49,6 +49,9 @@ class AnthropicLLMService(LLMService):
self._model = model
self._max_tokens = max_tokens
def can_generate_metrics(self) -> bool:
return True
def _get_messages_from_openai_context(
self, context: OpenAILLMContext):
openai_messages = context.get_messages()
@@ -101,7 +104,7 @@ class AnthropicLLMService(LLMService):
messages = self._get_messages_from_openai_context(context)
await self.start_ttfb_metric()
await self.start_ttfb_metrics()
response = await self._client.messages.create(
messages=messages,
@@ -109,7 +112,7 @@ class AnthropicLLMService(LLMService):
max_tokens=self._max_tokens,
stream=True)
await self.stop_ttfb_metric()
await self.stop_ttfb_metrics()
async for event in response:
# logger.debug(f"Anthropic LLM event: {event}")
@@ -119,7 +122,7 @@ class AnthropicLLMService(LLMService):
await self.push_frame(LLMResponseEndFrame())
except Exception as e:
logger.error(f"Anthrophic exception: {e}")
logger.error(f"Anthropic exception: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -44,6 +44,9 @@ class AzureTTSService(TTSService):
)
self._voice = voice
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: {text}")

View File

@@ -39,6 +39,9 @@ class CartesiaTTSService(TTSService):
except Exception as e:
logger.error(f"Cartesia initialization error: {e}")
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -29,6 +29,9 @@ class DeepgramTTSService(TTSService):
self._api_key = api_key
self._aiohttp_session = aiohttp_session
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -31,6 +31,9 @@ class ElevenLabsTTSService(TTSService):
self._aiohttp_session = aiohttp_session
self._model = model
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -47,6 +47,9 @@ class GoogleLLMService(LLMService):
gai.configure(api_key=api_key)
self._client = gai.GenerativeModel(model)
def can_generate_metrics(self) -> bool:
return True
def _get_messages_from_openai_context(
self, context: OpenAILLMContext) -> List[glm.Content]:
openai_messages = context.get_messages()

View File

@@ -76,6 +76,9 @@ class BaseOpenAILLMService(LLMService):
def create_client(self, api_key=None, base_url=None):
return AsyncOpenAI(api_key=api_key, base_url=base_url)
def can_generate_metrics(self) -> bool:
return True
async def _stream_chat_completions(
self, context: OpenAILLMContext
) -> AsyncStream[ChatCompletionChunk]:
@@ -107,15 +110,6 @@ class BaseOpenAILLMService(LLMService):
return chunks
async def _chat_completions(self, messages) -> str | None:
response: ChatCompletion = await self._client.chat.completions.create(
model=self._model, stream=False, messages=messages
)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
async def _process_context(self, context: OpenAILLMContext):
function_name = ""
arguments = ""
@@ -307,6 +301,9 @@ class OpenAITTSService(TTSService):
self._client = AsyncOpenAI(api_key=api_key)
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -43,8 +43,8 @@ class PlayHTTTSService(TTSService):
quality="higher",
format=Format.FORMAT_WAV)
def __del__(self):
self._client.close()
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -56,6 +56,9 @@ class WhisperSTTService(STTService):
self._model: WhisperModel | None = None
self._load()
def can_generate_metrics(self) -> bool:
return True
def _load(self):
"""Loads the Whisper model. Note that if this is the first time
this model is being run, it will take time to download."""

View File

@@ -527,8 +527,6 @@ class DailyInputTransport(BaseInputTransport):
if isinstance(frame, UserImageRequestFrame):
self.request_participant_image(frame.user_id)
await super().process_frame(frame, direction)
#
# Frames
#