From c7a0d0db64c01fa34694d8d2b504b0ff13609d72 Mon Sep 17 00:00:00 2001 From: Ankur Duggal <38927181+ankykong@users.noreply.github.com> Date: Wed, 12 Jun 2024 14:23:56 -0700 Subject: [PATCH] OpenPipe Integration --- CHANGELOG.md | 21 +++ dot-env.template | 3 + .../06b-listen-and-respond-openpipe.py | 109 +++++++++++++ linux-py3.10-requirements.txt | 2 + macos-py3.10-requirements.txt | 2 + pyproject.toml | 1 + src/pipecat/services/openpipe.py | 148 ++++++++++++++++++ 7 files changed, 286 insertions(+) create mode 100644 examples/foundational/06b-listen-and-respond-openpipe.py create mode 100644 src/pipecat/services/openpipe.py diff --git a/CHANGELOG.md b/CHANGELOG.md index da1d9e9ba..471502b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new `Service`. This service will let you run OpenAI through + OpenPipe's SDK. + +### Changed + +- `OpenPipe` can now be used. Can call OpenAI through OpenPipe SDK to get + LLM logging stored in OpenPipe. + +### Fixed + +- None + +### Other + +- Added new `openpipe` example. This example shows how to use OpenPipe to run + OpenAI LLMs and get the logs stored in OpenPipe. + +## [0.0.29] - 2024-06-12 + +### Added + - Allow specifying frame processors' name through a new `name` constructor argument. diff --git a/dot-env.template b/dot-env.template index bb7db9e9f..d1aff9382 100644 --- a/dot-env.template +++ b/dot-env.template @@ -33,3 +33,6 @@ PLAY_HT_API_KEY=... # OpenAI OPENAI_API_KEY=... + +#OpenPipe +OPENPIPE_API_KEY=... diff --git a/examples/foundational/06b-listen-and-respond-openpipe.py b/examples/foundational/06b-listen-and-respond-openpipe.py new file mode 100644 index 000000000..3779c374e --- /dev/null +++ b/examples/foundational/06b-listen-and-respond-openpipe.py @@ -0,0 +1,109 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, + LLMUserResponseAggregator, +) +from pipecat.processors.logger import FrameLogger +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openpipe import OpenPipeLLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger +import time + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(room_url: str, token): + + timestamp = int(time.time()) + + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + llm = OpenPipeLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o", + cli_id=f"cli-{timestamp}" + ) + + fl = FrameLogger("!!! after LLM", "red") + fltts = FrameLogger("@@@ out of tts", "green") + flend = FrameLogger("### out of the end", "magenta") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), + tma_in, + llm, + fl, + tts, + fltts, + transport.output(), + tma_out, + flend + ]) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/linux-py3.10-requirements.txt b/linux-py3.10-requirements.txt index 55acfcdd8..d68106ee6 100644 --- a/linux-py3.10-requirements.txt +++ b/linux-py3.10-requirements.txt @@ -265,6 +265,8 @@ openai==1.26.0 # via # langchain-openai # pipecat-ai (pyproject.toml) +openpipe==4.13.0 + # via pipecat-ai (pyproject.toml) orjson==3.10.4 # via langsmith packaging==23.2 diff --git a/macos-py3.10-requirements.txt b/macos-py3.10-requirements.txt index df0d7ff0d..38adada24 100644 --- a/macos-py3.10-requirements.txt +++ b/macos-py3.10-requirements.txt @@ -231,6 +231,8 @@ openai==1.26.0 # via # langchain-openai # pipecat-ai (pyproject.toml) +openpipe==4.13.0 + # via pipecat-ai (pyproject.toml) orjson==3.10.4 # via langsmith packaging==23.2 diff --git a/pyproject.toml b/pyproject.toml index 67dfc7e53..e777a4caf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ playht = [ "pyht~=0.0.28" ] silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ] websocket = [ "websockets~=12.0" ] whisper = [ "faster-whisper~=1.0.2" ] +openpipe = [ "openpipe~=4.13.0" ] [tool.setuptools.packages.find] # All the following settings are optional: diff --git a/src/pipecat/services/openpipe.py b/src/pipecat/services/openpipe.py new file mode 100644 index 000000000..667ccec74 --- /dev/null +++ b/src/pipecat/services/openpipe.py @@ -0,0 +1,148 @@ +from pipecat.services.ai_services import LLMService +from openpipe import AsyncOpenAI as OpenPipeAI +from openpipe import AsyncStream +import os +from loguru import logger +import secrets +import time +import base64 +from openai.types.chat import (ChatCompletionMessageParam, ChatCompletionChunk) +from typing import List +from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesFrame, + LLMResponseEndFrame, + LLMResponseStartFrame, + TextFrame, + URLImageRawFrame, + VisionImageRawFrame +) + + +class BaseOpenPipeLLMService(LLMService): + + def __init__(self, model: str, c_id=None, api_key=None, openpipe_api_key=None, openpipe_base_url=None, prompt=None): + super().__init__() + self._model = model + self._client = self.create_client(api_key=api_key, openpipe_api_key=openpipe_api_key, openpipe_base_url=openpipe_base_url) + self.c_id=c_id if c_id else secrets.token_urlsafe(16) + self.prompt = prompt + logger.debug(f"Client Created: {self._client}") + + def create_client(self, api_key=None, openpipe_api_key=None, openpipe_base_url=None): + # Set up the OpenPipe client with the provided API keys and base URL + client = OpenPipeAI( + api_key=api_key or os.environ.get("OPENAI_API_KEY"), + openpipe={ + "api_key": openpipe_api_key or os.environ.get("OPENPIPE_API_KEY"), + "base_url": openpipe_base_url or "https://app.openpipe.ai/api/v1" + } + ) + return client + + async def _stream_chat_completions(self, context): + logger.debug(f"Generating chat: {context.get_messages_json()}") + + messages: List[ChatCompletionMessageParam] = context.get_messages() + + # base64 encode any images + for message in messages: + if message.get("mime_type") == "image/jpeg": + encoded_image = base64.b64encode(message["data"].getvalue()).decode("utf-8") + text = message["content"] + message["content"] = [ + {"type": "text", "text": text}, + {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}} + ] + del message["data"] + del message["mime_type"] + + start_time = time.time() + # Stream chat completions using the OpenPipe client + chunks = ( + await self._client.chat.completions.create( + model=self._model, + stream=True, + messages=messages, + openpipe={ + "tags": {"conversation_id": self.c_id, + "prompt":self.prompt}, + "log_request": True + } + ) + ) + + logger.debug(f"OpenPipe LLM TTFB: {time.time() - start_time}") + + return chunks + + async def _process_context(self, context): + function_name = "" + arguments = "" + + chunk_stream: AsyncStream[ChatCompletionChunk] = ( + await self._stream_chat_completions(context) + ) + + await self.push_frame(LLMFullResponseStartFrame()) + + async for chunk in chunk_stream: + if len(chunk.choices) == 0: + continue + + if chunk.choices[0].delta.tool_calls: + # We're streaming the LLM response to enable the fastest response times. + # For text, we just yield each chunk as we receive it and count on consumers + # to do whatever coalescing they need (eg. to pass full sentences to TTS) + # + # If the LLM is a function call, we'll do some coalescing here. + # If the response contains a function name, we'll yield a frame to tell consumers + # that they can start preparing to call the function with that name. + # We accumulate all the arguments for the rest of the streamed response, then when + # the response is done, we package up all the arguments and the function name and + # yield a frame containing the function name and the arguments. + + tool_call = chunk.choices[0].delta.tool_calls[0] + if tool_call.function and tool_call.function.name: + function_name += tool_call.function.name + # yield LLMFunctionStartFrame(function_name=tool_call.function.name) + if tool_call.function and tool_call.function.arguments: + # Keep iterating through the response to collect all the argument fragments and + # yield a complete LLMFunctionCallFrame after run_llm_async + # completes + arguments += tool_call.function.arguments + elif chunk.choices[0].delta.content: + await self.push_frame(LLMResponseStartFrame()) + await self.push_frame(TextFrame(chunk.choices[0].delta.content)) + await self.push_frame(LLMResponseEndFrame()) + + await self.push_frame(LLMFullResponseEndFrame()) + + # if we got a function name and arguments, yield the frame with all the info so + # frame consumers can take action based on the function call. + # if function_name and arguments: + # yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + context = None + if isinstance(frame, OpenAILLMContextFrame): + context: OpenAILLMContext = frame.context + elif isinstance(frame, LLMMessagesFrame): + context = OpenAILLMContext.from_messages(frame.messages) + elif isinstance(frame, VisionImageRawFrame): + context = OpenAILLMContext.from_image_frame(frame) + else: + await self.push_frame(frame, direction) + + if context: + await self._process_context(context) + +class OpenPipeLLMService(BaseOpenPipeLLMService): + + def __init__(self, model="gpt-4o", cli_id=None, **kwargs): + super().__init__(model, cli_id, **kwargs) \ No newline at end of file