diff --git a/backend/compression.py b/backend/compression.py index 0519b41..827ca97 100644 --- a/backend/compression.py +++ b/backend/compression.py @@ -4,9 +4,11 @@ import uuid import os import re import logging +import shutil from pathlib import Path from datetime import datetime from typing import Dict, Optional +from job_persistence import JobPersistence # Configure logging logging.basicConfig( @@ -30,8 +32,8 @@ MIN_VIDEO_BITRATE = 100 # kbps class CompressionJob: - def __init__(self, file_path: str, reduce_percentage: int): - self.job_id = str(uuid.uuid4()) + def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None): + self.job_id = job_id or str(uuid.uuid4()) self.file_path = file_path self.reduce_percentage = reduce_percentage self.status = "pending" # pending, processing, validating, completed, failed, cancelled @@ -47,6 +49,59 @@ class CompressionJob: self.target_size_mb = None self.video_bitrate = None self.duration_seconds = None + self.ffmpeg_pid = None # Track ffmpeg process PID + + def to_dict(self) -> Dict: + """Convert job to dictionary for database persistence""" + return { + 'job_id': self.job_id, + 'file_path': self.file_path, + 'reduce_percentage': self.reduce_percentage, + 'status': self.status, + 'progress': self.progress, + 'eta_seconds': self.eta_seconds, + 'current_pass': self.current_pass, + 'created_at': self.created_at, + 'started_at': self.started_at, + 'completed_at': self.completed_at, + 'error': self.error, + 'output_file': self.output_file, + 'current_size_mb': self.current_size_mb, + 'target_size_mb': self.target_size_mb, + 'video_bitrate': self.video_bitrate, + 'duration_seconds': self.duration_seconds, + 'ffmpeg_pid': self.ffmpeg_pid + } + + @classmethod + def from_dict(cls, data: Dict) -> 'CompressionJob': + """Create job from database dictionary""" + job = cls( + file_path=data['file_path'], + reduce_percentage=data['reduce_percentage'], + job_id=data['job_id'] + ) + job.status = data.get('status', 'pending') + job.progress = data.get('progress', 0.0) + job.eta_seconds = data.get('eta_seconds') + job.current_pass = data.get('current_pass', 0) + job.error = data.get('error') + job.output_file = data.get('output_file') + job.current_size_mb = data.get('current_size_mb') + job.target_size_mb = data.get('target_size_mb') + job.video_bitrate = data.get('video_bitrate') + job.duration_seconds = data.get('duration_seconds') + job.ffmpeg_pid = data.get('ffmpeg_pid') + + # Parse datetime strings back to datetime objects + for key in ['created_at', 'started_at', 'completed_at']: + if data.get(key): + if isinstance(data[key], str): + job.__dict__[key] = datetime.fromisoformat(data[key]) + else: + job.__dict__[key] = data[key] + + return job class CompressionManager: @@ -54,15 +109,36 @@ class CompressionManager: TIME_PATTERN = re.compile(r'out_time_ms=(\d+)') FPS_PATTERN = re.compile(r'fps=([\d.]+)') - def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None): + def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None): self.jobs: Dict[str, CompressionJob] = {} self.active_jobs: Dict[str, asyncio.Task] = {} self.semaphore = asyncio.Semaphore(max_concurrent) self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None self.health_checker = health_checker + self.cache_invalidation_callback = cache_invalidation_callback self._pruning_task: Optional[asyncio.Task] = None 
self._jobs_lock = asyncio.Lock() - self._start_periodic_pruning() + self.persistence = JobPersistence() + self._pruning_started = False + + async def initialize_persistence(self): + """Initialize the persistence layer - call this on startup""" + await self.persistence.initialize() + logger.info("Job persistence initialized") + + # Start periodic pruning (only once when persistence is initialized) + if not self._pruning_started: + self._start_periodic_pruning() + self._pruning_started = True + + async def load_jobs_from_database(self): + """Load all jobs from database into memory""" + async with self._jobs_lock: + jobs_data = await self.persistence.get_all_jobs() + for job_data in jobs_data: + job = CompressionJob.from_dict(job_data) + self.jobs[job.job_id] = job + logger.info(f"Loaded {len(jobs_data)} jobs from database") async def get_video_info(self, file_path: str) -> Dict: """Extract video duration and file size using ffprobe""" @@ -106,6 +182,7 @@ class CompressionManager: try: job.status = "processing" job.started_at = datetime.now() + await self.persistence.save_job(job.to_dict()) # Get video information info = await self.get_video_info(job.file_path) @@ -125,6 +202,29 @@ class CompressionManager: f"Duration: {job.duration_seconds}s, " f"Video Bitrate: {job.video_bitrate} kbps") + # Check disk space BEFORE starting compression + file_path = Path(job.file_path) + disk_usage = shutil.disk_usage(file_path.parent) + + # Estimate required space: temp file + original + safety margin (1.5x original) + required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5 + available_bytes = disk_usage.free + + if available_bytes < required_bytes: + required_gb = required_bytes / (1024**3) + available_gb = available_bytes / (1024**3) + error_msg = ( + f"Insufficient disk space: need {required_gb:.1f}GB, " + f"have {available_gb:.1f}GB available" + ) + logger.error(f"Job {job.job_id}: {error_msg}") + raise ValueError(error_msg) + + logger.info( + f"Job {job.job_id}: Disk space check passed " + f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)" + ) + # Generate output filename (use original path for in-place replacement) file_path = Path(job.file_path) temp_file = file_path.parent / f"temp_{file_path.name}" @@ -148,6 +248,7 @@ class CompressionManager: # SAFE FILE REPLACEMENT WITH VALIDATION job.status = "validating" job.progress = 95.0 + await self.persistence.update_job_status(job.job_id, "validating", progress=95.0) logger.info(f"Job {job.job_id}: Starting safe file replacement process") if await self.safe_replace_file(file_path, temp_file, job): @@ -155,21 +256,31 @@ class CompressionManager: job.status = "completed" job.progress = 100.0 job.completed_at = datetime.now() + job.ffmpeg_pid = None # Clear PID on completion + await self.persistence.save_job(job.to_dict()) self.cleanup_temp_files(job) logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place") else: job.status = "failed" job.error = "File replacement failed or validation check failed" + await self.persistence.update_job_status(job.job_id, "failed", error=job.error) self.cleanup_temp_files(job) logger.error(f"Job {job.job_id} failed - file replacement unsuccessful") except asyncio.CancelledError: - job.status = "cancelled" + # Don't mark as cancelled - could be app shutdown + # Keep current status so job can be recovered on restart + job.ffmpeg_pid = None # Clear PID since process won't survive + # Don't update database status - leave it as "processing" or "validating" + # for 
recovery on restart self.cleanup_temp_files(job) - logger.info(f"Job {job.job_id} cancelled") + logger.info(f"Job {job.job_id} interrupted (task cancelled)") + raise # Re-raise to properly cancel the task except Exception as e: job.status = "failed" job.error = str(e) + job.ffmpeg_pid = None # Clear PID on failure + await self.persistence.update_job_status(job.job_id, "failed", error=str(e)) self.cleanup_temp_files(job) logger.error(f"Job {job.job_id} failed: {e}", exc_info=True) @@ -215,15 +326,25 @@ class CompressionManager: limit=STDERR_READ_LIMIT # Set buffer limit to prevent overflow ) + # Track PID for first pass only (second pass uses same job) + if pass_num == 1 and process.pid: + job.ffmpeg_pid = process.pid + await self.persistence.set_ffmpeg_pid(job.job_id, process.pid) + logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked") + # Progress tracking stderr_output = [] - last_progress_update = datetime.now() + last_progress_update = None # Initialize as None, will be set when FFmpeg starts producing output async def read_stdout(): """Read and process stdout for progress updates""" nonlocal last_progress_update try: async for line in process.stdout: + # Initialize timer on first progress update from FFmpeg + if last_progress_update is None: + last_progress_update = datetime.now() + logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized") if job.status == "cancelled": process.kill() return @@ -258,6 +379,16 @@ class CompressionManager: else: remaining_sec = remaining_sec_current_pass job.eta_seconds = int(remaining_sec) + + # Periodically save progress to database (every ~5% or 30 seconds) + # Use modulo to reduce DB writes + if int(job.progress) % 5 == 0: + await self.persistence.update_job_progress( + job.job_id, + job.progress, + job.eta_seconds, + job.current_pass + ) except Exception as e: logger.error(f"Error reading stdout: {e}") @@ -288,14 +419,16 @@ class CompressionManager: process.kill() return - time_since_update = (datetime.now() - last_progress_update).total_seconds() - if time_since_update > WATCHDOG_TIMEOUT_SECONDS: - error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog." - logger.error(f"Watchdog: {error_msg}") - job.status = "failed" - job.error = error_msg - process.kill() - raise Exception(error_msg) + # Only check timeout if FFmpeg has started producing output + if last_progress_update is not None: + time_since_update = (datetime.now() - last_progress_update).total_seconds() + if time_since_update > WATCHDOG_TIMEOUT_SECONDS: + error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog." 
+ logger.error(f"Watchdog: {error_msg}") + job.status = "failed" + job.error = error_msg + process.kill() + raise Exception(error_msg) await asyncio.sleep(WATCHDOG_CHECK_INTERVAL) @@ -452,6 +585,16 @@ class CompressionManager: except Exception as e: logger.warning(f"Failed to delete backup file (non-critical): {e}") + # Step 5: Invalidate cache for this directory to show updated file size + if self.cache_invalidation_callback: + try: + # Invalidate all cache entries containing the parent directory path + parent_name = original_path.parent.name + self.cache_invalidation_callback(parent_name) + logger.info(f"Cache invalidated for directory: {parent_name}") + except Exception as e: + logger.warning(f"Failed to invalidate cache (non-critical): {e}") + return True except Exception as e: @@ -560,6 +703,9 @@ class CompressionManager: job = CompressionJob(file_path, reduce_percentage) self.jobs[job.job_id] = job + # Save initial job to database + await self.persistence.save_job(job.to_dict()) + # Start compression in background task = asyncio.create_task(self.compress_video(job)) self.active_jobs[job.job_id] = task @@ -570,9 +716,11 @@ class CompressionManager: return job.job_id async def cancel_job(self, job_id: str): - """Cancel a running compression job""" + """Cancel a running compression job (user-initiated)""" if job_id in self.jobs: self.jobs[job_id].status = "cancelled" + # Save cancelled status to database (user-initiated cancellation) + await self.persistence.update_job_status(job_id, "cancelled") if job_id in self.active_jobs: self.active_jobs[job_id].cancel() diff --git a/backend/job_persistence.py b/backend/job_persistence.py new file mode 100644 index 0000000..1d69faf --- /dev/null +++ b/backend/job_persistence.py @@ -0,0 +1,243 @@ +import sqlite3 +import asyncio +import logging +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Any +from contextlib import asynccontextmanager +import aiosqlite +import os + +logger = logging.getLogger(__name__) + +# Database configuration from environment +DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db') +JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7')) + + +class JobPersistence: + """SQLite-based persistence layer for compression jobs""" + + def __init__(self, db_path: str = DATABASE_PATH): + self.db_path = db_path + self._initialized = False + self._lock = asyncio.Lock() + + async def initialize(self): + """Initialize database schema""" + if self._initialized: + return + + async with self._lock: + if self._initialized: + return + + # Ensure directory exists + db_dir = Path(self.db_path).parent + db_dir.mkdir(parents=True, exist_ok=True) + + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS compression_jobs ( + job_id TEXT PRIMARY KEY, + file_path TEXT NOT NULL, + reduce_percentage INTEGER NOT NULL, + status TEXT NOT NULL, + progress REAL DEFAULT 0.0, + current_pass INTEGER DEFAULT 0, + current_size_mb REAL, + target_size_mb REAL, + video_bitrate INTEGER, + duration_seconds REAL, + eta_seconds INTEGER, + created_at TIMESTAMP NOT NULL, + started_at TIMESTAMP, + completed_at TIMESTAMP, + output_file TEXT, + error TEXT, + ffmpeg_pid INTEGER + ) + """) + + # Create indexes for performance + await db.execute(""" + CREATE INDEX IF NOT EXISTS idx_status + ON compression_jobs(status) + """) + + await db.execute(""" + CREATE INDEX IF NOT EXISTS idx_created_at + ON compression_jobs(created_at) + """) + + await 
db.commit() + + self._initialized = True + logger.info(f"Job persistence initialized at {self.db_path}") + + async def save_job(self, job_data: Dict[str, Any]): + """Insert or update a job in the database""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + # Convert datetime objects to ISO format strings + job_data_copy = job_data.copy() + for key in ['created_at', 'started_at', 'completed_at']: + if key in job_data_copy and job_data_copy[key]: + if isinstance(job_data_copy[key], datetime): + job_data_copy[key] = job_data_copy[key].isoformat() + + await db.execute(""" + INSERT OR REPLACE INTO compression_jobs ( + job_id, file_path, reduce_percentage, status, progress, + current_pass, current_size_mb, target_size_mb, video_bitrate, + duration_seconds, eta_seconds, created_at, started_at, + completed_at, output_file, error, ffmpeg_pid + ) VALUES ( + :job_id, :file_path, :reduce_percentage, :status, :progress, + :current_pass, :current_size_mb, :target_size_mb, :video_bitrate, + :duration_seconds, :eta_seconds, :created_at, :started_at, + :completed_at, :output_file, :error, :ffmpeg_pid + ) + """, job_data_copy) + await db.commit() + + async def update_job_status(self, job_id: str, status: str, + progress: Optional[float] = None, + error: Optional[str] = None): + """Quick update for job status and progress""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + updates = {"status": status} + if progress is not None: + updates["progress"] = progress + if error is not None: + updates["error"] = error + + # Set completed_at for terminal states + if status in ['completed', 'failed', 'cancelled']: + updates["completed_at"] = datetime.now().isoformat() + + set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()]) + query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id" + + await db.execute(query, {**updates, "job_id": job_id}) + await db.commit() + + async def update_job_progress(self, job_id: str, progress: float, + eta_seconds: Optional[int] = None, + current_pass: Optional[int] = None): + """Update job progress and ETA""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + updates = {"progress": progress} + if eta_seconds is not None: + updates["eta_seconds"] = eta_seconds + if current_pass is not None: + updates["current_pass"] = current_pass + + set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()]) + query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id" + + await db.execute(query, {**updates, "job_id": job_id}) + await db.commit() + + async def set_ffmpeg_pid(self, job_id: str, pid: int): + """Update the ffmpeg PID for a job""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + "UPDATE compression_jobs SET ffmpeg_pid = ? 
WHERE job_id = ?", + (pid, job_id) + ) + await db.commit() + + async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: + """Get a single job by ID""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM compression_jobs WHERE job_id = ?", + (job_id,) + ) as cursor: + row = await cursor.fetchone() + if row: + return dict(row) + return None + + async def get_all_jobs(self) -> List[Dict[str, Any]]: + """Get all jobs from the database""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM compression_jobs ORDER BY created_at DESC" + ) as cursor: + rows = await cursor.fetchall() + return [dict(row) for row in rows] + + async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]: + """Get all jobs with a specific status""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC", + (status,) + ) as cursor: + rows = await cursor.fetchall() + return [dict(row) for row in rows] + + async def get_interrupted_jobs(self) -> List[Dict[str, Any]]: + """Get jobs that were interrupted (status=processing/validating)""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + """SELECT * FROM compression_jobs + WHERE status IN ('processing', 'validating') + ORDER BY created_at ASC""" + ) as cursor: + rows = await cursor.fetchall() + return [dict(row) for row in rows] + + async def delete_job(self, job_id: str): + """Delete a job from the database""" + await self.initialize() + + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + "DELETE FROM compression_jobs WHERE job_id = ?", + (job_id,) + ) + await db.commit() + + async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS): + """Delete completed/cancelled jobs older than retention period""" + await self.initialize() + + cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60) + cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat() + + async with aiosqlite.connect(self.db_path) as db: + result = await db.execute(""" + DELETE FROM compression_jobs + WHERE status IN ('completed', 'cancelled') + AND completed_at < ? 
+ """, (cutoff_iso,)) + + deleted_count = result.rowcount + await db.commit() + + if deleted_count > 0: + logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)") + + return deleted_count diff --git a/backend/main.py b/backend/main.py index 0ac0e40..4472902 100644 --- a/backend/main.py +++ b/backend/main.py @@ -16,6 +16,11 @@ import mimetypes from sse_starlette.sse import EventSourceResponse from compression import CompressionManager from filesystem_health import FilesystemHealthChecker +from process_utils import ( + kill_process_safely, + cleanup_temp_files, + find_orphaned_ffmpeg_processes +) # Configure logging logger = logging.getLogger(__name__) @@ -27,6 +32,10 @@ STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB chunks for video streaming SSE_UPDATE_INTERVAL = 0.5 # Update every 500ms CACHE_TTL_SECONDS = 60 # Cache directory listings for 60 seconds +# Job recovery configuration +AUTO_RESTART_INTERRUPTED_JOBS = os.getenv('AUTO_RESTART_INTERRUPTED_JOBS', 'true').lower() == 'true' +CLEANUP_ORPHANED_PROCESSES = os.getenv('CLEANUP_ORPHANED_PROCESSES', 'true').lower() == 'true' + # Base path for footages FOOTAGES_PATH = Path("/footages") @@ -51,16 +60,26 @@ class SimpleCache: def clear(self): self.cache.clear() + def invalidate(self, pattern: str = None): + """Invalidate cache entries matching pattern or all if pattern is None""" + if pattern is None: + self.cache.clear() + else: + keys_to_delete = [k for k in self.cache.keys() if pattern in k] + for key in keys_to_delete: + del self.cache[key] + directory_cache = SimpleCache() # Initialize filesystem health checker first (compression manager needs it) filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH) -# Initialize compression manager with health checker +# Initialize compression manager with health checker and cache callback compression_manager = CompressionManager( max_concurrent=1, allowed_base_path=FOOTAGES_PATH, - health_checker=filesystem_health_checker + health_checker=filesystem_health_checker, + cache_invalidation_callback=directory_cache.invalidate ) # CORS middleware for frontend communication @@ -96,6 +115,116 @@ async def get_file_info(file_path: Path) -> Dict: } +async def recover_compression_jobs(): + """ + Recover compression jobs from database after app restart/crash + + This function: + 1. Loads all jobs from database + 2. Handles interrupted jobs (processing/validating) + 3. Cleans up orphaned ffmpeg processes + 4. 
Restarts pending jobs automatically + """ + logger.info("=" * 60) + logger.info("Starting compression job recovery...") + logger.info("=" * 60) + + # Initialize persistence layer + await compression_manager.initialize_persistence() + + # Load all jobs from database + await compression_manager.load_jobs_from_database() + + # Get interrupted jobs (were processing when app crashed) + interrupted_jobs = await compression_manager.persistence.get_interrupted_jobs() + + if interrupted_jobs: + logger.info(f"Found {len(interrupted_jobs)} interrupted jobs") + + for job_data in interrupted_jobs: + job_id = job_data['job_id'] + file_path = job_data['file_path'] + ffmpeg_pid = job_data.get('ffmpeg_pid') + + logger.info(f"Processing interrupted job {job_id}: {Path(file_path).name}") + + # Kill orphaned ffmpeg process if it exists and is verified as ours + if ffmpeg_pid and CLEANUP_ORPHANED_PROCESSES: + killed = kill_process_safely(ffmpeg_pid, file_path, timeout=10) + if killed: + logger.info(f" ✓ Killed orphaned ffmpeg process PID {ffmpeg_pid}") + else: + logger.warning(f" ⚠ Could not verify/kill PID {ffmpeg_pid}") + + # Clean up temp files + cleaned = cleanup_temp_files(file_path) + if cleaned > 0: + logger.info(f" ✓ Cleaned up {cleaned} temp file(s)") + + # Decide what to do with interrupted job + if AUTO_RESTART_INTERRUPTED_JOBS: + # Restart the job + logger.info(f" ⟳ Restarting interrupted job...") + await compression_manager.persistence.update_job_status( + job_id, "pending", progress=0.0 + ) + # Update in-memory job status + if job_id in compression_manager.jobs: + compression_manager.jobs[job_id].status = "pending" + compression_manager.jobs[job_id].progress = 0.0 + compression_manager.jobs[job_id].ffmpeg_pid = None + else: + # Mark as failed + logger.info(f" ✗ Marking as failed (auto-restart disabled)") + await compression_manager.persistence.update_job_status( + job_id, "failed", + error="Job interrupted by application restart/crash" + ) + # Update in-memory job status + if job_id in compression_manager.jobs: + compression_manager.jobs[job_id].status = "failed" + compression_manager.jobs[job_id].error = "Job interrupted by application restart/crash" + compression_manager.jobs[job_id].ffmpeg_pid = None + else: + logger.info("No interrupted jobs found") + + # Get pending jobs + pending_jobs = await compression_manager.persistence.get_jobs_by_status("pending") + + if pending_jobs: + logger.info(f"Found {len(pending_jobs)} pending jobs - will be processed automatically") + + # Restart pending jobs + for job_data in pending_jobs: + job_id = job_data['job_id'] + if job_id in compression_manager.jobs: + job = compression_manager.jobs[job_id] + # Only restart if not already in active jobs + if job_id not in compression_manager.active_jobs: + logger.info(f" ⟳ Restarting pending job: {Path(job.file_path).name}") + task = asyncio.create_task(compression_manager.compress_video(job)) + compression_manager.active_jobs[job_id] = task + task.add_done_callback(lambda t: compression_manager.active_jobs.pop(job_id, None)) + else: + logger.info("No pending jobs to restart") + + # Optional: Find any other orphaned ffmpeg processes in our path + if CLEANUP_ORPHANED_PROCESSES: + orphaned = find_orphaned_ffmpeg_processes(FOOTAGES_PATH) + if orphaned: + logger.warning(f"Found {len(orphaned)} untracked ffmpeg processes in our path:") + for proc in orphaned: + logger.warning(f" PID {proc['pid']}: {proc['cmdline'][:100]}...") + logger.warning(f" ⚠ NOT killing (not in our database) - manual review recommended") + + # Clean 
up old completed/cancelled jobs + deleted = await compression_manager.persistence.cleanup_old_jobs() + + logger.info("=" * 60) + logger.info("Compression job recovery complete") + logger.info("=" * 60) + + @app.on_event("startup") async def startup_event(): """Run startup tasks""" @@ -112,6 +241,10 @@ async def startup_event(): # Start background monitoring filesystem_health_checker.start_monitoring() + + # Recover compression jobs from database + await recover_compression_jobs() + logger.info("Application startup complete") diff --git a/backend/process_utils.py b/backend/process_utils.py new file mode 100644 index 0000000..a9bfc2f --- /dev/null +++ b/backend/process_utils.py @@ -0,0 +1,211 @@ +import psutil +import logging +from pathlib import Path +from typing import Optional, List + +logger = logging.getLogger(__name__) + + +def process_exists(pid: int) -> bool: + """Check if a process with given PID exists""" + try: + return psutil.pid_exists(pid) + except Exception as e: + logger.error(f"Error checking if PID {pid} exists: {e}") + return False + + +def get_process_info(pid: int) -> Optional[dict]: + """Get detailed information about a process""" + try: + proc = psutil.Process(pid) + return { + 'pid': pid, + 'name': proc.name(), + 'cmdline': proc.cmdline(), + 'cwd': proc.cwd(), + 'status': proc.status(), + 'create_time': proc.create_time() + } + except psutil.NoSuchProcess: + return None + except Exception as e: + logger.error(f"Error getting process info for PID {pid}: {e}") + return None + + +def is_our_ffmpeg_process(pid: int, expected_file_path: str) -> bool: + """ + Verify if a process is our ffmpeg process by checking: + 1. Process name is ffmpeg + 2. Command line contains our input file path + 3. Working directory is accessible + + Args: + pid: Process ID to check + expected_file_path: The file path this ffmpeg should be processing + + Returns: + True if verified as our ffmpeg process, False otherwise + """ + try: + proc = psutil.Process(pid) + + # Check 1: Process name must be ffmpeg + process_name = proc.name().lower() + if 'ffmpeg' not in process_name: + logger.debug(f"PID {pid}: Not an ffmpeg process (name: {process_name})") + return False + + # Check 2: Command line must contain our file path + cmdline = proc.cmdline() + cmdline_str = ' '.join(cmdline) + + # The file path should appear in the command line + if expected_file_path not in cmdline_str: + logger.debug(f"PID {pid}: File path '{expected_file_path}' not in command line") + return False + + # Check 3: Should have -i flag (input file) and -progress flag (our marker) + has_input_flag = '-i' in cmdline + has_progress_flag = '-progress' in cmdline or 'progress' in cmdline_str + + if not (has_input_flag and has_progress_flag): + logger.debug(f"PID {pid}: Missing expected ffmpeg flags") + return False + + logger.info(f"PID {pid}: Verified as our ffmpeg process for '{expected_file_path}'") + return True + + except psutil.NoSuchProcess: + logger.debug(f"PID {pid}: Process no longer exists") + return False + except psutil.AccessDenied: + logger.warning(f"PID {pid}: Access denied - cannot verify (won't kill)") + return False + except Exception as e: + logger.error(f"Error verifying PID {pid}: {e}") + return False + + +def kill_process_safely(pid: int, expected_file_path: str, timeout: int = 10) -> bool: + """ + Safely kill a process after verification + + Args: + pid: Process ID to kill + expected_file_path: Expected file path to verify against + timeout: Seconds to wait for graceful termination + + Returns: + True if process 
was killed successfully, False otherwise + """ + # Double-check verification before killing + if not is_our_ffmpeg_process(pid, expected_file_path): + logger.warning(f"PID {pid}: Failed verification - NOT killing for safety") + return False + + try: + proc = psutil.Process(pid) + + # Try graceful termination first (SIGTERM) + logger.info(f"PID {pid}: Sending SIGTERM for graceful shutdown") + proc.terminate() + + try: + # Wait for process to terminate + proc.wait(timeout=timeout) + logger.info(f"PID {pid}: Process terminated gracefully") + return True + except psutil.TimeoutExpired: + # Force kill if graceful shutdown failed (SIGKILL) + logger.warning(f"PID {pid}: Graceful shutdown timed out, sending SIGKILL") + proc.kill() + proc.wait(timeout=5) + logger.info(f"PID {pid}: Process force killed") + return True + + except psutil.NoSuchProcess: + logger.info(f"PID {pid}: Process already terminated") + return True + except Exception as e: + logger.error(f"Error killing PID {pid}: {e}") + return False + + +def cleanup_temp_files(file_path: str) -> int: + """ + Clean up temporary files associated with a compression job + + Args: + file_path: Original video file path + + Returns: + Number of files cleaned up + """ + cleaned_count = 0 + file_path_obj = Path(file_path) + parent_dir = file_path_obj.parent + + try: + # Clean up temp file: temp_ + temp_file = parent_dir / f"temp_{file_path_obj.name}" + if temp_file.exists(): + temp_file.unlink() + logger.info(f"Cleaned up temp file: {temp_file.name}") + cleaned_count += 1 + + # Clean up ffmpeg pass log file + log_file = parent_dir / "ffmpeg2pass-0.log" + if log_file.exists(): + log_file.unlink() + logger.info(f"Cleaned up ffmpeg log: {log_file.name}") + cleaned_count += 1 + + # Also check for mbtree file (created by x264) + mbtree_file = parent_dir / "ffmpeg2pass-0.log.mbtree" + if mbtree_file.exists(): + mbtree_file.unlink() + logger.info(f"Cleaned up mbtree file: {mbtree_file.name}") + cleaned_count += 1 + + except Exception as e: + logger.error(f"Error cleaning up temp files for {file_path}: {e}") + + return cleaned_count + + +def find_orphaned_ffmpeg_processes(allowed_base_path: Path) -> List[dict]: + """ + Find all ffmpeg processes that are working on files within our base path + This helps identify potential orphaned processes + + Args: + allowed_base_path: Base path where our videos are stored + + Returns: + List of process info dicts + """ + orphaned = [] + allowed_path_str = str(allowed_base_path.resolve()) + + try: + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + try: + if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower(): + cmdline = proc.info.get('cmdline', []) + cmdline_str = ' '.join(cmdline) if cmdline else '' + + # Check if this ffmpeg is working on a file in our path + if allowed_path_str in cmdline_str: + orphaned.append({ + 'pid': proc.info['pid'], + 'cmdline': cmdline_str, + 'name': proc.info['name'] + }) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + except Exception as e: + logger.error(f"Error finding orphaned processes: {e}") + + return orphaned diff --git a/backend/requirements.txt b/backend/requirements.txt index 3bb44dc..0e54116 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,3 +3,5 @@ uvicorn[standard]==0.27.0 python-multipart==0.0.6 aiofiles==23.2.1 sse-starlette==1.6.5 +aiosqlite==0.19.0 +psutil==5.9.8
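
A minimal round-trip sketch for the new CompressionJob.to_dict()/from_dict() pair, assuming the serialization shown in the patch (the file path is a made-up example): save_job() stores datetimes as ISO-8601 strings, and from_dict() parses them back with datetime.fromisoformat().

from datetime import datetime
from compression import CompressionJob

job = CompressionJob("/footages/example.mp4", reduce_percentage=40)   # hypothetical file
row = job.to_dict()
row['created_at'] = row['created_at'].isoformat()   # mirrors what save_job() does before INSERT
restored = CompressionJob.from_dict(row)
assert restored.job_id == job.job_id
assert isinstance(restored.created_at, datetime)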
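
A hedged sketch of exercising the JobPersistence layer on its own; the database path and file name are illustrative overrides, and in the app the path comes from JOB_DATABASE_PATH:

import asyncio
from compression import CompressionJob
from job_persistence import JobPersistence

async def demo():
    store = JobPersistence(db_path="/tmp/demo_jobs.db")        # illustrative location
    await store.initialize()

    job = CompressionJob("/footages/example.mp4", 40)          # hypothetical file
    await store.save_job(job.to_dict())                        # initial snapshot

    # Simulate a crash mid-compression, then look at what startup recovery would see
    await store.update_job_status(job.job_id, "processing", progress=42.0)
    interrupted = await store.get_interrupted_jobs()
    print([j['job_id'] for j in interrupted])                  # -> [job.job_id]

    await store.cleanup_old_jobs()                             # retention sweep (no-op here)

asyncio.run(demo())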
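
The pre-flight disk check reserves roughly 1.5x the original file size (temporary output plus the original plus a safety margin). A quick worked example with made-up numbers:

import shutil

current_size_mb = 8_192                                   # e.g. an 8 GB source file
required_bytes = current_size_mb * 1024 * 1024 * 1.5      # roughly 12 GB of headroom needed
free_bytes = shutil.disk_usage("/footages").free          # illustrative mount point
ok = free_bytes >= required_bytes
print(f"need {required_bytes / 1024**3:.1f} GB, have {free_bytes / 1024**3:.1f} GB -> "
      f"{'OK' if ok else 'insufficient'}")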
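
For untracked ffmpeg processes, startup recovery deliberately only logs them; an operator (or a follow-up script) could review the candidates and, only for a PID whose command line matches a known input file, fall back to kill_process_safely(). A sketch, with an assumed file path:

from pathlib import Path
from process_utils import find_orphaned_ffmpeg_processes, kill_process_safely

for proc in find_orphaned_ffmpeg_processes(Path("/footages")):
    print(f"candidate PID {proc['pid']}: {proc['cmdline'][:80]}")
    # Kill only when the exact input file the job owned is known, e.g.:
    # kill_process_safely(proc['pid'], "/footages/example.mp4", timeout=10)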