Files
drone-footage-manager/backend/job_persistence.py
Alihan 92338df716 Fix compression job queue persistence and recovery after Docker restart
This commit ensures that compression jobs survive Docker container restarts
and are automatically recovered and restarted.

Changes:
- Modified CancelledError handler to preserve job status during shutdown
- Jobs now keep 'processing' or 'validating' status instead of being marked
  as 'cancelled' when the app shuts down (see the sketch after this list)
- Added job persistence layer using SQLite database
- Implemented automatic job recovery on application startup
- Added process cleanup utilities for orphaned ffmpeg processes
- User-initiated cancellations still properly mark jobs as cancelled
- Jobs remain visible in the frontend after recovery
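
A minimal sketch of the shutdown-aware cancellation handling described above.
The worker function, the shutdown_requested event, and do_ffmpeg_passes are
assumptions for illustration; only JobPersistence.update_job_status comes from
the file below:

    # Sketch only: run_compression, shutdown_requested and do_ffmpeg_passes
    # are hypothetical names, not the actual worker code in this repo.
    async def run_compression(job_id: str, persistence: JobPersistence,
                              shutdown_requested: asyncio.Event):
        try:
            await do_ffmpeg_passes(job_id)  # hypothetical compression worker
        except asyncio.CancelledError:
            if shutdown_requested.is_set():
                # App shutdown: keep the persisted 'processing'/'validating'
                # status so startup recovery can pick the job back up.
                raise
            # User-initiated cancel: record the terminal state before exiting.
            await persistence.update_job_status(job_id, 'cancelled')
            raise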

The recovery system (sketched in code after this list):
1. Detects interrupted jobs (processing/validating status)
2. Cleans up orphaned ffmpeg processes and temp files
3. Restarts interrupted jobs from the beginning
4. Maintains queue order and respects concurrency limits
5. Works with multiple jobs in the queue
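
A hedged sketch of how startup recovery might drive the persistence layer.
Only get_interrupted_jobs and update_job_status come from the file below;
job_queue, cleanup_orphaned_ffmpeg, and the 'queued' status value are
assumptions about the surrounding application:

    # Sketch only: the queue, the orphan-cleanup helper and the 'queued'
    # status name are assumptions, not code from this repository.
    async def recover_jobs_on_startup(persistence: JobPersistence,
                                      job_queue: asyncio.Queue):
        interrupted = await persistence.get_interrupted_jobs()  # oldest first
        for job in interrupted:
            if job.get('ffmpeg_pid'):
                cleanup_orphaned_ffmpeg(job['ffmpeg_pid'])  # hypothetical helper
            # Restart from the beginning: reset progress, then re-enqueue in
            # the original order; the queue consumer enforces concurrency.
            await persistence.update_job_status(job['job_id'], 'queued',
                                                progress=0.0)
            await job_queue.put(job['job_id'])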

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 02:57:04 +03:00


import sqlite3
import asyncio
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import aiosqlite
import os

logger = logging.getLogger(__name__)

# Database configuration from environment
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))


class JobPersistence:
    """SQLite-based persistence layer for compression jobs"""

    def __init__(self, db_path: str = DATABASE_PATH):
        self.db_path = db_path
        self._initialized = False
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Initialize database schema"""
        if self._initialized:
            return

        async with self._lock:
            if self._initialized:
                return

            # Ensure directory exists
            db_dir = Path(self.db_path).parent
            db_dir.mkdir(parents=True, exist_ok=True)

            async with aiosqlite.connect(self.db_path) as db:
                await db.execute("""
                    CREATE TABLE IF NOT EXISTS compression_jobs (
                        job_id TEXT PRIMARY KEY,
                        file_path TEXT NOT NULL,
                        reduce_percentage INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        progress REAL DEFAULT 0.0,
                        current_pass INTEGER DEFAULT 0,
                        current_size_mb REAL,
                        target_size_mb REAL,
                        video_bitrate INTEGER,
                        duration_seconds REAL,
                        eta_seconds INTEGER,
                        created_at TIMESTAMP NOT NULL,
                        started_at TIMESTAMP,
                        completed_at TIMESTAMP,
                        output_file TEXT,
                        error TEXT,
                        ffmpeg_pid INTEGER
                    )
                """)

                # Create indexes for performance
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_status
                    ON compression_jobs(status)
                """)
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_created_at
                    ON compression_jobs(created_at)
                """)
                await db.commit()

            self._initialized = True
            logger.info(f"Job persistence initialized at {self.db_path}")

    async def save_job(self, job_data: Dict[str, Any]):
        """Insert or update a job in the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            # Convert datetime objects to ISO format strings
            job_data_copy = job_data.copy()
            for key in ['created_at', 'started_at', 'completed_at']:
                if key in job_data_copy and job_data_copy[key]:
                    if isinstance(job_data_copy[key], datetime):
                        job_data_copy[key] = job_data_copy[key].isoformat()

            await db.execute("""
                INSERT OR REPLACE INTO compression_jobs (
                    job_id, file_path, reduce_percentage, status, progress,
                    current_pass, current_size_mb, target_size_mb, video_bitrate,
                    duration_seconds, eta_seconds, created_at, started_at,
                    completed_at, output_file, error, ffmpeg_pid
                ) VALUES (
                    :job_id, :file_path, :reduce_percentage, :status, :progress,
                    :current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
                    :duration_seconds, :eta_seconds, :created_at, :started_at,
                    :completed_at, :output_file, :error, :ffmpeg_pid
                )
            """, job_data_copy)
            await db.commit()

    async def update_job_status(self, job_id: str, status: str,
                                progress: Optional[float] = None,
                                error: Optional[str] = None):
        """Quick update for job status and progress"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"status": status}
            if progress is not None:
                updates["progress"] = progress
            if error is not None:
                updates["error"] = error

            # Set completed_at for terminal states
            if status in ['completed', 'failed', 'cancelled']:
                updates["completed_at"] = datetime.now().isoformat()

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def update_job_progress(self, job_id: str, progress: float,
                                  eta_seconds: Optional[int] = None,
                                  current_pass: Optional[int] = None):
        """Update job progress and ETA"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates = {"progress": progress}
            if eta_seconds is not None:
                updates["eta_seconds"] = eta_seconds
            if current_pass is not None:
                updates["current_pass"] = current_pass

            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"
            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def set_ffmpeg_pid(self, job_id: str, pid: int):
        """Update the ffmpeg PID for a job"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
                (pid, job_id)
            )
            await db.commit()

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get a single job by ID"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            ) as cursor:
                row = await cursor.fetchone()
                if row:
                    return dict(row)
                return None

    async def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Get all jobs from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs ORDER BY created_at DESC"
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
        """Get all jobs with a specific status"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
                (status,)
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
        """Get jobs that were interrupted (status=processing/validating)"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """SELECT * FROM compression_jobs
                   WHERE status IN ('processing', 'validating')
                   ORDER BY created_at ASC"""
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def delete_job(self, job_id: str):
        """Delete a job from the database"""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "DELETE FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            )
            await db.commit()

    async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
        """Delete completed/cancelled jobs older than retention period"""
        await self.initialize()

        cutoff_date = datetime.now().timestamp() - (retention_days * 24 * 60 * 60)
        cutoff_iso = datetime.fromtimestamp(cutoff_date).isoformat()

        async with aiosqlite.connect(self.db_path) as db:
            result = await db.execute("""
                DELETE FROM compression_jobs
                WHERE status IN ('completed', 'cancelled')
                AND completed_at < ?
            """, (cutoff_iso,))
            deleted_count = result.rowcount
            await db.commit()

            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")

            return deleted_count
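
For reference, a hedged example of the payload save_job() expects; the key set
mirrors the named placeholders in the INSERT OR REPLACE statement above, and
every value here is made up for illustration:

    # Illustrative only: all values are fabricated; the key set is taken from
    # the named parameters used by save_job()'s INSERT OR REPLACE statement.
    async def enqueue_example():
        persistence = JobPersistence()
        await persistence.save_job({
            "job_id": "example-job-1",
            "file_path": "/footages/example_flight.mp4",
            "reduce_percentage": 50,
            "status": "processing",
            "progress": 0.0,
            "current_pass": 1,
            "current_size_mb": 1024.0,
            "target_size_mb": 512.0,
            "video_bitrate": 8000000,
            "duration_seconds": 245.0,
            "eta_seconds": None,
            "created_at": datetime.now(),
            "started_at": datetime.now(),
            "completed_at": None,
            "output_file": None,
            "error": None,
            "ffmpeg_pid": None,
        })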