mcphost-api/serve_mcphost_openai_compatible.py

import time
from contextlib import asynccontextmanager
from typing import List

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

from commons.mcp_manager import MCPHostManager
from commons.logging_utils import setup_logger
from commons.openai_models import ChatMessage, ChatCompletionRequest, AVAILABLE_MODELS
from commons.openai_utils import generate_id, stream_response
from commons.settings import settings

# Setup logger
logger = setup_logger()

# Initialize the MCPHost manager
mcp_manager = MCPHostManager()


async def process_with_mcphost(messages: List[ChatMessage], model: str) -> str:
    """Process messages using MCPHost."""
    # Get the last user message
    last_user_message = next(
        (msg.content for msg in reversed(messages) if msg.role == "user"), ""
    )
    if not last_user_message:
        return "No user message found"
    # Send to MCPHost and get the response
    response = await mcp_manager.send_prompt_async(last_user_message)
    return response


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan."""
    # Startup
    logger.info("=" * 50)
    logger.info("MCPHost OpenAI-compatible API Server v1.0")
    logger.info("Debug Mode: {}", "ON" if settings.debug else "OFF")
    logger.info("=" * 50)
    if not mcp_manager.start():
        logger.error("Failed to start MCPHost")
        # You might want to exit or handle this differently
    else:
        logger.success("MCPHost started successfully")
    yield
    # Shutdown
    logger.info("Shutting down MCPHost...")
    mcp_manager.shutdown()
    logger.success("Shutdown complete")


app = FastAPI(title="MCPHost OpenAI-compatible API", lifespan=lifespan)


@app.get("/v1/models")
async def list_models():
    """List all available models."""
    return {
        "object": "list",
        "data": AVAILABLE_MODELS,
    }
@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
"""Get details of a specific model"""
model = next((m for m in AVAILABLE_MODELS if m["id"] == model_id), None)
if not model:
raise HTTPException(status_code=404, detail=f"Model {model_id} not found")
return model


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint."""
    # Validate that the requested model exists
    if not any(model["id"] == request.model for model in AVAILABLE_MODELS):
        raise HTTPException(status_code=404, detail=f"Model {request.model} not found")

    response_content = await process_with_mcphost(request.messages, request.model)

    if request.stream:
        return StreamingResponse(
            stream_response(response_content, request.model),
            media_type="text/event-stream",
        )

    # Rough token counts from whitespace splitting; an approximation,
    # not a real tokenizer.
    prompt_tokens = len(" ".join(msg.content for msg in request.messages).split())
    completion_tokens = len(response_content.split())
    return {
        "id": f"chatcmpl-{generate_id()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": response_content},
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


# Optional: a root endpoint that points to the documentation
@app.get("/")
async def root():
    return {"message": "MCPHost OpenAI-compatible API server. Visit /docs for documentation."}


# Optional: a health check endpoint
@app.get("/health")
async def health_check():
    return {"status": "healthy", "mcphost_alive": mcp_manager._is_alive()}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host=settings.host, port=settings.port)
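

# --- Example client usage (illustrative sketch, not part of the server) ---
# Assumes the server is reachable at http://localhost:8000; adjust the URL to
# match settings.host and settings.port in your deployment. "my-model" is a
# placeholder for an id listed in AVAILABLE_MODELS.
#
#   import requests
#
#   resp = requests.post(
#       "http://localhost:8000/v1/chat/completions",
#       json={
#           "model": "my-model",
#           "messages": [{"role": "user", "content": "Hello"}],
#           "stream": False,
#       },
#   )
#   print(resp.json()["choices"][0]["message"]["content"])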