{ "cells": [ { "cell_type": "markdown", "id": "9610be80", "metadata": { "id": "9610be80" }, "source": [ "# Usage & Cost Admin API Cookbook\n", "\n", "**A practical guide to programmatically accessing your Claude API usage and cost data**" ] }, { "cell_type": "markdown", "id": "69b800fc", "metadata": { "id": "69b800fc" }, "source": [ "### What You Can Do\n", "\n", "**Usage Tracking:**\n", "- Monitor token consumption (uncached input, output, cache creation/reads)\n", "- Track usage across models, workspaces, and API keys\n", "- Analyze cache efficiency and server tool usage\n", "\n", "**Cost Analysis:**\n", "- Retrieve detailed cost breakdowns by service type\n", "- Monitor spending trends across workspaces\n", "- Generate reports for finance and chargeback scenarios\n", "\n", "**Common Use Cases:**\n", "- **Usage Monitoring**: Track consumption patterns and optimize costs\n", "- **Cost Attribution**: Allocate expenses across teams/projects by workspace\n", "- **Cache Analysis**: Measure and improve cache efficiency\n", "- **Financial Reporting**: Generate executive summaries and budget reports\n", "\n", "### API Overview\n", "\n", "Two main endpoints:\n", "1. **Messages Usage API**: Token-level usage data with flexible grouping\n", "2. **Cost API**: Financial data in USD with service breakdowns\n", "\n", "### Prerequisites & Security\n", "\n", "- **Admin API Key**: Get from [Claude Console](https://console.anthropic.com/settings/admin-keys) (format: `sk-ant-admin...`)\n", "- **Security**: Store keys in environment variables, rotate regularly, never commit to version control" ] }, { "cell_type": "code", "execution_count": 11, "id": "edd50a16", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "edd50a16", "outputId": "68f38db3-48ee-429f-cd6e-2d2ae74e009a" }, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "✅ Connection successful!\n" ] } ], "source": [ "import os\n", "import requests\n", "import pandas as pd\n", "from datetime import datetime, timedelta, time\n", "from typing import Dict, List, Optional, Any\n", "\n", "class AnthropicAdminAPI:\n", " \"\"\"Secure wrapper for Anthropic Admin API endpoints.\"\"\"\n", "\n", " def __init__(self, api_key: Optional[str] = None):\n", " self.api_key = api_key or os.getenv('ANTHROPIC_ADMIN_API_KEY')\n", " if not self.api_key:\n", " raise ValueError(\"Admin API key required. Set ANTHROPIC_ADMIN_API_KEY environment variable.\")\n", "\n", " if not self.api_key.startswith('sk-ant-admin'):\n", " raise ValueError(\"Invalid Admin API key format.\")\n", "\n", " self.base_url = \"https://api.anthropic.com/v1/organizations\"\n", " self.headers = {\n", " \"anthropic-version\": \"2023-06-01\",\n", " \"x-api-key\": self.api_key,\n", " \"Content-Type\": \"application/json\"\n", " }\n", "\n", " def _make_request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Make authenticated request with basic error handling.\"\"\"\n", " url = f\"{self.base_url}/{endpoint}\"\n", "\n", " try:\n", " response = requests.get(url, headers=self.headers, params=params)\n", " response.raise_for_status()\n", " return response.json()\n", " except requests.exceptions.HTTPError as e:\n", " if response.status_code == 401:\n", " raise ValueError(\"Invalid API key or insufficient permissions\")\n", " elif response.status_code == 429:\n", " raise requests.exceptions.RequestException(\"Rate limit exceeded - try again later\")\n", " else:\n", " raise requests.exceptions.RequestException(f\"API error: {e}\")\n", "\n", "# Test connection\n", "def test_connection():\n", " try:\n", " client = AnthropicAdminAPI()\n", "\n", " # Simple test query - snap to start of day to align with bucket boundaries\n", " params = {\n", " 'starting_at': (datetime.combine(datetime.utcnow(), time.min) -\n", "timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': datetime.combine(datetime.utcnow(),\n", "time.min).strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'bucket_width': '1d',\n", " 'limit': 1\n", " }\n", "\n", " response = client._make_request(\"usage_report/messages\", params)\n", " print(\"✅ Connection successful!\")\n", " return client\n", "\n", " except Exception as e:\n", " print(f\"❌ Connection failed: {e}\")\n", " return None\n", "\n", "client = test_connection()" ] }, { "cell_type": "markdown", "id": "781a0721", "metadata": { "id": "781a0721" }, "source": [ "## Basic Usage & Cost Tracking\n", "\n", "### Understanding Usage Data\n", "\n", "The Messages Usage API provides token consumption in **time buckets** - fixed intervals containing aggregated usage.\n", "\n", "**Key Metrics:**\n", "- **uncached_input_tokens**: New input tokens (prompts, system messages)\n", "- **output_tokens**: Claude's responses\n", "- **cache_creation**: Tokens cached for reuse\n", "- **cache_read_input_tokens**: Previously cached tokens reused\n", "\n", "### Basic Usage Query" ] }, { "cell_type": "code", "execution_count": 12, "id": "f9b26143", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "f9b26143", "outputId": "c60b91c0-084d-4629-eddd-cf0fb941e042" }, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "📊 Usage Summary:\n", "Uncached input tokens: 267,751\n", "Output tokens: 2,848,746\n", "Cache creation: 0\n", "Cache reads: 0\n", "Cache efficiency: 0.0%\n", "Web searches: 0\n" ] } ], "source": [ "def get_daily_usage(client, days_back=7):\n", " \"\"\"Get usage data for the last N days.\"\"\"\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", " start_time = end_time - timedelta(days=days_back)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'bucket_width': '1d',\n", " 'limit': days_back\n", " }\n", "\n", " return client._make_request(\"usage_report/messages\", params)\n", "\n", "def analyze_usage_data(response):\n", " \"\"\"Process and display usage data.\"\"\"\n", " if not response or not response.get('data'):\n", " print(\"No usage data found.\")\n", " return\n", "\n", " total_uncached_input = total_output = total_cache_creation = 0\n", " total_cache_reads = total_web_searches = 0\n", " daily_data = []\n", "\n", " for bucket in response['data']:\n", " date = bucket['starting_at'][:10]\n", "\n", " # Sum all results in bucket\n", " bucket_uncached = bucket_output = bucket_cache_creation = 0\n", " bucket_cache_reads = bucket_web_searches = 0\n", "\n", " for result in bucket['results']:\n", " bucket_uncached += result.get('uncached_input_tokens', 0)\n", " bucket_output += result.get('output_tokens', 0)\n", "\n", " cache_creation = result.get('cache_creation', {})\n", " bucket_cache_creation += (\n", " cache_creation.get('ephemeral_1h_input_tokens', 0) +\n", " cache_creation.get('ephemeral_5m_input_tokens', 0)\n", " )\n", " bucket_cache_reads += result.get('cache_read_input_tokens', 0)\n", "\n", " server_tools = result.get('server_tool_use', {})\n", " bucket_web_searches += server_tools.get('web_search_requests', 0)\n", "\n", " daily_data.append({\n", " 'date': date,\n", " 'uncached_input_tokens': bucket_uncached,\n", " 'output_tokens': bucket_output,\n", " 'cache_creation': bucket_cache_creation,\n", " 'cache_reads': bucket_cache_reads,\n", " 'web_searches': bucket_web_searches,\n", " 'total_tokens': bucket_uncached + bucket_output\n", " })\n", "\n", " # Add to totals\n", " total_uncached_input += bucket_uncached\n", " total_output += bucket_output\n", " total_cache_creation += bucket_cache_creation\n", " total_cache_reads += bucket_cache_reads\n", " total_web_searches += bucket_web_searches\n", "\n", " # Calculate cache efficiency\n", " total_input_tokens = total_uncached_input + total_cache_creation + total_cache_reads\n", " cache_efficiency = (total_cache_reads / total_input_tokens * 100) if total_input_tokens > 0 else 0\n", "\n", " # Display summary\n", " print(f\"📊 Usage Summary:\")\n", " print(f\"Uncached input tokens: {total_uncached_input:,}\")\n", " print(f\"Output tokens: {total_output:,}\")\n", " print(f\"Cache creation: {total_cache_creation:,}\")\n", " print(f\"Cache reads: {total_cache_reads:,}\")\n", " print(f\"Cache efficiency: {cache_efficiency:.1f}%\")\n", " print(f\"Web searches: {total_web_searches:,}\")\n", "\n", " return daily_data\n", "\n", "# Example usage\n", "if client:\n", " usage_response = get_daily_usage(client, days_back=7)\n", " daily_usage = analyze_usage_data(usage_response)" ] }, { "cell_type": "markdown", "id": "e3164089", "metadata": { "id": "e3164089" }, "source": [ "## Basic Cost Tracking\n", "\n", "Note: Priority Tier costs use a different billing model and will never appear in the cost endpoint. You can track Priority Tier usage in the usage endpoint, but not costs." ] }, { "cell_type": "code", "source": [ "def get_daily_costs(client, days_back=7):\n", " \"\"\"Get cost data for the last N days.\"\"\"\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", " start_time = end_time - timedelta(days=days_back)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'bucket_width': '1d', # Only 1d supported for cost API\n", " 'limit': min(days_back, 31) # Max 31 days per request\n", " }\n", "\n", " return client._make_request(\"cost_report\", params)\n", "\n", "def analyze_cost_data(response):\n", " \"\"\"Process and display cost data.\"\"\"\n", " if not response or not response.get('data'):\n", " print(\"No cost data found.\")\n", " return\n", "\n", " total_cost_minor_units = 0\n", " daily_costs = []\n", "\n", " for bucket in response['data']:\n", " date = bucket['starting_at'][:10]\n", "\n", " # Sum all costs in this bucket\n", " bucket_cost = 0\n", " for result in bucket['results']:\n", " # Convert string amounts to float if needed\n", " amount = result.get('amount', 0)\n", " if isinstance(amount, str):\n", " try:\n", " amount = float(amount)\n", " except (ValueError, TypeError):\n", " amount = 0\n", " bucket_cost += amount\n", "\n", " daily_costs.append({\n", " 'date': date,\n", " 'cost_minor_units': bucket_cost,\n", " 'cost_usd': bucket_cost / 100 # Convert to dollars\n", " })\n", "\n", " total_cost_minor_units += bucket_cost\n", "\n", " total_cost_usd = total_cost_minor_units / 100\n", "\n", " print(f\"💰 Cost Summary:\")\n", " print(f\"Total cost: ${total_cost_usd:.4f}\")\n", " print(f\"Average daily cost: ${total_cost_usd / len(daily_costs):.4f}\")\n", "\n", " return daily_costs\n", "\n", "# Example usage\n", "if client:\n", " cost_response = get_daily_costs(client, days_back=7)\n", " daily_costs = analyze_cost_data(cost_response)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "SP4zefdtF0ft", "outputId": "a1daa8d0-f36f-474b-9967-6fe00a0efe52" }, "id": "SP4zefdtF0ft", "execution_count": 13, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "💰 Cost Summary:\n", "Total cost: $83.7574\n", "Average daily cost: $11.9653\n" ] } ] }, { "cell_type": "markdown", "id": "6ce76fc4", "metadata": { "id": "6ce76fc4" }, "source": [ "## Grouping, Filtering & Pagination\n", "\n", "### Time Granularity Options\n", "\n", "**Usage API** supports three granularities:\n", "- `1m` (1 minute): High-resolution analysis, max 1440 buckets per request\n", "- `1h` (1 hour): Medium-resolution, max 168 buckets per request \n", "- `1d` (1 day): Daily analysis, max 31 buckets per request\n", "\n", "**Cost API** supports:\n", "- `1d` (1 day): Only option available, max 31 buckets per request\n", "\n", "### Grouping and Filtering" ] }, { "cell_type": "code", "execution_count": 14, "id": "87f1a7ac", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "87f1a7ac", "outputId": "3266318c-8af4-4647-9360-b133ffa82452" }, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "📊 Usage by Model:\n", " claude-3-5-haiku-20241022: 995,781 tokens\n", " claude-3-5-sonnet-20241022: 861,880 tokens\n", " claude-3-opus-20240229: 394,646 tokens\n", " claude-sonnet-4-20250514: 356,766 tokens\n", " claude-opus-4-20250514: 308,223 tokens\n", " claude-opus-4-1-20250805: 199,201 tokens\n", "Found 7 days of filtered usage data\n" ] } ], "source": [ "def get_usage_by_model(client, days_back=7):\n", " \"\"\"Get usage data grouped by model, handling pagination automatically.\"\"\"\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", " start_time = end_time - timedelta(days=days_back)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'group_by[]': ['model'],\n", " 'bucket_width': '1d'\n", " }\n", "\n", " # Aggregate across all pages of data\n", " model_usage = {}\n", " page_count = 0\n", " max_pages = 10 # Reasonable limit to avoid infinite loops\n", "\n", " try:\n", " next_page = None\n", "\n", " while page_count < max_pages:\n", " current_params = params.copy()\n", " if next_page:\n", " current_params['page'] = next_page\n", "\n", " response = client._make_request(\"usage_report/messages\", current_params)\n", " page_count += 1\n", "\n", " # Process this page's data\n", " for bucket in response.get('data', []):\n", " for result in bucket.get('results', []):\n", " model = result.get('model', 'Unknown')\n", " uncached = result.get('uncached_input_tokens', 0)\n", " output = result.get('output_tokens', 0)\n", " cache_creation = result.get('cache_creation', {})\n", " cache_creation_tokens = (\n", " cache_creation.get('ephemeral_1h_input_tokens', 0) +\n", " cache_creation.get('ephemeral_5m_input_tokens', 0)\n", " )\n", " cache_reads = result.get('cache_read_input_tokens', 0)\n", " tokens = uncached + output + cache_creation_tokens + cache_reads\n", "\n", " if model not in model_usage:\n", " model_usage[model] = 0\n", " model_usage[model] += tokens\n", "\n", " # Check if there's more data\n", " if not response.get('has_more', False):\n", " break\n", "\n", " next_page = response.get('next_page')\n", " if not next_page:\n", " break\n", "\n", " except Exception as e:\n", " print(f\"❌ Error retrieving usage data: {e}\")\n", " return {}\n", "\n", " # Display results\n", " print(\"📊 Usage by Model:\")\n", " if not model_usage:\n", " print(f\" No usage data found in the last {days_back} days\")\n", " print(\" 💡 Try increasing the time range or check if you have recent API usage\")\n", " else:\n", " for model, tokens in sorted(model_usage.items(), key=lambda x: x[1],\n", "reverse=True):\n", " print(f\" {model}: {tokens:,} tokens\")\n", "\n", " return model_usage\n", "\n", "def filter_usage_example(client):\n", " \"\"\"Example of filtering usage data.\"\"\"\n", " params = {\n", " 'starting_at': (datetime.combine(datetime.utcnow(), time.min) -\n", "timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': datetime.combine(datetime.utcnow(),\n", "time.min).strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'models[]': ['claude-3-5-sonnet-20241022'], # Filter to specific model\n", " 'service_tiers[]': ['standard'], # Filter to standard tier\n", " 'bucket_width': '1d'\n", " }\n", "\n", " response = client._make_request(\"usage_report/messages\", params)\n", " print(f\"Found {len(response.get('data', []))} days of filtered usage data\")\n", " return response\n", "\n", "# Example usage\n", "if client:\n", " model_usage = get_usage_by_model(client, days_back=14)\n", " filtered_usage = filter_usage_example(client)" ] }, { "cell_type": "markdown", "id": "487bc11d", "metadata": { "id": "487bc11d" }, "source": [ "### Pagination for Large Datasets" ] }, { "cell_type": "code", "execution_count": 18, "id": "d7cc5437", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "d7cc5437", "outputId": "e7875fb4-09ae-44d2-f5fc-12a4cbdcce5a" }, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "📥 Fetching paginated data...\n", " Page 1: 24 time buckets\n", " Page 2: 24 time buckets\n", " Page 3: 24 time buckets\n", "✅ Complete: Retrieved all data in 3 pages\n", "📊 Total retrieved: 72 time buckets\n", "📈 Total tokens across all data: 1,336,287\n" ] } ], "source": [ "def fetch_all_usage_data(client, params, max_pages=10):\n", " \"\"\"Fetch all paginated usage data.\"\"\"\n", " all_data = []\n", " page_count = 0\n", " next_page = None\n", "\n", " print(\"📥 Fetching paginated data...\")\n", "\n", " while page_count < max_pages:\n", " current_params = params.copy()\n", " if next_page:\n", " current_params['page'] = next_page\n", "\n", " try:\n", " response = client._make_request(\"usage_report/messages\", current_params)\n", "\n", " if not response or not response.get('data'):\n", " break\n", "\n", " page_data = response['data']\n", " all_data.extend(page_data)\n", " page_count += 1\n", "\n", " print(f\" Page {page_count}: {len(page_data)} time buckets\")\n", "\n", " if not response.get('has_more', False):\n", " print(f\"✅ Complete: Retrieved all data in {page_count} pages\")\n", " break\n", "\n", " next_page = response.get('next_page')\n", " if not next_page:\n", " break\n", "\n", " except Exception as e:\n", " print(f\"❌ Error on page {page_count + 1}: {e}\")\n", " break\n", "\n", " print(f\"📊 Total retrieved: {len(all_data)} time buckets\")\n", " return all_data\n", "\n", "def large_dataset_example(client, days_back=3):\n", " \"\"\"Example of handling a large dataset with pagination.\"\"\"\n", " # Use recent time range to ensure we have data\n", " start_time = datetime.combine(datetime.utcnow(), time.min) - timedelta(days=days_back)\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'bucket_width': '1h', # Hourly data for more buckets\n", " 'group_by[]': ['model'],\n", " 'limit': 24 # One day per page\n", " }\n", "\n", " all_buckets = fetch_all_usage_data(client, params, max_pages=5)\n", "\n", " # Process the large dataset\n", " if all_buckets:\n", " total_tokens = sum(\n", " sum(result.get('uncached_input_tokens', 0) + result.get('output_tokens', 0)\n", " for result in bucket['results'])\n", " for bucket in all_buckets\n", " )\n", " print(f\"📈 Total tokens across all data: {total_tokens:,}\")\n", "\n", " return all_buckets\n", "\n", "# Example usage - use shorter time range to find recent data\n", "if client:\n", " large_dataset = large_dataset_example(client, days_back=3)" ] }, { "cell_type": "markdown", "id": "d55bbb55", "metadata": { "id": "d55bbb55" }, "source": [ "## Simple Data Export\n", "\n", "### CSV Export for External Analysis" ] }, { "cell_type": "code", "execution_count": 16, "id": "f365296a", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "f365296a", "outputId": "87b4a733-6f90-4b8d-a109-7a23281b2a4d" }, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "✅ Exported 36 rows to my_usage_data.csv\n", "✅ Exported 72 cost records to my_cost_data.csv\n" ] } ], "source": [ "import csv\n", "from pathlib import Path\n", "\n", "def export_usage_to_csv(client, output_file=\"usage_data.csv\", days_back=30):\n", " \"\"\"Export usage data to CSV for external analysis.\"\"\"\n", "\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", " start_time = end_time - timedelta(days=days_back)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'group_by[]': ['model', 'service_tier', 'workspace_id'],\n", " 'bucket_width': '1d'\n", " }\n", "\n", " try:\n", " # Collect all data across pages\n", " rows = []\n", " page_count = 0\n", " max_pages = 20 # Allow more pages for export\n", " next_page = None\n", "\n", " while page_count < max_pages:\n", " current_params = params.copy()\n", " if next_page:\n", " current_params['page'] = next_page\n", "\n", " response = client._make_request(\"usage_report/messages\", current_params)\n", " page_count += 1\n", "\n", " # Process this page's data\n", " for bucket in response.get('data', []):\n", " date = bucket['starting_at'][:10]\n", " for result in bucket['results']:\n", " rows.append({\n", " 'date': date,\n", " 'model': result.get('model', ''),\n", " 'service_tier': result.get('service_tier', ''),\n", " 'workspace_id': result.get('workspace_id', ''),\n", " 'uncached_input_tokens': result.get('uncached_input_tokens', 0),\n", " 'output_tokens': result.get('output_tokens', 0),\n", " 'cache_creation_tokens': (\n", " result.get('cache_creation', {}).get('ephemeral_1h_input_tokens', 0) +\n", " result.get('cache_creation', {}).get('ephemeral_5m_input_tokens', 0)\n", " ),\n", " 'cache_read_tokens': result.get('cache_read_input_tokens', 0),\n", " 'web_search_requests': result.get('server_tool_use', {}).get('web_search_requests', 0)\n", " })\n", "\n", " # Check if there's more data\n", " if not response.get('has_more', False):\n", " break\n", "\n", " next_page = response.get('next_page')\n", " if not next_page:\n", " break\n", "\n", " # Write CSV\n", " if rows:\n", " with open(output_file, 'w', newline='') as csvfile:\n", " writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())\n", " writer.writeheader()\n", " writer.writerows(rows)\n", "\n", " print(f\"✅ Exported {len(rows)} rows to {output_file}\")\n", " else:\n", " print(f\"No usage data to export for the last {days_back} days\")\n", " print(\"💡 Try increasing days_back or check if you have recent API usage\")\n", "\n", " except Exception as e:\n", " print(f\"❌ Export failed: {e}\")\n", "\n", "def export_costs_to_csv(client, output_file=\"cost_data.csv\", days_back=30):\n", " \"\"\"Export cost data to CSV.\"\"\"\n", "\n", " end_time = datetime.combine(datetime.utcnow(), time.min)\n", " start_time = end_time - timedelta(days=days_back)\n", "\n", " params = {\n", " 'starting_at': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'ending_at': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", " 'group_by[]': ['workspace_id', 'description']\n", " }\n", "\n", " try:\n", " # Collect all data across pages\n", " rows = []\n", " page_count = 0\n", " max_pages = 20\n", " next_page = None\n", "\n", " while page_count < max_pages:\n", " current_params = params.copy()\n", " if next_page:\n", " current_params['page'] = next_page\n", "\n", " response = client._make_request(\"cost_report\", current_params)\n", " page_count += 1\n", "\n", " # Process this page's data\n", " for bucket in response.get('data', []):\n", " date = bucket['starting_at'][:10]\n", " for result in bucket['results']:\n", " # Handle both string and numeric amounts\n", " amount = result.get('amount', 0)\n", " if isinstance(amount, str):\n", " try:\n", " amount = float(amount)\n", " except (ValueError, TypeError):\n", " amount = 0\n", "\n", " rows.append({\n", " 'date': date,\n", " 'workspace_id': result.get('workspace_id', ''), # null for default workspace\n", " 'description': result.get('description', ''),\n", " 'currency': result.get('currency', 'USD'),\n", " 'amount_usd': amount / 100\n", " })\n", "\n", " # Check if there's more data\n", " if not response.get('has_more', False):\n", " break\n", "\n", " next_page = response.get('next_page')\n", " if not next_page:\n", " break\n", "\n", " if rows:\n", " with open(output_file, 'w', newline='') as csvfile:\n", " writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())\n", " writer.writeheader()\n", " writer.writerows(rows)\n", "\n", " print(f\"✅ Exported {len(rows)} cost records to {output_file}\")\n", " else:\n", " print(f\"No cost data to export for the last {days_back} days\")\n", " print(\"💡 Try increasing days_back or check if you have recent API usage\")\n", "\n", " except Exception as e:\n", " print(f\"❌ Cost export failed: {e}\")\n", "\n", "# Example usage\n", "if client:\n", " export_usage_to_csv(client, \"my_usage_data.csv\", days_back=14)\n", " export_costs_to_csv(client, \"my_cost_data.csv\", days_back=14)" ] }, { "cell_type": "markdown", "id": "5327a1c5", "metadata": { "id": "5327a1c5" }, "source": [ "## Wrapping Up\n", "\n", "This cookbook covers the essential patterns for working with the Usage & Cost Admin API:\n", "\n", "- **Basic queries** for usage and cost data\n", "- **Grouping and filtering** for detailed analysis \n", "- **Pagination** for large datasets\n", "- **Cost description parsing** for categorization\n", "- **Common gotchas** to avoid issues\n", "- **Simple CSV export** for external tools\n", "\n", "### Next Steps\n", "\n", "- Check the [official API documentation](https://docs.claude.com) for the latest field definitions\n", "- Test your integration with small date ranges first\n", "- Consider data retention needs for your use case\n", "- Monitor for new API features that may enhance your analysis\n", "\n", "### Important Notes\n", "\n", "- Field names and available options may evolve as the API matures\n", "- Always handle unknown values gracefully in production code\n", "- The API is designed for historical analysis, not real-time monitoring\n", "- Priority Tier costs use a different billing model and don't appear in cost endpoints\n", "\n", "Happy analyzing! 📊" ] } ], "metadata": { "kernelspec": { "display_name": "base", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" }, "colab": { "provenance": [] } }, "nbformat": 4, "nbformat_minor": 5 }