refactor and add more examples

This commit is contained in:
Shahar Abramov
2025-04-09 01:22:51 +03:00
parent 005d54ef0e
commit b2ebc5cf05
10 changed files with 164 additions and 133 deletions

View File

@@ -2,12 +2,11 @@
Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app. Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app.
""" """
from examples.apps import items from examples.shared.apps import items
from examples.setup import setup_logging from examples.shared.setup import setup_logging
from fastapi_mcp import FastApiMCP from fastapi_mcp import FastApiMCP
setup_logging() setup_logging()
@@ -17,8 +16,8 @@ mcp = FastApiMCP(
name="Item API MCP", name="Item API MCP",
description="MCP server for the Item API", description="MCP server for the Item API",
base_url="http://localhost:8000", base_url="http://localhost:8000",
describe_full_response_schema=True, describe_full_response_schema=True, # Describe the full response JSON-schema instead of just a response example
describe_all_responses=True, describe_all_responses=True, # Describe all the possible responses instead of just the success (2XX) response
) )
mcp.mount() mcp.mount()

View File

@@ -0,0 +1,48 @@
"""
Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app.
"""
from fastapi import FastAPI
import asyncio
import uvicorn
from examples.shared.apps import items
from examples.shared.setup import setup_logging
from fastapi_mcp import FastApiMCP
setup_logging()
MCP_SERVER_HOST = "localhost"
MCP_SERVER_PORT = 8000
ITEMS_API_HOST = "localhost"
ITEMS_API_PORT = 8001
# Take the FastAPI app only as a source for MCP server generation
mcp = FastApiMCP(
items.app,
base_url=f"http://{ITEMS_API_HOST}:{ITEMS_API_PORT}", # Note how the base URL is the **Items API** URL, not the MCP server URL
)
# And then mount the MCP server to a separate FastAPI app
mcp_app = FastAPI()
mcp.mount(mcp_app)
def run_items_app():
uvicorn.run(items.app, port=ITEMS_API_PORT)
def run_mcp_app():
uvicorn.run(mcp_app, port=MCP_SERVER_PORT)
# The MCP server depends on the Items API to be available, so we need to run both.
async def main():
await asyncio.gather(asyncio.to_thread(run_items_app), asyncio.to_thread(run_mcp_app))
if __name__ == "__main__":
asyncio.run(main())

View File

View File

@@ -2,12 +2,11 @@
Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app. Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app.
""" """
from examples.apps import items from examples.shared.apps import items
from examples.setup import setup_logging from examples.shared.setup import setup_logging
from fastapi_mcp import FastApiMCP from fastapi_mcp import FastApiMCP
setup_logging() setup_logging()

View File

@@ -1,94 +0,0 @@
import json
import logging
from typing import Any, Dict, List, Union
import httpx
import mcp.types as types
logger = logging.getLogger(__name__)
async def execute_api_tool(
base_url: str, tool_name: str, arguments: Dict[str, Any], operation_map: Dict[str, Dict[str, Any]]
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Execute an MCP tool by making an HTTP request to the corresponding API endpoint.
Args:
base_url: The base URL for the API
tool_name: The name of the tool to execute
arguments: The arguments for the tool
operation_map: A mapping from tool names to operation details
Returns:
The result as MCP content types
"""
if tool_name not in operation_map:
return [types.TextContent(type="text", text=f"Unknown tool: {tool_name}")]
operation = operation_map[tool_name]
path: str = operation["path"]
method: str = operation["method"]
parameters: List[Dict[str, Any]] = operation.get("parameters", [])
# Deep copy arguments to avoid modifying the original
kwargs = arguments.copy() if arguments else {}
# Prepare URL with path parameters
url = f"{base_url}{path}"
for param in parameters:
if param.get("in") == "path" and param.get("name") in kwargs:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
url = url.replace(f"{{{param_name}}}", str(kwargs.pop(param_name)))
# Prepare query parameters
query = {}
for param in parameters:
if param.get("in") == "query" and param.get("name") in kwargs:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
query[param_name] = kwargs.pop(param_name)
# Prepare headers
headers = {}
for param in parameters:
if param.get("in") == "header" and param.get("name") in kwargs:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
headers[param_name] = kwargs.pop(param_name)
# Prepare request body (remaining kwargs)
body = kwargs if kwargs else None
try:
# Make the request
logger.debug(f"Making {method.upper()} request to {url}")
async with httpx.AsyncClient() as client:
if method.lower() == "get":
response = await client.get(url, params=query, headers=headers)
elif method.lower() == "post":
response = await client.post(url, params=query, headers=headers, json=body)
elif method.lower() == "put":
response = await client.put(url, params=query, headers=headers, json=body)
elif method.lower() == "delete":
response = await client.delete(url, params=query, headers=headers)
elif method.lower() == "patch":
response = await client.patch(url, params=query, headers=headers, json=body)
else:
return [types.TextContent(type="text", text=f"Unsupported HTTP method: {method}")]
# Process the response
try:
result = response.json()
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
except ValueError:
return [types.TextContent(type="text", text=response.text)]
except Exception as e:
return [types.TextContent(type="text", text=f"Error calling {tool_name}: {str(e)}")]

View File

@@ -1,3 +1,5 @@
import json
import httpx
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Dict, Optional, Any, List, Union, AsyncIterator from typing import Dict, Optional, Any, List, Union, AsyncIterator
@@ -7,7 +9,6 @@ from mcp.server.lowlevel.server import Server
import mcp.types as types import mcp.types as types
from fastapi_mcp.openapi.convert import convert_openapi_to_mcp_tools from fastapi_mcp.openapi.convert import convert_openapi_to_mcp_tools
from fastapi_mcp.execute import execute_api_tool
from fastapi_mcp.transport.sse import FastApiSseTransport from fastapi_mcp.transport.sse import FastApiSseTransport
from logging import getLogger from logging import getLogger
@@ -37,7 +38,7 @@ class FastApiMCP:
self._describe_all_responses = describe_all_responses self._describe_all_responses = describe_all_responses
self._describe_full_response_schema = describe_full_response_schema self._describe_full_response_schema = describe_full_response_schema
self.mcp_server = self.create_server() self.server = self.create_server()
def create_server(self) -> Server: def create_server(self) -> Server:
""" """
@@ -127,13 +128,15 @@ class FastApiMCP:
operation_map = ctx.lifespan_context["operation_map"] operation_map = ctx.lifespan_context["operation_map"]
# Execute the tool # Execute the tool
return await execute_api_tool(base_url, name, arguments, operation_map) return await self.execute_api_tool(base_url, name, arguments, operation_map)
return mcp_server return mcp_server
def mount(self, router: Optional[FastAPI | APIRouter] = None, mount_path: str = "/mcp") -> None: def mount(self, router: Optional[FastAPI | APIRouter] = None, mount_path: str = "/mcp") -> None:
""" """
Mount the MCP server to the FastAPI app. Mount the MCP server to **any** FastAPI app or APIRouter.
There is no requirement that the FastAPI app or APIRouter is the same as the one that the MCP
server was created from.
Args: Args:
router: The FastAPI app or APIRouter to mount the MCP server to. If not provided, the MCP router: The FastAPI app or APIRouter to mount the MCP server to. If not provided, the MCP
@@ -156,12 +159,10 @@ class FastApiMCP:
@router.get(mount_path) @router.get(mount_path)
async def handle_mcp_connection(request: Request): async def handle_mcp_connection(request: Request):
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as (reader, writer): async with sse_transport.connect_sse(request.scope, request.receive, request._send) as (reader, writer):
await self.mcp_server.run( await self.server.run(
reader, reader,
writer, writer,
self.mcp_server.create_initialization_options( self.server.create_initialization_options(notification_options=None, experimental_capabilities={}),
notification_options=None, experimental_capabilities={}
),
) )
# Route for MCP messages # Route for MCP messages
@@ -170,3 +171,84 @@ class FastApiMCP:
return await sse_transport.handle_fastapi_post_message(request) return await sse_transport.handle_fastapi_post_message(request)
logger.info(f"MCP server listening at {mount_path}") logger.info(f"MCP server listening at {mount_path}")
async def execute_api_tool(
self, base_url: str, tool_name: str, arguments: Dict[str, Any], operation_map: Dict[str, Dict[str, Any]]
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Execute an MCP tool by making an HTTP request to the corresponding API endpoint.
Args:
base_url: The base URL for the API
tool_name: The name of the tool to execute
arguments: The arguments for the tool
operation_map: A mapping from tool names to operation details
Returns:
The result as MCP content types
"""
if tool_name not in operation_map:
return [types.TextContent(type="text", text=f"Unknown tool: {tool_name}")]
operation = operation_map[tool_name]
path: str = operation["path"]
method: str = operation["method"]
parameters: List[Dict[str, Any]] = operation.get("parameters", [])
arguments = arguments.copy() if arguments else {} # Deep copy arguments to avoid mutating the original
# Prepare URL with path parameters
url = f"{base_url}{path}"
for param in parameters:
if param.get("in") == "path" and param.get("name") in arguments:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
url = url.replace(f"{{{param_name}}}", str(arguments.pop(param_name)))
# Prepare query parameters
query = {}
for param in parameters:
if param.get("in") == "query" and param.get("name") in arguments:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
query[param_name] = arguments.pop(param_name)
# Prepare headers
headers = {}
for param in parameters:
if param.get("in") == "header" and param.get("name") in arguments:
param_name = param.get("name", None)
if param_name is None:
raise ValueError(f"Parameter name is None for parameter: {param}")
headers[param_name] = arguments.pop(param_name)
# Prepare request body (remaining kwargs)
body = arguments if arguments else None
try:
# Make request
logger.debug(f"Making {method.upper()} request to {url}")
async with httpx.AsyncClient() as client:
if method.lower() == "get":
response = await client.get(url, params=query, headers=headers)
elif method.lower() == "post":
response = await client.post(url, params=query, headers=headers, json=body)
elif method.lower() == "put":
response = await client.put(url, params=query, headers=headers, json=body)
elif method.lower() == "delete":
response = await client.delete(url, params=query, headers=headers)
elif method.lower() == "patch":
response = await client.patch(url, params=query, headers=headers, json=body)
else:
return [types.TextContent(type="text", text=f"Unsupported HTTP method: {method}")]
# Process response
try:
result = response.json()
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
except ValueError:
return [types.TextContent(type="text", text=response.text)]
except Exception as e:
return [types.TextContent(type="text", text=f"Error calling {tool_name}: {str(e)}")]

View File

@@ -39,29 +39,6 @@ dependencies = [
"tomli>=2.2.1", "tomli>=2.2.1",
] ]
[project.urls]
Homepage = "https://github.com/tadata-org/fastapi_mcp"
Documentation = "https://github.com/tadata-org/fastapi_mcp#readme"
"Bug Tracker" = "https://github.com/tadata-org/fastapi_mcp/issues"
"PyPI" = "https://pypi.org/project/fastapi-mcp/"
"Source Code" = "https://github.com/tadata-org/fastapi_mcp"
"Changelog" = "https://github.com/tadata-org/fastapi_mcp/blob/main/CHANGELOG.md"
[project.scripts]
fastapi-mcp = "fastapi_mcp.cli:app"
[tool.hatch.build.targets.wheel]
packages = ["fastapi_mcp"]
[tool.ruff]
line-length = 120
target-version = "py310"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = "test_*.py"
[dependency-groups] [dependency-groups]
dev = [ dev = [
"mypy>=1.15.0", "mypy>=1.15.0",
@@ -72,3 +49,23 @@ dev = [
"pytest-cov>=4.1.0", "pytest-cov>=4.1.0",
"pre-commit>=4.2.0", "pre-commit>=4.2.0",
] ]
[project.urls]
Homepage = "https://github.com/tadata-org/fastapi_mcp"
Documentation = "https://github.com/tadata-org/fastapi_mcp#readme"
"Bug Tracker" = "https://github.com/tadata-org/fastapi_mcp/issues"
"PyPI" = "https://pypi.org/project/fastapi-mcp/"
"Source Code" = "https://github.com/tadata-org/fastapi_mcp"
"Changelog" = "https://github.com/tadata-org/fastapi_mcp/blob/main/CHANGELOG.md"
[tool.hatch.build.targets.wheel]
packages = ["fastapi_mcp"]
[tool.ruff]
line-length = 120
target-version = "py312"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = "test_*.py"