|
|
|
|
@@ -16,6 +16,7 @@ from pathlib import Path
|
|
|
|
|
from typing import Any, List, Optional, Tuple
|
|
|
|
|
|
|
|
|
|
import aiofiles
|
|
|
|
|
import aiohttp
|
|
|
|
|
from loguru import logger
|
|
|
|
|
from PIL.ImageFile import ImageFile
|
|
|
|
|
from utils import (
|
|
|
|
|
@@ -53,6 +54,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
|
|
|
|
from pipecat.services.llm_service import FunctionCallParams
|
|
|
|
|
from pipecat.services.openai.llm import OpenAILLMService
|
|
|
|
|
from pipecat.transports.daily.transport import DailyParams, DailyTransport
|
|
|
|
|
from pipecat.transports.daily.utils import DailyRESTHelper, DailyRoomObject, DailyRoomParams
|
|
|
|
|
|
|
|
|
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
|
|
|
|
|
|
|
|
@@ -87,7 +89,6 @@ class EvalRunner:
|
|
|
|
|
self._log_level = log_level
|
|
|
|
|
self._total_success = 0
|
|
|
|
|
self._tests: List[EvalResult] = []
|
|
|
|
|
self._result_future: Optional[asyncio.Future[bool]] = None
|
|
|
|
|
|
|
|
|
|
# Where to save runner files.
|
|
|
|
|
name = name or f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
|
@@ -97,17 +98,6 @@ class EvalRunner:
|
|
|
|
|
os.makedirs(self._logs_dir, exist_ok=True)
|
|
|
|
|
os.makedirs(self._recordings_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
async def function_assert_eval(self, params: FunctionCallParams):
    """Handle the eval function call that carries the verdict.

    Reads the ``result`` and ``reasoning`` arguments, logs them at debug
    level, answers the function call with ``None`` and finally pushes an
    ``EndTaskFrame`` upstream whose reason is the verdict.

    Args:
        params: Function call parameters; ``arguments`` must contain the
            ``result`` and ``reasoning`` keys.

    Raises:
        KeyError: If either argument is missing.
    """
    args = params.arguments
    result, reasoning = args["result"], args["reasoning"]
    logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")

    # Answer the function call before pushing the frame upstream.
    await params.result_callback(None)
    end_frame = EndTaskFrame(reason=result)
    await params.llm.push_frame(end_frame, FrameDirection.UPSTREAM)
|
|
|
|
|
|
|
|
|
|
async def assert_eval(self, result: bool):
    """Resolve the pending eval result future with ``result``.

    Does nothing when no future has been created yet, or when the future is
    already done (resolved or cancelled). Repeated calls are therefore
    harmless instead of raising ``asyncio.InvalidStateError``.

    Args:
        result: Outcome to store on the future.
    """
    future = self._result_future
    # set_result() raises InvalidStateError on a finished future, so only
    # the first reported outcome is recorded.
    if future is not None and not future.done():
        future.set_result(result)
|
|
|
|
|
|
|
|
|
|
async def run_eval(
|
|
|
|
|
self,
|
|
|
|
|
example_file: str,
|
|
|
|
|
@@ -116,23 +106,44 @@ class EvalRunner:
|
|
|
|
|
if not re.match(self._pattern, example_file):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
print(f"\n🚀 Launching {example_file} ...\n")
|
|
|
|
|
|
|
|
|
|
# Store logs
|
|
|
|
|
filename = self._log_file_name(example_file)
|
|
|
|
|
log_file_id = logger.add(filename, level=self._log_level)
|
|
|
|
|
|
|
|
|
|
print_begin_test(example_file)
|
|
|
|
|
|
|
|
|
|
# Create Daily Room
|
|
|
|
|
logger.info("Creating Daily room...")
|
|
|
|
|
async with aiohttp.ClientSession() as aiohttp_session:
|
|
|
|
|
helper = DailyRESTHelper(
|
|
|
|
|
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
|
|
|
|
)
|
|
|
|
|
room = await helper.create_room(params=DailyRoomParams())
|
|
|
|
|
logger.info(f"Created Daily room {room.url}")
|
|
|
|
|
|
|
|
|
|
script_path = self._examples_dir / example_file
|
|
|
|
|
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
|
|
|
|
# Create a future to store the eval result.
|
|
|
|
|
self._result_future = asyncio.get_running_loop().create_future()
|
|
|
|
|
# Create per-eval future and callbacks so multiple evals can run concurrently.
|
|
|
|
|
result_future = asyncio.get_running_loop().create_future()
|
|
|
|
|
function_assert_eval, assert_eval = self._create_eval_callbacks(result_future)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
tasks = [
|
|
|
|
|
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
|
|
|
|
|
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
|
|
|
|
|
asyncio.create_task(run_example_pipeline(room.url, script_path, eval_config)),
|
|
|
|
|
asyncio.create_task(
|
|
|
|
|
run_eval_pipeline(
|
|
|
|
|
self,
|
|
|
|
|
example_file,
|
|
|
|
|
eval_config,
|
|
|
|
|
room_url=room.url,
|
|
|
|
|
function_assert_eval=function_assert_eval,
|
|
|
|
|
assert_eval=assert_eval,
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
]
|
|
|
|
|
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
|
|
|
|
|
if pending:
|
|
|
|
|
@@ -149,7 +160,7 @@ class EvalRunner:
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Wait for the future to resolve.
|
|
|
|
|
result = await asyncio.wait_for(self._result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
|
|
|
|
result = await asyncio.wait_for(result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
logger.error(f"ERROR: Timeout waiting for eval result.")
|
|
|
|
|
result = False
|
|
|
|
|
@@ -163,6 +174,15 @@ class EvalRunner:
|
|
|
|
|
|
|
|
|
|
print_end_test(example_file, result, eval_time)
|
|
|
|
|
|
|
|
|
|
# Delete Daily Room
|
|
|
|
|
logger.info(f"Deleting Daily room {room.url} ...")
|
|
|
|
|
async with aiohttp.ClientSession() as aiohttp_session:
|
|
|
|
|
helper = DailyRESTHelper(
|
|
|
|
|
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
|
|
|
|
)
|
|
|
|
|
await helper.delete_room_by_name(room.name)
|
|
|
|
|
logger.info(f"Deleted Daily room {room.url}")
|
|
|
|
|
|
|
|
|
|
logger.remove(log_file_id)
|
|
|
|
|
|
|
|
|
|
def print_results(self):
|
|
|
|
|
@@ -183,6 +203,25 @@ class EvalRunner:
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(f"There's no audio to save for {name}")
|
|
|
|
|
|
|
|
|
|
def _create_eval_callbacks(self, result_future: asyncio.Future):
    """Build the callback pair used by a single eval run.

    Both callbacks are closures, so the eval outcome is delivered to the
    given ``result_future`` rather than to state stored on the runner. This
    allows multiple evals to run concurrently without sharing state.

    Args:
        result_future: Future that receives the eval outcome.

    Returns:
        A ``(function_assert_eval, assert_eval)`` tuple: the handler for the
        eval function call and the coroutine that resolves the future.
    """

    async def function_assert_eval(params: FunctionCallParams):
        args = params.arguments
        result, reasoning = args["result"], args["reasoning"]
        logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
        # Answer the function call before pushing the frame upstream.
        await params.result_callback(None)
        end_frame = EndTaskFrame(reason=result)
        await params.llm.push_frame(end_frame, FrameDirection.UPSTREAM)

    async def assert_eval(result: bool):
        # Only the first outcome is recorded; a finished future is left alone.
        if result_future.done():
            return
        result_future.set_result(result)

    return function_assert_eval, assert_eval
|
|
|
|
|
|
|
|
|
|
def _base_file_name(self, example_file: str):
|
|
|
|
|
base_name = os.path.splitext(example_file)[0]
|
|
|
|
|
return f"{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
|
@@ -196,9 +235,7 @@ class EvalRunner:
|
|
|
|
|
return os.path.join(self._recordings_dir, f"{base_name}.wav")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
|
|
|
|
|
room_url = os.getenv("DAILY_ROOM_URL")
|
|
|
|
|
|
|
|
|
|
async def run_example_pipeline(room_url: str, script_path: Path, eval_config: EvalConfig):
|
|
|
|
|
module = load_module_from_path(script_path)
|
|
|
|
|
|
|
|
|
|
transport = DailyTransport(
|
|
|
|
|
@@ -223,11 +260,13 @@ async def run_eval_pipeline(
|
|
|
|
|
eval_runner: EvalRunner,
|
|
|
|
|
example_file: str,
|
|
|
|
|
eval_config: EvalConfig,
|
|
|
|
|
*,
|
|
|
|
|
room_url: str,
|
|
|
|
|
function_assert_eval,
|
|
|
|
|
assert_eval,
|
|
|
|
|
):
|
|
|
|
|
logger.info(f"Starting eval bot")
|
|
|
|
|
|
|
|
|
|
room_url = os.getenv("DAILY_ROOM_URL")
|
|
|
|
|
|
|
|
|
|
transport = DailyTransport(
|
|
|
|
|
room_url,
|
|
|
|
|
None,
|
|
|
|
|
@@ -306,7 +345,7 @@ async def run_eval_pipeline(
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
llm.register_function("eval_function", eval_runner.function_assert_eval)
|
|
|
|
|
llm.register_function("eval_function", function_assert_eval)
|
|
|
|
|
|
|
|
|
|
context = LLMContext(tools=tools)
|
|
|
|
|
context_aggregator = LLMContextAggregatorPair(
|
|
|
|
|
@@ -374,9 +413,9 @@ async def run_eval_pipeline(
|
|
|
|
|
@task.event_handler("on_pipeline_finished")
|
|
|
|
|
async def on_pipeline_finished(task, frame):
|
|
|
|
|
if isinstance(frame, EndFrame):
|
|
|
|
|
await eval_runner.assert_eval(frame.reason)
|
|
|
|
|
await assert_eval(frame.reason)
|
|
|
|
|
elif isinstance(frame, CancelFrame):
|
|
|
|
|
await eval_runner.assert_eval(False)
|
|
|
|
|
await assert_eval(False)
|
|
|
|
|
|
|
|
|
|
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
|
|
|
|
|
# eval and the example.
|
|
|
|
|
|