From 5a420bd37baa315ad22c5f60d674f86a03e9b9cb Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Apr 2026 11:45:49 +0200 Subject: [PATCH 1/4] add get public and storage feature --- alembic_app/env.py | 1 + .../versions/20260429_05_public_ip_monitor.py | 55 ++++++ app/api/routes/public_ip.py | 25 +++ app/main.py | 25 +++ app/models/__init__.py | 10 +- app/models/public_ip.py | 30 +++ app/schemas/public_ip.py | 13 ++ app/services/public_ip.py | 139 ++++++++++++++ dev-requirements.txt | 20 +- requirements.in | 2 + requirements.txt | 31 +++- scripts/app_db_adopt.py | 2 +- tests/test_public_ip.py | 174 ++++++++++++++++++ 13 files changed, 511 insertions(+), 16 deletions(-) create mode 100644 alembic_app/versions/20260429_05_public_ip_monitor.py create mode 100644 app/api/routes/public_ip.py create mode 100644 app/models/public_ip.py create mode 100644 app/schemas/public_ip.py create mode 100644 app/services/public_ip.py create mode 100644 tests/test_public_ip.py diff --git a/alembic_app/env.py b/alembic_app/env.py index c20c54e..00d6bea 100644 --- a/alembic_app/env.py +++ b/alembic_app/env.py @@ -7,6 +7,7 @@ from app.auth_db import AuthBase from app.config import get_settings from app.models.config import AppConfigEntry # noqa: F401 from app.models.auth import AuthSession, AuthUser # noqa: F401 +from app.models.public_ip import PublicIPHistory, PublicIPState # noqa: F401 config = context.config diff --git a/alembic_app/versions/20260429_05_public_ip_monitor.py b/alembic_app/versions/20260429_05_public_ip_monitor.py new file mode 100644 index 0000000..48978e4 --- /dev/null +++ b/alembic_app/versions/20260429_05_public_ip_monitor.py @@ -0,0 +1,55 @@ +"""public ip monitor tables + +Revision ID: 20260429_05_public_ip_monitor +Revises: 20260420_04_app_config_table +Create Date: 2026-04-29 00:00:01.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "20260429_05_public_ip_monitor" +down_revision: Union[str, None] = "20260420_04_app_config_table" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "public_ip_history", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("ipv4", sa.String(length=45), nullable=False), + sa.Column("observed_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("change_type", sa.String(length=32), nullable=False), + sa.Column("provider", sa.String(length=64), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + "ix_public_ip_history_observed_at", + "public_ip_history", + ["observed_at"], + unique=False, + ) + + op.create_table( + "public_ip_state", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("current_ipv4", sa.String(length=45), nullable=False), + sa.Column("previous_ipv4", sa.String(length=45), nullable=True), + sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("last_checked_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("last_changed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("last_check_status", sa.String(length=32), nullable=False), + sa.Column("last_check_error", sa.String(length=255), nullable=True), + sa.Column("last_provider", sa.String(length=64), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + +def downgrade() -> None: + op.drop_table("public_ip_state") + op.drop_index("ix_public_ip_history_observed_at", table_name="public_ip_history") + op.drop_table("public_ip_history") \ No newline at end of file diff --git a/app/api/routes/public_ip.py b/app/api/routes/public_ip.py new file mode 100644 index 0000000..ee9d722 --- /dev/null +++ b/app/api/routes/public_ip.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.dependencies import get_auth_db, get_current_auth_session +from app.schemas.public_ip import PublicIPCheckResponse +from app.services.auth import AuthenticatedSession +from app.services.public_ip import check_public_ipv4 + +router = APIRouter(tags=["public-ip"]) + + +@router.get("/public-ip/check", response_model=PublicIPCheckResponse) +def run_public_ip_check( + session: Session = Depends(get_auth_db), + current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), +) -> PublicIPCheckResponse: + if current_auth is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="authentication required") + + result = check_public_ipv4(session) + return PublicIPCheckResponse( + status=result.status, + checked_at=result.checked_at, + changed=result.changed, + ) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 7dddb19..4a8ee2a 100644 --- a/app/main.py +++ b/app/main.py @@ -3,6 +3,8 @@ from pathlib import Path from fastapi import FastAPI from fastapi.staticfiles import StaticFiles +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy.orm import Session from app import models # noqa: F401 @@ -12,15 +14,26 @@ import app.auth_db as auth_db from app.api.routes.homeassistant import router as homeassistant_router from app.api.routes.location import router as location_router from app.api.routes.poo import router as poo_router +from app.api.routes.public_ip import router as public_ip_router from app.api.routes.ticktick import router as ticktick_router from app.config import get_settings from app.services.auth import AuthBootstrapError, initialize_auth_schema from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap +from app.services.public_ip import check_public_ipv4 from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db from scripts.location_db_adopt import LocationDatabaseAdoptionError, validate_location_runtime_db from scripts.poo_db_adopt import PooDatabaseAdoptionError, validate_poo_runtime_db +def _run_scheduled_public_ip_check() -> None: + session_local = auth_db.get_auth_session_local() + session: Session = session_local() + try: + check_public_ipv4(session) + finally: + session.close() + + def ensure_auth_db_ready() -> None: session_local = auth_db.get_auth_session_local() session: Session = session_local() @@ -72,7 +85,18 @@ async def lifespan(_: FastAPI): ensure_auth_db_ready() ensure_location_db_ready() ensure_poo_db_ready() + scheduler = BackgroundScheduler(timezone="UTC") + scheduler.add_job( + _run_scheduled_public_ip_check, + trigger=IntervalTrigger(hours=4), + id="public-ip-check", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + scheduler.start() yield + scheduler.shutdown(wait=False) def create_app() -> FastAPI: @@ -97,6 +121,7 @@ def create_app() -> FastAPI: app.include_router(homeassistant_router) app.include_router(location_router) app.include_router(poo_router) + app.include_router(public_ip_router) app.include_router(ticktick_router) return app diff --git a/app/models/__init__.py b/app/models/__init__.py index d8889cc..24d4862 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -3,5 +3,13 @@ from app.models.auth import AuthSession, AuthUser from app.models.config import AppConfigEntry from app.models.location import Location +from app.models.public_ip import PublicIPHistory, PublicIPState -__all__ = ["AppConfigEntry", "AuthSession", "AuthUser", "Location"] +__all__ = [ + "AppConfigEntry", + "AuthSession", + "AuthUser", + "Location", + "PublicIPHistory", + "PublicIPState", +] diff --git a/app/models/public_ip.py b/app/models/public_ip.py new file mode 100644 index 0000000..a88fd4e --- /dev/null +++ b/app/models/public_ip.py @@ -0,0 +1,30 @@ +from datetime import datetime + +from sqlalchemy import DateTime, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from app.auth_db import AuthBase + + +class PublicIPState(AuthBase): + __tablename__ = "public_ip_state" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + current_ipv4: Mapped[str] = mapped_column(String(45), nullable=False) + previous_ipv4: Mapped[str | None] = mapped_column(String(45), nullable=True) + first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + last_checked_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + last_changed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + last_check_status: Mapped[str] = mapped_column(String(32), nullable=False) + last_check_error: Mapped[str | None] = mapped_column(String(255), nullable=True) + last_provider: Mapped[str | None] = mapped_column(String(64), nullable=True) + + +class PublicIPHistory(AuthBase): + __tablename__ = "public_ip_history" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + ipv4: Mapped[str] = mapped_column(String(45), nullable=False) + observed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + change_type: Mapped[str] = mapped_column(String(32), nullable=False) + provider: Mapped[str | None] = mapped_column(String(64), nullable=True) \ No newline at end of file diff --git a/app/schemas/public_ip.py b/app/schemas/public_ip.py new file mode 100644 index 0000000..497b32c --- /dev/null +++ b/app/schemas/public_ip.py @@ -0,0 +1,13 @@ +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel + + +PublicIPCheckStatus = Literal["first_seen", "unchanged", "changed", "error"] + + +class PublicIPCheckResponse(BaseModel): + status: PublicIPCheckStatus + checked_at: datetime + changed: bool diff --git a/app/services/public_ip.py b/app/services/public_ip.py new file mode 100644 index 0000000..a984f25 --- /dev/null +++ b/app/services/public_ip.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import ipaddress +import logging +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Callable, Literal + +import httpx +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.models.public_ip import PublicIPHistory, PublicIPState + +logger = logging.getLogger(__name__) + +PUBLIC_IP_PROVIDER_NAME = "ipify" +PUBLIC_IP_PROVIDER_URL = "https://api.ipify.org" +PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS = 5.0 + +PublicIPResultStatus = Literal["first_seen", "unchanged", "changed", "error"] +PublicIPv4Fetcher = Callable[[], str] + + +class PublicIPCheckError(RuntimeError): + """Raised when the public IPv4 provider cannot return a valid IPv4.""" + + +@dataclass(slots=True) +class PublicIPCheckResult: + status: PublicIPResultStatus + checked_at: datetime + changed: bool + + +def check_public_ipv4( + session: Session, + *, + fetch_public_ipv4: PublicIPv4Fetcher | None = None, + provider_name: str = PUBLIC_IP_PROVIDER_NAME, +) -> PublicIPCheckResult: + checked_at = _utc_now() + state = session.scalar(select(PublicIPState).where(PublicIPState.id == 1).limit(1)) + + try: + raw_ipv4 = (fetch_public_ipv4 or fetch_public_ipv4_from_provider)() + current_ipv4 = _validate_ipv4(raw_ipv4) + except PublicIPCheckError as exc: + logger.warning("Public IPv4 check failed: %s", exc) + if state is not None: + state.last_checked_at = checked_at + state.last_check_status = "error" + state.last_check_error = str(exc) + state.last_provider = provider_name + session.commit() + return PublicIPCheckResult(status="error", checked_at=checked_at, changed=False) + + if state is None: + state = PublicIPState( + id=1, + current_ipv4=current_ipv4, + previous_ipv4=None, + first_seen_at=checked_at, + last_checked_at=checked_at, + last_changed_at=None, + last_check_status="first_seen", + last_check_error=None, + last_provider=provider_name, + ) + session.add(state) + session.add( + PublicIPHistory( + ipv4=current_ipv4, + observed_at=checked_at, + change_type="first_seen", + provider=provider_name, + ) + ) + session.commit() + return PublicIPCheckResult(status="first_seen", checked_at=checked_at, changed=False) + + if state.current_ipv4 == current_ipv4: + state.last_checked_at = checked_at + state.last_check_status = "unchanged" + state.last_check_error = None + state.last_provider = provider_name + session.commit() + return PublicIPCheckResult(status="unchanged", checked_at=checked_at, changed=False) + + state.previous_ipv4 = state.current_ipv4 + state.current_ipv4 = current_ipv4 + state.last_checked_at = checked_at + state.last_changed_at = checked_at + state.last_check_status = "changed" + state.last_check_error = None + state.last_provider = provider_name + session.add( + PublicIPHistory( + ipv4=current_ipv4, + observed_at=checked_at, + change_type="changed", + provider=provider_name, + ) + ) + session.commit() + return PublicIPCheckResult(status="changed", checked_at=checked_at, changed=True) + + +def fetch_public_ipv4_from_provider() -> str: + try: + response = httpx.get( + PUBLIC_IP_PROVIDER_URL, + params={"format": "text"}, + timeout=PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS, + ) + response.raise_for_status() + except httpx.HTTPError as exc: + raise PublicIPCheckError(f"provider request failed: {exc}") from exc + + return response.text.strip() + + +def _validate_ipv4(raw_value: str) -> str: + if not raw_value: + raise PublicIPCheckError("provider returned an empty response") + + try: + parsed = ipaddress.ip_address(raw_value) + except ValueError as exc: + raise PublicIPCheckError("provider returned an invalid IPv4 value") from exc + + if parsed.version != 4: + raise PublicIPCheckError("provider returned a non-IPv4 value") + + return str(parsed) + + +def _utc_now() -> datetime: + return datetime.now(UTC) diff --git a/dev-requirements.txt b/dev-requirements.txt index 26de089..1d79e6a 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,15 +8,17 @@ alembic==1.18.4 # via -r requirements.in annotated-types==0.7.0 # via pydantic -argon2-cffi==25.1.0 - # via -r requirements.in -argon2-cffi-bindings==25.1.0 - # via argon2-cffi anyio==4.13.0 # via # httpx # starlette # watchfiles +apscheduler==3.11.2 + # via -r requirements.in +argon2-cffi==25.1.0 + # via -r requirements.in +argon2-cffi-bindings==25.1.0 + # via argon2-cffi build==1.4.3 # via pip-tools certifi==2026.2.25 @@ -42,7 +44,9 @@ httpcore==1.0.9 httptools==0.7.1 # via uvicorn httpx==0.28.1 - # via -r dev-requirements.in + # via + # -r dev-requirements.in + # -r requirements.in idna==3.11 # via # anyio @@ -66,6 +70,8 @@ pip-tools==7.5.3 # via -r dev-requirements.in pluggy==1.6.0 # via pytest +pycparser==2.23 + # via cffi pydantic==2.13.2 # via # fastapi @@ -88,8 +94,6 @@ python-dotenv==1.2.2 # uvicorn python-multipart==0.0.26 # via -r requirements.in -pycparser==2.23 - # via cffi pyyaml==6.0.3 # via # -r requirements.in @@ -112,6 +116,8 @@ typing-inspection==0.4.2 # via # pydantic # pydantic-settings +tzlocal==5.3.1 + # via apscheduler uvicorn[standard]==0.44.0 # via -r requirements.in uvloop==0.22.1 diff --git a/requirements.in b/requirements.in index 3211579..61b29b2 100644 --- a/requirements.in +++ b/requirements.in @@ -1,6 +1,8 @@ alembic>=1.14,<2.0 +apscheduler>=3.10,<4.0 argon2-cffi>=25.1,<26.0 fastapi>=0.115,<0.116 +httpx>=0.28,<1.0 jinja2>=3.1,<4.0 pydantic-settings>=2.6,<3.0 python-multipart>=0.0.12,<1.0 diff --git a/requirements.txt b/requirements.txt index be07d71..c76c026 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,14 +8,21 @@ alembic==1.18.4 # via -r requirements.in annotated-types==0.7.0 # via pydantic +anyio==4.13.0 + # via + # httpx + # starlette + # watchfiles +apscheduler==3.11.2 + # via -r requirements.in argon2-cffi==25.1.0 # via -r requirements.in argon2-cffi-bindings==25.1.0 # via argon2-cffi -anyio==4.13.0 +certifi==2026.4.22 # via - # starlette - # watchfiles + # httpcore + # httpx cffi==2.0.0 # via argon2-cffi-bindings click==8.3.2 @@ -25,11 +32,19 @@ fastapi==0.115.14 greenlet==3.4.0 # via sqlalchemy h11==0.16.0 - # via uvicorn + # via + # httpcore + # uvicorn +httpcore==1.0.9 + # via httpx httptools==0.7.1 # via uvicorn +httpx==0.28.1 + # via -r requirements.in idna==3.11 - # via anyio + # via + # anyio + # httpx jinja2==3.1.6 # via -r requirements.in mako==1.3.11 @@ -38,6 +53,8 @@ markupsafe==3.0.3 # via # jinja2 # mako +pycparser==2.23 + # via cffi pydantic==2.13.2 # via # fastapi @@ -52,8 +69,6 @@ python-dotenv==1.2.2 # uvicorn python-multipart==0.0.26 # via -r requirements.in -pycparser==2.23 - # via cffi pyyaml==6.0.3 # via # -r requirements.in @@ -76,6 +91,8 @@ typing-inspection==0.4.2 # via # pydantic # pydantic-settings +tzlocal==5.3.1 + # via apscheduler uvicorn[standard]==0.44.0 # via -r requirements.in uvloop==0.22.1 diff --git a/scripts/app_db_adopt.py b/scripts/app_db_adopt.py index 0e88286..4a1b30d 100644 --- a/scripts/app_db_adopt.py +++ b/scripts/app_db_adopt.py @@ -15,7 +15,7 @@ if str(PROJECT_ROOT) not in sys.path: from app.config import get_settings -APP_BASELINE_REVISION = "20260420_04_app_config_table" +APP_BASELINE_REVISION = "20260429_05_public_ip_monitor" class AppDatabaseAdoptionError(RuntimeError): diff --git a/tests/test_public_ip.py b/tests/test_public_ip.py new file mode 100644 index 0000000..23b39e1 --- /dev/null +++ b/tests/test_public_ip.py @@ -0,0 +1,174 @@ +from datetime import UTC, datetime +import re +import sqlite3 + +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +from app.services.public_ip import PublicIPCheckResult, check_public_ipv4 + + +def _make_session(database_url: str) -> Session: + engine = create_engine(database_url, connect_args={"check_same_thread": False}) + session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session) + return session_local() + + +def _extract_csrf_token(html: str) -> str: + match = re.search(r'name="csrf_token" value="([^"]+)"', html) + assert match is not None + return match.group(1) + + +def _login(client: TestClient) -> None: + login_page = client.get("/login") + csrf_token = _extract_csrf_token(login_page.text) + response = client.post( + "/login", + data={ + "username": "admin", + "password": "test-password", + "csrf_token": csrf_token, + }, + follow_redirects=False, + ) + assert response.status_code == 303 + + +def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None: + session = _make_session(auth_database["app_url"]) + try: + result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + finally: + session.close() + + assert result.status == "first_seen" + assert result.changed is False + + conn = sqlite3.connect(auth_database["app_path"]) + try: + state = conn.execute( + "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state" + ).fetchone() + history = conn.execute( + "SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id" + ).fetchall() + finally: + conn.close() + + assert state == ("203.0.113.10", None, "first_seen", None, "ipify") + assert history == [("203.0.113.10", "first_seen", "ipify")] + + +def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None: + session = _make_session(auth_database["app_url"]) + try: + first_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + unchanged_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + finally: + session.close() + + assert first_result.status == "first_seen" + assert unchanged_result.status == "unchanged" + assert unchanged_result.changed is False + + conn = sqlite3.connect(auth_database["app_path"]) + try: + state = conn.execute( + "SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state" + ).fetchone() + history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0] + finally: + conn.close() + + assert state == ("203.0.113.10", None, "unchanged") + assert history_count == 1 + + +def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None: + session = _make_session(auth_database["app_url"]) + try: + check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + result = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25") + finally: + session.close() + + assert result.status == "changed" + assert result.changed is True + + conn = sqlite3.connect(auth_database["app_path"]) + try: + state = conn.execute( + "SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state" + ).fetchone() + history = conn.execute( + "SELECT ipv4, change_type FROM public_ip_history ORDER BY id" + ).fetchall() + finally: + conn.close() + + assert state[0:3] == ("198.51.100.25", "203.0.113.10", "changed") + assert state[3] is not None + assert history == [("203.0.113.10", "first_seen"), ("198.51.100.25", "changed")] + + +def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None: + session = _make_session(auth_database["app_url"]) + try: + check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + result = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip") + finally: + session.close() + + assert result.status == "error" + assert result.changed is False + + conn = sqlite3.connect(auth_database["app_path"]) + try: + state = conn.execute( + "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state" + ).fetchone() + history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0] + finally: + conn.close() + + assert state[0:3] == ("203.0.113.10", None, "error") + assert state[3] is not None + assert history_count == 1 + + +def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None: + response = client.get("/public-ip/check") + + assert response.status_code == 401 + assert response.json() == {"detail": "authentication required"} + + +def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None: + from app.api.routes import public_ip as public_ip_route + + fixed_checked_at = datetime(2026, 4, 29, 12, 0, tzinfo=UTC) + + monkeypatch.setattr( + public_ip_route, + "check_public_ipv4", + lambda session: PublicIPCheckResult( + status="changed", + checked_at=fixed_checked_at, + changed=True, + ), + ) + + _login(client) + response = client.get("/public-ip/check") + + assert response.status_code == 200 + assert response.json() == { + "status": "changed", + "checked_at": "2026-04-29T12:00:00Z", + "changed": True, + } + assert "current_ipv4" not in response.text + assert "previous_ipv4" not in response.text + assert "203.0.113.10" not in response.text -- 2.52.0 From 3ea3498e5878ca83be18728b977d9cf81510c7b2 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Apr 2026 12:11:10 +0200 Subject: [PATCH 2/4] add smtp module and testing --- app/api/routes/pages.py | 181 ++++++++++++++----- app/config.py | 8 + app/services/config_page.py | 16 ++ app/services/email.py | 106 ++++++++++++ app/templates/config.html | 22 +++ tests/test_config.py | 22 +++ tests/test_smtp.py | 334 ++++++++++++++++++++++++++++++++++++ 7 files changed, 643 insertions(+), 46 deletions(-) create mode 100644 app/services/email.py create mode 100644 tests/test_smtp.py diff --git a/app/api/routes/pages.py b/app/api/routes/pages.py index 2fb774e..4b474cb 100644 --- a/app/api/routes/pages.py +++ b/app/api/routes/pages.py @@ -14,6 +14,7 @@ from app.services.config_page import ( is_ticktick_oauth_ready, save_config_updates, ) +from app.services.email import EmailConfigurationError, EmailDeliveryError, is_smtp_ready, send_smtp_test_email from sqlalchemy.orm import Session templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates")) @@ -33,6 +34,49 @@ def _ticktick_oauth_notice(status_value: str | None) -> tuple[str | None, str | return None, None +def _smtp_test_notice(status_value: str | None) -> tuple[str | None, str | None]: + if status_value == "success": + return "SMTP test email sent successfully.", None + if status_value == "config-error": + return None, "SMTP test failed. Check required SMTP settings before sending a test email." + if status_value == "failed": + return None, "SMTP test failed. Check saved SMTP settings and server reachability." + return None, None + + +def _build_config_context( + *, + auth_db_session: Session, + settings: Settings, + current_auth: AuthenticatedSession, + config_saved: bool, + config_error: str | None, + password_change_error: str | None, + ticktick_oauth_notice: str | None, + ticktick_oauth_error: str | None, + smtp_test_notice: str | None, + smtp_test_error: str | None, +) -> dict[str, object]: + return { + "app_name": settings.app_name, + "app_env": settings.app_env, + "current_username": current_auth.user.username, + "csrf_token": current_auth.session.csrf_token, + "force_password_change": current_auth.user.force_password_change, + "password_change_error": password_change_error, + "config_error": config_error, + "config_saved": config_saved, + "config_sections": build_config_sections(auth_db_session, settings), + "ticktick_oauth_ready": is_ticktick_oauth_ready(settings), + "ticktick_redirect_uri": settings.ticktick_redirect_uri, + "ticktick_oauth_notice": ticktick_oauth_notice, + "ticktick_oauth_error": ticktick_oauth_error, + "smtp_test_ready": is_smtp_ready(settings), + "smtp_test_notice": smtp_test_notice, + "smtp_test_error": smtp_test_error, + } + + @router.get("/", response_class=HTMLResponse) def home( request: Request, @@ -66,22 +110,19 @@ def config_page( ticktick_oauth_notice, ticktick_oauth_error = _ticktick_oauth_notice( request.query_params.get("ticktick_oauth") ) - - context = { - "app_name": settings.app_name, - "app_env": settings.app_env, - "current_username": current_auth.user.username, - "csrf_token": current_auth.session.csrf_token, - "force_password_change": current_auth.user.force_password_change, - "password_change_error": None, - "config_error": None, - "config_saved": request.query_params.get("saved") == "1", - "config_sections": build_config_sections(auth_db_session, settings), - "ticktick_oauth_ready": is_ticktick_oauth_ready(settings), - "ticktick_redirect_uri": settings.ticktick_redirect_uri, - "ticktick_oauth_notice": ticktick_oauth_notice, - "ticktick_oauth_error": ticktick_oauth_error, - } + smtp_test_notice, smtp_test_error = _smtp_test_notice(request.query_params.get("smtp_test")) + context = _build_config_context( + auth_db_session=auth_db_session, + settings=settings, + current_auth=current_auth, + config_saved=request.query_params.get("saved") == "1", + config_error=None, + password_change_error=None, + ticktick_oauth_notice=ticktick_oauth_notice, + ticktick_oauth_error=ticktick_oauth_error, + smtp_test_notice=smtp_test_notice, + smtp_test_error=smtp_test_error, + ) return templates.TemplateResponse(request, "config.html", context) @@ -99,21 +140,18 @@ async def config_submit( csrf_token = form.get("csrf_token") if csrf_token != current_auth.session.csrf_token: logger.warning("Rejected config update due to CSRF validation failure") - context = { - "app_name": settings.app_name, - "app_env": settings.app_env, - "current_username": current_auth.user.username, - "csrf_token": current_auth.session.csrf_token, - "force_password_change": current_auth.user.force_password_change, - "password_change_error": None, - "config_error": "invalid config update request", - "config_saved": False, - "config_sections": build_config_sections(auth_db_session, settings), - "ticktick_oauth_ready": is_ticktick_oauth_ready(settings), - "ticktick_redirect_uri": settings.ticktick_redirect_uri, - "ticktick_oauth_notice": None, - "ticktick_oauth_error": None, - } + context = _build_config_context( + auth_db_session=auth_db_session, + settings=settings, + current_auth=current_auth, + config_saved=False, + config_error="invalid config update request", + password_change_error=None, + ticktick_oauth_notice=None, + ticktick_oauth_error=None, + smtp_test_notice=None, + smtp_test_error=None, + ) return templates.TemplateResponse( request, "config.html", @@ -126,21 +164,18 @@ async def config_submit( except ConfigSaveError: logger.warning("Rejected config update due to invalid submitted values") refreshed_settings = get_settings() - context = { - "app_name": refreshed_settings.app_name, - "app_env": refreshed_settings.app_env, - "current_username": current_auth.user.username, - "csrf_token": current_auth.session.csrf_token, - "force_password_change": current_auth.user.force_password_change, - "password_change_error": None, - "config_error": "invalid config submission", - "config_saved": False, - "config_sections": build_config_sections(auth_db_session, refreshed_settings), - "ticktick_oauth_ready": is_ticktick_oauth_ready(refreshed_settings), - "ticktick_redirect_uri": refreshed_settings.ticktick_redirect_uri, - "ticktick_oauth_notice": None, - "ticktick_oauth_error": None, - } + context = _build_config_context( + auth_db_session=auth_db_session, + settings=refreshed_settings, + current_auth=current_auth, + config_saved=False, + config_error="invalid config submission", + password_change_error=None, + ticktick_oauth_notice=None, + ticktick_oauth_error=None, + smtp_test_notice=None, + smtp_test_error=None, + ) return templates.TemplateResponse( request, "config.html", @@ -149,3 +184,57 @@ async def config_submit( ) return RedirectResponse(url="/config?saved=1", status_code=status.HTTP_303_SEE_OTHER) + + +@router.post("/config/smtp/test", response_class=HTMLResponse) +async def smtp_test_submit( + request: Request, + auth_db_session: Session = Depends(get_auth_db), + settings: Settings = Depends(get_app_settings), + current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), +) -> Response: + if current_auth is None: + return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER) + + form = await request.form() + csrf_token = form.get("csrf_token") + if csrf_token != current_auth.session.csrf_token: + logger.warning("Rejected SMTP test due to CSRF validation failure") + context = _build_config_context( + auth_db_session=auth_db_session, + settings=settings, + current_auth=current_auth, + config_saved=False, + config_error=None, + password_change_error=None, + ticktick_oauth_notice=None, + ticktick_oauth_error=None, + smtp_test_notice=None, + smtp_test_error="invalid SMTP test request", + ) + return templates.TemplateResponse( + request, + "config.html", + context, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + try: + send_smtp_test_email(settings) + except EmailConfigurationError as exc: + logger.warning("SMTP test email rejected due to configuration: %s", exc) + return RedirectResponse( + url="/config?smtp_test=config-error", + status_code=status.HTTP_303_SEE_OTHER, + ) + except EmailDeliveryError as exc: + logger.warning("SMTP test email failed: %s", exc) + return RedirectResponse( + url="/config?smtp_test=failed", + status_code=status.HTTP_303_SEE_OTHER, + ) + + return RedirectResponse( + url="/config?smtp_test=success", + status_code=status.HTTP_303_SEE_OTHER, + ) diff --git a/app/config.py b/app/config.py index 1d7e0b9..929b38a 100644 --- a/app/config.py +++ b/app/config.py @@ -23,6 +23,14 @@ class Settings(BaseSettings): home_assistant_auth_token: str = "" home_assistant_timeout_seconds: float = 1.0 home_assistant_action_task_project_id: str = "" + smtp_enabled: bool = False + smtp_host: str = "" + smtp_port: int = 587 + smtp_username: str = "" + smtp_password: str = "" + smtp_from_address: str = "" + smtp_to_address: str = "" + smtp_use_starttls: bool = True poo_webhook_id: str = "" poo_sensor_entity_name: str = "sensor.test_poo_status" poo_sensor_friendly_name: str = "Poo Status" diff --git a/app/services/config_page.py b/app/services/config_page.py index 5a68621..141f75d 100644 --- a/app/services/config_page.py +++ b/app/services/config_page.py @@ -27,6 +27,14 @@ CONFIG_FIELDS: tuple[ConfigField, ...] = ( ConfigField("System", "APP_ENV", "app_env", "App Env"), ConfigField("System", "APP_DEBUG", "app_debug", "App Debug"), ConfigField("System", "APP_HOSTNAME", "app_hostname", "App Hostname"), + ConfigField("SMTP", "SMTP_ENABLED", "smtp_enabled", "SMTP Enabled"), + ConfigField("SMTP", "SMTP_HOST", "smtp_host", "SMTP Host"), + ConfigField("SMTP", "SMTP_PORT", "smtp_port", "SMTP Port"), + ConfigField("SMTP", "SMTP_USERNAME", "smtp_username", "SMTP Username"), + ConfigField("SMTP", "SMTP_PASSWORD", "smtp_password", "SMTP Password", secret=True), + ConfigField("SMTP", "SMTP_FROM_ADDRESS", "smtp_from_address", "SMTP From Address"), + ConfigField("SMTP", "SMTP_TO_ADDRESS", "smtp_to_address", "SMTP To Address"), + ConfigField("SMTP", "SMTP_USE_STARTTLS", "smtp_use_starttls", "SMTP Use STARTTLS"), ConfigField( "Authentication", "AUTH_SESSION_COOKIE_NAME", @@ -260,6 +268,14 @@ def _settings_payload(settings: Settings) -> dict[str, Any]: "home_assistant_auth_token": settings.home_assistant_auth_token, "home_assistant_timeout_seconds": settings.home_assistant_timeout_seconds, "home_assistant_action_task_project_id": settings.home_assistant_action_task_project_id, + "smtp_enabled": settings.smtp_enabled, + "smtp_host": settings.smtp_host, + "smtp_port": settings.smtp_port, + "smtp_username": settings.smtp_username, + "smtp_password": settings.smtp_password, + "smtp_from_address": settings.smtp_from_address, + "smtp_to_address": settings.smtp_to_address, + "smtp_use_starttls": settings.smtp_use_starttls, "poo_webhook_id": settings.poo_webhook_id, "poo_sensor_entity_name": settings.poo_sensor_entity_name, "poo_sensor_friendly_name": settings.poo_sensor_friendly_name, diff --git a/app/services/email.py b/app/services/email.py new file mode 100644 index 0000000..249b921 --- /dev/null +++ b/app/services/email.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from dataclasses import dataclass +from email.message import EmailMessage +import smtplib + +from app.config import Settings + + +class EmailConfigurationError(ValueError): + """Raised when SMTP settings are incomplete or disabled.""" + + +class EmailDeliveryError(RuntimeError): + """Raised when sending email fails.""" + + +@dataclass(frozen=True, slots=True) +class SMTPConfig: + host: str + port: int + username: str + password: str + from_address: str + to_address: str + use_starttls: bool + + +def get_smtp_config(settings: Settings, *, require_enabled: bool = True) -> SMTPConfig: + if require_enabled and not settings.smtp_enabled: + raise EmailConfigurationError("SMTP is disabled") + + if not settings.smtp_host: + raise EmailConfigurationError("SMTP host is required") + + if settings.smtp_port <= 0: + raise EmailConfigurationError("SMTP port must be greater than zero") + + if not settings.smtp_from_address: + raise EmailConfigurationError("SMTP from address is required") + + if not settings.smtp_to_address: + raise EmailConfigurationError("SMTP to address is required") + + return SMTPConfig( + host=settings.smtp_host, + port=settings.smtp_port, + username=settings.smtp_username, + password=settings.smtp_password, + from_address=settings.smtp_from_address, + to_address=settings.smtp_to_address, + use_starttls=settings.smtp_use_starttls, + ) + + +def is_smtp_ready(settings: Settings) -> bool: + try: + get_smtp_config(settings, require_enabled=False) + except EmailConfigurationError: + return False + return True + + +def send_plaintext_email( + settings: Settings, + *, + subject: str, + body: str, + recipient: str | None = None, + require_enabled: bool = True, +) -> None: + smtp_config = get_smtp_config(settings, require_enabled=require_enabled) + message = EmailMessage() + message["Subject"] = subject + message["From"] = smtp_config.from_address + message["To"] = recipient or smtp_config.to_address + message.set_content(body) + + try: + with smtplib.SMTP(smtp_config.host, smtp_config.port, timeout=10) as smtp: + smtp.ehlo() + if smtp_config.use_starttls: + smtp.starttls() + smtp.ehlo() + if smtp_config.username: + smtp.login(smtp_config.username, smtp_config.password) + smtp.send_message(message) + except (OSError, smtplib.SMTPException) as exc: + error_message = _sanitize_error_message(str(exc), smtp_config.password) + raise EmailDeliveryError(error_message or "SMTP delivery failed") from exc + + +def send_smtp_test_email(settings: Settings) -> None: + send_plaintext_email( + settings, + subject="Home Automation SMTP Test", + body="This is a test email from Home Automation SMTP settings.", + require_enabled=False, + ) + + +def _sanitize_error_message(message: str, password: str) -> str: + sanitized = message + if password: + sanitized = sanitized.replace(password, "[redacted]") + return sanitized \ No newline at end of file diff --git a/app/templates/config.html b/app/templates/config.html index 6ce1b81..0fb3f70 100644 --- a/app/templates/config.html +++ b/app/templates/config.html @@ -33,6 +33,14 @@
{{ ticktick_oauth_notice }}
{% endif %} + {% if smtp_test_error %} +
{{ smtp_test_error }}
+ {% endif %} + + {% if smtp_test_notice %} +
{{ smtp_test_notice }}
+ {% endif %} +
当前用户
@@ -102,6 +110,20 @@ {% endif %}
{% endif %} + + {% if section.name == "SMTP" %} +
+
+

SMTP Test Email

+

Save the SMTP settings first, then send a simple plaintext test email to the configured recipient.

+
+ {% if smtp_test_ready %} + + {% else %} + Send SMTP Test + {% endif %} +
+ {% endif %} {% endfor %} diff --git a/tests/test_config.py b/tests/test_config.py index 598d280..1818de3 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -53,3 +53,25 @@ def test_settings_derive_development_ticktick_redirect_uri(monkeypatch) -> None: assert settings.app_base_url == "http://localhost:11001" assert settings.ticktick_redirect_uri == "http://localhost:11001/ticktick/auth/code" + + +def test_settings_support_smtp_fields(monkeypatch) -> None: + monkeypatch.setenv("SMTP_ENABLED", "true") + monkeypatch.setenv("SMTP_HOST", "smtp.example.com") + monkeypatch.setenv("SMTP_PORT", "2525") + monkeypatch.setenv("SMTP_USERNAME", "smtp-user") + monkeypatch.setenv("SMTP_PASSWORD", "smtp-password") + monkeypatch.setenv("SMTP_FROM_ADDRESS", "sender@example.com") + monkeypatch.setenv("SMTP_TO_ADDRESS", "recipient@example.com") + monkeypatch.setenv("SMTP_USE_STARTTLS", "false") + + settings = Settings(_env_file=None) + + assert settings.smtp_enabled is True + assert settings.smtp_host == "smtp.example.com" + assert settings.smtp_port == 2525 + assert settings.smtp_username == "smtp-user" + assert settings.smtp_password == "smtp-password" + assert settings.smtp_from_address == "sender@example.com" + assert settings.smtp_to_address == "recipient@example.com" + assert settings.smtp_use_starttls is False diff --git a/tests/test_smtp.py b/tests/test_smtp.py new file mode 100644 index 0000000..f004b2e --- /dev/null +++ b/tests/test_smtp.py @@ -0,0 +1,334 @@ +import re +import sqlite3 +import smtplib + +from fastapi.testclient import TestClient + +from app.config import Settings +from app.services.email import EmailDeliveryError, get_smtp_config, is_smtp_ready, send_smtp_test_email + + +def _extract_csrf_token(html: str) -> str: + match = re.search(r'name="csrf_token" value="([^"]+)"', html) + assert match is not None + return match.group(1) + + +def _login(client: TestClient) -> None: + login_page = client.get("/login") + csrf_token = _extract_csrf_token(login_page.text) + response = client.post( + "/login", + data={ + "username": "admin", + "password": "test-password", + "csrf_token": csrf_token, + }, + follow_redirects=False, + ) + assert response.status_code == 303 + + +def _smtp_settings(**overrides) -> Settings: + payload = { + "app_env": "development", + "app_hostname": "localhost:8000", + "app_database_url": "sqlite:///./data/app.db", + "location_database_url": "sqlite:///./data/locationRecorder.db", + "poo_database_url": "sqlite:///./data/pooRecorder.db", + "auth_bootstrap_username": "admin", + "auth_bootstrap_password": "secret-password", + "smtp_enabled": True, + "smtp_host": "smtp.example.com", + "smtp_port": 587, + "smtp_username": "smtp-user", + "smtp_password": "super-secret-password", + "smtp_from_address": "sender@example.com", + "smtp_to_address": "recipient@example.com", + "smtp_use_starttls": True, + } + payload.update(overrides) + return Settings(_env_file=None, **payload) + + +def test_get_smtp_config_reads_runtime_values() -> None: + settings = _smtp_settings(smtp_port=2525, smtp_use_starttls=False) + + smtp_config = get_smtp_config(settings) + + assert smtp_config.host == "smtp.example.com" + assert smtp_config.port == 2525 + assert smtp_config.username == "smtp-user" + assert smtp_config.password == "super-secret-password" + assert smtp_config.from_address == "sender@example.com" + assert smtp_config.to_address == "recipient@example.com" + assert smtp_config.use_starttls is False + + +def test_smtp_test_readiness_does_not_require_smtp_enabled() -> None: + settings = _smtp_settings(smtp_enabled=False) + + assert is_smtp_ready(settings) is True + + +def test_send_smtp_test_email_success(monkeypatch) -> None: + sent = {} + + class FakeSMTP: + def __init__(self, host, port, timeout): + sent["host"] = host + sent["port"] = port + sent["timeout"] = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return None + + def ehlo(self): + sent["ehlo"] = sent.get("ehlo", 0) + 1 + + def starttls(self): + sent["starttls"] = True + + def login(self, username, password): + sent["username"] = username + sent["password"] = password + + def send_message(self, message): + sent["subject"] = message["Subject"] + sent["from"] = message["From"] + sent["to"] = message["To"] + sent["body"] = message.get_content() + + monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) + + send_smtp_test_email(_smtp_settings()) + + assert sent["host"] == "smtp.example.com" + assert sent["port"] == 587 + assert sent["timeout"] == 10 + assert sent["starttls"] is True + assert sent["username"] == "smtp-user" + assert sent["password"] == "super-secret-password" + assert sent["subject"] == "Home Automation SMTP Test" + assert sent["from"] == "sender@example.com" + assert sent["to"] == "recipient@example.com" + assert "This is a test email" in sent["body"] + + +def test_send_smtp_test_email_does_not_require_smtp_enabled(monkeypatch) -> None: + sent = {} + + class FakeSMTP: + def __init__(self, host, port, timeout): + sent["host"] = host + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return None + + def ehlo(self): + return None + + def starttls(self): + return None + + def login(self, username, password): + return None + + def send_message(self, message): + sent["subject"] = message["Subject"] + + monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) + + send_smtp_test_email(_smtp_settings(smtp_enabled=False)) + + assert sent["host"] == "smtp.example.com" + assert sent["subject"] == "Home Automation SMTP Test" + + +def test_send_smtp_test_email_failure_sanitizes_password(monkeypatch) -> None: + class FakeSMTP: + def __init__(self, host, port, timeout): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return None + + def ehlo(self): + return None + + def starttls(self): + raise smtplib.SMTPException("authentication failed for super-secret-password") + + monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) + + try: + send_smtp_test_email(_smtp_settings()) + assert False, "expected EmailDeliveryError" + except EmailDeliveryError as exc: + assert "super-secret-password" not in str(exc) + assert "[redacted]" in str(exc) + + +def test_config_update_does_not_clear_existing_smtp_password( + client: TestClient, test_database_urls +) -> None: + _login(client) + config_page = client.get("/config") + config_csrf_token = _extract_csrf_token(config_page.text) + + response = client.post( + "/config", + data={ + "csrf_token": config_csrf_token, + "APP_NAME": "SMTP Config Test", + "APP_ENV": "development", + "APP_DEBUG": "true", + "APP_HOSTNAME": "localhost:8000", + "SMTP_ENABLED": "true", + "SMTP_HOST": "smtp.example.com", + "SMTP_PORT": "587", + "SMTP_USERNAME": "smtp-user", + "SMTP_PASSWORD": "persist-me", + "SMTP_FROM_ADDRESS": "sender@example.com", + "SMTP_TO_ADDRESS": "recipient@example.com", + "SMTP_USE_STARTTLS": "true", + "AUTH_SESSION_COOKIE_NAME": "home_automation_session", + "AUTH_SESSION_TTL_HOURS": "12", + "AUTH_COOKIE_SECURE_OVERRIDE": "false", + "POO_WEBHOOK_ID": "", + "POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status", + "POO_SENSOR_FRIENDLY_NAME": "Poo Status", + "TICKTICK_CLIENT_ID": "", + "TICKTICK_CLIENT_SECRET": "", + "TICKTICK_TOKEN": "", + "HOME_ASSISTANT_BASE_URL": "", + "HOME_ASSISTANT_AUTH_TOKEN": "", + "HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0", + "HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "", + }, + follow_redirects=False, + ) + assert response.status_code == 303 + + config_page = client.get("/config") + config_csrf_token = _extract_csrf_token(config_page.text) + response = client.post( + "/config", + data={ + "csrf_token": config_csrf_token, + "APP_NAME": "SMTP Config Updated", + "APP_ENV": "development", + "APP_DEBUG": "true", + "APP_HOSTNAME": "localhost:8000", + "SMTP_ENABLED": "true", + "SMTP_HOST": "smtp.example.com", + "SMTP_PORT": "587", + "SMTP_USERNAME": "smtp-user", + "SMTP_PASSWORD": "", + "SMTP_FROM_ADDRESS": "sender@example.com", + "SMTP_TO_ADDRESS": "recipient@example.com", + "SMTP_USE_STARTTLS": "true", + "AUTH_SESSION_COOKIE_NAME": "home_automation_session", + "AUTH_SESSION_TTL_HOURS": "12", + "AUTH_COOKIE_SECURE_OVERRIDE": "false", + "POO_WEBHOOK_ID": "", + "POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status", + "POO_SENSOR_FRIENDLY_NAME": "Poo Status", + "TICKTICK_CLIENT_ID": "", + "TICKTICK_CLIENT_SECRET": "", + "TICKTICK_TOKEN": "", + "HOME_ASSISTANT_BASE_URL": "", + "HOME_ASSISTANT_AUTH_TOKEN": "", + "HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0", + "HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "", + }, + follow_redirects=False, + ) + assert response.status_code == 303 + + conn = sqlite3.connect(test_database_urls["app_path"]) + try: + rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall()) + finally: + conn.close() + + assert rows["SMTP_PASSWORD"] == "persist-me" + assert rows["APP_NAME"] == "SMTP Config Updated" + + +def test_smtp_test_endpoint_requires_authentication(client: TestClient) -> None: + response = client.post("/config/smtp/test", data={"csrf_token": "ignored"}, follow_redirects=False) + + assert response.status_code == 303 + assert response.headers["location"] == "/login" + + +def test_smtp_test_endpoint_success_and_failure_do_not_expose_password( + client: TestClient, monkeypatch +) -> None: + from app.api.routes import pages + + _login(client) + config_page = client.get("/config") + csrf_token = _extract_csrf_token(config_page.text) + + monkeypatch.setattr(pages, "send_smtp_test_email", lambda settings: None) + response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False) + assert response.status_code == 303 + assert response.headers["location"] == "/config?smtp_test=success" + + follow_up = client.get(response.headers["location"]) + assert follow_up.status_code == 200 + assert "SMTP test email sent successfully." in follow_up.text + assert "super-secret-password" not in follow_up.text + + monkeypatch.setattr( + pages, + "send_smtp_test_email", + lambda settings: (_ for _ in ()).throw(EmailDeliveryError("smtp auth failed for [redacted]")), + ) + response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False) + assert response.status_code == 303 + assert response.headers["location"] == "/config?smtp_test=failed" + + follow_up = client.get(response.headers["location"]) + assert follow_up.status_code == 200 + assert "SMTP test failed. Check saved SMTP settings and server reachability." in follow_up.text + assert "super-secret-password" not in follow_up.text + + +def test_config_page_renders_smtp_test_button_with_formaction( + client: TestClient, test_database_urls +) -> None: + _login(client) + + conn = sqlite3.connect(test_database_urls["app_path"]) + try: + conn.executemany( + "INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) " + "ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at", + [ + ("SMTP_ENABLED", "true"), + ("SMTP_HOST", "smtp.example.com"), + ("SMTP_PORT", "587"), + ("SMTP_FROM_ADDRESS", "sender@example.com"), + ("SMTP_TO_ADDRESS", "recipient@example.com"), + ], + ) + conn.commit() + finally: + conn.close() + + response = client.get("/config") + + assert response.status_code == 200 + assert 'formaction="/config/smtp/test"' in response.text \ No newline at end of file -- 2.52.0 From 779e160b95b03f4b3ad7e017e08447ddf2ead5aa Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Apr 2026 13:03:12 +0200 Subject: [PATCH 3/4] add ip change notification and refine sender display --- app/api/routes/public_ip.py | 5 ++- app/config.py | 1 + app/main.py | 4 +- app/services/config_page.py | 2 + app/services/email.py | 49 ++++++++++++++++++-- app/services/public_ip.py | 60 +++++++++++++++++++++++-- tests/test_config.py | 2 + tests/test_public_ip.py | 90 +++++++++++++++++++++++++++++++++++-- tests/test_smtp.py | 71 +++++++++++++++++++++++++++-- 9 files changed, 266 insertions(+), 18 deletions(-) diff --git a/app/api/routes/public_ip.py b/app/api/routes/public_ip.py index ee9d722..766525f 100644 --- a/app/api/routes/public_ip.py +++ b/app/api/routes/public_ip.py @@ -3,8 +3,9 @@ from sqlalchemy.orm import Session from app.dependencies import get_auth_db, get_current_auth_session from app.schemas.public_ip import PublicIPCheckResponse +from app.config import get_settings from app.services.auth import AuthenticatedSession -from app.services.public_ip import check_public_ipv4 +from app.services.public_ip import check_public_ipv4_and_notify router = APIRouter(tags=["public-ip"]) @@ -17,7 +18,7 @@ def run_public_ip_check( if current_auth is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="authentication required") - result = check_public_ipv4(session) + result = check_public_ipv4_and_notify(session, bootstrap_settings=get_settings()) return PublicIPCheckResponse( status=result.status, checked_at=result.checked_at, diff --git a/app/config.py b/app/config.py index 929b38a..29adab5 100644 --- a/app/config.py +++ b/app/config.py @@ -28,6 +28,7 @@ class Settings(BaseSettings): smtp_port: int = 587 smtp_username: str = "" smtp_password: str = "" + smtp_from_name: str = "" smtp_from_address: str = "" smtp_to_address: str = "" smtp_use_starttls: bool = True diff --git a/app/main.py b/app/main.py index 4a8ee2a..dd8a9ec 100644 --- a/app/main.py +++ b/app/main.py @@ -19,7 +19,7 @@ from app.api.routes.ticktick import router as ticktick_router from app.config import get_settings from app.services.auth import AuthBootstrapError, initialize_auth_schema from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap -from app.services.public_ip import check_public_ipv4 +from app.services.public_ip import check_public_ipv4_and_notify from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db from scripts.location_db_adopt import LocationDatabaseAdoptionError, validate_location_runtime_db from scripts.poo_db_adopt import PooDatabaseAdoptionError, validate_poo_runtime_db @@ -29,7 +29,7 @@ def _run_scheduled_public_ip_check() -> None: session_local = auth_db.get_auth_session_local() session: Session = session_local() try: - check_public_ipv4(session) + check_public_ipv4_and_notify(session, bootstrap_settings=get_settings()) finally: session.close() diff --git a/app/services/config_page.py b/app/services/config_page.py index 141f75d..db7a450 100644 --- a/app/services/config_page.py +++ b/app/services/config_page.py @@ -32,6 +32,7 @@ CONFIG_FIELDS: tuple[ConfigField, ...] = ( ConfigField("SMTP", "SMTP_PORT", "smtp_port", "SMTP Port"), ConfigField("SMTP", "SMTP_USERNAME", "smtp_username", "SMTP Username"), ConfigField("SMTP", "SMTP_PASSWORD", "smtp_password", "SMTP Password", secret=True), + ConfigField("SMTP", "SMTP_FROM_NAME", "smtp_from_name", "SMTP From Name"), ConfigField("SMTP", "SMTP_FROM_ADDRESS", "smtp_from_address", "SMTP From Address"), ConfigField("SMTP", "SMTP_TO_ADDRESS", "smtp_to_address", "SMTP To Address"), ConfigField("SMTP", "SMTP_USE_STARTTLS", "smtp_use_starttls", "SMTP Use STARTTLS"), @@ -273,6 +274,7 @@ def _settings_payload(settings: Settings) -> dict[str, Any]: "smtp_port": settings.smtp_port, "smtp_username": settings.smtp_username, "smtp_password": settings.smtp_password, + "smtp_from_name": settings.smtp_from_name, "smtp_from_address": settings.smtp_from_address, "smtp_to_address": settings.smtp_to_address, "smtp_use_starttls": settings.smtp_use_starttls, diff --git a/app/services/email.py b/app/services/email.py index 249b921..e8ef37e 100644 --- a/app/services/email.py +++ b/app/services/email.py @@ -1,7 +1,9 @@ from __future__ import annotations from dataclasses import dataclass +from datetime import UTC, datetime from email.message import EmailMessage +from email.utils import formataddr import smtplib from app.config import Settings @@ -21,6 +23,7 @@ class SMTPConfig: port: int username: str password: str + from_name: str from_address: str to_address: str use_starttls: bool @@ -47,6 +50,7 @@ def get_smtp_config(settings: Settings, *, require_enabled: bool = True) -> SMTP port=settings.smtp_port, username=settings.smtp_username, password=settings.smtp_password, + from_name=settings.smtp_from_name, from_address=settings.smtp_from_address, to_address=settings.smtp_to_address, use_starttls=settings.smtp_use_starttls, @@ -72,7 +76,7 @@ def send_plaintext_email( smtp_config = get_smtp_config(settings, require_enabled=require_enabled) message = EmailMessage() message["Subject"] = subject - message["From"] = smtp_config.from_address + message["From"] = _build_from_header(smtp_config) message["To"] = recipient or smtp_config.to_address message.set_content(body) @@ -84,7 +88,11 @@ def send_plaintext_email( smtp.ehlo() if smtp_config.username: smtp.login(smtp_config.username, smtp_config.password) - smtp.send_message(message) + smtp.send_message( + message, + from_addr=smtp_config.from_address, + to_addrs=[recipient or smtp_config.to_address], + ) except (OSError, smtplib.SMTPException) as exc: error_message = _sanitize_error_message(str(exc), smtp_config.password) raise EmailDeliveryError(error_message or "SMTP delivery failed") from exc @@ -99,8 +107,43 @@ def send_smtp_test_email(settings: Settings) -> None: ) +def send_public_ip_changed_email( + settings: Settings, + *, + previous_ipv4: str, + current_ipv4: str, + detected_at: datetime, +) -> None: + send_plaintext_email( + settings, + subject="Public IP changed", + body=( + "Your public IPv4 address has changed.\n\n" + f"Previous IP: {previous_ipv4}\n" + f"Current IP: {current_ipv4}\n" + f"Detected at: {_format_utc_timestamp(detected_at)}\n\n" + "If you use Namecheap API trusted IP restrictions, you may need to " + "update the trusted IP manually.\n" + ), + ) + + def _sanitize_error_message(message: str, password: str) -> str: sanitized = message if password: sanitized = sanitized.replace(password, "[redacted]") - return sanitized \ No newline at end of file + return sanitized + + +def _format_utc_timestamp(value: datetime) -> str: + if value.tzinfo is None: + normalized = value.replace(tzinfo=UTC) + else: + normalized = value.astimezone(UTC) + return normalized.strftime("%Y-%m-%d %H:%M:%S UTC") + + +def _build_from_header(smtp_config: SMTPConfig) -> str: + if smtp_config.from_name: + return formataddr((smtp_config.from_name, smtp_config.from_address)) + return smtp_config.from_address \ No newline at end of file diff --git a/app/services/public_ip.py b/app/services/public_ip.py index a984f25..d13d6cc 100644 --- a/app/services/public_ip.py +++ b/app/services/public_ip.py @@ -10,7 +10,10 @@ import httpx from sqlalchemy import select from sqlalchemy.orm import Session +from app.config import Settings from app.models.public_ip import PublicIPHistory, PublicIPState +from app.services.config_page import build_runtime_settings +from app.services.email import EmailConfigurationError, EmailDeliveryError, send_public_ip_changed_email logger = logging.getLogger(__name__) @@ -31,6 +34,8 @@ class PublicIPCheckResult: status: PublicIPResultStatus checked_at: datetime changed: bool + previous_ipv4: str | None = None + current_ipv4: str | None = None def check_public_ipv4( @@ -77,7 +82,12 @@ def check_public_ipv4( ) ) session.commit() - return PublicIPCheckResult(status="first_seen", checked_at=checked_at, changed=False) + return PublicIPCheckResult( + status="first_seen", + checked_at=checked_at, + changed=False, + current_ipv4=current_ipv4, + ) if state.current_ipv4 == current_ipv4: state.last_checked_at = checked_at @@ -85,9 +95,15 @@ def check_public_ipv4( state.last_check_error = None state.last_provider = provider_name session.commit() - return PublicIPCheckResult(status="unchanged", checked_at=checked_at, changed=False) + return PublicIPCheckResult( + status="unchanged", + checked_at=checked_at, + changed=False, + current_ipv4=current_ipv4, + ) - state.previous_ipv4 = state.current_ipv4 + previous_ipv4 = state.current_ipv4 + state.previous_ipv4 = previous_ipv4 state.current_ipv4 = current_ipv4 state.last_checked_at = checked_at state.last_changed_at = checked_at @@ -103,7 +119,43 @@ def check_public_ipv4( ) ) session.commit() - return PublicIPCheckResult(status="changed", checked_at=checked_at, changed=True) + return PublicIPCheckResult( + status="changed", + checked_at=checked_at, + changed=True, + previous_ipv4=previous_ipv4, + current_ipv4=current_ipv4, + ) + + +def check_public_ipv4_and_notify( + session: Session, + *, + bootstrap_settings: Settings, + fetch_public_ipv4: PublicIPv4Fetcher | None = None, + provider_name: str = PUBLIC_IP_PROVIDER_NAME, +) -> PublicIPCheckResult: + result = check_public_ipv4( + session, + fetch_public_ipv4=fetch_public_ipv4, + provider_name=provider_name, + ) + + if result.status != "changed" or result.previous_ipv4 is None or result.current_ipv4 is None: + return result + + runtime_settings = build_runtime_settings(session, bootstrap_settings) + try: + send_public_ip_changed_email( + runtime_settings, + previous_ipv4=result.previous_ipv4, + current_ipv4=result.current_ipv4, + detected_at=result.checked_at, + ) + except (EmailConfigurationError, EmailDeliveryError) as exc: + logger.warning("Public IPv4 change notification failed: %s", exc) + + return result def fetch_public_ipv4_from_provider() -> str: diff --git a/tests/test_config.py b/tests/test_config.py index 1818de3..6aea8d0 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -61,6 +61,7 @@ def test_settings_support_smtp_fields(monkeypatch) -> None: monkeypatch.setenv("SMTP_PORT", "2525") monkeypatch.setenv("SMTP_USERNAME", "smtp-user") monkeypatch.setenv("SMTP_PASSWORD", "smtp-password") + monkeypatch.setenv("SMTP_FROM_NAME", "Home Automation") monkeypatch.setenv("SMTP_FROM_ADDRESS", "sender@example.com") monkeypatch.setenv("SMTP_TO_ADDRESS", "recipient@example.com") monkeypatch.setenv("SMTP_USE_STARTTLS", "false") @@ -72,6 +73,7 @@ def test_settings_support_smtp_fields(monkeypatch) -> None: assert settings.smtp_port == 2525 assert settings.smtp_username == "smtp-user" assert settings.smtp_password == "smtp-password" + assert settings.smtp_from_name == "Home Automation" assert settings.smtp_from_address == "sender@example.com" assert settings.smtp_to_address == "recipient@example.com" assert settings.smtp_use_starttls is False diff --git a/tests/test_public_ip.py b/tests/test_public_ip.py index 23b39e1..a8a9558 100644 --- a/tests/test_public_ip.py +++ b/tests/test_public_ip.py @@ -6,7 +6,9 @@ from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker -from app.services.public_ip import PublicIPCheckResult, check_public_ipv4 +from app.config import Settings +from app.services.email import EmailDeliveryError +from app.services.public_ip import PublicIPCheckResult, check_public_ipv4, check_public_ipv4_and_notify def _make_session(database_url: str) -> Session: @@ -152,8 +154,8 @@ def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatc monkeypatch.setattr( public_ip_route, - "check_public_ipv4", - lambda session: PublicIPCheckResult( + "check_public_ipv4_and_notify", + lambda session, bootstrap_settings: PublicIPCheckResult( status="changed", checked_at=fixed_checked_at, changed=True, @@ -172,3 +174,85 @@ def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatc assert "current_ipv4" not in response.text assert "previous_ipv4" not in response.text assert "203.0.113.10" not in response.text + + +def _notification_settings() -> Settings: + return Settings( + _env_file=None, + app_env="development", + app_hostname="localhost:8000", + app_database_url="sqlite:///./data/app.db", + location_database_url="sqlite:///./data/locationRecorder.db", + poo_database_url="sqlite:///./data/pooRecorder.db", + auth_bootstrap_username="admin", + auth_bootstrap_password="secret-password", + smtp_enabled=True, + smtp_host="smtp.example.com", + smtp_port=587, + smtp_username="smtp-user", + smtp_password="super-secret-password", + smtp_from_address="sender@example.com", + smtp_to_address="recipient@example.com", + smtp_use_starttls=True, + ) + + +def test_public_ip_notification_sends_only_when_changed(auth_database, monkeypatch) -> None: + session = _make_session(auth_database["app_url"]) + sent = [] + monkeypatch.setattr( + "app.services.public_ip.send_public_ip_changed_email", + lambda settings, *, previous_ipv4, current_ipv4, detected_at: sent.append( + (previous_ipv4, current_ipv4, detected_at) + ), + ) + try: + first_seen = check_public_ipv4_and_notify( + session, + bootstrap_settings=_notification_settings(), + fetch_public_ipv4=lambda: "203.0.113.10", + ) + unchanged = check_public_ipv4_and_notify( + session, + bootstrap_settings=_notification_settings(), + fetch_public_ipv4=lambda: "203.0.113.10", + ) + changed = check_public_ipv4_and_notify( + session, + bootstrap_settings=_notification_settings(), + fetch_public_ipv4=lambda: "198.51.100.25", + ) + finally: + session.close() + + assert first_seen.status == "first_seen" + assert unchanged.status == "unchanged" + assert changed.status == "changed" + assert len(sent) == 1 + assert sent[0][0] == "203.0.113.10" + assert sent[0][1] == "198.51.100.25" + assert sent[0][2] == changed.checked_at + + +def test_public_ip_notification_failure_does_not_break_changed_result(auth_database, monkeypatch) -> None: + session = _make_session(auth_database["app_url"]) + monkeypatch.setattr( + "app.services.public_ip.send_public_ip_changed_email", + lambda settings, *, previous_ipv4, current_ipv4, detected_at: (_ for _ in ()).throw( + EmailDeliveryError("smtp down") + ), + ) + try: + check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10") + result = check_public_ipv4_and_notify( + session, + bootstrap_settings=_notification_settings(), + fetch_public_ipv4=lambda: "198.51.100.25", + ) + finally: + session.close() + + assert result.status == "changed" + assert result.changed is True + assert result.previous_ipv4 == "203.0.113.10" + assert result.current_ipv4 == "198.51.100.25" diff --git a/tests/test_smtp.py b/tests/test_smtp.py index f004b2e..2881da6 100644 --- a/tests/test_smtp.py +++ b/tests/test_smtp.py @@ -5,7 +5,13 @@ import smtplib from fastapi.testclient import TestClient from app.config import Settings -from app.services.email import EmailDeliveryError, get_smtp_config, is_smtp_ready, send_smtp_test_email +from app.services.email import ( + EmailDeliveryError, + get_smtp_config, + is_smtp_ready, + send_public_ip_changed_email, + send_smtp_test_email, +) def _extract_csrf_token(html: str) -> str: @@ -43,6 +49,7 @@ def _smtp_settings(**overrides) -> Settings: "smtp_port": 587, "smtp_username": "smtp-user", "smtp_password": "super-secret-password", + "smtp_from_name": "Home Automation", "smtp_from_address": "sender@example.com", "smtp_to_address": "recipient@example.com", "smtp_use_starttls": True, @@ -60,6 +67,7 @@ def test_get_smtp_config_reads_runtime_values() -> None: assert smtp_config.port == 2525 assert smtp_config.username == "smtp-user" assert smtp_config.password == "super-secret-password" + assert smtp_config.from_name == "Home Automation" assert smtp_config.from_address == "sender@example.com" assert smtp_config.to_address == "recipient@example.com" assert smtp_config.use_starttls is False @@ -96,11 +104,13 @@ def test_send_smtp_test_email_success(monkeypatch) -> None: sent["username"] = username sent["password"] = password - def send_message(self, message): + def send_message(self, message, from_addr=None, to_addrs=None): sent["subject"] = message["Subject"] sent["from"] = message["From"] sent["to"] = message["To"] sent["body"] = message.get_content() + sent["envelope_from"] = from_addr + sent["envelope_to"] = to_addrs monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) @@ -113,8 +123,10 @@ def test_send_smtp_test_email_success(monkeypatch) -> None: assert sent["username"] == "smtp-user" assert sent["password"] == "super-secret-password" assert sent["subject"] == "Home Automation SMTP Test" - assert sent["from"] == "sender@example.com" + assert sent["from"] == "Home Automation " assert sent["to"] == "recipient@example.com" + assert sent["envelope_from"] == "sender@example.com" + assert sent["envelope_to"] == ["recipient@example.com"] assert "This is a test email" in sent["body"] @@ -140,8 +152,10 @@ def test_send_smtp_test_email_does_not_require_smtp_enabled(monkeypatch) -> None def login(self, username, password): return None - def send_message(self, message): + def send_message(self, message, from_addr=None, to_addrs=None): sent["subject"] = message["Subject"] + sent["from"] = message["From"] + sent["envelope_from"] = from_addr monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) @@ -149,6 +163,8 @@ def test_send_smtp_test_email_does_not_require_smtp_enabled(monkeypatch) -> None assert sent["host"] == "smtp.example.com" assert sent["subject"] == "Home Automation SMTP Test" + assert sent["from"] == "Home Automation " + assert sent["envelope_from"] == "sender@example.com" def test_send_smtp_test_email_failure_sanitizes_password(monkeypatch) -> None: @@ -178,6 +194,53 @@ def test_send_smtp_test_email_failure_sanitizes_password(monkeypatch) -> None: assert "[redacted]" in str(exc) +def test_send_public_ip_changed_email_contains_expected_english_content(monkeypatch) -> None: + sent = {} + + class FakeSMTP: + def __init__(self, host, port, timeout): + sent["host"] = host + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return None + + def ehlo(self): + return None + + def starttls(self): + return None + + def login(self, username, password): + return None + + def send_message(self, message, from_addr=None, to_addrs=None): + sent["subject"] = message["Subject"] + sent["body"] = message.get_content() + sent["from"] = message["From"] + sent["envelope_from"] = from_addr + + monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP) + + send_public_ip_changed_email( + _smtp_settings(), + previous_ipv4="203.0.113.10", + current_ipv4="198.51.100.25", + detected_at=__import__("datetime").datetime(2026, 4, 29, 10, 0, tzinfo=__import__("datetime").UTC), + ) + + assert sent["subject"] == "Public IP changed" + assert sent["from"] == "Home Automation " + assert sent["envelope_from"] == "sender@example.com" + assert "Your public IPv4 address has changed." in sent["body"] + assert "Previous IP: 203.0.113.10" in sent["body"] + assert "Current IP: 198.51.100.25" in sent["body"] + assert "Detected at: 2026-04-29 10:00:00 UTC" in sent["body"] + assert "update the trusted IP manually" in sent["body"] + + def test_config_update_does_not_clear_existing_smtp_password( client: TestClient, test_database_urls ) -> None: -- 2.52.0 From eda49489e0ec61777f4e2aa97c68c409cb1cd069 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Apr 2026 13:07:59 +0200 Subject: [PATCH 4/4] update reademe and docs --- README.md | 76 ++++++++++++++++++++ docs/architecture-overview.md | 4 ++ docs/public-ip-monitor.md | 126 ++++++++++++++++++++++++++++++++++ 3 files changed, 206 insertions(+) create mode 100644 docs/public-ip-monitor.md diff --git a/README.md b/README.md index fdaaba9..9fd0eee 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ - SQLite + SQLAlchemy + Alembic 的三库结构 - username/password + server-side session 鉴权 - runtime config 页面与 app DB 持久化 +- public IPv4 monitor、历史持久化与定时检查 +- SMTP 配置、测试发信与 public IPv4 changed 邮件通知 - location recorder - poo recorder - Home Assistant inbound / outbound integration @@ -40,6 +42,7 @@ - 单个 admin 用户 - server-side session - runtime config 持久化 +- public IPv4 当前状态与变化历史 这部分现在也使用 Alembic 管理: @@ -199,6 +202,79 @@ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 - token / secret 这类运行时必须可取回的配置,目前允许明文存储在 config 表中 - 登录密码仍然单独使用 Argon2 哈希,不走 config 表明文存储 +当前已经接入 config 页面的运行时配置包括: + +- 基础系统配置 +- auth cookie 相关配置 +- SMTP 基础配置 +- TickTick OAuth 配置 +- Home Assistant 配置 + +其中 SMTP password 与其他 secret 字段一致: + +- 页面不明文回显 +- 留空提交时保留旧值 +- 用于测试发信与自动通知时不会写入响应 + +## Public IPv4 Monitor + +当前系统已经提供最小可用的 public IPv4 monitor: + +- 使用单一 provider 检查当前公网 IPv4 +- 将状态与变化历史持久化到 app DB +- 提供受保护的手动检查入口:`GET /public-ip/check` +- 启动时注册 APScheduler job,默认每 4 小时检查一次 + +当前 app DB 中与此功能相关的新表: + +- `public_ip_state` +- `public_ip_history` + +状态语义如下: + +- `first_seen`:首次发现当前公网 IPv4 +- `unchanged`:与上次状态一致 +- `changed`:公网 IPv4 发生变化 +- `error`:provider 请求失败或返回无效值 + +## SMTP 与邮件通知 + +当前系统已经提供最小可用的 SMTP 能力: + +- SMTP 配置可在 `/config` 页面填写并保存到 `app_config` +- 可通过 config 页面发送测试邮件 +- 邮件 `From` 头支持显示名,例如 `Home Automation ` + +当前 SMTP 配置项包括: + +- `SMTP_ENABLED` +- `SMTP_HOST` +- `SMTP_PORT` +- `SMTP_USERNAME` +- `SMTP_PASSWORD` +- `SMTP_FROM_NAME` +- `SMTP_FROM_ADDRESS` +- `SMTP_TO_ADDRESS` +- `SMTP_USE_STARTTLS` + +当前 public IPv4 monitor 已与 SMTP sender 接通,但只处理一个很小的通知场景: + +- 当 public IPv4 check 结果为 `changed` 时,自动发送一封英文纯文本邮件 + +以下情况不会发邮件: + +- `first_seen` +- `unchanged` +- `error` + +当前通知邮件内容固定,不提供模板系统,正文会包含: + +- previous IP +- current IP +- detected time + +手动测试时,如果需要再次模拟一次 IP 变化,可以临时修改 `public_ip_state.current_ipv4` 为一个保留测试地址,然后再次调用 `GET /public-ip/check`。 + ## OpenAPI 可使用下面的脚本重新导出当前 API 定义: diff --git a/docs/architecture-overview.md b/docs/architecture-overview.md index 7c1c5db..c7c853c 100644 --- a/docs/architecture-overview.md +++ b/docs/architecture-overview.md @@ -32,6 +32,7 @@ - `api/` - HTTP routes - 当前已迁入 `/login`、`/logout`、`/admin` + - 当前已迁入 `GET /public-ip/check` - 当前已迁入 `POST /homeassistant/publish` 第一版入口 - 当前已迁入 `POST /poo/record` 与 `GET /poo/latest` - `models/` @@ -42,6 +43,8 @@ - `services/` - 业务服务层 - 当前已迁入 config page 的 DB 持久化逻辑 + - 当前已迁入 public IPv4 检查、状态持久化与变化通知逻辑 + - 当前已迁入 SMTP 发信与测试发信逻辑 - `integrations/` - 外部系统适配层 - 当前已迁入 Home Assistant outbound adapter @@ -80,6 +83,7 @@ pytest 测试目录。后续可以在这里自然扩展: - 当前数据库继续使用 SQLite - 当前不引入前后端分离 - 当前不设计 Notion 模块 +- 当前通知能力仍保持极小范围,不引入独立通知中心或多渠道抽象 ## 关于 Notion diff --git a/docs/public-ip-monitor.md b/docs/public-ip-monitor.md new file mode 100644 index 0000000..b80e7b1 --- /dev/null +++ b/docs/public-ip-monitor.md @@ -0,0 +1,126 @@ +# Public IPv4 Monitor 与邮件通知 + +本文档说明当前 public IPv4 monitor 与 SMTP 邮件通知能力的职责边界和运行方式。 + +## 当前范围 + +当前实现只覆盖一个很小的通知能力: + +- 定期或手动检查当前公网 IPv4 +- 将当前状态和变化历史持久化到 app DB +- 仅在公网 IPv4 发生变化时发送一封英文纯文本邮件 + +当前明确不包含: + +- Namecheap API 自动更新 +- IPv6 检查 +- 错误告警邮件 +- 重复提醒 / 升级告警 +- Telegram / Slack / Discord 通知 +- 完整通知中心或模板系统 + +## 数据存储 + +当前数据全部进入 app DB。 + +相关表: + +- `public_ip_state` + - 保存当前状态 + - 逻辑上通常只有一行 +- `public_ip_history` + - 保存首次发现和变化历史 + +当前不会把 public IP 状态放进 `app_config`。 + +## 检查结果语义 + +一次检查会返回以下四种结果之一: + +- `first_seen` +- `unchanged` +- `changed` +- `error` + +行为约束: + +- `first_seen`:写入当前 IP 和首条 history,但不发通知邮件 +- `unchanged`:只更新时间和状态,不写 history,不发邮件 +- `changed`:更新 `previous_ipv4` / `current_ipv4` / `last_changed_at`,写入 history,并发送邮件 +- `error`:保留已有有效 IP,不写伪 history,也不发邮件 + +## 手动检查与定时检查 + +手动检查入口: + +- `GET /public-ip/check` + +约束: + +- 需要现有鉴权 +- 响应不暴露 IP 本身 +- 只返回非敏感检查结果 + +定时检查: + +- 应用启动时注册 APScheduler job +- 默认每 4 小时执行一次 +- 与手动检查复用同一套 public IP check + notify 逻辑 + +## SMTP 通知 + +当前通知发信复用现有 SMTP sender。 + +依赖的配置项: + +- `SMTP_ENABLED` +- `SMTP_HOST` +- `SMTP_PORT` +- `SMTP_USERNAME` +- `SMTP_PASSWORD` +- `SMTP_FROM_NAME` +- `SMTP_FROM_ADDRESS` +- `SMTP_TO_ADDRESS` +- `SMTP_USE_STARTTLS` + +其中: + +- `SMTP_FROM_NAME` 用于邮件头显示名 +- `From` 头会渲染成 `Name ` +- SMTP envelope sender 仍然使用纯邮箱地址,保持兼容性 + +## 通知触发条件 + +只有在 `changed` 时发邮件。 + +不会发邮件的情况: + +- `first_seen` +- `unchanged` +- `error` + +这使得同一 IP 状态不会被重复通知,因为在首次变更之后,后续重复检查会变成 `unchanged`。 + +## 邮件内容 + +当前邮件标题固定为: + +- `Public IP changed` + +正文为英文纯文本,至少包含: + +- previous IP +- current IP +- detected time + +当前正文还会附带一句 Namecheap trusted IP 的人工更新提示。 + +## 失败处理 + +当前通知发送是“尽力而为”的附加动作: + +- public IP 状态持久化先完成 +- 邮件发送失败不会回滚 public IP 状态 +- 失败只记录 warning 日志 + +这样可以避免通知链路反过来影响主检查流程。 \ No newline at end of file -- 2.52.0