Files
home-automation-backend/recorder/poo.py

78 lines
2.9 KiB
Python
Raw Normal View History

2024-07-16 15:55:45 +02:00
from datetime import datetime
from fastapi_mqtt import FastMQTT, MQTTConfig
2024-07-23 15:33:23 +02:00
from recorder.notion_handle import NotionClient
2024-07-16 15:55:45 +02:00
class PooRecorder:
mqtt_config = MQTTConfig(username="mqtt", password="mqtt", reconnect_retries=-1) # noqa: S106
2024-07-23 15:33:23 +02:00
notion = NotionClient()
2024-07-23 14:06:40 +02:00
mqtt = FastMQTT(config=mqtt_config, client_id="poo_recorder")
2024-07-17 16:44:30 +02:00
CONFIG_TOPIC = "homeassistant/text/poo_recorder/config"
AVAILABILITY_TOPIC = "studiotj/poo_recorder/status"
COMMAND_TOPIC = "studiotj/poo_recorder/update_text"
STATE_TOPIC = "studiotj/poo_recorder/text"
JSON_TOPIC = "studiotj/poo_recorder/attributes"
ONLINE = "online"
OFFLINE = "offline"
2024-07-16 15:55:45 +02:00
def __init__(self) -> None:
print("Initialization.")
async def start(self) -> None:
print("Starting...")
await PooRecorder.mqtt.mqtt_startup()
async def stop(self) -> None:
print("Stopping...")
await PooRecorder.mqtt.mqtt_shutdown()
2024-07-17 16:44:30 +02:00
async def record(self, status: str) -> None:
PooRecorder.publish_text(status)
2024-07-16 15:55:45 +02:00
now = datetime.now(tz=datetime.now().astimezone().tzinfo)
2024-07-17 16:44:30 +02:00
PooRecorder.publish_time(now)
2024-07-23 15:33:23 +02:00
await PooRecorder.notion.note(now, status)
2024-07-16 15:55:45 +02:00
2024-07-17 16:44:30 +02:00
@staticmethod
2024-07-16 15:55:45 +02:00
@mqtt.on_connect()
2024-07-17 16:44:30 +02:00
def on_connect(client, flags, rc, properties) -> None: # noqa: ANN001, ARG004
2024-07-16 15:55:45 +02:00
print("Connected")
config = PooRecorder.compose_config()
PooRecorder.mqtt.publish(PooRecorder.CONFIG_TOPIC, config, retain=True)
PooRecorder.mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True)
2024-07-17 16:44:30 +02:00
@staticmethod
def publish_text(new_text: str) -> None:
PooRecorder.mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True)
2024-07-17 16:47:28 +02:00
PooRecorder.mqtt.publish(PooRecorder.STATE_TOPIC, new_text, retain=True)
2024-07-17 16:44:30 +02:00
@staticmethod
def publish_time(time: datetime) -> None:
formatted_time = time.strftime("%a | %Y-%m-%d | %H:%M")
PooRecorder.mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True)
2024-07-17 16:44:30 +02:00
json_string = {"last_poo": formatted_time}
2024-07-17 16:47:28 +02:00
PooRecorder.mqtt.publish(PooRecorder.JSON_TOPIC, json_string, retain=True)
2024-07-16 15:55:45 +02:00
@staticmethod
def compose_config() -> dict:
return {
"device": {
"name": "Dog Poop Recorder",
"model": "poop-recorder-backend",
"sw_version": "0.2",
"identifiers": ["poo_recorder"],
"manufacturer": "Studio TJ",
},
"unique_id": "poo_recorder",
2024-07-17 16:44:30 +02:00
"name": "Poo Status",
"availability_topic": PooRecorder.AVAILABILITY_TOPIC,
2024-07-16 15:55:45 +02:00
"availability_template": "{{ value_json.availability }}",
2024-07-17 16:44:30 +02:00
"json_attributes_topic": PooRecorder.JSON_TOPIC,
2024-07-16 15:55:45 +02:00
"min": 0,
"max": 255,
"mode": "text",
2024-07-17 16:44:30 +02:00
"command_topic": PooRecorder.COMMAND_TOPIC,
"state_topic": PooRecorder.STATE_TOPIC,
2024-07-16 15:55:45 +02:00
}