Use webhook to publish automation trigger

This commit is contained in:
2024-08-21 17:05:12 +02:00
parent 3eaae13dc3
commit b9b8659347
3 changed files with 13 additions and 2 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import ast import ast
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import httpx
from pydantic import BaseModel from pydantic import BaseModel
from src.cloud_util.ticktick import TickTick from src.cloud_util.ticktick import TickTick
@@ -24,6 +25,12 @@ class HomeAssistant:
return {"Status": "Unknown target"} return {"Status": "Unknown target"}
async def trigger_webhook(self, payload: dict[str, str], webhook_id: str) -> None:
token: str = Config.get_env("HOMEASSISTANT_TOKEN")
webhook_url: str = Config.get_env("HOMEASSISTANT_URL") + "/api/webhook/" + webhook_id
headers: dict[str, str] = {"Authorization": f"Bearer {token}"}
await httpx.AsyncClient().post(webhook_url, json=payload, headers=headers)
def _process_ticktick_message(self, message: Message) -> dict[str, str]: def _process_ticktick_message(self, message: Message) -> dict[str, str]:
if message.action == "create_shopping_list": if message.action == "create_shopping_list":
return self._create_shopping_list(content=message.content) return self._create_shopping_list(content=message.content)

View File

@@ -15,8 +15,8 @@ Config.init()
ticktick = TickTick() ticktick = TickTick()
notion = NotionAsync(token=Config.get_env(key="NOTION_TOKEN")) notion = NotionAsync(token=Config.get_env(key="NOTION_TOKEN"))
mqtt = MQTT() mqtt = MQTT()
poo_recorder = PooRecorder(mqtt=mqtt, notion=notion)
homeassistant = HomeAssistant(ticktick=ticktick) homeassistant = HomeAssistant(ticktick=ticktick)
poo_recorder = PooRecorder(mqtt=mqtt, notion=notion, homeassistant=homeassistant)
@asynccontextmanager @asynccontextmanager

View File

@@ -1,5 +1,6 @@
from datetime import datetime from datetime import datetime
from src.cloud_util.homeassistant import HomeAssistant
from src.cloud_util.mqtt import MQTT from src.cloud_util.mqtt import MQTT
from src.cloud_util.notion import NotionAsync from src.cloud_util.notion import NotionAsync
from src.config import Config from src.config import Config
@@ -14,13 +15,14 @@ class PooRecorder:
ONLINE = "online" ONLINE = "online"
OFFLINE = "offline" OFFLINE = "offline"
def __init__(self, mqtt: MQTT, notion: NotionAsync) -> None: def __init__(self, mqtt: MQTT, notion: NotionAsync, homeassistant: HomeAssistant) -> None:
print("Poo Recorder Initialization...") print("Poo Recorder Initialization...")
self._notion = notion self._notion = notion
self._table_id = Config.get_env("POO_RECORD_NOTION_TABLE_ID") self._table_id = Config.get_env("POO_RECORD_NOTION_TABLE_ID")
self._mqtt = mqtt self._mqtt = mqtt
self._mqtt.publish(PooRecorder.CONFIG_TOPIC, PooRecorder.compose_config(), retain=True) self._mqtt.publish(PooRecorder.CONFIG_TOPIC, PooRecorder.compose_config(), retain=True)
self._mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True) self._mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True)
self._homeassistant = homeassistant
async def _note(self, now: datetime, status: str) -> None: async def _note(self, now: datetime, status: str) -> None:
formatted_date = now.strftime("%Y-%m-%d") formatted_date = now.strftime("%Y-%m-%d")
@@ -28,10 +30,12 @@ class PooRecorder:
await self._notion.append_table_row_text(self._table_id, [formatted_date, formatted_time, status]) await self._notion.append_table_row_text(self._table_id, [formatted_date, formatted_time, status])
async def record(self, status: str) -> None: async def record(self, status: str) -> None:
webhook_id: str = Config.get_env("HOMEASSISTANT_POO_TRIGGER_ID")
self._publish_text(status) self._publish_text(status)
now = datetime.now(tz=datetime.now().astimezone().tzinfo) now = datetime.now(tz=datetime.now().astimezone().tzinfo)
self._publish_time(now) self._publish_time(now)
await self._note(now, status) await self._note(now, status)
await self._homeassistant.trigger_webhook(payload={"status": status}, webhook_id=webhook_id)
def _publish_text(self, new_text: str) -> None: def _publish_text(self, new_text: str) -> None:
self._mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True) self._mqtt.publish(PooRecorder.AVAILABILITY_TOPIC, PooRecorder.ONLINE, retain=True)