mirror of
https://github.com/tadata-org/fastapi_mcp.git
synced 2025-04-13 23:32:11 +03:00
change to class based
This commit is contained in:
@@ -4,11 +4,11 @@ Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app.
|
||||
|
||||
from examples.apps import items
|
||||
|
||||
from fastapi_mcp import add_mcp_server
|
||||
from fastapi_mcp import FastApiMCP
|
||||
|
||||
|
||||
# Add MCP server to the FastAPI app
|
||||
mcp = add_mcp_server(
|
||||
mcp = FastApiMCP(
|
||||
items.app,
|
||||
mount_path="/mcp",
|
||||
name="Item API MCP",
|
||||
@@ -18,6 +18,9 @@ mcp = add_mcp_server(
|
||||
describe_all_responses=True,
|
||||
)
|
||||
|
||||
mcp.mount()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
|
||||
@@ -4,11 +4,11 @@ Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app.
|
||||
|
||||
from examples.apps import items
|
||||
|
||||
from fastapi_mcp import add_mcp_server
|
||||
from fastapi_mcp import FastApiMCP
|
||||
|
||||
|
||||
# Add MCP server to the FastAPI app
|
||||
mcp = add_mcp_server(
|
||||
mcp = FastApiMCP(
|
||||
items.app,
|
||||
mount_path="/mcp",
|
||||
name="Item API MCP",
|
||||
@@ -16,6 +16,8 @@ mcp = add_mcp_server(
|
||||
base_url="http://localhost:8000",
|
||||
)
|
||||
|
||||
mcp.mount()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
|
||||
@@ -12,11 +12,9 @@ except Exception:
|
||||
# Fallback for local development
|
||||
__version__ = "0.0.0.dev0"
|
||||
|
||||
from .server import add_mcp_server, create_mcp_server, mount_mcp_server
|
||||
from .server import FastApiMCP
|
||||
|
||||
|
||||
__all__ = [
|
||||
"add_mcp_server",
|
||||
"create_mcp_server",
|
||||
"mount_mcp_server",
|
||||
"FastApiMCP",
|
||||
]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Dict, Optional, Any, Tuple, List, Union, AsyncIterator
|
||||
from typing import Dict, Optional, Any, List, Union, AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.openapi.utils import get_openapi
|
||||
@@ -11,179 +11,152 @@ from fastapi_mcp.openapi.convert import convert_openapi_to_mcp_tools
|
||||
from fastapi_mcp.execute import execute_api_tool
|
||||
|
||||
|
||||
def create_mcp_server(
|
||||
app: FastAPI,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
describe_all_responses: bool = False,
|
||||
describe_full_response_schema: bool = False,
|
||||
) -> Tuple[Server, Dict[str, Dict[str, Any]]]:
|
||||
"""
|
||||
Create an MCP server from a FastAPI app.
|
||||
class FastApiMCP:
|
||||
def __init__(
|
||||
self,
|
||||
fastapi: FastAPI,
|
||||
mount_path: str = "/mcp",
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
describe_all_responses: bool = False,
|
||||
describe_full_response_schema: bool = False,
|
||||
):
|
||||
self.operation_map: Dict[str, Dict[str, Any]]
|
||||
self.tools: List[types.Tool]
|
||||
|
||||
Args:
|
||||
app: The FastAPI application
|
||||
name: Name for the MCP server (defaults to app.title)
|
||||
description: Description for the MCP server (defaults to app.description)
|
||||
base_url: Base URL for API requests (defaults to http://localhost:$PORT)
|
||||
describe_all_responses: Whether to include all possible response schemas in tool descriptions
|
||||
describe_full_response_schema: Whether to include full json schema for responses in tool descriptions
|
||||
self.fastapi = fastapi
|
||||
self.name = name
|
||||
self.description = description
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- The created MCP Server instance (NOT mounted to the app)
|
||||
- A mapping of operation IDs to operation details for HTTP execution
|
||||
"""
|
||||
# Get OpenAPI schema from FastAPI app
|
||||
openapi_schema = get_openapi(
|
||||
title=app.title,
|
||||
version=app.version,
|
||||
openapi_version=app.openapi_version,
|
||||
description=app.description,
|
||||
routes=app.routes,
|
||||
)
|
||||
self._mount_path = mount_path
|
||||
self._base_url = base_url
|
||||
self._describe_all_responses = describe_all_responses
|
||||
self._describe_full_response_schema = describe_full_response_schema
|
||||
|
||||
# Get server name and description from app if not provided
|
||||
server_name = name or app.title or "FastAPI MCP"
|
||||
server_description = description or app.description
|
||||
self.mcp_server = self.create_server()
|
||||
|
||||
# Convert OpenAPI schema to MCP tools
|
||||
tools, operation_map = convert_openapi_to_mcp_tools(
|
||||
openapi_schema,
|
||||
describe_all_responses=describe_all_responses,
|
||||
describe_full_response_schema=describe_full_response_schema,
|
||||
)
|
||||
def create_server(self) -> Server:
|
||||
"""
|
||||
Create an MCP server from the FastAPI app.
|
||||
|
||||
# Determine base URL if not provided
|
||||
if not base_url:
|
||||
# Try to determine the base URL from FastAPI config
|
||||
if hasattr(app, "root_path") and app.root_path:
|
||||
base_url = app.root_path
|
||||
else:
|
||||
# Default to localhost with FastAPI default port
|
||||
port = 8000
|
||||
for route in app.routes:
|
||||
if hasattr(route, "app") and hasattr(route.app, "port"):
|
||||
port = route.app.port
|
||||
break
|
||||
base_url = f"http://localhost:{port}"
|
||||
Args:
|
||||
app: The FastAPI application
|
||||
name: Name for the MCP server (defaults to app.title)
|
||||
description: Description for the MCP server (defaults to app.description)
|
||||
base_url: Base URL for API requests (defaults to http://localhost:$PORT)
|
||||
describe_all_responses: Whether to include all possible response schemas in tool descriptions
|
||||
describe_full_response_schema: Whether to include full json schema for responses in tool descriptions
|
||||
|
||||
# Normalize base URL
|
||||
if base_url.endswith("/"):
|
||||
base_url = base_url[:-1]
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- The created MCP Server instance (NOT mounted to the app)
|
||||
- A mapping of operation IDs to operation details for HTTP execution
|
||||
"""
|
||||
# Get OpenAPI schema from FastAPI app
|
||||
openapi_schema = get_openapi(
|
||||
title=self.fastapi.title,
|
||||
version=self.fastapi.version,
|
||||
openapi_version=self.fastapi.openapi_version,
|
||||
description=self.fastapi.description,
|
||||
routes=self.fastapi.routes,
|
||||
)
|
||||
|
||||
# Create the MCP server
|
||||
mcp_server: Server = Server(server_name, server_description)
|
||||
# Get server name and description from app if not provided
|
||||
server_name = self.name or self.fastapi.title or "FastAPI MCP"
|
||||
server_description = self.description or self.fastapi.description
|
||||
|
||||
# Create a lifespan context manager to store the base_url and operation_map
|
||||
@asynccontextmanager
|
||||
async def server_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
|
||||
# Store context data that will be available to all server handlers
|
||||
context = {"base_url": base_url, "operation_map": operation_map}
|
||||
yield context
|
||||
# Convert OpenAPI schema to MCP tools
|
||||
self.tools, self.operation_map = convert_openapi_to_mcp_tools(
|
||||
openapi_schema,
|
||||
describe_all_responses=self._describe_all_responses,
|
||||
describe_full_response_schema=self._describe_full_response_schema,
|
||||
)
|
||||
|
||||
# Use our custom lifespan
|
||||
mcp_server.lifespan = server_lifespan
|
||||
# Determine base URL if not provided
|
||||
if not self._base_url:
|
||||
# Try to determine the base URL from FastAPI config
|
||||
if hasattr(self.fastapi, "root_path") and self.fastapi.root_path:
|
||||
self._base_url = self.fastapi.root_path
|
||||
else:
|
||||
# Default to localhost with FastAPI default port
|
||||
port = 8000
|
||||
for route in self.fastapi.routes:
|
||||
if hasattr(route, "app") and hasattr(route.app, "port"):
|
||||
port = route.app.port
|
||||
break
|
||||
self._base_url = f"http://localhost:{port}"
|
||||
|
||||
# Register handlers for tools
|
||||
@mcp_server.list_tools()
|
||||
async def handle_list_tools() -> List[types.Tool]:
|
||||
"""Handler for the tools/list request"""
|
||||
return tools
|
||||
# Normalize base URL
|
||||
if self._base_url.endswith("/"):
|
||||
self._base_url = self._base_url[:-1]
|
||||
|
||||
# Register the tool call handler
|
||||
@mcp_server.call_tool()
|
||||
async def handle_call_tool(
|
||||
name: str, arguments: Dict[str, Any]
|
||||
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
|
||||
"""Handler for the tools/call request"""
|
||||
# Get context from server lifespan
|
||||
ctx = mcp_server.request_context
|
||||
base_url = ctx.lifespan_context["base_url"]
|
||||
operation_map = ctx.lifespan_context["operation_map"]
|
||||
# Create the MCP server
|
||||
mcp_server: Server = Server(server_name, server_description)
|
||||
|
||||
# Execute the tool
|
||||
return await execute_api_tool(base_url, name, arguments, operation_map)
|
||||
# Create a lifespan context manager to store the base_url and operation_map
|
||||
@asynccontextmanager
|
||||
async def server_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
|
||||
# Store context data that will be available to all server handlers
|
||||
context = {"base_url": self._base_url, "operation_map": self.operation_map}
|
||||
yield context
|
||||
|
||||
return mcp_server, operation_map
|
||||
# Use our custom lifespan
|
||||
mcp_server.lifespan = server_lifespan
|
||||
|
||||
# Register handlers for tools
|
||||
@mcp_server.list_tools()
|
||||
async def handle_list_tools() -> List[types.Tool]:
|
||||
"""Handler for the tools/list request"""
|
||||
return self.tools
|
||||
|
||||
def mount_mcp_server(
|
||||
app: FastAPI,
|
||||
mcp_server: Server,
|
||||
operation_map: Dict[str, Dict[str, Any]],
|
||||
mount_path: str = "/mcp",
|
||||
base_url: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Mount an MCP server to a FastAPI app.
|
||||
# Register the tool call handler
|
||||
@mcp_server.call_tool()
|
||||
async def handle_call_tool(
|
||||
name: str, arguments: Dict[str, Any]
|
||||
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
|
||||
"""Handler for the tools/call request"""
|
||||
# Get context from server lifespan
|
||||
ctx = mcp_server.request_context
|
||||
base_url = ctx.lifespan_context["base_url"]
|
||||
operation_map = ctx.lifespan_context["operation_map"]
|
||||
|
||||
Args:
|
||||
app: The FastAPI application
|
||||
mcp_server: The MCP server to mount
|
||||
operation_map: A mapping of operation IDs to operation details
|
||||
mount_path: Path where the MCP server will be mounted
|
||||
base_url: Base URL for API requests
|
||||
"""
|
||||
# Normalize mount path
|
||||
if not mount_path.startswith("/"):
|
||||
mount_path = f"/{mount_path}"
|
||||
if mount_path.endswith("/"):
|
||||
mount_path = mount_path[:-1]
|
||||
# Execute the tool
|
||||
return await execute_api_tool(base_url, name, arguments, operation_map)
|
||||
|
||||
# Create SSE transport for MCP messages
|
||||
sse_transport = SseServerTransport(f"{mount_path}/messages/")
|
||||
return mcp_server
|
||||
|
||||
# Define MCP connection handler
|
||||
async def handle_mcp_connection(request: Request):
|
||||
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams:
|
||||
await mcp_server.run(
|
||||
streams[0],
|
||||
streams[1],
|
||||
mcp_server.create_initialization_options(notification_options=None, experimental_capabilities={}),
|
||||
)
|
||||
def mount(self) -> None:
|
||||
"""
|
||||
Mount the MCP server to the FastAPI app.
|
||||
|
||||
# Mount the MCP connection handler
|
||||
app.get(mount_path)(handle_mcp_connection)
|
||||
app.mount(f"{mount_path}/messages/", app=sse_transport.handle_post_message)
|
||||
Args:
|
||||
app: The FastAPI application
|
||||
mcp_server: The MCP server to mount
|
||||
operation_map: A mapping of operation IDs to operation details
|
||||
mount_path: Path where the MCP server will be mounted
|
||||
base_url: Base URL for API requests
|
||||
"""
|
||||
# Normalize mount path
|
||||
if not self._mount_path.startswith("/"):
|
||||
self._mount_path = f"/{self._mount_path}"
|
||||
if self._mount_path.endswith("/"):
|
||||
self._mount_path = self._mount_path[:-1]
|
||||
|
||||
# Create SSE transport for MCP messages
|
||||
sse_transport = SseServerTransport(f"{self._mount_path}/messages/")
|
||||
|
||||
def add_mcp_server(
|
||||
app: FastAPI,
|
||||
mount_path: str = "/mcp",
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
describe_all_responses: bool = False,
|
||||
describe_full_response_schema: bool = False,
|
||||
) -> Server:
|
||||
"""
|
||||
Add an MCP server to a FastAPI app.
|
||||
# Define MCP connection handler
|
||||
async def handle_mcp_connection(request: Request):
|
||||
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams:
|
||||
await self.mcp_server.run(
|
||||
streams[0],
|
||||
streams[1],
|
||||
self.mcp_server.create_initialization_options(
|
||||
notification_options=None, experimental_capabilities={}
|
||||
),
|
||||
)
|
||||
|
||||
Args:
|
||||
app: The FastAPI application
|
||||
mount_path: Path where the MCP server will be mounted
|
||||
name: Name for the MCP server (defaults to app.title)
|
||||
description: Description for the MCP server (defaults to app.description)
|
||||
base_url: Base URL for API requests (defaults to http://localhost:$PORT)
|
||||
describe_all_responses: Whether to include all possible response schemas in tool descriptions
|
||||
describe_full_response_schema: Whether to include full json schema for responses in tool descriptions
|
||||
|
||||
Returns:
|
||||
The MCP server instance that was created and mounted
|
||||
"""
|
||||
# Create MCP server
|
||||
mcp_server, operation_map = create_mcp_server(
|
||||
app,
|
||||
name,
|
||||
description,
|
||||
base_url,
|
||||
describe_all_responses=describe_all_responses,
|
||||
describe_full_response_schema=describe_full_response_schema,
|
||||
)
|
||||
|
||||
# Mount MCP server
|
||||
mount_mcp_server(app, mcp_server, operation_map, mount_path, base_url)
|
||||
|
||||
return mcp_server
|
||||
# Mount the MCP connection handler
|
||||
self.fastapi.get(self._mount_path)(handle_mcp_connection)
|
||||
self.fastapi.mount(f"{self._mount_path}/messages/", app=sse_transport.handle_post_message)
|
||||
|
||||
Reference in New Issue
Block a user