"""Tests for M2-T01: GET /api/config and PUT /api/config. Tests for M2-T05: POST /api/config/smtp/test.""" from __future__ import annotations import re import sqlite3 from unittest.mock import patch from fastapi.testclient import TestClient from app.config import get_settings from app.services.config_page import CONFIG_FIELDS from app.services.email import EmailConfigurationError, EmailDeliveryError # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _extract_csrf_token(html: str) -> str: match = re.search(r'name="csrf_token" value="([^"]+)"', html) assert match is not None, "csrf_token not found in HTML" return match.group(1) def _login(client: TestClient) -> None: """Log in as admin/test-password using the Jinja login form.""" login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) resp = client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) assert resp.status_code == 303, f"Login failed: {resp.status_code}" def _stringify(value) -> str: if value is None: return "" if isinstance(value, bool): return str(value).lower() return str(value) def _full_config_payload(overrides: dict[str, str] | None = None) -> dict[str, str]: """Build a complete env_name→value dict mirroring the Jinja form's full submission. Secrets default to "" (keep-old semantics). Non-secrets use current settings defaults. """ settings = get_settings() payload: dict[str, str] = {} for field in CONFIG_FIELDS: if field.secret: payload[field.env_name] = "" # blank → keep existing else: payload[field.env_name] = _stringify(getattr(settings, field.setting_attr)) if overrides: payload.update(overrides) return payload # --------------------------------------------------------------------------- # GET /api/config — unauthenticated # --------------------------------------------------------------------------- def test_get_config_unauthenticated_returns_401(client: TestClient) -> None: response = client.get("/api/config") assert response.status_code == 401 # --------------------------------------------------------------------------- # GET /api/config — authenticated # --------------------------------------------------------------------------- def test_get_config_authenticated_returns_sections(client: TestClient) -> None: _login(client) response = client.get("/api/config") assert response.status_code == 200 body = response.json() assert "sections" in body assert isinstance(body["sections"], list) assert len(body["sections"]) > 0 def test_get_config_sections_have_expected_structure(client: TestClient) -> None: _login(client) response = client.get("/api/config") body = response.json() for section in body["sections"]: assert "name" in section assert "fields" in section assert isinstance(section["fields"], list) for field in section["fields"]: assert "env_name" in field assert "label" in field assert "value" in field assert "secret" in field assert "input_type" in field assert "configured" in field def test_get_config_secret_fields_have_empty_string_value(client: TestClient) -> None: _login(client) response = client.get("/api/config") body = response.json() for section in body["sections"]: for field in section["fields"]: if field["secret"]: assert field["value"] == "", ( f"Secret field {field['env_name']} should be masked (empty string), " f"got {field['value']!r}" ) def test_get_config_includes_known_sections(client: TestClient) -> None: _login(client) response = client.get("/api/config") body = response.json() section_names = {s["name"] for s in body["sections"]} assert "System" in section_names assert "SMTP" in section_names assert "Authentication" in section_names # --------------------------------------------------------------------------- # PUT /api/config — unauthenticated # --------------------------------------------------------------------------- def test_put_config_unauthenticated_returns_401(client: TestClient) -> None: response = client.put( "/api/config", json={"updates": _full_config_payload()}, headers={"X-CSRF-Token": "any-token"}, ) assert response.status_code == 401 # --------------------------------------------------------------------------- # PUT /api/config — authenticated but missing CSRF # --------------------------------------------------------------------------- def test_put_config_authenticated_missing_csrf_returns_403(client: TestClient) -> None: _login(client) # No X-CSRF-Token header at all response = client.put( "/api/config", json={"updates": _full_config_payload()}, ) assert response.status_code == 403 def test_put_config_authenticated_empty_csrf_returns_403(client: TestClient) -> None: _login(client) # Empty string X-CSRF-Token header counts as missing response = client.put( "/api/config", json={"updates": _full_config_payload()}, headers={"X-CSRF-Token": ""}, ) assert response.status_code == 403 # --------------------------------------------------------------------------- # PUT /api/config — authenticated + CSRF present (any non-empty value) # --------------------------------------------------------------------------- def test_put_config_with_csrf_header_updates_app_name( client: TestClient, test_database_urls ) -> None: _login(client) payload = _full_config_payload({"APP_NAME": "Updated via API"}) response = client.put( "/api/config", json={"updates": payload}, headers={"X-CSRF-Token": "any-non-empty-value"}, ) assert response.status_code == 200 body = response.json() assert "sections" in body # The refreshed config in the response should reflect the new name system_section = next(s for s in body["sections"] if s["name"] == "System") app_name_field = next(f for f in system_section["fields"] if f["env_name"] == "APP_NAME") assert app_name_field["value"] == "Updated via API" def test_put_config_blank_secret_keeps_existing_value( client: TestClient, test_database_urls ) -> None: """Submitting a blank value for a secret field must NOT overwrite the stored secret.""" _login(client) # First: store a secret via a full PUT with the secret value set payload_with_secret = _full_config_payload({"SMTP_PASSWORD": "original-secret"}) resp1 = client.put( "/api/config", json={"updates": payload_with_secret}, headers={"X-CSRF-Token": "token"}, ) assert resp1.status_code == 200, f"First PUT failed: {resp1.status_code} {resp1.text}" # Second: PUT with blank for that secret (keep-old semantics) payload_blank_secret = _full_config_payload({"SMTP_PASSWORD": ""}) resp2 = client.put( "/api/config", json={"updates": payload_blank_secret}, headers={"X-CSRF-Token": "token"}, ) assert resp2.status_code == 200, f"Second PUT failed: {resp2.status_code} {resp2.text}" # The stored value in the DB should still be the original secret conn = sqlite3.connect(test_database_urls["app_path"]) try: rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall()) finally: conn.close() assert rows.get("SMTP_PASSWORD") == "original-secret" def test_put_config_returns_refreshed_sections(client: TestClient) -> None: _login(client) payload = _full_config_payload({"APP_NAME": "Refreshed Name"}) response = client.put( "/api/config", json={"updates": payload}, headers={"X-CSRF-Token": "token"}, ) assert response.status_code == 200 body = response.json() assert "sections" in body assert isinstance(body["sections"], list) assert len(body["sections"]) > 0 # Sections should reflect updated value system_section = next(s for s in body["sections"] if s["name"] == "System") app_name_field = next(f for f in system_section["fields"] if f["env_name"] == "APP_NAME") assert app_name_field["value"] == "Refreshed Name" def test_put_config_invalid_value_returns_422_and_does_not_write( client: TestClient, test_database_urls ) -> None: """An invalid config value (e.g. bad type for a typed field) must return 4xx and not persist.""" _login(client) # SMTP_PORT expects an integer; submit something that fails Settings validation payload = _full_config_payload({"SMTP_PORT": "not-a-number"}) response = client.put( "/api/config", json={"updates": payload}, headers={"X-CSRF-Token": "token"}, ) assert response.status_code == 422 # Confirm the bad value was not persisted conn = sqlite3.connect(test_database_urls["app_path"]) try: rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall()) finally: conn.close() assert rows.get("SMTP_PORT") != "not-a-number" # --------------------------------------------------------------------------- # Response schema correctness — secret values never leak in response # --------------------------------------------------------------------------- def test_put_config_response_does_not_leak_secret_values(client: TestClient) -> None: _login(client) # Set a secret payload1 = _full_config_payload({"HOME_ASSISTANT_AUTH_TOKEN": "super-secret-token"}) resp1 = client.put( "/api/config", json={"updates": payload1}, headers={"X-CSRF-Token": "token"}, ) assert resp1.status_code == 200 # Do another PUT and check response doesn't leak the secret payload2 = _full_config_payload({"APP_NAME": "check-secrets"}) response = client.put( "/api/config", json={"updates": payload2}, headers={"X-CSRF-Token": "token"}, ) assert response.status_code == 200 body = response.json() for section in body["sections"]: for field in section["fields"]: if field["secret"]: assert field["value"] == "", ( f"Secret field {field['env_name']} leaked in PUT response" ) # The secret value itself should not appear anywhere in the raw response assert "super-secret-token" not in str(body) # --------------------------------------------------------------------------- # POST /api/config/smtp/test — M2-T05 # --------------------------------------------------------------------------- _SMTP_TEST_URL = "/api/config/smtp/test" _SMTP_SEND_PATH = "app.api.routes.api.config.send_smtp_test_email" def test_smtp_test_unauthenticated_returns_401(client: TestClient) -> None: """Unauthenticated request must return 401 (require_session fires before require_csrf).""" response = client.post( _SMTP_TEST_URL, headers={"X-CSRF-Token": "any-token"}, ) assert response.status_code == 401 def test_smtp_test_authenticated_missing_csrf_returns_403(client: TestClient) -> None: """Authenticated but no X-CSRF-Token header must return 403.""" _login(client) response = client.post(_SMTP_TEST_URL) assert response.status_code == 403 def test_smtp_test_authenticated_empty_csrf_returns_403(client: TestClient) -> None: """Authenticated but empty X-CSRF-Token header must return 403.""" _login(client) response = client.post(_SMTP_TEST_URL, headers={"X-CSRF-Token": ""}) assert response.status_code == 403 def test_smtp_test_success_returns_200(client: TestClient) -> None: """When send_smtp_test_email succeeds (returns None), endpoint returns 200 with result=success.""" _login(client) with patch(_SMTP_SEND_PATH, return_value=None) as mock_send: response = client.post(_SMTP_TEST_URL, headers={"X-CSRF-Token": "any-token"}) mock_send.assert_called_once() assert response.status_code == 200 body = response.json() assert body["result"] == "success" assert "message" in body def test_smtp_test_config_error_returns_400(client: TestClient) -> None: """When send_smtp_test_email raises EmailConfigurationError, endpoint returns 400 with result=config-error.""" _login(client) with patch(_SMTP_SEND_PATH, side_effect=EmailConfigurationError("SMTP host is required")): response = client.post(_SMTP_TEST_URL, headers={"X-CSRF-Token": "any-token"}) assert response.status_code == 400 body = response.json() assert body["result"] == "config-error" assert "message" in body assert "SMTP host is required" in body["message"] def test_smtp_test_delivery_error_returns_502(client: TestClient) -> None: """When send_smtp_test_email raises EmailDeliveryError, endpoint returns 502 with result=failed.""" _login(client) with patch(_SMTP_SEND_PATH, side_effect=EmailDeliveryError("connection refused")): response = client.post(_SMTP_TEST_URL, headers={"X-CSRF-Token": "any-token"}) assert response.status_code == 502 body = response.json() assert body["result"] == "failed" assert "message" in body assert "connection refused" in body["message"] def test_smtp_test_response_does_not_echo_smtp_password(client: TestClient) -> None: """The SMTP password stored in config must not appear in any API response body.""" _login(client) smtp_password = "s3cr3t-smtp-pass" # Store a fake SMTP password in config payload = _full_config_payload({"SMTP_PASSWORD": smtp_password}) client.put( "/api/config", json={"updates": payload}, headers={"X-CSRF-Token": "token"}, ) # Simulate a delivery error whose message has already been sanitised by the # service layer (i.e. the password does not appear in the exception text). # This mirrors production behaviour: email.py's _sanitize_error_message # replaces any password occurrence with "[redacted]" before raising. with patch( _SMTP_SEND_PATH, side_effect=EmailDeliveryError("authentication failure: [redacted]"), ): response = client.post(_SMTP_TEST_URL, headers={"X-CSRF-Token": "token"}) assert response.status_code == 502 body = response.json() assert "result" in body assert "message" in body # The plaintext password must not appear anywhere in the response body assert smtp_password not in response.text