M2-T04: add single-row record CRUD API (patch/delete)

- PATCH/DELETE /api/locations/{person}/{datetime} and /api/poo/{timestamp}
- update only non-PK fields (PK immutable); 404 on missing PK
- delete scoped to exact full PK with rowcount guard (0->404, 1->ok);
  no batch/truncate/drop path
- session + CSRF protected; bare ingestion endpoints untouched
- service helpers in app/services/location.py and poo.py; regenerate openapi/
- tests/test_api_record_crud.py
This commit is contained in:
2026-06-12 23:33:08 +02:00
parent 9ce3f2a0b8
commit 048414c5cb
7 changed files with 1377 additions and 4 deletions
+48 -1
View File
@@ -4,7 +4,7 @@ from dataclasses import dataclass
from datetime import datetime, timezone
import logging
from sqlalchemy import desc, insert, select
from sqlalchemy import delete, desc, insert, select
from sqlalchemy.orm import Session
from app.config import Settings
@@ -74,6 +74,53 @@ def record_poo(
logger.warning("Failed to trigger poo webhook on Home Assistant: %s", exc)
def update_poo_record(
session: Session,
timestamp_pk: str,
*,
status: str | None,
latitude: float | None,
longitude: float | None,
) -> PooRecord | None:
"""Update non-PK fields of a single poo record row.
Returns the updated ORM object, or ``None`` if the PK does not exist.
The ``timestamp`` PK is immutable and must not be passed as an update field.
Only fields with a non-``None`` value are written.
"""
row = session.execute(
select(PooRecord).where(PooRecord.timestamp == timestamp_pk)
).scalar_one_or_none()
if row is None:
return None
if status is not None:
row.status = status
if latitude is not None:
row.latitude = latitude
if longitude is not None:
row.longitude = longitude
session.commit()
session.refresh(row)
return row
def delete_poo_record(session: Session, timestamp_pk: str) -> bool:
"""Delete the single poo record row identified by its PK.
Returns ``True`` if exactly one row was deleted, ``False`` if the PK did
not exist (caller should raise 404). The DELETE is scoped to the exact PK
— no batch/truncate path exists.
"""
result = session.execute(
delete(PooRecord).where(PooRecord.timestamp == timestamp_pk)
)
session.commit()
return result.rowcount == 1
def get_latest_poo_record(session: Session) -> LatestPooRecord | None:
stmt = select(PooRecord).order_by(desc(PooRecord.timestamp)).limit(1)
record = session.execute(stmt).scalar_one_or_none()