Files
pipecat/examples/simple-chatbot/server/bot-openai.py
2025-07-07 11:58:03 +08:00

137 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import (
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
DailyRESTHelper,
DailyRoomParams,
)
# Load settings from a local .env file. override=True lets values from .env
# replace variables that are already present in the process environment.
load_dotenv(override=True)

# Swap loguru's pre-configured handler (id 0) for a stderr sink at DEBUG level.
# logger.remove() raises ValueError when the given handler id no longer exists
# (e.g. something imported earlier already reconfigured loguru), so tolerate
# that case instead of crashing at import time.
try:
    logger.remove(0)
except ValueError:
    pass
logger.add(sys.stderr, level="DEBUG")
async def main():
    """Create a Daily room, join it as "Chatbot", and run the LLM pipeline.

    Steps, in order:

    1. Create a Daily room and an owner meeting token via ``DailyRESTHelper``.
    2. Build a ``DailyTransport`` for that room with audio input, Silero VAD
       and transcription enabled.
    3. Assemble the pipeline: transport input -> RTVI processor -> user
       context aggregator -> OpenAI LLM -> transport output -> assistant
       context aggregator.
    4. Register the RTVI/transport event handlers and run the pipeline task
       with a ``PipelineRunner``.

    Environment variables read:
        DAILY_API_KEY: Daily REST API key (required).
        DAILY_API_URL: Daily REST base URL; defaults to
            ``https://api.daily.co/v1``.
        OPENAI_API_KEY: passed to ``OpenAILLMService``.

    Raises:
        RuntimeError: If ``DAILY_API_KEY`` is unset or empty.
    """
    # Fail fast with a clear message instead of handing None to the REST
    # helper and failing later on the first API call.
    daily_api_key = os.getenv("DAILY_API_KEY")
    if not daily_api_key:
        raise RuntimeError(
            "DAILY_API_KEY is not set; it is required to create the Daily room and token."
        )

    async with aiohttp.ClientSession() as session:
        daily_rest_helper = DailyRESTHelper(
            daily_api_key=daily_api_key,
            daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=session,
        )

        room = await daily_rest_helper.create_room(
            DailyRoomParams(properties={"enable_prejoin_ui": False})
        )
        # The room is created on the fly, so surface its URL; without this
        # nothing tells a client where to join.
        # NOTE(review): this script never deletes the room it creates --
        # confirm whether expiry/cleanup is handled elsewhere.
        logger.info("Created Daily room: {}", room.url)

        token_params = DailyMeetingTokenParams(
            properties=DailyMeetingTokenProperties(
                is_owner=True,
                permissions={
                    "hasPresence": False,  # Example: join as a hidden participant
                },
                start_video_off=True,
                start_audio_off=True,
            )
        )
        token = await daily_rest_helper.get_token(room_url=room.url, params=token_params)

        # Set up Daily transport with video/audio parameters
        transport = DailyTransport(
            room.url,
            token,
            "Chatbot",
            DailyParams(
                audio_in_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,
            ),
        )

        # Initialize LLM service
        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

        messages = [
            {
                "role": "system",
                "content": "Summarize the conversation so far in a single sentence.",
            },
        ]

        # Set up conversation context and management
        # The context_aggregator will automatically collect conversation context
        context = OpenAILLMContext(messages)
        context_aggregator = llm.create_context_aggregator(context)

        #
        # RTVI events for Pipecat client UI
        #
        rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

        pipeline = Pipeline(
            [
                transport.input(),
                rtvi,
                context_aggregator.user(),
                llm,
                transport.output(),
                context_aggregator.assistant(),
            ]
        )

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
            observers=[RTVIObserver(rtvi)],
        )

        @rtvi.event_handler("on_client_ready")
        async def on_client_ready(rtvi):
            """Mark the bot ready and queue the initial context frame."""
            await rtvi.set_bot_ready()
            # Kick off the conversation
            await task.queue_frames([context_aggregator.user().get_context_frame()])

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            """Start capturing transcription for the first participant."""
            print(f"Participant joined: {participant}")
            await transport.capture_participant_transcription(participant["id"])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            """Cancel the pipeline task when a participant leaves."""
            print(f"Participant left: {participant}")
            await task.cancel()

        runner = PipelineRunner()
        await runner.run(task)
# Script entry point: start an event loop and run the bot when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())