import asyncio
import subprocess
import uuid
import os
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional


class CompressionJob:
    """State of a single video-compression request.

    Instances are mutated in place by CompressionManager while the job runs,
    so callers can poll ``status``, ``progress`` and ``eta_seconds``.
    """

    def __init__(self, file_path: str, reduce_percentage: int):
        self.job_id = str(uuid.uuid4())
        self.file_path = file_path
        self.reduce_percentage = reduce_percentage
        # pending, processing, validating, completed, failed, cancelled
        self.status = "pending"
        self.progress = 0.0  # overall percentage, 0-100
        self.eta_seconds: Optional[int] = None
        self.current_pass = 0  # 0=not started, 1=first pass, 2=second pass
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.error: Optional[str] = None
        self.output_file: Optional[str] = None
        self.current_size_mb: Optional[float] = None
        self.target_size_mb: Optional[float] = None
        self.video_bitrate: Optional[int] = None  # kbps
        self.duration_seconds: Optional[float] = None


class CompressionManager:
    """Runs two-pass libx264 compression jobs as background asyncio tasks."""

    # Statuses after which a job can no longer be cancelled.
    _FINISHED_STATUSES = ("completed", "failed", "cancelled")
    # Maximum number of ffmpeg stderr bytes kept for error messages.
    _STDERR_TAIL_BYTES = 64 * 1024
    # Files libx264 may leave behind for a given -passlogfile prefix
    # (".temp" variants exist while pass 1 is still writing).
    _PASSLOG_SUFFIXES = ("-0.log", "-0.log.mbtree", "-0.log.temp", "-0.log.mbtree.temp")

    def __init__(self):
        self.jobs: Dict[str, CompressionJob] = {}
        self.active_jobs: Dict[str, asyncio.Task] = {}

    # ------------------------------------------------------------------ paths

    @staticmethod
    def _temp_path(job: CompressionJob) -> Path:
        """Return the per-job intermediate output path, next to the source."""
        src = Path(job.file_path)
        # The job id keeps concurrent jobs on the same source from colliding;
        # the original name/suffix is kept so ffmpeg can infer the container.
        return src.parent / f"temp_{job.job_id}_{src.name}"

    @staticmethod
    def _passlog_prefix(job: CompressionJob) -> Path:
        """Return the per-job ``-passlogfile`` prefix for two-pass stats."""
        src = Path(job.file_path)
        # A per-job prefix stops concurrent jobs from sharing the default
        # "ffmpeg2pass" stats file in the process working directory.
        return src.parent / f"ffmpeg2pass_{job.job_id}"

    # ---------------------------------------------------------------- probing

    async def get_video_info(self, file_path: str) -> Dict:
        """Return ``{'size_mb': float, 'duration_seconds': float}`` for *file_path*.

        The size comes from the filesystem (MiB); the duration from ``ffprobe``.

        Raises:
            RuntimeError: if ffprobe exits with a non-zero status.
            ValueError: if ffprobe does not report a finite, positive duration.
        """
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

        cmd = [
            'ffprobe',
            '-i', file_path,
            '-show_entries', 'format=duration',
            '-v', 'quiet',
            '-of', 'csv=p=0',
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise RuntimeError(
                f"ffprobe failed for {file_path}: "
                f"{stderr.decode('utf-8', errors='replace').strip()}"
            )

        raw = stdout.decode('utf-8', errors='replace').strip()
        try:
            # Keep the fractional part: truncating to int turned sub-second
            # clips into a zero duration (and a division by zero later).
            duration = float(raw)
        except ValueError:
            raise ValueError(
                f"ffprobe returned no usable duration for {file_path}: {raw!r}"
            ) from None
        if not 0 < duration < float('inf'):
            raise ValueError(f"Invalid duration for {file_path}: {raw!r}")

        return {
            'size_mb': file_size_mb,
            'duration_seconds': duration,
        }

    def calculate_bitrates(self, current_size_mb: float, target_size_mb: float,
                           duration_seconds: float, audio_bitrate: int = 128) -> int:
        """Return the video bitrate in kbps that fits *target_size_mb*.

        The total budget is ``target_size_mb * 8192 / duration_seconds``
        (MiB -> Kibit per second). Because ffmpeg's ``k`` suffix means
        1000 bit/s, this is about 2.4% below the exact budget, which leaves
        some headroom for container overhead. The audio share is subtracted
        and the result is floored at 100 kbps.

        ``current_size_mb`` is accepted for interface compatibility but unused.

        Raises:
            ValueError: if *duration_seconds* is not positive.
        """
        if duration_seconds <= 0:
            raise ValueError(f"duration_seconds must be positive, got {duration_seconds}")
        total_bitrate = (target_size_mb * 8192) / duration_seconds
        video_bitrate = int(total_bitrate - audio_bitrate)
        return max(video_bitrate, 100)  # Minimum 100kbps

    # --------------------------------------------------------------- pipeline

    async def _prepare_job(self, job: CompressionJob) -> None:
        """Probe the source and fill in size, duration, target size and bitrate."""
        if not 0 <= job.reduce_percentage < 100:
            raise ValueError(
                f"reduce_percentage must be in [0, 100), got {job.reduce_percentage}"
            )

        info = await self.get_video_info(job.file_path)
        job.current_size_mb = info['size_mb']
        job.duration_seconds = info['duration_seconds']

        job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100)
        job.video_bitrate = self.calculate_bitrates(
            job.current_size_mb,
            job.target_size_mb,
            job.duration_seconds,
        )

        print(f"Job {job.job_id}:")
        print(f"  Current Size: {job.current_size_mb:.2f} MB")
        print(f"  Target Size: {job.target_size_mb:.2f} MB")
        print(f"  Duration: {job.duration_seconds:.2f}s")
        print(f"  Video Bitrate: {job.video_bitrate} kbps")

    async def compress_video(self, job: CompressionJob):
        """Run the whole pipeline for *job*: probe, two passes, validate, rename.

        Outcomes are recorded on *job* (``status``, ``error``, ``output_file``)
        rather than raised. Cancellation is absorbed as well and recorded as
        ``status == "cancelled"``. Temporary files are cleaned up on every
        exit path.
        """
        try:
            job.status = "processing"
            job.started_at = datetime.now()

            await self._prepare_job(job)

            file_path = Path(job.file_path)
            temp_file = self._temp_path(job)
            output_file = (
                file_path.parent
                / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"
            )

            # PASS 1: Analysis
            job.current_pass = 1
            await self.run_ffmpeg_pass1(job, temp_file)
            if job.status == "cancelled":
                self.cleanup_temp_files(job)
                return

            # PASS 2: Encoding
            job.current_pass = 2
            await self.run_ffmpeg_pass2(job, temp_file)
            if job.status == "cancelled":
                self.cleanup_temp_files(job)
                return

            # VALIDATION
            job.status = "validating"
            job.progress = 95.0

            if await self.validate_video(temp_file):
                # os.replace also overwrites an existing output on Windows.
                os.replace(temp_file, output_file)
                job.output_file = str(output_file)
                job.status = "completed"
                job.progress = 100.0
                job.eta_seconds = 0
                job.completed_at = datetime.now()
                self.cleanup_temp_files(job)
                print(f"Job {job.job_id} completed successfully")
            else:
                job.status = "failed"
                job.error = "Validation failed: Compressed video is corrupted"
                self.cleanup_temp_files(job)
                print(f"Job {job.job_id} failed validation")

        except asyncio.CancelledError:
            job.status = "cancelled"
            self.cleanup_temp_files(job)
            print(f"Job {job.job_id} cancelled")
        except Exception as e:
            job.status = "failed"
            job.error = str(e)
            self.cleanup_temp_files(job)
            print(f"Job {job.job_id} failed: {e}")

    async def run_ffmpeg_pass1(self, job: CompressionJob, output_file: Path):
        """First pass: analyse the video and write the two-pass stats log.

        Pass 1 discards its encoded output, so *output_file* is unused here.
        It is kept for symmetry with run_ffmpeg_pass2.
        """
        cmd = [
            'ffmpeg', '-y',
            '-progress', 'pipe:1',  # global option; placed before input/output
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '1',
            '-passlogfile', str(self._passlog_prefix(job)),
            '-an',  # No audio in first pass
            '-f', 'null', os.devnull,
        ]
        await self.run_ffmpeg_with_progress(job, cmd, pass_num=1)

    async def run_ffmpeg_pass2(self, job: CompressionJob, output_file: Path):
        """Second pass: encode to *output_file* using the pass-1 stats log."""
        cmd = [
            'ffmpeg', '-y',  # overwrite a stale temp file instead of prompting
            '-progress', 'pipe:1',
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '2',
            '-passlogfile', str(self._passlog_prefix(job)),
            '-c:a', 'aac',
            '-b:a', '128k',  # must match calculate_bitrates' audio budget
            '-movflags', '+faststart',  # Important for web streaming
            str(output_file),
        ]
        await self.run_ffmpeg_with_progress(job, cmd, pass_num=2)

    @staticmethod
    async def _drain_stream(stream, limit: int = 64 * 1024) -> bytes:
        """Read *stream* to EOF and return at most its last *limit* bytes."""
        tail = bytearray()
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                break
            tail += chunk
            excess = len(tail) - limit
            if excess > 0:
                del tail[:excess]
        return bytes(tail)

    @staticmethod
    async def _terminate(process) -> None:
        """Kill *process* if it is still running, then reap it."""
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await process.wait()

    async def run_ffmpeg_with_progress(self, job: CompressionJob, cmd: list, pass_num: int):
        """Run an ffmpeg command and update ``job.progress`` and ``job.eta_seconds``.

        *cmd* is expected to include ``-progress pipe:1``, so that ffmpeg
        writes ``key=value`` lines to stdout. Pass 1 maps to 0-50% of the
        overall progress and pass 2 to 50-95%. The subprocess is always
        killed if this coroutine exits early, including on cancellation.

        Raises:
            asyncio.CancelledError: if the job is cancelled while running.
            Exception: if the command exits non-zero (message holds the stderr tail).
        """
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Drain stderr concurrently. ffmpeg logs continuously to it, and a
        # pipe that is never read fills up and blocks ffmpeg (and stdout).
        stderr_task = asyncio.create_task(
            self._drain_stream(process.stderr, self._STDERR_TAIL_BYTES)
        )

        time_pattern = re.compile(r'out_time_ms=(\d+)')
        speed_pattern = re.compile(r'speed=\s*([\d.]+)x')
        # Progress keys arrive on separate lines; remember the latest position.
        current_time_sec = 0.0
        stderr = b""

        try:
            async for line in process.stdout:
                if job.status == "cancelled":
                    raise asyncio.CancelledError()

                line_str = line.decode('utf-8', errors='ignore')

                time_match = time_pattern.search(line_str)
                if time_match:
                    # Despite its name, ffmpeg reports out_time_ms in microseconds.
                    current_time_sec = int(time_match.group(1)) / 1_000_000
                    pass_progress = (current_time_sec / job.duration_seconds) * 50
                    if pass_num == 1:
                        job.progress = min(pass_progress, 50)
                    else:
                        job.progress = min(50 + pass_progress, 95)
                    continue

                speed_match = speed_pattern.search(line_str)
                if speed_match:
                    speed = float(speed_match.group(1))  # media seconds per wall second
                    if speed > 0:
                        remaining_sec = max(job.duration_seconds - current_time_sec, 0.0)
                        if pass_num == 1:
                            # Account for both passes (assumes similar speed).
                            remaining_sec += job.duration_seconds
                        job.eta_seconds = int(remaining_sec / speed)

            await process.wait()
            stderr = await stderr_task
        finally:
            await self._terminate(process)
            if not stderr_task.done():
                stderr_task.cancel()

        if process.returncode != 0:
            raise Exception(
                f"FFmpeg pass {pass_num} failed: {stderr.decode('utf-8', errors='replace')}"
            )

    async def validate_video(self, file_path: Path) -> bool:
        """Fully decode *file_path*; return True only on a clean, error-free decode.

        The decode counts as clean when ffmpeg exits with status 0 and logs
        nothing at ``-v error`` level.
        """
        cmd = [
            'ffmpeg',
            '-v', 'error',
            '-i', str(file_path),
            '-f', 'null',
            '-',
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            _, stderr = await process.communicate()
        finally:
            # Do not leave the decoder running if we are cancelled mid-check.
            await self._terminate(process)
        return process.returncode == 0 and len(stderr) == 0

    def cleanup_temp_files(self, job: CompressionJob):
        """Best-effort removal of the job's temp output and two-pass log files.

        Missing files are ignored. Other errors are printed rather than
        raised, so cleanup never masks the job's real outcome.
        """
        prefix = self._passlog_prefix(job)
        candidates = [self._temp_path(job)]
        candidates.extend(Path(f"{prefix}{suffix}") for suffix in self._PASSLOG_SUFFIXES)

        for path in candidates:
            try:
                path.unlink()
            except FileNotFoundError:
                pass
            except OSError as e:
                print(f"Failed to remove temp file {path}: {e}")

    # ------------------------------------------------------------- public API

    async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
        """Register a job for *file_path*, start it in the background, return its id."""
        job = CompressionJob(file_path, reduce_percentage)
        self.jobs[job.job_id] = job

        task = asyncio.create_task(self.compress_video(job))
        self.active_jobs[job.job_id] = task

        # Drop the task reference once it finishes (job state stays in self.jobs).
        task.add_done_callback(lambda t: self.active_jobs.pop(job.job_id, None))

        return job.job_id

    async def cancel_job(self, job_id: str):
        """Request cancellation of *job_id*; a no-op for unknown or finished jobs."""
        job = self.jobs.get(job_id)
        if job is None or job.status in self._FINISHED_STATUSES:
            return
        job.status = "cancelled"
        task = self.active_jobs.get(job_id)
        if task is not None:
            task.cancel()