Persist runtime config in app db and seed from env

This commit is contained in:
2026-04-20 15:56:10 +02:00
parent 3f7c9e43d9
commit 179aae264e
21 changed files with 921 additions and 125 deletions
+49 -3
View File
@@ -25,8 +25,9 @@ def _prepare_app_db(tmp_path) -> str:
def test_app_starts(client: TestClient) -> None:
response = client.get("/")
assert response.status_code == 200
response = client.get("/", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
def test_status_endpoint(client: TestClient) -> None:
@@ -73,11 +74,56 @@ def test_app_db_adoption_initializes_new_database(tmp_path) -> None:
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
).fetchall()
}
assert {"auth_users", "auth_sessions", "alembic_version"} <= tables
assert {"auth_users", "auth_sessions", "app_config", "alembic_version"} <= tables
finally:
conn.close()
def test_app_start_seeds_missing_config_from_env_without_overwriting_existing_values(
tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
app_database_url = _prepare_app_db(tmp_path)
location_database_path = tmp_path / "location_ready.db"
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_alembic_config(f"sqlite:///{location_database_path}"), "head")
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
app_database_path = tmp_path / "app_ready.db"
conn = sqlite3.connect(app_database_path)
conn.execute(
"INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)",
("APP_NAME", "Database Owned Name"),
)
conn.commit()
conn.close()
monkeypatch.setenv("APP_DATABASE_URL", app_database_url)
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("APP_NAME", "Bootstrap Name")
monkeypatch.setenv("HOME_ASSISTANT_BASE_URL", "http://bootstrap-ha.local:8123")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear()
reset_auth_db_caches()
app = create_app()
anyio.run(_run_lifespan, app)
conn = sqlite3.connect(app_database_path)
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["APP_NAME"] == "Database Owned Name"
assert rows["HOME_ASSISTANT_BASE_URL"] == "http://bootstrap-ha.local:8123"
assert rows["AUTH_SESSION_COOKIE_NAME"] == "home_automation_session"
get_settings.cache_clear()
reset_auth_db_caches()
def test_app_start_fails_when_location_db_missing(
tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
+94 -14
View File
@@ -1,7 +1,11 @@
import re
import sqlite3
from pathlib import Path
from fastapi.testclient import TestClient
from app.config import get_settings
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
@@ -9,8 +13,16 @@ def _extract_csrf_token(html: str) -> str:
return match.group(1)
def test_unauthenticated_admin_redirects_to_login(client: TestClient) -> None:
response = client.get("/admin", follow_redirects=False)
def _stringify_for_form(value) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value).lower()
return str(value)
def test_unauthenticated_config_redirects_to_login(client: TestClient) -> None:
response = client.get("/config", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
@@ -31,18 +43,19 @@ def test_login_success_sets_session_cookie_and_allows_admin_access(client: TestC
)
assert response.status_code == 303
assert response.headers["location"] == "/admin"
assert response.headers["location"] == "/config"
set_cookie_header = response.headers["set-cookie"].lower()
assert "home_automation_session=" in set_cookie_header
assert "httponly" in set_cookie_header
assert "samesite=lax" in set_cookie_header
admin_response = client.get("/admin")
assert admin_response.status_code == 200
assert "首次登录后需要先修改密码" in admin_response.text
assert "Current Password" in admin_response.text
assert "New Password" in admin_response.text
assert "当前用户" not in admin_response.text
config_response = client.get("/config")
assert config_response.status_code == 200
assert "首次登录后需要先修改密码" in config_response.text
assert "Current Password" in config_response.text
assert "New Password" in config_response.text
assert "Save Config" in config_response.text
assert "当前用户" in config_response.text
def test_login_failure_returns_generic_error(client: TestClient) -> None:
@@ -76,8 +89,8 @@ def test_logout_revokes_session(client: TestClient) -> None:
},
)
admin_page = client.get("/admin")
logout_csrf_token = _extract_csrf_token(admin_page.text)
config_page = client.get("/config")
logout_csrf_token = _extract_csrf_token(config_page.text)
logout_response = client.post(
"/logout",
@@ -88,9 +101,9 @@ def test_logout_revokes_session(client: TestClient) -> None:
assert logout_response.status_code == 303
assert logout_response.headers["location"] == "/login"
admin_after_logout = client.get("/admin", follow_redirects=False)
assert admin_after_logout.status_code == 303
assert admin_after_logout.headers["location"] == "/login"
config_after_logout = client.get("/config", follow_redirects=False)
assert config_after_logout.status_code == 303
assert config_after_logout.headers["location"] == "/login"
def test_login_rejects_invalid_csrf(client: TestClient) -> None:
@@ -107,3 +120,70 @@ def test_login_rejects_invalid_csrf(client: TestClient) -> None:
assert response.status_code == 400
assert "invalid login request" in response.text
def test_legacy_admin_route_redirects_to_config_when_authenticated(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
response = client.get("/admin", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config"
def test_config_page_update_persists_to_database(
client: TestClient, test_database_urls
) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
settings = get_settings()
form_data = {"csrf_token": config_csrf_token}
from app.services.config_page import CONFIG_FIELDS
for field in CONFIG_FIELDS:
if field.secret:
form_data[field.env_name] = ""
else:
form_data[field.env_name] = _stringify_for_form(getattr(settings, field.setting_attr))
form_data["APP_NAME"] = "Updated Home Automation"
form_data["HOME_ASSISTANT_AUTH_TOKEN"] = "new-token"
response = client.post("/config", data=form_data, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?saved=1"
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["APP_NAME"] == "Updated Home Automation"
assert rows["HOME_ASSISTANT_AUTH_TOKEN"] == "new-token"
assert "AUTH_BOOTSTRAP_USERNAME" not in rows