use sdk for stdio

This commit is contained in:
Kartik Sarangmath
2025-07-09 10:30:41 -07:00
parent 6cf2fae74d
commit 55b672a04f
5 changed files with 160 additions and 145 deletions

View File

@@ -42,7 +42,7 @@ Issues = "https://github.com/omnara-ai/omnara/issues"
omnara = "servers.mcp_server.stdio_server:main"
[tool.setuptools.packages.find]
include = ["omnara*", "servers*", "shared*", "backend*"]
include = ["omnara*", "servers", "servers.mcp_server"]
[tool.pytest.ini_options]
markers = [

View File

@@ -0,0 +1,86 @@
"""Tool descriptions for MCP server"""
# Description text for the `log_step` MCP tool. The stdio server passes this
# constant as `description=` when it registers the tool via `@mcp.tool`.
# NOTE: the literal below is runtime text addressed to the calling agent;
# changing its wording changes the instructions that agent receives.
LOG_STEP_DESCRIPTION = """Log a high-level step the agent is performing.
⚠️ CRITICAL: MUST be called for EVERY significant action:
• Before answering any user question or request
• When performing analysis, searches, or investigations
• When reading files, exploring code, or gathering information
• When making code changes, edits, or file modifications
• When running commands, tests, or terminal operations
• When providing explanations, solutions, or recommendations
• At the start of multi-step processes or complex tasks
This call retrieves unread user feedback that you MUST incorporate into your work.
Feedback may contain corrections, clarifications, or additional instructions that override your original plan.
Args:
agent_instance_id: Existing agent instance ID (optional). If omitted, creates a new instance for reuse in subsequent steps.
step_description: Clear, specific description of what you're about to do or currently doing.
⚠️ RETURNS USER FEEDBACK: If user_feedback is not empty, you MUST:
1. Read and understand each feedback message
2. Adjust your current approach based on the feedback
3. Acknowledge the feedback in your response
4. Prioritize user feedback over your original plan
Feedback is automatically marked as retrieved. If empty, continue as planned."""
# Description text for the `ask_question` MCP tool. The stdio server passes
# this constant as `description=` when it registers the tool via `@mcp.tool`.
# NOTE: the literal below is runtime text addressed to the calling agent;
# changing its wording changes the instructions that agent receives.
ASK_QUESTION_DESCRIPTION = """🤖 INTERACTIVE: Ask the user a question and WAIT for their reply (BLOCKS execution).
⚠️ CRITICAL: ALWAYS call log_step BEFORE using this tool to track the interaction.
🎯 USE WHEN YOU NEED:
• Clarification on ambiguous requirements or unclear instructions
• User decision between multiple valid approaches or solutions
• Confirmation before making significant changes (deleting files, major refactors)
• Missing information that you cannot determine from context or codebase
• User preferences for implementation details (styling, naming, architecture)
• Validation of assumptions before proceeding with complex tasks
💡 BEST PRACTICES:
• Keep questions clear, specific, and actionable
• Provide context: explain WHY you're asking
• Offer options when multiple choices exist
• Ask one focused question at a time
• Include relevant details to help user decide
Args:
agent_instance_id: Current agent instance ID. REQUIRED.
question_text: Clear, specific question with sufficient context for the user to provide a helpful answer."""
# Description text for the `end_session` MCP tool. The stdio server passes
# this constant as `description=` when it registers the tool via `@mcp.tool`.
# NOTE: the literal below is runtime text addressed to the calling agent;
# changing its wording changes the instructions that agent receives.
END_SESSION_DESCRIPTION = """End the current agent session and mark it as completed.
⚠️ IMPORTANT: Before using this tool, you MUST:
1. Provide a comprehensive summary of all actions taken to complete the task
2. Use the ask_question tool to confirm with the user that the task is complete
3. Only proceed with end_session if the user confirms completion
Example confirmation question:
"I've completed the following tasks:
• [List of specific actions taken]
• [Key changes or implementations made]
• [Any important outcomes or results]
Is this task complete and ready to be marked as finished?"
If the user:
• Confirms completion → Use end_session tool
• Does NOT confirm → Continue working on their feedback or new requirements
• Requests additional work → Do NOT end the session, continue with the new tasks
Use this tool ONLY when:
• The user has explicitly confirmed the task is complete
• The user explicitly asks to end the session
• An unrecoverable error prevents any further work
This will:
• Mark the agent instance status as COMPLETED
• Set the session end time
• Deactivate any pending questions
• Prevent further updates to this session
Args:
agent_instance_id: Current agent instance ID to end. REQUIRED."""

View File

@@ -12,10 +12,12 @@ from fastmcp.server.dependencies import get_access_token
from shared.config import settings
from .models import AskQuestionResponse, EndSessionResponse, LogStepResponse
from .tools import (
from .descriptions import (
LOG_STEP_DESCRIPTION,
ASK_QUESTION_DESCRIPTION,
END_SESSION_DESCRIPTION,
)
from .tools import (
log_step_impl,
ask_question_impl,
end_session_impl,

View File

@@ -8,23 +8,17 @@ It provides the same functionality as the hosted server but uses stdio transport
import argparse
import asyncio
import logging
from collections.abc import Callable, Coroutine
from functools import wraps
from typing import Any, ParamSpec, TypeVar
from typing import Optional
from fastmcp import FastMCP
from shared.config import settings
from shared.database import Base
from shared.database.session import engine
from omnara.sdk import AsyncOmnaraClient
from omnara.sdk.exceptions import TimeoutError as OmnaraTimeoutError
from .models import AskQuestionResponse, EndSessionResponse, LogStepResponse
from .tools import (
from .descriptions import (
LOG_STEP_DESCRIPTION,
ASK_QUESTION_DESCRIPTION,
END_SESSION_DESCRIPTION,
log_step_impl,
ask_question_impl,
end_session_impl,
)
from .utils import detect_agent_type_from_environment
@@ -32,31 +26,15 @@ from .utils import detect_agent_type_from_environment
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Type variables for decorator
P = ParamSpec("P")
T = TypeVar("T")
# Global client instance
client: Optional[AsyncOmnaraClient] = None
def require_api_key(func: Callable[P, T]) -> Callable[P, Coroutine[Any, Any, T]]:
"""Decorator to ensure API key is provided for stdio server."""
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
# For stdio, we get the API key from command line args
# and use it as the user_id for simplicity
api_key = getattr(require_api_key, "_api_key", None)
if not api_key:
raise ValueError("API key is required. Use --api-key argument.")
# Add user_id to kwargs for use in the function
kwargs["_user_id"] = api_key
result = func(*args, **kwargs)
# Handle both sync and async functions
if asyncio.iscoroutine(result):
return await result
return result
return wrapper
def get_client() -> AsyncOmnaraClient:
    """Return the module-level SDK client.

    Returns:
        The ``AsyncOmnaraClient`` currently bound to the module global
        ``client``.

    Raises:
        RuntimeError: If the module global ``client`` is still ``None``.
    """
    # Read the global once into a local so the check and the return see the
    # same object; the local must not be named ``client`` or it would shadow
    # the module global.
    current = client
    if current is not None:
        return current
    raise RuntimeError("Client not initialized. Run main() first.")
# Create FastMCP server
@@ -64,18 +42,24 @@ mcp = FastMCP("Omnara Agent Dashboard MCP Server")
@mcp.tool(name="log_step", description=LOG_STEP_DESCRIPTION)
@require_api_key
def log_step_tool(
async def log_step_tool(
agent_instance_id: str | None = None,
step_description: str = "",
_user_id: str = "", # Injected by decorator
) -> LogStepResponse:
agent_type = detect_agent_type_from_environment()
return log_step_impl(
agent_instance_id=agent_instance_id,
client = get_client()
response = await client.log_step(
agent_type=agent_type,
step_description=step_description,
user_id=_user_id,
agent_instance_id=agent_instance_id,
)
return LogStepResponse(
success=response.success,
agent_instance_id=response.agent_instance_id,
step_number=response.step_number,
user_feedback=response.user_feedback,
)
@@ -83,31 +67,50 @@ def log_step_tool(
name="ask_question",
description=ASK_QUESTION_DESCRIPTION,
)
@require_api_key
async def ask_question_tool(
agent_instance_id: str | None = None,
question_text: str | None = None,
_user_id: str = "", # Injected by decorator
) -> AskQuestionResponse:
return await ask_question_impl(
if not agent_instance_id:
raise ValueError("agent_instance_id is required")
if not question_text:
raise ValueError("question_text is required")
client = get_client()
try:
response = await client.ask_question(
agent_instance_id=agent_instance_id,
question_text=question_text,
user_id=_user_id,
timeout_minutes=1440, # 24 hours default
poll_interval=1.0,
)
return AskQuestionResponse(
answer=response.answer,
question_id=response.question_id,
)
except OmnaraTimeoutError:
raise TimeoutError("Question timed out waiting for user response")
@mcp.tool(
name="end_session",
description=END_SESSION_DESCRIPTION,
)
@require_api_key
def end_session_tool(
async def end_session_tool(
agent_instance_id: str,
_user_id: str = "", # Injected by decorator
) -> EndSessionResponse:
return end_session_impl(
client = get_client()
response = await client.end_session(
agent_instance_id=agent_instance_id,
user_id=_user_id,
)
return EndSessionResponse(
success=response.success,
agent_instance_id=response.agent_instance_id,
final_status=response.final_status,
)
@@ -115,18 +118,23 @@ def main():
"""Main entry point for the stdio server"""
parser = argparse.ArgumentParser(description="Omnara MCP Server (Stdio)")
parser.add_argument("--api-key", required=True, help="API key for authentication")
parser.add_argument(
"--base-url",
default="https://agent-dashboard-mcp.onrender.com",
help="Base URL of the Omnara API server",
)
args = parser.parse_args()
# Store API key for auth decorator
require_api_key._api_key = args.api_key
# Ensure database tables exist
Base.metadata.create_all(bind=engine)
logger.info("Database tables created/verified")
# Initialize the global client
global client
client = AsyncOmnaraClient(
api_key=args.api_key,
base_url=args.base_url,
)
logger.info("Starting Omnara MCP server (stdio)")
logger.info(f"Database URL configured: {settings.database_url[:50]}...")
logger.info(f"Using API server: {args.base_url}")
try:
# Run with stdio transport (default)
@@ -134,6 +142,10 @@ def main():
except Exception as e:
logger.error(f"Failed to start MCP server: {e}")
raise
finally:
# Clean up client
if client:
asyncio.run(client.close())
if __name__ == "__main__":

View File

@@ -16,91 +16,6 @@ from servers.shared.core import (
)
from .models import AskQuestionResponse, EndSessionResponse, LogStepResponse
LOG_STEP_DESCRIPTION = """Log a high-level step the agent is performing.
⚠️ CRITICAL: MUST be called for EVERY significant action:
• Before answering any user question or request
• When performing analysis, searches, or investigations
• When reading files, exploring code, or gathering information
• When making code changes, edits, or file modifications
• When running commands, tests, or terminal operations
• When providing explanations, solutions, or recommendations
• At the start of multi-step processes or complex tasks
This call retrieves unread user feedback that you MUST incorporate into your work.
Feedback may contain corrections, clarifications, or additional instructions that override your original plan.
Args:
agent_instance_id: Existing agent instance ID (optional). If omitted, creates a new instance for reuse in subsequent steps.
step_description: Clear, specific description of what you're about to do or currently doing.
⚠️ RETURNS USER FEEDBACK: If user_feedback is not empty, you MUST:
1. Read and understand each feedback message
2. Adjust your current approach based on the feedback
3. Acknowledge the feedback in your response
4. Prioritize user feedback over your original plan
Feedback is automatically marked as retrieved. If empty, continue as planned."""
ASK_QUESTION_DESCRIPTION = """🤖 INTERACTIVE: Ask the user a question and WAIT for their reply (BLOCKS execution).
⚠️ CRITICAL: ALWAYS call log_step BEFORE using this tool to track the interaction.
🎯 USE WHEN YOU NEED:
• Clarification on ambiguous requirements or unclear instructions
• User decision between multiple valid approaches or solutions
• Confirmation before making significant changes (deleting files, major refactors)
• Missing information that you cannot determine from context or codebase
• User preferences for implementation details (styling, naming, architecture)
• Validation of assumptions before proceeding with complex tasks
💡 BEST PRACTICES:
• Keep questions clear, specific, and actionable
• Provide context: explain WHY you're asking
• Offer options when multiple choices exist
• Ask one focused question at a time
• Include relevant details to help user decide
Args:
agent_instance_id: Current agent instance ID. REQUIRED.
question_text: Clear, specific question with sufficient context for the user to provide a helpful answer."""
END_SESSION_DESCRIPTION = """End the current agent session and mark it as completed.
⚠️ IMPORTANT: Before using this tool, you MUST:
1. Provide a comprehensive summary of all actions taken to complete the task
2. Use the ask_question tool to confirm with the user that the task is complete
3. Only proceed with end_session if the user confirms completion
Example confirmation question:
"I've completed the following tasks:
• [List of specific actions taken]
• [Key changes or implementations made]
• [Any important outcomes or results]
Is this task complete and ready to be marked as finished?"
If the user:
• Confirms completion → Use end_session tool
• Does NOT confirm → Continue working on their feedback or new requirements
• Requests additional work → Do NOT end the session, continue with the new tasks
Use this tool ONLY when:
• The user has explicitly confirmed the task is complete
• The user explicitly asks to end the session
• An unrecoverable error prevents any further work
This will:
• Mark the agent instance status as COMPLETED
• Set the session end time
• Deactivate any pending questions
• Prevent further updates to this session
Args:
agent_instance_id: Current agent instance ID to end. REQUIRED."""
def log_step_impl(
agent_instance_id: str | None = None,