From a53738281697af4972a7fe81cce255aa1f003e5d Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sun, 7 Sep 2025 06:09:57 -0700 Subject: [PATCH] Add OpenAIRealtimeLLMService, AzureRealtimeLLMService (#2596) * Add OpenAI Realtime module * Add foundational examples for OpenAI Realtime * Add deprecation warning to OpenAIRealtimeBetaLLMService * Add deprecation warning to AzureRealtimeBetaLLMService * Update Changelog --- CHANGELOG.md | 13 + examples/foundational/19-openai-realtime.py | 228 ++++ examples/foundational/19a-azure-realtime.py | 221 ++++ .../19b-openai-realtime-beta-text.py | 21 +- .../foundational/19b-openai-realtime-text.py | 234 ++++ ...persistent-context-openai-realtime-beta.py | 274 ++++ .../20b-persistent-context-openai-realtime.py | 23 +- .../services/openai_realtime/__init__.py | 9 + src/pipecat/services/openai_realtime/azure.py | 67 + .../services/openai_realtime/context.py | 272 ++++ .../services/openai_realtime/events.py | 1106 +++++++++++++++++ .../services/openai_realtime/frames.py | 37 + .../services/openai_realtime/openai.py | 831 +++++++++++++ .../services/openai_realtime_beta/azure.py | 16 + .../services/openai_realtime_beta/openai.py | 14 + 15 files changed, 3349 insertions(+), 17 deletions(-) create mode 100644 examples/foundational/19-openai-realtime.py create mode 100644 examples/foundational/19a-azure-realtime.py create mode 100644 examples/foundational/19b-openai-realtime-text.py create mode 100644 examples/foundational/20b-persistent-context-openai-realtime-beta.py create mode 100644 src/pipecat/services/openai_realtime/__init__.py create mode 100644 src/pipecat/services/openai_realtime/azure.py create mode 100644 src/pipecat/services/openai_realtime/context.py create mode 100644 src/pipecat/services/openai_realtime/events.py create mode 100644 src/pipecat/services/openai_realtime/frames.py create mode 100644 src/pipecat/services/openai_realtime/openai.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b76cf08f7..46d49b7e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` which provide + access to OpenAI Realtime. + +### Deprecated + +- Deprecated `OpenAIRealtimeBetaLLMService` and `AzureRealtimeBetaLLMService`. + Use `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService`, respectively. + Each service will be removed in an upcoming version, 1.0.0. + ## [0.0.84] - 2025-09-05 ### Added diff --git a/examples/foundational/19-openai-realtime.py b/examples/foundational/19-openai-realtime.py new file mode 100644 index 000000000..0f9309f3d --- /dev/null +++ b/examples/foundational/19-openai-realtime.py @@ -0,0 +1,228 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os +from datetime import datetime + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage +from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai_realtime import ( + InputAudioNoiseReduction, + InputAudioTranscription, + OpenAIRealtimeLLMService, + SemanticTurnDetection, + SessionProperties, +) +from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +async def fetch_restaurant_recommendation(params: FunctionCallParams): + await params.result_callback({"name": "The Golden Dragon"}) + + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + required=["location", "format"], +) + +restaurant_function = FunctionSchema( + name="get_restaurant_recommendation", + description="Get a restaurant recommendation", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + }, + required=["location"], +) + +# Create tools schema +tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + session_properties = SessionProperties( + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + turn_detection=SemanticTurnDetection(), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + noise_reduction=InputAudioNoiseReduction(type="near_field"), + ) + ), + # tools=tools, + instructions="""You are a helpful and friendly AI. + +Act like a human, but remember that you aren't a human and that you can't do human +things in the real world. Your voice and personality should be warm and engaging, with a lively and +playful tone. + +If interacting in a non-English language, start by using the standard accent or dialect familiar to +the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, +even if you're asked about them. + +You are participating in a voice conversation. Keep your responses concise, short, and to the point +unless specifically asked to elaborate on a topic. + +You have access to the following tools: +- get_current_weather: Get the current weather for a given location. +- get_restaurant_recommendation: Get a restaurant recommendation for a given location. + +Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""", + ) + + llm = OpenAIRealtimeLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + session_properties=session_properties, + start_audio_paused=False, + ) + + # you can either register a single function for all function calls, or specific functions + # llm.register_function(None, fetch_weather_from_api) + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) + + transcript = TranscriptProcessor() + + # Create a standard OpenAI LLM context object using the normal messages format. The + # OpenAIRealtimeLLMService will convert this internally to messages that the + # openai WebSocket API can understand. + context = OpenAILLMContext( + [{"role": "user", "content": "Say hello!"}], + tools, + ) + + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), + llm, # LLM + transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream + transport.output(), # Transport bot output + transcript.assistant(), # After the transcript output, to time with the audio output + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + observers=[TranscriptionLogObserver()], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + for msg in frame.messages: + if isinstance(msg, TranscriptionMessage): + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + line = f"{timestamp}{msg.role}: {msg.content}" + logger.info(f"Transcript: {line}") + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/19a-azure-realtime.py b/examples/foundational/19a-azure-realtime.py new file mode 100644 index 000000000..a39826b81 --- /dev/null +++ b/examples/foundational/19a-azure-realtime.py @@ -0,0 +1,221 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os +from datetime import datetime + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai_realtime import ( + AzureRealtimeLLMService, + InputAudioTranscription, + SessionProperties, +) +from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +async def fetch_restaurant_recommendation(params: FunctionCallParams): + await params.result_callback({"name": "The Golden Dragon"}) + + +# Define weather function using standardized schema +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + required=["location", "format"], +) + +restaurant_function = FunctionSchema( + name="get_restaurant_recommendation", + description="Get a restaurant recommendation", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + }, + required=["location"], +) + +# Create tools schema +tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + session_properties = SessionProperties( + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(model="whisper-1"), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + # turn_detection=TurnDetection(silence_duration_ms=1000), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + ) + ), + # tools=tools, + instructions="""You are a helpful and friendly AI. + +Act like a human, but remember that you aren't a human and that you can't do human +things in the real world. Your voice and personality should be warm and engaging, with a lively and +playful tone. + +If interacting in a non-English language, start by using the standard accent or dialect familiar to +the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, +even if you're asked about them. +- +You are participating in a voice conversation. Keep your responses concise, short, and to the point +unless specifically asked to elaborate on a topic. + +You have access to the following tools: +- get_current_weather: Get the current weather for a given location. +- get_restaurant_recommendation: Get a restaurant recommendation for a given location. + +Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""", + ) + + llm = AzureRealtimeLLMService( + api_key=os.getenv("AZURE_REALTIME_API_KEY"), + base_url=os.getenv("AZURE_REALTIME_BASE_URL"), + session_properties=session_properties, + start_audio_paused=False, + ) + + # you can either register a single function for all function calls, or specific functions + # llm.register_function(None, fetch_weather_from_api) + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) + + # Create a standard OpenAI LLM context object using the normal messages format. The + # OpenAIRealtimeBetaLLMService will convert this internally to messages that the + # openai WebSocket API can understand. + context = OpenAILLMContext( + [{"role": "user", "content": "Say hello!"}], + # [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}], + # [ + # { + # "role": "user", + # "content": [ + # {"type": "text", "text": "Say"}, + # {"type": "text", "text": "yo what's up!"}, + # ], + # } + # ], + tools, + ) + + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), + llm, # LLM + transport.output(), # Transport bot output + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/19b-openai-realtime-beta-text.py b/examples/foundational/19b-openai-realtime-beta-text.py index 63a699ae9..6595f188b 100644 --- a/examples/foundational/19b-openai-realtime-beta-text.py +++ b/examples/foundational/19b-openai-realtime-beta-text.py @@ -31,6 +31,7 @@ from pipecat.services.openai_realtime_beta import ( SemanticTurnDetection, SessionProperties, ) +from pipecat.services.openai_realtime_beta.events import AudioConfiguration, AudioInput from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -113,14 +114,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") session_properties = SessionProperties( - input_audio_transcription=InputAudioTranscription(), - modalities=["text"], - # Set openai TurnDetection parameters. Not setting this at all will turn it - # on by default - turn_detection=SemanticTurnDetection(), - # Or set to False to disable openai turn detection and use transport VAD - # turn_detection=False, - input_audio_noise_reduction=InputAudioNoiseReduction(type="near_field"), + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + turn_detection=SemanticTurnDetection(), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + noise_reduction=InputAudioNoiseReduction(type="near_field"), + ) + ), + output_modalities=["text"], # tools=tools, instructions="""You are a helpful and friendly AI. diff --git a/examples/foundational/19b-openai-realtime-text.py b/examples/foundational/19b-openai-realtime-text.py new file mode 100644 index 000000000..b5d1d73e1 --- /dev/null +++ b/examples/foundational/19b-openai-realtime-text.py @@ -0,0 +1,234 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import os +from datetime import datetime + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai_realtime import ( + InputAudioNoiseReduction, + InputAudioTranscription, + OpenAIRealtimeLLMService, + SemanticTurnDetection, + SessionProperties, +) +from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +async def fetch_restaurant_recommendation(params: FunctionCallParams): + await params.result_callback({"name": "The Golden Dragon"}) + + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + required=["location", "format"], +) + +restaurant_function = FunctionSchema( + name="get_restaurant_recommendation", + description="Get a restaurant recommendation", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + }, + required=["location"], +) + +# Create tools schema +tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + session_properties = SessionProperties( + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + turn_detection=SemanticTurnDetection(), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + noise_reduction=InputAudioNoiseReduction(type="near_field"), + ) + ), + output_modalities=["text"], + # tools=tools, + instructions="""You are a helpful and friendly AI. + +Act like a human, but remember that you aren't a human and that you can't do human +things in the real world. Your voice and personality should be warm and engaging, with a lively and +playful tone. + +If interacting in a non-English language, start by using the standard accent or dialect familiar to +the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, +even if you're asked about them. + +You are participating in a voice conversation. Keep your responses concise, short, and to the point +unless specifically asked to elaborate on a topic. + +You have access to the following tools: +- get_current_weather: Get the current weather for a given location. +- get_restaurant_recommendation: Get a restaurant recommendation for a given location. + +Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""", + ) + + llm = OpenAIRealtimeLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + session_properties=session_properties, + start_audio_paused=False, + ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + # you can either register a single function for all function calls, or specific functions + # llm.register_function(None, fetch_weather_from_api) + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) + + transcript = TranscriptProcessor() + + # Create a standard OpenAI LLM context object using the normal messages format. The + # OpenAIRealtimeLLMService will convert this internally to messages that the + # openai WebSocket API can understand. + context = OpenAILLMContext( + [{"role": "user", "content": "Say hello!"}], + tools, + ) + + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), + llm, # LLM + tts, # TTS + transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream + transport.output(), # Transport bot output + transcript.assistant(), # After the transcript output, to time with the audio output + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + for msg in frame.messages: + if isinstance(msg, TranscriptionMessage): + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + line = f"{timestamp}{msg.role}: {msg.content}" + logger.info(f"Transcript: {line}") + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/20b-persistent-context-openai-realtime-beta.py b/examples/foundational/20b-persistent-context-openai-realtime-beta.py new file mode 100644 index 000000000..5748257a1 --- /dev/null +++ b/examples/foundational/20b-persistent-context-openai-realtime-beta.py @@ -0,0 +1,274 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import glob +import json +import os +from datetime import datetime + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai_realtime_beta import ( + InputAudioTranscription, + OpenAIRealtimeBetaLLMService, + SessionProperties, + TurnDetection, +) +from pipecat.services.openai_realtime_beta.events import AudioConfiguration, AudioInput +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + +BASE_FILENAME = "/tmp/pipecat_conversation_" + + +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +async def get_saved_conversation_filenames(params: FunctionCallParams): + # Construct the full pattern including the BASE_FILENAME + full_pattern = f"{BASE_FILENAME}*.json" + + # Use glob to find all matching files + matching_files = glob.glob(full_pattern) + logger.debug(f"matching files: {matching_files}") + + await params.result_callback({"filenames": matching_files}) + + +async def save_conversation(params: FunctionCallParams): + timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + filename = f"{BASE_FILENAME}{timestamp}.json" + logger.debug( + f"writing conversation to {filename}\n{json.dumps(params.context.messages, indent=4)}" + ) + try: + with open(filename, "w") as file: + messages = params.context.get_messages_for_persistent_storage() + # remove the last message, which is the instruction we just gave to save the conversation + messages.pop() + json.dump(messages, file, indent=2) + await params.result_callback({"success": True}) + except Exception as e: + await params.result_callback({"success": False, "error": str(e)}) + + +async def load_conversation(params: FunctionCallParams): + async def _reset(): + filename = params.arguments["filename"] + logger.debug(f"loading conversation from {filename}") + try: + with open(filename, "r") as file: + params.context.set_messages(json.load(file)) + await params.llm.reset_conversation() + await params.llm._create_response() + except Exception as e: + await params.result_callback({"success": False, "error": str(e)}) + + asyncio.create_task(_reset()) + + +tools = [ + { + "type": "function", + "name": "get_current_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + "required": ["location", "format"], + }, + }, + { + "type": "function", + "name": "save_conversation", + "description": "Save the current conversatione. Use this function to persist the current conversation to external storage.", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + { + "type": "function", + "name": "get_saved_conversation_filenames", + "description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + { + "type": "function", + "name": "load_conversation", + "description": "Load a conversation history. Use this function to load a conversation history into the current session.", + "parameters": { + "type": "object", + "properties": { + "filename": { + "type": "string", + "description": "The filename of the conversation history to load.", + } + }, + "required": ["filename"], + }, + }, +] + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + session_properties = SessionProperties( + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + turn_detection=TurnDetection(silence_duration_ms=1000), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + ) + ), + # tools=tools, + instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI. + +Act like a human, but remember that you aren't a human and that you can't do human +things in the real world. Your voice and personality should be warm and engaging, with a lively and +playful tone. + +If interacting in a non-English language, start by using the standard accent or dialect familiar to +the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, +even if you're asked about them. +- +You are participating in a voice conversation. Keep your responses concise, short, and to the point +unless specifically asked to elaborate on a topic. + +Remember, your responses should be short. Just one or two sentences, usually.""", + ) + + llm = OpenAIRealtimeBetaLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + session_properties=session_properties, + start_audio_paused=False, + ) + + # you can either register a single function for all function calls, or specific functions + # llm.register_function(None, fetch_weather_from_api) + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("save_conversation", save_conversation) + llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames) + llm.register_function("load_conversation", load_conversation) + + context = OpenAILLMContext([], tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), + llm, # LLM + transport.output(), # Transport bot output + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/20b-persistent-context-openai-realtime.py b/examples/foundational/20b-persistent-context-openai-realtime.py index 7aa75e4aa..52c7d8f49 100644 --- a/examples/foundational/20b-persistent-context-openai-realtime.py +++ b/examples/foundational/20b-persistent-context-openai-realtime.py @@ -25,12 +25,13 @@ from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.llm_service import FunctionCallParams -from pipecat.services.openai_realtime_beta import ( +from pipecat.services.openai_realtime import ( InputAudioTranscription, - OpenAIRealtimeBetaLLMService, + OpenAIRealtimeLLMService, SessionProperties, TurnDetection, ) +from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -182,12 +183,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) session_properties = SessionProperties( - input_audio_transcription=InputAudioTranscription(), - # Set openai TurnDetection parameters. Not setting this at all will turn it - # on by default - turn_detection=TurnDetection(silence_duration_ms=1000), - # Or set to False to disable openai turn detection and use transport VAD - # turn_detection=False, + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Set openai TurnDetection parameters. Not setting this at all will turn it + # on by default + turn_detection=TurnDetection(silence_duration_ms=1000), + # Or set to False to disable openai turn detection and use transport VAD + # turn_detection=False, + ) + ), # tools=tools, instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI. @@ -205,7 +210,7 @@ unless specifically asked to elaborate on a topic. Remember, your responses should be short. Just one or two sentences, usually.""", ) - llm = OpenAIRealtimeBetaLLMService( + llm = OpenAIRealtimeLLMService( api_key=os.getenv("OPENAI_API_KEY"), session_properties=session_properties, start_audio_paused=False, diff --git a/src/pipecat/services/openai_realtime/__init__.py b/src/pipecat/services/openai_realtime/__init__.py new file mode 100644 index 000000000..6f3154f1c --- /dev/null +++ b/src/pipecat/services/openai_realtime/__init__.py @@ -0,0 +1,9 @@ +from .azure import AzureRealtimeLLMService +from .events import ( + InputAudioNoiseReduction, + InputAudioTranscription, + SemanticTurnDetection, + SessionProperties, + TurnDetection, +) +from .openai import OpenAIRealtimeLLMService diff --git a/src/pipecat/services/openai_realtime/azure.py b/src/pipecat/services/openai_realtime/azure.py new file mode 100644 index 000000000..aed53b94c --- /dev/null +++ b/src/pipecat/services/openai_realtime/azure.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Azure OpenAI Realtime LLM service implementation.""" + +from loguru import logger + +from .openai import OpenAIRealtimeLLMService + +try: + from websockets.asyncio.client import connect as websocket_connect +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable." + ) + raise Exception(f"Missing module: {e}") + + +class AzureRealtimeLLMService(OpenAIRealtimeLLMService): + """Azure OpenAI Realtime LLM service with Azure-specific authentication. + + Extends the OpenAI Realtime service to work with Azure OpenAI endpoints, + using Azure's authentication headers and endpoint format. Provides the same + real-time audio and text communication capabilities as the base OpenAI service. + """ + + def __init__( + self, + *, + api_key: str, + base_url: str, + **kwargs, + ): + """Initialize Azure Realtime LLM service. + + Args: + api_key: The API key for the Azure OpenAI service. + base_url: The full Azure WebSocket endpoint URL including api-version and deployment. + Example: "wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment" + **kwargs: Additional arguments passed to parent OpenAIRealtimeLLMService. + """ + super().__init__(base_url=base_url, api_key=api_key, **kwargs) + self.api_key = api_key + self.base_url = base_url + + async def _connect(self): + try: + if self._websocket: + # Here we assume that if we have a websocket, we are connected. We + # handle disconnections in the send/recv code paths. + return + + logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}") + self._websocket = await websocket_connect( + uri=self.base_url, + additional_headers={ + "api-key": self.api_key, + }, + ) + self._receive_task = self.create_task(self._receive_task_handler()) + except Exception as e: + logger.error(f"{self} initialization error: {e}") + self._websocket = None diff --git a/src/pipecat/services/openai_realtime/context.py b/src/pipecat/services/openai_realtime/context.py new file mode 100644 index 000000000..cb1c0a9f5 --- /dev/null +++ b/src/pipecat/services/openai_realtime/context.py @@ -0,0 +1,272 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""OpenAI Realtime LLM context and aggregator implementations.""" + +import copy +import json + +from loguru import logger + +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + InterimTranscriptionFrame, + LLMMessagesUpdateFrame, + LLMSetToolsFrame, + LLMTextFrame, + TranscriptionFrame, +) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.openai.llm import ( + OpenAIAssistantContextAggregator, + OpenAIUserContextAggregator, +) + +from . import events +from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame + + +class OpenAIRealtimeLLMContext(OpenAILLMContext): + """OpenAI Realtime LLM context with session management and message conversion. + + Extends the standard OpenAI LLM context to support real-time session properties, + instruction management, and conversion between standard message formats and + realtime conversation items. + """ + + def __init__(self, messages=None, tools=None, **kwargs): + """Initialize the OpenAIRealtimeLLMContext. + + Args: + messages: Initial conversation messages. Defaults to None. + tools: Available function tools. Defaults to None. + **kwargs: Additional arguments passed to parent OpenAILLMContext. + """ + super().__init__(messages=messages, tools=tools, **kwargs) + self.__setup_local() + + def __setup_local(self): + self.llm_needs_settings_update = True + self.llm_needs_initial_messages = True + self._session_instructions = "" + + return + + @staticmethod + def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext": + """Upgrade a standard OpenAI LLM context to a realtime context. + + Args: + obj: The OpenAILLMContext instance to upgrade. + + Returns: + The upgraded OpenAIRealtimeLLMContext instance. + """ + if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext): + obj.__class__ = OpenAIRealtimeLLMContext + obj.__setup_local() + return obj + + # todo + # - finish implementing all frames + + def from_standard_message(self, message): + """Convert a standard message format to a realtime conversation item. + + Args: + message: The standard message dictionary to convert. + + Returns: + A ConversationItem instance for the realtime API. + """ + if message.get("role") == "user": + content = message.get("content") + if isinstance(message.get("content"), list): + content = "" + for c in message.get("content"): + if c.get("type") == "text": + content += " " + c.get("text") + else: + logger.error( + f"Unhandled content type in context message: {c.get('type')} - {message}" + ) + return events.ConversationItem( + role="user", + type="message", + content=[events.ItemContent(type="input_text", text=content)], + ) + if message.get("role") == "assistant" and message.get("tool_calls"): + tc = message.get("tool_calls")[0] + return events.ConversationItem( + type="function_call", + call_id=tc["id"], + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + ) + logger.error(f"Unhandled message type in from_standard_message: {message}") + + def get_messages_for_initializing_history(self): + """Get conversation items for initializing the realtime session history. + + Converts the context's messages to a format suitable for the realtime API, + handling system instructions and conversation history packaging. + + Returns: + List of conversation items for session initialization. + """ + # We can't load a long conversation history into the openai realtime api yet. (The API/model + # forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So + # our general strategy until this is fixed is just to put everything into a first "user" + # message as a single input. + if not self.messages: + return [] + + messages = copy.deepcopy(self.messages) + + # If we have a "system" message as our first message, let's pull that out into session + # "instructions" + if messages[0].get("role") == "system": + self.llm_needs_settings_update = True + system = messages.pop(0) + content = system.get("content") + if isinstance(content, str): + self._session_instructions = content + elif isinstance(content, list): + self._session_instructions = content[0].get("text") + if not messages: + return [] + + # If we have just a single "user" item, we can just send it normally + if len(messages) == 1 and messages[0].get("role") == "user": + return [self.from_standard_message(messages[0])] + + # Otherwise, let's pack everything into a single "user" message with a bit of + # explanation for the LLM + intro_text = """ + This is a previously saved conversation. Please treat this conversation history as a + starting point for the current conversation.""" + + trailing_text = """ + This is the end of the previously saved conversation. Please continue the conversation + from here. If the last message is a user instruction or question, act on that instruction + or answer the question. If the last message is an assistant response, simple say that you + are ready to continue the conversation.""" + + return [ + { + "role": "user", + "type": "message", + "content": [ + { + "type": "input_text", + "text": "\n\n".join( + [intro_text, json.dumps(messages, indent=2), trailing_text] + ), + } + ], + } + ] + + def add_user_content_item_as_message(self, item): + """Add a user content item as a standard message to the context. + + Args: + item: The conversation item to add as a user message. + """ + message = { + "role": "user", + "content": [{"type": "text", "text": item.content[0].transcript}], + } + self.add_message(message) + + +class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator): + """User context aggregator for OpenAI Realtime API. + + Handles user input frames and generates appropriate context updates + for the realtime conversation, including message updates and tool settings. + + Args: + context: The OpenAI realtime LLM context. + **kwargs: Additional arguments passed to parent aggregator. + """ + + async def process_frame( + self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM + ): + """Process incoming frames and handle realtime-specific frame types. + + Args: + frame: The frame to process. + direction: The direction of frame flow in the pipeline. + """ + await super().process_frame(frame, direction) + # Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline, + # messages are only processed by the user context aggregator, which is generally what we want. But + # we also need to send new messages over the websocket, so the openai realtime API has them + # in its context. + if isinstance(frame, LLMMessagesUpdateFrame): + await self.push_frame(RealtimeMessagesUpdateFrame(context=self._context)) + + # Parent also doesn't push the LLMSetToolsFrame. + if isinstance(frame, LLMSetToolsFrame): + await self.push_frame(frame, direction) + + async def push_aggregation(self): + """Push user input aggregation. + + Currently ignores all user input coming into the pipeline as realtime + audio input is handled directly by the service. + """ + # for the moment, ignore all user input coming into the pipeline. + # todo: think about whether/how to fix this to allow for text input from + # upstream (transport/transcription, or other sources) + pass + + +class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator): + """Assistant context aggregator for OpenAI Realtime API. + + Handles assistant output frames from the realtime service, filtering + out duplicate text frames and managing function call results. + + Args: + context: The OpenAI realtime LLM context. + **kwargs: Additional arguments passed to parent aggregator. + """ + + # The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output, + # but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We + # need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames + # are process. This ensures that the context gets only one set of messages. + # OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames, + # so we need to ignore pushing those as well, as they're also TextFrames. + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process assistant frames, filtering out duplicate text content. + + Args: + frame: The frame to process. + direction: The direction of frame flow in the pipeline. + """ + if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)): + await super().process_frame(frame, direction) + + async def handle_function_call_result(self, frame: FunctionCallResultFrame): + """Handle function call result and notify the realtime service. + + Args: + frame: The function call result frame to handle. + """ + await super().handle_function_call_result(frame) + + # The standard function callback code path pushes the FunctionCallResultFrame from the llm itself, + # so we didn't have a chance to add the result to the openai realtime api context. Let's push a + # special frame to do that. + await self.push_frame( + RealtimeFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM + ) diff --git a/src/pipecat/services/openai_realtime/events.py b/src/pipecat/services/openai_realtime/events.py new file mode 100644 index 000000000..200e59d68 --- /dev/null +++ b/src/pipecat/services/openai_realtime/events.py @@ -0,0 +1,1106 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Event models and data structures for OpenAI Realtime API communication.""" + +import json +import uuid +from typing import Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field + +# +# session properties +# + + +class AudioFormat(BaseModel): + """Base class for audio format configuration.""" + + type: str + + +class PCMAudioFormat(AudioFormat): + """PCM audio format configuration. + + Parameters: + type: Audio format type, always "audio/pcm". + rate: Sample rate, always 24000 for PCM. + """ + + type: Literal["audio/pcm"] = "audio/pcm" + rate: Literal[24000] = 24000 + + +class PCMUAudioFormat(AudioFormat): + """PCMU (G.711 μ-law) audio format configuration. + + Parameters: + type: Audio format type, always "audio/pcmu". + """ + + type: Literal["audio/pcmu"] = "audio/pcmu" + + +class PCMAAudioFormat(AudioFormat): + """PCMA (G.711 A-law) audio format configuration. + + Parameters: + type: Audio format type, always "audio/pcma". + """ + + type: Literal["audio/pcma"] = "audio/pcma" + + +class InputAudioTranscription(BaseModel): + """Configuration for audio transcription settings.""" + + model: str = "gpt-4o-transcribe" + language: Optional[str] + prompt: Optional[str] + + def __init__( + self, + model: Optional[str] = "gpt-4o-transcribe", + language: Optional[str] = None, + prompt: Optional[str] = None, + ): + """Initialize InputAudioTranscription. + + Args: + model: Transcription model to use (e.g., "gpt-4o-transcribe", "whisper-1"). + language: Optional language code for transcription. + prompt: Optional transcription hint text. + """ + super().__init__(model=model, language=language, prompt=prompt) + + +class TurnDetection(BaseModel): + """Server-side voice activity detection configuration. + + Parameters: + type: Detection type, must be "server_vad". + threshold: Voice activity detection threshold (0.0-1.0). Defaults to 0.5. + prefix_padding_ms: Padding before speech starts in milliseconds. Defaults to 300. + silence_duration_ms: Silence duration to detect speech end in milliseconds. Defaults to 500. + """ + + type: Optional[Literal["server_vad"]] = "server_vad" + threshold: Optional[float] = 0.5 + prefix_padding_ms: Optional[int] = 300 + silence_duration_ms: Optional[int] = 500 + + +class SemanticTurnDetection(BaseModel): + """Semantic-based turn detection configuration. + + Parameters: + type: Detection type, must be "semantic_vad". + eagerness: Turn detection eagerness level. Can be "low", "medium", "high", or "auto". + create_response: Whether to automatically create responses on turn detection. + interrupt_response: Whether to interrupt ongoing responses on turn detection. + """ + + type: Optional[Literal["semantic_vad"]] = "semantic_vad" + eagerness: Optional[Literal["low", "medium", "high", "auto"]] = None + create_response: Optional[bool] = None + interrupt_response: Optional[bool] = None + + +class InputAudioNoiseReduction(BaseModel): + """Input audio noise reduction configuration. + + Parameters: + type: Noise reduction type for different microphone scenarios. + """ + + type: Optional[Literal["near_field", "far_field"]] + + +class AudioInput(BaseModel): + """Audio input configuration. + + Parameters: + format: The format of the input audio. + transcription: Configuration for input audio transcription. + noise_reduction: Configuration for input audio noise reduction. + turn_detection: Configuration for turn detection, or False to disable. + """ + + format: Optional[Union[PCMAudioFormat, PCMUAudioFormat, PCMAAudioFormat]] = None + transcription: Optional[InputAudioTranscription] = None + noise_reduction: Optional[InputAudioNoiseReduction] = None + turn_detection: Optional[Union[TurnDetection, SemanticTurnDetection, bool]] = None + + +class AudioOutput(BaseModel): + """Audio output configuration. + + Parameters: + format: The format of the output audio. + voice: The voice the model uses to respond. + speed: The speed of the model's spoken response. + """ + + format: Optional[Union[PCMAudioFormat, PCMUAudioFormat, PCMAAudioFormat]] = None + voice: Optional[str] = None + speed: Optional[float] = None + + +class AudioConfiguration(BaseModel): + """Audio configuration for input and output. + + Parameters: + input: Configuration for input audio. + output: Configuration for output audio. + """ + + input: Optional[AudioInput] = None + output: Optional[AudioOutput] = None + + +class SessionProperties(BaseModel): + """Configuration properties for an OpenAI Realtime session. + + Parameters: + type: The type of session, always "realtime". + object: Object type identifier, always "realtime.session". + id: Unique identifier for the session. + model: The Realtime model used for this session. + output_modalities: The set of modalities the model can respond with. + instructions: System instructions for the assistant. + audio: Configuration for input and output audio. + tools: Available function tools for the assistant. + tool_choice: Tool usage strategy ("auto", "none", or "required"). + max_output_tokens: Maximum tokens in response or "inf" for unlimited. + tracing: Configuration options for tracing. + prompt: Reference to a prompt template and its variables. + expires_at: Session expiration timestamp. + include: Additional fields to include in server outputs. + """ + + type: Optional[Literal["realtime"]] = "realtime" + object: Optional[Literal["realtime.session"]] = None + id: Optional[str] = None + model: Optional[str] = None + output_modalities: Optional[List[Literal["text", "audio"]]] = None + instructions: Optional[str] = None + audio: Optional[AudioConfiguration] = None + tools: Optional[List[Dict]] = None + tool_choice: Optional[Literal["auto", "none", "required"]] = None + max_output_tokens: Optional[Union[int, Literal["inf"]]] = None + tracing: Optional[Union[Literal["auto"], Dict]] = None + prompt: Optional[Dict] = None + expires_at: Optional[int] = None + include: Optional[List[str]] = None + + +# +# context +# + + +class ItemContent(BaseModel): + """Content within a conversation item. + + Parameters: + type: Content type (text, audio, input_text, input_audio, output_text, or output_audio). + text: Text content for text-based items. + audio: Base64-encoded audio data for audio items. + transcript: Transcribed text for audio items. + """ + + type: Literal["text", "audio", "input_text", "input_audio", "output_text", "output_audio"] + text: Optional[str] = None + audio: Optional[str] = None # base64-encoded audio + transcript: Optional[str] = None + + +class ConversationItem(BaseModel): + """A conversation item in the realtime session. + + Parameters: + id: Unique identifier for the item, auto-generated if not provided. + object: Object type identifier for the realtime API. + type: Item type (message, function_call, or function_call_output). + status: Current status of the item. + role: Speaker role for message items (user, assistant, or system). + content: Content list for message items. + call_id: Function call identifier for function_call items. + name: Function name for function_call items. + arguments: Function arguments as JSON string for function_call items. + output: Function output as JSON string for function_call_output items. + """ + + id: str = Field(default_factory=lambda: str(uuid.uuid4().hex)) + object: Optional[Literal["realtime.item"]] = None + type: Literal["message", "function_call", "function_call_output"] + status: Optional[Literal["completed", "in_progress", "incomplete"]] = None + # role and content are present for message items + role: Optional[Literal["user", "assistant", "system"]] = None + content: Optional[List[ItemContent]] = None + # these four fields are present for function_call items + call_id: Optional[str] = None + name: Optional[str] = None + arguments: Optional[str] = None + output: Optional[str] = None + + +class RealtimeConversation(BaseModel): + """A realtime conversation session. + + Parameters: + id: Unique identifier for the conversation. + object: Object type identifier, always "realtime.conversation". + """ + + id: str + object: Literal["realtime.conversation"] + + +class ResponseProperties(BaseModel): + """Properties for configuring assistant responses. + + Parameters: + output_modalities: Output modalities for the response. Must be either ["text"] or ["audio"]. Defaults to ["audio"]. + instructions: Specific instructions for this response. + audio: Audio configuration for this response. + tools: Available tools for this response. + tool_choice: Tool usage strategy for this response. + temperature: Sampling temperature for this response. + max_output_tokens: Maximum tokens for this response. + """ + + output_modalities: Optional[List[Literal["text", "audio"]]] = ["audio"] + instructions: Optional[str] = None + audio: Optional[AudioConfiguration] = None + tools: Optional[List[Dict]] = None + tool_choice: Optional[Literal["auto", "none", "required"]] = None + temperature: Optional[float] = None + max_output_tokens: Optional[Union[int, Literal["inf"]]] = None + + +# +# error class +# +class RealtimeError(BaseModel): + """Error information from the realtime API. + + Parameters: + type: Error type identifier. + code: Specific error code. + message: Human-readable error message. + param: Parameter name that caused the error, if applicable. + event_id: Event ID associated with the error, if applicable. + """ + + type: str + code: Optional[str] = "" + message: str + param: Optional[str] = None + event_id: Optional[str] = None + + +# +# client events +# + + +class ClientEvent(BaseModel): + """Base class for client events sent to the realtime API. + + Parameters: + event_id: Unique identifier for the event, auto-generated if not provided. + """ + + event_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + + +class SessionUpdateEvent(ClientEvent): + """Event to update session properties. + + Parameters: + type: Event type, always "session.update". + session: Updated session properties. + """ + + type: Literal["session.update"] = "session.update" + session: SessionProperties + + def model_dump(self, *args, **kwargs) -> Dict[str, Any]: + """Serialize the event to a dictionary. + + Handles special serialization for turn_detection where False becomes null. + + Args: + *args: Positional arguments passed to parent model_dump. + **kwargs: Keyword arguments passed to parent model_dump. + + Returns: + Dictionary representation of the event. + """ + dump = super().model_dump(*args, **kwargs) + + # Handle turn_detection in audio.input so that False becomes null + if "audio" in dump["session"] and dump["session"]["audio"]: + if "input" in dump["session"]["audio"] and dump["session"]["audio"]["input"]: + if "turn_detection" in dump["session"]["audio"]["input"]: + if dump["session"]["audio"]["input"]["turn_detection"] is False: + dump["session"]["audio"]["input"]["turn_detection"] = None + + return dump + + +class InputAudioBufferAppendEvent(ClientEvent): + """Event to append audio data to the input buffer. + + Parameters: + type: Event type, always "input_audio_buffer.append". + audio: Base64-encoded audio data to append. + """ + + type: Literal["input_audio_buffer.append"] = "input_audio_buffer.append" + audio: str # base64-encoded audio + + +class InputAudioBufferCommitEvent(ClientEvent): + """Event to commit the current input audio buffer. + + Parameters: + type: Event type, always "input_audio_buffer.commit". + """ + + type: Literal["input_audio_buffer.commit"] = "input_audio_buffer.commit" + + +class InputAudioBufferClearEvent(ClientEvent): + """Event to clear the input audio buffer. + + Parameters: + type: Event type, always "input_audio_buffer.clear". + """ + + type: Literal["input_audio_buffer.clear"] = "input_audio_buffer.clear" + + +class ConversationItemCreateEvent(ClientEvent): + """Event to create a new conversation item. + + Parameters: + type: Event type, always "conversation.item.create". + previous_item_id: ID of the item to insert after, if any. + item: The conversation item to create. + """ + + type: Literal["conversation.item.create"] = "conversation.item.create" + previous_item_id: Optional[str] = None + item: ConversationItem + + +class ConversationItemTruncateEvent(ClientEvent): + """Event to truncate a conversation item's audio content. + + Parameters: + type: Event type, always "conversation.item.truncate". + item_id: ID of the item to truncate. + content_index: Index of the content to truncate within the item. + audio_end_ms: End time in milliseconds for the truncated audio. + """ + + type: Literal["conversation.item.truncate"] = "conversation.item.truncate" + item_id: str + content_index: int + audio_end_ms: int + + +class ConversationItemDeleteEvent(ClientEvent): + """Event to delete a conversation item. + + Parameters: + type: Event type, always "conversation.item.delete". + item_id: ID of the item to delete. + """ + + type: Literal["conversation.item.delete"] = "conversation.item.delete" + item_id: str + + +class ConversationItemRetrieveEvent(ClientEvent): + """Event to retrieve a conversation item by ID. + + Parameters: + type: Event type, always "conversation.item.retrieve". + item_id: ID of the item to retrieve. + """ + + type: Literal["conversation.item.retrieve"] = "conversation.item.retrieve" + item_id: str + + +class ResponseCreateEvent(ClientEvent): + """Event to create a new assistant response. + + Parameters: + type: Event type, always "response.create". + response: Optional response configuration properties. + """ + + type: Literal["response.create"] = "response.create" + response: Optional[ResponseProperties] = None + + +class ResponseCancelEvent(ClientEvent): + """Event to cancel the current assistant response. + + Parameters: + type: Event type, always "response.cancel". + """ + + type: Literal["response.cancel"] = "response.cancel" + + +# +# server events +# + + +class ServerEvent(BaseModel): + """Base class for server events received from the realtime API. + + Parameters: + event_id: Unique identifier for the event. + type: Type of the server event. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + event_id: str + type: str + + +class SessionCreatedEvent(ServerEvent): + """Event indicating a session has been created. + + Parameters: + type: Event type, always "session.created". + session: The created session properties. + """ + + type: Literal["session.created"] + session: SessionProperties + + +class SessionUpdatedEvent(ServerEvent): + """Event indicating a session has been updated. + + Parameters: + type: Event type, always "session.updated". + session: The updated session properties. + """ + + type: Literal["session.updated"] + session: SessionProperties + + +class ConversationCreated(ServerEvent): + """Event indicating a conversation has been created. + + Parameters: + type: Event type, always "conversation.created". + conversation: The created conversation. + """ + + type: Literal["conversation.created"] + conversation: RealtimeConversation + + +class ConversationItemAdded(ServerEvent): + """Event indicating a conversation item has been added. + + Parameters: + type: Event type, always "conversation.item.added". + previous_item_id: ID of the previous item, if any. + item: The added conversation item. + """ + + type: Literal["conversation.item.added"] + previous_item_id: Optional[str] = None + item: ConversationItem + + +class ConversationItemDone(ServerEvent): + """Event indicating a conversation item is done processing. + + Parameters: + type: Event type, always "conversation.item.done". + previous_item_id: ID of the previous item, if any. + item: The completed conversation item. + """ + + type: Literal["conversation.item.done"] + previous_item_id: Optional[str] = None + item: ConversationItem + + +class ConversationItemInputAudioTranscriptionDelta(ServerEvent): + """Event containing incremental input audio transcription. + + Parameters: + type: Event type, always "conversation.item.input_audio_transcription.delta". + item_id: ID of the conversation item being transcribed. + content_index: Index of the content within the item. + delta: Incremental transcription text. + """ + + type: Literal["conversation.item.input_audio_transcription.delta"] + item_id: str + content_index: int + delta: str + + +class ConversationItemInputAudioTranscriptionCompleted(ServerEvent): + """Event indicating input audio transcription is complete. + + Parameters: + type: Event type, always "conversation.item.input_audio_transcription.completed". + item_id: ID of the conversation item that was transcribed. + content_index: Index of the content within the item. + transcript: Complete transcription text. + """ + + type: Literal["conversation.item.input_audio_transcription.completed"] + item_id: str + content_index: int + transcript: str + + +class ConversationItemInputAudioTranscriptionFailed(ServerEvent): + """Event indicating input audio transcription failed. + + Parameters: + type: Event type, always "conversation.item.input_audio_transcription.failed". + item_id: ID of the conversation item that failed transcription. + content_index: Index of the content within the item. + error: Error details for the transcription failure. + """ + + type: Literal["conversation.item.input_audio_transcription.failed"] + item_id: str + content_index: int + error: RealtimeError + + +class ConversationItemTruncated(ServerEvent): + """Event indicating a conversation item has been truncated. + + Parameters: + type: Event type, always "conversation.item.truncated". + item_id: ID of the truncated conversation item. + content_index: Index of the content within the item. + audio_end_ms: End time in milliseconds for the truncated audio. + """ + + type: Literal["conversation.item.truncated"] + item_id: str + content_index: int + audio_end_ms: int + + +class ConversationItemDeleted(ServerEvent): + """Event indicating a conversation item has been deleted. + + Parameters: + type: Event type, always "conversation.item.deleted". + item_id: ID of the deleted conversation item. + """ + + type: Literal["conversation.item.deleted"] + item_id: str + + +class ConversationItemRetrieved(ServerEvent): + """Event containing a retrieved conversation item. + + Parameters: + type: Event type, always "conversation.item.retrieved". + item: The retrieved conversation item. + """ + + type: Literal["conversation.item.retrieved"] + item: ConversationItem + + +class ResponseCreated(ServerEvent): + """Event indicating an assistant response has been created. + + Parameters: + type: Event type, always "response.created". + response: The created response object. + """ + + type: Literal["response.created"] + response: "Response" + + +class ResponseDone(ServerEvent): + """Event indicating an assistant response is complete. + + Parameters: + type: Event type, always "response.done". + response: The completed response object. + """ + + type: Literal["response.done"] + response: "Response" + + +class ResponseOutputItemAdded(ServerEvent): + """Event indicating an output item has been added to a response. + + Parameters: + type: Event type, always "response.output_item.added". + response_id: ID of the response. + output_index: Index of the output item. + item: The added conversation item. + """ + + type: Literal["response.output_item.added"] + response_id: str + output_index: int + item: ConversationItem + + +class ResponseOutputItemDone(ServerEvent): + """Event indicating an output item is complete. + + Parameters: + type: Event type, always "response.output_item.done". + response_id: ID of the response. + output_index: Index of the output item. + item: The completed conversation item. + """ + + type: Literal["response.output_item.done"] + response_id: str + output_index: int + item: ConversationItem + + +class ResponseContentPartAdded(ServerEvent): + """Event indicating a content part has been added to a response. + + Parameters: + type: Event type, always "response.content_part.added". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + part: The added content part. + """ + + type: Literal["response.content_part.added"] + response_id: str + item_id: str + output_index: int + content_index: int + part: ItemContent + + +class ResponseContentPartDone(ServerEvent): + """Event indicating a content part is complete. + + Parameters: + type: Event type, always "response.content_part.done". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + part: The completed content part. + """ + + type: Literal["response.content_part.done"] + response_id: str + item_id: str + output_index: int + content_index: int + part: ItemContent + + +class ResponseTextDelta(ServerEvent): + """Event containing incremental text from a response. + + Parameters: + type: Event type, always "response.output_text.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + delta: Incremental text content. + """ + + type: Literal["response.output_text.delta"] + response_id: str + item_id: str + output_index: int + content_index: int + delta: str + + +class ResponseTextDone(ServerEvent): + """Event indicating text content is complete. + + Parameters: + type: Event type, always "response.output_text.done". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + text: Complete text content. + """ + + type: Literal["response.output_text.done"] + response_id: str + item_id: str + output_index: int + content_index: int + text: str + + +class ResponseAudioTranscriptDelta(ServerEvent): + """Event containing incremental audio transcript from a response. + + Parameters: + type: Event type, always "response.output_audio_transcript.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + delta: Incremental transcript text. + """ + + type: Literal["response.output_audio_transcript.delta"] + response_id: str + item_id: str + output_index: int + content_index: int + delta: str + + +class ResponseAudioTranscriptDone(ServerEvent): + """Event indicating audio transcript is complete. + + Parameters: + type: Event type, always "response.output_audio_transcript.done". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + transcript: Complete transcript text. + """ + + type: Literal["response.output_audio_transcript.done"] + response_id: str + item_id: str + output_index: int + content_index: int + transcript: str + + +class ResponseAudioDelta(ServerEvent): + """Event containing incremental audio data from a response. + + Parameters: + type: Event type, always "response.output_audio.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + delta: Base64-encoded incremental audio data. + """ + + type: Literal["response.output_audio.delta"] + response_id: str + item_id: str + output_index: int + content_index: int + delta: str # base64-encoded audio + + +class ResponseAudioDone(ServerEvent): + """Event indicating audio content is complete. + + Parameters: + type: Event type, always "response.output_audio.done". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + """ + + type: Literal["response.output_audio.done"] + response_id: str + item_id: str + output_index: int + content_index: int + + +class ResponseFunctionCallArgumentsDelta(ServerEvent): + """Event containing incremental function call arguments. + + Parameters: + type: Event type, always "response.function_call_arguments.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + call_id: ID of the function call. + delta: Incremental function arguments as JSON. + """ + + type: Literal["response.function_call_arguments.delta"] + response_id: str + item_id: str + output_index: int + call_id: str + delta: str + + +class ResponseFunctionCallArgumentsDone(ServerEvent): + """Event indicating function call arguments are complete. + + Parameters: + type: Event type, always "response.function_call_arguments.done". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + call_id: ID of the function call. + arguments: Complete function arguments as JSON string. + """ + + type: Literal["response.function_call_arguments.done"] + response_id: str + item_id: str + output_index: int + call_id: str + arguments: str + + +class InputAudioBufferSpeechStarted(ServerEvent): + """Event indicating speech has started in the input audio buffer. + + Parameters: + type: Event type, always "input_audio_buffer.speech_started". + audio_start_ms: Start time of speech in milliseconds. + item_id: ID of the associated conversation item. + """ + + type: Literal["input_audio_buffer.speech_started"] + audio_start_ms: int + item_id: str + + +class InputAudioBufferSpeechStopped(ServerEvent): + """Event indicating speech has stopped in the input audio buffer. + + Parameters: + type: Event type, always "input_audio_buffer.speech_stopped". + audio_end_ms: End time of speech in milliseconds. + item_id: ID of the associated conversation item. + """ + + type: Literal["input_audio_buffer.speech_stopped"] + audio_end_ms: int + item_id: str + + +class InputAudioBufferCommitted(ServerEvent): + """Event indicating the input audio buffer has been committed. + + Parameters: + type: Event type, always "input_audio_buffer.committed". + previous_item_id: ID of the previous item, if any. + item_id: ID of the committed conversation item. + """ + + type: Literal["input_audio_buffer.committed"] + previous_item_id: Optional[str] = None + item_id: str + + +class InputAudioBufferCleared(ServerEvent): + """Event indicating the input audio buffer has been cleared. + + Parameters: + type: Event type, always "input_audio_buffer.cleared". + """ + + type: Literal["input_audio_buffer.cleared"] + + +class ErrorEvent(ServerEvent): + """Event indicating an error occurred. + + Parameters: + type: Event type, always "error". + error: Error details. + """ + + type: Literal["error"] + error: RealtimeError + + +class RateLimitsUpdated(ServerEvent): + """Event indicating rate limits have been updated. + + Parameters: + type: Event type, always "rate_limits.updated". + rate_limits: List of rate limit information. + """ + + type: Literal["rate_limits.updated"] + rate_limits: List[Dict[str, Any]] + + +class CachedTokensDetails(BaseModel): + """Details about cached tokens. + + Parameters: + text_tokens: Number of cached text tokens. + audio_tokens: Number of cached audio tokens. + """ + + text_tokens: Optional[int] = 0 + audio_tokens: Optional[int] = 0 + + +class TokenDetails(BaseModel): + """Detailed token usage information. + + Parameters: + cached_tokens: Number of cached tokens used. Defaults to 0. + text_tokens: Number of text tokens used. Defaults to 0. + audio_tokens: Number of audio tokens used. Defaults to 0. + cached_tokens_details: Detailed breakdown of cached tokens. + image_tokens: Number of image tokens used (for input only). + """ + + cached_tokens: Optional[int] = 0 + text_tokens: Optional[int] = 0 + audio_tokens: Optional[int] = 0 + cached_tokens_details: Optional[CachedTokensDetails] = None + image_tokens: Optional[int] = 0 + + class Config: + """Pydantic configuration for TokenDetails.""" + + extra = "allow" + + +class Usage(BaseModel): + """Token usage statistics for a response. + + Parameters: + total_tokens: Total number of tokens used. + input_tokens: Number of input tokens used. + output_tokens: Number of output tokens used. + input_token_details: Detailed breakdown of input token usage. + output_token_details: Detailed breakdown of output token usage. + """ + + total_tokens: int + input_tokens: int + output_tokens: int + input_token_details: TokenDetails + output_token_details: TokenDetails + + +class Response(BaseModel): + """A complete assistant response. + + Parameters: + id: Unique identifier for the response. + object: Object type, always "realtime.response". + status: Current status of the response. + status_details: Additional status information. + output: List of conversation items in the response. + conversation_id: Which conversation the response is added to. + output_modalities: The set of modalities the model used to respond. + max_output_tokens: Maximum number of output tokens used. + audio: Audio configuration for the response. + usage: Token usage statistics for the response. + voice: The voice the model used to respond. + temperature: Sampling temperature used for the response. + output_audio_format: The format of output audio. + """ + + id: str + object: Literal["realtime.response"] + status: Literal["completed", "in_progress", "incomplete", "cancelled", "failed"] + status_details: Any + output: List[ConversationItem] + output_modalities: Optional[List[Literal["text", "audio"]]] = None + max_output_tokens: Optional[Union[int, Literal["inf"]]] = None + audio: Optional[AudioConfiguration] = None + usage: Optional[Usage] = None + voice: Optional[str] = None + temperature: Optional[float] = None + output_audio_format: Optional[str] = None + + +_server_event_types = { + "error": ErrorEvent, + "session.created": SessionCreatedEvent, + "session.updated": SessionUpdatedEvent, + "conversation.created": ConversationCreated, + "input_audio_buffer.committed": InputAudioBufferCommitted, + "input_audio_buffer.cleared": InputAudioBufferCleared, + "input_audio_buffer.speech_started": InputAudioBufferSpeechStarted, + "input_audio_buffer.speech_stopped": InputAudioBufferSpeechStopped, + "conversation.item.added": ConversationItemAdded, + "conversation.item.done": ConversationItemDone, + "conversation.item.input_audio_transcription.delta": ConversationItemInputAudioTranscriptionDelta, + "conversation.item.input_audio_transcription.completed": ConversationItemInputAudioTranscriptionCompleted, + "conversation.item.input_audio_transcription.failed": ConversationItemInputAudioTranscriptionFailed, + "conversation.item.truncated": ConversationItemTruncated, + "conversation.item.deleted": ConversationItemDeleted, + "conversation.item.retrieved": ConversationItemRetrieved, + "response.created": ResponseCreated, + "response.done": ResponseDone, + "response.output_item.added": ResponseOutputItemAdded, + "response.output_item.done": ResponseOutputItemDone, + "response.content_part.added": ResponseContentPartAdded, + "response.content_part.done": ResponseContentPartDone, + "response.output_text.delta": ResponseTextDelta, + "response.output_text.done": ResponseTextDone, + "response.output_audio_transcript.delta": ResponseAudioTranscriptDelta, + "response.output_audio_transcript.done": ResponseAudioTranscriptDone, + "response.output_audio.delta": ResponseAudioDelta, + "response.output_audio.done": ResponseAudioDone, + "response.function_call_arguments.delta": ResponseFunctionCallArgumentsDelta, + "response.function_call_arguments.done": ResponseFunctionCallArgumentsDone, + "rate_limits.updated": RateLimitsUpdated, +} + + +def parse_server_event(str): + """Parse a server event from JSON string. + + Args: + str: JSON string containing the server event. + + Returns: + Parsed server event object of the appropriate type. + + Raises: + Exception: If the event type is unimplemented or parsing fails. + """ + try: + event = json.loads(str) + event_type = event["type"] + if event_type not in _server_event_types: + raise Exception(f"Unimplemented server event type: {event_type}") + return _server_event_types[event_type].model_validate(event) + except Exception as e: + raise Exception(f"{e} \n\n{str}") diff --git a/src/pipecat/services/openai_realtime/frames.py b/src/pipecat/services/openai_realtime/frames.py new file mode 100644 index 000000000..25e7c409c --- /dev/null +++ b/src/pipecat/services/openai_realtime/frames.py @@ -0,0 +1,37 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Custom frame types for OpenAI Realtime API integration.""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from pipecat.frames.frames import DataFrame, FunctionCallResultFrame + +if TYPE_CHECKING: + from pipecat.services.openai_realtime_beta.context import OpenAIRealtimeLLMContext + + +@dataclass +class RealtimeMessagesUpdateFrame(DataFrame): + """Frame indicating that the realtime context messages have been updated. + + Parameters: + context: The updated OpenAI realtime LLM context. + """ + + context: "OpenAIRealtimeLLMContext" + + +@dataclass +class RealtimeFunctionCallResultFrame(DataFrame): + """Frame containing function call results for the realtime service. + + Parameters: + result_frame: The function call result frame to send to the realtime API. + """ + + result_frame: FunctionCallResultFrame diff --git a/src/pipecat/services/openai_realtime/openai.py b/src/pipecat/services/openai_realtime/openai.py new file mode 100644 index 000000000..67e3ac9e6 --- /dev/null +++ b/src/pipecat/services/openai_realtime/openai.py @@ -0,0 +1,831 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""OpenAI Realtime LLM service implementation with WebSocket support.""" + +import base64 +import json +import time +from dataclasses import dataclass +from typing import Optional + +from loguru import logger + +from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter +from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + InputAudioRawFrame, + InterimTranscriptionFrame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesAppendFrame, + LLMSetToolsFrame, + LLMTextFrame, + LLMUpdateSettingsFrame, + StartFrame, + StartInterruptionFrame, + TranscriptionFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, + TTSTextFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.metrics.metrics import LLMTokenUsage +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantAggregatorParams, + LLMUserAggregatorParams, +) +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.llm_service import FunctionCallFromLLM, LLMService +from pipecat.services.openai.llm import OpenAIContextAggregatorPair +from pipecat.transcriptions.language import Language +from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt + +from . import events +from .context import ( + OpenAIRealtimeAssistantContextAggregator, + OpenAIRealtimeLLMContext, + OpenAIRealtimeUserContextAggregator, +) +from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame + +try: + from websockets.asyncio.client import connect as websocket_connect +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.") + raise Exception(f"Missing module: {e}") + + +@dataclass +class CurrentAudioResponse: + """Tracks the current audio response from the assistant. + + Parameters: + item_id: Unique identifier for the audio response item. + content_index: Index of the audio content within the item. + start_time_ms: Timestamp when the audio response started in milliseconds. + total_size: Total size of audio data received in bytes. Defaults to 0. + """ + + item_id: str + content_index: int + start_time_ms: int + total_size: int = 0 + + +class OpenAIRealtimeLLMService(LLMService): + """OpenAI Realtime LLM service providing real-time audio and text communication. + + Implements the OpenAI Realtime API with WebSocket communication for low-latency + bidirectional audio and text interactions. Supports function calling, conversation + management, and real-time transcription. + """ + + # Overriding the default adapter to use the OpenAIRealtimeLLMAdapter one. + adapter_class = OpenAIRealtimeLLMAdapter + + def __init__( + self, + *, + api_key: str, + model: str = "gpt-realtime", + base_url: str = "wss://api.openai.com/v1/realtime", + session_properties: Optional[events.SessionProperties] = None, + start_audio_paused: bool = False, + send_transcription_frames: bool = True, + **kwargs, + ): + """Initialize the OpenAI Realtime LLM service. + + Args: + api_key: OpenAI API key for authentication. + model: OpenAI model name. Defaults to "gpt-4o-realtime-preview-2025-06-03". + base_url: WebSocket base URL for the realtime API. + Defaults to "wss://api.openai.com/v1/realtime". + session_properties: Configuration properties for the realtime session. + If None, uses default SessionProperties. + start_audio_paused: Whether to start with audio input paused. Defaults to False. + send_transcription_frames: Whether to emit transcription frames. Defaults to True. + **kwargs: Additional arguments passed to parent LLMService. + """ + full_url = f"{base_url}?model={model}" + super().__init__(base_url=full_url, **kwargs) + + self.api_key = api_key + self.base_url = full_url + self.set_model_name(model) + + self._session_properties: events.SessionProperties = ( + session_properties or events.SessionProperties() + ) + self._audio_input_paused = start_audio_paused + self._send_transcription_frames = send_transcription_frames + self._websocket = None + self._receive_task = None + self._context = None + + self._disconnecting = False + self._api_session_ready = False + self._run_llm_when_api_session_ready = False + + self._current_assistant_response = None + self._current_audio_response = None + + self._messages_added_manually = {} + self._user_and_response_message_tuple = None + self._pending_function_calls = {} # Track function calls by call_id + + self._register_event_handler("on_conversation_item_created") + self._register_event_handler("on_conversation_item_updated") + self._retrieve_conversation_item_futures = {} + + def can_generate_metrics(self) -> bool: + """Check if the service can generate usage metrics. + + Returns: + True if metrics generation is supported. + """ + return True + + def set_audio_input_paused(self, paused: bool): + """Set whether audio input is paused. + + Args: + paused: True to pause audio input, False to resume. + """ + self._audio_input_paused = paused + + def _is_modality_enabled(self, modality: str) -> bool: + """Check if a specific modality is enabled, "text" or "audio".""" + modalities = self._session_properties.output_modalities or ["audio", "text"] + return modality in modalities + + def _get_enabled_modalities(self) -> list[str]: + """Get the list of enabled modalities.""" + modalities = self._session_properties.output_modalities or ["audio", "text"] + # API only supports single modality responses: either ["text"] or ["audio"] + if "audio" in modalities: + return ["audio"] + elif "text" in modalities: + return ["text"] + + async def retrieve_conversation_item(self, item_id: str): + """Retrieve a conversation item by ID from the server. + + Args: + item_id: The ID of the conversation item to retrieve. + + Returns: + The retrieved conversation item. + """ + future = self.get_event_loop().create_future() + retrieval_in_flight = False + if not self._retrieve_conversation_item_futures.get(item_id): + self._retrieve_conversation_item_futures[item_id] = [] + else: + retrieval_in_flight = True + self._retrieve_conversation_item_futures[item_id].append(future) + if not retrieval_in_flight: + await self.send_client_event( + # Set event_id to "rci_{item_id}" so that we can identify an + # error later if the retrieval fails. We don't need a UUID + # suffix to the event_id because we're ensuring only one + # in-flight retrieval per item_id. (Note: "rci" = "retrieve + # conversation item") + events.ConversationItemRetrieveEvent(item_id=item_id, event_id=f"rci_{item_id}") + ) + return await future + + # + # standard AIService frame handling + # + + async def start(self, frame: StartFrame): + """Start the service and establish WebSocket connection. + + Args: + frame: The start frame triggering service initialization. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and close WebSocket connection. + + Args: + frame: The end frame triggering service shutdown. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and close WebSocket connection. + + Args: + frame: The cancel frame triggering service cancellation. + """ + await super().cancel(frame) + await self._disconnect() + + # + # speech and interruption handling + # + + async def _handle_interruption(self): + # None and False are different. Check for False. None means we're using OpenAI's + # built-in turn detection defaults. + turn_detection_disabled = ( + self._session_properties.audio + and self._session_properties.audio.input + and self._session_properties.audio.input.turn_detection is False + ) + if turn_detection_disabled: + await self.send_client_event(events.InputAudioBufferClearEvent()) + await self.send_client_event(events.ResponseCancelEvent()) + await self._truncate_current_audio_response() + await self.stop_all_metrics() + if self._current_assistant_response: + await self.push_frame(LLMFullResponseEndFrame()) + # Only push TTSStoppedFrame if audio modality is enabled + if self._is_modality_enabled("audio"): + await self.push_frame(TTSStoppedFrame()) + + async def _handle_user_started_speaking(self, frame): + pass + + async def _handle_user_stopped_speaking(self, frame): + # None and False are different. Check for False. None means we're using OpenAI's + # built-in turn detection defaults. + turn_detection_disabled = ( + self._session_properties.audio + and self._session_properties.audio.input + and self._session_properties.audio.input.turn_detection is False + ) + if turn_detection_disabled: + await self.send_client_event(events.InputAudioBufferCommitEvent()) + await self.send_client_event(events.ResponseCreateEvent()) + + async def _handle_bot_stopped_speaking(self): + self._current_audio_response = None + + def _calculate_audio_duration_ms( + self, total_bytes: int, sample_rate: int = 24000, bytes_per_sample: int = 2 + ) -> int: + """Calculate audio duration in milliseconds based on PCM audio parameters.""" + samples = total_bytes / bytes_per_sample + duration_seconds = samples / sample_rate + return int(duration_seconds * 1000) + + async def _truncate_current_audio_response(self): + """Truncates the current audio response at the appropriate duration. + + Calculates the actual duration of the audio content and truncates at the shorter of + either the wall clock time or the actual audio duration to prevent invalid truncation + requests. + """ + if not self._current_audio_response: + return + + # if the bot is still speaking, truncate the last message + try: + current = self._current_audio_response + self._current_audio_response = None + + # Calculate actual audio duration instead of using wall clock time + audio_duration_ms = self._calculate_audio_duration_ms(current.total_size) + + # Use the shorter of wall clock time or actual audio duration + elapsed_ms = int(time.time() * 1000 - current.start_time_ms) + truncate_ms = min(elapsed_ms, audio_duration_ms) + + logger.trace( + f"Truncating audio: duration={audio_duration_ms}ms, " + f"elapsed={elapsed_ms}ms, truncate={truncate_ms}ms" + ) + + await self.send_client_event( + events.ConversationItemTruncateEvent( + item_id=current.item_id, + content_index=current.content_index, + audio_end_ms=truncate_ms, + ) + ) + except Exception as e: + # Log warning and don't re-raise - allow session to continue + logger.warning(f"Audio truncation failed (non-fatal): {e}") + + # + # frame processing + # + # StartFrame, StopFrame, CancelFrame implemented in base class + # + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames from the pipeline. + + Args: + frame: The frame to process. + direction: The direction of frame flow in the pipeline. + """ + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + pass + elif isinstance(frame, OpenAILLMContextFrame): + context: OpenAIRealtimeLLMContext = OpenAIRealtimeLLMContext.upgrade_to_realtime( + frame.context + ) + if not self._context: + self._context = context + elif frame.context is not self._context: + # If the context has changed, reset the conversation + self._context = context + await self.reset_conversation() + # Run the LLM at next opportunity + await self._create_response() + elif isinstance(frame, LLMContextFrame): + raise NotImplementedError( + "Universal LLMContext is not yet supported for OpenAI Realtime." + ) + elif isinstance(frame, InputAudioRawFrame): + if not self._audio_input_paused: + await self._send_user_audio(frame) + elif isinstance(frame, StartInterruptionFrame): + await self._handle_interruption() + elif isinstance(frame, UserStartedSpeakingFrame): + await self._handle_user_started_speaking(frame) + elif isinstance(frame, UserStoppedSpeakingFrame): + await self._handle_user_stopped_speaking(frame) + elif isinstance(frame, BotStoppedSpeakingFrame): + await self._handle_bot_stopped_speaking() + elif isinstance(frame, LLMMessagesAppendFrame): + await self._handle_messages_append(frame) + elif isinstance(frame, RealtimeMessagesUpdateFrame): + self._context = frame.context + elif isinstance(frame, LLMUpdateSettingsFrame): + self._session_properties = events.SessionProperties(**frame.settings) + await self._update_settings() + elif isinstance(frame, LLMSetToolsFrame): + await self._update_settings() + elif isinstance(frame, RealtimeFunctionCallResultFrame): + await self._handle_function_call_result(frame.result_frame) + + await self.push_frame(frame, direction) + + async def _handle_messages_append(self, frame): + logger.error("!!! NEED TO IMPLEMENT MESSAGES APPEND") + + async def _handle_function_call_result(self, frame): + item = events.ConversationItem( + type="function_call_output", + call_id=frame.tool_call_id, + output=json.dumps(frame.result), + ) + await self.send_client_event(events.ConversationItemCreateEvent(item=item)) + + # + # websocket communication + # + + async def send_client_event(self, event: events.ClientEvent): + """Send a client event to the OpenAI Realtime API. + + Args: + event: The client event to send. + """ + await self._ws_send(event.model_dump(exclude_none=True)) + + async def _connect(self): + try: + if self._websocket: + # Here we assume that if we have a websocket, we are connected. We + # handle disconnections in the send/recv code paths. + return + self._websocket = await websocket_connect( + uri=self.base_url, + additional_headers={ + "Authorization": f"Bearer {self.api_key}", + }, + ) + self._receive_task = self.create_task(self._receive_task_handler()) + except Exception as e: + logger.error(f"{self} initialization error: {e}") + self._websocket = None + + async def _disconnect(self): + try: + self._disconnecting = True + self._api_session_ready = False + await self.stop_all_metrics() + if self._websocket: + await self._websocket.close() + self._websocket = None + if self._receive_task: + await self.cancel_task(self._receive_task, timeout=1.0) + self._receive_task = None + self._disconnecting = False + except Exception as e: + logger.error(f"{self} error disconnecting: {e}") + + async def _ws_send(self, realtime_message): + try: + if self._websocket: + await self._websocket.send(json.dumps(realtime_message)) + except Exception as e: + if self._disconnecting: + return + logger.error(f"Error sending message to websocket: {e}") + # In server-to-server contexts, a WebSocket error should be quite rare. Given how hard + # it is to recover from a send-side error with proper state management, and that exponential + # backoff for retries can have cost/stability implications for a service cluster, let's just + # treat a send-side error as fatal. + await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True)) + + async def _update_settings(self): + settings = self._session_properties + # tools given in the context override the tools in the session properties + if self._context and self._context.tools: + settings.tools = self._context.tools + # instructions in the context come from an initial "system" message in the + # messages list, and override instructions in the session properties + if self._context and self._context._session_instructions: + settings.instructions = self._context._session_instructions + await self.send_client_event(events.SessionUpdateEvent(session=settings)) + + # + # inbound server event handling + # https://platform.openai.com/docs/api-reference/realtime-server-events + # + + async def _receive_task_handler(self): + async for message in self._websocket: + evt = events.parse_server_event(message) + if evt.type == "session.created": + await self._handle_evt_session_created(evt) + elif evt.type == "session.updated": + await self._handle_evt_session_updated(evt) + elif evt.type == "response.output_audio.delta": + await self._handle_evt_audio_delta(evt) + elif evt.type == "response.output_audio.done": + await self._handle_evt_audio_done(evt) + elif evt.type == "conversation.item.added": + await self._handle_evt_conversation_item_added(evt) + elif evt.type == "conversation.item.done": + await self._handle_evt_conversation_item_done(evt) + elif evt.type == "conversation.item.input_audio_transcription.delta": + await self._handle_evt_input_audio_transcription_delta(evt) + elif evt.type == "conversation.item.input_audio_transcription.completed": + await self.handle_evt_input_audio_transcription_completed(evt) + elif evt.type == "conversation.item.retrieved": + await self._handle_conversation_item_retrieved(evt) + elif evt.type == "response.done": + await self._handle_evt_response_done(evt) + elif evt.type == "input_audio_buffer.speech_started": + await self._handle_evt_speech_started(evt) + elif evt.type == "input_audio_buffer.speech_stopped": + await self._handle_evt_speech_stopped(evt) + elif evt.type == "response.output_text.delta": + await self._handle_evt_text_delta(evt) + elif evt.type == "response.output_audio_transcript.delta": + await self._handle_evt_audio_transcript_delta(evt) + elif evt.type == "response.function_call_arguments.done": + await self._handle_evt_function_call_arguments_done(evt) + elif evt.type == "error": + if not await self._maybe_handle_evt_retrieve_conversation_item_error(evt): + await self._handle_evt_error(evt) + # errors are fatal, so exit the receive loop + return + + @traced_openai_realtime(operation="llm_setup") + async def _handle_evt_session_created(self, evt): + # session.created is received right after connecting. Send a message + # to configure the session properties. + await self._update_settings() + + async def _handle_evt_session_updated(self, evt): + # If this is our first context frame, run the LLM + self._api_session_ready = True + # Now that we've configured the session, we can run the LLM if we need to. + if self._run_llm_when_api_session_ready: + self._run_llm_when_api_session_ready = False + await self._create_response() + + async def _handle_evt_audio_delta(self, evt): + # note: ttfb is faster by 1/2 RTT than ttfb as measured for other services, since we're getting + # this event from the server + await self.stop_ttfb_metrics() + if not self._current_audio_response: + self._current_audio_response = CurrentAudioResponse( + item_id=evt.item_id, + content_index=evt.content_index, + start_time_ms=int(time.time() * 1000), + ) + await self.push_frame(TTSStartedFrame()) + audio = base64.b64decode(evt.delta) + self._current_audio_response.total_size += len(audio) + frame = TTSAudioRawFrame( + audio=audio, + sample_rate=24000, + num_channels=1, + ) + await self.push_frame(frame) + + async def _handle_evt_audio_done(self, evt): + if self._current_audio_response: + await self.push_frame(TTSStoppedFrame()) + # Don't clear the self._current_audio_response here. We need to wait until we + # receive a BotStoppedSpeakingFrame from the output transport. + + async def _handle_evt_conversation_item_added(self, evt): + """Handle conversation.item.added event - item is added but may still be processing.""" + if evt.item.type == "function_call": + # Track this function call for when arguments are completed + # Only add if not already tracked (prevent duplicates) + if evt.item.call_id not in self._pending_function_calls: + self._pending_function_calls[evt.item.call_id] = evt.item + else: + logger.warning(f"Function call {evt.item.call_id} already tracked, skipping") + + await self._call_event_handler("on_conversation_item_created", evt.item.id, evt.item) + + # This will get sent from the server every time a new "message" is added + # to the server's conversation state, whether we create it via the API + # or the server creates it from LLM output. + if self._messages_added_manually.get(evt.item.id): + del self._messages_added_manually[evt.item.id] + return + + if evt.item.role == "user": + # We need to wait for completion of both user message and response message. Then we'll + # add both to the context. User message is complete when we have a "transcript" field + # that is not None. Response message is complete when we get a "response.done" event. + self._user_and_response_message_tuple = (evt.item, {"done": False, "output": []}) + elif evt.item.role == "assistant": + self._current_assistant_response = evt.item + await self.push_frame(LLMFullResponseStartFrame()) + + async def _handle_evt_conversation_item_done(self, evt): + """Handle conversation.item.done event - item is fully completed.""" + await self._call_event_handler("on_conversation_item_updated", evt.item.id, evt.item) + # The item is now fully processed and ready + # For now, no additional logic needed beyond the event handler call + + async def _handle_evt_input_audio_transcription_delta(self, evt): + if self._send_transcription_frames: + await self.push_frame( + # no way to get a language code? + InterimTranscriptionFrame(evt.delta, "", time_now_iso8601(), result=evt) + ) + + @traced_stt + async def _handle_user_transcription( + self, transcript: str, is_final: bool, language: Optional[Language] = None + ): + """Handle a transcription result with tracing.""" + pass + + async def handle_evt_input_audio_transcription_completed(self, evt): + """Handle completion of input audio transcription. + + Args: + evt: The transcription completed event. + """ + await self._call_event_handler("on_conversation_item_updated", evt.item_id, None) + + if self._send_transcription_frames: + await self.push_frame( + # no way to get a language code? + TranscriptionFrame(evt.transcript, "", time_now_iso8601(), result=evt) + ) + await self._handle_user_transcription(evt.transcript, True, Language.EN) + pair = self._user_and_response_message_tuple + if pair: + user, assistant = pair + user.content[0].transcript = evt.transcript + if assistant["done"]: + self._user_and_response_message_tuple = None + self._context.add_user_content_item_as_message(user) + else: + # User message without preceding conversation.item.created. Bug? + logger.warning(f"Transcript for unknown user message: {evt}") + + async def _handle_conversation_item_retrieved(self, evt: events.ConversationItemRetrieved): + futures = self._retrieve_conversation_item_futures.pop(evt.item.id, None) + if futures: + for future in futures: + future.set_result(evt.item) + + @traced_openai_realtime(operation="llm_response") + async def _handle_evt_response_done(self, evt): + # todo: figure out whether there's anything we need to do for "cancelled" events + # usage metrics + tokens = LLMTokenUsage( + prompt_tokens=evt.response.usage.input_tokens, + completion_tokens=evt.response.usage.output_tokens, + total_tokens=evt.response.usage.total_tokens, + ) + await self.start_llm_usage_metrics(tokens) + await self.stop_processing_metrics() + await self.push_frame(LLMFullResponseEndFrame()) + self._current_assistant_response = None + # error handling + if evt.response.status == "failed": + await self.push_error( + ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True) + ) + return + # response content + for item in evt.response.output: + await self._call_event_handler("on_conversation_item_updated", item.id, item) + pair = self._user_and_response_message_tuple + if pair: + user, assistant = pair + assistant["done"] = True + assistant["output"] = evt.response.output + if user.content[0].transcript is not None: + self._user_and_response_message_tuple = None + self._context.add_user_content_item_as_message(user) + else: + # Response message without preceding user message (standalone response) + # Function calls in this response were already processed immediately when arguments were complete + logger.debug(f"Handling standalone response: {evt.response.id}") + + async def _handle_evt_text_delta(self, evt): + if evt.delta: + await self.push_frame(LLMTextFrame(evt.delta)) + + async def _handle_evt_audio_transcript_delta(self, evt): + if evt.delta: + await self.push_frame(LLMTextFrame(evt.delta)) + await self.push_frame(TTSTextFrame(evt.delta)) + + async def _handle_evt_function_call_arguments_done(self, evt): + """Handle completion of function call arguments. + + Args: + evt: The response.function_call_arguments.done event. + """ + # Process the function call immediately when arguments are complete + # This is needed because function calls might not trigger response.done + try: + # Parse the arguments + args = json.loads(evt.arguments) + + # Get the function call item we tracked earlier + function_call_item = self._pending_function_calls.get(evt.call_id) + if function_call_item: + # Remove from pending calls FIRST to prevent duplicate processing + del self._pending_function_calls[evt.call_id] + + # Create the function call and process it + function_calls = [ + FunctionCallFromLLM( + context=self._context, + tool_call_id=evt.call_id, + function_name=function_call_item.name, + arguments=args, + ) + ] + + await self.run_function_calls(function_calls) + logger.debug(f"Processed function call: {function_call_item.name}") + else: + logger.warning(f"No tracked function call found for call_id: {evt.call_id}") + logger.warning( + f"Available pending calls: {list(self._pending_function_calls.keys())}" + ) + + except Exception as e: + logger.error(f"Failed to process function call arguments: {e}") + + async def _handle_evt_speech_started(self, evt): + await self._truncate_current_audio_response() + await self._start_interruption() # cancels this processor task + await self.push_frame(StartInterruptionFrame()) # cancels downstream tasks + await self.push_frame(UserStartedSpeakingFrame()) + + async def _handle_evt_speech_stopped(self, evt): + await self.start_ttfb_metrics() + await self.start_processing_metrics() + await self._stop_interruption() + await self.push_frame(UserStoppedSpeakingFrame()) + + async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent): + """Maybe handle an error event related to retrieving a conversation item. + + If the given error event is an error retrieving a conversation item: + + - set an exception on the future that retrieve_conversation_item() is waiting on + - return true + Otherwise: + - return false + """ + if evt.error.code == "item_retrieve_invalid_item_id": + item_id = evt.error.event_id.split("_", 1)[1] # event_id is of the form "rci_{item_id}" + futures = self._retrieve_conversation_item_futures.pop(item_id, None) + if futures: + for future in futures: + future.set_exception(Exception(evt.error.message)) + return True + return False + + async def _handle_evt_error(self, evt): + # Errors are fatal to this connection. Send an ErrorFrame. + await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True)) + + # + # state and client events for the current conversation + # https://platform.openai.com/docs/api-reference/realtime-client-events + # + + async def reset_conversation(self): + """Reset the conversation by disconnecting and reconnecting. + + This is the safest way to start a new conversation. Note that this will + fail if called from the receive task. + """ + logger.debug("Resetting conversation") + await self._disconnect() + if self._context: + self._context.llm_needs_settings_update = True + self._context.llm_needs_initial_messages = True + await self._connect() + + @traced_openai_realtime(operation="llm_request") + async def _create_response(self): + if not self._api_session_ready: + self._run_llm_when_api_session_ready = True + return + + if self._context.llm_needs_initial_messages: + messages = self._context.get_messages_for_initializing_history() + for item in messages: + evt = events.ConversationItemCreateEvent(item=item) + self._messages_added_manually[evt.item.id] = True + await self.send_client_event(evt) + self._context.llm_needs_initial_messages = False + + if self._context.llm_needs_settings_update: + await self._update_settings() + self._context.llm_needs_settings_update = False + + logger.debug(f"Creating response: {self._context.get_messages_for_logging()}") + + await self.push_frame(LLMFullResponseStartFrame()) + await self.start_processing_metrics() + await self.start_ttfb_metrics() + await self.send_client_event( + events.ResponseCreateEvent( + response=events.ResponseProperties(output_modalities=self._get_enabled_modalities()) + ) + ) + + async def _send_user_audio(self, frame): + payload = base64.b64encode(frame.audio).decode("utf-8") + await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload)) + + def create_context_aggregator( + self, + context: OpenAILLMContext, + *, + user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), + assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), + ) -> OpenAIContextAggregatorPair: + """Create an instance of OpenAIContextAggregatorPair from an OpenAILLMContext. + + Constructor keyword arguments for both the user and assistant aggregators can be provided. + + Args: + context: The LLM context. + user_params: User aggregator parameters. + assistant_params: Assistant aggregator parameters. + + Returns: + OpenAIContextAggregatorPair: A pair of context aggregators, one for + the user and one for the assistant, encapsulated in an + OpenAIContextAggregatorPair. + """ + context.set_llm_adapter(self.get_llm_adapter()) + + OpenAIRealtimeLLMContext.upgrade_to_realtime(context) + user = OpenAIRealtimeUserContextAggregator(context, params=user_params) + + assistant_params.expect_stripped_words = False + assistant = OpenAIRealtimeAssistantContextAggregator(context, params=assistant_params) + return OpenAIContextAggregatorPair(_user=user, _assistant=assistant) diff --git a/src/pipecat/services/openai_realtime_beta/azure.py b/src/pipecat/services/openai_realtime_beta/azure.py index 44bb9ed0c..784438e81 100644 --- a/src/pipecat/services/openai_realtime_beta/azure.py +++ b/src/pipecat/services/openai_realtime_beta/azure.py @@ -6,6 +6,8 @@ """Azure OpenAI Realtime Beta LLM service implementation.""" +import warnings + from loguru import logger from .openai import OpenAIRealtimeBetaLLMService @@ -23,6 +25,10 @@ except ModuleNotFoundError as e: class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService): """Azure OpenAI Realtime Beta LLM service with Azure-specific authentication. + .. deprecated:: 0.0.84 + `AzureRealtimeBetaLLMService` is deprecated, use `AzureRealtimeLLMService` instead. + This class will be removed in version 1.0.0. + Extends the OpenAI Realtime service to work with Azure OpenAI endpoints, using Azure's authentication headers and endpoint format. Provides the same real-time audio and text communication capabilities as the base OpenAI service. @@ -44,6 +50,16 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService): **kwargs: Additional arguments passed to parent OpenAIRealtimeBetaLLMService. """ super().__init__(base_url=base_url, api_key=api_key, **kwargs) + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "AzureRealtimeBetaLLMService is deprecated and will be removed in version 1.0.0. " + "Use AzureRealtimeLLMService instead.", + DeprecationWarning, + stacklevel=2, + ) + self.api_key = api_key self.base_url = base_url diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index abbf85eb2..ef0ea92d6 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -9,6 +9,7 @@ import base64 import json import time +import warnings from dataclasses import dataclass from typing import Optional @@ -92,6 +93,10 @@ class CurrentAudioResponse: class OpenAIRealtimeBetaLLMService(LLMService): """OpenAI Realtime Beta LLM service providing real-time audio and text communication. + .. deprecated:: 0.0.84 + `OpenAIRealtimeBetaLLMService` is deprecated, use `OpenAIRealtimeLLMService` instead. + This class will be removed in version 1.0.0. + Implements the OpenAI Realtime API Beta with WebSocket communication for low-latency bidirectional audio and text interactions. Supports function calling, conversation management, and real-time transcription. @@ -124,6 +129,15 @@ class OpenAIRealtimeBetaLLMService(LLMService): send_transcription_frames: Whether to emit transcription frames. Defaults to True. **kwargs: Additional arguments passed to parent LLMService. """ + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "OpenAIRealtimeBetaLLMService is deprecated and will be removed in version 1.0.0. " + "Use OpenAIRealtimeLLMService instead.", + DeprecationWarning, + stacklevel=2, + ) + full_url = f"{base_url}?model={model}" super().__init__(base_url=full_url, **kwargs)