Files
reasoning-gym/eval/eval.py
2025-07-30 18:11:27 +01:00

933 lines
36 KiB
Python
Executable File

#!/usr/bin/env python
"""
Evaluation script for reasoning gym datasets.
This script evaluates LLM performance on reasoning gym datasets using the OpenRouter API.
Usage:
python eval.py --config config.yaml [options]
Options:
--model MODEL Override model specified in config
--output-dir DIR Override output directory specified in config
--category CATEGORY Evaluate only datasets from this category
--max-concurrent NUM Maximum number of concurrent API calls
--n NUM Number of completions to generate per prompt (default: 1, each completion is a separate API call)
--base-url URL API base URL (default: https://openrouter.ai/api/v1)
--save-metadata Save entry metadata in results
--full-results Save the full results file
--verbose Print detailed model responses
--debug Enable debug logging
--resume DIR Resume evaluation from the specified directory
Environment variables:
OPENROUTER_API_KEY Required API key for OpenRouter
"""
import argparse
import asyncio
import json
import logging
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from eval_config import CategoryConfig, DatasetConfig, EvalConfig
from openai import AsyncOpenAI
from tqdm.asyncio import tqdm_asyncio
import reasoning_gym
from reasoning_gym.utils import extract_answer
class CheckpointManager:
"""Manages checkpoints for resumable evaluation."""
def __init__(self, output_dir: Path):
"""Initialize the checkpoint manager.
Args:
output_dir: Directory where checkpoints and results are stored
"""
self.output_dir = output_dir
self.checkpoint_path = output_dir / "checkpoint.json"
self.completed_datasets = set()
self.previous_category_results = {} # Store previously completed category results
self.load_checkpoint()
def load_checkpoint(self) -> None:
"""Load existing checkpoint and previous results if available."""
# Load checkpoint file
if self.checkpoint_path.exists():
with open(self.checkpoint_path, "r") as f:
checkpoint_data = json.load(f)
self.completed_datasets = set(checkpoint_data.get("completed_datasets", []))
# Load previous category results
for category_dir in self.output_dir.iterdir():
if category_dir.is_dir():
category_name = category_dir.name
self.previous_category_results[category_name] = []
# Load each dataset result file in this category
for dataset_file in category_dir.glob("*.json"):
try:
with open(dataset_file, "r") as f:
dataset_result = json.load(f)
self.previous_category_results[category_name].append(dataset_result)
except Exception as e:
logging.warning(f"Error loading previous result {dataset_file}: {str(e)}")
def is_dataset_completed(self, category: str, dataset: str) -> bool:
"""Check if a dataset has been completed.
Args:
category: Category name
dataset: Dataset name
Returns:
True if the dataset has been completed, False otherwise
"""
return f"{category}/{dataset}" in self.completed_datasets
def mark_dataset_completed(self, category: str, dataset: str) -> None:
"""Mark a dataset as completed and update checkpoint file.
Args:
category: Category name
dataset: Dataset name
"""
self.completed_datasets.add(f"{category}/{dataset}")
self._save_checkpoint()
def get_dataset_result(self, category: str, dataset: str) -> Optional[dict[str, Any]]:
"""Get previously completed dataset result if available.
Args:
category: Category name
dataset: Dataset name
Returns:
Dataset result dict if found, None otherwise
"""
# Try to find the dataset in previously loaded results first
if category in self.previous_category_results:
for dataset_result in self.previous_category_results[category]:
if dataset_result["name"] == dataset:
return dataset_result
# If not found in memory, try to load from file
dataset_path = self.output_dir / category / f"{dataset}.json"
if dataset_path.exists():
try:
with open(dataset_path, "r") as f:
return json.load(f)
except Exception as e:
logging.error(f"Error loading dataset result from {dataset_path}: {str(e)}")
return None
def _save_checkpoint(self) -> None:
"""Save checkpoint to disk."""
checkpoint_data = {
"completed_datasets": list(self.completed_datasets),
"last_updated": datetime.now().isoformat(),
}
with open(self.checkpoint_path, "w") as f:
json.dump(checkpoint_data, f, indent=2)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# httpx logging will be configured in the AsyncModelEvaluator class
# based on the debug flag
def get_git_hash() -> str:
"""Get current git hash for reproducibility."""
cmd = ["git", "rev-parse", "HEAD"]
try:
return subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE).strip()
except Exception:
return "unknown"
class AsyncModelEvaluator:
"""Evaluates models on reasoning datasets with async API calls via OpenRouter."""
def __init__(
self,
config: EvalConfig,
api_key: Optional[str] = None,
base_url: str = "https://openrouter.ai/api/v1",
verbose: bool = False,
debug: bool = False,
timeout: int = 600,
):
"""Initialize the evaluator with configuration.
Args:
config: Evaluation configuration
api_key: API key for the service (optional for some APIs)
base_url: API base URL
verbose: Whether to print detailed model responses
debug: Whether to enable debug logging
"""
self.config = config
self.base_url = base_url
self.verbose = verbose
self.debug = debug
self.timeout = timeout
# Set up logging
self.logger = logging.getLogger("AsyncModelEvaluator")
if debug:
self.logger.setLevel(logging.DEBUG)
# Enable httpx logs in debug mode
logging.getLogger("httpx").setLevel(logging.INFO)
else:
# Suppress httpx logs in normal mode
logging.getLogger("httpx").setLevel(logging.WARNING)
# Set up API client
self.client = AsyncOpenAI(base_url=self.base_url, api_key=api_key, timeout=self.timeout)
# Concurrency control
self.semaphore = asyncio.Semaphore(config.max_concurrent)
# Metadata
self.git_hash = get_git_hash()
self.start_time = datetime.now()
# Checkpoint and resume related attributes
self.resume_dir = None
self.output_dir = None
self.checkpoint_manager = None
def create_output_dir(self) -> Path:
"""Create output directory or use existing one for resuming.
Returns:
Path to the output directory
"""
# Check if we're resuming from a previous run
if self.resume_dir:
output_dir = Path(self.resume_dir)
if not output_dir.exists():
raise ValueError(f"Resume directory {output_dir} does not exist")
self.logger.info(f"Resuming evaluation from {output_dir}")
return output_dir
# Create new output directory
timestamp = self.start_time.strftime("%Y%m%d_%H%M%S")
model_name = self.config.model.replace("/", "_")
if len(self.config.categories) == 1:
# Include category name in the output directory when evaluating a single category
category_name = self.config.categories[0].category
output_dir = Path(self.config.output_dir) / f"{model_name}_{category_name}_{timestamp}"
else:
# Original format for multiple categories
output_dir = Path(self.config.output_dir) / f"{model_name}_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def _save_dataset_results(self, category_name: str, dataset_name: str, results: dict[str, Any]) -> None:
"""Save individual dataset results to file.
Args:
category_name: Category name
dataset_name: Dataset name
results: Dataset evaluation results
"""
category_dir = self.output_dir / category_name
category_dir.mkdir(exist_ok=True)
dataset_path = category_dir / f"{dataset_name}.json"
with open(dataset_path, "w") as f:
json.dump(results, f, indent=2)
def _update_partial_summary(self, category_results: list[dict[str, Any]]) -> None:
"""Update partial summary after each category completes.
Args:
category_results: List of category results completed so far
"""
partial_results = {
"metadata": {
"timestamp": self.start_time.isoformat(),
"model": self.config.model,
"provider": self.config.provider,
"git_hash": self.git_hash,
"duration_seconds": (datetime.now() - self.start_time).total_seconds(),
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"top_p": self.config.top_p,
"partial": True,
},
"categories": category_results,
}
# Generate partial summary
partial_results["summary"] = self.generate_summary(partial_results)
# Save partial summary
summary_path = self.output_dir / "summary.json"
with open(summary_path, "w") as f:
json.dump(partial_results["summary"], f, indent=2)
async def get_single_response(self, prompt: str) -> str:
"""Get a single response from model with retry logic via OpenRouter.
Args:
prompt: The prompt to send to the model
Returns:
The model's response text
Raises:
Exception: If all retries fail
"""
max_retries = 10
base_delay = 1.0
max_delay = 60.0
backoff_factor = 2.0
for attempt in range(max_retries):
try:
async with self.semaphore:
# Prepare API call parameters
params = {
"model": self.config.model,
"messages": [
{"role": self.config.system_role, "content": self.config.get_system_prompt()},
{"role": "user", "content": prompt},
],
}
# Add sampling parameters if specified
if self.config.max_tokens is not None:
params["max_tokens"] = self.config.max_tokens
if self.config.temperature is not None:
params["temperature"] = self.config.temperature
if self.config.top_p is not None:
params["top_p"] = self.config.top_p
# Add provider configuration if specified
if self.config.provider:
params["extra_body"] = {"provider": {"order": [self.config.provider], "allow_fallbacks": False}}
completion = await self.client.chat.completions.create(**params)
response = completion.choices[0].message.content
if self.verbose:
self.logger.info(f"Response: {response}")
return response
except Exception as e:
delay = min(max_delay, base_delay * (backoff_factor**attempt))
self.logger.warning(f"Attempt {attempt+1}/{max_retries} failed: {str(e)}")
self.logger.warning(f"Retrying in {delay:.2f} seconds...")
await asyncio.sleep(delay)
raise Exception(f"Failed to get model response after {max_retries} attempts")
async def get_model_response(self, prompt: str) -> list[str]:
"""Get multiple responses from model by making multiple API calls.
Args:
prompt: The prompt to send to the model
Returns:
A list of model response texts
Raises:
Exception: If all attempts fail
"""
if self.verbose:
self.logger.info(f"Prompt: {prompt}")
self.logger.info(f"Generating {self.config.completions_per_prompt} completions...")
# Create tasks for multiple completions
tasks = []
for i in range(self.config.completions_per_prompt):
tasks.append(self.get_single_response(prompt))
# Execute all tasks concurrently
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any exceptions
valid_responses = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
self.logger.error(f"Completion {i+1} failed: {str(response)}")
else:
valid_responses.append(response)
if self.verbose:
self.logger.info(f"Response {len(valid_responses)}: {response}")
if not valid_responses:
raise Exception("All completion attempts failed")
return valid_responses
async def process_entry(
self,
dataset: reasoning_gym.dataset.ProceduralDataset,
entry: dict[str, Any],
entry_index: int,
dataset_name: str,
) -> dict[str, Any]:
"""Process a single dataset entry.
Args:
dataset: The dataset instance
entry: The entry to process
entry_index: Index of the entry in the dataset
dataset_name: Name of the dataset
Returns:
Dict with processing results
"""
responses = None
completion_results = []
best_score = 0.0
total_score = 0.0
best_answer = None
best_response = None
try:
# Get multiple model responses
responses = await self.get_model_response(entry["question"])
# Count total completions for mean score calculation
total_completions = len(responses)
for i, response in enumerate(responses):
try:
# Try to extract answer and score it
model_answer = extract_answer(response)
score = dataset.score_answer(answer=model_answer, entry=entry)
completion_result = {
"model_answer": model_answer,
"full_model_response": response,
"score": score,
}
# Track scores
if score > best_score:
best_score = score
best_answer = model_answer
best_response = response
# If we don't have a best answer yet, use the first non-None answer
elif best_answer is None and model_answer is not None:
best_answer = model_answer
best_response = response
best_score = score
total_score += score
completion_results.append(completion_result)
if self.verbose:
print(f"Question: {entry['question']}")
print(f"Expected: {entry['answer']}")
print(f"Completion {i+1} Answer: {model_answer}")
print(f"Completion {i+1} Score: {score}")
print("-" * 40)
except Exception as e:
self.logger.error(f"Error processing completion {i+1}: {str(e)}")
# Add failed completion with score 0.0 (already counted in total_completions)
completion_results.append(
{
"model_answer": "ERROR",
"full_model_response": response,
"score": 0.0,
"error": str(e),
}
)
# If we have no valid completions, log a warning instead of raising an exception
if not best_answer:
self.logger.warning(
f"Failed to extract a valid answer from model responses for dataset '{dataset_name}', entry index {entry_index}"
)
# Use None instead of empty string as the best answer
best_answer = None
best_response = responses[0] if responses and len(responses) > 0 else None
best_score = 0.0
# Calculate mean score - count all completions including failures
mean_score = total_score / total_completions if total_completions > 0 else 0.0
result = {
"question": entry["question"],
"expected_answer": str(entry["answer"]),
"best_model_answer": best_answer,
"best_full_model_response": best_response,
"best_score": best_score,
"mean_score": mean_score,
"completions": completion_results,
}
# Only include metadata if configured to do so
if self.config.save_metadata:
result["metadata"] = entry["metadata"]
return result
except Exception as e:
self.logger.error(f"Error processing entry: {str(e)}")
result = {
"question": entry["question"],
"expected_answer": str(entry["answer"]),
"best_model_answer": None,
# First check if we already have a best_response from partial processing
# If not, then fall back to the first response or None
"best_full_model_response": (
best_response
if best_response is not None
else (responses[0] if responses and len(responses) > 0 else None)
),
"best_score": best_score if best_score > 0 else 0.0,
"mean_score": total_score / total_completions if total_completions > 0 else 0.0,
"error": str(e),
"completions": completion_results if "completion_results" in locals() else [],
}
# Only include metadata if configured to do so
if self.config.save_metadata:
result["metadata"] = entry["metadata"]
return result
async def evaluate_dataset(self, category_name: str, dataset_config: DatasetConfig) -> dict[str, Any]:
"""Evaluate a single dataset.
Args:
category_name: Name of the category
dataset_config: Configuration for the dataset
Returns:
Dict with evaluation results
"""
dataset_name = dataset_config.dataset
# Check if this dataset has already been completed
if self.checkpoint_manager.is_dataset_completed(category_name, dataset_name):
# Get the dataset result from checkpoint manager
dataset_result = self.checkpoint_manager.get_dataset_result(category_name, dataset_name)
if dataset_result:
self.logger.info(f"Skipping already completed dataset: {dataset_name}")
return dataset_result
# If we can't load the result, we'll need to re-evaluate the dataset
self.logger.info(f"Re-evaluating dataset: {dataset_name}")
# Remove from completed datasets so it will be processed
self.checkpoint_manager.completed_datasets.discard(f"{category_name}/{dataset_name}")
self.logger.info(f"Evaluating dataset: {dataset_name}")
try:
# Create dataset with all parameters
dataset_params = {}
# Add all parameters from the config params dictionary
# Make sure we don't have a nested 'params' dictionary
for k, v in dataset_config.params.items():
if k != "params":
dataset_params[k] = v
elif isinstance(v, dict):
# If there's a nested params dict, flatten it
dataset_params.update(v)
# Add size and seed if they're not None
if dataset_config.size is not None:
dataset_params["size"] = dataset_config.size
if dataset_config.seed is not None:
dataset_params["seed"] = dataset_config.seed
dataset = reasoning_gym.create_dataset(dataset_name, **dataset_params)
# Get all entries
all_entries = list(dataset)
# Process entries with progress bar, passing the entry index and dataset name
tasks = [self.process_entry(dataset, entry, idx, dataset_name) for idx, entry in enumerate(all_entries)]
results = await tqdm_asyncio.gather(*tasks, desc=f"Processing {dataset_name}", leave=True)
# Calculate metrics
total_best_score = sum(r["best_score"] for r in results)
total_mean_score = sum(r["mean_score"] for r in results)
average_best_score = total_best_score / len(results) if results else 0
average_mean_score = total_mean_score / len(results) if results else 0
dataset_results = {
"name": dataset_name,
"category": category_name,
"average_best_score": average_best_score,
"average_mean_score": average_mean_score,
"total_examples": len(results),
"config": {"size": dataset_config.size, "seed": dataset_config.seed, **dataset_config.params},
"system_prompt": self.config.get_system_prompt(),
"completions_per_prompt": self.config.completions_per_prompt,
"results": results,
}
# Mark dataset as completed and save results
self.checkpoint_manager.mark_dataset_completed(category_name, dataset_name)
self._save_dataset_results(category_name, dataset_name, dataset_results)
return dataset_results
except Exception as e:
self.logger.error(f"Error evaluating dataset {dataset_name}: {str(e)}")
return {
"name": dataset_name,
"category": category_name,
"average_best_score": 0.0,
"average_mean_score": 0.0,
"total_examples": 0,
"config": {"size": dataset_config.size, "seed": dataset_config.seed, **dataset_config.params},
"system_prompt": self.config.get_system_prompt(),
"error": str(e),
"results": [],
}
async def evaluate_category(self, category_config: CategoryConfig) -> dict[str, Any]:
"""Evaluate all datasets in a category.
Args:
category_config: Configuration for the category
Returns:
Dict with category evaluation results
"""
category_name = category_config.category
self.logger.info(f"Evaluating category: {category_name}")
# Check if all datasets in this category are already completed
all_completed = True
for dataset_config in category_config.datasets:
if not self.checkpoint_manager.is_dataset_completed(category_name, dataset_config.dataset):
all_completed = False
break
# If all datasets are completed and we have previous results, use them
if all_completed and category_name in self.checkpoint_manager.previous_category_results:
self.logger.info(f"Using previously completed results for category: {category_name}")
return {
"name": category_name,
"datasets": self.checkpoint_manager.previous_category_results[category_name],
}
# Process datasets sequentially to ensure proper checkpointing
dataset_results = []
for dataset_config in category_config.datasets:
result = await self.evaluate_dataset(category_name, dataset_config)
dataset_results.append(result)
return {
"name": category_name,
"datasets": dataset_results,
}
async def evaluate_all(self) -> dict[str, Any]:
"""Evaluate all categories and datasets, resuming from checkpoint if available.
Returns:
Dict with all evaluation results and summary
"""
self.logger.info(f"Starting evaluation of {len(self.config.categories)} categories")
# Initialize output directory and checkpoint manager
self.output_dir = self.create_output_dir()
self.checkpoint_manager = CheckpointManager(self.output_dir)
# Process each category sequentially to ensure proper checkpointing
category_results = []
tasks = [asyncio.create_task(self.evaluate_category(category)) for category in self.config.categories]
for finished_task in asyncio.as_completed(tasks):
result = await finished_task
category_results.append(result)
self._update_partial_summary(category_results)
# Generate results structure
results = {
"metadata": {
"timestamp": self.start_time.isoformat(),
"model": self.config.model,
"provider": self.config.provider,
"git_hash": self.git_hash,
"duration_seconds": (datetime.now() - self.start_time).total_seconds(),
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"top_p": self.config.top_p,
"partial": False, # Mark as complete
},
"categories": category_results,
}
# Generate summary
results["summary"] = self.generate_summary(results)
return results
def generate_summary(self, results: dict[str, Any]) -> dict[str, Any]:
"""Generate a summary of evaluation results in the original configuration order.
Args:
results: The full evaluation results
Returns:
Dict with summary information
"""
summary = {
"total_datasets": 0,
"total_examples": 0,
"dataset_best_scores": {},
"dataset_mean_scores": {},
}
# Iterate through categories and datasets in the original order from config
for category_config in self.config.categories:
for dataset_config in category_config.datasets:
dataset_name = dataset_config.dataset
dataset_found = False
# Find corresponding results
for category in results["categories"]:
if category["name"] == category_config.category:
for dataset in category["datasets"]:
if dataset["name"] == dataset_name:
# Add to summary in original order
summary["dataset_best_scores"][dataset_name] = dataset["average_best_score"]
summary["dataset_mean_scores"][dataset_name] = dataset["average_mean_score"]
summary["total_datasets"] += 1
summary["total_examples"] += dataset["total_examples"]
dataset_found = True
break
# If dataset wasn't found in results (error), add with score 0
if not dataset_found:
summary["dataset_best_scores"][dataset_name] = 0.0
summary["dataset_mean_scores"][dataset_name] = 0.0
summary["total_datasets"] += 1
return summary
def save_results(self, results: dict[str, Any]) -> tuple[str, str]:
"""Save evaluation results to files.
Args:
results: The evaluation results to save
Returns:
Tuple of (results_path, summary_path)
"""
# Output directory is already created during evaluation
results_path = None
# Save full results if configured to do so
if self.config.save_full_results:
results_path = self.output_dir / "results.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2)
# Add timestamp, git hash, model, provider, sampling parameters, and duration to summary
summary_data = results["summary"].copy()
summary_data["timestamp"] = self.start_time.isoformat()
summary_data["git_hash"] = self.git_hash
summary_data["model"] = self.config.model
summary_data["provider"] = self.config.provider
summary_data["system_prompt"] = self.config.get_system_prompt()
if self.config.system_prompt_id:
summary_data["system_prompt_id"] = self.config.system_prompt_id
summary_data["max_tokens"] = self.config.max_tokens
summary_data["temperature"] = self.config.temperature
summary_data["top_p"] = self.config.top_p
summary_data["completions_per_prompt"] = self.config.completions_per_prompt
summary_data["duration_seconds"] = results["metadata"]["duration_seconds"]
summary_data["partial"] = False # Mark as complete
# Save summary
summary_path = self.output_dir / "summary.json"
with open(summary_path, "w") as f:
json.dump(summary_data, f, indent=2)
# Individual dataset results are already saved during evaluation
return str(results_path) if results_path else None, str(summary_path)
def print_summary(self, results: dict[str, Any]) -> None:
"""Print a summary of evaluation results to the console.
Args:
results: The evaluation results
"""
summary = results["summary"]
print("\nEvaluation Summary:")
print("------------------")
print(f"Model: {self.config.model}")
print(f"Provider: {self.config.provider}")
system_prompt = self.config.get_system_prompt()
print(f"System Prompt: {system_prompt[:50]}..." if len(system_prompt) > 50 else system_prompt)
print(f"Max Tokens: {self.config.max_tokens}")
print(f"Temperature: {self.config.temperature}")
print(f"Top-p: {self.config.top_p}")
print(f"Completions per prompt: {self.config.completions_per_prompt}")
print(f"Git Hash: {self.git_hash}")
print(f"Duration: {results['metadata']['duration_seconds']:.2f} seconds")
print()
print("Dataset Scores (in configuration order):")
print(" Dataset Name Best Score Mean Score Examples")
print(" ------------------------------------------------------------------")
for dataset_name in summary["dataset_best_scores"].keys():
best_score = summary["dataset_best_scores"][dataset_name]
mean_score = summary["dataset_mean_scores"][dataset_name]
# Find the number of examples for this dataset
examples = 0
for category in results["categories"]:
for dataset in category["datasets"]:
if dataset["name"] == dataset_name:
examples = dataset["total_examples"]
break
# Use fixed-width formatting for better alignment
print(f" {dataset_name:<30} {best_score:>8.1%} {mean_score:>8.1%} {examples:>8}")
print()
print(f"Total datasets: {summary['total_datasets']}")
print(f"Total examples: {summary['total_examples']}")
async def main_async():
"""Main async function."""
parser = argparse.ArgumentParser(description="Evaluate models on reasoning datasets")
parser.add_argument("--config", required=True, help="Path to configuration file (YAML or JSON)")
parser.add_argument("--model", help="Override model specified in config")
parser.add_argument("--output-dir", help="Override output directory specified in config")
parser.add_argument("--category", help="Evaluate only datasets from this category")
parser.add_argument("--max-concurrent", type=int, help="Maximum number of concurrent API calls")
parser.add_argument("--n", type=int, default=1, help="Number of completions to generate per prompt")
parser.add_argument("--base-url", default="https://openrouter.ai/api/v1", help="API base URL")
parser.add_argument(
"--api-key",
help="API key for the service (optional for some APIs, defaults to OPENROUTER_API_KEY env var for OpenRouter URLs)",
)
parser.add_argument("--save-metadata", action="store_true", help="Save entry metadata in results")
parser.add_argument("--full-results", action="store_true", help="Save the full results file")
parser.add_argument("--verbose", action="store_true", help="Print detailed model responses")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument("--resume", help="Resume evaluation from the specified directory")
parser.add_argument("--timeout", type=int, default=600, help="Time in seconds for timeout of a single request")
args = parser.parse_args()
# Get API key from command line or environment variable
api_key = args.api_key
if api_key is None:
# If base_url is OpenRouter, try to get API key from environment
if args.base_url.startswith("https://openrouter.ai/api"):
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
print("Warning: OPENROUTER_API_KEY environment variable is not set")
print("Please set it using: export OPENROUTER_API_KEY=your-api-key")
print("Or provide it directly with --api-key")
return 1
# Load configuration
config_path = args.config
if config_path.endswith(".yaml") or config_path.endswith(".yml"):
config = EvalConfig.from_yaml(config_path)
elif config_path.endswith(".json"):
config = EvalConfig.from_json(config_path)
else:
print("Error: Configuration file must be YAML or JSON")
return 1
# Apply command line overrides
if args.model:
config.model = args.model
if args.output_dir:
config.output_dir = args.output_dir
if args.max_concurrent:
config.max_concurrent = args.max_concurrent
if args.n:
config.completions_per_prompt = args.n
if args.save_metadata:
config.save_metadata = True
if args.full_results:
config.save_full_results = True
# Filter categories if --category is specified
if args.category:
# Keep only the specified category
filtered_categories = [cat for cat in config.categories if cat.category == args.category]
if not filtered_categories:
print(f"Error: Category '{args.category}' not found in configuration")
return 1
config.categories = filtered_categories
# Create evaluator
evaluator = AsyncModelEvaluator(
config=config,
api_key=api_key,
base_url=args.base_url,
verbose=args.verbose,
debug=args.debug,
timeout=args.timeout,
)
# Set resume directory if specified
if args.resume:
evaluator.resume_dir = args.resume
# Run evaluation
try:
results = await evaluator.evaluate_all()
# Save and print results
results_path, summary_path = evaluator.save_results(results)
evaluator.print_summary(results)
if results_path:
print(f"\nResults saved to: {results_path}")
print(f"Summary saved to: {summary_path}")
return 0
except Exception as e:
print(f"Error during evaluation: {str(e)}")
if args.debug:
import traceback
traceback.print_exc()
return 1
def main():
"""Entry point."""
exit_code = asyncio.run(main_async())
sys.exit(exit_code)
if __name__ == "__main__":
main()