Files
tello-commander/brain/brain_base.py

70 lines
2.1 KiB
Python

import ast
import json
import os
from loguru import logger
from commander.commands import CommandHandler
class BaseBrain:
def __init__(self):
self.command_handler = CommandHandler()
@property
def sys_prompt(self):
return self._read_prompt()
def _read_prompt(self):
prompt_file_name = "prompt.txt"
for root, dirs, files in os.walk("./"):
if prompt_file_name in files:
prompt_filepath = os.path.join(root, prompt_file_name)
with open(prompt_filepath, "r") as f:
return f.read()
def _is_valid_json(self, answer):
try:
response_json = json.loads(answer)
return True
except ValueError as e:
logger.error(f"chatgpt failed to return json obj: {answer}")
return False
def _gc(self):
self.cmd_prompt = None
self.response = None
def is_emergency(self, input):
if input == "q":
msg = "##### BASE BRAIN: EMERGENCY STOP DETECTED!!! #####"
logger.warning(msg)
self.response_to_chatui = msg
self.command_handler.handle({"command": "emergency"})
return True
else:
return False
def listen(self, channel="cli", prompt=None):
if channel == "cli":
self.cmd_prompt = input("\n\nwhat should I do now?\n(enter q for emergency)\n\t")
elif channel == "api":
self.cmd_prompt = prompt
def command(self):
if self._is_valid_json(self.answer):
command = ast.literal_eval(self.answer)
if command == {}:
msg = f"I failed to understand: {command}"
logger.warning(msg)
self.response_to_chatui = msg
else:
msg = f"I will send this command: {command}"
logger.success(msg)
self.response_to_chatui = msg
self.command_handler.handle(command)
else:
msg = f"\tI will skip this:\n {self.answer}"
logger.warning(msg)
self.response_to_chatui = msg
self._gc()