🎉 Init Project

This commit is contained in:
Holegots
2025-06-23 01:27:50 +08:00
commit 6ce4226111
24 changed files with 3717 additions and 0 deletions

39
.env.example Normal file
View File

@@ -0,0 +1,39 @@
# Required: Your OpenAI API key
OPENAI_API_KEY="sk-your-openai-api-key-here"
# Optional: OpenAI API base URL (default: https://api.openai.com/v1)
# You can change this to use other providers like Azure OpenAI, local models, etc.
OPENAI_BASE_URL="https://api.openai.com/v1"
# Optional: Model mappings (BIG and SMALL models)
BIG_MODEL="gpt-4o" # Used for Claude sonnet/opus requests
SMALL_MODEL="gpt-4o-mini" # Used for Claude haiku requests
# Optional: Server settings
HOST="0.0.0.0"
PORT="8082"
LOG_LEVEL="WARNING" # DEBUG, INFO, WARNING, ERROR, CRITICAL
# Optional: Performance settings
MAX_TOKENS_LIMIT="4096"
# Minimum tokens limit for requests (to avoid errors with thinking models)
MIN_TOKENS_LIMIT="4096"
REQUEST_TIMEOUT="90"
MAX_RETRIES="2"
# Examples for other providers:
# For Azure OpenAI (recommended if OpenAI is not available in your region):
# OPENAI_API_KEY="your-azure-api-key"
# OPENAI_BASE_URL="https://your-resource-name.openai.azure.com/openai/deployments/your-deployment-name"
# BIG_MODEL="gpt-4"
# SMALL_MODEL="gpt-35-turbo"
# For local models (like Ollama):
# OPENAI_API_KEY="dummy-key" # Required but can be any value for local models
# OPENAI_BASE_URL="http://localhost:11434/v1"
# BIG_MODEL="llama3.1:70b"
# SMALL_MODEL="llama3.1:8b"
# Note: If you get "unsupported_country_region_territory" errors,
# consider using Azure OpenAI or a local model setup instead.

177
.gitignore vendored Normal file
View File

@@ -0,0 +1,177 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python

19
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,19 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: FastAPI",
"type": "debugpy",
"request": "launch",
"module": "uvicorn",
"args": [
"src.main:app",
"--reload",
],
"jinja": true
}
]
}

123
CLAUDE.md Executable file
View File

@@ -0,0 +1,123 @@
# Claude Code: Best Practices for Effective Collaboration
This document outlines best practices for working with Claude Code to ensure efficient and successful software development tasks.
## Task Management
For complex or multi-step tasks, Claude Code will use:
* **TodoWrite**: To create a structured task list, breaking down the work into manageable steps. This provides clarity on the plan and allows for tracking progress.
* **TodoRead**: To review the current list of tasks and their status, ensuring alignment and that all objectives are being addressed.
## File Handling and Reading
Understanding file content is crucial before making modifications.
1. **Targeted Information Retrieval**:
* When searching for specific content, patterns, or definitions within a codebase, prefer using search tools like `Grep` or `Task` (with a focused search prompt). This is more efficient than reading entire files.
2. **Reading File Content**:
* **Small to Medium Files**: For files where full context is needed or that are not excessively large, the `Read` tool can be used to retrieve the entire content.
* **Large File Strategy**:
1. **Assess Size**: Before reading a potentially large file, its size should be determined (e.g., using `ls -l` via the `Bash` tool or by an initial `Read` with a small `limit` to observe if content is truncated).
2. **Chunked Reading**: If a file is large (e.g., over a few thousand lines), it should be read in manageable chunks (e.g., 1000-2000 lines at a time) using the `offset` and `limit` parameters of the `Read` tool. This ensures all content can be processed without issues (a generic sketch of this pattern follows this list).
* Always ensure that the file path provided to `Read` is absolute.
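As a plain-Python illustration of the chunked pattern (a generic sketch only, not the `Read` tool itself; the helper name and file name are hypothetical):
```python
def read_chunk(path: str, offset: int = 0, limit: int = 2000) -> list[str]:
    """Return up to `limit` lines starting at line `offset` (0-based)."""
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        lines = f.readlines()
    return lines[offset:offset + limit]

# Walk a large file in 2000-line chunks until it is exhausted.
offset = 0
while chunk := read_chunk("very_large_module.py", offset):
    # ...search, summarize, or plan edits for this chunk...
    offset += len(chunk)
```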
## File Editing
Precision is key for successful file edits. The following strategies lead to reliable modifications:
1. **Pre-Edit Read**: **Always** use the `Read` tool to fetch the content of the file *immediately before* attempting any `Edit` or `MultiEdit` operation. This ensures modifications are based on the absolute latest version of the file.
2. **Constructing `old_string` (The text to be replaced)**:
* **Exact Match**: The `old_string` must be an *exact* character-for-character match of the segment in the file you intend to replace. This includes all whitespace (spaces, tabs, newlines) and special characters.
* **No Read Artifacts**: Crucially, do *not* include any formatting artifacts from the `Read` tool's output (e.g., `cat -n` style line numbers or display-only leading tabs) in the `old_string`. It must only contain the literal characters as they exist in the raw file.
* **Sufficient Context & Uniqueness**: Provide enough context (surrounding lines) in `old_string` to make it uniquely identifiable at the intended edit location. The "Anchor on a Known Good Line" strategy is preferred: `old_string` is a larger, unique block of text surrounding the change or insertion point. This is highly reliable.
3. **Constructing `new_string` (The replacement text)**:
* **Exact Representation**: The `new_string` must accurately represent the desired state of the code, including correct indentation, whitespace, and newlines.
* **No Read Artifacts**: As with `old_string`, ensure `new_string` does *not* contain any `Read` tool output artifacts.
4. **Choosing the Right Editing Tool**:
* **`Edit` Tool**: Suitable for a single, well-defined replacement in a file.
* **`MultiEdit` Tool**: Preferred when multiple changes are needed within the same file. Edits are applied sequentially, with each subsequent edit operating on the result of the previous one. This tool is highly effective for complex modifications.
5. **Verification**:
* The success confirmation from the `Edit` or `MultiEdit` tool (especially if `expected_replacements` is used and matches) is the primary indicator that the change was made.
* If further visual confirmation is needed, use the `Read` tool with `offset` and `limit` parameters to view only the specific section of the file that was changed, rather than re-reading the entire file.
### Reliable Code Insertion with MultiEdit
When inserting larger blocks of new code (e.g., multiple functions or methods) where a simple `old_string` might be fragile due to surrounding code, the following `MultiEdit` strategy can be more robust:
1. **First Edit - Targeted Insertion Point**: For the primary code block you want to insert (e.g., new methods within a class), identify a short, unique, and stable line of code immediately *after* your desired insertion point. Use this stable line as the `old_string`.
* The `new_string` will consist of your new block of code, followed by a newline, and then the original `old_string` (the stable line you matched on).
* Example: If inserting methods into a class, the `old_string` might be the closing brace `}` of the class, or a comment that directly follows the class.
2. **Second Edit (Optional) - Ancillary Code**: If there's another, smaller piece of related code to insert (e.g., a function call within an existing method, or an import statement), perform this as a separate, more straightforward edit within the `MultiEdit` call. This edit usually has a more clearly defined and less ambiguous `old_string`.
**Rationale**:
* By anchoring the main insertion on a very stable, unique line *after* the insertion point and prepending the new code to it, you reduce the risk of `old_string` mismatches caused by subtle variations in the code *before* the insertion point.
* Keeping ancillary edits separate allows them to succeed even if the main insertion point is complex, as they often target simpler, more reliable `old_string` patterns.
* This approach leverages `MultiEdit`'s sequential application of changes effectively.
**Example Scenario**: Adding new methods to a class and a call to one of these new methods elsewhere.
* **Edit 1**: Insert the new methods. `old_string` is the class's closing brace `}`. `new_string` is the new methods' code, followed by a newline and that same closing brace `}`.
* **Edit 2**: Insert the call to a new method. `old_string` is `// existing line before call`. `new_string` is that same line followed by a newline and `this.newMethodCall();`.
This method provides a balance between precise editing and handling larger code insertions reliably when direct `old_string` matches for the entire new block are problematic.
## Handling Large Files for Incremental Refactoring
When refactoring large files incrementally rather than rewriting them completely:
1. **Initial Exploration and Planning**:
* Begin with targeted searches using `Grep` to locate specific patterns or sections within the file.
* Use `Bash` commands like `grep -n "pattern" file` to find line numbers for specific areas of interest.
* Create a clear mental model of the file structure before proceeding with edits.
2. **Chunked Reading for Large Files**:
* For files too large to read at once, use multiple `Read` operations with different `offset` and `limit` parameters.
* Read sequential chunks to build a complete understanding of the file.
* Use `Grep` to pinpoint key sections, then read just those sections with targeted `offset` parameters.
3. **Finding Key Implementation Sections**:
* Use `Bash` commands with `grep -A N` (to show N lines after a match) or `grep -B N` (to show N lines before) to locate function or method implementations.
* Example: `grep -n "function findTagBoundaries" -A 20 filename.js` to see the function signature and the 20 lines that follow it.
4. **Pattern-Based Replacement Strategy**:
* Identify common patterns that need to be replaced across the file.
* Use the `Bash` tool with `sed` for quick previews of potential replacements.
* Example: `sed -n "s/oldPattern/newPattern/gp" filename.js` to preview changes without making them.
5. **Sequential Selective Edits**:
* Target specific sections or patterns one at a time rather than attempting a complete rewrite.
* Focus on clearest/simplest cases first to establish a pattern of successful edits.
* Use `Edit` for well-defined single changes within the file.
6. **Batch Similar Changes Together**:
* Group similar types of changes (e.g., all references to a particular function or variable).
* Use `Bash` with `grep` to gauge the scope of batch changes: `grep -n "pattern" filename.js | wc -l`
* For systematic changes across a file, consider using `sed` through the `Bash` tool: `sed -i "s/oldPattern/newPattern/g" filename.js`
7. **Incremental Verification**:
* After each set of changes, verify the specific sections that were modified.
* For critical components, read the surrounding context to ensure the changes integrate correctly.
* Validate that each change maintains the file's structure and logic before proceeding to the next.
8. **Progress Tracking for Large Refactors**:
* Use the `TodoWrite` tool to track which sections or patterns have been updated.
* Create a checklist of all required changes and mark them off as they're completed.
* Record any sections that require special attention or that couldn't be automatically refactored.
## Commit Messages
When Claude Code generates commit messages on your behalf:
* The `Co-Authored-By: Claude <noreply@anthropic.com>` line will **not** be included.
* The `🤖 Generated with [Claude Code](https://claude.ai/code)` line will **not** be included.
## General Interaction
Claude Code will directly apply proposed changes and modifications using the available tools, rather than describing them and asking you to implement them manually. This ensures a more efficient and direct workflow.

82
QUICKSTART.md Normal file
View File

@@ -0,0 +1,82 @@
# Quick Start Guide
## 🚀 Get Started in 3 Steps
### Step 1: Install Dependencies
```bash
# Using UV (recommended)
uv sync
# Or using pip
pip install -r requirements.txt
```
### Step 2: Configure Your Provider
Choose your LLM provider and configure accordingly:
#### OpenAI
```bash
cp .env.example .env
# Edit .env:
# OPENAI_API_KEY="sk-your-openai-key"
# BIG_MODEL="gpt-4o"
# SMALL_MODEL="gpt-4o-mini"
```
#### Azure OpenAI
```bash
cp .env.example .env
# Edit .env:
# OPENAI_API_KEY="your-azure-key"
# OPENAI_BASE_URL="https://your-resource.openai.azure.com/openai/deployments/your-deployment"
# BIG_MODEL="gpt-4"
# SMALL_MODEL="gpt-35-turbo"
```
#### Local Models (Ollama)
```bash
cp .env.example .env
# Edit .env:
# OPENAI_API_KEY="dummy-key"
# OPENAI_BASE_URL="http://localhost:11434/v1"
# BIG_MODEL="llama3.1:70b"
# SMALL_MODEL="llama3.1:8b"
```
### Step 3: Start and Use
```bash
# Start the proxy server
python start_proxy.py
# In another terminal, use with Claude Code
ANTHROPIC_BASE_URL=http://localhost:8082 claude
```
## 🎯 How It Works
| Your Input | Proxy Action | Result |
|-----------|--------------|--------|
| Claude Code sends `claude-3-5-sonnet-20241022` | Maps to your `BIG_MODEL` | Uses `gpt-4o` (or whatever you configured) |
| Claude Code sends `claude-3-5-haiku-20241022` | Maps to your `SMALL_MODEL` | Uses `gpt-4o-mini` (or whatever you configured) |
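For example, a minimal request sent directly to the proxy is routed according to the table above (a sketch assuming the proxy is running on the default port and `httpx` is installed):
```python
import httpx

response = httpx.post(
    "http://localhost:8082/v1/messages",
    json={
        "model": "claude-3-5-haiku-20241022",  # routed to SMALL_MODEL
        "max_tokens": 64,
        "messages": [{"role": "user", "content": "Say hello"}],
    },
)
print(response.json()["content"][0]["text"])
```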
## 📋 What You Need
- Python 3.9+
- API key for your chosen provider
- Claude Code CLI installed
- 2 minutes to configure
## 🔧 Default Settings
- Server runs on `http://localhost:8082`
- Maps haiku → SMALL_MODEL, sonnet/opus → BIG_MODEL
- Supports streaming, function calling, images
## 🧪 Test Your Setup
```bash
# Quick test
python src/test_claude_to_openai.py
```
That's it! Now Claude Code can use any OpenAI-compatible provider! 🎉

267
README.md Executable file
View File

@@ -0,0 +1,267 @@
# Claude Code Proxy
A proxy server that enables **Claude Code** to work with OpenAI-compatible API providers. It converts Claude API requests into OpenAI API calls, allowing you to use various LLM providers through the Claude Code CLI.
![Claude Code Proxy](demo.png)
## Features
- **Full Claude API Compatibility**: Complete `/v1/messages` endpoint support
- **Multiple Provider Support**: OpenAI, Azure OpenAI, local models (Ollama), and any OpenAI-compatible API
- **Smart Model Mapping**: Configure BIG and SMALL models via environment variables
- **Function Calling**: Complete tool use support with proper conversion
- **Streaming Responses**: Real-time SSE streaming support
- **Image Support**: Base64 encoded image input
- **Error Handling**: Comprehensive error handling and logging
## Quick Start
### 1. Install Dependencies
```bash
# Using UV (recommended)
uv sync
# Or using pip
pip install -r requirements.txt
```
### 2. Configure
```bash
cp .env.example .env
# Edit .env and add your API configuration
```
### 3. Start Server
```bash
# Direct run
python start_proxy.py
# Or with UV
uv run claude-code-proxy
```
### 4. Use with Claude Code
```bash
ANTHROPIC_BASE_URL=http://localhost:8082 claude
```
## Configuration
### Environment Variables
**Required:**
- `OPENAI_API_KEY` - Your API key for the target provider
**Model Configuration:**
- `BIG_MODEL` - Model for Claude sonnet/opus requests (default: `gpt-4o`)
- `SMALL_MODEL` - Model for Claude haiku requests (default: `gpt-4o-mini`)
**API Configuration:**
- `OPENAI_BASE_URL` - API base URL (default: `https://api.openai.com/v1`)
**Server Settings:**
- `HOST` - Server host (default: `0.0.0.0`)
- `PORT` - Server port (default: `8082`)
- `LOG_LEVEL` - Logging level (default: `WARNING`)
**Performance:**
- `MAX_TOKENS_LIMIT` - Maximum output tokens per request (default: `4096`)
- `REQUEST_TIMEOUT` - Request timeout in seconds (default: `90`)
### Model Mapping
The proxy maps Claude model requests to your configured models:
| Claude Request | Mapped To | Environment Variable |
| ------------------------------ | ------------- | ---------------------- |
| Models with "haiku" | `SMALL_MODEL` | Default: `gpt-4o-mini` |
| Models with "sonnet" or "opus" | `BIG_MODEL` | Default: `gpt-4o` |
### Provider Examples
#### OpenAI
```bash
OPENAI_API_KEY="sk-your-openai-key"
OPENAI_BASE_URL="https://api.openai.com/v1"
BIG_MODEL="gpt-4o"
SMALL_MODEL="gpt-4o-mini"
```
#### Azure OpenAI
```bash
OPENAI_API_KEY="your-azure-key"
OPENAI_BASE_URL="https://your-resource.openai.azure.com/openai/deployments/your-deployment"
BIG_MODEL="gpt-4"
SMALL_MODEL="gpt-35-turbo"
```
#### Local Models (Ollama)
```bash
OPENAI_API_KEY="dummy-key" # Required but can be dummy
OPENAI_BASE_URL="http://localhost:11434/v1"
BIG_MODEL="llama3.1:70b"
SMALL_MODEL="llama3.1:8b"
```
#### Other Providers
Any OpenAI-compatible API can be used by setting the appropriate `OPENAI_BASE_URL`.
## Usage Examples
### Basic Chat
```python
import httpx
response = httpx.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022", # Maps to BIG_MODEL
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Hello!"}
]
}
)
```
### Function Calling
```python
response = httpx.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-haiku-20241022", # Maps to SMALL_MODEL
"max_tokens": 200,
"messages": [
{"role": "user", "content": "What's the weather like?"}
],
"tools": [
{
"name": "get_weather",
"description": "Get current weather",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
}
)
```
### Streaming
```python
import httpx
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Tell me a story"}
],
"stream": True
}
) as response:
async for line in response.aiter_lines():
if line.strip():
print(line)
```
## API Endpoints
- `POST /v1/messages` - Main chat completion endpoint (Claude format)
- `POST /v1/messages/count_tokens` - Token counting
- `GET /health` - Health check
- `GET /test-connection` - Test provider API connectivity
- `GET /` - Server information and configuration
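The auxiliary endpoints can be exercised directly as a quick sanity check (a sketch assuming the server is running locally; the token count is a rough character-based estimate):
```python
import httpx

BASE = "http://localhost:8082"

# Health and configuration checks
print(httpx.get(f"{BASE}/health").json())
print(httpx.get(f"{BASE}/").json())

# Rough token count for a prompt
resp = httpx.post(
    f"{BASE}/v1/messages/count_tokens",
    json={
        "model": "claude-3-5-sonnet-20241022",
        "messages": [{"role": "user", "content": "How many tokens is this?"}],
    },
)
print(resp.json())  # e.g. {"input_tokens": 6}
```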
## Integration with Claude Code
This proxy is designed to work seamlessly with Claude Code CLI:
```bash
# Start the proxy
python start_proxy.py
# Use Claude Code with the proxy
ANTHROPIC_BASE_URL=http://localhost:8082 claude
# Or set permanently
export ANTHROPIC_BASE_URL=http://localhost:8082
claude
```
## Testing
Test the proxy functionality:
```bash
# Run comprehensive tests
python src/test_claude_to_openai.py
```
## Development
### Using UV
```bash
# Install dependencies
uv sync
# Run server
uv run claude-code-proxy
# Format code
uv run black src/
uv run isort src/
# Type checking
uv run mypy src/
```
### Project Structure
```
claude-code-proxy/
├── src/
│ ├── claude_to_openai_server.py # Main server
│ ├── test_claude_to_openai.py # Tests
│ └── [other modules...]
├── start_proxy.py # Startup script
├── .env.example # Config template
└── README.md # This file
```
## Performance
- **Async/await** for high concurrency
- **Connection pooling** for efficiency
- **Streaming support** for real-time responses
- **Configurable timeouts** and retries
- **Smart error handling** with detailed logging
## License
MIT License

BIN
demo.png Normal file

Binary file not shown.

Size: 751 KiB

72
pyproject.toml Normal file
View File

@@ -0,0 +1,72 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "claude-code-proxy"
version = "1.0.0"
description = "Proxy server enabling Claude Code to work with OpenAI-compatible API providers"
readme = "README.md"
authors = [
{name = "Claude Code Proxy", email = "noreply@example.com"}
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
requires-python = ">=3.9"
dependencies = [
"fastapi[standard]>=0.115.11",
"uvicorn>=0.34.0",
"pydantic>=2.0.0",
"python-dotenv>=1.0.0",
"openai>=1.54.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"httpx>=0.25.0",
]
[project.urls]
Homepage = "https://github.com/holegots/claude-code-proxy"
Repository = "https://github.com/holegots/claude-code-proxy.git"
Issues = "https://github.com/holegots/claude-code-proxy/issues"
[project.scripts]
claude-code-proxy = "src.claude_to_openai_server:main"
[tool.uv]
dev-dependencies = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"black>=23.0.0",
"isort>=5.12.0",
"mypy>=1.0.0",
]
[tool.black]
line-length = 100
target-version = ['py39']
[tool.isort]
profile = "black"
line_length = 100
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true

11
src/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""Claude Code Proxy
A proxy server that enables Claude Code to work with OpenAI-compatible API providers.
"""
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
__version__ = "1.0.0"
__author__ = "Claude Code Proxy"

208
src/api/endpoints.py Normal file
View File

@@ -0,0 +1,208 @@
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from datetime import datetime
import uuid
from src.core.config import config
from src.core.logging import logger
from src.core.client import OpenAIClient
from src.models.claude import ClaudeMessagesRequest, ClaudeTokenCountRequest
from src.conversion.request_converter import convert_claude_to_openai
from src.conversion.response_converter import (
convert_openai_to_claude_response,
convert_openai_streaming_to_claude_with_cancellation,
)
from src.core.model_manager import model_manager
router = APIRouter()
openai_client = OpenAIClient(
config.openai_api_key, config.openai_base_url, config.request_timeout
)
@router.post("/v1/messages")
async def create_message(request: ClaudeMessagesRequest, http_request: Request):
try:
logger.debug(
f"Processing Claude request: model={request.model}, stream={request.stream}"
)
# Generate unique request ID for cancellation tracking
request_id = str(uuid.uuid4())
# Convert Claude request to OpenAI format
openai_request = convert_claude_to_openai(request, model_manager)
# Check if client disconnected before processing
if await http_request.is_disconnected():
raise HTTPException(status_code=499, detail="Client disconnected")
if request.stream:
# Streaming response - wrap in error handling
try:
openai_stream = openai_client.create_chat_completion_stream(
openai_request, request_id
)
return StreamingResponse(
convert_openai_streaming_to_claude_with_cancellation(
openai_stream,
request,
logger,
http_request,
openai_client,
request_id,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
},
)
except HTTPException as e:
# Convert to proper error response for streaming
logger.error(f"Streaming error: {e.detail}")
import traceback
logger.error(traceback.format_exc())
error_message = openai_client.classify_openai_error(e.detail)
error_response = {
"type": "error",
"error": {"type": "api_error", "message": error_message},
}
return JSONResponse(status_code=e.status_code, content=error_response)
else:
# Non-streaming response
openai_response = await openai_client.create_chat_completion(
openai_request, request_id
)
claude_response = convert_openai_to_claude_response(
openai_response, request
)
# Check minimum tokens limit
if "usage" in claude_response and "output_tokens" in claude_response["usage"]:
output_tokens = claude_response["usage"]["output_tokens"]
if output_tokens < config.min_tokens_limit:
raise HTTPException(
status_code=400,
detail=f"Output tokens ({output_tokens}) is less than minimum limit ({config.min_tokens_limit}))",
)
except HTTPException:
raise
except Exception as e:
import traceback
logger.error(f"Unexpected error processing request: {e}")
logger.error(traceback.format_exc())
error_message = openai_client.classify_openai_error(str(e))
raise HTTPException(status_code=500, detail=error_message)
@router.post("/v1/messages/count_tokens")
async def count_tokens(request: ClaudeTokenCountRequest):
try:
# For token counting, we'll use a simple estimation
# In a real implementation, you might want to use tiktoken or similar
total_chars = 0
# Count system message characters
if request.system:
if isinstance(request.system, str):
total_chars += len(request.system)
elif isinstance(request.system, list):
for block in request.system:
if hasattr(block, "text"):
total_chars += len(block.text)
# Count message characters
for msg in request.messages:
if isinstance(msg.content, str):
total_chars += len(msg.content)
elif isinstance(msg.content, list):
for block in msg.content:
if hasattr(block, "text"):
total_chars += len(block.text)
# Rough estimation: 4 characters per token
estimated_tokens = max(1, total_chars // 4)
return {"input_tokens": estimated_tokens}
except Exception as e:
logger.error(f"Error counting tokens: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"openai_api_configured": bool(config.openai_api_key),
"api_key_valid": config.validate_api_key(),
}
@router.get("/test-connection")
async def test_connection():
"""Test API connectivity to OpenAI"""
try:
# Simple test request to verify API connectivity
test_response = await openai_client.create_chat_completion(
{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}],
"max_tokens": 5,
}
)
return {
"status": "success",
"message": "Successfully connected to OpenAI API",
"model_used": "gpt-3.5-turbo",
"timestamp": datetime.now().isoformat(),
"response_id": test_response.get("id", "unknown"),
}
except Exception as e:
logger.error(f"API connectivity test failed: {e}")
return JSONResponse(
status_code=503,
content={
"status": "failed",
"error_type": "API Error",
"message": str(e),
"timestamp": datetime.now().isoformat(),
"suggestions": [
"Check your OPENAI_API_KEY is valid",
"Verify your API key has the necessary permissions",
"Check if you have reached rate limits",
],
},
)
@router.get("/")
async def root():
"""Root endpoint"""
return {
"message": "Claude-to-OpenAI API Proxy v1.0.0",
"status": "running",
"config": {
"openai_base_url": config.openai_base_url,
"max_tokens_limit": config.max_tokens_limit,
"api_key_configured": bool(config.openai_api_key),
"big_model": config.big_model,
"small_model": config.small_model,
},
"endpoints": {
"messages": "/v1/messages",
"count_tokens": "/v1/messages/count_tokens",
"health": "/health",
"test_connection": "/test-connection",
},
}

258
src/conversion/request_converter.py Normal file
View File

@@ -0,0 +1,258 @@
import json
from typing import Dict, Any, List
import logging
from src.core.constants import Constants
from src.models.claude import ClaudeMessagesRequest, ClaudeMessage
from src.core.config import config
logger = logging.getLogger(__name__)
def convert_claude_to_openai(
claude_request: ClaudeMessagesRequest, model_manager
) -> Dict[str, Any]:
"""Convert Claude API request format to OpenAI format."""
# Map model
openai_model = model_manager.map_claude_model_to_openai(claude_request.model)
# Convert messages
openai_messages = []
# Add system message if present
if claude_request.system:
system_text = ""
if isinstance(claude_request.system, str):
system_text = claude_request.system
elif isinstance(claude_request.system, list):
text_parts = []
for block in claude_request.system:
if hasattr(block, "type") and block.type == Constants.CONTENT_TEXT:
text_parts.append(block.text)
elif (
isinstance(block, dict)
and block.get("type") == Constants.CONTENT_TEXT
):
text_parts.append(block.get("text", ""))
system_text = "\n\n".join(text_parts)
if system_text.strip():
openai_messages.append(
{"role": Constants.ROLE_SYSTEM, "content": system_text.strip()}
)
# Process Claude messages
i = 0
while i < len(claude_request.messages):
msg = claude_request.messages[i]
if msg.role == Constants.ROLE_USER:
openai_message = convert_claude_user_message(msg)
openai_messages.append(openai_message)
elif msg.role == Constants.ROLE_ASSISTANT:
openai_message = convert_claude_assistant_message(msg)
openai_messages.append(openai_message)
# Check if next message contains tool results
if i + 1 < len(claude_request.messages):
next_msg = claude_request.messages[i + 1]
if (
next_msg.role == Constants.ROLE_USER
and isinstance(next_msg.content, list)
and any(
block.type == Constants.CONTENT_TOOL_RESULT
for block in next_msg.content
if hasattr(block, "type")
)
):
# Process tool results
i += 1 # Skip to tool result message
tool_results = convert_claude_tool_results(next_msg)
openai_messages.extend(tool_results)
i += 1
# Build OpenAI request
openai_request = {
"model": openai_model,
"messages": openai_messages,
"max_tokens": min(
max(claude_request.max_tokens, config.min_tokens_limit),
config.max_tokens_limit,
),
"temperature": claude_request.temperature,
"stream": claude_request.stream,
}
logger.info(
f"Converted Claude request to OpenAI format: {json.dumps(openai_request, indent=2, ensure_ascii=False)}"
)
# Add optional parameters
if claude_request.stop_sequences:
openai_request["stop"] = claude_request.stop_sequences
if claude_request.top_p is not None:
openai_request["top_p"] = claude_request.top_p
# Convert tools
if claude_request.tools:
openai_tools = []
for tool in claude_request.tools:
if tool.name and tool.name.strip():
openai_tools.append(
{
"type": Constants.TOOL_FUNCTION,
Constants.TOOL_FUNCTION: {
"name": tool.name,
"description": tool.description or "",
"parameters": tool.input_schema,
},
}
)
if openai_tools:
openai_request["tools"] = openai_tools
# Convert tool choice
if claude_request.tool_choice:
choice_type = claude_request.tool_choice.get("type")
if choice_type == "auto":
openai_request["tool_choice"] = "auto"
elif choice_type == "any":
openai_request["tool_choice"] = "auto"
elif choice_type == "tool" and "name" in claude_request.tool_choice:
openai_request["tool_choice"] = {
"type": Constants.TOOL_FUNCTION,
Constants.TOOL_FUNCTION: {"name": claude_request.tool_choice["name"]},
}
else:
openai_request["tool_choice"] = "auto"
return openai_request
def convert_claude_user_message(msg: ClaudeMessage) -> Dict[str, Any]:
"""Convert Claude user message to OpenAI format."""
if isinstance(msg.content, str):
return {"role": Constants.ROLE_USER, "content": msg.content}
# Handle multimodal content
openai_content = []
for block in msg.content:
if block.type == Constants.CONTENT_TEXT:
openai_content.append({"type": "text", "text": block.text})
elif block.type == Constants.CONTENT_IMAGE:
# Convert Claude image format to OpenAI format
if (
isinstance(block.source, dict)
and block.source.get("type") == "base64"
and "media_type" in block.source
and "data" in block.source
):
openai_content.append(
{
"type": "image_url",
"image_url": {
"url": f"data:{block.source['media_type']};base64,{block.source['data']}"
},
}
)
if len(openai_content) == 1 and openai_content[0]["type"] == "text":
return {"role": Constants.ROLE_USER, "content": openai_content[0]["text"]}
else:
return {"role": Constants.ROLE_USER, "content": openai_content}
def convert_claude_assistant_message(msg: ClaudeMessage) -> Dict[str, Any]:
"""Convert Claude assistant message to OpenAI format."""
text_parts = []
tool_calls = []
if isinstance(msg.content, str):
return {"role": Constants.ROLE_ASSISTANT, "content": msg.content}
for block in msg.content:
if block.type == Constants.CONTENT_TEXT:
text_parts.append(block.text)
elif block.type == Constants.CONTENT_TOOL_USE:
tool_calls.append(
{
"id": block.id,
"type": Constants.TOOL_FUNCTION,
Constants.TOOL_FUNCTION: {
"name": block.name,
"arguments": json.dumps(block.input, ensure_ascii=False),
},
}
)
openai_message = {"role": Constants.ROLE_ASSISTANT}
# Set content
if text_parts:
openai_message["content"] = "".join(text_parts)
else:
openai_message["content"] = None
# Set tool calls
if tool_calls:
openai_message["tool_calls"] = tool_calls
return openai_message
def convert_claude_tool_results(msg: ClaudeMessage) -> List[Dict[str, Any]]:
"""Convert Claude tool results to OpenAI format."""
tool_messages = []
if isinstance(msg.content, list):
for block in msg.content:
if block.type == Constants.CONTENT_TOOL_RESULT:
content = parse_tool_result_content(block.content)
tool_messages.append(
{
"role": Constants.ROLE_TOOL,
"tool_call_id": block.tool_use_id,
"content": content,
}
)
return tool_messages
def parse_tool_result_content(content):
"""Parse and normalize tool result content into a string format."""
if content is None:
return "No content provided"
if isinstance(content, str):
return content
if isinstance(content, list):
result_parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == Constants.CONTENT_TEXT:
result_parts.append(item.get("text", ""))
elif isinstance(item, str):
result_parts.append(item)
elif isinstance(item, dict):
if "text" in item:
result_parts.append(item.get("text", ""))
else:
try:
result_parts.append(json.dumps(item, ensure_ascii=False))
except Exception:
result_parts.append(str(item))
return "\n".join(result_parts).strip()
if isinstance(content, dict):
if content.get("type") == Constants.CONTENT_TEXT:
return content.get("text", "")
try:
return json.dumps(content, ensure_ascii=False)
except Exception:
return str(content)
try:
return str(content)
except Exception:
return "Unparseable content"

374
src/conversion/response_converter.py Normal file
View File

@@ -0,0 +1,374 @@
import json
import uuid
from fastapi import HTTPException, Request
from src.core.constants import Constants
from src.models.claude import ClaudeMessagesRequest
def convert_openai_to_claude_response(
openai_response: dict, original_request: ClaudeMessagesRequest
) -> dict:
"""Convert OpenAI response to Claude format."""
# Extract response data
choices = openai_response.get("choices", [])
if not choices:
raise HTTPException(status_code=500, detail="No choices in OpenAI response")
choice = choices[0]
message = choice.get("message", {})
# Build Claude content blocks
content_blocks = []
# Add text content
text_content = message.get("content")
if text_content:
content_blocks.append({"type": Constants.CONTENT_TEXT, "text": text_content})
# Add tool calls
tool_calls = message.get("tool_calls", []) or []
for tool_call in tool_calls:
if tool_call.get("type") == Constants.TOOL_FUNCTION:
function_data = tool_call.get(Constants.TOOL_FUNCTION, {})
try:
arguments = json.loads(function_data.get("arguments", "{}"))
except json.JSONDecodeError:
arguments = {"raw_arguments": function_data.get("arguments", "")}
content_blocks.append(
{
"type": Constants.CONTENT_TOOL_USE,
"id": tool_call.get("id", f"tool_{uuid.uuid4()}"),
"name": function_data.get("name", ""),
"input": arguments,
}
)
# Ensure at least one content block
if not content_blocks:
content_blocks.append({"type": Constants.CONTENT_TEXT, "text": ""})
# Map finish reason
finish_reason = choice.get("finish_reason", "stop")
stop_reason = {
"stop": Constants.STOP_END_TURN,
"length": Constants.STOP_MAX_TOKENS,
"tool_calls": Constants.STOP_TOOL_USE,
"function_call": Constants.STOP_TOOL_USE,
}.get(finish_reason, Constants.STOP_END_TURN)
# Build Claude response
claude_response = {
"id": openai_response.get("id", f"msg_{uuid.uuid4()}"),
"type": "message",
"role": Constants.ROLE_ASSISTANT,
"model": original_request.model,
"content": content_blocks,
"stop_reason": stop_reason,
"stop_sequence": None,
"usage": {
"input_tokens": openai_response.get("usage", {}).get("prompt_tokens", 0),
"output_tokens": openai_response.get("usage", {}).get(
"completion_tokens", 0
),
},
}
return claude_response
async def convert_openai_streaming_to_claude(
openai_stream, original_request: ClaudeMessagesRequest, logger
):
"""Convert OpenAI streaming response to Claude streaming format."""
message_id = f"msg_{uuid.uuid4().hex[:24]}"
# Send initial SSE events
yield f"event: {Constants.EVENT_MESSAGE_START}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_START, 'message': {'id': message_id, 'type': 'message', 'role': Constants.ROLE_ASSISTANT, 'model': original_request.model, 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 0, 'output_tokens': 0}}}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_CONTENT_BLOCK_START}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_START, 'index': 0, 'content_block': {'type': Constants.CONTENT_TEXT, 'text': ''}}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_PING}\ndata: {json.dumps({'type': Constants.EVENT_PING}, ensure_ascii=False)}\n\n"
# Process streaming chunks
text_block_index = 0
tool_block_counter = 0
current_tool_calls = {}
final_stop_reason = Constants.STOP_END_TURN
try:
async for line in openai_stream:
if line.strip():
if line.startswith("data: "):
chunk_data = line[6:]
if chunk_data.strip() == "[DONE]":
break
try:
chunk = json.loads(chunk_data)
choices = chunk.get("choices", [])
if not choices:
continue
except json.JSONDecodeError as e:
logger.warning(
f"Failed to parse chunk: {chunk_data}, error: {e}"
)
continue
choice = choices[0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
# Handle text delta
if "content" in delta and delta["content"]:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_DELTA, 'index': text_block_index, 'delta': {'type': Constants.DELTA_TEXT, 'text': delta['content']}}, ensure_ascii=False)}\n\n"
# Handle tool call deltas with improved incremental processing
if "tool_calls" in delta:
for tc_delta in delta["tool_calls"]:
tc_index = tc_delta.get("index", 0)
# Initialize tool call tracking by index if not exists
if tc_index not in current_tool_calls:
current_tool_calls[tc_index] = {
"id": None,
"name": None,
"args_buffer": "",
"json_sent": False,
"claude_index": None,
"started": False
}
tool_call = current_tool_calls[tc_index]
# Update tool call ID if provided
if tc_delta.get("id"):
tool_call["id"] = tc_delta["id"]
# Update function name and start content block if we have both id and name
function_data = tc_delta.get(Constants.TOOL_FUNCTION, {})
if function_data.get("name"):
tool_call["name"] = function_data["name"]
# Start content block when we have complete initial data
if (tool_call["id"] and tool_call["name"] and not tool_call["started"]):
tool_block_counter += 1
claude_index = text_block_index + tool_block_counter
tool_call["claude_index"] = claude_index
tool_call["started"] = True
yield f"event: {Constants.EVENT_CONTENT_BLOCK_START}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_START, 'index': claude_index, 'content_block': {'type': Constants.CONTENT_TOOL_USE, 'id': tool_call['id'], 'name': tool_call['name'], 'input': {}}}, ensure_ascii=False)}\n\n"
# Handle function arguments
if "arguments" in function_data and tool_call["started"]:
tool_call["args_buffer"] += function_data["arguments"]
# Try to parse complete JSON and send delta when we have valid JSON
try:
json.loads(tool_call["args_buffer"])
# If parsing succeeds and we haven't sent this JSON yet
if not tool_call["json_sent"]:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_DELTA, 'index': tool_call['claude_index'], 'delta': {'type': Constants.DELTA_INPUT_JSON, 'partial_json': tool_call['args_buffer']}}, ensure_ascii=False)}\n\n"
tool_call["json_sent"] = True
except json.JSONDecodeError:
# JSON is incomplete, continue accumulating
pass
# Handle finish reason
if finish_reason:
if finish_reason == "length":
final_stop_reason = Constants.STOP_MAX_TOKENS
elif finish_reason in ["tool_calls", "function_call"]:
final_stop_reason = Constants.STOP_TOOL_USE
elif finish_reason == "stop":
final_stop_reason = Constants.STOP_END_TURN
else:
final_stop_reason = Constants.STOP_END_TURN
break
except Exception as e:
# Handle any streaming errors gracefully
logger.error(f"Streaming error: {e}")
import traceback
logger.error(traceback.format_exc())
error_event = {
"type": "error",
"error": {"type": "api_error", "message": f"Streaming error: {str(e)}"},
}
yield f"event: error\ndata: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return
# Send final SSE events
yield f"event: {Constants.EVENT_CONTENT_BLOCK_STOP}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_STOP, 'index': text_block_index}, ensure_ascii=False)}\n\n"
for tool_data in current_tool_calls.values():
if tool_data.get("started") and tool_data.get("claude_index") is not None:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_STOP}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_STOP, 'index': tool_data['claude_index']}, ensure_ascii=False)}\n\n"
usage_data = {"input_tokens": 0, "output_tokens": 0}
yield f"event: {Constants.EVENT_MESSAGE_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_DELTA, 'delta': {'stop_reason': final_stop_reason, 'stop_sequence': None}, 'usage': usage_data}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_MESSAGE_STOP}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_STOP}, ensure_ascii=False)}\n\n"
async def convert_openai_streaming_to_claude_with_cancellation(
openai_stream,
original_request: ClaudeMessagesRequest,
logger,
http_request: Request,
openai_client,
request_id: str,
):
"""Convert OpenAI streaming response to Claude streaming format with cancellation support."""
message_id = f"msg_{uuid.uuid4().hex[:24]}"
# Send initial SSE events
yield f"event: {Constants.EVENT_MESSAGE_START}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_START, 'message': {'id': message_id, 'type': 'message', 'role': Constants.ROLE_ASSISTANT, 'model': original_request.model, 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 0, 'output_tokens': 0}}}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_CONTENT_BLOCK_START}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_START, 'index': 0, 'content_block': {'type': Constants.CONTENT_TEXT, 'text': ''}}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_PING}\ndata: {json.dumps({'type': Constants.EVENT_PING}, ensure_ascii=False)}\n\n"
# Process streaming chunks
text_block_index = 0
tool_block_counter = 0
current_tool_calls = {}
final_stop_reason = Constants.STOP_END_TURN
try:
async for line in openai_stream:
# Check if client disconnected
if await http_request.is_disconnected():
logger.info(f"Client disconnected, cancelling request {request_id}")
openai_client.cancel_request(request_id)
break
if line.strip():
if line.startswith("data: "):
chunk_data = line[6:]
if chunk_data.strip() == "[DONE]":
break
try:
chunk = json.loads(chunk_data)
choices = chunk.get("choices", [])
if not choices:
continue
except json.JSONDecodeError as e:
logger.warning(
f"Failed to parse chunk: {chunk_data}, error: {e}"
)
continue
choice = choices[0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
# Handle text delta
if "content" in delta and delta["content"]:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_DELTA, 'index': text_block_index, 'delta': {'type': Constants.DELTA_TEXT, 'text': delta['content']}}, ensure_ascii=False)}\n\n"
# Handle tool call deltas with improved incremental processing
if "tool_calls" in delta and delta["tool_calls"]:
for tc_delta in delta["tool_calls"]:
tc_index = tc_delta.get("index", 0)
# Initialize tool call tracking by index if not exists
if tc_index not in current_tool_calls:
current_tool_calls[tc_index] = {
"id": None,
"name": None,
"args_buffer": "",
"json_sent": False,
"claude_index": None,
"started": False
}
tool_call = current_tool_calls[tc_index]
# Update tool call ID if provided
if tc_delta.get("id"):
tool_call["id"] = tc_delta["id"]
# Update function name and start content block if we have both id and name
function_data = tc_delta.get(Constants.TOOL_FUNCTION, {})
if function_data.get("name"):
tool_call["name"] = function_data["name"]
# Start content block when we have complete initial data
if (tool_call["id"] and tool_call["name"] and not tool_call["started"]):
tool_block_counter += 1
claude_index = text_block_index + tool_block_counter
tool_call["claude_index"] = claude_index
tool_call["started"] = True
yield f"event: {Constants.EVENT_CONTENT_BLOCK_START}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_START, 'index': claude_index, 'content_block': {'type': Constants.CONTENT_TOOL_USE, 'id': tool_call['id'], 'name': tool_call['name'], 'input': {}}}, ensure_ascii=False)}\n\n"
# Handle function arguments
if "arguments" in function_data and tool_call["started"]:
tool_call["args_buffer"] += function_data["arguments"]
# Try to parse complete JSON and send delta when we have valid JSON
try:
json.loads(tool_call["args_buffer"])
# If parsing succeeds and we haven't sent this JSON yet
if not tool_call["json_sent"]:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_DELTA, 'index': tool_call['claude_index'], 'delta': {'type': Constants.DELTA_INPUT_JSON, 'partial_json': tool_call['args_buffer']}}, ensure_ascii=False)}\n\n"
tool_call["json_sent"] = True
except json.JSONDecodeError:
# JSON is incomplete, continue accumulating
pass
# Handle finish reason
if finish_reason:
if finish_reason == "length":
final_stop_reason = Constants.STOP_MAX_TOKENS
elif finish_reason in ["tool_calls", "function_call"]:
final_stop_reason = Constants.STOP_TOOL_USE
elif finish_reason == "stop":
final_stop_reason = Constants.STOP_END_TURN
else:
final_stop_reason = Constants.STOP_END_TURN
break
except HTTPException as e:
# Handle cancellation
if e.status_code == 499:
logger.info(f"Request {request_id} was cancelled")
error_event = {
"type": "error",
"error": {
"type": "cancelled",
"message": "Request was cancelled by client",
},
}
yield f"event: error\ndata: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return
else:
raise
except Exception as e:
# Handle any streaming errors gracefully
logger.error(f"Streaming error: {e}")
import traceback
logger.error(traceback.format_exc())
error_event = {
"type": "error",
"error": {"type": "api_error", "message": f"Streaming error: {str(e)}"},
}
yield f"event: error\ndata: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return
# Send final SSE events
yield f"event: {Constants.EVENT_CONTENT_BLOCK_STOP}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_STOP, 'index': text_block_index}, ensure_ascii=False)}\n\n"
for tool_data in current_tool_calls.values():
if tool_data.get("started") and tool_data.get("claude_index") is not None:
yield f"event: {Constants.EVENT_CONTENT_BLOCK_STOP}\ndata: {json.dumps({'type': Constants.EVENT_CONTENT_BLOCK_STOP, 'index': tool_data['claude_index']}, ensure_ascii=False)}\n\n"
usage_data = {"input_tokens": 0, "output_tokens": 0}
yield f"event: {Constants.EVENT_MESSAGE_DELTA}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_DELTA, 'delta': {'stop_reason': final_stop_reason, 'stop_sequence': None}, 'usage': usage_data}, ensure_ascii=False)}\n\n"
yield f"event: {Constants.EVENT_MESSAGE_STOP}\ndata: {json.dumps({'type': Constants.EVENT_MESSAGE_STOP}, ensure_ascii=False)}\n\n"

159
src/core/client.py Normal file
View File

@@ -0,0 +1,159 @@
import asyncio
import json
from fastapi import HTTPException
from typing import Optional, AsyncGenerator, Dict, Any
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai._exceptions import APIError, RateLimitError, AuthenticationError, BadRequestError
class OpenAIClient:
"""Async OpenAI client with cancellation support."""
def __init__(self, api_key: str, base_url: str, timeout: int = 90):
self.api_key = api_key
self.base_url = base_url
self.client = AsyncOpenAI(
api_key=api_key,
base_url=base_url,
timeout=timeout
)
self.active_requests: Dict[str, asyncio.Event] = {}
async def create_chat_completion(self, request: Dict[str, Any], request_id: Optional[str] = None) -> Dict[str, Any]:
"""Send chat completion to OpenAI API with cancellation support."""
# Create cancellation token if request_id provided
if request_id:
cancel_event = asyncio.Event()
self.active_requests[request_id] = cancel_event
try:
# Create task that can be cancelled
completion_task = asyncio.create_task(
self.client.chat.completions.create(**request)
)
if request_id:
# Wait for either completion or cancellation
cancel_task = asyncio.create_task(cancel_event.wait())
done, pending = await asyncio.wait(
[completion_task, cancel_task],
return_when=asyncio.FIRST_COMPLETED
)
# Cancel pending tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Check if request was cancelled
if cancel_task in done:
completion_task.cancel()
raise HTTPException(status_code=499, detail="Request cancelled by client")
completion = await completion_task
else:
completion = await completion_task
# Convert to dict format that matches the original interface
return completion.model_dump()
except AuthenticationError as e:
raise HTTPException(status_code=401, detail=self.classify_openai_error(str(e)))
except RateLimitError as e:
raise HTTPException(status_code=429, detail=self.classify_openai_error(str(e)))
except BadRequestError as e:
raise HTTPException(status_code=400, detail=self.classify_openai_error(str(e)))
except APIError as e:
status_code = getattr(e, 'status_code', 500)
raise HTTPException(status_code=status_code, detail=self.classify_openai_error(str(e)))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
finally:
# Clean up active request tracking
if request_id and request_id in self.active_requests:
del self.active_requests[request_id]
async def create_chat_completion_stream(self, request: Dict[str, Any], request_id: Optional[str] = None) -> AsyncGenerator[str, None]:
"""Send streaming chat completion to OpenAI API with cancellation support."""
# Create cancellation token if request_id provided
if request_id:
cancel_event = asyncio.Event()
self.active_requests[request_id] = cancel_event
try:
# Ensure stream is enabled
request["stream"] = True
# Create the streaming completion
streaming_completion = await self.client.chat.completions.create(**request)
async for chunk in streaming_completion:
# Check for cancellation before yielding each chunk
if request_id and request_id in self.active_requests:
if self.active_requests[request_id].is_set():
raise HTTPException(status_code=499, detail="Request cancelled by client")
# Convert chunk to SSE format matching original HTTP client format
chunk_dict = chunk.model_dump()
chunk_json = json.dumps(chunk_dict, ensure_ascii=False)
yield f"data: {chunk_json}"
# Signal end of stream
yield "data: [DONE]"
except AuthenticationError as e:
raise HTTPException(status_code=401, detail=self.classify_openai_error(str(e)))
except RateLimitError as e:
raise HTTPException(status_code=429, detail=self.classify_openai_error(str(e)))
except BadRequestError as e:
raise HTTPException(status_code=400, detail=self.classify_openai_error(str(e)))
except APIError as e:
status_code = getattr(e, 'status_code', 500)
raise HTTPException(status_code=status_code, detail=self.classify_openai_error(str(e)))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
finally:
# Clean up active request tracking
if request_id and request_id in self.active_requests:
del self.active_requests[request_id]
def classify_openai_error(self, error_detail: Any) -> str:
"""Provide specific error guidance for common OpenAI API issues."""
error_str = str(error_detail).lower()
# Region/country restrictions
if "unsupported_country_region_territory" in error_str or "country, region, or territory not supported" in error_str:
return "OpenAI API is not available in your region. Consider using a VPN or Azure OpenAI service."
# API key issues
if "invalid_api_key" in error_str or "unauthorized" in error_str:
return "Invalid API key. Please check your OPENAI_API_KEY configuration."
# Rate limiting
if "rate_limit" in error_str or "quota" in error_str:
return "Rate limit exceeded. Please wait and try again, or upgrade your API plan."
# Model not found
if "model" in error_str and ("not found" in error_str or "does not exist" in error_str):
return "Model not found. Please check your BIG_MODEL and SMALL_MODEL configuration."
# Billing issues
if "billing" in error_str or "payment" in error_str:
return "Billing issue. Please check your OpenAI account billing status."
# Default: return original message
return str(error_detail)
def cancel_request(self, request_id: str) -> bool:
"""Cancel an active request by request_id."""
if request_id in self.active_requests:
self.active_requests[request_id].set()
return True
return False

40
src/core/config.py Normal file
View File

@@ -0,0 +1,40 @@
import os
import sys
# Configuration
class Config:
def __init__(self):
self.openai_api_key = os.environ.get("OPENAI_API_KEY")
if not self.openai_api_key:
raise ValueError("OPENAI_API_KEY not found in environment variables")
self.openai_base_url = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
self.host = os.environ.get("HOST", "0.0.0.0")
self.port = int(os.environ.get("PORT", "8082"))
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
self.max_tokens_limit = int(os.environ.get("MAX_TOKENS_LIMIT", "4096"))
self.min_tokens_limit = int(os.environ.get("MIN_TOKENS_LIMIT", "100"))
# Connection settings
self.request_timeout = int(os.environ.get("REQUEST_TIMEOUT", "90"))
self.max_retries = int(os.environ.get("MAX_RETRIES", "2"))
# Model settings - BIG and SMALL models
self.big_model = os.environ.get("BIG_MODEL", "gpt-4o")
self.small_model = os.environ.get("SMALL_MODEL", "gpt-4o-mini")
def validate_api_key(self):
"""Basic API key validation"""
if not self.openai_api_key:
return False
# Basic format check for OpenAI API keys
if not self.openai_api_key.startswith('sk-'):
return False
return True
try:
config = Config()
print(f" Configuration loaded: API_KEY={'*' * 20}..., BASE_URL='{config.openai_base_url}'")
except Exception as e:
print(f"=4 Configuration Error: {e}")
sys.exit(1)
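Because Config reads every setting from the environment in __init__, and the module ends with a module-level config = Config(), overrides have to be in place before the module is imported or the process exits with the error above. A hedged usage sketch, assuming the repository root is on PYTHONPATH; the key and port values are placeholders:

import os

# Hypothetical values, set before the import so the module-level Config() succeeds.
os.environ["OPENAI_API_KEY"] = "sk-demo-not-a-real-key"
os.environ["PORT"] = "9000"

from src.core.config import Config, config  # assumes the repo root is importable

print(config.port)                   # 9000, taken from the environment
print(config.min_tokens_limit)       # 100, the in-code default
print(Config().validate_api_key())   # True: the value passes the 'sk-' prefix check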

29
src/core/constants.py Normal file
View File

@@ -0,0 +1,29 @@
# Constants for better maintainability
class Constants:
ROLE_USER = "user"
ROLE_ASSISTANT = "assistant"
ROLE_SYSTEM = "system"
ROLE_TOOL = "tool"
CONTENT_TEXT = "text"
CONTENT_IMAGE = "image"
CONTENT_TOOL_USE = "tool_use"
CONTENT_TOOL_RESULT = "tool_result"
TOOL_FUNCTION = "function"
STOP_END_TURN = "end_turn"
STOP_MAX_TOKENS = "max_tokens"
STOP_TOOL_USE = "tool_use"
STOP_ERROR = "error"
EVENT_MESSAGE_START = "message_start"
EVENT_MESSAGE_STOP = "message_stop"
EVENT_MESSAGE_DELTA = "message_delta"
EVENT_CONTENT_BLOCK_START = "content_block_start"
EVENT_CONTENT_BLOCK_STOP = "content_block_stop"
EVENT_CONTENT_BLOCK_DELTA = "content_block_delta"
EVENT_PING = "ping"
DELTA_TEXT = "text_delta"
DELTA_INPUT_JSON = "input_json_delta"

13
src/core/logging.py Normal file
View File

@@ -0,0 +1,13 @@
import logging
from src.core.config import config
# Logging Configuration
logging.basicConfig(
level=getattr(logging, config.log_level.upper()),
format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Configure uvicorn to be quieter
for uvicorn_logger in ["uvicorn", "uvicorn.access", "uvicorn.error"]:
logging.getLogger(uvicorn_logger).setLevel(logging.WARNING)

23
src/core/model_manager.py Normal file
View File

@@ -0,0 +1,23 @@
from src.core.config import config
class ModelManager:
def __init__(self, config):
self.config = config
def map_claude_model_to_openai(self, claude_model: str) -> str:
"""Map Claude model names to OpenAI model names based on BIG/SMALL pattern"""
# If it's already an OpenAI model, return as-is
if claude_model.startswith("gpt-") or claude_model.startswith("o1-"):
return claude_model
# Map based on model naming patterns
model_lower = claude_model.lower()
if 'haiku' in model_lower:
return self.config.small_model
elif 'sonnet' in model_lower or 'opus' in model_lower:
return self.config.big_model
else:
# Default to big model for unknown models
return self.config.big_model
model_manager = ModelManager(config)
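The mapping is purely name-based: anything containing 'haiku' routes to SMALL_MODEL, 'sonnet' or 'opus' (and any unrecognized name) routes to BIG_MODEL, and OpenAI-style names pass through untouched. A small illustration, assuming the same environment setup as the config sketch above (importing this module pulls in config, so OPENAI_API_KEY must be set):

from src.core.model_manager import model_manager  # requires OPENAI_API_KEY in the environment

print(model_manager.map_claude_model_to_openai("claude-3-5-haiku-20241022"))   # -> SMALL_MODEL, e.g. gpt-4o-mini
print(model_manager.map_claude_model_to_openai("claude-3-5-sonnet-20241022"))  # -> BIG_MODEL, e.g. gpt-4o
print(model_manager.map_claude_model_to_openai("claude-3-opus-20240229"))      # -> BIG_MODEL (opus)
print(model_manager.map_claude_model_to_openai("gpt-4o"))                      # -> gpt-4o, passed through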

61
src/main.py Normal file
View File

@@ -0,0 +1,61 @@
from fastapi import FastAPI
from src.api.endpoints import router as api_router
import uvicorn
import sys
from src.core.config import config
app = FastAPI(title="Claude-to-OpenAI API Proxy", version="1.0.0")
app.include_router(api_router)
def main():
if len(sys.argv) > 1 and sys.argv[1] == "--help":
print("Claude-to-OpenAI API Proxy v1.0.0")
print("")
print("Usage: python src/main.py")
print("")
print("Required environment variables:")
print(" OPENAI_API_KEY - Your OpenAI API key")
print("")
print("Optional environment variables:")
print(
f" OPENAI_BASE_URL - OpenAI API base URL (default: https://api.openai.com/v1)"
)
print(f" BIG_MODEL - Model for sonnet/opus requests (default: gpt-4o)")
print(f" SMALL_MODEL - Model for haiku requests (default: gpt-4o-mini)")
print(f" HOST - Server host (default: 0.0.0.0)")
print(f" PORT - Server port (default: 8082)")
print(f" LOG_LEVEL - Logging level (default: WARNING)")
print(f" MAX_TOKENS_LIMIT - Token limit (default: 4096)")
print(f" MIN_TOKENS_LIMIT - Minimum token limit (default: 100)")
print(f" REQUEST_TIMEOUT - Request timeout in seconds (default: 90)")
print("")
print("Model mapping:")
print(f" Claude haiku models -> {config.small_model}")
print(f" Claude sonnet/opus models -> {config.big_model}")
sys.exit(0)
# Configuration summary
print("🚀 Claude-to-OpenAI API Proxy v1.0.0")
print(f"✅ Configuration loaded successfully")
print(f" OpenAI Base URL: {config.openai_base_url}")
print(f" Big Model (sonnet/opus): {config.big_model}")
print(f" Small Model (haiku): {config.small_model}")
print(f" Max Tokens Limit: {config.max_tokens_limit}")
print(f" Request Timeout: {config.request_timeout}s")
print(f" Server: {config.host}:{config.port}")
print("")
# Start server
uvicorn.run(
"src.main:app",
host=config.host,
port=config.port,
log_level=config.log_level.lower(),
reload=True,
)
if __name__ == "__main__":
main()

60
src/models/claude.py Normal file
View File

@@ -0,0 +1,60 @@
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Union, Literal
class ClaudeContentBlockText(BaseModel):
type: Literal["text"]
text: str
class ClaudeContentBlockImage(BaseModel):
type: Literal["image"]
source: Dict[str, Any]
class ClaudeContentBlockToolUse(BaseModel):
type: Literal["tool_use"]
id: str
name: str
input: Dict[str, Any]
class ClaudeContentBlockToolResult(BaseModel):
type: Literal["tool_result"]
tool_use_id: str
content: Union[str, List[Dict[str, Any]], Dict[str, Any]]
class ClaudeSystemContent(BaseModel):
type: Literal["text"]
text: str
class ClaudeMessage(BaseModel):
role: Literal["user", "assistant"]
content: Union[str, List[Union[ClaudeContentBlockText, ClaudeContentBlockImage, ClaudeContentBlockToolUse, ClaudeContentBlockToolResult]]]
class ClaudeTool(BaseModel):
name: str
description: Optional[str] = None
input_schema: Dict[str, Any]
class ClaudeThinkingConfig(BaseModel):
enabled: bool = True
class ClaudeMessagesRequest(BaseModel):
model: str
max_tokens: int
messages: List[ClaudeMessage]
system: Optional[Union[str, List[ClaudeSystemContent]]] = None
stop_sequences: Optional[List[str]] = None
stream: Optional[bool] = False
temperature: Optional[float] = 1.0
top_p: Optional[float] = None
top_k: Optional[int] = None
metadata: Optional[Dict[str, Any]] = None
tools: Optional[List[ClaudeTool]] = None
tool_choice: Optional[Dict[str, Any]] = None
thinking: Optional[ClaudeThinkingConfig] = None
class ClaudeTokenCountRequest(BaseModel):
model: str
messages: List[ClaudeMessage]
system: Optional[Union[str, List[ClaudeSystemContent]]] = None
tools: Optional[List[ClaudeTool]] = None
thinking: Optional[ClaudeThinkingConfig] = None
tool_choice: Optional[Dict[str, Any]] = None

0
src/models/openai.py Normal file
View File

13
start_proxy.py Normal file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
"""Start Claude Code Proxy server."""
import sys
import os
# Add src to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from src.main import main
if __name__ == "__main__":
main()

130
test_cancellation.py Normal file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
Test script for HTTP request cancellation functionality.
This script demonstrates how client disconnection cancels ongoing requests.
"""
import asyncio
import httpx
import json
import time
async def test_non_streaming_cancellation():
"""Test cancellation for non-streaming requests."""
print("🧪 Testing non-streaming request cancellation...")
async with httpx.AsyncClient(timeout=30) as client:
try:
# Start a long-running request
task = asyncio.create_task(
client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1000,
"messages": [
{"role": "user", "content": "Write a very long story about a journey through space that takes at least 500 words."}
]
}
)
)
# Cancel after 2 seconds
await asyncio.sleep(2)
task.cancel()
try:
await task
print("❌ Request should have been cancelled")
except asyncio.CancelledError:
print("✅ Non-streaming request cancelled successfully")
except Exception as e:
print(f"❌ Non-streaming test error: {e}")
async def test_streaming_cancellation():
"""Test cancellation for streaming requests."""
print("\n🧪 Testing streaming request cancellation...")
async with httpx.AsyncClient(timeout=30) as client:
try:
# Start streaming request
async with client.stream(
"POST",
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1000,
"messages": [
{"role": "user", "content": "Write a very long story about a journey through space that takes at least 500 words."}
],
"stream": True
}
) as response:
if response.status_code == 200:
print("✅ Streaming request started successfully")
# Read a few chunks then simulate client disconnect
chunk_count = 0
async for line in response.aiter_lines():
if line.strip():
chunk_count += 1
print(f"📦 Received chunk {chunk_count}: {line[:100]}...")
# Simulate client disconnect after 3 chunks
if chunk_count >= 3:
print("🔌 Simulating client disconnect...")
break
print("✅ Streaming request cancelled successfully")
else:
print(f"❌ Streaming request failed: {response.status_code}")
except Exception as e:
print(f"❌ Streaming test error: {e}")
async def test_server_running():
"""Test if the server is running."""
print("🔍 Checking if server is running...")
try:
async with httpx.AsyncClient(timeout=5) as client:
response = await client.get("http://localhost:8082/health")
if response.status_code == 200:
print("✅ Server is running and healthy")
return True
else:
print(f"❌ Server health check failed: {response.status_code}")
return False
except Exception as e:
print(f"❌ Cannot connect to server: {e}")
print("💡 Make sure to start the server with: python start_proxy.py")
return False
async def main():
"""Main test function."""
print("🚀 Starting HTTP request cancellation tests")
print("=" * 50)
# Check if server is running
if not await test_server_running():
return
print("\n" + "=" * 50)
# Test non-streaming cancellation
await test_non_streaming_cancellation()
# Test streaming cancellation
await test_streaming_cancellation()
print("\n" + "=" * 50)
print("✅ All cancellation tests completed!")
print("\n💡 Note: The actual cancellation behavior depends on:")
print(" - Client implementation (httpx in this case)")
print(" - Network conditions")
print(" - Server response to client disconnection")
print(" - Whether the underlying OpenAI API supports cancellation")
if __name__ == "__main__":
asyncio.run(main())

265
tests/test_main.py Normal file
View File

@@ -0,0 +1,265 @@
"""Test script for Claude to OpenAI proxy."""
import asyncio
import json
import httpx
from dotenv import load_dotenv
load_dotenv()
async def test_basic_chat():
"""Test basic chat completion."""
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Hello, how are you?"}
]
}
)
print("Basic chat response:")
print(json.dumps(response.json(), indent=2))
async def test_streaming_chat():
"""Test streaming chat completion."""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-haiku-20241022",
"max_tokens": 150,
"messages": [
{"role": "user", "content": "Tell me a short joke"}
],
"stream": True
}
) as response:
print("\nStreaming response:")
async for line in response.aiter_lines():
if line.strip():
print(line)
async def test_function_calling():
"""Test function calling capability."""
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 200,
"messages": [
{"role": "user", "content": "What's the weather like in New York? Please use the weather function."}
],
"tools": [
{
"name": "get_weather",
"description": "Get the current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The location to get weather for"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
],
"tool_choice": {"type": "auto"}
}
)
print("\nFunction calling response:")
print(json.dumps(response.json(), indent=2))
async def test_with_system_message():
"""Test with system message."""
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 100,
"system": "You are a helpful assistant that always responds in haiku format.",
"messages": [
{"role": "user", "content": "Explain what AI is"}
]
}
)
print("\nSystem message response:")
print(json.dumps(response.json(), indent=2))
async def test_multimodal():
"""Test multimodal input (text + image)."""
async with httpx.AsyncClient() as client:
# Sample base64 image (1x1 pixel transparent PNG)
sample_image = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChAI9jU8PJAAAAASUVORK5CYII="
response = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 100,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "What do you see in this image?"},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": sample_image
}
}
]
}
]
}
)
print("\nMultimodal response:")
print(json.dumps(response.json(), indent=2))
async def test_conversation_with_tool_use():
"""Test a complete conversation with tool use and results."""
async with httpx.AsyncClient() as client:
# First message with tool call
response1 = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 200,
"messages": [
{"role": "user", "content": "Calculate 25 * 4 using the calculator tool"}
],
"tools": [
{
"name": "calculator",
"description": "Perform basic arithmetic calculations",
"input_schema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to calculate"
}
},
"required": ["expression"]
}
}
]
}
)
print("\nTool call response:")
result1 = response1.json()
print(json.dumps(result1, indent=2))
# Simulate tool execution and send result
if result1.get("content"):
tool_use_blocks = [block for block in result1["content"] if block.get("type") == "tool_use"]
if tool_use_blocks:
tool_block = tool_use_blocks[0]
# Second message with tool result
response2 = await client.post(
"http://localhost:8082/v1/messages",
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Calculate 25 * 4 using the calculator tool"},
{"role": "assistant", "content": result1["content"]},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_block["id"],
"content": "100"
}
]
}
]
}
)
print("\nTool result response:")
print(json.dumps(response2.json(), indent=2))
async def test_token_counting():
"""Test token counting endpoint."""
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8082/v1/messages/count_tokens",
json={
"model": "claude-3-5-sonnet-20241022",
"messages": [
{"role": "user", "content": "This is a test message for token counting."}
]
}
)
print("\nToken count response:")
print(json.dumps(response.json(), indent=2))
async def test_health_and_connection():
"""Test health and connection endpoints."""
async with httpx.AsyncClient() as client:
# Health check
health_response = await client.get("http://localhost:8082/health")
print("\nHealth check:")
print(json.dumps(health_response.json(), indent=2))
# Connection test
connection_response = await client.get("http://localhost:8082/test-connection")
print("\nConnection test:")
print(json.dumps(connection_response.json(), indent=2))
async def main():
"""Run all tests."""
print("🧪 Testing Claude to OpenAI Proxy")
print("=" * 50)
try:
await test_health_and_connection()
await test_token_counting()
await test_basic_chat()
await test_with_system_message()
await test_streaming_chat()
await test_multimodal()
await test_function_calling()
await test_conversation_with_tool_use()
print("\n✅ All tests completed!")
except Exception as e:
print(f"\n❌ Test failed: {e}")
print("Make sure the server is running with a valid OPENAI_API_KEY")
if __name__ == "__main__":
asyncio.run(main())

1294
uv.lock generated Normal file

File diff suppressed because it is too large