import re import sqlite3 from pathlib import Path from fastapi.testclient import TestClient from app.auth_db import reset_auth_db_caches from app.config import get_settings from app.main import create_app def _extract_csrf_token(html: str) -> str: match = re.search(r'name="csrf_token" value="([^"]+)"', html) assert match is not None return match.group(1) def _stringify_for_form(value) -> str: if value is None: return "" if isinstance(value, bool): return str(value).lower() return str(value) def test_unauthenticated_config_redirects_to_login(client: TestClient) -> None: response = client.get("/config", follow_redirects=False) assert response.status_code == 303 assert response.headers["location"] == "/login" def test_login_success_sets_session_cookie_and_allows_admin_access(client: TestClient) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) response = client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) assert response.status_code == 303 assert response.headers["location"] == "/config" set_cookie_header = response.headers["set-cookie"].lower() assert "home_automation_session=" in set_cookie_header assert "httponly" in set_cookie_header assert "samesite=lax" in set_cookie_header config_response = client.get("/config") assert config_response.status_code == 200 assert "首次登录后需要先修改密码" in config_response.text assert "Current Password" in config_response.text assert "New Password" in config_response.text assert "Save Config" in config_response.text assert "当前用户" in config_response.text assert "Fill in App Hostname, TickTick Client ID, and TickTick Client Secret before starting OAuth." in config_response.text assert 'aria-disabled="true">Authorize TickTick<' in config_response.text def test_login_failure_returns_generic_error(client: TestClient) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) response = client.post( "/login", data={ "username": "admin", "password": "wrong-password", "csrf_token": csrf_token, }, ) assert response.status_code == 401 assert "invalid username or password" in response.text assert "wrong-password" not in response.text def test_logout_revokes_session(client: TestClient) -> None: login_page = client.get("/login") login_csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": login_csrf_token, }, ) config_page = client.get("/config") logout_csrf_token = _extract_csrf_token(config_page.text) logout_response = client.post( "/logout", data={"csrf_token": logout_csrf_token}, follow_redirects=False, ) assert logout_response.status_code == 303 assert logout_response.headers["location"] == "/login" config_after_logout = client.get("/config", follow_redirects=False) assert config_after_logout.status_code == 303 assert config_after_logout.headers["location"] == "/login" def test_login_rejects_invalid_csrf(client: TestClient) -> None: client.get("/login") response = client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": "wrong-csrf", }, ) assert response.status_code == 400 assert "invalid login request" in response.text def test_legacy_admin_route_redirects_to_config_when_authenticated(client: TestClient) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) response = client.get("/admin", follow_redirects=False) assert response.status_code == 303 assert response.headers["location"] == "/config" def test_config_page_update_persists_to_database( client: TestClient, test_database_urls ) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) config_page = client.get("/config") config_csrf_token = _extract_csrf_token(config_page.text) settings = get_settings() form_data = {"csrf_token": config_csrf_token} from app.services.config_page import CONFIG_FIELDS for field in CONFIG_FIELDS: if field.secret: form_data[field.env_name] = "" else: form_data[field.env_name] = _stringify_for_form(getattr(settings, field.setting_attr)) form_data["APP_NAME"] = "Updated Home Automation" form_data["HOME_ASSISTANT_AUTH_TOKEN"] = "new-token" response = client.post("/config", data=form_data, follow_redirects=False) assert response.status_code == 303 assert response.headers["location"] == "/config?saved=1" conn = sqlite3.connect(test_database_urls["app_path"]) try: rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall()) finally: conn.close() assert rows["APP_NAME"] == "Updated Home Automation" assert rows["HOME_ASSISTANT_AUTH_TOKEN"] == "new-token" assert "AUTH_BOOTSTRAP_USERNAME" not in rows def test_config_page_shows_ticktick_oauth_link_when_ticktick_is_configured( test_database_urls, ready_location_database, ready_poo_database, auth_database, monkeypatch, ) -> None: monkeypatch.setenv("APP_ENV", "production") monkeypatch.setenv("APP_HOSTNAME", "localhost:8000") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") get_settings.cache_clear() reset_auth_db_caches() with TestClient(create_app()) as client: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) config_response = client.get("/config") assert config_response.status_code == 200 assert "Use the saved TickTick client settings to start the authorization flow." in config_response.text assert "Redirect URI: https://localhost:8000/ticktick/auth/code" in config_response.text assert 'href="/ticktick/auth/start">Authorize TickTick<' in config_response.text def test_config_page_shows_ticktick_oauth_success_notice(client: TestClient) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) response = client.get("/config?ticktick_oauth=success") assert response.status_code == 200 assert "TickTick authorization completed successfully." in response.text def test_config_page_shows_ticktick_oauth_failure_notice(client: TestClient) -> None: login_page = client.get("/login") csrf_token = _extract_csrf_token(login_page.text) client.post( "/login", data={ "username": "admin", "password": "test-password", "csrf_token": csrf_token, }, follow_redirects=False, ) response = client.get("/config?ticktick_oauth=failed") assert response.status_code == 200 assert "TickTick authorization failed. Check server logs for the provider response and verify TickTick app credentials and redirect URI." in response.text