Fix compression job watchdog timeout and error handling
- Increase watchdog timeout to 15 minutes (configurable via COMPRESSION_WATCHDOG_TIMEOUT env var)
- Fix stderr buffer overflow by setting 8MB limit and truncating to last 1000 lines
- Keep failed jobs visible in UI until manually removed (auto-prune only completed/cancelled)
- Add job removal endpoint: DELETE /api/compress/jobs/{job_id}?action=remove
Resolves issue where jobs stuck at 70% were killed prematurely due to stderr buffer overflow
and aggressive 5-minute timeout.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
@@ -20,9 +20,11 @@ MAX_CONCURRENT_JOBS = 2
 MAX_STORED_JOBS = 100
 MAX_PENDING_JOBS = 10  # Maximum jobs waiting in queue
 PRUNE_INTERVAL_SECONDS = 300  # 5 minutes
-WATCHDOG_TIMEOUT_SECONDS = 300  # 5 minutes
+WATCHDOG_TIMEOUT_SECONDS = int(os.getenv('COMPRESSION_WATCHDOG_TIMEOUT', '900'))  # Default: 15 minutes
 WATCHDOG_CHECK_INTERVAL = 30  # 30 seconds
 STREAM_CHUNK_SIZE = 1024 * 1024  # 1MB
+STDERR_READ_LIMIT = 8 * 1024 * 1024  # 8MB buffer limit for stderr
+MAX_STDERR_LINES = 1000  # Keep only last 1000 lines of stderr
 DEFAULT_AUDIO_BITRATE = 128  # kbps
 MIN_VIDEO_BITRATE = 100  # kbps

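The watchdog loop itself is outside this hunk. As a hedged sketch only, this is roughly how constants like WATCHDOG_TIMEOUT_SECONDS and WATCHDOG_CHECK_INTERVAL are typically consumed; active_jobs, last_progress_at and kill_job are hypothetical names, not taken from this codebase:

import asyncio
import time

# Hedged sketch only: the watchdog loop is not part of this diff. It illustrates
# how WATCHDOG_TIMEOUT_SECONDS and WATCHDOG_CHECK_INTERVAL are typically used.
# active_jobs, job.last_progress_at and kill_job are hypothetical names.
async def watchdog(active_jobs, kill_job, timeout_seconds=900, check_interval=30):
    while True:
        now = time.monotonic()
        for job_id, job in list(active_jobs.items()):
            # Kill only jobs that have reported no progress within the timeout window.
            if now - job.last_progress_at > timeout_seconds:
                await kill_job(job_id, reason="watchdog timeout")
        await asyncio.sleep(check_interval)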
@@ -210,7 +212,8 @@ class CompressionManager:
         process = await asyncio.create_subprocess_exec(
             *cmd,
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE
+            stderr=asyncio.subprocess.PIPE,
+            limit=STDERR_READ_LIMIT  # Set buffer limit to prevent overflow
         )

         # Progress tracking
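For context on the limit= argument: asyncio stream readers default to a 64 KiB line limit, and FFmpeg's progress output can exceed that between newlines, which is the likely "stderr buffer overflow" this commit refers to. A minimal standalone sketch (not part of the backend; python3 on PATH is assumed) showing the failure mode the explicit limit avoids:

import asyncio

# Standalone sketch: a child process writes a single ~100 KiB stderr "line" with no
# newline. With the default 64 KiB StreamReader limit, iterating process.stderr
# raises ValueError ("Separator is not found, and chunk exceed the limit"), which
# kills the reader loop. Passing limit=STDERR_READ_LIMIT raises that ceiling.
LONG_WRITE = 100 * 1024  # larger than the 64 KiB default limit

async def main():
    proc = await asyncio.create_subprocess_exec(
        "python3", "-c", f"import sys; sys.stderr.write('x' * {LONG_WRITE})",
        stderr=asyncio.subprocess.PIPE,  # default limit, no override
    )
    try:
        async for line in proc.stderr:
            print(f"read {len(line)} bytes")
    except ValueError as exc:
        print(f"stderr reader failed: {exc}")
    await proc.wait()

asyncio.run(main())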
@@ -265,6 +268,11 @@ class CompressionManager:
         async for line in process.stderr:
             line_str = line.decode('utf-8', errors='ignore')
             stderr_output.append(line_str)
+
+            # Truncate stderr_output to keep only last MAX_STDERR_LINES lines
+            if len(stderr_output) > MAX_STDERR_LINES:
+                stderr_output[:] = stderr_output[-MAX_STDERR_LINES:]
+
             # Log important stderr messages
             if 'error' in line_str.lower():
                 logger.error(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
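One note on the design choice: the slice assignment re-copies the list on every new line once 1000 lines are buffered. A bounded collections.deque gives the same cap without the copy; a minimal sketch of that alternative (hypothetical, not what this commit does):

from collections import deque

MAX_STDERR_LINES = 1000

# Hypothetical alternative to the list-slicing approach: a deque with maxlen
# silently drops the oldest entry once the cap is reached, with no re-copying.
stderr_output = deque(maxlen=MAX_STDERR_LINES)
for i in range(1500):
    stderr_output.append(f"line {i}\n")

print(len(stderr_output))        # 1000
print(stderr_output[0].strip())  # line 500 -- the oldest 500 lines were dropped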
@@ -370,14 +378,14 @@ class CompressionManager:
         self._pruning_task = asyncio.create_task(prune_periodically())

     def _prune_old_jobs(self, max_jobs: int = MAX_STORED_JOBS):
-        """Remove oldest completed jobs if total exceeds max_jobs"""
+        """Remove oldest completed/cancelled jobs if total exceeds max_jobs. Failed jobs are kept visible."""
         if len(self.jobs) <= max_jobs:
             return

-        # Get inactive completed jobs sorted by time
+        # Get inactive completed/cancelled jobs sorted by time (exclude failed jobs)
         inactive = [
             (jid, j) for jid, j in self.jobs.items()
-            if jid not in self.active_jobs and j.status in ['completed', 'failed', 'cancelled']
+            if jid not in self.active_jobs and j.status in ['completed', 'cancelled']
         ]
         inactive.sort(key=lambda x: x[1].completed_at or x[1].created_at)

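To make the new pruning rule concrete, a small standalone sketch of the same filter (the Job class here is a simplified stand-in, not the real model):

from dataclasses import dataclass

# Simplified stand-in for the real job model, for illustration only.
@dataclass
class Job:
    status: str
    completed_at: float

jobs = {
    "a": Job("completed", 1.0),
    "b": Job("failed", 2.0),
    "c": Job("cancelled", 3.0),
}
active_jobs = {}

# Same selection rule as the updated _prune_old_jobs: failed jobs are never
# prune candidates, so they stay visible until removed explicitly.
inactive = [
    (jid, j) for jid, j in jobs.items()
    if jid not in active_jobs and j.status in ['completed', 'cancelled']
]
inactive.sort(key=lambda x: x[1].completed_at)

print([jid for jid, _ in inactive])  # ['a', 'c'] -- "b" (failed) is kept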
@@ -450,3 +458,14 @@ class CompressionManager:
         self.jobs[job_id].status = "cancelled"
         if job_id in self.active_jobs:
             self.active_jobs[job_id].cancel()
+
+    async def remove_job(self, job_id: str):
+        """Remove a job from the list (for failed/completed jobs)"""
+        if job_id in self.jobs:
+            job = self.jobs[job_id]
+            # Only allow removal of inactive jobs
+            if job_id not in self.active_jobs:
+                async with self._jobs_lock:
+                    self.jobs.pop(job_id, None)
+                return True
+        return False
@@ -426,13 +426,27 @@ async def get_job_status(job_id: str):


 @app.delete("/api/compress/jobs/{job_id}")
-async def cancel_job(job_id: str):
-    """Cancel a compression job"""
+async def delete_job(job_id: str, action: str = "cancel"):
+    """Delete or cancel a compression job
+
+    Args:
+        job_id: The job ID to delete
+        action: 'cancel' to cancel a running job, 'remove' to remove from list
+    """
     if job_id not in compression_manager.jobs:
         raise HTTPException(status_code=404, detail="Job not found")

-    await compression_manager.cancel_job(job_id)
-    return {"status": "cancelled"}
+    if action == "remove":
+        # Remove completed/failed job from list
+        success = await compression_manager.remove_job(job_id)
+        if success:
+            return {"status": "removed"}
+        else:
+            raise HTTPException(status_code=400, detail="Cannot remove active job")
+    else:
+        # Cancel running job
+        await compression_manager.cancel_job(job_id)
+        return {"status": "cancelled"}


 @app.get("/api/compress/events")
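A minimal client-side sketch of the two DELETE behaviours (the base URL and job ID are illustrative, and the requests package is assumed to be available):

import requests

BASE_URL = "http://localhost:8000"  # illustrative; point at the deployed backend
job_id = "example-job-id"           # placeholder job ID

# Default behaviour, unchanged by this commit: cancel a running job.
resp = requests.delete(f"{BASE_URL}/api/compress/jobs/{job_id}")
print(resp.status_code, resp.json())  # 200 {"status": "cancelled"}

# New in this commit: remove a finished or failed job from the list.
resp = requests.delete(f"{BASE_URL}/api/compress/jobs/{job_id}", params={"action": "remove"})
print(resp.status_code, resp.json())  # 200 {"status": "removed"}, or 400 if still active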
@@ -8,6 +8,8 @@ services:
     container_name: drone-footage-backend
     volumes:
       - /home/uad/nextcloud/footages:/footages
+    environment:
+      - COMPRESSION_WATCHDOG_TIMEOUT=${COMPRESSION_WATCHDOG_TIMEOUT:-900}
     restart: unless-stopped
     networks:
       - drone-footage-network