Files
home-automation/app/api/routes/auth.py
T

235 lines
7.9 KiB
Python

import logging
from pathlib import Path
from fastapi import APIRouter, Depends, Form, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from app.config import Settings
from app.dependencies import get_app_settings, get_auth_db, get_current_auth_session
from app.services.auth import (
AuthenticatedSession,
authenticate_user,
change_password,
create_session,
AuthPasswordChangeError,
issue_login_csrf_token,
revoke_session,
validate_csrf_token,
)
from app.services.config_page import build_config_sections, is_ticktick_oauth_ready
logger = logging.getLogger(__name__)
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates"))
router = APIRouter(tags=["auth"])
LOGIN_CSRF_COOKIE_NAME = "login_csrf"
@router.get("/login", response_class=HTMLResponse)
def login_page(
request: Request,
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is not None:
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
csrf_token = issue_login_csrf_token()
response = templates.TemplateResponse(
request,
"login.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"csrf_token": csrf_token,
"error_message": None,
},
)
_set_login_csrf_cookie(response, settings=settings, token=csrf_token)
return response
@router.post("/login", response_class=HTMLResponse)
def login_submit(
request: Request,
username: str = Form(),
password: str = Form(),
csrf_token: str = Form(),
session: Session = Depends(get_auth_db),
settings: Settings = Depends(get_app_settings),
) -> Response:
cookie_csrf_token = request.cookies.get(LOGIN_CSRF_COOKIE_NAME)
if not validate_csrf_token(expected=cookie_csrf_token, actual=csrf_token):
logger.warning("Rejected login attempt due to CSRF validation failure")
return _render_login_error(
request,
settings=settings,
status_code=status.HTTP_400_BAD_REQUEST,
error_message="invalid login request",
)
user = authenticate_user(session, username=username, password=password)
if user is None:
return _render_login_error(
request,
settings=settings,
status_code=status.HTTP_401_UNAUTHORIZED,
error_message="invalid username or password",
)
auth_session, raw_token = create_session(session, user=user, settings=settings)
response = RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(LOGIN_CSRF_COOKIE_NAME, path="/login")
response.set_cookie(
key=settings.auth_session_cookie_name,
value=raw_token,
max_age=settings.auth_session_ttl_hours * 3600,
httponly=True,
secure=settings.auth_cookie_secure,
samesite="lax",
path="/",
)
logger.info("Created authenticated session for user '%s'", user.username)
return response
@router.post("/config/change-password", response_class=HTMLResponse)
def change_password_submit(
request: Request,
current_password: str = Form(),
new_password: str = Form(),
confirm_password: str = Form(),
csrf_token: str = Form(),
session: Session = Depends(get_auth_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
if not validate_csrf_token(expected=current_auth.session.csrf_token, actual=csrf_token):
logger.warning("Rejected password change attempt due to CSRF validation failure")
return _render_config_page(
request,
settings=settings,
auth_db_session=session,
current_auth=current_auth,
status_code=status.HTTP_400_BAD_REQUEST,
password_change_error="invalid password change request",
)
try:
change_password(
session,
user=current_auth.user,
current_password=current_password,
new_password=new_password,
confirm_password=confirm_password,
)
except AuthPasswordChangeError as exc:
logger.info(
"Rejected password change for user '%s': %s",
current_auth.user.username,
exc,
)
return _render_config_page(
request,
settings=settings,
auth_db_session=session,
current_auth=current_auth,
status_code=status.HTTP_400_BAD_REQUEST,
password_change_error="password change failed",
)
logger.info("Password updated for user '%s'", current_auth.user.username)
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
@router.post("/logout")
def logout(
request: Request,
csrf_token: str = Form(),
session: Session = Depends(get_auth_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> RedirectResponse:
if current_auth is not None and validate_csrf_token(
expected=current_auth.session.csrf_token, actual=csrf_token
):
revoke_session(session, auth_session=current_auth.session)
logger.info("Revoked authenticated session for user '%s'", current_auth.user.username)
else:
logger.warning("Rejected logout request due to missing session or invalid CSRF token")
response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(settings.auth_session_cookie_name, path="/")
return response
def _render_login_error(
request: Request,
*,
settings: Settings,
status_code: int,
error_message: str,
) -> HTMLResponse:
csrf_token = issue_login_csrf_token()
response = templates.TemplateResponse(
request,
"login.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"csrf_token": csrf_token,
"error_message": error_message,
},
status_code=status_code,
)
_set_login_csrf_cookie(response, settings=settings, token=csrf_token)
return response
def _set_login_csrf_cookie(response: HTMLResponse, *, settings: Settings, token: str) -> None:
response.set_cookie(
key=LOGIN_CSRF_COOKIE_NAME,
value=token,
max_age=1800,
httponly=True,
secure=settings.auth_cookie_secure,
samesite="lax",
path="/login",
)
def _render_config_page(
request: Request,
*,
settings: Settings,
auth_db_session: Session,
current_auth: AuthenticatedSession,
status_code: int,
password_change_error: str | None,
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"config.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": password_change_error,
"config_error": None,
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
},
status_code=status_code,
)