M2-T11: serve React SPA from FastAPI and remove Jinja pages

- app/main.py serves the SPA build (SPA_DIST_DIR, default frontend/dist):
  mounts /assets and a GET catch-all returning index.html for client routes;
  catch-all 404s on /api/*, never swallows /docs, /openapi.json, /static, assets,
  ingestion/ticktick/status; skips SPA serving when dist absent (backend-only CI)
- delete app/api/routes/pages.py, app/api/routes/auth.py, app/templates/
  (all replaced by /api/* + SPA; auth service layer kept)
- remove/replace Jinja page tests (JSON coverage already in test_api_*);
  add tests/test_spa_hosting.py for the fallback contract
- regenerate openapi/ (Jinja paths gone) and frontend schema.d.ts
This commit is contained in:
2026-06-13 11:29:14 +02:00
parent 8aa7316b26
commit a9830c42d8
18 changed files with 319 additions and 2094 deletions
-234
View File
@@ -1,234 +0,0 @@
import logging
from pathlib import Path
from fastapi import APIRouter, Depends, Form, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from app.config import Settings
from app.dependencies import get_app_settings, get_db, get_current_auth_session
from app.services.auth import (
AuthenticatedSession,
authenticate_user,
change_password,
create_session,
AuthPasswordChangeError,
issue_login_csrf_token,
revoke_session,
validate_csrf_token,
)
from app.services.config_page import build_config_sections, is_ticktick_oauth_ready
logger = logging.getLogger(__name__)
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates"))
router = APIRouter(tags=["auth"])
LOGIN_CSRF_COOKIE_NAME = "login_csrf"
@router.get("/login", response_class=HTMLResponse)
def login_page(
request: Request,
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is not None:
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
csrf_token = issue_login_csrf_token()
response = templates.TemplateResponse(
request,
"login.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"csrf_token": csrf_token,
"error_message": None,
},
)
_set_login_csrf_cookie(response, settings=settings, token=csrf_token)
return response
@router.post("/login", response_class=HTMLResponse)
def login_submit(
request: Request,
username: str = Form(),
password: str = Form(),
csrf_token: str = Form(),
session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
) -> Response:
cookie_csrf_token = request.cookies.get(LOGIN_CSRF_COOKIE_NAME)
if not validate_csrf_token(expected=cookie_csrf_token, actual=csrf_token):
logger.warning("Rejected login attempt due to CSRF validation failure")
return _render_login_error(
request,
settings=settings,
status_code=status.HTTP_400_BAD_REQUEST,
error_message="invalid login request",
)
user = authenticate_user(session, username=username, password=password)
if user is None:
return _render_login_error(
request,
settings=settings,
status_code=status.HTTP_401_UNAUTHORIZED,
error_message="invalid username or password",
)
auth_session, raw_token = create_session(session, user=user, settings=settings)
response = RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(LOGIN_CSRF_COOKIE_NAME, path="/login")
response.set_cookie(
key=settings.auth_session_cookie_name,
value=raw_token,
max_age=settings.auth_session_ttl_hours * 3600,
httponly=True,
secure=settings.auth_cookie_secure,
samesite="lax",
path="/",
)
logger.info("Created authenticated session for user '%s'", user.username)
return response
@router.post("/config/change-password", response_class=HTMLResponse)
def change_password_submit(
request: Request,
current_password: str = Form(),
new_password: str = Form(),
confirm_password: str = Form(),
csrf_token: str = Form(),
session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
if not validate_csrf_token(expected=current_auth.session.csrf_token, actual=csrf_token):
logger.warning("Rejected password change attempt due to CSRF validation failure")
return _render_config_page(
request,
settings=settings,
auth_db_session=session,
current_auth=current_auth,
status_code=status.HTTP_400_BAD_REQUEST,
password_change_error="invalid password change request",
)
try:
change_password(
session,
user=current_auth.user,
current_password=current_password,
new_password=new_password,
confirm_password=confirm_password,
)
except AuthPasswordChangeError as exc:
logger.info(
"Rejected password change for user '%s': %s",
current_auth.user.username,
exc,
)
return _render_config_page(
request,
settings=settings,
auth_db_session=session,
current_auth=current_auth,
status_code=status.HTTP_400_BAD_REQUEST,
password_change_error="password change failed",
)
logger.info("Password updated for user '%s'", current_auth.user.username)
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
@router.post("/logout")
def logout(
request: Request,
csrf_token: str = Form(),
session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> RedirectResponse:
if current_auth is not None and validate_csrf_token(
expected=current_auth.session.csrf_token, actual=csrf_token
):
revoke_session(session, auth_session=current_auth.session)
logger.info("Revoked authenticated session for user '%s'", current_auth.user.username)
else:
logger.warning("Rejected logout request due to missing session or invalid CSRF token")
response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(settings.auth_session_cookie_name, path="/")
return response
def _render_login_error(
request: Request,
*,
settings: Settings,
status_code: int,
error_message: str,
) -> HTMLResponse:
csrf_token = issue_login_csrf_token()
response = templates.TemplateResponse(
request,
"login.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"csrf_token": csrf_token,
"error_message": error_message,
},
status_code=status_code,
)
_set_login_csrf_cookie(response, settings=settings, token=csrf_token)
return response
def _set_login_csrf_cookie(response: HTMLResponse, *, settings: Settings, token: str) -> None:
response.set_cookie(
key=LOGIN_CSRF_COOKIE_NAME,
value=token,
max_age=1800,
httponly=True,
secure=settings.auth_cookie_secure,
samesite="lax",
path="/login",
)
def _render_config_page(
request: Request,
*,
settings: Settings,
auth_db_session: Session,
current_auth: AuthenticatedSession,
status_code: int,
password_change_error: str | None,
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"config.html",
{
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": password_change_error,
"config_error": None,
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
},
status_code=status_code,
)
-240
View File
@@ -1,240 +0,0 @@
import logging
from pathlib import Path
from fastapi import APIRouter, Depends, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_db, get_current_auth_session
from app.services.auth import AuthenticatedSession
from app.services.config_page import (
ConfigSaveError,
build_config_sections,
is_ticktick_oauth_ready,
save_config_updates,
)
from app.services.email import EmailConfigurationError, EmailDeliveryError, is_smtp_ready, send_smtp_test_email
from sqlalchemy.orm import Session
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates"))
router = APIRouter(tags=["pages"])
logger = logging.getLogger(__name__)
def _ticktick_oauth_notice(status_value: str | None) -> tuple[str | None, str | None]:
if status_value == "success":
return "TickTick authorization completed successfully.", None
if status_value == "invalid-state":
return None, "TickTick authorization failed due to invalid OAuth state. Start the flow again."
if status_value == "invalid-callback":
return None, "TickTick authorization callback was missing required parameters."
if status_value == "failed":
return None, "TickTick authorization failed. Check server logs for the provider response and verify TickTick app credentials and redirect URI."
return None, None
def _smtp_test_notice(status_value: str | None) -> tuple[str | None, str | None]:
if status_value == "success":
return "SMTP test email sent successfully.", None
if status_value == "config-error":
return None, "SMTP test failed. Check required SMTP settings before sending a test email."
if status_value == "failed":
return None, "SMTP test failed. Check saved SMTP settings and server reachability."
return None, None
def _build_config_context(
*,
auth_db_session: Session,
settings: Settings,
current_auth: AuthenticatedSession,
config_saved: bool,
config_error: str | None,
password_change_error: str | None,
ticktick_oauth_notice: str | None,
ticktick_oauth_error: str | None,
smtp_test_notice: str | None,
smtp_test_error: str | None,
) -> dict[str, object]:
return {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": password_change_error,
"config_error": config_error,
"config_saved": config_saved,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": ticktick_oauth_notice,
"ticktick_oauth_error": ticktick_oauth_error,
"smtp_test_ready": is_smtp_ready(settings),
"smtp_test_notice": smtp_test_notice,
"smtp_test_error": smtp_test_error,
}
@router.get("/", response_class=HTMLResponse)
def home(
request: Request,
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> RedirectResponse:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
@router.get("/admin", response_class=HTMLResponse)
def admin_redirect(
request: Request,
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> RedirectResponse:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
return RedirectResponse(url="/config", status_code=status.HTTP_303_SEE_OTHER)
@router.get("/config", response_class=HTMLResponse)
def config_page(
request: Request,
auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
ticktick_oauth_notice, ticktick_oauth_error = _ticktick_oauth_notice(
request.query_params.get("ticktick_oauth")
)
smtp_test_notice, smtp_test_error = _smtp_test_notice(request.query_params.get("smtp_test"))
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=request.query_params.get("saved") == "1",
config_error=None,
password_change_error=None,
ticktick_oauth_notice=ticktick_oauth_notice,
ticktick_oauth_error=ticktick_oauth_error,
smtp_test_notice=smtp_test_notice,
smtp_test_error=smtp_test_error,
)
return templates.TemplateResponse(request, "config.html", context)
@router.post("/config", response_class=HTMLResponse)
async def config_submit(
request: Request,
auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
form = await request.form()
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected config update due to CSRF validation failure")
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config update request",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
context,
status_code=status.HTTP_400_BAD_REQUEST,
)
try:
save_config_updates(auth_db_session, dict(form), settings)
except ConfigSaveError:
logger.warning("Rejected config update due to invalid submitted values")
refreshed_settings = get_settings()
context = _build_config_context(
auth_db_session=auth_db_session,
settings=refreshed_settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config submission",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
context,
status_code=status.HTTP_400_BAD_REQUEST,
)
return RedirectResponse(url="/config?saved=1", status_code=status.HTTP_303_SEE_OTHER)
@router.post("/config/smtp/test", response_class=HTMLResponse)
async def smtp_test_submit(
request: Request,
auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
form = await request.form()
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected SMTP test due to CSRF validation failure")
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error=None,
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error="invalid SMTP test request",
)
return templates.TemplateResponse(
request,
"config.html",
context,
status_code=status.HTTP_400_BAD_REQUEST,
)
try:
send_smtp_test_email(settings)
except EmailConfigurationError as exc:
logger.warning("SMTP test email rejected due to configuration: %s", exc)
return RedirectResponse(
url="/config?smtp_test=config-error",
status_code=status.HTTP_303_SEE_OTHER,
)
except EmailDeliveryError as exc:
logger.warning("SMTP test email failed: %s", exc)
return RedirectResponse(
url="/config?smtp_test=failed",
status_code=status.HTTP_303_SEE_OTHER,
)
return RedirectResponse(
url="/config?smtp_test=success",
status_code=status.HTTP_303_SEE_OTHER,
)
+49 -5
View File
@@ -1,7 +1,10 @@
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
@@ -11,8 +14,7 @@ from app import models # noqa: F401
from app.api.routes.api.config import router as api_config_router
from app.api.routes.api.data import router as api_data_router
from app.api.routes.api.session import router as api_session_router
from app.api.routes.auth import router as auth_router
from app.api.routes import pages, status
from app.api.routes import status
from app.db import get_session_local
from app.api.routes.homeassistant import router as homeassistant_router
from app.api.routes.location import router as location_router
@@ -25,6 +27,17 @@ from app.services.config_page import seed_missing_config_from_bootstrap, sync_ap
from app.services.public_ip import check_public_ipv4_and_notify
from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db
logger = logging.getLogger(__name__)
_REPO_ROOT = Path(__file__).resolve().parents[1]
def _get_spa_dist_dir() -> Path:
env_val = os.environ.get("SPA_DIST_DIR")
if env_val:
return Path(env_val)
return _REPO_ROOT / "frontend" / "dist"
def _run_scheduled_public_ip_check() -> None:
session_local = get_session_local()
@@ -92,8 +105,6 @@ def create_app() -> FastAPI:
app.mount("/static", StaticFiles(directory=static_dir), name="static")
app.include_router(status.router)
app.include_router(auth_router)
app.include_router(pages.router)
app.include_router(api_config_router)
app.include_router(api_data_router)
app.include_router(api_session_router)
@@ -102,6 +113,39 @@ def create_app() -> FastAPI:
app.include_router(poo_router)
app.include_router(public_ip_router)
app.include_router(ticktick_router)
# SPA hosting: mount frontend/dist if it exists and has index.html.
# If the SPA dist is absent (e.g. backend-only CI), skip SPA serving entirely
# so that pytest stays green with only the API routes registered.
spa_dist = _get_spa_dist_dir()
spa_index = spa_dist / "index.html"
if spa_dist.is_dir() and spa_index.is_file():
spa_assets = spa_dist / "assets"
if spa_assets.is_dir():
app.mount("/assets", StaticFiles(directory=spa_assets), name="spa-assets")
# Resolve the dist root once so the containment check is fast and consistent.
_spa_root = spa_dist.resolve()
@app.get("/{full_path:path}", include_in_schema=False)
async def spa_fallback(full_path: str, request: Request) -> FileResponse: # noqa: RUF029
# Explicit 404 for unmatched /api/* — never return index.html for API paths.
if full_path.startswith("api/"):
raise HTTPException(status_code=404, detail="not found")
# Resolve candidate to an absolute path and verify it stays within the SPA
# dist root. Without this check, URL-encoded ".." sequences (e.g. "..%2f")
# bypass Starlette's path parameter handling and allow arbitrary file reads.
candidate = (spa_dist / full_path).resolve()
if candidate.is_file() and candidate.is_relative_to(_spa_root):
return FileResponse(candidate)
# For any path outside the dist root, or for SPA client routes that don't
# correspond to a real file, return index.html so the SPA router handles it.
return FileResponse(spa_index)
else:
logger.warning(
"SPA dist not found at %s — SPA hosting disabled (API-only mode).", spa_dist
)
return app
-16
View File
@@ -1,16 +0,0 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% block title %}{{ app_name }}{% endblock %}</title>
<link rel="icon" href="data:,">
<link rel="stylesheet" href="/static/styles.css">
</head>
<body>
<main class="shell">
{% block content %}{% endblock %}
</main>
</body>
</html>
-139
View File
@@ -1,139 +0,0 @@
{% extends "base.html" %}
{% block title %}Config · {{ app_name }}{% endblock %}
{% block content %}
<section class="panel">
<p class="eyebrow">Configuration</p>
<h1>Config</h1>
{% if force_password_change %}
<div class="alert">
首次登录后需要先修改密码。完成后再继续长期使用当前配置页面。
</div>
{% endif %}
{% if password_change_error %}
<div class="alert">{{ password_change_error }}</div>
{% endif %}
{% if config_error %}
<div class="alert">{{ config_error }}</div>
{% endif %}
{% if config_saved %}
<div class="notice">config saved to the app database. Some changes may require an app restart.</div>
{% endif %}
{% if ticktick_oauth_error %}
<div class="alert">{{ ticktick_oauth_error }}</div>
{% endif %}
{% if ticktick_oauth_notice %}
<div class="notice">{{ ticktick_oauth_notice }}</div>
{% endif %}
{% if smtp_test_error %}
<div class="alert">{{ smtp_test_error }}</div>
{% endif %}
{% if smtp_test_notice %}
<div class="notice">{{ smtp_test_notice }}</div>
{% endif %}
<div class="meta single-column">
<div>
<dt>当前用户</dt>
<dd>admin</dd>
</div>
</div>
<section class="config-block">
<h2>Change Password</h2>
<form class="auth-form" method="post" action="/config/change-password">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<label>
<span>Current Password</span>
<input type="password" name="current_password" autocomplete="current-password" required>
</label>
<label>
<span>New Password</span>
<input type="password" name="new_password" autocomplete="new-password" required>
</label>
<label>
<span>Confirm New Password</span>
<input type="password" name="confirm_password" autocomplete="new-password" required>
</label>
<button type="submit">修改密码</button>
</form>
</section>
<section class="config-block">
<h2>Config</h2>
<form class="config-form" method="post" action="/config">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
{% for section in config_sections %}
<fieldset class="config-section">
<legend>{{ section.name }}</legend>
{% for field in section.fields %}
<label>
<span>{{ field.label }}</span>
{% if field.secret %}
<input type="{{ field.input_type }}" name="{{ field.env_name }}" value="" placeholder="leave blank to keep current value">
<small>{% if field.configured %}configured{% else %}not configured{% endif %}</small>
{% else %}
<input type="{{ field.input_type }}" name="{{ field.env_name }}" value="{{ field.value }}">
{% endif %}
</label>
{% endfor %}
{% if section.name == "TickTick" %}
<div class="integration-action-row">
<div>
<p class="integration-action-title">TickTick OAuth</p>
<p class="integration-action-copy">Redirect URI: {{ ticktick_redirect_uri or "configure APP_HOSTNAME to generate the callback URI" }}</p>
{% if ticktick_oauth_ready %}
<p class="integration-action-copy">Use the saved TickTick client settings to start the authorization flow.</p>
{% else %}
<p class="integration-action-copy">Fill in App Hostname, TickTick Client ID, and TickTick Client Secret before starting OAuth.</p>
{% endif %}
</div>
{% if ticktick_oauth_ready %}
<a class="button-link" href="/ticktick/auth/start">Authorize TickTick</a>
{% else %}
<span class="button-link disabled" aria-disabled="true">Authorize TickTick</span>
{% endif %}
</div>
{% endif %}
{% if section.name == "SMTP" %}
<div class="integration-action-row">
<div>
<p class="integration-action-title">SMTP Test Email</p>
<p class="integration-action-copy">Save the SMTP settings first, then send a simple plaintext test email to the configured recipient.</p>
</div>
{% if smtp_test_ready %}
<button type="submit" formaction="/config/smtp/test" formmethod="post">Send SMTP Test</button>
{% else %}
<span class="button-link disabled" aria-disabled="true">Send SMTP Test</span>
{% endif %}
</div>
{% endif %}
</fieldset>
{% endfor %}
<button type="submit">Save Config</button>
</form>
</section>
<form class="logout-form" method="post" action="/logout">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button type="submit">登出</button>
</form>
</section>
{% endblock %}
-36
View File
@@ -1,36 +0,0 @@
{% extends "base.html" %}
{% block title %}{{ app_name }}{% endblock %}
{% block content %}
<section class="panel">
<p class="eyebrow">Python Rewrite Skeleton</p>
<h1>{{ app_name }}</h1>
<p class="lead">
这是当前 Go 后端的 Python 重构基础骨架。此阶段仅提供应用入口、配置、数据库、
测试、模板和容器化基础,不包含业务逻辑迁移。
</p>
<dl class="meta">
<div>
<dt>运行环境</dt>
<dd>{{ app_env }}</dd>
</div>
<div>
<dt>健康检查</dt>
<dd><a href="/status">/status</a></dd>
</div>
<div>
<dt>OpenAPI</dt>
<dd><a href="/docs">/docs</a></dd>
</div>
<div>
<dt>登录</dt>
<dd><a href="/login">/login</a></dd>
</div>
<div>
<dt>Notion</dt>
<dd>{{ notion_status }}</dd>
</div>
</dl>
</section>
{% endblock %}
-33
View File
@@ -1,33 +0,0 @@
{% extends "base.html" %}
{% block title %}登录 · {{ app_name }}{% endblock %}
{% block content %}
<section class="panel auth-panel">
<p class="eyebrow">Authentication</p>
<h1>登录</h1>
<p class="lead">
登录成功后会进入受保护的 config 页面。
</p>
{% if error_message %}
<div class="alert">{{ error_message }}</div>
{% endif %}
<form class="auth-form" method="post" action="/login">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<label>
<span>Username</span>
<input type="text" name="username" autocomplete="username" required>
</label>
<label>
<span>Password</span>
<input type="password" name="password" autocomplete="current-password" required>
</label>
<button type="submit">登录</button>
</form>
</section>
{% endblock %}
-365
View File
@@ -21,127 +21,6 @@ export interface paths {
patch?: never;
trace?: never;
};
"/login": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Login Page */
get: operations["login_page_login_get"];
put?: never;
/** Login Submit */
post: operations["login_submit_login_post"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/config/change-password": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Change Password Submit */
post: operations["change_password_submit_config_change_password_post"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/logout": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Logout */
post: operations["logout_logout_post"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Home */
get: operations["home__get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/admin": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Admin Redirect */
get: operations["admin_redirect_admin_get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/config": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Config Page */
get: operations["config_page_config_get"];
put?: never;
/** Config Submit */
post: operations["config_submit_config_post"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/config/smtp/test": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Smtp Test Submit */
post: operations["smtp_test_submit_config_smtp_test_post"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/config": {
parameters: {
query?: never;
@@ -544,31 +423,6 @@ export interface paths {
export type webhooks = Record<string, never>;
export interface components {
schemas: {
/** Body_change_password_submit_config_change_password_post */
Body_change_password_submit_config_change_password_post: {
/** Current Password */
current_password: string;
/** New Password */
new_password: string;
/** Confirm Password */
confirm_password: string;
/** Csrf Token */
csrf_token: string;
};
/** Body_login_submit_login_post */
Body_login_submit_login_post: {
/** Username */
username: string;
/** Password */
password: string;
/** Csrf Token */
csrf_token: string;
};
/** Body_logout_logout_post */
Body_logout_logout_post: {
/** Csrf Token */
csrf_token: string;
};
/** ConfigField */
ConfigField: {
/** Env Name */
@@ -831,225 +685,6 @@ export interface operations {
};
};
};
login_page_login_get: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
login_submit_login_post: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/x-www-form-urlencoded": components["schemas"]["Body_login_submit_login_post"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
change_password_submit_config_change_password_post: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/x-www-form-urlencoded": components["schemas"]["Body_change_password_submit_config_change_password_post"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
logout_logout_post: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/x-www-form-urlencoded": components["schemas"]["Body_logout_logout_post"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
home__get: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
admin_redirect_admin_get: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
config_page_config_get: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
config_submit_config_post: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
smtp_test_submit_config_smtp_test_post: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"text/html": string;
};
};
};
};
get_config_api_config_get: {
parameters: {
query?: never;
-307
View File
@@ -27,249 +27,6 @@
}
}
},
"/login": {
"get": {
"tags": [
"auth"
],
"summary": "Login Page",
"operationId": "login_page_login_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
},
"post": {
"tags": [
"auth"
],
"summary": "Login Submit",
"operationId": "login_submit_login_post",
"requestBody": {
"content": {
"application/x-www-form-urlencoded": {
"schema": {
"$ref": "#/components/schemas/Body_login_submit_login_post"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/config/change-password": {
"post": {
"tags": [
"auth"
],
"summary": "Change Password Submit",
"operationId": "change_password_submit_config_change_password_post",
"requestBody": {
"content": {
"application/x-www-form-urlencoded": {
"schema": {
"$ref": "#/components/schemas/Body_change_password_submit_config_change_password_post"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/logout": {
"post": {
"tags": [
"auth"
],
"summary": "Logout",
"operationId": "logout_logout_post",
"requestBody": {
"content": {
"application/x-www-form-urlencoded": {
"schema": {
"$ref": "#/components/schemas/Body_logout_logout_post"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/": {
"get": {
"tags": [
"pages"
],
"summary": "Home",
"operationId": "home__get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
}
},
"/admin": {
"get": {
"tags": [
"pages"
],
"summary": "Admin Redirect",
"operationId": "admin_redirect_admin_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
}
},
"/config": {
"get": {
"tags": [
"pages"
],
"summary": "Config Page",
"operationId": "config_page_config_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
},
"post": {
"tags": [
"pages"
],
"summary": "Config Submit",
"operationId": "config_submit_config_post",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
}
},
"/config/smtp/test": {
"post": {
"tags": [
"pages"
],
"summary": "Smtp Test Submit",
"operationId": "smtp_test_submit_config_smtp_test_post",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
}
},
"/api/config": {
"get": {
"tags": [
@@ -1176,70 +933,6 @@
},
"components": {
"schemas": {
"Body_change_password_submit_config_change_password_post": {
"properties": {
"current_password": {
"type": "string",
"title": "Current Password"
},
"new_password": {
"type": "string",
"title": "New Password"
},
"confirm_password": {
"type": "string",
"title": "Confirm Password"
},
"csrf_token": {
"type": "string",
"title": "Csrf Token"
}
},
"type": "object",
"required": [
"current_password",
"new_password",
"confirm_password",
"csrf_token"
],
"title": "Body_change_password_submit_config_change_password_post"
},
"Body_login_submit_login_post": {
"properties": {
"username": {
"type": "string",
"title": "Username"
},
"password": {
"type": "string",
"title": "Password"
},
"csrf_token": {
"type": "string",
"title": "Csrf Token"
}
},
"type": "object",
"required": [
"username",
"password",
"csrf_token"
],
"title": "Body_login_submit_login_post"
},
"Body_logout_logout_post": {
"properties": {
"csrf_token": {
"type": "string",
"title": "Csrf Token"
}
},
"type": "object",
"required": [
"csrf_token"
],
"title": "Body_logout_logout_post"
},
"ConfigField": {
"properties": {
"env_name": {
-197
View File
@@ -18,156 +18,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/StatusResponse'
/login:
get:
tags:
- auth
summary: Login Page
operationId: login_page_login_get
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
post:
tags:
- auth
summary: Login Submit
operationId: login_submit_login_post
requestBody:
content:
application/x-www-form-urlencoded:
schema:
$ref: '#/components/schemas/Body_login_submit_login_post'
required: true
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/config/change-password:
post:
tags:
- auth
summary: Change Password Submit
operationId: change_password_submit_config_change_password_post
requestBody:
content:
application/x-www-form-urlencoded:
schema:
$ref: '#/components/schemas/Body_change_password_submit_config_change_password_post'
required: true
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/logout:
post:
tags:
- auth
summary: Logout
operationId: logout_logout_post
requestBody:
content:
application/x-www-form-urlencoded:
schema:
$ref: '#/components/schemas/Body_logout_logout_post'
required: true
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/:
get:
tags:
- pages
summary: Home
operationId: home__get
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
/admin:
get:
tags:
- pages
summary: Admin Redirect
operationId: admin_redirect_admin_get
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
/config:
get:
tags:
- pages
summary: Config Page
operationId: config_page_config_get
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
post:
tags:
- pages
summary: Config Submit
operationId: config_submit_config_post
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
/config/smtp/test:
post:
tags:
- pages
summary: Smtp Test Submit
operationId: smtp_test_submit_config_smtp_test_post
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
/api/config:
get:
tags:
@@ -812,53 +662,6 @@ paths:
schema: {}
components:
schemas:
Body_change_password_submit_config_change_password_post:
properties:
current_password:
type: string
title: Current Password
new_password:
type: string
title: New Password
confirm_password:
type: string
title: Confirm Password
csrf_token:
type: string
title: Csrf Token
type: object
required:
- current_password
- new_password
- confirm_password
- csrf_token
title: Body_change_password_submit_config_change_password_post
Body_login_submit_login_post:
properties:
username:
type: string
title: Username
password:
type: string
title: Password
csrf_token:
type: string
title: Csrf Token
type: object
required:
- username
- password
- csrf_token
title: Body_login_submit_login_post
Body_logout_logout_post:
properties:
csrf_token:
type: string
title: Csrf Token
type: object
required:
- csrf_token
title: Body_logout_logout_post
ConfigField:
properties:
env_name:
+4 -18
View File
@@ -2,7 +2,6 @@
Tests for M2-T05: POST /api/config/smtp/test."""
from __future__ import annotations
import re
import sqlite3
from unittest.mock import patch
@@ -17,26 +16,13 @@ from app.services.email import EmailConfigurationError, EmailDeliveryError
# Helpers
# ---------------------------------------------------------------------------
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None, "csrf_token not found in HTML"
return match.group(1)
def _login(client: TestClient) -> None:
"""Log in as admin/test-password using the Jinja login form."""
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
"""Log in as admin/test-password using the JSON API."""
resp = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
"/api/auth/login",
json={"username": "admin", "password": "test-password"},
)
assert resp.status_code == 303, f"Login failed: {resp.status_code}"
assert resp.status_code == 200, f"Login failed: {resp.status_code}"
def _stringify(value) -> str:
+2 -22
View File
@@ -1,8 +1,6 @@
"""Tests for M2-T02: GET /api/session, POST /api/auth/login, /logout, /password."""
from __future__ import annotations
import re
from fastapi.testclient import TestClient
@@ -11,24 +9,6 @@ from fastapi.testclient import TestClient
# ---------------------------------------------------------------------------
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None, "csrf_token not found in HTML"
return match.group(1)
def _jinja_login(client: TestClient) -> None:
"""Log in via the existing Jinja form so the client has a session cookie."""
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
resp = client.post(
"/login",
data={"username": "admin", "password": "test-password", "csrf_token": csrf_token},
follow_redirects=False,
)
assert resp.status_code == 303, f"Jinja login failed: {resp.status_code}"
def _api_login(client: TestClient, *, username: str = "admin", password: str = "test-password"):
"""Log in via POST /api/auth/login and return the response."""
return client.post(
@@ -53,7 +33,7 @@ def test_get_session_unauthenticated_returns_401(client: TestClient) -> None:
def test_get_session_authenticated_returns_user_and_csrf(client: TestClient) -> None:
_jinja_login(client)
_api_login(client)
response = client.get("/api/session")
@@ -68,7 +48,7 @@ def test_get_session_authenticated_returns_user_and_csrf(client: TestClient) ->
def test_get_session_does_not_leak_password(client: TestClient) -> None:
_jinja_login(client)
_api_login(client)
response = client.get("/api/session")
body_str = str(response.json())
assert "test-password" not in body_str
+4 -2
View File
@@ -25,9 +25,11 @@ def _prepare_app_db(tmp_path) -> str:
def test_app_starts(client: TestClient) -> None:
# With SPA enabled, GET / is served by the catch-all and returns index.html (200).
# Without SPA (e.g. SPA_DIST_DIR points to empty dir), it returns 404.
# Either way the app started successfully — just assert it is not a server error.
response = client.get("/", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
assert response.status_code in (200, 404)
def test_status_endpoint(client: TestClient) -> None:
+4 -264
View File
@@ -1,265 +1,5 @@
import re
import sqlite3
"""Jinja-based auth tests removed in M2-T11 (Jinja routes deleted).
from fastapi.testclient import TestClient
from app.db import reset_db_caches
from app.config import get_settings
from app.main import create_app
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _stringify_for_form(value) -> str:
if value is None:
return ""
if isinstance(value, bool):
return str(value).lower()
return str(value)
def test_unauthenticated_config_redirects_to_login(client: TestClient) -> None:
response = client.get("/config", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
def test_login_success_sets_session_cookie_and_allows_admin_access(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
assert response.headers["location"] == "/config"
set_cookie_header = response.headers["set-cookie"].lower()
assert "home_automation_session=" in set_cookie_header
assert "httponly" in set_cookie_header
assert "samesite=lax" in set_cookie_header
config_response = client.get("/config")
assert config_response.status_code == 200
assert "首次登录后需要先修改密码" in config_response.text
assert "Current Password" in config_response.text
assert "New Password" in config_response.text
assert "Save Config" in config_response.text
assert "当前用户" in config_response.text
assert "Fill in App Hostname, TickTick Client ID, and TickTick Client Secret before starting OAuth." in config_response.text
assert 'aria-disabled="true">Authorize TickTick<' in config_response.text
def test_login_failure_returns_generic_error(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "wrong-password",
"csrf_token": csrf_token,
},
)
assert response.status_code == 401
assert "invalid username or password" in response.text
assert "wrong-password" not in response.text
def test_logout_revokes_session(client: TestClient) -> None:
login_page = client.get("/login")
login_csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": login_csrf_token,
},
)
config_page = client.get("/config")
logout_csrf_token = _extract_csrf_token(config_page.text)
logout_response = client.post(
"/logout",
data={"csrf_token": logout_csrf_token},
follow_redirects=False,
)
assert logout_response.status_code == 303
assert logout_response.headers["location"] == "/login"
config_after_logout = client.get("/config", follow_redirects=False)
assert config_after_logout.status_code == 303
assert config_after_logout.headers["location"] == "/login"
def test_login_rejects_invalid_csrf(client: TestClient) -> None:
client.get("/login")
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": "wrong-csrf",
},
)
assert response.status_code == 400
assert "invalid login request" in response.text
def test_legacy_admin_route_redirects_to_config_when_authenticated(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
response = client.get("/admin", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config"
def test_config_page_update_persists_to_database(
client: TestClient, test_database_urls
) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
settings = get_settings()
form_data = {"csrf_token": config_csrf_token}
from app.services.config_page import CONFIG_FIELDS
for field in CONFIG_FIELDS:
if field.secret:
form_data[field.env_name] = ""
else:
form_data[field.env_name] = _stringify_for_form(getattr(settings, field.setting_attr))
form_data["APP_NAME"] = "Updated Home Automation"
form_data["HOME_ASSISTANT_AUTH_TOKEN"] = "new-token"
response = client.post("/config", data=form_data, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?saved=1"
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["APP_NAME"] == "Updated Home Automation"
assert rows["HOME_ASSISTANT_AUTH_TOKEN"] == "new-token"
assert "AUTH_BOOTSTRAP_USERNAME" not in rows
def test_config_page_shows_ticktick_oauth_link_when_ticktick_is_configured(
auth_database,
monkeypatch,
) -> None:
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.setenv("APP_HOSTNAME", "localhost:8000")
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear()
reset_db_caches()
with TestClient(create_app()) as client:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
config_response = client.get("/config")
assert config_response.status_code == 200
assert "Use the saved TickTick client settings to start the authorization flow." in config_response.text
assert "Redirect URI: https://localhost:8000/ticktick/auth/code" in config_response.text
assert 'href="/ticktick/auth/start">Authorize TickTick<' in config_response.text
def test_config_page_shows_ticktick_oauth_success_notice(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
response = client.get("/config?ticktick_oauth=success")
assert response.status_code == 200
assert "TickTick authorization completed successfully." in response.text
def test_config_page_shows_ticktick_oauth_failure_notice(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
response = client.get("/config?ticktick_oauth=failed")
assert response.status_code == 200
assert "TickTick authorization failed. Check server logs for the provider response and verify TickTick app credentials and redirect URI." in response.text
Equivalent JSON-API coverage lives in test_api_session.py and test_api_config.py.
This file is intentionally left with no test functions so pytest does not error.
"""
+3 -17
View File
@@ -1,5 +1,4 @@
from datetime import UTC, datetime
import re
import sqlite3
from fastapi.testclient import TestClient
@@ -17,25 +16,12 @@ def _make_session(database_url: str) -> Session:
return session_local()
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
"/api/auth/login",
json={"username": "admin", "password": "test-password"},
)
assert response.status_code == 303
assert response.status_code == 200
def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
+6 -181
View File
@@ -1,8 +1,10 @@
import re
import sqlite3
import smtplib
"""SMTP service-layer unit tests.
from fastapi.testclient import TestClient
Jinja-based HTTP flow tests (POST /config, POST /config/smtp/test via form) were
removed in M2-T11 when the Jinja routes were deleted. HTTP-level SMTP test
endpoint coverage lives in test_api_config.py.
"""
import smtplib
from app.config import Settings
from app.services.email import (
@@ -14,27 +16,6 @@ from app.services.email import (
)
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
def _smtp_settings(**overrides) -> Settings:
payload = {
"app_env": "development",
@@ -237,159 +218,3 @@ def test_send_public_ip_changed_email_contains_expected_english_content(monkeypa
assert "Current IP: 198.51.100.25" in sent["body"]
assert "Detected at: 2026-04-29 10:00:00 UTC" in sent["body"]
assert "update the trusted IP manually" in sent["body"]
def test_config_update_does_not_clear_existing_smtp_password(
client: TestClient, test_database_urls
) -> None:
_login(client)
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Test",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "persist-me",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Updated",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["SMTP_PASSWORD"] == "persist-me"
assert rows["APP_NAME"] == "SMTP Config Updated"
def test_smtp_test_endpoint_requires_authentication(client: TestClient) -> None:
response = client.post("/config/smtp/test", data={"csrf_token": "ignored"}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
def test_smtp_test_endpoint_success_and_failure_do_not_expose_password(
client: TestClient, monkeypatch
) -> None:
from app.api.routes import pages
_login(client)
config_page = client.get("/config")
csrf_token = _extract_csrf_token(config_page.text)
monkeypatch.setattr(pages, "send_smtp_test_email", lambda settings: None)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=success"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test email sent successfully." in follow_up.text
assert "super-secret-password" not in follow_up.text
monkeypatch.setattr(
pages,
"send_smtp_test_email",
lambda settings: (_ for _ in ()).throw(EmailDeliveryError("smtp auth failed for [redacted]")),
)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=failed"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test failed. Check saved SMTP settings and server reachability." in follow_up.text
assert "super-secret-password" not in follow_up.text
def test_config_page_renders_smtp_test_button_with_formaction(
client: TestClient, test_database_urls
) -> None:
_login(client)
conn = sqlite3.connect(test_database_urls["app_path"])
try:
conn.executemany(
"INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at",
[
("SMTP_ENABLED", "true"),
("SMTP_HOST", "smtp.example.com"),
("SMTP_PORT", "587"),
("SMTP_FROM_ADDRESS", "sender@example.com"),
("SMTP_TO_ADDRESS", "recipient@example.com"),
],
)
conn.commit()
finally:
conn.close()
response = client.get("/config")
assert response.status_code == 200
assert 'formaction="/config/smtp/test"' in response.text
+243
View File
@@ -0,0 +1,243 @@
"""Tests for M2-T11: SPA hosting + fallback behavior in app/main.py.
Uses SPA_DIST_DIR env var to point at a temporary directory containing a fake
index.html and an asset file, so tests are hermetic and don't depend on the
real frontend/dist build.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from app.config import get_settings
from app.db import reset_db_caches
from app.main import create_app
@pytest.fixture
def spa_dist(tmp_path: Path) -> Path:
"""Create a minimal fake SPA dist directory.
Layout:
tmp_path/
secret.txt ← OUTSIDE dist — must never be served
fake_dist/
index.html
assets/
main.js
"""
# A secret file placed OUTSIDE the dist dir — used by traversal tests.
(tmp_path / "secret.txt").write_text("TOP_SECRET_SENTINEL", encoding="utf-8")
dist = tmp_path / "fake_dist"
dist.mkdir()
(dist / "index.html").write_text(
"<!DOCTYPE html><html><body>SPA INDEX</body></html>", encoding="utf-8"
)
assets = dist / "assets"
assets.mkdir()
(assets / "main.js").write_text("console.log('app');", encoding="utf-8")
return dist
@pytest.fixture
def spa_client(spa_dist: Path, auth_database, monkeypatch: pytest.MonkeyPatch) -> TestClient:
"""TestClient with a fresh app wired to the fake SPA dist."""
monkeypatch.setenv("SPA_DIST_DIR", str(spa_dist))
get_settings.cache_clear()
reset_db_caches()
app = create_app()
with TestClient(app) as client:
yield client
get_settings.cache_clear()
reset_db_caches()
# ---------------------------------------------------------------------------
# SPA fallback — client routes served as index.html
# ---------------------------------------------------------------------------
def test_spa_root_returns_index_html(spa_client: TestClient) -> None:
"""GET / returns the SPA index.html (200)."""
response = spa_client.get("/", follow_redirects=False)
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_spa_config_route_returns_index_html(spa_client: TestClient) -> None:
"""/config is a client-side route; the fallback must serve index.html."""
response = spa_client.get("/config", follow_redirects=False)
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_spa_records_route_returns_index_html(spa_client: TestClient) -> None:
"""/records is a client-side route; the fallback must serve index.html."""
response = spa_client.get("/records", follow_redirects=False)
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_spa_deep_link_returns_index_html(spa_client: TestClient) -> None:
"""/some/deep/path that doesn't exist on disk returns index.html (deep-link support)."""
response = spa_client.get("/some/deep/path", follow_redirects=False)
assert response.status_code == 200
assert "SPA INDEX" in response.text
# ---------------------------------------------------------------------------
# SPA asset serving
# ---------------------------------------------------------------------------
def test_spa_asset_is_served(spa_client: TestClient) -> None:
"""/assets/main.js must be served directly from the dist/assets directory."""
response = spa_client.get("/assets/main.js")
assert response.status_code == 200
assert "console.log" in response.text
# ---------------------------------------------------------------------------
# API routes not swallowed by fallback
# ---------------------------------------------------------------------------
def test_unauthenticated_api_session_returns_401_not_index(spa_client: TestClient) -> None:
"""/api/session without a session cookie must return 401, not index.html."""
response = spa_client.get("/api/session")
assert response.status_code == 401
assert "SPA INDEX" not in response.text
def test_unknown_api_path_returns_404_not_index(spa_client: TestClient) -> None:
"""/api/does-not-exist must return 404, not index.html."""
response = spa_client.get("/api/does-not-exist")
assert response.status_code == 404
assert "SPA INDEX" not in response.text
def test_api_typo_returns_404_not_index(spa_client: TestClient) -> None:
"""/api/typo returns 404 (the fallback must not serve index.html for /api/*)."""
response = spa_client.get("/api/typo")
assert response.status_code == 404
assert "SPA INDEX" not in response.text
# ---------------------------------------------------------------------------
# FastAPI built-in endpoints not swallowed by fallback
# ---------------------------------------------------------------------------
def test_openapi_json_is_served(spa_client: TestClient) -> None:
"""/openapi.json must be served by FastAPI, not the SPA fallback."""
response = spa_client.get("/openapi.json")
assert response.status_code == 200
body = response.json()
assert "openapi" in body
def test_docs_endpoint_is_served(spa_client: TestClient) -> None:
"""/docs must be served by FastAPI Swagger UI, not index.html."""
response = spa_client.get("/docs")
assert response.status_code == 200
assert "SPA INDEX" not in response.text
def test_status_endpoint_is_served(spa_client: TestClient) -> None:
"""/status must remain reachable and return JSON."""
response = spa_client.get("/status")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# ---------------------------------------------------------------------------
# Path-traversal containment — security regression tests
# ---------------------------------------------------------------------------
def test_path_traversal_encoded_dotdot_slash_returns_index_not_secret(
spa_client: TestClient, spa_dist: Path
) -> None:
"""GET /..%2fsecret.txt must NOT return the secret file outside the dist dir.
Starlette URL-decodes {full_path:path} but does not normalise it, so an
encoded '../' can escape the dist root without the containment guard.
The guarded implementation resolves the candidate and checks is_relative_to;
a path that escapes the root falls back to index.html instead.
"""
response = spa_client.get("/..%2fsecret.txt", follow_redirects=False)
# Must not expose the secret content.
assert "TOP_SECRET_SENTINEL" not in response.text
# Should be a successful SPA index response (not a server error).
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_path_traversal_pct_encoded_dotdot_returns_index_not_secret(
spa_client: TestClient, spa_dist: Path
) -> None:
"""GET /%2e%2e%2fsecret.txt must NOT expose the file outside dist.
Covers the %2e%2e encoding variant of '..'.
"""
response = spa_client.get("/%2e%2e%2fsecret.txt", follow_redirects=False)
assert "TOP_SECRET_SENTINEL" not in response.text
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_path_traversal_nested_encoded_dotdot_returns_index_not_secret(
spa_client: TestClient, spa_dist: Path
) -> None:
"""GET /fake_dist/..%2f..%2fsecret.txt (deeper traversal) must not leak."""
response = spa_client.get("/fake_dist/..%2f..%2fsecret.txt", follow_redirects=False)
assert "TOP_SECRET_SENTINEL" not in response.text
assert response.status_code == 200
assert "SPA INDEX" in response.text
def test_legit_nested_asset_inside_dist_is_served(
spa_client: TestClient, spa_dist: Path
) -> None:
"""A real file inside the dist dir is still served correctly after the fix.
Place a nested asset directly inside dist (not under /assets) and confirm
the catch-all serves it.
"""
nested = spa_dist / "nested" / "chunk.js"
nested.parent.mkdir()
nested.write_text("// nested chunk", encoding="utf-8")
response = spa_client.get("/nested/chunk.js", follow_redirects=False)
assert response.status_code == 200
assert "nested chunk" in response.text
# ---------------------------------------------------------------------------
# SPA disabled when dist dir is missing
# ---------------------------------------------------------------------------
def test_spa_disabled_when_dist_missing(
tmp_path: Path, auth_database, monkeypatch: pytest.MonkeyPatch
) -> None:
"""When SPA_DIST_DIR points to a non-existent directory, the app still starts
and API routes work normally — the SPA fallback is simply absent."""
empty = tmp_path / "no_dist_here"
monkeypatch.setenv("SPA_DIST_DIR", str(empty))
get_settings.cache_clear()
reset_db_caches()
app = create_app()
with TestClient(app) as client:
response = client.get("/status")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# API still returns 401 for unauthenticated access
api_response = client.get("/api/session")
assert api_response.status_code == 401
get_settings.cache_clear()
reset_db_caches()
+4 -18
View File
@@ -49,14 +49,6 @@ def _configured_settings(**overrides) -> Settings:
return Settings(_env_file=None, **payload)
def _extract_csrf_token(html: str) -> str:
import re
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def test_build_authorization_url_contains_expected_query(monkeypatch: pytest.MonkeyPatch) -> None:
client = TickTickClient(settings=_configured_settings())
monkeypatch.setattr("app.integrations.ticktick.secrets.token_hex", lambda _: "state-123")
@@ -263,17 +255,11 @@ def test_ticktick_auth_start_redirects_authenticated_user(
monkeypatch.setattr("app.integrations.ticktick.secrets.token_hex", lambda _: "state-redirect")
with TestClient(create_app()) as client:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
resp = client.post(
"/api/auth/login",
json={"username": "admin", "password": "test-password"},
)
assert resp.status_code == 200, f"API login failed: {resp.status_code}"
response = client.get("/ticktick/auth/start", follow_redirects=False)