113 lines
3.3 KiB
Python
113 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
import logging
|
|
|
|
from sqlalchemy import desc, insert, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import Settings
|
|
from app.integrations.homeassistant import (
|
|
HomeAssistantClient,
|
|
HomeAssistantConfigError,
|
|
HomeAssistantRequestError,
|
|
)
|
|
from app.models.poo import PooRecord
|
|
from app.schemas.poo import PooRecordRequest
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LatestPooRecord:
|
|
timestamp: str
|
|
status: str
|
|
latitude: float
|
|
longitude: float
|
|
|
|
|
|
def _parse_required_float(value: str, field_name: str) -> float:
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(f"Invalid numeric value for {field_name}") from exc
|
|
|
|
|
|
def _utc_now_minute_precision() -> str:
|
|
now = datetime.now(timezone.utc).replace(second=0, microsecond=0)
|
|
return now.strftime("%Y-%m-%dT%H:%MZ")
|
|
|
|
|
|
def record_poo(
    session: Session,
    payload: PooRecordRequest,
    *,
    settings: Settings,
    homeassistant_client: HomeAssistantClient,
) -> None:
    """Store a poo record, then notify Home Assistant on a best-effort basis.

    The row is written with an ``OR IGNORE`` insert prefix and committed
    before any Home Assistant call is made. Afterwards the latest status is
    published as a sensor and, when ``settings.poo_webhook_id`` is truthy,
    the webhook is triggered with the submitted status.

    Args:
        session: Database session used for the insert and the commit.
        payload: Incoming request; ``status``, ``latitude`` and
            ``longitude`` are read from it.
        settings: Application settings (webhook id, sensor configuration).
        homeassistant_client: Client used for the sensor publish and the
            webhook call.

    Raises:
        ValueError: Propagated from ``_parse_required_float`` when a
            coordinate is not a valid number.

    Home Assistant configuration and request errors are logged as warnings
    and never propagate to the caller.
    """
    # Both notification steps tolerate the same pair of failures.
    tolerated_errors = (HomeAssistantConfigError, HomeAssistantRequestError)

    row_values = {
        "timestamp": _utc_now_minute_precision(),
        "status": payload.status,
        "latitude": _parse_required_float(payload.latitude, "latitude"),
        "longitude": _parse_required_float(payload.longitude, "longitude"),
    }
    insert_stmt = insert(PooRecord).prefix_with("OR IGNORE").values(**row_values)
    session.execute(insert_stmt)
    session.commit()

    # Step 1: refresh the sensor from whatever is now the latest record.
    try:
        publish_latest_poo_status(
            session=session,
            settings=settings,
            homeassistant_client=homeassistant_client,
        )
    except tolerated_errors as exc:
        logger.warning("Failed to publish latest poo status to Home Assistant: %s", exc)

    # Step 2: the webhook is optional; nothing to do without an id.
    webhook_id = settings.poo_webhook_id
    if not webhook_id:
        return

    try:
        homeassistant_client.trigger_webhook(
            webhook_id=webhook_id,
            body={"status": payload.status},
        )
    except tolerated_errors as exc:
        logger.warning("Failed to trigger poo webhook on Home Assistant: %s", exc)
|
|
|
|
|
|
def get_latest_poo_record(session: Session) -> LatestPooRecord | None:
    """Return the poo record with the highest timestamp, if any.

    Args:
        session: Database session used to run the query.

    Returns:
        A ``LatestPooRecord`` copied from the first row when ordering by
        ``PooRecord.timestamp`` descending, or ``None`` (after logging an
        info message) when the query yields no row.
    """
    newest_first = select(PooRecord).order_by(desc(PooRecord.timestamp))
    row = session.execute(newest_first.limit(1)).scalar_one_or_none()

    if row is not None:
        return LatestPooRecord(
            timestamp=row.timestamp,
            status=row.status,
            latitude=row.latitude,
            longitude=row.longitude,
        )

    logger.info("No poo record is available yet")
    return None
|
|
|
|
|
|
def publish_latest_poo_status(
    *,
    session: Session,
    settings: Settings,
    homeassistant_client: HomeAssistantClient,
) -> LatestPooRecord | None:
    """Publish the most recent poo record as a Home Assistant sensor.

    Args:
        session: Database session used to look up the latest record.
        settings: Supplies the sensor entity name and friendly name.
        homeassistant_client: Client whose ``publish_sensor`` is called.

    Returns:
        The record that was published, or ``None`` when no record exists
        (in which case nothing is sent and an info message is logged).

    Exceptions raised by ``publish_sensor`` are not handled here.
    """
    latest = get_latest_poo_record(session)
    if latest is None:
        logger.info("Skipping Home Assistant poo sensor publish because no poo record exists yet")
        return None

    # Swap the "Z" suffix for an explicit offset so fromisoformat() accepts
    # it on Python versions that reject a trailing "Z"; astimezone() with no
    # argument then converts to the system's local timezone.
    iso_with_offset = latest.timestamp.replace("Z", "+00:00")
    local_time = datetime.fromisoformat(iso_with_offset).astimezone()

    sensor_attributes = {
        "last_poo": local_time.strftime("%a | %Y-%m-%d | %H:%M"),
        "friendly_name": settings.poo_sensor_friendly_name,
    }
    homeassistant_client.publish_sensor(
        entity_id=settings.poo_sensor_entity_name,
        state=latest.status,
        attributes=sensor_attributes,
    )
    return latest
|