Claude code hooks (#16)

* intermediate commit

* auth sent to webhook

* mock

* mock

---------

Co-authored-by: Kartik Sarangmath <kartiksarangmath@Kartiks-MacBook-Air.local>
This commit is contained in:
ksarangmath
2025-07-14 20:42:47 -07:00
committed by GitHub
parent 97e0b12d99
commit f0d1f98000
5 changed files with 333 additions and 55 deletions

View File

@@ -5,13 +5,15 @@ Database queries for UserAgent operations.
import httpx
from datetime import datetime, timezone
from uuid import UUID
import hashlib
from shared.database import UserAgent, AgentInstance, AgentStatus
from shared.database import UserAgent, AgentInstance, AgentStatus, APIKey
from sqlalchemy import and_, func
from sqlalchemy.orm import Session, joinedload
from ..models import UserAgentRequest, WebhookTriggerResponse
from .queries import _format_instance
from ..auth.jwt_utils import create_api_key_jwt
def create_user_agent(
@@ -98,25 +100,48 @@ async def trigger_webhook_agent(
db.commit()
db.refresh(instance)
# Get or create an Omnara API key for this agent
api_key_name = f"{user_agent.name} Key"
# Check if an API key already exists for this agent
existing_key = (
db.query(APIKey)
.filter(
and_(
APIKey.user_id == user_id,
APIKey.name == api_key_name,
APIKey.is_active,
)
)
.first()
)
if existing_key:
omnara_api_key = existing_key.api_key
else:
# Create a new API key
jwt_token = create_api_key_jwt(
user_id=str(user_id),
expires_in_days=None, # No expiration
)
# Store API key in database
api_key = APIKey(
user_id=user_id,
name=api_key_name,
api_key_hash=hashlib.sha256(jwt_token.encode()).hexdigest(),
api_key=jwt_token,
expires_at=None, # No expiration
)
db.add(api_key)
db.commit()
omnara_api_key = jwt_token
# Prepare webhook payload
payload = {
"agent_instance_id": str(instance.id),
"prompt": prompt,
"omnara_api_key": user_agent.webhook_api_key,
"omnara_tools": {
"log_step": {
"description": "Log a step in the agent's execution",
"endpoint": "/api/v1/mcp/tools/log_step",
},
"ask_question": {
"description": "Ask a question to the user",
"endpoint": "/api/v1/mcp/tools/ask_question",
},
"end_session": {
"description": "End the agent session",
"endpoint": "/api/v1/mcp/tools/end_session",
},
},
}
# Call the webhook
@@ -130,6 +155,7 @@ async def trigger_webhook_agent(
if user_agent.webhook_api_key
else "",
"Content-Type": "application/json",
"X-Omnara-Api-Key": omnara_api_key, # Add the Omnara API key header
},
)
response.raise_for_status()

View File

@@ -2,7 +2,7 @@
from datetime import datetime, timezone
from uuid import uuid4
from unittest.mock import patch
from unittest.mock import patch, AsyncMock
from shared.database.models import UserAgent, AgentInstance, User
from shared.database.enums import AgentStatus
@@ -230,9 +230,10 @@ class TestUserAgentEndpoints:
# Mock the async webhook function
with patch(
"backend.db.user_agent_queries.trigger_webhook_agent"
"backend.api.user_agents.trigger_webhook_agent",
new_callable=AsyncMock,
) as mock_trigger:
# Use AsyncMock to properly mock the async function
# Set the return value for the async mock
mock_trigger.return_value = WebhookTriggerResponse(
success=True,
agent_instance_id=str(uuid4()),

View File

@@ -22,11 +22,16 @@ def run_stdio_server(args):
]
if args.base_url:
cmd.extend(["--base-url", args.base_url])
if (
hasattr(args, "claude_code_permission_tool")
and args.claude_code_permission_tool
):
cmd.append("--claude-code-permission-tool")
subprocess.run(cmd)
def run_webhook_server(cloudflare_tunnel=False):
def run_webhook_server(cloudflare_tunnel=False, dangerously_skip_permissions=False):
"""Run the Claude Code webhook FastAPI server"""
cloudflared_process = None
@@ -63,14 +68,12 @@ def run_webhook_server(cloudflare_tunnel=False):
cmd = [
sys.executable,
"-m",
"uvicorn",
"webhooks.claude_code:app",
"--host",
"0.0.0.0",
"--port",
"6662",
"webhooks.claude_code",
]
if dangerously_skip_permissions:
cmd.append("--dangerously-skip-permissions")
print("[INFO] Starting Claude Code webhook server on port 6662")
try:
@@ -124,6 +127,11 @@ Examples:
action="store_true",
help="Run Cloudflare tunnel for the webhook server (webhook mode only)",
)
parser.add_argument(
"--dangerously-skip-permissions",
action="store_true",
help="Skip permission prompts in Claude Code (webhook mode only) - USE WITH CAUTION",
)
# Arguments for stdio mode
parser.add_argument(
@@ -134,6 +142,11 @@ Examples:
default="https://agent-dashboard-mcp.onrender.com",
help="Base URL of the Omnara API server (stdio mode only)",
)
parser.add_argument(
"--claude-code-permission-tool",
action="store_true",
help="Enable Claude Code permission prompt tool for handling tool execution approvals (stdio mode only)",
)
args = parser.parse_args()
@@ -141,7 +154,10 @@ Examples:
parser.error("--cloudflare-tunnel can only be used with --claude-code-webhook")
if args.claude_code_webhook:
run_webhook_server(cloudflare_tunnel=args.cloudflare_tunnel)
run_webhook_server(
cloudflare_tunnel=args.cloudflare_tunnel,
dangerously_skip_permissions=args.dangerously_skip_permissions,
)
else:
if not args.api_key:
parser.error("--api-key is required for stdio mode")

View File

@@ -29,6 +29,9 @@ logger = logging.getLogger(__name__)
# Global client instance
client: Optional[AsyncOmnaraClient] = None
# Global state for current agent instance (primarily used for claude code approval tool)
current_agent_instance_id: Optional[str] = None
def get_client() -> AsyncOmnaraClient:
"""Get the initialized AsyncOmnaraClient instance."""
@@ -48,6 +51,8 @@ async def log_step_tool(
agent_instance_id: str | None = None,
step_description: str = "",
) -> LogStepResponse:
global current_agent_instance_id
agent_type = detect_agent_type_from_environment()
client = get_client()
@@ -57,6 +62,9 @@ async def log_step_tool(
agent_instance_id=agent_instance_id,
)
# Store the instance ID for use by other tools
current_agent_instance_id = response.agent_instance_id
return LogStepResponse(
success=response.success,
agent_instance_id=response.agent_instance_id,
@@ -73,6 +81,8 @@ async def ask_question_tool(
agent_instance_id: str | None = None,
question_text: str | None = None,
) -> AskQuestionResponse:
global current_agent_instance_id
if not agent_instance_id:
raise ValueError("agent_instance_id is required")
if not question_text:
@@ -80,12 +90,15 @@ async def ask_question_tool(
client = get_client()
# Store the instance ID for use by other tools
current_agent_instance_id = agent_instance_id
try:
response = await client.ask_question(
agent_instance_id=agent_instance_id,
question_text=question_text,
timeout_minutes=1440, # 24 hours default
poll_interval=1.0,
poll_interval=10.0,
)
return AskQuestionResponse(
@@ -116,6 +129,69 @@ async def end_session_tool(
)
@mcp.tool(
name="approve",
description="Handle permission prompts for Claude Code. Returns approval/denial for tool execution.",
enabled=False,
)
async def approve_tool(
tool_name: str,
input: dict,
tool_use_id: Optional[str] = None,
) -> dict:
"""Claude Code permission prompt handler."""
global current_agent_instance_id
if not tool_name:
raise ValueError("tool_name is required")
client = get_client()
# Format the permission request as a question
question_text = f"Allow execution of {tool_name}? Input: {input}"
try:
# Use existing instance ID or create a new one
if current_agent_instance_id:
instance_id = current_agent_instance_id
else:
# Only create a new instance if we don't have one
response = await client.log_step(
agent_type="Claude Code",
step_description="Permission request",
agent_instance_id=None,
)
instance_id = response.agent_instance_id
current_agent_instance_id = instance_id
# Ask the permission question
answer_response = await client.ask_question(
agent_instance_id=instance_id,
question_text=question_text,
timeout_minutes=1440,
poll_interval=10.0,
)
# Parse the answer to determine approval
answer = answer_response.answer.lower().strip()
if answer in ["yes", "y", "allow", "approve", "ok"]:
return {
"behavior": "allow",
"updatedInput": input,
}
else:
return {
"behavior": "deny",
"message": f"Permission denied by user: {answer_response.answer}",
}
except OmnaraTimeoutError:
return {
"behavior": "deny",
"message": "Permission request timed out",
}
def main():
"""Main entry point for the stdio server"""
parser = argparse.ArgumentParser(description="Omnara MCP Server (Stdio)")
@@ -125,6 +201,11 @@ def main():
default="https://agent-dashboard-mcp.onrender.com",
help="Base URL of the Omnara API server",
)
parser.add_argument(
"--claude-code-permission-tool",
action="store_true",
help="Enable Claude Code permission prompt tool for handling tool execution approvals",
)
args = parser.parse_args()
@@ -135,8 +216,16 @@ def main():
base_url=args.base_url,
)
# Enable/disable tools based on feature flags
if args.claude_code_permission_tool:
approve_tool.enable()
logger.info("Claude Code permission tool enabled")
logger.info("Starting Omnara MCP server (stdio)")
logger.info(f"Using API server: {args.base_url}")
logger.info(
f"Claude Code permission tool: {'enabled' if args.claude_code_permission_tool else 'disabled'}"
)
try:
# Run with stdio transport (default)

View File

@@ -1,4 +1,5 @@
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
import subprocess
import shlex
from datetime import datetime
@@ -7,6 +8,8 @@ import os
import re
import uuid
import uvicorn
import time
import json
from contextlib import asynccontextmanager
from pydantic import BaseModel, field_validator
@@ -51,8 +54,14 @@ async def lifespan(app: FastAPI):
app.state.webhook_secret = secret
# Initialize the flag if not already set (when run via uvicorn directly)
if not hasattr(app.state, "dangerously_skip_permissions"):
app.state.dangerously_skip_permissions = False
print(f"[IMPORTANT] Webhook secret: {secret}")
print("[IMPORTANT] Use this secret in the Authorization header as: Bearer <secret>")
if app.state.dangerously_skip_permissions:
print("[WARNING] Running with --dangerously-skip-permissions flag enabled!")
yield
if hasattr(app.state, "webhook_secret"):
@@ -61,6 +70,19 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
print(f"[ERROR] Unhandled exception: {str(exc)}")
print(f"[ERROR] Exception type: {type(exc).__name__}")
import traceback
traceback.print_exc()
return JSONResponse(
status_code=500, content={"detail": f"Internal server error: {str(exc)}"}
)
SYSTEM_PROMPT = """
You are now in Omnara-only communication mode.
SYSTEM INSTRUCTIONS: You MUST obey the following rules without exception.
@@ -126,15 +148,25 @@ def verify_auth(request: Request, authorization: str = Header(None)) -> bool:
@app.post("/")
async def start_claude(
request: Request, webhook_data: WebhookRequest, authorization: str = Header(None)
request: Request,
webhook_data: WebhookRequest,
authorization: str = Header(None),
x_omnara_api_key: str = Header(None, alias="X-Omnara-Api-Key"),
):
try:
if not verify_auth(request, authorization):
raise HTTPException(status_code=401, detail="Invalid or missing authorization")
raise HTTPException(
status_code=401, detail="Invalid or missing authorization"
)
agent_instance_id = webhook_data.agent_instance_id
prompt = webhook_data.prompt
name = webhook_data.name
print(
f"[INFO] Received webhook request for agent instance: {agent_instance_id}"
)
safe_prompt = SYSTEM_PROMPT.replace("{{agent_instance_id}}", agent_instance_id)
safe_prompt += f"\n\n\n{prompt}"
@@ -151,8 +183,6 @@ async def start_claude(
if not work_dir.startswith(base_dir):
raise HTTPException(status_code=400, detail="Invalid working directory")
try:
result = subprocess.run(
[
"git",
@@ -177,10 +207,83 @@ async def start_claude(
screen_name = f"{screen_prefix}-{safe_timestamp}"
escaped_prompt = shlex.quote(safe_prompt)
claude_cmd = f"claude --dangerously-skip-permissions {escaped_prompt}"
# First, check if required commands are available
screen_check = subprocess.run(
["which", "screen"],
capture_output=True,
text=True,
)
if screen_check.returncode != 0:
raise HTTPException(
status_code=500,
detail="screen command not found. Please install screen.",
)
# Check if claude command is available
claude_check = subprocess.run(
["which", "claude"],
capture_output=True,
text=True,
)
if claude_check.returncode != 0:
raise HTTPException(
status_code=500,
detail="claude command not found. Please install Claude Code CLI.",
)
claude_path = claude_check.stdout.strip()
# Get Omnara API key from header
if not x_omnara_api_key:
raise HTTPException(
status_code=400,
detail="Omnara API key required. Provide via X-Omnara-Api-Key header.",
)
omnara_api_key = x_omnara_api_key
# Create MCP config as a JSON string
mcp_config = {
"mcpServers": {
"omnara": {
"command": "omnara",
"args": [
"--api-key",
omnara_api_key,
"--claude-code-permission-tool",
],
}
}
}
mcp_config_str = json.dumps(mcp_config)
# Build claude command with MCP config as string
claude_args = [
claude_path, # Use full path to claude
"--mcp-config",
mcp_config_str,
"--allowedTools",
"mcp__omnara__approve,mcp__omnara__log_step,mcp__omnara__ask_question,mcp__omnara__end_session",
]
# Add permissions flag based on configuration
if request.app.state.dangerously_skip_permissions:
claude_args.append("--dangerously-skip-permissions")
else:
claude_args.extend(
["-p", "--permission-prompt-tool", "mcp__omnara__approve"]
)
# Add the prompt to claude args
claude_args.append(escaped_prompt)
print(f"[INFO] Claude command: {' '.join(claude_args)}")
print(f"[INFO] Working directory: {work_dir}")
# Start screen directly with the claude command
screen_cmd = ["screen", "-dmS", screen_name] + claude_args
screen_result = subprocess.run(
["screen", "-dmS", screen_name, "bash", "-c", claude_cmd],
screen_cmd,
cwd=work_dir,
capture_output=True,
text=True,
@@ -194,6 +297,25 @@ async def start_claude(
detail=f"Failed to start screen session: {screen_result.stderr}",
)
# Wait a moment and check if screen is still running
time.sleep(1)
# Check if the screen session exists
list_result = subprocess.run(
["screen", "-ls"],
capture_output=True,
text=True,
)
if (
"No Sockets found" in list_result.stdout
or screen_name not in list_result.stdout
):
raise HTTPException(
status_code=500,
detail=f"Screen session started but exited immediately. Session name: {screen_name}",
)
print(f"[INFO] Started screen session: {screen_name}")
print(f"[INFO] To attach: screen -r {screen_name}")
@@ -201,11 +323,21 @@ async def start_claude(
"message": "Successfully started claude",
"branch": feature_branch_name,
"screen_session": screen_name,
"work_dir": work_dir,
}
except subprocess.TimeoutExpired:
print("[ERROR] Git operation timed out")
raise HTTPException(status_code=500, detail="Git operation timed out")
except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as e:
print(f"[ERROR] Failed to start claude: {str(e)}")
print(f"[ERROR] Exception type: {type(e).__name__}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to start claude: {str(e)}")
@@ -216,4 +348,18 @@ async def health_check():
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Claude Code Webhook Server")
parser.add_argument(
"--dangerously-skip-permissions",
action="store_true",
help="Skip permission prompts in Claude Code - USE WITH CAUTION",
)
args = parser.parse_args()
# Store the flag in a global variable for the app to use
app.state.dangerously_skip_permissions = args.dangerously_skip_permissions
uvicorn.run(app, host="0.0.0.0", port=6662)