write README.md and separate classes into different files for easier tech-debt management

TCUDIKEL committed on 2025-05-10 19:30:23 +03:00
commit 52343a2b0a (parent a2f4f6899f)
8 changed files with 418 additions and 301 deletions

README.md (new file, +90 lines)

@@ -0,0 +1,90 @@
# MCPHost Server
A lightweight server providing an OpenAI-compatible API interface to interact with MCPHost.
## Prerequisites
- MCPHost binary (its path is set via `MCPHOST_PATH`)
- Python 3 with `pip` (to install the dependencies below)
## Installation
```bash
# Clone the repository
git clone <repository-url>
cd <repository-directory>
# Install dependencies
pip install -r requirements.txt
```
## Configuration
Configure the application by setting the following environment variables or by editing `settings.py` directly (a sketch of such a settings module follows the list):
- `MCPHOST_PATH`: Path to the MCPHost binary
- `MCPHOST_CONFIG`: Path to MCPHost configuration file
- `MCPHOST_MODEL`: Model to use with MCPHost
- `HOST`: Host to bind the server to (default: 0.0.0.0)
- `PORT`: Port to run the server on (default: 8000)
- `DEBUG`: Enable debug mode (true/false)
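The repository's `settings.py` is not shown in this diff, so the following is only a rough sketch of a pydantic-based settings module covering the variables above plus the `openai_url`/`openai_api_key` values referenced by `MCPHostManager`; the defaults and the use of `pydantic-settings` are assumptions, not the committed implementation:
```python
# settings.py -- illustrative sketch only; the real module may differ
from pydantic_settings import BaseSettings  # pydantic v2; on pydantic v1 use pydantic.BaseSettings


class Settings(BaseSettings):
    # Env vars map case-insensitively to these field names (MCPHOST_PATH -> mcphost_path, etc.)
    mcphost_path: str = "/usr/local/bin/mcphost"    # MCPHOST_PATH (placeholder default)
    mcphost_config: str = "mcphost.json"            # MCPHOST_CONFIG (placeholder default)
    mcphost_model: str = "mcphost-model"            # MCPHOST_MODEL
    openai_url: str = "http://localhost:11434/v1"   # backing OpenAI-compatible endpoint (assumed)
    openai_api_key: str = "not-needed"              # passed through to mcphost
    host: str = "0.0.0.0"                           # HOST
    port: int = 8000                                # PORT
    debug: bool = False                             # DEBUG


settings = Settings()
```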
## Usage
Start the OpenAI-compatible API server:
```bash
python serve_mcphost_openai_compatible.py
```
Or using uvicorn directly:
```bash
uvicorn serve_mcphost_openai_compatible:app --host 0.0.0.0 --port 8000
```
## API Endpoints
- `GET /v1/models`: List available models
- `GET /v1/models/{model_id}`: Get details of a specific model
- `POST /v1/chat/completions`: Send a chat completion request
- `GET /health`: Check server health
- `GET /`: Root endpoint with API information
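Because the interface follows the OpenAI API shape, any OpenAI-compatible client should work against it. A minimal sketch using the official `openai` Python package (not listed in this diff, so treat it as an optional extra):
```python
from openai import OpenAI

# Point the client at the local server; the API key is not validated by this server.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

resp = client.chat.completions.create(
    model="mcphost-model",
    messages=[{"role": "user", "content": "Hello, how are you?"}],
)
print(resp.choices[0].message.content)
```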
## Testing with curl
List available models:
```bash
curl http://localhost:8000/v1/models
```
Get model details:
```bash
curl http://localhost:8000/v1/models/mcphost-model
```
Send a chat completion request:
```bash
curl -X POST http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "mcphost-model",
"messages": [{"role": "user", "content": "Hello, how are you?"}],
"stream": false
}'
```
Stream a chat completion request:
```bash
curl -X POST http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "mcphost-model",
"messages": [{"role": "user", "content": "Tell me a short story"}],
"stream": true
}'
```
Check server health:
```bash
curl http://localhost:8000/health
```

commons/logging_utils.py (new file, +43 lines)

@@ -0,0 +1,43 @@
import sys
import time
from functools import wraps
from loguru import logger
from settings import settings
def log_performance(func):
"""Decorator to log function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
logger.debug("Starting {}", func.__name__)
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logger.debug("{} completed in {:.2f}s", func.__name__, duration)
return result
except Exception as e:
duration = time.time() - start_time
logger.error("{} failed after {:.2f}s: {}", func.__name__, duration, str(e))
raise
return wrapper
def setup_logger():
"""Configure and setup logger"""
logger.remove() # Remove default handler
logger.add(
sys.stderr,
level="DEBUG" if settings.debug else "INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
colorize=True,
backtrace=True,
diagnose=True
)
return logger
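A quick usage sketch of the two helpers above (illustrative only, not part of this commit):
```python
# example_usage.py -- illustrative, assumes the commons package and settings are importable
from commons.logging_utils import setup_logger, log_performance

logger = setup_logger()

@log_performance
def slow_add(a: int, b: int) -> int:
    """Deliberately trivial function to show the timing output."""
    return a + b

slow_add(1, 2)  # logs "Starting slow_add" / "slow_add completed in 0.00s" (visible when DEBUG is enabled)
```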

commons/mcp_manager.py (new file, +184 lines)

@@ -0,0 +1,184 @@
import time
import threading
import asyncio
from typing import List, Optional
import pexpect
from loguru import logger
from settings import settings
from commons.response_cleaners import clean_response
from commons.logging_utils import log_performance
class Config:
"""Configuration constants for MCPHost management"""
SPAWN_TIMEOUT = 60
ECHO_DELAY = 0.5
READ_TIMEOUT = 0.1
RESPONSE_WAIT_TIME = 1
CHUNK_SIZE = 1000
MAX_READ_SIZE = 10000
PROMPT_INDICATOR = "Enter your prompt"
class MCPHostManager:
"""Manages MCP process lifecycle and communication"""
def __init__(self):
self.child: Optional[pexpect.spawn] = None
self.config = Config()
self.lock = threading.Lock()
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
@log_performance
def start(self) -> bool:
"""Start the mcphost process"""
command = self._build_command()
logger.info("Starting mcphost: {}", ' '.join(command))
try:
self.child = pexpect.spawn(
' '.join(command),
timeout=self.config.SPAWN_TIMEOUT,
encoding='utf-8'
)
self.child.setecho(False)
return self._wait_for_ready()
except Exception as e:
logger.error("Error starting mcphost: {}", e)
logger.exception("Full traceback:")
return False
@log_performance
async def send_prompt_async(self, prompt: str) -> str:
"""Send a prompt to mcphost and get the response (async wrapper)"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.send_prompt, prompt)
def send_prompt(self, prompt: str) -> str:
"""Send a prompt to mcphost and get the response"""
with self.lock:
if not self._is_alive():
logger.warning("MCPHost not running, attempting to restart...")
if not self.start():
return "Error: Failed to restart MCPHost"
try:
self._clear_pending_output()
self._send_command(prompt)
response = self._collect_response()
return clean_response(response, prompt)
except Exception as e:
logger.exception("Exception in send_prompt: {}", str(e))
return f"Error: {str(e)}"
def shutdown(self):
"""Shutdown mcphost gracefully"""
if self._is_alive():
logger.info("Shutting down mcphost...")
self.child.sendcontrol('c')
self.child.close()
logger.info("MCPHost shutdown complete")
def _is_alive(self) -> bool:
"""Check if the process is running"""
return self.child is not None and self.child.isalive()
def _build_command(self) -> List[str]:
"""Build the command to start mcphost"""
command = [
settings.mcphost_path,
'--config', settings.mcphost_config,
'--model', settings.mcphost_model,
'--openai-url', settings.openai_url,
'--openai-api-key', settings.openai_api_key
]
if settings.debug:
command.insert(1, '--debug')
return command
def _wait_for_ready(self) -> bool:
"""Wait for the process to be ready"""
try:
self.child.expect(self.config.PROMPT_INDICATOR)
logger.success("MCPHost started and ready")
self._clear_buffer()
return True
except Exception as e:
logger.error("Error waiting for prompt: {}", e)
return False
def _clear_buffer(self):
"""Clear any remaining output in the buffer"""
time.sleep(self.config.ECHO_DELAY)
try:
self.child.read_nonblocking(
size=self.config.MAX_READ_SIZE,
timeout=self.config.READ_TIMEOUT
)
except (pexpect.TIMEOUT, pexpect.EOF):  # nothing buffered to discard
pass
def _clear_pending_output(self):
"""Clear any pending output from the process"""
try:
self.child.read_nonblocking(
size=self.config.MAX_READ_SIZE,
timeout=self.config.READ_TIMEOUT
)
except (pexpect.TIMEOUT, pexpect.EOF):  # nothing pending to discard
pass
def _send_command(self, prompt: str):
"""Send a command to the process"""
logger.debug("Sending prompt: {}", prompt)
self.child.send(prompt)
self.child.send('\r')
# Wait for the model to process
time.sleep(self.config.RESPONSE_WAIT_TIME)
def _collect_response(self) -> str:
"""Collect response from the process"""
response = ""
response_complete = False
with logger.catch(message="Error during response collection"):
while not response_complete:
try:
chunk = self.child.read_nonblocking(
size=self.config.CHUNK_SIZE,
timeout=3
)
if chunk:
response += chunk
logger.trace("Received chunk: {}", chunk[:50] + "..." if len(chunk) > 50 else chunk)
if self.config.PROMPT_INDICATOR in chunk:
response_complete = True
else:
break
except pexpect.TIMEOUT:
if response and self.config.PROMPT_INDICATOR in response:
response_complete = True
elif response:
logger.debug("Waiting for more response data...")
time.sleep(1)
continue
else:
break
except Exception as e:
logger.error("Error reading response: {}", e)
break
logger.debug("Collected response length: {} characters", len(response))
return response
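Since `MCPHostManager` implements the context-manager protocol, a standalone smoke test could look like the sketch below; it assumes a working MCPHost binary and valid settings:
```python
# smoke_test.py -- illustrative sketch, not part of this commit
from commons.mcp_manager import MCPHostManager

with MCPHostManager() as manager:   # start() on enter, shutdown() on exit
    reply = manager.send_prompt("Say hello in one sentence.")
    print(reply)
```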

commons/openai_models.py (new file, +46 lines)

@@ -0,0 +1,46 @@
import time
import uuid
from typing import List, Optional
from pydantic import BaseModel
class ChatMessage(BaseModel):
role: str
content: str
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
temperature: Optional[float] = 1.0
stream: Optional[bool] = False
max_tokens: Optional[int] = None
# Configuration for available models
AVAILABLE_MODELS = [
{
"id": "mcphost-model",
"object": "model",
"created": 1686935002,
"owned_by": "mcphost",
"permission": [
{
"id": "modelcphost-" + str(uuid.uuid4())[:8],
"object": "model_permission",
"created": int(time.time()),
"allow_create_engine": False,
"allow_sampling": True,
"allow_logprobs": True,
"allow_search_indices": False,
"allow_view": True,
"allow_fine_tuning": False,
"organization": "*",
"group": None,
"is_blocking": False
}
],
"root": "mcphost-model",
"parent": None
},
]
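For reference, `ChatCompletionRequest` accepts the same payload shape as the curl examples in the README; a quick validation sketch (illustrative only):
```python
# illustrative: how an incoming body would parse into ChatCompletionRequest
from commons.openai_models import ChatCompletionRequest

payload = {
    "model": "mcphost-model",
    "messages": [{"role": "user", "content": "Hello, how are you?"}],
    "stream": False,
}
request = ChatCompletionRequest(**payload)
print(request.messages[0].content)   # -> "Hello, how are you?"
print(request.temperature)           # -> 1.0 (default)
```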

commons/openai_utils.py (new file, +45 lines)

@@ -0,0 +1,45 @@
import json
import time
import uuid
import asyncio
def generate_id() -> str:
"""Generate a unique ID for responses"""
return str(uuid.uuid4())[:8]
async def stream_response(content: str, model: str):
"""Stream response content in OpenAI format"""
words = content.split()
for i, word in enumerate(words):
chunk = {
"id": f"chatcmpl-{generate_id()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {
"content": word + " "
},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.1)
final_chunk = {
"id": f"chatcmpl-{generate_id()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"


@@ -1,271 +1,20 @@
from typing import List, Optional
from pydantic import BaseModel
import time
import json
import asyncio
import uuid
import sys
import threading
import time
import json
from contextlib import asynccontextmanager
from typing import List
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from functools import wraps
from contextlib import asynccontextmanager
import pexpect
from loguru import logger
from commons.mcp_manager import MCPHostManager
from commons.logging_utils import setup_logger
from commons.openai_models import ChatMessage, ChatCompletionRequest, AVAILABLE_MODELS
from commons.openai_utils import generate_id, stream_response
from settings import settings
from helpers.response_cleaners import clean_response
class Config:
"""Configuration constants for MCPHost management"""
SPAWN_TIMEOUT = 60
ECHO_DELAY = 0.5
READ_TIMEOUT = 0.1
RESPONSE_WAIT_TIME = 1
CHUNK_SIZE = 1000
MAX_READ_SIZE = 10000
PROMPT_INDICATOR = "Enter your prompt"
def log_performance(func):
"""Decorator to log function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
logger.debug("Starting {}", func.__name__)
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logger.debug("{} completed in {:.2f}s", func.__name__, duration)
return result
except Exception as e:
duration = time.time() - start_time
logger.error("{} failed after {:.2f}s: {}", func.__name__, duration, str(e))
raise
return wrapper
# Configure loguru
logger.remove()
logger.add(
sys.stderr,
level="DEBUG" if settings.debug else "INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
colorize=True,
backtrace=True,
diagnose=True
)
class MCPHostManager:
"""Manages MCP process lifecycle and communication"""
def __init__(self):
self.child: Optional[pexpect.spawn] = None
self.config = Config()
self.lock = threading.Lock()
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
@log_performance
def start(self) -> bool:
"""Start the mcphost process"""
command = self._build_command()
logger.info("Starting mcphost: {}", ' '.join(command))
try:
self.child = pexpect.spawn(
' '.join(command),
timeout=self.config.SPAWN_TIMEOUT,
encoding='utf-8'
)
self.child.setecho(False)
return self._wait_for_ready()
except Exception as e:
logger.error("Error starting mcphost: {}", e)
logger.exception("Full traceback:")
return False
@log_performance
async def send_prompt_async(self, prompt: str) -> str:
"""Send a prompt to mcphost and get the response (async wrapper)"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.send_prompt, prompt)
def send_prompt(self, prompt: str) -> str:
"""Send a prompt to mcphost and get the response"""
with self.lock:
if not self._is_alive():
logger.warning("MCPHost not running, attempting to restart...")
if not self.start():
return "Error: Failed to restart MCPHost"
try:
self._clear_pending_output()
self._send_command(prompt)
response = self._collect_response()
return clean_response(response, prompt)
except Exception as e:
logger.exception("Exception in send_prompt: {}", str(e))
return f"Error: {str(e)}"
def shutdown(self):
"""Shutdown mcphost gracefully"""
if self._is_alive():
logger.info("Shutting down mcphost...")
self.child.sendcontrol('c')
self.child.close()
logger.info("MCPHost shutdown complete")
def _is_alive(self) -> bool:
"""Check if the process is running"""
return self.child is not None and self.child.isalive()
def _build_command(self) -> List[str]:
"""Build the command to start mcphost"""
command = [
settings.mcphost_path,
'--config', settings.mcphost_config,
'--model', settings.mcphost_model,
'--openai-url', settings.openai_url,
'--openai-api-key', settings.openai_api_key
]
if settings.debug:
command.insert(1, '--debug')
return command
def _wait_for_ready(self) -> bool:
"""Wait for the process to be ready"""
try:
self.child.expect(self.config.PROMPT_INDICATOR)
logger.success("MCPHost started and ready")
self._clear_buffer()
return True
except Exception as e:
logger.error("Error waiting for prompt: {}", e)
return False
def _clear_buffer(self):
"""Clear any remaining output in the buffer"""
time.sleep(self.config.ECHO_DELAY)
try:
self.child.read_nonblocking(
size=self.config.MAX_READ_SIZE,
timeout=self.config.READ_TIMEOUT
)
except:
pass
def _clear_pending_output(self):
"""Clear any pending output from the process"""
try:
self.child.read_nonblocking(
size=self.config.MAX_READ_SIZE,
timeout=self.config.READ_TIMEOUT
)
except:
pass
def _send_command(self, prompt: str):
"""Send a command to the process"""
logger.debug("Sending prompt: {}", prompt)
self.child.send(prompt)
self.child.send('\r')
# Wait for the model to process
time.sleep(self.config.RESPONSE_WAIT_TIME)
def _collect_response(self) -> str:
"""Collect response from the process"""
response = ""
response_complete = False
with logger.catch(message="Error during response collection"):
while not response_complete:
try:
chunk = self.child.read_nonblocking(
size=self.config.CHUNK_SIZE,
timeout=3
)
if chunk:
response += chunk
logger.trace("Received chunk: {}", chunk[:50] + "..." if len(chunk) > 50 else chunk)
if self.config.PROMPT_INDICATOR in chunk:
response_complete = True
else:
break
except pexpect.TIMEOUT:
if response and self.config.PROMPT_INDICATOR in response:
response_complete = True
elif response:
logger.debug("Waiting for more response data...")
time.sleep(1)
continue
else:
break
except Exception as e:
logger.error("Error reading response: {}", e)
break
logger.debug("Collected response length: {} characters", len(response))
return response
# Configuration for available models
AVAILABLE_MODELS = [
{
"id": "mcphost-model",
"object": "model",
"created": 1686935002,
"owned_by": "mcphost",
"permission": [
{
"id": "modelcphost-" + str(uuid.uuid4())[:8],
"object": "model_permission",
"created": int(time.time()),
"allow_create_engine": False,
"allow_sampling": True,
"allow_logprobs": True,
"allow_search_indices": False,
"allow_view": True,
"allow_fine_tuning": False,
"organization": "*",
"group": None,
"is_blocking": False
}
],
"root": "mcphost-model",
"parent": None
},
]
class ChatMessage(BaseModel):
role: str
content: str
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
temperature: Optional[float] = 1.0
stream: Optional[bool] = False
max_tokens: Optional[int] = None
# Setup logger
logger = setup_logger()
# Initialize the MCPHost manager
mcp_manager = MCPHostManager()
@@ -284,45 +33,6 @@ async def process_with_mcphost(messages: List[ChatMessage], model: str) -> str:
return response
def generate_id() -> str:
return str(uuid.uuid4())[:8]
async def stream_response(content: str, model: str):
words = content.split()
for i, word in enumerate(words):
chunk = {
"id": f"chatcmpl-{generate_id()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {
"content": word + " "
},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.1)
final_chunk = {
"id": f"chatcmpl-{generate_id()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan"""
@@ -417,5 +127,4 @@ async def health_check():
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)