from datetime import datetime from fastapi_mqtt import FastMQTT, MQTTConfig class PooRecorder: mqtt_config = MQTTConfig(username="mqtt", password="mqtt") # noqa: S106 mqtt = FastMQTT(config=mqtt_config) CONFIG_TOPIC = "homeassistant/text/poo_recorder/config" AVAILABILITY_TOPIC = "studiotj/poo_recorder/status" COMMAND_TOPIC = "studiotj/poo_recorder/update_text" STATE_TOPIC = "studiotj/poo_recorder/text" JSON_TOPIC = "studiotj/poo_recorder/attributes" ONLINE = "online" OFFLINE = "offline" def __init__(self) -> None: print("Initialization.") async def start(self) -> None: print("Starting...") await PooRecorder.mqtt.mqtt_startup() async def stop(self) -> None: print("Stopping...") await PooRecorder.mqtt.mqtt_shutdown() async def record(self, status: str) -> None: PooRecorder.publish_text(status) now = datetime.now(tz=datetime.now().astimezone().tzinfo) PooRecorder.publish_time(now) @staticmethod @mqtt.on_connect() def on_connect(client, flags, rc, properties) -> None: # noqa: ANN001, ARG004 print("Connected") config = PooRecorder.compose_config() PooRecorder.mqtt.publish(PooRecorder.CONFIG_TOPIC, config) PooRecorder.publish_text("N/A") @staticmethod def publish_text(new_text: str) -> None: PooRecorder.mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE) PooRecorder.mqtt.publish(PooRecorder.STATE_TOPIC, new_text) @staticmethod def publish_time(time: datetime) -> None: formatted_time = time.strftime("%a | %Y-%m-%d | %H:%M") PooRecorder.mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE) json_string = {"last_poo": formatted_time} PooRecorder.mqtt.publish(PooRecorder.JSON_TOPIC, json_string) @staticmethod def compose_config() -> dict: return { "device": { "name": "Dog Poop Recorder", "model": "poop-recorder-backend", "sw_version": "0.2", "identifiers": ["poo_recorder"], "manufacturer": "Studio TJ", }, "unique_id": "poo_recorder", "name": "Poo Status", "availability_topic": PooRecorder.AVAILABILITY_TOPIC, "availability_template": "{{ value_json.availability }}", "json_attributes_topic": PooRecorder.JSON_TOPIC, "min": 0, "max": 255, "mode": "text", "command_topic": PooRecorder.COMMAND_TOPIC, "state_topic": PooRecorder.STATE_TOPIC, }