Compare commits

...

9 Commits

Author SHA1 Message Date
Alihan
92338df716 Fix compression job queue persistence and recovery after Docker restart
This commit ensures that compression jobs survive Docker container restarts
and are automatically recovered and restarted.

Changes:
- Modified CancelledError handler to preserve job status during shutdown
- Jobs now keep 'processing' or 'validating' status instead of being marked
  as 'cancelled' when the app shuts down
- Added job persistence layer using SQLite database
- Implemented automatic job recovery on application startup
- Added process cleanup utilities for orphaned ffmpeg processes
- User-initiated cancellations still properly mark jobs as cancelled
- Jobs are visible in frontend after recovery

The recovery system:
1. Detects interrupted jobs (processing/validating status)
2. Cleans up orphaned ffmpeg processes and temp files
3. Restarts interrupted jobs from beginning
4. Maintains queue order and respects concurrency limits
5. Works with multiple jobs in the queue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 02:57:04 +03:00
Alihan
80ca19cb4b Add dotfile handling in Locations section - display hidden files as gray non-clickable items at bottom
- Separate dotfiles (starting with .) from regular locations
- Display dotfiles in gray with disabled cursor
- Keep dotfiles always positioned at the bottom of the list regardless of sort order
- Prevent dotfile selection while maintaining visibility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 01:37:52 +03:00
Alihan
91141eadcf Implement in-place compression file replacement with enhanced validation
## Overview
Refactored compression system to replace original files in-place instead of creating duplicate files with _compressed suffix. This eliminates disk space waste when compressing videos.

## Key Changes

### Backend (backend/compression.py)
- **Added enhanced validation** (`validate_video_enhanced`):
  - FFmpeg decode validation (catches corrupted files)
  - File size sanity check (minimum 1KB)
  - Duration comparison between original and compressed (±1 second tolerance)

- **Implemented safe file replacement** (`safe_replace_file`):
  - Enhanced validation before any file operations
  - Atomic file operations using `os.replace()` to prevent race conditions
  - Backup strategy: original → .backup during replacement
  - Automatic rollback if validation or replacement fails
  - Comprehensive logging at each step

- **Updated compress_video function**:
  - Removed `_compressed_75` suffix from output filenames
  - Changed to use original file path for in-place replacement
  - Calls new safe_replace_file method instead of simple os.rename
  - Improved logging messages to reflect replacement operation

### Frontend (frontend/src/App.jsx)
- Updated compression success messages to show reduction percentage
- Messages now indicate file is being replaced, not duplicated
- Displays compression reduction percentage (e.g., "Started compressing file.mp4 (75% reduction)")

## Safety Features
1. **Backup-and-restore**: Original file backed up as .backup until verification passes
2. **Enhanced validation**: Three-level validation before committing to replacement
3. **Atomic operations**: Uses os.replace() for atomic file replacements
4. **Automatic rollback**: If any step fails, original is restored from backup
5. **Comprehensive logging**: All file operations logged for debugging

## File Operations Flow
1. Compress to temp file
2. Enhanced validation (decode + size + duration)
3. Create backup: original.mp4 → original.mp4.backup
4. Move compressed: temp → original.mp4
5. Verify final file is accessible
6. Delete backup only after final verification
7. Rollback if any step fails

## Testing
- Docker rebuild: `docker compose down && docker compose build && docker compose up -d`
- Manual testing recommended for compression jobs with various video files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 01:25:40 +03:00
Alihan
d8242d47b9 Fix compression job watchdog timeout and error handling
- Increase watchdog timeout to 15 minutes (configurable via COMPRESSION_WATCHDOG_TIMEOUT env var)
- Fix stderr buffer overflow by setting 8MB limit and truncating to last 1000 lines
- Keep failed jobs visible in UI until manually removed (auto-prune only completed/cancelled)
- Add job removal endpoint: DELETE /api/compress/jobs/{job_id}?action=remove

Resolves issue where jobs stuck at 70% were killed prematurely due to stderr buffer overflow
and aggressive 5-minute timeout.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 01:59:28 +03:00
Alihan
f499691345 Improve nginx configuration and compression job UX
- Add security headers (X-Frame-Options, X-Content-Type-Options, X-XSS-Protection)
- Implement proper cache control for static assets and prevent index.html caching
- Fix SSE handling to properly fetch new compression jobs
- Immediately refresh job list after queuing compression for better UX feedback
2025-10-13 00:51:56 +03:00
Alihan
7db829d10c bugfix and security harden 2025-10-13 00:10:02 +03:00
Alihan
dd5fe1617a Fix 9 critical bugs: security, race conditions, precision, and UX improvements
Security fixes:
- Add filename path traversal validation (missing "/" check)
- Prevents attacks like filename="../../../etc/passwd"

Race condition and concurrency fixes:
- Add async locking to get_jobs_snapshot() to prevent dictionary iteration errors
- Fix watchdog loop to detect process completion immediately (move sleep to end)
- Fix EventSource ref updates during SSE reconnection to prevent memory leaks

Precision and calculation fixes:
- Keep duration as float instead of int for accurate bitrate calculations (~1% improvement)
- Prevents cumulative rounding errors in compression

Type safety improvements:
- Import and use Any from typing module instead of lowercase "any"
- Fixes Python type hints for proper static analysis

Media handling improvements:
- Determine MIME types dynamically using mimetypes module
- Supports MOV (video/quicktime), AVI, PNG properly instead of hardcoded types

UX fixes:
- Fix formatETA() to handle 0 seconds correctly (was showing "--" instead of "0m 0s")
- Use stable key for React video element (prevents unnecessary remounts)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 00:07:54 +03:00
Alihan
2acb4d9f4e Fix compression for files in root directory
- Handle __root__ special case in compression start endpoint
- Allow compression of videos not organized in date folders
2025-10-12 23:47:15 +03:00
Alihan
a7b7ad41e9 Fix video playback for files in root directory
- Handle __root__ special case in video streaming and image endpoints
- Support locations with files not organized by date folders
- Add visual indicator for root-level file collections in UI
2025-10-12 23:42:21 +03:00
15 changed files with 1091 additions and 76 deletions

View File

@@ -5,7 +5,8 @@
"mcp__playwright__browser_console_messages",
"mcp__playwright__browser_click",
"mcp__playwright__browser_evaluate",
"mcp__playwright__browser_navigate"
"mcp__playwright__browser_navigate",
"mcp__playwright__browser_wait_for"
],
"deny": [],
"ask": []

1
CLAUDE.md Normal file
View File

@@ -0,0 +1 @@
- app is running as docker container, so everytime there are need to reflect changes we have to docker compose down & build & up -d

View File

@@ -4,9 +4,11 @@ import uuid
import os
import re
import logging
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional
from job_persistence import JobPersistence
# Configure logging
logging.basicConfig(
@@ -20,16 +22,18 @@ MAX_CONCURRENT_JOBS = 2
MAX_STORED_JOBS = 100
MAX_PENDING_JOBS = 10 # Maximum jobs waiting in queue
PRUNE_INTERVAL_SECONDS = 300 # 5 minutes
WATCHDOG_TIMEOUT_SECONDS = 300 # 5 minutes
WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900')) # Default: 15 minutes
WATCHDOG_CHECK_INTERVAL = 30 # 30 seconds
STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB
STDERR_READ_LIMIT = 8 * 1024 * 1024 # 8MB buffer limit for stderr
MAX_STDERR_LINES = 1000 # Keep only last 1000 lines of stderr
DEFAULT_AUDIO_BITRATE = 128 # kbps
MIN_VIDEO_BITRATE = 100 # kbps
class CompressionJob:
def __init__(self, file_path: str, reduce_percentage: int):
self.job_id = str(uuid.uuid4())
def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None):
self.job_id = job_id or str(uuid.uuid4())
self.file_path = file_path
self.reduce_percentage = reduce_percentage
self.status = "pending" # pending, processing, validating, completed, failed, cancelled
@@ -45,6 +49,59 @@ class CompressionJob:
self.target_size_mb = None
self.video_bitrate = None
self.duration_seconds = None
self.ffmpeg_pid = None # Track ffmpeg process PID
def to_dict(self) -> Dict:
"""Convert job to dictionary for database persistence"""
return {
'job_id': self.job_id,
'file_path': self.file_path,
'reduce_percentage': self.reduce_percentage,
'status': self.status,
'progress': self.progress,
'eta_seconds': self.eta_seconds,
'current_pass': self.current_pass,
'created_at': self.created_at,
'started_at': self.started_at,
'completed_at': self.completed_at,
'error': self.error,
'output_file': self.output_file,
'current_size_mb': self.current_size_mb,
'target_size_mb': self.target_size_mb,
'video_bitrate': self.video_bitrate,
'duration_seconds': self.duration_seconds,
'ffmpeg_pid': self.ffmpeg_pid
}
@classmethod
def from_dict(cls, data: Dict) -> 'CompressionJob':
"""Create job from database dictionary"""
job = cls(
file_path=data['file_path'],
reduce_percentage=data['reduce_percentage'],
job_id=data['job_id']
)
job.status = data.get('status', 'pending')
job.progress = data.get('progress', 0.0)
job.eta_seconds = data.get('eta_seconds')
job.current_pass = data.get('current_pass', 0)
job.error = data.get('error')
job.output_file = data.get('output_file')
job.current_size_mb = data.get('current_size_mb')
job.target_size_mb = data.get('target_size_mb')
job.video_bitrate = data.get('video_bitrate')
job.duration_seconds = data.get('duration_seconds')
job.ffmpeg_pid = data.get('ffmpeg_pid')
# Parse datetime strings back to datetime objects
for key in ['created_at', 'started_at', 'completed_at']:
if data.get(key):
if isinstance(data[key], str):
job.__dict__[key] = datetime.fromisoformat(data[key])
else:
job.__dict__[key] = data[key]
return job
class CompressionManager:
@@ -52,15 +109,36 @@ class CompressionManager:
TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
FPS_PATTERN = re.compile(r'fps=([\d.]+)')
def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None):
self.jobs: Dict[str, CompressionJob] = {}
self.active_jobs: Dict[str, asyncio.Task] = {}
self.semaphore = asyncio.Semaphore(max_concurrent)
self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
self.health_checker = health_checker
self.cache_invalidation_callback = cache_invalidation_callback
self._pruning_task: Optional[asyncio.Task] = None
self._jobs_lock = asyncio.Lock()
self._start_periodic_pruning()
self.persistence = JobPersistence()
self._pruning_started = False
async def initialize_persistence(self):
"""Initialize the persistence layer - call this on startup"""
await self.persistence.initialize()
logger.info("Job persistence initialized")
# Start periodic pruning (only once when persistence is initialized)
if not self._pruning_started:
self._start_periodic_pruning()
self._pruning_started = True
async def load_jobs_from_database(self):
"""Load all jobs from database into memory"""
async with self._jobs_lock:
jobs_data = await self.persistence.get_all_jobs()
for job_data in jobs_data:
job = CompressionJob.from_dict(job_data)
self.jobs[job.job_id] = job
logger.info(f"Loaded {len(jobs_data)} jobs from database")
async def get_video_info(self, file_path: str) -> Dict:
"""Extract video duration and file size using ffprobe"""
@@ -80,7 +158,7 @@ class CompressionManager:
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await result.communicate()
duration = int(float(stdout.decode().strip()))
duration = float(stdout.decode().strip())
return {
'size_mb': file_size_mb,
@@ -89,7 +167,7 @@ class CompressionManager:
def calculate_bitrates(self, current_size_mb: float,
target_size_mb: float,
duration_seconds: int,
duration_seconds: float,
audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int:
"""Calculate video bitrate based on target size"""
# Total bitrate in kbps
@@ -104,6 +182,7 @@ class CompressionManager:
try:
job.status = "processing"
job.started_at = datetime.now()
await self.persistence.save_job(job.to_dict())
# Get video information
info = await self.get_video_info(job.file_path)
@@ -123,10 +202,32 @@ class CompressionManager:
f"Duration: {job.duration_seconds}s, "
f"Video Bitrate: {job.video_bitrate} kbps")
# Generate output filename
# Check disk space BEFORE starting compression
file_path = Path(job.file_path)
disk_usage = shutil.disk_usage(file_path.parent)
# Estimate required space: temp file + original + safety margin (1.5x original)
required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5
available_bytes = disk_usage.free
if available_bytes < required_bytes:
required_gb = required_bytes / (1024**3)
available_gb = available_bytes / (1024**3)
error_msg = (
f"Insufficient disk space: need {required_gb:.1f}GB, "
f"have {available_gb:.1f}GB available"
)
logger.error(f"Job {job.job_id}: {error_msg}")
raise ValueError(error_msg)
logger.info(
f"Job {job.job_id}: Disk space check passed "
f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)"
)
# Generate output filename (use original path for in-place replacement)
file_path = Path(job.file_path)
temp_file = file_path.parent / f"temp_{file_path.name}"
output_file = file_path.parent / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"
# PASS 1: Analysis
job.current_pass = 1
@@ -144,31 +245,42 @@ class CompressionManager:
self.cleanup_temp_files(job)
return
# VALIDATION
# SAFE FILE REPLACEMENT WITH VALIDATION
job.status = "validating"
job.progress = 95.0
if await self.validate_video(temp_file):
# Move temp file to final output
os.rename(temp_file, output_file)
job.output_file = str(output_file)
await self.persistence.update_job_status(job.job_id, "validating", progress=95.0)
logger.info(f"Job {job.job_id}: Starting safe file replacement process")
if await self.safe_replace_file(file_path, temp_file, job):
job.output_file = str(file_path) # Output is same as original path
job.status = "completed"
job.progress = 100.0
job.completed_at = datetime.now()
job.ffmpeg_pid = None # Clear PID on completion
await self.persistence.save_job(job.to_dict())
self.cleanup_temp_files(job)
logger.info(f"Job {job.job_id} completed successfully")
logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place")
else:
job.status = "failed"
job.error = "Validation failed: Compressed video is corrupted"
job.error = "File replacement failed or validation check failed"
await self.persistence.update_job_status(job.job_id, "failed", error=job.error)
self.cleanup_temp_files(job)
logger.error(f"Job {job.job_id} failed validation")
logger.error(f"Job {job.job_id} failed - file replacement unsuccessful")
except asyncio.CancelledError:
job.status = "cancelled"
# Don't mark as cancelled - could be app shutdown
# Keep current status so job can be recovered on restart
job.ffmpeg_pid = None # Clear PID since process won't survive
# Don't update database status - leave it as "processing" or "validating"
# for recovery on restart
self.cleanup_temp_files(job)
logger.info(f"Job {job.job_id} cancelled")
logger.info(f"Job {job.job_id} interrupted (task cancelled)")
raise # Re-raise to properly cancel the task
except Exception as e:
job.status = "failed"
job.error = str(e)
job.ffmpeg_pid = None # Clear PID on failure
await self.persistence.update_job_status(job.job_id, "failed", error=str(e))
self.cleanup_temp_files(job)
logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)
@@ -210,18 +322,29 @@ class CompressionManager:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
limit=STDERR_READ_LIMIT # Set buffer limit to prevent overflow
)
# Track PID for first pass only (second pass uses same job)
if pass_num == 1 and process.pid:
job.ffmpeg_pid = process.pid
await self.persistence.set_ffmpeg_pid(job.job_id, process.pid)
logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked")
# Progress tracking
stderr_output = []
last_progress_update = datetime.now()
last_progress_update = None # Initialize as None, will be set when FFmpeg starts producing output
async def read_stdout():
"""Read and process stdout for progress updates"""
nonlocal last_progress_update
try:
async for line in process.stdout:
# Initialize timer on first progress update from FFmpeg
if last_progress_update is None:
last_progress_update = datetime.now()
logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized")
if job.status == "cancelled":
process.kill()
return
@@ -256,6 +379,16 @@ class CompressionManager:
else:
remaining_sec = remaining_sec_current_pass
job.eta_seconds = int(remaining_sec)
# Periodically save progress to database (every ~5% or 30 seconds)
# Use modulo to reduce DB writes
if int(job.progress) % 5 == 0:
await self.persistence.update_job_progress(
job.job_id,
job.progress,
job.eta_seconds,
job.current_pass
)
except Exception as e:
logger.error(f"Error reading stdout: {e}")
@@ -265,6 +398,11 @@ class CompressionManager:
async for line in process.stderr:
line_str = line.decode('utf-8', errors='ignore')
stderr_output.append(line_str)
# Truncate stderr_output to keep only last MAX_STDERR_LINES lines
if len(stderr_output) > MAX_STDERR_LINES:
stderr_output[:] = stderr_output[-MAX_STDERR_LINES:]
# Log important stderr messages
if 'error' in line_str.lower():
logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
@@ -277,20 +415,22 @@ class CompressionManager:
"""Monitor for stuck jobs - kills process if no progress"""
nonlocal last_progress_update
while process.returncode is None:
await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
if job.status == "cancelled":
process.kill()
return
time_since_update = (datetime.now() - last_progress_update).total_seconds()
if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
logger.error(f"Watchdog: {error_msg}")
job.status = "failed"
job.error = error_msg
process.kill()
raise Exception(error_msg)
# Only check timeout if FFmpeg has started producing output
if last_progress_update is not None:
time_since_update = (datetime.now() - last_progress_update).total_seconds()
if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
logger.error(f"Watchdog: {error_msg}")
job.status = "failed"
job.error = error_msg
process.kill()
raise Exception(error_msg)
await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
# Run stdout, stderr readers and watchdog concurrently to prevent deadlock
try:
@@ -340,6 +480,134 @@ class CompressionManager:
# Also check return code
return process.returncode == 0
async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool:
"""Enhanced validation with size and duration checks before file replacement"""
# 1. Basic ffmpeg decode validation
if not await self.validate_video(compressed_path):
logger.error(f"Enhanced validation failed: FFmpeg decode check failed")
return False
# 2. File size sanity check (minimum 1KB)
try:
compressed_size = os.path.getsize(str(compressed_path))
if compressed_size < 1024:
logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)")
return False
logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)")
except Exception as e:
logger.error(f"Enhanced validation failed: Could not check file size: {e}")
return False
# 3. Duration verification (optional but recommended)
try:
original_info = await self.get_video_info(str(original_path))
compressed_info = await self.get_video_info(str(compressed_path))
duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds'])
# Allow up to 1 second difference due to rounding
if duration_diff > 1.0:
logger.error(f"Enhanced validation failed: Duration mismatch - "
f"original: {original_info['duration_seconds']:.2f}s vs "
f"compressed: {compressed_info['duration_seconds']:.2f}s "
f"(diff: {duration_diff:.2f}s)")
return False
logger.info(f"Enhanced validation: Duration check passed "
f"(original: {original_info['duration_seconds']:.2f}s, "
f"compressed: {compressed_info['duration_seconds']:.2f}s)")
except Exception as e:
logger.error(f"Enhanced validation failed: Could not verify duration: {e}")
return False
logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}")
return True
async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool:
"""Safely replace original file with compressed version with rollback capability"""
backup_path = original_path.parent / f"{original_path.name}.backup"
try:
# Enhanced validation BEFORE any file operations
logger.info(f"Validating compressed file before replacement: {temp_file.name}")
if not await self.validate_video_enhanced(original_path, temp_file):
logger.error(f"Safe replace failed: Validation check failed")
return False
# Step 1: Create backup of original file
logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}")
try:
os.replace(str(original_path), str(backup_path))
except Exception as e:
logger.error(f"Safe replace failed: Could not create backup: {e}")
return False
# Step 2: Replace original with compressed file (atomic operation)
logger.info(f"Replacing original file with compressed version: {temp_file.name}")
try:
os.replace(str(temp_file), str(original_path))
except Exception as e:
logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...")
# Rollback: restore original from backup
try:
if backup_path.exists():
os.replace(str(backup_path), str(original_path))
logger.info(f"Rollback successful: Original file restored")
except Exception as rollback_error:
logger.error(f"Rollback failed: {rollback_error}")
return False
# Step 3: Verify final file is accessible
logger.info(f"Verifying replaced file is accessible: {original_path.name}")
try:
if not original_path.exists() or not original_path.is_file():
logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...")
# Rollback: restore original from backup
try:
if backup_path.exists():
os.replace(str(backup_path), str(original_path))
logger.info(f"Rollback successful: Original file restored")
except Exception as rollback_error:
logger.error(f"Rollback failed: {rollback_error}")
return False
# Get final file size
final_size = os.path.getsize(str(original_path))
logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB")
except Exception as e:
logger.error(f"Safe replace failed: Could not verify final file: {e}")
return False
# Step 4: Delete backup file only after successful verification
logger.info(f"Deleting backup file: {backup_path.name}")
try:
if backup_path.exists():
backup_path.unlink()
logger.info(f"Backup file deleted successfully")
except Exception as e:
logger.warning(f"Failed to delete backup file (non-critical): {e}")
# Step 5: Invalidate cache for this directory to show updated file size
if self.cache_invalidation_callback:
try:
# Invalidate all cache entries containing the parent directory path
parent_name = original_path.parent.name
self.cache_invalidation_callback(parent_name)
logger.info(f"Cache invalidated for directory: {parent_name}")
except Exception as e:
logger.warning(f"Failed to invalidate cache (non-critical): {e}")
return True
except Exception as e:
logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True)
# Attempt rollback on any unexpected error
try:
if backup_path.exists():
os.replace(str(backup_path), str(original_path))
logger.info(f"Rollback successful: Original file restored after unexpected error")
except Exception as rollback_error:
logger.error(f"Rollback failed: {rollback_error}")
return False
def cleanup_temp_files(self, job: CompressionJob):
"""Clean up temporary files"""
file_path = Path(job.file_path)
@@ -370,14 +638,14 @@ class CompressionManager:
self._pruning_task = asyncio.create_task(prune_periodically())
def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS):
"""Remove oldest completed jobs if total exceeds max_jobs"""
"""Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible."""
if len(self.jobs) <= max_jobs:
return
# Get inactive completed jobs sorted by time
# Get inactive completed/cancelled jobs sorted by time (exclude failed jobs)
inactive = [
(jid, j) for jid, j in self.jobs.items()
if jid not in self.active_jobs and j.status in ['completed', 'failed', 'cancelled']
if jid not in self.active_jobs and j.status in ['completed', 'cancelled']
]
inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)
@@ -386,9 +654,10 @@ class CompressionManager:
for jid, _ in inactive[:num_to_remove]:
self.jobs.pop(jid, None)
def get_jobs_snapshot(self) -> list:
async def get_jobs_snapshot(self) -> list:
"""Get a safe snapshot of all jobs for iteration"""
return list(self.jobs.values())
async with self._jobs_lock:
return list(self.jobs.values())
def get_pending_count(self) -> int:
"""Get count of pending jobs"""
@@ -434,6 +703,9 @@ class CompressionManager:
job = CompressionJob(file_path, reduce_percentage)
self.jobs[job.job_id] = job
# Save initial job to database
await self.persistence.save_job(job.to_dict())
# Start compression in background
task = asyncio.create_task(self.compress_video(job))
self.active_jobs[job.job_id] = task
@@ -444,8 +716,21 @@ class CompressionManager:
return job.job_id
async def cancel_job(self, job_id: str):
"""Cancel a running compression job"""
"""Cancel a running compression job (user-initiated)"""
if job_id in self.jobs:
self.jobs[job_id].status = "cancelled"
# Save cancelled status to database (user-initiated cancellation)
await self.persistence.update_job_status(job_id, "cancelled")
if job_id in self.active_jobs:
self.active_jobs[job_id].cancel()
async def remove_job(self, job_id: str):
"""Remove a job from the list (for failed/completed jobs)"""
if job_id in self.jobs:
job = self.jobs[job_id]
# Only allow removal of inactive jobs
if job_id not in self.active_jobs:
async with self._jobs_lock:
self.jobs.pop(job_id, None)
return True
return False

243
backend/job_persistence.py Normal file
View File

@@ -0,0 +1,243 @@
import sqlite3
import asyncio
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import aiosqlite
import os
logger = logging.getLogger(__name__)
# Database configuration from environment
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))
class JobPersistence:
"""SQLite-based persistence layer for compression jobs"""
def __init__(self, db_path: str = DATABASE_PATH):
self.db_path = db_path
self._initialized = False
self._lock = asyncio.Lock()
async def initialize(self):
"""Initialize database schema"""
if self._initialized:
return
async with self._lock:
if self._initialized:
return
# Ensure directory exists
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS compression_jobs (
job_id TEXT PRIMARY KEY,
file_path TEXT NOT NULL,
reduce_percentage INTEGER NOT NULL,
status TEXT NOT NULL,
progress REAL DEFAULT 0.0,
current_pass INTEGER DEFAULT 0,
current_size_mb REAL,
target_size_mb REAL,
video_bitrate INTEGER,
duration_seconds REAL,
eta_seconds INTEGER,
created_at TIMESTAMP NOT NULL,
started_at TIMESTAMP,
completed_at TIMESTAMP,
output_file TEXT,
error TEXT,
ffmpeg_pid INTEGER
)
""")
# Create indexes for performance
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_status
ON compression_jobs(status)
""")
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_created_at
ON compression_jobs(created_at)
""")
await db.commit()
self._initialized = True
logger.info(f"Job persistence initialized at {self.db_path}")
async def save_job(self, job_data: Dict[str, Any]):
"""Insert or update a job in the database"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
# Convert datetime objects to ISO format strings
job_data_copy = job_data.copy()
for key in ['created_at', 'started_at', 'completed_at']:
if key in job_data_copy and job_data_copy[key]:
if isinstance(job_data_copy[key], datetime):
job_data_copy[key] = job_data_copy[key].isoformat()
await db.execute("""
INSERT OR REPLACE INTO compression_jobs (
job_id, file_path, reduce_percentage, status, progress,
current_pass, current_size_mb, target_size_mb, video_bitrate,
duration_seconds, eta_seconds, created_at, started_at,
completed_at, output_file, error, ffmpeg_pid
) VALUES (
:job_id, :file_path, :reduce_percentage, :status, :progress,
:current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
:duration_seconds, :eta_seconds, :created_at, :started_at,
:completed_at, :output_file, :error, :ffmpeg_pid
)
""", job_data_copy)
await db.commit()
async def update_job_status(self, job_id: str, status: str,
progress: Optional[float] = None,
error: Optional[str] = None):
"""Quick update for job status and progress"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
updates = {"status": status}
if progress is not None:
updates["progress"] = progress
if error is not None:
updates["error"] = error
# Set completed_at for terminal states
if status in ['completed', 'failed', 'cancelled']:
updates["completed_at"] = datetime.now().isoformat()
set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
await db.execute(query, {**updates, "job_id": job_id})
await db.commit()
async def update_job_progress(self, job_id: str, progress: float,
eta_seconds: Optional[int] = None,
current_pass: Optional[int] = None):
"""Update job progress and ETA"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
updates = {"progress": progress}
if eta_seconds is not None:
updates["eta_seconds"] = eta_seconds
if current_pass is not None:
updates["current_pass"] = current_pass
set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
await db.execute(query, {**updates, "job_id": job_id})
await db.commit()
async def set_ffmpeg_pid(self, job_id: str, pid: int):
"""Update the ffmpeg PID for a job"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
(pid, job_id)
)
await db.commit()
async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
"""Get a single job by ID"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM compression_jobs WHERE job_id = ?",
(job_id,)
) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_all_jobs(self) -> List[Dict[str, Any]]:
"""Get all jobs from the database"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM compression_jobs ORDER BY created_at DESC"
) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
"""Get all jobs with a specific status"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
(status,)
) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
"""Get jobs that were interrupted (status=processing/validating)"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT * FROM compression_jobs
WHERE status IN ('processing', 'validating')
ORDER BY created_at ASC"""
) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def delete_job(self, job_id: str):
"""Delete a job from the database"""
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"DELETE FROM compression_jobs WHERE job_id = ?",
(job_id,)
)
await db.commit()
async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
"""Delete completed/cancelled jobs older than retention period"""
await self.initialize()
cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60)
cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat()
async with aiosqlite.connect(self.db_path) as db:
result = await db.execute("""
DELETE FROM compression_jobs
WHERE status IN ('completed', 'cancelled')
AND completed_at < ?
""", (cutoff_iso,))
deleted_count = result.rowcount
await db.commit()
if deleted_count > 0:
logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")
return deleted_count

View File

@@ -2,7 +2,7 @@ from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse, Response
from pathlib import Path
from typing import List, Dict, Optional
from typing import List, Dict, Optional, Any
from pydantic import BaseModel
import os
from datetime import datetime
@@ -12,9 +12,15 @@ import asyncio
import json
import time
import logging
import mimetypes
from sse_starlette.sse import EventSourceResponse
from compression import CompressionManager
from filesystem_health import FilesystemHealthChecker
from process_utils import (
kill_process_safely,
cleanup_temp_files,
find_orphaned_ffmpeg_processes
)
# Configure logging
logger = logging.getLogger(__name__)
@@ -26,16 +32,20 @@ STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB chunks for video streaming
SSE_UPDATE_INTERVAL = 0.5 # Update every 500ms
CACHE_TTL_SECONDS = 60 # Cache directory listings for 60 seconds
# Job recovery configuration
AUTO_RESTART_INTERRUPTED_JOBS = os.getenv('AUTO_RESTART_INTERRUPTED_JOBS', 'true').lower() == 'true'
CLEANUP_ORPHANED_PROCESSES = os.getenv('CLEANUP_ORPHANED_PROCESSES', 'true').lower() == 'true'
# Base path for footages
FOOTAGES_PATH = Path("/footages")
# Simple in-memory cache for directory listings
class SimpleCache:
def __init__(self, ttl_seconds: int = CACHE_TTL_SECONDS):
self.cache: Dict[str, tuple[float, any]] = {}
self.cache: Dict[str, tuple[float, Any]] = {}
self.ttl = ttl_seconds
def get(self, key: str) -> Optional[any]:
def get(self, key: str) -> Optional[Any]:
if key in self.cache:
timestamp, value = self.cache[key]
if time.time() - timestamp < self.ttl:
@@ -44,22 +54,32 @@ class SimpleCache:
del self.cache[key]
return None
def set(self, key: str, value: any):
def set(self, key: str, value: Any):
self.cache[key] = (time.time(), value)
def clear(self):
self.cache.clear()
def invalidate(self, pattern: str = None):
"""Invalidate cache entries matching pattern or all if pattern is None"""
if pattern is None:
self.cache.clear()
else:
keys_to_delete = [k for k in self.cache.keys() if pattern in k]
for key in keys_to_delete:
del self.cache[key]
directory_cache = SimpleCache()
# Initialize filesystem health checker first (compression manager needs it)
filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH)
# Initialize compression manager with health checker
# Initialize compression manager with health checker and cache callback
compression_manager = CompressionManager(
max_concurrent=1,
allowed_base_path=FOOTAGES_PATH,
health_checker=filesystem_health_checker
health_checker=filesystem_health_checker,
cache_invalidation_callback=directory_cache.invalidate
)
# CORS middleware for frontend communication
@@ -95,6 +115,116 @@ async def get_file_info(file_path: Path) -> Dict:
}
async def recover_compression_jobs():
"""
Recover compression jobs from database after app restart/crash
This function:
1. Loads all jobs from database
2. Handles interrupted jobs (processing/validating)
3. Cleans up orphaned ffmpeg processes
4. Restarts pending jobs automatically
"""
logger.info("=" * 60)
logger.info("Starting compression job recovery...")
logger.info("=" * 60)
# Initialize persistence layer
await compression_manager.initialize_persistence()
# Load all jobs from database
await compression_manager.load_jobs_from_database()
# Get interrupted jobs (were processing when app crashed)
interrupted_jobs = await compression_manager.persistence.get_interrupted_jobs()
if interrupted_jobs:
logger.info(f"Found {len(interrupted_jobs)} interrupted jobs")
for job_data in interrupted_jobs:
job_id = job_data['job_id']
file_path = job_data['file_path']
ffmpeg_pid = job_data.get('ffmpeg_pid')
logger.info(f"Processing interrupted job {job_id}: {Path(file_path).name}")
# Kill orphaned ffmpeg process if it exists and is verified as ours
if ffmpeg_pid and CLEANUP_ORPHANED_PROCESSES:
killed = kill_process_safely(ffmpeg_pid, file_path, timeout=10)
if killed:
logger.info(f" ✓ Killed orphaned ffmpeg process PID {ffmpeg_pid}")
else:
logger.warning(f" ⚠ Could not verify/kill PID {ffmpeg_pid}")
# Clean up temp files
cleaned = cleanup_temp_files(file_path)
if cleaned > 0:
logger.info(f" ✓ Cleaned up {cleaned} temp file(s)")
# Decide what to do with interrupted job
if AUTO_RESTART_INTERRUPTED_JOBS:
# Restart the job
logger.info(f" ⟳ Restarting interrupted job...")
await compression_manager.persistence.update_job_status(
job_id, "pending", progress=0.0
)
# Update in-memory job status
if job_id in compression_manager.jobs:
compression_manager.jobs[job_id].status = "pending"
compression_manager.jobs[job_id].progress = 0.0
compression_manager.jobs[job_id].ffmpeg_pid = None
else:
# Mark as failed
logger.info(f" ✗ Marking as failed (auto-restart disabled)")
await compression_manager.persistence.update_job_status(
job_id, "failed",
error="Job interrupted by application restart/crash"
)
# Update in-memory job status
if job_id in compression_manager.jobs:
compression_manager.jobs[job_id].status = "failed"
compression_manager.jobs[job_id].error = "Job interrupted by application restart/crash"
compression_manager.jobs[job_id].ffmpeg_pid = None
else:
logger.info("No interrupted jobs found")
# Get pending jobs
pending_jobs = await compression_manager.persistence.get_jobs_by_status("pending")
if pending_jobs:
logger.info(f"Found {len(pending_jobs)} pending jobs - will be processed automatically")
# Restart pending jobs
for job_data in pending_jobs:
job_id = job_data['job_id']
if job_id in compression_manager.jobs:
job = compression_manager.jobs[job_id]
# Only restart if not already in active jobs
if job_id not in compression_manager.active_jobs:
logger.info(f" ⟳ Restarting pending job: {Path(job.file_path).name}")
task = asyncio.create_task(compression_manager.compress_video(job))
compression_manager.active_jobs[job_id] = task
task.add_done_callback(lambda t: compression_manager.active_jobs.pop(job_id, None))
else:
logger.info("No pending jobs to restart")
# Optional: Find any other orphaned ffmpeg processes in our path
if CLEANUP_ORPHANED_PROCESSES:
orphaned = find_orphaned_ffmpeg_processes(FOOTAGES_PATH)
if orphaned:
logger.warning(f"Found {len(orphaned)} untracked ffmpeg processes in our path:")
for proc in orphaned:
logger.warning(f" PID {proc['pid']}: {proc['cmdline'][:100]}...")
logger.warning(f" ⚠ NOT killing (not in our database) - manual review recommended")
# Clean up old completed/cancelled jobs
deleted = await compression_manager.persistence.cleanup_old_jobs()
logger.info("=" * 60)
logger.info("Compression job recovery complete")
logger.info("=" * 60)
@app.on_event("startup")
async def startup_event():
"""Run startup tasks"""
@@ -111,6 +241,10 @@ async def startup_event():
# Start background monitoring
filesystem_health_checker.start_monitoring()
# Recover compression jobs from database
await recover_compression_jobs()
logger.info("Application startup complete")
@@ -176,6 +310,8 @@ async def get_dates(location: str) -> List[Dict]:
raise HTTPException(status_code=404, detail="Location not found")
dates = []
has_files_in_root = False
for item in location_path.iterdir():
if item.is_dir():
stat = await aiofiles.os.stat(item)
@@ -183,6 +319,16 @@ async def get_dates(location: str) -> List[Dict]:
"name": item.name,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
})
elif item.is_file():
has_files_in_root = True
# If no date folders but has files in root, return special marker
if not dates and has_files_in_root:
dates.append({
"name": "__root__",
"modified": None,
"message": "📁 Files not organized by date"
})
# Cache the result
directory_cache.set(cache_key, dates)
@@ -202,7 +348,11 @@ async def get_files(location: str, date: str) -> List[Dict]:
if ".." in location or ".." in date or "/" in location or "/" in date:
raise HTTPException(status_code=400, detail="Invalid path characters")
files_path = (FOOTAGES_PATH / location / date).resolve()
# Handle special __root__ marker for locations with files in root
if date == "__root__":
files_path = (FOOTAGES_PATH / location).resolve()
else:
files_path = (FOOTAGES_PATH / location / date).resolve()
# Ensure resolved path is still within FOOTAGES_PATH
try:
@@ -227,10 +377,14 @@ async def get_files(location: str, date: str) -> List[Dict]:
async def stream_video(location: str, date: str, filename: str, request: Request):
"""Stream video file with HTTP range request support for fast seeking"""
# Sanitize path components to prevent traversal
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date:
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date or "/" in filename:
raise HTTPException(status_code=400, detail="Invalid path characters")
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
# Handle __root__ case (files not in date subdirectories)
if date == "__root__":
file_path = (FOOTAGES_PATH / location / filename).resolve()
else:
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
# Ensure resolved path is still within FOOTAGES_PATH
try:
@@ -279,20 +433,20 @@ async def stream_video(location: str, date: str, filename: str, request: Request
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(content_length),
"Content-Type": "video/mp4",
"Content-Type": mimetypes.guess_type(file_path)[0] or "video/mp4",
}
return StreamingResponse(
iterfile(),
status_code=206,
headers=headers,
media_type="video/mp4"
media_type=mimetypes.guess_type(file_path)[0] or "video/mp4"
)
# No range header - return full file
return FileResponse(
file_path,
media_type="video/mp4",
media_type=mimetypes.guess_type(file_path)[0] or "video/mp4",
headers={"Accept-Ranges": "bytes"}
)
@@ -301,10 +455,14 @@ async def stream_video(location: str, date: str, filename: str, request: Request
async def get_image(location: str, date: str, filename: str):
"""Serve image file"""
# Sanitize path components to prevent traversal
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date:
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date or "/" in filename:
raise HTTPException(status_code=400, detail="Invalid path characters")
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
# Handle __root__ case (files not in date subdirectories)
if date == "__root__":
file_path = (FOOTAGES_PATH / location / filename).resolve()
else:
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
# Ensure resolved path is still within FOOTAGES_PATH
try:
@@ -319,8 +477,8 @@ async def get_image(location: str, date: str, filename: str):
if file_path.suffix not in IMAGE_EXTENSIONS:
raise HTTPException(status_code=400, detail="Not an image file")
# Determine media type
media_type = "image/jpeg" if file_path.suffix.lower() in {".jpg", ".jpeg"} else "image/png"
# Determine media type dynamically
media_type = mimetypes.guess_type(file_path)[0] or "image/jpeg"
return FileResponse(file_path, media_type=media_type)
@@ -340,7 +498,11 @@ async def start_compression(request: CompressionRequest):
if not 1 <= request.reduce_percentage <= 90:
raise HTTPException(status_code=400, detail="Percentage must be between 1-90")
file_path = FOOTAGES_PATH / request.location / request.date / request.filename
# Handle __root__ case (files not in date subdirectories)
if request.date == "__root__":
file_path = FOOTAGES_PATH / request.location / request.filename
else:
file_path = FOOTAGES_PATH / request.location / request.date / request.filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
@@ -358,7 +520,7 @@ async def get_all_jobs():
"""Get all compression jobs"""
jobs = []
# Use snapshot to avoid race condition during iteration
for job in compression_manager.get_jobs_snapshot():
for job in await compression_manager.get_jobs_snapshot():
jobs.append({
"job_id": job.job_id,
"file_path": job.file_path,
@@ -397,13 +559,27 @@ async def get_job_status(job_id: str):
@app.delete("/api/compress/jobs/{job_id}")
async def cancel_job(job_id: str):
"""Cancel a compression job"""
async def delete_job(job_id: str, action: str = "cancel"):
"""Delete or cancel a compression job
Args:
job_id: The job ID to delete
action: 'cancel' to cancel a running job, 'remove' to remove from list
"""
if job_id not in compression_manager.jobs:
raise HTTPException(status_code=404, detail="Job not found")
await compression_manager.cancel_job(job_id)
return {"status": "cancelled"}
if action == "remove":
# Remove completed/failed job from list
success = await compression_manager.remove_job(job_id)
if success:
return {"status": "removed"}
else:
raise HTTPException(status_code=400, detail="Cannot remove active job")
else:
# Cancel running job
await compression_manager.cancel_job(job_id)
return {"status": "cancelled"}
@app.get("/api/compress/events")
@@ -418,7 +594,7 @@ async def compression_events(request: Request):
# Send status of all active jobs (use snapshot to avoid race condition)
active_jobs = []
for job in compression_manager.get_jobs_snapshot():
for job in await compression_manager.get_jobs_snapshot():
if job.status in ["pending", "processing", "validating"]:
active_jobs.append({
"job_id": job.job_id,

211
backend/process_utils.py Normal file
View File

@@ -0,0 +1,211 @@
import psutil
import logging
from pathlib import Path
from typing import Optional, List
logger = logging.getLogger(__name__)
def process_exists(pid: int) -> bool:
"""Check if a process with given PID exists"""
try:
return psutil.pid_exists(pid)
except Exception as e:
logger.error(f"Error checking if PID {pid} exists: {e}")
return False
def get_process_info(pid: int) -> Optional[dict]:
"""Get detailed information about a process"""
try:
proc = psutil.Process(pid)
return {
'pid': pid,
'name': proc.name(),
'cmdline': proc.cmdline(),
'cwd': proc.cwd(),
'status': proc.status(),
'create_time': proc.create_time()
}
except psutil.NoSuchProcess:
return None
except Exception as e:
logger.error(f"Error getting process info for PID {pid}: {e}")
return None
def is_our_ffmpeg_process(pid: int, expected_file_path: str) -> bool:
"""
Verify if a process is our ffmpeg process by checking:
1. Process name is ffmpeg
2. Command line contains our input file path
3. Working directory is accessible
Args:
pid: Process ID to check
expected_file_path: The file path this ffmpeg should be processing
Returns:
True if verified as our ffmpeg process, False otherwise
"""
try:
proc = psutil.Process(pid)
# Check 1: Process name must be ffmpeg
process_name = proc.name().lower()
if 'ffmpeg' not in process_name:
logger.debug(f"PID {pid}: Not an ffmpeg process (name: {process_name})")
return False
# Check 2: Command line must contain our file path
cmdline = proc.cmdline()
cmdline_str = ' '.join(cmdline)
# The file path should appear in the command line
if expected_file_path not in cmdline_str:
logger.debug(f"PID {pid}: File path '{expected_file_path}' not in command line")
return False
# Check 3: Should have -i flag (input file) and -progress flag (our marker)
has_input_flag = '-i' in cmdline
has_progress_flag = '-progress' in cmdline or 'progress' in cmdline_str
if not (has_input_flag and has_progress_flag):
logger.debug(f"PID {pid}: Missing expected ffmpeg flags")
return False
logger.info(f"PID {pid}: Verified as our ffmpeg process for '{expected_file_path}'")
return True
except psutil.NoSuchProcess:
logger.debug(f"PID {pid}: Process no longer exists")
return False
except psutil.AccessDenied:
logger.warning(f"PID {pid}: Access denied - cannot verify (won't kill)")
return False
except Exception as e:
logger.error(f"Error verifying PID {pid}: {e}")
return False
def kill_process_safely(pid: int, expected_file_path: str, timeout: int = 10) -> bool:
"""
Safely kill a process after verification
Args:
pid: Process ID to kill
expected_file_path: Expected file path to verify against
timeout: Seconds to wait for graceful termination
Returns:
True if process was killed successfully, False otherwise
"""
# Double-check verification before killing
if not is_our_ffmpeg_process(pid, expected_file_path):
logger.warning(f"PID {pid}: Failed verification - NOT killing for safety")
return False
try:
proc = psutil.Process(pid)
# Try graceful termination first (SIGTERM)
logger.info(f"PID {pid}: Sending SIGTERM for graceful shutdown")
proc.terminate()
try:
# Wait for process to terminate
proc.wait(timeout=timeout)
logger.info(f"PID {pid}: Process terminated gracefully")
return True
except psutil.TimeoutExpired:
# Force kill if graceful shutdown failed (SIGKILL)
logger.warning(f"PID {pid}: Graceful shutdown timed out, sending SIGKILL")
proc.kill()
proc.wait(timeout=5)
logger.info(f"PID {pid}: Process force killed")
return True
except psutil.NoSuchProcess:
logger.info(f"PID {pid}: Process already terminated")
return True
except Exception as e:
logger.error(f"Error killing PID {pid}: {e}")
return False
def cleanup_temp_files(file_path: str) -> int:
"""
Clean up temporary files associated with a compression job
Args:
file_path: Original video file path
Returns:
Number of files cleaned up
"""
cleaned_count = 0
file_path_obj = Path(file_path)
parent_dir = file_path_obj.parent
try:
# Clean up temp file: temp_<filename>
temp_file = parent_dir / f"temp_{file_path_obj.name}"
if temp_file.exists():
temp_file.unlink()
logger.info(f"Cleaned up temp file: {temp_file.name}")
cleaned_count += 1
# Clean up ffmpeg pass log file
log_file = parent_dir / "ffmpeg2pass-0.log"
if log_file.exists():
log_file.unlink()
logger.info(f"Cleaned up ffmpeg log: {log_file.name}")
cleaned_count += 1
# Also check for mbtree file (created by x264)
mbtree_file = parent_dir / "ffmpeg2pass-0.log.mbtree"
if mbtree_file.exists():
mbtree_file.unlink()
logger.info(f"Cleaned up mbtree file: {mbtree_file.name}")
cleaned_count += 1
except Exception as e:
logger.error(f"Error cleaning up temp files for {file_path}: {e}")
return cleaned_count
def find_orphaned_ffmpeg_processes(allowed_base_path: Path) -> List[dict]:
"""
Find all ffmpeg processes that are working on files within our base path
This helps identify potential orphaned processes
Args:
allowed_base_path: Base path where our videos are stored
Returns:
List of process info dicts
"""
orphaned = []
allowed_path_str = str(allowed_base_path.resolve())
try:
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower():
cmdline = proc.info.get('cmdline', [])
cmdline_str = ' '.join(cmdline) if cmdline else ''
# Check if this ffmpeg is working on a file in our path
if allowed_path_str in cmdline_str:
orphaned.append({
'pid': proc.info['pid'],
'cmdline': cmdline_str,
'name': proc.info['name']
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except Exception as e:
logger.error(f"Error finding orphaned processes: {e}")
return orphaned

View File

@@ -3,3 +3,5 @@ uvicorn[standard]==0.27.0
python-multipart==0.0.6
aiofiles==23.2.1
sse-starlette==1.6.5
aiosqlite==0.19.0
psutil==5.9.8

View File

@@ -8,6 +8,8 @@ services:
container_name: drone-footage-backend
volumes:
- /home/uad/nextcloud/footages:/footages
environment:
- COMPRESSION_WATCHDOG_TIMEOUT=${COMPRESSION_WATCHDOG_TIMEOUT:-900}
restart: unless-stopped
networks:
- drone-footage-network

View File

@@ -21,7 +21,8 @@ FROM nginx:alpine
# Copy built assets from builder stage
COPY --from=builder /app/dist /usr/share/nginx/html
# Copy nginx configuration
# Copy nginx configurations
COPY nginx-main.conf /etc/nginx/nginx.conf
COPY nginx.conf /etc/nginx/conf.d/default.conf
# Expose port

29
frontend/nginx-main.conf Normal file
View File

@@ -0,0 +1,29 @@
user nginx;
worker_processes 2;
error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
#gzip on;
include /etc/nginx/conf.d/*.conf;
}

View File

@@ -5,9 +5,27 @@ server {
root /usr/share/nginx/html;
index index.html;
# Serve static files
# Serve static files with proper cache control
location / {
try_files $uri $uri/ /index.html;
# Security headers for all responses
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-XSS-Protection "1; mode=block" always;
# No cache for index.html - always check for updates
location = /index.html {
add_header Cache-Control "private, no-cache, no-store, must-revalidate";
add_header Pragma "no-cache";
add_header Expires "0";
}
# Cache assets with hash in filename (private for single-user app)
location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
expires 1y;
add_header Cache-Control "private, immutable";
}
}
# Proxy API requests to backend

View File

@@ -7,7 +7,7 @@ import { formatFileSize } from './utils/formatters'
function App() {
const { isHealthy, error: healthError } = useSystemHealth()
const { jobs } = useCompressionJobs()
const { jobs, fetchJobs } = useCompressionJobs()
const [locations, setLocations] = useState([])
const [selectedLocation, setSelectedLocation] = useState(null)
const [dates, setDates] = useState([])
@@ -166,7 +166,20 @@ function App() {
return sorted
}
const getSortedLocations = () => sortItems(locations, locationSort)
const getSortedLocations = () => {
const sorted = sortItems(locations, locationSort)
// Separate dotfiles from regular locations
const regularLocations = sorted.filter(loc => {
const name = loc.name || loc
return !name.startsWith('.')
})
const dotfiles = sorted.filter(loc => {
const name = loc.name || loc
return name.startsWith('.')
})
// Return regular locations first, then dotfiles at the bottom
return [...regularLocations, ...dotfiles]
}
const getSortedDates = () => sortItems(dates, dateSort)
const getSortedFiles = () => sortItems(files, fileSort)
@@ -205,11 +218,13 @@ function App() {
})
if (response.ok) {
// Immediately refresh jobs list to show the new job
await fetchJobs()
const hasActiveJobs = jobs.some(j => ['pending', 'processing', 'validating'].includes(j.status))
if (hasActiveJobs) {
showToast('Compression job queued successfully!', 'success')
showToast(`Compression queued (${percentage}% reduction)`, 'success')
} else {
showToast('Compression job started!', 'success')
showToast(`Started compressing ${selectedFile.name} (${percentage}% reduction)`, 'success')
}
} else {
const error = await response.json()
@@ -261,7 +276,7 @@ function App() {
{selectedFile.is_video ? (
<video
ref={videoRef}
key={getMediaUrl(selectedFile)}
key={selectedFile.name}
controls
preload="metadata"
className="w-full max-h-[70vh]"
@@ -361,12 +376,16 @@ function App() {
<ul className="space-y-1">
{getSortedLocations().map((location) => {
const locationName = location.name || location
const isDotfile = locationName.startsWith('.')
return (
<li key={locationName}>
<button
onClick={() => handleLocationClick(locationName)}
className={`w-full text-left px-3 py-2 rounded hover:bg-blue-50 transition ${
selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''
onClick={() => !isDotfile && handleLocationClick(locationName)}
disabled={isDotfile}
className={`w-full text-left px-3 py-2 rounded transition ${
isDotfile
? 'text-gray-400 cursor-not-allowed bg-gray-50'
: `hover:bg-blue-50 ${selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''}`
}`}
>
{locationName}
@@ -409,15 +428,26 @@ function App() {
<ul className="space-y-1">
{getSortedDates().map((date) => {
const dateName = date.name || date
const isRootFiles = dateName === "__root__"
const displayName = isRootFiles ? "All Files" : dateName
const message = date.message || null
return (
<li key={dateName}>
<button
onClick={() => handleDateClick(dateName)}
className={`w-full text-left px-3 py-2 rounded hover:bg-blue-50 transition ${
className={`w-full text-left px-3 py-2 rounded transition ${
isRootFiles
? 'bg-yellow-50 border border-yellow-200 hover:bg-yellow-100'
: 'hover:bg-blue-50'
} ${
selectedDate === dateName ? 'bg-blue-100 font-semibold' : ''
}`}
>
{dateName}
<div className="font-medium">{displayName}</div>
{message && (
<div className="text-xs text-yellow-700 mt-1">{message}</div>
)}
</button>
</li>
)

View File

@@ -31,12 +31,24 @@ export function useCompressionJobs() {
const updates = JSON.parse(event.data)
setJobs(prevJobs => {
const updatedJobs = [...prevJobs]
let hasNewJobs = false
updates.forEach(update => {
const index = updatedJobs.findIndex(j => j.job_id === update.job_id)
if (index !== -1) {
// Update existing job
updatedJobs[index] = { ...updatedJobs[index], ...update }
} else {
// New job detected - we need complete data
hasNewJobs = true
}
})
// Trigger full fetch if new jobs detected
if (hasNewJobs) {
fetchJobs()
}
return updatedJobs
})
})
@@ -44,7 +56,9 @@ export function useCompressionJobs() {
eventSource.onerror = () => {
console.error('SSE connection error')
eventSource.close()
setTimeout(connectSSE, 5000) // Reconnect after 5s
setTimeout(() => {
connectSSE()
}, 5000)
}
eventSourceRef.current = eventSource

View File

@@ -53,7 +53,9 @@ export function useSystemHealth() {
console.error('System health SSE connection error')
eventSource.close()
// Reconnect after 10 seconds
setTimeout(connectSSE, 10000)
setTimeout(() => {
connectSSE()
}, 10000)
}
eventSourceRef.current = eventSource

View File

@@ -11,7 +11,7 @@ export const formatFileSize = (bytes) => {
}
export const formatETA = (seconds) => {
if (!seconds) return '--'
if (seconds == null || seconds === undefined) return '--'
const mins = Math.floor(seconds / 60)
const secs = seconds % 60
return `${mins}m ${secs}s`