mirror of
https://github.com/exo-explore/exo.git
synced 2025-10-23 02:57:14 +03:00
Merge pull request #514 from exo-explore/dashratelimit
better pagination to avoid rate limits on dashboard
This commit is contained in:
@@ -39,15 +39,7 @@ class AsyncCircleCIClient:
|
|||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Get recent pipelines for a project with pagination support
|
Get recent pipelines for a project with pagination support
|
||||||
|
|
||||||
Args:
|
|
||||||
session: aiohttp client session
|
|
||||||
org_slug: Organization slug
|
|
||||||
page_token: Token for pagination
|
|
||||||
limit: Maximum number of pipelines to return
|
|
||||||
branch: Specific branch to fetch pipelines from
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"branch": branch,
|
"branch": branch,
|
||||||
"page-token": page_token
|
"page-token": page_token
|
||||||
@@ -62,13 +54,17 @@ class AsyncCircleCIClient:
|
|||||||
|
|
||||||
next_page_token = data.get("next_page_token")
|
next_page_token = data.get("next_page_token")
|
||||||
|
|
||||||
|
# If we have a limit, check if we need more pages
|
||||||
|
if limit and len(pipelines) >= limit:
|
||||||
|
return pipelines
|
||||||
|
|
||||||
# If there are more pages and we haven't hit the limit, recursively get them
|
# If there are more pages and we haven't hit the limit, recursively get them
|
||||||
if next_page_token and (limit is None or len(pipelines) < limit):
|
if next_page_token:
|
||||||
next_pipelines = await self.get_recent_pipelines(
|
next_pipelines = await self.get_recent_pipelines(
|
||||||
session,
|
session,
|
||||||
org_slug,
|
org_slug,
|
||||||
page_token=next_page_token,
|
page_token=next_page_token,
|
||||||
limit=limit,
|
limit=limit - len(pipelines) if limit else None, # Adjust limit for next page
|
||||||
branch=branch
|
branch=branch
|
||||||
)
|
)
|
||||||
pipelines.extend(next_pipelines)
|
pipelines.extend(next_pipelines)
|
||||||
@@ -284,16 +280,57 @@ class PackageSizeTracker:
|
|||||||
self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
|
self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def process_pipeline_batch(
|
||||||
|
self,
|
||||||
|
session: aiohttp.ClientSession,
|
||||||
|
pipelines: List[Dict],
|
||||||
|
batch_size: int = 5
|
||||||
|
) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
Process a batch of pipelines with rate limiting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: aiohttp client session
|
||||||
|
pipelines: List of pipelines to process
|
||||||
|
batch_size: Number of pipelines to process in parallel
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of processed pipeline data points
|
||||||
|
"""
|
||||||
|
data_points = []
|
||||||
|
|
||||||
|
for i in range(0, len(pipelines), batch_size):
|
||||||
|
batch = pipelines[i:i + batch_size]
|
||||||
|
|
||||||
|
# Process batch in parallel
|
||||||
|
tasks = [self.process_pipeline(session, pipeline) for pipeline in batch]
|
||||||
|
batch_results = await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
# Filter out None results
|
||||||
|
batch_data = [r for r in batch_results if r is not None]
|
||||||
|
data_points.extend(batch_data)
|
||||||
|
|
||||||
|
# Add delay between batches if there are more to process
|
||||||
|
if i + batch_size < len(pipelines):
|
||||||
|
await asyncio.sleep(1) # 1 second delay between batches
|
||||||
|
|
||||||
|
return data_points
|
||||||
|
|
||||||
async def collect_data(self) -> List[Dict]:
|
async def collect_data(self) -> List[Dict]:
|
||||||
self.logger.info("Starting data collection...")
|
self.logger.info("Starting data collection...")
|
||||||
async with aiohttp.ClientSession(headers=self.client.headers) as session:
|
async with aiohttp.ClientSession(headers=self.client.headers) as session:
|
||||||
# Get pipelines from both main and circleci branches
|
# Get pipelines from main branch
|
||||||
main_pipelines = await self.client.get_recent_pipelines(
|
main_pipelines = await self.client.get_recent_pipelines(
|
||||||
session,
|
session,
|
||||||
org_slug=self.client.project_slug,
|
org_slug=self.client.project_slug,
|
||||||
limit=20,
|
limit=20,
|
||||||
branch="main"
|
branch="main"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Add delay between branch requests
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
# Get pipelines from circleci branch
|
||||||
circleci_pipelines = await self.client.get_recent_pipelines(
|
circleci_pipelines = await self.client.get_recent_pipelines(
|
||||||
session,
|
session,
|
||||||
org_slug=self.client.project_slug,
|
org_slug=self.client.project_slug,
|
||||||
@@ -301,18 +338,27 @@ class PackageSizeTracker:
|
|||||||
branch="circleci"
|
branch="circleci"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Combine pipelines and sort by created_at date
|
||||||
pipelines = main_pipelines + circleci_pipelines
|
pipelines = main_pipelines + circleci_pipelines
|
||||||
# Sort pipelines by created_at date
|
pipelines.sort(
|
||||||
pipelines.sort(key=lambda x: x.get("created_at", x.get("updated_at")), reverse=True)
|
key=lambda x: datetime.fromisoformat(
|
||||||
|
x.get("created_at", x.get("updated_at")).replace('Z', '+00:00')
|
||||||
|
),
|
||||||
|
reverse=True # Most recent first
|
||||||
|
)
|
||||||
|
|
||||||
self.logger.info(f"Found {len(pipelines)} recent pipelines")
|
self.logger.info(f"Found {len(pipelines)} recent pipelines")
|
||||||
|
|
||||||
# Process all pipelines in parallel
|
# Process pipelines in batches
|
||||||
tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
|
data_points = await self.process_pipeline_batch(session, pipelines)
|
||||||
results = await asyncio.gather(*tasks)
|
|
||||||
|
|
||||||
# Filter out None results
|
# Sort by timestamp
|
||||||
data_points = [r for r in results if r is not None]
|
data_points.sort(
|
||||||
|
key=lambda x: datetime.fromisoformat(
|
||||||
|
x.get("timestamp").replace('Z', '+00:00')
|
||||||
|
),
|
||||||
|
reverse=True # Most recent first
|
||||||
|
)
|
||||||
|
|
||||||
return data_points
|
return data_points
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user