From bff741a6479bd0fcfed2ddef2b79dff7bfb7595a Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 20 May 2026 15:51:18 -0400 Subject: [PATCH] Migrate realtime examples to RealtimeServiceModeConfig MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pass realtime_service_mode=RealtimeServiceModeConfig() through every realtime LLM service example (base, async-tool, video, text-output, persistent-context, update-settings, MCP) so context aggregation uses the new realtime-mode semantics instead of relying on local VAD as a workaround. Where examples previously wired SileroVADAnalyzer into LLMUserAggregatorParams to coax turn frames out of services that don't emit them server-side (AWS Nova Sonic, Ultravox, Gemini Live), the local VAD is now removed. realtime_service_mode keeps context writes correct without it, and the Phase 1.5 server-side InterruptionFrame fixes for Nova Sonic and Ultravox keep the bot from talking past the user when they barge in. Transcript-logging event handlers move from on_user_turn_stopped / on_assistant_turn_stopped to on_user_message_added / on_assistant_message_added, which carry the finalized text in realtime mode (the turn-stopped events fire before the message is finalized, so their `content` is None in that mode). For services that don't emit user-turn frames (Gemini Live, AWS Nova Sonic, Ultravox) the example now carries a Tier 1 comment block that spells out which downstream processors won't activate, how to add local VAD if needed, and the caveat that locally-generated turn boundaries are a heuristic that may diverge from server-side ground truth. Adds examples/realtime/realtime-openai-local-vad.py, a new variant of the OpenAI Realtime example that disables OpenAI's server-side turn detection and drives turn boundaries locally — useful when you want a turn analyzer like LocalSmartTurnV3 to decide when the user is done speaking. Server-emitted turn frames are still preferred when available. The Gemini Live local-VAD variant already existed; it's been updated in place rather than rewritten. --- .../mcp/mcp-streamable-http-gemini-live.py | 5 +- .../persistent-context-aws-nova-sonic.py | 5 +- .../persistent-context-grok-realtime.py | 6 +- .../persistent-context-openai-realtime.py | 5 +- .../realtime-aws-nova-sonic-async-tool.py | 5 +- examples/realtime/realtime-aws-nova-sonic.py | 32 ++- .../realtime/realtime-azure-async-tool.py | 5 +- examples/realtime/realtime-azure.py | 5 +- .../realtime-gemini-live-async-tool.py | 10 +- .../realtime-gemini-live-files-api.py | 10 +- .../realtime-gemini-live-google-search.py | 10 +- .../realtime-gemini-live-graceful-end.py | 10 +- ...realtime-gemini-live-grounding-metadata.py | 10 +- .../realtime-gemini-live-local-vad.py | 36 ++- .../realtime/realtime-gemini-live-vertex.py | 10 +- .../realtime/realtime-gemini-live-video.py | 10 +- examples/realtime/realtime-gemini-live.py | 33 ++- examples/realtime/realtime-grok-async-tool.py | 10 +- examples/realtime/realtime-grok.py | 23 +- examples/realtime/realtime-inworld.py | 17 +- .../realtime/realtime-openai-async-tool.py | 5 +- .../realtime/realtime-openai-live-video.py | 5 +- .../realtime/realtime-openai-local-vad.py | 267 ++++++++++++++++++ examples/realtime/realtime-openai-text.py | 5 +- examples/realtime/realtime-openai.py | 25 +- .../realtime/realtime-ultravox-async-tool.py | 12 +- examples/realtime/realtime-ultravox-text.py | 26 +- examples/realtime/realtime-ultravox.py | 42 +-- .../update-settings/llm/llm-aws-nova-sonic.py | 5 +- .../update-settings/llm/llm-azure-realtime.py | 9 +- .../llm/llm-gemini-live-vertex.py | 5 +- .../update-settings/llm/llm-gemini-live.py | 5 +- .../update-settings/llm/llm-grok-realtime.py | 9 +- .../llm/llm-openai-realtime.py | 9 +- .../llm/llm-ultravox-realtime.py | 9 +- 35 files changed, 537 insertions(+), 158 deletions(-) create mode 100644 examples/realtime/realtime-openai-local-vad.py diff --git a/examples/mcp/mcp-streamable-http-gemini-live.py b/examples/mcp/mcp-streamable-http-gemini-live.py index 213667e7a..ac3cf1601 100644 --- a/examples/mcp/mcp-streamable-http-gemini-live.py +++ b/examples/mcp/mcp-streamable-http-gemini-live.py @@ -11,7 +11,6 @@ from dotenv import load_dotenv from loguru import logger from mcp.client.session_group import StreamableHttpParameters -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -19,7 +18,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -84,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext([{"role": "user", "content": "Please introduce yourself."}]) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/persistent-context/persistent-context-aws-nova-sonic.py b/examples/persistent-context/persistent-context-aws-nova-sonic.py index 8a4b7ba47..9d7db2390 100644 --- a/examples/persistent-context/persistent-context-aws-nova-sonic.py +++ b/examples/persistent-context/persistent-context-aws-nova-sonic.py @@ -15,7 +15,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -23,7 +22,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -241,7 +240,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/persistent-context/persistent-context-grok-realtime.py b/examples/persistent-context/persistent-context-grok-realtime.py index e21dbf1c3..78cdf2da4 100644 --- a/examples/persistent-context/persistent-context-grok-realtime.py +++ b/examples/persistent-context/persistent-context-grok-realtime.py @@ -33,6 +33,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -203,7 +204,10 @@ Remember, your responses should be short - just one or two sentences usually.""" llm.register_function("load_conversation", load_conversation) context = LLMContext([{"role": "developer", "content": "Say hello!"}], tools) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/persistent-context/persistent-context-openai-realtime.py b/examples/persistent-context/persistent-context-openai-realtime.py index 067e1f73f..f3a907db8 100644 --- a/examples/persistent-context/persistent-context-openai-realtime.py +++ b/examples/persistent-context/persistent-context-openai-realtime.py @@ -15,7 +15,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -23,7 +22,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -217,7 +216,7 @@ Remember, your responses should be short. Just one or two sentences, usually.""" context = LLMContext([{"role": "developer", "content": "Say hello!"}], tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-aws-nova-sonic-async-tool.py b/examples/realtime/realtime-aws-nova-sonic-async-tool.py index 7a79fefeb..b0ec483d9 100644 --- a/examples/realtime/realtime-aws-nova-sonic-async-tool.py +++ b/examples/realtime/realtime-aws-nova-sonic-async-tool.py @@ -24,7 +24,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -32,7 +31,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -133,7 +132,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-aws-nova-sonic.py b/examples/realtime/realtime-aws-nova-sonic.py index 66be581e8..f0a010a6e 100644 --- a/examples/realtime/realtime-aws-nova-sonic.py +++ b/examples/realtime/realtime-aws-nova-sonic.py @@ -15,7 +15,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -24,7 +23,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -148,10 +147,25 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm.register_function("get_current_weather", fetch_weather_from_api) # Set up context and context management. + # + # AWS Nova Sonic drives the conversation server-side. It does NOT emit + # UserStartedSpeakingFrame / UserStoppedSpeakingFrame, so pipeline + # processors that depend on those frames — RTVI client speech events, + # TurnTrackingObserver, AudioBufferProcessor turn recording, + # UserIdleController, user mute strategies, voicemail detector — won't + # activate with the default server-VAD-only setup. Context aggregation + # still works with realtime_service_mode. + # + # To produce these frames locally, wire a VAD analyzer (e.g. + # SileroVADAnalyzer) into LLMUserAggregatorParams. Caveat: locally- + # generated turn boundaries are a heuristic and may not match Nova + # Sonic's server-side turn decisions, which is what drives the + # conversation; the two can drift apart in subtle ways especially + # around interruptions and overlapping speech. context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) # Build the pipeline @@ -195,14 +209,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Nova Sonic doesn't emit user-turn frames so on_user_turn_stopped + # would never fire. The *_message_added events fire when messages are + # written to context and carry the finalized content; use those for + # transcript logging. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-azure-async-tool.py b/examples/realtime/realtime-azure-async-tool.py index 46e561e0a..3c77c6d83 100644 --- a/examples/realtime/realtime-azure-async-tool.py +++ b/examples/realtime/realtime-azure-async-tool.py @@ -24,7 +24,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -32,7 +31,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -144,7 +143,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-azure.py b/examples/realtime/realtime-azure.py index c4ff80dcb..a653d14f3 100644 --- a/examples/realtime/realtime-azure.py +++ b/examples/realtime/realtime-azure.py @@ -13,7 +13,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -21,7 +20,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -174,7 +173,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-gemini-live-async-tool.py b/examples/realtime/realtime-gemini-live-async-tool.py index 23d9d8fa0..d1cfb1588 100644 --- a/examples/realtime/realtime-gemini-live-async-tool.py +++ b/examples/realtime/realtime-gemini-live-async-tool.py @@ -28,7 +28,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService @@ -125,7 +128,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext() # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live-files-api.py b/examples/realtime/realtime-gemini-live-files-api.py index 99f35d52a..ded199038 100644 --- a/examples/realtime/realtime-gemini-live-files-api.py +++ b/examples/realtime/realtime-gemini-live-files-api.py @@ -15,7 +15,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService @@ -158,7 +161,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) # Build the pipeline pipeline = Pipeline( diff --git a/examples/realtime/realtime-gemini-live-google-search.py b/examples/realtime/realtime-gemini-live-google-search.py index 6b132cef1..be48d555c 100644 --- a/examples/realtime/realtime-gemini-live-google-search.py +++ b/examples/realtime/realtime-gemini-live-google-search.py @@ -15,7 +15,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService @@ -84,7 +87,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ], ) # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live-graceful-end.py b/examples/realtime/realtime-gemini-live-graceful-end.py index d65414022..4499e3c1d 100644 --- a/examples/realtime/realtime-gemini-live-graceful-end.py +++ b/examples/realtime/realtime-gemini-live-graceful-end.py @@ -17,7 +17,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -148,7 +151,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): [{"role": "developer", "content": "Say hello."}], ) # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live-grounding-metadata.py b/examples/realtime/realtime-gemini-live-grounding-metadata.py index e78d5cffa..7c771bf1b 100644 --- a/examples/realtime/realtime-gemini-live-grounding-metadata.py +++ b/examples/realtime/realtime-gemini-live-grounding-metadata.py @@ -9,7 +9,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -115,7 +118,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Set up conversation context and management context = LLMContext() # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live-local-vad.py b/examples/realtime/realtime-gemini-live-local-vad.py index c7580f0c6..912bef792 100644 --- a/examples/realtime/realtime-gemini-live-local-vad.py +++ b/examples/realtime/realtime-gemini-live-local-vad.py @@ -4,6 +4,29 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""Gemini Live with locally-driven turn detection. + +By default Gemini Live drives the conversation with its own server-side VAD +(see `realtime-gemini-live.py`). That setup doesn't surface +``UserStartedSpeakingFrame`` / ``UserStoppedSpeakingFrame``, so pipeline +processors that depend on those frames (RTVI client speech events, +``TurnTrackingObserver``, ``AudioBufferProcessor`` turn recording, +``UserIdleController``, user mute strategies, voicemail detector) don't +activate. + +This variant disables Gemini Live's server-side VAD +(``GeminiVADParams(disabled=True)``) and instead drives turn boundaries +locally with ``SileroVADAnalyzer`` wired into the user aggregator. Use this +variant if you need those downstream processors, or if you want a turn +analyzer like ``LocalSmartTurnV3`` to decide when the user is done speaking. + +Caveat: locally-generated turn boundaries are a heuristic and may not match +the provider's actual server-side turn decisions, which is what really +drives the conversation. The two can drift apart in subtle, hard-to-debug +ways, especially around interruptions and overlapping speech. Prefer +server-emitted turn frames (i.e. the base `realtime-gemini-live.py` example) +unless you have a specific reason to drive turn detection locally. +""" import os @@ -20,6 +43,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, LLMUserAggregatorParams, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -72,6 +96,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, + realtime_service_mode=RealtimeServiceModeConfig(), user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(), ), @@ -107,14 +132,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # The *_message_added events fire when messages are written to context + # and carry the finalized content. In realtime mode the turn-stopped + # events fire before the message text is finalized. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-gemini-live-vertex.py b/examples/realtime/realtime-gemini-live-vertex.py index 2c0861006..1120dc53b 100644 --- a/examples/realtime/realtime-gemini-live-vertex.py +++ b/examples/realtime/realtime-gemini-live-vertex.py @@ -18,7 +18,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.google.gemini_live.vertex.llm import GeminiLiveVertexLLMService @@ -124,7 +127,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext([{"role": "developer", "content": "Say hello."}]) # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live-video.py b/examples/realtime/realtime-gemini-live-video.py index 4319b534a..4a6ca5f01 100644 --- a/examples/realtime/realtime-gemini-live-video.py +++ b/examples/realtime/realtime-gemini-live-video.py @@ -16,7 +16,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -64,7 +67,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ], ) # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-gemini-live.py b/examples/realtime/realtime-gemini-live.py index 6751a16a7..426f12238 100644 --- a/examples/realtime/realtime-gemini-live.py +++ b/examples/realtime/realtime-gemini-live.py @@ -21,6 +21,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -130,8 +131,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) context = LLMContext() - # Server-side VAD is enabled by default; no local VAD is added. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + # Gemini Live drives the conversation server-side. It does NOT emit + # UserStartedSpeakingFrame / UserStoppedSpeakingFrame, so pipeline + # processors that depend on those frames — RTVI client speech events, + # TurnTrackingObserver, AudioBufferProcessor turn recording, + # UserIdleController, user mute strategies, voicemail detector — won't + # activate with the default server-VAD-only setup. Context aggregation + # still works with realtime_service_mode. + # + # To produce these frames locally, see `realtime-gemini-live-local-vad.py`. + # Caveat: locally-generated turn boundaries are a heuristic and may not + # match Gemini Live's server-side turn decisions, which is what drives the + # conversation; the two can drift apart in subtle ways especially around + # interruptions and overlapping speech. + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ @@ -166,14 +182,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Gemini Live doesn't emit user-turn frames so on_user_turn_stopped + # would never fire. The *_message_added events fire when messages are + # written to context and carry the finalized content; use those for + # transcript logging regardless of whether the service emits turn + # frames. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-grok-async-tool.py b/examples/realtime/realtime-grok-async-tool.py index c54668fbb..8ff11c643 100644 --- a/examples/realtime/realtime-grok-async-tool.py +++ b/examples/realtime/realtime-grok-async-tool.py @@ -29,7 +29,10 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + RealtimeServiceModeConfig, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.llm_service import FunctionCallParams @@ -129,7 +132,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context = LLMContext(tools=tools) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) pipeline = Pipeline( [ diff --git a/examples/realtime/realtime-grok.py b/examples/realtime/realtime-grok.py index 1e122865a..347ba6b80 100644 --- a/examples/realtime/realtime-grok.py +++ b/examples/realtime/realtime-grok.py @@ -33,9 +33,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema - -# Note: Grok has built-in server-side VAD, so we don't need local VAD -# from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.observers.loggers.transcription_log_observer import ( TranscriptionLogObserver, @@ -47,6 +44,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -212,7 +210,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): tools, ) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) # Build the pipeline # Note: In realtime mode, transcription comes from Grok (upstream), @@ -248,15 +249,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await task.cancel() - # Log transcript updates - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Log transcript updates. In realtime mode the turn-stopped events + # fire before the message text is finalized (UserTurnStoppedMessage + # content is None), so subscribe to the *_message_added events + # instead — they fire when the message is written to context and + # carry the finalized content. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-inworld.py b/examples/realtime/realtime-inworld.py index fdc1712ce..4f5083fc0 100644 --- a/examples/realtime/realtime-inworld.py +++ b/examples/realtime/realtime-inworld.py @@ -47,6 +47,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -149,7 +150,10 @@ Always be helpful and proactive in offering assistance.""", tools, ) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + realtime_service_mode=RealtimeServiceModeConfig(), + ) # Build the pipeline pipeline = Pipeline( @@ -182,13 +186,16 @@ Always be helpful and proactive in offering assistance.""", logger.info("Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # In realtime mode the turn-stopped events fire before the message + # text is finalized; subscribe to the *_message_added events for the + # finalized content. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" logger.info(f"Transcript: {timestamp}user: {message.content}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" logger.info(f"Transcript: {timestamp}assistant: {message.content}") diff --git a/examples/realtime/realtime-openai-async-tool.py b/examples/realtime/realtime-openai-async-tool.py index 49c23e038..084df6674 100644 --- a/examples/realtime/realtime-openai-async-tool.py +++ b/examples/realtime/realtime-openai-async-tool.py @@ -24,7 +24,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -32,7 +31,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -147,7 +146,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-openai-live-video.py b/examples/realtime/realtime-openai-live-video.py index b35aa2648..9dce17a7d 100644 --- a/examples/realtime/realtime-openai-live-video.py +++ b/examples/realtime/realtime-openai-live-video.py @@ -10,7 +10,6 @@ import os from dotenv import load_dotenv from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver from pipecat.pipeline.pipeline import Pipeline @@ -19,7 +18,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( @@ -106,7 +105,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-openai-local-vad.py b/examples/realtime/realtime-openai-local-vad.py new file mode 100644 index 000000000..9d0505a0a --- /dev/null +++ b/examples/realtime/realtime-openai-local-vad.py @@ -0,0 +1,267 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""OpenAI Realtime with locally-driven turn detection. + +By default OpenAI Realtime drives the conversation with its own server-side +VAD (see `realtime-openai.py`). This variant disables server-side turn +detection (``turn_detection=False``) and instead drives turn boundaries +locally with ``SileroVADAnalyzer`` wired into the user aggregator. This is +the path to take if you want a turn analyzer like ``LocalSmartTurnV3`` to +decide when the user is done speaking, or if you need ``UserStartedSpeakingFrame`` +/ ``UserStoppedSpeakingFrame`` to fire from the same source as +``InterruptionFrame``. + +Caveat: locally-generated turn boundaries are a heuristic and may not match +the provider's actual server-side turn decisions. With OpenAI Realtime, +server-side turn detection is generally what the service expects to drive +the conversation, and disabling it puts the responsibility on you. Prefer +server-emitted turn frames (i.e. the base `realtime-openai.py` example) +unless you have a specific reason to drive turn detection locally. +""" + +import asyncio +import os +from datetime import datetime + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame +from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + LLMUserAggregatorParams, + RealtimeServiceModeConfig, + UserTurnStoppedMessage, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai.realtime.events import ( + AudioConfiguration, + AudioInput, + InputAudioNoiseReduction, + InputAudioTranscription, + SessionProperties, +) +from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +async def get_news(params: FunctionCallParams): + await params.result_callback( + { + "news": [ + "Massive UFO currently hovering above New York City", + "Stock markets reach all-time highs", + "Living dinosaur species discovered in the Amazon rainforest", + ], + } + ) + + +async def fetch_restaurant_recommendation(params: FunctionCallParams): + await params.result_callback({"name": "The Golden Dragon"}) + + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + required=["location", "format"], +) + +get_news_function = FunctionSchema( + name="get_news", + description="Get the current news.", + properties={}, + required=[], +) + +restaurant_function = FunctionSchema( + name="get_restaurant_recommendation", + description="Get a restaurant recommendation", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + }, + required=["location"], +) + +tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) + + +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + llm = OpenAIRealtimeLLMService( + api_key=os.environ["OPENAI_API_KEY"], + settings=OpenAIRealtimeLLMService.Settings( + system_instruction="""You are a helpful and friendly AI. + +Act like a human, but remember that you aren't a human and that you can't do human +things in the real world. Your voice and personality should be warm and engaging, with a lively and +playful tone. + +If interacting in a non-English language, start by using the standard accent or dialect familiar to +the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, +even if you're asked about them. + +You are participating in a voice conversation. Keep your responses concise, short, and to the point +unless specifically asked to elaborate on a topic. + +Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""", + session_properties=SessionProperties( + audio=AudioConfiguration( + input=AudioInput( + transcription=InputAudioTranscription(), + # Disable OpenAI's server-side turn detection — this + # example drives turn boundaries locally via the + # SileroVADAnalyzer wired into the user aggregator + # below. + turn_detection=False, + noise_reduction=InputAudioNoiseReduction(type="near_field"), + ) + ), + ), + ), + ) + + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) + llm.register_function("get_news", get_news) + + context = LLMContext( + [{"role": "developer", "content": "Say hello!"}], + tools, + ) + + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + # Drive turn detection locally via SileroVAD wired into the user + # aggregator. realtime_service_mode keeps context-write semantics + # correct and (by default) drops the transcript wait on turn-end so + # local VAD can drive turn boundaries on the latency critical path. + realtime_service_mode=RealtimeServiceModeConfig(), + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), + user_aggregator, + llm, + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + observers=[TranscriptionLogObserver()], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + await task.queue_frames([LLMRunFrame()]) + + await asyncio.sleep(15) + new_tools = ToolsSchema( + standard_tools=[weather_function, restaurant_function, get_news_function] + ) + await task.queue_frames([LLMSetToolsFrame(tools=new_tools)]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/realtime/realtime-openai-text.py b/examples/realtime/realtime-openai-text.py index f36a3c773..75d47ae52 100644 --- a/examples/realtime/realtime-openai-text.py +++ b/examples/realtime/realtime-openai-text.py @@ -13,7 +13,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -21,7 +20,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -177,7 +176,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-openai.py b/examples/realtime/realtime-openai.py index 0ec913add..fdce67671 100644 --- a/examples/realtime/realtime-openai.py +++ b/examples/realtime/realtime-openai.py @@ -14,7 +14,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver from pipecat.pipeline.pipeline import Pipeline @@ -24,7 +23,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -187,7 +186,13 @@ Remember, your responses should be short. Just one or two sentences, usually. Re user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + # OpenAI Realtime drives the conversation server-side and emits its + # own UserStarted/StoppedSpeakingFrame from server VAD events, so + # local VAD on the aggregator is unnecessary. realtime_service_mode + # decouples context writes from turn frames and transcript-bound + # turn-end. See `realtime-openai-local-vad.py` for the variant + # that disables server VAD and drives turn detection locally. + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( @@ -251,15 +256,19 @@ Remember, your responses should be short. Just one or two sentences, usually. Re logger.info(f"Client disconnected") await task.cancel() - # Log transcript updates - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Log transcript updates. In realtime mode the turn-stopped events + # fire before the message text is finalized (UserTurnStoppedMessage + # content is None), so subscribe to the *_message_added events + # instead — they fire when the message is written to context and + # carry the finalized content. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-ultravox-async-tool.py b/examples/realtime/realtime-ultravox-async-tool.py index 449844d5a..0a78b4dd3 100644 --- a/examples/realtime/realtime-ultravox-async-tool.py +++ b/examples/realtime/realtime-ultravox-async-tool.py @@ -26,14 +26,13 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -42,8 +41,6 @@ from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLL from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import UserTurnStrategies load_dotenv(override=True) @@ -134,12 +131,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext([]) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams( - user_turn_strategies=UserTurnStrategies( - stop=[SpeechTimeoutUserTurnStopStrategy()], - ), - vad_analyzer=SileroVADAnalyzer(), - ), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/realtime/realtime-ultravox-text.py b/examples/realtime/realtime-ultravox-text.py index a144af919..e4d0cfba8 100644 --- a/examples/realtime/realtime-ultravox-text.py +++ b/examples/realtime/realtime-ultravox-text.py @@ -12,8 +12,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -21,7 +19,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -32,8 +30,6 @@ from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLL from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import UserTurnStrategies # Load environment variables load_dotenv(override=True) @@ -188,17 +184,9 @@ There is also a secret menu that changes daily. If the user asks about it, use t context = LLMContext([]) - # Necessary to complete the function call lifecycle in Pipecat and - # to produce user and assistant turn stopped events. user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams( - user_turn_strategies=UserTurnStrategies( - stop=[SpeechTimeoutUserTurnStopStrategy()], - ), - # Set the VAD analyzer to emulate timing of the model. - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), - ), + realtime_service_mode=RealtimeServiceModeConfig(), ) # Build the pipeline @@ -234,14 +222,16 @@ There is also a secret menu that changes daily. If the user asks about it, use t logger.info(f"Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Ultravox doesn't emit user-turn frames; subscribe to the + # *_message_added events for the finalized message text. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/realtime/realtime-ultravox.py b/examples/realtime/realtime-ultravox.py index ed46aba2f..95760d525 100644 --- a/examples/realtime/realtime-ultravox.py +++ b/examples/realtime/realtime-ultravox.py @@ -12,7 +12,6 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -20,7 +19,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, UserTurnStoppedMessage, ) from pipecat.runner.types import RunnerArguments @@ -30,8 +29,6 @@ from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLL from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import UserTurnStrategies # Load environment variables load_dotenv(override=True) @@ -178,18 +175,23 @@ There is also a secret menu that changes daily. If the user asks about it, use t context = LLMContext([]) - # Necessary to complete the function call lifecycle in Pipecat and - # to produce user and assistant turn stopped events. + # Ultravox drives the conversation server-side. It does NOT emit + # UserStartedSpeakingFrame / UserStoppedSpeakingFrame, so pipeline + # processors that depend on those frames — RTVI client speech events, + # TurnTrackingObserver, AudioBufferProcessor turn recording, + # UserIdleController, user mute strategies, voicemail detector — won't + # activate with this default setup. Context aggregation still works + # with realtime_service_mode. + # + # To produce these frames locally, wire a VAD analyzer (e.g. + # SileroVADAnalyzer) into LLMUserAggregatorParams. Caveat: locally- + # generated turn boundaries are a heuristic and may not match + # Ultravox's server-side turn decisions, which is what drives the + # conversation; the two can drift apart in subtle ways especially + # around interruptions and overlapping speech. user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams( - user_turn_strategies=UserTurnStrategies( - stop=[SpeechTimeoutUserTurnStopStrategy()], - ), - # Set the VAD analyzer to create reliable TTFB measurements and - # user stop events. - vad_analyzer=SileroVADAnalyzer(), - ), + realtime_service_mode=RealtimeServiceModeConfig(), ) # Build the pipeline @@ -224,14 +226,18 @@ There is also a secret menu that changes daily. If the user asks about it, use t logger.info(f"Client disconnected") await task.cancel() - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + # Ultravox doesn't emit user-turn frames so on_user_turn_stopped + # would never fire. The *_message_added events fire when messages are + # written to context and carry the finalized content; use those for + # transcript logging. + @user_aggregator.event_handler("on_user_message_added") + async def on_user_message_added(aggregator, message: UserTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}user: {message.content}" logger.info(f"Transcript: {line}") - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/update-settings/llm/llm-aws-nova-sonic.py b/examples/update-settings/llm/llm-aws-nova-sonic.py index 19ca3c805..e91bfb8a5 100644 --- a/examples/update-settings/llm/llm-aws-nova-sonic.py +++ b/examples/update-settings/llm/llm-aws-nova-sonic.py @@ -10,7 +10,6 @@ import os from dotenv import load_dotenv from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -18,7 +17,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -60,7 +59,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext() user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/update-settings/llm/llm-azure-realtime.py b/examples/update-settings/llm/llm-azure-realtime.py index 3d724bd53..db29848bd 100644 --- a/examples/update-settings/llm/llm-azure-realtime.py +++ b/examples/update-settings/llm/llm-azure-realtime.py @@ -11,7 +11,6 @@ from dotenv import load_dotenv from loguru import logger from pipecat.adapters.base_llm_adapter import LLMContextMessage -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -20,7 +19,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -66,7 +65,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( @@ -88,8 +87,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/update-settings/llm/llm-gemini-live-vertex.py b/examples/update-settings/llm/llm-gemini-live-vertex.py index 3992be309..4ab8124d2 100644 --- a/examples/update-settings/llm/llm-gemini-live-vertex.py +++ b/examples/update-settings/llm/llm-gemini-live-vertex.py @@ -10,7 +10,6 @@ import os from dotenv import load_dotenv from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -18,7 +17,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -60,7 +59,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext() user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/update-settings/llm/llm-gemini-live.py b/examples/update-settings/llm/llm-gemini-live.py index 0c066648e..1bbb82cd9 100644 --- a/examples/update-settings/llm/llm-gemini-live.py +++ b/examples/update-settings/llm/llm-gemini-live.py @@ -10,7 +10,6 @@ import os from dotenv import load_dotenv from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -18,7 +17,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -58,7 +57,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext() user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( diff --git a/examples/update-settings/llm/llm-grok-realtime.py b/examples/update-settings/llm/llm-grok-realtime.py index 4ab28cdda..5c9850cd8 100644 --- a/examples/update-settings/llm/llm-grok-realtime.py +++ b/examples/update-settings/llm/llm-grok-realtime.py @@ -11,7 +11,6 @@ from dotenv import load_dotenv from loguru import logger from pipecat.adapters.base_llm_adapter import LLMContextMessage -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -20,7 +19,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -63,7 +62,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( @@ -85,8 +84,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/update-settings/llm/llm-openai-realtime.py b/examples/update-settings/llm/llm-openai-realtime.py index 120d8a86e..905a6cb0a 100644 --- a/examples/update-settings/llm/llm-openai-realtime.py +++ b/examples/update-settings/llm/llm-openai-realtime.py @@ -11,7 +11,6 @@ from dotenv import load_dotenv from loguru import logger from pipecat.adapters.base_llm_adapter import LLMContextMessage -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -20,7 +19,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -63,7 +62,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( @@ -85,8 +84,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}") diff --git a/examples/update-settings/llm/llm-ultravox-realtime.py b/examples/update-settings/llm/llm-ultravox-realtime.py index 4c530735e..8deae4fe1 100644 --- a/examples/update-settings/llm/llm-ultravox-realtime.py +++ b/examples/update-settings/llm/llm-ultravox-realtime.py @@ -13,7 +13,6 @@ from loguru import logger from pipecat.adapters.base_llm_adapter import LLMContextMessage from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -22,7 +21,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( AssistantTurnStoppedMessage, LLMContextAggregatorPair, - LLMUserAggregatorParams, + RealtimeServiceModeConfig, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport @@ -74,7 +73,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + realtime_service_mode=RealtimeServiceModeConfig(), ) pipeline = Pipeline( @@ -96,8 +95,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + @assistant_aggregator.event_handler("on_assistant_message_added") + async def on_assistant_message_added(aggregator, message: AssistantTurnStoppedMessage): timestamp = f"[{message.timestamp}] " if message.timestamp else "" line = f"{timestamp}assistant: {message.content}" logger.info(f"Transcript: {line}")