Add OpenAIRealtimeLLMService, AzureRealtimeLLMService (#2596)
* Add OpenAI Realtime module * Add foundational examples for OpenAI Realtime * Add deprecation warning to OpenAIRealtimeBetaLLMService * Add deprecation warning to AzureRealtimeBetaLLMService * Update Changelog
This commit is contained in:
13
CHANGELOG.md
13
CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` which provide
|
||||
access to OpenAI Realtime.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Deprecated `OpenAIRealtimeBetaLLMService` and `AzureRealtimeBetaLLMService`.
|
||||
Use `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService`, respectively.
|
||||
Each service will be removed in an upcoming version, 1.0.0.
|
||||
|
||||
## [0.0.84] - 2025-09-05
|
||||
|
||||
### Added
|
||||
|
||||
228
examples/foundational/19-openai-realtime.py
Normal file
228
examples/foundational/19-openai-realtime.py
Normal file
@@ -0,0 +1,228 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
|
||||
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai_realtime import (
|
||||
InputAudioNoiseReduction,
|
||||
InputAudioTranscription,
|
||||
OpenAIRealtimeLLMService,
|
||||
SemanticTurnDetection,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
|
||||
# Create tools schema
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
)
|
||||
),
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
transcript = TranscriptProcessor()
|
||||
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
context = OpenAILLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream
|
||||
transport.output(), # Transport bot output
|
||||
transcript.assistant(), # After the transcript output, to time with the audio output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[TranscriptionLogObserver()],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# Register event handler for transcript updates
|
||||
@transcript.event_handler("on_transcript_update")
|
||||
async def on_transcript_update(processor, frame):
|
||||
for msg in frame.messages:
|
||||
if isinstance(msg, TranscriptionMessage):
|
||||
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
|
||||
line = f"{timestamp}{msg.role}: {msg.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
221
examples/foundational/19a-azure-realtime.py
Normal file
221
examples/foundational/19a-azure-realtime.py
Normal file
@@ -0,0 +1,221 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai_realtime import (
|
||||
AzureRealtimeLLMService,
|
||||
InputAudioTranscription,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# Define weather function using standardized schema
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
|
||||
# Create tools schema
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(model="whisper-1"),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
# turn_detection=TurnDetection(silence_duration_ms=1000),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
)
|
||||
),
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
-
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = AzureRealtimeLLMService(
|
||||
api_key=os.getenv("AZURE_REALTIME_API_KEY"),
|
||||
base_url=os.getenv("AZURE_REALTIME_BASE_URL"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
context = OpenAILLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
# [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}],
|
||||
# [
|
||||
# {
|
||||
# "role": "user",
|
||||
# "content": [
|
||||
# {"type": "text", "text": "Say"},
|
||||
# {"type": "text", "text": "yo what's up!"},
|
||||
# ],
|
||||
# }
|
||||
# ],
|
||||
tools,
|
||||
)
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -31,6 +31,7 @@ from pipecat.services.openai_realtime_beta import (
|
||||
SemanticTurnDetection,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai_realtime_beta.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
@@ -113,14 +114,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
input_audio_transcription=InputAudioTranscription(),
|
||||
modalities=["text"],
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
input_audio_noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
)
|
||||
),
|
||||
output_modalities=["text"],
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
|
||||
234
examples/foundational/19b-openai-realtime-text.py
Normal file
234
examples/foundational/19b-openai-realtime-text.py
Normal file
@@ -0,0 +1,234 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai_realtime import (
|
||||
InputAudioNoiseReduction,
|
||||
InputAudioTranscription,
|
||||
OpenAIRealtimeLLMService,
|
||||
SemanticTurnDetection,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
|
||||
# Create tools schema
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
)
|
||||
),
|
||||
output_modalities=["text"],
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
transcript = TranscriptProcessor()
|
||||
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
context = OpenAILLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream
|
||||
transport.output(), # Transport bot output
|
||||
transcript.assistant(), # After the transcript output, to time with the audio output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# Register event handler for transcript updates
|
||||
@transcript.event_handler("on_transcript_update")
|
||||
async def on_transcript_update(processor, frame):
|
||||
for msg in frame.messages:
|
||||
if isinstance(msg, TranscriptionMessage):
|
||||
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
|
||||
line = f"{timestamp}{msg.role}: {msg.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,274 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai_realtime_beta import (
|
||||
InputAudioTranscription,
|
||||
OpenAIRealtimeBetaLLMService,
|
||||
SessionProperties,
|
||||
TurnDetection,
|
||||
)
|
||||
from pipecat.services.openai_realtime_beta.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
BASE_FILENAME = "/tmp/pipecat_conversation_"
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def get_saved_conversation_filenames(params: FunctionCallParams):
|
||||
# Construct the full pattern including the BASE_FILENAME
|
||||
full_pattern = f"{BASE_FILENAME}*.json"
|
||||
|
||||
# Use glob to find all matching files
|
||||
matching_files = glob.glob(full_pattern)
|
||||
logger.debug(f"matching files: {matching_files}")
|
||||
|
||||
await params.result_callback({"filenames": matching_files})
|
||||
|
||||
|
||||
async def save_conversation(params: FunctionCallParams):
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
|
||||
filename = f"{BASE_FILENAME}{timestamp}.json"
|
||||
logger.debug(
|
||||
f"writing conversation to {filename}\n{json.dumps(params.context.messages, indent=4)}"
|
||||
)
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
messages = params.context.get_messages_for_persistent_storage()
|
||||
# remove the last message, which is the instruction we just gave to save the conversation
|
||||
messages.pop()
|
||||
json.dump(messages, file, indent=2)
|
||||
await params.result_callback({"success": True})
|
||||
except Exception as e:
|
||||
await params.result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
async def load_conversation(params: FunctionCallParams):
|
||||
async def _reset():
|
||||
filename = params.arguments["filename"]
|
||||
logger.debug(f"loading conversation from {filename}")
|
||||
try:
|
||||
with open(filename, "r") as file:
|
||||
params.context.set_messages(json.load(file))
|
||||
await params.llm.reset_conversation()
|
||||
await params.llm._create_response()
|
||||
except Exception as e:
|
||||
await params.result_callback({"success": False, "error": str(e)})
|
||||
|
||||
asyncio.create_task(_reset())
|
||||
|
||||
|
||||
tools = [
|
||||
{
|
||||
"type": "function",
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"name": "save_conversation",
|
||||
"description": "Save the current conversatione. Use this function to persist the current conversation to external storage.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": [],
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"name": "get_saved_conversation_filenames",
|
||||
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": [],
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"name": "load_conversation",
|
||||
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"filename": {
|
||||
"type": "string",
|
||||
"description": "The filename of the conversation history to load.",
|
||||
}
|
||||
},
|
||||
"required": ["filename"],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=TurnDetection(silence_duration_ms=1000),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
)
|
||||
),
|
||||
# tools=tools,
|
||||
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
-
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeBetaLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("save_conversation", save_conversation)
|
||||
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
|
||||
llm.register_function("load_conversation", load_conversation)
|
||||
|
||||
context = OpenAILLMContext([], tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -25,12 +25,13 @@ from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai_realtime_beta import (
|
||||
from pipecat.services.openai_realtime import (
|
||||
InputAudioTranscription,
|
||||
OpenAIRealtimeBetaLLMService,
|
||||
OpenAIRealtimeLLMService,
|
||||
SessionProperties,
|
||||
TurnDetection,
|
||||
)
|
||||
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
@@ -182,12 +183,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
session_properties = SessionProperties(
|
||||
input_audio_transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=TurnDetection(silence_duration_ms=1000),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=TurnDetection(silence_duration_ms=1000),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
)
|
||||
),
|
||||
# tools=tools,
|
||||
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
|
||||
|
||||
@@ -205,7 +210,7 @@ unless specifically asked to elaborate on a topic.
|
||||
Remember, your responses should be short. Just one or two sentences, usually.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeBetaLLMService(
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
|
||||
9
src/pipecat/services/openai_realtime/__init__.py
Normal file
9
src/pipecat/services/openai_realtime/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from .azure import AzureRealtimeLLMService
|
||||
from .events import (
|
||||
InputAudioNoiseReduction,
|
||||
InputAudioTranscription,
|
||||
SemanticTurnDetection,
|
||||
SessionProperties,
|
||||
TurnDetection,
|
||||
)
|
||||
from .openai import OpenAIRealtimeLLMService
|
||||
67
src/pipecat/services/openai_realtime/azure.py
Normal file
67
src/pipecat/services/openai_realtime/azure.py
Normal file
@@ -0,0 +1,67 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Azure OpenAI Realtime LLM service implementation."""
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .openai import OpenAIRealtimeLLMService
|
||||
|
||||
try:
|
||||
from websockets.asyncio.client import connect as websocket_connect
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
|
||||
"""Azure OpenAI Realtime LLM service with Azure-specific authentication.
|
||||
|
||||
Extends the OpenAI Realtime service to work with Azure OpenAI endpoints,
|
||||
using Azure's authentication headers and endpoint format. Provides the same
|
||||
real-time audio and text communication capabilities as the base OpenAI service.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
base_url: str,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize Azure Realtime LLM service.
|
||||
|
||||
Args:
|
||||
api_key: The API key for the Azure OpenAI service.
|
||||
base_url: The full Azure WebSocket endpoint URL including api-version and deployment.
|
||||
Example: "wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment"
|
||||
**kwargs: Additional arguments passed to parent OpenAIRealtimeLLMService.
|
||||
"""
|
||||
super().__init__(base_url=base_url, api_key=api_key, **kwargs)
|
||||
self.api_key = api_key
|
||||
self.base_url = base_url
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
# Here we assume that if we have a websocket, we are connected. We
|
||||
# handle disconnections in the send/recv code paths.
|
||||
return
|
||||
|
||||
logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}")
|
||||
self._websocket = await websocket_connect(
|
||||
uri=self.base_url,
|
||||
additional_headers={
|
||||
"api-key": self.api_key,
|
||||
},
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
272
src/pipecat/services/openai_realtime/context.py
Normal file
272
src/pipecat/services/openai_realtime/context.py
Normal file
@@ -0,0 +1,272 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Realtime LLM context and aggregator implementations."""
|
||||
|
||||
import copy
|
||||
import json
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.openai.llm import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
|
||||
from . import events
|
||||
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
|
||||
|
||||
|
||||
class OpenAIRealtimeLLMContext(OpenAILLMContext):
|
||||
"""OpenAI Realtime LLM context with session management and message conversion.
|
||||
|
||||
Extends the standard OpenAI LLM context to support real-time session properties,
|
||||
instruction management, and conversion between standard message formats and
|
||||
realtime conversation items.
|
||||
"""
|
||||
|
||||
def __init__(self, messages=None, tools=None, **kwargs):
|
||||
"""Initialize the OpenAIRealtimeLLMContext.
|
||||
|
||||
Args:
|
||||
messages: Initial conversation messages. Defaults to None.
|
||||
tools: Available function tools. Defaults to None.
|
||||
**kwargs: Additional arguments passed to parent OpenAILLMContext.
|
||||
"""
|
||||
super().__init__(messages=messages, tools=tools, **kwargs)
|
||||
self.__setup_local()
|
||||
|
||||
def __setup_local(self):
|
||||
self.llm_needs_settings_update = True
|
||||
self.llm_needs_initial_messages = True
|
||||
self._session_instructions = ""
|
||||
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
|
||||
"""Upgrade a standard OpenAI LLM context to a realtime context.
|
||||
|
||||
Args:
|
||||
obj: The OpenAILLMContext instance to upgrade.
|
||||
|
||||
Returns:
|
||||
The upgraded OpenAIRealtimeLLMContext instance.
|
||||
"""
|
||||
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
|
||||
obj.__class__ = OpenAIRealtimeLLMContext
|
||||
obj.__setup_local()
|
||||
return obj
|
||||
|
||||
# todo
|
||||
# - finish implementing all frames
|
||||
|
||||
def from_standard_message(self, message):
|
||||
"""Convert a standard message format to a realtime conversation item.
|
||||
|
||||
Args:
|
||||
message: The standard message dictionary to convert.
|
||||
|
||||
Returns:
|
||||
A ConversationItem instance for the realtime API.
|
||||
"""
|
||||
if message.get("role") == "user":
|
||||
content = message.get("content")
|
||||
if isinstance(message.get("content"), list):
|
||||
content = ""
|
||||
for c in message.get("content"):
|
||||
if c.get("type") == "text":
|
||||
content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
return events.ConversationItem(
|
||||
role="user",
|
||||
type="message",
|
||||
content=[events.ItemContent(type="input_text", text=content)],
|
||||
)
|
||||
if message.get("role") == "assistant" and message.get("tool_calls"):
|
||||
tc = message.get("tool_calls")[0]
|
||||
return events.ConversationItem(
|
||||
type="function_call",
|
||||
call_id=tc["id"],
|
||||
name=tc["function"]["name"],
|
||||
arguments=tc["function"]["arguments"],
|
||||
)
|
||||
logger.error(f"Unhandled message type in from_standard_message: {message}")
|
||||
|
||||
def get_messages_for_initializing_history(self):
|
||||
"""Get conversation items for initializing the realtime session history.
|
||||
|
||||
Converts the context's messages to a format suitable for the realtime API,
|
||||
handling system instructions and conversation history packaging.
|
||||
|
||||
Returns:
|
||||
List of conversation items for session initialization.
|
||||
"""
|
||||
# We can't load a long conversation history into the openai realtime api yet. (The API/model
|
||||
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
|
||||
# our general strategy until this is fixed is just to put everything into a first "user"
|
||||
# message as a single input.
|
||||
if not self.messages:
|
||||
return []
|
||||
|
||||
messages = copy.deepcopy(self.messages)
|
||||
|
||||
# If we have a "system" message as our first message, let's pull that out into session
|
||||
# "instructions"
|
||||
if messages[0].get("role") == "system":
|
||||
self.llm_needs_settings_update = True
|
||||
system = messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
self._session_instructions = content
|
||||
elif isinstance(content, list):
|
||||
self._session_instructions = content[0].get("text")
|
||||
if not messages:
|
||||
return []
|
||||
|
||||
# If we have just a single "user" item, we can just send it normally
|
||||
if len(messages) == 1 and messages[0].get("role") == "user":
|
||||
return [self.from_standard_message(messages[0])]
|
||||
|
||||
# Otherwise, let's pack everything into a single "user" message with a bit of
|
||||
# explanation for the LLM
|
||||
intro_text = """
|
||||
This is a previously saved conversation. Please treat this conversation history as a
|
||||
starting point for the current conversation."""
|
||||
|
||||
trailing_text = """
|
||||
This is the end of the previously saved conversation. Please continue the conversation
|
||||
from here. If the last message is a user instruction or question, act on that instruction
|
||||
or answer the question. If the last message is an assistant response, simple say that you
|
||||
are ready to continue the conversation."""
|
||||
|
||||
return [
|
||||
{
|
||||
"role": "user",
|
||||
"type": "message",
|
||||
"content": [
|
||||
{
|
||||
"type": "input_text",
|
||||
"text": "\n\n".join(
|
||||
[intro_text, json.dumps(messages, indent=2), trailing_text]
|
||||
),
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
def add_user_content_item_as_message(self, item):
|
||||
"""Add a user content item as a standard message to the context.
|
||||
|
||||
Args:
|
||||
item: The conversation item to add as a user message.
|
||||
"""
|
||||
message = {
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": item.content[0].transcript}],
|
||||
}
|
||||
self.add_message(message)
|
||||
|
||||
|
||||
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
|
||||
"""User context aggregator for OpenAI Realtime API.
|
||||
|
||||
Handles user input frames and generates appropriate context updates
|
||||
for the realtime conversation, including message updates and tool settings.
|
||||
|
||||
Args:
|
||||
context: The OpenAI realtime LLM context.
|
||||
**kwargs: Additional arguments passed to parent aggregator.
|
||||
"""
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
|
||||
):
|
||||
"""Process incoming frames and handle realtime-specific frame types.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction of frame flow in the pipeline.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
# Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline,
|
||||
# messages are only processed by the user context aggregator, which is generally what we want. But
|
||||
# we also need to send new messages over the websocket, so the openai realtime API has them
|
||||
# in its context.
|
||||
if isinstance(frame, LLMMessagesUpdateFrame):
|
||||
await self.push_frame(RealtimeMessagesUpdateFrame(context=self._context))
|
||||
|
||||
# Parent also doesn't push the LLMSetToolsFrame.
|
||||
if isinstance(frame, LLMSetToolsFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def push_aggregation(self):
|
||||
"""Push user input aggregation.
|
||||
|
||||
Currently ignores all user input coming into the pipeline as realtime
|
||||
audio input is handled directly by the service.
|
||||
"""
|
||||
# for the moment, ignore all user input coming into the pipeline.
|
||||
# todo: think about whether/how to fix this to allow for text input from
|
||||
# upstream (transport/transcription, or other sources)
|
||||
pass
|
||||
|
||||
|
||||
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
"""Assistant context aggregator for OpenAI Realtime API.
|
||||
|
||||
Handles assistant output frames from the realtime service, filtering
|
||||
out duplicate text frames and managing function call results.
|
||||
|
||||
Args:
|
||||
context: The OpenAI realtime LLM context.
|
||||
**kwargs: Additional arguments passed to parent aggregator.
|
||||
"""
|
||||
|
||||
# The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output,
|
||||
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
|
||||
# need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames
|
||||
# are process. This ensures that the context gets only one set of messages.
|
||||
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
|
||||
# so we need to ignore pushing those as well, as they're also TextFrames.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process assistant frames, filtering out duplicate text content.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction of frame flow in the pipeline.
|
||||
"""
|
||||
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
|
||||
"""Handle function call result and notify the realtime service.
|
||||
|
||||
Args:
|
||||
frame: The function call result frame to handle.
|
||||
"""
|
||||
await super().handle_function_call_result(frame)
|
||||
|
||||
# The standard function callback code path pushes the FunctionCallResultFrame from the llm itself,
|
||||
# so we didn't have a chance to add the result to the openai realtime api context. Let's push a
|
||||
# special frame to do that.
|
||||
await self.push_frame(
|
||||
RealtimeFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
|
||||
)
|
||||
1106
src/pipecat/services/openai_realtime/events.py
Normal file
1106
src/pipecat/services/openai_realtime/events.py
Normal file
File diff suppressed because it is too large
Load Diff
37
src/pipecat/services/openai_realtime/frames.py
Normal file
37
src/pipecat/services/openai_realtime/frames.py
Normal file
@@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Custom frame types for OpenAI Realtime API integration."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.services.openai_realtime_beta.context import OpenAIRealtimeLLMContext
|
||||
|
||||
|
||||
@dataclass
|
||||
class RealtimeMessagesUpdateFrame(DataFrame):
|
||||
"""Frame indicating that the realtime context messages have been updated.
|
||||
|
||||
Parameters:
|
||||
context: The updated OpenAI realtime LLM context.
|
||||
"""
|
||||
|
||||
context: "OpenAIRealtimeLLMContext"
|
||||
|
||||
|
||||
@dataclass
|
||||
class RealtimeFunctionCallResultFrame(DataFrame):
|
||||
"""Frame containing function call results for the realtime service.
|
||||
|
||||
Parameters:
|
||||
result_frame: The function call result frame to send to the realtime API.
|
||||
"""
|
||||
|
||||
result_frame: FunctionCallResultFrame
|
||||
831
src/pipecat/services/openai_realtime/openai.py
Normal file
831
src/pipecat/services/openai_realtime/openai.py
Normal file
@@ -0,0 +1,831 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Realtime LLM service implementation with WebSocket support."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TranscriptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSTextFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
|
||||
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
|
||||
|
||||
from . import events
|
||||
from .context import (
|
||||
OpenAIRealtimeAssistantContextAggregator,
|
||||
OpenAIRealtimeLLMContext,
|
||||
OpenAIRealtimeUserContextAggregator,
|
||||
)
|
||||
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
|
||||
|
||||
try:
|
||||
from websockets.asyncio.client import connect as websocket_connect
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class CurrentAudioResponse:
|
||||
"""Tracks the current audio response from the assistant.
|
||||
|
||||
Parameters:
|
||||
item_id: Unique identifier for the audio response item.
|
||||
content_index: Index of the audio content within the item.
|
||||
start_time_ms: Timestamp when the audio response started in milliseconds.
|
||||
total_size: Total size of audio data received in bytes. Defaults to 0.
|
||||
"""
|
||||
|
||||
item_id: str
|
||||
content_index: int
|
||||
start_time_ms: int
|
||||
total_size: int = 0
|
||||
|
||||
|
||||
class OpenAIRealtimeLLMService(LLMService):
|
||||
"""OpenAI Realtime LLM service providing real-time audio and text communication.
|
||||
|
||||
Implements the OpenAI Realtime API with WebSocket communication for low-latency
|
||||
bidirectional audio and text interactions. Supports function calling, conversation
|
||||
management, and real-time transcription.
|
||||
"""
|
||||
|
||||
# Overriding the default adapter to use the OpenAIRealtimeLLMAdapter one.
|
||||
adapter_class = OpenAIRealtimeLLMAdapter
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "gpt-realtime",
|
||||
base_url: str = "wss://api.openai.com/v1/realtime",
|
||||
session_properties: Optional[events.SessionProperties] = None,
|
||||
start_audio_paused: bool = False,
|
||||
send_transcription_frames: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the OpenAI Realtime LLM service.
|
||||
|
||||
Args:
|
||||
api_key: OpenAI API key for authentication.
|
||||
model: OpenAI model name. Defaults to "gpt-4o-realtime-preview-2025-06-03".
|
||||
base_url: WebSocket base URL for the realtime API.
|
||||
Defaults to "wss://api.openai.com/v1/realtime".
|
||||
session_properties: Configuration properties for the realtime session.
|
||||
If None, uses default SessionProperties.
|
||||
start_audio_paused: Whether to start with audio input paused. Defaults to False.
|
||||
send_transcription_frames: Whether to emit transcription frames. Defaults to True.
|
||||
**kwargs: Additional arguments passed to parent LLMService.
|
||||
"""
|
||||
full_url = f"{base_url}?model={model}"
|
||||
super().__init__(base_url=full_url, **kwargs)
|
||||
|
||||
self.api_key = api_key
|
||||
self.base_url = full_url
|
||||
self.set_model_name(model)
|
||||
|
||||
self._session_properties: events.SessionProperties = (
|
||||
session_properties or events.SessionProperties()
|
||||
)
|
||||
self._audio_input_paused = start_audio_paused
|
||||
self._send_transcription_frames = send_transcription_frames
|
||||
self._websocket = None
|
||||
self._receive_task = None
|
||||
self._context = None
|
||||
|
||||
self._disconnecting = False
|
||||
self._api_session_ready = False
|
||||
self._run_llm_when_api_session_ready = False
|
||||
|
||||
self._current_assistant_response = None
|
||||
self._current_audio_response = None
|
||||
|
||||
self._messages_added_manually = {}
|
||||
self._user_and_response_message_tuple = None
|
||||
self._pending_function_calls = {} # Track function calls by call_id
|
||||
|
||||
self._register_event_handler("on_conversation_item_created")
|
||||
self._register_event_handler("on_conversation_item_updated")
|
||||
self._retrieve_conversation_item_futures = {}
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if the service can generate usage metrics.
|
||||
|
||||
Returns:
|
||||
True if metrics generation is supported.
|
||||
"""
|
||||
return True
|
||||
|
||||
def set_audio_input_paused(self, paused: bool):
|
||||
"""Set whether audio input is paused.
|
||||
|
||||
Args:
|
||||
paused: True to pause audio input, False to resume.
|
||||
"""
|
||||
self._audio_input_paused = paused
|
||||
|
||||
def _is_modality_enabled(self, modality: str) -> bool:
|
||||
"""Check if a specific modality is enabled, "text" or "audio"."""
|
||||
modalities = self._session_properties.output_modalities or ["audio", "text"]
|
||||
return modality in modalities
|
||||
|
||||
def _get_enabled_modalities(self) -> list[str]:
|
||||
"""Get the list of enabled modalities."""
|
||||
modalities = self._session_properties.output_modalities or ["audio", "text"]
|
||||
# API only supports single modality responses: either ["text"] or ["audio"]
|
||||
if "audio" in modalities:
|
||||
return ["audio"]
|
||||
elif "text" in modalities:
|
||||
return ["text"]
|
||||
|
||||
async def retrieve_conversation_item(self, item_id: str):
|
||||
"""Retrieve a conversation item by ID from the server.
|
||||
|
||||
Args:
|
||||
item_id: The ID of the conversation item to retrieve.
|
||||
|
||||
Returns:
|
||||
The retrieved conversation item.
|
||||
"""
|
||||
future = self.get_event_loop().create_future()
|
||||
retrieval_in_flight = False
|
||||
if not self._retrieve_conversation_item_futures.get(item_id):
|
||||
self._retrieve_conversation_item_futures[item_id] = []
|
||||
else:
|
||||
retrieval_in_flight = True
|
||||
self._retrieve_conversation_item_futures[item_id].append(future)
|
||||
if not retrieval_in_flight:
|
||||
await self.send_client_event(
|
||||
# Set event_id to "rci_{item_id}" so that we can identify an
|
||||
# error later if the retrieval fails. We don't need a UUID
|
||||
# suffix to the event_id because we're ensuring only one
|
||||
# in-flight retrieval per item_id. (Note: "rci" = "retrieve
|
||||
# conversation item")
|
||||
events.ConversationItemRetrieveEvent(item_id=item_id, event_id=f"rci_{item_id}")
|
||||
)
|
||||
return await future
|
||||
|
||||
#
|
||||
# standard AIService frame handling
|
||||
#
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the service and establish WebSocket connection.
|
||||
|
||||
Args:
|
||||
frame: The start frame triggering service initialization.
|
||||
"""
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the service and close WebSocket connection.
|
||||
|
||||
Args:
|
||||
frame: The end frame triggering service shutdown.
|
||||
"""
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
"""Cancel the service and close WebSocket connection.
|
||||
|
||||
Args:
|
||||
frame: The cancel frame triggering service cancellation.
|
||||
"""
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
#
|
||||
# speech and interruption handling
|
||||
#
|
||||
|
||||
async def _handle_interruption(self):
|
||||
# None and False are different. Check for False. None means we're using OpenAI's
|
||||
# built-in turn detection defaults.
|
||||
turn_detection_disabled = (
|
||||
self._session_properties.audio
|
||||
and self._session_properties.audio.input
|
||||
and self._session_properties.audio.input.turn_detection is False
|
||||
)
|
||||
if turn_detection_disabled:
|
||||
await self.send_client_event(events.InputAudioBufferClearEvent())
|
||||
await self.send_client_event(events.ResponseCancelEvent())
|
||||
await self._truncate_current_audio_response()
|
||||
await self.stop_all_metrics()
|
||||
if self._current_assistant_response:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
# Only push TTSStoppedFrame if audio modality is enabled
|
||||
if self._is_modality_enabled("audio"):
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
async def _handle_user_started_speaking(self, frame):
|
||||
pass
|
||||
|
||||
async def _handle_user_stopped_speaking(self, frame):
|
||||
# None and False are different. Check for False. None means we're using OpenAI's
|
||||
# built-in turn detection defaults.
|
||||
turn_detection_disabled = (
|
||||
self._session_properties.audio
|
||||
and self._session_properties.audio.input
|
||||
and self._session_properties.audio.input.turn_detection is False
|
||||
)
|
||||
if turn_detection_disabled:
|
||||
await self.send_client_event(events.InputAudioBufferCommitEvent())
|
||||
await self.send_client_event(events.ResponseCreateEvent())
|
||||
|
||||
async def _handle_bot_stopped_speaking(self):
|
||||
self._current_audio_response = None
|
||||
|
||||
def _calculate_audio_duration_ms(
|
||||
self, total_bytes: int, sample_rate: int = 24000, bytes_per_sample: int = 2
|
||||
) -> int:
|
||||
"""Calculate audio duration in milliseconds based on PCM audio parameters."""
|
||||
samples = total_bytes / bytes_per_sample
|
||||
duration_seconds = samples / sample_rate
|
||||
return int(duration_seconds * 1000)
|
||||
|
||||
async def _truncate_current_audio_response(self):
|
||||
"""Truncates the current audio response at the appropriate duration.
|
||||
|
||||
Calculates the actual duration of the audio content and truncates at the shorter of
|
||||
either the wall clock time or the actual audio duration to prevent invalid truncation
|
||||
requests.
|
||||
"""
|
||||
if not self._current_audio_response:
|
||||
return
|
||||
|
||||
# if the bot is still speaking, truncate the last message
|
||||
try:
|
||||
current = self._current_audio_response
|
||||
self._current_audio_response = None
|
||||
|
||||
# Calculate actual audio duration instead of using wall clock time
|
||||
audio_duration_ms = self._calculate_audio_duration_ms(current.total_size)
|
||||
|
||||
# Use the shorter of wall clock time or actual audio duration
|
||||
elapsed_ms = int(time.time() * 1000 - current.start_time_ms)
|
||||
truncate_ms = min(elapsed_ms, audio_duration_ms)
|
||||
|
||||
logger.trace(
|
||||
f"Truncating audio: duration={audio_duration_ms}ms, "
|
||||
f"elapsed={elapsed_ms}ms, truncate={truncate_ms}ms"
|
||||
)
|
||||
|
||||
await self.send_client_event(
|
||||
events.ConversationItemTruncateEvent(
|
||||
item_id=current.item_id,
|
||||
content_index=current.content_index,
|
||||
audio_end_ms=truncate_ms,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
# Log warning and don't re-raise - allow session to continue
|
||||
logger.warning(f"Audio truncation failed (non-fatal): {e}")
|
||||
|
||||
#
|
||||
# frame processing
|
||||
#
|
||||
# StartFrame, StopFrame, CancelFrame implemented in base class
|
||||
#
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames from the pipeline.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction of frame flow in the pipeline.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
pass
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
context: OpenAIRealtimeLLMContext = OpenAIRealtimeLLMContext.upgrade_to_realtime(
|
||||
frame.context
|
||||
)
|
||||
if not self._context:
|
||||
self._context = context
|
||||
elif frame.context is not self._context:
|
||||
# If the context has changed, reset the conversation
|
||||
self._context = context
|
||||
await self.reset_conversation()
|
||||
# Run the LLM at next opportunity
|
||||
await self._create_response()
|
||||
elif isinstance(frame, LLMContextFrame):
|
||||
raise NotImplementedError(
|
||||
"Universal LLMContext is not yet supported for OpenAI Realtime."
|
||||
)
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
if not self._audio_input_paused:
|
||||
await self._send_user_audio(frame)
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruption()
|
||||
elif isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self._handle_user_started_speaking(frame)
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
await self._handle_user_stopped_speaking(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._handle_bot_stopped_speaking()
|
||||
elif isinstance(frame, LLMMessagesAppendFrame):
|
||||
await self._handle_messages_append(frame)
|
||||
elif isinstance(frame, RealtimeMessagesUpdateFrame):
|
||||
self._context = frame.context
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
self._session_properties = events.SessionProperties(**frame.settings)
|
||||
await self._update_settings()
|
||||
elif isinstance(frame, LLMSetToolsFrame):
|
||||
await self._update_settings()
|
||||
elif isinstance(frame, RealtimeFunctionCallResultFrame):
|
||||
await self._handle_function_call_result(frame.result_frame)
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _handle_messages_append(self, frame):
|
||||
logger.error("!!! NEED TO IMPLEMENT MESSAGES APPEND")
|
||||
|
||||
async def _handle_function_call_result(self, frame):
|
||||
item = events.ConversationItem(
|
||||
type="function_call_output",
|
||||
call_id=frame.tool_call_id,
|
||||
output=json.dumps(frame.result),
|
||||
)
|
||||
await self.send_client_event(events.ConversationItemCreateEvent(item=item))
|
||||
|
||||
#
|
||||
# websocket communication
|
||||
#
|
||||
|
||||
async def send_client_event(self, event: events.ClientEvent):
|
||||
"""Send a client event to the OpenAI Realtime API.
|
||||
|
||||
Args:
|
||||
event: The client event to send.
|
||||
"""
|
||||
await self._ws_send(event.model_dump(exclude_none=True))
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
# Here we assume that if we have a websocket, we are connected. We
|
||||
# handle disconnections in the send/recv code paths.
|
||||
return
|
||||
self._websocket = await websocket_connect(
|
||||
uri=self.base_url,
|
||||
additional_headers={
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
},
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
try:
|
||||
self._disconnecting = True
|
||||
self._api_session_ready = False
|
||||
await self.stop_all_metrics()
|
||||
if self._websocket:
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task, timeout=1.0)
|
||||
self._receive_task = None
|
||||
self._disconnecting = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
|
||||
async def _ws_send(self, realtime_message):
|
||||
try:
|
||||
if self._websocket:
|
||||
await self._websocket.send(json.dumps(realtime_message))
|
||||
except Exception as e:
|
||||
if self._disconnecting:
|
||||
return
|
||||
logger.error(f"Error sending message to websocket: {e}")
|
||||
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
|
||||
# it is to recover from a send-side error with proper state management, and that exponential
|
||||
# backoff for retries can have cost/stability implications for a service cluster, let's just
|
||||
# treat a send-side error as fatal.
|
||||
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True))
|
||||
|
||||
async def _update_settings(self):
|
||||
settings = self._session_properties
|
||||
# tools given in the context override the tools in the session properties
|
||||
if self._context and self._context.tools:
|
||||
settings.tools = self._context.tools
|
||||
# instructions in the context come from an initial "system" message in the
|
||||
# messages list, and override instructions in the session properties
|
||||
if self._context and self._context._session_instructions:
|
||||
settings.instructions = self._context._session_instructions
|
||||
await self.send_client_event(events.SessionUpdateEvent(session=settings))
|
||||
|
||||
#
|
||||
# inbound server event handling
|
||||
# https://platform.openai.com/docs/api-reference/realtime-server-events
|
||||
#
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
async for message in self._websocket:
|
||||
evt = events.parse_server_event(message)
|
||||
if evt.type == "session.created":
|
||||
await self._handle_evt_session_created(evt)
|
||||
elif evt.type == "session.updated":
|
||||
await self._handle_evt_session_updated(evt)
|
||||
elif evt.type == "response.output_audio.delta":
|
||||
await self._handle_evt_audio_delta(evt)
|
||||
elif evt.type == "response.output_audio.done":
|
||||
await self._handle_evt_audio_done(evt)
|
||||
elif evt.type == "conversation.item.added":
|
||||
await self._handle_evt_conversation_item_added(evt)
|
||||
elif evt.type == "conversation.item.done":
|
||||
await self._handle_evt_conversation_item_done(evt)
|
||||
elif evt.type == "conversation.item.input_audio_transcription.delta":
|
||||
await self._handle_evt_input_audio_transcription_delta(evt)
|
||||
elif evt.type == "conversation.item.input_audio_transcription.completed":
|
||||
await self.handle_evt_input_audio_transcription_completed(evt)
|
||||
elif evt.type == "conversation.item.retrieved":
|
||||
await self._handle_conversation_item_retrieved(evt)
|
||||
elif evt.type == "response.done":
|
||||
await self._handle_evt_response_done(evt)
|
||||
elif evt.type == "input_audio_buffer.speech_started":
|
||||
await self._handle_evt_speech_started(evt)
|
||||
elif evt.type == "input_audio_buffer.speech_stopped":
|
||||
await self._handle_evt_speech_stopped(evt)
|
||||
elif evt.type == "response.output_text.delta":
|
||||
await self._handle_evt_text_delta(evt)
|
||||
elif evt.type == "response.output_audio_transcript.delta":
|
||||
await self._handle_evt_audio_transcript_delta(evt)
|
||||
elif evt.type == "response.function_call_arguments.done":
|
||||
await self._handle_evt_function_call_arguments_done(evt)
|
||||
elif evt.type == "error":
|
||||
if not await self._maybe_handle_evt_retrieve_conversation_item_error(evt):
|
||||
await self._handle_evt_error(evt)
|
||||
# errors are fatal, so exit the receive loop
|
||||
return
|
||||
|
||||
@traced_openai_realtime(operation="llm_setup")
|
||||
async def _handle_evt_session_created(self, evt):
|
||||
# session.created is received right after connecting. Send a message
|
||||
# to configure the session properties.
|
||||
await self._update_settings()
|
||||
|
||||
async def _handle_evt_session_updated(self, evt):
|
||||
# If this is our first context frame, run the LLM
|
||||
self._api_session_ready = True
|
||||
# Now that we've configured the session, we can run the LLM if we need to.
|
||||
if self._run_llm_when_api_session_ready:
|
||||
self._run_llm_when_api_session_ready = False
|
||||
await self._create_response()
|
||||
|
||||
async def _handle_evt_audio_delta(self, evt):
|
||||
# note: ttfb is faster by 1/2 RTT than ttfb as measured for other services, since we're getting
|
||||
# this event from the server
|
||||
await self.stop_ttfb_metrics()
|
||||
if not self._current_audio_response:
|
||||
self._current_audio_response = CurrentAudioResponse(
|
||||
item_id=evt.item_id,
|
||||
content_index=evt.content_index,
|
||||
start_time_ms=int(time.time() * 1000),
|
||||
)
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
audio = base64.b64decode(evt.delta)
|
||||
self._current_audio_response.total_size += len(audio)
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=audio,
|
||||
sample_rate=24000,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_evt_audio_done(self, evt):
|
||||
if self._current_audio_response:
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
# Don't clear the self._current_audio_response here. We need to wait until we
|
||||
# receive a BotStoppedSpeakingFrame from the output transport.
|
||||
|
||||
async def _handle_evt_conversation_item_added(self, evt):
|
||||
"""Handle conversation.item.added event - item is added but may still be processing."""
|
||||
if evt.item.type == "function_call":
|
||||
# Track this function call for when arguments are completed
|
||||
# Only add if not already tracked (prevent duplicates)
|
||||
if evt.item.call_id not in self._pending_function_calls:
|
||||
self._pending_function_calls[evt.item.call_id] = evt.item
|
||||
else:
|
||||
logger.warning(f"Function call {evt.item.call_id} already tracked, skipping")
|
||||
|
||||
await self._call_event_handler("on_conversation_item_created", evt.item.id, evt.item)
|
||||
|
||||
# This will get sent from the server every time a new "message" is added
|
||||
# to the server's conversation state, whether we create it via the API
|
||||
# or the server creates it from LLM output.
|
||||
if self._messages_added_manually.get(evt.item.id):
|
||||
del self._messages_added_manually[evt.item.id]
|
||||
return
|
||||
|
||||
if evt.item.role == "user":
|
||||
# We need to wait for completion of both user message and response message. Then we'll
|
||||
# add both to the context. User message is complete when we have a "transcript" field
|
||||
# that is not None. Response message is complete when we get a "response.done" event.
|
||||
self._user_and_response_message_tuple = (evt.item, {"done": False, "output": []})
|
||||
elif evt.item.role == "assistant":
|
||||
self._current_assistant_response = evt.item
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
async def _handle_evt_conversation_item_done(self, evt):
|
||||
"""Handle conversation.item.done event - item is fully completed."""
|
||||
await self._call_event_handler("on_conversation_item_updated", evt.item.id, evt.item)
|
||||
# The item is now fully processed and ready
|
||||
# For now, no additional logic needed beyond the event handler call
|
||||
|
||||
async def _handle_evt_input_audio_transcription_delta(self, evt):
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
# no way to get a language code?
|
||||
InterimTranscriptionFrame(evt.delta, "", time_now_iso8601(), result=evt)
|
||||
)
|
||||
|
||||
@traced_stt
|
||||
async def _handle_user_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
pass
|
||||
|
||||
async def handle_evt_input_audio_transcription_completed(self, evt):
|
||||
"""Handle completion of input audio transcription.
|
||||
|
||||
Args:
|
||||
evt: The transcription completed event.
|
||||
"""
|
||||
await self._call_event_handler("on_conversation_item_updated", evt.item_id, None)
|
||||
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
# no way to get a language code?
|
||||
TranscriptionFrame(evt.transcript, "", time_now_iso8601(), result=evt)
|
||||
)
|
||||
await self._handle_user_transcription(evt.transcript, True, Language.EN)
|
||||
pair = self._user_and_response_message_tuple
|
||||
if pair:
|
||||
user, assistant = pair
|
||||
user.content[0].transcript = evt.transcript
|
||||
if assistant["done"]:
|
||||
self._user_and_response_message_tuple = None
|
||||
self._context.add_user_content_item_as_message(user)
|
||||
else:
|
||||
# User message without preceding conversation.item.created. Bug?
|
||||
logger.warning(f"Transcript for unknown user message: {evt}")
|
||||
|
||||
async def _handle_conversation_item_retrieved(self, evt: events.ConversationItemRetrieved):
|
||||
futures = self._retrieve_conversation_item_futures.pop(evt.item.id, None)
|
||||
if futures:
|
||||
for future in futures:
|
||||
future.set_result(evt.item)
|
||||
|
||||
@traced_openai_realtime(operation="llm_response")
|
||||
async def _handle_evt_response_done(self, evt):
|
||||
# todo: figure out whether there's anything we need to do for "cancelled" events
|
||||
# usage metrics
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=evt.response.usage.input_tokens,
|
||||
completion_tokens=evt.response.usage.output_tokens,
|
||||
total_tokens=evt.response.usage.total_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._current_assistant_response = None
|
||||
# error handling
|
||||
if evt.response.status == "failed":
|
||||
await self.push_error(
|
||||
ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True)
|
||||
)
|
||||
return
|
||||
# response content
|
||||
for item in evt.response.output:
|
||||
await self._call_event_handler("on_conversation_item_updated", item.id, item)
|
||||
pair = self._user_and_response_message_tuple
|
||||
if pair:
|
||||
user, assistant = pair
|
||||
assistant["done"] = True
|
||||
assistant["output"] = evt.response.output
|
||||
if user.content[0].transcript is not None:
|
||||
self._user_and_response_message_tuple = None
|
||||
self._context.add_user_content_item_as_message(user)
|
||||
else:
|
||||
# Response message without preceding user message (standalone response)
|
||||
# Function calls in this response were already processed immediately when arguments were complete
|
||||
logger.debug(f"Handling standalone response: {evt.response.id}")
|
||||
|
||||
async def _handle_evt_text_delta(self, evt):
|
||||
if evt.delta:
|
||||
await self.push_frame(LLMTextFrame(evt.delta))
|
||||
|
||||
async def _handle_evt_audio_transcript_delta(self, evt):
|
||||
if evt.delta:
|
||||
await self.push_frame(LLMTextFrame(evt.delta))
|
||||
await self.push_frame(TTSTextFrame(evt.delta))
|
||||
|
||||
async def _handle_evt_function_call_arguments_done(self, evt):
|
||||
"""Handle completion of function call arguments.
|
||||
|
||||
Args:
|
||||
evt: The response.function_call_arguments.done event.
|
||||
"""
|
||||
# Process the function call immediately when arguments are complete
|
||||
# This is needed because function calls might not trigger response.done
|
||||
try:
|
||||
# Parse the arguments
|
||||
args = json.loads(evt.arguments)
|
||||
|
||||
# Get the function call item we tracked earlier
|
||||
function_call_item = self._pending_function_calls.get(evt.call_id)
|
||||
if function_call_item:
|
||||
# Remove from pending calls FIRST to prevent duplicate processing
|
||||
del self._pending_function_calls[evt.call_id]
|
||||
|
||||
# Create the function call and process it
|
||||
function_calls = [
|
||||
FunctionCallFromLLM(
|
||||
context=self._context,
|
||||
tool_call_id=evt.call_id,
|
||||
function_name=function_call_item.name,
|
||||
arguments=args,
|
||||
)
|
||||
]
|
||||
|
||||
await self.run_function_calls(function_calls)
|
||||
logger.debug(f"Processed function call: {function_call_item.name}")
|
||||
else:
|
||||
logger.warning(f"No tracked function call found for call_id: {evt.call_id}")
|
||||
logger.warning(
|
||||
f"Available pending calls: {list(self._pending_function_calls.keys())}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process function call arguments: {e}")
|
||||
|
||||
async def _handle_evt_speech_started(self, evt):
|
||||
await self._truncate_current_audio_response()
|
||||
await self._start_interruption() # cancels this processor task
|
||||
await self.push_frame(StartInterruptionFrame()) # cancels downstream tasks
|
||||
await self.push_frame(UserStartedSpeakingFrame())
|
||||
|
||||
async def _handle_evt_speech_stopped(self, evt):
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
await self._stop_interruption()
|
||||
await self.push_frame(UserStoppedSpeakingFrame())
|
||||
|
||||
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
|
||||
"""Maybe handle an error event related to retrieving a conversation item.
|
||||
|
||||
If the given error event is an error retrieving a conversation item:
|
||||
|
||||
- set an exception on the future that retrieve_conversation_item() is waiting on
|
||||
- return true
|
||||
Otherwise:
|
||||
- return false
|
||||
"""
|
||||
if evt.error.code == "item_retrieve_invalid_item_id":
|
||||
item_id = evt.error.event_id.split("_", 1)[1] # event_id is of the form "rci_{item_id}"
|
||||
futures = self._retrieve_conversation_item_futures.pop(item_id, None)
|
||||
if futures:
|
||||
for future in futures:
|
||||
future.set_exception(Exception(evt.error.message))
|
||||
return True
|
||||
return False
|
||||
|
||||
async def _handle_evt_error(self, evt):
|
||||
# Errors are fatal to this connection. Send an ErrorFrame.
|
||||
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
|
||||
|
||||
#
|
||||
# state and client events for the current conversation
|
||||
# https://platform.openai.com/docs/api-reference/realtime-client-events
|
||||
#
|
||||
|
||||
async def reset_conversation(self):
|
||||
"""Reset the conversation by disconnecting and reconnecting.
|
||||
|
||||
This is the safest way to start a new conversation. Note that this will
|
||||
fail if called from the receive task.
|
||||
"""
|
||||
logger.debug("Resetting conversation")
|
||||
await self._disconnect()
|
||||
if self._context:
|
||||
self._context.llm_needs_settings_update = True
|
||||
self._context.llm_needs_initial_messages = True
|
||||
await self._connect()
|
||||
|
||||
@traced_openai_realtime(operation="llm_request")
|
||||
async def _create_response(self):
|
||||
if not self._api_session_ready:
|
||||
self._run_llm_when_api_session_ready = True
|
||||
return
|
||||
|
||||
if self._context.llm_needs_initial_messages:
|
||||
messages = self._context.get_messages_for_initializing_history()
|
||||
for item in messages:
|
||||
evt = events.ConversationItemCreateEvent(item=item)
|
||||
self._messages_added_manually[evt.item.id] = True
|
||||
await self.send_client_event(evt)
|
||||
self._context.llm_needs_initial_messages = False
|
||||
|
||||
if self._context.llm_needs_settings_update:
|
||||
await self._update_settings()
|
||||
self._context.llm_needs_settings_update = False
|
||||
|
||||
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
|
||||
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
await self.send_client_event(
|
||||
events.ResponseCreateEvent(
|
||||
response=events.ResponseProperties(output_modalities=self._get_enabled_modalities())
|
||||
)
|
||||
)
|
||||
|
||||
async def _send_user_audio(self, frame):
|
||||
payload = base64.b64encode(frame.audio).decode("utf-8")
|
||||
await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload))
|
||||
|
||||
def create_context_aggregator(
|
||||
self,
|
||||
context: OpenAILLMContext,
|
||||
*,
|
||||
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
|
||||
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
|
||||
) -> OpenAIContextAggregatorPair:
|
||||
"""Create an instance of OpenAIContextAggregatorPair from an OpenAILLMContext.
|
||||
|
||||
Constructor keyword arguments for both the user and assistant aggregators can be provided.
|
||||
|
||||
Args:
|
||||
context: The LLM context.
|
||||
user_params: User aggregator parameters.
|
||||
assistant_params: Assistant aggregator parameters.
|
||||
|
||||
Returns:
|
||||
OpenAIContextAggregatorPair: A pair of context aggregators, one for
|
||||
the user and one for the assistant, encapsulated in an
|
||||
OpenAIContextAggregatorPair.
|
||||
"""
|
||||
context.set_llm_adapter(self.get_llm_adapter())
|
||||
|
||||
OpenAIRealtimeLLMContext.upgrade_to_realtime(context)
|
||||
user = OpenAIRealtimeUserContextAggregator(context, params=user_params)
|
||||
|
||||
assistant_params.expect_stripped_words = False
|
||||
assistant = OpenAIRealtimeAssistantContextAggregator(context, params=assistant_params)
|
||||
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
@@ -6,6 +6,8 @@
|
||||
|
||||
"""Azure OpenAI Realtime Beta LLM service implementation."""
|
||||
|
||||
import warnings
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .openai import OpenAIRealtimeBetaLLMService
|
||||
@@ -23,6 +25,10 @@ except ModuleNotFoundError as e:
|
||||
class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
|
||||
"""Azure OpenAI Realtime Beta LLM service with Azure-specific authentication.
|
||||
|
||||
.. deprecated:: 0.0.84
|
||||
`AzureRealtimeBetaLLMService` is deprecated, use `AzureRealtimeLLMService` instead.
|
||||
This class will be removed in version 1.0.0.
|
||||
|
||||
Extends the OpenAI Realtime service to work with Azure OpenAI endpoints,
|
||||
using Azure's authentication headers and endpoint format. Provides the same
|
||||
real-time audio and text communication capabilities as the base OpenAI service.
|
||||
@@ -44,6 +50,16 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
|
||||
**kwargs: Additional arguments passed to parent OpenAIRealtimeBetaLLMService.
|
||||
"""
|
||||
super().__init__(base_url=base_url, api_key=api_key, **kwargs)
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"AzureRealtimeBetaLLMService is deprecated and will be removed in version 1.0.0. "
|
||||
"Use AzureRealtimeLLMService instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
self.api_key = api_key
|
||||
self.base_url = base_url
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
@@ -92,6 +93,10 @@ class CurrentAudioResponse:
|
||||
class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
"""OpenAI Realtime Beta LLM service providing real-time audio and text communication.
|
||||
|
||||
.. deprecated:: 0.0.84
|
||||
`OpenAIRealtimeBetaLLMService` is deprecated, use `OpenAIRealtimeLLMService` instead.
|
||||
This class will be removed in version 1.0.0.
|
||||
|
||||
Implements the OpenAI Realtime API Beta with WebSocket communication for low-latency
|
||||
bidirectional audio and text interactions. Supports function calling, conversation
|
||||
management, and real-time transcription.
|
||||
@@ -124,6 +129,15 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
send_transcription_frames: Whether to emit transcription frames. Defaults to True.
|
||||
**kwargs: Additional arguments passed to parent LLMService.
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"OpenAIRealtimeBetaLLMService is deprecated and will be removed in version 1.0.0. "
|
||||
"Use OpenAIRealtimeLLMService instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
full_url = f"{base_url}?model={model}"
|
||||
super().__init__(base_url=full_url, **kwargs)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user