Restore legacy poo inbound dispatch
pytest / test (push) Successful in 43s
docker-image / build-and-push (push) Successful in 3m38s

This commit is contained in:
2026-04-20 23:33:57 +02:00
parent b9e7f51d51
commit 35aee79d93
3 changed files with 227 additions and 4 deletions
+158
View File
@@ -1,5 +1,21 @@
from sqlalchemy import text
import app.db as app_db
import app.poo_db as poo_db
from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_homeassistant_client
from app.main import create_app
class _FakeHomeAssistantClient:
def __init__(self) -> None:
self.sensor_calls: list[dict] = []
def publish_sensor(self, *, entity_id: str, state: str, attributes: dict | None = None) -> None:
self.sensor_calls.append(
{"entity_id": entity_id, "state": state, "attributes": attributes or {}}
)
def test_homeassistant_publish_records_location(location_client) -> None:
client, engine = location_client
@@ -141,6 +157,148 @@ def test_homeassistant_publish_rejects_invalid_ticktick_content(location_client)
assert response.text == "bad request"
def test_homeassistant_publish_poo_get_latest_publishes_latest_status(
ready_location_database,
ready_poo_database,
auth_database,
monkeypatch,
) -> None:
location_engine = app_db.create_engine(
ready_location_database["location_url"],
connect_args={"check_same_thread": False},
)
location_session_local = app_db.sessionmaker(
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient()
settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status",
)
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings
with poo_engine.begin() as conn:
conn.execute(
text(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) "
"VALUES (:timestamp, :status, :latitude, :longitude)"
),
{
"timestamp": "2026-04-20T10:05Z",
"status": "done",
"latitude": 1.23,
"longitude": 4.56,
},
)
try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client:
response = client.post(
"/homeassistant/publish",
json={
"target": "poo_recorder",
"action": "get_latest",
"content": "",
},
)
assert response.status_code == 200
assert response.text == ""
assert len(fake_ha.sensor_calls) == 1
assert fake_ha.sensor_calls[0]["entity_id"] == "sensor.test_poo_status"
assert fake_ha.sensor_calls[0]["state"] == "done"
assert fake_ha.sensor_calls[0]["attributes"]["friendly_name"] == "Poo Status"
assert fake_ha.sensor_calls[0]["attributes"]["last_poo"]
finally:
test_app.dependency_overrides.clear()
get_settings.cache_clear()
location_engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_internal_error_for_unknown_poo_action(
ready_location_database,
ready_poo_database,
auth_database,
monkeypatch,
) -> None:
location_engine = app_db.create_engine(
ready_location_database["location_url"],
connect_args={"check_same_thread": False},
)
location_session_local = app_db.sessionmaker(
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient()
settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status",
)
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings
try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client:
response = client.post(
"/homeassistant/publish",
json={
"target": "poo_recorder",
"action": "unknown_action",
"content": "",
},
)
assert response.status_code == 500
assert response.text == "internal server error"
assert fake_ha.sensor_calls == []
finally:
test_app.dependency_overrides.clear()
get_settings.cache_clear()
location_engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_not_implemented_for_unknown_location_action(
location_client,
) -> None: