Files
tello-commander/brain/commands.py
ALIHAN DIKEL 0b3523000c better logging
2023-05-19 03:00:52 +03:00

69 lines
2.5 KiB
Python

import sys
import requests
from requests.exceptions import ConnectionError
from loguru import logger
from settings.config import settings
class CommandHandler:
def __init__(self):
self.COMMANDER_COMMANDS_URL = f"{settings.COMMANDER_ROOT_URL}/command"
self._check_commander_health()
def _parse_response(self, raw_resp):
json_resp = raw_resp.json()
if json_resp["msg"] == "ok": logger.success(json_resp)
else: logger.warning(json_resp["msg"])
def _check_commander_health(self):
try:
response = requests.get(f"{settings.COMMANDER_ROOT_URL}/test/health")
status = response.json()
except ConnectionError:
logger.error(f"commander service is unavailable: {settings.COMMANDER_ROOT_URL}"); sys.exit(1)
#raise Exception(f"commander service is unavailable: {settings.COMMANDER_ROOT_URL}")
if status["msg"] != "ok":
logger.success("connected to commander service")
def _move(self, direction, distance):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/move/{direction}/{distance}")
self._parse_response(raw_resp=response)
def _turn(self, direction, degree):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/turn/{direction}/{degree}")
self._parse_response(raw_resp=response)
def _takeoff(self):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/takeoff")
self._parse_response(raw_resp=response)
def _land(self):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/land")
self._parse_response(raw_resp=response)
def _end_session(self):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/end")
self._parse_response(raw_resp=response)
def _emergency(self):
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/emergency")
self._parse_response(raw_resp=response)
def handle(self, cmd: dict):
if cmd["command"] == "move":
self._move(direction=cmd["direction"], distance=cmd["distance_angle"])
elif cmd["command"] == "turn":
self._turn(direction=cmd["direction"], degree=cmd["distance_angle"])
elif cmd["command"] == "takeoff":
self._takeoff()
elif cmd["command"] == "land":
self._land()
elif cmd["command"] == "emergency":
self._emergency()
else:
raise ValueError(f"Uknown command object: {cmd}")