diff --git a/.gitignore b/.gitignore
index ce2d5e6..bbba6c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,7 @@
.venv
venv
config.json
-mcphost
+bins/mcphost
mcphost_openai_api.log.*
settings.py
.idea
diff --git a/bins/.gitkeep b/bins/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/helpers/__init__.py b/helpers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/helpers/response_cleaners.py b/helpers/response_cleaners.py
new file mode 100644
index 0000000..d8e0088
--- /dev/null
+++ b/helpers/response_cleaners.py
@@ -0,0 +1,112 @@
+import re
+from loguru import logger
+
+
+class Config:
+ """Configuration constants for response cleaning"""
+ # Patterns for cleaning debug output
+ ANSI_PATTERN = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+ TUI_BORDER = '┃'
+ SKIP_PATTERNS = ['alt+enter', 'Enter your prompt']
+ DEBUG_LOG_PATTERN = re.compile(r'^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \w+ <.*?>.*$')
+ THINKING_SPINNER_PATTERN = re.compile(r'[⣽⢿⡿⣟⣯⣷⣾⣻] Thinking\.\.\.')
+ ASSISTANT_MARKER = "Assistant:"
+ PROMPT_INDICATOR = "Enter your prompt"
+
+
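+# Illustrative example of the kind of TUI capture this module is meant to clean. The
+# exact framing depends on the mcphost version, so treat the sample lines as an assumption:
+#
+#   2025/01/01 12:00:00 INFO <cmd/root.go:123> model loaded
+#   ⣽ Thinking...
+#   Assistant:
+#   ┃ The capital of France is Paris.
+#   Enter your prompt (alt+enter to submit)
+#
+# clean_response() reduces such a capture to "The capital of France is Paris."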
+def clean_response(response: str, original_prompt: str) -> str:
+ """Clean and format MCP response"""
+ if not response:
+ return ""
+
+ # Debug log the raw response
+ logger.debug(f"Raw response before cleaning: {response}")
+
+ # Remove ANSI escape sequences
+ response = Config.ANSI_PATTERN.sub('', response)
+
+ # Look for the Assistant: marker and extract content after it
+ if Config.ASSISTANT_MARKER in response:
+ parts = response.split(Config.ASSISTANT_MARKER)
+ if len(parts) > 1:
+ assistant_section = parts[-1]
+
+ # Find the end of the assistant response
+ if Config.PROMPT_INDICATOR in assistant_section:
+ assistant_response = assistant_section.split(Config.PROMPT_INDICATOR)[0]
+ else:
+ assistant_response = assistant_section
+
+ # Clean and extract the response
+ return clean_assistant_section(assistant_response)
+
+ # Fallback to cleaning the entire response if no Assistant: marker found
+ return clean_entire_response(response, original_prompt)
+
+
+def clean_assistant_section(assistant_response: str) -> str:
+ """Clean the assistant section"""
+ lines = assistant_response.split('\n')
+ cleaned_lines = []
+
+ for line in lines:
+ stripped = line.strip()
+
+ # Skip empty lines
+ if not stripped:
+ continue
+
+ # Skip debug log lines
+ if Config.DEBUG_LOG_PATTERN.match(line):
+ continue
+
+ # Skip thinking spinner lines
+ if Config.THINKING_SPINNER_PATTERN.search(line):
+ continue
+
+ # Handle TUI borders
+ if stripped.startswith(Config.TUI_BORDER):
+ content = stripped.strip(Config.TUI_BORDER).strip()
+ if content:
+ cleaned_lines.append(content)
+ else:
+ cleaned_lines.append(stripped)
+
+ return '\n'.join(cleaned_lines).strip()
+
+
+def clean_entire_response(response: str, original_prompt: str) -> str:
+ """Clean the entire response when no Assistant: marker is found"""
+ lines = response.split('\n')
+ cleaned_lines = []
+
+ for line in lines:
+ stripped = line.strip()
+
+ # Skip empty lines and original prompt
+ if not stripped or stripped == original_prompt:
+ continue
+
+ # Skip debug log lines
+ if Config.DEBUG_LOG_PATTERN.match(line):
+ continue
+
+ # Skip thinking spinner lines
+ if Config.THINKING_SPINNER_PATTERN.search(line):
+ continue
+
+ # Handle TUI decorations
+ if stripped.startswith(Config.TUI_BORDER):
+ content = stripped.strip(Config.TUI_BORDER).strip()
+ if content and content != original_prompt:
+ cleaned_lines.append(content)
+ continue
+
+ # Skip navigation hints
+ if any(pattern in line for pattern in Config.SKIP_PATTERNS):
+ continue
+
+ # Add non-empty, non-decoration lines
+ cleaned_lines.append(stripped)
+
+ return '\n'.join(cleaned_lines).strip()
\ No newline at end of file
diff --git a/serve_mcphost_openai_compatible.py b/serve_mcphost_openai_compatible.py
new file mode 100644
index 0000000..c961b11
--- /dev/null
+++ b/serve_mcphost_openai_compatible.py
@@ -0,0 +1,421 @@
+from typing import List, Optional
+from pydantic import BaseModel
+import time
+import json
+import asyncio
+import uuid
+import sys
+import threading
+from fastapi import FastAPI, HTTPException
+from fastapi.responses import StreamingResponse
+from functools import wraps
+from contextlib import asynccontextmanager
+
+import pexpect
+from loguru import logger
+
+from settings import settings
+from helpers.response_cleaners import clean_response
+
+
+class Config:
+ """Configuration constants for MCPHost management"""
+    SPAWN_TIMEOUT = 60          # seconds to wait for the process to start
+    ECHO_DELAY = 0.5            # seconds to let echoed output settle
+    READ_TIMEOUT = 0.1          # seconds for each non-blocking read
+    RESPONSE_WAIT_TIME = 1      # seconds to wait before collecting the response
+    CHUNK_SIZE = 1000           # characters per non-blocking read while collecting
+    MAX_READ_SIZE = 10000       # characters to drain when clearing the buffer
+ PROMPT_INDICATOR = "Enter your prompt"
+
+
+def log_performance(func):
+ """Decorator to log function performance"""
+
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ start_time = time.time()
+ logger.debug("Starting {}", func.__name__)
+
+ try:
+ result = func(*args, **kwargs)
+ duration = time.time() - start_time
+ logger.debug("{} completed in {:.2f}s", func.__name__, duration)
+ return result
+ except Exception as e:
+ duration = time.time() - start_time
+ logger.error("{} failed after {:.2f}s: {}", func.__name__, duration, str(e))
+ raise
+
+ return wrapper
+
+
+# Configure loguru
+logger.remove()
+logger.add(
+ sys.stderr,
+ level="DEBUG" if settings.debug else "INFO",
+ format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
+ colorize=True,
+ backtrace=True,
+ diagnose=True
+)
+
+
+class MCPHostManager:
+ """Manages MCP process lifecycle and communication"""
+
+ def __init__(self):
+ self.child: Optional[pexpect.spawn] = None
+ self.config = Config()
+ self.lock = threading.Lock()
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+
+ @log_performance
+ def start(self) -> bool:
+ """Start the mcphost process"""
+ command = self._build_command()
+ logger.info("Starting mcphost: {}", ' '.join(command))
+
+ try:
+ self.child = pexpect.spawn(
+ ' '.join(command),
+ timeout=self.config.SPAWN_TIMEOUT,
+ encoding='utf-8'
+ )
+ self.child.setecho(False)
+ return self._wait_for_ready()
+ except Exception as e:
+ logger.error("Error starting mcphost: {}", e)
+ logger.exception("Full traceback:")
+ return False
+
+ @log_performance
+ async def send_prompt_async(self, prompt: str) -> str:
+ """Send a prompt to mcphost and get the response (async wrapper)"""
+        loop = asyncio.get_running_loop()
+        # pexpect I/O blocks, so run it on the default thread pool executor
+        return await loop.run_in_executor(None, self.send_prompt, prompt)
+
+ def send_prompt(self, prompt: str) -> str:
+ """Send a prompt to mcphost and get the response"""
+ with self.lock:
+ if not self._is_alive():
+ logger.warning("MCPHost not running, attempting to restart...")
+ if not self.start():
+ return "Error: Failed to restart MCPHost"
+
+ try:
+ self._clear_pending_output()
+ self._send_command(prompt)
+ response = self._collect_response()
+ return clean_response(response, prompt)
+ except Exception as e:
+ logger.exception("Exception in send_prompt: {}", str(e))
+ return f"Error: {str(e)}"
+
+ def shutdown(self):
+ """Shutdown mcphost gracefully"""
+ if self._is_alive():
+ logger.info("Shutting down mcphost...")
+ self.child.sendcontrol('c')
+ self.child.close()
+ logger.info("MCPHost shutdown complete")
+
+ def _is_alive(self) -> bool:
+ """Check if the process is running"""
+ return self.child is not None and self.child.isalive()
+
+ def _build_command(self) -> List[str]:
+ """Build the command to start mcphost"""
+ command = [
+ settings.mcphost_path,
+ '--config', settings.mcphost_config,
+ '--model', settings.mcphost_model,
+ '--openai-url', settings.openai_url,
+ '--openai-api-key', settings.openai_api_key
+ ]
+
+ if settings.debug:
+ command.insert(1, '--debug')
+
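+        # Illustrative resulting invocation (actual values come from settings.py,
+        # so paths and model names here are assumptions):
+        #   bins/mcphost --debug --config config.json --model openai:<model> \
+        #       --openai-url <base-url> --openai-api-key <key>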
+ return command
+
+ def _wait_for_ready(self) -> bool:
+ """Wait for the process to be ready"""
+ try:
+ self.child.expect(self.config.PROMPT_INDICATOR)
+ logger.success("MCPHost started and ready")
+ self._clear_buffer()
+ return True
+ except Exception as e:
+ logger.error("Error waiting for prompt: {}", e)
+ return False
+
+ def _clear_buffer(self):
+ """Clear any remaining output in the buffer"""
+ time.sleep(self.config.ECHO_DELAY)
+ try:
+ self.child.read_nonblocking(
+ size=self.config.MAX_READ_SIZE,
+ timeout=self.config.READ_TIMEOUT
+ )
+        except (pexpect.TIMEOUT, pexpect.EOF):
+            # Nothing buffered to discard
+            pass
+
+ def _clear_pending_output(self):
+ """Clear any pending output from the process"""
+ try:
+ self.child.read_nonblocking(
+ size=self.config.MAX_READ_SIZE,
+ timeout=self.config.READ_TIMEOUT
+ )
+        except (pexpect.TIMEOUT, pexpect.EOF):
+            # Nothing pending to discard
+            pass
+
+ def _send_command(self, prompt: str):
+ """Send a command to the process"""
+ logger.debug("Sending prompt: {}", prompt)
+ self.child.send(prompt)
+ self.child.send('\r')
+
+ # Wait for the model to process
+ time.sleep(self.config.RESPONSE_WAIT_TIME)
+
+ def _collect_response(self) -> str:
+ """Collect response from the process"""
+ response = ""
+ response_complete = False
+
+ with logger.catch(message="Error during response collection"):
+ while not response_complete:
+ try:
+ chunk = self.child.read_nonblocking(
+ size=self.config.CHUNK_SIZE,
+ timeout=3
+ )
+
+ if chunk:
+ response += chunk
+ logger.trace("Received chunk: {}", chunk[:50] + "..." if len(chunk) > 50 else chunk)
+ if self.config.PROMPT_INDICATOR in chunk:
+ response_complete = True
+ else:
+ break
+
+ except pexpect.TIMEOUT:
+ if response and self.config.PROMPT_INDICATOR in response:
+ response_complete = True
+ elif response:
+ logger.debug("Waiting for more response data...")
+ time.sleep(1)
+ continue
+ else:
+ break
+ except Exception as e:
+ logger.error("Error reading response: {}", e)
+ break
+
+ logger.debug("Collected response length: {} characters", len(response))
+ return response
+
+
+# Configuration for available models
+AVAILABLE_MODELS = [
+ {
+ "id": "mcphost-model",
+ "object": "model",
+ "created": 1686935002,
+ "owned_by": "mcphost",
+ "permission": [
+ {
+                "id": "modelmcphost-" + str(uuid.uuid4())[:8],
+ "object": "model_permission",
+ "created": int(time.time()),
+ "allow_create_engine": False,
+ "allow_sampling": True,
+ "allow_logprobs": True,
+ "allow_search_indices": False,
+ "allow_view": True,
+ "allow_fine_tuning": False,
+ "organization": "*",
+ "group": None,
+ "is_blocking": False
+ }
+ ],
+ "root": "mcphost-model",
+ "parent": None
+ },
+]
+
+
+class ChatMessage(BaseModel):
+ role: str
+ content: str
+
+
+class ChatCompletionRequest(BaseModel):
+ model: str
+ messages: List[ChatMessage]
+ temperature: Optional[float] = 1.0
+ stream: Optional[bool] = False
+ max_tokens: Optional[int] = None
+
+
+# Initialize the MCPHost manager
+mcp_manager = MCPHostManager()
+
+
+async def process_with_mcphost(messages: List[ChatMessage], model: str) -> str:
+ """Process messages using MCPHost"""
+ # Get the last user message
+ last_user_message = next((msg.content for msg in reversed(messages) if msg.role == "user"), "")
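+    # Note: only the most recent user message is forwarded; earlier turns are not
+    # replayed, on the assumption that the long-lived mcphost session keeps its own
+    # conversation state.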
+
+ if not last_user_message:
+ return "No user message found"
+
+ # Send to MCPHost and get response
+ response = await mcp_manager.send_prompt_async(last_user_message)
+ return response
+
+
+def generate_id() -> str:
+ return str(uuid.uuid4())[:8]
+
+
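+# Emits the reply as OpenAI-style server-sent events: each word becomes a
+# "chat.completion.chunk" object on its own "data: ..." line (the per-word split
+# only simulates streaming), and the stream ends with a finish_reason "stop"
+# chunk followed by "data: [DONE]".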
+async def stream_response(content: str, model: str):
+ words = content.split()
+
+ for i, word in enumerate(words):
+ chunk = {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion.chunk",
+ "created": int(time.time()),
+ "model": model,
+ "choices": [{
+ "index": 0,
+ "delta": {
+ "content": word + " "
+ },
+ "finish_reason": None
+ }]
+ }
+ yield f"data: {json.dumps(chunk)}\n\n"
+ await asyncio.sleep(0.1)
+
+ final_chunk = {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion.chunk",
+ "created": int(time.time()),
+ "model": model,
+ "choices": [{
+ "index": 0,
+ "delta": {},
+ "finish_reason": "stop"
+ }]
+ }
+ yield f"data: {json.dumps(final_chunk)}\n\n"
+ yield "data: [DONE]\n\n"
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Manage application lifespan"""
+ # Startup
+ logger.info("=" * 50)
+ logger.info("MCPHost OpenAI-compatible API Server v1.0")
+ logger.info("Debug Mode: {}", "ON" if settings.debug else "OFF")
+ logger.info("=" * 50)
+
+ if not mcp_manager.start():
+ logger.error("Failed to start MCPHost")
+ # You might want to exit or handle this differently
+ else:
+ logger.success("MCPHost started successfully")
+
+ yield
+
+ # Shutdown
+ logger.info("Shutting down MCPHost...")
+ mcp_manager.shutdown()
+ logger.success("Shutdown complete")
+
+
+app = FastAPI(title="MCPHost OpenAI-compatible API", lifespan=lifespan)
+
+
+@app.get("/v1/models")
+async def list_models():
+ """List all available models"""
+ return {
+ "object": "list",
+ "data": AVAILABLE_MODELS
+ }
+
+
+@app.get("/v1/models/{model_id}")
+async def get_model(model_id: str):
+ """Get details of a specific model"""
+ model = next((m for m in AVAILABLE_MODELS if m["id"] == model_id), None)
+ if not model:
+ raise HTTPException(status_code=404, detail=f"Model {model_id} not found")
+ return model
+
+
+@app.post("/v1/chat/completions")
+async def chat_completions(request: ChatCompletionRequest):
+ # Validate model exists
+ if not any(model["id"] == request.model for model in AVAILABLE_MODELS):
+ raise HTTPException(status_code=404, detail=f"Model {request.model} not found")
+
+ response_content = await process_with_mcphost(request.messages, request.model)
+
+ if not request.stream:
+ return {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion",
+ "created": int(time.time()),
+ "model": request.model,
+ "choices": [{
+ "index": 0,
+ "message": {
+ "role": "assistant",
+ "content": response_content
+ },
+ "finish_reason": "stop"
+ }],
+ "usage": {
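+                # Whitespace word counts stand in for real token usage here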
+ "prompt_tokens": len(" ".join([msg.content for msg in request.messages]).split()),
+ "completion_tokens": len(response_content.split()),
+ "total_tokens": len(" ".join([msg.content for msg in request.messages]).split()) + len(
+ response_content.split())
+ }
+ }
+ else:
+ return StreamingResponse(
+ stream_response(response_content, request.model),
+ media_type="text/event-stream"
+ )
+
+
+# Optional: Add a root endpoint that redirects to documentation
+@app.get("/")
+async def root():
+ return {"message": "MCPHost OpenAI-compatible API server. Visit /docs for documentation."}
+
+
+# Optional: Add a health check endpoint
+@app.get("/health")
+async def health_check():
+ return {"status": "healthy", "mcphost_alive": mcp_manager._is_alive()}
+
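+# Example client call (illustrative; any OpenAI-compatible client should work,
+# and this server does not check the API key):
+#
+#   from openai import OpenAI
+#   client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")
+#   reply = client.chat.completions.create(
+#       model="mcphost-model",
+#       messages=[{"role": "user", "content": "Hello"}],
+#   )
+#   print(reply.choices[0].message.content)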
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/serve.py b/serve_most_simple.py
similarity index 71%
rename from serve.py
rename to serve_most_simple.py
index b30b15a..6f6b3b0 100644
--- a/serve.py
+++ b/serve_most_simple.py
@@ -62,72 +62,55 @@ class Config:
SKIP_PATTERNS = ['alt+enter', 'Enter your prompt']
-class ResponseCleaner:
- """Handles cleaning and formatting of MCP responses"""
+def clean_response(response: str, original_prompt: str) -> str:
+ """Clean and format MCP response"""
+ if not response:
+ return ""
- def __init__(self):
- self.ansi_pattern = Config.ANSI_PATTERN
- self.tui_border = Config.TUI_BORDER
- self.skip_patterns = Config.SKIP_PATTERNS
+ # Remove ANSI escape sequences
+ response = Config.ANSI_PATTERN.sub('', response)
- def clean(self, response: str, original_prompt: str) -> str:
- """Clean response with clear steps"""
- if not response:
- return ""
+ lines = response.split('\n')
+ cleaned_lines = []
- response = self._remove_ansi(response)
- lines = self._extract_content_lines(response, original_prompt)
- return '\n'.join(lines)
-
- def _remove_ansi(self, text: str) -> str:
- """Remove ANSI escape sequences"""
- return self.ansi_pattern.sub('', text)
-
- def _extract_content_lines(self, response: str, original_prompt: str) -> List[str]:
- """Extract meaningful content lines from response"""
- lines = response.split('\n')
- cleaned_lines = []
-
- for line in lines:
- cleaned_line = self._process_line(line, original_prompt)
- if cleaned_line is not None:
- cleaned_lines.append(cleaned_line)
-
- return cleaned_lines
-
- def _process_line(self, line: str, original_prompt: str) -> Optional[str]:
- """Process a single line and return cleaned content or None to skip"""
+ for line in lines:
stripped = line.strip()
- # Skip empty lines
- if not stripped:
- return None
-
- # Skip the original prompt
- if stripped == original_prompt:
- return None
+ # Skip empty lines and original prompt
+ if not stripped or stripped == original_prompt:
+ continue
# Handle TUI decorations
- if stripped.startswith(self.tui_border):
- content = stripped.strip(self.tui_border).strip()
+ if stripped.startswith(Config.TUI_BORDER):
+ content = stripped.strip(Config.TUI_BORDER).strip()
if content and content != original_prompt:
- return content
- return None
+ cleaned_lines.append(content)
+ continue
# Skip navigation hints
- if any(pattern in line for pattern in self.skip_patterns):
- return None
+ if any(pattern in line for pattern in Config.SKIP_PATTERNS):
+ continue
- # Return non-empty, non-decoration lines
- return stripped
+ # Add non-empty, non-decoration lines
+ cleaned_lines.append(stripped)
+
+ return '\n'.join(cleaned_lines)
-class ProcessManager:
- """Manages the MCP process lifecycle"""
+class MCPHostManager:
+ """Manages MCP process lifecycle and communication"""
def __init__(self):
self.child: Optional[pexpect.spawn] = None
self.config = Config()
+ self.lock = threading.Lock()
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
@log_performance
def start(self) -> bool:
@@ -141,25 +124,43 @@ class ProcessManager:
timeout=self.config.SPAWN_TIMEOUT,
encoding='utf-8'
)
- self._configure_process()
+ self.child.setecho(False)
return self._wait_for_ready()
except Exception as e:
logger.error("Error starting mcphost: {}", e)
logger.exception("Full traceback:")
return False
- def is_alive(self) -> bool:
- """Check if the process is running"""
- return self.child is not None and self.child.isalive()
+ @log_performance
+ def send_prompt(self, prompt: str) -> str:
+ """Send a prompt to mcphost and get the response"""
+ with self.lock:
+ if not self._is_alive():
+ logger.warning("MCPHost not running, attempting to restart...")
+ if not self.start():
+ return "Error: Failed to restart MCPHost"
+
+ try:
+ self._clear_pending_output()
+ self._send_command(prompt)
+ response = self._collect_response()
+ return clean_response(response, prompt)
+ except Exception as e:
+ logger.exception("Exception in send_prompt: {}", str(e))
+ return f"Error: {str(e)}"
def shutdown(self):
"""Shutdown mcphost gracefully"""
- if self.is_alive():
+ if self._is_alive():
logger.info("Shutting down mcphost...")
self.child.sendcontrol('c')
self.child.close()
logger.info("MCPHost shutdown complete")
+ def _is_alive(self) -> bool:
+ """Check if the process is running"""
+ return self.child is not None and self.child.isalive()
+
def _build_command(self) -> List[str]:
"""Build the command to start mcphost"""
command = [
@@ -175,10 +176,6 @@ class ProcessManager:
return command
- def _configure_process(self):
- """Configure the spawned process"""
- self.child.setecho(False)
-
def _wait_for_ready(self) -> bool:
"""Wait for the process to be ready"""
try:
@@ -201,29 +198,10 @@ class ProcessManager:
except:
pass
-
-class MCPCommunicator:
- """Handles communication with the MCP process"""
-
- def __init__(self, process: ProcessManager, cleaner: ResponseCleaner):
- self.process = process
- self.cleaner = cleaner
- self.config = Config()
-
- def send_prompt(self, prompt: str) -> str:
- """Send a prompt and receive response"""
- if not self.process.is_alive():
- raise RuntimeError("MCP process is not running")
-
- self._clear_pending_output()
- self._send_command(prompt)
- response = self._collect_response()
- return self.cleaner.clean(response, prompt)
-
def _clear_pending_output(self):
"""Clear any pending output from the process"""
try:
- self.process.child.read_nonblocking(
+ self.child.read_nonblocking(
size=self.config.MAX_READ_SIZE,
timeout=self.config.READ_TIMEOUT
)
@@ -233,8 +211,8 @@ class MCPCommunicator:
def _send_command(self, prompt: str):
"""Send a command to the process"""
logger.debug("Sending prompt: {}", prompt)
- self.process.child.send(prompt)
- self.process.child.send('\r')
+ self.child.send(prompt)
+ self.child.send('\r')
# Wait for the model to process
time.sleep(self.config.RESPONSE_WAIT_TIME)
@@ -247,7 +225,7 @@ class MCPCommunicator:
with logger.catch(message="Error during response collection"):
while not response_complete:
try:
- chunk = self.process.child.read_nonblocking(
+ chunk = self.child.read_nonblocking(
size=self.config.CHUNK_SIZE,
timeout=3
)
@@ -277,46 +255,6 @@ class MCPCommunicator:
return response
-class MCPHostManager:
- """Main manager that orchestrates process and communication"""
-
- def __init__(self):
- self.process = ProcessManager()
- self.cleaner = ResponseCleaner()
- self.communicator = MCPCommunicator(self.process, self.cleaner)
- self.lock = threading.Lock()
-
- def __enter__(self):
- self.start()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.shutdown()
-
- def start(self) -> bool:
- """Start the MCP host"""
- return self.process.start()
-
- @log_performance
- def send_prompt(self, prompt: str) -> str:
- """Send a prompt to mcphost and get the response"""
- with self.lock:
- if not self.process.is_alive():
- logger.warning("MCPHost not running, attempting to restart...")
- if not self.start():
- return "Error: Failed to restart MCPHost"
-
- try:
- return self.communicator.send_prompt(prompt)
- except Exception as e:
- logger.exception("Exception in send_prompt: {}", str(e))
- return f"Error: {str(e)}"
-
- def shutdown(self):
- """Shutdown mcphost gracefully"""
- self.process.shutdown()
-
-
class SubprocessHandler(BaseHTTPRequestHandler):
"""HTTP request handler with dependency injection"""
@@ -393,30 +331,27 @@ def create_server(manager: MCPHostManager) -> HTTPServer:
@logger.catch
def main():
- """Main function with clean structure"""
+    """Start MCPHost and run the HTTP server until shutdown"""
# Startup banner
logger.info("=" * 50)
logger.info("MCP Host Server v1.0")
logger.info("Debug Mode: {}", "ON" if settings.debug else "OFF")
logger.info("=" * 50)
- logger.info("Initializing MCPHost...")
-
- with MCPHostManager() as manager:
- server = create_server(manager)
-
- try:
+ try:
+ with MCPHostManager() as manager:
+ server = create_server(manager)
logger.success("Server started at {}:{}", settings.host, settings.port)
logger.info("Ready to accept requests.")
logger.info("Press Ctrl+C to shutdown")
server.serve_forever()
- except KeyboardInterrupt:
- logger.info("Received shutdown signal, shutting down gracefully...")
- except Exception as e:
- logger.error("Server error: {}", e)
- logger.exception("Full traceback:")
- finally:
- logger.info("Shutting down server...")
+ except KeyboardInterrupt:
+ logger.info("Shutdown signal received")
+ except Exception as e:
+ logger.error("Fatal error: {}", e)
+ sys.exit(1)
+ finally:
+ if 'server' in locals():
server.shutdown()
logger.success("Server shutdown complete")
diff --git a/serve_most_simple_openai_compatible.py b/serve_most_simple_openai_compatible.py
new file mode 100644
index 0000000..bac867e
--- /dev/null
+++ b/serve_most_simple_openai_compatible.py
@@ -0,0 +1,171 @@
+from typing import List, Optional
+from pydantic import BaseModel
+import time
+import json
+import asyncio
+import uuid
+from fastapi import FastAPI, HTTPException
+from fastapi.responses import StreamingResponse
+
+app = FastAPI(title="OpenAI-compatible API")
+
+# Configuration for available models
+AVAILABLE_MODELS = [
+ {
+ "id": "dummy-model",
+ "object": "model",
+ "created": 1686935002,
+ "owned_by": "alihan",
+ "permission": [
+ {
+ "id": "modeldummy-" + str(uuid.uuid4())[:8],
+ "object": "model_permission",
+ "created": int(time.time()),
+ "allow_create_engine": False,
+ "allow_sampling": True,
+ "allow_logprobs": True,
+ "allow_search_indices": False,
+ "allow_view": True,
+ "allow_fine_tuning": False,
+ "organization": "*",
+ "group": None,
+ "is_blocking": False
+ }
+ ],
+ "root": "dummy-model",
+ "parent": None
+ },
+]
+
+
+class ChatMessage(BaseModel):
+ role: str
+ content: str
+
+
+class ChatCompletionRequest(BaseModel):
+ model: str
+ messages: List[ChatMessage]
+ temperature: Optional[float] = 1.0
+ stream: Optional[bool] = False
+ max_tokens: Optional[int] = None
+
+
+async def process_with_your_model(messages: List[ChatMessage], model: str) -> str:
+ """
+ Replace this with your actual model processing logic.
+ You might want to route to different models based on the model parameter.
+ """
+ last_user_message = next((msg.content for msg in reversed(messages) if msg.role == "user"), "")
+ return f"Response from {model}: {last_user_message}"
+
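+# A minimal sketch of wiring in a real backend (hypothetical helper, not part of
+# this repo): replace the echo above with a call into your own model client, e.g.
+#
+#   async def process_with_your_model(messages, model):
+#       prompt = next((m.content for m in reversed(messages) if m.role == "user"), "")
+#       return await my_model_client.complete(model=model, prompt=prompt)  # hypothetical client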
+
+def generate_id() -> str:
+ return str(uuid.uuid4())[:8]
+
+
+async def stream_response(content: str, model: str):
+ words = content.split()
+
+ for i, word in enumerate(words):
+ chunk = {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion.chunk",
+ "created": int(time.time()),
+ "model": model,
+ "choices": [{
+ "index": 0,
+ "delta": {
+ "content": word + " "
+ },
+ "finish_reason": None
+ }]
+ }
+ yield f"data: {json.dumps(chunk)}\n\n"
+ await asyncio.sleep(0.1)
+
+ final_chunk = {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion.chunk",
+ "created": int(time.time()),
+ "model": model,
+ "choices": [{
+ "index": 0,
+ "delta": {},
+ "finish_reason": "stop"
+ }]
+ }
+ yield f"data: {json.dumps(final_chunk)}\n\n"
+ yield "data: [DONE]\n\n"
+
+
+@app.get("/v1/models")
+async def list_models():
+ """List all available models"""
+ return {
+ "object": "list",
+ "data": AVAILABLE_MODELS
+ }
+
+
+@app.get("/v1/models/{model_id}")
+async def get_model(model_id: str):
+ """Get details of a specific model"""
+ model = next((m for m in AVAILABLE_MODELS if m["id"] == model_id), None)
+ if not model:
+ raise HTTPException(status_code=404, detail=f"Model {model_id} not found")
+ return model
+
+
+@app.post("/v1/chat/completions")
+async def chat_completions(request: ChatCompletionRequest):
+ # Validate model exists
+ if not any(model["id"] == request.model for model in AVAILABLE_MODELS):
+ raise HTTPException(status_code=404, detail=f"Model {request.model} not found")
+
+ response_content = await process_with_your_model(request.messages, request.model)
+
+ if not request.stream:
+ return {
+ "id": f"chatcmpl-{generate_id()}",
+ "object": "chat.completion",
+ "created": int(time.time()),
+ "model": request.model,
+ "choices": [{
+ "index": 0,
+ "message": {
+ "role": "assistant",
+ "content": response_content
+ },
+ "finish_reason": "stop"
+ }],
+ "usage": {
+ "prompt_tokens": len(" ".join([msg.content for msg in request.messages]).split()),
+ "completion_tokens": len(response_content.split()),
+ "total_tokens": len(" ".join([msg.content for msg in request.messages]).split()) + len(
+ response_content.split())
+ }
+ }
+ else:
+ return StreamingResponse(
+ stream_response(response_content, request.model),
+ media_type="text/event-stream"
+ )
+
+
+# Optional: Add a root endpoint that redirects to documentation
+@app.get("/")
+async def root():
+ return {"message": "OpenAI-compatible API server. Visit /docs for documentation."}
+
+
+# Optional: Add a health check endpoint
+@app.get("/health")
+async def health_check():
+ return {"status": "healthy"}
+
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/test.sh b/test.sh
index 02ef0a8..c6a9f96 100755
--- a/test.sh
+++ b/test.sh
@@ -2,16 +2,24 @@
#clear
-curl -X POST \
- -H "Content-Type: plain/text" \
- -d "When is your knowledge cut off? /no_think" \
- http://localhost:8000
+#curl -X POST \
+# -H "Content-Type: plain/text" \
+# -d "When is your knowledge cut off? /no_think" \
+# http://localhost:8000
-#curl -X POST http://0.0.0.0:8000/v1/chat/completions -H "Content-Type: application/json" -H "Authorization: Bearer fake-api-key" -d '{
-# "model": "mcphost-model",
-# "messages": [
-# {"role": "system", "content": "You are a helpful assistant."},
-# {"role": "user", "content": "Tell me a joke."}
-# ],
-# "temperature": 0.7
-# }'
+curl -X 'POST' \
+ 'http://0.0.0.0:8000/v1/chat/completions' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "model": "mcphost-model",
+ "messages": [
+ {
+ "role": "user",
+ "content": "can you give me your previous answer in JSON format? /no_think"
+ }
+ ],
+ "temperature": 0.7,
+ "stream": false,
+ "max_tokens": 1024
+}'
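+
+# Illustrative shape of the non-streaming response (ids, timestamps and counts vary):
+# {"id": "chatcmpl-xxxxxxxx", "object": "chat.completion", "created": 1700000000,
+#  "model": "mcphost-model",
+#  "choices": [{"index": 0,
+#               "message": {"role": "assistant", "content": "..."},
+#               "finish_reason": "stop"}],
+#  "usage": {"prompt_tokens": 12, "completion_tokens": 34, "total_tokens": 46}}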