major functionality working (not configurable, occasional timing bugs maybe)

This commit is contained in:
Kwindla Hultman Kramer
2024-10-04 15:01:57 -07:00
parent c32c65014b
commit 09a3c2a82d
4 changed files with 42 additions and 20 deletions

View File

@@ -11,7 +11,7 @@ import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -115,13 +115,21 @@ async def main():
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()

View File

@@ -14,7 +14,6 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.services.openai_realtime_beta import (
OpenAILLMServiceRealtimeBeta,
@@ -140,7 +139,7 @@ Remember, your responses should be short. Just one or two sentences, usually.
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([OpenAILLMContextFrame(context=context)])
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()

View File

@@ -470,7 +470,7 @@ class OpenAIUserContextAggregator(LLMUserContextAggregator):
if frame.user_id in self._context._user_image_request_context:
del self._context._user_image_request_context[frame.user_id]
elif isinstance(frame, UserImageRawFrame):
# Push a new AnthropicImageMessageFrame with the text context we cached
# Push a new OpenAIImageMessageFrame with the text context we cached
# downstream to be handled by our assistant context aggregator. This is
# necessary so that we add the message to the context in the right order.
text = self._context._user_image_request_context.get(frame.user_id) or ""

View File

@@ -21,6 +21,8 @@ from pipecat.frames.frames import (
TextFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.frame_processor import FrameDirection
@@ -84,9 +86,6 @@ class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
async def _push_aggregation(self):
await super()._push_aggregation()
logger.debug(
f"!!! AFTER ASSISTANT PUSH AGGREGATION {self._context.get_messages_for_logging()}"
)
class OpenAIInputTranscription(BaseModel):
@@ -147,6 +146,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
self._session_properties = session_properties
self._context = None
self._bot_speaking = False
def can_generate_metrics(self) -> bool:
return True
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
@@ -161,9 +165,8 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
async def _ws_send(self, realtime_message):
try:
if realtime_message.get("type") != "input_audio_buffer.append":
logger.debug(f"!!! Sending message to websocket: {realtime_message}")
# if realtime_message.get("type") != "input_audio_buffer.append":
# logger.debug(f"!!! Sending message to websocket: {realtime_message}")
await self._websocket.send(json.dumps(realtime_message))
except Exception as e:
logger.error(f"Error sending message to websocket: {e}")
@@ -228,7 +231,8 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
pass
elif msg["type"] == "input_audio_buffer.speech_stopped":
# user stopped speaking
pass
await self.start_processing_metrics()
await self.start_ttfb_metrics()
elif msg["type"] == "conversation.item.created":
# for input, this will get sent from the server whether the
# conversation item is created by audio transcription or by
@@ -237,7 +241,12 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
# logger.debug(f"Received {msg}")
pass
elif msg["type"] == "response.created":
# could use for processing metrics
# todo: 1. figure out TTS started/stopped frame semantics better
# 2. do not push these frames in text-only mode
logger.debug(f"Received response created: {msg}")
if not self._bot_speaking:
self._bot_speaking = True
await self.push_frame(TTSStartedFrame())
pass
elif msg["type"] == "conversation.item.input_audio_transcription.completed":
# or here maybe (possible send upstream to user context aggregator)
@@ -252,8 +261,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
pass
elif msg["type"] == "response.audio_transcript.delta":
# openai playground app uses this, not "text"
if msg["delta"]:
await self.push_frame(TextFrame(msg["delta"]))
pass
elif msg["type"] == "response.audio.delta":
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["delta"]),
sample_rate=24000,
@@ -261,7 +273,9 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
)
await self.push_frame(frame)
elif msg["type"] == "response.audio.done":
# bot stopped speaking - or do that at the end of the response?
if self._bot_speaking:
self._bot_speaking = False
await self.push_frame(TTSStoppedFrame())
pass
elif msg["type"] == "response.audio_transcript.done":
# probably ignore for now
@@ -275,11 +289,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
for item in item["content"]:
# output text
if item["type"] == "audio" and item["transcript"] is not None:
logger.debug(f"!!! >{item['transcript']}")
await self.push_frame(TextFrame(item["transcript"]))
# could send full transcript here instead of streaming chunks
# logger.debug(f"!!! >{item['transcript']}")
pass
elif msg["type"] == "response.done":
# logger.debug(f"Received response done: {msg}")
await self.stop_processing_metrics()
# usage metrics
# example.
# response.usage.total_tokens:592
@@ -290,13 +304,14 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
# response.usage.input_token_details.audio_tokens:115
# response.usage.output_token_details.text_tokens:32
# response.usage.output_token_details.audio_tokens:135
logger.debug("!!! Response done PPUSHING METRICS")
tokens = LLMTokenUsage(
prompt_tokens=msg["response"]["usage"]["input_tokens"],
completion_tokens=msg["response"]["usage"]["output_tokens"],
total_tokens=msg["response"]["usage"]["total_tokens"],
)
await self.start_llm_usage_metrics(tokens)
# question for mrkb: don't seem to be getting processing time on the console except the first inference
await self.stop_processing_metrics()
# function calls
items = msg["response"]["output"]
function_calls = [item for item in items if item.get("type") == "function_call"]
@@ -398,9 +413,9 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
)
async def _handle_interruption(self, frame):
logger.debug("!!! Handling interruption")
await self.stop_all_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(TTSStoppedFrame())
# todo: do this but only when there's a response in progress?
# await self._ws_send(
# {