livekit-znjj-demo/agents/my_basic_agent_1_2_9.py

import argparse
import asyncio
import base64
import json
import logging
import os
import random
import sys
import re
from dataclasses import asdict, dataclass
from functools import partial
import aiohttp
import httpx
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent,
AgentSession,
AudioConfig,
BackgroundAudioPlayer,
BuiltinAudioClip,
JobContext,
JobProcess,
UserStateChangedEvent,
MetricsCollectedEvent,
RoomInputOptions,
RoomOutputOptions,
RunContext,
WorkerOptions,
cli,
get_job_context,
metrics,
RoomIO
)
from livekit.agents.llm import ImageContent, ToolError, function_tool
from typing import Any, List, Optional
from livekit.agents.voice.avatar import DataStreamAudioOutput
from livekit.agents.voice.io import PlaybackFinishedEvent
from livekit.agents.voice.room_io import ATTRIBUTE_PUBLISH_ON_BEHALF
from livekit.plugins import silero
from livekit.plugins import openai, azure, minimax, aliyun, volcengine
# from livekit.plugins.turn_detector.multilingual import MultilingualModel
from datetime import datetime
logger = logging.getLogger("basic-agent")
# Load environment variables from .env file in the agents directory
env_path = os.path.join(os.path.dirname(__file__), ".env")
load_dotenv(env_path)
# Also try loading from current directory as fallback
load_dotenv()
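# Expected .env keys (best-effort list: DASHSCOPE_API_KEY and SILICONFLOW_API_KEY
# are read explicitly below; the remaining names are assumptions based on the
# standard livekit-agents and plugin conventions):
#   LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET - LiveKit server credentials
#   DASHSCOPE_API_KEY   - Aliyun DashScope (Qwen-VL vision, DashScope LLM backend)
#   SILICONFLOW_API_KEY - SiliconFlow vision backend
#   DEEPSEEK_API_KEY    - assumed to be read by openai.LLM.with_deepseek()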
AVATAR_IDENTITY = "avatar_worker"
DEFAULT_INSTRUCTIONS = """# 角色
你是无锡交警智能机器人,负责收集交通事故信息。
你像真人一样和用户对话而不是机器人。你收到的文字是经过ASR识别的语音输入。
# 当前日期和时间
日期:{datetime}
星期:{weekday}
# 能力
- 你具有调用工具操作前端界面系统的能力
- ask_image_capture工具被调用后,会在系统播放拍摄的目标和需求,所以你每次在调用它之前不需要重复引导用户拍摄什么,而是使用ask_image_capture来传递拍摄需求
# 任务
你的职责是全流程引导用户完成:事故信息采集 -> 现场证据拍照 -> 驾驶员信息核实。
## 事故信息采集阶段
- 在事故信息采集阶段:询问是否有人受伤;请求用户简单描述事故情况;询问事故发生时间,并通过复述标准化时间(xx年xx月xx日xx时xx分)向用户确认;询问事故车辆数量;询问事故发生的原因(例如追尾、刮擦、碰撞等)
- 如果用户回答已包含所问问题的答案,改为与用户确认答案是否正确
- 采集完成之后进入现场证据拍照阶段
- 这个阶段不使用ask_important_question和ask_image_capture工具
## 现场证据拍照阶段
- 在现场证据拍照阶段,使用ask_image_capture工具引导用户依次拍摄照片:1. 第一辆车的车牌;2. 第一辆车的碰撞位置;3. 第一辆车的驾驶员正脸;
- 如果事故只涉及一辆车,则只询问一位驾驶员;如果涉及两辆车,则继续要求拍摄:4. 第二辆车的车牌;5. 第二辆车的碰撞位置;6. 第二辆车的驾驶员正脸;
- 拍摄完成之后和用户确认识别的车牌号是否正确
- 完成之后进入驾驶员信息核实阶段
## 驾驶员信息核实阶段
- 你只处理事故车辆有一辆或者两辆的情况,超过两辆的情况需要转人工处理
- 对于一辆车辆的情况,你首先询问司机的姓名,之后根据车牌号查询驾驶员手机号,如果查询到则用手机号后四位向用户确认,如果未查询到或者用户告知手机号后四位错误,则询问用户手机号。接着根据车牌号查询驾驶员身份证号,如果查询到则用身份证号后四位向用户确认,如果未查询到或者用户告知身份证号后四位错误,则询问用户身份证号
- 对于两辆车辆的情况,你在收集完成第一位驾驶员的信息后,主动要求与第二位驾驶员通话;你通过主动询问确认电话已转接之后,再进行后续的通话
- 收集第二位驾驶员的过程与第一位驾驶员相同
- 完成之后进入后续办理提示阶段
## 后续办理提示阶段
- 在后续办理提示阶段,使用ask_important_question显示已经提取的交通事故信息,提示用户点击转人工继续处理;用户点击之后,调用enter_hand_off_to_human_mode工具转人工。
# 要求
- 在通话开始
- 你会在ask_image_capture的prompt参数中告诉用户拍摄的目标,所以避免在对话中重复描述需要用户拍摄什么
- 使用get_mobile_by_plate和get_id_card_by_plate的时候不要告诉用户正在查询,执行工具查看结果即可
# 回复风格
- 使用第一人称,语言简洁
- 一次询问一个问题
- 不要在你的回复中使用 emojis, asterisks, markdown, 或其他特殊字符
- 不同阶段之间的过渡语句要自然
- 你已经说过下面的开场白,所以不需要重复说:“您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。”
"""
DEFAULT_TALKING_MODE = 'push_to_talk'
# ## 黄金对话路径示例 GOLDEN_CONVERSATION_PATH
# ```
# # 事故初审阶段
# AI: 您好,请问现场有人受伤吗?
# 用户: 没有人员受伤。
# AI: 请您简单描述一下事故的经过。
# 用户: 我的车等红灯时被后面的车追尾了。
# AI: 请问事故大概发生在什么时间?
# 用户: 今天早上八点左右发生的。
# AI: 一共涉及几辆车?
# 用户: 两辆。
# AI: 事故主要原因是追尾,对吗?
# 用户: 对,是被追尾。
# AI: 好的,接下来我们将进行现场证据采集。
# # 现场证据拍照阶段
# AI: 现在需要采集事故现场照片,请先拍摄事故现场的整体照片并上传。
# (调用 ask_image_capture(prompt="请拍摄事故现场的整体照片", extract_target="事故现场", requirements="清晰、完整,无模糊和遮挡"))
# 工具返回: {"success": true, "analysis": {"target_found": false, "quality_ok": false, "quality_notes": ["画面模糊"]}, "raw": "..."}
# 用户上传模糊照片
# AI: 您上传的照片不够清晰或存在遮挡,请确保画面清晰、完整地拍摄事故现场,重新拍摄并上传。
# (调用 ask_image_capture(prompt="请重新清晰完整地拍摄事故现场照片", extract_target="事故现场", requirements="清晰、完整,无模糊和遮挡"))
# 工具返回: {"success": true, "analysis": {"target_found": true, "quality_ok": true, "summary": "清晰的事故现场概览"}, "raw": "..."}
# 用户上传清晰照片
# AI: 很好,事故现场照片已收到。接下来请拍摄驾驶员正脸照片。
# (调用 ask_image_capture(prompt="请拍摄驾驶员正脸照片", extract_target="驾驶员正脸", requirements="面部清晰、正对镜头"))
# 工具返回: {"success": true, "analysis": {"target_found": false, "quality_ok": false, "quality_notes": ["未正对镜头"]}, "raw": "..."}
# 用户上传侧脸照片
# AI: 检测到照片未正对镜头或面部不清楚,请正对镜头并确保面部清晰,再次拍摄并上传正脸照片。
# (调用 ask_image_capture(prompt="请正对镜头并拍摄清晰的驾驶员正脸照片", extract_target="驾驶员正脸", requirements="面部清晰、正对镜头"))
# 工具返回: {"success": true, "analysis": {"target_found": true, "quality_ok": true, "summary": "驾驶员正脸清晰"}, "raw": "..."}
# 用户上传正脸照片
# AI: 驾驶员正脸照片已收到。最后请拍摄并上传车辆号牌的清晰照片。
# (调用 ask_image_capture(prompt="请拍摄车辆号牌照片", extract_target="车辆号牌", requirements="完整、清晰、无遮挡"))
# 工具返回: {"success": true, "analysis": {"target_found": true, "quality_ok": true, "summary": "车牌完整可读", "target_details": "苏A12345"}, "raw": "..."}
# 用户上传合格车牌照片
# AI: 我识别到车牌号是“苏A12345”,请确认是否正确;如需修改请告知。
# # 驾驶员信息核实阶段
# AI: 请提供您的姓名,用于信息核实。
# 用户: 张三
# AI: (调用 get_mobile_by_plate(plate_number="苏A12345"))
# 工具返回: {"success": true, "plate": "苏A12345", "mobile": "13800001234"}
# AI: 系统查询到登记手机号尾号1234,请确认是否正确;如不正确,请告知真实手机号。
# 用户: 正确
# AI: (调用 get_id_card_by_plate(plate_number="苏A12345"))
# 工具返回: {"success": true, "plate": "苏A12345", "id_card": "320101198001011234"}
# AI: 系统查询到登记身份证尾号1234,请确认是否正确;如不正确,请告知完整身份证号。
# 用户: 正确
# AI: 信息采集已完成,请点击确认进入后续办理。(调用 ask_important_question(message="请确认已采集信息:\n姓名:张三\n手机号:13800001234(尾号1234)\n身份证号:320101198001011234(尾号1234)\n车牌号:苏A12345", options=["确认无误", "需要修改"]))
# 工具返回: {"selection": "确认无误"}
# AI: 已确认,后续流程请点击转人工继续处理。(调用 enter_hand_off_to_human_mode)
# ```
@dataclass
class AvatarConnectionInfo:
room_name: str
url: str
"""LiveKit server URL"""
token: str
"""Token for avatar worker to join"""
class MyAgent(Agent):
def __init__(self, instructions: str, vision_backend: str = "aliyun") -> None:
self.vision_backend = vision_backend
self._tasks = []
self._captured_images = []
self._image_event = asyncio.Event()
super().__init__(
instructions=instructions,
)
async def on_enter(self):
self.session.generate_reply(
instructions="调用ask_important_questionmessage=\"您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理。\"options=[\"继续办理\"]", allow_interruptions=False)
# Register byte stream handler for image uploads from frontend
def _image_received_handler(reader, participant_identity):
task = asyncio.create_task(
self._image_received(reader, participant_identity)
)
self._tasks.append(task)
task.add_done_callback(lambda t: self._tasks.remove(t))
# Add the handler when the agent joins
get_job_context().room.register_byte_stream_handler("image", _image_received_handler)
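# NOTE: the frontend is expected to push each captured photo as a byte
# stream on topic "image" (e.g. via the LiveKit client SDK's byte/file
# streaming API); the handler above collects it into self._captured_images.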
async def _send_chat_message(self, message: str):
"""Helper to send a chat message to the room."""
try:
room = get_job_context().room
import time
# Construct payload matching what useChat likely expects or just a simple JSON
# Standard LiveKit chat format often involves a simple JSON wrapper
payload = json.dumps({
"message": message,
"timestamp": int(time.time() * 1000)
}).encode('utf-8')
await room.local_participant.publish_data(
payload=payload,
topic="lk-chat-topic"
)
except Exception as e:
logger.error(f"Failed to send chat message: {e}")
async def _image_received(self, reader, participant_identity):
"""Handle image uploads from the frontend via byte stream."""
logger.info(f"Received image upload from participant: {participant_identity}")
image_bytes = bytes()
async for chunk in reader:
image_bytes += chunk
logger.info(f"Image received: {len(image_bytes)} bytes from {participant_identity}")
# Store the received image bytes in memory
self._captured_images.append(image_bytes)
self._image_event.set()
# Notify user that image was received
# chat_ctx = self.chat_ctx.copy()
# chat_ctx.add_message(
# role="user",
# content=["I have uploaded an image. You can analyze it when I ask you to."]
# )
# await self.update_chat_ctx(chat_ctx)
# Trigger a reply acknowledging receipt
# self.session.generate_reply()
def _normalize_vision_response(self, raw_content: Any) -> str:
"""Convert various content formats to a clean JSON string (no code fences)."""
# Handle multimodal content list responses
if isinstance(raw_content, list):
text_parts: List[str] = []
for part in raw_content:
if isinstance(part, dict):
if part.get("type") == "text" and "text" in part:
text_parts.append(str(part["text"]))
else:
text_parts.append(str(part))
raw_content = "".join(text_parts)
content = str(raw_content).strip()
if content.startswith("```"):
# Strip fences like ```json ... ```
content = content.lstrip("`")
if content.lower().startswith("json"):
content = content[4:].lstrip()
if "```" in content:
content = content.split("```", 1)[0].strip()
return content
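# Example (hypothetical input): '```json\n{"target_found": true}\n```'
# normalizes to '{"target_found": true}', which json.loads() can parse.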
async def describe_image_with_api(
self,
base64_image: str,
backend: str = "siliconflow",
prompt: str = "",
extract_target: str = "",
requirements: str = "",
) -> str:
"""
Send base64 image to Vision API (SiliconFlow or Aliyun) and request a
JSON-formatted analysis aligned with ask_image_capture signature.
"""
if backend == "aliyun":
return await self._describe_image_aliyun(
base64_image,
prompt=prompt,
extract_target=extract_target,
requirements=requirements,
)
else:
return await self._describe_image_siliconflow(
base64_image,
prompt=prompt,
extract_target=extract_target,
requirements=requirements,
)
async def _describe_image_aliyun(
self,
base64_image: str,
prompt: str,
extract_target: str,
requirements: str,
) -> str:
logger.info("Sending image to Aliyun Vision API (Qwen-VL-Max)...")
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
logger.error("DASHSCOPE_API_KEY not found")
return "Error: DASHSCOPE_API_KEY not configured."
url = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
analysis_prompt = (
"请根据以下任务要求对图片进行结构化分析并且只返回紧凑的JSON"
f'{{"prompt": "{prompt}", '
f'"extract_target": "{extract_target}", '
f'"requirements": "{requirements}", '
'"summary": "<用不超过50字中文概述图片>", '
'"target_found": <true|false>, '
'"target_details": "<若存在目标,描述关键特征;否则写未发现>", '
'"quality_ok": <true|false>, '
'"quality_notes": ["<不满足要求的原因,若满足可为空数组>"]}'
"。不要添加额外说明、前缀或代码块。"
)
payload = {
"model": "qwen-vl-plus",
"messages": [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}"
}
},
{
"type": "text",
"text": analysis_prompt
}
]
}
]
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers) as response:
if response.status == 200:
data = await response.json()
raw_content = data['choices'][0]['message']['content']
normalized = self._normalize_vision_response(raw_content)
try:
parsed = json.loads(normalized)
description = json.dumps(parsed, ensure_ascii=False)
except Exception:
description = normalized
logger.info(f"Got image description from Aliyun: {description}")
return description
else:
error_text = await response.text()
logger.error(f"Aliyun API error: {response.status} - {error_text}")
return "Failed to analyze image due to Aliyun API error."
except Exception as e:
logger.error(f"Exception calling Aliyun Vision API: {e}")
return f"Error analyzing image: {str(e)}"
async def _describe_image_siliconflow(
self,
base64_image: str,
prompt: str,
extract_target: str,
requirements: str,
) -> str:
"""
Send base64 image to SiliconFlow Vision API.
"""
logger.info("Sending image to SiliconFlow Vision API...")
# NOTE: You need to set your SiliconFlow API key here or in your .env file
api_key = os.getenv("SILICONFLOW_API_KEY", "your-api-key")
url = "https://api.siliconflow.cn/v1/chat/completions"
analysis_prompt = (
"请根据以下任务要求输出JSON结果不要附加任何解释或代码块"
f'{{"prompt": "{prompt}", '
f'"extract_target": "{extract_target}", '
f'"requirements": "{requirements}", '
'"summary": "<50字内中文概述图片>", '
'"target_found": <true|false>, '
'"target_details": "<若找到目标描述关键特征,否则写未发现>", '
'"quality_ok": <true|false>, '
'"quality_notes": ["<若不满足拍摄要求,列出问题;满足则为空数组>"]}'
)
payload = {
"model": "Qwen/Qwen3-VL-8B-Instruct",
"messages": [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}"
}
},
{
"type": "text",
"text": analysis_prompt
}
]
}
],
"max_tokens": 512
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers) as response:
if response.status == 200:
data = await response.json()
raw_content = data['choices'][0]['message']['content']
normalized = self._normalize_vision_response(raw_content)
try:
parsed = json.loads(normalized)
description = json.dumps(parsed, ensure_ascii=False)
except Exception:
description = normalized
logger.info(f"Got image description: {description}")
return description
else:
error_text = await response.text()
logger.error(f"SiliconFlow API error: {response.status} - {error_text}")
return "Failed to analyze image due to API error."
except Exception as e:
logger.error(f"Exception calling Vision API: {e}")
return f"Error analyzing image: {str(e)}"
@function_tool()
async def ask_image_capture(
self,
context: RunContext,
prompt: str = "请拍摄照片",
extract_target: str = "车辆号牌",
requirements: str = "清晰完整、无模糊",
):
"""
请求用户根据提供的提示语拍摄或上传照片,并检查照片中是否包含指定的提取目标且满足相关要求。
参数:
context (RunContext): 代理运行上下文。
prompt (str, 可选): 提示用户拍摄什么内容(例如:“请拍摄事故现场照片”,“请拍摄车牌号”)。默认值为“请拍摄照片”。
extract_target (str, 可选): 期望照片中包含的主要目标(例如:“车辆号牌”)。默认值为“车辆号牌”。
requirements (str, 可选): 拍摄的具体要求说明(例如:清晰、构图完整)。默认值为“清晰完整、无模糊”。
返回:
dict: 简洁可序列化的工具结果。
- success: bool
- analysis: 解析后的结果(解析失败则为字符串)
- raw: 原始文本结果(字符串)
- error/details: 仅失败时返回
"""
await self._send_chat_message(
"┌─🔨 Call: ask_image_capture\n"
f"│ prompt: \"{prompt}\"\n"
"└───────────────"
)
try:
room = get_job_context().room
participant_identity = next(iter(room.remote_participants))
# Clear previous image event
self._image_event.clear()
# Speak the capture prompt so the user hears what to do
self.session.say(prompt, allow_interruptions=False)
# Ask for image capture and wait for user to capture/upload
response = await room.local_participant.perform_rpc(
destination_identity=participant_identity,
method="askImageCapture",
payload=json.dumps({
"prompt": prompt,
}),
response_timeout=120.0, # Longer timeout for user to capture image
)
logger.info(f"Received image capture response: {response}")
# Parse the response to check for success/error (image data comes via byte stream)
try:
response_data = json.loads(response)
if "error" in response_data:
return {
"success": False,
"error": "图片采集失败",
"details": response_data.get("error"),
}
# Wait for the image byte stream to complete (signaled by _image_received)
if not self._image_event.is_set():
try:
# Give a reasonable buffer for file transfer to finish after RPC returns
await asyncio.wait_for(self._image_event.wait(), timeout=10.0)
except asyncio.TimeoutError:
if not self._captured_images:
return {
"success": False,
"error": "未收到图片数据",
"details": "Timeout waiting for file stream",
}
# Use the LAST stored image
last_image_bytes = self._captured_images[-1]
logger.info(f"Analyzing last stored image, size: {len(last_image_bytes)} bytes")
base64_image = base64.b64encode(last_image_bytes).decode('utf-8')
# Analyze the image using the configured vision backend
description = await self.describe_image_with_api(
base64_image,
self.vision_backend,
prompt=prompt,
extract_target=extract_target,
requirements=requirements,
)
logger.info(f"Image analysis result: {description}")
# Try to parse analysis as JSON; fall back to raw string
raw_text = description if isinstance(description, str) else json.dumps(description, ensure_ascii=False)
parsed_analysis = raw_text
try:
parsed_analysis = json.loads(raw_text)
except Exception:
parsed_analysis = raw_text
# Derive validity signals for the LLM to decide on retries
valid = False
problems = []
target_details = None
if isinstance(parsed_analysis, dict):
target_found = bool(parsed_analysis.get("target_found"))
quality_ok = bool(parsed_analysis.get("quality_ok"))
valid = target_found and quality_ok
target_details = parsed_analysis.get("target_details")
if not target_found:
problems.append(f"未发现{extract_target}")
if not quality_ok:
notes = parsed_analysis.get("quality_notes") or []
problems.extend(notes)
# Format the full analysis output for chat
analysis_display = str(raw_text)
if isinstance(parsed_analysis, dict):
# Pretty print JSON for better readability
analysis_display = json.dumps(parsed_analysis, ensure_ascii=False, indent=2)
await self._send_chat_message(
"┌─✅ Result: ask_image_capture\n"
f"│ valid: {valid}, problems: {problems}\n"
f"│ analysis:\n{analysis_display}\n"
"└───────────────"
)
return {
"success": True,
"analysis": parsed_analysis,
"raw": raw_text,
"valid": valid,
"problems": problems,
"target_details": target_details,
}
except json.JSONDecodeError:
logger.error(f"Failed to parse response: {response}")
return {
"success": False,
"error": "图片数据解析失败",
"details": response,
}
except Exception as e:
logger.error(f"Failed to capture and analyze image: {e}")
raise ToolError(f"无法完成图片采集和分析: {str(e)}")
@function_tool()
async def get_mobile_by_plate(
self,
context: RunContext,
plate_number: str,
):
"""
根据车牌号查询登记的手机号。
返回:
dict: { success: bool, plate: str, mobile: str | None, error?: str }
"""
normalized_plate = plate_number.strip().upper()
await self._send_chat_message(
"┌─🔨 Call: get_mobile_by_plate\n"
f"│ plate: \"{normalized_plate}\"\n"
"└───────────────"
)
# Generate random mobile number (11 digits: 1[3-9] + 9 random digits)
mobile_prefix = random.choice(['13', '14', '15', '16', '17', '18', '19'])
mobile_suffix = ''.join([str(random.randint(0, 9)) for _ in range(9)])
random_mobile = f"{mobile_prefix}{mobile_suffix}"
result = {
"success": True,
"plate": normalized_plate,
"mobile": random_mobile,
}
await self._send_chat_message(
"┌─✅ Result: get_mobile_by_plate\n"
f"│ plate: \"{normalized_plate}\"\n"
f"│ mobile: \"{random_mobile}\"\n"
"└───────────────"
)
return result
@function_tool()
async def get_id_card_by_plate(
self,
context: RunContext,
plate_number: str,
):
"""
根据车牌号查询登记的身份证号。
返回:
dict: { success: bool, plate: str, id_card: str | None, error?: str }
"""
normalized_plate = plate_number.strip().upper()
await self._send_chat_message(
"┌─🔨 Call: get_id_card_by_plate\n"
f"│ plate: \"{normalized_plate}\"\n"
"└───────────────"
)
# Generate random ID card number (18 digits: 6-digit area code + 8-digit birth date + 3-digit sequence + 1 check digit)
# Area code: random 6 digits (typically 110000-659999 for Chinese ID cards)
area_code = random.randint(110000, 659999)
# Birth date: random date between 1950-01-01 and 2000-12-31
year = random.randint(1950, 2000)
month = random.randint(1, 12)
day = random.randint(1, 28) # Use 28 to avoid month-specific day issues
birth_date = f"{year:04d}{month:02d}{day:02d}"
# Sequence number: 3 random digits
sequence = random.randint(100, 999)
# Check digit: random digit or X (10% chance of X)
check_digit = 'X' if random.random() < 0.1 else str(random.randint(0, 9))
random_id_card = f"{area_code}{birth_date}{sequence}{check_digit}"
result = {
"success": True,
"plate": normalized_plate,
"id_card": random_id_card,
}
await self._send_chat_message(
"┌─✅ Result: get_id_card_by_plate\n"
f"│ plate: \"{normalized_plate}\"\n"
f"│ id_card: \"{random_id_card}\"\n"
"└───────────────"
)
return result
@function_tool()
async def validate_mobile_number(
self,
context: RunContext,
mobile: str,
):
"""
检查手机号格式是否正确(大陆 11 位手机号,1[3-9] 开头)。
返回:
dict: { success: bool, valid: bool, mobile: str, error?: str }
"""
normalized = mobile.strip().replace(" ", "")
await self._send_chat_message(
"┌─🔨 Call: validate_mobile_number\n"
f"│ mobile: \"{normalized}\"\n"
"└───────────────"
)
is_valid = bool(re.fullmatch(r"1[3-9]\\d{9}", normalized))
if is_valid:
result = {
"success": True,
"valid": True,
"mobile": normalized,
}
await self._send_chat_message(
"┌─✅ Result: validate_mobile_number\n"
f"│ mobile: \"{normalized}\"\n"
f"│ valid: true\n"
"└───────────────"
)
return result
result = {
"success": True,
"valid": False,
"mobile": normalized,
"error": "手机号格式不正确应为1[3-9]开头的11位数字",
}
await self._send_chat_message(
"┌─✅ Result: validate_mobile_number\n"
f"│ mobile: \"{normalized}\"\n"
f"│ valid: false\n"
f"│ error: \"{result['error']}\"\n"
"└───────────────"
)
return result
@function_tool()
async def validate_id_card_number(
self,
context: RunContext,
id_card: str,
):
"""
检查身份证号格式是否正确(18位,末位可为校验位X;或15位纯数字)。
返回:
dict: { success: bool, valid: bool, id_card: str, error?: str }
"""
normalized = id_card.strip().replace(" ", "").upper()
await self._send_chat_message(
"┌─🔨 Call: validate_id_card_number\n"
f"│ id_card: \"{normalized}\"\n"
"└───────────────"
)
is_valid = bool(re.fullmatch(r"(\\d{17}[\\dX]|\\d{15})", normalized))
if is_valid:
result = {
"success": True,
"valid": True,
"id_card": normalized,
}
await self._send_chat_message(
"┌─✅ Result: validate_id_card_number\n"
f"│ id_card: \"{normalized}\"\n"
f"│ valid: true\n"
"└───────────────"
)
return result
result = {
"success": True,
"valid": False,
"id_card": normalized,
"error": "身份证格式不正确应为18位末位可为X或15位数字",
}
await self._send_chat_message(
"┌─✅ Result: validate_id_card_number\n"
f"│ id_card: \"{normalized}\"\n"
f"│ valid: false\n"
f"│ error: \"{result['error']}\"\n"
"└───────────────"
)
return result
@function_tool()
async def enter_hand_off_to_human_mode(
self,
context: RunContext,
):
"""切换到"转人工"模式(前端电话界面进入人工处理)。返回成功/失败。"""
await self._send_chat_message(
"┌─🔨 Call: enter_hand_off_to_human_mode\n"
"└───────────────"
)
try:
room = get_job_context().room
participant_identity = next(iter(room.remote_participants))
response = await room.local_participant.perform_rpc(
destination_identity=participant_identity,
method="enterHandOffToHumanMode",
payload=json.dumps({}),
response_timeout=5.0,
)
logger.info(f"Entered hand off to human mode: {response}")
await self._send_chat_message(
"┌─✅ Result: enter_hand_off_to_human_mode\n"
f"│ status: success\n"
f"│ response: {response}\n"
"└───────────────"
)
return response
except Exception as e:
logger.error(f"Failed to enter hand off to human mode: {e}")
await self._send_chat_message(
"┌─❌ Result: enter_hand_off_to_human_mode\n"
f"│ status: error\n"
f"│ error: \"{str(e)}\"\n"
"└───────────────"
)
raise ToolError(f"Unable to enter hand off to human mode: {str(e)}")
@function_tool()
async def hang_up_call(
self,
context: RunContext,
):
"""挂断当前通话(结束会话),返回成功/失败。"""
await self._send_chat_message(
"┌─🔨 Call: hang_up_call\n"
"└───────────────"
)
try:
room = get_job_context().room
participant_identity = next(iter(room.remote_participants))
response = await room.local_participant.perform_rpc(
destination_identity=participant_identity,
method="hangUpCall",
payload=json.dumps({}),
response_timeout=5.0,
)
logger.info(f"Hung up call: {response}")
await self._send_chat_message(
"┌─✅ Result: hang_up_call\n"
f"│ status: disconnected\n"
f"│ response: {response}\n"
"└───────────────"
)
return response
except Exception as e:
logger.error(f"Failed to hang up call: {e}")
await self._send_chat_message(
"┌─❌ Result: hang_up_call\n"
f"│ status: error\n"
f"│ error: \"{str(e)}\"\n"
"└───────────────"
)
raise ToolError(f"Unable to hang up call: {str(e)}")
@function_tool()
async def ask_important_question(self, context: RunContext, message: str, options: Optional[List[str]] | str = None):
"""询问关键问题并等待用户选择选项,返回用户的选择结果。
参数:
message: 要朗读/展示的问题。
options: 可选项列表(字符串数组),如 ["继续办理", "转人工"]。
返回:
str: 用户选择的文本内容。
"""
await self._send_chat_message(
"┌─🔨 Call: ask_important_question\n"
f"│ message: \"{message}\"\n"
f"│ options: {options}\n"
"└───────────────"
)
try:
room = get_job_context().room
participant_identity = next(iter(room.remote_participants))
payload_data = {
"message": message,
}
if options:
# Support both list input and JSON-stringified list input
if isinstance(options, str):
try:
options = json.loads(options)
except json.JSONDecodeError:
logger.error(f"Failed to parse options string: {options}")
options = None
# Ensure options is a list of strings
if isinstance(options, list):
payload_data["options"] = options
# Speak the message
speech_handle = self.session.say(message, allow_interruptions=False)
# Wait for user selection with longer timeout since user needs time to respond
response = await room.local_participant.perform_rpc(
destination_identity=participant_identity,
method="askImportantQuestion",
payload=json.dumps(payload_data),
response_timeout=60.0, # Increased timeout for user interaction
)
# Interrupt speech if user makes a selection while agent is speaking
if speech_handle and hasattr(speech_handle, "interrupt"):
speech_handle.interrupt()
logger.info("Interrupted speech due to user selection")
logger.info(f"User made selection: {response}")
# Parse the response to get the user's selection
try:
response_data = json.loads(response)
user_selection = response_data.get("selection", "确认")
logger.info(f"User selected: {user_selection}")
await self._send_chat_message(
"┌─✅ Result: ask_important_question\n"
f"│ selection: \"{user_selection}\"\n"
"└───────────────"
)
return f"用户选择了: {user_selection}"
except json.JSONDecodeError:
logger.error(f"Failed to parse response: {response}")
return f"用户选择了: {response}"
except Exception as e:
logger.error(f"Failed to ask important question: {e}")
raise ToolError(f"Unable to ask important question: {str(e)}")
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
async def launch_avatar(ctx: JobContext, avatar_dispatcher_url: str, avatar_identity: str) -> None:
"""
Send a request to the avatar service for it to join the room
This function should be wrapped in an avatar plugin.
"""
# create a token for the avatar to join the room
token = (
api.AccessToken()
.with_identity(avatar_identity)
.with_name("Avatar Runner")
.with_grants(api.VideoGrants(room_join=True, room=ctx.room.name))
.with_kind("agent")
.with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: ctx.local_participant_identity})
.to_jwt()
)
logger.info(f"Sending connection info to avatar dispatcher {avatar_dispatcher_url}")
connection_info = AvatarConnectionInfo(room_name=ctx.room.name, url=ctx._info.url, token=token)
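# The dispatcher receives the dataclass serialized as JSON, e.g.:
#   {"room_name": "<room>", "url": "wss://...", "token": "<avatar jwt>"}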
async with httpx.AsyncClient() as client:
response = await client.post(avatar_dispatcher_url, json=asdict(connection_info))
response.raise_for_status()
logger.info("Avatar handshake completed")
async def entrypoint(ctx: JobContext, avatar_dispatcher_url: Optional[str] = None, vision_backend: str = "siliconflow", llm_backend: str = "deepseek"):
# inactivity_task: asyncio.Task | None = None
# async def user_presence_task():
# # try to ping the user 3 times, if we get no answer, close the session
# for _ in range(3):
# await session.generate_reply(
# instructions=(
# "你礼貌的询问用户是否还在通话。"
# )
# )
# await asyncio.sleep(10)
# session.shutdown()
# each log entry will include these fields
ctx.log_context_fields = {
"room": ctx.room.name,
}
# IMPORTANT: Connect to room first
logger.info("connecting to room")
await ctx.connect()
logger.info("waiting for participant")
participant = await ctx.wait_for_participant()
logger.info(f"starting agent for participant {participant.identity}")
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"Current time: {current_time}")
# Calculate weekday in Chinese
weekday_map = {
0: "星期一",
1: "星期二",
2: "星期三",
3: "星期四",
4: "星期五",
5: "星期六",
6: "星期日"
}
current_weekday = weekday_map[datetime.now().weekday()]
logger.info(f"Current weekday: {current_weekday}")
initial_voice_id = "BV001_streaming" # Female voice
if participant.attributes.get("voice"):
initial_voice_id = participant.attributes.get("voice")
logger.info(f"User selected voice: {initial_voice_id}")
# initial_instructions = """Your name is Kelly. You would interact with users via voice.
# with that in mind keep your responses concise and to the point.
# do not use emojis, asterisks, markdown, or other special characters in your responses.
# You are curious and friendly, and have a sense of humor.
# you will speak chinese to the user"
# """
initial_instructions = DEFAULT_INSTRUCTIONS
if participant.attributes.get("instructions"):
initial_instructions = participant.attributes.get("instructions")
logger.info(f"User selected instructions: {initial_instructions}")
# Replace the datetime and weekday placeholders to avoid KeyError from other braces in the prompt
initial_instructions = initial_instructions.replace("{datetime}", current_time)
initial_instructions = initial_instructions.replace("{weekday}", current_weekday)
logger.info(f"Initial instructions: {initial_instructions}")
if llm_backend == "dashscope":
logger.info("Using DashScope DeepSeek backend")
llm = openai.LLM(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key=os.getenv("DASHSCOPE_API_KEY"),
model="deepseek-v3.2",
)
else:
logger.info("Using default DeepSeek backend")
llm = openai.LLM.with_deepseek(
model='deepseek-chat',
)
session = AgentSession(
vad=ctx.proc.userdata["vad"],
# turn_detection=MultilingualModel(),
# any combination of STT, LLM, TTS, or realtime API can be used
# stt = aliyun.STT(model="paraformer-realtime-v2"),
stt = volcengine.BigModelSTT(
app_id="8607675070"
),
# stt = volcengine.STT(
# app_id="2931820332",
# cluster="volcengine_streaming_common"
# ),
llm=llm,
# tts=aliyun.TTS(model="cosyvoice-v2", voice="longcheng_v2"),
tts=volcengine.TTS(
app_id="2931820332",
cluster="volcano_tts",
voice=initial_voice_id,
sample_rate=8000,
speed=1.1,
),
# allow the LLM to generate a response while waiting for the end of turn
preemptive_generation=True,
# sometimes background noise could interrupt the agent session, these are considered false positive interruptions
# when it's detected, you may resume the agent's speech
resume_false_interruption=True,
false_interruption_timeout=1.0,
# Increase the maximum number of function calls per turn to avoid hitting the limit
max_tool_steps=15,
)
room_io = RoomIO(session, room=ctx.room)
await room_io.start()
# log metrics as they are emitted, and total usage after session is over
usage_collector = metrics.UsageCollector()
@session.on("metrics_collected")
def _on_metrics_collected(ev: MetricsCollectedEvent):
metrics.log_metrics(ev.metrics)
usage_collector.collect(ev.metrics)
async def log_usage():
summary = usage_collector.get_summary()
logger.info(f"Usage: {summary}")
# shutdown callbacks are triggered when the session is over
ctx.add_shutdown_callback(log_usage)
# Launch avatar if avatar_dispatcher_url is provided
if avatar_dispatcher_url:
await launch_avatar(ctx, avatar_dispatcher_url, AVATAR_IDENTITY)
session.output.audio = DataStreamAudioOutput(
ctx.room,
destination_identity=AVATAR_IDENTITY,
# (optional) wait for the avatar to publish video track before generating a reply
wait_remote_track=rtc.TrackKind.KIND_VIDEO,
)
@session.output.audio.on("playback_finished")
def on_playback_finished(ev: PlaybackFinishedEvent) -> None:
# the avatar should notify when the audio playback is finished
logger.info(
"playback_finished",
extra={
"playback_position": ev.playback_position,
"interrupted": ev.interrupted,
},
)
# @session.on("user_state_changed")
# def _user_state_changed(ev: UserStateChangedEvent):
# nonlocal inactivity_task
# if ev.new_state == "away":
# inactivity_task = asyncio.create_task(user_presence_task())
# return
# # ev.new_state: listening, speaking, ..
# if inactivity_task is not None:
# inactivity_task.cancel()
await session.start(
agent=MyAgent(initial_instructions, vision_backend=vision_backend),
room=ctx.room,
room_input_options=RoomInputOptions(
# uncomment to enable Krisp BVC noise cancellation
# noise_cancellation=noise_cancellation.BVC(),
),
room_output_options=RoomOutputOptions(transcription_enabled=True),
)
# set the initial input-audio state from the talking mode (disabled for push-to-talk)
_talking_mode = DEFAULT_TALKING_MODE
if _talking_mode == "push_to_talk":
session.input.set_audio_enabled(False)
else:
session.input.set_audio_enabled(True)
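# Push-to-talk protocol driven by the frontend via RPC (sketch of the
# expected call sequence, inferred from the handlers below):
#   1. "start_turn"  - interrupt agent speech, clear any pending turn, open the mic
#   2. the user speaks while holding the button
#   3. "end_turn"    - close the mic and commit the turn to the LLM, or
#      "cancel_turn" - close the mic and discard the captured audio
# "switch_ptt_and_rt" toggles between push_to_talk and realtime modes.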
@ctx.room.local_participant.register_rpc_method("start_turn")
async def start_turn(data: rtc.RpcInvocationData):
try:
session.interrupt()
except RuntimeError as e:
logger.error(f"Failed to interrupt session: {e}")
# Raise RPC error so client can detect interrupt failure
# Use ERROR_INTERNAL (code 13) to indicate application error
raise rtc.RpcError(
code=13, # ERROR_INTERNAL
message="Application error in method handler"
)
session.clear_user_turn()
# listen to the caller if multi-user
room_io.set_participant(data.caller_identity)
session.input.set_audio_enabled(True)
@ctx.room.local_participant.register_rpc_method("end_turn")
async def end_turn(data: rtc.RpcInvocationData):
session.input.set_audio_enabled(False)
session.commit_user_turn(
# the timeout for the final transcript to be received after committing the user turn
# increase this value if the STT is slow to respond
transcript_timeout=10.0,
# the duration of the silence to be appended to the STT to make it generate the final transcript
stt_flush_duration=2.0,
)
@ctx.room.local_participant.register_rpc_method("cancel_turn")
async def cancel_turn(data: rtc.RpcInvocationData):
session.input.set_audio_enabled(False)
session.clear_user_turn()
logger.info("cancel turn")
@ctx.room.local_participant.register_rpc_method("switch_ptt_and_rt")
async def switch_ptt_and_rt(data: rtc.RpcInvocationData):
nonlocal _talking_mode
_talking_mode = "push_to_talk" if _talking_mode == "realtime" else "realtime"
if _talking_mode == "push_to_talk":
session.input.set_audio_enabled(False)
else:
session.input.set_audio_enabled(True)
return json.dumps({"success": True, "mode": _talking_mode})
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--avatar-url", type=str, default=None, help="Avatar dispatcher URL")
parser.add_argument("--vision-backend", type=str, default="aliyun", choices=["siliconflow", "aliyun"], help="Vision API backend")
parser.add_argument("--llm-backend", type=str, default="dashscope", choices=["deepseek", "dashscope"], help="LLM backend")
args, remaining_args = parser.parse_known_args()
sys.argv = sys.argv[:1] + remaining_args
if args.avatar_url:
cli.run_app(WorkerOptions(entrypoint_fnc=partial(entrypoint, avatar_dispatcher_url=args.avatar_url, vision_backend=args.vision_backend, llm_backend=args.llm_backend), prewarm_fnc=prewarm))
else:
cli.run_app(WorkerOptions(entrypoint_fnc=partial(entrypoint, vision_backend=args.vision_backend, llm_backend=args.llm_backend), prewarm_fnc=prewarm))
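# Example invocation (assuming the standard livekit-agents CLI subcommands):
#   python my_basic_agent_1_2_9.py dev --vision-backend aliyun --llm-backend dashscope
# The custom flags are consumed by parse_known_args() above; the remaining
# arguments (e.g. "dev") are handed to cli.run_app() via sys.argv.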