## Overview

Refactored the compression system to replace original files in-place instead of creating duplicates with a `_compressed` suffix. This eliminates the wasted disk space that duplicate files caused when compressing videos.

## Key Changes

### Backend (backend/compression.py)

- **Added enhanced validation** (`validate_video_enhanced`):
  - FFmpeg decode validation (catches corrupted files)
  - File size sanity check (minimum 1 KB)
  - Duration comparison between original and compressed (±1 second tolerance)
- **Implemented safe file replacement** (`safe_replace_file`):
  - Enhanced validation before any file operations
  - Atomic file operations using `os.replace()` to prevent race conditions
  - Backup strategy: original → `.backup` during replacement
  - Automatic rollback if validation or replacement fails
  - Comprehensive logging at each step
- **Updated `compress_video`**:
  - Removed the `_compressed_75` suffix from output filenames
  - Uses the original file path for in-place replacement
  - Calls the new `safe_replace_file` method instead of a simple `os.rename`
  - Improved logging messages to reflect the replacement operation

### Frontend (frontend/src/App.jsx)

- Updated compression success messages to show the reduction percentage
- Messages now indicate the file is being replaced, not duplicated
- Displays the compression reduction percentage (e.g., "Started compressing file.mp4 (75% reduction)")

## Safety Features

1. **Backup-and-restore**: Original file is backed up as `.backup` until verification passes
2. **Enhanced validation**: Three-level validation before committing to replacement
3. **Atomic operations**: Uses `os.replace()` for atomic file replacements
4. **Automatic rollback**: If any step fails, the original is restored from the backup
5. **Comprehensive logging**: All file operations are logged for debugging

## File Operations Flow

1. Compress to a temp file
2. Enhanced validation (decode + size + duration)
3. Create backup: original.mp4 → original.mp4.backup
4. Move compressed: temp → original.mp4
5. Verify the final file is accessible
6. Delete the backup only after final verification
7. Roll back if any step fails

A minimal sketch of this backup-and-replace pattern appears below.

## Testing

- Docker rebuild: `docker compose down && docker compose build && docker compose up -d`
- Manual testing recommended for compression jobs with various video files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
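The core of the flow above is a backup, atomic swap, verify, then clean-up-or-roll-back sequence. The sketch below is illustrative only; the actual implementation in `backend/compression.py` (shown further down) adds the enhanced validation, logging, and job-state handling described above, and the function name `replace_in_place` is ours, not part of the codebase.

```python
import os
from pathlib import Path

def replace_in_place(original: Path, compressed: Path) -> bool:
    """Sketch: back up the original, atomically swap in the compressed file, verify, then clean up."""
    backup = original.with_name(original.name + ".backup")
    os.replace(original, backup)              # original.mp4 -> original.mp4.backup
    try:
        os.replace(compressed, original)      # temp -> original.mp4 (atomic on the same filesystem)
        if not original.is_file():            # verify the final file is accessible
            raise OSError("replaced file is not accessible")
    except OSError:
        os.replace(backup, original)          # rollback: restore the original
        return False
    backup.unlink(missing_ok=True)            # delete the backup only after verification
    return True
```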
`backend/compression.py` (Python, 589 lines, 25 KiB):

```python
import asyncio
import subprocess
import uuid
import os
import re
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONCURRENT_JOBS = 2
MAX_STORED_JOBS = 100
MAX_PENDING_JOBS = 10  # Maximum jobs waiting in queue
PRUNE_INTERVAL_SECONDS = 300  # 5 minutes
WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900'))  # Default: 15 minutes
WATCHDOG_CHECK_INTERVAL = 30  # 30 seconds
STREAM_CHUNK_SIZE = 1024 * 1024  # 1MB
STDERR_READ_LIMIT = 8 * 1024 * 1024  # 8MB buffer limit for stderr
MAX_STDERR_LINES = 1000  # Keep only last 1000 lines of stderr
DEFAULT_AUDIO_BITRATE = 128  # kbps
MIN_VIDEO_BITRATE = 100  # kbps

class CompressionJob:
    def __init__(self, file_path: str, reduce_percentage: int):
        self.job_id = str(uuid.uuid4())
        self.file_path = file_path
        self.reduce_percentage = reduce_percentage
        self.status = "pending"  # pending, processing, validating, completed, failed, cancelled
        self.progress = 0.0
        self.eta_seconds = None
        self.current_pass = 0  # 0=not started, 1=first pass, 2=second pass
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None
        self.error = None
        self.output_file = None
        self.current_size_mb = None
        self.target_size_mb = None
        self.video_bitrate = None
        self.duration_seconds = None

class CompressionManager:
    # Compile regex patterns once at class level for performance
    TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
    FPS_PATTERN = re.compile(r'fps=([\d.]+)')

    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
        self.jobs: Dict[str, CompressionJob] = {}
        self.active_jobs: Dict[str, asyncio.Task] = {}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
        self.health_checker = health_checker
        self._pruning_task: Optional[asyncio.Task] = None
        self._jobs_lock = asyncio.Lock()
        self._start_periodic_pruning()

    async def get_video_info(self, file_path: str) -> Dict:
        """Extract video duration and file size using ffprobe"""
        # Get file size in MB
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

        # Get duration using ffprobe
        cmd = [
            'ffprobe', '-i', file_path,
            '-show_entries', 'format=duration',
            '-v', 'quiet',
            '-of', 'csv=p=0'
        ]
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await result.communicate()
        duration = float(stdout.decode().strip())

        return {
            'size_mb': file_size_mb,
            'duration_seconds': duration
        }

    def calculate_bitrates(self, current_size_mb: float,
                           target_size_mb: float,
                           duration_seconds: float,
                           audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int:
        """Calculate video bitrate based on target size"""
        # Total bitrate in kbps
        total_bitrate = (target_size_mb * 8192) / duration_seconds
        # Video bitrate = total - audio
        video_bitrate = int(total_bitrate - audio_bitrate)
        return max(video_bitrate, MIN_VIDEO_BITRATE)

    async def compress_video(self, job: CompressionJob):
        """Main compression function - two-pass encoding"""
        async with self.semaphore:
            try:
                job.status = "processing"
                job.started_at = datetime.now()

                # Get video information
                info = await self.get_video_info(job.file_path)
                job.current_size_mb = info['size_mb']
                job.duration_seconds = info['duration_seconds']

                # Calculate target size and bitrate
                job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100)
                job.video_bitrate = self.calculate_bitrates(
                    job.current_size_mb,
                    job.target_size_mb,
                    job.duration_seconds
                )

                logger.info(f"Job {job.job_id}: Current Size: {job.current_size_mb:.2f} MB, "
                            f"Target Size: {job.target_size_mb:.2f} MB, "
                            f"Duration: {job.duration_seconds}s, "
                            f"Video Bitrate: {job.video_bitrate} kbps")

                # Generate output filename (use original path for in-place replacement)
                file_path = Path(job.file_path)
                temp_file = file_path.parent / f"temp_{file_path.name}"

                # PASS 1: Analysis
                job.current_pass = 1
                await self.run_ffmpeg_pass1(job, temp_file)

                if job.status == "cancelled":
                    self.cleanup_temp_files(job)
                    return

                # PASS 2: Encoding
                job.current_pass = 2
                await self.run_ffmpeg_pass2(job, temp_file)

                if job.status == "cancelled":
                    self.cleanup_temp_files(job)
                    return

                # SAFE FILE REPLACEMENT WITH VALIDATION
                job.status = "validating"
                job.progress = 95.0
                logger.info(f"Job {job.job_id}: Starting safe file replacement process")

                if await self.safe_replace_file(file_path, temp_file, job):
                    job.output_file = str(file_path)  # Output is same as original path
                    job.status = "completed"
                    job.progress = 100.0
                    job.completed_at = datetime.now()
                    self.cleanup_temp_files(job)
                    logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place")
                else:
                    job.status = "failed"
                    job.error = "File replacement failed or validation check failed"
                    self.cleanup_temp_files(job)
                    logger.error(f"Job {job.job_id} failed - file replacement unsuccessful")

            except asyncio.CancelledError:
                job.status = "cancelled"
                self.cleanup_temp_files(job)
                logger.info(f"Job {job.job_id} cancelled")
            except Exception as e:
                job.status = "failed"
                job.error = str(e)
                self.cleanup_temp_files(job)
                logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)

    async def run_ffmpeg_pass1(self, job: CompressionJob, output_file: Path):
        """First pass: Analysis"""
        cmd = [
            'ffmpeg', '-y',
            '-progress', 'pipe:1',  # Placed before the output target; ffmpeg may ignore trailing options
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '1',
            '-an',  # No audio in first pass
            '-f', 'null',
            '/dev/null'
        ]

        await self.run_ffmpeg_with_progress(job, cmd, pass_num=1)

    async def run_ffmpeg_pass2(self, job: CompressionJob, output_file: Path):
        """Second pass: Encoding"""
        cmd = [
            'ffmpeg', '-y',  # Overwrite a leftover temp file from an earlier aborted run instead of prompting
            '-progress', 'pipe:1',  # Placed before the output target; ffmpeg may ignore trailing options
            '-i', job.file_path,
            '-c:v', 'libx264',
            '-b:v', f'{job.video_bitrate}k',
            '-pass', '2',
            '-c:a', 'aac',
            '-b:a', '128k',
            '-movflags', '+faststart',  # Important for web streaming
            str(output_file)
        ]

        await self.run_ffmpeg_with_progress(job, cmd, pass_num=2)

    async def run_ffmpeg_with_progress(self, job: CompressionJob, cmd: list, pass_num: int):
        """Run ffmpeg and track progress with concurrent stdout/stderr reading to prevent deadlock"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=STDERR_READ_LIMIT  # Set buffer limit to prevent overflow
        )

        # Progress tracking
        stderr_output = []
        last_progress_update = datetime.now()

        async def read_stdout():
            """Read and process stdout for progress updates"""
            nonlocal last_progress_update
            current_time_sec = 0.0  # Defined up front; fps lines can arrive before the first out_time_ms line
            try:
                async for line in process.stdout:
                    if job.status == "cancelled":
                        process.kill()
                        return

                    line_str = line.decode('utf-8', errors='ignore')

                    # Extract time using class-level compiled pattern
                    time_match = self.TIME_PATTERN.search(line_str)
                    if time_match:
                        current_time_ms = int(time_match.group(1))
                        current_time_sec = current_time_ms / 1_000_000
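                        # Note: ffmpeg's out_time_ms is reported in microseconds despite the name,
                        # hence the 1_000_000 divisor.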

                        # Calculate progress for this pass (pass 1: 0-50%, pass 2: 50-95%; validation uses 95-100%)
                        pass_progress = (current_time_sec / job.duration_seconds) * 50
                        if pass_num == 1:
                            job.progress = min(pass_progress, 50)
                        else:
                            job.progress = min(50 + pass_progress, 95)

                        last_progress_update = datetime.now()

                    # Extract FPS for ETA calculation using class-level compiled pattern
                    fps_match = self.FPS_PATTERN.search(line_str)
                    if fps_match:
                        fps = float(fps_match.group(1))
                        if fps > 0:
                            # Calculate remaining time for current pass
                            remaining_sec_current_pass = (job.duration_seconds - current_time_sec)
                            # If in pass 1, add full duration for pass 2
                            if pass_num == 1:
                                remaining_sec = remaining_sec_current_pass + job.duration_seconds
                            else:
                                remaining_sec = remaining_sec_current_pass
                            job.eta_seconds = int(remaining_sec)
            except Exception as e:
                logger.error(f"Error reading stdout: {e}")

        async def read_stderr():
            """Read stderr concurrently to prevent pipe buffer deadlock"""
            try:
                async for line in process.stderr:
                    line_str = line.decode('utf-8', errors='ignore')
                    stderr_output.append(line_str)

                    # Truncate stderr_output to keep only last MAX_STDERR_LINES lines
                    if len(stderr_output) > MAX_STDERR_LINES:
                        stderr_output[:] = stderr_output[-MAX_STDERR_LINES:]

                    # Log important stderr messages
                    if 'error' in line_str.lower():
                        logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
                    elif 'warning' in line_str.lower():
                        logger.warning(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
            except Exception as e:
                logger.error(f"Error reading stderr: {e}")

        async def watchdog():
            """Monitor for stuck jobs - kills process if no progress"""
            nonlocal last_progress_update
            while process.returncode is None:
                if job.status == "cancelled":
                    process.kill()
                    return

                time_since_update = (datetime.now() - last_progress_update).total_seconds()
                if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
                    error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
                    logger.error(f"Watchdog: {error_msg}")
                    job.status = "failed"
                    job.error = error_msg
                    process.kill()
                    raise Exception(error_msg)

                await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)

        # Run stdout, stderr readers and watchdog concurrently to prevent deadlock
        try:
            await asyncio.gather(
                read_stdout(),
                read_stderr(),
                watchdog(),
                return_exceptions=False
            )
        except Exception as e:
            process.kill()
            raise

        await process.wait()

        if process.returncode != 0:
            stderr_text = ''.join(stderr_output[-50:])  # Last 50 lines of stderr
            raise Exception(f"FFmpeg pass {pass_num} failed with code {process.returncode}: {stderr_text}")

    async def validate_video(self, file_path: Path) -> bool:
        """Validate compressed video is not corrupted"""
        cmd = [
            'ffmpeg',
            '-v', 'error',
            '-i', str(file_path),
            '-f', 'null',
            '-'
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        _, stderr = await process.communicate()

        # Check for actual errors (not just warnings)
        # FFmpeg with -v error should only output actual errors
        stderr_text = stderr.decode('utf-8', errors='ignore').strip()

        if stderr_text:
            # Log the validation error for debugging
            logger.error(f"Video validation failed: {stderr_text}")
            return False

        # Also check return code
        return process.returncode == 0

    async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool:
        """Enhanced validation with size and duration checks before file replacement"""
        # 1. Basic ffmpeg decode validation
        if not await self.validate_video(compressed_path):
            logger.error(f"Enhanced validation failed: FFmpeg decode check failed")
            return False

        # 2. File size sanity check (minimum 1KB)
        try:
            compressed_size = os.path.getsize(str(compressed_path))
            if compressed_size < 1024:
                logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)")
                return False
            logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not check file size: {e}")
            return False

        # 3. Duration verification (optional but recommended)
        try:
            original_info = await self.get_video_info(str(original_path))
            compressed_info = await self.get_video_info(str(compressed_path))

            duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds'])
            # Allow up to 1 second difference due to rounding
            if duration_diff > 1.0:
                logger.error(f"Enhanced validation failed: Duration mismatch - "
                             f"original: {original_info['duration_seconds']:.2f}s vs "
                             f"compressed: {compressed_info['duration_seconds']:.2f}s "
                             f"(diff: {duration_diff:.2f}s)")
                return False
            logger.info(f"Enhanced validation: Duration check passed "
                        f"(original: {original_info['duration_seconds']:.2f}s, "
                        f"compressed: {compressed_info['duration_seconds']:.2f}s)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not verify duration: {e}")
            return False

        logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}")
        return True

    async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool:
        """Safely replace original file with compressed version with rollback capability"""
        backup_path = original_path.parent / f"{original_path.name}.backup"

        try:
            # Enhanced validation BEFORE any file operations
            logger.info(f"Validating compressed file before replacement: {temp_file.name}")
            if not await self.validate_video_enhanced(original_path, temp_file):
                logger.error(f"Safe replace failed: Validation check failed")
                return False

            # Step 1: Create backup of original file
            logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}")
            try:
                os.replace(str(original_path), str(backup_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not create backup: {e}")
                return False

            # Step 2: Replace original with compressed file (atomic operation)
            logger.info(f"Replacing original file with compressed version: {temp_file.name}")
            try:
                os.replace(str(temp_file), str(original_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...")
                # Rollback: restore original from backup
                try:
                    if backup_path.exists():
                        os.replace(str(backup_path), str(original_path))
                        logger.info(f"Rollback successful: Original file restored")
                except Exception as rollback_error:
                    logger.error(f"Rollback failed: {rollback_error}")
                return False

            # Step 3: Verify final file is accessible
            logger.info(f"Verifying replaced file is accessible: {original_path.name}")
            try:
                if not original_path.exists() or not original_path.is_file():
                    logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...")
                    # Rollback: restore original from backup
                    try:
                        if backup_path.exists():
                            os.replace(str(backup_path), str(original_path))
                            logger.info(f"Rollback successful: Original file restored")
                    except Exception as rollback_error:
                        logger.error(f"Rollback failed: {rollback_error}")
                    return False

                # Get final file size
                final_size = os.path.getsize(str(original_path))
                logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB")
            except Exception as e:
                logger.error(f"Safe replace failed: Could not verify final file: {e}")
                return False

            # Step 4: Delete backup file only after successful verification
            logger.info(f"Deleting backup file: {backup_path.name}")
            try:
                if backup_path.exists():
                    backup_path.unlink()
                    logger.info(f"Backup file deleted successfully")
            except Exception as e:
                logger.warning(f"Failed to delete backup file (non-critical): {e}")

            return True

        except Exception as e:
            logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True)
            # Attempt rollback on any unexpected error
            try:
                if backup_path.exists():
                    os.replace(str(backup_path), str(original_path))
                    logger.info(f"Rollback successful: Original file restored after unexpected error")
            except Exception as rollback_error:
                logger.error(f"Rollback failed: {rollback_error}")
            return False

    def cleanup_temp_files(self, job: CompressionJob):
        """Clean up temporary files"""
        file_path = Path(job.file_path)
        temp_file = file_path.parent / f"temp_{file_path.name}"

        # Remove temp file
        try:
            if temp_file.exists():
                temp_file.unlink()
        except Exception as e:
            logger.warning(f"Failed to remove temp file: {e}")

        # Remove ffmpeg pass log files in same directory as source
        try:
            log_file = file_path.parent / "ffmpeg2pass-0.log"
            if log_file.exists():
                log_file.unlink()
        except Exception as e:
            logger.warning(f"Failed to remove log file: {e}")

    def _start_periodic_pruning(self):
        """Start background task for periodic job pruning"""
        async def prune_periodically():
            while True:
                await asyncio.sleep(PRUNE_INTERVAL_SECONDS)
                self._prune_old_jobs()

        self._pruning_task = asyncio.create_task(prune_periodically())

    def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS):
        """Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible."""
        if len(self.jobs) <= max_jobs:
            return

        # Get inactive completed/cancelled jobs sorted by time (exclude failed jobs)
        inactive = [
            (jid, j) for jid, j in self.jobs.items()
            if jid not in self.active_jobs and j.status in ['completed', 'cancelled']
        ]
        inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)

        # Remove oldest jobs beyond limit
        num_to_remove = len(self.jobs) - max_jobs
        for jid, _ in inactive[:num_to_remove]:
            self.jobs.pop(jid, None)

    async def get_jobs_snapshot(self) -> list:
        """Get a safe snapshot of all jobs for iteration"""
        async with self._jobs_lock:
            return list(self.jobs.values())

    def get_pending_count(self) -> int:
        """Get count of pending jobs"""
        return sum(1 for job in self.jobs.values() if job.status == "pending")

    async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
        """Start a new compression job"""
        # Check filesystem health first
        if self.health_checker:
            status = self.health_checker.get_status()
            if not status["healthy"]:
                error_msg = f"Cannot start compression: Filesystem is not writable. {status.get('error', 'Unknown error')}"
                logger.error(error_msg)
                raise ValueError(error_msg)

        # Check if queue is full
        pending_count = self.get_pending_count()
        if pending_count >= MAX_PENDING_JOBS:
            error_msg = f"Queue is full: {pending_count}/{MAX_PENDING_JOBS} pending jobs. Please wait for jobs to complete."
            logger.warning(error_msg)
            raise ValueError(error_msg)

        # Validate path is within allowed directory
        if self.allowed_base_path:
            abs_path = Path(file_path).resolve()
            # Resolve both paths to handle symlinks properly
            resolved_base = self.allowed_base_path.resolve()

            # Check if resolved path is within allowed directory
            try:
                abs_path.relative_to(resolved_base)
            except ValueError:
                raise ValueError("Invalid file path: outside allowed directory")

            # Verify file exists and is a regular file (not a symlink to outside)
            if not abs_path.exists() or not abs_path.is_file():
                raise ValueError("Invalid file path: file does not exist or is not a regular file")

        async with self._jobs_lock:
            # Prune old jobs before creating new one
            self._prune_old_jobs()

            job = CompressionJob(file_path, reduce_percentage)
            self.jobs[job.job_id] = job

        # Start compression in background
        task = asyncio.create_task(self.compress_video(job))
        self.active_jobs[job.job_id] = task

        # Clean up task when done
        task.add_done_callback(lambda t: self.active_jobs.pop(job.job_id, None))

        return job.job_id

    async def cancel_job(self, job_id: str):
        """Cancel a running compression job"""
        if job_id in self.jobs:
            self.jobs[job_id].status = "cancelled"
            if job_id in self.active_jobs:
                self.active_jobs[job_id].cancel()

    async def remove_job(self, job_id: str):
        """Remove a job from the list (for failed/completed jobs)"""
        if job_id in self.jobs:
            job = self.jobs[job_id]
            # Only allow removal of inactive jobs
            if job_id not in self.active_jobs:
                async with self._jobs_lock:
                    self.jobs.pop(job_id, None)
                return True
        return False
```
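For manual testing, a minimal sketch of driving the manager from an async context is shown below. The module import path, media directory, and file name are assumptions for illustration and not part of this change:

```python
# Illustrative only: module path, media directory, and file name are assumptions.
import asyncio
from pathlib import Path

from compression import CompressionManager

async def main():
    manager = CompressionManager(max_concurrent=2, allowed_base_path=Path("/media"))
    job_id = await manager.start_compression("/media/example.mp4", reduce_percentage=75)

    while True:
        job = manager.jobs[job_id]
        print(f"{job.status}: {job.progress:.0f}% (pass {job.current_pass}, ETA {job.eta_seconds}s)")
        if job.status in ("completed", "failed", "cancelled"):
            break
        await asyncio.sleep(5)

asyncio.run(main())
```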