Development Plan: Async Job Queue & GPU Health Monitoring
Version: 1.0 Date: 2025-10-07 Status: Implementation Ready
Table of Contents
- Executive Summary
- Problem Statement
- Solution Architecture
- Component Specifications
- Data Structures
- API Specifications
- Implementation Phases
- Testing Strategy
- Environment Variables
- Error Handling
- Monitoring & Observability
- Migration Strategy
- Future Enhancements
- Appendices
- Success Criteria
Executive Summary
This plan introduces an asynchronous job queue system with GPU health monitoring to address two critical production issues:
- HTTP Request Timeouts: Long audio transcriptions (10+ minutes) cause client timeouts
- Silent GPU Failures: GPU driver issues cause models to fall back to CPU silently, resulting in 10-100x slower processing
Solution Overview
- Async Job Queue: FIFO queue with immediate response, background processing, and disk persistence
- GPU Health Monitoring: Real transcription tests with tiny model, periodic monitoring, and strict failure handling
- Clean API: Async-only endpoints (REST + MCP) optimized for LLM agents
- Zero External Dependencies: Uses Python stdlib (threading, queue, json) only
Problem Statement
Problem 1: Request Timeout Issues
Current Behavior:
Client → POST /transcribe → [waits 10+ minutes] → Timeout
Impact:
- Clients experience HTTP timeouts on long audio files
- No way to check progress
- Failed requests waste GPU time
Root Cause:
- Synchronous request/response pattern
- HTTP clients have timeout limits (30-120 seconds typical)
- Transcription can take 5-20+ minutes for long files
Problem 2: Silent GPU Fallback
Current Behavior:
# model_manager.py:64-66
if device == "auto":
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
Issue:
- torch.cuda.is_available() can return True but model loading can still fail
- GPU driver issues, OOM errors, or CUDA incompatibilities cause silent fallback to CPU
- Current test_gpu_driver() only tests tensor operations, not model loading
- Processing becomes 10-100x slower without notification
Impact:
- Users expect 2-minute transcription, get 30-minute CPU transcription
- No error message, just extremely slow processing
- Wastes resources and user time
Solution Architecture
High-Level Design
┌─────────────────────────────────────────────────────────────────┐
│ Client (HTTP / MCP) │
└───────────────────┬─────────────────────────────────────────────┘
│
│ Submit Job
↓
┌─────────────────────────────────────────────────────────────────┐
│ API Server / MCP Server │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 1. Validate Request │ │
│ │ 2. Check GPU Health (if device=cuda) │ │
│ │ 3. Generate job_id │ │
│ │ 4. Add to Queue │ │
│ │ 5. Save Job Metadata to Disk │ │
│ │ 6. Return {job_id, status: "queued", queue_position} │ │
│ └──────────────────────────────────────────────────────────┘ │
└───────────────────┬─────────────────────────────────────────────┘
│
│ Job in Queue
↓
┌─────────────────────────────────────────────────────────────────┐
│ Job Queue Manager │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ In-Memory Queue (queue.Queue) │ │
│ │ - FIFO ordering │ │
│ │ - Thread-safe │ │
│ │ - Max size limit from env var │ │
│ └──────────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Background Worker Thread (Single Worker) │ │
│ │ 1. Pop job from queue │ │
│ │ 2. Update status → "running" │ │
│ │ 3. Call transcribe_audio() │ │
│ │ 4. Update status → "completed"/"failed" │ │
│ │ 5. Save result metadata │ │
│ └──────────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Disk Persistence Layer │ │
│ │ - One JSON file per job: {job_id}.json │ │
│ │ - Survives server restarts │ │
│ │ - Load on startup │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ GPU Health Monitor │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Startup Health Check │ │
│ │ 1. Generate 1-second test audio │ │
│ │ 2. Load tiny model │ │
│ │ 3. Transcribe test audio │ │
│ │ 4. Time execution (GPU: <1s, CPU: 5-10s) │ │
│ │ 5. If expected=cuda but got=cpu → REJECT │ │
│ └──────────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Periodic Background Monitoring (Every 10 min) │ │
│ │ - Re-run health check │ │
│ │ - Log warnings if degradation detected │ │
│ │ - Store health history │ │
│ └──────────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ On-Demand Health Check │ │
│ │ - Exposed via /health/gpu endpoint │ │
│ │ - Returns detailed GPU status │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Client Interaction Flow
Client Server Background Worker
| | |
|-- POST /jobs (submit) ------->| |
| |-- Add to queue --------------->|
| |-- Save to disk |
|<-- {job_id, queued, pos:3} ---| |
| | |
| | [Processing job 1]
| | |
|-- GET /jobs/{id} (poll) ----->| |
|<-- {status: queued, pos:2} ---| |
| | |
| [Wait 10 seconds] | [Processing job 2]
| | |
|-- GET /jobs/{id} (poll) ----->| |
|<-- {status: queued, pos:1} ---| |
| | |
| [Wait 10 seconds] | |
| | |
|-- GET /jobs/{id} (poll) ----->| [Start our job]
|<-- {status: running} ---------| |
| | |
| [Wait 30 seconds] | [Transcribing...]
| | |
|-- GET /jobs/{id} (poll) ----->| |
|<-- {status: running} ---------| |
| | |
| [Wait 30 seconds] | [Job completed]
| |<-- Update status --------------|
| |<-- Save result |
| | |
|-- GET /jobs/{id} (poll) ----->| |
|<-- {status: completed} -------| |
| | |
|-- GET /jobs/{id}/result ----->| |
|<-- Transcription text --------| |
| | |
Component Specifications
Component 1: Test Audio Generator
File: src/utils/test_audio_generator.py
Purpose: Generate synthetic test audio programmatically (no need to bundle .mp3 files)
Key Functions:
def generate_test_audio() -> str:
    """
    Generate a 1-second test audio file for GPU health checks.

    Returns:
        str: Path to temporary audio file

    Implementation:
    - Generate 1 second of 440Hz sine wave (A note)
    - 16kHz sample rate, mono
    - Save as WAV format (simplest)
    - Store in system temp directory
    - Reuse same file if exists (cache)
    """
Dependencies:
- numpy (already installed)
- scipy.io.wavfile (third-party)
- tempfile (stdlib)
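A minimal sketch of generate_test_audio() consistent with the spec above; the cache filename is an illustrative choice:

```python
import os
import tempfile

import numpy as np
from scipy.io import wavfile


def generate_test_audio() -> str:
    """Generate (and cache) a 1-second, 440 Hz, 16 kHz mono WAV for GPU health checks."""
    path = os.path.join(tempfile.gettempdir(), "gpu_health_test_440hz.wav")
    if os.path.exists(path):  # reuse cached file
        return path
    sample_rate = 16000
    t = np.linspace(0, 1.0, sample_rate, endpoint=False)
    tone = (0.5 * np.sin(2 * np.pi * 440.0 * t)).astype(np.float32)
    wavfile.write(path, sample_rate, tone)
    return path
```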
Component 2: GPU Health Monitor
File: src/core/gpu_health.py
Purpose: Test GPU functionality with actual model loading and transcription
Key Classes/Functions:
class GPUHealthStatus:
    """Data class for health check results"""
    gpu_available: bool            # torch.cuda.is_available()
    gpu_working: bool              # Model actually loaded on GPU
    device_used: str               # "cuda" or "cpu"
    device_name: str               # GPU name if available
    memory_total_gb: float         # Total GPU memory
    memory_available_gb: float     # Available GPU memory
    test_duration_seconds: float   # How long test took
    timestamp: str                 # ISO timestamp
    error: str | None              # Error message if any

def check_gpu_health(expected_device: str = "auto") -> GPUHealthStatus:
    """
    Comprehensive GPU health check using real model + transcription.

    Args:
        expected_device: Expected device ("auto", "cuda", "cpu")

    Returns:
        GPUHealthStatus object

    Raises:
        RuntimeError: If expected_device="cuda" but GPU test fails

    Implementation Steps:
    1. Generate test audio (1 second)
    2. Load tiny model with requested device
    3. Transcribe test audio
    4. Time the operation
    5. Verify model actually ran on GPU (check torch.cuda.memory_allocated)
    6. CRITICAL: If expected_device="cuda" but used="cpu" → raise RuntimeError
    7. Return detailed status

    Performance Expectations:
    - GPU (tiny model): 0.3-1.0 seconds
    - CPU (tiny model): 3-10 seconds
    - If GPU test takes >2 seconds, likely running on CPU
    """

class HealthMonitor:
    """Background thread for periodic GPU health monitoring"""

    def __init__(self, check_interval_minutes: int = 10):
        """Initialize health monitor"""

    def start(self):
        """Start background monitoring thread"""

    def stop(self):
        """Stop background monitoring thread"""

    def get_latest_status(self) -> GPUHealthStatus:
        """Get most recent health check result"""

    def get_health_history(self, limit: int = 10) -> List[GPUHealthStatus]:
        """Get recent health check history"""

Critical Error Handling:

# In check_gpu_health()
if expected_device == "cuda" and actual_device == "cpu":
    error_msg = (
        "GPU device requested but model loaded on CPU. "
        "This indicates GPU driver issues or insufficient memory. "
        "Transcription would be 10-100x slower than expected. "
        "Please check CUDA installation and GPU availability."
    )
    raise RuntimeError(error_msg)
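Putting the steps together, a hedged sketch of check_gpu_health(). Loading the model through faster-whisper's WhisperModel is an assumption (consistent with the compute_type values used throughout this plan), GPUHealthStatus is assumed to be a @dataclass with the fields listed above, and the memory fields and thresholds are placeholders:

```python
import time
from datetime import datetime, timezone

import torch
from faster_whisper import WhisperModel  # assumed model backend

from src.utils.test_audio_generator import generate_test_audio


def check_gpu_health(expected_device: str = "auto") -> GPUHealthStatus:
    audio_path = generate_test_audio()
    device = "cuda" if expected_device in ("auto", "cuda") and torch.cuda.is_available() else "cpu"
    error = None
    start = time.time()
    try:
        model = WhisperModel("tiny", device=device,
                             compute_type="float16" if device == "cuda" else "int8")
        segments, _info = model.transcribe(audio_path)
        list(segments)  # segments is a generator; consume it to force the inference
        loaded_ok = True
    except Exception as exc:  # driver error, OOM, CUDA version mismatch, ...
        loaded_ok, error = False, str(exc)
    duration = time.time() - start

    # Steps 4-6: combine the memory check with the timing heuristic above
    on_gpu = loaded_ok and device == "cuda" and (
        torch.cuda.memory_allocated() > 0 or duration < 2.0
    )
    actual_device = "cuda" if on_gpu else "cpu"

    if expected_device == "cuda" and actual_device != "cuda":
        raise RuntimeError(
            "GPU device requested but model loaded on CPU. "
            f"Last error: {error or 'none'}"
        )

    return GPUHealthStatus(
        gpu_available=torch.cuda.is_available(),
        gpu_working=on_gpu,
        device_used=actual_device,
        device_name=torch.cuda.get_device_name(0) if on_gpu else "",
        memory_total_gb=torch.cuda.get_device_properties(0).total_memory / 1e9 if on_gpu else 0.0,
        memory_available_gb=0.0,  # placeholder; torch.cuda.mem_get_info() could fill this in
        test_duration_seconds=duration,
        timestamp=datetime.now(timezone.utc).isoformat(),
        error=error,
    )
```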
Component 3: Job Queue Manager
File: src/core/job_queue.py
Purpose: Manage async job queue with FIFO processing and disk persistence
Key Classes:
class JobStatus(Enum):
    """Job status enumeration"""
    QUEUED = "queued"        # In queue, waiting
    RUNNING = "running"      # Currently processing
    COMPLETED = "completed"  # Successfully finished
    FAILED = "failed"        # Error occurred

class Job:
    """Represents a transcription job"""
    job_id: str                    # UUID
    status: JobStatus              # Current status
    created_at: datetime           # When job was created
    started_at: datetime | None    # When processing started
    completed_at: datetime | None  # When processing finished
    queue_position: int            # Position in queue (0 if running)

    # Request parameters
    audio_path: str
    model_name: str
    device: str
    compute_type: str
    language: str | None
    output_format: str
    beam_size: int
    temperature: float
    initial_prompt: str | None
    output_directory: str | None

    # Results
    result_path: str | None        # Path to transcription file
    error: str | None              # Error message if failed
    processing_time_seconds: float | None

    def to_dict(self) -> dict:
        """Serialize to dictionary for JSON storage"""

    @classmethod
    def from_dict(cls, data: dict) -> 'Job':
        """Deserialize from dictionary"""

    def save_to_disk(self, metadata_dir: str):
        """Save job metadata to {metadata_dir}/{job_id}.json"""

class JobQueue:
    """Manages job queue with background worker"""

    def __init__(self,
                 max_queue_size: int = 100,
                 metadata_dir: str = "/outputs/jobs"):
        """
        Initialize job queue.

        Args:
            max_queue_size: Maximum number of jobs in queue
            metadata_dir: Directory to store job metadata JSON files
        """
        self._queue = queue.Queue(maxsize=max_queue_size)
        self._jobs = {}  # job_id -> Job
        self._metadata_dir = metadata_dir
        self._worker_thread = None
        self._stop_event = threading.Event()
        self._current_job_id = None
        self._lock = threading.Lock()

    def start(self):
        """
        Start background worker thread.
        Load existing jobs from disk on startup.
        """

    def stop(self, wait_for_current: bool = True):
        """
        Stop background worker.

        Args:
            wait_for_current: If True, wait for current job to complete
        """

    def submit_job(self,
                   audio_path: str,
                   model_name: str = "large-v3",
                   device: str = "auto",
                   compute_type: str = "auto",
                   language: str | None = None,
                   output_format: str = "txt",
                   beam_size: int = 5,
                   temperature: float = 0.0,
                   initial_prompt: str | None = None,
                   output_directory: str | None = None) -> dict:
        """
        Submit a new transcription job.

        Returns:
            dict: {
                "job_id": str,
                "status": str,
                "queue_position": int,
                "created_at": str
            }

        Raises:
            queue.Full: If queue is at max capacity
            RuntimeError: If GPU health check fails (when device="cuda")
        """
        # 1. Validate audio file exists
        # 2. Check GPU health if device="cuda" (raises if fails)
        # 3. Generate job_id
        # 4. Create Job object
        # 5. Add to queue (raises queue.Full if full)
        # 6. Save to disk
        # 7. Return job info

    def get_job_status(self, job_id: str) -> dict:
        """
        Get current status of a job.

        Returns:
            dict: {
                "job_id": str,
                "status": str,
                "queue_position": int | None,
                "created_at": str,
                "started_at": str | None,
                "completed_at": str | None,
                "result_path": str | None,
                "error": str | None,
                "processing_time_seconds": float | None
            }

        Raises:
            KeyError: If job_id not found
        """

    def get_job_result(self, job_id: str) -> str:
        """
        Get transcription result text for completed job.

        Returns:
            str: Content of transcription file

        Raises:
            KeyError: If job_id not found
            ValueError: If job not completed
            FileNotFoundError: If result file missing
        """

    def list_jobs(self,
                  status_filter: JobStatus | None = None,
                  limit: int = 100) -> List[dict]:
        """
        List jobs with optional status filter.

        Args:
            status_filter: Only return jobs with this status
            limit: Maximum number of jobs to return

        Returns:
            List of job status dictionaries
        """

    def _worker_loop(self):
        """
        Background worker thread function.
        Processes jobs from queue in FIFO order.
        """
        while not self._stop_event.is_set():
            try:
                # Get job from queue (with timeout to check stop_event)
                job = self._queue.get(timeout=1.0)

                with self._lock:
                    self._current_job_id = job.job_id

                job.status = JobStatus.RUNNING
                job.started_at = datetime.utcnow()
                job.save_to_disk(self._metadata_dir)

                # Process job
                start_time = time.time()
                try:
                    result = transcribe_audio(
                        audio_path=job.audio_path,
                        model_name=job.model_name,
                        device=job.device,
                        compute_type=job.compute_type,
                        language=job.language,
                        output_format=job.output_format,
                        beam_size=job.beam_size,
                        temperature=job.temperature,
                        initial_prompt=job.initial_prompt,
                        output_directory=job.output_directory
                    )

                    # Parse result
                    if "saved to:" in result:
                        job.result_path = result.split("saved to:")[1].strip()
                        job.status = JobStatus.COMPLETED
                    else:
                        job.status = JobStatus.FAILED
                        job.error = result

                except Exception as e:
                    job.status = JobStatus.FAILED
                    job.error = str(e)
                    logger.error(f"Job {job.job_id} failed: {e}")

                finally:
                    job.completed_at = datetime.utcnow()
                    job.processing_time_seconds = time.time() - start_time
                    job.save_to_disk(self._metadata_dir)
                    with self._lock:
                        self._current_job_id = None
                    self._queue.task_done()

            except queue.Empty:
                continue

    def _load_jobs_from_disk(self):
        """Load existing job metadata from disk on startup"""

    def _calculate_queue_positions(self):
        """Update queue_position for all queued jobs"""
Data Structures
Job Metadata JSON Format
File: {JOB_METADATA_DIR}/{job_id}.json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"created_at": "2025-10-07T10:30:00.123456Z",
"started_at": "2025-10-07T10:30:05.234567Z",
"completed_at": "2025-10-07T10:32:15.345678Z",
"queue_position": 0,
"request_params": {
"audio_path": "/media/raid/audio/interview.mp3",
"model_name": "large-v3",
"device": "cuda",
"compute_type": "float16",
"language": "en",
"output_format": "txt",
"beam_size": 5,
"temperature": 0.0,
"initial_prompt": null,
"output_directory": "/media/raid/outputs"
},
"result_path": "/media/raid/outputs/interview.txt",
"error": null,
"processing_time_seconds": 130.22
}
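A short sketch of Job.save_to_disk() writing exactly this layout; the write-to-temp-then-rename step is an added safeguard (an assumption, not a requirement of this plan) so a crash mid-write never leaves a truncated JSON file:

```python
import json
import os


def save_to_disk(self, metadata_dir: str) -> None:
    """Persist this job as {metadata_dir}/{job_id}.json."""
    os.makedirs(metadata_dir, exist_ok=True)
    final_path = os.path.join(metadata_dir, f"{self.job_id}.json")
    tmp_path = final_path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(self.to_dict(), f, indent=2)
    os.replace(tmp_path, final_path)  # atomic rename: readers never see a partial file
```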
GPU Health Status JSON Format
{
"gpu_available": true,
"gpu_working": true,
"device_used": "cuda",
"device_name": "NVIDIA RTX 3090",
"memory_total_gb": 24.0,
"memory_available_gb": 20.5,
"test_duration_seconds": 0.87,
"timestamp": "2025-10-07T10:30:00.123456Z",
"error": null
}
API Specifications
REST API Endpoints
1. Submit Job (Async Transcription)
Endpoint: POST /jobs
Request Body:
{
"audio_path": "/path/to/audio.mp3",
"model_name": "large-v3",
"device": "auto",
"compute_type": "auto",
"language": "en",
"output_format": "txt",
"beam_size": 5,
"temperature": 0.0,
"initial_prompt": null,
"output_directory": null
}
Success Response (200):
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued",
"queue_position": 3,
"created_at": "2025-10-07T10:30:00.123456Z",
"message": "Job submitted successfully. Poll /jobs/{job_id} for status."
}
Error Responses:
- 400 Bad Request: Invalid parameters
- 503 Service Unavailable: Queue is full
- 500 Internal Server Error: GPU health check failed (if device=cuda)
2. Get Job Status
Endpoint: GET /jobs/{job_id}
Success Response (200):
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"queue_position": null,
"created_at": "2025-10-07T10:30:00.123456Z",
"started_at": "2025-10-07T10:30:05.234567Z",
"completed_at": null,
"result_path": null,
"error": null,
"processing_time_seconds": null
}
Error Responses:
- 404 Not Found: Job ID not found
3. Get Job Result
Endpoint: GET /jobs/{job_id}/result
Success Response (200):
Content-Type: text/plain
This is the transcribed text from the audio file...
Error Responses:
- 404 Not Found: Job ID not found
- 409 Conflict: Job not completed yet
- 500 Internal Server Error: Result file missing
4. List Jobs
Endpoint: GET /jobs?status={status}&limit={limit}
Query Parameters:
- status (optional): Filter by status (queued, running, completed, failed)
- limit (optional): Max results (default: 100)
Success Response (200):
{
"jobs": [
{
"job_id": "...",
"status": "completed",
"created_at": "...",
...
}
],
"total": 42
}
5. GPU Health Check
Endpoint: GET /health/gpu
Success Response (200):
{
"gpu_available": true,
"gpu_working": true,
"device_used": "cuda",
"device_name": "NVIDIA RTX 3090",
"memory_total_gb": 24.0,
"memory_available_gb": 20.5,
"test_duration_seconds": 0.87,
"timestamp": "2025-10-07T10:30:00.123456Z",
"error": null,
"interpretation": "GPU is healthy and working correctly"
}
MCP Tools
1. transcribe_async
@mcp.tool()
def transcribe_async(
audio_path: str,
model_name: str = "large-v3",
device: str = "auto",
compute_type: str = "auto",
language: str = None,
output_format: str = "txt",
beam_size: int = 5,
temperature: float = 0.0,
initial_prompt: str = None,
output_directory: str = None
) -> str:
"""
Submit an audio file for asynchronous transcription.
IMPORTANT: This tool returns immediately with a job_id. Use get_job_status()
to check progress and get_job_result() to retrieve the transcription.
WORKFLOW FOR LLM AGENTS:
1. Call this tool to submit the job
2. You will receive a job_id and queue_position
3. Poll get_job_status(job_id) every 5-10 seconds to check progress
4. When status="completed", call get_job_result(job_id) to get transcription
For long audio files (>10 minutes), expect processing to take several minutes.
You can check queue_position to estimate wait time (each job ~2-5 minutes).
Args:
audio_path: Path to audio file on server
model_name: Whisper model (tiny, base, small, medium, large-v3)
device: Execution device (cpu, cuda, auto)
compute_type: Computation type (float16, int8, auto)
language: Language code (en, zh, ja, etc.) or auto-detect
output_format: Output format (txt, vtt, srt, json)
beam_size: Beam search size (larger=better quality, slower)
temperature: Sampling temperature (0.0=greedy)
initial_prompt: Optional prompt to guide transcription
output_directory: Where to save result (uses default if not specified)
Returns:
JSON string with job_id, status, queue_position, and instructions
"""
2. get_job_status
@mcp.tool()
def get_job_status(job_id: str) -> str:
"""
Check the status of a transcription job.
Status values:
- "queued": Job is waiting in queue. Check queue_position.
- "running": Job is currently being processed.
- "completed": Transcription finished. Call get_job_result() to retrieve.
- "failed": Job failed. Check error field for details.
Args:
job_id: Job ID from transcribe_async()
Returns:
JSON string with detailed job status including:
- status, queue_position, timestamps, error (if any)
"""
3. get_job_result
@mcp.tool()
def get_job_result(job_id: str) -> str:
"""
Retrieve the transcription result for a completed job.
IMPORTANT: Only call this when get_job_status() returns status="completed".
If the job is not completed, this will return an error.
Args:
job_id: Job ID from transcribe_async()
Returns:
Transcription text as a string
Errors:
- "Job not found" if invalid job_id
- "Job not completed yet" if status is not "completed"
- "Result file not found" if transcription file is missing
"""
4. list_transcription_jobs
@mcp.tool()
def list_transcription_jobs(
status_filter: str = None,
limit: int = 20
) -> str:
"""
List transcription jobs with optional filtering.
Useful for:
- Checking all your submitted jobs
- Finding completed jobs
- Monitoring queue status
Args:
status_filter: Filter by status (queued, running, completed, failed)
limit: Maximum number of jobs to return (default: 20)
Returns:
JSON string with list of jobs
"""
5. check_gpu_health
@mcp.tool()
def check_gpu_health() -> str:
"""
Test GPU availability and performance by running a quick transcription.
This tool loads the tiny model and transcribes a 1-second test audio file
to verify the GPU is working correctly.
Use this when:
- You want to verify GPU is available before submitting large jobs
- You suspect GPU performance issues
- For monitoring/debugging purposes
Returns:
JSON string with detailed GPU status including:
- gpu_available, gpu_working, device_name, memory_info
- test_duration_seconds (GPU: <1s, CPU: 5-10s)
- interpretation message
Note: If this returns gpu_working=false, transcriptions will be very slow.
"""
Implementation Phases
Phase 1: Core Infrastructure (Estimate: 2-3 hours)
Tasks:
1. ✅ Create src/utils/test_audio_generator.py
- Implement generate_test_audio()
- Test audio file generation
2. ✅ Create src/core/gpu_health.py
- Implement GPUHealthStatus dataclass
- Implement check_gpu_health() with strict failure handling
- Implement HealthMonitor class
- Test GPU health check (verify it raises error on GPU failure)
3. ✅ Create src/core/job_queue.py
- Implement Job class with serialization
- Implement JobQueue class
- Test job submission, processing, status retrieval
- Test disk persistence (save/load)
Testing:
- Unit test each component independently
- Verify GPU health check rejects when GPU fails
- Verify job queue persists and loads correctly
Phase 2: Server Integration (Estimate: 1-2 hours)
Tasks:
4. ✅ Update src/servers/api_server.py
- Initialize JobQueue singleton
- Initialize HealthMonitor
- Add POST /jobs endpoint
- Add GET /jobs/{id} endpoint
- Add GET /jobs/{id}/result endpoint
- Add GET /jobs endpoint
- Add GET /health/gpu endpoint
- Remove or deprecate old sync endpoints
5. ✅ Update src/servers/whisper_server.py
- Initialize JobQueue singleton
- Initialize HealthMonitor
- Replace old tools with async tools:
- transcribe_async()
- get_job_status()
- get_job_result()
- list_transcription_jobs()
- check_gpu_health()
- Remove old tools
Testing:
- Test each endpoint with curl/httpie
- Test MCP tools with mcp dev command
- Verify error handling
Phase 3: Configuration & Environment (Estimate: 30 min)
Tasks:
6. ✅ Update run_api_server.sh
- Add job queue env vars
- Add GPU health monitor env vars
- Create job metadata directory
7. ✅ Update run_mcp_server.sh
- Add job queue env vars
- Add GPU health monitor env vars
- Create job metadata directory
Testing:
- Test startup with new env vars
- Verify directories are created
Phase 4: Integration Testing (Estimate: 1-2 hours)
Tasks:
8. ✅ Test end-to-end flow
- Submit job → Poll status → Get result
- Test with real audio files
- Test queue limits (submit 101 jobs)
- Test GPU health check
- Test server restart (verify job persistence)
9. ✅ Test error conditions
- Invalid audio path
- Queue full
- GPU failure (mock by setting device=cuda on CPU-only machine)
- Job not found
- Result retrieval before completion
10. ✅ Test MCP integration
- Add to Claude Desktop config
- Test transcribe_async flow
- Test all MCP tools
Testing Checklist:
- Single job submission and completion
- Multiple jobs in queue (FIFO ordering)
- Queue full rejection (503 error)
- GPU health check passes on GPU machine
- GPU health check fails on CPU-only machine (when device=cuda)
- Server restart with queued jobs (resume processing)
- Server restart with running job (mark as failed)
- Result retrieval for completed job
- Error handling for invalid job_id
- MCP tools work in Claude Desktop
- Periodic GPU monitoring runs in background
Testing Strategy
Unit Tests
# tests/test_gpu_health.py
def test_gpu_health_check_success():
    """Test GPU health check on working GPU"""

def test_gpu_health_check_rejects_cpu_fallback():
    """Test that expected=cuda but actual=cpu raises error"""

def test_health_monitor_periodic_checks():
    """Test background monitoring thread"""

# tests/test_job_queue.py
def test_job_submission():
    """Test job submission returns job_id"""

def test_job_processing_fifo():
    """Test jobs processed in FIFO order"""

def test_queue_full_rejection():
    """Test queue rejects when full"""

def test_job_persistence():
    """Test jobs saved and loaded from disk"""

def test_job_status_retrieval():
    """Test get_job_status() returns correct info"""
Integration Tests
# Test API endpoints
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{"audio_path": "/path/to/test.mp3"}'
# Expected: {"job_id": "...", "status": "queued", "queue_position": 1}
# Poll status
curl http://localhost:8000/jobs/{job_id}
# Expected: {"status": "running", ...}
# Get result (when completed)
curl http://localhost:8000/jobs/{job_id}/result
# Expected: Transcription text
MCP Tests
# Test with MCP CLI
mcp dev src/servers/whisper_server.py
# In MCP client, call:
transcribe_async(audio_path="/path/to/test.mp3")
# Returns: {job_id, status, queue_position}
get_job_status(job_id="...")
# Returns: {status, ...}
get_job_result(job_id="...")
# Returns: transcription text
Environment Variables
New Environment Variables
Add to run_api_server.sh and run_mcp_server.sh:
# Job Queue Configuration
export JOB_QUEUE_MAX_SIZE=100
export JOB_METADATA_DIR="/media/raid/agents/tools/mcp-transcriptor/outputs/jobs"
export JOB_RETENTION_DAYS=7 # Optional: auto-cleanup old jobs (0=disabled)
# GPU Health Monitoring
export GPU_HEALTH_CHECK_ENABLED=true
export GPU_HEALTH_CHECK_INTERVAL_MINUTES=10
export GPU_HEALTH_TEST_MODEL="tiny" # Model to use for health checks
# Create job metadata directory
mkdir -p "$JOB_METADATA_DIR"
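On the Python side, the servers would read these values at startup; a sketch with defaults mirroring the exports above (variable names match the exports, module placement is an assumption):

```python
import os

JOB_QUEUE_MAX_SIZE = int(os.getenv("JOB_QUEUE_MAX_SIZE", "100"))
JOB_METADATA_DIR = os.getenv("JOB_METADATA_DIR", "/outputs/jobs")
JOB_RETENTION_DAYS = int(os.getenv("JOB_RETENTION_DAYS", "7"))  # 0 disables cleanup
GPU_HEALTH_CHECK_ENABLED = os.getenv("GPU_HEALTH_CHECK_ENABLED", "true").lower() == "true"
GPU_HEALTH_CHECK_INTERVAL_MINUTES = int(os.getenv("GPU_HEALTH_CHECK_INTERVAL_MINUTES", "10"))
GPU_HEALTH_TEST_MODEL = os.getenv("GPU_HEALTH_TEST_MODEL", "tiny")

os.makedirs(JOB_METADATA_DIR, exist_ok=True)  # mirrors the mkdir -p in the run scripts
```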
Existing Variables (Keep)
export CUDA_VISIBLE_DEVICES=1
export WHISPER_MODEL_DIR="/home/uad/agents/tools/mcp-transcriptor/data/models"
export TRANSCRIPTION_OUTPUT_DIR="/media/raid/agents/tools/mcp-transcriptor/outputs"
export TRANSCRIPTION_BATCH_OUTPUT_DIR="/media/raid/agents/tools/mcp-transcriptor/outputs/batch"
export TRANSCRIPTION_MODEL="large-v3"
export TRANSCRIPTION_DEVICE="cuda"
export TRANSCRIPTION_COMPUTE_TYPE="float16"
export TRANSCRIPTION_OUTPUT_FORMAT="txt"
export TRANSCRIPTION_BEAM_SIZE="5"
export TRANSCRIPTION_TEMPERATURE="0.0"
Error Handling
Critical: GPU Failure Rejection
Scenario: User requests device=cuda but GPU is unavailable/failing
Current Behavior (BAD):
# model_manager.py:64-66
if device == "auto":
    device = "cuda" if torch.cuda.is_available() else "cpu"  # Silent fallback!
New Behavior (GOOD):
# In job_queue.py:submit_job()
if device == "cuda":
    try:
        health_status = check_gpu_health(expected_device="cuda")
        if not health_status.gpu_working:
            raise RuntimeError(
                f"GPU device requested but not available. "
                f"GPU check failed: {health_status.error}. "
                f"Transcription would run on CPU and be 10-100x slower. "
                f"Please use device='cpu' explicitly if you want CPU processing."
            )
    except RuntimeError as e:
        # Re-raise with clear message
        raise RuntimeError(f"Job rejected: {e}")

# In gpu_health.py:check_gpu_health()
if expected_device == "cuda":
    # Run health check
    if actual_device != "cuda":
        raise RuntimeError(
            "GPU requested but model loaded on CPU. "
            "Possible causes: GPU driver issues, insufficient memory, "
            "CUDA version mismatch. Check logs for details."
        )
Result:
- Job submission fails immediately with clear error
- User knows GPU is not working
- User can decide to use CPU explicitly or fix GPU
- No wasted time on slow CPU processing
Other Error Scenarios
1. Queue Full
# Return 503 Service Unavailable
{
"error": "Job queue is full",
"queue_size": 100,
"message": "Please try again later or contact administrator"
}
2. Invalid Audio Path
# Return 400 Bad Request
{
"error": "Audio file not found",
"audio_path": "/invalid/path.mp3",
"message": "Please verify the file exists and path is correct"
}
3. Job Not Found
# Return 404 Not Found
{
"error": "Job not found",
"job_id": "invalid-uuid",
"message": "Job ID does not exist or has been cleaned up"
}
4. Result Not Ready
# Return 409 Conflict
{
"error": "Job not completed",
"job_id": "...",
"current_status": "running",
"message": "Please wait for job to complete before requesting result"
}
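Assuming a FastAPI-based api_server.py (the framework is not named in this plan), the queue-layer exceptions could map onto these responses as follows; status codes follow the tables above:

```python
import queue

from fastapi import HTTPException


def to_http_error(exc: Exception) -> HTTPException:
    """Translate JobQueue exceptions into the HTTP errors documented above."""
    if isinstance(exc, queue.Full):
        return HTTPException(status_code=503, detail="Job queue is full")
    if isinstance(exc, FileNotFoundError):
        return HTTPException(status_code=400, detail="Audio file not found")
    if isinstance(exc, KeyError):
        return HTTPException(status_code=404, detail="Job not found")
    if isinstance(exc, ValueError):
        return HTTPException(status_code=409, detail="Job not completed")
    return HTTPException(status_code=500, detail=str(exc))
```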
Monitoring & Observability
Logging Strategy
Log Levels:
- INFO: Normal operations (job submitted, started, completed)
- WARNING: Performance issues (GPU slow, queue filling up)
- ERROR: Failures (job failed, GPU check failed)
Key Log Messages:
# Job lifecycle
logger.info(f"Job {job_id} submitted: {audio_path}")
logger.info(f"Job {job_id} started processing (queue position was {pos})")
logger.info(f"Job {job_id} completed in {duration:.1f}s")
logger.error(f"Job {job_id} failed: {error}")
# GPU health
logger.info(f"GPU health check passed: {device_name}, {test_duration:.2f}s")
logger.warning(f"GPU health check slow: {test_duration:.2f}s (expected <1s)")
logger.error(f"GPU health check failed: {error}")
# Queue status
logger.warning(f"Job queue filling up: {queue_size}/{max_size}")
logger.error(f"Job queue full, rejecting request")
Metrics to Track
Job Metrics:
- Total jobs submitted
- Jobs completed successfully
- Jobs failed
- Average processing time
- Average queue wait time
Queue Metrics:
- Current queue size
- Max queue size seen
- Queue full rejections
GPU Metrics:
- GPU health check results (success/fail)
- GPU utilization (if available)
- Model loading failures
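None of these metrics require external tooling; a minimal in-process holder could look like the sketch below (names are illustrative, not an existing module):

```python
from dataclasses import dataclass, field


@dataclass
class QueueMetrics:
    jobs_submitted: int = 0
    jobs_completed: int = 0
    jobs_failed: int = 0
    queue_full_rejections: int = 0
    max_queue_size_seen: int = 0
    processing_times: list[float] = field(default_factory=list)

    def record_completion(self, seconds: float) -> None:
        self.jobs_completed += 1
        self.processing_times.append(seconds)

    @property
    def average_processing_seconds(self) -> float:
        if not self.processing_times:
            return 0.0
        return sum(self.processing_times) / len(self.processing_times)
```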
Migration Strategy
Backward Compatibility
Option 1: Deprecate Old Endpoints (Recommended)
- Keep old endpoints for 1-2 releases with deprecation warnings
- Return warning header: X-Deprecated: Use /jobs endpoint instead
- Document migration path in CLAUDE.md
Option 2: Remove Old Endpoints Immediately
- Clean break, simpler codebase
- Update CLAUDE.md with new API only
- Announce breaking change in release notes
Recommendation: Option 1 for REST API, Option 2 for MCP tools (MCP users update config anyway)
Deployment Steps
1. Pre-deployment:
- Test all components in development
- Verify GPU health check works
- Test job persistence
2. Deployment:
- Update code
- Update environment variables in run scripts
- Create job metadata directory
- Restart services
3. Post-deployment:
- Monitor logs for errors
- Check GPU health status
- Verify jobs are processing
- Test with real workload
4. Rollback Plan:
- Keep old code in git branch
- Can quickly revert if issues found
- Job metadata on disk survives rollback
Future Enhancements
Phase 2 Features (Not in Initial Implementation)
1. Job Cancellation
- Add DELETE /jobs/{id} endpoint
- Gracefully stop running job
2. Priority Queue
- Add priority parameter to job submission
- Use PriorityQueue instead of Queue
3. Batch Job Submission
- Submit multiple files as single batch
- Track as parent job with sub-jobs
4. Result Streaming
- Stream partial results as transcription progresses
- Use Server-Sent Events or WebSockets
5. Distributed Workers
- Multiple worker processes/machines
- Use Redis/RabbitMQ for queue
- Horizontal scaling
6. Job Expiration
- Auto-delete old completed jobs
- Configurable retention policy
7. Retry Logic
- Auto-retry failed jobs
- Exponential backoff
8. Progress Reporting
- Report percentage complete
- Estimate time remaining
Appendix A: Code Examples
Example 1: Using REST API
import requests
import time

# Submit job
response = requests.post('http://localhost:8000/jobs', json={
    'audio_path': '/path/to/audio.mp3',
    'model_name': 'large-v3',
    'output_format': 'txt'
})
job = response.json()
job_id = job['job_id']
print(f"Job submitted: {job_id}, queue position: {job['queue_position']}")

# Poll for completion
while True:
    response = requests.get(f'http://localhost:8000/jobs/{job_id}')
    status = response.json()

    if status['status'] == 'completed':
        print("Job completed!")
        break
    elif status['status'] == 'failed':
        print(f"Job failed: {status['error']}")
        break
    else:
        print(f"Status: {status['status']}, queue_position: {status.get('queue_position', 'N/A')}")
        time.sleep(10)  # Poll every 10 seconds

# Get result
response = requests.get(f'http://localhost:8000/jobs/{job_id}/result')
transcription = response.text
print(f"Transcription:\n{transcription}")
Example 2: Using MCP Tools (LLM Agent)
LLM Agent workflow:
1. Submit job:
transcribe_async(audio_path="/path/to/podcast.mp3", model_name="large-v3")
→ Returns: {"job_id": "abc-123", "status": "queued", "queue_position": 2}
2. Poll status:
get_job_status(job_id="abc-123")
→ Returns: {"status": "queued", "queue_position": 1}
[Wait 10 seconds]
get_job_status(job_id="abc-123")
→ Returns: {"status": "running"}
[Wait 30 seconds]
get_job_status(job_id="abc-123")
→ Returns: {"status": "completed", "result_path": "/outputs/podcast.txt"}
3. Get result:
get_job_result(job_id="abc-123")
→ Returns: "Welcome to our podcast. Today we're discussing..."
Appendix B: Architecture Decisions
Why In-Memory Queue Instead of Redis?
Pros of In-Memory:
- Zero external dependencies
- Simple to implement and test
- Fast (no network overhead)
- Sufficient for single-machine deployment
Cons:
- Not distributed (can't scale horizontally)
- Jobs lost if process crashes before saving to disk
- No shared queue across multiple processes
Decision: Start with in-memory, migrate to Redis if scaling needed
Why Single Worker Thread?
Pros:
- No concurrent GPU access (avoids memory issues)
- Simple to implement and debug
- Predictable resource usage
- FIFO ordering guaranteed
Cons:
- Lower throughput (one job at a time)
- Can't utilize multiple GPUs
Decision: Single worker is best for GPU processing. Can add multi-worker for CPU-only mode later.
Why JSON Files Instead of SQLite?
Pros of JSON Files:
- Simple to inspect (just cat the file)
- No database corruption issues
- Easy to backup/restore
- No locking issues
- One file per job (no shared state)
Cons:
- Slower for large job counts (10,000+)
- No complex queries
- No transactions
Decision: JSON files sufficient for expected workload (<1000 jobs). Can migrate to SQLite if needed.
Appendix C: Security Considerations
Input Validation
Audio Path:
- Verify file exists
- Check file extension
- Verify file size (<10GB recommended)
- Consider path traversal attacks (validate no ../ in path)
Model Selection:
- Validate against whitelist of allowed models
- Prevent arbitrary model loading
Output Directory:
- Validate directory exists and is writable
- Consider restricting to specific base directories
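A hedged sketch of these validation rules; the model whitelist and the 10GB ceiling come from this section, while the function names and extension list are illustrative:

```python
import os

ALLOWED_MODELS = {"tiny", "base", "small", "medium", "large-v3"}
ALLOWED_AUDIO_EXTS = {".mp3", ".wav", ".m4a", ".flac"}  # illustrative whitelist
MAX_FILE_SIZE_BYTES = 10 * 1024**3  # 10GB recommended ceiling


def validate_audio_path(audio_path: str) -> str:
    if ".." in audio_path.split(os.sep):  # reject path traversal attempts
        raise ValueError("Path traversal detected in audio_path")
    if not os.path.isfile(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    if os.path.splitext(audio_path)[1].lower() not in ALLOWED_AUDIO_EXTS:
        raise ValueError("Unsupported audio file extension")
    if os.path.getsize(audio_path) > MAX_FILE_SIZE_BYTES:
        raise ValueError("Audio file exceeds the recommended 10GB limit")
    return audio_path


def validate_model(model_name: str) -> str:
    if model_name not in ALLOWED_MODELS:
        raise ValueError(f"Model '{model_name}' is not in the allowed model list")
    return model_name
```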
Resource Limits
Queue Size:
- Limit max queue size (prevent DOS)
- Return 503 when full
File Size:
- Warn on files >1GB
- Consider max file size limit
Job Retention:
- Implement cleanup of old jobs
- Prevent disk space exhaustion
Success Criteria
Implementation is considered successful when:
- ✅ Jobs can be submitted and return immediately (no timeout)
- ✅ Jobs are processed in FIFO order
- ✅ GPU health check correctly detects GPU failures
- ✅ GPU device=cuda requests are REJECTED if GPU unavailable
- ✅ Jobs persist to disk and survive server restarts
- ✅ Queue full scenario returns 503 error
- ✅ MCP tools work correctly in Claude Desktop
- ✅ All tests pass
- ✅ Documentation is complete and accurate
- ✅ Existing functionality is not broken
END OF DEVELOPMENT PLAN