Compare commits
3 Commits: d8242d47b9...92338df716

| Author | SHA1 | Date |
|---|---|---|
| | 92338df716 | |
| | 80ca19cb4b | |
| | 91141eadcf | |
@@ -4,9 +4,11 @@ import uuid
import os
import re
import logging
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional
from job_persistence import JobPersistence

# Configure logging
logging.basicConfig(
@@ -30,8 +32,8 @@ MIN_VIDEO_BITRATE = 100 # kbps


class CompressionJob:
    def __init__(self, file_path: str, reduce_percentage: int):
        self.job_id = str(uuid.uuid4())
    def __init__(self, file_path: str, reduce_percentage: int, job_id: Optional[str] = None):
        self.job_id = job_id or str(uuid.uuid4())
        self.file_path = file_path
        self.reduce_percentage = reduce_percentage
        self.status = "pending"  # pending, processing, validating, completed, failed, cancelled
@@ -47,6 +49,59 @@ class CompressionJob:
        self.target_size_mb = None
        self.video_bitrate = None
        self.duration_seconds = None
        self.ffmpeg_pid = None  # Track ffmpeg process PID

    def to_dict(self) -> Dict:
        """Convert job to dictionary for database persistence"""
        return {
            'job_id': self.job_id,
            'file_path': self.file_path,
            'reduce_percentage': self.reduce_percentage,
            'status': self.status,
            'progress': self.progress,
            'eta_seconds': self.eta_seconds,
            'current_pass': self.current_pass,
            'created_at': self.created_at,
            'started_at': self.started_at,
            'completed_at': self.completed_at,
            'error': self.error,
            'output_file': self.output_file,
            'current_size_mb': self.current_size_mb,
            'target_size_mb': self.target_size_mb,
            'video_bitrate': self.video_bitrate,
            'duration_seconds': self.duration_seconds,
            'ffmpeg_pid': self.ffmpeg_pid
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'CompressionJob':
        """Create job from database dictionary"""
        job = cls(
            file_path=data['file_path'],
            reduce_percentage=data['reduce_percentage'],
            job_id=data['job_id']
        )
        job.status = data.get('status', 'pending')
        job.progress = data.get('progress', 0.0)
        job.eta_seconds = data.get('eta_seconds')
        job.current_pass = data.get('current_pass', 0)
        job.error = data.get('error')
        job.output_file = data.get('output_file')
        job.current_size_mb = data.get('current_size_mb')
        job.target_size_mb = data.get('target_size_mb')
        job.video_bitrate = data.get('video_bitrate')
        job.duration_seconds = data.get('duration_seconds')
        job.ffmpeg_pid = data.get('ffmpeg_pid')

        # Parse datetime strings back to datetime objects
        for key in ['created_at', 'started_at', 'completed_at']:
            if data.get(key):
                if isinstance(data[key], str):
                    job.__dict__[key] = datetime.fromisoformat(data[key])
                else:
                    job.__dict__[key] = data[key]

        return job


class CompressionManager:
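The new `to_dict`/`from_dict` pair is what lets a job be written to SQLite and rebuilt after a restart. A minimal round-trip sketch (assuming the backend package is importable; the file path is illustrative):

```python
# Round-trip a job through its dict form, as JobPersistence stores it.
from datetime import datetime
from compression import CompressionJob  # assumes backend/ is on sys.path

job = CompressionJob("/footages/cam1/clip.mp4", reduce_percentage=40)  # illustrative path
job.started_at = datetime.now()

row = job.to_dict()                       # plain dict, ready for JobPersistence.save_job()
restored = CompressionJob.from_dict(row)  # datetimes parsed back when stored as ISO strings

assert restored.job_id == job.job_id
assert restored.reduce_percentage == 40
```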
@@ -54,15 +109,36 @@ class CompressionManager:
    TIME_PATTERN = re.compile(r'out_time_ms=(\d+)')
    FPS_PATTERN = re.compile(r'fps=([\d.]+)')

    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None):
    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None, health_checker=None, cache_invalidation_callback=None):
        self.jobs: Dict[str, CompressionJob] = {}
        self.active_jobs: Dict[str, asyncio.Task] = {}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None
        self.health_checker = health_checker
        self.cache_invalidation_callback = cache_invalidation_callback
        self._pruning_task: Optional[asyncio.Task] = None
        self._jobs_lock = asyncio.Lock()
        self._start_periodic_pruning()
        self.persistence = JobPersistence()
        self._pruning_started = False

    async def initialize_persistence(self):
        """Initialize the persistence layer - call this on startup"""
        await self.persistence.initialize()
        logger.info("Job persistence initialized")

        # Start periodic pruning (only once when persistence is initialized)
        if not self._pruning_started:
            self._start_periodic_pruning()
            self._pruning_started = True

    async def load_jobs_from_database(self):
        """Load all jobs from database into memory"""
        async with self._jobs_lock:
            jobs_data = await self.persistence.get_all_jobs()
            for job_data in jobs_data:
                job = CompressionJob.from_dict(job_data)
                self.jobs[job.job_id] = job
            logger.info(f"Loaded {len(jobs_data)} jobs from database")

    async def get_video_info(self, file_path: str) -> Dict:
        """Extract video duration and file size using ffprobe"""
@@ -106,6 +182,7 @@ class CompressionManager:
        try:
            job.status = "processing"
            job.started_at = datetime.now()
            await self.persistence.save_job(job.to_dict())

            # Get video information
            info = await self.get_video_info(job.file_path)
@@ -125,10 +202,32 @@ class CompressionManager:
                        f"Duration: {job.duration_seconds}s, "
                        f"Video Bitrate: {job.video_bitrate} kbps")

            # Generate output filename
            # Check disk space BEFORE starting compression
            file_path = Path(job.file_path)
            disk_usage = shutil.disk_usage(file_path.parent)

            # Estimate required space: temp file + original + safety margin (1.5x original)
            required_bytes = (job.current_size_mb * 1024 * 1024) * 1.5
            available_bytes = disk_usage.free

            if available_bytes < required_bytes:
                required_gb = required_bytes / (1024**3)
                available_gb = available_bytes / (1024**3)
                error_msg = (
                    f"Insufficient disk space: need {required_gb:.1f}GB, "
                    f"have {available_gb:.1f}GB available"
                )
                logger.error(f"Job {job.job_id}: {error_msg}")
                raise ValueError(error_msg)

            logger.info(
                f"Job {job.job_id}: Disk space check passed "
                f"(need {required_bytes/(1024**3):.1f}GB, have {available_bytes/(1024**3):.1f}GB)"
            )

            # Generate output filename (use original path for in-place replacement)
            file_path = Path(job.file_path)
            temp_file = file_path.parent / f"temp_{file_path.name}"
            output_file = file_path.parent / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"

            # PASS 1: Analysis
            job.current_pass = 1
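The disk-space guard budgets roughly 1.5x the original size (temp output plus original plus margin) before any ffmpeg pass starts. A standalone sketch of the same arithmetic (the path and size are illustrative):

```python
# Pre-flight disk check: fail fast instead of discovering ENOSPC mid-encode.
import shutil

current_size_mb = 2048                                  # e.g. a 2 GB source file
required_bytes = (current_size_mb * 1024 * 1024) * 1.5  # temp + original + safety margin
available_bytes = shutil.disk_usage("/footages").free   # free bytes on the target volume

if available_bytes < required_bytes:
    raise ValueError(
        f"Insufficient disk space: need {required_bytes / 1024**3:.1f}GB, "
        f"have {available_bytes / 1024**3:.1f}GB available"
    )
```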
@@ -146,31 +245,42 @@ class CompressionManager:
                self.cleanup_temp_files(job)
                return

            # VALIDATION
            # SAFE FILE REPLACEMENT WITH VALIDATION
            job.status = "validating"
            job.progress = 95.0
            if await self.validate_video(temp_file):
                # Move temp file to final output
                os.rename(temp_file, output_file)
                job.output_file = str(output_file)
            await self.persistence.update_job_status(job.job_id, "validating", progress=95.0)
            logger.info(f"Job {job.job_id}: Starting safe file replacement process")

            if await self.safe_replace_file(file_path, temp_file, job):
                job.output_file = str(file_path)  # Output is same as original path
                job.status = "completed"
                job.progress = 100.0
                job.completed_at = datetime.now()
                job.ffmpeg_pid = None  # Clear PID on completion
                await self.persistence.save_job(job.to_dict())
                self.cleanup_temp_files(job)
                logger.info(f"Job {job.job_id} completed successfully")
                logger.info(f"Job {job.job_id} completed successfully - original file replaced in-place")
            else:
                job.status = "failed"
                job.error = "Validation failed: Compressed video is corrupted"
                job.error = "File replacement failed or validation check failed"
                await self.persistence.update_job_status(job.job_id, "failed", error=job.error)
                self.cleanup_temp_files(job)
                logger.error(f"Job {job.job_id} failed validation")
                logger.error(f"Job {job.job_id} failed - file replacement unsuccessful")

        except asyncio.CancelledError:
            job.status = "cancelled"
            # Don't mark as cancelled - could be app shutdown
            # Keep current status so job can be recovered on restart
            job.ffmpeg_pid = None  # Clear PID since process won't survive
            # Don't update database status - leave it as "processing" or "validating"
            # for recovery on restart
            self.cleanup_temp_files(job)
            logger.info(f"Job {job.job_id} cancelled")
            logger.info(f"Job {job.job_id} interrupted (task cancelled)")
            raise  # Re-raise to properly cancel the task
        except Exception as e:
            job.status = "failed"
            job.error = str(e)
            job.ffmpeg_pid = None  # Clear PID on failure
            await self.persistence.update_job_status(job.job_id, "failed", error=str(e))
            self.cleanup_temp_files(job)
            logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)

@@ -216,15 +326,25 @@ class CompressionManager:
            limit=STDERR_READ_LIMIT  # Set buffer limit to prevent overflow
        )

        # Track PID for first pass only (second pass uses same job)
        if pass_num == 1 and process.pid:
            job.ffmpeg_pid = process.pid
            await self.persistence.set_ffmpeg_pid(job.job_id, process.pid)
            logger.info(f"Job {job.job_id}: ffmpeg PID {process.pid} tracked")

        # Progress tracking
        stderr_output = []
        last_progress_update = datetime.now()
        last_progress_update = None  # Initialize as None, will be set when FFmpeg starts producing output

        async def read_stdout():
            """Read and process stdout for progress updates"""
            nonlocal last_progress_update
            try:
                async for line in process.stdout:
                    # Initialize timer on first progress update from FFmpeg
                    if last_progress_update is None:
                        last_progress_update = datetime.now()
                        logger.info(f"Job {job.job_id}: FFmpeg started producing output, watchdog timer initialized")
                    if job.status == "cancelled":
                        process.kill()
                        return
@@ -259,6 +379,16 @@ class CompressionManager:
                        else:
                            remaining_sec = remaining_sec_current_pass
                        job.eta_seconds = int(remaining_sec)

                        # Periodically save progress to database (every ~5% or 30 seconds)
                        # Use modulo to reduce DB writes
                        if int(job.progress) % 5 == 0:
                            await self.persistence.update_job_progress(
                                job.job_id,
                                job.progress,
                                job.eta_seconds,
                                job.current_pass
                            )
            except Exception as e:
                logger.error(f"Error reading stdout: {e}")

@@ -289,14 +419,16 @@ class CompressionManager:
                    process.kill()
                    return

                time_since_update = (datetime.now() - last_progress_update).total_seconds()
                if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
                    error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
                    logger.error(f"Watchdog: {error_msg}")
                    job.status = "failed"
                    job.error = error_msg
                    process.kill()
                    raise Exception(error_msg)
                # Only check timeout if FFmpeg has started producing output
                if last_progress_update is not None:
                    time_since_update = (datetime.now() - last_progress_update).total_seconds()
                    if time_since_update > WATCHDOG_TIMEOUT_SECONDS:
                        error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
                        logger.error(f"Watchdog: {error_msg}")
                        job.status = "failed"
                        job.error = error_msg
                        process.kill()
                        raise Exception(error_msg)

                await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
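The watchdog change is subtle: the stall timer now starts only after ffmpeg produces its first progress line, so slow start-up is no longer mistaken for a hang. A self-contained sketch of that pattern (the constant names mirror the diff, but the values and callbacks are illustrative):

```python
import asyncio
from datetime import datetime

WATCHDOG_TIMEOUT_SECONDS = 30   # illustrative values
WATCHDOG_CHECK_INTERVAL = 1

async def watchdog(get_last_update, kill_process):
    """Kill the worker only if progress has started and then stopped for too long."""
    while True:
        last_update = get_last_update()
        if last_update is not None:  # timer is armed only after the first update
            stalled_for = (datetime.now() - last_update).total_seconds()
            if stalled_for > WATCHDOG_TIMEOUT_SECONDS:
                kill_process()
                raise RuntimeError(f"No progress for {stalled_for:.0f}s - killed by watchdog")
        await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
```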
@@ -348,6 +480,134 @@ class CompressionManager:
        # Also check return code
        return process.returncode == 0

    async def validate_video_enhanced(self, original_path: Path, compressed_path: Path) -> bool:
        """Enhanced validation with size and duration checks before file replacement"""
        # 1. Basic ffmpeg decode validation
        if not await self.validate_video(compressed_path):
            logger.error(f"Enhanced validation failed: FFmpeg decode check failed")
            return False

        # 2. File size sanity check (minimum 1KB)
        try:
            compressed_size = os.path.getsize(str(compressed_path))
            if compressed_size < 1024:
                logger.error(f"Enhanced validation failed: Compressed file too small ({compressed_size} bytes)")
                return False
            logger.info(f"Enhanced validation: Size check passed ({compressed_size} bytes)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not check file size: {e}")
            return False

        # 3. Duration verification (optional but recommended)
        try:
            original_info = await self.get_video_info(str(original_path))
            compressed_info = await self.get_video_info(str(compressed_path))

            duration_diff = abs(original_info['duration_seconds'] - compressed_info['duration_seconds'])
            # Allow up to 1 second difference due to rounding
            if duration_diff > 1.0:
                logger.error(f"Enhanced validation failed: Duration mismatch - "
                             f"original: {original_info['duration_seconds']:.2f}s vs "
                             f"compressed: {compressed_info['duration_seconds']:.2f}s "
                             f"(diff: {duration_diff:.2f}s)")
                return False
            logger.info(f"Enhanced validation: Duration check passed "
                        f"(original: {original_info['duration_seconds']:.2f}s, "
                        f"compressed: {compressed_info['duration_seconds']:.2f}s)")
        except Exception as e:
            logger.error(f"Enhanced validation failed: Could not verify duration: {e}")
            return False

        logger.info(f"Enhanced validation: All checks passed for {compressed_path.name}")
        return True

    async def safe_replace_file(self, original_path: Path, temp_file: Path, job: CompressionJob) -> bool:
        """Safely replace original file with compressed version with rollback capability"""
        backup_path = original_path.parent / f"{original_path.name}.backup"

        try:
            # Enhanced validation BEFORE any file operations
            logger.info(f"Validating compressed file before replacement: {temp_file.name}")
            if not await self.validate_video_enhanced(original_path, temp_file):
                logger.error(f"Safe replace failed: Validation check failed")
                return False

            # Step 1: Create backup of original file
            logger.info(f"Creating backup: {original_path.name} -> {backup_path.name}")
            try:
                os.replace(str(original_path), str(backup_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not create backup: {e}")
                return False

            # Step 2: Replace original with compressed file (atomic operation)
            logger.info(f"Replacing original file with compressed version: {temp_file.name}")
            try:
                os.replace(str(temp_file), str(original_path))
            except Exception as e:
                logger.error(f"Safe replace failed: Could not move compressed file. Attempting rollback...")
                # Rollback: restore original from backup
                try:
                    if backup_path.exists():
                        os.replace(str(backup_path), str(original_path))
                        logger.info(f"Rollback successful: Original file restored")
                except Exception as rollback_error:
                    logger.error(f"Rollback failed: {rollback_error}")
                return False

            # Step 3: Verify final file is accessible
            logger.info(f"Verifying replaced file is accessible: {original_path.name}")
            try:
                if not original_path.exists() or not original_path.is_file():
                    logger.error(f"Safe replace failed: Replaced file not accessible. Attempting rollback...")
                    # Rollback: restore original from backup
                    try:
                        if backup_path.exists():
                            os.replace(str(backup_path), str(original_path))
                            logger.info(f"Rollback successful: Original file restored")
                    except Exception as rollback_error:
                        logger.error(f"Rollback failed: {rollback_error}")
                    return False

                # Get final file size
                final_size = os.path.getsize(str(original_path))
                logger.info(f"File replacement successful. Final size: {final_size / (1024*1024):.2f} MB")
            except Exception as e:
                logger.error(f"Safe replace failed: Could not verify final file: {e}")
                return False

            # Step 4: Delete backup file only after successful verification
            logger.info(f"Deleting backup file: {backup_path.name}")
            try:
                if backup_path.exists():
                    backup_path.unlink()
                    logger.info(f"Backup file deleted successfully")
            except Exception as e:
                logger.warning(f"Failed to delete backup file (non-critical): {e}")

            # Step 5: Invalidate cache for this directory to show updated file size
            if self.cache_invalidation_callback:
                try:
                    # Invalidate all cache entries containing the parent directory path
                    parent_name = original_path.parent.name
                    self.cache_invalidation_callback(parent_name)
                    logger.info(f"Cache invalidated for directory: {parent_name}")
                except Exception as e:
                    logger.warning(f"Failed to invalidate cache (non-critical): {e}")

            return True

        except Exception as e:
            logger.error(f"Safe replace encountered unexpected error: {e}", exc_info=True)
            # Attempt rollback on any unexpected error
            try:
                if backup_path.exists():
                    os.replace(str(backup_path), str(original_path))
                    logger.info(f"Rollback successful: Original file restored after unexpected error")
            except Exception as rollback_error:
                logger.error(f"Rollback failed: {rollback_error}")
            return False

    def cleanup_temp_files(self, job: CompressionJob):
        """Clean up temporary files"""
        file_path = Path(job.file_path)
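`safe_replace_file` leans on `os.replace`, which is atomic when source and destination live on the same filesystem; that is what makes the rollback path possible. A compact sketch of the backup -> swap -> verify -> delete-backup sequence, stripped of logging and the class (paths are illustrative):

```python
import os
from pathlib import Path

def replace_in_place(original: Path, candidate: Path) -> bool:
    """Promote `candidate` over `original`, restoring the original on any failure."""
    backup = original.with_name(original.name + ".backup")
    os.replace(original, backup)             # step 1: park the original
    try:
        os.replace(candidate, original)      # step 2: atomic swap on the same volume
        if not original.is_file():           # step 3: sanity check
            raise OSError("replaced file not accessible")
    except OSError:
        os.replace(backup, original)         # rollback: put the original back
        return False
    backup.unlink(missing_ok=True)           # step 4: drop the backup only after success
    return True
```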
@@ -443,6 +703,9 @@ class CompressionManager:
        job = CompressionJob(file_path, reduce_percentage)
        self.jobs[job.job_id] = job

        # Save initial job to database
        await self.persistence.save_job(job.to_dict())

        # Start compression in background
        task = asyncio.create_task(self.compress_video(job))
        self.active_jobs[job.job_id] = task
@@ -453,9 +716,11 @@ class CompressionManager:
        return job.job_id

    async def cancel_job(self, job_id: str):
        """Cancel a running compression job"""
        """Cancel a running compression job (user-initiated)"""
        if job_id in self.jobs:
            self.jobs[job_id].status = "cancelled"
            # Save cancelled status to database (user-initiated cancellation)
            await self.persistence.update_job_status(job_id, "cancelled")
            if job_id in self.active_jobs:
                self.active_jobs[job_id].cancel()
backend/job_persistence.py (new file, 243 lines)
@@ -0,0 +1,243 @@
import sqlite3
import asyncio
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import aiosqlite
import os

logger = logging.getLogger(__name__)

# Database configuration from environment
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))


class JobPersistence:
    """SQLite-based persistence layer for compression jobs"""

    def __init__(self, db_path: str = DATABASE_PATH):
        self.db_path = db_path
        self._initialized = False
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Initialize database schema"""
        if self._initialized:
            return

        async with self._lock:
            if self._initialized:
                return

            # Ensure directory exists
            db_dir = Path(self.db_path).parent
            db_dir.mkdir(parents=True, exist_ok=True)

            async with aiosqlite.connect(self.db_path) as db:
                await db.execute("""
                    CREATE TABLE IF NOT EXISTS compression_jobs (
                        job_id TEXT PRIMARY KEY,
                        file_path TEXT NOT NULL,
                        reduce_percentage INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        progress REAL DEFAULT 0.0,
                        current_pass INTEGER DEFAULT 0,
                        current_size_mb REAL,
                        target_size_mb REAL,
                        video_bitrate INTEGER,
                        duration_seconds REAL,
                        eta_seconds INTEGER,
                        created_at TIMESTAMP NOT NULL,
                        started_at TIMESTAMP,
                        completed_at TIMESTAMP,
                        output_file TEXT,
                        error TEXT,
                        ffmpeg_pid INTEGER
                    )
                """)

                # Create indexes for performance
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_status
                    ON compression_jobs(status)
                """)

                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_created_at
                    ON compression_jobs(created_at)
                """)

                await db.commit()

            self._initialized = True
            logger.info(f"Job persistence initialized at {self.db_path}")

    async def save_job(self, job_data: Dict[str, Any]):
        """Insert or update a job in the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            # Convert datetime objects to ISO format strings
            job_data_copy = job_data.copy()
            for key in ['created_at', 'started_at', 'completed_at']:
                if key in job_data_copy and job_data_copy[key]:
                    if isinstance(job_data_copy[key], datetime):
                        job_data_copy[key] = job_data_copy[key].isoformat()

            await db.execute("""
                INSERT OR REPLACE INTO compression_jobs (
                    job_id, file_path, reduce_percentage, status, progress,
                    current_pass, current_size_mb, target_size_mb, video_bitrate,
                    duration_seconds, eta_seconds, created_at, started_at,
                    completed_at, output_file, error, ffmpeg_pid
                ) VALUES (
                    :job_id, :file_path, :reduce_percentage, :status, :progress,
                    :current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
                    :duration_seconds, :eta_seconds, :created_at, :started_at,
                    :completed_at, :output_file, :error, :ffmpeg_pid
                )
            """, job_data_copy)
            await db.commit()

    async def update_job_status(self, job_id: str, status: str,
                                progress: Optional[float] = None,
                                error: Optional[str] = None):
        """Quick update for job status and progress"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"status": status}
            if progress is not None:
                updates["progress"] = progress
            if error is not None:
                updates["error"] = error

            # Set completed_at for terminal states
            if status in ['completed', 'failed', 'cancelled']:
                updates["completed_at"] = datetime.now().isoformat()

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def update_job_progress(self, job_id: str, progress: float,
                                  eta_seconds: Optional[int] = None,
                                  current_pass: Optional[int] = None):
        """Update job progress and ETA"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"progress": progress}
            if eta_seconds is not None:
                updates["eta_seconds"] = eta_seconds
            if current_pass is not None:
                updates["current_pass"] = current_pass

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def set_ffmpeg_pid(self, job_id: str, pid: int):
        """Update the ffmpeg PID for a job"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
                (pid, job_id)
            )
            await db.commit()

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get a single job by ID"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            ) as cursor:
                row = await cursor.fetchone()
                if row:
                    return dict(row)
                return None

    async def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Get all jobs from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs ORDER BY created_at DESC"
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
        """Get all jobs with a specific status"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
                (status,)
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
        """Get jobs that were interrupted (status=processing/validating)"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """SELECT * FROM compression_jobs
                   WHERE status IN ('processing', 'validating')
                   ORDER BY created_at ASC"""
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def delete_job(self, job_id: str):
        """Delete a job from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "DELETE FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            )
            await db.commit()

    async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
        """Delete completed/cancelled jobs older than retention period"""
        await self.initialize()

        cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60)
        cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat()

        async with aiosqlite.connect(self.db_path) as db:
            result = await db.execute("""
                DELETE FROM compression_jobs
                WHERE status IN ('completed', 'cancelled')
                AND completed_at < ?
            """, (cutoff_iso,))

            deleted_count = result.rowcount
            await db.commit()

            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")

            return deleted_count
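A hedged usage sketch for the new persistence layer (assumes the backend package is importable; the database path override is illustrative):

```python
import asyncio
from job_persistence import JobPersistence

async def main():
    store = JobPersistence("/tmp/compression_jobs.db")  # defaults to JOB_DATABASE_PATH
    await store.initialize()

    # Jobs still marked 'processing'/'validating' are what recovery acts on after a crash.
    interrupted = await store.get_interrupted_jobs()
    print(f"{len(interrupted)} interrupted job(s)")

    # Retention cleanup reports how many completed/cancelled rows were dropped.
    removed = await store.cleanup_old_jobs(retention_days=7)
    print(f"pruned {removed} old job(s)")

asyncio.run(main())
```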
backend/main.py (137 changed lines)
@@ -16,6 +16,11 @@ import mimetypes
from sse_starlette.sse import EventSourceResponse
from compression import CompressionManager
from filesystem_health import FilesystemHealthChecker
from process_utils import (
    kill_process_safely,
    cleanup_temp_files,
    find_orphaned_ffmpeg_processes
)

# Configure logging
logger = logging.getLogger(__name__)
@@ -27,6 +32,10 @@ STREAM_CHUNK_SIZE = 1024 * 1024 # 1MB chunks for video streaming
SSE_UPDATE_INTERVAL = 0.5  # Update every 500ms
CACHE_TTL_SECONDS = 60  # Cache directory listings for 60 seconds

# Job recovery configuration
AUTO_RESTART_INTERRUPTED_JOBS = os.getenv('AUTO_RESTART_INTERRUPTED_JOBS', 'true').lower() == 'true'
CLEANUP_ORPHANED_PROCESSES = os.getenv('CLEANUP_ORPHANED_PROCESSES', 'true').lower() == 'true'

# Base path for footages
FOOTAGES_PATH = Path("/footages")

@@ -51,16 +60,26 @@ class SimpleCache:
    def clear(self):
        self.cache.clear()

    def invalidate(self, pattern: str = None):
        """Invalidate cache entries matching pattern or all if pattern is None"""
        if pattern is None:
            self.cache.clear()
        else:
            keys_to_delete = [k for k in self.cache.keys() if pattern in k]
            for key in keys_to_delete:
                del self.cache[key]

directory_cache = SimpleCache()

# Initialize filesystem health checker first (compression manager needs it)
filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH)

# Initialize compression manager with health checker
# Initialize compression manager with health checker and cache callback
compression_manager = CompressionManager(
    max_concurrent=1,
    allowed_base_path=FOOTAGES_PATH,
    health_checker=filesystem_health_checker
    health_checker=filesystem_health_checker,
    cache_invalidation_callback=directory_cache.invalidate
)

# CORS middleware for frontend communication
@@ -96,6 +115,116 @@ async def get_file_info(file_path: Path) -> Dict:
    }


async def recover_compression_jobs():
    """
    Recover compression jobs from database after app restart/crash

    This function:
    1. Loads all jobs from database
    2. Handles interrupted jobs (processing/validating)
    3. Cleans up orphaned ffmpeg processes
    4. Restarts pending jobs automatically
    """
    logger.info("=" * 60)
    logger.info("Starting compression job recovery...")
    logger.info("=" * 60)

    # Initialize persistence layer
    await compression_manager.initialize_persistence()

    # Load all jobs from database
    await compression_manager.load_jobs_from_database()

    # Get interrupted jobs (were processing when app crashed)
    interrupted_jobs = await compression_manager.persistence.get_interrupted_jobs()

    if interrupted_jobs:
        logger.info(f"Found {len(interrupted_jobs)} interrupted jobs")

        for job_data in interrupted_jobs:
            job_id = job_data['job_id']
            file_path = job_data['file_path']
            ffmpeg_pid = job_data.get('ffmpeg_pid')

            logger.info(f"Processing interrupted job {job_id}: {Path(file_path).name}")

            # Kill orphaned ffmpeg process if it exists and is verified as ours
            if ffmpeg_pid and CLEANUP_ORPHANED_PROCESSES:
                killed = kill_process_safely(ffmpeg_pid, file_path, timeout=10)
                if killed:
                    logger.info(f"  ✓ Killed orphaned ffmpeg process PID {ffmpeg_pid}")
                else:
                    logger.warning(f"  ⚠ Could not verify/kill PID {ffmpeg_pid}")

            # Clean up temp files
            cleaned = cleanup_temp_files(file_path)
            if cleaned > 0:
                logger.info(f"  ✓ Cleaned up {cleaned} temp file(s)")

            # Decide what to do with interrupted job
            if AUTO_RESTART_INTERRUPTED_JOBS:
                # Restart the job
                logger.info(f"  ⟳ Restarting interrupted job...")
                await compression_manager.persistence.update_job_status(
                    job_id, "pending", progress=0.0
                )
                # Update in-memory job status
                if job_id in compression_manager.jobs:
                    compression_manager.jobs[job_id].status = "pending"
                    compression_manager.jobs[job_id].progress = 0.0
                    compression_manager.jobs[job_id].ffmpeg_pid = None
            else:
                # Mark as failed
                logger.info(f"  ✗ Marking as failed (auto-restart disabled)")
                await compression_manager.persistence.update_job_status(
                    job_id, "failed",
                    error="Job interrupted by application restart/crash"
                )
                # Update in-memory job status
                if job_id in compression_manager.jobs:
                    compression_manager.jobs[job_id].status = "failed"
                    compression_manager.jobs[job_id].error = "Job interrupted by application restart/crash"
                    compression_manager.jobs[job_id].ffmpeg_pid = None
    else:
        logger.info("No interrupted jobs found")

    # Get pending jobs
    pending_jobs = await compression_manager.persistence.get_jobs_by_status("pending")

    if pending_jobs:
        logger.info(f"Found {len(pending_jobs)} pending jobs - will be processed automatically")

        # Restart pending jobs
        for job_data in pending_jobs:
            job_id = job_data['job_id']
            if job_id in compression_manager.jobs:
                job = compression_manager.jobs[job_id]
                # Only restart if not already in active jobs
                if job_id not in compression_manager.active_jobs:
                    logger.info(f"  ⟳ Restarting pending job: {Path(job.file_path).name}")
                    task = asyncio.create_task(compression_manager.compress_video(job))
                    compression_manager.active_jobs[job_id] = task
                    task.add_done_callback(lambda t: compression_manager.active_jobs.pop(job_id, None))
    else:
        logger.info("No pending jobs to restart")

    # Optional: Find any other orphaned ffmpeg processes in our path
    if CLEANUP_ORPHANED_PROCESSES:
        orphaned = find_orphaned_ffmpeg_processes(FOOTAGES_PATH)
        if orphaned:
            logger.warning(f"Found {len(orphaned)} untracked ffmpeg processes in our path:")
            for proc in orphaned:
                logger.warning(f"  PID {proc['pid']}: {proc['cmdline'][:100]}...")
            logger.warning(f"  ⚠ NOT killing (not in our database) - manual review recommended")

    # Clean up old completed/cancelled jobs
    deleted = await compression_manager.persistence.cleanup_old_jobs()

    logger.info("=" * 60)
    logger.info("Compression job recovery complete")
    logger.info("=" * 60)


@app.on_event("startup")
async def startup_event():
    """Run startup tasks"""
@@ -112,6 +241,10 @@ async def startup_event():

    # Start background monitoring
    filesystem_health_checker.start_monitoring()

    # Recover compression jobs from database
    await recover_compression_jobs()

    logger.info("Application startup complete")
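The cache wiring is the other half of the in-place replacement: `CompressionManager` receives `directory_cache.invalidate` as a callback and calls it with the parent directory name after a successful swap. Since `SimpleCache` is only partially shown in the diff, here is a standalone mirror of just the substring-based `invalidate()` behaviour (values are illustrative):

```python
class TinyCache:
    """Minimal stand-in for SimpleCache's invalidate() semantics."""
    def __init__(self):
        self.cache = {}

    def invalidate(self, pattern: str = None):
        if pattern is None:
            self.cache.clear()                 # no pattern: drop everything
        else:
            for key in [k for k in self.cache if pattern in k]:
                del self.cache[key]            # drop entries whose key contains the pattern

cache = TinyCache()
cache.cache = {"/footages/cam1": ["a.mp4"], "/footages/cam2": ["b.mp4"]}
cache.invalidate("cam1")                       # only the cam1 listing is evicted
assert list(cache.cache) == ["/footages/cam2"]
```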
backend/process_utils.py (new file, 211 lines)
@@ -0,0 +1,211 @@
import psutil
import logging
from pathlib import Path
from typing import Optional, List

logger = logging.getLogger(__name__)


def process_exists(pid: int) -> bool:
    """Check if a process with given PID exists"""
    try:
        return psutil.pid_exists(pid)
    except Exception as e:
        logger.error(f"Error checking if PID {pid} exists: {e}")
        return False


def get_process_info(pid: int) -> Optional[dict]:
    """Get detailed information about a process"""
    try:
        proc = psutil.Process(pid)
        return {
            'pid': pid,
            'name': proc.name(),
            'cmdline': proc.cmdline(),
            'cwd': proc.cwd(),
            'status': proc.status(),
            'create_time': proc.create_time()
        }
    except psutil.NoSuchProcess:
        return None
    except Exception as e:
        logger.error(f"Error getting process info for PID {pid}: {e}")
        return None


def is_our_ffmpeg_process(pid: int, expected_file_path: str) -> bool:
    """
    Verify if a process is our ffmpeg process by checking:
    1. Process name is ffmpeg
    2. Command line contains our input file path
    3. Working directory is accessible

    Args:
        pid: Process ID to check
        expected_file_path: The file path this ffmpeg should be processing

    Returns:
        True if verified as our ffmpeg process, False otherwise
    """
    try:
        proc = psutil.Process(pid)

        # Check 1: Process name must be ffmpeg
        process_name = proc.name().lower()
        if 'ffmpeg' not in process_name:
            logger.debug(f"PID {pid}: Not an ffmpeg process (name: {process_name})")
            return False

        # Check 2: Command line must contain our file path
        cmdline = proc.cmdline()
        cmdline_str = ' '.join(cmdline)

        # The file path should appear in the command line
        if expected_file_path not in cmdline_str:
            logger.debug(f"PID {pid}: File path '{expected_file_path}' not in command line")
            return False

        # Check 3: Should have -i flag (input file) and -progress flag (our marker)
        has_input_flag = '-i' in cmdline
        has_progress_flag = '-progress' in cmdline or 'progress' in cmdline_str

        if not (has_input_flag and has_progress_flag):
            logger.debug(f"PID {pid}: Missing expected ffmpeg flags")
            return False

        logger.info(f"PID {pid}: Verified as our ffmpeg process for '{expected_file_path}'")
        return True

    except psutil.NoSuchProcess:
        logger.debug(f"PID {pid}: Process no longer exists")
        return False
    except psutil.AccessDenied:
        logger.warning(f"PID {pid}: Access denied - cannot verify (won't kill)")
        return False
    except Exception as e:
        logger.error(f"Error verifying PID {pid}: {e}")
        return False


def kill_process_safely(pid: int, expected_file_path: str, timeout: int = 10) -> bool:
    """
    Safely kill a process after verification

    Args:
        pid: Process ID to kill
        expected_file_path: Expected file path to verify against
        timeout: Seconds to wait for graceful termination

    Returns:
        True if process was killed successfully, False otherwise
    """
    # Double-check verification before killing
    if not is_our_ffmpeg_process(pid, expected_file_path):
        logger.warning(f"PID {pid}: Failed verification - NOT killing for safety")
        return False

    try:
        proc = psutil.Process(pid)

        # Try graceful termination first (SIGTERM)
        logger.info(f"PID {pid}: Sending SIGTERM for graceful shutdown")
        proc.terminate()

        try:
            # Wait for process to terminate
            proc.wait(timeout=timeout)
            logger.info(f"PID {pid}: Process terminated gracefully")
            return True
        except psutil.TimeoutExpired:
            # Force kill if graceful shutdown failed (SIGKILL)
            logger.warning(f"PID {pid}: Graceful shutdown timed out, sending SIGKILL")
            proc.kill()
            proc.wait(timeout=5)
            logger.info(f"PID {pid}: Process force killed")
            return True

    except psutil.NoSuchProcess:
        logger.info(f"PID {pid}: Process already terminated")
        return True
    except Exception as e:
        logger.error(f"Error killing PID {pid}: {e}")
        return False


def cleanup_temp_files(file_path: str) -> int:
    """
    Clean up temporary files associated with a compression job

    Args:
        file_path: Original video file path

    Returns:
        Number of files cleaned up
    """
    cleaned_count = 0
    file_path_obj = Path(file_path)
    parent_dir = file_path_obj.parent

    try:
        # Clean up temp file: temp_<filename>
        temp_file = parent_dir / f"temp_{file_path_obj.name}"
        if temp_file.exists():
            temp_file.unlink()
            logger.info(f"Cleaned up temp file: {temp_file.name}")
            cleaned_count += 1

        # Clean up ffmpeg pass log file
        log_file = parent_dir / "ffmpeg2pass-0.log"
        if log_file.exists():
            log_file.unlink()
            logger.info(f"Cleaned up ffmpeg log: {log_file.name}")
            cleaned_count += 1

        # Also check for mbtree file (created by x264)
        mbtree_file = parent_dir / "ffmpeg2pass-0.log.mbtree"
        if mbtree_file.exists():
            mbtree_file.unlink()
            logger.info(f"Cleaned up mbtree file: {mbtree_file.name}")
            cleaned_count += 1

    except Exception as e:
        logger.error(f"Error cleaning up temp files for {file_path}: {e}")

    return cleaned_count


def find_orphaned_ffmpeg_processes(allowed_base_path: Path) -> List[dict]:
    """
    Find all ffmpeg processes that are working on files within our base path
    This helps identify potential orphaned processes

    Args:
        allowed_base_path: Base path where our videos are stored

    Returns:
        List of process info dicts
    """
    orphaned = []
    allowed_path_str = str(allowed_base_path.resolve())

    try:
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower():
                    cmdline = proc.info.get('cmdline', [])
                    cmdline_str = ' '.join(cmdline) if cmdline else ''

                    # Check if this ffmpeg is working on a file in our path
                    if allowed_path_str in cmdline_str:
                        orphaned.append({
                            'pid': proc.info['pid'],
                            'cmdline': cmdline_str,
                            'name': proc.info['name']
                        })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
    except Exception as e:
        logger.error(f"Error finding orphaned processes: {e}")

    return orphaned
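A hedged sketch of the verify-before-kill flow that recovery relies on (assumes the backend package is importable; the PID and paths are illustrative):

```python
from pathlib import Path
from process_utils import (
    process_exists,
    is_our_ffmpeg_process,
    kill_process_safely,
    find_orphaned_ffmpeg_processes,
)

pid, video = 12345, "/footages/cam1/clip.mp4"

# Never kill on PID alone: the process must be ffmpeg and reference our input file.
if process_exists(pid) and is_our_ffmpeg_process(pid, video):
    kill_process_safely(pid, video, timeout=10)  # SIGTERM first, SIGKILL after the timeout

# Untracked ffmpeg processes under the base path are only reported, never killed.
for proc in find_orphaned_ffmpeg_processes(Path("/footages")):
    print(proc["pid"], proc["cmdline"][:80])
```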
@@ -3,3 +3,5 @@ uvicorn[standard]==0.27.0
python-multipart==0.0.6
aiofiles==23.2.1
sse-starlette==1.6.5
aiosqlite==0.19.0
psutil==5.9.8
@@ -166,7 +166,20 @@ function App() {
    return sorted
  }

  const getSortedLocations = () => sortItems(locations, locationSort)
  const getSortedLocations = () => {
    const sorted = sortItems(locations, locationSort)
    // Separate dotfiles from regular locations
    const regularLocations = sorted.filter(loc => {
      const name = loc.name || loc
      return !name.startsWith('.')
    })
    const dotfiles = sorted.filter(loc => {
      const name = loc.name || loc
      return name.startsWith('.')
    })
    // Return regular locations first, then dotfiles at the bottom
    return [...regularLocations, ...dotfiles]
  }
  const getSortedDates = () => sortItems(dates, dateSort)
  const getSortedFiles = () => sortItems(files, fileSort)

@@ -209,9 +222,9 @@ function App() {
        await fetchJobs()
        const hasActiveJobs = jobs.some(j => ['pending', 'processing', 'validating'].includes(j.status))
        if (hasActiveJobs) {
          showToast('Compression job queued successfully!', 'success')
          showToast(`Compression queued (${percentage}% reduction)`, 'success')
        } else {
          showToast('Compression job started!', 'success')
          showToast(`Started compressing ${selectedFile.name} (${percentage}% reduction)`, 'success')
        }
      } else {
        const error = await response.json()
@@ -363,12 +376,16 @@ function App() {
              <ul className="space-y-1">
                {getSortedLocations().map((location) => {
                  const locationName = location.name || location
                  const isDotfile = locationName.startsWith('.')
                  return (
                    <li key={locationName}>
                      <button
                        onClick={() => handleLocationClick(locationName)}
                        className={`w-full text-left px-3 py-2 rounded hover:bg-blue-50 transition ${
                          selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''
                        onClick={() => !isDotfile && handleLocationClick(locationName)}
                        disabled={isDotfile}
                        className={`w-full text-left px-3 py-2 rounded transition ${
                          isDotfile
                            ? 'text-gray-400 cursor-not-allowed bg-gray-50'
                            : `hover:bg-blue-50 ${selectedLocation === locationName ? 'bg-blue-100 font-semibold' : ''}`
                        }`}
                      >
                        {locationName}