Improve compression queue: Add resource limits and security

- Add concurrency limiting with semaphore (max 2 concurrent jobs)
- Add job pruning to prevent unbounded memory growth (max 100 jobs)
- Add file path validation to ensure files stay within the allowed directory
- Fix ffmpeg2pass log cleanup to use source file directory
- Add SSE reconnect handler to re-sync jobs on connection restore

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Alihan committed 2025-10-12 19:48:46 +03:00
parent 752fa4eefd, commit 7cd79216fe
3 changed files with 101 additions and 69 deletions
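
The concurrency cap described in the first bullet uses asyncio.Semaphore, the standard way to bound how many coroutines run a section at once: a Semaphore(2) admits at most two holders and queues the rest. A minimal, self-contained sketch, separate from the repository code (all names here are illustrative):

```python
import asyncio

async def fake_job(sem: asyncio.Semaphore, job_id: int) -> None:
    # At most two jobs get past this line at a time; the slot is released
    # when the block exits, even if the body raises.
    async with sem:
        print(f"job {job_id} started")
        await asyncio.sleep(0.1)  # stand-in for the two ffmpeg passes
        print(f"job {job_id} finished")

async def main() -> None:
    sem = asyncio.Semaphore(2)  # same limit as max_concurrent=2 in this commit
    await asyncio.gather(*(fake_job(sem, i) for i in range(5)))

asyncio.run(main())
```

In the compress_video change below, the whole job body, including its try/except, sits inside the async with block, so a job that fails or is cancelled still releases its slot.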


@@ -29,9 +29,11 @@ class CompressionJob:
 class CompressionManager:
-    def __init__(self):
+    def __init__(self, max_concurrent: int = 2, allowed_base_path: Optional[Path] = None):
         self.jobs: Dict[str, CompressionJob] = {}
         self.active_jobs: Dict[str, asyncio.Task] = {}
+        self.semaphore = asyncio.Semaphore(max_concurrent)
+        self.allowed_base_path = allowed_base_path.resolve() if allowed_base_path else None

     async def get_video_info(self, file_path: str) -> Dict:
         """Extract video duration and file size using ffprobe"""
@@ -71,77 +73,78 @@ class CompressionManager:
     async def compress_video(self, job: CompressionJob):
         """Main compression function - two-pass encoding"""
-        try:
-            job.status = "processing"
-            job.started_at = datetime.now()
+        async with self.semaphore:
+            try:
+                job.status = "processing"
+                job.started_at = datetime.now()

-            # Get video information
-            info = await self.get_video_info(job.file_path)
-            job.current_size_mb = info['size_mb']
-            job.duration_seconds = info['duration_seconds']
+                # Get video information
+                info = await self.get_video_info(job.file_path)
+                job.current_size_mb = info['size_mb']
+                job.duration_seconds = info['duration_seconds']

-            # Calculate target size and bitrate
-            job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100)
-            job.video_bitrate = self.calculate_bitrates(
-                job.current_size_mb,
-                job.target_size_mb,
-                job.duration_seconds
-            )
+                # Calculate target size and bitrate
+                job.target_size_mb = job.current_size_mb * (1 - job.reduce_percentage / 100)
+                job.video_bitrate = self.calculate_bitrates(
+                    job.current_size_mb,
+                    job.target_size_mb,
+                    job.duration_seconds
+                )

-            print(f"Job {job.job_id}:")
-            print(f"  Current Size: {job.current_size_mb:.2f} MB")
-            print(f"  Target Size: {job.target_size_mb:.2f} MB")
-            print(f"  Duration: {job.duration_seconds}s")
-            print(f"  Video Bitrate: {job.video_bitrate} kbps")
+                print(f"Job {job.job_id}:")
+                print(f"  Current Size: {job.current_size_mb:.2f} MB")
+                print(f"  Target Size: {job.target_size_mb:.2f} MB")
+                print(f"  Duration: {job.duration_seconds}s")
+                print(f"  Video Bitrate: {job.video_bitrate} kbps")

-            # Generate output filename
-            file_path = Path(job.file_path)
-            temp_file = file_path.parent / f"temp_{file_path.name}"
-            output_file = file_path.parent / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"
+                # Generate output filename
+                file_path = Path(job.file_path)
+                temp_file = file_path.parent / f"temp_{file_path.name}"
+                output_file = file_path.parent / f"{file_path.stem}_compressed_{job.reduce_percentage}{file_path.suffix}"

-            # PASS 1: Analysis
-            job.current_pass = 1
-            await self.run_ffmpeg_pass1(job, temp_file)
+                # PASS 1: Analysis
+                job.current_pass = 1
+                await self.run_ffmpeg_pass1(job, temp_file)

-            if job.status == "cancelled":
-                self.cleanup_temp_files(job)
-                return
+                if job.status == "cancelled":
+                    self.cleanup_temp_files(job)
+                    return

-            # PASS 2: Encoding
-            job.current_pass = 2
-            await self.run_ffmpeg_pass2(job, temp_file)
+                # PASS 2: Encoding
+                job.current_pass = 2
+                await self.run_ffmpeg_pass2(job, temp_file)

-            if job.status == "cancelled":
-                self.cleanup_temp_files(job)
-                return
+                if job.status == "cancelled":
+                    self.cleanup_temp_files(job)
+                    return

-            # VALIDATION
-            job.status = "validating"
-            job.progress = 95.0
+                # VALIDATION
+                job.status = "validating"
+                job.progress = 95.0

-            if await self.validate_video(temp_file):
-                # Move temp file to final output
-                os.rename(temp_file, output_file)
-                job.output_file = str(output_file)
-                job.status = "completed"
-                job.progress = 100.0
-                job.completed_at = datetime.now()
-                self.cleanup_temp_files(job)
-                print(f"Job {job.job_id} completed successfully")
-            else:
-                job.status = "failed"
-                job.error = "Validation failed: Compressed video is corrupted"
-                self.cleanup_temp_files(job)
-                print(f"Job {job.job_id} failed validation")
+                if await self.validate_video(temp_file):
+                    # Move temp file to final output
+                    os.rename(temp_file, output_file)
+                    job.output_file = str(output_file)
+                    job.status = "completed"
+                    job.progress = 100.0
+                    job.completed_at = datetime.now()
+                    self.cleanup_temp_files(job)
+                    print(f"Job {job.job_id} completed successfully")
+                else:
+                    job.status = "failed"
+                    job.error = "Validation failed: Compressed video is corrupted"
+                    self.cleanup_temp_files(job)
+                    print(f"Job {job.job_id} failed validation")

-        except asyncio.CancelledError:
-            job.status = "cancelled"
-            self.cleanup_temp_files(job)
-            print(f"Job {job.job_id} cancelled")
-        except Exception as e:
-            job.status = "failed"
-            job.error = str(e)
-            self.cleanup_temp_files(job)
-            print(f"Job {job.job_id} failed: {e}")
+            except asyncio.CancelledError:
+                job.status = "cancelled"
+                self.cleanup_temp_files(job)
+                print(f"Job {job.job_id} cancelled")
+            except Exception as e:
+                job.status = "failed"
+                job.error = str(e)
+                self.cleanup_temp_files(job)
+                print(f"Job {job.job_id} failed: {e}")

     async def run_ffmpeg_pass1(self, job: CompressionJob, output_file: Path):
         """First pass: Analysis"""
@@ -310,16 +313,41 @@ class CompressionManager:
         except Exception as e:
             print(f"Failed to remove temp file: {e}")

-        # Remove ffmpeg pass log files
+        # Remove ffmpeg pass log files in same directory as source
         try:
-            log_file = Path("ffmpeg2pass-0.log")
+            log_file = file_path.parent / "ffmpeg2pass-0.log"
             if log_file.exists():
                 log_file.unlink()
         except Exception as e:
             print(f"Failed to remove log file: {e}")

+    def _prune_old_jobs(self, max_jobs: int = 100):
+        """Remove oldest completed jobs if total exceeds max_jobs"""
+        if len(self.jobs) <= max_jobs:
+            return
+
+        # Get inactive completed jobs sorted by time
+        inactive = [
+            (jid, j) for jid, j in self.jobs.items()
+            if jid not in self.active_jobs and j.status in ['completed', 'failed', 'cancelled']
+        ]
+        inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)
+
+        # Remove oldest jobs beyond limit
+        for jid, _ in inactive[:len(self.jobs) - max_jobs]:
+            self.jobs.pop(jid, None)
+
     async def start_compression(self, file_path: str, reduce_percentage: int) -> str:
         """Start a new compression job"""
+        # Validate path is within allowed directory
+        if self.allowed_base_path:
+            abs_path = Path(file_path).resolve()
+            if not abs_path.is_relative_to(self.allowed_base_path):
+                raise ValueError("Invalid file path: outside allowed directory")
+
+        # Prune old jobs before creating new one
+        self._prune_old_jobs()
+
         job = CompressionJob(file_path, reduce_percentage)
         self.jobs[job.job_id] = job
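
The containment check added to start_compression works by resolving the candidate path and then testing whether it still falls under the allowed base. A standalone illustration of that behaviour (the helper name and example paths are made up; Path.is_relative_to requires Python 3.9+):

```python
from pathlib import Path

ALLOWED = Path("/footages").resolve()

def is_allowed(candidate: str) -> bool:
    # resolve() normalizes ".." segments and symlinks before the comparison,
    # so traversal attempts are rejected rather than compared textually.
    return Path(candidate).resolve().is_relative_to(ALLOWED)

print(is_allowed("/footages/2025/clip.mp4"))   # True
print(is_allowed("/footages/../etc/passwd"))   # False, resolves to /etc/passwd
```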


@@ -14,8 +14,11 @@ from compression import CompressionManager
 app = FastAPI(title="Drone Footage Manager API")

+# Base path for footages
+FOOTAGES_PATH = Path("/footages")
+
 # Initialize compression manager
-compression_manager = CompressionManager()
+compression_manager = CompressionManager(max_concurrent=2, allowed_base_path=FOOTAGES_PATH)

 # CORS middleware for frontend communication
 app.add_middleware(
@@ -26,9 +29,6 @@ app.add_middleware(
     allow_headers=["*"],
 )

-# Base path for footages
-FOOTAGES_PATH = Path("/footages")
-
 # Supported video and image extensions
 VIDEO_EXTENSIONS = {".mp4", ".MP4", ".mov", ".MOV", ".avi", ".AVI"}
 IMAGE_EXTENSIONS = {".jpg", ".JPG", ".jpeg", ".JPEG", ".png", ".PNG"}


@@ -25,6 +25,10 @@ function CompressionPanel({ selectedFile, location, date }) {
   const connectSSE = () => {
     const eventSource = new EventSource(`${API_URL}/compress/events`)

+    eventSource.onopen = () => {
+      fetchJobs() // Re-sync jobs on connect/reconnect
+    }
+
     eventSource.addEventListener('progress', (event) => {
       const updates = JSON.parse(event.data)
       setJobs(prevJobs => {