dashboard async, clickable datapoints that go to commit url

Author: Alex Cheema
Date: 2024-11-24 23:38:57 +04:00
parent 8c6a6fabdf
commit b000d23b2a


@@ -1,14 +1,15 @@
 import os
 import json
 import logging
-import requests
+import asyncio
+import aiohttp
 import pandas as pd
 import plotly.express as px
 from datetime import datetime
 from typing import List, Dict, Optional
 from pathlib import Path
-class CircleCIClient:
+class AsyncCircleCIClient:
     def __init__(self, token: str, project_slug: str):
         self.token = token
         self.project_slug = project_slug
@@ -19,66 +20,57 @@ class CircleCIClient:
         }
         self.logger = logging.getLogger("CircleCI")
-    def get_recent_pipelines(self, limit: int = 25) -> List[Dict]:
+    async def get_json(self, session: aiohttp.ClientSession, url: str, params: Dict = None) -> Dict:
+        async with session.get(url, params=params) as response:
+            response.raise_for_status()
+            return await response.json()
+    async def get_recent_pipelines(self, session: aiohttp.ClientSession, limit: int = 50) -> List[Dict]:
         self.logger.info(f"Fetching {limit} recent pipelines...")
         url = f"{self.base_url}/project/{self.project_slug}/pipeline"
-        params = {"limit": limit * 2} # Fetch extra to account for failed builds
+        params = {"limit": limit * 2}
-        response = requests.get(url, headers=self.headers, params=params)
-        response.raise_for_status()
+        data = await self.get_json(session, url, params)
+        pipelines = [
+            p for p in data["items"]
+            if p["state"] == "created"
+            and p.get("trigger_parameters", {}).get("git", {}).get("branch") == "main"
+        ][:limit]
-        pipelines = [p for p in response.json()["items"] if p["state"] == "created"]
-        pipelines = pipelines[:limit]
-        self.logger.info(f"Found {len(pipelines)} successful pipelines")
+        self.logger.info(f"Found {len(pipelines)} successful main branch pipelines")
+        return pipelines
-        # Fetch additional data for each pipeline
-        detailed_pipelines = []
-        for pipeline in pipelines:
-            try:
-                url = f"{self.base_url}/pipeline/{pipeline['id']}"
-                response = requests.get(url, headers=self.headers)
-                response.raise_for_status()
-                detailed_pipelines.append(response.json())
-            except Exception as e:
-                self.logger.warning(f"Could not fetch details for pipeline {pipeline['id']}: {e}")
-                continue
-        return detailed_pipelines
-    def get_workflow_jobs(self, pipeline_id: str) -> List[Dict]:
+    async def get_workflow_jobs(self, session: aiohttp.ClientSession, pipeline_id: str) -> List[Dict]:
         self.logger.debug(f"Fetching workflows for pipeline {pipeline_id}")
         url = f"{self.base_url}/pipeline/{pipeline_id}/workflow"
-        response = requests.get(url, headers=self.headers)
-        response.raise_for_status()
-        workflows = response.json()["items"]
+        workflows_data = await self.get_json(session, url)
+        workflows = workflows_data["items"]
-        jobs = []
+        # Fetch all jobs for all workflows in parallel
+        jobs_tasks = []
         for workflow in workflows:
             self.logger.debug(f"Fetching jobs for workflow {workflow['id']}")
             url = f"{self.base_url}/workflow/{workflow['id']}/job"
-            response = requests.get(url, headers=self.headers)
-            response.raise_for_status()
-            jobs.extend(response.json()["items"])
+            jobs_tasks.append(self.get_json(session, url))
-        return jobs
+        jobs_responses = await asyncio.gather(*jobs_tasks, return_exceptions=True)
-    def get_artifacts(self, job_number: str) -> List[Dict]:
-        self.logger.debug(f"Fetching artifacts for job {job_number}")
+        all_jobs = []
+        for jobs_data in jobs_responses:
+            if isinstance(jobs_data, Exception):
+                continue
+            all_jobs.extend(jobs_data["items"])
+        return all_jobs
+    async def get_artifacts(self, session: aiohttp.ClientSession, job_number: str) -> List[Dict]:
         url = f"{self.base_url}/project/{self.project_slug}/{job_number}/artifacts"
-        response = requests.get(url, headers=self.headers)
-        response.raise_for_status()
-        return response.json()["items"]
-    def download_artifact(self, artifact_url: str) -> Dict:
-        self.logger.debug(f"Downloading artifact from {artifact_url}")
-        response = requests.get(artifact_url, headers=self.headers)
-        response.raise_for_status()
-        return response.json()
+        data = await self.get_json(session, url)
+        return data["items"]
 class PackageSizeTracker:
     def __init__(self, token: str, project_slug: str, debug: bool = False):
         self.setup_logging(debug)
-        self.client = CircleCIClient(token, project_slug)
+        self.client = AsyncCircleCIClient(token, project_slug)
         self.logger = logging.getLogger("PackageSizeTracker")
     def setup_logging(self, debug: bool):
@@ -89,80 +81,92 @@ class PackageSizeTracker:
             datefmt='%H:%M:%S'
         )
-    def extract_commit_info(self, pipeline: Dict) -> Optional[str]:
-        """Extract commit hash from pipeline data structure"""
+    def extract_commit_info(self, pipeline: Dict) -> Optional[Dict]:
         try:
             # Try to get commit hash from trigger parameters
             if 'trigger_parameters' in pipeline:
                 github_app = pipeline['trigger_parameters'].get('github_app', {})
                 if github_app:
-                    return github_app.get('commit_sha')
+                    return {
+                        'commit_hash': github_app.get('checkout_sha'),
+                        'web_url': f"{github_app.get('repo_url')}/commit/{github_app.get('checkout_sha')}"
+                    }
                 # Fallback to git parameters if github_app is not available
                 git_params = pipeline['trigger_parameters'].get('git', {})
                 if git_params:
-                    return git_params.get('checkout_sha')
+                    return {
+                        'commit_hash': git_params.get('checkout_sha'),
+                        'web_url': f"{git_params.get('repo_url')}/commit/{git_params.get('checkout_sha')}"
+                    }
-            self.logger.warning(f"Could not find commit hash in pipeline {pipeline['id']}")
+            self.logger.warning(f"Could not find commit info in pipeline {pipeline['id']}")
             return None
         except Exception as e:
             self.logger.error(f"Error extracting commit info: {str(e)}")
             return None
-    def collect_data(self) -> List[Dict]:
+    async def process_pipeline(self, session: aiohttp.ClientSession, pipeline: Dict) -> Optional[Dict]:
+        try:
+            commit_info = self.extract_commit_info(pipeline)
+            if not commit_info:
+                return None
+            jobs = await self.client.get_workflow_jobs(session, pipeline["id"])
+            size_job = next(
+                (j for j in jobs if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
+                None
+            )
+            if not size_job:
+                self.logger.debug(f"No measure_pip_sizes job found for pipeline {pipeline['id']}")
+                return None
+            artifacts = await self.client.get_artifacts(session, size_job["job_number"])
+            size_report = next(
+                (a for a in artifacts if a["path"].endswith("pip-sizes.json")),
+                None
+            )
+            if not size_report:
+                self.logger.debug(f"No pip-sizes.json artifact found for job {size_job['job_number']}")
+                return None
+            json_data = await self.client.get_json(session, size_report["url"])
+            data_point = {
+                "commit_hash": commit_info['commit_hash'],
+                "commit_url": commit_info['web_url'],
+                "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
+                "total_size_mb": json_data["total_size_mb"],
+                "packages": json_data["packages"]
+            }
+            self.logger.info(
+                f"Processed pipeline {pipeline['id']}: "
+                f"commit {commit_info['commit_hash'][:7]}, "
+                f"size {json_data['total_size_mb']:.2f}MB"
+            )
+            return data_point
+        except Exception as e:
+            self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
+            return None
+    async def collect_data(self) -> List[Dict]:
         self.logger.info("Starting data collection...")
-        pipelines = self.client.get_recent_pipelines(25)
+        async with aiohttp.ClientSession(headers=self.client.headers) as session:
+            # Get pipelines
+            pipelines = await self.client.get_recent_pipelines(session, 50)
-        data_points = []
-        for pipeline in pipelines:
-            try:
-                self.logger.debug(f"Processing pipeline {pipeline['id']}")
+            # Process all pipelines in parallel
+            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
+            results = await asyncio.gather(*tasks)
-                # Extract commit hash
-                commit_hash = self.extract_commit_info(pipeline)
-                if not commit_hash:
-                    continue
-                jobs = self.client.get_workflow_jobs(pipeline["id"])
-                size_job = next(
-                    (j for j in jobs if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
-                    None
-                )
-                if size_job:
-                    artifacts = self.client.get_artifacts(size_job["job_number"])
-                    size_report = next(
-                        (a for a in artifacts if a["path"].endswith("pip-sizes.json")),
-                        None
-                    )
-                    if size_report:
-                        json_data = self.client.download_artifact(size_report["url"])
-                        data_point = {
-                            "commit_hash": commit_hash,
-                            "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
-                            "total_size_mb": json_data["total_size_mb"],
-                            "packages": json_data["packages"]
-                        }
-                        data_points.append(data_point)
-                        self.logger.info(
-                            f"Processed pipeline {pipeline['id']}: "
-                            f"commit {commit_hash[:7]}, "
-                            f"size {json_data['total_size_mb']:.2f}MB"
-                        )
-                    else:
-                        self.logger.debug(f"No pip-sizes.json artifact found for job {size_job['job_number']}")
-                else:
-                    self.logger.debug(f"No measure_pip_sizes job found for pipeline {pipeline['id']}")
-            except Exception as e:
-                self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
-                continue
+            # Filter out None results
+            data_points = [r for r in results if r is not None]
         return data_points
-    def generate_report(self, data: List[Dict], output_dir: str = "reports"):
+    def generate_report(self, data: List[Dict], output_dir: str = "reports") -> Optional[str]:
         self.logger.info("Generating report...")
         if not data:
             self.logger.error("No data to generate report from!")
@@ -171,18 +175,49 @@ class PackageSizeTracker:
         df = pd.DataFrame(data)
         df['timestamp'] = pd.to_datetime(df['timestamp'])
         df = df.sort_values('timestamp')
+        # commit_url is already in the data from process_pipeline
-        # Create trend plot
+        # Create trend plot with updated styling
         fig = px.line(
             df,
             x='timestamp',
             y='total_size_mb',
-            title='Package Size Trend'
+            title='Package Size Trend',
+            markers=True,
+            hover_data={'commit_hash': True, 'timestamp': True, 'total_size_mb': ':.2f'},
+            custom_data=['commit_hash', 'commit_url']
         )
         fig.update_layout(
             xaxis_title="Date",
             yaxis_title="Total Size (MB)",
-            hovermode='x unified'
+            hovermode='x unified',
+            plot_bgcolor='white',
+            paper_bgcolor='white',
+            font=dict(size=12),
+            title_x=0.5,
         )
+        fig.update_traces(
+            line=dict(width=2),
+            marker=dict(size=8),
+            hovertemplate="<br>".join([
+                "Commit: %{customdata[0]}",
+                "Size: %{y:.2f}MB",
+                "Date: %{x}",
+                "<extra>Click to view commit</extra>"
+            ])
+        )
+        # Add JavaScript for click handling
+        fig.update_layout(
+            clickmode='event',
+            annotations=[
+                dict(
+                    text="Click any point to view the commit on GitHub",
+                    xref="paper", yref="paper",
+                    x=0, y=1.05,
+                    showarrow=False
+                )
+            ]
+        )
         # Ensure output directory exists
@@ -191,14 +226,25 @@ class PackageSizeTracker:
         # Save plot
         plot_path = output_dir / "package_size_trend.html"
-        fig.write_html(str(plot_path))
+        fig.write_html(
+            str(plot_path),
+            include_plotlyjs=True,
+            full_html=True,
+            post_script="""
+            const plot = document.getElementsByClassName('plotly-graph-div')[0];
+            plot.on('plotly_click', function(data) {
+                const point = data.points[0];
+                const commitUrl = point.customdata[1];
+                window.open(commitUrl, '_blank');
+            });
+            """
+        )
         # Generate summary
         latest = df.iloc[-1]
         previous = df.iloc[-2] if len(df) > 1 else latest
         size_change = latest['total_size_mb'] - previous['total_size_mb']
         # Save latest data
         latest_data = {
             'timestamp': latest['timestamp'].isoformat(),
             'commit_hash': latest['commit_hash'],
@@ -210,9 +256,7 @@ class PackageSizeTracker:
         with open(output_dir / 'latest_data.json', 'w') as f:
             json.dump(latest_data, f, indent=2)
-        # Print summary to console
         self._print_summary(latest_data)
         self.logger.info(f"Report generated in {output_dir}")
+        return str(plot_path)
@@ -232,8 +276,7 @@ class PackageSizeTracker:
print(f"- {pkg['name']}: {pkg['size_mb']:.2f}MB")
print("\n")
def main():
# Get configuration
async def main():
token = os.getenv("CIRCLECI_TOKEN")
project_slug = os.getenv("CIRCLECI_PROJECT_SLUG")
debug = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")
@@ -242,17 +285,14 @@ def main():
print("Error: Please set CIRCLECI_TOKEN and CIRCLECI_PROJECT_SLUG environment variables")
return
# Initialize tracker
tracker = PackageSizeTracker(token, project_slug, debug)
try:
# Collect data
data = tracker.collect_data()
data = await tracker.collect_data()
if not data:
print("No data found!")
return
# Generate report
report_path = tracker.generate_report(data)
if report_path:
print(f"\nDetailed report available at: {report_path}")
@@ -263,4 +303,4 @@ def main():
         raise
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())
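
The click-through behavior introduced above comes from pairing per-point custom data on the Plotly trace with a plotly_click handler injected through write_html's post_script. The snippet below is a minimal standalone sketch of that technique, not part of the commit: the output file name, commit URLs, and data values are invented for illustration, and the trace-level customdata call stands in for the DataFrame-driven custom_data used in generate_report.

# Standalone sketch of the clickable-datapoint technique (illustrative data only).
import plotly.express as px

fig = px.line(
    x=["2024-11-22", "2024-11-23", "2024-11-24"],
    y=[412.1, 415.7, 409.3],
    markers=True,
    title="Clickable points demo",
)
# One URL per point, exposed to the browser as point.customdata[0].
fig.update_traces(customdata=[
    ["https://github.com/example/repo/commit/aaaaaaa"],
    ["https://github.com/example/repo/commit/bbbbbbb"],
    ["https://github.com/example/repo/commit/ccccccc"],
])
fig.write_html(
    "clickable_points.html",
    post_script="""
    const plot = document.getElementsByClassName('plotly-graph-div')[0];
    plot.on('plotly_click', function(data) {
        window.open(data.points[0].customdata[0], '_blank');
    });
    """,
)

Because post_script runs after the plot div has been created, the handler can bind directly to the plotly-graph-div element, which is the same approach generate_report takes in the diff.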