import asyncio import subprocess import uuid import os import re import logging import shutil from pathlib import Path from datetime import datetime from typing import Dict, Optional from job_persistence import JobPersistence # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration constants MAX_CONCURRENT_JOBS = 2 MAX_STORED_JOBS = 100 MAX_PENDING_JOBS = 10 # Maximum jobs waiting in queue PRUNE_INTERVAL_SECONDS = 300 # 5 minutes WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900')) # Default: 15 minutes WATCHDOG_CHECK_INTERVAL = 30 # 30 seconds STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB STDERR_READ_LIMIT = 8 * 1024 * 1024 # 8MB buffer limit for stderr MAX_STDERR_LINES = 1000 # Keep only last 1000 lines of stderr DEFAULT_AUDIO_BITRATE = 128 # kbps MIN_VIDEO_BITRATE = 100 # kbps class CompressionJob: def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None): self.job_id = job_id or str(uuid.uuid4()) self.file_path = file_path self.reduce_percentage = reduce_percentage self.status = "pending" # pending, processing, validating, completed, failed, cancelled self.progress = 0.0 self.eta_seconds = None self.current_pass = 0 # 0=not started, 1=first pass, 2=second pass self.created_at = datetime.now() self.started_at = None self.completed_at = None self.error = None self.output_file = None self.current_size_mb = None self.target_size_mb = None self.video_bitrate = None self.duration_seconds = None self.ffmpeg_pid = None # Track ffmpeg process PID def to_dict(self) -> Dict: """Convert job to dictionary for database persistence""" return { 'job_id': self.job_id, 'file_path': self.file_path, 'reduce_percentage': self.reduce_percentage, 'status': self.status, 'progress': self.progress, 'eta_seconds': self.eta_seconds, 'current_pass': self.current_pass, 'created_at': self.created_at, 'started_at': self.started_at, 'completed_at': self.completed_at, 'error': self.error, 'output_file': self.output_file, 'current_size_mb': self.current_size_mb, 'target_size_mb': self.target_size_mb, 'video_bitrate': self.video_bitrate, 'duration_seconds': self.duration_seconds, 'ffmpeg_pid': self.ffmpeg_pid } @classmethod def from_dict(cls, data: Dict) -> 'CompressionJob': """Create job from database dictionary""" job = cls( file_path=data['file_path'], reduce_percentage=data['reduce_percentage'], job_id=data['job_id'] ) job.status = data.get('status', 'pending') job.progress = data.get('progress', 0.0) job.eta_seconds = data.get('eta_seconds') job.current_pass = data.get('current_pass', 0) job.error = data.get('error') job.output_file = data.get('output_file') job.current_size_mb = data.get('current_size_mb') job.target_size_mb = data.get('target_size_mb') job.video_bitrate = data.get('video_bitrate') job.duration_seconds = data.get('duration_seconds') job.ffmpeg_pid = data.get('ffmpeg_pid') # Parse datetime strings back to datetime objects for key in ['created_at', 'started_at', 'completed_at']: if data.get(key): if isinstance(data[key], str): job.__dict__[key] = datetime.fromisoformat(data[key]) else: job.__dict__[key] = data[key] return job class CompressionManager: # Compile regex patterns once at class level for performance TIME_PATTERN = re.compile(r'out_time_ms=(\d+)') FPS_PATTERN = re.compile(r'fps=([\d.]+)') def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None): self.jobs: Dict[str, CompressionJob] = {} self.active_jobs: Dict[str, asyncio.Task] = {} self.semaphore = asyncio.Semaphore(max_concurrent) self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None self.health_checker = health_checker self.cache_invalidation_callback = cache_invalidation_callback self._pruning_task: Optional[asyncio.Task] = None self._jobs_lock = asyncio.Lock() self.persistence = JobPersistence() self._pruning_started = False async def initialize_persistence(self): """Initialize the persistence layer - call this on startup""" await self.persistence.initialize() logger.info("Job persistence initialized") # Start periodic pruning (only once when persistence is initialized) if not self._pruning_started: self._start_periodic_pruning() self._pruning_started = True async def load_jobs_from_database(self): """Load all jobs from database into memory""" async with self._jobs_lock: jobs_data = await self.persistence.get_all_jobs() for job_data in jobs_data: job = CompressionJob.from_dict(job_data) self.jobs[job.job_id] = job logger.info(f"Loaded {len(jobs_data)} jobs from database") async def get_video_info(self, file_path: str) -> Dict: """Extract video duration and file size using ffprobe""" # Get file size in MB file_size_mb = os.path.getsize(file_path) / (1024 * 1024) # Get duration using ffprobe cmd = [ 'ffprobe', '-i', file_path, '-show_entries', 'format=duration', '-v', 'quiet', '-of', 'csv=p=0' ] result = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await result.communicate() duration = float(stdout.decode().strip()) return { 'size_mb': file_size_mb, 'duration_seconds': duration } def calculate_bitrates(self, current_size_mb: float, target_size_mb: float, duration_seconds: float, audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int: """Calculate video bitrate based on target size""" # Total bitrate in kbps total_bitrate = (target_size_mb * 8192) / duration_seconds # Video bitrate = total - audio video_bitrate = int(total_bitrate - audio_bitrate) return max(video_bitrate, MIN_VIDEO_BITRATE) async def compress_video(self, job: CompressionJob): """Main compression function - two-pass encoding""" async with self.semaphore: try: job.status = "processing" job.started_at = datetime.now() await self.persistence.save_job(job.to_dict()) # Get video information info = await self.get_video_info(job.file_path) job.current_size_mb = info['size_mb'] job.duration_seconds = info['duration_seconds'] # Calculate target size and bitrate job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100) job.video_bitrate = self.calculate_bitrates( job.current_size_mb, job.target_size_mb, job.duration_seconds ) logger.info(f"Job {job.job_id}: Current Size: {job.current_size_mb:.2f} MB, " f"Target Size: {job.target_size_mb:.2f} MB, " f"Duration: {job.duration_seconds}s, " f"Video Bitrate: {job.video_bitrate} kbps") # Check disk space BEFORE starting compression file_path = Path(job.file_path) disk_usage = shutil.disk_usage(file_path.parent) # Estimate required space: temp file + original + safety margin (1.5x original) required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5 available_bytes = disk_usage.free if available_bytes < required_bytes: required_gb = required_bytes / (1024**3) available_gb = available_bytes / (1024**3) error_msg = ( f"Insufficient disk space: need {required_gb:.1f}GB, " f"have {available_gb:.1f}GB available" ) logger.error(f"Job {job.job_id}: {error_msg}") raise ValueError(error_msg) logger.info( f"Job {job.job_id}: Disk space check passed " f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)" ) # Generate output filename (use original path for in-place replacement) file_path = Path(job.file_path) temp_file = file_path.parent / f"temp_{file_path.name}" # PASS 1: Analysis job.current_pass = 1 await self.run_ffmpeg_pass1(job, temp_file) if job.status == "cancelled": self.cleanup_temp_files(job) return # PASS 2: Encoding job.current_pass = 2 await self.run_ffmpeg_pass2(job, temp_file) if job.status == "cancelled": self.cleanup_temp_files(job) return # SAFE FILE REPLACEMENT WITH VALIDATION job.status = "validating" job.progress = 95.0 await self.persistence.update_job_status(job.job_id, "validating", progress=95.0) logger.info(f"Job {job.job_id}: Starting safe file replacement process") if await self.safe_replace_file(file_path, temp_file, job): job.output_file = str(file_path) # Output is same as original path job.status = "completed" job.progress = 100.0 job.completed_at = datetime.now() job.ffmpeg_pid = None # Clear PID on completion await self.persistence.save_job(job.to_dict()) self.cleanup_temp_files(job) logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place") else: job.status = "failed" job.error = "File replacement failed or validation check failed" await self.persistence.update_job_status(job.job_id, "failed", error=job.error) self.cleanup_temp_files(job) logger.error(f"Job {job.job_id} failed - file replacement unsuccessful") except asyncio.CancelledError: # Don't mark as cancelled - could be app shutdown # Keep current status so job can be recovered on restart job.ffmpeg_pid = None # Clear PID since process won't survive # Don't update database status - leave it as "processing" or "validating" # for recovery on restart self.cleanup_temp_files(job) logger.info(f"Job {job.job_id} interrupted (task cancelled)") raise # Re-raise to properly cancel the task except Exception as e: job.status = "failed" job.error = str(e) job.ffmpeg_pid = None # Clear PID on failure await self.persistence.update_job_status(job.job_id, "failed", error=str(e)) self.cleanup_temp_files(job) logger.error(f"Job {job.job_id} failed: {e}", exc_info=True) async def run_ffmpeg_pass1(self, job: CompressionJob, output_file: Path): """First pass: Analysis""" cmd = [ 'ffmpeg', '-y', '-i', job.file_path, '-c:v', 'libx264', '-b:v', f'{job.video_bitrate}k', '-pass', '1', '-an', # No audio in first pass '-f', 'null', '/dev/null', '-progress', 'pipe:1' ] await self.run_ffmpeg_with_progress(job, cmd, pass_num=1) async def run_ffmpeg_pass2(self, job: CompressionJob, output_file: Path): """Second pass: Encoding""" cmd = [ 'ffmpeg', '-i', job.file_path, '-c:v', 'libx264', '-b:v', f'{job.video_bitrate}k', '-pass', '2', '-c:a', 'aac', '-b:a', '128k', '-movflags', '+faststart', # Important for web streaming str(output_file), '-progress', 'pipe:1' ] await self.run_ffmpeg_with_progress(job, cmd, pass_num=2) async def run_ffmpeg_with_progress(self, job: CompressionJob, cmd: list, pass_num: int): """Run ffmpeg and track progress with concurrent stdout/stderr reading to prevent deadlock""" process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, limit=STDERR_READ_LIMIT # Set buffer limit to prevent overflow ) # Track PID for first pass only (second pass uses same job) if pass_num == 1 and process.pid: job.ffmpeg_pid = process.pid await self.persistence.set_ffmpeg_pid(job.job_id, process.pid) logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked") # Progress tracking stderr_output = [] last_progress_update = None # Initialize as None, will be set when FFmpeg starts producing output async def read_stdout(): """Read and process stdout for progress updates""" nonlocal last_progress_update try: async for line in process.stdout: # Initialize timer on first progress update from FFmpeg if last_progress_update is None: last_progress_update = datetime.now() logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized") if job.status == "cancelled": process.kill() return line_str = line.decode('utf-8', errors='ignore') # Extract time using class-level compiled pattern time_match = self.TIME_PATTERN.search(line_str) if time_match: current_time_ms = int(time_match.group(1)) current_time_sec = current_time_ms / 1_000_000 # Calculate progress for this pass (0-50% or 50-100%) pass_progress = (current_time_sec / job.duration_seconds) * 50 if pass_num == 1: job.progress = min(pass_progress, 50) else: job.progress = min(50 + pass_progress, 95) last_progress_update = datetime.now() # Extract FPS for ETA calculation using class-level compiled pattern fps_match = self.FPS_PATTERN.search(line_str) if fps_match: fps = float(fps_match.group(1)) if fps > 0: # Calculate remaining time for current pass remaining_sec_current_pass = (job.duration_seconds - current_time_sec) # If in pass 1, add full duration for pass 2 if pass_num == 1: remaining_sec = remaining_sec_current_pass + job.duration_seconds else: remaining_sec = remaining_sec_current_pass job.eta_seconds = int(remaining_sec) # Periodically save progress to database (every ~5% or 30 seconds) # Use modulo to reduce DB writes if int(job.progress) % 5 == 0: await self.persistence.update_job_progress( job.job_id, job.progress, job.eta_seconds, job.current_pass ) except Exception as e: logger.error(f"Error reading stdout: {e}") async def read_stderr(): """Read stderr concurrently to prevent pipe buffer deadlock""" try: async for line in process.stderr: line_str = line.decode('utf-8', errors='ignore') stderr_output.append(line_str) # Truncate stderr_output to keep only last MAX_STDERR_LINES lines if len(stderr_output) > MAX_STDERR_LINES: stderr_output[:] = stderr_output[-MAX_STDERR_LINES:] # Log important stderr messages if 'error' in line_str.lower(): logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}") elif 'warning' in line_str.lower(): logger.warning(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}") except Exception as e: logger.error(f"Error reading stderr: {e}") async def watchdog(): """Monitor for stuck jobs - kills process if no progress""" nonlocal last_progress_update while process.returncode is None: if job.status == "cancelled": process.kill() return # Only check timeout if FFmpeg has started producing output if last_progress_update is not None: time_since_update = (datetime.now() - last_progress_update).total_seconds() if time_since_update > WATCHDOG_TIMEOUT_SECONDS: error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog." logger.error(f"Watchdog: {error_msg}") job.status = "failed" job.error = error_msg process.kill() raise Exception(error_msg) await asyncio.sleep(WATCHDOG_CHECK_INTERVAL) # Run stdout, stderr readers and watchdog concurrently to prevent deadlock try: await asyncio.gather( read_stdout(), read_stderr(), watchdog(), return_exceptions=False ) except Exception as e: process.kill() raise await process.wait() if process.returncode != 0: stderr_text = ''.join(stderr_output[-50:]) # Last 50 lines of stderr raise Exception(f"FFmpeg pass {pass_num} failed with code {process.returncode}: {stderr_text}") async def validate_video(self, file_path: Path) -> bool: """Validate compressed video is not corrupted""" cmd = [ 'ffmpeg', '-v', 'error', '-i', str(file_path), '-f', 'null', '-' ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) _, stderr = await process.communicate() # Check for actual errors (not just warnings) # FFmpeg with -v error should only output actual errors stderr_text = stderr.decode('utf-8', errors='ignore').strip() if stderr_text: # Log the validation error for debugging logger.error(f"Video validation failed: {stderr_text}") return False # Also check return code return process.returncode == 0 async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool: """Enhanced validation with size and duration checks before file replacement""" # 1. Basic ffmpeg decode validation if not await self.validate_video(compressed_path): logger.error(f"Enhanced validation failed: FFmpeg decode check failed") return False # 2. File size sanity check (minimum 1KB) try: compressed_size = os.path.getsize(str(compressed_path)) if compressed_size < 1024: logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)") return False logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)") except Exception as e: logger.error(f"Enhanced validation failed: Could not check file size: {e}") return False # 3. Duration verification (optional but recommended) try: original_info = await self.get_video_info(str(original_path)) compressed_info = await self.get_video_info(str(compressed_path)) duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds']) # Allow up to 1 second difference due to rounding if duration_diff > 1.0: logger.error(f"Enhanced validation failed: Duration mismatch - " f"original: {original_info['duration_seconds']:.2f}s vs " f"compressed: {compressed_info['duration_seconds']:.2f}s " f"(diff: {duration_diff:.2f}s)") return False logger.info(f"Enhanced validation: Duration check passed " f"(original: {original_info['duration_seconds']:.2f}s, " f"compressed: {compressed_info['duration_seconds']:.2f}s)") except Exception as e: logger.error(f"Enhanced validation failed: Could not verify duration: {e}") return False logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}") return True async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool: """Safely replace original file with compressed version with rollback capability""" backup_path = original_path.parent / f"{original_path.name}.backup" try: # Enhanced validation BEFORE any file operations logger.info(f"Validating compressed file before replacement: {temp_file.name}") if not await self.validate_video_enhanced(original_path, temp_file): logger.error(f"Safe replace failed: Validation check failed") return False # Step 1: Create backup of original file logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}") try: os.replace(str(original_path), str(backup_path)) except Exception as e: logger.error(f"Safe replace failed: Could not create backup: {e}") return False # Step 2: Replace original with compressed file (atomic operation) logger.info(f"Replacing original file with compressed version: {temp_file.name}") try: os.replace(str(temp_file), str(original_path)) except Exception as e: logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...") # Rollback: restore original from backup try: if backup_path.exists(): os.replace(str(backup_path), str(original_path)) logger.info(f"Rollback successful: Original file restored") except Exception as rollback_error: logger.error(f"Rollback failed: {rollback_error}") return False # Step 3: Verify final file is accessible logger.info(f"Verifying replaced file is accessible: {original_path.name}") try: if not original_path.exists() or not original_path.is_file(): logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...") # Rollback: restore original from backup try: if backup_path.exists(): os.replace(str(backup_path), str(original_path)) logger.info(f"Rollback successful: Original file restored") except Exception as rollback_error: logger.error(f"Rollback failed: {rollback_error}") return False # Get final file size final_size = os.path.getsize(str(original_path)) logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB") except Exception as e: logger.error(f"Safe replace failed: Could not verify final file: {e}") return False # Step 4: Delete backup file only after successful verification logger.info(f"Deleting backup file: {backup_path.name}") try: if backup_path.exists(): backup_path.unlink() logger.info(f"Backup file deleted successfully") except Exception as e: logger.warning(f"Failed to delete backup file (non-critical): {e}") # Step 5: Invalidate cache for this directory to show updated file size if self.cache_invalidation_callback: try: # Invalidate all cache entries containing the parent directory path parent_name = original_path.parent.name self.cache_invalidation_callback(parent_name) logger.info(f"Cache invalidated for directory: {parent_name}") except Exception as e: logger.warning(f"Failed to invalidate cache (non-critical): {e}") return True except Exception as e: logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True) # Attempt rollback on any unexpected error try: if backup_path.exists(): os.replace(str(backup_path), str(original_path)) logger.info(f"Rollback successful: Original file restored after unexpected error") except Exception as rollback_error: logger.error(f"Rollback failed: {rollback_error}") return False def cleanup_temp_files(self, job: CompressionJob): """Clean up temporary files""" file_path = Path(job.file_path) temp_file = file_path.parent / f"temp_{file_path.name}" # Remove temp file try: if temp_file.exists(): temp_file.unlink() except Exception as e: logger.warning(f"Failed to remove temp file: {e}") # Remove ffmpeg pass log files in same directory as source try: log_file = file_path.parent / "ffmpeg2pass-0.log" if log_file.exists(): log_file.unlink() except Exception as e: logger.warning(f"Failed to remove log file: {e}") def _start_periodic_pruning(self): """Start background task for periodic job pruning""" async def prune_periodically(): while True: await asyncio.sleep(PRUNE_INTERVAL_SECONDS) self._prune_old_jobs() self._pruning_task = asyncio.create_task(prune_periodically()) def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS): """Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible.""" if len(self.jobs) <= max_jobs: return # Get inactive completed/cancelled jobs sorted by time (exclude failed jobs) inactive = [ (jid, j) for jid, j in self.jobs.items() if jid not in self.active_jobs and j.status in ['completed', 'cancelled'] ] inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at) # Remove oldest jobs beyond limit num_to_remove = len(self.jobs) - max_jobs for jid, _ in inactive[:num_to_remove]: self.jobs.pop(jid, None) async def get_jobs_snapshot(self) -> list: """Get a safe snapshot of all jobs for iteration""" async with self._jobs_lock: return list(self.jobs.values()) def get_pending_count(self) -> int: """Get count of pending jobs""" return sum(1 for job in self.jobs.values() if job.status == "pending") async def start_compression(self, file_path: str, reduce_percentage: int) -> str: """Start a new compression job""" # Check filesystem health first if self.health_checker: status = self.health_checker.get_status() if not status["healthy"]: error_msg = f"Cannot start compression: Filesystem is not writable. {status.get('error', 'Unknown error')}" logger.error(error_msg) raise ValueError(error_msg) # Check if queue is full pending_count = self.get_pending_count() if pending_count >= MAX_PENDING_JOBS: error_msg = f"Queue is full: {pending_count}/{MAX_PENDING_JOBS} pending jobs. Please wait for jobs to complete." logger.warning(error_msg) raise ValueError(error_msg) # Validate path is within allowed directory if self.allowed_base_path: abs_path = Path(file_path).resolve() # Resolve both paths to handle symlinks properly resolved_base = self.allowed_base_path.resolve() # Check if resolved path is within allowed directory try: abs_path.relative_to(resolved_base) except ValueError: raise ValueError("Invalid file path: outside allowed directory") # Verify file exists and is a regular file (not a symlink to outside) if not abs_path.exists() or not abs_path.is_file(): raise ValueError("Invalid file path: file does not exist or is not a regular file") async with self._jobs_lock: # Prune old jobs before creating new one self._prune_old_jobs() job = CompressionJob(file_path, reduce_percentage) self.jobs[job.job_id] = job # Save initial job to database await self.persistence.save_job(job.to_dict()) # Start compression in background task = asyncio.create_task(self.compress_video(job)) self.active_jobs[job.job_id] = task # Clean up task when done task.add_done_callback(lambda t: self.active_jobs.pop(job.job_id, None)) return job.job_id async def cancel_job(self, job_id: str): """Cancel a running compression job (user-initiated)""" if job_id in self.jobs: self.jobs[job_id].status = "cancelled" # Save cancelled status to database (user-initiated cancellation) await self.persistence.update_job_status(job_id, "cancelled") if job_id in self.active_jobs: self.active_jobs[job_id].cancel() async def remove_job(self, job_id: str): """Remove a job from the list (for failed/completed jobs)""" if job_id in self.jobs: job = self.jobs[job_id] # Only allow removal of inactive jobs if job_id not in self.active_jobs: async with self._jobs_lock: self.jobs.pop(job_id, None) return True return False