Fix critical deadlocks causing API to hang on second job

Fixed two separate deadlock issues preventing job queue from processing
multiple jobs sequentially:

**Deadlock #1: JobQueue lock ordering violation**
- Fixed _calculate_queue_positions() attempting to acquire _jobs_lock
  while already holding _queue_positions_lock
- Implemented snapshot pattern to avoid nested lock acquisition
- Updated submit_job() to properly separate lock acquisitions

**Deadlock #2: JobRepository non-reentrant lock bug**
- Fixed _flush_dirty_jobs_sync() trying to re-acquire _dirty_lock
  while already holding it (threading.Lock is not reentrant)
- Removed redundant lock acquisition since caller already holds lock

Additional improvements:
- Added comprehensive lock ordering documentation to JobQueue class
- Added detailed debug logging throughout job submission flow
- Enabled DEBUG logging in API server for troubleshooting

Testing: Successfully processed 3 consecutive jobs without hanging
This commit is contained in:
Alihan
2025-10-17 03:51:46 +03:00
parent 3c0f79645c
commit f6777b1488
3 changed files with 59 additions and 19 deletions

View File

@@ -125,7 +125,29 @@ class Job:
class JobQueue:
"""Manages job queue with background worker."""
"""
Manages job queue with background worker.
THREAD SAFETY & LOCK ORDERING
==============================
This class uses multiple locks to protect shared state. To prevent deadlocks,
all code MUST follow this strict lock ordering:
LOCK HIERARCHY (acquire in this order):
1. _jobs_lock - Protects _jobs dict and _current_job_id
2. _queue_positions_lock - Protects _queued_job_ids deque
RULES:
- NEVER acquire _jobs_lock while holding _queue_positions_lock
- Always release locks in reverse order of acquisition
- Keep lock hold time minimal - release before I/O operations
- Use snapshot/copy pattern when data must cross lock boundaries
CRITICAL METHODS:
- _calculate_queue_positions(): Uses snapshot pattern to avoid nested locks
- submit_job(): Acquires locks separately, never nested
- _worker_loop(): Acquires locks separately in correct order
"""
def __init__(self,
max_queue_size: int = 100,
@@ -297,6 +319,7 @@ class JobQueue:
# 3. Generate job_id
job_id = str(uuid.uuid4())
logger.debug(f"Generated job_id: {job_id}")
# 4. Create Job object
job = Job(
@@ -320,27 +343,38 @@ class JobQueue:
error=None,
processing_time_seconds=None,
)
logger.debug(f"Created Job object for {job_id}")
# 5. Add to queue (raises queue.Full if full)
logger.debug(f"Attempting to add job {job_id} to queue (current size: {self._queue.qsize()})")
try:
self._queue.put_nowait(job)
logger.debug(f"Successfully added job {job_id} to queue")
except queue.Full:
raise queue.Full(
f"Job queue is full (max size: {self._max_queue_size}). "
f"Please try again later."
)
# 6. Add to jobs dict, update queue tracking
# Capture ALL return data inside lock to prevent TOCTOU race
# 6. Add to jobs dict and update queue tracking
# LOCK ORDERING: Always acquire _jobs_lock before _queue_positions_lock
logger.debug(f"Acquiring _jobs_lock for job {job_id}")
with self._jobs_lock:
self._jobs[job_id] = job
logger.debug(f"Added job {job_id} to _jobs dict")
# Update queue positions (separate lock to avoid deadlock)
logger.debug(f"Acquiring _queue_positions_lock for job {job_id}")
with self._queue_positions_lock:
# Add to ordered queue for O(1) position tracking
self._queued_job_ids.append(job_id)
logger.debug(f"Added job {job_id} to _queued_job_ids, calling _calculate_queue_positions()")
# Calculate positions - this will briefly acquire _jobs_lock internally
self._calculate_queue_positions()
logger.debug(f"Finished _calculate_queue_positions() for job {job_id}")
# Capture return data while position is stable
# Capture return data (need to re-acquire lock after position calculation)
logger.debug(f"Re-acquiring _jobs_lock to capture return data for job {job_id}")
with self._jobs_lock:
return_data = {
"job_id": job_id,
@@ -349,9 +383,12 @@ class JobQueue:
"created_at": job.created_at.isoformat() + "Z"
}
queue_position = job.queue_position
logger.debug(f"Captured return data for job {job_id}, queue_position={queue_position}")
# Mark for async persistence (outside lock to avoid blocking)
logger.debug(f"Marking job {job_id} for persistence")
job.mark_for_persistence(self._repository)
logger.debug(f"Job {job_id} marked for persistence successfully")
logger.info(
f"Job {job_id} submitted: {audio_path} "
@@ -711,19 +748,25 @@ class JobQueue:
Optimized O(n) implementation using deque. Only updates positions
for jobs still in QUEUED status.
Note: Must be called with _queue_positions_lock held.
IMPORTANT: Must be called with _queue_positions_lock held.
Does NOT acquire _jobs_lock to avoid deadlock - uses snapshot approach.
"""
# Filter out jobs that are no longer queued (O(n))
valid_queued_ids = []
for job_id in self._queued_job_ids:
# Need jobs_lock to check status safely
with self._jobs_lock:
if job_id in self._jobs and self._jobs[job_id].status == JobStatus.QUEUED:
valid_queued_ids.append(job_id)
# Step 1: Create snapshot of job statuses (acquire lock briefly)
job_status_snapshot = {}
with self._jobs_lock:
for job_id in self._queued_job_ids:
if job_id in self._jobs:
job_status_snapshot[job_id] = self._jobs[job_id].status
# Step 2: Filter out jobs that are no longer queued (no lock needed)
valid_queued_ids = [
job_id for job_id in self._queued_job_ids
if job_id in job_status_snapshot and job_status_snapshot[job_id] == JobStatus.QUEUED
]
self._queued_job_ids = deque(valid_queued_ids)
# Update positions based on deque order (O(n))
# Step 3: Update positions (acquire lock briefly for each update)
for i, job_id in enumerate(self._queued_job_ids, start=1):
with self._jobs_lock:
if job_id in self._jobs:

View File

@@ -136,11 +136,8 @@ class JobRepository:
jobs_to_flush = list(self._dirty_jobs.values())
self._dirty_jobs.clear()
# Release lock before I/O
with self._dirty_lock:
pass # Just to ensure we have the lock
# Write jobs to disk
# Lock is already held by caller, do NOT re-acquire
# Write jobs to disk (no lock needed for I/O)
flush_count = 0
for job in jobs_to_flush:
try:

View File

@@ -37,7 +37,7 @@ from utils.input_validation import (
)
# Logging configuration
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Constants