From e0c87113a31fb86cf8df57b9aa73b11d25170e50 Mon Sep 17 00:00:00 2001
From: Alex Cheema
Date: Thu, 28 Nov 2024 19:53:16 +0400
Subject: [PATCH] better pagination to avoid rate limits on dashboard

---
 extra/dashboard/dashboard.py | 82 ++++++++++++++++++++++++++++--------
 1 file changed, 64 insertions(+), 18 deletions(-)

diff --git a/extra/dashboard/dashboard.py b/extra/dashboard/dashboard.py
index 89c93886..0ef6d3ed 100644
--- a/extra/dashboard/dashboard.py
+++ b/extra/dashboard/dashboard.py
@@ -39,15 +39,7 @@ class AsyncCircleCIClient:
     ):
         """
         Get recent pipelines for a project with pagination support
-
-        Args:
-            session: aiohttp client session
-            org_slug: Organization slug
-            page_token: Token for pagination
-            limit: Maximum number of pipelines to return
-            branch: Specific branch to fetch pipelines from
         """
-
         params = {
             "branch": branch,
             "page-token": page_token
@@ -62,13 +54,17 @@ class AsyncCircleCIClient:

         next_page_token = data.get("next_page_token")

+        # If we have a limit, check if we need more pages
+        if limit and len(pipelines) >= limit:
+            return pipelines
+
         # If there are more pages and we haven't hit the limit, recursively get them
-        if next_page_token and (limit is None or len(pipelines) < limit):
+        if next_page_token:
             next_pipelines = await self.get_recent_pipelines(
                 session,
                 org_slug,
                 page_token=next_page_token,
-                limit=limit,
+                limit=limit - len(pipelines) if limit else None,  # Adjust limit for next page
                 branch=branch
             )
             pipelines.extend(next_pipelines)
@@ -284,16 +280,57 @@ class PackageSizeTracker:
             self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
             return None

+    async def process_pipeline_batch(
+        self,
+        session: aiohttp.ClientSession,
+        pipelines: List[Dict],
+        batch_size: int = 5
+    ) -> List[Dict]:
+        """
+        Process a batch of pipelines with rate limiting.
+
+        Args:
+            session: aiohttp client session
+            pipelines: List of pipelines to process
+            batch_size: Number of pipelines to process in parallel
+
+        Returns:
+            List of processed pipeline data points
+        """
+        data_points = []
+
+        for i in range(0, len(pipelines), batch_size):
+            batch = pipelines[i:i + batch_size]
+
+            # Process batch in parallel
+            tasks = [self.process_pipeline(session, pipeline) for pipeline in batch]
+            batch_results = await asyncio.gather(*tasks)
+
+            # Filter out None results
+            batch_data = [r for r in batch_results if r is not None]
+            data_points.extend(batch_data)
+
+            # Add delay between batches if there are more to process
+            if i + batch_size < len(pipelines):
+                await asyncio.sleep(1)  # 1 second delay between batches
+
+        return data_points
+
     async def collect_data(self) -> List[Dict]:
         self.logger.info("Starting data collection...")
         async with aiohttp.ClientSession(headers=self.client.headers) as session:
-            # Get pipelines from both main and circleci branches
+            # Get pipelines from main branch
             main_pipelines = await self.client.get_recent_pipelines(
                 session,
                 org_slug=self.client.project_slug,
                 limit=20,
                 branch="main"
             )
+
+            # Add delay between branch requests
+            await asyncio.sleep(2)
+
+            # Get pipelines from circleci branch
             circleci_pipelines = await self.client.get_recent_pipelines(
                 session,
                 org_slug=self.client.project_slug,
@@ -301,18 +338,27 @@ class PackageSizeTracker:
                 branch="circleci"
             )

+            # Combine pipelines and sort by created_at date
             pipelines = main_pipelines + circleci_pipelines

-            # Sort pipelines by created_at date
-            pipelines.sort(key=lambda x: x.get("created_at", x.get("updated_at")), reverse=True)
+            pipelines.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("created_at", x.get("updated_at")).replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )

             self.logger.info(f"Found {len(pipelines)} recent pipelines")

-            # Process all pipelines in parallel
-            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
-            results = await asyncio.gather(*tasks)
+            # Process pipelines in batches
+            data_points = await self.process_pipeline_batch(session, pipelines)

-            # Filter out None results
-            data_points = [r for r in results if r is not None]
+            # Sort by timestamp
+            data_points.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("timestamp").replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )

             return data_points
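Note (illustrative, not part of the patch): the rate-limiting approach introduced by process_pipeline_batch is to run one small batch concurrently with asyncio.gather, then sleep before starting the next batch. A minimal standalone sketch of that pattern follows; the names process_in_batches and fake_worker are hypothetical and it makes no CircleCI calls.

import asyncio

async def process_in_batches(items, worker, batch_size=5, delay=1.0):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Run one batch concurrently, then pause before the next batch
        # so an upstream API's rate limit is not exceeded.
        batch_results = await asyncio.gather(*(worker(item) for item in batch))
        results.extend(r for r in batch_results if r is not None)
        if i + batch_size < len(items):
            await asyncio.sleep(delay)
    return results

async def main():
    async def fake_worker(n):
        await asyncio.sleep(0.01)  # stand-in for an HTTP request
        return n * 2
    print(await process_in_batches(list(range(12)), fake_worker, batch_size=5))

asyncio.run(main())

The asyncio.sleep(2) between the main and circleci branch fetches in collect_data serves the same purpose as the 1-second inter-batch delay: spacing requests out so the dashboard stays under the API rate limit.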