Update examples to use transcription events from context aggregators

This commit is contained in:
Mark Backman
2026-01-13 15:13:05 -05:00
parent d0f227189c
commit aa2589d3be
6 changed files with 69 additions and 76 deletions

View File

@@ -15,14 +15,17 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame, TranscriptionMessage
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
@@ -177,8 +180,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
llm.register_function("get_news", get_news)
transcript = TranscriptProcessor()
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
@@ -189,15 +190,16 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
context_aggregator = LLMContextAggregatorPair(context)
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
transcript.user(), # LLM pushes TranscriptionFrames upstream
user_aggregator,
llm, # LLM
transport.output(), # Transport bot output
transcript.assistant(), # After the transcript output, to time with the audio output
context_aggregator.assistant(),
assistant_aggregator,
]
)
@@ -238,14 +240,18 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
logger.info(f"Client disconnected")
await task.cancel()
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, TranscriptionMessage):
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}"
logger.info(f"Transcript: {line}")
# Log transcript updates
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

View File

@@ -14,12 +14,11 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -157,8 +156,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
transcript = TranscriptProcessor()
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
@@ -175,9 +172,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
context_aggregator.user(),
llm, # LLM
tts, # TTS
transcript.user(), # Placed after the LLM, as LLM pushes TranscriptionFrames downstream
transport.output(), # Transport bot output
transcript.assistant(), # After the transcript output, to time with the audio output
context_aggregator.assistant(),
]
)
@@ -202,15 +197,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
logger.info(f"Client disconnected")
await task.cancel()
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, TranscriptionMessage):
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -14,13 +14,12 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -164,8 +163,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
transcript = TranscriptProcessor()
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
@@ -180,11 +177,9 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
[
transport.input(), # Transport user input
context_aggregator.user(),
transcript.user(), # LLM pushes TranscriptionFrames upstream
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
transcript.assistant(), # After the transcript output, to time with the audio output
context_aggregator.assistant(),
]
)
@@ -209,15 +204,6 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
logger.info(f"Client disconnected")
await task.cancel()
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, TranscriptionMessage):
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -21,7 +21,11 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -154,14 +158,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
context_aggregator = LLMContextAggregatorPair(context)
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()
# Build the pipeline
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
user_aggregator,
llm,
transport.output(),
context_aggregator.assistant(),
assistant_aggregator,
]
)
@@ -192,6 +199,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
# Run the pipeline
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -13,6 +13,7 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -21,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -36,6 +36,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -70,8 +71,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
transcript = TranscriptProcessor()
messages = [
{
"role": "system",
@@ -94,7 +93,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
[
transport.input(), # Transport user input
stt,
transcript.user(), # User transcripts
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
@@ -110,6 +108,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[TranscriptionLogObserver()],
)
@transport.event_handler("on_client_connected")
@@ -124,12 +123,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for message in frame.messages:
logger.info(f"Transcription [{message.role}]: {message.content}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -36,7 +36,7 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema
# Note: Grok has built-in server-side VAD, so we don't need local VAD
# from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import (
TranscriptionLogObserver,
)
@@ -45,9 +45,10 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
UserTurnStoppedMessage,
)
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.grok.realtime.events import (
@@ -208,9 +209,6 @@ Always be helpful and proactive in offering assistance.""",
llm.register_function("get_current_time", get_current_time)
llm.register_function("get_restaurant_recommendation", get_restaurant_recommendation)
# Create transcript processor for logging
transcript = TranscriptProcessor()
# Create context with initial message and tools
context = LLMContext(
[{"role": "user", "content": "Say hello and introduce yourself!"}],
@@ -219,18 +217,19 @@ Always be helpful and proactive in offering assistance.""",
context_aggregator = LLMContextAggregatorPair(context)
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()
# Build the pipeline
# Note: In realtime mode, transcription comes from Grok (upstream),
# so transcript.user() goes BEFORE llm
pipeline = Pipeline(
[
transport.input(), # Transport user input (audio)
context_aggregator.user(),
transcript.user(), # Transcription from Grok goes upstream
user_aggregator,
llm, # Grok Realtime LLM (handles STT + LLM + TTS)
transport.output(), # Transport bot output (audio)
transcript.assistant(), # Log assistant speech
context_aggregator.assistant(),
assistant_aggregator,
]
)
@@ -256,13 +255,17 @@ Always be helpful and proactive in offering assistance.""",
await task.cancel()
# Log transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, TranscriptionMessage):
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}"
logger.info(f"Transcript: {line}")
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)