This commit ensures that compression jobs survive Docker container restarts and are automatically recovered and restarted. Changes: - Modified CancelledError handler to preserve job status during shutdown - Jobs now keep 'processing' or 'validating' status instead of being marked as 'cancelled' when the app shuts down - Added job persistence layer using SQLite database - Implemented automatic job recovery on application startup - Added process cleanup utilities for orphaned ffmpeg processes - User-initiated cancellations still properly mark jobs as cancelled - Jobs are visible in frontend after recovery The recovery system: 1. Detects interrupted jobs (processing/validating status) 2. Cleans up orphaned ffmpeg processes and temp files 3. Restarts interrupted jobs from beginning 4. Maintains queue order and respects concurrency limits 5. Works with multiple jobs in the queue 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
212 lines
6.8 KiB
Python
212 lines
6.8 KiB
Python
import psutil
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def process_exists(pid: int) -> bool:
    """Return True if a process with the given PID is currently alive.

    Best-effort: any psutil failure is logged and reported as "not found"
    rather than propagated, since callers only need a go/no-go answer.
    """
    try:
        alive = psutil.pid_exists(pid)
    except Exception as exc:
        logger.error(f"Error checking if PID {pid} exists: {exc}")
        return False
    return alive
|
|
|
|
|
|
def get_process_info(pid: int) -> Optional[dict]:
    """Return a snapshot dict describing the process, or None if unavailable.

    Keys: pid, name, cmdline, cwd, status, create_time.

    The attribute reads stay inside the try-block on purpose: the process can
    vanish between construction and inspection, raising NoSuchProcess late.
    """
    try:
        proc = psutil.Process(pid)
        snapshot = dict(
            pid=pid,
            name=proc.name(),
            cmdline=proc.cmdline(),
            cwd=proc.cwd(),
            status=proc.status(),
            create_time=proc.create_time(),
        )
    except psutil.NoSuchProcess:
        # No such PID (or it exited mid-inspection) — nothing to report.
        return None
    except Exception as exc:
        logger.error(f"Error getting process info for PID {pid}: {exc}")
        return None
    return snapshot
|
|
|
|
|
|
def is_our_ffmpeg_process(pid: int, expected_file_path: str) -> bool:
    """
    Verify if a process is our ffmpeg process by checking:
    1. Process name is ffmpeg
    2. Command line contains our input file path
    3. Command line carries our expected flags (-i input, -progress marker)

    Args:
        pid: Process ID to check
        expected_file_path: The file path this ffmpeg should be processing

    Returns:
        True if verified as our ffmpeg process, False otherwise.
        On AccessDenied we return False: if we cannot inspect it, we must
        not kill it.
    """
    try:
        proc = psutil.Process(pid)

        # Check 1: Process name must be ffmpeg
        process_name = proc.name().lower()
        if 'ffmpeg' not in process_name:
            logger.debug(f"PID {pid}: Not an ffmpeg process (name: {process_name})")
            return False

        # Check 2: Command line must contain our file path
        cmdline = proc.cmdline()
        cmdline_str = ' '.join(cmdline)

        # The file path should appear in the command line
        if expected_file_path not in cmdline_str:
            logger.debug(f"PID {pid}: File path '{expected_file_path}' not in command line")
            return False

        # Check 3: Should have -i flag (input file) and -progress flag (our marker).
        # BUG FIX: the previous fallback `'progress' in cmdline_str` matched the
        # bare substring anywhere (e.g. a filename containing "progress"), which
        # effectively disabled this check. Require the actual flag as a discrete
        # argument so this gate cannot be satisfied accidentally.
        has_input_flag = '-i' in cmdline
        has_progress_flag = '-progress' in cmdline

        if not (has_input_flag and has_progress_flag):
            logger.debug(f"PID {pid}: Missing expected ffmpeg flags")
            return False

        logger.info(f"PID {pid}: Verified as our ffmpeg process for '{expected_file_path}'")
        return True

    except psutil.NoSuchProcess:
        logger.debug(f"PID {pid}: Process no longer exists")
        return False
    except psutil.AccessDenied:
        logger.warning(f"PID {pid}: Access denied - cannot verify (won't kill)")
        return False
    except Exception as e:
        logger.error(f"Error verifying PID {pid}: {e}")
        return False
|
|
|
|
|
|
def kill_process_safely(pid: int, expected_file_path: str, timeout: int = 10) -> bool:
    """
    Safely kill a process after verification.

    Escalation ladder: verify ownership, SIGTERM, wait up to ``timeout``
    seconds, then SIGKILL as a last resort.

    Args:
        pid: Process ID to kill
        expected_file_path: Expected file path to verify against
        timeout: Seconds to wait for graceful termination

    Returns:
        True if process was killed successfully (or was already gone),
        False otherwise
    """
    # Re-verify ownership immediately before acting; never kill an
    # unverified PID (it may have been reused by an unrelated process).
    if not is_our_ffmpeg_process(pid, expected_file_path):
        logger.warning(f"PID {pid}: Failed verification - NOT killing for safety")
        return False

    try:
        proc = psutil.Process(pid)

        # Graceful termination first (SIGTERM).
        logger.info(f"PID {pid}: Sending SIGTERM for graceful shutdown")
        proc.terminate()

        try:
            proc.wait(timeout=timeout)
        except psutil.TimeoutExpired:
            # Escalate to SIGKILL if the process ignored SIGTERM.
            logger.warning(f"PID {pid}: Graceful shutdown timed out, sending SIGKILL")
            proc.kill()
            proc.wait(timeout=5)
            logger.info(f"PID {pid}: Process force killed")
        else:
            logger.info(f"PID {pid}: Process terminated gracefully")
        return True

    except psutil.NoSuchProcess:
        # Already dead — that is the outcome we wanted.
        logger.info(f"PID {pid}: Process already terminated")
        return True
    except Exception as exc:
        logger.error(f"Error killing PID {pid}: {exc}")
        return False
|
|
|
|
|
|
def cleanup_temp_files(file_path: str) -> int:
    """
    Clean up temporary files associated with a compression job

    Args:
        file_path: Original video file path

    Returns:
        Number of files cleaned up
    """
    removed = 0
    source = Path(file_path)
    workdir = source.parent

    # Artifacts an (interrupted) encode may leave next to the source video,
    # paired with the label used in the cleanup log message:
    #   - temp_<filename>: the in-progress output file
    #   - ffmpeg2pass-0.log: two-pass encode statistics
    #   - ffmpeg2pass-0.log.mbtree: x264 macroblock-tree data
    candidates = [
        (workdir / f"temp_{source.name}", "temp file"),
        (workdir / "ffmpeg2pass-0.log", "ffmpeg log"),
        (workdir / "ffmpeg2pass-0.log.mbtree", "mbtree file"),
    ]

    try:
        for artifact, label in candidates:
            if artifact.exists():
                artifact.unlink()
                logger.info(f"Cleaned up {label}: {artifact.name}")
                removed += 1
    except Exception as exc:
        # Single try around the loop: a failure aborts remaining cleanups,
        # matching the original all-in-one-try behavior.
        logger.error(f"Error cleaning up temp files for {file_path}: {exc}")

    return removed
|
|
|
|
|
|
def find_orphaned_ffmpeg_processes(allowed_base_path: Path) -> List[dict]:
    """
    Find all ffmpeg processes that are working on files within our base path
    This helps identify potential orphaned processes

    Args:
        allowed_base_path: Base path where our videos are stored

    Returns:
        List of process info dicts (keys: pid, cmdline, name)
    """
    matches: List[dict] = []
    base_str = str(allowed_base_path.resolve())

    try:
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                proc_name = proc.info['name']
                # Guard clauses: skip anything that isn't an ffmpeg process.
                if not proc_name or 'ffmpeg' not in proc_name.lower():
                    continue

                args = proc.info.get('cmdline', []) or []
                joined = ' '.join(args)

                # Only flag ffmpeg instances whose command line touches our tree.
                if base_str not in joined:
                    continue

                matches.append({
                    'pid': proc.info['pid'],
                    'cmdline': joined,
                    'name': proc_name,
                })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process exited or is off-limits mid-scan; skip it.
                continue
    except Exception as exc:
        logger.error(f"Error finding orphaned processes: {exc}")

    return matches
|