diff --git a/api.logs b/api.logs new file mode 100644 index 0000000..b00b111 --- /dev/null +++ b/api.logs @@ -0,0 +1,85 @@ +INFO:__main__:====================================================================== +INFO:__main__:PERFORMING STARTUP GPU HEALTH CHECK +INFO:__main__:====================================================================== +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 1.04s +INFO:__main__:====================================================================== +INFO:__main__:STARTUP GPU CHECK SUCCESSFUL +INFO:__main__:GPU Device: NVIDIA GeForce RTX 3060 +INFO:__main__:Memory Available: 11.66 GB +INFO:__main__:Test Duration: 1.04s +INFO:__main__:====================================================================== +INFO:__main__:Starting Whisper REST API server on 0.0.0.0:8000 +INFO: Started server process [69821] +INFO: Waiting for application startup. +INFO:__main__:Starting job queue and health monitor... +INFO:core.job_queue:Starting job queue (max size: 100) +INFO:core.job_queue:Loading jobs from /media/raid/agents/tools/mcp-transcriptor/outputs/jobs +INFO:core.job_queue:Loaded 8 jobs from disk +INFO:core.job_queue:Job queue worker loop started +INFO:core.job_queue:Job queue worker started +INFO:__main__:Job queue started (max_size=100, metadata_dir=/media/raid/agents/tools/mcp-transcriptor/outputs/jobs) +INFO:core.gpu_health:Starting GPU health monitor (interval: 10.0 minutes) +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.37s +INFO:__main__:GPU health monitor started (interval=10 minutes) +INFO: Application startup complete. +INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) +INFO: 127.0.0.1:48092 - "GET /jobs HTTP/1.1" 200 OK +INFO: 127.0.0.1:60874 - "GET /jobs?status=completed&limit=3 HTTP/1.1" 200 OK +INFO: 127.0.0.1:60876 - "GET /jobs?status=failed&limit=10 HTTP/1.1" 200 OK +INFO:core.job_queue:Running GPU health check before job submission +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.39s +INFO:core.job_queue:GPU health check passed +INFO:core.job_queue:Job 6be8e49a-bdc1-4508-af99-280bef033cb0 submitted: /tmp/whisper_test_voice_1s.mp3 (queue position: 1) +INFO: 127.0.0.1:58376 - "POST /jobs HTTP/1.1" 200 OK +INFO:core.job_queue:Job 6be8e49a-bdc1-4508-af99-280bef033cb0 started processing +INFO:core.model_manager:Running GPU health check with auto-reset before model loading +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.54s +INFO:core.model_manager:Loading Whisper model: tiny device: cuda compute type: float16 +INFO:core.model_manager:Available GPU memory: 12.52 GB +INFO:core.model_manager:Enabling batch processing acceleration, batch size: 16 +INFO:core.transcriber:Starting transcription of file: whisper_test_voice_1s.mp3 +INFO:utils.audio_processor:Successfully preprocessed audio: whisper_test_voice_1s.mp3 +INFO:core.transcriber:Using batch acceleration for transcription... 
+INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:VAD filter removed 00:00.000 of audio +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.transcriber:Transcription completed, time used: 0.16 seconds, detected language: en, audio length: 1.51 seconds +INFO:core.transcriber:Transcription results saved to: /media/raid/agents/tools/mcp-transcriptor/outputs/whisper_test_voice_1s.txt +INFO:core.job_queue:Job 6be8e49a-bdc1-4508-af99-280bef033cb0 completed successfully: /media/raid/agents/tools/mcp-transcriptor/outputs/whisper_test_voice_1s.txt +INFO:core.job_queue:Job 6be8e49a-bdc1-4508-af99-280bef033cb0 finished: status=completed, duration=1.1s +INFO: 127.0.0.1:41646 - "GET /jobs/6be8e49a-bdc1-4508-af99-280bef033cb0 HTTP/1.1" 200 OK +INFO: 127.0.0.1:34046 - "GET /jobs/6be8e49a-bdc1-4508-af99-280bef033cb0/result HTTP/1.1" 200 OK +INFO:core.job_queue:Running GPU health check before job submission +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.39s +INFO:core.job_queue:GPU health check passed +INFO:core.job_queue:Job 41ce74c0-8929-457b-96b3-1b8e4a720a7a submitted: /home/uad/agents/tools/mcp-transcriptor/data/test.mp3 (queue position: 1) +INFO: 127.0.0.1:44576 - "POST /jobs HTTP/1.1" 200 OK +INFO:core.job_queue:Job 41ce74c0-8929-457b-96b3-1b8e4a720a7a started processing +INFO:core.model_manager:Running GPU health check with auto-reset before model loading +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.39s +INFO:core.model_manager:Loading Whisper model: large-v3 device: cuda compute type: float16 +INFO:core.model_manager:Available GPU memory: 12.52 GB +INFO:core.model_manager:Enabling batch processing acceleration, batch size: 16 +INFO:core.transcriber:Starting transcription of file: test.mp3 +INFO:utils.audio_processor:Successfully preprocessed audio: test.mp3 +INFO:core.transcriber:Using batch acceleration for transcription... 
+INFO:faster_whisper:Processing audio with duration 00:06.955 +INFO:faster_whisper:VAD filter removed 00:00.299 of audio +INFO:core.transcriber:Transcription completed, time used: 0.52 seconds, detected language: en, audio length: 6.95 seconds +INFO:core.transcriber:Transcription results saved to: /media/raid/agents/tools/mcp-transcriptor/outputs/test.txt +INFO:core.job_queue:Job 41ce74c0-8929-457b-96b3-1b8e4a720a7a completed successfully: /media/raid/agents/tools/mcp-transcriptor/outputs/test.txt +INFO:core.job_queue:Job 41ce74c0-8929-457b-96b3-1b8e4a720a7a finished: status=completed, duration=23.3s +INFO: 127.0.0.1:59120 - "GET /jobs/41ce74c0-8929-457b-96b3-1b8e4a720a7a HTTP/1.1" 200 OK +INFO: 127.0.0.1:53806 - "GET /jobs/41ce74c0-8929-457b-96b3-1b8e4a720a7a/result HTTP/1.1" 200 OK diff --git a/mcp.logs b/mcp.logs index 8bb7525..7f4bb24 100644 --- a/mcp.logs +++ b/mcp.logs @@ -1 +1,25 @@ starting mcp server for whisper stt transcriptor +INFO:__main__:====================================================================== +INFO:__main__:PERFORMING STARTUP GPU HEALTH CHECK +INFO:__main__:====================================================================== +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.93s +INFO:__main__:====================================================================== +INFO:__main__:STARTUP GPU CHECK SUCCESSFUL +INFO:__main__:GPU Device: NVIDIA GeForce RTX 3060 +INFO:__main__:Memory Available: 11.66 GB +INFO:__main__:Test Duration: 0.93s +INFO:__main__:====================================================================== +INFO:__main__:Initializing job queue... +INFO:core.job_queue:Starting job queue (max size: 100) +INFO:core.job_queue:Loading jobs from /media/raid/agents/tools/mcp-transcriptor/outputs/jobs +INFO:core.job_queue:Loaded 5 jobs from disk +INFO:core.job_queue:Job queue worker loop started +INFO:core.job_queue:Job queue worker started +INFO:__main__:Job queue started (max_size=100, metadata_dir=/media/raid/agents/tools/mcp-transcriptor/outputs/jobs) +INFO:core.gpu_health:Starting GPU health monitor (interval: 10.0 minutes) +INFO:faster_whisper:Processing audio with duration 00:01.512 +INFO:faster_whisper:Detected language 'en' with probability 0.95 +INFO:core.gpu_health:GPU health check passed: NVIDIA GeForce RTX 3060, test duration: 0.38s +INFO:__main__:GPU health monitor started (interval=10 minutes) diff --git a/run_api_server.sh b/run_api_server.sh index 8e1171f..0e52387 100755 --- a/run_api_server.sh +++ b/run_api_server.sh @@ -32,6 +32,16 @@ export API_PORT="8000" # GPU Auto-Reset Configuration export GPU_RESET_COOLDOWN_MINUTES=5 # Minimum time between GPU reset attempts +# Job Queue Configuration +export JOB_QUEUE_MAX_SIZE=100 +export JOB_METADATA_DIR="/media/raid/agents/tools/mcp-transcriptor/outputs/jobs" +export JOB_RETENTION_DAYS=7 + +# GPU Health Monitoring +export GPU_HEALTH_CHECK_ENABLED=true +export GPU_HEALTH_CHECK_INTERVAL_MINUTES=10 +export GPU_HEALTH_TEST_MODEL="tiny" + # Log start of the script echo "$(datetime_prefix) Starting Whisper REST API server..." 
echo "$(datetime_prefix) Model directory: $WHISPER_MODEL_DIR" @@ -46,6 +56,7 @@ fi # Ensure output directories exist mkdir -p "$TRANSCRIPTION_OUTPUT_DIR" mkdir -p "$TRANSCRIPTION_BATCH_OUTPUT_DIR" +mkdir -p "$JOB_METADATA_DIR" # Run the API server /home/uad/agents/tools/mcp-transcriptor/venv/bin/python -u /home/uad/agents/tools/mcp-transcriptor/src/servers/api_server.py 2>&1 | tee /home/uad/agents/tools/mcp-transcriptor/api.logs diff --git a/run_mcp_server.sh b/run_mcp_server.sh index c978550..7cecdde 100755 --- a/run_mcp_server.sh +++ b/run_mcp_server.sh @@ -32,6 +32,16 @@ export TRANSCRIPTION_FILENAME_PREFIX="test_" # GPU Auto-Reset Configuration export GPU_RESET_COOLDOWN_MINUTES=5 # Minimum time between GPU reset attempts +# Job Queue Configuration +export JOB_QUEUE_MAX_SIZE=100 +export JOB_METADATA_DIR="/media/raid/agents/tools/mcp-transcriptor/outputs/jobs" +export JOB_RETENTION_DAYS=7 + +# GPU Health Monitoring +export GPU_HEALTH_CHECK_ENABLED=true +export GPU_HEALTH_CHECK_INTERVAL_MINUTES=10 +export GPU_HEALTH_TEST_MODEL="tiny" + # Log start of the script echo "$(datetime_prefix) Starting whisper server script..." echo "test: $WHISPER_MODEL_DIR" @@ -42,6 +52,9 @@ if [ ! -d "$WHISPER_MODEL_DIR" ]; then exit 1 fi +# Ensure job metadata directory exists +mkdir -p "$JOB_METADATA_DIR" + # Run the Python script with the defined environment variables #/home/uad/agents/tools/mcp-transcriptor/venv/bin/python /home/uad/agents/tools/mcp-transcriptor/whisper_server.py 2>&1 | tee /home/uad/agents/tools/mcp-transcriptor/mcp.logs /home/uad/agents/tools/mcp-transcriptor/venv/bin/python -u /home/uad/agents/tools/mcp-transcriptor/src/servers/whisper_server.py 2>&1 | tee /home/uad/agents/tools/mcp-transcriptor/mcp.logs diff --git a/src/core/job_queue.py b/src/core/job_queue.py index 04a4ef4..670c8ef 100644 --- a/src/core/job_queue.py +++ b/src/core/job_queue.py @@ -141,7 +141,7 @@ class JobQueue: self._worker_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._current_job_id: Optional[str] = None - self._lock = threading.Lock() + self._lock = threading.RLock() # Use RLock to allow re-entrant locking self._max_queue_size = max_queue_size # Create metadata directory diff --git a/src/core/transcriber.py b/src/core/transcriber.py index 8b47e49..9150129 100644 --- a/src/core/transcriber.py +++ b/src/core/transcriber.py @@ -76,9 +76,10 @@ def transcribe_audio( temperature = temperature if temperature is not None else DEFAULT_TEMPERATURE # Validate audio file - validation_result = validate_audio_file(audio_path) - if validation_result != "ok": - return validation_result + try: + validate_audio_file(audio_path) + except (FileNotFoundError, ValueError, OSError) as e: + return f"Error: {str(e)}" try: # Get model instance diff --git a/src/servers/api_server.py b/src/servers/api_server.py index bad48ec..1bc7fa9 100644 --- a/src/servers/api_server.py +++ b/src/servers/api_server.py @@ -7,14 +7,17 @@ Provides HTTP REST endpoints for audio transcription import os import sys import logging -from typing import Optional +import queue as queue_module +from contextlib import asynccontextmanager +from typing import Optional, List from fastapi import FastAPI, HTTPException, UploadFile, File, Form from fastapi.responses import JSONResponse, FileResponse from pydantic import BaseModel, Field import json from core.model_manager import get_model_info -from core.transcriber import transcribe_audio, batch_transcribe +from core.job_queue import JobQueue, JobStatus +from core.gpu_health import 
HealthMonitor, check_gpu_health # Logging configuration logging.basicConfig(level=logging.INFO) @@ -28,19 +31,60 @@ except ImportError as e: logger.warning(f"GPU health check with reset not available: {e}") GPU_HEALTH_CHECK_AVAILABLE = False +# Global instances +job_queue: Optional[JobQueue] = None +health_monitor: Optional[HealthMonitor] = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """FastAPI lifespan context manager for startup/shutdown""" + global job_queue, health_monitor + + # Startup + logger.info("Starting job queue and health monitor...") + + # Initialize job queue + max_queue_size = int(os.getenv("JOB_QUEUE_MAX_SIZE", "100")) + metadata_dir = os.getenv("JOB_METADATA_DIR", "/media/raid/agents/tools/mcp-transcriptor/outputs/jobs") + job_queue = JobQueue(max_queue_size=max_queue_size, metadata_dir=metadata_dir) + job_queue.start() + logger.info(f"Job queue started (max_size={max_queue_size}, metadata_dir={metadata_dir})") + + # Initialize health monitor + health_check_enabled = os.getenv("GPU_HEALTH_CHECK_ENABLED", "true").lower() == "true" + if health_check_enabled: + check_interval = int(os.getenv("GPU_HEALTH_CHECK_INTERVAL_MINUTES", "10")) + health_monitor = HealthMonitor(check_interval_minutes=check_interval) + health_monitor.start() + logger.info(f"GPU health monitor started (interval={check_interval} minutes)") + + yield + + # Shutdown + logger.info("Shutting down job queue and health monitor...") + if job_queue: + job_queue.stop(wait_for_current=True) + logger.info("Job queue stopped") + if health_monitor: + health_monitor.stop() + logger.info("Health monitor stopped") + + # Create FastAPI app app = FastAPI( title="Whisper Speech Recognition API", - description="High-performance audio transcription API based on Faster Whisper", - version="0.1.1" + description="High-performance audio transcription API based on Faster Whisper with async job queue", + version="0.2.0", + lifespan=lifespan ) # Request/Response Models -class TranscribeRequest(BaseModel): +class SubmitJobRequest(BaseModel): audio_path: str = Field(..., description="Path to the audio file on the server") model_name: str = Field("large-v3", description="Whisper model name") - device: str = Field("auto", description="Execution device (cpu, cuda, auto)") + device: str = Field("auto", description="Execution device (cuda, auto)") compute_type: str = Field("auto", description="Computation type (float16, int8, auto)") language: Optional[str] = Field(None, description="Language code (zh, en, ja, etc.)") output_format: str = Field("txt", description="Output format (vtt, srt, json, txt)") @@ -50,31 +94,6 @@ class TranscribeRequest(BaseModel): output_directory: Optional[str] = Field(None, description="Output directory path") -class BatchTranscribeRequest(BaseModel): - audio_folder: str = Field(..., description="Path to folder containing audio files") - output_folder: Optional[str] = Field(None, description="Output folder path") - model_name: str = Field("large-v3", description="Whisper model name") - device: str = Field("auto", description="Execution device (cpu, cuda, auto)") - compute_type: str = Field("auto", description="Computation type (float16, int8, auto)") - language: Optional[str] = Field(None, description="Language code (zh, en, ja, etc.)") - output_format: str = Field("txt", description="Output format (vtt, srt, json, txt)") - beam_size: int = Field(5, description="Beam search size") - temperature: float = Field(0.0, description="Sampling temperature") - initial_prompt: Optional[str] = 
Field(None, description="Initial prompt text") - parallel_files: int = Field(1, description="Number of files to process in parallel") - - -class TranscribeResponse(BaseModel): - success: bool - message: str - output_path: Optional[str] = None - - -class BatchTranscribeResponse(BaseModel): - success: bool - summary: str - - # API Endpoints @app.get("/") @@ -82,13 +101,22 @@ async def root(): """Root endpoint with API information""" return { "name": "Whisper Speech Recognition API", - "version": "0.1.1", + "version": "0.2.0", + "description": "Async job queue-based transcription service", "endpoints": { + "GET /": "API information", "GET /health": "Health check", + "GET /health/gpu": "GPU health check", "GET /models": "Get available models information", - "POST /transcribe": "Transcribe a single audio file", - "POST /batch-transcribe": "Batch transcribe audio files", - "POST /upload-transcribe": "Upload and transcribe audio file" + "POST /jobs": "Submit transcription job (async)", + "GET /jobs/{job_id}": "Get job status", + "GET /jobs/{job_id}/result": "Get job result", + "GET /jobs": "List jobs with optional filtering" + }, + "workflow": { + "1": "Submit job via POST /jobs → receive job_id", + "2": "Poll status via GET /jobs/{job_id} → wait for status='completed'", + "3": "Get result via GET /jobs/{job_id}/result → retrieve transcription" } } @@ -110,17 +138,15 @@ async def get_models(): raise HTTPException(status_code=500, detail=f"Failed to get model info: {str(e)}") -@app.post("/transcribe", response_model=TranscribeResponse) -async def transcribe(request: TranscribeRequest): +@app.post("/jobs") +async def submit_job(request: SubmitJobRequest): """ - Transcribe a single audio file + Submit a transcription job for async processing. - The audio file must already exist on the server at the specified path. + Returns immediately with job_id. Poll GET /jobs/{job_id} for status. """ try: - logger.info(f"Received transcription request for: {request.audio_path}") - - result = transcribe_audio( + job_info = job_queue.submit_job( audio_path=request.audio_path, model_name=request.model_name, device=request.device, @@ -133,149 +159,267 @@ async def transcribe(request: TranscribeRequest): output_directory=request.output_directory ) - # Parse result to determine success - if result.startswith("Error") or "failed" in result.lower(): - return TranscribeResponse( - success=False, - message=result, - output_path=None - ) + return JSONResponse( + status_code=200, + content={ + **job_info, + "message": f"Job submitted successfully. Poll /jobs/{job_info['job_id']} for status." + } + ) - # Extract output path from success message - output_path = None - if "saved to:" in result: - output_path = result.split("saved to:")[1].strip() + except queue_module.Full: + # Queue is full + logger.warning("Job queue is full, rejecting request") + raise HTTPException( + status_code=503, + detail={ + "error": "Queue full", + "message": f"Job queue is full ({job_queue._max_queue_size}/{job_queue._max_queue_size}). 
Please try again later or contact administrator.", + "queue_size": job_queue._max_queue_size, + "max_queue_size": job_queue._max_queue_size + } + ) - return TranscribeResponse( - success=True, - message=result, - output_path=output_path + except FileNotFoundError as e: + # Invalid audio file + logger.error(f"Invalid audio file: {e}") + raise HTTPException( + status_code=400, + detail={ + "error": "Invalid audio file", + "message": str(e), + "audio_path": request.audio_path + } + ) + + except ValueError as e: + # CPU device rejected + logger.error(f"Invalid device parameter: {e}") + raise HTTPException( + status_code=400, + detail={ + "error": "Invalid device", + "message": str(e) + } + ) + + except RuntimeError as e: + # GPU health check failed + logger.error(f"GPU health check failed: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "GPU unavailable", + "message": f"Job rejected: {str(e)}" + } ) except Exception as e: - logger.error(f"Transcription failed: {str(e)}") - raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}") + logger.error(f"Failed to submit job: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "Internal error", + "message": f"Failed to submit job: {str(e)}" + } + ) -@app.post("/batch-transcribe", response_model=BatchTranscribeResponse) -async def batch_transcribe_endpoint(request: BatchTranscribeRequest): +@app.get("/jobs/{job_id}") +async def get_job_status_endpoint(job_id: str): """ - Batch transcribe all audio files in a folder + Get the current status of a job. - Processes all supported audio files in the specified folder. + Returns job status including queue position, timestamps, and result path when completed. """ try: - logger.info(f"Received batch transcription request for: {request.audio_folder}") + status = job_queue.get_job_status(job_id) + return JSONResponse(status_code=200, content=status) - result = batch_transcribe( - audio_folder=request.audio_folder, - output_folder=request.output_folder, - model_name=request.model_name, - device=request.device, - compute_type=request.compute_type, - language=request.language, - output_format=request.output_format, - beam_size=request.beam_size, - temperature=request.temperature, - initial_prompt=request.initial_prompt, - parallel_files=request.parallel_files - ) - - # Check if there were errors - success = not result.startswith("Error") - - return BatchTranscribeResponse( - success=success, - summary=result + except KeyError: + raise HTTPException( + status_code=404, + detail={ + "error": "Job not found", + "message": f"Job ID '{job_id}' does not exist or has been cleaned up", + "job_id": job_id + } ) except Exception as e: - logger.error(f"Batch transcription failed: {str(e)}") - raise HTTPException(status_code=500, detail=f"Batch transcription failed: {str(e)}") + logger.error(f"Failed to get job status: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "Internal error", + "message": f"Failed to get job status: {str(e)}" + } + ) -@app.post("/upload-transcribe") -async def upload_and_transcribe( - file: UploadFile = File(...), - model_name: str = Form("large-v3"), - device: str = Form("auto"), - compute_type: str = Form("auto"), - language: Optional[str] = Form(None), - output_format: str = Form("txt"), - beam_size: int = Form(5), - temperature: float = Form(0.0), - initial_prompt: Optional[str] = Form(None) +@app.get("/jobs/{job_id}/result") +async def get_job_result_endpoint(job_id: str): + """ + Get the transcription result for a 
completed job. + + Returns the transcription text. Only works for jobs with status='completed'. + """ + try: + result_text = job_queue.get_job_result(job_id) + return JSONResponse( + status_code=200, + content={"job_id": job_id, "result": result_text} + ) + + except KeyError: + raise HTTPException( + status_code=404, + detail={ + "error": "Job not found", + "message": f"Job ID '{job_id}' does not exist or has been cleaned up", + "job_id": job_id + } + ) + + except ValueError as e: + # Job not completed yet + # Extract current status from error message + status_match = str(e).split("Current status: ") + current_status = status_match[1] if len(status_match) > 1 else "unknown" + + raise HTTPException( + status_code=409, + detail={ + "error": "Job not completed", + "message": f"Job is not completed yet. Current status: {current_status}. Please wait and poll again.", + "job_id": job_id, + "current_status": current_status + } + ) + + except FileNotFoundError as e: + # Result file missing + logger.error(f"Result file not found for job {job_id}: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "Result file not found", + "message": str(e), + "job_id": job_id + } + ) + + except Exception as e: + logger.error(f"Failed to get job result: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "Internal error", + "message": f"Failed to get job result: {str(e)}" + } + ) + + +@app.get("/jobs") +async def list_jobs_endpoint( + status: Optional[str] = None, + limit: int = 100 ): """ - Upload an audio file and transcribe it + List jobs with optional filtering. - This endpoint accepts file uploads via multipart/form-data. + Query parameters: + - status: Filter by status (queued, running, completed, failed) + - limit: Maximum number of results (default: 100) """ - import tempfile - import shutil - try: - # Create temporary directory for upload - temp_dir = tempfile.mkdtemp(prefix="whisper_upload_") + # Parse status filter + status_filter = None + if status: + try: + status_filter = JobStatus(status) + except ValueError: + raise HTTPException( + status_code=400, + detail={ + "error": "Invalid status", + "message": f"Invalid status value: {status}. 
Must be one of: queued, running, completed, failed" } ) - file_ext = os.path.splitext(file.filename)[1] - temp_audio_path = os.path.join(temp_dir, f"upload{file_ext}") + jobs = job_queue.list_jobs(status_filter=status_filter, limit=limit) - with open(temp_audio_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - - logger.info(f"Uploaded file saved to: {temp_audio_path}") - - # Transcribe the uploaded file - result = transcribe_audio( - audio_path=temp_audio_path, - model_name=model_name, - device=device, - compute_type=compute_type, - language=language, - output_format=output_format, - beam_size=beam_size, - temperature=temperature, - initial_prompt=initial_prompt, - output_directory=temp_dir + return JSONResponse( + status_code=200, + content={ + "jobs": jobs, + "total": len(jobs), + "filters": { + "status": status, + "limit": limit + } + } ) - # Parse result - if result.startswith("Error") or "failed" in result.lower(): - # Clean up temp files - shutil.rmtree(temp_dir, ignore_errors=True) - raise HTTPException(status_code=500, detail=result) + except HTTPException: + # Propagate the 400 raised above for an invalid status filter instead of masking it as a 500 + raise + except Exception as e: + logger.error(f"Failed to list jobs: {e}") + raise HTTPException( + status_code=500, + detail={ + "error": "Internal error", + "message": f"Failed to list jobs: {str(e)}" + } + ) - # Extract output path - output_path = None - if "saved to:" in result: - output_path = result.split("saved to:")[1].strip() - # Return the transcription file - if output_path and os.path.exists(output_path): - return FileResponse( - output_path, - media_type="text/plain", - filename=os.path.basename(output_path), - background=None # Don't delete yet, we'll clean up after - ) - else: - # Clean up temp files - shutil.rmtree(temp_dir, ignore_errors=True) - return JSONResponse(content={ - "success": True, - "message": result - }) +@app.get("/health/gpu") +async def gpu_health_check_endpoint(): + """ + Check GPU health by running a quick transcription test. + + Returns detailed GPU status including device name, memory, and test duration. 
+ """ + try: + status = check_gpu_health(expected_device="auto") + + # Add interpretation + interpretation = "GPU is healthy and working correctly" + if not status.gpu_available: + interpretation = "GPU not available on this system" + elif not status.gpu_working: + interpretation = f"GPU available but not working correctly: {status.error}" + elif status.test_duration_seconds > 2.0: + interpretation = f"GPU working but performance degraded (test took {status.test_duration_seconds:.2f}s, expected <1s)" + + return JSONResponse( + status_code=200, + content={ + **status.to_dict(), + "interpretation": interpretation + } + ) except Exception as e: - logger.error(f"Upload and transcribe failed: {str(e)}") - # Clean up temp files on error - if 'temp_dir' in locals(): - shutil.rmtree(temp_dir, ignore_errors=True) - raise HTTPException(status_code=500, detail=f"Upload and transcribe failed: {str(e)}") - finally: - await file.close() + logger.error(f"GPU health check failed: {e}") + return JSONResponse( + status_code=200, # Still return 200 but with error details + content={ + "gpu_available": False, + "gpu_working": False, + "device_used": "unknown", + "device_name": "", + "memory_total_gb": 0.0, + "memory_available_gb": 0.0, + "test_duration_seconds": 0.0, + "timestamp": "", + "error": str(e), + "interpretation": f"GPU health check failed: {str(e)}" + } + ) + + + + if __name__ == "__main__": diff --git a/src/servers/whisper_server.py b/src/servers/whisper_server.py index e23c3e2..65bc871 100644 --- a/src/servers/whisper_server.py +++ b/src/servers/whisper_server.py @@ -7,10 +7,13 @@ Provides high-performance audio transcription with batch processing acceleration import os import sys import logging +import json +from typing import Optional from mcp.server.fastmcp import FastMCP from core.model_manager import get_model_info -from core.transcriber import transcribe_audio, batch_transcribe +from core.job_queue import JobQueue, JobStatus +from core.gpu_health import HealthMonitor, check_gpu_health # Log configuration logging.basicConfig(level=logging.INFO) @@ -24,93 +27,294 @@ except ImportError as e: logger.warning(f"GPU health check with reset not available: {e}") GPU_HEALTH_CHECK_AVAILABLE = False +# Global instances +job_queue: Optional[JobQueue] = None +health_monitor: Optional[HealthMonitor] = None + # Create FastMCP server instance mcp = FastMCP( name="fast-whisper-mcp-server", - version="0.1.1", + version="0.2.0", dependencies=["faster-whisper>=0.9.0", "torch==2.6.0+cu126", "torchaudio==2.6.0+cu126", "numpy>=1.20.0"] ) @mcp.tool() def get_model_info_api() -> str: """ - Get available Whisper model information + Get available Whisper model information and system configuration. + + Returns available models, devices, languages, and GPU information. 
""" return get_model_info() -@mcp.tool() -def transcribe(audio_path: str, model_name: str = "large-v3", device: str = "auto", - compute_type: str = "auto", language: str = None, output_format: str = "vtt", - beam_size: int = 5, temperature: float = 0.0, initial_prompt: str = None, - output_directory: str = None) -> str: - """ - Transcribe audio files using Faster Whisper - - Args: - audio_path: Path to the audio file - model_name: Model name (tiny, base, small, medium, large-v1, large-v2, large-v3) - device: Execution device (cpu, cuda, auto) - compute_type: Computation type (float16, int8, auto) - language: Language code (such as zh, en, ja, etc., auto-detect by default) - output_format: Output format (vtt, srt, json or txt) - beam_size: Beam search size, larger values may improve accuracy but reduce speed - temperature: Sampling temperature, greedy decoding - initial_prompt: Initial prompt text, can help the model better understand context - output_directory: Output directory path, defaults to the audio file's directory - - Returns: - str: Transcription result, in VTT subtitle or JSON format - """ - return transcribe_audio( - audio_path=audio_path, - model_name=model_name, - device=device, - compute_type=compute_type, - language=language, - output_format=output_format, - beam_size=beam_size, - temperature=temperature, - initial_prompt=initial_prompt, - output_directory=output_directory - ) @mcp.tool() -def batch_transcribe_audio(audio_folder: str, output_folder: str = None, model_name: str = "large-v3", - device: str = "auto", compute_type: str = "auto", language: str = None, - output_format: str = "vtt", beam_size: int = 5, temperature: float = 0.0, - initial_prompt: str = None, parallel_files: int = 1) -> str: +def transcribe_async( + audio_path: str, + model_name: str = "large-v3", + device: str = "auto", + compute_type: str = "auto", + language: Optional[str] = None, + output_format: str = "txt", + beam_size: int = 5, + temperature: float = 0.0, + initial_prompt: Optional[str] = None, + output_directory: Optional[str] = None +) -> str: """ - Batch transcribe audio files in a folder + Submit an audio file for asynchronous transcription. + + IMPORTANT: This tool returns immediately with a job_id. Use get_job_status() + to check progress and get_job_result() to retrieve the transcription. + + WORKFLOW FOR LLM AGENTS: + 1. Call this tool to submit the job + 2. You will receive a job_id and queue_position + 3. Poll get_job_status(job_id) every 5-10 seconds to check progress + 4. When status="completed", call get_job_result(job_id) to get transcription + + For long audio files (>10 minutes), expect processing to take several minutes. + You can check queue_position to estimate wait time (each job ~2-5 minutes). 
Args: - audio_folder: Path to the folder containing audio files - output_folder: Output folder path, defaults to a 'transcript' subfolder in audio_folder - model_name: Model name (tiny, base, small, medium, large-v1, large-v2, large-v3) - device: Execution device (cpu, cuda, auto) + audio_path: Path to audio file on server + model_name: Whisper model (tiny, base, small, medium, large-v3) + device: Execution device (cuda, auto) - cpu is rejected compute_type: Computation type (float16, int8, auto) - language: Language code (such as zh, en, ja, etc., auto-detect by default) - output_format: Output format (vtt, srt, json or txt) - beam_size: Beam search size, larger values may improve accuracy but reduce speed - temperature: Sampling temperature, 0 means greedy decoding - initial_prompt: Initial prompt text, can help the model better understand context - parallel_files: Number of files to process in parallel (only effective in CPU mode) + language: Language code (en, zh, ja, etc.) or auto-detect + output_format: Output format (txt, vtt, srt, json) + beam_size: Beam search size (larger=better quality, slower) + temperature: Sampling temperature (0.0=greedy) + initial_prompt: Optional prompt to guide transcription + output_directory: Where to save result (uses default if not specified) Returns: - str: Batch processing summary, including processing time and success rate + JSON string with job_id, status, queue_position, and instructions """ - return batch_transcribe( - audio_folder=audio_folder, - output_folder=output_folder, - model_name=model_name, - device=device, - compute_type=compute_type, - language=language, - output_format=output_format, - beam_size=beam_size, - temperature=temperature, - initial_prompt=initial_prompt, - parallel_files=parallel_files - ) + try: + job_info = job_queue.submit_job( + audio_path=audio_path, + model_name=model_name, + device=device, + compute_type=compute_type, + language=language, + output_format=output_format, + beam_size=beam_size, + temperature=temperature, + initial_prompt=initial_prompt, + output_directory=output_directory + ) + return json.dumps({ + **job_info, + "message": f"Job submitted successfully. Poll get_job_status('{job_info['job_id']}') for updates." + }, indent=2) + + except Exception as e: + error_type = type(e).__name__ + if "Full" in error_type or "queue is full" in str(e).lower(): + error_code = "QUEUE_FULL" + message = f"Job queue is full. Please try again in a few minutes. Error: {str(e)}" + elif "FileNotFoundError" in error_type or "not found" in str(e).lower(): + error_code = "INVALID_AUDIO_FILE" + message = f"Audio file not found or invalid. Error: {str(e)}" + elif "RuntimeError" in error_type or "GPU" in str(e): + error_code = "GPU_UNAVAILABLE" + message = f"GPU unavailable. Error: {str(e)}" + elif "ValueError" in error_type or "CPU" in str(e): + error_code = "INVALID_DEVICE" + message = f"Invalid device parameter. Error: {str(e)}" + else: + error_code = "INTERNAL_ERROR" + message = f"Failed to submit job. Error: {str(e)}" + + return json.dumps({ + "error": error_code, + "message": message + }, indent=2) + + +@mcp.tool() +def get_job_status(job_id: str) -> str: + """ + Check the status of a transcription job. + + Status values: + - "queued": Job is waiting in queue. Check queue_position. + - "running": Job is currently being processed. + - "completed": Transcription finished. Call get_job_result() to retrieve. + - "failed": Job failed. Check error field for details. 
+ + Args: + job_id: Job ID from transcribe_async() + + Returns: + JSON string with detailed job status including: + - status, queue_position, timestamps, error (if any) + """ + try: + status = job_queue.get_job_status(job_id) + return json.dumps(status, indent=2) + + except KeyError: + return json.dumps({ + "error": "JOB_NOT_FOUND", + "message": f"Job ID '{job_id}' does not exist. Please check the job_id." + }, indent=2) + + except Exception as e: + return json.dumps({ + "error": "INTERNAL_ERROR", + "message": f"Failed to get job status. Error: {str(e)}" + }, indent=2) + + +@mcp.tool() +def get_job_result(job_id: str) -> str: + """ + Retrieve the transcription result for a completed job. + + IMPORTANT: Only call this when get_job_status() returns status="completed". + If the job is not completed, this will return an error. + + Args: + job_id: Job ID from transcribe_async() + + Returns: + Transcription text as a string + + Errors: + - "Job not found" if invalid job_id + - "Job not completed yet" if status is not "completed" + - "Result file not found" if transcription file is missing + """ + try: + result_text = job_queue.get_job_result(job_id) + return result_text # Return raw text, not JSON + + except KeyError: + return json.dumps({ + "error": "JOB_NOT_FOUND", + "message": f"Job ID '{job_id}' does not exist." + }, indent=2) + + except ValueError as e: + # Extract status from error message + status_match = str(e).split("Current status: ") + current_status = status_match[1] if len(status_match) > 1 else "unknown" + return json.dumps({ + "error": "JOB_NOT_COMPLETED", + "message": f"Job is not completed yet. Current status: {current_status}. Please wait and check again.", + "current_status": current_status + }, indent=2) + + except FileNotFoundError as e: + return json.dumps({ + "error": "RESULT_FILE_NOT_FOUND", + "message": f"Result file not found. Error: {str(e)}" + }, indent=2) + + except Exception as e: + return json.dumps({ + "error": "INTERNAL_ERROR", + "message": f"Failed to get job result. Error: {str(e)}" + }, indent=2) + + +@mcp.tool() +def list_transcription_jobs( + status_filter: Optional[str] = None, + limit: int = 20 +) -> str: + """ + List transcription jobs with optional filtering. + + Useful for: + - Checking all your submitted jobs + - Finding completed jobs + - Monitoring queue status + + Args: + status_filter: Filter by status (queued, running, completed, failed) + limit: Maximum number of jobs to return (default: 20) + + Returns: + JSON string with list of jobs + """ + try: + # Parse status filter + status_obj = None + if status_filter: + try: + status_obj = JobStatus(status_filter) + except ValueError: + return json.dumps({ + "error": "INVALID_STATUS", + "message": f"Invalid status: {status_filter}. Must be one of: queued, running, completed, failed" + }, indent=2) + + jobs = job_queue.list_jobs(status_filter=status_obj, limit=limit) + + return json.dumps({ + "jobs": jobs, + "total": len(jobs), + "filters": { + "status": status_filter, + "limit": limit + } + }, indent=2) + + except Exception as e: + return json.dumps({ + "error": "INTERNAL_ERROR", + "message": f"Failed to list jobs. Error: {str(e)}" + }, indent=2) + + +@mcp.tool() +def check_gpu_health() -> str: + """ + Test GPU availability and performance by running a quick transcription. + + This tool loads the tiny model and transcribes a 1-second test audio file + to verify the GPU is working correctly. 
+ + Use this when: + - You want to verify GPU is available before submitting large jobs + - You suspect GPU performance issues + - For monitoring/debugging purposes + + Returns: + JSON string with detailed GPU status including: + - gpu_available, gpu_working, device_name, memory_info + - test_duration_seconds (GPU: <1s, CPU: 5-10s) + - interpretation message + + Note: If this returns gpu_working=false, transcriptions will be very slow. + """ + try: + # The imported core.gpu_health.check_gpu_health is shadowed by this tool's own name, so re-import it under an alias to avoid calling ourselves recursively + from core.gpu_health import check_gpu_health as run_gpu_health_check + status = run_gpu_health_check(expected_device="auto") + + # Add interpretation + interpretation = "GPU is healthy and working correctly" + if not status.gpu_available: + interpretation = "GPU not available on this system" + elif not status.gpu_working: + interpretation = f"GPU available but not working correctly: {status.error}" + elif status.test_duration_seconds > 2.0: + interpretation = f"GPU working but performance degraded (test took {status.test_duration_seconds:.2f}s, expected <1s)" + + result = status.to_dict() + result["interpretation"] = interpretation + + return json.dumps(result, indent=2) + + except Exception as e: + return json.dumps({ + "error": "GPU_CHECK_FAILED", + "message": f"GPU health check failed. Error: {str(e)}", + "gpu_available": False, + "gpu_working": False + }, indent=2) if __name__ == "__main__": print("starting mcp server for whisper stt transcriptor") @@ -141,4 +345,30 @@ if __name__ == "__main__": else: logger.warning("GPU health check not available, starting without GPU validation") - mcp.run() \ No newline at end of file + # Initialize job queue + logger.info("Initializing job queue...") + max_queue_size = int(os.getenv("JOB_QUEUE_MAX_SIZE", "100")) + metadata_dir = os.getenv("JOB_METADATA_DIR", "/media/raid/agents/tools/mcp-transcriptor/outputs/jobs") + job_queue = JobQueue(max_queue_size=max_queue_size, metadata_dir=metadata_dir) + job_queue.start() + logger.info(f"Job queue started (max_size={max_queue_size}, metadata_dir={metadata_dir})") + + # Initialize health monitor + health_check_enabled = os.getenv("GPU_HEALTH_CHECK_ENABLED", "true").lower() == "true" + if health_check_enabled: + check_interval = int(os.getenv("GPU_HEALTH_CHECK_INTERVAL_MINUTES", "10")) + health_monitor = HealthMonitor(check_interval_minutes=check_interval) + health_monitor.start() + logger.info(f"GPU health monitor started (interval={check_interval} minutes)") + + try: + mcp.run() + finally: + # Cleanup on shutdown + logger.info("Shutting down...") + if job_queue: + job_queue.stop(wait_for_current=True) + logger.info("Job queue stopped") + if health_monitor: + health_monitor.stop() + logger.info("Health monitor stopped") \ No newline at end of file diff --git a/src/utils/audio_processor.py b/src/utils/audio_processor.py index 0e45b1f..9801a46 100644 --- a/src/utils/audio_processor.py +++ b/src/utils/audio_processor.py @@ -12,40 +12,46 @@ from faster_whisper import decode_audio # Log configuration logger = logging.getLogger(__name__) -def validate_audio_file(audio_path: str) -> str: +def validate_audio_file(audio_path: str) -> None: """ Validate if an audio file is valid Args: audio_path: Path to the audio file + Raises: + FileNotFoundError: If audio file doesn't exist + ValueError: If audio file format is unsupported or file is empty + OSError: If file size cannot be checked + Returns: - str: Validation result, "ok" indicates validation passed, otherwise returns error message + None: If validation passes """ # Validate parameters if not os.path.exists(audio_path): - return f"Error: Audio file does not exist: {audio_path}" + raise 
FileNotFoundError(f"Audio file does not exist: {audio_path}") # Validate file format supported_formats = [".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac"] file_ext = os.path.splitext(audio_path)[1].lower() if file_ext not in supported_formats: - return f"Error: Unsupported audio format: {file_ext}. Supported formats: {', '.join(supported_formats)}" + raise ValueError(f"Unsupported audio format: {file_ext}. Supported formats: {', '.join(supported_formats)}") # Validate file size try: file_size = os.path.getsize(audio_path) if file_size == 0: - return f"Error: Audio file is empty: {audio_path}" + raise ValueError(f"Audio file is empty: {audio_path}") # Warning for large files (over 1GB) if file_size > 1024 * 1024 * 1024: logger.warning(f"Warning: File size exceeds 1GB, may require longer processing time: {audio_path}") - except Exception as e: + except OSError as e: logger.error(f"Failed to check file size: {str(e)}") - return f"Error: Failed to check file size: {str(e)}" + raise OSError(f"Failed to check file size: {str(e)}") - return "ok" + # Validation passed + return None def process_audio(audio_path: str) -> Union[str, Any]: """ diff --git a/test_phase2.py b/test_phase2.py new file mode 100755 index 0000000..0ccef18 --- /dev/null +++ b/test_phase2.py @@ -0,0 +1,535 @@ +#!/usr/bin/env python3 +""" +Test Phase 2: Async Job Queue Integration + +Tests the async job queue system for both API and MCP servers. +Validates all new endpoints and error handling. +""" + +import os +import sys +import time +import json +import logging +import requests +from pathlib import Path + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Add src to path +sys.path.insert(0, str(Path(__file__).parent / "src")) + +# Color codes for terminal output +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + END = '\033[0m' + BOLD = '\033[1m' + +def print_success(msg): + print(f"{Colors.GREEN}✓ {msg}{Colors.END}") + +def print_error(msg): + print(f"{Colors.RED}✗ {msg}{Colors.END}") + +def print_info(msg): + print(f"{Colors.BLUE}ℹ {msg}{Colors.END}") + +def print_section(msg): + print(f"\n{Colors.BOLD}{Colors.YELLOW}{'='*70}{Colors.END}") + print(f"{Colors.BOLD}{Colors.YELLOW}{msg}{Colors.END}") + print(f"{Colors.BOLD}{Colors.YELLOW}{'='*70}{Colors.END}\n") + + +class Phase2Tester: + def __init__(self, api_url="http://localhost:8000"): + self.api_url = api_url + self.test_results = [] + + def test(self, name, func): + """Run a test and record result""" + try: + logger.info(f"Testing: {name}") + print_info(f"Testing: {name}") + func() + logger.info(f"PASSED: {name}") + print_success(f"PASSED: {name}") + self.test_results.append((name, True, None)) + return True + except AssertionError as e: + logger.error(f"FAILED: {name} - {str(e)}") + print_error(f"FAILED: {name}") + print_error(f" Reason: {str(e)}") + self.test_results.append((name, False, str(e))) + return False + except Exception as e: + logger.error(f"ERROR: {name} - {str(e)}") + print_error(f"ERROR: {name}") + print_error(f" Exception: {str(e)}") + self.test_results.append((name, False, f"Exception: {str(e)}")) + return False + + def print_summary(self): + """Print test summary""" + print_section("TEST SUMMARY") + + passed = sum(1 for _, result, _ in self.test_results if result) + failed = len(self.test_results) - passed + + for name, result, error in self.test_results: + if 
result: + print_success(f"{name}") + else: + print_error(f"{name}") + if error: + print(f" {error}") + + print(f"\n{Colors.BOLD}Total: {len(self.test_results)} | ", end="") + print(f"{Colors.GREEN}Passed: {passed}{Colors.END} | ", end="") + print(f"{Colors.RED}Failed: {failed}{Colors.END}\n") + + return failed == 0 + + # ======================================================================== + # API Server Tests + # ======================================================================== + + def test_api_root_endpoint(self): + """Test GET / returns new API information""" + logger.info(f"GET {self.api_url}/") + resp = requests.get(f"{self.api_url}/") + logger.info(f"Response status: {resp.status_code}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + logger.info(f"Response data: {json.dumps(data, indent=2)}") + assert data["version"] == "0.2.0", "Version should be 0.2.0" + assert "POST /jobs" in str(data["endpoints"]), "Should have POST /jobs endpoint" + assert "workflow" in data, "Should have workflow documentation" + + def test_api_health_endpoint(self): + """Test GET /health still works""" + logger.info(f"GET {self.api_url}/health") + resp = requests.get(f"{self.api_url}/health") + logger.info(f"Response status: {resp.status_code}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + logger.info(f"Response data: {data}") + assert data["status"] == "healthy", "Health check should return healthy" + + def test_api_models_endpoint(self): + """Test GET /models still works""" + logger.info(f"GET {self.api_url}/models") + resp = requests.get(f"{self.api_url}/models") + logger.info(f"Response status: {resp.status_code}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + logger.info(f"Available models: {data.get('available_models', [])}") + assert "available_models" in data, "Should return available models" + + def test_api_gpu_health_endpoint(self): + """Test GET /health/gpu returns GPU status""" + logger.info(f"GET {self.api_url}/health/gpu") + resp = requests.get(f"{self.api_url}/health/gpu") + logger.info(f"Response status: {resp.status_code}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + logger.info(f"GPU health: {json.dumps(data, indent=2)}") + assert "gpu_available" in data, "Should have gpu_available field" + assert "gpu_working" in data, "Should have gpu_working field" + assert "interpretation" in data, "Should have interpretation field" + + print_info(f" GPU Status: {data.get('interpretation', 'unknown')}") + + def test_api_submit_job_invalid_audio(self): + """Test POST /jobs with invalid audio path returns 400""" + payload = { + "audio_path": "/nonexistent/file.mp3", + "model_name": "tiny", + "output_format": "txt" + } + + logger.info(f"POST {self.api_url}/jobs with invalid audio path") + logger.info(f"Payload: {json.dumps(payload, indent=2)}") + resp = requests.post(f"{self.api_url}/jobs", json=payload) + logger.info(f"Response status: {resp.status_code}") + logger.info(f"Response: {resp.json()}") + assert resp.status_code == 400, f"Expected 400, got {resp.status_code}" + + data = resp.json() + assert "error" in data["detail"], "Should have error field" + assert data["detail"]["error"] == "Invalid audio file", f"Wrong error type: {data['detail']['error']}" + + print_info(f" Error message: {data['detail']['message'][:50]}...") + + def test_api_submit_job_cpu_device_rejected(self): + 
"""Test POST /jobs with device=cpu is rejected (400)""" + # Create a test audio file first + logger.info("Creating test audio file...") + test_audio = self._create_test_audio_file() + logger.info(f"Test audio created at: {test_audio}") + + payload = { + "audio_path": test_audio, + "model_name": "tiny", + "device": "cpu", + "output_format": "txt" + } + + logger.info(f"POST {self.api_url}/jobs with device=cpu") + logger.info(f"Payload: {json.dumps(payload, indent=2)}") + resp = requests.post(f"{self.api_url}/jobs", json=payload) + logger.info(f"Response status: {resp.status_code}") + logger.info(f"Response: {resp.json()}") + assert resp.status_code == 400, f"Expected 400, got {resp.status_code}" + + data = resp.json() + assert "error" in data["detail"], "Should have error field" + assert "Invalid device" in data["detail"]["error"] or "CPU" in data["detail"]["message"], \ + "Should reject CPU device" + + def test_api_submit_job_success(self): + """Test POST /jobs with valid audio returns job_id""" + logger.info("Creating test audio file...") + test_audio = self._create_test_audio_file() + logger.info(f"Test audio created at: {test_audio}") + + payload = { + "audio_path": test_audio, + "model_name": "tiny", + "device": "auto", + "output_format": "txt" + } + + logger.info(f"POST {self.api_url}/jobs with valid audio") + logger.info(f"Payload: {json.dumps(payload, indent=2)}") + resp = requests.post(f"{self.api_url}/jobs", json=payload) + logger.info(f"Response status: {resp.status_code}") + logger.info(f"Response: {json.dumps(resp.json(), indent=2)}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + assert "job_id" in data, "Should return job_id" + assert "status" in data, "Should return status" + assert data["status"] == "queued", f"Status should be queued, got {data['status']}" + assert "queue_position" in data, "Should return queue_position" + assert "message" in data, "Should return message" + + logger.info(f"Job submitted successfully: {data['job_id']}") + print_info(f" Job ID: {data['job_id']}") + print_info(f" Queue position: {data['queue_position']}") + + # Store job_id for later tests + self.test_job_id = data["job_id"] + + def test_api_get_job_status(self): + """Test GET /jobs/{job_id} returns job status""" + if not hasattr(self, 'test_job_id'): + print_info(" Skipping (no test_job_id from previous test)") + return + + resp = requests.get(f"{self.api_url}/jobs/{self.test_job_id}") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + assert "job_id" in data, "Should return job_id" + assert "status" in data, "Should return status" + assert data["status"] in ["queued", "running", "completed", "failed"], \ + f"Invalid status: {data['status']}" + + print_info(f" Status: {data['status']}") + + def test_api_get_job_status_not_found(self): + """Test GET /jobs/{job_id} with invalid ID returns 404""" + fake_job_id = "00000000-0000-0000-0000-000000000000" + + resp = requests.get(f"{self.api_url}/jobs/{fake_job_id}") + assert resp.status_code == 404, f"Expected 404, got {resp.status_code}" + + data = resp.json() + assert "error" in data["detail"], "Should have error field" + assert data["detail"]["error"] == "Job not found", f"Wrong error: {data['detail']['error']}" + + def test_api_get_job_result_not_completed(self): + """Test GET /jobs/{job_id}/result when job not completed returns 409""" + if not hasattr(self, 'test_job_id'): + print_info(" Skipping (no test_job_id from previous test)") + return + + 
# Check current status + status_resp = requests.get(f"{self.api_url}/jobs/{self.test_job_id}") + current_status = status_resp.json()["status"] + + if current_status == "completed": + print_info(" Skipping (job already completed)") + return + + resp = requests.get(f"{self.api_url}/jobs/{self.test_job_id}/result") + assert resp.status_code == 409, f"Expected 409, got {resp.status_code}" + + data = resp.json() + assert "error" in data["detail"], "Should have error field" + assert data["detail"]["error"] == "Job not completed", f"Wrong error: {data['detail']['error']}" + assert "current_status" in data["detail"], "Should include current_status" + + def test_api_list_jobs(self): + """Test GET /jobs returns job list""" + resp = requests.get(f"{self.api_url}/jobs") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + assert "jobs" in data, "Should have jobs field" + assert "total" in data, "Should have total field" + assert isinstance(data["jobs"], list), "Jobs should be a list" + + print_info(f" Total jobs: {data['total']}") + + def test_api_list_jobs_with_filter(self): + """Test GET /jobs?status=queued filters by status""" + resp = requests.get(f"{self.api_url}/jobs?status=queued&limit=10") + assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" + + data = resp.json() + assert "jobs" in data, "Should have jobs field" + assert "filters" in data, "Should have filters field" + assert data["filters"]["status"] == "queued", "Filter should be applied" + + # All returned jobs should be queued + for job in data["jobs"]: + assert job["status"] == "queued", f"Job {job['job_id']} has wrong status: {job['status']}" + + def test_api_wait_for_job_completion(self): + """Test waiting for job to complete and retrieving result""" + if not hasattr(self, 'test_job_id'): + logger.warning("Skipping - no test_job_id from previous test") + print_info(" Skipping (no test_job_id from previous test)") + return + + logger.info(f"Waiting for job {self.test_job_id} to complete (max 60s)...") + print_info(" Waiting for job to complete (max 60s)...") + + max_wait = 60 + start_time = time.time() + + while time.time() - start_time < max_wait: + resp = requests.get(f"{self.api_url}/jobs/{self.test_job_id}") + data = resp.json() + status = data["status"] + elapsed = int(time.time() - start_time) + + logger.info(f"Job status: {status} (elapsed: {elapsed}s)") + print_info(f" Status: {status} (elapsed: {elapsed}s)") + + if status == "completed": + logger.info("Job completed successfully!") + print_success(" Job completed!") + + # Now get the result + logger.info("Fetching job result...") + result_resp = requests.get(f"{self.api_url}/jobs/{self.test_job_id}/result") + logger.info(f"Result response status: {result_resp.status_code}") + assert result_resp.status_code == 200, f"Expected 200, got {result_resp.status_code}" + + result_data = result_resp.json() + logger.info(f"Result data keys: {result_data.keys()}") + assert "result" in result_data, "Should have result field" + assert len(result_data["result"]) > 0, "Result should not be empty" + + actual_text = result_data["result"].strip() + logger.info(f"Transcription result: '{actual_text}'") + print_info(f" Transcription: '{actual_text}'") + return + + elif status == "failed": + error_msg = f"Job failed: {data.get('error', 'unknown error')}" + logger.error(error_msg) + raise AssertionError(error_msg) + + time.sleep(2) + + error_msg = f"Job did not complete within {max_wait}s" + logger.error(error_msg) + raise 
+
+    # ========================================================================
+    # MCP Server Tests (Import-based)
+    # ========================================================================
+
+    def test_mcp_imports(self):
+        """Test MCP server modules can be imported"""
+        try:
+            logger.info("Importing MCP server module...")
+            from servers import whisper_server
+
+            logger.info("Checking for new async tools...")
+            assert hasattr(whisper_server, 'transcribe_async'), "Should have transcribe_async tool"
+            assert hasattr(whisper_server, 'get_job_status'), "Should have get_job_status tool"
+            assert hasattr(whisper_server, 'get_job_result'), "Should have get_job_result tool"
+            assert hasattr(whisper_server, 'list_transcription_jobs'), "Should have list_transcription_jobs tool"
+            assert hasattr(whisper_server, 'check_gpu_health'), "Should have check_gpu_health tool"
+            assert hasattr(whisper_server, 'get_model_info_api'), "Should have get_model_info_api tool"
+            logger.info("All new tools found!")
+
+            # Verify old tools are removed
+            logger.info("Verifying old tools are removed...")
+            assert not hasattr(whisper_server, 'transcribe'), "Old transcribe tool should be removed"
+            assert not hasattr(whisper_server, 'batch_transcribe_audio'), "Old batch_transcribe_audio tool should be removed"
+            logger.info("Old tools successfully removed!")
+
+        except ImportError as e:
+            logger.error(f"Failed to import MCP server: {e}")
+            raise AssertionError(f"Failed to import MCP server: {e}")
+
+    def test_job_queue_integration(self):
+        """Test JobQueue integration is working"""
+        from core.job_queue import JobQueue, JobStatus
+
+        # Create a test queue
+        test_queue = JobQueue(max_queue_size=5, metadata_dir="/tmp/test_job_queue")
+
+        try:
+            # Verify it can be started
+            test_queue.start()
+            assert test_queue._worker_thread is not None, "Worker thread should be created"
+            assert test_queue._worker_thread.is_alive(), "Worker thread should be running"
+
+        finally:
+            # Clean up
+            test_queue.stop(wait_for_current=False)
+
+    def test_health_monitor_integration(self):
+        """Test HealthMonitor integration is working"""
+        from core.gpu_health import HealthMonitor
+
+        # Create a test monitor
+        test_monitor = HealthMonitor(check_interval_minutes=60)  # Long interval
+
+        try:
+            # Verify it can be started
+            test_monitor.start()
+            assert test_monitor._thread is not None, "Monitor thread should be created"
+            assert test_monitor._thread.is_alive(), "Monitor thread should be running"
+
+            # Check we can get status
+            status = test_monitor.get_latest_status()
+            assert status is not None, "Should have initial status"
+
+        finally:
+            # Clean up
+            test_monitor.stop()
+
+    # ========================================================================
+    # Helper Methods
+    # ========================================================================
+
+    def _create_test_audio_file(self):
+        """Get the path to the test audio file"""
+        test_audio_path = "/home/uad/agents/tools/mcp-transcriptor/data/test.mp3"
+        if not os.path.exists(test_audio_path):
+            raise FileNotFoundError(f"Test audio file not found: {test_audio_path}")
+        return test_audio_path
+
+
+def main():
+    print_section("PHASE 2: ASYNC JOB QUEUE INTEGRATION TESTS")
+    logger.info("=" * 70)
+    logger.info("PHASE 2: ASYNC JOB QUEUE INTEGRATION TESTS")
+    logger.info("=" * 70)
+
+    # Check if API server is running
+    api_url = os.getenv("API_URL", "http://localhost:8000")
+    logger.info(f"Testing API server at: {api_url}")
+    print_info(f"Testing API server at: {api_url}")
+
+    try:
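+        # Probe /health with a short timeout so the suite fails fast when the
+        # API server is unreachable, instead of timing out on every test below.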
logger.info("Checking API server health...") + resp = requests.get(f"{api_url}/health", timeout=2) + logger.info(f"Health check status: {resp.status_code}") + if resp.status_code != 200: + logger.error(f"API server not responding correctly at {api_url}") + print_error(f"API server not responding correctly at {api_url}") + print_error("Please start the API server with: ./run_api_server.sh") + return 1 + except requests.exceptions.RequestException as e: + logger.error(f"Cannot connect to API server: {e}") + print_error(f"Cannot connect to API server at {api_url}") + print_error("Please start the API server with: ./run_api_server.sh") + return 1 + + logger.info(f"API server is running at {api_url}") + print_success(f"API server is running at {api_url}") + + # Create tester + tester = Phase2Tester(api_url=api_url) + + # ======================================================================== + # Run API Tests + # ======================================================================== + print_section("API SERVER TESTS") + logger.info("Starting API server tests...") + + tester.test("API Root Endpoint", tester.test_api_root_endpoint) + tester.test("API Health Endpoint", tester.test_api_health_endpoint) + tester.test("API Models Endpoint", tester.test_api_models_endpoint) + tester.test("API GPU Health Endpoint", tester.test_api_gpu_health_endpoint) + + print_section("API JOB SUBMISSION TESTS") + + tester.test("Submit Job - Invalid Audio (400)", tester.test_api_submit_job_invalid_audio) + tester.test("Submit Job - CPU Device Rejected (400)", tester.test_api_submit_job_cpu_device_rejected) + tester.test("Submit Job - Success (200)", tester.test_api_submit_job_success) + + print_section("API JOB STATUS TESTS") + + tester.test("Get Job Status - Success", tester.test_api_get_job_status) + tester.test("Get Job Status - Not Found (404)", tester.test_api_get_job_status_not_found) + tester.test("Get Job Result - Not Completed (409)", tester.test_api_get_job_result_not_completed) + + print_section("API JOB LISTING TESTS") + + tester.test("List Jobs", tester.test_api_list_jobs) + tester.test("List Jobs - With Filter", tester.test_api_list_jobs_with_filter) + + print_section("API JOB COMPLETION TEST") + + tester.test("Wait for Job Completion & Get Result", tester.test_api_wait_for_job_completion) + + # ======================================================================== + # Run MCP Tests + # ======================================================================== + print_section("MCP SERVER TESTS") + logger.info("Starting MCP server tests...") + + tester.test("MCP Module Imports", tester.test_mcp_imports) + tester.test("JobQueue Integration", tester.test_job_queue_integration) + tester.test("HealthMonitor Integration", tester.test_health_monitor_integration) + + # ======================================================================== + # Print Summary + # ======================================================================== + logger.info("All tests completed, generating summary...") + success = tester.print_summary() + + if success: + logger.info("ALL TESTS PASSED!") + print_section("ALL TESTS PASSED! ✓") + return 0 + else: + logger.error("SOME TESTS FAILED!") + print_section("SOME TESTS FAILED! ✗") + return 1 + + +if __name__ == "__main__": + sys.exit(main())