From aa2589d3beb7bfca90b16d5b2a71c735ef515aaf Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 13 Jan 2026 15:13:05 -0500 Subject: [PATCH] Update examples to use transcription events from context aggregators --- examples/foundational/19-openai-realtime.py | 40 +++++++++++-------- .../19b-openai-realtime-beta-text.py | 16 +------- .../foundational/19b-openai-realtime-text.py | 16 +------- examples/foundational/40-aws-nova-sonic.py | 25 ++++++++++-- .../foundational/42-interruption-config.py | 13 ++---- examples/foundational/51-grok-realtime.py | 35 ++++++++-------- 6 files changed, 69 insertions(+), 76 deletions(-) diff --git a/examples/foundational/19-openai-realtime.py b/examples/foundational/19-openai-realtime.py index cea164543..2b3750d0b 100644 --- a/examples/foundational/19-openai-realtime.py +++ b/examples/foundational/19-openai-realtime.py @@ -15,14 +15,17 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame, TranscriptionMessage +from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair -from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + UserTurnStoppedMessage, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.llm_service import FunctionCallParams @@ -177,8 +180,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) llm.register_function("get_news", get_news) - transcript = TranscriptProcessor() - # Create a standard OpenAI LLM context object using the normal messages format. The # OpenAIRealtimeLLMService will convert this internally to messages that the # openai WebSocket API can understand. @@ -189,15 +190,16 @@ Remember, your responses should be short. Just one or two sentences, usually. Re context_aggregator = LLMContextAggregatorPair(context) + user_aggregator = context_aggregator.user() + assistant_aggregator = context_aggregator.assistant() + pipeline = Pipeline( [ transport.input(), # Transport user input - context_aggregator.user(), - transcript.user(), # LLM pushes TranscriptionFrames upstream + user_aggregator, llm, # LLM transport.output(), # Transport bot output - transcript.assistant(), # After the transcript output, to time with the audio output - context_aggregator.assistant(), + assistant_aggregator, ] ) @@ -238,14 +240,18 @@ Remember, your responses should be short. Just one or two sentences, usually. Re logger.info(f"Client disconnected") await task.cancel() - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for msg in frame.messages: - if isinstance(msg, TranscriptionMessage): - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - line = f"{timestamp}{msg.role}: {msg.content}" - logger.info(f"Transcript: {line}") + # Log transcript updates + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) diff --git a/examples/foundational/19b-openai-realtime-beta-text.py b/examples/foundational/19b-openai-realtime-beta-text.py index 0c66385d2..4ddd068a6 100644 --- a/examples/foundational/19b-openai-realtime-beta-text.py +++ b/examples/foundational/19b-openai-realtime-beta-text.py @@ -14,12 +14,11 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -157,8 +156,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re llm.register_function("get_current_weather", fetch_weather_from_api) llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) - transcript = TranscriptProcessor() - # Create a standard OpenAI LLM context object using the normal messages format. The # OpenAIRealtimeBetaLLMService will convert this internally to messages that the # openai WebSocket API can understand. @@ -175,9 +172,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re context_aggregator.user(), llm, # LLM tts, # TTS - transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream transport.output(), # Transport bot output - transcript.assistant(), # After the transcript output, to time with the audio output context_aggregator.assistant(), ] ) @@ -202,15 +197,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re logger.info(f"Client disconnected") await task.cancel() - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for msg in frame.messages: - if isinstance(msg, TranscriptionMessage): - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - line = f"{timestamp}{msg.role}: {msg.content}" - logger.info(f"Transcript: {line}") - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/19b-openai-realtime-text.py b/examples/foundational/19b-openai-realtime-text.py index 927e5f5c1..ec254f186 100644 --- a/examples/foundational/19b-openai-realtime-text.py +++ b/examples/foundational/19b-openai-realtime-text.py @@ -14,13 +14,12 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair -from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -164,8 +163,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re llm.register_function("get_current_weather", fetch_weather_from_api) llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) - transcript = TranscriptProcessor() - # Create a standard OpenAI LLM context object using the normal messages format. The # OpenAIRealtimeLLMService will convert this internally to messages that the # openai WebSocket API can understand. @@ -180,11 +177,9 @@ Remember, your responses should be short. Just one or two sentences, usually. Re [ transport.input(), # Transport user input context_aggregator.user(), - transcript.user(), # LLM pushes TranscriptionFrames upstream llm, # LLM tts, # TTS transport.output(), # Transport bot output - transcript.assistant(), # After the transcript output, to time with the audio output context_aggregator.assistant(), ] ) @@ -209,15 +204,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re logger.info(f"Client disconnected") await task.cancel() - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for msg in frame.messages: - if isinstance(msg, TranscriptionMessage): - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - line = f"{timestamp}{msg.role}: {msg.content}" - logger.info(f"Transcript: {line}") - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/40-aws-nova-sonic.py b/examples/foundational/40-aws-nova-sonic.py index 253b0870a..42eac74a8 100644 --- a/examples/foundational/40-aws-nova-sonic.py +++ b/examples/foundational/40-aws-nova-sonic.py @@ -21,7 +21,11 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + UserTurnStoppedMessage, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService @@ -154,14 +158,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context_aggregator = LLMContextAggregatorPair(context) + user_aggregator = context_aggregator.user() + assistant_aggregator = context_aggregator.assistant() + # Build the pipeline pipeline = Pipeline( [ transport.input(), - context_aggregator.user(), + user_aggregator, llm, transport.output(), - context_aggregator.assistant(), + assistant_aggregator, ] ) @@ -192,6 +199,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") + # Run the pipeline runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/42-interruption-config.py b/examples/foundational/42-interruption-config.py index d2c95eecb..46937abfa 100644 --- a/examples/foundational/42-interruption-config.py +++ b/examples/foundational/42-interruption-config.py @@ -13,6 +13,7 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import LLMRunFrame +from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -21,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -36,6 +36,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies load_dotenv(override=True) + # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. @@ -70,8 +71,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - transcript = TranscriptProcessor() - messages = [ { "role": "system", @@ -94,7 +93,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): [ transport.input(), # Transport user input stt, - transcript.user(), # User transcripts context_aggregator.user(), # User responses llm, # LLM tts, # TTS @@ -110,6 +108,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + observers=[TranscriptionLogObserver()], ) @transport.event_handler("on_client_connected") @@ -124,12 +123,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for message in frame.messages: - logger.info(f"Transcription [{message.role}]: {message.content}") - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/51-grok-realtime.py b/examples/foundational/51-grok-realtime.py index f56b2cb8e..ee795270a 100644 --- a/examples/foundational/51-grok-realtime.py +++ b/examples/foundational/51-grok-realtime.py @@ -36,7 +36,7 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema # Note: Grok has built-in server-side VAD, so we don't need local VAD # from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage +from pipecat.frames.frames import LLMRunFrame from pipecat.observers.loggers.transcription_log_observer import ( TranscriptionLogObserver, ) @@ -45,9 +45,10 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, LLMContextAggregatorPair, + UserTurnStoppedMessage, ) -from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.grok.realtime.events import ( @@ -208,9 +209,6 @@ Always be helpful and proactive in offering assistance.""", llm.register_function("get_current_time", get_current_time) llm.register_function("get_restaurant_recommendation", get_restaurant_recommendation) - # Create transcript processor for logging - transcript = TranscriptProcessor() - # Create context with initial message and tools context = LLMContext( [{"role": "user", "content": "Say hello and introduce yourself!"}], @@ -219,18 +217,19 @@ Always be helpful and proactive in offering assistance.""", context_aggregator = LLMContextAggregatorPair(context) + user_aggregator = context_aggregator.user() + assistant_aggregator = context_aggregator.assistant() + # Build the pipeline # Note: In realtime mode, transcription comes from Grok (upstream), # so transcript.user() goes BEFORE llm pipeline = Pipeline( [ transport.input(), # Transport user input (audio) - context_aggregator.user(), - transcript.user(), # Transcription from Grok goes upstream + user_aggregator, llm, # Grok Realtime LLM (handles STT + LLM + TTS) transport.output(), # Transport bot output (audio) - transcript.assistant(), # Log assistant speech - context_aggregator.assistant(), + assistant_aggregator, ] ) @@ -256,13 +255,17 @@ Always be helpful and proactive in offering assistance.""", await task.cancel() # Log transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for msg in frame.messages: - if isinstance(msg, TranscriptionMessage): - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - line = f"{timestamp}{msg.role}: {msg.content}" - logger.info(f"Transcript: {line}") + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)