"""SQLite-backed persistence layer for video-compression jobs (via aiosqlite)."""

import sqlite3
import asyncio
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import aiosqlite
import os

logger = logging.getLogger(__name__)

# Database configuration from environment
DATABASE_PATH = os.getenv('JOB_DATABASE_PATH', '/footages/.compression_jobs.db')
JOB_RETENTION_DAYS = int(os.getenv('JOB_RETENTION_DAYS', '7'))

# Columns of the compression_jobs table, in schema order.  save_job() uses
# this to default any absent key to None (stored as NULL) so the
# named-parameter INSERT cannot raise sqlite3.ProgrammingError when given a
# partial job dict (e.g. a freshly created job with no started_at yet).
_JOB_COLUMNS = (
    'job_id', 'file_path', 'reduce_percentage', 'status', 'progress',
    'current_pass', 'current_size_mb', 'target_size_mb', 'video_bitrate',
    'duration_seconds', 'eta_seconds', 'created_at', 'started_at',
    'completed_at', 'output_file', 'error', 'ffmpeg_pid',
)

# Datetime-valued columns; persisted as ISO-8601 strings.
_DATETIME_COLUMNS = ('created_at', 'started_at', 'completed_at')


class JobPersistence:
    """SQLite-based persistence layer for compression jobs.

    Every public coroutine opens its own short-lived aiosqlite connection,
    so a single instance can be shared between asyncio tasks.  Schema
    creation is lazy and serialized by an asyncio.Lock, double-checked so
    the common already-initialized path is lock-free.
    """

    def __init__(self, db_path: str = DATABASE_PATH):
        self.db_path = db_path
        self._initialized = False    # True once the schema exists
        self._lock = asyncio.Lock()  # serializes one-time initialization

    async def initialize(self):
        """Create the database file, table and indexes if needed (idempotent)."""
        if self._initialized:
            return

        async with self._lock:
            # Re-check under the lock: another task may have won the race.
            if self._initialized:
                return

            # Ensure directory exists
            db_dir = Path(self.db_path).parent
            db_dir.mkdir(parents=True, exist_ok=True)

            async with aiosqlite.connect(self.db_path) as db:
                await db.execute("""
                    CREATE TABLE IF NOT EXISTS compression_jobs (
                        job_id TEXT PRIMARY KEY,
                        file_path TEXT NOT NULL,
                        reduce_percentage INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        progress REAL DEFAULT 0.0,
                        current_pass INTEGER DEFAULT 0,
                        current_size_mb REAL,
                        target_size_mb REAL,
                        video_bitrate INTEGER,
                        duration_seconds REAL,
                        eta_seconds INTEGER,
                        created_at TIMESTAMP NOT NULL,
                        started_at TIMESTAMP,
                        completed_at TIMESTAMP,
                        output_file TEXT,
                        error TEXT,
                        ffmpeg_pid INTEGER
                    )
                """)

                # Create indexes for performance
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_status
                    ON compression_jobs(status)
                """)
                await db.execute("""
                    CREATE INDEX IF NOT EXISTS idx_created_at
                    ON compression_jobs(created_at)
                """)

                await db.commit()

            self._initialized = True
            logger.info(f"Job persistence initialized at {self.db_path}")

    async def save_job(self, job_data: Dict[str, Any]):
        """Insert or update (upsert) a job row.

        Args:
            job_data: Mapping of column name -> value.  datetime objects in
                created_at / started_at / completed_at are serialized to ISO
                strings.  Columns absent from the mapping are stored as NULL
                (NOT NULL columns must still be supplied by the caller).
        """
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            # Build a complete parameter dict: default missing columns to
            # None and convert datetimes to ISO strings.  Without the
            # defaulting, a partial dict would raise
            # sqlite3.ProgrammingError ("You did not supply a value ...").
            params = {col: job_data.get(col) for col in _JOB_COLUMNS}
            for key in _DATETIME_COLUMNS:
                if isinstance(params[key], datetime):
                    params[key] = params[key].isoformat()

            await db.execute("""
                INSERT OR REPLACE INTO compression_jobs (
                    job_id, file_path, reduce_percentage, status, progress,
                    current_pass, current_size_mb, target_size_mb, video_bitrate,
                    duration_seconds, eta_seconds, created_at, started_at,
                    completed_at, output_file, error, ffmpeg_pid
                ) VALUES (
                    :job_id, :file_path, :reduce_percentage, :status, :progress,
                    :current_pass, :current_size_mb, :target_size_mb, :video_bitrate,
                    :duration_seconds, :eta_seconds, :created_at, :started_at,
                    :completed_at, :output_file, :error, :ffmpeg_pid
                )
            """, params)
            await db.commit()

    async def update_job_status(self, job_id: str, status: str,
                                progress: Optional[float] = None,
                                error: Optional[str] = None):
        """Quick update for job status and progress.

        Sets completed_at automatically when status is a terminal state
        ('completed', 'failed', 'cancelled').
        """
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates: Dict[str, Any] = {"status": status}
            if progress is not None:
                updates["progress"] = progress
            if error is not None:
                updates["error"] = error

            # Set completed_at for terminal states
            if status in ['completed', 'failed', 'cancelled']:
                updates["completed_at"] = datetime.now().isoformat()

            # Column names come only from the hard-coded keys above, so
            # building the SET clause with an f-string is injection-safe;
            # all values go through parameter binding.
            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def update_job_progress(self, job_id: str, progress: float,
                                  eta_seconds: Optional[int] = None,
                                  current_pass: Optional[int] = None):
        """Update job progress, and optionally ETA and the current encode pass."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            updates: Dict[str, Any] = {"progress": progress}
            if eta_seconds is not None:
                updates["eta_seconds"] = eta_seconds
            if current_pass is not None:
                updates["current_pass"] = current_pass

            # Hard-coded column names only; values are parameter-bound.
            set_clause = ", ".join([f"{k} = :{k}" for k in updates.keys()])
            query = f"UPDATE compression_jobs SET {set_clause} WHERE job_id = :job_id"

            await db.execute(query, {**updates, "job_id": job_id})
            await db.commit()

    async def set_ffmpeg_pid(self, job_id: str, pid: int):
        """Update the ffmpeg PID for a job (used to signal/kill the encoder)."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE compression_jobs SET ffmpeg_pid = ? WHERE job_id = ?",
                (pid, job_id)
            )
            await db.commit()

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Return a single job as a dict, or None if job_id is unknown."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            ) as cursor:
                row = await cursor.fetchone()
                if row:
                    return dict(row)
                return None

    async def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Return all jobs, newest first."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs ORDER BY created_at DESC"
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_jobs_by_status(self, status: str) -> List[Dict[str, Any]]:
        """Return all jobs with the given status, oldest first (queue order)."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                "SELECT * FROM compression_jobs WHERE status = ? ORDER BY created_at ASC",
                (status,)
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_interrupted_jobs(self) -> List[Dict[str, Any]]:
        """Get jobs that were interrupted (status=processing/validating).

        These are jobs left mid-flight by a crash/restart; callers can use
        this to resume or fail them on startup.
        """
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """SELECT * FROM compression_jobs
                   WHERE status IN ('processing', 'validating')
                   ORDER BY created_at ASC"""
            ) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def delete_job(self, job_id: str):
        """Delete a job row; a missing job_id is a silent no-op."""
        await self.initialize()

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "DELETE FROM compression_jobs WHERE job_id = ?",
                (job_id,)
            )
            await db.commit()

    async def cleanup_old_jobs(self, retention_days: int = JOB_RETENTION_DAYS):
        """Delete completed/cancelled jobs older than the retention period.

        Returns the number of rows deleted.  NOTE(review): 'failed' jobs are
        not cleaned up here — presumably kept for inspection; confirm intent.
        """
        await self.initialize()

        # ISO-8601 strings compare lexicographically in timestamp order, so
        # a plain '<' against the cutoff works.  Naive local time matches
        # how completed_at is written in update_job_status().
        cutoff_iso = (datetime.now() - timedelta(days=retention_days)).isoformat()

        async with aiosqlite.connect(self.db_path) as db:
            result = await db.execute("""
                DELETE FROM compression_jobs
                WHERE status IN ('completed', 'cancelled')
                AND completed_at < ?
            """, (cutoff_iso,))
            deleted_count = result.rowcount
            await db.commit()

            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old jobs (older than {retention_days} days)")

            return deleted_count