69 lines
2.5 KiB
Python
69 lines
2.5 KiB
Python
import sys
|
|
|
|
import requests
|
|
from requests.exceptions import ConnectionError
|
|
from loguru import logger
|
|
|
|
from settings.config import settings
|
|
|
|
|
|
|
|
class CommandHandler:
|
|
|
|
def __init__(self):
|
|
self.COMMANDER_COMMANDS_URL = f"{settings.COMMANDER_ROOT_URL}/command"
|
|
self._check_commander_health()
|
|
|
|
def _parse_response(self, raw_resp):
|
|
json_resp = raw_resp.json()
|
|
if json_resp["msg"] == "ok": logger.success(json_resp)
|
|
else: logger.warning(json_resp["msg"])
|
|
|
|
def _check_commander_health(self):
|
|
try:
|
|
response = requests.get(f"{settings.COMMANDER_ROOT_URL}/test/health")
|
|
status = response.json()
|
|
except ConnectionError:
|
|
logger.error(f"commander service is unavailable: {settings.COMMANDER_ROOT_URL}"); sys.exit(1)
|
|
#raise Exception(f"commander service is unavailable: {settings.COMMANDER_ROOT_URL}")
|
|
if status["msg"] != "ok":
|
|
logger.success("connected to commander service")
|
|
|
|
def _move(self, direction, distance):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/move/{direction}/{distance}")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def _turn(self, direction, degree):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/turn/{direction}/{degree}")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def _takeoff(self):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/takeoff")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def _land(self):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/land")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def _end_session(self):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/end")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def _emergency(self):
|
|
response = requests.get(f"{self.COMMANDER_COMMANDS_URL}/emergency")
|
|
self._parse_response(raw_resp=response)
|
|
|
|
def handle(self, cmd: dict):
|
|
if cmd["command"] == "move":
|
|
self._move(direction=cmd["direction"], distance=cmd["distance_angle"])
|
|
elif cmd["command"] == "turn":
|
|
self._turn(direction=cmd["direction"], degree=cmd["distance_angle"])
|
|
elif cmd["command"] == "takeoff":
|
|
self._takeoff()
|
|
elif cmd["command"] == "land":
|
|
self._land()
|
|
elif cmd["command"] == "emergency":
|
|
self._emergency()
|
|
else:
|
|
raise ValueError(f"Uknown command object: {cmd}")
|