1
0
mirror of https://github.com/infinition/Bjorn.git synced 2024-11-11 22:38:39 +03:00
Files
2024-11-07 16:39:14 +01:00

223 lines
9.1 KiB
Python

#webapp.py
import json
import threading
import http.server
import socketserver
import logging
import sys
import signal
import os
import gzip
import io
from logger import Logger
from init_shared import shared_data
from utils import WebUtils
# Initialize the logger
logger = Logger(name="webapp.py", level=logging.DEBUG)
# Set the path to the favicon
favicon_path = os.path.join(shared_data.webdir, '/images/favicon.ico')
class CustomHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.shared_data = shared_data
self.web_utils = WebUtils(shared_data, logger)
super().__init__(*args, **kwargs)
def log_message(self, format, *args):
# Override to suppress logging of GET requests.
if 'GET' not in format % args:
logger.info("%s - - [%s] %s\n" %
(self.client_address[0],
self.log_date_time_string(),
format % args))
def gzip_encode(self, content):
"""Gzip compress the given content."""
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write(content)
return out.getvalue()
def send_gzipped_response(self, content, content_type):
"""Send a gzipped HTTP response."""
gzipped_content = self.gzip_encode(content)
self.send_response(200)
self.send_header("Content-type", content_type)
self.send_header("Content-Encoding", "gzip")
self.send_header("Content-Length", str(len(gzipped_content)))
self.end_headers()
self.wfile.write(gzipped_content)
def serve_file_gzipped(self, file_path, content_type):
"""Serve a file with gzip compression."""
with open(file_path, 'rb') as file:
content = file.read()
self.send_gzipped_response(content, content_type)
def do_GET(self):
# Handle GET requests. Serve the HTML interface and the EPD image.
if self.path == '/index.html' or self.path == '/':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'index.html'), 'text/html')
elif self.path == '/config.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'config.html'), 'text/html')
elif self.path == '/actions.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'actions.html'), 'text/html')
elif self.path == '/network.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'network.html'), 'text/html')
elif self.path == '/netkb.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'netkb.html'), 'text/html')
elif self.path == '/bjorn.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'bjorn.html'), 'text/html')
elif self.path == '/loot.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'loot.html'), 'text/html')
elif self.path == '/credentials.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'credentials.html'), 'text/html')
elif self.path == '/manual.html':
self.serve_file_gzipped(os.path.join(self.shared_data.webdir, 'manual.html'), 'text/html')
elif self.path == '/load_config':
self.web_utils.serve_current_config(self)
elif self.path == '/restore_default_config':
self.web_utils.restore_default_config(self)
elif self.path == '/get_web_delay':
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
response = json.dumps({"web_delay": self.shared_data.web_delay})
self.wfile.write(response.encode('utf-8'))
elif self.path == '/scan_wifi':
self.web_utils.scan_wifi(self)
elif self.path == '/network_data':
self.web_utils.serve_network_data(self)
elif self.path == '/netkb_data':
self.web_utils.serve_netkb_data(self)
elif self.path == '/netkb_data_json':
self.web_utils.serve_netkb_data_json(self)
elif self.path.startswith('/screen.png'):
self.web_utils.serve_image(self)
elif self.path == '/favicon.ico':
self.web_utils.serve_favicon(self)
elif self.path == '/manifest.json':
self.web_utils.serve_manifest(self)
elif self.path == '/apple-touch-icon':
self.web_utils.serve_apple_touch_icon(self)
elif self.path == '/get_logs':
self.web_utils.serve_logs(self)
elif self.path == '/list_credentials':
self.web_utils.serve_credentials_data(self)
elif self.path.startswith('/list_files'):
self.web_utils.list_files_endpoint(self)
elif self.path.startswith('/download_file'):
self.web_utils.download_file(self)
elif self.path.startswith('/download_backup'):
self.web_utils.download_backup(self)
else:
super().do_GET()
def do_POST(self):
# Handle POST requests for saving configuration, connecting to Wi-Fi, clearing files, rebooting, and shutting down.
if self.path == '/save_config':
self.web_utils.save_configuration(self)
elif self.path == '/connect_wifi':
self.web_utils.connect_wifi(self)
self.shared_data.wifichanged = True # Set the flag when Wi-Fi is connected
elif self.path == '/disconnect_wifi': # New route to disconnect Wi-Fi
self.web_utils.disconnect_and_clear_wifi(self)
elif self.path == '/clear_files':
self.web_utils.clear_files(self)
elif self.path == '/clear_files_light':
self.web_utils.clear_files_light(self)
elif self.path == '/initialize_csv':
self.web_utils.initialize_csv(self)
elif self.path == '/reboot':
self.web_utils.reboot_system(self)
elif self.path == '/shutdown':
self.web_utils.shutdown_system(self)
elif self.path == '/restart_bjorn_service':
self.web_utils.restart_bjorn_service(self)
elif self.path == '/backup':
self.web_utils.backup(self)
elif self.path == '/restore':
self.web_utils.restore(self)
elif self.path == '/stop_orchestrator': # New route to stop the orchestrator
self.web_utils.stop_orchestrator(self)
elif self.path == '/start_orchestrator': # New route to start the orchestrator
self.web_utils.start_orchestrator(self)
elif self.path == '/execute_manual_attack': # New route to execute a manual attack
self.web_utils.execute_manual_attack(self)
else:
self.send_response(404)
self.end_headers()
class WebThread(threading.Thread):
"""
Thread to run the web server serving the EPD display interface.
"""
def __init__(self, handler_class=CustomHandler, port=8000):
super().__init__()
self.shared_data = shared_data
self.port = port
self.handler_class = handler_class
self.httpd = None
def run(self):
"""
Run the web server in a separate thread.
"""
while not self.shared_data.webapp_should_exit:
try:
with socketserver.TCPServer(("", self.port), self.handler_class) as httpd:
self.httpd = httpd
logger.info(f"Serving at port {self.port}")
while not self.shared_data.webapp_should_exit:
httpd.handle_request()
except OSError as e:
if e.errno == 98: # Address already in use error
logger.warning(f"Port {self.port} is in use, trying the next port...")
self.port += 1
else:
logger.error(f"Error in web server: {e}")
break
finally:
if self.httpd:
self.httpd.server_close()
logger.info("Web server closed.")
def shutdown(self):
"""
Shutdown the web server gracefully.
"""
if self.httpd:
self.httpd.shutdown()
self.httpd.server_close()
logger.info("Web server shutdown initiated.")
def handle_exit_web(signum, frame):
"""
Handle exit signals to shutdown the web server cleanly.
"""
shared_data.webapp_should_exit = True
if web_thread.is_alive():
web_thread.shutdown()
web_thread.join() # Wait until the web_thread is finished
logger.info("Server shutting down...")
sys.exit(0)
# Initialize the web thread
web_thread = WebThread(port=8000)
# Set up signal handling for graceful shutdown
signal.signal(signal.SIGINT, handle_exit_web)
signal.signal(signal.SIGTERM, handle_exit_web)
if __name__ == "__main__":
try:
# Start the web server thread
web_thread.start()
logger.info("Web server thread started.")
except Exception as e:
logger.error(f"An exception occurred during web server start: {e}")
handle_exit_web(signal.SIGINT, None)
sys.exit(1)