mirror of
https://github.com/infinition/Bjorn.git
synced 2024-11-11 22:38:39 +03:00
386 lines
21 KiB
Python
386 lines
21 KiB
Python
#display.py
|
|
# Description:
|
|
# This file, display.py, is responsible for managing the e-ink display of the Bjorn project, updating it with relevant data and statuses.
|
|
# It initializes the display, manages multiple threads for updating shared data and vulnerability counts, and handles the rendering of information
|
|
# and images on the display.
|
|
#
|
|
# Key functionalities include:
|
|
# - Initializing the e-ink display (EPD) and handling any errors during initialization.
|
|
# - Creating and managing threads to periodically update shared data and vulnerability counts.
|
|
# - Rendering various statistics, status icons, and images on the e-ink display.
|
|
# - Handling updates to shared data from various sources, including CSV files and system commands.
|
|
# - Checking and displaying the status of Bluetooth, Wi-Fi, PAN, and USB connections.
|
|
# - Providing methods to update the display with comments from an AI (Commentaireia) and generating images dynamically.
|
|
|
|
import threading
|
|
import time
|
|
import os
|
|
import pandas as pd
|
|
import signal
|
|
import glob
|
|
import logging
|
|
import random
|
|
import sys
|
|
from PIL import Image, ImageDraw
|
|
from init_shared import shared_data
|
|
from comment import Commentaireia
|
|
from logger import Logger
|
|
import subprocess
|
|
|
|
logger = Logger(name="display.py", level=logging.DEBUG)
|
|
|
|
class Display:
|
|
def __init__(self, shared_data):
|
|
"""Initialize the display and start the main image and shared data update threads."""
|
|
self.shared_data = shared_data
|
|
self.shared_data.bjornstatustext2 = "Awakening..."
|
|
self.commentaire_ia = Commentaireia()
|
|
self.semaphore = threading.Semaphore(10)
|
|
self.screen_reversed = self.shared_data.screen_reversed
|
|
self.web_screen_reversed = self.shared_data.web_screen_reversed
|
|
|
|
try:
|
|
self.epd_helper = self.shared_data.epd_helper
|
|
self.epd_helper.init_partial_update()
|
|
logger.info("Display initialization complete.")
|
|
except Exception as e:
|
|
logger.error(f"Error during display initialization: {e}")
|
|
raise
|
|
|
|
self.main_image_thread = threading.Thread(target=self.update_main_image)
|
|
self.main_image_thread.daemon = True
|
|
self.main_image_thread.start()
|
|
|
|
self.update_shared_data_thread = threading.Thread(target=self.schedule_update_shared_data)
|
|
self.update_shared_data_thread.daemon = True
|
|
self.update_shared_data_thread.start()
|
|
|
|
self.update_vuln_count_thread = threading.Thread(target=self.schedule_update_vuln_count)
|
|
self.update_vuln_count_thread.daemon = True
|
|
self.update_vuln_count_thread.start()
|
|
|
|
self.scale_factor_x = self.shared_data.scale_factor_x
|
|
self.scale_factor_y = self.shared_data.scale_factor_y
|
|
|
|
def schedule_update_shared_data(self):
|
|
"""Periodically update the shared data with the latest system information."""
|
|
while not self.shared_data.display_should_exit:
|
|
self.update_shared_data()
|
|
time.sleep(25)
|
|
|
|
def schedule_update_vuln_count(self):
|
|
"""Periodically update the vulnerability count on the display."""
|
|
while not self.shared_data.display_should_exit:
|
|
self.update_vuln_count()
|
|
time.sleep(300)
|
|
|
|
def update_main_image(self):
|
|
"""Update the main image on the display with the latest immagegen data."""
|
|
while not self.shared_data.display_should_exit:
|
|
try:
|
|
self.shared_data.update_image_randomizer()
|
|
if self.shared_data.imagegen:
|
|
self.main_image = self.shared_data.imagegen
|
|
else:
|
|
logger.error("No image generated for current status.")
|
|
time.sleep(random.uniform(self.shared_data.image_display_delaymin, self.shared_data.image_display_delaymax))
|
|
except Exception as e:
|
|
logger.error(f"An error occurred in update_main_image: {e}")
|
|
def get_open_files(self):
|
|
"""Get the number of open FD files on the system."""
|
|
try:
|
|
open_files = len(glob.glob('/proc/*/fd/*'))
|
|
logger.debug(f"FD : {open_files}")
|
|
return open_files
|
|
except Exception as e:
|
|
logger.error(f"Error getting open files: {e}")
|
|
return None
|
|
|
|
def update_vuln_count(self):
|
|
"""Update the vulnerability count on the display."""
|
|
with self.semaphore:
|
|
try:
|
|
if not os.path.exists(self.shared_data.vuln_summary_file):
|
|
# Create the file with the necessary columns if it does not exist
|
|
df = pd.DataFrame(columns=["IP", "Hostname", "MAC Address", "Port", "Vulnerabilities"])
|
|
df.to_csv(self.shared_data.vuln_summary_file, index=False)
|
|
self.shared_data.vulnnbr = 0
|
|
logger.info("Vulnerability summary file created.")
|
|
else:
|
|
# Load the netkbfile to check for "Alive" IPs
|
|
if os.path.exists(self.shared_data.netkbfile):
|
|
with open(self.shared_data.netkbfile, 'r') as file:
|
|
netkb_df = pd.read_csv(file)
|
|
alive_macs = set(netkb_df[(netkb_df["Alive"] == 1) & (netkb_df["MAC Address"] != "STANDALONE")]["MAC Address"]) # Get all alive MACs based on the 'Alive' column and ignore "STANDALONE"
|
|
else:
|
|
alive_macs = set()
|
|
|
|
with open(self.shared_data.vuln_summary_file, 'r') as file:
|
|
df = pd.read_csv(file)
|
|
all_vulnerabilities = set()
|
|
|
|
for index, row in df.iterrows():
|
|
mac_address = row["MAC Address"]
|
|
if mac_address in alive_macs and mac_address != "STANDALONE": # Ignore "STANDALONE" MAC addresses
|
|
vulnerabilities = row["Vulnerabilities"]
|
|
if pd.isna(vulnerabilities) or not isinstance(vulnerabilities, str): # Check if vulnerabilities is NaN or not a string
|
|
# logger.debug(f"No valid vulnerabilities for MAC Address: {mac_address}")
|
|
continue
|
|
|
|
if vulnerabilities and isinstance(vulnerabilities, str):
|
|
all_vulnerabilities.update(vulnerabilities.split("; ")) # Add the vulnerabilities to the set
|
|
|
|
self.shared_data.vulnnbr = len(all_vulnerabilities)
|
|
logger.debug(f"Updated vulnerabilities count: {self.shared_data.vulnnbr}")
|
|
|
|
# Update the livestatusfile
|
|
if os.path.exists(self.shared_data.livestatusfile):
|
|
with open(self.shared_data.livestatusfile, 'r+') as livestatus_file:
|
|
livestatus_df = pd.read_csv(livestatus_file)
|
|
livestatus_df.loc[0, 'Vulnerabilities Count'] = self.shared_data.vulnnbr
|
|
livestatus_df.to_csv(self.shared_data.livestatusfile, index=False)
|
|
logger.debug(f"Updated livestatusfile with vulnerability count: {self.shared_data.vulnnbr}")
|
|
else:
|
|
logger.error(f"Livestatusfile {self.shared_data.livestatusfile} does not exist.")
|
|
except Exception as e:
|
|
logger.error(f"An error occurred in update_vuln_count: {e}")
|
|
|
|
|
|
def update_shared_data(self):
|
|
"""Update the shared data with the latest system information."""
|
|
with self.semaphore:
|
|
"""
|
|
Update shared data from CSV files live_status.csv and cracked_passwords.csv.
|
|
"""
|
|
try:
|
|
with open(self.shared_data.livestatusfile, 'r') as file:
|
|
livestatus_df = pd.read_csv(file)
|
|
self.shared_data.portnbr = livestatus_df['Total Open Ports'].iloc[0] # Get the total number of open ports
|
|
self.shared_data.targetnbr = livestatus_df['Alive Hosts Count'].iloc[0] # Get the total number of alive hosts
|
|
self.shared_data.networkkbnbr = livestatus_df['All Known Hosts Count'].iloc[0] # Get the total number of known hosts
|
|
self.shared_data.vulnnbr = livestatus_df['Vulnerabilities Count'].iloc[0] # Get the total number of vulnerable ports
|
|
|
|
crackedpw_files = glob.glob(f"{self.shared_data.crackedpwddir}/*.csv") # Get all CSV files in the cracked password directory
|
|
|
|
total_passwords = 0
|
|
for file in crackedpw_files:
|
|
with open(file, 'r') as f:
|
|
total_passwords += len(pd.read_csv(f, usecols=[0]))
|
|
|
|
self.shared_data.crednbr = total_passwords # Set the total number of cracked passwords to shared data
|
|
|
|
total_data = sum([len(files) for r, d, files in os.walk(self.shared_data.datastolendir)]) # Get the total number of data files in the data store directory
|
|
self.shared_data.datanbr = total_data
|
|
|
|
total_zombies = sum([len(files) for r, d, files in os.walk(self.shared_data.zombiesdir)]) # Get the total number of zombies in the zombies directory
|
|
self.shared_data.zombiesnbr = total_zombies
|
|
total_attacks = sum([len(files) for r, d, files in os.walk(self.shared_data.actions_dir) if not r.endswith("__pycache__")]) - 2
|
|
|
|
self.shared_data.attacksnbr = total_attacks
|
|
|
|
self.shared_data.update_stats()
|
|
# Update Bluetooth, WiFi, and PAN connection status
|
|
self.shared_data.manual_mode = self.is_manual_mode()
|
|
if self.shared_data.manual_mode:
|
|
self.manual_mode_txt = "M"
|
|
else:
|
|
self.manual_mode_txt = "A"
|
|
# # self.shared_data.bluetooth_active = self.is_bluetooth_connected()
|
|
self.shared_data.wifi_connected = self.is_wifi_connected()
|
|
self.shared_data.usb_active = self.is_usb_connected()
|
|
########self.shared_data.pan_connected = self.is_interface_connected('pan0') or self.is_interface_connected('usb0')
|
|
self.get_open_files()
|
|
|
|
except (FileNotFoundError, pd.errors.EmptyDataError) as e:
|
|
logger.error(f"Error: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error updating shared data: {e}")
|
|
|
|
def display_comment(self, status):
|
|
""" Display the comment based on the status of the BjornOrch. """
|
|
comment = self.commentaire_ia.get_commentaire(status) # Get the comment from Commentaireia
|
|
if comment:
|
|
self.shared_data.bjornsay = comment # Set the comment to shared data
|
|
self.shared_data.bjornstatustext = self.shared_data.bjornorch_status # Set the status to shared data
|
|
else:
|
|
pass
|
|
|
|
# # # def is_bluetooth_connected(self):
|
|
# # # """
|
|
# # # Check if any device is connected to the Bluetooth (pan0) interface by checking the output of 'ip neigh show dev pan0'.
|
|
# # # """
|
|
# # # try:
|
|
# # # result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', 'pan0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
# # # output, error = result.communicate()
|
|
# # # if result.returncode != 0:
|
|
# # # logger.error(f"Error executing 'ip neigh show dev pan0': {error}")
|
|
# # # return False
|
|
# # # return bool(output.strip())
|
|
# # # except Exception as e:
|
|
# # # logger.error(f"Error checking Bluetooth connection status: {e}")
|
|
# # # return False
|
|
|
|
def is_wifi_connected(self):
|
|
"""
|
|
Check if WiFi is connected by checking the current SSID.
|
|
"""
|
|
try:
|
|
result = subprocess.Popen(['iwgetid', '-r'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
ssid, error = result.communicate()
|
|
if result.returncode != 0:
|
|
logger.error(f"Error executing 'iwgetid -r': {error}")
|
|
return False
|
|
return bool(ssid.strip()) # Return True if connected to a WiFi network
|
|
except Exception as e:
|
|
logger.error(f"Error checking WiFi status: {e}")
|
|
return False
|
|
|
|
def is_manual_mode(self):
|
|
"""Check if the BjornOrch is in manual mode."""
|
|
return self.shared_data.manual_mode
|
|
|
|
def is_interface_connected(self, interface):
|
|
"""
|
|
Check if any device is connected to the specified interface (pan0 or usb0)
|
|
by checking the output of 'ip neigh show dev <interface>'.
|
|
"""
|
|
try:
|
|
result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', interface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
output, error = result.communicate()
|
|
if result.returncode != 0:
|
|
logger.error(f"Error executing 'ip neigh show dev {interface}': {error}")
|
|
return False
|
|
return bool(output.strip())
|
|
except Exception as e:
|
|
logger.error(f"Error checking connection status on {interface}: {e}")
|
|
return False
|
|
|
|
def is_usb_connected(self):
|
|
"""
|
|
Check if any device is connected to the USB (usb0) interface by checking the output of 'ip neigh show dev usb0'.
|
|
"""
|
|
try:
|
|
result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', 'usb0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
output, error = result.communicate()
|
|
if result.returncode != 0:
|
|
logger.error(f"Error executing 'ip neigh show dev usb0': {error}")
|
|
return False
|
|
return bool(output.strip())
|
|
except Exception as e:
|
|
logger.error(f"Error checking USB connection status: {e}")
|
|
return False
|
|
|
|
def run(self):
|
|
"""
|
|
Main loop for updating the EPD display with shared data.
|
|
"""
|
|
self.manual_mode_txt = ""
|
|
while not self.shared_data.display_should_exit:
|
|
try:
|
|
self.epd_helper.init_partial_update()
|
|
self.display_comment(self.shared_data.bjornorch_status) # Display the comment
|
|
image = Image.new('1', (self.shared_data.width, self.shared_data.height))
|
|
draw = ImageDraw.Draw(image)
|
|
draw.rectangle((0, 0, self.shared_data.width, self.shared_data.height), fill=255)
|
|
draw.text((int(37 * self.scale_factor_x), int(5 * self.scale_factor_y)), "BJORN", font=self.shared_data.font_viking, fill=0)
|
|
draw.text((int(110 * self.scale_factor_x), int(170 * self.scale_factor_y)), self.manual_mode_txt, font=self.shared_data.font_arial14, fill=0)
|
|
if self.shared_data.wifi_connected:
|
|
image.paste(self.shared_data.wifi, (int(3 * self.scale_factor_x), int(3 * self.scale_factor_y)))
|
|
# # # if self.shared_data.bluetooth_active:
|
|
# # # image.paste(self.shared_data.bluetooth, (int(23 * self.scale_factor_x), int(4 * self.scale_factor_y)))
|
|
if self.shared_data.pan_connected:
|
|
image.paste(self.shared_data.connected, (int(104 * self.scale_factor_x), int(3 * self.scale_factor_y)))
|
|
if self.shared_data.usb_active:
|
|
image.paste(self.shared_data.usb, (int(90 * self.scale_factor_x), int(4 * self.scale_factor_y)))
|
|
|
|
stats = [
|
|
(self.shared_data.target, (int(8 * self.scale_factor_x), int(22 * self.scale_factor_y)), (int(28 * self.scale_factor_x), int(22 * self.scale_factor_y)), str(self.shared_data.targetnbr)),
|
|
(self.shared_data.port, (int(47 * self.scale_factor_x), int(22 * self.scale_factor_y)), (int(67 * self.scale_factor_x), int(22 * self.scale_factor_y)), str(self.shared_data.portnbr)),
|
|
(self.shared_data.vuln, (int(86 * self.scale_factor_x), int(22 * self.scale_factor_y)), (int(106 * self.scale_factor_x), int(22 * self.scale_factor_y)), str(self.shared_data.vulnnbr)),
|
|
(self.shared_data.cred, (int(8 * self.scale_factor_x), int(41 * self.scale_factor_y)), (int(28 * self.scale_factor_x), int(41 * self.scale_factor_y)), str(self.shared_data.crednbr)),
|
|
(self.shared_data.money, (int(3 * self.scale_factor_x), int(172 * self.scale_factor_y)), (int(3 * self.scale_factor_x), int(192 * self.scale_factor_y)), str(self.shared_data.coinnbr)),
|
|
(self.shared_data.level, (int(2 * self.scale_factor_x), int(217 * self.scale_factor_y)), (int(4 * self.scale_factor_x), int(237 * self.scale_factor_y)), str(self.shared_data.levelnbr)),
|
|
(self.shared_data.zombie, (int(47 * self.scale_factor_x), int(41 * self.scale_factor_y)), (int(67 * self.scale_factor_x), int(41 * self.scale_factor_y)), str(self.shared_data.zombiesnbr)),
|
|
(self.shared_data.networkkb, (int(102 * self.scale_factor_x), int(190 * self.scale_factor_y)), (int(102 * self.scale_factor_x), int(208 * self.scale_factor_y)), str(self.shared_data.networkkbnbr)),
|
|
(self.shared_data.data, (int(86 * self.scale_factor_x), int(41 * self.scale_factor_y)), (int(106 * self.scale_factor_x), int(41 * self.scale_factor_y)), str(self.shared_data.datanbr)),
|
|
(self.shared_data.attacks, (int(100 * self.scale_factor_x), int(218 * self.scale_factor_y)), (int(102 * self.scale_factor_x), int(237 * self.scale_factor_y)), str(self.shared_data.attacksnbr)),
|
|
]
|
|
|
|
for img, img_pos, text_pos, text in stats:
|
|
image.paste(img, img_pos)
|
|
draw.text(text_pos, text, font=self.shared_data.font_arial9, fill=0)
|
|
|
|
self.shared_data.update_bjornstatus()
|
|
image.paste(self.shared_data.bjornstatusimage, (int(3 * self.scale_factor_x), int(60 * self.scale_factor_y)))
|
|
draw.text((int(35 * self.scale_factor_x), int(65 * self.scale_factor_y)), self.shared_data.bjornstatustext, font=self.shared_data.font_arial9, fill=0)
|
|
draw.text((int(35 * self.scale_factor_x), int(75 * self.scale_factor_y)), self.shared_data.bjornstatustext2, font=self.shared_data.font_arial9, fill=0)
|
|
|
|
image.paste(self.shared_data.frise, (int(0 * self.scale_factor_x), int(160 * self.scale_factor_y)))
|
|
|
|
draw.rectangle((1, 1, self.shared_data.width - 1, self.shared_data.height - 1), outline=0)
|
|
draw.line((1, 20, self.shared_data.width - 1, 20), fill=0)
|
|
draw.line((1, 59, self.shared_data.width - 1, 59), fill=0)
|
|
draw.line((1, 87, self.shared_data.width - 1, 87), fill=0)
|
|
|
|
lines = self.shared_data.wrap_text(self.shared_data.bjornsay, self.shared_data.font_arialbold, self.shared_data.width - 4)
|
|
y_text = int(90 * self.scale_factor_y)
|
|
|
|
if self.main_image is not None:
|
|
image.paste(self.main_image, (self.shared_data.x_center1, self.shared_data.y_bottom1))
|
|
else:
|
|
logger.error("Main image not found in shared_data.")
|
|
|
|
for line in lines:
|
|
draw.text((int(4 * self.scale_factor_x), y_text), line, font=self.shared_data.font_arialbold, fill=0) # Display the comment
|
|
y_text += (self.shared_data.font_arialbold.getbbox(line)[3] - self.shared_data.font_arialbold.getbbox(line)[1]) + 3 # Calculate the height of the text depending on the font size, 3 means the space between lines
|
|
if self.screen_reversed:
|
|
image = image.transpose(Image.ROTATE_180)
|
|
|
|
|
|
self.epd_helper.display_partial(image)
|
|
self.epd_helper.display_partial(image)
|
|
|
|
if self.web_screen_reversed:
|
|
image = image.transpose(Image.ROTATE_180)
|
|
with open(os.path.join(self.shared_data.webdir, "screen.png"), 'wb') as img_file:
|
|
image.save(img_file)
|
|
img_file.flush()
|
|
os.fsync(img_file.fileno())
|
|
|
|
time.sleep(self.shared_data.screen_delay)
|
|
except Exception as e:
|
|
logger.error(f"An exception occurred: {e}")
|
|
|
|
def handle_exit_display(signum, frame, display_thread):
|
|
"""Handle the exit signal and close the display."""
|
|
global should_exit
|
|
shared_data.display_should_exit = True
|
|
logger.info("Exit signal received. Waiting for the main loop to finish...")
|
|
try:
|
|
if main_loop and main_loop.epd:
|
|
main_loop.epd.init(main_loop.epd.sleep)
|
|
main_loop.epd.Dev_exit()
|
|
except Exception as e:
|
|
logger.error(f"Error while closing the display: {e}")
|
|
display_thread.join()
|
|
logger.info("Main loop finished. Clean exit.")
|
|
sys.exit(0) # Used sys.exit(0) instead of exit(0)
|
|
|
|
# Declare main_loop globally
|
|
main_loop = None
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
logger.info("Starting main loop...")
|
|
main_loop = Display(shared_data)
|
|
display_thread = threading.Thread(target=main_loop.run)
|
|
display_thread.start()
|
|
logger.info("Main loop started.")
|
|
|
|
signal.signal(signal.SIGINT, lambda signum, frame: handle_exit_display(signum, frame, display_thread))
|
|
signal.signal(signal.SIGTERM, lambda signum, frame: handle_exit_display(signum, frame, display_thread))
|
|
except Exception as e:
|
|
logger.error(f"An exception occurred during program execution: {e}")
|
|
handle_exit_display(signal.SIGINT, None, display_thread)
|
|
sys.exit(1) # Used sys.exit(1) instead of exit(1)
|