This commit ensures that compression jobs survive Docker container restarts and are automatically recovered and restarted.

Changes:
- Modified CancelledError handler to preserve job status during shutdown
- Jobs now keep 'processing' or 'validating' status instead of being marked as 'cancelled' when the app shuts down
- Added job persistence layer using SQLite database
- Implemented automatic job recovery on application startup
- Added process cleanup utilities for orphaned ffmpeg processes
- User-initiated cancellations still properly mark jobs as cancelled
- Jobs are visible in the frontend after recovery

The recovery system (sketched below):
1. Detects interrupted jobs (processing/validating status)
2. Cleans up orphaned ffmpeg processes and temp files
3. Restarts interrupted jobs from the beginning
4. Maintains queue order and respects concurrency limits
5. Works with multiple jobs in the queue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
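A minimal sketch of the recovery flow described above, expressed with the methods this module already exposes (`initialize_persistence`, `load_jobs_from_database`, `get_jobs_snapshot`, `cleanup_temp_files`, `start_compression`). The startup hook name and the orphaned-process cleanup helper are illustrative assumptions, not the project's actual API:

```python
# Hypothetical startup hook; names not defined in this file are assumptions.
async def recover_interrupted_jobs(manager: "CompressionManager") -> None:
    await manager.initialize_persistence()
    await manager.load_jobs_from_database()
    for job in await manager.get_jobs_snapshot():
        if job.status in ("processing", "validating"):
            # kill_orphaned_ffmpeg(job.ffmpeg_pid)  # assumed process-cleanup utility
            manager.cleanup_temp_files(job)  # drop temp_* files and stale pass logs
            # Resubmit from the beginning; the semaphore and pending-queue limit
            # in start_compression keep concurrency and queue order intact.
            await manager.start_compression(job.file_path, job.reduce_percentage)
```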
737 lines
32 KiB
Python
import asyncio
import subprocess
import uuid
import os
import re
import logging
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional
from job_persistence import JobPersistence

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONCURRENT_JOBS = 2
MAX_STORED_JOBS = 100
MAX_PENDING_JOBS = 10  # Maximum jobs waiting in queue
PRUNE_INTERVAL_SECONDS = 300  # 5 minutes
WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900'))  # Default: 15 minutes
WATCHDOG_CHECK_INTERVAL = 30  # 30 seconds
STREAM_CHUNK_SIZE = 1024 * 1024  # 1MB
STDERR_READ_LIMIT = 8 * 1024 * 1024  # 8MB buffer limit for stderr
MAX_STDERR_LINES = 1000  # Keep only last 1000 lines of stderr
DEFAULT_AUDIO_BITRATE = 128  # kbps
MIN_VIDEO_BITRATE = 100  # kbps

class CompressionJob:
    def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None):
        self.job_id = job_id or str(uuid.uuid4())
        self.file_path = file_path
        self.reduce_percentage = reduce_percentage
        self.status = "pending"  # pending, processing, validating, completed, failed, cancelled
        self.progress = 0.0
        self.eta_seconds = None
        self.current_pass = 0  # 0=not started, 1=first pass, 2=second pass
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None
        self.error = None
        self.output_file = None
        self.current_size_mb = None
        self.target_size_mb = None
        self.video_bitrate = None
        self.duration_seconds = None
        self.ffmpeg_pid = None  # Track ffmpeg process PID

    def to_dict(self) -> Dict:
        """Convert job to dictionary for database persistence"""
        return {
            'job_id': self.job_id,
            'file_path': self.file_path,
            'reduce_percentage': self.reduce_percentage,
            'status': self.status,
            'progress': self.progress,
            'eta_seconds': self.eta_seconds,
            'current_pass': self.current_pass,
            'created_at': self.created_at,
            'started_at': self.started_at,
            'completed_at': self.completed_at,
            'error': self.error,
            'output_file': self.output_file,
            'current_size_mb': self.current_size_mb,
            'target_size_mb': self.target_size_mb,
            'video_bitrate': self.video_bitrate,
            'duration_seconds': self.duration_seconds,
            'ffmpeg_pid': self.ffmpeg_pid
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'CompressionJob':
        """Create job from database dictionary"""
        job = cls(
            file_path=data['file_path'],
            reduce_percentage=data['reduce_percentage'],
            job_id=data['job_id']
        )
        job.status = data.get('status', 'pending')
        job.progress = data.get('progress', 0.0)
        job.eta_seconds = data.get('eta_seconds')
        job.current_pass = data.get('current_pass', 0)
        job.error = data.get('error')
        job.output_file = data.get('output_file')
        job.current_size_mb = data.get('current_size_mb')
        job.target_size_mb = data.get('target_size_mb')
        job.video_bitrate = data.get('video_bitrate')
        job.duration_seconds = data.get('duration_seconds')
        job.ffmpeg_pid = data.get('ffmpeg_pid')

        # Parse datetime strings back to datetime objects
        for key in ['created_at', 'started_at', 'completed_at']:
            if data.get(key):
                if isinstance(data[key], str):
                    job.__dict__[key] = datetime.fromisoformat(data[key])
                else:
                    job.__dict__[key] = data[key]

        return job

class CompressionManager:
    # Compile regex patterns once at class level for performance
    TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
    FPS_PATTERN = re.compile(r'fps=([\d.]+)')

    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None):
        self.jobs: Dict[str, CompressionJob] = {}
        self.active_jobs: Dict[str, asyncio.Task] = {}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
        self.health_checker = health_checker
        self.cache_invalidation_callback = cache_invalidation_callback
        self._pruning_task: Optional[asyncio.Task] = None
        self._jobs_lock = asyncio.Lock()
        self.persistence = JobPersistence()
        self._pruning_started = False

    async def initialize_persistence(self):
        """Initialize the persistence layer - call this on startup"""
        await self.persistence.initialize()
        logger.info("Job persistence initialized")

        # Start periodic pruning (only once when persistence is initialized)
        if not self._pruning_started:
            self._start_periodic_pruning()
            self._pruning_started = True

    async def load_jobs_from_database(self):
        """Load all jobs from database into memory"""
        async with self._jobs_lock:
            jobs_data = await self.persistence.get_all_jobs()
            for job_data in jobs_data:
                job = CompressionJob.from_dict(job_data)
                self.jobs[job.job_id] = job
            logger.info(f"Loaded {len(jobs_data)} jobs from database")

    async def get_video_info(self, file_path: str) -> Dict:
        """Extract video duration and file size using ffprobe"""
        # Get file size in MB
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

        # Get duration using ffprobe
        cmd = [
            'ffprobe', '-i', file_path,
            '-show_entries', 'format=duration',
            '-v', 'quiet',
            '-of', 'csv=p=0'
        ]
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await result.communicate()
        duration = float(stdout.decode().strip())

        return {
            'size_mb': file_size_mb,
            'duration_seconds': duration
        }

    def calculate_bitrates(self, current_size_mb: float,
                           target_size_mb: float,
                           duration_seconds: float,
                           audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int:
        """Calculate video bitrate based on target size"""
        # Total bitrate in kbps
        total_bitrate = (target_size_mb * 8192) / duration_seconds
        # Video bitrate = total - audio
        video_bitrate = int(total_bitrate - audio_bitrate)
        return max(video_bitrate, MIN_VIDEO_BITRATE)
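    # Worked example for calculate_bitrates() above (illustrative figures, not from a real job):
    # a 1000 MB file reduced by 30% targets ~700 MB; over a 3600 s video the total budget is
    # 700 * 8192 / 3600 ≈ 1593 kbps, and subtracting 128 kbps audio leaves ≈ 1465 kbps for video.
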
    async def compress_video(self, job: CompressionJob):
        """Main compression function - two-pass encoding"""
        async with self.semaphore:
            try:
                job.status = "processing"
                job.started_at = datetime.now()
                await self.persistence.save_job(job.to_dict())

                # Get video information
                info = await self.get_video_info(job.file_path)
                job.current_size_mb = info['size_mb']
                job.duration_seconds = info['duration_seconds']

                # Calculate target size and bitrate
                job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100)
                job.video_bitrate = self.calculate_bitrates(
                    job.current_size_mb,
                    job.target_size_mb,
                    job.duration_seconds
                )

                logger.info(f"Job {job.job_id}: Current Size: {job.current_size_mb:.2f} MB, "
                            f"Target Size: {job.target_size_mb:.2f} MB, "
                            f"Duration: {job.duration_seconds}s, "
                            f"Video Bitrate: {job.video_bitrate} kbps")

                # Check disk space BEFORE starting compression
                file_path = Path(job.file_path)
                disk_usage = shutil.disk_usage(file_path.parent)

                # Estimate required space: temp file + original + safety margin (1.5x original)
                required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5
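                # e.g. a 2.0 GB original must have roughly 3.0 GB free (2.0 * 1.5) for this check to pass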
                available_bytes = disk_usage.free

                if available_bytes < required_bytes:
                    required_gb = required_bytes / (1024**3)
                    available_gb = available_bytes / (1024**3)
                    error_msg = (
                        f"Insufficient disk space: need {required_gb:.1f}GB, "
                        f"have {available_gb:.1f}GB available"
                    )
                    logger.error(f"Job {job.job_id}: {error_msg}")
                    raise ValueError(error_msg)

                logger.info(
                    f"Job {job.job_id}: Disk space check passed "
                    f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)"
                )

                # Generate output filename (use original path for in-place replacement)
                file_path = Path(job.file_path)
                temp_file = file_path.parent / f"temp_{file_path.name}"

                # PASS 1: Analysis
                job.current_pass = 1
                await self.run_ffmpeg_pass1(job, temp_file)

                if job.status == "cancelled":
                    self.cleanup_temp_files(job)
                    return

                # PASS 2: Encoding
                job.current_pass = 2
                await self.run_ffmpeg_pass2(job, temp_file)

                if job.status == "cancelled":
                    self.cleanup_temp_files(job)
                    return

                # SAFE FILE REPLACEMENT WITH VALIDATION
                job.status = "validating"
                job.progress = 95.0
                await self.persistence.update_job_status(job.job_id, "validating", progress=95.0)
                logger.info(f"Job {job.job_id}: Starting safe file replacement process")

                if await self.safe_replace_file(file_path, temp_file, job):
                    job.output_file = str(file_path)  # Output is same as original path
                    job.status = "completed"
                    job.progress = 100.0
                    job.completed_at = datetime.now()
                    job.ffmpeg_pid = None  # Clear PID on completion
                    await self.persistence.save_job(job.to_dict())
                    self.cleanup_temp_files(job)
                    logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place")
                else:
                    job.status = "failed"
                    job.error = "File replacement failed or validation check failed"
                    await self.persistence.update_job_status(job.job_id, "failed", error=job.error)
                    self.cleanup_temp_files(job)
                    logger.error(f"Job {job.job_id} failed - file replacement unsuccessful")

            except asyncio.CancelledError:
                # Don't mark as cancelled - could be app shutdown
                # Keep current status so job can be recovered on restart
                job.ffmpeg_pid = None  # Clear PID since process won't survive
                # Don't update database status - leave it as "processing" or "validating"
                # for recovery on restart
                self.cleanup_temp_files(job)
                logger.info(f"Job {job.job_id} interrupted (task cancelled)")
                raise  # Re-raise to properly cancel the task
            except Exception as e:
                job.status = "failed"
                job.error = str(e)
                job.ffmpeg_pid = None  # Clear PID on failure
                await self.persistence.update_job_status(job.job_id, "failed", error=str(e))
                self.cleanup_temp_files(job)
                logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)

    async def run_ffmpeg_pass1(self, job: CompressionJob, output_file: Path):
        """First pass: Analysis"""
        cmd = [
            'ffmpeg', '-y',
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '1',
            '-an',  # No audio in first pass
            '-f', 'null',
            '/dev/null',
            '-progress', 'pipe:1'
        ]

        await self.run_ffmpeg_with_progress(job, cmd, pass_num=1)

    async def run_ffmpeg_pass2(self, job: CompressionJob, output_file: Path):
        """Second pass: Encoding"""
        cmd = [
            'ffmpeg', '-y',  # Overwrite a stale temp file instead of prompting (a prompt would stall the job)
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '2',
            '-c:a', 'aac',
            '-b:a', '128k',
            '-movflags', '+faststart',  # Important for web streaming
            str(output_file),
            '-progress', 'pipe:1'
        ]

        await self.run_ffmpeg_with_progress(job, cmd, pass_num=2)

    async def run_ffmpeg_with_progress(self, job: CompressionJob, cmd: list, pass_num: int):
        """Run ffmpeg and track progress with concurrent stdout/stderr reading to prevent deadlock"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=STDERR_READ_LIMIT  # Set buffer limit to prevent overflow
        )

        # Track PID for first pass only (second pass uses same job)
        if pass_num == 1 and process.pid:
            job.ffmpeg_pid = process.pid
            await self.persistence.set_ffmpeg_pid(job.job_id, process.pid)
            logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked")

        # Progress tracking
        stderr_output = []
        last_progress_update = None  # Initialize as None, will be set when FFmpeg starts producing output

        async def read_stdout():
            """Read and process stdout for progress updates"""
            nonlocal last_progress_update
            try:
                async for line in process.stdout:
                    # Initialize timer on first progress update from FFmpeg
                    if last_progress_update is None:
                        last_progress_update = datetime.now()
                        logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized")
                    if job.status == "cancelled":
                        process.kill()
                        return

                    line_str = line.decode('utf-8', errors='ignore')

                    # Extract time using class-level compiled pattern
                    time_match = self.TIME_PATTERN.search(line_str)
                    if time_match:
                        current_time_ms = int(time_match.group(1))
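                        # Despite the name, ffmpeg's out_time_ms field is reported in microseconds,
                        # hence the division by 1,000,000 below to get seconds.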
                        current_time_sec = current_time_ms / 1_000_000

                        # Calculate progress for this pass (0-50% for pass 1, 50-95% for pass 2;
                        # the last 5% is reserved for validation and file replacement)
                        pass_progress = (current_time_sec / job.duration_seconds) * 50
                        if pass_num == 1:
                            job.progress = min(pass_progress, 50)
                        else:
                            job.progress = min(50 + pass_progress, 95)

                        last_progress_update = datetime.now()

                        # Extract FPS for ETA calculation using class-level compiled pattern
                        fps_match = self.FPS_PATTERN.search(line_str)
                        if fps_match:
                            fps = float(fps_match.group(1))
                            if fps > 0:
                                # Calculate remaining time for current pass
                                remaining_sec_current_pass = (job.duration_seconds - current_time_sec)
                                # If in pass 1, add full duration for pass 2
                                if pass_num == 1:
                                    remaining_sec = remaining_sec_current_pass + job.duration_seconds
                                else:
                                    remaining_sec = remaining_sec_current_pass
                                job.eta_seconds = int(remaining_sec)

                        # Periodically save progress to database (every ~5% or 30 seconds)
                        # Use modulo to reduce DB writes
                        if int(job.progress) % 5 == 0:
                            await self.persistence.update_job_progress(
                                job.job_id,
                                job.progress,
                                job.eta_seconds,
                                job.current_pass
                            )
            except Exception as e:
                logger.error(f"Error reading stdout: {e}")

        async def read_stderr():
            """Read stderr concurrently to prevent pipe buffer deadlock"""
            try:
                async for line in process.stderr:
                    line_str = line.decode('utf-8', errors='ignore')
                    stderr_output.append(line_str)

                    # Truncate stderr_output to keep only last MAX_STDERR_LINES lines
                    if len(stderr_output) > MAX_STDERR_LINES:
                        stderr_output[:] = stderr_output[-MAX_STDERR_LINES:]

                    # Log important stderr messages
                    if 'error' in line_str.lower():
                        logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
                    elif 'warning' in line_str.lower():
                        logger.warning(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
            except Exception as e:
                logger.error(f"Error reading stderr: {e}")

        async def watchdog():
            """Monitor for stuck jobs - kills process if no progress"""
            nonlocal last_progress_update
            while process.returncode is None:
                if job.status == "cancelled":
                    process.kill()
                    return

                # Only check timeout if FFmpeg has started producing output
                if last_progress_update is not None:
                    time_since_update = (datetime.now() - last_progress_update).total_seconds()
                    if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
                        error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
                        logger.error(f"Watchdog: {error_msg}")
                        job.status = "failed"
                        job.error = error_msg
                        process.kill()
                        raise Exception(error_msg)

                await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)

        # Run stdout, stderr readers and watchdog concurrently to prevent deadlock
        try:
            await asyncio.gather(
                read_stdout(),
                read_stderr(),
                watchdog(),
                return_exceptions=False
            )
        except Exception as e:
            process.kill()
            raise

        await process.wait()

        if process.returncode != 0:
            stderr_text = ''.join(stderr_output[-50:])  # Last 50 lines of stderr
            raise Exception(f"FFmpeg pass {pass_num} failed with code {process.returncode}: {stderr_text}")

    async def validate_video(self, file_path: Path) -> bool:
        """Validate compressed video is not corrupted"""
        cmd = [
            'ffmpeg',
            '-v', 'error',
            '-i', str(file_path),
            '-f', 'null',
            '-'
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        _, stderr = await process.communicate()

        # Check for actual errors (not just warnings)
        # FFmpeg with -v error should only output actual errors
        stderr_text = stderr.decode('utf-8', errors='ignore').strip()

        if stderr_text:
            # Log the validation error for debugging
            logger.error(f"Video validation failed: {stderr_text}")
            return False

        # Also check return code
        return process.returncode == 0

    async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool:
        """Enhanced validation with size and duration checks before file replacement"""
        # 1. Basic ffmpeg decode validation
        if not await self.validate_video(compressed_path):
            logger.error(f"Enhanced validation failed: FFmpeg decode check failed")
            return False

        # 2. File size sanity check (minimum 1KB)
        try:
            compressed_size = os.path.getsize(str(compressed_path))
            if compressed_size < 1024:
                logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)")
                return False
            logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not check file size: {e}")
            return False

        # 3. Duration verification (optional but recommended)
        try:
            original_info = await self.get_video_info(str(original_path))
            compressed_info = await self.get_video_info(str(compressed_path))

            duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds'])
            # Allow up to 1 second difference due to rounding
            if duration_diff > 1.0:
                logger.error(f"Enhanced validation failed: Duration mismatch - "
                             f"original: {original_info['duration_seconds']:.2f}s vs "
                             f"compressed: {compressed_info['duration_seconds']:.2f}s "
                             f"(diff: {duration_diff:.2f}s)")
                return False
            logger.info(f"Enhanced validation: Duration check passed "
                        f"(original: {original_info['duration_seconds']:.2f}s, "
                        f"compressed: {compressed_info['duration_seconds']:.2f}s)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not verify duration: {e}")
            return False

        logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}")
        return True

    async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool:
        """Safely replace original file with compressed version with rollback capability"""
        backup_path = original_path.parent / f"{original_path.name}.backup"

        try:
            # Enhanced validation BEFORE any file operations
            logger.info(f"Validating compressed file before replacement: {temp_file.name}")
            if not await self.validate_video_enhanced(original_path, temp_file):
                logger.error(f"Safe replace failed: Validation check failed")
                return False

            # Step 1: Create backup of original file
            logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}")
            try:
                os.replace(str(original_path), str(backup_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not create backup: {e}")
                return False

            # Step 2: Replace original with compressed file (atomic operation)
            logger.info(f"Replacing original file with compressed version: {temp_file.name}")
            try:
                os.replace(str(temp_file), str(original_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...")
                # Rollback: restore original from backup
                try:
                    if backup_path.exists():
                        os.replace(str(backup_path), str(original_path))
                        logger.info(f"Rollback successful: Original file restored")
                except Exception as rollback_error:
                    logger.error(f"Rollback failed: {rollback_error}")
                return False

            # Step 3: Verify final file is accessible
            logger.info(f"Verifying replaced file is accessible: {original_path.name}")
            try:
                if not original_path.exists() or not original_path.is_file():
                    logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...")
                    # Rollback: restore original from backup
                    try:
                        if backup_path.exists():
                            os.replace(str(backup_path), str(original_path))
                            logger.info(f"Rollback successful: Original file restored")
                    except Exception as rollback_error:
                        logger.error(f"Rollback failed: {rollback_error}")
                    return False

                # Get final file size
                final_size = os.path.getsize(str(original_path))
                logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB")
            except Exception as e:
                logger.error(f"Safe replace failed: Could not verify final file: {e}")
                return False

            # Step 4: Delete backup file only after successful verification
            logger.info(f"Deleting backup file: {backup_path.name}")
            try:
                if backup_path.exists():
                    backup_path.unlink()
                    logger.info(f"Backup file deleted successfully")
            except Exception as e:
                logger.warning(f"Failed to delete backup file (non-critical): {e}")

            # Step 5: Invalidate cache for this directory to show updated file size
            if self.cache_invalidation_callback:
                try:
                    # Invalidate all cache entries containing the parent directory path
                    parent_name = original_path.parent.name
                    self.cache_invalidation_callback(parent_name)
                    logger.info(f"Cache invalidated for directory: {parent_name}")
                except Exception as e:
                    logger.warning(f"Failed to invalidate cache (non-critical): {e}")

            return True

        except Exception as e:
            logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True)
            # Attempt rollback on any unexpected error
            try:
                if backup_path.exists():
                    os.replace(str(backup_path), str(original_path))
                    logger.info(f"Rollback successful: Original file restored after unexpected error")
            except Exception as rollback_error:
                logger.error(f"Rollback failed: {rollback_error}")
            return False

    def cleanup_temp_files(self, job: CompressionJob):
        """Clean up temporary files"""
        file_path = Path(job.file_path)
        temp_file = file_path.parent / f"temp_{file_path.name}"

        # Remove temp file
        try:
            if temp_file.exists():
                temp_file.unlink()
        except Exception as e:
            logger.warning(f"Failed to remove temp file: {e}")

        # Remove ffmpeg pass log files in same directory as source
        try:
            log_file = file_path.parent / "ffmpeg2pass-0.log"
            if log_file.exists():
                log_file.unlink()
        except Exception as e:
            logger.warning(f"Failed to remove log file: {e}")

    def _start_periodic_pruning(self):
        """Start background task for periodic job pruning"""
        async def prune_periodically():
            while True:
                await asyncio.sleep(PRUNE_INTERVAL_SECONDS)
                self._prune_old_jobs()

        self._pruning_task = asyncio.create_task(prune_periodically())

    def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS):
        """Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible."""
        if len(self.jobs) <= max_jobs:
            return

        # Get inactive completed/cancelled jobs sorted by time (exclude failed jobs)
        inactive = [
            (jid, j) for jid, j in self.jobs.items()
            if jid not in self.active_jobs and j.status in ['completed', 'cancelled']
        ]
        inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)

        # Remove oldest jobs beyond limit
        num_to_remove = len(self.jobs) - max_jobs
        for jid, _ in inactive[:num_to_remove]:
            self.jobs.pop(jid, None)

    async def get_jobs_snapshot(self) -> list:
        """Get a safe snapshot of all jobs for iteration"""
        async with self._jobs_lock:
            return list(self.jobs.values())

    def get_pending_count(self) -> int:
        """Get count of pending jobs"""
        return sum(1 for job in self.jobs.values() if job.status == "pending")

    async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
        """Start a new compression job"""
        # Check filesystem health first
        if self.health_checker:
            status = self.health_checker.get_status()
            if not status["healthy"]:
                error_msg = f"Cannot start compression: Filesystem is not writable. {status.get('error', 'Unknown error')}"
                logger.error(error_msg)
                raise ValueError(error_msg)

        # Check if queue is full
        pending_count = self.get_pending_count()
        if pending_count >= MAX_PENDING_JOBS:
            error_msg = f"Queue is full: {pending_count}/{MAX_PENDING_JOBS} pending jobs. Please wait for jobs to complete."
            logger.warning(error_msg)
            raise ValueError(error_msg)

        # Validate path is within allowed directory
        if self.allowed_base_path:
            abs_path = Path(file_path).resolve()
            # Resolve both paths to handle symlinks properly
            resolved_base = self.allowed_base_path.resolve()

            # Check if resolved path is within allowed directory
            try:
                abs_path.relative_to(resolved_base)
            except ValueError:
                raise ValueError("Invalid file path: outside allowed directory")

            # Verify file exists and is a regular file (not a symlink to outside)
            if not abs_path.exists() or not abs_path.is_file():
                raise ValueError("Invalid file path: file does not exist or is not a regular file")

        async with self._jobs_lock:
            # Prune old jobs before creating new one
            self._prune_old_jobs()

            job = CompressionJob(file_path, reduce_percentage)
            self.jobs[job.job_id] = job

            # Save initial job to database
            await self.persistence.save_job(job.to_dict())

            # Start compression in background
            task = asyncio.create_task(self.compress_video(job))
            self.active_jobs[job.job_id] = task

            # Clean up task when done
            task.add_done_callback(lambda t: self.active_jobs.pop(job.job_id, None))

            return job.job_id

    async def cancel_job(self, job_id: str):
        """Cancel a running compression job (user-initiated)"""
        if job_id in self.jobs:
            self.jobs[job_id].status = "cancelled"
            # Save cancelled status to database (user-initiated cancellation)
            await self.persistence.update_job_status(job_id, "cancelled")
            if job_id in self.active_jobs:
                self.active_jobs[job_id].cancel()

    async def remove_job(self, job_id: str):
        """Remove a job from the list (for failed/completed jobs)"""
        if job_id in self.jobs:
            job = self.jobs[job_id]
            # Only allow removal of inactive jobs
            if job_id not in self.active_jobs:
                async with self._jobs_lock:
                    self.jobs.pop(job_id, None)
                return True
        return False