Implement WebSocket-based real-time video streaming

Replace FFmpeg UDP forwarding with WebSocket streaming for lower latency.

Changes:
- Add WebSocket endpoint at /stream/ws/video for video frame transmission
- Implement capture_video_frames worker using OpenCV to read Tello UDP stream
- Add Canvas element to web UI for video display
- Create WebSocket client in app.js with base64 JPEG decoding
- Start video capture worker on server lifespan
- Add CLAUDE.md documentation for codebase architecture

Technical details:
- Video frames captured at ~30 FPS from udp://192.168.10.1:11111
- Frames encoded as JPEG (80% quality) and base64-encoded
- WebSocket provides bidirectional persistent connection for minimal latency
- Auto-reconnection on WebSocket disconnect
- Video worker waits for Tello connection and command mode before streaming

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
ALIHAN DIKEL
2025-10-19 02:29:49 +03:00
parent 2148c92a92
commit d621032fef
6 changed files with 256 additions and 35 deletions

123
CLAUDE.md Normal file
View File

@@ -0,0 +1,123 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Tello Commander is a FastAPI-based control system for DJI Tello drones. The system provides a web UI for controlling the drone and manages WiFi connectivity, video streaming, and flight telemetry. It runs on Raspberry Pi and Linux systems with supervisor for process management.
## Development Commands
### Running the Server
**Local development:**
```bash
export $(grep -v '^#' env | xargs) && python src/server/server.py
```
**Production (via supervisor on Raspberry Pi):**
```bash
# Manage remote server
./scripts/manage_server.sh {start|stop|restart|status|update|logs}
# View logs
./scripts/manage_server.sh logs
```
### Deployment
**Pull latest changes:**
```bash
./scripts/pull_from_origin.sh
```
**Update supervisor service:**
```bash
./scripts/manage_server.sh update
```
## Architecture
### Core Components
**Server Entry Point (`src/server/server.py`):**
- FastAPI application with lifespan management
- Initializes shared state with Tello instance and connection status
- Spawns background workers: `maintain_connection_to_tello` and `collect_flight_stats`
- Serves static files from `src/client/static` and templates from `src/client/templates`
- Runs on port 8001 with reload enabled
**Background Workers (`src/server/workers.py`):**
- `maintain_connection_to_tello`: Continuously manages WiFi connection to Tello via dhclient (wlan1 interface). Sets `state["connection"]` to "OK" or "NOK"
- `collect_flight_stats`: Polls battery, temperature, and barometer data. Enters command mode when connected. Updates `state["stats"]`
- `forward_video_stream`: FFmpeg-based video forwarding from Tello (UDP 11111) to client
- Uses ThreadPoolExecutor for blocking I/O operations
**Routers:**
- `router_main.py`: Root endpoint, UI endpoint (`/ui`), status endpoint (`/status`), reconnect trigger
- `router_command.py`: Flight commands (`/takeoff`, `/land`, `/turn`, `/move`, `/emergency`, `/end`)
- `router_stream.py`: Video streaming control (`/streamon`)
- All routers access shared state via `request.app.state.shared_state`
**Connection Management (`src/server/services/connections.py`):**
- `release_and_renew`: Uses dhclient to release/renew IP on wlan1 interface
- `check_dhcp_ip`: Validates DHCP assignment matches expected Tello IP (from env `TELLO_STATION_IP`)
- Designed for Linux network management via subprocess calls
### State Management
Shared state dictionary stored in `app.state.shared_state`:
- `connection`: "genesis" | "OK" | "NOK" - WiFi connection status to Tello
- `tello`: DJI Tello instance (djitellopy)
- `command`: Boolean - Whether Tello is in command mode
- `stats`: Dict with `bat`, `temp`, `baro` keys
- `streamon`: Boolean - Video stream status
### Network Configuration
Environment variables (defined in `env` file):
- `TELLO_SSID_NAME`: Tello WiFi network name
- `TELLO_SSID_PASS`: Tello WiFi password
- `TELLO_STATION_IP`: Expected IP address for this station on Tello network (e.g., 192.168.10.4)
- `DONGLE_IFNAME`: WiFi interface name (wlan1)
### Deployment Details
**Supervisor Configuration (`src/server/supervisor/tello-server.conf`):**
- Service runs as user `uad`
- Working directory: `/home/uad/tello-commander`
- Uses CPU affinity (taskset) to distribute load
- Auto-restart enabled
- Logs written to supervisor directory
**Remote Access:**
- Default server IP: `192.168.1.219:8001`
- Web UI: `http://192.168.1.219:8001/ui`
- Managed via SSH (`uad@192.168.1.219`)
## Important Patterns
**Router Pattern:**
All routers are registered through `routers/base.py` which creates a single `api_router`. This router is imported in `server.py` during lifespan startup to avoid circular imports.
**Connection Recovery:**
When `state["connection"]` becomes "NOK", the maintain_connection worker automatically attempts reconnection via dhclient release/renew cycles. Manual reconnection can be triggered via `/reconnect` endpoint.
**Barometer Calibration:**
Barometer readings are calibrated in `workers.py` using `get_calibrated_altitude` with a hardcoded offset (39 meters for "tuncel yerde").
## Dependencies
Key packages (from `requirements-dev.txt`):
- `fastapi==0.112.0` - Web framework
- `uvicorn==0.30.5` - ASGI server
- `djitellopy==2.5.0` - Tello SDK wrapper
- `loguru==0.7.2` - Logging
- `nmcli==1.3.0` - NetworkManager CLI wrapper
- `opencv-python==4.10.0.84` - Video processing
- `av==12.3.0` - Video codec bindings
- `ffmpegio` - FFmpeg wrapper
## Current Branch
Development occurs on `refactor/v2` branch.

36
src/client/static/app.js Normal file → Executable file
View File

@@ -1,6 +1,7 @@
document.addEventListener('DOMContentLoaded', () => {
// Base API URL
const API_BASE_URL = 'http://192.168.1.219:8001';
const WS_BASE_URL = 'ws://192.168.1.219:8001';
// Control Buttons
const reconnectBtn = document.getElementById('reconnect-btn');
@@ -19,6 +20,41 @@ document.addEventListener('DOMContentLoaded', () => {
const baroElem = document.getElementById('baro');
const tempElem = document.getElementById('temp');
// Video Canvas
const videoCanvas = document.getElementById('videoCanvas');
const ctx = videoCanvas.getContext('2d');
// WebSocket for Video Streaming
let videoWs = null;
const connectVideoWebSocket = () => {
videoWs = new WebSocket(`${WS_BASE_URL}/stream/ws/video`);
videoWs.onopen = () => {
console.log('Video WebSocket connected');
};
videoWs.onmessage = (event) => {
// Decode base64 JPEG and draw to canvas
const img = new Image();
img.onload = () => {
ctx.drawImage(img, 0, 0, videoCanvas.width, videoCanvas.height);
};
img.src = 'data:image/jpeg;base64,' + event.data;
};
videoWs.onerror = (error) => {
console.error('Video WebSocket error:', error);
};
videoWs.onclose = () => {
console.log('Video WebSocket closed, reconnecting in 3s...');
setTimeout(connectVideoWebSocket, 3000);
};
};
// Connect to video stream
connectVideoWebSocket();
// Command Functions
const sendCommand = async (endpoint) => {
try {

6
src/client/templates/index.html Normal file → Executable file
View File

@@ -18,9 +18,9 @@
</head>
<body>
<div class="container mt-4">
<!-- Placeholder for Video Stream or Additional Content -->
<div class="video-container">
<!--<img src="/streaming/video_feed" alt="Drone Video Stream" class="img-fluid">-->
<!-- Video Stream Canvas -->
<div class="video-container mb-3">
<canvas id="videoCanvas" width="960" height="720" style="width: 100%; max-width: 960px; border: 2px solid #333;"></canvas>
</div>
<!-- Status Indicators -->

28
src/server/routers/router_stream.py Normal file → Executable file
View File

@@ -1,4 +1,6 @@
from fastapi import APIRouter, Request
import asyncio
import base64
from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect
from loguru import logger
router = APIRouter()
@@ -13,3 +15,27 @@ def start_video_stream(request: Request):
except Exception as e:
logger.error(f"failed to start stream - {e}")
return {"msg": "error", "reason": f"failed to start stream - {e}"}
@router.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket):
await websocket.accept()
logger.info("Video WebSocket client connected")
try:
shared_state = websocket.app.state.shared_state
while True:
# Wait for a new frame to be available
if "video_frame" in shared_state and shared_state["video_frame"] is not None:
frame_data = shared_state["video_frame"]
# Send base64 encoded JPEG to client
await websocket.send_text(frame_data)
# Small delay to avoid overwhelming the connection
await asyncio.sleep(0.033) # ~30 FPS max
except WebSocketDisconnect:
logger.info("Video WebSocket client disconnected")
except Exception as e:
logger.error(f"Video WebSocket error: {e}")

1
src/server/server.py Normal file → Executable file
View File

@@ -20,6 +20,7 @@ async def lifespan(app: FastAPI):
app.state.shared_state = state
asyncio.create_task(workers.maintain_connection_to_tello(state))
asyncio.create_task(workers.collect_flight_stats(state))
asyncio.create_task(workers.capture_video_frames(state))
from routers.base import api_router # Import api_router here to avoid circular import
app.include_router(api_router)
yield

97
src/server/workers.py Normal file → Executable file
View File

@@ -1,8 +1,10 @@
import asyncio
import os
import sys
import base64
from concurrent.futures import ThreadPoolExecutor
import cv2
import nmcli
import ffmpegio
from djitellopy import TelloException
@@ -80,41 +82,74 @@ async def collect_flight_stats(state):
await asyncio.sleep(1)
async def forward_video_stream(state):
loop = asyncio.get_event_loop()
async def capture_video_frames(state):
"""
Captures video frames from Tello's UDP stream and stores them in shared state
for WebSocket streaming to clients.
"""
tello = state["tello"]
state["streamon"] = False
state["video_frame"] = None
# tek seferlik 'streamon' komutu göndererek telloyu moda sok
if (state["connection"] == "OK" and state["command"] == True and state["streamon"] == False):
await loop.run_in_executor(executor, tello.send_command_with_return, "streamon")
state["streamon"] = True
try:
state_ffmpeg = ffmpegio.run(
ffmpeg_args=[
"-fflags", "nobuffer",
"-flags", "low_delay",
"-strict", "experimental",
"-analyzeduration", "0",
"-probesize", "32",
"-i", "udp://192.168.10.1:11111",
"-c", "copy",
"-f", "mpegts",
"-flush_packets", "1",
"-max_delay", "0",
"-f", "mpegts",
"udp://192.168.1.210:11111"
],
overwrite=True,
capture_log=True
)
logger.info("Video capture worker started")
if state_ffmpeg.returncode == 0:
print("FFmpeg stream completed successfully.")
else:
print(f"FFmpeg exited with return code {state_ffmpeg.returncode}.")
except Exception as e:
print(f"Unexpected error: {e}")
while True:
# Wait for connection and command mode
if state["connection"] != "OK" or not state["command"]:
await asyncio.sleep(1)
continue
# Send streamon command once
if not state["streamon"]:
try:
await asyncio.to_thread(tello.send_command_with_return, "streamon")
state["streamon"] = True
logger.success("Tello stream activated")
await asyncio.sleep(2) # Give stream time to start
except Exception as e:
logger.error(f"Failed to start Tello stream: {e}")
await asyncio.sleep(5)
continue
# Capture frames from UDP stream
try:
cap = cv2.VideoCapture("udp://192.168.10.1:11111", cv2.CAP_FFMPEG)
if not cap.isOpened():
logger.warning("Failed to open video stream, retrying...")
state["streamon"] = False
await asyncio.sleep(3)
continue
logger.info("Video stream opened successfully")
while state["connection"] == "OK" and state["streamon"]:
ret, frame = await asyncio.to_thread(cap.read)
if not ret:
logger.warning("Failed to read frame from stream")
break
# Encode frame as JPEG
ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
if not ret:
continue
# Convert to base64 for WebSocket transmission
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
state["video_frame"] = jpg_as_text
# Small delay to control frame rate
await asyncio.sleep(0.033) # ~30 FPS
cap.release()
logger.warning("Video capture stopped")
state["streamon"] = False
except Exception as e:
logger.error(f"Video capture error: {e}")
state["streamon"] = False
await asyncio.sleep(5)
async def get_calibrated_altitude(offset, measure):
"""