M2-T02: add session/auth JSON API for the SPA
- GET /api/session (user + csrf_token, 401 when unauthenticated) - POST /api/auth/login (sets HttpOnly session cookie; 401 on bad creds; no CSRF) - POST /api/auth/logout (session+CSRF; revokes session, clears cookie; 204) - POST /api/auth/password (session+CSRF; reuses change_password; 400 on failure; 204) - reuses app/services/auth.py and shared require_session/require_csrf deps - register router in app/main.py; regenerate openapi/ - tests/test_api_session.py
This commit is contained in:
@@ -0,0 +1,352 @@
|
||||
"""Tests for M2-T02: GET /api/session, POST /api/auth/login, /logout, /password."""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _extract_csrf_token(html: str) -> str:
|
||||
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
|
||||
assert match is not None, "csrf_token not found in HTML"
|
||||
return match.group(1)
|
||||
|
||||
|
||||
def _jinja_login(client: TestClient) -> None:
|
||||
"""Log in via the existing Jinja form so the client has a session cookie."""
|
||||
login_page = client.get("/login")
|
||||
csrf_token = _extract_csrf_token(login_page.text)
|
||||
resp = client.post(
|
||||
"/login",
|
||||
data={"username": "admin", "password": "test-password", "csrf_token": csrf_token},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 303, f"Jinja login failed: {resp.status_code}"
|
||||
|
||||
|
||||
def _api_login(client: TestClient, *, username: str = "admin", password: str = "test-password"):
|
||||
"""Log in via POST /api/auth/login and return the response."""
|
||||
return client.post(
|
||||
"/api/auth/login",
|
||||
json={"username": username, "password": password},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/session — unauthenticated
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_session_unauthenticated_returns_401(client: TestClient) -> None:
|
||||
response = client.get("/api/session")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/session — authenticated (via Jinja login)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_session_authenticated_returns_user_and_csrf(client: TestClient) -> None:
|
||||
_jinja_login(client)
|
||||
|
||||
response = client.get("/api/session")
|
||||
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert "user" in body
|
||||
assert "csrf_token" in body
|
||||
assert body["user"]["username"] == "admin"
|
||||
assert isinstance(body["user"]["force_password_change"], bool)
|
||||
assert isinstance(body["csrf_token"], str)
|
||||
assert body["csrf_token"] # non-empty
|
||||
|
||||
|
||||
def test_get_session_does_not_leak_password(client: TestClient) -> None:
|
||||
_jinja_login(client)
|
||||
response = client.get("/api/session")
|
||||
body_str = str(response.json())
|
||||
assert "test-password" not in body_str
|
||||
assert "password_hash" not in body_str
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/auth/login
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_post_login_valid_credentials_returns_200_with_session(client: TestClient) -> None:
|
||||
response = _api_login(client)
|
||||
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert "user" in body
|
||||
assert "csrf_token" in body
|
||||
assert body["user"]["username"] == "admin"
|
||||
assert isinstance(body["csrf_token"], str)
|
||||
assert body["csrf_token"]
|
||||
|
||||
|
||||
def test_post_login_sets_httponly_session_cookie(client: TestClient) -> None:
|
||||
response = _api_login(client)
|
||||
|
||||
assert response.status_code == 200
|
||||
set_cookie = response.headers.get("set-cookie", "").lower()
|
||||
assert "home_automation_session=" in set_cookie
|
||||
assert "httponly" in set_cookie
|
||||
assert "samesite=lax" in set_cookie
|
||||
assert "path=/" in set_cookie
|
||||
|
||||
|
||||
def test_post_login_cookie_secure_flag_follows_settings(client: TestClient) -> None:
|
||||
"""In test mode AUTH_COOKIE_SECURE_OVERRIDE=false so secure should be absent."""
|
||||
response = _api_login(client)
|
||||
|
||||
assert response.status_code == 200
|
||||
set_cookie = response.headers.get("set-cookie", "").lower()
|
||||
# secure is absent because AUTH_COOKIE_SECURE_OVERRIDE=false in conftest
|
||||
assert "secure" not in set_cookie
|
||||
|
||||
|
||||
def test_post_login_invalid_credentials_returns_401(client: TestClient) -> None:
|
||||
response = _api_login(client, password="wrong-password")
|
||||
|
||||
assert response.status_code == 401
|
||||
# No session cookie should be set
|
||||
assert "set-cookie" not in response.headers or (
|
||||
"home_automation_session=" not in response.headers.get("set-cookie", "").lower()
|
||||
)
|
||||
|
||||
|
||||
def test_post_login_unknown_user_returns_401(client: TestClient) -> None:
|
||||
response = _api_login(client, username="nobody", password="irrelevant")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_post_login_does_not_require_csrf_header(client: TestClient) -> None:
|
||||
"""Login is unauthenticated; no X-CSRF-Token should be required."""
|
||||
response = client.post(
|
||||
"/api/auth/login",
|
||||
json={"username": "admin", "password": "test-password"},
|
||||
# Deliberately omit X-CSRF-Token
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_post_login_allows_subsequent_authenticated_request(client: TestClient) -> None:
|
||||
login_resp = _api_login(client)
|
||||
assert login_resp.status_code == 200
|
||||
|
||||
# GET /api/session should now succeed (cookie was set on the client)
|
||||
session_resp = client.get("/api/session")
|
||||
assert session_resp.status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/auth/logout
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_post_logout_unauthenticated_returns_401(client: TestClient) -> None:
|
||||
response = client.post("/api/auth/logout", headers={"X-CSRF-Token": "token"})
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_post_logout_authenticated_missing_csrf_returns_403(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post("/api/auth/logout")
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_post_logout_authenticated_empty_csrf_returns_403(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post("/api/auth/logout", headers={"X-CSRF-Token": ""})
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_post_logout_authenticated_with_csrf_returns_204(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post("/api/auth/logout", headers={"X-CSRF-Token": "any-non-empty-value"})
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
def test_post_logout_invalidates_session(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
|
||||
# Verify session is active
|
||||
assert client.get("/api/session").status_code == 200
|
||||
|
||||
# Logout
|
||||
client.post("/api/auth/logout", headers={"X-CSRF-Token": "token"})
|
||||
|
||||
# Session should now be gone
|
||||
assert client.get("/api/session").status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/auth/password
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_post_password_unauthenticated_returns_401(client: TestClient) -> None:
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_post_password_authenticated_missing_csrf_returns_403(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_post_password_success_returns_204(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
def test_post_password_wrong_current_password_returns_400(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "wrong-current",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
# Error message must be generic — no leaking which check failed
|
||||
detail = response.json().get("detail", "")
|
||||
assert "current password is invalid" not in detail
|
||||
assert detail == "password change failed"
|
||||
|
||||
|
||||
def test_post_password_mismatched_new_passwords_returns_400(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "different-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "password change failed"
|
||||
|
||||
|
||||
def test_post_password_too_short_returns_400(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "short",
|
||||
"confirm_password": "short",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "password change failed"
|
||||
|
||||
|
||||
def test_post_password_same_as_current_returns_400(client: TestClient) -> None:
|
||||
_api_login(client)
|
||||
response = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "test-password",
|
||||
"confirm_password": "test-password",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "password change failed"
|
||||
|
||||
|
||||
def test_post_password_success_sets_force_password_change_false(client: TestClient) -> None:
|
||||
"""After successful password change, force_password_change should be False."""
|
||||
_api_login(client)
|
||||
|
||||
# The bootstrap user always has force_password_change=True; change it
|
||||
resp = client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
assert resp.status_code == 204
|
||||
|
||||
# Session still active; force_password_change should now be False
|
||||
session_resp = client.get("/api/session")
|
||||
assert session_resp.status_code == 200
|
||||
assert session_resp.json()["user"]["force_password_change"] is False
|
||||
|
||||
|
||||
def test_post_password_does_not_revoke_session(client: TestClient) -> None:
|
||||
"""After password change, the session remains valid (not revoked)."""
|
||||
_api_login(client)
|
||||
|
||||
client.post(
|
||||
"/api/auth/password",
|
||||
json={
|
||||
"current_password": "test-password",
|
||||
"new_password": "new-password-123",
|
||||
"confirm_password": "new-password-123",
|
||||
},
|
||||
headers={"X-CSRF-Token": "token"},
|
||||
)
|
||||
|
||||
# Session must still be active
|
||||
assert client.get("/api/session").status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Response schema correctness — no secrets in session response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_session_response_has_no_secret_fields(client: TestClient) -> None:
|
||||
login_resp = _api_login(client)
|
||||
assert login_resp.status_code == 200
|
||||
body = login_resp.json()
|
||||
|
||||
# Must have exactly these top-level keys
|
||||
assert set(body.keys()) == {"user", "csrf_token"}
|
||||
# user must have exactly these keys
|
||||
assert set(body["user"].keys()) == {"username", "force_password_change"}
|
||||
Reference in New Issue
Block a user