- Handle __root__ special case in compression start endpoint - Allow compression of videos not organized in date folders
508 lines
17 KiB
Python
508 lines
17 KiB
Python
import asyncio
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiofiles
import aiofiles.os
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse, Response
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse

from compression import CompressionManager
from filesystem_health import FilesystemHealthChecker
|
|
|
|
# Module-level logger named after this module; no handlers or levels are
# configured in the code visible here.
logger = logging.getLogger(__name__)

# FastAPI application object; every route and event hook below is registered on it.
app = FastAPI(title="Drone Footage Manager API")
|
|
|
|
# Configuration constants
STREAM_CHUNK_SIZE = 1024 * 1024  # 1MB chunks for video streaming
SSE_UPDATE_INTERVAL = 0.5  # Update every 500ms
CACHE_TTL_SECONDS = 60  # Cache directory listings for 60 seconds

# Base path for footages
FOOTAGES_PATH = Path("/footages")


# Simple in-memory cache for directory listings
class SimpleCache:
    """Minimal in-process key/value cache whose entries expire after a TTL.

    Entries are stored as ``(stored_at, value)`` tuples in ``self.cache``.
    ``stored_at`` comes from ``time.monotonic()`` so that adjusting the system
    clock can neither expire entries early nor keep them alive past the TTL.
    Expired entries are removed lazily, when the same key is next read.
    """

    def __init__(self, ttl_seconds: int = CACHE_TTL_SECONDS):
        """Create an empty cache.

        Args:
            ttl_seconds: How long, in seconds, a stored value stays readable.
        """
        self.cache: Dict[str, tuple[float, Any]] = {}
        self.ttl = ttl_seconds

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under *key*, or ``None`` if absent or expired.

        An expired entry is deleted as a side effect of the lookup.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None

        stored_at, value = entry
        if time.monotonic() - stored_at < self.ttl:
            return value

        # Entry outlived its TTL: evict it so the dict does not keep stale data.
        del self.cache[key]
        return None

    def set(self, key: str, value: Any):
        """Store *value* under *key*, restarting its TTL from now."""
        self.cache[key] = (time.monotonic(), value)

    def clear(self):
        """Drop every cached entry."""
        self.cache.clear()


directory_cache = SimpleCache()
|
|
|
|
# Initialize filesystem health checker first (compression manager needs it).
# It is pointed at the footage root.
filesystem_health_checker = FilesystemHealthChecker(FOOTAGES_PATH)

# Initialize compression manager with health checker.
# max_concurrent=1 is passed through as-is; presumably it limits how many jobs
# run at once - confirm against compression.CompressionManager.
compression_manager = CompressionManager(
    max_concurrent=1,
    allowed_base_path=FOOTAGES_PATH,
    health_checker=filesystem_health_checker
)
|
|
|
|
# CORS middleware for frontend communication.
# ALLOWED_ORIGINS is a comma-separated list read from the environment
# (default "*"). Entries are stripped so "https://a.example, https://b.example"
# works; an entry with stray whitespace would otherwise never match an Origin.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "*").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    # Credentialed requests must not be combined with a wildcard origin, so
    # credentials are only enabled when an explicit origin list is configured.
    allow_credentials="*" not in ALLOWED_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# Supported video and image extensions
VIDEO_EXTENSIONS = {".mp4", ".MP4", ".mov", ".MOV", ".avi", ".AVI"}
IMAGE_EXTENSIONS = {".jpg", ".JPG", ".jpeg", ".JPEG", ".png", ".PNG"}


def is_media_file(filename: str) -> bool:
    """Tell whether *filename* carries a supported video or image suffix.

    The suffix is compared exactly (case-sensitively) against the entries of
    VIDEO_EXTENSIONS and IMAGE_EXTENSIONS, so only the all-lowercase and
    all-uppercase spellings listed there are recognised.
    """
    suffix = Path(filename).suffix
    return any(suffix in known for known in (VIDEO_EXTENSIONS, IMAGE_EXTENSIONS))
|
|
|
|
|
|
async def get_file_info(file_path: Path) -> Dict:
    """Build the metadata record returned for one file in a listing.

    The file is stat'ed through aiofiles. The record holds the file name, its
    size in bytes, the modification time as an ISO-8601 string (naive, derived
    via ``datetime.fromtimestamp``), and two flags saying whether the suffix is
    in VIDEO_EXTENSIONS / IMAGE_EXTENSIONS.
    """
    file_stat = await aiofiles.os.stat(file_path)
    suffix = file_path.suffix
    modified_at = datetime.fromtimestamp(file_stat.st_mtime)

    info = {
        "name": file_path.name,
        "size": file_stat.st_size,
        "modified": modified_at.isoformat(),
        "is_video": suffix in VIDEO_EXTENSIONS,
        "is_image": suffix in IMAGE_EXTENSIONS,
    }
    return info
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Startup hook: check the footage filesystem once, then start monitoring.

    A failed initial check is logged as an error but does not stop startup;
    background monitoring is started in either case.
    """
    logger.info("Running startup tasks...")

    # One-off health probe before serving requests
    health = await filesystem_health_checker.perform_health_check()
    if not health["healthy"]:
        logger.error(
            f"✗ Initial filesystem health check FAILED: {health['error']}"
        )
    else:
        logger.info("✓ Initial filesystem health check PASSED")

    # Hand over to the checker's background monitoring
    filesystem_health_checker.start_monitoring()
    logger.info("Application startup complete")
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Shutdown hook: log the shutdown and stop filesystem health monitoring."""
    logger.info("Shutting down...")
    # Counterpart of the start_monitoring() call made in the startup hook
    filesystem_health_checker.stop_monitoring()
|
|
|
|
|
|
@app.get("/")
async def root():
    """Return a static payload naming the API and reporting it as running."""
    banner = {"message": "Drone Footage Manager API", "status": "running"}
    return banner
|
|
|
|
|
|
@app.get("/api/locations")
async def get_locations() -> List[Dict]:
    """List the location folders directly under FOOTAGES_PATH.

    Each entry holds the folder ``name`` and its ``modified`` time as an ISO
    string. The listing is served from ``directory_cache`` when a fresh copy
    exists and is stored there after being rebuilt.

    Raises:
        HTTPException: 500 when FOOTAGES_PATH does not exist.
    """
    hit = directory_cache.get("locations")
    if hit is not None:
        return hit

    if not FOOTAGES_PATH.exists():
        raise HTTPException(status_code=500, detail="Footages directory not found")

    async def describe(folder: Path) -> Dict:
        # One stat call per folder, via aiofiles
        folder_stat = await aiofiles.os.stat(folder)
        return {
            "name": folder.name,
            "modified": datetime.fromtimestamp(folder_stat.st_mtime).isoformat()
        }

    # Only directories count as locations; plain files in the root are ignored
    listing = [
        await describe(entry)
        for entry in FOOTAGES_PATH.iterdir()
        if entry.is_dir()
    ]

    directory_cache.set("locations", listing)
    return listing
|
|
|
|
|
|
@app.get("/api/locations/{location}/dates")
async def get_dates(location: str) -> List[Dict]:
    """List the sub-folders (dates) of one location.

    Each entry holds the folder ``name`` and ``modified`` ISO timestamp. When
    the location has no sub-folders but does contain files, a single
    placeholder entry named ``__root__`` is returned instead. Results are
    cached per location in ``directory_cache``.

    Raises:
        HTTPException: 400 for ``..`` or ``/`` in *location*, 403 when the
            resolved path leaves FOOTAGES_PATH, 404 when it is not an
            existing directory.
    """
    cache_key = f"dates:{location}"
    hit = directory_cache.get(cache_key)
    if hit is not None:
        return hit

    # Reject traversal characters before touching the filesystem
    if any(token in location for token in ("..", "/")):
        raise HTTPException(status_code=400, detail="Invalid path characters")

    location_path = (FOOTAGES_PATH / location).resolve()

    # relative_to() raises ValueError when the resolved path is outside the root
    try:
        location_path.relative_to(FOOTAGES_PATH.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not (location_path.exists() and location_path.is_dir()):
        raise HTTPException(status_code=404, detail="Location not found")

    date_entries = []
    loose_files_present = False

    for entry in location_path.iterdir():
        if entry.is_dir():
            entry_stat = await aiofiles.os.stat(entry)
            modified_at = datetime.fromtimestamp(entry_stat.st_mtime)
            date_entries.append({
                "name": entry.name,
                "modified": modified_at.isoformat()
            })
            continue
        if entry.is_file():
            loose_files_present = True

    # Files present but no date folders: advertise the special root marker
    if loose_files_present and not date_entries:
        date_entries.append({
            "name": "__root__",
            "modified": None,
            "message": "📁 Files not organized by date"
        })

    directory_cache.set(cache_key, date_entries)
    return date_entries
|
|
|
|
|
|
@app.get("/api/files/{location}/{date}")
async def get_files(location: str, date: str) -> List[Dict]:
    """List the media files of one location/date folder.

    ``date == "__root__"`` selects the location folder itself. Entries are
    visited in sorted path order, filtered with ``is_media_file`` and described
    with ``get_file_info``. Results are cached per (location, date) in
    ``directory_cache``.

    Raises:
        HTTPException: 400 for ``..`` or ``/`` in either component, 403 when
            the resolved path leaves FOOTAGES_PATH, 404 when it is not an
            existing directory.
    """
    cache_key = f"files:{location}:{date}"
    hit = directory_cache.get(cache_key)
    if hit is not None:
        return hit

    # Reject traversal characters in either path component
    if any(token in part for token in ("..", "/") for part in (location, date)):
        raise HTTPException(status_code=400, detail="Invalid path characters")

    # "__root__" means the files sit directly in the location folder
    target = FOOTAGES_PATH / location
    if date != "__root__":
        target = target / date
    files_path = target.resolve()

    # relative_to() raises ValueError when the resolved path is outside the root
    try:
        files_path.relative_to(FOOTAGES_PATH.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not (files_path.exists() and files_path.is_dir()):
        raise HTTPException(status_code=404, detail="Path not found")

    listing = [
        await get_file_info(entry)
        for entry in sorted(files_path.iterdir())
        if entry.is_file() and is_media_file(entry.name)
    ]

    directory_cache.set(cache_key, listing)
    return listing
|
|
|
|
|
|
def _parse_range_header(range_header: str, file_size: int) -> Optional[tuple]:
    """Turn a single-range ``Range`` header into inclusive ``(start, end)`` offsets.

    Supported forms are ``bytes=A-B``, ``bytes=A-`` and the suffix form
    ``bytes=-N`` (last N bytes). ``end`` is clamped to the last byte of the
    file.

    Returns:
        ``(start, end)`` for a satisfiable range, or ``None`` when the header
        should be ignored (unknown unit, multiple ranges, non-numeric values,
        or ``end < start``) and the full file served instead.

    Raises:
        HTTPException: 416 with ``Content-Range: bytes */<size>`` when the
            header is well-formed but selects no bytes of the file.
    """
    unit, equals, spec = range_header.partition("=")
    if not equals or unit.strip().lower() != "bytes" or "," in spec:
        return None

    first, dash, last = (piece.strip() for piece in spec.partition("-"))
    if not dash or not (first or last):
        return None
    # isascii() guards against Unicode digits that isdigit() accepts but int() rejects
    if any(piece and not (piece.isascii() and piece.isdigit()) for piece in (first, last)):
        return None

    unsatisfiable = HTTPException(
        status_code=416,
        detail="Requested range not satisfiable",
        headers={"Content-Range": f"bytes */{file_size}"},
    )

    if not first:
        # Suffix form: the final `last` bytes of the file
        suffix_length = int(last)
        if suffix_length == 0 or file_size == 0:
            raise unsatisfiable
        return max(file_size - suffix_length, 0), file_size - 1

    start = int(first)
    end = int(last) if last else file_size - 1
    if last and end < start:
        # Syntactically invalid range: ignore the header
        return None
    if start >= file_size:
        raise unsatisfiable
    return start, min(end, file_size - 1)


@app.get("/api/stream/{location}/{date}/{filename}")
async def stream_video(location: str, date: str, filename: str, request: Request):
    """Stream a video file, honouring single-range HTTP ``Range`` requests.

    ``date == "__root__"`` addresses a file stored directly in the location
    folder. With a usable ``Range`` header the selected bytes are streamed in
    STREAM_CHUNK_SIZE pieces as a 206 response; without one, or when the
    header is malformed or unsupported, the whole file is returned.

    Raises:
        HTTPException: 400 for traversal characters or a non-video suffix,
            403 when the resolved path leaves FOOTAGES_PATH, 404 when the file
            does not exist, 416 when the requested range selects no bytes.
    """
    # Sanitize path components to prevent traversal
    if ".." in location or ".." in date or ".." in filename or "/" in location or "/" in date:
        raise HTTPException(status_code=400, detail="Invalid path characters")

    # Handle __root__ case (files not in date subdirectories)
    if date == "__root__":
        file_path = (FOOTAGES_PATH / location / filename).resolve()
    else:
        file_path = (FOOTAGES_PATH / location / date / filename).resolve()

    # Ensure resolved path is still within FOOTAGES_PATH
    try:
        file_path.relative_to(FOOTAGES_PATH.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Check if it's a video file
    if file_path.suffix not in VIDEO_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Not a video file")

    file_stat = await aiofiles.os.stat(file_path)
    file_size = file_stat.st_size

    range_header = request.headers.get("range")
    byte_range = _parse_range_header(range_header, file_size) if range_header else None

    if byte_range is None:
        # No (usable) range header - return full file
        return FileResponse(
            file_path,
            media_type="video/mp4",
            headers={"Accept-Ranges": "bytes"}
        )

    start, end = byte_range
    content_length = end - start + 1

    async def iterfile():
        # Read exactly content_length bytes starting at `start`, chunk by chunk
        async with aiofiles.open(file_path, mode='rb') as f:
            await f.seek(start)
            remaining = content_length

            while remaining > 0:
                chunk = await f.read(min(STREAM_CHUNK_SIZE, remaining))
                if not chunk:
                    # File ended early (e.g. truncated while streaming)
                    break
                remaining -= len(chunk)
                yield chunk

    headers = {
        "Content-Range": f"bytes {start}-{end}/{file_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(content_length),
        "Content-Type": "video/mp4",
    }

    return StreamingResponse(
        iterfile(),
        status_code=206,
        headers=headers,
        media_type="video/mp4"
    )
|
|
|
|
|
|
@app.get("/api/image/{location}/{date}/{filename}")
async def get_image(location: str, date: str, filename: str):
    """Return an image file from a location/date folder.

    ``date == "__root__"`` addresses a file stored directly in the location
    folder. The response media type is ``image/jpeg`` for .jpg/.jpeg suffixes
    (any case) and ``image/png`` otherwise.

    Raises:
        HTTPException: 400 for traversal characters or a non-image suffix,
            403 when the resolved path leaves FOOTAGES_PATH, 404 when the file
            does not exist.
    """
    # ".." is refused in every component, "/" only in location and date
    has_dotdot = any(".." in part for part in (location, date, filename))
    has_slash = any("/" in part for part in (location, date))
    if has_dotdot or has_slash:
        raise HTTPException(status_code=400, detail="Invalid path characters")

    # "__root__" means the file sits directly in the location folder
    folder = FOOTAGES_PATH / location
    if date != "__root__":
        folder = folder / date
    file_path = (folder / filename).resolve()

    # relative_to() raises ValueError when the resolved path is outside the root
    try:
        file_path.relative_to(FOOTAGES_PATH.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not (file_path.exists() and file_path.is_file()):
        raise HTTPException(status_code=404, detail="File not found")

    suffix = file_path.suffix
    if suffix not in IMAGE_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Not an image file")

    if suffix.lower() in {".jpg", ".jpeg"}:
        media_type = "image/jpeg"
    else:
        media_type = "image/png"

    return FileResponse(file_path, media_type=media_type)
|
|
|
|
|
|
# ========== COMPRESSION API ENDPOINTS ==========
|
|
|
|
class CompressionRequest(BaseModel):
    """Request body accepted by POST /api/compress/start."""

    # Location folder name, joined onto FOOTAGES_PATH by the endpoint
    location: str
    # Date sub-folder name, or "__root__" when the file sits directly in the location folder
    date: str
    # Name of the video file to compress
    filename: str
    # Requested reduction; the endpoint rejects values outside 1-90
    reduce_percentage: int
|
|
|
|
|
|
@app.post("/api/compress/start")
async def start_compression(request: CompressionRequest):
    """Validate a compression request and hand it to the compression manager.

    The target path is built from the request's location/date/filename
    (``date == "__root__"`` meaning the file sits directly in the location
    folder) and validated the same way as the read endpoints before any
    filesystem probing, because all three components come from the request
    body.

    Returns:
        ``{"job_id": <id>, "status": "started"}``.

    Raises:
        HTTPException: 400 for an out-of-range percentage, traversal
            characters or a non-video suffix; 403 when the resolved path
            leaves FOOTAGES_PATH; 404 when the file does not exist.
    """
    if not 1 <= request.reduce_percentage <= 90:
        raise HTTPException(status_code=400, detail="Percentage must be between 1-90")

    # Sanitize path components to prevent traversal (same rule as the read endpoints)
    if (
        ".." in request.location
        or ".." in request.date
        or ".." in request.filename
        or "/" in request.location
        or "/" in request.date
    ):
        raise HTTPException(status_code=400, detail="Invalid path characters")

    # Handle __root__ case (files not in date subdirectories)
    if request.date == "__root__":
        file_path = FOOTAGES_PATH / request.location / request.filename
    else:
        file_path = FOOTAGES_PATH / request.location / request.date / request.filename

    # Ensure the resolved path is still within FOOTAGES_PATH. This also catches
    # an absolute filename, which would otherwise replace the whole path.
    try:
        file_path.resolve().relative_to(FOOTAGES_PATH.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied") from None

    # is_file() rather than exists(): a directory is not a compressible video
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    if file_path.suffix not in VIDEO_EXTENSIONS:
        raise HTTPException(status_code=400, detail="File is not a video")

    # The manager receives the same (unresolved) path string as before
    job_id = await compression_manager.start_compression(str(file_path), request.reduce_percentage)

    return {"job_id": job_id, "status": "started"}
|
|
|
|
|
|
@app.get("/api/compress/jobs")
async def get_all_jobs():
    """Return a summary dict for every job in the manager's snapshot.

    Sizes are rounded to two decimals and progress to one; paths are reduced
    to file names for ``file_name`` and ``output_file``. Falsy sizes, creation
    times and output paths are reported as ``None``.
    """
    def summarize(job) -> Dict:
        size_now = job.current_size_mb
        size_goal = job.target_size_mb
        created = job.created_at
        output = job.output_file
        return {
            "job_id": job.job_id,
            "file_path": job.file_path,
            "file_name": Path(job.file_path).name,
            "reduce_percentage": job.reduce_percentage,
            "status": job.status,
            "progress": round(job.progress, 1),
            "eta_seconds": job.eta_seconds,
            "current_pass": job.current_pass,
            "current_size_mb": round(size_now, 2) if size_now else None,
            "target_size_mb": round(size_goal, 2) if size_goal else None,
            "video_bitrate": job.video_bitrate,
            "created_at": created.isoformat() if created else None,
            "output_file": Path(output).name if output else None,
            "error": job.error
        }

    # Iterate a snapshot rather than the live job collection
    return [summarize(job) for job in compression_manager.get_jobs_snapshot()]
|
|
|
|
|
|
@app.get("/api/compress/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the progress summary of one compression job.

    Raises:
        HTTPException: 404 when *job_id* is not among the manager's jobs.
    """
    known_jobs = compression_manager.jobs
    if job_id not in known_jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    job = known_jobs[job_id]

    # Expose only the output file's name, not its full path
    output_name = None
    if job.output_file:
        output_name = Path(job.output_file).name

    return {
        "job_id": job.job_id,
        "status": job.status,
        "progress": round(job.progress, 1),
        "eta_seconds": job.eta_seconds,
        "current_pass": job.current_pass,
        "output_file": output_name,
        "error": job.error
    }
|
|
|
|
|
|
@app.delete("/api/compress/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Ask the compression manager to cancel a job.

    Raises:
        HTTPException: 404 when *job_id* is not among the manager's jobs.
    """
    job_is_known = job_id in compression_manager.jobs
    if not job_is_known:
        raise HTTPException(status_code=404, detail="Job not found")

    await compression_manager.cancel_job(job_id)

    outcome = {"status": "cancelled"}
    return outcome
|
|
|
|
|
|
@app.get("/api/compress/events")
async def compression_events(request: Request):
    """Server-Sent Events stream with the progress of active compression jobs.

    Every SSE_UPDATE_INTERVAL seconds the generator emits one ``progress``
    event whose data is a JSON list of the jobs currently pending, processing
    or validating. Nothing is emitted while no such job exists. The loop ends
    when the client disconnects.
    """
    active_states = ("pending", "processing", "validating")

    async def event_generator():
        try:
            # Keep polling until the client goes away
            while not await request.is_disconnected():
                # Build the payload from a snapshot, not the live job collection
                active_jobs = [
                    {
                        "job_id": job.job_id,
                        "status": job.status,
                        "progress": round(job.progress, 1),
                        "eta_seconds": job.eta_seconds,
                        "current_pass": job.current_pass
                    }
                    for job in compression_manager.get_jobs_snapshot()
                    if job.status in active_states
                ]

                if active_jobs:
                    yield {
                        "event": "progress",
                        "data": json.dumps(active_jobs)
                    }

                await asyncio.sleep(SSE_UPDATE_INTERVAL)
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly
            pass

    return EventSourceResponse(event_generator())
|
|
|
|
|
|
# ========== SYSTEM HEALTH API ENDPOINTS ==========
|
|
|
|
@app.get("/api/system/health")
async def get_system_health():
    """Return whatever the filesystem health checker currently reports."""
    current_status = filesystem_health_checker.get_status()
    return current_status
|
|
|
|
|
|
@app.get("/api/system/health/stream")
async def system_health_stream(request: Request):
    """Server-Sent Events stream of the filesystem health status.

    A ``health`` event carrying the JSON-encoded status is emitted every five
    seconds until the client disconnects.
    """
    async def event_generator():
        try:
            # Keep reporting until the client goes away
            while not await request.is_disconnected():
                snapshot = filesystem_health_checker.get_status()
                yield {
                    "event": "health",
                    "data": json.dumps(snapshot)
                }

                # Pause five seconds between reports
                await asyncio.sleep(5)
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly
            pass

    return EventSourceResponse(event_generator())
|
|
|
|
|
|
if __name__ == "__main__":
    # Only when this file is executed directly: uvicorn is imported here, not
    # at module level, so importing the module does not require it.
    import uvicorn

    # Serve the app on all interfaces, port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)
|