This commit ensures that compression jobs survive Docker container restarts and are automatically recovered and restarted.

Changes:
- Modified the CancelledError handler to preserve job status during shutdown
- Jobs now keep their 'processing' or 'validating' status instead of being marked as 'cancelled' when the app shuts down
- Added a job persistence layer backed by an SQLite database
- Implemented automatic job recovery on application startup
- Added process cleanup utilities for orphaned ffmpeg processes
- User-initiated cancellations still properly mark jobs as cancelled
- Jobs are visible in the frontend after recovery

The recovery system (sketched below):
1. Detects interrupted jobs (processing/validating status)
2. Cleans up orphaned ffmpeg processes and temp files
3. Restarts interrupted jobs from the beginning
4. Maintains queue order and respects concurrency limits
5. Works with multiple jobs in the queue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
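A rough sketch of how that startup recovery could drive the persistence layer in the file below. Only `JobPersistence` and its methods come from this commit; the module name, `recover_interrupted_jobs`, and `enqueue_job` are assumed names for illustration, not the shipped implementation.

```python
# Illustrative only: everything except JobPersistence and its methods is a
# hypothetical name, not code from this commit.
import os
import signal

from job_persistence import JobPersistence  # module name assumed


async def recover_interrupted_jobs(persistence: JobPersistence, enqueue_job) -> None:
    """Re-queue jobs left in 'processing'/'validating' when the app went down."""
    for job in await persistence.get_interrupted_jobs():  # ordered by created_at
        # Kill any orphaned ffmpeg process recorded for this job.
        pid = job.get("ffmpeg_pid")
        if pid:
            try:
                os.kill(pid, signal.SIGKILL)
            except (ProcessLookupError, PermissionError):
                pass  # already gone, or the PID was reused by another process
        # Restart from the beginning: reset progress, hand the job back to the queue.
        await persistence.update_job_progress(job["job_id"], 0.0, current_pass=0)
        await enqueue_job(job)  # queue preserves order and enforces concurrency limits
```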
import sqlite3
import asyncio
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import aiosqlite
import os

logger = logging.getLogger(__name__)

# Database configuration from environment
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))
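# Illustrative override via docker-compose (example values, not defaults defined
# elsewhere in this commit):
#   environment:
#     - JOB_DATABASE_PATH=/footages/.compression_jobs.db
#     - JOB_RETENTION_DAYS=14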
class JobPersistence:
    """SQLite-based persistence layer for compression jobs"""

    def __init__(self, db_path: str = DATABASE_PATH):
        self.db_path = db_path
        self._initialized = False
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Initialize database schema"""
        if self._initialized:
            return

        async with self._lock:
            if self._initialized:
                return

            # Ensure directory exists
            db_dir = Path(self.db_path).parent
            db_dir.mkdir(parents=True, exist_ok=True)

            async with aiosqlite.connect(self.db_path) as db:
                await db.execute("""
                    CREATE TABLE IF NOT EXISTS compression_jobs (
                        job_id TEXT PRIMARY KEY,
                        file_path TEXT NOT NULL,
                        reduce_percentage INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        progress REAL DEFAULT 0.0,
                        current_pass INTEGER DEFAULT 0,
                        current_size_mb REAL,
                        target_size_mb REAL,
                        video_bitrate INTEGER,
                        duration_seconds REAL,
                        eta_seconds INTEGER,
                        created_at TIMESTAMP NOT NULL,
                        started_at TIMESTAMP,
                        completed_at TIMESTAMP,
                        output_file TEXT,
                        error TEXT,
                        ffmpeg_pid INTEGER
                    )
                """)

                # Create indexes for performance
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_status
                    ON compression_jobs(status)
                """)

                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_created_at
                    ON compression_jobs(created_at)
                """)

                await db.commit()

            self._initialized = True
            logger.info(f"Job persistence initialized at {self.db_path}")

    async def save_job(self, job_data: Dict[str, Any]):
        """Insert or update a job in the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            # Convert datetime objects to ISO format strings
            job_data_copy = job_data.copy()
            for key in ['created_at', 'started_at', 'completed_at']:
                if key in job_data_copy and job_data_copy[key]:
                    if isinstance(job_data_copy[key], datetime):
                        job_data_copy[key] = job_data_copy[key].isoformat()

            await db.execute("""
                INSERT OR REPLACE INTO compression_jobs (
                    job_id, file_path, reduce_percentage, status, progress,
                    current_pass, current_size_mb, target_size_mb, video_bitrate,
                    duration_seconds, eta_seconds, created_at, started_at,
                    completed_at, output_file, error, ffmpeg_pid
                ) VALUES (
                    :job_id, :file_path, :reduce_percentage, :status, :progress,
                    :current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
                    :duration_seconds, :eta_seconds, :created_at, :started_at,
                    :completed_at, :output_file, :error, :ffmpeg_pid
                )
            """, job_data_copy)
            await db.commit()

    async def update_job_status(self, job_id: str, status: str,
                                progress: Optional[float] = None,
                                error: Optional[str] = None):
        """Quick update for job status and progress"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"status": status}
            if progress is not None:
                updates["progress"] = progress
            if error is not None:
                updates["error"] = error

            # Set completed_at for terminal states
            if status in ['completed', 'failed', 'cancelled']:
                updates["completed_at"] = datetime.now().isoformat()

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def update_job_progress(self, job_id: str, progress: float,
                                  eta_seconds: Optional[int] = None,
                                  current_pass: Optional[int] = None):
        """Update job progress and ETA"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"progress": progress}
            if eta_seconds is not None:
                updates["eta_seconds"] = eta_seconds
            if current_pass is not None:
                updates["current_pass"] = current_pass

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def set_ffmpeg_pid(self, job_id: str, pid: int):
        """Update the ffmpeg PID for a job"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
                (pid, job_id)
            )
            await db.commit()

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get a single job by ID"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            ) as cursor:
                row = await cursor.fetchone()
                if row:
                    return dict(row)
                return None

    async def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Get all jobs from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs ORDER BY created_at DESC"
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
        """Get all jobs with a specific status"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
                (status,)
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
        """Get jobs that were interrupted (status=processing/validating)"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """SELECT * FROM compression_jobs
                   WHERE status IN ('processing', 'validating')
                   ORDER BY created_at ASC"""
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def delete_job(self, job_id: str):
        """Delete a job from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "DELETE FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            )
            await db.commit()

    async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
        """Delete completed/cancelled jobs older than retention period"""
        await self.initialize()

        cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60)
        cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat()

        async with aiosqlite.connect(self.db_path) as db:
            result = await db.execute("""
                DELETE FROM compression_jobs
                WHERE status IN ('completed', 'cancelled')
                AND completed_at < ?
            """, (cutoff_iso,))

            deleted_count = result.rowcount
            await db.commit()

            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")

            return deleted_count
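For reference, a minimal usage sketch of this API, assuming it is driven from the compression queue. The database path, file path, and values below are placeholders, and the module name is assumed; note that `save_job` uses named placeholders, so every column key must be present in the dict (use `None` for unknown values).

```python
import asyncio
import uuid
from datetime import datetime

from job_persistence import JobPersistence  # module name assumed


async def main() -> None:
    persistence = JobPersistence("/tmp/compression_jobs.db")  # test path, not the default

    job_id = str(uuid.uuid4())
    await persistence.save_job({
        "job_id": job_id,
        "file_path": "/footages/example.mp4",    # placeholder input file
        "reduce_percentage": 50,
        "status": "processing",
        "progress": 0.0,
        "current_pass": 1,
        "current_size_mb": None,
        "target_size_mb": None,
        "video_bitrate": None,
        "duration_seconds": None,
        "eta_seconds": None,
        "created_at": datetime.now(),             # converted to ISO text by save_job
        "started_at": datetime.now(),
        "completed_at": None,
        "output_file": None,
        "error": None,
        "ffmpeg_pid": None,
    })

    await persistence.set_ffmpeg_pid(job_id, 12345)           # pid reported by the ffmpeg process
    await persistence.update_job_progress(job_id, 42.0, eta_seconds=90, current_pass=1)
    await persistence.update_job_status(job_id, "completed")  # also stamps completed_at

    print(await persistence.get_job(job_id))
    await persistence.cleanup_old_jobs()  # drop completed/cancelled jobs past retention


if __name__ == "__main__":
    asyncio.run(main())
```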