from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import logging from sqlalchemy import desc, insert, select from sqlalchemy.orm import Session from app.config import Settings from app.integrations.homeassistant import ( HomeAssistantClient, HomeAssistantConfigError, HomeAssistantRequestError, ) from app.models.poo import PooRecord from app.schemas.poo import PooRecordRequest logger = logging.getLogger(__name__) @dataclass(slots=True) class LatestPooRecord: timestamp: str status: str latitude: float longitude: float def _parse_required_float(value: str, field_name: str) -> float: try: return float(value) except (TypeError, ValueError) as exc: raise ValueError(f"Invalid numeric value for {field_name}") from exc def _utc_now_minute_precision() -> str: now = datetime.now(timezone.utc).replace(second=0, microsecond=0) return now.strftime("%Y-%m-%dT%H:%MZ") def record_poo( session: Session, payload: PooRecordRequest, *, settings: Settings, homeassistant_client: HomeAssistantClient, ) -> None: stmt = insert(PooRecord).prefix_with("OR IGNORE").values( timestamp=_utc_now_minute_precision(), status=payload.status, latitude=_parse_required_float(payload.latitude, "latitude"), longitude=_parse_required_float(payload.longitude, "longitude"), ) session.execute(stmt) session.commit() try: publish_latest_poo_status( session=session, settings=settings, homeassistant_client=homeassistant_client, ) except (HomeAssistantConfigError, HomeAssistantRequestError) as exc: logger.warning("Failed to publish latest poo status to Home Assistant: %s", exc) if settings.poo_webhook_id: try: homeassistant_client.trigger_webhook( webhook_id=settings.poo_webhook_id, body={"status": payload.status}, ) except (HomeAssistantConfigError, HomeAssistantRequestError) as exc: logger.warning("Failed to trigger poo webhook on Home Assistant: %s", exc) def get_latest_poo_record(session: Session) -> LatestPooRecord | None: stmt = select(PooRecord).order_by(desc(PooRecord.timestamp)).limit(1) record = session.execute(stmt).scalar_one_or_none() if record is None: logger.info("No poo record is available yet") return None return LatestPooRecord( timestamp=record.timestamp, status=record.status, latitude=record.latitude, longitude=record.longitude, ) def publish_latest_poo_status( *, session: Session, settings: Settings, homeassistant_client: HomeAssistantClient, ) -> LatestPooRecord | None: latest = get_latest_poo_record(session) if latest is None: logger.info("Skipping Home Assistant poo sensor publish because no poo record exists yet") return None record_time = datetime.fromisoformat(latest.timestamp.replace("Z", "+00:00")).astimezone() homeassistant_client.publish_sensor( entity_id=settings.poo_sensor_entity_name, state=latest.status, attributes={ "last_poo": record_time.strftime("%a | %Y-%m-%d | %H:%M"), "friendly_name": settings.poo_sensor_friendly_name, }, ) return latest