Files
tello-commander/commander/routes/route_command.py
ALIHAN DIKEL 0b3523000c better logging
2023-05-19 03:00:52 +03:00

125 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio, time
import threading
from fastapi import APIRouter, Query, BackgroundTasks
from djitellopy import Tello
class FlightStatsCollector:
def __init__(self):
self.stats_thread = None
self.stop_event = threading.Event()
def start_collecting(self):
self.stats_thread = threading.Thread(target=self.collect_stats)
self.stats_thread.start()
def stop_collecting(self):
self.stop_event.set()
if self.stats_thread:
self.stats_thread.join()
def collect_stats(self):
while not self.stop_event.is_set():
tello.send_command_with_return("command")
bat = tello.get_battery()
temp = tello.get_temperature()
# wsnr = tello.query_wifi_signal_noise_ratio()
print(f"bat: {bat} - temp: {temp}") #- wsnr: {wsnr}")
time.sleep(3)
router = APIRouter()
tello = Tello()
stats_collector = FlightStatsCollector()
def land_on_low_battery():
bat_level = tello.get_battery()
if bat_level < 20:
tello.land()
@router.get("/takeoff")
def takeoff():
try:
tello.connect()
except Exception as e:
return {"msg": "command failed", "reason": "failed to connect"}
if tello.is_flying:
return {"msg": "command failed", "reason": "already flying"}
try:
tello.takeoff()
stats_collector.start_collecting()
except Exception as e:
return { "msg": "command failed", "reason": e }
if tello.is_flying:
return {"msg": "ok"}
else:
return {"msg": "takeoff failed"}
@router.get("/land")
def land():
if not tello.is_flying:
return {"msg": "command failed", "reason": "already on land"}
try:
tello.land()
stats_collector.stop_collecting()
except Exception as e:
return { "msg": "command failed", "reason": e }
if not tello.is_flying:
return {"msg": "ok"}
@router.get("/turn/{direction}/{degree}")
def turn(direction: str, degree: int):
if direction not in ["left", "right"]:
return {"msg": "command failed", "reason": "direction must be only left or right"}
if degree < 1 or degree > 360:
return {"msg": "command failed", "reason": "degree must be between 1 and 360"}
try:
if direction == "right":
tello.rotate_clockwise(degree)
elif direction == "left":
tello.rotate_counter_clockwise(degree)
except Exception as e:
return {"msg": "command failed", "reason": e}
return {"msg": "ok"}
@router.get("/move/{direction}/{distance}")
def turn(direction: str, distance: int):
if direction not in ["back", "forward", "left", "right", "up" , "down"]:
return {"msg": "command failed", "reason": "direction must be only back, forward, left, right up or down"}
if distance < 20 or distance > 250:
# tellonun kendi sınırı 500 ama nolur nolmaz diye kısıtladım
return {"msg": "command failed", "reason": "distance must be between 20 and 250"}
try:
tello.move(direction, distance)
except Exception as e:
return {"msg": "command failed", "reason": e}
return {"msg": "ok"}
@router.get("/emergency")
def emergency_stop():
try:
tello.emergency()
return {"msg": "ok"}
except Exception as e:
return {"msg": "command failed", "reason": e}
@router.get("/end")
def end_flight_session():
try:
tello.end()
return {"msg": "ok"}
except Exception as e:
return {"msg": "command failed", "reason": e}