Fix compression job deadlock and add watchdog timer
Resolved a critical issue where compression jobs would get stuck at random progress percentages (e.g., 35.5%) due to a pipe buffer deadlock.

**Root Cause:**
- Python code only read ffmpeg's stdout for progress updates
- ffmpeg's stderr pipe buffer (64KB) would fill with output
- ffmpeg blocked writing to stderr, Python blocked reading stdout
- Result: deadlock with job appearing stuck but ffmpeg still using CPU

**Fixes:**
- Read stdout and stderr concurrently using asyncio.gather()
- Prevents pipe buffer deadlock by consuming both streams
- Added watchdog timer to detect genuinely stuck jobs (5 min timeout)
- Improved error logging with stderr capture
- Better error messages showing exact failure reason

**Additional Improvements:**
- Watchdog sets job.error with an informative message before killing the process
- Captures last 50 lines of stderr on failure for debugging
- Enhanced cancellation handling with multiple checkpoints

Tested with a previously stuck video file - progress now updates continuously throughout the encoding process.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -177,7 +177,7 @@ class CompressionManager:
|
||||
await self.run_ffmpeg_with_progress(job, cmd, pass_num=2)
|
||||
|
||||
async def run_ffmpeg_with_progress(self, job: CompressionJob, cmd: list, pass_num: int):
|
||||
"""Run ffmpeg and track progress"""
|
||||
"""Run ffmpeg and track progress with concurrent stdout/stderr reading to prevent deadlock"""
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
@@ -187,42 +187,95 @@ class CompressionManager:
|
||||
# Progress tracking
|
||||
time_pattern = re.compile(r'out_time_ms=(\d+)')
|
||||
fps_pattern = re.compile(r'fps=([\d.]+)')
|
||||
stderr_output = []
|
||||
last_progress_update = datetime.now()
|
||||
|
||||
async for line in process.stdout:
|
||||
if job.status == "cancelled":
|
||||
process.kill()
|
||||
raise asyncio.CancelledError()
|
||||
async def read_stdout():
|
||||
"""Read and process stdout for progress updates"""
|
||||
nonlocal last_progress_update
|
||||
try:
|
||||
async for line in process.stdout:
|
||||
if job.status == "cancelled":
|
||||
process.kill()
|
||||
return
|
||||
|
||||
line_str = line.decode('utf-8', errors='ignore')
|
||||
line_str = line.decode('utf-8', errors='ignore')
|
||||
|
||||
# Extract time
|
||||
time_match = time_pattern.search(line_str)
|
||||
if time_match:
|
||||
current_time_ms = int(time_match.group(1))
|
||||
current_time_sec = current_time_ms / 1_000_000
|
||||
# Extract time
|
||||
time_match = time_pattern.search(line_str)
|
||||
if time_match:
|
||||
current_time_ms = int(time_match.group(1))
|
||||
current_time_sec = current_time_ms / 1_000_000
|
||||
|
||||
# Calculate progress for this pass (0-50% or 50-100%)
|
||||
pass_progress = (current_time_sec / job.duration_seconds) * 50
|
||||
if pass_num == 1:
|
||||
job.progress = min(pass_progress, 50)
|
||||
else:
|
||||
job.progress = min(50 + pass_progress, 95)
|
||||
|
||||
# Extract FPS for ETA calculation
|
||||
fps_match = fps_pattern.search(line_str)
|
||||
if fps_match:
|
||||
fps = float(fps_match.group(1))
|
||||
if fps > 0:
|
||||
remaining_sec = (job.duration_seconds - current_time_sec)
|
||||
# Calculate progress for this pass (0-50% or 50-100%)
|
||||
pass_progress = (current_time_sec / job.duration_seconds) * 50
|
||||
if pass_num == 1:
|
||||
remaining_sec = remaining_sec + job.duration_seconds # Account for both passes
|
||||
job.eta_seconds = int(remaining_sec)
|
||||
job.progress = min(pass_progress, 50)
|
||||
else:
|
||||
job.progress = min(50 + pass_progress, 95)
|
||||
|
||||
last_progress_update = datetime.now()
|
||||
|
||||
# Extract FPS for ETA calculation
|
||||
fps_match = fps_pattern.search(line_str)
|
||||
if fps_match:
|
||||
fps = float(fps_match.group(1))
|
||||
if fps > 0:
|
||||
remaining_sec = (job.duration_seconds - current_time_sec)
|
||||
if pass_num == 1:
|
||||
remaining_sec = remaining_sec + job.duration_seconds
|
||||
job.eta_seconds = int(remaining_sec)
|
||||
except Exception as e:
|
||||
print(f"Error reading stdout: {e}")
|
||||
|
||||
async def read_stderr():
|
||||
"""Read stderr concurrently to prevent pipe buffer deadlock"""
|
||||
try:
|
||||
async for line in process.stderr:
|
||||
line_str = line.decode('utf-8', errors='ignore')
|
||||
stderr_output.append(line_str)
|
||||
# Log important stderr messages
|
||||
if 'error' in line_str.lower() or 'warning' in line_str.lower():
|
||||
print(f"FFmpeg stderr (pass {pass_num}): {line_str.strip()}")
|
||||
except Exception as e:
|
||||
print(f"Error reading stderr: {e}")
|
||||
|
||||
async def watchdog():
|
||||
"""Monitor for stuck jobs - kills process if no progress for 5 minutes"""
|
||||
nonlocal last_progress_update
|
||||
while process.returncode is None:
|
||||
await asyncio.sleep(30) # Check every 30 seconds
|
||||
|
||||
if job.status == "cancelled":
|
||||
process.kill()
|
||||
return
|
||||
|
||||
time_since_update = (datetime.now() - last_progress_update).total_seconds()
|
||||
if time_since_update > 300: # 5 minutes without progress
|
||||
error_msg = f"Job stuck at {job.progress:.1f}% - no progress for {time_since_update:.0f} seconds. Process killed by watchdog."
|
||||
print(f"Watchdog: {error_msg}")
|
||||
job.status = "failed"
|
||||
job.error = error_msg
|
||||
process.kill()
|
||||
raise Exception(error_msg)
|
||||
|
||||
# Run stdout, stderr readers and watchdog concurrently to prevent deadlock
|
||||
try:
|
||||
await asyncio.gather(
|
||||
read_stdout(),
|
||||
read_stderr(),
|
||||
watchdog(),
|
||||
return_exceptions=False
|
||||
)
|
||||
except Exception as e:
|
||||
process.kill()
|
||||
raise
|
||||
|
||||
await process.wait()
|
||||
|
||||
if process.returncode != 0:
|
||||
stderr = await process.stderr.read()
|
||||
raise Exception(f"FFmpeg pass {pass_num} failed: {stderr.decode()}")
|
||||
stderr_text = ''.join(stderr_output[-50:]) # Last 50 lines of stderr
|
||||
raise Exception(f"FFmpeg pass {pass_num} failed with code {process.returncode}: {stderr_text}")
|
||||
|
||||
async def validate_video(self, file_path: Path) -> bool:
|
||||
"""Validate compressed video is not corrupted"""
|
||||
|
||||
Reference in New Issue
Block a user