mirror of
https://github.com/infinition/Bjorn.git
synced 2024-11-11 22:38:39 +03:00
812 lines
36 KiB
Python
812 lines
36 KiB
Python
#utils.py
|
|
|
|
import json
|
|
import subprocess
|
|
import os
|
|
import json
|
|
import csv
|
|
import zipfile
|
|
import uuid
|
|
import cgi
|
|
import io
|
|
import importlib
|
|
import logging
|
|
from datetime import datetime
|
|
from logger import Logger
|
|
from urllib.parse import unquote
|
|
from actions.nmap_vuln_scanner import NmapVulnScanner
|
|
|
|
|
|
|
|
logger = Logger(name="utils.py", level=logging.DEBUG)
|
|
|
|
|
|
class WebUtils:
|
|
def __init__(self, shared_data, logger):
|
|
self.shared_data = shared_data
|
|
self.logger = logger
|
|
self.actions = None # List that contains all actions
|
|
self.standalone_actions = None # List that contains all standalone actions
|
|
|
|
def load_actions(self):
|
|
"""Load all actions from the actions file"""
|
|
if self.actions is None or self.standalone_actions is None:
|
|
self.actions = [] # reset the actions list
|
|
self.standalone_actions = [] # reset the standalone actions list
|
|
self.actions_dir = self.shared_data.actions_dir
|
|
with open(self.shared_data.actions_file, 'r') as file:
|
|
actions_config = json.load(file)
|
|
for action in actions_config:
|
|
module_name = action["b_module"]
|
|
if module_name == 'scanning':
|
|
self.load_scanner(module_name)
|
|
elif module_name == 'nmap_vuln_scanner':
|
|
self.load_nmap_vuln_scanner(module_name)
|
|
else:
|
|
self.load_action(module_name, action)
|
|
|
|
def load_scanner(self, module_name):
|
|
"""Load the network scanner"""
|
|
module = importlib.import_module(f'actions.{module_name}')
|
|
b_class = getattr(module, 'b_class')
|
|
self.network_scanner = getattr(module, b_class)(self.shared_data)
|
|
|
|
def load_nmap_vuln_scanner(self, module_name):
|
|
"""Load the nmap vulnerability scanner"""
|
|
self.nmap_vuln_scanner = NmapVulnScanner(self.shared_data)
|
|
|
|
def load_action(self, module_name, action):
|
|
"""Load an action from the actions file"""
|
|
module = importlib.import_module(f'actions.{module_name}')
|
|
try:
|
|
b_class = action["b_class"]
|
|
action_instance = getattr(module, b_class)(self.shared_data)
|
|
action_instance.action_name = b_class
|
|
action_instance.port = action.get("b_port")
|
|
action_instance.b_parent_action = action.get("b_parent")
|
|
if action_instance.port == 0:
|
|
self.standalone_actions.append(action_instance)
|
|
else:
|
|
self.actions.append(action_instance)
|
|
except AttributeError as e:
|
|
self.logger.error(f"Module {module_name} is missing required attributes: {e}")
|
|
|
|
def serve_netkb_data_json(self, handler):
|
|
try:
|
|
netkb_file = self.shared_data.netkbfile
|
|
with open(netkb_file, 'r', encoding='utf-8') as file:
|
|
reader = csv.DictReader(file)
|
|
data = [row for row in reader if row['Alive'] == '1']
|
|
|
|
actions = reader.fieldnames[5:] # Actions are all fields after 'Ports'
|
|
response_data = {
|
|
'ips': [row['IPs'] for row in data],
|
|
'ports': {row['IPs']: row['Ports'].split(';') for row in data},
|
|
'actions': actions
|
|
}
|
|
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps(response_data).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def execute_manual_attack(self, handler):
|
|
try:
|
|
content_length = int(handler.headers['Content-Length'])
|
|
post_data = handler.rfile.read(content_length).decode('utf-8')
|
|
params = json.loads(post_data)
|
|
ip = params['ip']
|
|
port = params['port']
|
|
action_class = params['action']
|
|
|
|
self.logger.info(f"Received request to execute {action_class} on {ip}:{port}")
|
|
|
|
# Charger les actions si ce n'est pas déjà fait
|
|
self.load_actions()
|
|
|
|
action_instance = next((action for action in self.actions if action.action_name == action_class), None)
|
|
if action_instance is None:
|
|
raise Exception(f"Action class {action_class} not found")
|
|
|
|
# Charger les données actuelles
|
|
current_data = self.shared_data.read_data()
|
|
row = next((r for r in current_data if r["IPs"] == ip), None)
|
|
|
|
if row is None:
|
|
raise Exception(f"No data found for IP: {ip}")
|
|
|
|
action_key = action_instance.action_name
|
|
self.logger.info(f"Executing {action_key} on {ip}:{port}")
|
|
result = action_instance.execute(ip, port, row, action_key)
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
if result == 'success':
|
|
row[action_key] = f'success_{timestamp}'
|
|
self.logger.info(f"Action {action_key} executed successfully on {ip}:{port}")
|
|
else:
|
|
row[action_key] = f'failed_{timestamp}'
|
|
self.logger.error(f"Action {action_key} failed on {ip}:{port}")
|
|
self.shared_data.write_data(current_data)
|
|
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Manual attack executed"}).encode('utf-8'))
|
|
except Exception as e:
|
|
self.logger.error(f"Error executing manual attack: {e}")
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
|
|
def serve_logs(self, handler):
|
|
try:
|
|
log_file_path = self.shared_data.webconsolelog
|
|
if not os.path.exists(log_file_path):
|
|
subprocess.Popen(f"sudo tail -f /home/bjorn/Bjorn/data/logs/* > {log_file_path}", shell=True)
|
|
|
|
with open(log_file_path, 'r') as log_file:
|
|
log_lines = log_file.readlines()
|
|
|
|
max_lines = 2000
|
|
if len(log_lines) > max_lines:
|
|
log_lines = log_lines[-max_lines:]
|
|
with open(log_file_path, 'w') as log_file:
|
|
log_file.writelines(log_lines)
|
|
|
|
log_data = ''.join(log_lines)
|
|
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "text/plain")
|
|
handler.end_headers()
|
|
handler.wfile.write(log_data.encode('utf-8'))
|
|
except BrokenPipeError:
|
|
# Ignore broken pipe errors
|
|
pass
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def start_orchestrator(self, handler):
|
|
try:
|
|
bjorn_instance = self.shared_data.bjorn_instance
|
|
bjorn_instance.start_orchestrator()
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Orchestrator starting..."}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def stop_orchestrator(self, handler):
|
|
try:
|
|
bjorn_instance = self.shared_data.bjorn_instance
|
|
bjorn_instance.stop_orchestrator()
|
|
self.shared_data.orchestrator_should_exit = True
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Orchestrator stopping..."}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def backup(self, handler):
|
|
try:
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
backup_filename = f"backup_{timestamp}.zip"
|
|
backup_path = os.path.join(self.shared_data.backupdir, backup_filename)
|
|
|
|
with zipfile.ZipFile(backup_path, 'w') as backup_zip:
|
|
for folder in [self.shared_data.configdir, self.shared_data.datadir, self.shared_data.actions_dir, self.shared_data.resourcesdir]:
|
|
for root, dirs, files in os.walk(folder):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
backup_zip.write(file_path, os.path.relpath(file_path, self.shared_data.currentdir))
|
|
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "url": f"/download_backup?filename={backup_filename}", "filename": backup_filename}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def restore(self, handler):
|
|
try:
|
|
content_length = int(handler.headers['Content-Length'])
|
|
field_data = handler.rfile.read(content_length)
|
|
field_storage = cgi.FieldStorage(fp=io.BytesIO(field_data), headers=handler.headers, environ={'REQUEST_METHOD': 'POST'})
|
|
|
|
file_item = field_storage['file']
|
|
if file_item.filename:
|
|
backup_path = os.path.join(self.shared_data.upload_dir, file_item.filename)
|
|
with open(backup_path, 'wb') as output_file:
|
|
output_file.write(file_item.file.read())
|
|
|
|
with zipfile.ZipFile(backup_path, 'r') as backup_zip:
|
|
backup_zip.extractall(self.shared_data.currentdir)
|
|
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Restore completed successfully"}).encode('utf-8'))
|
|
else:
|
|
handler.send_response(400)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": "No selected file"}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def download_backup(self, handler):
|
|
query = unquote(handler.path.split('?filename=')[1])
|
|
backup_path = os.path.join(self.shared_data.backupdir, query)
|
|
if os.path.isfile(backup_path):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(backup_path)}"')
|
|
handler.send_header("Content-type", "application/zip")
|
|
handler.end_headers()
|
|
with open(backup_path, 'rb') as file:
|
|
handler.wfile.write(file.read())
|
|
else:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
|
|
def serve_credentials_data(self, handler):
|
|
try:
|
|
directory = self.shared_data.crackedpwddir
|
|
html_content = self.generate_html_for_csv_files(directory)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "text/html")
|
|
handler.end_headers()
|
|
handler.wfile.write(html_content.encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def generate_html_for_csv_files(self, directory):
|
|
html = '<div class="credentials-container">\n'
|
|
for filename in os.listdir(directory):
|
|
if filename.endswith('.csv'):
|
|
filepath = os.path.join(directory, filename)
|
|
html += f'<h2>{filename}</h2>\n'
|
|
html += '<table class="styled-table">\n<thead>\n<tr>\n'
|
|
with open(filepath, 'r') as file:
|
|
reader = csv.reader(file)
|
|
headers = next(reader)
|
|
for header in headers:
|
|
html += f'<th>{header}</th>\n'
|
|
html += '</tr>\n</thead>\n<tbody>\n'
|
|
for row in reader:
|
|
html += '<tr>\n'
|
|
for cell in row:
|
|
html += f'<td>{cell}</td>\n'
|
|
html += '</tr>\n'
|
|
html += '</tbody>\n</table>\n'
|
|
html += '</div>\n'
|
|
return html
|
|
|
|
def list_files(self, directory):
|
|
files = []
|
|
for entry in os.scandir(directory):
|
|
if entry.is_dir():
|
|
files.append({
|
|
"name": entry.name,
|
|
"is_directory": True,
|
|
"children": self.list_files(entry.path)
|
|
})
|
|
else:
|
|
files.append({
|
|
"name": entry.name,
|
|
"is_directory": False,
|
|
"path": entry.path
|
|
})
|
|
return files
|
|
|
|
|
|
|
|
def serve_file(self, handler, filename):
|
|
try:
|
|
with open(os.path.join(self.shared_data.webdir, filename), 'r', encoding='utf-8') as file:
|
|
content = file.read()
|
|
content = content.replace('{{ web_delay }}', str(self.shared_data.web_delay * 1000))
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "text/html")
|
|
handler.end_headers()
|
|
handler.wfile.write(content.encode('utf-8'))
|
|
except FileNotFoundError:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
|
|
def serve_current_config(self, handler):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
with open(self.shared_data.shared_config_json, 'r') as f:
|
|
config = json.load(f)
|
|
handler.wfile.write(json.dumps(config).encode('utf-8'))
|
|
|
|
def restore_default_config(self, handler):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
self.shared_data.config = self.shared_data.default_config.copy()
|
|
self.shared_data.save_config()
|
|
handler.wfile.write(json.dumps(self.shared_data.config).encode('utf-8'))
|
|
|
|
def serve_image(self, handler):
|
|
image_path = os.path.join(self.shared_data.webdir, 'screen.png')
|
|
try:
|
|
with open(image_path, 'rb') as file:
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "image/png")
|
|
handler.send_header("Cache-Control", "max-age=0, must-revalidate")
|
|
handler.end_headers()
|
|
handler.wfile.write(file.read())
|
|
except FileNotFoundError:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
except BrokenPipeError:
|
|
# Ignore broken pipe errors
|
|
pass
|
|
except Exception as e:
|
|
self.logger.error(f"Unexpected error: {e}")
|
|
|
|
|
|
def serve_favicon(self, handler):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "image/x-icon")
|
|
handler.end_headers()
|
|
favicon_path = os.path.join(self.shared_data.webdir, '/images/favicon.ico')
|
|
self.logger.info(f"Serving favicon from {favicon_path}")
|
|
try:
|
|
with open(favicon_path, 'rb') as file:
|
|
handler.wfile.write(file.read())
|
|
except FileNotFoundError:
|
|
self.logger.error(f"Favicon not found at {favicon_path}")
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
|
|
def serve_manifest(self, handler):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
manifest_path = os.path.join(self.shared_data.webdir, 'manifest.json')
|
|
try:
|
|
with open(manifest_path, 'r') as file:
|
|
handler.wfile.write(file.read().encode('utf-8'))
|
|
except FileNotFoundError:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
|
|
def serve_apple_touch_icon(self, handler):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "image/png")
|
|
handler.end_headers()
|
|
icon_path = os.path.join(self.shared_data.webdir, 'icons/apple-touch-icon.png')
|
|
try:
|
|
with open(icon_path, 'rb') as file:
|
|
handler.wfile.write(file.read())
|
|
except FileNotFoundError:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
|
|
def scan_wifi(self, handler):
|
|
try:
|
|
result = subprocess.Popen(['sudo', 'iwlist', 'wlan0', 'scan'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = result.communicate()
|
|
if result.returncode != 0:
|
|
raise Exception(stderr)
|
|
networks = self.parse_scan_result(stdout)
|
|
self.logger.info(f"Found {len(networks)} networks")
|
|
current_ssid = subprocess.Popen(['iwgetid', '-r'], stdout=subprocess.PIPE, text=True)
|
|
ssid_out, ssid_err = current_ssid.communicate()
|
|
if current_ssid.returncode != 0:
|
|
raise Exception(ssid_err)
|
|
current_ssid = ssid_out.strip()
|
|
self.logger.info(f"Current SSID: {current_ssid}")
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"networks": networks, "current_ssid": current_ssid}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
self.logger.error(f"Error scanning Wi-Fi networks: {e}")
|
|
handler.wfile.write(json.dumps({"error": str(e)}).encode('utf-8'))
|
|
|
|
def parse_scan_result(self, scan_output):
|
|
networks = []
|
|
for line in scan_output.split('\n'):
|
|
if 'ESSID' in line:
|
|
ssid = line.split(':')[1].strip('"')
|
|
if ssid not in networks:
|
|
networks.append(ssid)
|
|
return networks
|
|
|
|
def connect_wifi(self, handler):
|
|
try:
|
|
content_length = int(handler.headers['Content-Length'])
|
|
post_data = handler.rfile.read(content_length).decode('utf-8')
|
|
params = json.loads(post_data)
|
|
ssid = params['ssid']
|
|
password = params['password']
|
|
|
|
self.update_nmconnection(ssid, password)
|
|
command = f'sudo nmcli connection up "preconfigured"'
|
|
connect_result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = connect_result.communicate()
|
|
if connect_result.returncode != 0:
|
|
raise Exception(stderr)
|
|
|
|
self.shared_data.wifichanged = True
|
|
|
|
handler.send_response(200)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Connected to " + ssid}).encode('utf-8'))
|
|
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def disconnect_and_clear_wifi(self, handler):
|
|
try:
|
|
command_disconnect = 'sudo nmcli connection down "preconfigured"'
|
|
disconnect_result = subprocess.Popen(command_disconnect, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = disconnect_result.communicate()
|
|
if disconnect_result.returncode != 0:
|
|
raise Exception(stderr)
|
|
|
|
config_path = '/etc/NetworkManager/system-connections/preconfigured.nmconnection'
|
|
with open(config_path, 'w') as f:
|
|
f.write("")
|
|
subprocess.Popen(['sudo', 'chmod', '600', config_path]).communicate()
|
|
subprocess.Popen(['sudo', 'nmcli', 'connection', 'reload']).communicate()
|
|
|
|
self.shared_data.wifichanged = False
|
|
|
|
handler.send_response(200)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Disconnected from Wi-Fi and cleared preconfigured settings"}).encode('utf-8'))
|
|
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def clear_files(self, handler):
|
|
try:
|
|
command = """
|
|
sudo rm -rf config/*.json && sudo rm -rf data/*.csv && sudo rm -rf data/*.log && sudo rm -rf backup/backups/* && sudo rm -rf backup/uploads/* && sudo rm -rf data/output/data_stolen/* && sudo rm -rf data/output/crackedpwd/* && sudo rm -rf config/* && sudo rm -rf data/output/scan_results/* && sudo rm -rf __pycache__ && sudo rm -rf config/__pycache__ && sudo rm -rf data/__pycache__ && sudo rm -rf actions/__pycache__ && sudo rm -rf resources/__pycache__ && sudo rm -rf web/__pycache__ && sudo rm -rf *.log && sudo rm -rf resources/waveshare_epd/__pycache__ && sudo rm -rf data/logs/* && sudo rm -rf data/output/vulnerabilities/* && sudo rm -rf data/logs/*
|
|
"""
|
|
result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = result.communicate()
|
|
|
|
if result.returncode == 0:
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Files cleared successfully"}).encode('utf-8'))
|
|
else:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": stderr}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def clear_files_light(self, handler):
|
|
try:
|
|
command = """
|
|
sudo rm -rf data/*.log && sudo rm -rf data/output/data_stolen/* && sudo rm -rf data/output/crackedpwd/* && sudo rm -rf data/output/scan_results/* && sudo rm -rf __pycache__ && sudo rm -rf config/__pycache__ && sudo rm -rf data/__pycache__ && sudo rm -rf actions/__pycache__ && sudo rm -rf resources/__pycache__ && sudo rm -rf web/__pycache__ && sudo rm -rf *.log && sudo rm -rf resources/waveshare_epd/__pycache__ && sudo rm -rf data/logs/* && sudo rm -rf data/output/vulnerabilities/* && sudo rm -rf data/logs/*
|
|
"""
|
|
result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = result.communicate()
|
|
|
|
if result.returncode == 0:
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Files cleared successfully"}).encode('utf-8'))
|
|
else:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": stderr}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def initialize_csv(self, handler):
|
|
try:
|
|
self.shared_data.generate_actions_json()
|
|
self.shared_data.initialize_csv()
|
|
self.shared_data.create_livestatusfile()
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "CSV files initialized successfully"}).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def reboot_system(self, handler):
|
|
try:
|
|
command = "sudo reboot"
|
|
subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "System is rebooting"}).encode('utf-8'))
|
|
except subprocess.CalledProcessError as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def shutdown_system(self, handler):
|
|
try:
|
|
command = "sudo shutdown now"
|
|
subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "System is shutting down"}).encode('utf-8'))
|
|
except subprocess.CalledProcessError as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def restart_bjorn_service(self, handler):
|
|
try:
|
|
command = "sudo systemctl restart bjorn.service"
|
|
subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Bjorn service restarted successfully"}).encode('utf-8'))
|
|
except subprocess.CalledProcessError as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def serve_network_data(self, handler):
|
|
try:
|
|
latest_file = max(
|
|
[os.path.join(self.shared_data.scan_results_dir, f) for f in os.listdir(self.shared_data.scan_results_dir) if f.startswith('result_')],
|
|
key=os.path.getctime
|
|
)
|
|
table_html = self.generate_html_table(latest_file)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "text/html")
|
|
handler.end_headers()
|
|
handler.wfile.write(table_html.encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def generate_html_table(self, file_path):
|
|
table_html = '<table class="styled-table"><thead><tr>'
|
|
with open(file_path, 'r') as file:
|
|
reader = csv.reader(file)
|
|
headers = next(reader)
|
|
for header in headers:
|
|
table_html += f'<th>{header}</th>'
|
|
table_html += '</tr></thead><tbody>'
|
|
for row in reader:
|
|
table_html += '<tr>'
|
|
for cell in row:
|
|
cell_class = "green" if cell.strip() else "red"
|
|
table_html += f'<td class="{cell_class}">{cell}</td>'
|
|
table_html += '</tr>'
|
|
table_html += '</tbody></table>'
|
|
return table_html
|
|
|
|
def generate_html_table_netkb(self, file_path):
|
|
table_html = '<table class="styled-table"><thead><tr>'
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
reader = csv.reader(file)
|
|
headers = next(reader)
|
|
for header in headers:
|
|
table_html += f'<th>{header}</th>'
|
|
table_html += '</tr></thead><tbody>'
|
|
for row in reader:
|
|
row_class = "blue-row" if '0' in row[3] else ""
|
|
table_html += f'<tr class="{row_class}">'
|
|
for cell in row:
|
|
cell_class = ""
|
|
if "success" in cell:
|
|
cell_class = "green bold"
|
|
elif "failed" in cell:
|
|
cell_class = "red bold"
|
|
elif cell.strip() == "":
|
|
cell_class = "grey"
|
|
table_html += f'<td class="{cell_class}">{cell}</td>'
|
|
table_html += '</tr>'
|
|
table_html += '</tbody></table>'
|
|
except Exception as e:
|
|
self.logger.error(f"Error in generate_html_table_netkb: {e}")
|
|
return table_html
|
|
|
|
|
|
def serve_netkb_data(self, handler):
|
|
try:
|
|
latest_file = self.shared_data.netkbfile
|
|
table_html = self.generate_html_table_netkb(latest_file)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "text/html")
|
|
handler.end_headers()
|
|
handler.wfile.write(table_html.encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def update_nmconnection(self, ssid, password):
|
|
config_path = '/etc/NetworkManager/system-connections/preconfigured.nmconnection'
|
|
with open(config_path, 'w') as f:
|
|
f.write(f"""
|
|
[connection]
|
|
id=preconfigured
|
|
uuid={uuid.uuid4()}
|
|
type=wifi
|
|
autoconnect=true
|
|
|
|
[wifi]
|
|
ssid={ssid}
|
|
mode=infrastructure
|
|
|
|
[wifi-security]
|
|
key-mgmt=wpa-psk
|
|
psk={password}
|
|
|
|
[ipv4]
|
|
method=auto
|
|
|
|
[ipv6]
|
|
method=auto
|
|
""")
|
|
subprocess.Popen(['sudo', 'chmod', '600', config_path]).communicate()
|
|
subprocess.Popen(['sudo', 'nmcli', 'connection', 'reload']).communicate()
|
|
|
|
def save_configuration(self, handler):
|
|
try:
|
|
content_length = int(handler.headers['Content-Length'])
|
|
post_data = handler.rfile.read(content_length).decode('utf-8')
|
|
params = json.loads(post_data)
|
|
fichier = self.shared_data.shared_config_json
|
|
self.logger.info(f"Received params: {params}")
|
|
|
|
with open(fichier, 'r') as f:
|
|
current_config = json.load(f)
|
|
|
|
for key, value in params.items():
|
|
if isinstance(value, bool):
|
|
current_config[key] = value
|
|
elif isinstance(value, str) and value.lower() in ['true', 'false']:
|
|
current_config[key] = value.lower() == 'true'
|
|
elif isinstance(value, (int, float)):
|
|
current_config[key] = value
|
|
elif isinstance(value, list):
|
|
current_config[key] = value
|
|
elif isinstance(value, str):
|
|
if value.replace('.', '', 1).isdigit():
|
|
current_config[key] = float(value) if '.' in value else int(value)
|
|
else:
|
|
current_config[key] = value
|
|
else:
|
|
current_config[key] = value
|
|
|
|
with open(fichier, 'w') as f:
|
|
json.dump(current_config, f, indent=4)
|
|
self.logger.info("Configuration saved to file")
|
|
|
|
handler.send_response(200)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "success", "message": "Configuration saved"}).encode('utf-8'))
|
|
self.logger.info("Configuration saved (web)")
|
|
|
|
self.shared_data.load_config()
|
|
self.logger.info("Configuration reloaded (web)")
|
|
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header('Content-type', 'application/json')
|
|
handler.end_headers()
|
|
error_message = {"status": "error", "message": str(e)}
|
|
handler.wfile.write(json.dumps(error_message).encode('utf-8'))
|
|
self.logger.error(f"Error saving configuration: {e}")
|
|
|
|
def list_files(self, directory):
|
|
files = []
|
|
for entry in os.scandir(directory):
|
|
if entry.is_dir():
|
|
files.append({
|
|
"name": entry.name,
|
|
"is_directory": True,
|
|
"children": self.list_files(entry.path)
|
|
})
|
|
else:
|
|
files.append({
|
|
"name": entry.name,
|
|
"is_directory": False,
|
|
"path": entry.path
|
|
})
|
|
return files
|
|
|
|
def list_files_endpoint(self, handler):
|
|
try:
|
|
files = self.list_files(self.shared_data.datastolendir)
|
|
handler.send_response(200)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps(files).encode('utf-8'))
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
def download_file(self, handler):
|
|
try:
|
|
query = unquote(handler.path.split('?path=')[1])
|
|
file_path = os.path.join(self.shared_data.datastolendir, query)
|
|
if os.path.isfile(file_path):
|
|
handler.send_response(200)
|
|
handler.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(file_path)}"')
|
|
handler.end_headers()
|
|
with open(file_path, 'rb') as file:
|
|
handler.wfile.write(file.read())
|
|
else:
|
|
handler.send_response(404)
|
|
handler.end_headers()
|
|
except Exception as e:
|
|
handler.send_response(500)
|
|
handler.send_header("Content-type", "application/json")
|
|
handler.end_headers()
|
|
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
|
|
|
|
|
|
|