import asyncio
import socket
import subprocess
import time
from contextlib import asynccontextmanager
from typing import List

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

from commons.mcp_manager import MCPHostManager
from commons.logging_utils import setup_logger
from commons.openai_utils import (
    ChatMessage,
    ChatCompletionRequest,
    AVAILABLE_MODELS,
    generate_id,
    stream_response,
)
from commons.settings import settings

# Module-level logger and the single MCPHost process manager shared by all
# request handlers.
logger = setup_logger()
mcp_manager = MCPHostManager()

# Strong references to fire-and-forget tasks: the event loop holds only a
# weak reference to tasks, so without this a scheduled task can be
# garbage-collected before it ever runs.
_background_tasks: set = set()


async def process_with_mcphost(messages: List[ChatMessage], model: str) -> str:
    """Forward the most recent user message to MCPHost and return its reply.

    Args:
        messages: Full conversation history; only the last entry with
            ``role == "user"`` is actually sent to MCPHost.
        model: Requested model id. Currently unused by MCPHost; kept for
            interface parity with the OpenAI API.

    Returns:
        MCPHost's response text, or a fixed notice when the history contains
        no user message.
    """
    last_user_message = next(
        (msg.content for msg in reversed(messages) if msg.role == "user"), ""
    )
    if not last_user_message:
        return "No user message found"
    return await mcp_manager.send_prompt_async(last_user_message)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start MCPHost on application startup and shut it down on exit."""
    # Startup
    logger.info("=" * 50)
    logger.info("MCPHost OpenAI-compatible API Server v1.0")
    logger.info("Debug Mode: {}", "ON" if settings.debug else "OFF")
    logger.info("=" * 50)

    if not mcp_manager.start():
        # Startup proceeds even on failure so the server stays reachable;
        # /health exposes the dead MCPHost state to callers.
        logger.error("Failed to start MCPHost")
    else:
        logger.success("MCPHost started successfully")

    yield

    # Shutdown
    logger.info("Shutting down MCPHost...")
    mcp_manager.shutdown()
    logger.success("Shutdown complete")


app = FastAPI(title="MCPHost OpenAI-compatible API", lifespan=lifespan)


@app.get("/v1/models")
async def list_models():
    """List all available models (OpenAI ``GET /v1/models`` response shape)."""
    return {"object": "list", "data": AVAILABLE_MODELS}


@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    """Get details of a specific model; raises 404 when the id is unknown."""
    model = next((m for m in AVAILABLE_MODELS if m["id"] == model_id), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model {model_id} not found")
    return model


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completion endpoint (streaming or non-streaming).

    Validates the requested model, forwards the conversation to MCPHost, and
    returns either a full completion object or an SSE stream depending on
    ``request.stream``.
    """
    # Validate model exists
    if not any(model["id"] == request.model for model in AVAILABLE_MODELS):
        raise HTTPException(
            status_code=404, detail=f"Model {request.model} not found"
        )

    # The full response is generated up front either way; streaming only
    # changes how it is delivered to the client.
    response_content = await process_with_mcphost(request.messages, request.model)

    if request.stream:
        return StreamingResponse(
            stream_response(response_content, request.model),
            media_type="text/event-stream",
        )

    # Whitespace-split token counts are an approximation of real tokenization;
    # compute each count once instead of repeating the joins.
    prompt_tokens = len(" ".join(msg.content for msg in request.messages).split())
    completion_tokens = len(response_content.split())
    return {
        "id": f"chatcmpl-{generate_id()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": response_content},
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


@app.get("/")
async def root():
    """Landing route pointing clients at the interactive API docs."""
    return {
        "message": "MCPHost OpenAI-compatible API server. Visit /docs for documentation."
    }


@app.get("/health")
async def health_check():
    """Liveness probe: reports whether the MCPHost subprocess is alive."""
    # NOTE(review): reaches into MCPHostManager's private _is_alive(); a
    # public accessor would be cleaner — confirm against commons.mcp_manager.
    return {"status": "healthy", "mcphost_alive": mcp_manager._is_alive()}


@app.get("/restart")
async def restart_service():
    """Restart the llm-api-mcphost service using supervisorctl.

    Only honored when the machine's hostname is ``afsar``; on any other host
    the request is acknowledged but ignored. The actual restart is scheduled
    as a detached background task after ``settings.restart_delay`` seconds so
    this HTTP response can be delivered before supervisord kills the process.
    """
    hostname = socket.gethostname()
    if hostname != "afsar":
        logger.info(
            f"Restart request ignored - hostname is '{hostname}', not 'afsar'"
        )
        return {
            "status": "ignored",
            "message": "Service restart not performed - incorrect hostname",
            "hostname": hostname,
            "required_hostname": "afsar",
        }

    async def delayed_restart():
        # The delay gives the HTTP response time to flush before supervisord
        # tears this process down.
        await asyncio.sleep(settings.restart_delay)
        try:
            subprocess.Popen(
                ["sudo", "supervisorctl", "restart", "llm-api-mcphost"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                start_new_session=True,  # detach so the restart survives us
            )
            logger.info("Restart command initiated (detached)")
        except Exception as e:
            logger.error(f"Error initiating restart: {str(e)}")

    # Keep a strong reference so the task is not garbage-collected before it
    # fires; the done-callback drops the reference once it completes.
    task = asyncio.create_task(delayed_restart())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    logger.info("Service restart scheduled")
    return {
        "status": "scheduled",
        "message": "Service restart has been scheduled",
        "hostname": hostname,
        "note": f"Service will restart in {settings.restart_delay}s",
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host=settings.host, port=settings.port)