048414c5cb
- PATCH/DELETE /api/locations/{person}/{datetime} and /api/poo/{timestamp}
- update only non-PK fields (PK immutable); 404 on missing PK
- delete scoped to exact full PK with rowcount guard (0->404, 1->ok);
no batch/truncate/drop path
- session + CSRF protected; bare ingestion endpoints untouched
- service helpers in app/services/location.py and poo.py; regenerate openapi/
- tests/test_api_record_crud.py
160 lines
4.7 KiB
Python
160 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
import logging
|
|
|
|
from sqlalchemy import delete, desc, insert, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import Settings
|
|
from app.integrations.homeassistant import (
|
|
HomeAssistantClient,
|
|
HomeAssistantConfigError,
|
|
HomeAssistantRequestError,
|
|
)
|
|
from app.models.poo import PooRecord
|
|
from app.schemas.poo import PooRecordRequest
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LatestPooRecord:
|
|
timestamp: str
|
|
status: str
|
|
latitude: float
|
|
longitude: float
|
|
|
|
|
|
def _parse_required_float(value: str, field_name: str) -> float:
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(f"Invalid numeric value for {field_name}") from exc
|
|
|
|
|
|
def _utc_now_minute_precision() -> str:
|
|
now = datetime.now(timezone.utc).replace(second=0, microsecond=0)
|
|
return now.strftime("%Y-%m-%dT%H:%MZ")
|
|
|
|
|
|
def record_poo(
|
|
session: Session,
|
|
payload: PooRecordRequest,
|
|
*,
|
|
settings: Settings,
|
|
homeassistant_client: HomeAssistantClient,
|
|
) -> None:
|
|
stmt = insert(PooRecord).prefix_with("OR IGNORE").values(
|
|
timestamp=_utc_now_minute_precision(),
|
|
status=payload.status,
|
|
latitude=_parse_required_float(payload.latitude, "latitude"),
|
|
longitude=_parse_required_float(payload.longitude, "longitude"),
|
|
)
|
|
session.execute(stmt)
|
|
session.commit()
|
|
|
|
try:
|
|
publish_latest_poo_status(
|
|
session=session,
|
|
settings=settings,
|
|
homeassistant_client=homeassistant_client,
|
|
)
|
|
except (HomeAssistantConfigError, HomeAssistantRequestError) as exc:
|
|
logger.warning("Failed to publish latest poo status to Home Assistant: %s", exc)
|
|
|
|
if settings.poo_webhook_id:
|
|
try:
|
|
homeassistant_client.trigger_webhook(
|
|
webhook_id=settings.poo_webhook_id,
|
|
body={"status": payload.status},
|
|
)
|
|
except (HomeAssistantConfigError, HomeAssistantRequestError) as exc:
|
|
logger.warning("Failed to trigger poo webhook on Home Assistant: %s", exc)
|
|
|
|
|
|
def update_poo_record(
|
|
session: Session,
|
|
timestamp_pk: str,
|
|
*,
|
|
status: str | None,
|
|
latitude: float | None,
|
|
longitude: float | None,
|
|
) -> PooRecord | None:
|
|
"""Update non-PK fields of a single poo record row.
|
|
|
|
Returns the updated ORM object, or ``None`` if the PK does not exist.
|
|
The ``timestamp`` PK is immutable and must not be passed as an update field.
|
|
Only fields with a non-``None`` value are written.
|
|
"""
|
|
row = session.execute(
|
|
select(PooRecord).where(PooRecord.timestamp == timestamp_pk)
|
|
).scalar_one_or_none()
|
|
|
|
if row is None:
|
|
return None
|
|
|
|
if status is not None:
|
|
row.status = status
|
|
if latitude is not None:
|
|
row.latitude = latitude
|
|
if longitude is not None:
|
|
row.longitude = longitude
|
|
|
|
session.commit()
|
|
session.refresh(row)
|
|
return row
|
|
|
|
|
|
def delete_poo_record(session: Session, timestamp_pk: str) -> bool:
|
|
"""Delete the single poo record row identified by its PK.
|
|
|
|
Returns ``True`` if exactly one row was deleted, ``False`` if the PK did
|
|
not exist (caller should raise 404). The DELETE is scoped to the exact PK
|
|
— no batch/truncate path exists.
|
|
"""
|
|
result = session.execute(
|
|
delete(PooRecord).where(PooRecord.timestamp == timestamp_pk)
|
|
)
|
|
session.commit()
|
|
return result.rowcount == 1
|
|
|
|
|
|
def get_latest_poo_record(session: Session) -> LatestPooRecord | None:
|
|
stmt = select(PooRecord).order_by(desc(PooRecord.timestamp)).limit(1)
|
|
record = session.execute(stmt).scalar_one_or_none()
|
|
if record is None:
|
|
logger.info("No poo record is available yet")
|
|
return None
|
|
return LatestPooRecord(
|
|
timestamp=record.timestamp,
|
|
status=record.status,
|
|
latitude=record.latitude,
|
|
longitude=record.longitude,
|
|
)
|
|
|
|
|
|
def publish_latest_poo_status(
|
|
*,
|
|
session: Session,
|
|
settings: Settings,
|
|
homeassistant_client: HomeAssistantClient,
|
|
) -> LatestPooRecord | None:
|
|
latest = get_latest_poo_record(session)
|
|
if latest is None:
|
|
logger.info("Skipping Home Assistant poo sensor publish because no poo record exists yet")
|
|
return None
|
|
|
|
record_time = datetime.fromisoformat(latest.timestamp.replace("Z", "+00:00")).astimezone()
|
|
|
|
homeassistant_client.publish_sensor(
|
|
entity_id=settings.poo_sensor_entity_name,
|
|
state=latest.status,
|
|
attributes={
|
|
"last_poo": record_time.strftime("%a | %Y-%m-%d | %H:%M"),
|
|
"friendly_name": settings.poo_sensor_friendly_name,
|
|
},
|
|
)
|
|
return latest
|