M2-T01: add config JSON API (GET/PUT /api/config)

- new app/api/routes/api/ package with shared require_session (401) and
  require_csrf (presence-only X-CSRF-Token, 403) dependencies
- GET /api/config returns masked config sections; PUT /api/config reuses
  save_config_updates (blank secret keeps old; invalid -> 422, no write)
- session-protected; PUT also CSRF-protected
- register router in app/main.py; regenerate openapi/
- tests/test_api_config.py
This commit is contained in:
2026-06-12 23:08:14 +02:00
parent 3628ac51e5
commit c2b1b7b751
8 changed files with 770 additions and 0 deletions
View File
+71
View File
@@ -0,0 +1,71 @@
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.routes.api.deps import require_csrf, require_session
from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_db
from app.schemas.config import (
ConfigField,
ConfigResponse,
ConfigSection,
ConfigUpdateRequest,
ConfigUpdateResponse,
)
from app.services.auth import AuthenticatedSession
from app.services.config_page import ConfigSaveError, build_config_sections, save_config_updates
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["api-config"])
def _sections_from_raw(sections_raw: list[dict]) -> list[ConfigSection]:
result = []
for section in sections_raw:
fields = [ConfigField(**f) for f in section["fields"]]
result.append(ConfigSection(name=section["name"], fields=fields))
return result
@router.get("/config", response_model=ConfigResponse)
def get_config(
db: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
_auth: AuthenticatedSession = Depends(require_session),
) -> ConfigResponse:
"""Return all configuration sections. Secret field values are masked (empty string)."""
sections_raw = build_config_sections(db, settings)
return ConfigResponse(sections=_sections_from_raw(sections_raw))
@router.put("/config", response_model=ConfigUpdateResponse)
def put_config(
body: ConfigUpdateRequest,
db: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
_auth: AuthenticatedSession = Depends(require_session),
_csrf: None = Depends(require_csrf),
) -> ConfigUpdateResponse:
"""
Save configuration updates.
- Blank secret value keeps the existing stored value (no change).
- Invalid values return 422 and nothing is written to the database.
"""
try:
save_config_updates(db, body.updates, settings)
except ConfigSaveError as exc:
logger.warning("Rejected config update via API: %s", exc)
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="invalid config submission",
) from exc
# Re-read settings after save (save_config_updates clears the settings cache)
refreshed_settings = get_settings()
sections_raw = build_config_sections(db, refreshed_settings)
return ConfigUpdateResponse(sections=_sections_from_raw(sections_raw))
+28
View File
@@ -0,0 +1,28 @@
from __future__ import annotations
from fastapi import Depends, Header, HTTPException, status
from app.dependencies import get_current_auth_session
from app.services.auth import AuthenticatedSession
def require_session(
auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> AuthenticatedSession:
if auth is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="authentication required",
)
return auth
def require_csrf(
_auth: AuthenticatedSession = Depends(require_session),
x_csrf_token: str | None = Header(default=None, alias="X-CSRF-Token"),
) -> None:
if not x_csrf_token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="missing CSRF token",
)
+2
View File
@@ -8,6 +8,7 @@ from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.orm import Session
from app import models # noqa: F401
from app.api.routes.api.config import router as api_config_router
from app.api.routes.auth import router as auth_router
from app.api.routes import pages, status
from app.db import get_session_local
@@ -91,6 +92,7 @@ def create_app() -> FastAPI:
app.include_router(status.router)
app.include_router(auth_router)
app.include_router(pages.router)
app.include_router(api_config_router)
app.include_router(homeassistant_router)
app.include_router(location_router)
app.include_router(poo_router)
+31
View File
@@ -0,0 +1,31 @@
from __future__ import annotations
from pydantic import BaseModel
class ConfigField(BaseModel):
env_name: str
label: str
value: str
secret: bool
input_type: str
configured: bool
class ConfigSection(BaseModel):
name: str
fields: list[ConfigField]
class ConfigResponse(BaseModel):
sections: list[ConfigSection]
class ConfigUpdateRequest(BaseModel):
"""Flat mapping of env_name → value, mirroring the existing form semantics."""
updates: dict[str, str]
class ConfigUpdateResponse(BaseModel):
sections: list[ConfigSection]
+188
View File
@@ -270,6 +270,86 @@
}
}
},
"/api/config": {
"get": {
"tags": [
"api-config"
],
"summary": "Get Config",
"description": "Return all configuration sections. Secret field values are masked (empty string).",
"operationId": "get_config_api_config_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ConfigResponse"
}
}
}
}
}
},
"put": {
"tags": [
"api-config"
],
"summary": "Put Config",
"description": "Save configuration updates.\n\n- Blank secret value keeps the existing stored value (no change).\n- Invalid values return 422 and nothing is written to the database.",
"operationId": "put_config_api_config_put",
"parameters": [
{
"name": "X-CSRF-Token",
"in": "header",
"required": false,
"schema": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "X-Csrf-Token"
}
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ConfigUpdateRequest"
}
}
}
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ConfigUpdateResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/homeassistant/publish": {
"post": {
"tags": [
@@ -472,6 +552,114 @@
],
"title": "Body_logout_logout_post"
},
"ConfigField": {
"properties": {
"env_name": {
"type": "string",
"title": "Env Name"
},
"label": {
"type": "string",
"title": "Label"
},
"value": {
"type": "string",
"title": "Value"
},
"secret": {
"type": "boolean",
"title": "Secret"
},
"input_type": {
"type": "string",
"title": "Input Type"
},
"configured": {
"type": "boolean",
"title": "Configured"
}
},
"type": "object",
"required": [
"env_name",
"label",
"value",
"secret",
"input_type",
"configured"
],
"title": "ConfigField"
},
"ConfigResponse": {
"properties": {
"sections": {
"items": {
"$ref": "#/components/schemas/ConfigSection"
},
"type": "array",
"title": "Sections"
}
},
"type": "object",
"required": [
"sections"
],
"title": "ConfigResponse"
},
"ConfigSection": {
"properties": {
"name": {
"type": "string",
"title": "Name"
},
"fields": {
"items": {
"$ref": "#/components/schemas/ConfigField"
},
"type": "array",
"title": "Fields"
}
},
"type": "object",
"required": [
"name",
"fields"
],
"title": "ConfigSection"
},
"ConfigUpdateRequest": {
"properties": {
"updates": {
"additionalProperties": {
"type": "string"
},
"type": "object",
"title": "Updates"
}
},
"type": "object",
"required": [
"updates"
],
"title": "ConfigUpdateRequest",
"description": "Flat mapping of env_name → value, mirroring the existing form semantics."
},
"ConfigUpdateResponse": {
"properties": {
"sections": {
"items": {
"$ref": "#/components/schemas/ConfigSection"
},
"type": "array",
"title": "Sections"
}
},
"type": "object",
"required": [
"sections"
],
"title": "ConfigUpdateResponse"
},
"HTTPValidationError": {
"properties": {
"detail": {
+132
View File
@@ -168,6 +168,60 @@ paths:
text/html:
schema:
type: string
/api/config:
get:
tags:
- api-config
summary: Get Config
description: Return all configuration sections. Secret field values are masked
(empty string).
operationId: get_config_api_config_get
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConfigResponse'
put:
tags:
- api-config
summary: Put Config
description: 'Save configuration updates.
- Blank secret value keeps the existing stored value (no change).
- Invalid values return 422 and nothing is written to the database.'
operationId: put_config_api_config_put
parameters:
- name: X-CSRF-Token
in: header
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: X-Csrf-Token
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ConfigUpdateRequest'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConfigUpdateResponse'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/homeassistant/publish:
post:
tags:
@@ -302,6 +356,84 @@ components:
required:
- csrf_token
title: Body_logout_logout_post
ConfigField:
properties:
env_name:
type: string
title: Env Name
label:
type: string
title: Label
value:
type: string
title: Value
secret:
type: boolean
title: Secret
input_type:
type: string
title: Input Type
configured:
type: boolean
title: Configured
type: object
required:
- env_name
- label
- value
- secret
- input_type
- configured
title: ConfigField
ConfigResponse:
properties:
sections:
items:
$ref: '#/components/schemas/ConfigSection'
type: array
title: Sections
type: object
required:
- sections
title: ConfigResponse
ConfigSection:
properties:
name:
type: string
title: Name
fields:
items:
$ref: '#/components/schemas/ConfigField'
type: array
title: Fields
type: object
required:
- name
- fields
title: ConfigSection
ConfigUpdateRequest:
properties:
updates:
additionalProperties:
type: string
type: object
title: Updates
type: object
required:
- updates
title: ConfigUpdateRequest
description: Flat mapping of env_name → value, mirroring the existing form semantics.
ConfigUpdateResponse:
properties:
sections:
items:
$ref: '#/components/schemas/ConfigSection'
type: array
title: Sections
type: object
required:
- sections
title: ConfigUpdateResponse
HTTPValidationError:
properties:
detail:
+318
View File
@@ -0,0 +1,318 @@
"""Tests for M2-T01: GET /api/config and PUT /api/config."""
from __future__ import annotations
import re
import sqlite3
from fastapi.testclient import TestClient
from app.config import get_settings
from app.services.config_page import CONFIG_FIELDS
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None, "csrf_token not found in HTML"
return match.group(1)
def _login(client: TestClient) -> None:
"""Log in as admin/test-password using the Jinja login form."""
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
resp = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert resp.status_code == 303, f"Login failed: {resp.status_code}"
def _stringify(value) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value).lower()
return str(value)
def _full_config_payload(overrides: dict[str, str] | None = None) -> dict[str, str]:
"""Build a complete env_name→value dict mirroring the Jinja form's full submission.
Secrets default to "" (keep-old semantics). Non-secrets use current settings defaults.
"""
settings = get_settings()
payload: dict[str, str] = {}
for field in CONFIG_FIELDS:
if field.secret:
payload[field.env_name] = "" # blank → keep existing
else:
payload[field.env_name] = _stringify(getattr(settings, field.setting_attr))
if overrides:
payload.update(overrides)
return payload
# ---------------------------------------------------------------------------
# GET /api/config — unauthenticated
# ---------------------------------------------------------------------------
def test_get_config_unauthenticated_returns_401(client: TestClient) -> None:
response = client.get("/api/config")
assert response.status_code == 401
# ---------------------------------------------------------------------------
# GET /api/config — authenticated
# ---------------------------------------------------------------------------
def test_get_config_authenticated_returns_sections(client: TestClient) -> None:
_login(client)
response = client.get("/api/config")
assert response.status_code == 200
body = response.json()
assert "sections" in body
assert isinstance(body["sections"], list)
assert len(body["sections"]) > 0
def test_get_config_sections_have_expected_structure(client: TestClient) -> None:
_login(client)
response = client.get("/api/config")
body = response.json()
for section in body["sections"]:
assert "name" in section
assert "fields" in section
assert isinstance(section["fields"], list)
for field in section["fields"]:
assert "env_name" in field
assert "label" in field
assert "value" in field
assert "secret" in field
assert "input_type" in field
assert "configured" in field
def test_get_config_secret_fields_have_empty_string_value(client: TestClient) -> None:
_login(client)
response = client.get("/api/config")
body = response.json()
for section in body["sections"]:
for field in section["fields"]:
if field["secret"]:
assert field["value"] == "", (
f"Secret field {field['env_name']} should be masked (empty string), "
f"got {field['value']!r}"
)
def test_get_config_includes_known_sections(client: TestClient) -> None:
_login(client)
response = client.get("/api/config")
body = response.json()
section_names = {s["name"] for s in body["sections"]}
assert "System" in section_names
assert "SMTP" in section_names
assert "Authentication" in section_names
# ---------------------------------------------------------------------------
# PUT /api/config — unauthenticated
# ---------------------------------------------------------------------------
def test_put_config_unauthenticated_returns_401(client: TestClient) -> None:
response = client.put(
"/api/config",
json={"updates": _full_config_payload()},
headers={"X-CSRF-Token": "any-token"},
)
assert response.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config — authenticated but missing CSRF
# ---------------------------------------------------------------------------
def test_put_config_authenticated_missing_csrf_returns_403(client: TestClient) -> None:
_login(client)
# No X-CSRF-Token header at all
response = client.put(
"/api/config",
json={"updates": _full_config_payload()},
)
assert response.status_code == 403
def test_put_config_authenticated_empty_csrf_returns_403(client: TestClient) -> None:
_login(client)
# Empty string X-CSRF-Token header counts as missing
response = client.put(
"/api/config",
json={"updates": _full_config_payload()},
headers={"X-CSRF-Token": ""},
)
assert response.status_code == 403
# ---------------------------------------------------------------------------
# PUT /api/config — authenticated + CSRF present (any non-empty value)
# ---------------------------------------------------------------------------
def test_put_config_with_csrf_header_updates_app_name(
client: TestClient, test_database_urls
) -> None:
_login(client)
payload = _full_config_payload({"APP_NAME": "Updated via API"})
response = client.put(
"/api/config",
json={"updates": payload},
headers={"X-CSRF-Token": "any-non-empty-value"},
)
assert response.status_code == 200
body = response.json()
assert "sections" in body
# The refreshed config in the response should reflect the new name
system_section = next(s for s in body["sections"] if s["name"] == "System")
app_name_field = next(f for f in system_section["fields"] if f["env_name"] == "APP_NAME")
assert app_name_field["value"] == "Updated via API"
def test_put_config_blank_secret_keeps_existing_value(
client: TestClient, test_database_urls
) -> None:
"""Submitting a blank value for a secret field must NOT overwrite the stored secret."""
_login(client)
# First: store a secret via a full PUT with the secret value set
payload_with_secret = _full_config_payload({"SMTP_PASSWORD": "original-secret"})
resp1 = client.put(
"/api/config",
json={"updates": payload_with_secret},
headers={"X-CSRF-Token": "token"},
)
assert resp1.status_code == 200, f"First PUT failed: {resp1.status_code} {resp1.text}"
# Second: PUT with blank for that secret (keep-old semantics)
payload_blank_secret = _full_config_payload({"SMTP_PASSWORD": ""})
resp2 = client.put(
"/api/config",
json={"updates": payload_blank_secret},
headers={"X-CSRF-Token": "token"},
)
assert resp2.status_code == 200, f"Second PUT failed: {resp2.status_code} {resp2.text}"
# The stored value in the DB should still be the original secret
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows.get("SMTP_PASSWORD") == "original-secret"
def test_put_config_returns_refreshed_sections(client: TestClient) -> None:
_login(client)
payload = _full_config_payload({"APP_NAME": "Refreshed Name"})
response = client.put(
"/api/config",
json={"updates": payload},
headers={"X-CSRF-Token": "token"},
)
assert response.status_code == 200
body = response.json()
assert "sections" in body
assert isinstance(body["sections"], list)
assert len(body["sections"]) > 0
# Sections should reflect updated value
system_section = next(s for s in body["sections"] if s["name"] == "System")
app_name_field = next(f for f in system_section["fields"] if f["env_name"] == "APP_NAME")
assert app_name_field["value"] == "Refreshed Name"
def test_put_config_invalid_value_returns_422_and_does_not_write(
client: TestClient, test_database_urls
) -> None:
"""An invalid config value (e.g. bad type for a typed field) must return 4xx and not persist."""
_login(client)
# SMTP_PORT expects an integer; submit something that fails Settings validation
payload = _full_config_payload({"SMTP_PORT": "not-a-number"})
response = client.put(
"/api/config",
json={"updates": payload},
headers={"X-CSRF-Token": "token"},
)
assert response.status_code == 422
# Confirm the bad value was not persisted
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows.get("SMTP_PORT") != "not-a-number"
# ---------------------------------------------------------------------------
# Response schema correctness — secret values never leak in response
# ---------------------------------------------------------------------------
def test_put_config_response_does_not_leak_secret_values(client: TestClient) -> None:
_login(client)
# Set a secret
payload1 = _full_config_payload({"HOME_ASSISTANT_AUTH_TOKEN": "super-secret-token"})
resp1 = client.put(
"/api/config",
json={"updates": payload1},
headers={"X-CSRF-Token": "token"},
)
assert resp1.status_code == 200
# Do another PUT and check response doesn't leak the secret
payload2 = _full_config_payload({"APP_NAME": "check-secrets"})
response = client.put(
"/api/config",
json={"updates": payload2},
headers={"X-CSRF-Token": "token"},
)
assert response.status_code == 200
body = response.json()
for section in body["sections"]:
for field in section["fields"]:
if field["secret"]:
assert field["value"] == "", (
f"Secret field {field['env_name']} leaked in PUT response"
)
# The secret value itself should not appear anywhere in the raw response
assert "super-secret-token" not in str(body)