Files
exo/.github/bench.py
2024-12-16 19:50:19 +00:00

402 lines
16 KiB
Python

import asyncio
import json
import os
import platform
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any

import aiohttp
import boto3
import psutil
def _check_macos_state():
  """Print macOS-specific diagnostics: power metrics, thermal state, architecture, MLX build."""
  try:
    # Check powermetrics with sudo (best effort; may fail without sudoers entry)
    try:
      power_metrics = subprocess.run(
        ['sudo', 'powermetrics', '-n', '1', '-i', '1000', '--samplers', 'cpu_power'],
        capture_output=True, text=True
      )
      print("\nPower Metrics:", power_metrics.stdout, flush=True)
    except Exception as e:
      print(f"Error getting power metrics: {e}", flush=True)

    # Check thermal state
    thermal_state = subprocess.run(['pmset', '-g', 'therm'], capture_output=True, text=True)
    print("\nThermal State:", thermal_state.stdout, flush=True)

    # Check if running under Rosetta
    arch = subprocess.run(['arch'], capture_output=True, text=True)
    print("\nArchitecture:", arch.stdout, flush=True)

    # Check MLX compilation mode - only if mlx is available
    try:
      import mlx.core as mx
      if hasattr(mx, 'build_info'):
        print("\nMLX Build Info:", mx.build_info(), flush=True)
      else:
        print("\nMLX Build Info: Not available in this version", flush=True)
    except ImportError:
      print("\nMLX: Not installed", flush=True)
    except Exception as e:
      print(f"\nError checking MLX: {e}", flush=True)
  except Exception as e:
    print(f"Error in macOS checks: {e}", flush=True)


def _print_apple_silicon_frequencies():
  """Parse `powermetrics` output and print current/maximum cluster frequencies."""
  try:
    power_metrics = subprocess.run(
      ['sudo', 'powermetrics', '-n', '1', '-i', '100', '--samplers', 'cpu_power'],
      capture_output=True, text=True
    )
    if power_metrics.returncode == 0:
      output = power_metrics.stdout
      print("\nDetailed CPU Frequency Information:")
      # Track which cluster's lines we are currently reading
      current_cluster = None
      max_freqs = {'E': 0, 'P0': 0, 'P1': 0}
      for line in output.split('\n'):
        if "E-Cluster" in line:
          current_cluster = 'E'
        elif "P0-Cluster" in line:
          current_cluster = 'P0'
        elif "P1-Cluster" in line:
          current_cluster = 'P1'
        # Current frequency per cluster (skip idle "0 MHz" readings)
        if "HW active frequency:" in line:
          freq = line.split(':')[1].strip()
          if freq != "0 MHz":
            print(f"Current {current_cluster}-Cluster Frequency: {freq}")
        # Max frequencies gleaned from the residency histogram lines
        if current_cluster and "active residency:" in line and "MHz:" in line:
          try:
            freqs = []
            parts = line.split('MHz:')[:-1]  # Skip last part as it's not a frequency
            for part in parts:
              freq_str = part.split()[-1]
              try:
                freqs.append(float(freq_str))
              except ValueError:
                continue
            if freqs:
              max_freqs[current_cluster] = max(max_freqs[current_cluster], max(freqs))
          except Exception:
            continue
      # Print max frequencies
      print("\nMaximum Available Frequencies:")
      for cluster, max_freq in max_freqs.items():
        if max_freq > 0:
          print(f"{cluster}-Cluster Max: {max_freq:.0f} MHz")
  except Exception as e:
    print(f"Error parsing powermetrics: {e}", flush=True)


def _check_cpu_info():
  """Print CPU model, frequency, per-core usage and current power settings."""
  print("\nCPU Information:", flush=True)
  try:
    if platform.system() == 'Darwin' and platform.processor() == 'arm':
      # Use sysctl for Apple Silicon Macs (psutil.cpu_freq is unreliable there)
      cpu_info = subprocess.run(['sysctl', 'machdep.cpu'], capture_output=True, text=True)
      if cpu_info.returncode == 0:
        print("CPU Info (Apple Silicon):", cpu_info.stdout, flush=True)
        _print_apple_silicon_frequencies()
    else:
      # Use psutil for other systems
      cpu_freq = psutil.cpu_freq()
      print(f"CPU Frequency - Current: {cpu_freq.current:.2f}MHz, Min: {cpu_freq.min:.2f}MHz, Max: {cpu_freq.max:.2f}MHz", flush=True)
    print(f"\nCPU Usage per Core: {psutil.cpu_percent(percpu=True)}%", flush=True)
    # Check if running in low power mode
    power_mode = subprocess.run(['pmset', '-g'], capture_output=True, text=True)
    print("\nPower Settings:", power_mode.stdout, flush=True)
  except Exception as e:
    print(f"Error getting CPU info: {e}", flush=True)


def _check_memory_info():
  """Print physical memory and swap usage."""
  print("\nMemory Information:", flush=True)
  try:
    mem = psutil.virtual_memory()
    print(f"Total: {mem.total/1024/1024/1024:.2f}GB", flush=True)
    print(f"Available: {mem.available/1024/1024/1024:.2f}GB", flush=True)
    print(f"Used: {mem.used/1024/1024/1024:.2f}GB ({mem.percent}%)", flush=True)
    # Check swap
    swap = psutil.swap_memory()
    print(f"Swap Used: {swap.used/1024/1024/1024:.2f}GB of {swap.total/1024/1024/1024:.2f}GB", flush=True)
  except Exception as e:
    print(f"Error getting memory info: {e}", flush=True)


def _check_gpu_info():
  """Print MLX environment variables and Metal GPU memory settings."""
  print("\nGPU Information:", flush=True)
  try:
    # Check MLX GPU settings
    print("MLX Environment Variables:", flush=True)
    mlx_vars = {k: v for k, v in os.environ.items() if k.startswith('MLX')}
    print(json.dumps(mlx_vars, indent=2), flush=True)
    # Check Metal GPU memory allocation
    gpu_mem = subprocess.run(['sysctl', 'iogpu'], capture_output=True, text=True)
    print("GPU Memory Settings:", gpu_mem.stdout, flush=True)
  except Exception as e:
    print(f"Error getting GPU info: {e}", flush=True)


def _check_process_priority():
  """Print this process's nice (and, where supported, ionice) values."""
  print("\nProcess Priority Information:", flush=True)
  try:
    current_process = psutil.Process()
    print(f"Process Nice Value: {current_process.nice()}", flush=True)
    # Only try to get ionice if the platform supports it
    if hasattr(current_process, 'ionice'):
      print(f"Process IO Nice Value: {current_process.ionice()}", flush=True)
  except Exception as e:
    print(f"Error getting process priority info: {e}", flush=True)


def _check_system_load():
  """Print load averages and the five busiest processes by CPU."""
  print("\nSystem Load:", flush=True)
  try:
    load_avg = psutil.getloadavg()
    print(f"Load Average: {load_avg}", flush=True)
    # Get top processes by CPU and Memory
    print("\nTop Processes by CPU Usage:", flush=True)
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
      try:
        pinfo = proc.info
        if pinfo['cpu_percent'] is not None and pinfo['memory_percent'] is not None:
          processes.append(pinfo)
      except (psutil.NoSuchProcess, psutil.AccessDenied):
        continue
    # Sort and display top 5 CPU-consuming processes
    sorted_by_cpu = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:5]
    for proc in sorted_by_cpu:
      print(f"PID: {proc['pid']}, Name: {proc['name']}, CPU: {proc['cpu_percent']}%, Memory: {proc['memory_percent']:.1f}%")
  except Exception as e:
    print(f"Error getting system load info: {e}", flush=True)


def check_system_state():
  """Print a full diagnostic snapshot of the host (power, CPU, memory, GPU,
  process priority, system load) so benchmark numbers can be interpreted in
  context. Every section is best-effort: failures are printed, never raised.
  """
  print("\n=== System State Check ===", flush=True)
  _check_macos_state()
  _check_cpu_info()
  _check_memory_info()
  _check_gpu_info()
  _check_process_priority()
  _check_system_load()
  print("\n=== End System State Check ===\n", flush=True)
def check_gpu_access():
  """Best-effort report of GPU visibility: MLX's default device plus the
  Metal display adapters listed by `system_profiler`. Errors are printed,
  never raised."""
  try:
    # MLX's default device reveals whether the GPU backend is selected.
    import mlx.core as mx
    print("MLX device info:", mx.default_device())

    # Enumerate Metal-capable displays/adapters via system_profiler.
    profiler = subprocess.run(
      ['system_profiler', 'SPDisplaysDataType'],
      capture_output=True, text=True
    )
    print("GPU Info:", profiler.stdout)
  except Exception as e:
    print(f"Failed to check GPU access: {e}")
async def measure_performance(api_endpoint: str, prompt: str, model: str) -> Dict[str, Any]:
  """
  Measures the performance of an API endpoint by sending a prompt and recording metrics.

  Args:
    api_endpoint (str): The chat-completions API endpoint URL.
    prompt (str): The prompt to send to the API.
    model (str): Model identifier forwarded to the API.

  Returns:
    Dict[str, Any]: Performance metrics (ttft, prompt_tps, generation_tps,
      prompt_len, response_len, total_time) plus CI metadata from the environment.

  Raises:
    RuntimeError: If the token-count request or the streaming completion fails.
  """
  results = {
    'model': model,
    'run_id': os.environ.get('GITHUB_RUN_ID', 'unknown'),
    'branch': os.environ.get('GITHUB_REF_NAME', 'unknown'),
    'commit': os.environ.get('GITHUB_SHA', 'unknown'),
    'configuration': json.loads(os.environ.get('HARDWARE_CONFIG', '{}'))
  }

  # `async with` guarantees the session is closed on every exit path
  # (the original manual close leaked it on some error paths).
  timeout = aiohttp.ClientTimeout(total=600, connect=10, sock_read=600, sock_connect=10)
  async with aiohttp.ClientSession(timeout=timeout) as session:
    # Get token count for the prompt
    try:
      response = await session.post(
        "http://localhost:52415/v1/chat/token/encode",
        json={
          "model": model,
          "messages": [{"role": "user", "content": prompt}]
        }
      )
      response.raise_for_status()
      token_data = await response.json()
      results['prompt_len'] = token_data['num_tokens']
    except Exception as e:
      raise RuntimeError(f"Failed to get token count: {str(e)}") from e

    # Measure completion performance
    try:
      start_time = time.time()
      response = await session.post(
        api_endpoint,
        json={
          "model": model,
          "messages": [{"role": "user", "content": prompt}],
          "temperature": 0,
          "stream": True
        }
      )
      response.raise_for_status()

      first_token_time = None
      total_tokens = 0
      # Iterate the SSE stream line by line rather than chunk by chunk: a
      # single network chunk may carry several `data:` lines (or a partial
      # one), so parsing whole chunks drops or corrupts events.
      async for raw_line in response.content:
        line = raw_line.decode('utf-8').strip()
        if not line.startswith('data: '):
          continue
        payload = line[6:]  # Skip 'data: ' prefix
        if payload == '[DONE]':  # end-of-stream sentinel is not JSON
          break
        data = json.loads(payload)
        if content := data.get('choices', [{}])[0].get('delta', {}).get('content'):
          print(f"Received content: {content}", flush=True)
          if first_token_time is None:
            first_token_time = time.time()
            ttft = first_token_time - start_time
            results.update({
              'ttft': ttft,
              'prompt_tps': results['prompt_len'] / ttft
            })
          total_tokens += 1

      total_time = time.time() - start_time
      results.update({
        'generation_tps': total_tokens / total_time,
        'response_len': total_tokens,
        'total_time': total_time
      })
    except Exception as e:
      raise RuntimeError(f"Performance measurement failed: {str(e)}") from e

  return results
async def main() -> None:
  """Run a warmup request, benchmark the essay prompt, then upload the
  resulting metrics JSON to S3 (best effort) and print them."""
  api_endpoint = "http://localhost:52415/v1/chat/completions"

  # Define prompts
  prompt_warmup = "what is the capital of France?"
  prompt_essay = "write an essay about cats"

  model = os.environ.get('model', 'llama-3.2-1b')

  # Warmup request: result is discarded; failure is non-fatal since this
  # only primes the model before the measured run.
  print("\nPerforming warmup request...", flush=True)
  try:
    await measure_performance(api_endpoint, prompt_warmup, model)
    print("Warmup completed successfully", flush=True)
  except Exception as e:
    print(f"Warmup request failed: {e}", flush=True)

  # Measure performance for the essay prompt
  print("\nMeasuring performance for the essay prompt...", flush=True)
  results = await measure_performance(api_endpoint, prompt_essay, model)

  try:
    s3_client = boto3.client(
      's3',
      aws_access_key_id=os.environ.get('aws_access_key_id'),
      aws_secret_access_key=os.environ.get('aws_secret_key')
    )
    job_name = os.environ.get('GITHUB_JOB')

    # Create S3 key with timestamp and commit info. Use timezone-aware UTC:
    # datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    timestamp = now.strftime('%H-%M-%S')
    commit_sha = os.environ.get('GITHUB_SHA', 'unknown')[:7]
    s3_key = f"{job_name}/{model}/{now.year}/{now.month}/{now.day}/{timestamp}_{commit_sha}.json"

    # Upload to S3
    s3_client.put_object(
      Bucket='exo-benchmarks',
      Key=s3_key,
      Body=json.dumps(results),
      ContentType='application/json'
    )
    print(f"Performance metrics uploaded to S3: s3://exo-benchmarks/{s3_key}", flush=True)
  except Exception as e:
    print(f"Failed to upload metrics to S3: {e}", flush=True)

  # Optionally print the metrics for visibility
  print("Performance metrics:", flush=True)
  print(json.dumps(results, indent=4), flush=True)
def optimize_system_performance():
  """Set optimal system performance settings before running benchmark.

  All steps are best effort: sudo commands use check=False and every
  failure is reported as a warning rather than raised."""
  try:
    # Ask macOS for the high-performance power mode.
    subprocess.run(['sudo', 'pmset', '-a', 'powermode', '2'], check=False)

    # Steer MLX toward performance cores and the GPU.
    for env_key in ('MLX_FORCE_P_CORES', 'MLX_METAL_PREWARM', 'MLX_USE_GPU'):
      os.environ[env_key] = '1'

    benchmark_process = psutil.Process()
    try:
      # Request the highest scheduling priority for this process.
      subprocess.run(
        ['sudo', 'renice', '-n', '-20', '-p', str(benchmark_process.pid)],
        check=False
      )

      # Show the process state going into the benchmark.
      print("\nProcess State Before Benchmark:", flush=True)
      ps_output = subprocess.run(
        ['ps', '-o', 'pid,ppid,user,%cpu,%mem,nice,stat,pri,command', '-p', str(benchmark_process.pid)],
        capture_output=True, text=True
      )
      print(ps_output.stdout, flush=True)

      # Confirm the power mode actually changed; retry once if it did not.
      power_status = subprocess.run(['pmset', '-g'], capture_output=True, text=True)
      if 'powermode 0' in power_status.stdout:
        print("\nWarning: System still in normal power mode. Trying to set high performance mode again...", flush=True)
        subprocess.run(['sudo', 'pmset', '-a', 'powermode', '2'], check=False)
    except Exception as e:
      print(f"Warning: Could not set process priority: {e}", flush=True)
  except Exception as e:
    print(f"Warning: Could not optimize system performance: {e}", flush=True)

  # Report the resulting optimization state.
  print("\nOptimization Settings:", flush=True)
  print("MLX Environment Variables:", flush=True)
  for var in ['MLX_FORCE_P_CORES', 'MLX_METAL_PREWARM', 'MLX_USE_GPU']:
    print(f"{var}: {os.environ.get(var, 'Not set')}", flush=True)

  try:
    nice_value = psutil.Process().nice()
    print(f"Process Nice Value: {nice_value}", flush=True)
    if nice_value != -20:
      print("Warning: Process not running at highest priority", flush=True)
  except Exception:
    pass
if __name__ == "__main__":
  # Dump host diagnostics first so the benchmark numbers can be interpreted
  # in context, then apply performance tweaks before the async benchmark run.
  check_system_state()
  check_gpu_access()
  optimize_system_performance()
  asyncio.run(main())