mirror of
https://github.com/Burningstone91/smart-home-setup.git
synced 2022-05-05 21:16:50 +03:00
Remove unneded AppDaemon apps
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -31,5 +31,6 @@ portainer/
|
||||
home-assistant-db/
|
||||
esphome/
|
||||
swag/
|
||||
nextcloud/
|
||||
check_docker_latest.sh
|
||||
check_docker_list.txt
|
||||
@@ -1,90 +0,0 @@
|
||||
"""Define automations for areas."""
|
||||
import voluptuous as vol
|
||||
|
||||
from appbase import AppBase, APP_SCHEMA
|
||||
from utils import config_validation as cv
|
||||
|
||||
|
||||
class Area(AppBase):
|
||||
"""Representation of an Area."""
|
||||
|
||||
APP_SCHEMA = APP_SCHEMA.extend(
|
||||
{
|
||||
vol.Required("area"): str,
|
||||
vol.Optional("attributes"): vol.Schema(
|
||||
{vol.Optional("friendly_name"): str}
|
||||
),
|
||||
vol.Optional("occupancy"): vol.Schema({vol.Optional(cv.entity_id): str}),
|
||||
}
|
||||
)
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure an area."""
|
||||
areas = self.adbase.get_state("area")
|
||||
area = self.args["area"]
|
||||
area_id = area.lower().replace(" ", "_")
|
||||
attributes = self.args["attributes"]
|
||||
self.occupancy_entities = self.args.get("occupancy_entities")
|
||||
self.area_entity = f"area.{area_id}"
|
||||
|
||||
# Create an entity for the area if it doesn't already exist
|
||||
if self.area_entity not in areas.keys():
|
||||
if "friendly_name" not in attributes:
|
||||
attributes.update({"friendly_name": area.title()})
|
||||
|
||||
attributes.update(
|
||||
{"id": area_id, "persons": [], "occupied": None, "motion": None}
|
||||
)
|
||||
|
||||
self.adbase.set_state(self.area_entity, state="idle", attributes=attributes)
|
||||
|
||||
# Listen for no changes in area state for 30 seconds
|
||||
self.adbase.listen_state(self.on_state_change, self.area_entity, duration=30)
|
||||
|
||||
# Listen for changes in occupancy entities of area
|
||||
if "occupancy_entities" in self.args:
|
||||
for entity in self.occupancy_entities.keys():
|
||||
self.hass.listen_state(self.on_occupancy_change, entity)
|
||||
|
||||
# Listen for changes in persons of area
|
||||
self.adbase.listen_state(
|
||||
self.on_occupancy_change, self.area_entity, attribute="persons"
|
||||
)
|
||||
# Listen for changes in motion of area
|
||||
self.adbase.listen_state(
|
||||
self.on_occupancy_change, self.area_entity, attribute="motion"
|
||||
)
|
||||
|
||||
def on_state_change(
|
||||
self, entity: str, attribute: dict, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when area doesn't change state for 30s."""
|
||||
# Set area to idle
|
||||
self.adbase.set_state(entity, state="idle")
|
||||
|
||||
def on_occupancy_change(
|
||||
self, entity: str, attribute: dict, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when occupancy factor changes."""
|
||||
old = self.adbase.get_state(self.area_entity, attribute="occupied")
|
||||
occupied = self.is_occupied(self.area_entity)
|
||||
# Set occupancy of area
|
||||
if old != occupied:
|
||||
self.adbase.set_state(self.area_entity, occupied=occupied)
|
||||
self.adbase.log(f"{entity.split('.')[1].capitalize()} Occupied: {occupied}")
|
||||
|
||||
def is_occupied(self, area: str) -> bool:
|
||||
"""Return True if area is occupied."""
|
||||
# Check if motion in area
|
||||
motion = self.adbase.get_state(area, attribute="motion")
|
||||
# Check if persons in area
|
||||
# persons = len(area_attr["persons"]) > 0
|
||||
persons = False
|
||||
# Check if occupancy devices are "on"
|
||||
devices_on = False
|
||||
if self.occupancy_entities:
|
||||
devices_on = any(
|
||||
self.hass.get_state(entity) == state
|
||||
for entity, state in self.occupancy_entities.items()
|
||||
)
|
||||
return persons or motion or devices_on
|
||||
@@ -1,11 +1,3 @@
|
||||
# bedroom:
|
||||
# module: area
|
||||
# class: Area
|
||||
# area: bedroom
|
||||
# attributes:
|
||||
# friendly_name: Schlafzimmer
|
||||
# priority: 1
|
||||
|
||||
bedroom_light_switch:
|
||||
module: switches
|
||||
class: HueDimmerSwitch
|
||||
@@ -30,13 +22,9 @@ bedroom_sabrina_switch:
|
||||
entity_id:
|
||||
- light.bedroom_ceiling
|
||||
short_press_turn_off:
|
||||
service: homeassistant.turn_off
|
||||
service: light.turn_off
|
||||
entity_id:
|
||||
- switch.dehumidifier_dressroom
|
||||
- light.bedroom
|
||||
- light.livingroom
|
||||
- light.dressroom
|
||||
- light.office
|
||||
- light.all_lights
|
||||
|
||||
bedroom_dimitri_switch:
|
||||
module: switches
|
||||
@@ -52,10 +40,6 @@ bedroom_dimitri_switch:
|
||||
entity_id:
|
||||
- light.bedroom_ceiling
|
||||
short_press_turn_off:
|
||||
service: homeassistant.turn_off
|
||||
service: light.turn_off
|
||||
entity_id:
|
||||
- switch.dehumidifier_dressroom
|
||||
- light.bedroom
|
||||
- light.livingroom
|
||||
- light.dressroom
|
||||
- light.office
|
||||
- light.all_lights
|
||||
@@ -1,13 +1,11 @@
|
||||
# dressroom:
|
||||
# module: area
|
||||
# class: Area
|
||||
# area: dressroom
|
||||
# attributes:
|
||||
# friendly_name: Ankleidezimmer
|
||||
# priority: 1
|
||||
|
||||
dressroom_light_switch:
|
||||
module: switches
|
||||
class: HueDimmerSwitch
|
||||
switch_id: remote_light_dressroom
|
||||
lights: light.dressroom
|
||||
custom_button_config:
|
||||
long_press_turn_off:
|
||||
service: homeassistant.turn_off
|
||||
entity_id:
|
||||
- light.dressroom
|
||||
- switch.dehumidifier_dressroom
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
"""Define automations for the house."""
|
||||
import voluptuous as vol
|
||||
|
||||
from appbase import AppBase, APP_SCHEMA
|
||||
|
||||
|
||||
class House(AppBase):
|
||||
"""Representation of a House."""
|
||||
|
||||
APP_SCHEMA = APP_SCHEMA.extend(
|
||||
{
|
||||
vol.Required("id"): str,
|
||||
vol.Optional("attributes"): vol.Schema(
|
||||
{
|
||||
vol.Optional("friendly_name"): str,
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure an area."""
|
||||
houses = self.adbase.get_state("house")
|
||||
house_id = self.args["id"]
|
||||
attributes = self.args.get("attributes")
|
||||
entity_id = f"house.{house_id}"
|
||||
|
||||
# Create an entity for the house if it doesn't already exist
|
||||
if entity_id not in houses.keys():
|
||||
if "friendly_name" not in attributes:
|
||||
attributes.update({"friendly_name": house_id.title()})
|
||||
|
||||
attributes.update(
|
||||
{
|
||||
"id": house_id,
|
||||
"persons": [],
|
||||
"occupied": None,
|
||||
"presence_state": None,
|
||||
"sleep_state": None
|
||||
}
|
||||
)
|
||||
|
||||
self.adbase.set_state(entity_id, state="idle", attributes=attributes)
|
||||
|
||||
# Listen for no changes in house state for 30 seconds
|
||||
self.adbase.listen_state(
|
||||
self.state_changed, entity_id, duration=30
|
||||
)
|
||||
|
||||
def state_changed(
|
||||
self, entity: str, attribute: dict, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when house doesn't change state for 30s."""
|
||||
# Set area to idle
|
||||
self.adbase.set_state(entity, state="idle")
|
||||
@@ -1,8 +0,0 @@
|
||||
house:
|
||||
module: house
|
||||
class: House
|
||||
id: home
|
||||
attributes:
|
||||
friendly_name: Wohnung
|
||||
priority: 1
|
||||
|
||||
@@ -1,13 +1,3 @@
|
||||
# livingroom:
|
||||
# module: area
|
||||
# class: Area
|
||||
# area: livingroom
|
||||
# attributes:
|
||||
# friendly_name: Wohnzimmer
|
||||
# occupancy_entities:
|
||||
# media_player.receiver_livingroom: "on"
|
||||
# priority: 1
|
||||
|
||||
livingroom_light_switch:
|
||||
module: switches
|
||||
class: HueDimmerSwitch
|
||||
|
||||
141
appdaemon/apps/notification.py
Executable file
141
appdaemon/apps/notification.py
Executable file
@@ -0,0 +1,141 @@
|
||||
"""Define automations for notifications."""
|
||||
from typing import Union, Callable
|
||||
|
||||
from appbase import AppBase
|
||||
|
||||
|
||||
class Notifications(AppBase):
|
||||
"""Define a class for Notification handling."""
|
||||
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
class Notification:
|
||||
"""Define a notification object."""
|
||||
|
||||
def __init__(self, channel, message, targets, title=None, **kwargs):
|
||||
"""Initialize."""
|
||||
self.channel = channel
|
||||
self.message = message
|
||||
self.title = title
|
||||
self.targets = targets
|
||||
self.repeat = kwargs.get("repeat")
|
||||
self.interval = kwargs.get("interval")
|
||||
self.data = kwargs.get("data")
|
||||
if self.data is None:
|
||||
self.data = {}
|
||||
self.cancel = None
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure."""
|
||||
self.briefing = []
|
||||
persons = self.adbase.get_state("person")
|
||||
for person in persons:
|
||||
# Listen for person arriving at home
|
||||
self.adbase.listen_state(
|
||||
self.send_briefing,
|
||||
person,
|
||||
attribute="non_binary_presence",
|
||||
new="just_arrived",
|
||||
)
|
||||
# Listen for person waking up
|
||||
self.adbase.listen_state(
|
||||
self.send_briefing, person, attribute="sleep_state", new="awake"
|
||||
)
|
||||
|
||||
def send_briefing(
|
||||
self, entity: Union[str, dict], attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Send all notifications in the briefing list."""
|
||||
for notification in self.briefing:
|
||||
notification.cancel(delete=True)
|
||||
self.send_notification(notification)
|
||||
|
||||
def notify(
|
||||
self,
|
||||
channel: str,
|
||||
message: str,
|
||||
targets: Union[str, list],
|
||||
title: str = None,
|
||||
repeat: bool = False,
|
||||
interval: Union[int, None] = None,
|
||||
data: Union[dict, None] = None,
|
||||
) -> Callable:
|
||||
"""Return an object to send a notification."""
|
||||
return self.send_notification(
|
||||
self.Notification(
|
||||
channel,
|
||||
message,
|
||||
targets=targets,
|
||||
title=title,
|
||||
repeat=repeat,
|
||||
interval=interval,
|
||||
data=data,
|
||||
)
|
||||
)
|
||||
|
||||
def send_notification(self, notification: Notification) -> Callable:
|
||||
"""Send single or repeating notification and
|
||||
return a method to cancel notification"""
|
||||
if not notification.repeat:
|
||||
handle = self.adbase.run_in(self.send, 0, notification=notification)
|
||||
else:
|
||||
handle = self.adbase.run_every(
|
||||
self.send, "now", notification.interval, notification=notification
|
||||
)
|
||||
|
||||
def cancel(delete: bool = False) -> None:
|
||||
"""Define a method to cancel the notification."""
|
||||
self.adbase.cancel_timer(handle)
|
||||
if delete:
|
||||
self.briefing.remove(notification)
|
||||
|
||||
notification.cancel = cancel
|
||||
self.briefing.append(notification)
|
||||
|
||||
return cancel
|
||||
|
||||
def send(self, kwargs: dict) -> None:
|
||||
"""Send a notification."""
|
||||
notification = kwargs["notification"]
|
||||
targets = self.get_targets(notification.targets, notification.channel)
|
||||
targets_flat = [target for notifiers in targets for target in notifiers]
|
||||
|
||||
for target in targets_flat:
|
||||
self.hass.call_service(
|
||||
f"notify/{target}",
|
||||
title=notification.title,
|
||||
message=notification.message,
|
||||
data=notification.data,
|
||||
)
|
||||
self.adbase.log(
|
||||
f"Nachricht: "
|
||||
f"{notification.title if notification.title else notification.message}"
|
||||
f" an {target}"
|
||||
)
|
||||
|
||||
if targets and not notification.repeat and notification in self.briefing:
|
||||
self.briefing.remove(notification)
|
||||
|
||||
if not targets and notification.repeat:
|
||||
notification.cancel()
|
||||
|
||||
def get_targets(self, targets: Union[str, list], channel: str) -> Union[str, list]:
|
||||
"""Return available targets."""
|
||||
if channel == "emergency":
|
||||
return [
|
||||
self.adbase.get_state(f"person.{target}", attribute="notifiers")
|
||||
for target in targets
|
||||
]
|
||||
return [
|
||||
self.adbase.get_state(f"person.{target}", attribute="notifiers")
|
||||
for target in targets
|
||||
if self.target_available(target)
|
||||
]
|
||||
|
||||
def target_available(self, target: str) -> bool:
|
||||
"""Return True if target is available."""
|
||||
sleep_state = self.adbase.get_state(f"person.{target}", attribute="sleep_state")
|
||||
# Remove after full implementation of bed sensors
|
||||
sleep_state = "awake"
|
||||
home = self.adbase.get_state(f"person.{target}", attribute="home")
|
||||
return home and sleep_state == "awake"
|
||||
3
appdaemon/apps/notification.yaml
Executable file
3
appdaemon/apps/notification.yaml
Executable file
@@ -0,0 +1,3 @@
|
||||
notification_manager:
|
||||
class: Notifications
|
||||
module: notification
|
||||
@@ -1,16 +1,3 @@
|
||||
# office:
|
||||
# module: area
|
||||
# class: Area
|
||||
# area: office
|
||||
# attributes:
|
||||
# friendly_name: Büro
|
||||
# occupancy_entities:
|
||||
# device_tracker.desktop_dimitri: "home"
|
||||
# device_tracker.desktop_sabrina: "home"
|
||||
# device_tracker.laptop_work: "home"
|
||||
# media_player.receiver_office: "on"
|
||||
# priority: 1
|
||||
|
||||
office_light_switch:
|
||||
module: switches
|
||||
class: HueDimmerSwitch
|
||||
|
||||
@@ -1,229 +0,0 @@
|
||||
"""Define an automation for updating a device tracker from the state of a sensor."""
|
||||
import voluptuous as vol
|
||||
|
||||
from appbase import AppBase, APP_SCHEMA
|
||||
from utils import config_validation as cv
|
||||
|
||||
|
||||
class RoomPresence(AppBase):
|
||||
"""Define a base class for room presence."""
|
||||
|
||||
APP_SCHEMA = APP_SCHEMA.extend(
|
||||
{vol.Required("sensors"): vol.Schema({vol.Optional(str): cv.entity_id})}
|
||||
)
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure."""
|
||||
room_presence_sensors = self.args["sensors"]
|
||||
|
||||
for person, sensor in room_presence_sensors.items():
|
||||
# Listen for person changing area
|
||||
self.hass.listen_state(
|
||||
self.on_sensor_change, sensor, duration=5, person_id=person
|
||||
)
|
||||
|
||||
def on_sensor_change(
|
||||
self, entity: str, attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when room presence sensor changes state."""
|
||||
person_id = kwargs["person_id"]
|
||||
person_entity = f"person.{person_id}"
|
||||
old_state = self.adbase.get_state(person_entity, attribute="area")
|
||||
if new != old_state:
|
||||
areas = self.adbase.get_state("area")
|
||||
area_entity = f"area.{new}"
|
||||
|
||||
# Remove person from other areas
|
||||
for area in areas.keys():
|
||||
if area != area_entity:
|
||||
persons = self.adbase.get_state(area, attribute="persons")
|
||||
if person_id in persons:
|
||||
persons.remove(person_id)
|
||||
self.adbase.set_state(area, persons=persons)
|
||||
|
||||
# Add person to new area
|
||||
if new != "not_home":
|
||||
persons = self.adbase.get_state(area_entity, attribute="persons")
|
||||
if person_id not in persons:
|
||||
persons.append(person_id)
|
||||
self.adbase.set_state(area_entity, persons=persons)
|
||||
|
||||
# Set area for person
|
||||
self.adbase.set_state(person_entity, area=new)
|
||||
self.adbase.log(f"{person_id.capitalize()} Area: {new}")
|
||||
|
||||
|
||||
class PersonPresence(AppBase):
|
||||
"""Define a base class for binary person presence."""
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure."""
|
||||
persons = self.adbase.get_state("person")
|
||||
|
||||
for person in persons.keys():
|
||||
# Listen for person entering the house
|
||||
self.adbase.listen_state(
|
||||
self.on_person_arrival,
|
||||
person,
|
||||
attribute="area",
|
||||
)
|
||||
|
||||
# area 3 minutes "not_home" -> person left
|
||||
self.adbase.listen_state(
|
||||
self.on_person_leave,
|
||||
person,
|
||||
attribute="area",
|
||||
new="not_home",
|
||||
duration=3 * 60
|
||||
)
|
||||
|
||||
def on_person_arrival(
|
||||
self, entity: str, attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when person enters house."""
|
||||
# Set person to "home"
|
||||
not_home_states = ["not_home", "undefined", "unknown", None]
|
||||
if new != old and old in not_home_states and new not in not_home_states:
|
||||
self.adbase.set_state(entity, home=True)
|
||||
self.adbase.log(f"{entity.split('.')[1].capitalize()}: home")
|
||||
|
||||
def on_person_leave(
|
||||
self, entity: str, attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when person left house for 3 minutes."""
|
||||
# Set person to "not home"
|
||||
self.adbase.set_state(entity, home=False)
|
||||
self.adbase.log(f"{entity.split('.')[1].capitalize()}: not home")
|
||||
|
||||
class NonBinaryPresence(AppBase):
|
||||
"""Define a base class for non binary person presence."""
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure."""
|
||||
persons = self.adbase.get_state("person")
|
||||
|
||||
for person in persons.keys():
|
||||
# away/extended away -> just arrived
|
||||
self.adbase.listen_state(
|
||||
self.on_presence_change,
|
||||
person,
|
||||
attribute="home",
|
||||
new=1,
|
||||
non_binary_state="just_arrived",
|
||||
)
|
||||
|
||||
# home -> just left
|
||||
self.adbase.listen_state(
|
||||
self.on_presence_change,
|
||||
person,
|
||||
attribute="home",
|
||||
new=0,
|
||||
non_binary_state="just_left",
|
||||
)
|
||||
|
||||
# just arrived -> home, after 5 min
|
||||
self.adbase.listen_state(
|
||||
self.on_presence_change,
|
||||
person,
|
||||
attribute="non_binary_presence",
|
||||
new="just_arrived",
|
||||
duration=5 * 60,
|
||||
non_binary_state="home",
|
||||
)
|
||||
|
||||
# just left -> away, after 5 min
|
||||
self.adbase.listen_state(
|
||||
self.on_presence_change,
|
||||
person,
|
||||
attribute="non_binary_presence",
|
||||
new="just_left",
|
||||
duration=5 * 60,
|
||||
non_binary_state="away",
|
||||
)
|
||||
|
||||
# away -> extended away, after 24 hours
|
||||
self.adbase.listen_state(
|
||||
self.on_presence_change,
|
||||
person,
|
||||
attribute="non_binary_presence",
|
||||
new="away",
|
||||
duration=24 * 60 * 60,
|
||||
non_binary_state="extended_away",
|
||||
)
|
||||
|
||||
def on_presence_change(
|
||||
self, entity: str, attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when person changes presence state."""
|
||||
# just left -> just arrived = home
|
||||
old_state = self.adbase.get_state(entity, attribute="sleep_state")
|
||||
non_binary_state = kwargs["non_binary_state"]
|
||||
|
||||
if old_state == "just_left" and non_binary_state == "just_arrived":
|
||||
non_binary_state = "home"
|
||||
|
||||
# Set non binary presence state for person
|
||||
self.adbase.set_state(entity, non_binary_presence=non_binary_state)
|
||||
self.adbase.log(
|
||||
f"{entity.split('.')[1].capitalize()}: {non_binary_state.replace('_',' ')}"
|
||||
)
|
||||
|
||||
|
||||
class HousePresence(AppBase):
|
||||
"""Define a base class for house presence."""
|
||||
|
||||
APP_SCHEMA = APP_SCHEMA.extend({vol.Required("house_id"): str})
|
||||
|
||||
def configure(self) -> None:
|
||||
"""Configure."""
|
||||
house_id = self.args["house_id"]
|
||||
self.house_entity_id = f"house.{house_id}"
|
||||
persons = self.adbase.get_state("person")
|
||||
|
||||
# Listen for person changing home state
|
||||
for person in persons.keys():
|
||||
self.adbase.listen_state(self.on_presence_change, person, attribute="home")
|
||||
|
||||
def on_presence_change(
|
||||
self, entity: str, attribute: str, old: str, new: str, kwargs: dict
|
||||
) -> None:
|
||||
"""Respond when person changes presence state."""
|
||||
person_id = entity.split(".")[1]
|
||||
persons = self.adbase.get_state("person")
|
||||
|
||||
persons_home = self.adbase.get_state(self.house_entity_id, attribute="persons")
|
||||
persons_extended_away = [
|
||||
person
|
||||
for person, attributes in persons.items()
|
||||
if attributes["attributes"]["non_binary_presence"] == "extended_away"
|
||||
]
|
||||
|
||||
# Add/remove person from the house
|
||||
if new and person_id not in persons_home:
|
||||
persons_home.append(person_id)
|
||||
elif person_id in persons_home:
|
||||
persons_home.remove(person_id)
|
||||
|
||||
# Set occupancy of the house
|
||||
if not persons_home:
|
||||
occupied = False
|
||||
else:
|
||||
occupied = True
|
||||
|
||||
# Set presence state of the house
|
||||
if len(persons.keys()) == len(persons_home):
|
||||
presence_state = "everyone_home"
|
||||
elif len(persons.keys()) == len(persons_extended_away):
|
||||
presence_state = "vacation"
|
||||
elif not persons_home:
|
||||
presence_state = "nobody_home"
|
||||
else:
|
||||
presence_state = "someone_home"
|
||||
|
||||
self.adbase.set_state(
|
||||
self.house_entity_id,
|
||||
presence_state=presence_state,
|
||||
occupied=occupied,
|
||||
persons=persons_home,
|
||||
)
|
||||
self.adbase.log(f"House Presence: {presence_state.replace('_',' ')}")
|
||||
@@ -1,19 +0,0 @@
|
||||
room_presence_app:
|
||||
module: presence
|
||||
class: RoomPresence
|
||||
sensors:
|
||||
dimitri: sensor.dimitri_room_presence
|
||||
sabrina: sensor.sabrina_room_presence
|
||||
|
||||
person_presence_app:
|
||||
module: presence
|
||||
class: PersonPresence
|
||||
|
||||
non_binary_presence_app:
|
||||
module: presence
|
||||
class: NonBinaryPresence
|
||||
|
||||
house_presence_app:
|
||||
module: presence
|
||||
class: HousePresence
|
||||
house_id: home
|
||||
64
appdaemon/apps/system-monitoring.py
Executable file
64
appdaemon/apps/system-monitoring.py
Executable file
@@ -0,0 +1,64 @@
|
||||
"""Define automations for system monitoring."""
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import datetime
|
||||
from urllib.request import urlopen
|
||||
from packaging import version
|
||||
|
||||
from appbase import AppBase
|
||||
|
||||
# class NotifyAppDaemonError(AppBase):
|
||||
# """Define a base class for notification on AppDaemon errors."""
|
||||
|
||||
# APP_SCHEMA = APP_SCHEMA.extend({vol.Required("targets"): cv.ensure_list})
|
||||
|
||||
# def configure(self) -> None:
|
||||
# """Configure."""
|
||||
# # Listen for errors in the appdaemon log files
|
||||
# self.targets = self.args["targets"]
|
||||
# self.adbase.listen_log(self.on_log_error, log="error_log")
|
||||
|
||||
# def on_log_error(
|
||||
# self, name: str, ts: str, level: str, type: str, message: str, kwargs: dict
|
||||
# ) -> None:
|
||||
# self.adbase.log(f"Send notification about AppDaemon Error to {self.targets}.")
|
||||
# """Notify on Error."""
|
||||
# self.notification_manager.notify(
|
||||
# channel="smart_home",
|
||||
# message=f"Es gab einen Fehler in AppDaemon!",
|
||||
# title=f"AppDaemon Fehler",
|
||||
# targets=self.targets,
|
||||
# )
|
||||
|
||||
|
||||
class LatestConBeeFirmware(AppBase):
|
||||
"""Define a base class for getting the latest ConBee firmeware version."""
|
||||
|
||||
def configure(self) -> None:
|
||||
self.url = "http://deconz.dresden-elektronik.de/deconz-firmware/"
|
||||
self.adbase.run_every(self.update_latest_version_sensor, "now", 60 * 60)
|
||||
|
||||
def update_latest_version_sensor(self, *args) -> None:
|
||||
urls = self.get_url_paths(self.url)
|
||||
urls_filtered = [url for url in urls if "ConBeeII" in url]
|
||||
urls_with_dates = {url: self.get_last_modified_date(url) for url in urls_filtered}
|
||||
latest_url = max(urls_with_dates, key=urls_with_dates.get)
|
||||
version = latest_url.split("ConBeeII_")[1].strip(".bin.GCF")
|
||||
|
||||
self.hass.set_state(
|
||||
"sensor.latest_firmware_conbee",
|
||||
state=version,
|
||||
attributes={"friendly_name": "Firmware ConBee II"}
|
||||
)
|
||||
|
||||
def get_url_paths(self, url):
|
||||
response = requests.get(url)
|
||||
soup = BeautifulSoup(response.text, 'html.parser')
|
||||
parent = [url + node.get('href') for node in soup.find_all('a') if node.get('href').endswith("GCF")]
|
||||
return parent
|
||||
|
||||
def get_last_modified_date(self, url):
|
||||
u = urlopen(url)
|
||||
headers = dict(u.getheaders())
|
||||
last_modified = headers['Last-Modified']
|
||||
return datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z")
|
||||
9
appdaemon/apps/system-monitoring.yaml
Executable file
9
appdaemon/apps/system-monitoring.yaml
Executable file
@@ -0,0 +1,9 @@
|
||||
# appdaemon_error:
|
||||
# class: NotifyAppDaemonError
|
||||
# module: system-monitoring
|
||||
# dependencies: notification_manager
|
||||
# targets: dimitri
|
||||
|
||||
latest_conbee_firmware:
|
||||
class: LatestConBeeFirmware
|
||||
module: system-monitoring
|
||||
@@ -1,2 +1,3 @@
|
||||
voluptuous==0.11.7
|
||||
geopy
|
||||
packaging
|
||||
bs4
|
||||
184
scratch
184
scratch
@@ -1,184 +0,0 @@
|
||||
|
||||
######################nonbinary state and house presence state app, use sensors with set state instead of input select
|
||||
First some preparations in Home Assistant:
|
||||
Create a file called "modes.yaml" inside the packages directory in the Home Assistant configuration.
|
||||
|
||||
In the modes.yaml file add the following:
|
||||
|
||||
```yaml
|
||||
input_boolean:
|
||||
# Input booleans to represent state of modes
|
||||
mode_guest:
|
||||
name: Gastmodus
|
||||
icon: mdi:account-group
|
||||
mode_cleaning:
|
||||
name: Reinigungsmodus
|
||||
icon: mdi:robot-vacuum
|
||||
mode_sleep:
|
||||
name: Schlafmodus
|
||||
icon: mdi:sleep
|
||||
```
|
||||
This creates three input booleans which represent the state of the respective mode. I use guest mode, sleep mode and cleaning mode but you can choose whatever modes you want for your system.
|
||||
|
||||
#######################3SRG METEO
|
||||
import base64
|
||||
import json
|
||||
import requests
|
||||
|
||||
oauth_url = 'https://api.srgssr.ch/oauth/v1/accesstoken?grant_type=client_credentials'
|
||||
username = '5kGwbf52a8LYxQBwUI5VM8qAKExIULYJ'
|
||||
password = 'gGf7gRkNjA1iwd5x'
|
||||
|
||||
|
||||
|
||||
credentials = (username + ':' + password).encode('utf-8')
|
||||
base64_encoded_credentials = base64.b64encode(credentials).decode('utf-8')
|
||||
|
||||
headers = {
|
||||
'Authorization': 'Basic ' + base64_encoded_credentials,
|
||||
'Content': '0',
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
|
||||
response = requests.get(oauth_url, headers=headers)
|
||||
json_response = json.loads(response.text)
|
||||
token = json_response['access_token']
|
||||
|
||||
|
||||
|
||||
base_url = 'https://api.srgssr.ch/forecasts/v1.0/weather'
|
||||
latitude = 47.36667
|
||||
longitude = 8.5
|
||||
|
||||
headers = {
|
||||
'Authorization': 'Bearer ' + token,
|
||||
'Content': '0',
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
|
||||
#Current daily forecast
|
||||
response = requests.get(base_url + '/current?latitude=' + str(latitude) + '&longitude=' + str(longitude), headers=headers)
|
||||
|
||||
print('########################Current############################')
|
||||
print('Body:', response.content.decode("utf-8"))
|
||||
|
||||
#weekly forecast
|
||||
response = requests.get(base_url + '/7day?latitude=' + str(latitude) + '&longitude=' + str(longitude), headers=headers)
|
||||
|
||||
print('########################7day############################')
|
||||
print('Body:', response.content.decode("utf-8"))
|
||||
|
||||
#next hour forecast
|
||||
response = requests.get(base_url + '/nexthour?latitude=' + str(latitude) + '&longitude=' + str(longitude), headers=headers)
|
||||
|
||||
print('########################next hour############################')
|
||||
print('Body:', response.content.decode("utf-8"))
|
||||
|
||||
#24 hour forecast
|
||||
response = requests.get(base_url + '/24hour?latitude=' + str(latitude) + '&longitude=' + str(longitude), headers=headers)
|
||||
|
||||
print('########################24 hour############################')
|
||||
print('Body:', response.content.decode("utf-8"))
|
||||
|
||||
|
||||
|
||||
################################################33
|
||||
System Monitoring
|
||||
################################################
|
||||
# COMMANDLine
|
||||
- platform: command_line
|
||||
name: NUC CPU Temperature Core Zone 0
|
||||
command: "cat /sys/devices/virtual/thermal/thermal_zone0/temp"
|
||||
value_template: '{{ value | multiply(0.001) | round(2) }}'
|
||||
unit_of_measurement: '°C'
|
||||
- platform: command_line
|
||||
name: NUC CPU Temperature Core Zone 2
|
||||
command: "cat /sys/devices/virtual/thermal/thermal_zone2/temp"
|
||||
value_template: '{{ value | multiply(0.001) | round(2) }}'
|
||||
unit_of_measurement: '°C'
|
||||
- platform: command_line
|
||||
name: NUC CPU Frequency 0
|
||||
command: "cat /sys/devices/system/cpu/cpufreq/policy0/scaling_cur_freq"
|
||||
value_template: '{{ value | multiply(0.000001) | round(2) }}'
|
||||
unit_of_measurement: 'GHz'
|
||||
- platform: command_line
|
||||
name: NUC CPU Frequency 1
|
||||
command: "cat /sys/devices/system/cpu/cpufreq/policy1/scaling_cur_freq"
|
||||
value_template: '{{ value | multiply(0.000001) | round(2) }}'
|
||||
unit_of_measurement: 'GHz'
|
||||
- platform: command_line
|
||||
name: Linux version
|
||||
command: "cat /proc/version"
|
||||
value_template: '{{value|truncate(30,True)}}'
|
||||
- platform: command_line
|
||||
name: NUC CPU Governor
|
||||
command: "cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"
|
||||
- platform: command_line
|
||||
name: NUC CPU min. frequency
|
||||
command: "cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq"
|
||||
value_template: '{{ value | multiply(0.001) | round(2) }}'
|
||||
unit_of_measurement: 'MHz - Min Freq'
|
||||
- platform: command_line
|
||||
name: NUC CPU max. frequency
|
||||
command: "cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq"
|
||||
value_template: '{{ value | multiply(0.001) | round(2) }}'
|
||||
unit_of_measurement: 'MHz - Max Freq'
|
||||
- platform: command_line
|
||||
name: BIOS Vendor
|
||||
command: "cat /sys/class/dmi/id/bios_vendor"
|
||||
value_template: '{{value|truncate(45,True)}}'
|
||||
- platform: command_line
|
||||
name: SYS Vendor
|
||||
command: "cat /sys/class/dmi/id/product_name"
|
||||
value_template: '{{value|truncate(45,True)}}'
|
||||
- platform: command_line
|
||||
name: CPU online
|
||||
command: "cat /sys/devices/system/cpu/online"
|
||||
value_template: '{{value|truncate(45,True)}}'
|
||||
- platform: command_line
|
||||
name: CPU offline
|
||||
command: "cat /sys/devices/system/cpu/offline"
|
||||
value_template: '{{value|truncate(45,True)}}'
|
||||
- platform: command_line
|
||||
name: NUC Bios version
|
||||
command: "cat /sys/class/dmi/id/bios_version"
|
||||
- platform: command_line
|
||||
name: NUC board name
|
||||
command: "cat /sys/class/dmi/id/board_name"
|
||||
|
||||
# SystemMonitor
|
||||
- platform: systemmonitor
|
||||
resources:
|
||||
- type: disk_use_percent
|
||||
arg: /home
|
||||
- type: disk_use
|
||||
arg: /home
|
||||
- type: disk_free
|
||||
arg: /home
|
||||
- type: memory_free
|
||||
- type: memory_use
|
||||
- type: memory_use_percent
|
||||
- type: swap_use_percent
|
||||
- type: swap_use
|
||||
- type: swap_free
|
||||
- type: load_1m
|
||||
- type: load_5m
|
||||
- type: load_15m
|
||||
- type: network_in
|
||||
arg: enp3s0
|
||||
- type: network_out
|
||||
arg: enp3s0
|
||||
- type: throughput_network_in
|
||||
arg: enp3s0
|
||||
- type: throughput_network_out
|
||||
arg: enp3s0
|
||||
- type: packets_in
|
||||
arg: enp3s0
|
||||
- type: packets_out
|
||||
arg: enp3s0
|
||||
- type: processor_use
|
||||
- type: last_boot
|
||||
- type: ipv6_address
|
||||
arg: enp3s0
|
||||
- type: ipv4_address
|
||||
arg: enp3s0
|
||||
Reference in New Issue
Block a user