Compare commits
9 Commits: dec49a43f9...master

| Author | SHA1 | Date |
|---|---|---|
| | 92338df716 | |
| | 80ca19cb4b | |
| | 91141eadcf | |
| | d8242d47b9 | |
| | f499691345 | |
| | 7db829d10c | |
| | dd5fe1617a | |
| | 2acb4d9f4e | |
| | a7b7ad41e9 | |
@@ -5,7 +5,8 @@
"mcp__playwright__browser_console_messages",
"mcp__playwright__browser_click",
"mcp__playwright__browser_evaluate",
-"mcp__playwright__browser_navigate"
+"mcp__playwright__browser_navigate",
+"mcp__playwright__browser_wait_for"
],
"deny": [],
"ask": []
1 CLAUDE.md Normal file
@@ -0,0 +1 @@
- The app runs as a Docker container, so every time changes need to be reflected we have to `docker compose down`, rebuild, and `docker compose up -d`.
@@ -4,9 +4,11 @@ import uuid
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional
|
||||
from job_persistence import JobPersistence
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
@@ -20,16 +22,18 @@ MAX_CONCURRENT_JOBS = 2
|
||||
MAX_STORED_JOBS = 100
|
||||
MAX_PENDING_JOBS = 10 # Maximum jobs waiting in queue
|
||||
PRUNE_INTERVAL_SECONDS = 300 # 5 minutes
|
||||
-WATCHDOG_TIMEOUT_SECONDS = 300  # 5 minutes
+WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900'))  # Default: 15 minutes
|
||||
WATCHDOG_CHECK_INTERVAL = 30 # 30 seconds
|
||||
STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB
|
||||
STDERR_READ_LIMIT = 8 * 1024 * 1024 # 8MB buffer limit for stderr
|
||||
MAX_STDERR_LINES = 1000 # Keep only last 1000 lines of stderr
|
||||
DEFAULT_AUDIO_BITRATE = 128 # kbps
|
||||
MIN_VIDEO_BITRATE = 100 # kbps
|
||||
|
||||
|
||||
class CompressionJob:
|
||||
-def __init__(self, file_path: str, reduce_percentage: int):
-    self.job_id = str(uuid.uuid4())
+def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None):
+    self.job_id = job_id or str(uuid.uuid4())
|
||||
self.file_path = file_path
|
||||
self.reduce_percentage = reduce_percentage
|
||||
self.status = "pending" # pending, processing, validating, completed, failed, cancelled
|
||||
@@ -45,6 +49,59 @@ class CompressionJob:
|
||||
self.target_size_mb = None
|
||||
self.video_bitrate = None
|
||||
self.duration_seconds = None
|
||||
self.ffmpeg_pid = None # Track ffmpeg process PID
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
"""Convert job to dictionary for database persistence"""
|
||||
return {
|
||||
'job_id': self.job_id,
|
||||
'file_path': self.file_path,
|
||||
'reduce_percentage': self.reduce_percentage,
|
||||
'status': self.status,
|
||||
'progress': self.progress,
|
||||
'eta_seconds': self.eta_seconds,
|
||||
'current_pass': self.current_pass,
|
||||
'created_at': self.created_at,
|
||||
'started_at': self.started_at,
|
||||
'completed_at': self.completed_at,
|
||||
'error': self.error,
|
||||
'output_file': self.output_file,
|
||||
'current_size_mb': self.current_size_mb,
|
||||
'target_size_mb': self.target_size_mb,
|
||||
'video_bitrate': self.video_bitrate,
|
||||
'duration_seconds': self.duration_seconds,
|
||||
'ffmpeg_pid': self.ffmpeg_pid
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict) -> 'CompressionJob':
|
||||
"""Create job from database dictionary"""
|
||||
job = cls(
|
||||
file_path=data['file_path'],
|
||||
reduce_percentage=data['reduce_percentage'],
|
||||
job_id=data['job_id']
|
||||
)
|
||||
job.status = data.get('status', 'pending')
|
||||
job.progress = data.get('progress', 0.0)
|
||||
job.eta_seconds = data.get('eta_seconds')
|
||||
job.current_pass = data.get('current_pass', 0)
|
||||
job.error = data.get('error')
|
||||
job.output_file = data.get('output_file')
|
||||
job.current_size_mb = data.get('current_size_mb')
|
||||
job.target_size_mb = data.get('target_size_mb')
|
||||
job.video_bitrate = data.get('video_bitrate')
|
||||
job.duration_seconds = data.get('duration_seconds')
|
||||
job.ffmpeg_pid = data.get('ffmpeg_pid')
|
||||
|
||||
# Parse datetime strings back to datetime objects
|
||||
for key in ['created_at', 'started_at', 'completed_at']:
|
||||
if data.get(key):
|
||||
if isinstance(data[key], str):
|
||||
job.__dict__[key] = datetime.fromisoformat(data[key])
|
||||
else:
|
||||
job.__dict__[key] = data[key]
|
||||
|
||||
return job
|
||||
|
||||
|
||||
class CompressionManager:
|
||||
@@ -52,15 +109,36 @@ class CompressionManager:
|
||||
TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
|
||||
FPS_PATTERN = re.compile(r'fps=([\d.]+)')
|
||||
|
||||
def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
|
||||
def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None):
|
||||
self.jobs: Dict[str, CompressionJob] = {}
|
||||
self.active_jobs: Dict[str, asyncio.Task] = {}
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent)
|
||||
self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
|
||||
self.health_checker = health_checker
|
||||
self.cache_invalidation_callback = cache_invalidation_callback
|
||||
self._pruning_task: Optional[asyncio.Task] = None
|
||||
self._jobs_lock = asyncio.Lock()
|
||||
self._start_periodic_pruning()
|
||||
self.persistence = JobPersistence()
|
||||
self._pruning_started = False
|
||||
|
||||
async def initialize_persistence(self):
|
||||
"""Initialize the persistence layer - call this on startup"""
|
||||
await self.persistence.initialize()
|
||||
logger.info("Job persistence initialized")
|
||||
|
||||
# Start periodic pruning (only once when persistence is initialized)
|
||||
if not self._pruning_started:
|
||||
self._start_periodic_pruning()
|
||||
self._pruning_started = True
|
||||
|
||||
async def load_jobs_from_database(self):
|
||||
"""Load all jobs from database into memory"""
|
||||
async with self._jobs_lock:
|
||||
jobs_data = await self.persistence.get_all_jobs()
|
||||
for job_data in jobs_data:
|
||||
job = CompressionJob.from_dict(job_data)
|
||||
self.jobs[job.job_id] = job
|
||||
logger.info(f"Loaded {len(jobs_data)} jobs from database")
|
||||
|
||||
async def get_video_info(self, file_path: str) -> Dict:
|
||||
"""Extract video duration and file size using ffprobe"""
|
||||
@@ -80,7 +158,7 @@ class CompressionManager:
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, _ = await result.communicate()
|
||||
-duration = int(float(stdout.decode().strip()))
+duration = float(stdout.decode().strip())
|
||||
|
||||
return {
|
||||
'size_mb': file_size_mb,
|
||||
@@ -89,7 +167,7 @@ class CompressionManager:

def calculate_bitrates(self, current_size_mb: float,
                       target_size_mb: float,
-                      duration_seconds: int,
+                      duration_seconds: float,
                       audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int:
    """Calculate video bitrate based on target size"""
    # Total bitrate in kbps
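The body of `calculate_bitrates` is not shown in this hunk; a minimal sketch of the usual target-size arithmetic (an assumption based on the constants defined above, not taken from the diff) would be:

```python
def calculate_bitrates(current_size_mb: float,
                       target_size_mb: float,
                       duration_seconds: float,
                       audio_bitrate: int = DEFAULT_AUDIO_BITRATE) -> int:
    """Sketch: video bitrate (kbps) needed to land near the target size."""
    # Total kilobits available, spread over the duration, gives the bitrate budget
    total_kbps = (target_size_mb * 8 * 1024) / duration_seconds
    # Whatever the audio track does not use goes to video, clamped to a floor
    return max(int(total_kbps - audio_bitrate), MIN_VIDEO_BITRATE)
```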
@@ -104,6 +182,7 @@ class CompressionManager:
|
||||
try:
|
||||
job.status = "processing"
|
||||
job.started_at = datetime.now()
|
||||
await self.persistence.save_job(job.to_dict())
|
||||
|
||||
# Get video information
|
||||
info = await self.get_video_info(job.file_path)
|
||||
@@ -123,10 +202,32 @@ class CompressionManager:
|
||||
f"Duration: {job.duration_seconds}s, "
|
||||
f"Video Bitrate: {job.video_bitrate} kbps")
|
||||
|
||||
# Generate output filename
|
||||
# Check disk space BEFORE starting compression
|
||||
file_path = Path(job.file_path)
|
||||
disk_usage = shutil.disk_usage(file_path.parent)
|
||||
|
||||
# Estimate required space: temp file + original + safety margin (1.5x original)
|
||||
required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5
|
||||
available_bytes = disk_usage.free
|
||||
|
||||
if available_bytes < required_bytes:
|
||||
required_gb = required_bytes / (1024**3)
|
||||
available_gb = available_bytes / (1024**3)
|
||||
error_msg = (
|
||||
f"Insufficient disk space: need {required_gb:.1f}GB, "
|
||||
f"have {available_gb:.1f}GB available"
|
||||
)
|
||||
logger.error(f"Job {job.job_id}: {error_msg}")
|
||||
raise ValueError(error_msg)
|
||||
|
||||
logger.info(
|
||||
f"Job {job.job_id}: Disk space check passed "
|
||||
f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)"
|
||||
)
|
||||
|
||||
# Generate output filename (use original path for in-place replacement)
|
||||
file_path = Path(job.file_path)
|
||||
temp_file = file_path.parent / f"temp_{file_path.name}"
|
||||
output_file = file_path.parent / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"
|
||||
|
||||
# PASS 1: Analysis
|
||||
job.current_pass = 1
|
||||
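The hunks never show how the ffmpeg command list itself is assembled; a hedged sketch of a two-pass x264 invocation that would be consistent with the flags the recovery code later checks for (`-i` and `-progress`) might look like this — the helper name, codec, container, and output handling are assumptions, not taken from the diff:

```python
def build_ffmpeg_cmd(input_path: str, output_path: str,
                     video_kbps: int, pass_num: int) -> list:
    """Sketch: two-pass encode; pass 1 only analyses, pass 2 writes the file."""
    base = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-b:v", f"{video_kbps}k",
        "-pass", str(pass_num),
        "-progress", "pipe:1", "-nostats",  # machine-readable progress on stdout
    ]
    if pass_num == 1:
        # First pass gathers statistics only: no audio, output discarded
        return base + ["-an", "-f", "null", "/dev/null"]
    return base + ["-c:a", "aac", "-b:a", f"{DEFAULT_AUDIO_BITRATE}k", output_path]
```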
@@ -144,31 +245,42 @@ class CompressionManager:
|
||||
self.cleanup_temp_files(job)
|
||||
return
|
||||
|
||||
# VALIDATION
|
||||
# SAFE FILE REPLACEMENT WITH VALIDATION
|
||||
job.status = "validating"
|
||||
job.progress = 95.0
|
||||
if await self.validate_video(temp_file):
|
||||
# Move temp file to final output
|
||||
os.rename(temp_file, output_file)
|
||||
job.output_file = str(output_file)
|
||||
await self.persistence.update_job_status(job.job_id, "validating", progress=95.0)
|
||||
logger.info(f"Job {job.job_id}: Starting safe file replacement process")
|
||||
|
||||
if await self.safe_replace_file(file_path, temp_file, job):
|
||||
job.output_file = str(file_path) # Output is same as original path
|
||||
job.status = "completed"
|
||||
job.progress = 100.0
|
||||
job.completed_at = datetime.now()
|
||||
job.ffmpeg_pid = None # Clear PID on completion
|
||||
await self.persistence.save_job(job.to_dict())
|
||||
self.cleanup_temp_files(job)
|
||||
logger.info(f"Job {job.job_id} completed successfully")
|
||||
logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place")
|
||||
else:
|
||||
job.status = "failed"
|
||||
job.error = "Validation failed: Compressed video is corrupted"
|
||||
job.error = "File replacement failed or validation check failed"
|
||||
await self.persistence.update_job_status(job.job_id, "failed", error=job.error)
|
||||
self.cleanup_temp_files(job)
|
||||
logger.error(f"Job {job.job_id} failed validation")
|
||||
logger.error(f"Job {job.job_id} failed - file replacement unsuccessful")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
job.status = "cancelled"
|
||||
# Don't mark as cancelled - could be app shutdown
|
||||
# Keep current status so job can be recovered on restart
|
||||
job.ffmpeg_pid = None # Clear PID since process won't survive
|
||||
# Don't update database status - leave it as "processing" or "validating"
|
||||
# for recovery on restart
|
||||
self.cleanup_temp_files(job)
|
||||
logger.info(f"Job {job.job_id} cancelled")
|
||||
logger.info(f"Job {job.job_id} interrupted (task cancelled)")
|
||||
raise # Re-raise to properly cancel the task
|
||||
except Exception as e:
|
||||
job.status = "failed"
|
||||
job.error = str(e)
|
||||
job.ffmpeg_pid = None # Clear PID on failure
|
||||
await self.persistence.update_job_status(job.job_id, "failed", error=str(e))
|
||||
self.cleanup_temp_files(job)
|
||||
logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)
|
||||
|
||||
@@ -210,18 +322,29 @@ class CompressionManager:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
limit=STDERR_READ_LIMIT # Set buffer limit to prevent overflow
|
||||
)
|
||||
|
||||
# Track PID for first pass only (second pass uses same job)
|
||||
if pass_num == 1 and process.pid:
|
||||
job.ffmpeg_pid = process.pid
|
||||
await self.persistence.set_ffmpeg_pid(job.job_id, process.pid)
|
||||
logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked")
|
||||
|
||||
# Progress tracking
|
||||
stderr_output = []
|
||||
last_progress_update = datetime.now()
|
||||
last_progress_update = None # Initialize as None, will be set when FFmpeg starts producing output
|
||||
|
||||
async def read_stdout():
|
||||
"""Read and process stdout for progress updates"""
|
||||
nonlocal last_progress_update
|
||||
try:
|
||||
async for line in process.stdout:
|
||||
# Initialize timer on first progress update from FFmpeg
|
||||
if last_progress_update is None:
|
||||
last_progress_update = datetime.now()
|
||||
logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized")
|
||||
if job.status == "cancelled":
|
||||
process.kill()
|
||||
return
|
||||
@@ -256,6 +379,16 @@ class CompressionManager:
|
||||
else:
|
||||
remaining_sec = remaining_sec_current_pass
|
||||
job.eta_seconds = int(remaining_sec)
|
||||
|
||||
# Periodically save progress to database (every ~5% or 30 seconds)
|
||||
# Use modulo to reduce DB writes
|
||||
if int(job.progress) % 5 == 0:
|
||||
await self.persistence.update_job_progress(
|
||||
job.job_id,
|
||||
job.progress,
|
||||
job.eta_seconds,
|
||||
job.current_pass
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading stdout: {e}")
|
||||
|
||||
@@ -265,6 +398,11 @@ class CompressionManager:
|
||||
async for line in process.stderr:
|
||||
line_str = line.decode('utf-8', errors='ignore')
|
||||
stderr_output.append(line_str)
|
||||
|
||||
# Truncate stderr_output to keep only last MAX_STDERR_LINES lines
|
||||
if len(stderr_output) > MAX_STDERR_LINES:
|
||||
stderr_output[:] = stderr_output[-MAX_STDERR_LINES:]
|
||||
|
||||
# Log important stderr messages
|
||||
if 'error' in line_str.lower():
|
||||
logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
|
||||
@@ -277,20 +415,22 @@ class CompressionManager:
|
||||
"""Monitor for stuck jobs - kills process if no progress"""
|
||||
nonlocal last_progress_update
|
||||
while process.returncode is None:
|
||||
await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
|
||||
|
||||
if job.status == "cancelled":
|
||||
process.kill()
|
||||
return
|
||||
|
||||
time_since_update = (datetime.now() - last_progress_update).total_seconds()
|
||||
if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
|
||||
error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
|
||||
logger.error(f"Watchdog: {error_msg}")
|
||||
job.status = "failed"
|
||||
job.error = error_msg
|
||||
process.kill()
|
||||
raise Exception(error_msg)
|
||||
# Only check timeout if FFmpeg has started producing output
|
||||
if last_progress_update is not None:
|
||||
time_since_update = (datetime.now() - last_progress_update).total_seconds()
|
||||
if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
|
||||
error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
|
||||
logger.error(f"Watchdog: {error_msg}")
|
||||
job.status = "failed"
|
||||
job.error = error_msg
|
||||
process.kill()
|
||||
raise Exception(error_msg)
|
||||
|
||||
await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
|
||||
|
||||
# Run stdout, stderr readers and watchdog concurrently to prevent deadlock
|
||||
try:
|
||||
@@ -340,6 +480,134 @@ class CompressionManager:
|
||||
# Also check return code
|
||||
return process.returncode == 0
|
||||
|
||||
async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool:
|
||||
"""Enhanced validation with size and duration checks before file replacement"""
|
||||
# 1. Basic ffmpeg decode validation
|
||||
if not await self.validate_video(compressed_path):
|
||||
logger.error(f"Enhanced validation failed: FFmpeg decode check failed")
|
||||
return False
|
||||
|
||||
# 2. File size sanity check (minimum 1KB)
|
||||
try:
|
||||
compressed_size = os.path.getsize(str(compressed_path))
|
||||
if compressed_size < 1024:
|
||||
logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)")
|
||||
return False
|
||||
logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)")
|
||||
except Exception as e:
|
||||
logger.error(f"Enhanced validation failed: Could not check file size: {e}")
|
||||
return False
|
||||
|
||||
# 3. Duration verification (optional but recommended)
|
||||
try:
|
||||
original_info = await self.get_video_info(str(original_path))
|
||||
compressed_info = await self.get_video_info(str(compressed_path))
|
||||
|
||||
duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds'])
|
||||
# Allow up to 1 second difference due to rounding
|
||||
if duration_diff > 1.0:
|
||||
logger.error(f"Enhanced validation failed: Duration mismatch - "
|
||||
f"original: {original_info['duration_seconds']:.2f}s vs "
|
||||
f"compressed: {compressed_info['duration_seconds']:.2f}s "
|
||||
f"(diff: {duration_diff:.2f}s)")
|
||||
return False
|
||||
logger.info(f"Enhanced validation: Duration check passed "
|
||||
f"(original: {original_info['duration_seconds']:.2f}s, "
|
||||
f"compressed: {compressed_info['duration_seconds']:.2f}s)")
|
||||
except Exception as e:
|
||||
logger.error(f"Enhanced validation failed: Could not verify duration: {e}")
|
||||
return False
|
||||
|
||||
logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}")
|
||||
return True
|
||||
|
||||
async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool:
|
||||
"""Safely replace original file with compressed version with rollback capability"""
|
||||
backup_path = original_path.parent / f"{original_path.name}.backup"
|
||||
|
||||
try:
|
||||
# Enhanced validation BEFORE any file operations
|
||||
logger.info(f"Validating compressed file before replacement: {temp_file.name}")
|
||||
if not await self.validate_video_enhanced(original_path, temp_file):
|
||||
logger.error(f"Safe replace failed: Validation check failed")
|
||||
return False
|
||||
|
||||
# Step 1: Create backup of original file
|
||||
logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}")
|
||||
try:
|
||||
os.replace(str(original_path), str(backup_path))
|
||||
except Exception as e:
|
||||
logger.error(f"Safe replace failed: Could not create backup: {e}")
|
||||
return False
|
||||
|
||||
# Step 2: Replace original with compressed file (atomic operation)
|
||||
logger.info(f"Replacing original file with compressed version: {temp_file.name}")
|
||||
try:
|
||||
os.replace(str(temp_file), str(original_path))
|
||||
except Exception as e:
|
||||
logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...")
|
||||
# Rollback: restore original from backup
|
||||
try:
|
||||
if backup_path.exists():
|
||||
os.replace(str(backup_path), str(original_path))
|
||||
logger.info(f"Rollback successful: Original file restored")
|
||||
except Exception as rollback_error:
|
||||
logger.error(f"Rollback failed: {rollback_error}")
|
||||
return False
|
||||
|
||||
# Step 3: Verify final file is accessible
|
||||
logger.info(f"Verifying replaced file is accessible: {original_path.name}")
|
||||
try:
|
||||
if not original_path.exists() or not original_path.is_file():
|
||||
logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...")
|
||||
# Rollback: restore original from backup
|
||||
try:
|
||||
if backup_path.exists():
|
||||
os.replace(str(backup_path), str(original_path))
|
||||
logger.info(f"Rollback successful: Original file restored")
|
||||
except Exception as rollback_error:
|
||||
logger.error(f"Rollback failed: {rollback_error}")
|
||||
return False
|
||||
|
||||
# Get final file size
|
||||
final_size = os.path.getsize(str(original_path))
|
||||
logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB")
|
||||
except Exception as e:
|
||||
logger.error(f"Safe replace failed: Could not verify final file: {e}")
|
||||
return False
|
||||
|
||||
# Step 4: Delete backup file only after successful verification
|
||||
logger.info(f"Deleting backup file: {backup_path.name}")
|
||||
try:
|
||||
if backup_path.exists():
|
||||
backup_path.unlink()
|
||||
logger.info(f"Backup file deleted successfully")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to delete backup file (non-critical): {e}")
|
||||
|
||||
# Step 5: Invalidate cache for this directory to show updated file size
|
||||
if self.cache_invalidation_callback:
|
||||
try:
|
||||
# Invalidate all cache entries containing the parent directory path
|
||||
parent_name = original_path.parent.name
|
||||
self.cache_invalidation_callback(parent_name)
|
||||
logger.info(f"Cache invalidated for directory: {parent_name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to invalidate cache (non-critical): {e}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True)
|
||||
# Attempt rollback on any unexpected error
|
||||
try:
|
||||
if backup_path.exists():
|
||||
os.replace(str(backup_path), str(original_path))
|
||||
logger.info(f"Rollback successful: Original file restored after unexpected error")
|
||||
except Exception as rollback_error:
|
||||
logger.error(f"Rollback failed: {rollback_error}")
|
||||
return False
|
||||
|
||||
def cleanup_temp_files(self, job: CompressionJob):
|
||||
"""Clean up temporary files"""
|
||||
file_path = Path(job.file_path)
|
||||
@@ -370,14 +638,14 @@ class CompressionManager:
|
||||
self._pruning_task = asyncio.create_task(prune_periodically())
|
||||
|
||||
def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS):
-    """Remove oldest completed jobs if total exceeds max_jobs"""
+    """Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible."""
    if len(self.jobs) <= max_jobs:
        return

-    # Get inactive completed jobs sorted by time
+    # Get inactive completed/cancelled jobs sorted by time (exclude failed jobs)
    inactive = [
        (jid, j) for jid, j in self.jobs.items()
-        if jid not in self.active_jobs and j.status in ['completed', 'failed', 'cancelled']
+        if jid not in self.active_jobs and j.status in ['completed', 'cancelled']
    ]
    inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)
|
||||
|
||||
@@ -386,9 +654,10 @@ class CompressionManager:
|
||||
for jid, _ in inactive[:num_to_remove]:
|
||||
self.jobs.pop(jid, None)
|
||||
|
||||
def get_jobs_snapshot(self) -> list:
|
||||
async def get_jobs_snapshot(self) -> list:
|
||||
"""Get a safe snapshot of all jobs for iteration"""
|
||||
return list(self.jobs.values())
|
||||
async with self._jobs_lock:
|
||||
return list(self.jobs.values())
|
||||
|
||||
def get_pending_count(self) -> int:
|
||||
"""Get count of pending jobs"""
|
||||
@@ -434,6 +703,9 @@ class CompressionManager:
|
||||
job = CompressionJob(file_path, reduce_percentage)
|
||||
self.jobs[job.job_id] = job
|
||||
|
||||
# Save initial job to database
|
||||
await self.persistence.save_job(job.to_dict())
|
||||
|
||||
# Start compression in background
|
||||
task = asyncio.create_task(self.compress_video(job))
|
||||
self.active_jobs[job.job_id] = task
|
||||
@@ -444,8 +716,21 @@ class CompressionManager:
|
||||
return job.job_id
|
||||
|
||||
async def cancel_job(self, job_id: str):
|
||||
"""Cancel a running compression job"""
|
||||
"""Cancel a running compression job (user-initiated)"""
|
||||
if job_id in self.jobs:
|
||||
self.jobs[job_id].status = "cancelled"
|
||||
# Save cancelled status to database (user-initiated cancellation)
|
||||
await self.persistence.update_job_status(job_id, "cancelled")
|
||||
if job_id in self.active_jobs:
|
||||
self.active_jobs[job_id].cancel()
|
||||
|
||||
async def remove_job(self, job_id: str):
|
||||
"""Remove a job from the list (for failed/completed jobs)"""
|
||||
if job_id in self.jobs:
|
||||
job = self.jobs[job_id]
|
||||
# Only allow removal of inactive jobs
|
||||
if job_id not in self.active_jobs:
|
||||
async with self._jobs_lock:
|
||||
self.jobs.pop(job_id, None)
|
||||
return True
|
||||
return False
|
||||
|
||||
243 backend/job_persistence.py Normal file
@@ -0,0 +1,243 @@
|
||||
import sqlite3
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional, Any
|
||||
from contextlib import asynccontextmanager
|
||||
import aiosqlite
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Database configuration from environment
|
||||
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
|
||||
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))
|
||||
|
||||
|
||||
class JobPersistence:
|
||||
"""SQLite-based persistence layer for compression jobs"""
|
||||
|
||||
def __init__(self, db_path: str = DATABASE_PATH):
|
||||
self.db_path = db_path
|
||||
self._initialized = False
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize database schema"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
async with self._lock:
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Ensure directory exists
|
||||
db_dir = Path(self.db_path).parent
|
||||
db_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute("""
|
||||
CREATE TABLE IF NOT EXISTS compression_jobs (
|
||||
job_id TEXT PRIMARY KEY,
|
||||
file_path TEXT NOT NULL,
|
||||
reduce_percentage INTEGER NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
progress REAL DEFAULT 0.0,
|
||||
current_pass INTEGER DEFAULT 0,
|
||||
current_size_mb REAL,
|
||||
target_size_mb REAL,
|
||||
video_bitrate INTEGER,
|
||||
duration_seconds REAL,
|
||||
eta_seconds INTEGER,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
started_at TIMESTAMP,
|
||||
completed_at TIMESTAMP,
|
||||
output_file TEXT,
|
||||
error TEXT,
|
||||
ffmpeg_pid INTEGER
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for performance
|
||||
await db.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_status
|
||||
ON compression_jobs(status)
|
||||
""")
|
||||
|
||||
await db.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_created_at
|
||||
ON compression_jobs(created_at)
|
||||
""")
|
||||
|
||||
await db.commit()
|
||||
|
||||
self._initialized = True
|
||||
logger.info(f"Job persistence initialized at {self.db_path}")
|
||||
|
||||
async def save_job(self, job_data: Dict[str, Any]):
|
||||
"""Insert or update a job in the database"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
# Convert datetime objects to ISO format strings
|
||||
job_data_copy = job_data.copy()
|
||||
for key in ['created_at', 'started_at', 'completed_at']:
|
||||
if key in job_data_copy and job_data_copy[key]:
|
||||
if isinstance(job_data_copy[key], datetime):
|
||||
job_data_copy[key] = job_data_copy[key].isoformat()
|
||||
|
||||
await db.execute("""
|
||||
INSERT OR REPLACE INTO compression_jobs (
|
||||
job_id, file_path, reduce_percentage, status, progress,
|
||||
current_pass, current_size_mb, target_size_mb, video_bitrate,
|
||||
duration_seconds, eta_seconds, created_at, started_at,
|
||||
completed_at, output_file, error, ffmpeg_pid
|
||||
) VALUES (
|
||||
:job_id, :file_path, :reduce_percentage, :status, :progress,
|
||||
:current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
|
||||
:duration_seconds, :eta_seconds, :created_at, :started_at,
|
||||
:completed_at, :output_file, :error, :ffmpeg_pid
|
||||
)
|
||||
""", job_data_copy)
|
||||
await db.commit()
|
||||
|
||||
async def update_job_status(self, job_id: str, status: str,
|
||||
progress: Optional[float] = None,
|
||||
error: Optional[str] = None):
|
||||
"""Quick update for job status and progress"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
updates = {"status": status}
|
||||
if progress is not None:
|
||||
updates["progress"] = progress
|
||||
if error is not None:
|
||||
updates["error"] = error
|
||||
|
||||
# Set completed_at for terminal states
|
||||
if status in ['completed', 'failed', 'cancelled']:
|
||||
updates["completed_at"] = datetime.now().isoformat()
|
||||
|
||||
set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
|
||||
query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
|
||||
|
||||
await db.execute(query, {**updates, "job_id": job_id})
|
||||
await db.commit()
|
||||
|
||||
async def update_job_progress(self, job_id: str, progress: float,
|
||||
eta_seconds: Optional[int] = None,
|
||||
current_pass: Optional[int] = None):
|
||||
"""Update job progress and ETA"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
updates = {"progress": progress}
|
||||
if eta_seconds is not None:
|
||||
updates["eta_seconds"] = eta_seconds
|
||||
if current_pass is not None:
|
||||
updates["current_pass"] = current_pass
|
||||
|
||||
set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
|
||||
query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
|
||||
|
||||
await db.execute(query, {**updates, "job_id": job_id})
|
||||
await db.commit()
|
||||
|
||||
async def set_ffmpeg_pid(self, job_id: str, pid: int):
|
||||
"""Update the ffmpeg PID for a job"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
"UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
|
||||
(pid, job_id)
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get a single job by ID"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
db.row_factory = aiosqlite.Row
|
||||
async with db.execute(
|
||||
"SELECT * FROM compression_jobs WHERE job_id = ?",
|
||||
(job_id,)
|
||||
) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if row:
|
||||
return dict(row)
|
||||
return None
|
||||
|
||||
async def get_all_jobs(self) -> List[Dict[str, Any]]:
|
||||
"""Get all jobs from the database"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
db.row_factory = aiosqlite.Row
|
||||
async with db.execute(
|
||||
"SELECT * FROM compression_jobs ORDER BY created_at DESC"
|
||||
) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
|
||||
"""Get all jobs with a specific status"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
db.row_factory = aiosqlite.Row
|
||||
async with db.execute(
|
||||
"SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
|
||||
(status,)
|
||||
) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
|
||||
"""Get jobs that were interrupted (status=processing/validating)"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
db.row_factory = aiosqlite.Row
|
||||
async with db.execute(
|
||||
"""SELECT * FROM compression_jobs
|
||||
WHERE status IN ('processing', 'validating')
|
||||
ORDER BY created_at ASC"""
|
||||
) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
async def delete_job(self, job_id: str):
|
||||
"""Delete a job from the database"""
|
||||
await self.initialize()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
"DELETE FROM compression_jobs WHERE job_id = ?",
|
||||
(job_id,)
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
|
||||
"""Delete completed/cancelled jobs older than retention period"""
|
||||
await self.initialize()
|
||||
|
||||
cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60)
|
||||
cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat()
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
result = await db.execute("""
|
||||
DELETE FROM compression_jobs
|
||||
WHERE status IN ('completed', 'cancelled')
|
||||
AND completed_at < ?
|
||||
""", (cutoff_iso,))
|
||||
|
||||
deleted_count = result.rowcount
|
||||
await db.commit()
|
||||
|
||||
if deleted_count > 0:
|
||||
logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")
|
||||
|
||||
return deleted_count
|
||||
222 backend/main.py
@@ -2,7 +2,7 @@ from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse, StreamingResponse, Response
|
||||
from pathlib import Path
|
||||
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Any
|
||||
from pydantic import BaseModel
|
||||
import os
|
||||
from datetime import datetime
|
||||
@@ -12,9 +12,15 @@ import asyncio
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
import mimetypes
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
from compression import CompressionManager
|
||||
from filesystem_health import FilesystemHealthChecker
|
||||
from process_utils import (
|
||||
kill_process_safely,
|
||||
cleanup_temp_files,
|
||||
find_orphaned_ffmpeg_processes
|
||||
)
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -26,16 +32,20 @@ STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB chunks for video streaming
|
||||
SSE_UPDATE_INTERVAL = 0.5 # Update every 500ms
|
||||
CACHE_TTL_SECONDS = 60 # Cache directory listings for 60 seconds
|
||||
|
||||
# Job recovery configuration
|
||||
AUTO_RESTART_INTERRUPTED_JOBS = os.getenv('AUTO_RESTART_INTERRUPTED_JOBS', 'true').lower() == 'true'
|
||||
CLEANUP_ORPHANED_PROCESSES = os.getenv('CLEANUP_ORPHANED_PROCESSES', 'true').lower() == 'true'
|
||||
|
||||
# Base path for footages
|
||||
FOOTAGES_PATH = Path("/footages")
|
||||
|
||||
# Simple in-memory cache for directory listings
|
||||
class SimpleCache:
|
||||
def __init__(self, ttl_seconds: int = CACHE_TTL_SECONDS):
|
||||
-self.cache: Dict[str, tuple[float, any]] = {}
+self.cache: Dict[str, tuple[float, Any]] = {}
|
||||
self.ttl = ttl_seconds
|
||||
|
||||
-def get(self, key: str) -> Optional[any]:
+def get(self, key: str) -> Optional[Any]:
|
||||
if key in self.cache:
|
||||
timestamp, value = self.cache[key]
|
||||
if time.time() - timestamp < self.ttl:
|
||||
@@ -44,22 +54,32 @@ class SimpleCache:
|
||||
del self.cache[key]
|
||||
return None
|
||||
|
||||
-def set(self, key: str, value: any):
+def set(self, key: str, value: Any):
|
||||
self.cache[key] = (time.time(), value)
|
||||
|
||||
def clear(self):
|
||||
self.cache.clear()
|
||||
|
||||
def invalidate(self, pattern: str = None):
    """Invalidate cache entries matching pattern or all if pattern is None"""
    if pattern is None:
        self.cache.clear()
    else:
        keys_to_delete = [k for k in self.cache.keys() if pattern in k]
        for key in keys_to_delete:
            del self.cache[key]

directory_cache = SimpleCache()
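As a quick illustration of how this invalidation callback is meant to be used (the cache key strings here are hypothetical; only the substring matching comes from the code above):

```python
# Hypothetical keys produced by the directory-listing endpoints
directory_cache.set("files:backyard/2024-06-01", [{"name": "DJI_0001.MP4"}])
directory_cache.set("dates:backyard", [{"name": "2024-06-01"}])

directory_cache.invalidate("2024-06-01")  # drops every key containing that substring
directory_cache.invalidate()              # no pattern -> clears the entire cache
```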
|
||||
|
||||
# Initialize filesystem health checker first (compression manager needs it)
|
||||
filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH)
|
||||
|
||||
# Initialize compression manager with health checker
|
||||
# Initialize compression manager with health checker and cache callback
|
||||
compression_manager = CompressionManager(
|
||||
max_concurrent=1,
|
||||
allowed_base_path=FOOTAGES_PATH,
|
||||
health_checker=filesystem_health_checker
|
||||
health_checker=filesystem_health_checker,
|
||||
cache_invalidation_callback=directory_cache.invalidate
|
||||
)
|
||||
|
||||
# CORS middleware for frontend communication
|
||||
@@ -95,6 +115,116 @@ async def get_file_info(file_path: Path) -> Dict:
|
||||
}
|
||||
|
||||
|
||||
async def recover_compression_jobs():
|
||||
"""
|
||||
Recover compression jobs from database after app restart/crash
|
||||
|
||||
This function:
|
||||
1. Loads all jobs from database
|
||||
2. Handles interrupted jobs (processing/validating)
|
||||
3. Cleans up orphaned ffmpeg processes
|
||||
4. Restarts pending jobs automatically
|
||||
"""
|
||||
logger.info("=" * 60)
|
||||
logger.info("Starting compression job recovery...")
|
||||
logger.info("=" * 60)
|
||||
|
||||
# Initialize persistence layer
|
||||
await compression_manager.initialize_persistence()
|
||||
|
||||
# Load all jobs from database
|
||||
await compression_manager.load_jobs_from_database()
|
||||
|
||||
# Get interrupted jobs (were processing when app crashed)
|
||||
interrupted_jobs = await compression_manager.persistence.get_interrupted_jobs()
|
||||
|
||||
if interrupted_jobs:
|
||||
logger.info(f"Found {len(interrupted_jobs)} interrupted jobs")
|
||||
|
||||
for job_data in interrupted_jobs:
|
||||
job_id = job_data['job_id']
|
||||
file_path = job_data['file_path']
|
||||
ffmpeg_pid = job_data.get('ffmpeg_pid')
|
||||
|
||||
logger.info(f"Processing interrupted job {job_id}: {Path(file_path).name}")
|
||||
|
||||
# Kill orphaned ffmpeg process if it exists and is verified as ours
|
||||
if ffmpeg_pid and CLEANUP_ORPHANED_PROCESSES:
|
||||
killed = kill_process_safely(ffmpeg_pid, file_path, timeout=10)
|
||||
if killed:
|
||||
logger.info(f" ✓ Killed orphaned ffmpeg process PID {ffmpeg_pid}")
|
||||
else:
|
||||
logger.warning(f" ⚠ Could not verify/kill PID {ffmpeg_pid}")
|
||||
|
||||
# Clean up temp files
|
||||
cleaned = cleanup_temp_files(file_path)
|
||||
if cleaned > 0:
|
||||
logger.info(f" ✓ Cleaned up {cleaned} temp file(s)")
|
||||
|
||||
# Decide what to do with interrupted job
|
||||
if AUTO_RESTART_INTERRUPTED_JOBS:
|
||||
# Restart the job
|
||||
logger.info(f" ⟳ Restarting interrupted job...")
|
||||
await compression_manager.persistence.update_job_status(
|
||||
job_id, "pending", progress=0.0
|
||||
)
|
||||
# Update in-memory job status
|
||||
if job_id in compression_manager.jobs:
|
||||
compression_manager.jobs[job_id].status = "pending"
|
||||
compression_manager.jobs[job_id].progress = 0.0
|
||||
compression_manager.jobs[job_id].ffmpeg_pid = None
|
||||
else:
|
||||
# Mark as failed
|
||||
logger.info(f" ✗ Marking as failed (auto-restart disabled)")
|
||||
await compression_manager.persistence.update_job_status(
|
||||
job_id, "failed",
|
||||
error="Job interrupted by application restart/crash"
|
||||
)
|
||||
# Update in-memory job status
|
||||
if job_id in compression_manager.jobs:
|
||||
compression_manager.jobs[job_id].status = "failed"
|
||||
compression_manager.jobs[job_id].error = "Job interrupted by application restart/crash"
|
||||
compression_manager.jobs[job_id].ffmpeg_pid = None
|
||||
else:
|
||||
logger.info("No interrupted jobs found")
|
||||
|
||||
# Get pending jobs
|
||||
pending_jobs = await compression_manager.persistence.get_jobs_by_status("pending")
|
||||
|
||||
if pending_jobs:
|
||||
logger.info(f"Found {len(pending_jobs)} pending jobs - will be processed automatically")
|
||||
|
||||
# Restart pending jobs
|
||||
for job_data in pending_jobs:
|
||||
job_id = job_data['job_id']
|
||||
if job_id in compression_manager.jobs:
|
||||
job = compression_manager.jobs[job_id]
|
||||
# Only restart if not already in active jobs
|
||||
if job_id not in compression_manager.active_jobs:
|
||||
logger.info(f" ⟳ Restarting pending job: {Path(job.file_path).name}")
|
||||
task = asyncio.create_task(compression_manager.compress_video(job))
|
||||
compression_manager.active_jobs[job_id] = task
|
||||
task.add_done_callback(lambda t, jid=job_id: compression_manager.active_jobs.pop(jid, None))  # bind job_id now; a plain closure would capture only the last job_id in the loop
|
||||
else:
|
||||
logger.info("No pending jobs to restart")
|
||||
|
||||
# Optional: Find any other orphaned ffmpeg processes in our path
|
||||
if CLEANUP_ORPHANED_PROCESSES:
|
||||
orphaned = find_orphaned_ffmpeg_processes(FOOTAGES_PATH)
|
||||
if orphaned:
|
||||
logger.warning(f"Found {len(orphaned)} untracked ffmpeg processes in our path:")
|
||||
for proc in orphaned:
|
||||
logger.warning(f" PID {proc['pid']}: {proc['cmdline'][:100]}...")
|
||||
logger.warning(f" ⚠ NOT killing (not in our database) - manual review recommended")
|
||||
|
||||
# Clean up old completed/cancelled jobs
|
||||
deleted = await compression_manager.persistence.cleanup_old_jobs()
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info("Compression job recovery complete")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""Run startup tasks"""
|
||||
@@ -111,6 +241,10 @@ async def startup_event():
|
||||
|
||||
# Start background monitoring
|
||||
filesystem_health_checker.start_monitoring()
|
||||
|
||||
# Recover compression jobs from database
|
||||
await recover_compression_jobs()
|
||||
|
||||
logger.info("Application startup complete")
|
||||
|
||||
|
||||
@@ -176,6 +310,8 @@ async def get_dates(location: str) -> List[Dict]:
|
||||
raise HTTPException(status_code=404, detail="Location not found")
|
||||
|
||||
dates = []
|
||||
has_files_in_root = False
|
||||
|
||||
for item in location_path.iterdir():
|
||||
if item.is_dir():
|
||||
stat = await aiofiles.os.stat(item)
|
||||
@@ -183,6 +319,16 @@ async def get_dates(location: str) -> List[Dict]:
|
||||
"name": item.name,
|
||||
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
|
||||
})
|
||||
elif item.is_file():
|
||||
has_files_in_root = True
|
||||
|
||||
# If no date folders but has files in root, return special marker
|
||||
if not dates and has_files_in_root:
|
||||
dates.append({
|
||||
"name": "__root__",
|
||||
"modified": None,
|
||||
"message": "📁 Files not organized by date"
|
||||
})
|
||||
|
||||
# Cache the result
|
||||
directory_cache.set(cache_key, dates)
|
||||
@@ -202,7 +348,11 @@ async def get_files(location: str, date: str) -> List[Dict]:
|
||||
if ".." in location or ".." in date or "/" in location or "/" in date:
|
||||
raise HTTPException(status_code=400, detail="Invalid path characters")
|
||||
|
||||
files_path = (FOOTAGES_PATH / location / date).resolve()
|
||||
# Handle special __root__ marker for locations with files in root
|
||||
if date == "__root__":
|
||||
files_path = (FOOTAGES_PATH / location).resolve()
|
||||
else:
|
||||
files_path = (FOOTAGES_PATH / location / date).resolve()
|
||||
|
||||
# Ensure resolved path is still within FOOTAGES_PATH
|
||||
try:
|
||||
@@ -227,10 +377,14 @@ async def get_files(location: str, date: str) -> List[Dict]:
|
||||
async def stream_video(location: str, date: str, filename: str, request: Request):
|
||||
"""Stream video file with HTTP range request support for fast seeking"""
|
||||
# Sanitize path components to prevent traversal
|
||||
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date:
|
||||
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date or "/" in filename:
|
||||
raise HTTPException(status_code=400, detail="Invalid path characters")
|
||||
|
||||
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
|
||||
# Handle __root__ case (files not in date subdirectories)
|
||||
if date == "__root__":
|
||||
file_path = (FOOTAGES_PATH / location / filename).resolve()
|
||||
else:
|
||||
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
|
||||
|
||||
# Ensure resolved path is still within FOOTAGES_PATH
|
||||
try:
|
||||
@@ -279,20 +433,20 @@ async def stream_video(location: str, date: str, filename: str, request: Request
|
||||
"Content-Range": f"bytes {start}-{end}/{file_size}",
|
||||
"Accept-Ranges": "bytes",
|
||||
"Content-Length": str(content_length),
|
||||
"Content-Type": "video/mp4",
|
||||
"Content-Type": mimetypes.guess_type(file_path)[0] or "video/mp4",
|
||||
}
|
||||
|
||||
return StreamingResponse(
|
||||
iterfile(),
|
||||
status_code=206,
|
||||
headers=headers,
|
||||
media_type="video/mp4"
|
||||
media_type=mimetypes.guess_type(file_path)[0] or "video/mp4"
|
||||
)
|
||||
|
||||
# No range header - return full file
|
||||
return FileResponse(
|
||||
file_path,
|
||||
media_type="video/mp4",
|
||||
media_type=mimetypes.guess_type(file_path)[0] or "video/mp4",
|
||||
headers={"Accept-Ranges": "bytes"}
|
||||
)
|
||||
|
||||
@@ -301,10 +455,14 @@ async def stream_video(location: str, date: str, filename: str, request: Request
|
||||
async def get_image(location: str, date: str, filename: str):
|
||||
"""Serve image file"""
|
||||
# Sanitize path components to prevent traversal
|
||||
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date:
|
||||
if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date or "/" in filename:
|
||||
raise HTTPException(status_code=400, detail="Invalid path characters")
|
||||
|
||||
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
|
||||
# Handle __root__ case (files not in date subdirectories)
|
||||
if date == "__root__":
|
||||
file_path = (FOOTAGES_PATH / location / filename).resolve()
|
||||
else:
|
||||
file_path = (FOOTAGES_PATH / location / date / filename).resolve()
|
||||
|
||||
# Ensure resolved path is still within FOOTAGES_PATH
|
||||
try:
|
||||
@@ -319,8 +477,8 @@ async def get_image(location: str, date: str, filename: str):
|
||||
if file_path.suffix not in IMAGE_EXTENSIONS:
|
||||
raise HTTPException(status_code=400, detail="Not an image file")
|
||||
|
||||
# Determine media type
|
||||
media_type = "image/jpeg" if file_path.suffix.lower() in {".jpg", ".jpeg"} else "image/png"
|
||||
# Determine media type dynamically
|
||||
media_type = mimetypes.guess_type(file_path)[0] or "image/jpeg"
|
||||
|
||||
return FileResponse(file_path, media_type=media_type)
|
||||
|
||||
@@ -340,7 +498,11 @@ async def start_compression(request: CompressionRequest):
|
||||
if not 1 <= request.reduce_percentage <= 90:
|
||||
raise HTTPException(status_code=400, detail="Percentage must be between 1-90")
|
||||
|
||||
file_path = FOOTAGES_PATH / request.location / request.date / request.filename
|
||||
# Handle __root__ case (files not in date subdirectories)
|
||||
if request.date == "__root__":
|
||||
file_path = FOOTAGES_PATH / request.location / request.filename
|
||||
else:
|
||||
file_path = FOOTAGES_PATH / request.location / request.date / request.filename
|
||||
|
||||
if not file_path.exists():
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
@@ -358,7 +520,7 @@ async def get_all_jobs():
|
||||
"""Get all compression jobs"""
|
||||
jobs = []
|
||||
# Use snapshot to avoid race condition during iteration
|
||||
for job in compression_manager.get_jobs_snapshot():
|
||||
for job in await compression_manager.get_jobs_snapshot():
|
||||
jobs.append({
|
||||
"job_id": job.job_id,
|
||||
"file_path": job.file_path,
|
||||
@@ -397,13 +559,27 @@ async def get_job_status(job_id: str):
|
||||
|
||||
|
||||
@app.delete("/api/compress/jobs/{job_id}")
|
||||
async def cancel_job(job_id: str):
|
||||
"""Cancel a compression job"""
|
||||
async def delete_job(job_id: str, action: str = "cancel"):
|
||||
"""Delete or cancel a compression job
|
||||
|
||||
Args:
|
||||
job_id: The job ID to delete
|
||||
action: 'cancel' to cancel a running job, 'remove' to remove from list
|
||||
"""
|
||||
if job_id not in compression_manager.jobs:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
|
||||
await compression_manager.cancel_job(job_id)
|
||||
return {"status": "cancelled"}
|
||||
if action == "remove":
|
||||
# Remove completed/failed job from list
|
||||
success = await compression_manager.remove_job(job_id)
|
||||
if success:
|
||||
return {"status": "removed"}
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="Cannot remove active job")
|
||||
else:
|
||||
# Cancel running job
|
||||
await compression_manager.cancel_job(job_id)
|
||||
return {"status": "cancelled"}
|
||||
|
||||
|
||||
@app.get("/api/compress/events")
|
||||
@@ -418,7 +594,7 @@ async def compression_events(request: Request):
|
||||
|
||||
# Send status of all active jobs (use snapshot to avoid race condition)
|
||||
active_jobs = []
|
||||
for job in compression_manager.get_jobs_snapshot():
|
||||
for job in await compression_manager.get_jobs_snapshot():
|
||||
if job.status in ["pending", "processing", "validating"]:
|
||||
active_jobs.append({
|
||||
"job_id": job.job_id,
|
||||
|
||||
211 backend/process_utils.py Normal file
@@ -0,0 +1,211 @@
|
||||
import psutil
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def process_exists(pid: int) -> bool:
|
||||
"""Check if a process with given PID exists"""
|
||||
try:
|
||||
return psutil.pid_exists(pid)
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking if PID {pid} exists: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_process_info(pid: int) -> Optional[dict]:
|
||||
"""Get detailed information about a process"""
|
||||
try:
|
||||
proc = psutil.Process(pid)
|
||||
return {
|
||||
'pid': pid,
|
||||
'name': proc.name(),
|
||||
'cmdline': proc.cmdline(),
|
||||
'cwd': proc.cwd(),
|
||||
'status': proc.status(),
|
||||
'create_time': proc.create_time()
|
||||
}
|
||||
except psutil.NoSuchProcess:
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting process info for PID {pid}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def is_our_ffmpeg_process(pid: int, expected_file_path: str) -> bool:
|
||||
"""
|
||||
Verify if a process is our ffmpeg process by checking:
|
||||
1. Process name is ffmpeg
|
||||
2. Command line contains our input file path
|
||||
3. Working directory is accessible
|
||||
|
||||
Args:
|
||||
pid: Process ID to check
|
||||
expected_file_path: The file path this ffmpeg should be processing
|
||||
|
||||
Returns:
|
||||
True if verified as our ffmpeg process, False otherwise
|
||||
"""
|
||||
try:
|
||||
proc = psutil.Process(pid)
|
||||
|
||||
# Check 1: Process name must be ffmpeg
|
||||
process_name = proc.name().lower()
|
||||
if 'ffmpeg' not in process_name:
|
||||
logger.debug(f"PID {pid}: Not an ffmpeg process (name: {process_name})")
|
||||
return False
|
||||
|
||||
# Check 2: Command line must contain our file path
|
||||
cmdline = proc.cmdline()
|
||||
cmdline_str = ' '.join(cmdline)
|
||||
|
||||
# The file path should appear in the command line
|
||||
if expected_file_path not in cmdline_str:
|
||||
logger.debug(f"PID {pid}: File path '{expected_file_path}' not in command line")
|
||||
return False
|
||||
|
||||
# Check 3: Should have -i flag (input file) and -progress flag (our marker)
|
||||
has_input_flag = '-i' in cmdline
|
||||
has_progress_flag = '-progress' in cmdline or 'progress' in cmdline_str
|
||||
|
||||
if not (has_input_flag and has_progress_flag):
|
||||
logger.debug(f"PID {pid}: Missing expected ffmpeg flags")
|
||||
return False
|
||||
|
||||
logger.info(f"PID {pid}: Verified as our ffmpeg process for '{expected_file_path}'")
|
||||
return True
|
||||
|
||||
except psutil.NoSuchProcess:
|
||||
logger.debug(f"PID {pid}: Process no longer exists")
|
||||
return False
|
||||
except psutil.AccessDenied:
|
||||
logger.warning(f"PID {pid}: Access denied - cannot verify (won't kill)")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error verifying PID {pid}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def kill_process_safely(pid: int, expected_file_path: str, timeout: int = 10) -> bool:
|
||||
"""
|
||||
Safely kill a process after verification
|
||||
|
||||
Args:
|
||||
pid: Process ID to kill
|
||||
expected_file_path: Expected file path to verify against
|
||||
timeout: Seconds to wait for graceful termination
|
||||
|
||||
Returns:
|
||||
True if process was killed successfully, False otherwise
|
||||
"""
|
||||
# Double-check verification before killing
|
||||
if not is_our_ffmpeg_process(pid, expected_file_path):
|
||||
logger.warning(f"PID {pid}: Failed verification - NOT killing for safety")
|
||||
return False
|
||||
|
||||
try:
|
||||
proc = psutil.Process(pid)
|
||||
|
||||
# Try graceful termination first (SIGTERM)
|
||||
logger.info(f"PID {pid}: Sending SIGTERM for graceful shutdown")
|
||||
proc.terminate()
|
||||
|
||||
try:
|
||||
# Wait for process to terminate
|
||||
proc.wait(timeout=timeout)
|
||||
logger.info(f"PID {pid}: Process terminated gracefully")
|
||||
return True
|
||||
except psutil.TimeoutExpired:
|
||||
# Force kill if graceful shutdown failed (SIGKILL)
|
||||
logger.warning(f"PID {pid}: Graceful shutdown timed out, sending SIGKILL")
|
||||
proc.kill()
|
||||
proc.wait(timeout=5)
|
||||
logger.info(f"PID {pid}: Process force killed")
|
||||
return True
|
||||
|
||||
except psutil.NoSuchProcess:
|
||||
logger.info(f"PID {pid}: Process already terminated")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error killing PID {pid}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def cleanup_temp_files(file_path: str) -> int:
|
||||
"""
|
||||
Clean up temporary files associated with a compression job
|
||||
|
||||
Args:
|
||||
file_path: Original video file path
|
||||
|
||||
Returns:
|
||||
Number of files cleaned up
|
||||
"""
|
||||
cleaned_count = 0
|
||||
file_path_obj = Path(file_path)
|
||||
parent_dir = file_path_obj.parent
|
||||
|
||||
try:
|
||||
# Clean up temp file: temp_<filename>
|
||||
temp_file = parent_dir / f"temp_{file_path_obj.name}"
|
||||
if temp_file.exists():
|
||||
temp_file.unlink()
|
||||
logger.info(f"Cleaned up temp file: {temp_file.name}")
|
||||
cleaned_count += 1
|
||||
|
||||
# Clean up ffmpeg pass log file
|
||||
log_file = parent_dir / "ffmpeg2pass-0.log"
|
||||
if log_file.exists():
|
||||
log_file.unlink()
|
||||
logger.info(f"Cleaned up ffmpeg log: {log_file.name}")
|
||||
cleaned_count += 1
|
||||
|
||||
# Also check for mbtree file (created by x264)
|
||||
mbtree_file = parent_dir / "ffmpeg2pass-0.log.mbtree"
|
||||
if mbtree_file.exists():
|
||||
mbtree_file.unlink()
|
||||
logger.info(f"Cleaned up mbtree file: {mbtree_file.name}")
|
||||
cleaned_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up temp files for {file_path}: {e}")
|
||||
|
||||
return cleaned_count
|
||||
|
||||
|
||||
def find_orphaned_ffmpeg_processes(allowed_base_path: Path) -> List[dict]:
|
||||
"""
|
||||
Find all ffmpeg processes that are working on files within our base path
|
||||
This helps identify potential orphaned processes
|
||||
|
||||
Args:
|
||||
allowed_base_path: Base path where our videos are stored
|
||||
|
||||
Returns:
|
||||
List of process info dicts
|
||||
"""
|
||||
orphaned = []
|
||||
allowed_path_str = str(allowed_base_path.resolve())
|
||||
|
||||
try:
|
||||
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
|
||||
try:
|
||||
if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower():
|
||||
cmdline = proc.info.get('cmdline', [])
|
||||
cmdline_str = ' '.join(cmdline) if cmdline else ''
|
||||
|
||||
# Check if this ffmpeg is working on a file in our path
|
||||
if allowed_path_str in cmdline_str:
|
||||
orphaned.append({
|
||||
'pid': proc.info['pid'],
|
||||
'cmdline': cmdline_str,
|
||||
'name': proc.info['name']
|
||||
})
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Error finding orphaned processes: {e}")
|
||||
|
||||
return orphaned
|
||||
@@ -3,3 +3,5 @@ uvicorn[standard]==0.27.0
python-multipart==0.0.6
aiofiles==23.2.1
sse-starlette==1.6.5
+aiosqlite==0.19.0
+psutil==5.9.8
|
||||
@@ -8,6 +8,8 @@ services:
    container_name: drone-footage-backend
    volumes:
      - /home/uad/nextcloud/footages:/footages
+    environment:
+      - COMPRESSION_WATCHDOG_TIMEOUT=${COMPRESSION_WATCHDOG_TIMEOUT:-900}
    restart: unless-stopped
    networks:
      - drone-footage-network
|
||||
@@ -21,7 +21,8 @@ FROM nginx:alpine
|
||||
# Copy built assets from builder stage
|
||||
COPY --from=builder /app/dist /usr/share/nginx/html
|
||||
|
||||
-# Copy nginx configuration
+# Copy nginx configurations
|
||||
COPY nginx-main.conf /etc/nginx/nginx.conf
|
||||
COPY nginx.conf /etc/nginx/conf.d/default.conf
|
||||
|
||||
# Expose port
|
||||
|
||||
frontend/nginx-main.conf (new file, 29 lines)
@@ -0,0 +1,29 @@
user nginx;
worker_processes 2;

error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    sendfile on;
    #tcp_nopush on;

    keepalive_timeout 65;

    #gzip on;

    include /etc/nginx/conf.d/*.conf;
}
@@ -5,9 +5,27 @@ server {
    root /usr/share/nginx/html;
    index index.html;

    # Serve static files
    # Serve static files with proper cache control
    location / {
        try_files $uri $uri/ /index.html;

        # Security headers for all responses
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-XSS-Protection "1; mode=block" always;

        # No cache for index.html - always check for updates
        location = /index.html {
            add_header Cache-Control "private, no-cache, no-store, must-revalidate";
            add_header Pragma "no-cache";
            add_header Expires "0";
        }

        # Cache assets with hash in filename (private for single-user app)
        location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
            expires 1y;
            add_header Cache-Control "private, immutable";
        }
    }

    # Proxy API requests to backend
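Not part of the commits above: a small header check, sketched under the assumption that the frontend is reachable at http://localhost and that a hashed asset exists at the made-up path used below. It should show index.html answered with the no-store policy while hashed assets get the long-lived "private, immutable" policy.

# check_cache_headers.py - illustrative sketch; base URL and asset path are assumptions
import urllib.request

BASE = "http://localhost"  # assumption: adjust to wherever the frontend container is exposed

def cache_control(path: str) -> str:
    """Fetch a path and return its Cache-Control header (empty string if absent)."""
    with urllib.request.urlopen(BASE + path) as resp:
        return resp.headers.get("Cache-Control", "")

print("index.html  :", cache_control("/index.html"))               # expect: private, no-cache, no-store, must-revalidate
print("hashed asset:", cache_control("/assets/index-abc123.js"))   # hypothetical filename; expect: private, immutable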
@@ -7,7 +7,7 @@ import { formatFileSize } from './utils/formatters'

function App() {
  const { isHealthy, error: healthError } = useSystemHealth()
  const { jobs } = useCompressionJobs()
  const { jobs, fetchJobs } = useCompressionJobs()
  const [locations, setLocations] = useState([])
  const [selectedLocation, setSelectedLocation] = useState(null)
  const [dates, setDates] = useState([])
@@ -166,7 +166,20 @@ function App() {
    return sorted
  }

  const getSortedLocations = () => sortItems(locations, locationSort)
  const getSortedLocations = () => {
    const sorted = sortItems(locations, locationSort)
    // Separate dotfiles from regular locations
    const regularLocations = sorted.filter(loc => {
      const name = loc.name || loc
      return !name.startsWith('.')
    })
    const dotfiles = sorted.filter(loc => {
      const name = loc.name || loc
      return name.startsWith('.')
    })
    // Return regular locations first, then dotfiles at the bottom
    return [...regularLocations, ...dotfiles]
  }
  const getSortedDates = () => sortItems(dates, dateSort)
  const getSortedFiles = () => sortItems(files, fileSort)
@@ -205,11 +218,13 @@ function App() {
      })

      if (response.ok) {
        // Immediately refresh jobs list to show the new job
        await fetchJobs()
        const hasActiveJobs = jobs.some(j => ['pending', 'processing', 'validating'].includes(j.status))
        if (hasActiveJobs) {
          showToast('Compression job queued successfully!', 'success')
          showToast(`Compression queued (${percentage}% reduction)`, 'success')
        } else {
          showToast('Compression job started!', 'success')
          showToast(`Started compressing ${selectedFile.name} (${percentage}% reduction)`, 'success')
        }
      } else {
        const error = await response.json()
@@ -261,7 +276,7 @@ function App() {
        {selectedFile.is_video ? (
          <video
            ref={videoRef}
            key={getMediaUrl(selectedFile)}
            key={selectedFile.name}
            controls
            preload="metadata"
            className="w-full max-h-[70vh]"
@@ -361,12 +376,16 @@ function App() {
        <ul className="space-y-1">
          {getSortedLocations().map((location) => {
            const locationName = location.name || location
            const isDotfile = locationName.startsWith('.')
            return (
              <li key={locationName}>
                <button
                  onClick={() => handleLocationClick(locationName)}
                  className={`w-full text-left px-3 py-2 rounded hover:bg-blue-50 transition ${
                    selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''
                  onClick={() => !isDotfile && handleLocationClick(locationName)}
                  disabled={isDotfile}
                  className={`w-full text-left px-3 py-2 rounded transition ${
                    isDotfile
                      ? 'text-gray-400 cursor-not-allowed bg-gray-50'
                      : `hover:bg-blue-50 ${selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''}`
                  }`}
                >
                  {locationName}
@@ -409,15 +428,26 @@ function App() {
        <ul className="space-y-1">
          {getSortedDates().map((date) => {
            const dateName = date.name || date
            const isRootFiles = dateName === "__root__"
            const displayName = isRootFiles ? "All Files" : dateName
            const message = date.message || null

            return (
              <li key={dateName}>
                <button
                  onClick={() => handleDateClick(dateName)}
                  className={`w-full text-left px-3 py-2 rounded hover:bg-blue-50 transition ${
                  className={`w-full text-left px-3 py-2 rounded transition ${
                    isRootFiles
                      ? 'bg-yellow-50 border border-yellow-200 hover:bg-yellow-100'
                      : 'hover:bg-blue-50'
                  } ${
                    selectedDate === dateName ? 'bg-blue-100 font-semibold' : ''
                  }`}
                >
                  {dateName}
                  <div className="font-medium">{displayName}</div>
                  {message && (
                    <div className="text-xs text-yellow-700 mt-1">{message}</div>
                  )}
                </button>
              </li>
            )
@@ -31,12 +31,24 @@ export function useCompressionJobs() {
      const updates = JSON.parse(event.data)
      setJobs(prevJobs => {
        const updatedJobs = [...prevJobs]
        let hasNewJobs = false

        updates.forEach(update => {
          const index = updatedJobs.findIndex(j => j.job_id === update.job_id)
          if (index !== -1) {
            // Update existing job
            updatedJobs[index] = { ...updatedJobs[index], ...update }
          } else {
            // New job detected - we need complete data
            hasNewJobs = true
          }
        })

        // Trigger full fetch if new jobs detected
        if (hasNewJobs) {
          fetchJobs()
        }

        return updatedJobs
      })
    })
@@ -44,7 +56,9 @@ export function useCompressionJobs() {
    eventSource.onerror = () => {
      console.error('SSE connection error')
      eventSource.close()
      setTimeout(connectSSE, 5000) // Reconnect after 5s
      setTimeout(() => {
        connectSSE()
      }, 5000)
    }

    eventSourceRef.current = eventSource
@@ -53,7 +53,9 @@ export function useSystemHealth() {
      console.error('System health SSE connection error')
      eventSource.close()
      // Reconnect after 10 seconds
      setTimeout(connectSSE, 10000)
      setTimeout(() => {
        connectSSE()
      }, 10000)
    }

    eventSourceRef.current = eventSource
@@ -11,7 +11,7 @@ export const formatFileSize = (bytes) => {
}

export const formatETA = (seconds) => {
  if (!seconds) return '--'
  if (seconds == null || seconds === undefined) return '--'
  const mins = Math.floor(seconds / 60)
  const secs = seconds % 60
  return `${mins}m ${secs}s`