Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
644babe241 scripts(evals): run evals in parallel 2026-03-27 15:37:01 -07:00
2 changed files with 82 additions and 29 deletions

View File

@@ -16,6 +16,7 @@ from pathlib import Path
from typing import Any, List, Optional, Tuple
import aiofiles
import aiohttp
from loguru import logger
from PIL.ImageFile import ImageFile
from utils import (
@@ -53,6 +54,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyParams, DailyTransport
from pipecat.transports.daily.utils import DailyRESTHelper, DailyRoomObject, DailyRoomParams
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -87,7 +89,6 @@ class EvalRunner:
self._log_level = log_level
self._total_success = 0
self._tests: List[EvalResult] = []
self._result_future: Optional[asyncio.Future[bool]] = None
# We to save runner files.
name = name or f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
@@ -97,17 +98,6 @@ class EvalRunner:
os.makedirs(self._logs_dir, exist_ok=True)
os.makedirs(self._recordings_dir, exist_ok=True)
async def function_assert_eval(self, params: FunctionCallParams):
result = params.arguments["result"]
reasoning = params.arguments["reasoning"]
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
await params.result_callback(None)
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
async def assert_eval(self, result: bool):
if self._result_future:
self._result_future.set_result(result)
async def run_eval(
self,
example_file: str,
@@ -116,23 +106,44 @@ class EvalRunner:
if not re.match(self._pattern, example_file):
return
print(f"\n🚀 Launching {example_file} ...\n")
# Store logs
filename = self._log_file_name(example_file)
log_file_id = logger.add(filename, level=self._log_level)
print_begin_test(example_file)
# Create Daily Room
logger.info("Creating Daily room...")
async with aiohttp.ClientSession() as aiohttp_session:
helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
)
room = await helper.create_room(params=DailyRoomParams())
logger.info(f"Created Daily room {room.url}")
script_path = self._examples_dir / example_file
start_time = time.time()
# Create a future to store the eval result.
self._result_future = asyncio.get_running_loop().create_future()
# Create per-eval future and callbacks so multiple evals can run concurrently.
result_future = asyncio.get_running_loop().create_future()
function_assert_eval, assert_eval = self._create_eval_callbacks(result_future)
try:
tasks = [
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
asyncio.create_task(run_example_pipeline(room.url, script_path, eval_config)),
asyncio.create_task(
run_eval_pipeline(
self,
example_file,
eval_config,
room_url=room.url,
function_assert_eval=function_assert_eval,
assert_eval=assert_eval,
)
),
]
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
if pending:
@@ -149,7 +160,7 @@ class EvalRunner:
try:
# Wait for the future to resolve.
result = await asyncio.wait_for(self._result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
result = await asyncio.wait_for(result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"ERROR: Timeout waiting for eval result.")
result = False
@@ -163,6 +174,15 @@ class EvalRunner:
print_end_test(example_file, result, eval_time)
# Delete Daily Room
logger.info(f"Deleting Daily room {room.url} ...")
async with aiohttp.ClientSession() as aiohttp_session:
helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
)
await helper.delete_room_by_name(room.name)
logger.info(f"Deleted Daily room {room.url}")
logger.remove(log_file_id)
def print_results(self):
@@ -183,6 +203,25 @@ class EvalRunner:
else:
logger.warning(f"There's no audio to save for {name}")
def _create_eval_callbacks(self, result_future: asyncio.Future):
"""Create per-eval callback functions that capture a local result_future.
This allows multiple evals to run concurrently without sharing state.
"""
async def function_assert_eval(params: FunctionCallParams):
result = params.arguments["result"]
reasoning = params.arguments["reasoning"]
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
await params.result_callback(None)
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
async def assert_eval(result: bool):
if not result_future.done():
result_future.set_result(result)
return function_assert_eval, assert_eval
def _base_file_name(self, example_file: str):
base_name = os.path.splitext(example_file)[0]
return f"{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
@@ -196,9 +235,7 @@ class EvalRunner:
return os.path.join(self._recordings_dir, f"{base_name}.wav")
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
room_url = os.getenv("DAILY_ROOM_URL")
async def run_example_pipeline(room_url: str, script_path: Path, eval_config: EvalConfig):
module = load_module_from_path(script_path)
transport = DailyTransport(
@@ -223,11 +260,13 @@ async def run_eval_pipeline(
eval_runner: EvalRunner,
example_file: str,
eval_config: EvalConfig,
*,
room_url: str,
function_assert_eval,
assert_eval,
):
logger.info(f"Starting eval bot")
room_url = os.getenv("DAILY_ROOM_URL")
transport = DailyTransport(
room_url,
None,
@@ -306,7 +345,7 @@ async def run_eval_pipeline(
),
)
llm.register_function("eval_function", eval_runner.function_assert_eval)
llm.register_function("eval_function", function_assert_eval)
context = LLMContext(tools=tools)
context_aggregator = LLMContextAggregatorPair(
@@ -374,9 +413,9 @@ async def run_eval_pipeline(
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
if isinstance(frame, EndFrame):
await eval_runner.assert_eval(frame.reason)
await assert_eval(frame.reason)
elif isinstance(frame, CancelFrame):
await eval_runner.assert_eval(False)
await assert_eval(False)
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
# eval and the example.

View File

@@ -321,11 +321,18 @@ async def main(args: argparse.Namespace):
log_level=log_level,
)
# Parse test config: (test, prompt, eval, user_speaks_first)
for test_config in TESTS:
test, eval_config = test_config
concurrency = args.concurrency
semaphore = asyncio.Semaphore(concurrency)
await runner.run_eval(test, eval_config)
async def run_with_semaphore(test: str, eval_config: EvalConfig):
async with semaphore:
await runner.run_eval(test, eval_config)
tasks = []
for test, eval_config in TESTS:
tasks.append(asyncio.create_task(run_with_semaphore(test, eval_config)))
await asyncio.gather(*tasks)
runner.print_results()
@@ -333,6 +340,13 @@ async def main(args: argparse.Namespace):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Eval Runner")
parser.add_argument("--audio", "-a", action="store_true", help="Record audio for each test")
parser.add_argument(
"--concurrency",
"-c",
type=int,
default=3,
help="Max number of evals to run concurrently (default: 3)",
)
parser.add_argument("--name", "-n", help="Name for the current runner (e.g. 'v.0.0.68')")
parser.add_argument("--pattern", "-p", help="Only run tests that match the pattern")
parser.add_argument("--verbose", "-v", action="count", default=0)