# Files
#
# 113 lines
# 3.3 KiB
# Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import logging
from sqlalchemy import desc, insert, select
from sqlalchemy.orm import Session
from app.config import Settings
from app.integrations.homeassistant import (
HomeAssistantClient,
HomeAssistantConfigError,
HomeAssistantRequestError,
)
from app.models.poo import PooRecord
from app.schemas.poo import PooRecordRequest
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class LatestPooRecord:
timestamp: str
status: str
latitude: float
longitude: float
def _parse_required_float(value: str, field_name: str) -> float:
try:
return float(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid numeric value for {field_name}") from exc
def _utc_now_minute_precision() -> str:
now = datetime.now(timezone.utc).replace(second=0, microsecond=0)
return now.strftime("%Y-%m-%dT%H:%MZ")
def record_poo(
    session: Session,
    payload: PooRecordRequest,
    *,
    settings: Settings,
    homeassistant_client: HomeAssistantClient,
) -> None:
    """Store a new poo record, then notify Home Assistant on a best-effort basis.

    The row is written with an ``INSERT OR IGNORE`` statement stamped with the
    current UTC minute and committed immediately. Afterwards the latest
    record is published as a sensor and, when ``settings.poo_webhook_id`` is
    set, a webhook is triggered with the submitted status.

    NOTE(review): ``OR IGNORE`` presumably relies on a uniqueness constraint
    on the table (e.g. on ``timestamp``) to skip duplicates within the same
    minute; the publish and webhook steps run regardless of whether the row
    was actually inserted. Confirm against the ``PooRecord`` model.

    Args:
        session: SQLAlchemy session used for the insert and the follow-up read.
        payload: Incoming request carrying ``status``, ``latitude`` and
            ``longitude``.
        settings: Application settings; ``poo_webhook_id`` is read here.
        homeassistant_client: Client used to publish the sensor and trigger
            the webhook.

    Raises:
        ValueError: If ``payload.latitude`` or ``payload.longitude`` cannot be
            parsed by ``_parse_required_float``. Errors raised by
            ``session.execute`` / ``session.commit`` are not caught here.
    """
    # Home Assistant failures are logged as warnings and never abort the call.
    ha_errors = (HomeAssistantConfigError, HomeAssistantRequestError)

    recorded_at = _utc_now_minute_precision()
    status = payload.status
    latitude = _parse_required_float(payload.latitude, "latitude")
    longitude = _parse_required_float(payload.longitude, "longitude")

    insert_stmt = insert(PooRecord).prefix_with("OR IGNORE").values(
        timestamp=recorded_at,
        status=status,
        latitude=latitude,
        longitude=longitude,
    )
    session.execute(insert_stmt)
    session.commit()

    try:
        publish_latest_poo_status(
            session=session,
            settings=settings,
            homeassistant_client=homeassistant_client,
        )
    except ha_errors as exc:
        logger.warning("Failed to publish latest poo status to Home Assistant: %s", exc)

    # The webhook is optional: skip it when no webhook id is configured.
    webhook_id = settings.poo_webhook_id
    if not webhook_id:
        return

    try:
        homeassistant_client.trigger_webhook(
            webhook_id=webhook_id,
            body={"status": payload.status},
        )
    except ha_errors as exc:
        logger.warning("Failed to trigger poo webhook on Home Assistant: %s", exc)
def get_latest_poo_record(session: Session) -> LatestPooRecord | None:
    """Fetch the poo record with the greatest ``timestamp``.

    Args:
        session: SQLAlchemy session used to run the query.

    Returns:
        A ``LatestPooRecord`` copied from the newest ``PooRecord`` row, or
        ``None`` (after logging an info message) when the query returns no row.
    """
    # Sort descending by timestamp and keep only the first row.
    newest_first = select(PooRecord).order_by(desc(PooRecord.timestamp)).limit(1)
    row = session.execute(newest_first).scalar_one_or_none()

    if row is not None:
        return LatestPooRecord(
            timestamp=row.timestamp,
            status=row.status,
            latitude=row.latitude,
            longitude=row.longitude,
        )

    logger.info("No poo record is available yet")
    return None
def publish_latest_poo_status(
    *,
    session: Session,
    settings: Settings,
    homeassistant_client: HomeAssistantClient,
) -> LatestPooRecord | None:
    """Publish the newest poo record to Home Assistant as a sensor state.

    Args:
        session: SQLAlchemy session passed to ``get_latest_poo_record``.
        settings: Application settings; ``poo_sensor_entity_name`` and
            ``poo_sensor_friendly_name`` are read here.
        homeassistant_client: Client whose ``publish_sensor`` is called.

    Returns:
        The record that was published, or ``None`` when no record exists
        (in which case nothing is sent).

    Exceptions raised by ``publish_sensor`` are not caught in this function.
    """
    latest = get_latest_poo_record(session)
    if latest is None:
        logger.info("Skipping Home Assistant poo sensor publish because no poo record exists yet")
        return None

    # Swap the trailing "Z" for an explicit UTC offset before parsing, then
    # convert to the system's local timezone for display.
    iso_text = latest.timestamp.replace("Z", "+00:00")
    record_time = datetime.fromisoformat(iso_text).astimezone()

    entity_id = settings.poo_sensor_entity_name
    state = latest.status
    attributes = {
        "last_poo": record_time.strftime("%a | %Y-%m-%d | %H:%M"),
        "friendly_name": settings.poo_sensor_friendly_name,
    }
    homeassistant_client.publish_sensor(
        entity_id=entity_id,
        state=state,
        attributes=attributes,
    )
    return latest