import asyncio
import threading
import time

from djitellopy import Tello
from fastapi import APIRouter, BackgroundTasks, Query


class FlightStatsCollector:
    """Background poller that prints the drone's battery level and temperature.

    A single worker thread runs ``collect_stats`` until ``stop_collecting``
    is called. The collector can be started again after it has been stopped.
    """

    # Seconds to wait between two consecutive polls.
    POLL_INTERVAL_SECONDS = 3

    def __init__(self):
        self.stats_thread = None
        self.stop_event = threading.Event()

    def start_collecting(self):
        """Start the polling thread unless one is already running."""
        if self.stats_thread is not None and self.stats_thread.is_alive():
            return
        # The event stays set after a previous stop; clear it so the new
        # thread does not exit on its very first loop check.
        self.stop_event.clear()
        # Daemon thread: a forgotten collector must not keep the process alive.
        self.stats_thread = threading.Thread(
            target=self.collect_stats, daemon=True
        )
        self.stats_thread.start()

    def stop_collecting(self):
        """Signal the polling thread to stop and wait for it to finish."""
        self.stop_event.set()
        if self.stats_thread:
            self.stats_thread.join()
            self.stats_thread = None

    def collect_stats(self):
        """Poll and print battery/temperature until the stop event is set."""
        while not self.stop_event.is_set():
            # NOTE(review): presumably doubles as a keep-alive so the drone
            # keeps receiving commands while idle -- confirm against the SDK.
            tello.send_command_with_return("command")
            bat = tello.get_battery()
            temp = tello.get_temperature()
            # Wi-Fi SNR polling (tello.query_wifi_signal_noise_ratio()) is
            # currently disabled.
            print(f"bat: {bat} - temp: {temp}")
            # Wait on the event instead of sleeping so that stop_collecting()
            # returns promptly rather than after a full poll interval.
            self.stop_event.wait(self.POLL_INTERVAL_SECONDS)


router = APIRouter()
tello = Tello()
stats_collector = FlightStatsCollector()


def _failure(reason):
    """Build the standard error payload.

    ``reason`` is converted with ``str`` so that exception objects are
    reported to the client as their message text.
    """
    return {"msg": "command failed", "reason": str(reason)}


def land_on_low_battery():
    """Land the drone when the reported battery level is below 20."""
    bat_level = tello.get_battery()
    if bat_level < 20:
        tello.land()


@router.get("/takeoff")
def takeoff():
    """Connect to the drone, take off and start the stats collector.

    Returns ``{"msg": "ok"}`` when the drone reports that it is flying,
    ``{"msg": "takeoff failed"}`` when it does not, and a
    ``"command failed"`` payload with a reason for every other failure.
    """
    try:
        tello.connect()
    except Exception:
        return _failure("failed to connect")

    if tello.is_flying:
        return _failure("already flying")

    try:
        tello.takeoff()
        stats_collector.start_collecting()
    except Exception as e:
        return _failure(e)

    if tello.is_flying:
        return {"msg": "ok"}
    return {"msg": "takeoff failed"}


@router.get("/land")
def land():
    """Land the drone and stop the stats collector.

    Returns ``{"msg": "ok"}`` once the drone no longer reports flying,
    ``{"msg": "landing failed"}`` if it still does, and a
    ``"command failed"`` payload with a reason for every other failure.
    """
    if not tello.is_flying:
        return _failure("already on land")

    try:
        tello.land()
        stats_collector.stop_collecting()
    except Exception as e:
        return _failure(e)

    if not tello.is_flying:
        return {"msg": "ok"}
    # Previously this path fell through and the endpoint answered with null.
    return {"msg": "landing failed"}


@router.get("/turn/{direction}/{degree}")
def turn(direction: str, degree: int):
    """Rotate the drone ``degree`` degrees (1-360) to the left or right."""
    if direction not in ("left", "right"):
        return _failure("direction must be only left or right")
    if degree < 1 or degree > 360:
        return _failure("degree must be between 1 and 360")

    try:
        if direction == "right":
            tello.rotate_clockwise(degree)
        else:
            tello.rotate_counter_clockwise(degree)
    except Exception as e:
        return _failure(e)
    return {"msg": "ok"}


# Renamed from a second ``turn`` definition, which shadowed the rotation
# handler above at module level. The route path is unchanged.
@router.get("/move/{direction}/{distance}")
def move(direction: str, distance: int):
    """Move the drone ``distance`` units (20-250) in the given direction."""
    if direction not in ("back", "forward", "left", "right", "up", "down"):
        return _failure(
            "direction must be only back, forward, left, right up or down"
        )
    # Tello's own limit is 500, but the range is restricted here just in case.
    if distance < 20 or distance > 250:
        return _failure("distance must be between 20 and 250")

    try:
        tello.move(direction, distance)
    except Exception as e:
        return _failure(e)
    return {"msg": "ok"}


@router.get("/emergency")
def emergency_stop():
    """Send the emergency command to the drone."""
    try:
        tello.emergency()
        return {"msg": "ok"}
    except Exception as e:
        return _failure(e)


@router.get("/end")
def end_flight_session():
    """End the current session with the drone via ``tello.end()``."""
    try:
        tello.end()
        return {"msg": "ok"}
    except Exception as e:
        return _failure(e)