Merge pull request #2927 from pipecat-ai/marcus/2025-10-28_sample_rtvi_fix

Add RTVIProcessor to foundational example 38b
This commit is contained in:
Mark Backman
2025-11-06 10:19:10 -05:00
committed by GitHub

View File

@@ -20,6 +20,7 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -78,9 +79,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
context_aggregator.user(), # User responses
llm, # LLM
@@ -96,15 +100,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):