import argparse
import asyncio
import base64
import json
import logging
import os
import sys
from dataclasses import asdict, dataclass
from functools import partial

import aiohttp
import httpx
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
    Agent,
    AgentSession,
    AudioConfig,
    BackgroundAudioPlayer,
    BuiltinAudioClip,
    JobContext,
    JobProcess,
    MetricsCollectedEvent,
    RoomInputOptions,
    RoomOutputOptions,
    RunContext,
    WorkerOptions,
    cli,
    get_job_context,
    metrics,
)
from livekit.agents.llm import ImageContent, ToolError, function_tool
from livekit.agents.voice.avatar import DataStreamAudioOutput
from livekit.agents.voice.io import PlaybackFinishedEvent
from livekit.agents.voice.room_io import ATTRIBUTE_PUBLISH_ON_BEHALF
from livekit.plugins import silero

# from livekit.plugins.turn_detector.multilingual import MultilingualModel
from livekit.plugins import aliyun, azure, cartesia, deepgram, minimax, openai

# uncomment to enable Krisp background voice/noise cancellation
# from livekit.plugins import noise_cancellation

logger = logging.getLogger("basic-agent")

load_dotenv()

AVATAR_IDENTITY = "avatar_worker"


@dataclass
class AvatarConnectionInfo:
    room_name: str
    url: str
    """LiveKit server URL"""
    token: str
    """Token for avatar worker to join"""


class MyAgent(Agent):
    def __init__(self) -> None:
        self._tasks = []  # Prevent garbage collection of running tasks
        super().__init__(
            instructions=(
                "Your name is Kelly. You interact with users via voice, "
                "so keep your responses concise and to the point. "
                "Do not use emojis, asterisks, markdown, or other special characters in your responses. "
                "You are curious and friendly, and have a sense of humor. "
                "You are debating the user on whether AI can replace human workers; "
                "your position is that AI can replace human workers. "
                "You will speak Chinese to the user."
            ),
        )

    async def on_enter(self):
        # when the agent is added to the session, it'll generate a reply
        # according to its instructions
        self.session.generate_reply()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def launch_avatar(ctx: JobContext, avatar_dispatcher_url: str, avatar_identity: str) -> None:
    """
    Send a request to the avatar service for it to join the room.
    This function should be wrapped in an avatar plugin.
""" # create a token for the avatar to join the room token = ( api.AccessToken() .with_identity(avatar_identity) .with_name("Avatar Runner") .with_grants(api.VideoGrants(room_join=True, room=ctx.room.name)) .with_kind("agent") .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: ctx.local_participant_identity}) .to_jwt() ) logger.info(f"Sending connection info to avatar dispatcher {avatar_dispatcher_url}") connection_info = AvatarConnectionInfo(room_name=ctx.room.name, url=ctx._info.url, token=token) async with httpx.AsyncClient() as client: response = await client.post(avatar_dispatcher_url, json=asdict(connection_info)) response.raise_for_status() logger.info("Avatar handshake completed") async def entrypoint(ctx: JobContext, avatar_dispatcher_url: str = None): # each log entry will include these fields ctx.log_context_fields = { "room": ctx.room.name, } logger.info("connecting to room") await ctx.connect() logger.info("waiting for participant") participant = await ctx.wait_for_participant() logger.info(f"starting agent for participant {participant.identity}") initial_voice_id = "Chinese (Mandarin)_Male_Announcer" if participant.attributes.get("voice"): initial_voice_id = participant.attributes.get("voice") logger.info(f"User selected voice: {initial_voice_id}") session = AgentSession( # Speech-to-text (STT) is your agent's ears, turning the user's speech into text that the LLM can understand # See all available models at https://docs.livekit.io/agents/models/stt/ # stt="deepgram/nova-3", # stt=azure.STT( # speech_key="48KfrDwcw6hM0g0WtmF0ZDigaMW8YdUwjrlYDYIL6Rftp5U0V1yfJQQJ99BAAC3pKaRXJ3w3AAAYACOGCI1o", # speech_region="eastasia", # language="zh-CN" # ), # stt=deepgram.STT( # api_key="61dbb8aa4badb820c24029052e106b00f7498598", # language="zh-CN", # model="nova-2-general" # ), stt = aliyun.STT(model="paraformer-realtime-v2"), # A Large Language Model (LLM) is your agent's brain, processing user input and generating a response # See all available models at https://docs.livekit.io/agents/models/llm/ # llm="openai/gpt-4.1-mini", llm=openai.LLM.with_deepseek( model='deepseek-chat' ), # Text-to-speech (TTS) is your agent's voice, turning the LLM's text into speech that the user can hear # See all available models as well as voice selections at https://docs.livekit.io/agents/models/tts/ # tts="cartesia/sonic-2:9626c31c-bec5-4cca-baa8-f8ba9e84c8bc", # tts=minimax.TTS( # model="speech-2.6-turbo", # voice=initial_voice_id, # # voice="Friendly_Person", # # voice="Chinese (Mandarin)_Male_Announcer" # ), tts=aliyun.TTS(model="cosyvoice-v2", voice="longcheng_v2"), # tts=azure.TTS( # speech_key="48KfrDwcw6hM0g0WtmF0ZDigaMW8YdUwjrlYDYIL6Rftp5U0V1yfJQQJ99BAAC3pKaRXJ3w3AAAYACOGCI1o", # speech_region="eastasia", # language='zh-CN' # ), # tts = openai.TTS( # model='kokoro', # voice='zf_xiaoyi', # base_url='http://127.0.0.1:8880/v1', # api_key='not-needed', # ), # tts=cartesia.TTS(), # VAD and turn detection are used to determine when the user is speaking and when the agent should respond # See more at https://docs.livekit.io/agents/build/turns # turn_detection=MultilingualModel(), vad=ctx.proc.userdata["vad"], # allow the LLM to generate a response while waiting for the end of turn # See more at https://docs.livekit.io/agents/build/audio/#preemptive-generation preemptive_generation=True, # sometimes background noise could interrupt the agent session, these are considered false positive interruptions # when it's detected, you may resume the agent's speech resume_false_interruption=True, 
        false_interruption_timeout=1.0,
    )

    # log metrics as they are emitted, and total usage after the session is over
    usage_collector = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)
        usage_collector.collect(ev.metrics)

    async def log_usage():
        summary = usage_collector.get_summary()
        logger.info(f"Usage: {summary}")

    # shutdown callbacks are triggered when the session is over
    ctx.add_shutdown_callback(log_usage)

    # launch the avatar worker if avatar_dispatcher_url is provided
    if avatar_dispatcher_url:
        await launch_avatar(ctx, avatar_dispatcher_url, AVATAR_IDENTITY)
        session.output.audio = DataStreamAudioOutput(
            ctx.room,
            destination_identity=AVATAR_IDENTITY,
            # (optional) wait for the avatar to publish a video track before generating a reply
            wait_remote_track=rtc.TrackKind.KIND_VIDEO,
        )

        @session.output.audio.on("playback_finished")
        def on_playback_finished(ev: PlaybackFinishedEvent) -> None:
            # the avatar should notify when the audio playback is finished
            logger.info(
                "playback_finished",
                extra={
                    "playback_position": ev.playback_position,
                    "interrupted": ev.interrupted,
                },
            )

    await session.start(
        agent=MyAgent(),
        room=ctx.room,
        room_input_options=RoomInputOptions(
            # uncomment to enable Krisp BVC noise cancellation
            # noise_cancellation=noise_cancellation.BVC(),
        ),
        room_output_options=RoomOutputOptions(transcription_enabled=True),
    )

    # --- 3. Core: listen for room metadata changes ---
    @ctx.room.on("room_metadata_changed")
    def on_metadata_changed(old_metadata: str, new_metadata: str):
        logger.info(f"Received new competition state: {new_metadata} (previous state: {old_metadata})")
        print(f"Received new competition state: {new_metadata} (previous state: {old_metadata})")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--avatar-url",
        type=str,
        default=None,
        help="Avatar dispatcher URL (e.g., http://localhost:8089/launch)",
    )
    args, remaining_args = parser.parse_known_args()
    sys.argv = sys.argv[:1] + remaining_args

    if args.avatar_url:
        cli.run_app(
            WorkerOptions(
                entrypoint_fnc=partial(entrypoint, avatar_dispatcher_url=args.avatar_url),
                prewarm_fnc=prewarm,
            )
        )
    else:
        cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
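
# ---------------------------------------------------------------------------
# Usage notes (a minimal sketch; assumes the standard livekit-agents CLI
# subcommands and that this file is saved as agent.py):
#
#   python agent.py dev                                        # local dev mode
#   python agent.py start                                      # production mode
#   python agent.py --avatar-url http://localhost:8089/launch dev
#
# The --avatar-url flag is parsed here with parse_known_args() before the
# remaining arguments are handed to cli.run_app(), so it can be combined with
# any CLI subcommand.
#
# The service behind --avatar-url is expected to accept a POST whose JSON body
# matches AvatarConnectionInfo (room_name, url, token) and start an avatar
# worker that joins the room with that token. A hypothetical aiohttp handler
# for that endpoint might look like the following; start_avatar_runner() is a
# placeholder for whatever actually spawns the avatar process:
#
#   from aiohttp import web
#
#   async def launch(request: web.Request) -> web.Response:
#       info = await request.json()  # {"room_name": ..., "url": ..., "token": ...}
#       await start_avatar_runner(info["url"], info["token"], info["room_name"])
#       return web.Response(text="ok")
#
#   app = web.Application()
#   app.add_routes([web.post("/launch", launch)])
#   # web.run_app(app, port=8089)
# ---------------------------------------------------------------------------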