Files
home-automation/app/api/routes/api/session.py
T
tliu93 8da1f13e60 M2-T02: add session/auth JSON API for the SPA
- GET /api/session (user + csrf_token, 401 when unauthenticated)
- POST /api/auth/login (sets HttpOnly session cookie; 401 on bad creds; no CSRF)
- POST /api/auth/logout (session+CSRF; revokes session, clears cookie; 204)
- POST /api/auth/password (session+CSRF; reuses change_password; 400 on failure; 204)
- reuses app/services/auth.py and shared require_session/require_csrf deps
- register router in app/main.py; regenerate openapi/
- tests/test_api_session.py
2026-06-12 23:15:56 +02:00

142 lines
4.4 KiB
Python

from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy.orm import Session
from app.api.routes.api.deps import require_csrf, require_session
from app.config import Settings
from app.dependencies import get_app_settings, get_db
from app.schemas.session import (
LoginRequest,
PasswordChangeRequest,
SessionResponse,
SessionUser,
)
from app.services.auth import (
AuthPasswordChangeError,
AuthenticatedSession,
authenticate_user,
change_password,
create_session,
revoke_session,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["api-session"])
def _build_session_response(auth: AuthenticatedSession) -> SessionResponse:
    """Convert an authenticated session into the ``SessionResponse`` payload.

    The username and ``force_password_change`` flag are copied from
    ``auth.user``; the CSRF token is copied from ``auth.session``.
    """
    account = auth.user
    user_payload = SessionUser(
        username=account.username,
        force_password_change=account.force_password_change,
    )
    return SessionResponse(user=user_payload, csrf_token=auth.session.csrf_token)
@router.get("/session", response_model=SessionResponse)
def get_session(
    auth: AuthenticatedSession = Depends(require_session),
) -> SessionResponse:
    """Describe the caller's session: the signed-in user and the CSRF token.

    Authentication is delegated to the ``require_session`` dependency, so an
    unauthenticated request is answered with 401 before this body runs.
    """
    payload = _build_session_response(auth)
    return payload
@router.post("/auth/login", response_model=SessionResponse)
def post_login(
    body: LoginRequest,
    response: Response,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_app_settings),
) -> SessionResponse:
    """
    Log a user in with a username and password.

    Valid credentials create a new session, attach it to the response as an
    HttpOnly cookie, and return the session user together with the CSRF token.
    Invalid credentials raise a 401 before any cookie is set.
    The endpoint is unauthenticated, so no X-CSRF-Token header is required.
    """
    account = authenticate_user(db, username=body.username, password=body.password)
    if account is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="invalid username or password",
        )

    session_record, cookie_value = create_session(db, user=account, settings=settings)
    logger.info("Created API authenticated session for user '%s'", account.username)

    # The TTL is configured in hours; the cookie's max_age is in seconds.
    lifetime_seconds = settings.auth_session_ttl_hours * 3600
    response.set_cookie(
        key=settings.auth_session_cookie_name,
        value=cookie_value,
        max_age=lifetime_seconds,
        httponly=True,
        secure=settings.auth_cookie_secure,
        samesite="lax",
        path="/",
    )

    return _build_session_response(
        AuthenticatedSession(user=account, session=session_record)
    )
@router.post("/auth/logout", status_code=status.HTTP_204_NO_CONTENT)
def post_logout(
    response: Response,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_app_settings),
    auth: AuthenticatedSession = Depends(require_session),
    _csrf: None = Depends(require_csrf),
) -> Response:
    """
    Revoke the current session and clear the session cookie.

    Requires an authenticated session (``require_session``) and a valid
    X-CSRF-Token header (``require_csrf``).
    Returns 204 No Content; the status is also declared on the route so the
    generated OpenAPI schema matches what the handler actually sends.

    The injected ``response`` parameter is unused: a fresh ``Response`` is
    built and returned directly so the reply carries no body.
    """
    revoke_session(db, auth_session=auth.session)
    logger.info("Revoked API authenticated session for user '%s'", auth.user.username)

    no_content = Response(status_code=status.HTTP_204_NO_CONTENT)
    # Mirror the attributes used when the cookie was set at login. Browsers
    # can refuse a clearing cookie whose attributes do not match the original
    # (for example a Secure cookie, or a __Host-/__Secure- prefixed name).
    no_content.delete_cookie(
        settings.auth_session_cookie_name,
        path="/",
        secure=settings.auth_cookie_secure,
        httponly=True,
        samesite="lax",
    )
    return no_content
@router.post("/auth/password", status_code=status.HTTP_204_NO_CONTENT)
def post_change_password(
    body: PasswordChangeRequest,
    db: Session = Depends(get_db),
    auth: AuthenticatedSession = Depends(require_session),
    _csrf: None = Depends(require_csrf),
) -> Response:
    """
    Change the current user's password.

    Requires an authenticated session (``require_session``) and a valid
    X-CSRF-Token header (``require_csrf``).
    The work is delegated to the ``change_password`` service, which is
    expected to clear ``force_password_change`` on success.
    An ``AuthPasswordChangeError`` is logged and converted into a 400 with a
    generic message, so the specific rejection reason is not sent to the client.
    Returns 204 No Content; the status is also declared on the route so the
    generated OpenAPI schema matches what the handler actually sends.
    """
    try:
        change_password(
            db,
            user=auth.user,
            current_password=body.current_password,
            new_password=body.new_password,
            confirm_password=body.confirm_password,
        )
    except AuthPasswordChangeError as exc:
        # The detailed reason goes to the log only; the client gets a generic error.
        logger.info(
            "Rejected password change for user '%s': %s",
            auth.user.username,
            exc,
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="password change failed",
        ) from exc

    logger.info("Password updated for user '%s'", auth.user.username)
    return Response(status_code=status.HTTP_204_NO_CONTENT)