Thinking, sometimes called "extended thinking" or "reasoning", is an LLM process where the model takes some additional time before giving an answer. It's useful for complex tasks that may require some level of planning and structured, step-by-step reasoning. The model can output its thoughts (or thought summaries, depending on the model) in addition to the answer. The thoughts are usually pretty granular and not really suitable for being spoken out loud in a conversation, but can be useful for logging or prompt debugging. Here's what's added: 1. New typed input parameters for Google and Anthropic LLMs that control the models' thinking behavior (like how much thinking to do, and whether to output thoughts or thought summaries). 2. New frames for representing thoughts output by LLMs. 3. A generic mechanism for associating extra LLM-specific data with a function call in context, used specifically to support Google's function-call-related "thought signatures", which are necessary to ensure thinking continuity between function calls in a chain (where the model thinks, makes a function call, thinks some more, etc.) 4. A generic mechanism for recording LLM thoughts to context, used specifically to support Anthropic, whose thought signatures are expected to appear alongside the text of the thoughts within assistant context messages. 5. An expansion of `TranscriptProcessor` to process LLM thoughts in addition to user and assistant utterances.
142 lines
5.0 KiB
Python
142 lines
5.0 KiB
Python
#
|
||
# Copyright (c) 2024–2025, Daily
|
||
#
|
||
# SPDX-License-Identifier: BSD 2-Clause License
|
||
#
|
||
|
||
|
||
import os
|
||
|
||
from dotenv import load_dotenv
|
||
from loguru import logger
|
||
|
||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||
from pipecat.frames.frames import LLMRunFrame
|
||
from pipecat.pipeline.pipeline import Pipeline
|
||
from pipecat.pipeline.runner import PipelineRunner
|
||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||
from pipecat.runner.types import RunnerArguments
|
||
from pipecat.runner.utils import create_transport
|
||
from pipecat.services.google.llm import GoogleLLMService
|
||
from pipecat.services.google.stt import GoogleSTTService
|
||
from pipecat.services.google.tts import GoogleHttpTTSService, GoogleTTSService
|
||
from pipecat.transcriptions.language import Language
|
||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||
from pipecat.transports.daily.transport import DailyParams
|
||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||
|
||
load_dotenv(override=True)
|
||
|
||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||
# instantiated. The function will be called when the desired transport gets
|
||
# selected.
|
||
transport_params = {
|
||
"daily": lambda: DailyParams(
|
||
audio_in_enabled=True,
|
||
audio_out_enabled=True,
|
||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||
),
|
||
"twilio": lambda: FastAPIWebsocketParams(
|
||
audio_in_enabled=True,
|
||
audio_out_enabled=True,
|
||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||
),
|
||
"webrtc": lambda: TransportParams(
|
||
audio_in_enabled=True,
|
||
audio_out_enabled=True,
|
||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||
),
|
||
}
|
||
|
||
|
||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||
logger.info(f"Starting bot")
|
||
|
||
stt = GoogleSTTService(
|
||
params=GoogleSTTService.InputParams(languages=Language.EN_US, model="chirp_3"),
|
||
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
|
||
location="us",
|
||
)
|
||
|
||
tts = GoogleHttpTTSService(
|
||
voice_id="en-US-Chirp3-HD-Charon",
|
||
params=GoogleHttpTTSService.InputParams(language=Language.EN_US),
|
||
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
|
||
)
|
||
|
||
llm = GoogleLLMService(
|
||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||
model="gemini-2.5-flash",
|
||
# force a certain amount of thinking if you want it
|
||
# params=GoogleLLMService.InputParams(
|
||
# thinking=GoogleLLMService.ThinkingConfig(thinking_budget=4096)
|
||
# ),
|
||
)
|
||
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
|
||
},
|
||
]
|
||
|
||
context = LLMContext(messages)
|
||
context_aggregator = LLMContextAggregatorPair(context)
|
||
|
||
pipeline = Pipeline(
|
||
[
|
||
transport.input(), # Transport user input
|
||
stt, # STT
|
||
context_aggregator.user(), # User respones
|
||
llm, # LLM
|
||
tts, # TTS
|
||
transport.output(), # Transport bot output
|
||
context_aggregator.assistant(), # Assistant spoken responses
|
||
]
|
||
)
|
||
|
||
task = PipelineTask(
|
||
pipeline,
|
||
params=PipelineParams(
|
||
enable_metrics=True,
|
||
enable_usage_metrics=True,
|
||
),
|
||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||
)
|
||
|
||
@transport.event_handler("on_client_connected")
|
||
async def on_client_connected(transport, client):
|
||
logger.info(f"Client connected")
|
||
# Kick off the conversation.
|
||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||
await task.queue_frames([LLMRunFrame()])
|
||
|
||
@transport.event_handler("on_client_disconnected")
|
||
async def on_client_disconnected(transport, client):
|
||
logger.info(f"Client disconnected")
|
||
await task.cancel()
|
||
|
||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||
|
||
await runner.run(task)
|
||
|
||
|
||
async def bot(runner_args: RunnerArguments):
|
||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||
transport = await create_transport(runner_args, transport_params)
|
||
await run_bot(transport, runner_args)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
from pipecat.runner.run import main
|
||
|
||
main()
|