52 lines
1.7 KiB
Python
52 lines
1.7 KiB
Python
|
|
from datetime import datetime
|
||
|
|
|
||
|
|
from fastapi_mqtt import FastMQTT, MQTTConfig
|
||
|
|
|
||
|
|
|
||
|
|
class PooRecorder:
    """Backend that publishes a "Poop Recorder" text-entity config over MQTT.

    The MQTT client is created once at class level, so every instance shares
    the same client object and the connect handler registered on it.
    """

    # NOTE(review): credentials are hard-coded here; consider loading them
    # from configuration or the environment before deploying.
    mqtt_config = MQTTConfig(username="mqtt", password="mqtt")
    mqtt = FastMQTT(config=mqtt_config)

    def __init__(self) -> None:
        """Create a recorder with no status recorded yet."""
        print("Initialization.")
        # record() reads this attribute, so it must exist before the first
        # call; previously it was never assigned and record() raised
        # AttributeError. None means "nothing recorded yet".
        self._status = None

    async def start(self) -> None:
        """Start the shared MQTT client."""
        print("Starting...")
        await PooRecorder.mqtt.mqtt_startup()

    async def stop(self) -> None:
        """Shut down the shared MQTT client."""
        print("Stopping...")
        await PooRecorder.mqtt.mqtt_shutdown()

    def record(self) -> None:
        """Print the current status and format the local time of the call."""
        print("Recording...", self._status)
        # Timezone-aware "now" in the machine's local timezone.
        now = datetime.now().astimezone()
        # NOTE(review): the formatted timestamp is computed but not yet used
        # or published anywhere in this class.
        formatted_time = now.strftime("%a | %Y-%m-%d | %H:%M")

    # Registered on the class-level client while the class body executes, so
    # this is a plain callback without ``self``; it reaches the class through
    # its name when it is eventually invoked.
    @mqtt.on_connect()
    def on_connect(client, flags, rc, properties):
        """Publish the entity configuration once the client has connected.

        The arguments are supplied by the MQTT client and are not used here.
        The config topic looks like Home Assistant MQTT discovery
        (presumably -- confirm against the broker setup).
        """
        print("Connected")
        config = PooRecorder.compose_config()
        PooRecorder.mqtt.publish("homeassistant/text/poo_recorder/config", config)

    @staticmethod
    def compose_config() -> dict:
        """Return the configuration payload describing the text entity.

        Returns:
            A dict with the device description, the entity identifiers, the
            availability/attribute/command topics and the text constraints
            (``min``, ``max``, ``mode``).
        """
        return {
            "device": {
                "name": "Dog Poop Recorder",
                "model": "poop-recorder-backend",
                "sw_version": "0.2",
                "identifiers": ["poo_recorder"],
                "manufacturer": "Studio TJ",
            },
            "unique_id": "poo_recorder",
            "name": "Poop Recorder",
            "availability_topic": "studiotj/poo_recorder/status",
            "availability_template": "{{ value_json.availability }}",
            "json_attributes_topic": "studiotj/poo_recorder/attributes",
            "min": 0,
            "max": 255,
            "mode": "text",
            "command_topic": "studiotj/poo_recorder/command",
        }
|