Add filesystem health monitoring and compression queue system

- Implement periodic filesystem write permission checks (60-minute intervals)
- Add real-time health status monitoring with SSE endpoints
- Display system health banner when storage issues detected
- Limit compression to 1 concurrent job with queue support
- Add max queue limit of 10 pending jobs
- Show queue positions for pending compression jobs
- Update button text dynamically (Start/Queue Compression)
- Enable write access to footage mount in Docker
- Add comprehensive logging for health checks and compression

Co-Authored-By: Alihan <alihan@example.com>
Author: Alihan
Date: 2025-10-12 22:54:21 +03:00
Parent: b01fea34aa
Commit: dec49a43f9

8 changed files with 442 additions and 26 deletions

View File

@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
# Configuration constants
MAX_CONCURRENT_JOBS = 2
MAX_STORED_JOBS = 100
MAX_PENDING_JOBS = 10 # Maximum jobs waiting in queue
PRUNE_INTERVAL_SECONDS = 300 # 5 minutes
WATCHDOG_TIMEOUT_SECONDS = 300 # 5 minutes
WATCHDOG_CHECK_INTERVAL = 30 # 30 seconds
@@ -51,11 +52,12 @@ class CompressionManager:
TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
FPS_PATTERN = re.compile(r'fps=([\d.]+)')
- def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None):
+ def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
self.jobs: Dict[str, CompressionJob] = {}
self.active_jobs: Dict[str, asyncio.Task] = {}
self.semaphore = asyncio.Semaphore(max_concurrent)
self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
self.health_checker = health_checker
self._pruning_task: Optional[asyncio.Task] = None
self._jobs_lock = asyncio.Lock()
self._start_periodic_pruning()
@@ -388,8 +390,27 @@ class CompressionManager:
"""Get a safe snapshot of all jobs for iteration"""
return list(self.jobs.values())
def get_pending_count(self) -> int:
"""Get count of pending jobs"""
return sum(1 for job in self.jobs.values() if job.status == "pending")
async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
"""Start a new compression job"""
# Check filesystem health first
if self.health_checker:
status = self.health_checker.get_status()
if not status["healthy"]:
error_msg = f"Cannot start compression: Filesystem is not writable. {status.get('error', 'Unknown error')}"
logger.error(error_msg)
raise ValueError(error_msg)
# Check if queue is full
pending_count = self.get_pending_count()
if pending_count >= MAX_PENDING_JOBS:
error_msg = f"Queue is full: {pending_count}/{MAX_PENDING_JOBS} pending jobs. Please wait for jobs to complete."
logger.warning(error_msg)
raise ValueError(error_msg)
# Validate path is within allowed directory
if self.allowed_base_path:
abs_path = Path(file_path).resolve()
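
Note (not part of this commit): the API route that ultimately calls start_compression() is not shown in this hunk. A minimal, hypothetical sketch of how such an endpoint could surface the new ValueError cases (unwritable filesystem, full queue) to clients follows; the route path, status code, and the locally constructed manager are illustrative only.

from pathlib import Path
from fastapi import FastAPI, HTTPException
from compression import CompressionManager

app = FastAPI()
# Illustrative instance; main.py (later in this commit) also wires in the health checker.
compression_manager = CompressionManager(max_concurrent=1, allowed_base_path=Path("/footages"))

@app.post("/api/compress")  # hypothetical path; the real route lives in main.py
async def compress(file_path: str, reduce_percentage: int):
    try:
        job_id = await compression_manager.start_compression(file_path, reduce_percentage)
    except ValueError as exc:
        # ValueError covers both failure modes added above: filesystem not writable and MAX_PENDING_JOBS reached.
        raise HTTPException(status_code=503, detail=str(exc))
    return {"job_id": job_id}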

View File

@@ -0,0 +1,170 @@
import asyncio
import logging
import time
from pathlib import Path
from typing import Dict, Optional
from datetime import datetime
# Configure logging
logger = logging.getLogger(__name__)
# Configuration constants
HEALTH_CHECK_INTERVAL_SECONDS = 3600 # Check every 60 minutes
TEST_FILE_PREFIX = ".write_test_"
class FilesystemHealthChecker:
"""
Monitors filesystem write permissions by periodically attempting to write a test file.
Tracks health status and provides real-time updates to the application.
"""
def __init__(self, base_path: Path):
self.base_path = base_path.resolve()
self.is_healthy = True
self.last_check_time: Optional[datetime] = None
self.error_message: Optional[str] = None
self._monitoring_task: Optional[asyncio.Task] = None
self._status_change_callbacks = []
def add_status_change_callback(self, callback):
"""Register a callback to be notified when health status changes"""
self._status_change_callbacks.append(callback)
async def _notify_status_change(self):
"""Notify all registered callbacks of status change"""
for callback in self._status_change_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(self.get_status())
else:
callback(self.get_status())
except Exception as e:
logger.error(f"Error in status change callback: {e}")
async def check_write_permission(self) -> bool:
"""
Attempt to write a test file to verify write permissions.
Returns True if write successful, False otherwise.
"""
test_file_path = None
try:
# Generate unique test file name with timestamp
timestamp = int(time.time() * 1000)
test_file_name = f"{TEST_FILE_PREFIX}{timestamp}"
test_file_path = self.base_path / test_file_name
logger.debug(f"Testing write permission: {test_file_path}")
# Attempt to write test file
test_file_path.write_text(f"Health check at {datetime.now().isoformat()}\n")
# Verify file exists and is readable
if not test_file_path.exists():
raise IOError("Test file was not created successfully")
content = test_file_path.read_text()
if not content:
raise IOError("Test file is empty after write")
# Clean up test file immediately
test_file_path.unlink()
logger.debug("Write permission test passed")
return True
except PermissionError as e:
logger.error(f"Permission denied writing to {self.base_path}: {e}")
self.error_message = f"Permission denied: {str(e)}"
return False
except OSError as e:
logger.error(f"OS error writing to {self.base_path}: {e}")
if "Read-only file system" in str(e):
self.error_message = "Filesystem is mounted as read-only"
else:
self.error_message = f"OS error: {str(e)}"
return False
except Exception as e:
logger.error(f"Unexpected error during write test: {e}", exc_info=True)
self.error_message = f"Unexpected error: {str(e)}"
return False
finally:
# Ensure cleanup even if error occurs
if test_file_path and test_file_path.exists():
try:
test_file_path.unlink()
logger.debug(f"Cleaned up test file: {test_file_path}")
except Exception as e:
logger.warning(f"Failed to clean up test file {test_file_path}: {e}")
async def perform_health_check(self) -> Dict:
"""
Perform a single health check and update status.
Returns the current health status.
"""
previous_health = self.is_healthy
self.last_check_time = datetime.now()
can_write = await self.check_write_permission()
if can_write:
self.is_healthy = True
self.error_message = None
logger.info(f"Filesystem health check PASSED at {self.last_check_time.isoformat()}")
else:
self.is_healthy = False
logger.error(
f"Filesystem health check FAILED at {self.last_check_time.isoformat()}: "
f"{self.error_message}"
)
# Notify if status changed
if previous_health != self.is_healthy:
await self._notify_status_change()
return self.get_status()
def get_status(self) -> Dict:
"""Get current health status"""
return {
"healthy": self.is_healthy,
"last_check": self.last_check_time.isoformat() if self.last_check_time else None,
"error": self.error_message,
"base_path": str(self.base_path),
}
async def _monitoring_loop(self):
"""Background task that periodically checks filesystem health"""
interval_minutes = HEALTH_CHECK_INTERVAL_SECONDS / 60
logger.info(
f"Starting filesystem health monitoring for {self.base_path} "
f"(interval: {interval_minutes:.0f} minutes)"
)
while True:
try:
await self.perform_health_check()
await asyncio.sleep(HEALTH_CHECK_INTERVAL_SECONDS)
except asyncio.CancelledError:
logger.info("Filesystem health monitoring stopped")
break
except Exception as e:
logger.error(f"Error in health monitoring loop: {e}", exc_info=True)
await asyncio.sleep(HEALTH_CHECK_INTERVAL_SECONDS)
def start_monitoring(self):
"""Start the background health monitoring task"""
if self._monitoring_task is None or self._monitoring_task.done():
self._monitoring_task = asyncio.create_task(self._monitoring_loop())
logger.info("Filesystem health monitoring started")
else:
logger.warning("Monitoring task already running")
def stop_monitoring(self):
"""Stop the background health monitoring task"""
if self._monitoring_task and not self._monitoring_task.done():
self._monitoring_task.cancel()
logger.info("Filesystem health monitoring stopped")

View File

@@ -11,8 +11,13 @@ import aiofiles.os
import asyncio
import json
import time
import logging
from sse_starlette.sse import EventSourceResponse
from compression import CompressionManager
from filesystem_health import FilesystemHealthChecker
# Configure logging
logger = logging.getLogger(__name__)
app = FastAPI(title="Drone Footage Manager API")
@@ -47,8 +52,15 @@ class SimpleCache:
directory_cache = SimpleCache()
- # Initialize compression manager
- compression_manager = CompressionManager(max_concurrent=2, allowed_base_path=FOOTAGES_PATH)
+ # Initialize filesystem health checker first (compression manager needs it)
+ filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH)
+ # Initialize compression manager with health checker
+ compression_manager = CompressionManager(
+     max_concurrent=1,
+     allowed_base_path=FOOTAGES_PATH,
+     health_checker=filesystem_health_checker
+ )
# CORS middleware for frontend communication
ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "*").split(",")
@@ -83,6 +95,32 @@ async def get_file_info(file_path: Path) -> Dict:
}
@app.on_event("startup")
async def startup_event():
"""Run startup tasks"""
logger.info("Running startup tasks...")
# Perform initial filesystem health check
initial_status = await filesystem_health_checker.perform_health_check()
if initial_status["healthy"]:
logger.info("✓ Initial filesystem health check PASSED")
else:
logger.error(
f"✗ Initial filesystem health check FAILED: {initial_status['error']}"
)
# Start background monitoring
filesystem_health_checker.start_monitoring()
logger.info("Application startup complete")
@app.on_event("shutdown")
async def shutdown_event():
"""Run shutdown tasks"""
logger.info("Shutting down...")
filesystem_health_checker.stop_monitoring()
@app.get("/")
async def root():
return {"message": "Drone Footage Manager API", "status": "running"}
@@ -403,6 +441,39 @@ async def compression_events(request: Request):
return EventSourceResponse(event_generator())
# ========== SYSTEM HEALTH API ENDPOINTS ==========
@app.get("/api/system/health")
async def get_system_health():
"""Get current system health status"""
return filesystem_health_checker.get_status()
@app.get("/api/system/health/stream")
async def system_health_stream(request: Request):
"""Server-Sent Events endpoint for real-time health status updates"""
async def event_generator():
try:
while True:
# Check if client is still connected
if await request.is_disconnected():
break
# Send current health status
status = filesystem_health_checker.get_status()
yield {
"event": "health",
"data": json.dumps(status)
}
# Check every 5 seconds
await asyncio.sleep(5)
except asyncio.CancelledError:
pass
return EventSourceResponse(event_generator())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
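
Note (not part of this commit): a rough client-side sketch for the two health endpoints, assuming the API is reachable at http://localhost:8000 and that httpx is installed; the useSystemHealth hook below consumes the same stream with EventSource.

import json
import httpx

def get_health_once() -> dict:
    # One-shot status check via GET /api/system/health.
    return httpx.get("http://localhost:8000/api/system/health").json()

def watch_health_stream() -> None:
    # Follow the SSE stream; each "data:" line carries a JSON health snapshot (sent every 5 seconds).
    with httpx.stream("GET", "http://localhost:8000/api/system/health/stream", timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                status = json.loads(line[len("data:"):].strip())
                print("healthy" if status["healthy"] else f"unhealthy: {status['error']}")

if __name__ == "__main__":
    print(get_health_once())
    watch_health_stream()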

View File

@@ -7,7 +7,7 @@ services:
dockerfile: Dockerfile
container_name: drone-footage-backend
volumes:
- - /home/uad/nextcloud/footages:/footages:ro
+ - /home/uad/nextcloud/footages:/footages
restart: unless-stopped
networks:
- drone-footage-network

View File

@@ -6,6 +6,19 @@ function ActiveJobsMonitor() {
const activeJobs = jobs.filter(j => ['pending', 'processing', 'validating'].includes(j.status))
// Calculate queue positions for pending jobs
const pendingJobs = activeJobs.filter(j => j.status === 'pending')
const processingJobs = activeJobs.filter(j => j.status === 'processing' || j.status === 'validating')
// Assign queue positions (1-based)
const jobsWithPosition = activeJobs.map((job, index) => {
if (job.status === 'pending') {
const queuePosition = pendingJobs.findIndex(j => j.job_id === job.job_id) + 1
return { ...job, queuePosition }
}
return { ...job, queuePosition: null }
})
// Don't show the component if there are no active jobs
if (activeJobs.length === 0) {
return null
@@ -17,11 +30,16 @@ function ActiveJobsMonitor() {
<h2 className="text-xl font-bold text-gray-800 flex items-center gap-2">
<span className="inline-block w-3 h-3 bg-blue-600 rounded-full animate-pulse"></span>
Active Compression Jobs ({activeJobs.length})
{pendingJobs.length > 0 && (
<span className="text-sm font-normal text-gray-600">
({processingJobs.length} processing, {pendingJobs.length} queued)
</span>
)}
</h2>
</div>
<div className="space-y-4">
- {activeJobs.map(job => (
+ {jobsWithPosition.map(job => (
<div key={job.job_id} className="bg-white rounded-lg p-4 shadow border border-blue-200">
<div className="flex justify-between items-start mb-3">
<div className="flex-1">
@@ -33,7 +51,7 @@ function ActiveJobsMonitor() {
<div className="flex gap-4 mt-2 text-xs text-gray-500">
<span className="font-medium">
{job.status === 'validating' ? '✓ Validating...' :
- job.status === 'pending' ? '⏳ Pending' :
+ job.status === 'pending' ? `⏳ Queue Position #${job.queuePosition}` :
`🎬 Pass ${job.current_pass}/2`}
</span>
{job.video_bitrate && <span>Bitrate: {job.video_bitrate}k</span>}
@@ -47,24 +65,35 @@ function ActiveJobsMonitor() {
</button>
</div>
- {/* Progress Bar */}
- <div className="w-full bg-gray-200 rounded-full h-4 overflow-hidden shadow-inner">
- <div
- className="bg-gradient-to-r from-blue-500 to-blue-600 h-4 rounded-full transition-all duration-300 flex items-center justify-end pr-2"
- style={{ width: `${job.progress}%` }}
- >
- {job.progress > 10 && (
- <span className="text-xs text-white font-bold">{job.progress.toFixed(1)}%</span>
- )}
- </div>
- </div>
+ {/* Progress Bar - Only show for processing/validating jobs */}
+ {job.status !== 'pending' && (
+ <>
+ <div className="w-full bg-gray-200 rounded-full h-4 overflow-hidden shadow-inner">
+ <div
+ className="bg-gradient-to-r from-blue-500 to-blue-600 h-4 rounded-full transition-all duration-300 flex items-center justify-end pr-2"
+ style={{ width: `${job.progress}%` }}
+ >
+ {job.progress > 10 && (
+ <span className="text-xs text-white font-bold">{job.progress.toFixed(1)}%</span>
+ )}
+ </div>
+ </div>
<div className="flex justify-between text-sm mt-2">
<span className="text-gray-700 font-medium">{job.progress.toFixed(1)}% complete</span>
<span className="text-gray-600">
{job.status === 'validating' ? '✨ Almost done...' : `⏱ ETA: ${formatETA(job.eta_seconds)}`}
</span>
</div>
<div className="flex justify-between text-sm mt-2">
<span className="text-gray-700 font-medium">{job.progress.toFixed(1)}% complete</span>
<span className="text-gray-600">
{job.status === 'validating' ? '✨ Almost done...' : `⏱ ETA: ${formatETA(job.eta_seconds)}`}
</span>
</div>
</>
)}
{/* Queue status for pending jobs */}
{job.status === 'pending' && (
<div className="text-sm text-gray-600 mt-2">
<p> Waiting for current compression to finish...</p>
</div>
)}
</div>
))}
</div>

View File

@@ -1,8 +1,13 @@
import { useState, useEffect, useRef } from 'react'
import ActiveJobsMonitor from './ActiveJobsMonitor'
import SystemHealthBanner from './SystemHealthBanner'
import { useSystemHealth } from './hooks/useSystemHealth'
import { useCompressionJobs } from './hooks/useCompressionJobs'
import { formatFileSize } from './utils/formatters'
function App() {
const { isHealthy, error: healthError } = useSystemHealth()
const { jobs } = useCompressionJobs()
const [locations, setLocations] = useState([])
const [selectedLocation, setSelectedLocation] = useState(null)
const [dates, setDates] = useState([])
@@ -200,7 +205,12 @@ function App() {
})
if (response.ok) {
- showToast('Compression job started!', 'success')
+ const hasActiveJobs = jobs.some(j => ['pending', 'processing', 'validating'].includes(j.status))
+ if (hasActiveJobs) {
+ showToast('Compression job queued successfully!', 'success')
+ } else {
+ showToast('Compression job started!', 'success')
+ }
} else {
const error = await response.json()
showToast(`Failed: ${error.detail}`, 'error')
@@ -212,6 +222,9 @@ function App() {
}
}
// Check if there are active jobs to change button text
const hasActiveJobs = jobs.some(j => ['pending', 'processing', 'validating'].includes(j.status))
return (
<div className="min-h-screen bg-gray-100">
{/* Header */}
@@ -226,6 +239,9 @@ function App() {
</div>
</header>
{/* System Health Banner */}
<SystemHealthBanner />
{/* Main Content */}
<div className="container mx-auto px-4 py-6">
{error && (
@@ -300,11 +316,17 @@ function App() {
<button
onClick={startCompression}
- disabled={compressing}
+ disabled={compressing || !isHealthy}
className="mt-4 bg-blue-600 text-white px-6 py-2 rounded hover:bg-blue-700 disabled:bg-gray-400 disabled:cursor-not-allowed transition"
title={!isHealthy ? `Compression disabled: ${healthError || 'System not healthy'}` : ''}
>
- {compressing ? 'Starting...' : 'Start Compression'}
+ {compressing ? 'Queueing...' : hasActiveJobs ? 'Queue Compression' : 'Start Compression'}
</button>
{!isHealthy && (
<p className="text-sm text-red-600 mt-2">
Compression is disabled due to system storage issues.
</p>
)}
</div>
</div>
)}

View File

@@ -0,0 +1,34 @@
import { useSystemHealth } from './hooks/useSystemHealth'
function SystemHealthBanner() {
const { health, loading } = useSystemHealth()
// Don't show banner if system is healthy or still loading
if (loading || health.healthy) {
return null
}
return (
<div className="bg-red-600 text-white px-4 py-4 shadow-lg">
<div className="container mx-auto flex items-start gap-3">
<span className="text-2xl flex-shrink-0"></span>
<div className="flex-1">
<h3 className="font-bold text-lg mb-1">System Error: Storage Not Writable</h3>
<p className="text-sm opacity-90">
Video compression is currently disabled. The system cannot write to the storage directory.
</p>
{health.error && (
<p className="text-xs mt-2 font-mono bg-red-700 bg-opacity-50 px-3 py-2 rounded">
<strong>Error:</strong> {health.error}
</p>
)}
<p className="text-xs mt-2 opacity-75">
Last checked: {health.last_check ? new Date(health.last_check).toLocaleString() : 'Never'}
</p>
</div>
</div>
</div>
)
}
export default SystemHealthBanner

View File

@@ -0,0 +1,69 @@
import { useState, useEffect, useRef } from 'react'
/**
* Custom hook for monitoring system health with real-time SSE updates
* Tracks filesystem write permissions and other system health metrics
*/
export function useSystemHealth() {
const [health, setHealth] = useState({
healthy: true,
last_check: null,
error: null,
base_path: null,
loading: true
})
const eventSourceRef = useRef(null)
const API_URL = import.meta.env.VITE_API_URL || '/api'
useEffect(() => {
// Fetch initial health status
fetchHealth()
// Connect to SSE for real-time updates
connectSSE()
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close()
}
}
}, [])
const fetchHealth = async () => {
try {
const response = await fetch(`${API_URL}/system/health`)
const data = await response.json()
setHealth({ ...data, loading: false })
} catch (error) {
console.error('Failed to fetch system health:', error)
setHealth(prev => ({ ...prev, loading: false }))
}
}
const connectSSE = () => {
const eventSource = new EventSource(`${API_URL}/system/health/stream`)
eventSource.addEventListener('health', (event) => {
const healthData = JSON.parse(event.data)
setHealth({ ...healthData, loading: false })
})
eventSource.onerror = () => {
console.error('System health SSE connection error')
eventSource.close()
// Reconnect after 10 seconds
setTimeout(connectSSE, 10000)
}
eventSourceRef.current = eventSource
}
return {
health,
isHealthy: health.healthy,
error: health.error,
loading: health.loading,
refreshHealth: fetchHealth
}
}