diff --git a/backend/compression.py b/backend/compression.py
index c82c71a..89d05a0 100644
--- a/backend/compression.py
+++ b/backend/compression.py
@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
 # Configuration constants
 MAX_CONCURRENT_JOBS = 2
 MAX_STORED_JOBS = 100
+MAX_PENDING_JOBS = 10  # Maximum jobs waiting in queue
 PRUNE_INTERVAL_SECONDS = 300  # 5 minutes
 WATCHDOG_TIMEOUT_SECONDS = 300  # 5 minutes
 WATCHDOG_CHECK_INTERVAL = 30  # 30 seconds
@@ -51,11 +52,12 @@ class CompressionManager:
     TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
     FPS_PATTERN = re.compile(r'fps=([\d.]+)')
 
-    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None):
+    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
         self.jobs: Dict[str, CompressionJob] = {}
         self.active_jobs: Dict[str, asyncio.Task] = {}
         self.semaphore = asyncio.Semaphore(max_concurrent)
         self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
+        self.health_checker = health_checker
         self._pruning_task: Optional[asyncio.Task] = None
         self._jobs_lock = asyncio.Lock()
         self._start_periodic_pruning()
@@ -388,8 +390,27 @@ class CompressionManager:
         """Get a safe snapshot of all jobs for iteration"""
         return list(self.jobs.values())
 
+    def get_pending_count(self) -> int:
+        """Get count of pending jobs"""
+        return sum(1 for job in self.jobs.values() if job.status == "pending")
+
     async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
         """Start a new compression job"""
+        # Check filesystem health first
+        if self.health_checker:
+            status = self.health_checker.get_status()
+            if not status["healthy"]:
+                error_msg = f"Cannot start compression: Filesystem is not writable. {status.get('error', 'Unknown error')}"
+                logger.error(error_msg)
+                raise ValueError(error_msg)
+
+        # Check if queue is full
+        pending_count = self.get_pending_count()
+        if pending_count >= MAX_PENDING_JOBS:
+            error_msg = f"Queue is full: {pending_count}/{MAX_PENDING_JOBS} pending jobs. Please wait for jobs to complete."
+            logger.warning(error_msg)
+            raise ValueError(error_msg)
+
         # Validate path is within allowed directory
         if self.allowed_base_path:
             abs_path = Path(file_path).resolve()
diff --git a/backend/filesystem_health.py b/backend/filesystem_health.py
new file mode 100644
index 0000000..d7a0e09
--- /dev/null
+++ b/backend/filesystem_health.py
@@ -0,0 +1,170 @@
+import asyncio
+import logging
+import time
+from pathlib import Path
+from typing import Dict, Optional
+from datetime import datetime
+
+# Configure logging
+logger = logging.getLogger(__name__)
+
+# Configuration constants
+HEALTH_CHECK_INTERVAL_SECONDS = 3600  # Check every 60 minutes
+TEST_FILE_PREFIX = ".write_test_"
+
+
+class FilesystemHealthChecker:
+    """
+    Monitors filesystem write permissions by periodically attempting to write a test file.
+    Tracks health status and provides real-time updates to the application.
+ """ + + def __init__(self, base_path: Path): + self.base_path = base_path.resolve() + self.is_healthy = True + self.last_check_time: Optional[datetime] = None + self.error_message: Optional[str] = None + self._monitoring_task: Optional[asyncio.Task] = None + self._status_change_callbacks = [] + + def add_status_change_callback(self, callback): + """Register a callback to be notified when health status changes""" + self._status_change_callbacks.append(callback) + + async def _notify_status_change(self): + """Notify all registered callbacks of status change""" + for callback in self._status_change_callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(self.get_status()) + else: + callback(self.get_status()) + except Exception as e: + logger.error(f"Error in status change callback: {e}") + + async def check_write_permission(self) -> bool: + """ + Attempt to write a test file to verify write permissions. + Returns True if write successful, False otherwise. + """ + test_file_path = None + try: + # Generate unique test file name with timestamp + timestamp = int(time.time() * 1000) + test_file_name = f"{TEST_FILE_PREFIX}{timestamp}" + test_file_path = self.base_path / test_file_name + + logger.debug(f"Testing write permission: {test_file_path}") + + # Attempt to write test file + test_file_path.write_text(f"Health check at {datetime.now().isoformat()}\n") + + # Verify file exists and is readable + if not test_file_path.exists(): + raise IOError("Test file was not created successfully") + + content = test_file_path.read_text() + if not content: + raise IOError("Test file is empty after write") + + # Clean up test file immediately + test_file_path.unlink() + + logger.debug("Write permission test passed") + return True + + except PermissionError as e: + logger.error(f"Permission denied writing to {self.base_path}: {e}") + self.error_message = f"Permission denied: {str(e)}" + return False + + except OSError as e: + logger.error(f"OS error writing to {self.base_path}: {e}") + if "Read-only file system" in str(e): + self.error_message = "Filesystem is mounted as read-only" + else: + self.error_message = f"OS error: {str(e)}" + return False + + except Exception as e: + logger.error(f"Unexpected error during write test: {e}", exc_info=True) + self.error_message = f"Unexpected error: {str(e)}" + return False + + finally: + # Ensure cleanup even if error occurs + if test_file_path and test_file_path.exists(): + try: + test_file_path.unlink() + logger.debug(f"Cleaned up test file: {test_file_path}") + except Exception as e: + logger.warning(f"Failed to clean up test file {test_file_path}: {e}") + + async def perform_health_check(self) -> Dict: + """ + Perform a single health check and update status. + Returns the current health status. 
+ """ + previous_health = self.is_healthy + self.last_check_time = datetime.now() + + can_write = await self.check_write_permission() + + if can_write: + self.is_healthy = True + self.error_message = None + logger.info(f"Filesystem health check PASSED at {self.last_check_time.isoformat()}") + else: + self.is_healthy = False + logger.error( + f"Filesystem health check FAILED at {self.last_check_time.isoformat()}: " + f"{self.error_message}" + ) + + # Notify if status changed + if previous_health != self.is_healthy: + await self._notify_status_change() + + return self.get_status() + + def get_status(self) -> Dict: + """Get current health status""" + return { + "healthy": self.is_healthy, + "last_check": self.last_check_time.isoformat() if self.last_check_time else None, + "error": self.error_message, + "base_path": str(self.base_path), + } + + async def _monitoring_loop(self): + """Background task that periodically checks filesystem health""" + interval_minutes = HEALTH_CHECK_INTERVAL_SECONDS / 60 + logger.info( + f"Starting filesystem health monitoring for {self.base_path} " + f"(interval: {interval_minutes:.0f} minutes)" + ) + + while True: + try: + await self.perform_health_check() + await asyncio.sleep(HEALTH_CHECK_INTERVAL_SECONDS) + except asyncio.CancelledError: + logger.info("Filesystem health monitoring stopped") + break + except Exception as e: + logger.error(f"Error in health monitoring loop: {e}", exc_info=True) + await asyncio.sleep(HEALTH_CHECK_INTERVAL_SECONDS) + + def start_monitoring(self): + """Start the background health monitoring task""" + if self._monitoring_task is None or self._monitoring_task.done(): + self._monitoring_task = asyncio.create_task(self._monitoring_loop()) + logger.info("Filesystem health monitoring started") + else: + logger.warning("Monitoring task already running") + + def stop_monitoring(self): + """Stop the background health monitoring task""" + if self._monitoring_task and not self._monitoring_task.done(): + self._monitoring_task.cancel() + logger.info("Filesystem health monitoring stopped") diff --git a/backend/main.py b/backend/main.py index 2e46512..892f205 100644 --- a/backend/main.py +++ b/backend/main.py @@ -11,8 +11,13 @@ import aiofiles.os import asyncio import json import time +import logging from sse_starlette.sse import EventSourceResponse from compression import CompressionManager +from filesystem_health import FilesystemHealthChecker + +# Configure logging +logger = logging.getLogger(__name__) app = FastAPI(title="Drone Footage Manager API") @@ -47,8 +52,15 @@ class SimpleCache: directory_cache = SimpleCache() -# Initialize compression manager -compression_manager = CompressionManager(max_concurrent=2, allowed_base_path=FOOTAGES_PATH) +# Initialize filesystem health checker first (compression manager needs it) +filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH) + +# Initialize compression manager with health checker +compression_manager = CompressionManager( + max_concurrent=1, + allowed_base_path=FOOTAGES_PATH, + health_checker=filesystem_health_checker +) # CORS middleware for frontend communication ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "*").split(",") @@ -83,6 +95,32 @@ async def get_file_info(file_path: Path) -> Dict: } +@app.on_event("startup") +async def startup_event(): + """Run startup tasks""" + logger.info("Running startup tasks...") + + # Perform initial filesystem health check + initial_status = await filesystem_health_checker.perform_health_check() + if initial_status["healthy"]: + logger.info("✓ 
Initial filesystem health check PASSED") + else: + logger.error( + f"✗ Initial filesystem health check FAILED: {initial_status['error']}" + ) + + # Start background monitoring + filesystem_health_checker.start_monitoring() + logger.info("Application startup complete") + + +@app.on_event("shutdown") +async def shutdown_event(): + """Run shutdown tasks""" + logger.info("Shutting down...") + filesystem_health_checker.stop_monitoring() + + @app.get("/") async def root(): return {"message": "Drone Footage Manager API", "status": "running"} @@ -403,6 +441,39 @@ async def compression_events(request: Request): return EventSourceResponse(event_generator()) +# ========== SYSTEM HEALTH API ENDPOINTS ========== + +@app.get("/api/system/health") +async def get_system_health(): + """Get current system health status""" + return filesystem_health_checker.get_status() + + +@app.get("/api/system/health/stream") +async def system_health_stream(request: Request): + """Server-Sent Events endpoint for real-time health status updates""" + async def event_generator(): + try: + while True: + # Check if client is still connected + if await request.is_disconnected(): + break + + # Send current health status + status = filesystem_health_checker.get_status() + yield { + "event": "health", + "data": json.dumps(status) + } + + # Check every 5 seconds + await asyncio.sleep(5) + except asyncio.CancelledError: + pass + + return EventSourceResponse(event_generator()) + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/docker-compose.yml b/docker-compose.yml index 632d871..62239e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: dockerfile: Dockerfile container_name: drone-footage-backend volumes: - - /home/uad/nextcloud/footages:/footages:ro + - /home/uad/nextcloud/footages:/footages restart: unless-stopped networks: - drone-footage-network diff --git a/frontend/src/ActiveJobsMonitor.jsx b/frontend/src/ActiveJobsMonitor.jsx index 9da7dc7..ed40d57 100644 --- a/frontend/src/ActiveJobsMonitor.jsx +++ b/frontend/src/ActiveJobsMonitor.jsx @@ -6,6 +6,19 @@ function ActiveJobsMonitor() { const activeJobs = jobs.filter(j => ['pending', 'processing', 'validating'].includes(j.status)) + // Calculate queue positions for pending jobs + const pendingJobs = activeJobs.filter(j => j.status === 'pending') + const processingJobs = activeJobs.filter(j => j.status === 'processing' || j.status === 'validating') + + // Assign queue positions (1-based) + const jobsWithPosition = activeJobs.map((job, index) => { + if (job.status === 'pending') { + const queuePosition = pendingJobs.findIndex(j => j.job_id === job.job_id) + 1 + return { ...job, queuePosition } + } + return { ...job, queuePosition: null } + }) + // Don't show the component if there are no active jobs if (activeJobs.length === 0) { return null @@ -17,11 +30,16 @@ function ActiveJobsMonitor() {
+          ⏳ Waiting for current compression to finish...
+
+          ⚠️ Compression is disabled due to system storage issues.
+
+        )}
+        Video compression is currently disabled. The system cannot write to the storage directory.
+
+        {health.error && (
+
+          Error: {health.error}
+
+        )}
+
+          Last checked: {health.last_check ? new Date(health.last_check).toLocaleString() : 'Never'}
+
+
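For reviewers who want to exercise the new checker outside the app, here is a minimal sketch (not part of this patch) that runs FilesystemHealthChecker against a temporary directory and then revokes write permission to force a failure. It assumes it is executed from the backend/ directory so filesystem_health is importable, a POSIX filesystem, and a non-root user; the print-based callback is illustrative only.

    import asyncio
    import os
    import tempfile
    from pathlib import Path

    from filesystem_health import FilesystemHealthChecker


    async def main():
        with tempfile.TemporaryDirectory() as tmp:
            checker = FilesystemHealthChecker(Path(tmp))

            # Fires only when the healthy flag actually flips.
            checker.add_status_change_callback(
                lambda status: print("status changed:", status)
            )

            # Writable directory -> healthy, no error recorded.
            print(await checker.perform_health_check())

            # Drop write permission to simulate a read-only mount; the next
            # check should set healthy=False and invoke the callback.
            os.chmod(tmp, 0o500)
            print(await checker.perform_health_check())

            # Restore permissions so TemporaryDirectory can clean up.
            os.chmod(tmp, 0o700)


    asyncio.run(main())

The healthy and error fields printed here are the same ones served by /api/system/health and consulted by start_compression() before a job is accepted.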