3 Commits

Author SHA1 Message Date
tliu93 779e160b95 add ip change notification and refine sender display
pytest / test (push) Successful in 57s
pytest / test (pull_request) Successful in 54s
2026-04-29 13:03:12 +02:00
tliu93 3ea3498e58 add smtp module and testing 2026-04-29 12:11:10 +02:00
tliu93 5a420bd37b add get public and storage feature 2026-04-29 11:45:49 +02:00
20 changed files with 1402 additions and 62 deletions
+1
View File
@@ -7,6 +7,7 @@ from app.auth_db import AuthBase
from app.config import get_settings
from app.models.config import AppConfigEntry # noqa: F401
from app.models.auth import AuthSession, AuthUser # noqa: F401
from app.models.public_ip import PublicIPHistory, PublicIPState # noqa: F401
config = context.config
@@ -0,0 +1,55 @@
"""public ip monitor tables
Revision ID: 20260429_05_public_ip_monitor
Revises: 20260420_04_app_config_table
Create Date: 2026-04-29 00:00:01.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260429_05_public_ip_monitor"
down_revision: Union[str, None] = "20260420_04_app_config_table"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"public_ip_history",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("ipv4", sa.String(length=45), nullable=False),
sa.Column("observed_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("change_type", sa.String(length=32), nullable=False),
sa.Column("provider", sa.String(length=64), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_public_ip_history_observed_at",
"public_ip_history",
["observed_at"],
unique=False,
)
op.create_table(
"public_ip_state",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("current_ipv4", sa.String(length=45), nullable=False),
sa.Column("previous_ipv4", sa.String(length=45), nullable=True),
sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_checked_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_changed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_check_status", sa.String(length=32), nullable=False),
sa.Column("last_check_error", sa.String(length=255), nullable=True),
sa.Column("last_provider", sa.String(length=64), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
def downgrade() -> None:
op.drop_table("public_ip_state")
op.drop_index("ix_public_ip_history_observed_at", table_name="public_ip_history")
op.drop_table("public_ip_history")
+135 -46
View File
@@ -14,6 +14,7 @@ from app.services.config_page import (
is_ticktick_oauth_ready,
save_config_updates,
)
from app.services.email import EmailConfigurationError, EmailDeliveryError, is_smtp_ready, send_smtp_test_email
from sqlalchemy.orm import Session
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates"))
@@ -33,6 +34,49 @@ def _ticktick_oauth_notice(status_value: str | None) -> tuple[str | None, str |
return None, None
def _smtp_test_notice(status_value: str | None) -> tuple[str | None, str | None]:
if status_value == "success":
return "SMTP test email sent successfully.", None
if status_value == "config-error":
return None, "SMTP test failed. Check required SMTP settings before sending a test email."
if status_value == "failed":
return None, "SMTP test failed. Check saved SMTP settings and server reachability."
return None, None
def _build_config_context(
*,
auth_db_session: Session,
settings: Settings,
current_auth: AuthenticatedSession,
config_saved: bool,
config_error: str | None,
password_change_error: str | None,
ticktick_oauth_notice: str | None,
ticktick_oauth_error: str | None,
smtp_test_notice: str | None,
smtp_test_error: str | None,
) -> dict[str, object]:
return {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": password_change_error,
"config_error": config_error,
"config_saved": config_saved,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": ticktick_oauth_notice,
"ticktick_oauth_error": ticktick_oauth_error,
"smtp_test_ready": is_smtp_ready(settings),
"smtp_test_notice": smtp_test_notice,
"smtp_test_error": smtp_test_error,
}
@router.get("/", response_class=HTMLResponse)
def home(
request: Request,
@@ -66,22 +110,19 @@ def config_page(
ticktick_oauth_notice, ticktick_oauth_error = _ticktick_oauth_notice(
request.query_params.get("ticktick_oauth")
)
context = {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": None,
"config_saved": request.query_params.get("saved") == "1",
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": ticktick_oauth_notice,
"ticktick_oauth_error": ticktick_oauth_error,
}
smtp_test_notice, smtp_test_error = _smtp_test_notice(request.query_params.get("smtp_test"))
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=request.query_params.get("saved") == "1",
config_error=None,
password_change_error=None,
ticktick_oauth_notice=ticktick_oauth_notice,
ticktick_oauth_error=ticktick_oauth_error,
smtp_test_notice=smtp_test_notice,
smtp_test_error=smtp_test_error,
)
return templates.TemplateResponse(request, "config.html", context)
@@ -99,21 +140,18 @@ async def config_submit(
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected config update due to CSRF validation failure")
context = {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": "invalid config update request",
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
}
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config update request",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
@@ -126,21 +164,18 @@ async def config_submit(
except ConfigSaveError:
logger.warning("Rejected config update due to invalid submitted values")
refreshed_settings = get_settings()
context = {
"app_name": refreshed_settings.app_name,
"app_env": refreshed_settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": "invalid config submission",
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, refreshed_settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(refreshed_settings),
"ticktick_redirect_uri": refreshed_settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
}
context = _build_config_context(
auth_db_session=auth_db_session,
settings=refreshed_settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config submission",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
@@ -149,3 +184,57 @@ async def config_submit(
)
return RedirectResponse(url="/config?saved=1", status_code=status.HTTP_303_SEE_OTHER)
@router.post("/config/smtp/test", response_class=HTMLResponse)
async def smtp_test_submit(
request: Request,
auth_db_session: Session = Depends(get_auth_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
form = await request.form()
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected SMTP test due to CSRF validation failure")
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error=None,
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error="invalid SMTP test request",
)
return templates.TemplateResponse(
request,
"config.html",
context,
status_code=status.HTTP_400_BAD_REQUEST,
)
try:
send_smtp_test_email(settings)
except EmailConfigurationError as exc:
logger.warning("SMTP test email rejected due to configuration: %s", exc)
return RedirectResponse(
url="/config?smtp_test=config-error",
status_code=status.HTTP_303_SEE_OTHER,
)
except EmailDeliveryError as exc:
logger.warning("SMTP test email failed: %s", exc)
return RedirectResponse(
url="/config?smtp_test=failed",
status_code=status.HTTP_303_SEE_OTHER,
)
return RedirectResponse(
url="/config?smtp_test=success",
status_code=status.HTTP_303_SEE_OTHER,
)
+26
View File
@@ -0,0 +1,26 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.dependencies import get_auth_db, get_current_auth_session
from app.schemas.public_ip import PublicIPCheckResponse
from app.config import get_settings
from app.services.auth import AuthenticatedSession
from app.services.public_ip import check_public_ipv4_and_notify
router = APIRouter(tags=["public-ip"])
@router.get("/public-ip/check", response_model=PublicIPCheckResponse)
def run_public_ip_check(
session: Session = Depends(get_auth_db),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> PublicIPCheckResponse:
if current_auth is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="authentication required")
result = check_public_ipv4_and_notify(session, bootstrap_settings=get_settings())
return PublicIPCheckResponse(
status=result.status,
checked_at=result.checked_at,
changed=result.changed,
)
+9
View File
@@ -23,6 +23,15 @@ class Settings(BaseSettings):
home_assistant_auth_token: str = ""
home_assistant_timeout_seconds: float = 1.0
home_assistant_action_task_project_id: str = ""
smtp_enabled: bool = False
smtp_host: str = ""
smtp_port: int = 587
smtp_username: str = ""
smtp_password: str = ""
smtp_from_name: str = ""
smtp_from_address: str = ""
smtp_to_address: str = ""
smtp_use_starttls: bool = True
poo_webhook_id: str = ""
poo_sensor_entity_name: str = "sensor.test_poo_status"
poo_sensor_friendly_name: str = "Poo Status"
+25
View File
@@ -3,6 +3,8 @@ from pathlib import Path
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.orm import Session
from app import models # noqa: F401
@@ -12,15 +14,26 @@ import app.auth_db as auth_db
from app.api.routes.homeassistant import router as homeassistant_router
from app.api.routes.location import router as location_router
from app.api.routes.poo import router as poo_router
from app.api.routes.public_ip import router as public_ip_router
from app.api.routes.ticktick import router as ticktick_router
from app.config import get_settings
from app.services.auth import AuthBootstrapError, initialize_auth_schema
from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap
from app.services.public_ip import check_public_ipv4_and_notify
from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db
from scripts.location_db_adopt import LocationDatabaseAdoptionError, validate_location_runtime_db
from scripts.poo_db_adopt import PooDatabaseAdoptionError, validate_poo_runtime_db
def _run_scheduled_public_ip_check() -> None:
session_local = auth_db.get_auth_session_local()
session: Session = session_local()
try:
check_public_ipv4_and_notify(session, bootstrap_settings=get_settings())
finally:
session.close()
def ensure_auth_db_ready() -> None:
session_local = auth_db.get_auth_session_local()
session: Session = session_local()
@@ -72,7 +85,18 @@ async def lifespan(_: FastAPI):
ensure_auth_db_ready()
ensure_location_db_ready()
ensure_poo_db_ready()
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(
_run_scheduled_public_ip_check,
trigger=IntervalTrigger(hours=4),
id="public-ip-check",
replace_existing=True,
max_instances=1,
coalesce=True,
)
scheduler.start()
yield
scheduler.shutdown(wait=False)
def create_app() -> FastAPI:
@@ -97,6 +121,7 @@ def create_app() -> FastAPI:
app.include_router(homeassistant_router)
app.include_router(location_router)
app.include_router(poo_router)
app.include_router(public_ip_router)
app.include_router(ticktick_router)
return app
+9 -1
View File
@@ -3,5 +3,13 @@
from app.models.auth import AuthSession, AuthUser
from app.models.config import AppConfigEntry
from app.models.location import Location
from app.models.public_ip import PublicIPHistory, PublicIPState
__all__ = ["AppConfigEntry", "AuthSession", "AuthUser", "Location"]
__all__ = [
"AppConfigEntry",
"AuthSession",
"AuthUser",
"Location",
"PublicIPHistory",
"PublicIPState",
]
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from sqlalchemy import DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app.auth_db import AuthBase
class PublicIPState(AuthBase):
__tablename__ = "public_ip_state"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
current_ipv4: Mapped[str] = mapped_column(String(45), nullable=False)
previous_ipv4: Mapped[str | None] = mapped_column(String(45), nullable=True)
first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_checked_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_changed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_check_status: Mapped[str] = mapped_column(String(32), nullable=False)
last_check_error: Mapped[str | None] = mapped_column(String(255), nullable=True)
last_provider: Mapped[str | None] = mapped_column(String(64), nullable=True)
class PublicIPHistory(AuthBase):
__tablename__ = "public_ip_history"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
ipv4: Mapped[str] = mapped_column(String(45), nullable=False)
observed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
change_type: Mapped[str] = mapped_column(String(32), nullable=False)
provider: Mapped[str | None] = mapped_column(String(64), nullable=True)
+13
View File
@@ -0,0 +1,13 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
PublicIPCheckStatus = Literal["first_seen", "unchanged", "changed", "error"]
class PublicIPCheckResponse(BaseModel):
status: PublicIPCheckStatus
checked_at: datetime
changed: bool
+18
View File
@@ -27,6 +27,15 @@ CONFIG_FIELDS: tuple[ConfigField, ...] = (
ConfigField("System", "APP_ENV", "app_env", "App Env"),
ConfigField("System", "APP_DEBUG", "app_debug", "App Debug"),
ConfigField("System", "APP_HOSTNAME", "app_hostname", "App Hostname"),
ConfigField("SMTP", "SMTP_ENABLED", "smtp_enabled", "SMTP Enabled"),
ConfigField("SMTP", "SMTP_HOST", "smtp_host", "SMTP Host"),
ConfigField("SMTP", "SMTP_PORT", "smtp_port", "SMTP Port"),
ConfigField("SMTP", "SMTP_USERNAME", "smtp_username", "SMTP Username"),
ConfigField("SMTP", "SMTP_PASSWORD", "smtp_password", "SMTP Password", secret=True),
ConfigField("SMTP", "SMTP_FROM_NAME", "smtp_from_name", "SMTP From Name"),
ConfigField("SMTP", "SMTP_FROM_ADDRESS", "smtp_from_address", "SMTP From Address"),
ConfigField("SMTP", "SMTP_TO_ADDRESS", "smtp_to_address", "SMTP To Address"),
ConfigField("SMTP", "SMTP_USE_STARTTLS", "smtp_use_starttls", "SMTP Use STARTTLS"),
ConfigField(
"Authentication",
"AUTH_SESSION_COOKIE_NAME",
@@ -260,6 +269,15 @@ def _settings_payload(settings: Settings) -> dict[str, Any]:
"home_assistant_auth_token": settings.home_assistant_auth_token,
"home_assistant_timeout_seconds": settings.home_assistant_timeout_seconds,
"home_assistant_action_task_project_id": settings.home_assistant_action_task_project_id,
"smtp_enabled": settings.smtp_enabled,
"smtp_host": settings.smtp_host,
"smtp_port": settings.smtp_port,
"smtp_username": settings.smtp_username,
"smtp_password": settings.smtp_password,
"smtp_from_name": settings.smtp_from_name,
"smtp_from_address": settings.smtp_from_address,
"smtp_to_address": settings.smtp_to_address,
"smtp_use_starttls": settings.smtp_use_starttls,
"poo_webhook_id": settings.poo_webhook_id,
"poo_sensor_entity_name": settings.poo_sensor_entity_name,
"poo_sensor_friendly_name": settings.poo_sensor_friendly_name,
+149
View File
@@ -0,0 +1,149 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from email.message import EmailMessage
from email.utils import formataddr
import smtplib
from app.config import Settings
class EmailConfigurationError(ValueError):
"""Raised when SMTP settings are incomplete or disabled."""
class EmailDeliveryError(RuntimeError):
"""Raised when sending email fails."""
@dataclass(frozen=True, slots=True)
class SMTPConfig:
host: str
port: int
username: str
password: str
from_name: str
from_address: str
to_address: str
use_starttls: bool
def get_smtp_config(settings: Settings, *, require_enabled: bool = True) -> SMTPConfig:
if require_enabled and not settings.smtp_enabled:
raise EmailConfigurationError("SMTP is disabled")
if not settings.smtp_host:
raise EmailConfigurationError("SMTP host is required")
if settings.smtp_port <= 0:
raise EmailConfigurationError("SMTP port must be greater than zero")
if not settings.smtp_from_address:
raise EmailConfigurationError("SMTP from address is required")
if not settings.smtp_to_address:
raise EmailConfigurationError("SMTP to address is required")
return SMTPConfig(
host=settings.smtp_host,
port=settings.smtp_port,
username=settings.smtp_username,
password=settings.smtp_password,
from_name=settings.smtp_from_name,
from_address=settings.smtp_from_address,
to_address=settings.smtp_to_address,
use_starttls=settings.smtp_use_starttls,
)
def is_smtp_ready(settings: Settings) -> bool:
try:
get_smtp_config(settings, require_enabled=False)
except EmailConfigurationError:
return False
return True
def send_plaintext_email(
settings: Settings,
*,
subject: str,
body: str,
recipient: str | None = None,
require_enabled: bool = True,
) -> None:
smtp_config = get_smtp_config(settings, require_enabled=require_enabled)
message = EmailMessage()
message["Subject"] = subject
message["From"] = _build_from_header(smtp_config)
message["To"] = recipient or smtp_config.to_address
message.set_content(body)
try:
with smtplib.SMTP(smtp_config.host, smtp_config.port, timeout=10) as smtp:
smtp.ehlo()
if smtp_config.use_starttls:
smtp.starttls()
smtp.ehlo()
if smtp_config.username:
smtp.login(smtp_config.username, smtp_config.password)
smtp.send_message(
message,
from_addr=smtp_config.from_address,
to_addrs=[recipient or smtp_config.to_address],
)
except (OSError, smtplib.SMTPException) as exc:
error_message = _sanitize_error_message(str(exc), smtp_config.password)
raise EmailDeliveryError(error_message or "SMTP delivery failed") from exc
def send_smtp_test_email(settings: Settings) -> None:
send_plaintext_email(
settings,
subject="Home Automation SMTP Test",
body="This is a test email from Home Automation SMTP settings.",
require_enabled=False,
)
def send_public_ip_changed_email(
settings: Settings,
*,
previous_ipv4: str,
current_ipv4: str,
detected_at: datetime,
) -> None:
send_plaintext_email(
settings,
subject="Public IP changed",
body=(
"Your public IPv4 address has changed.\n\n"
f"Previous IP: {previous_ipv4}\n"
f"Current IP: {current_ipv4}\n"
f"Detected at: {_format_utc_timestamp(detected_at)}\n\n"
"If you use Namecheap API trusted IP restrictions, you may need to "
"update the trusted IP manually.\n"
),
)
def _sanitize_error_message(message: str, password: str) -> str:
sanitized = message
if password:
sanitized = sanitized.replace(password, "[redacted]")
return sanitized
def _format_utc_timestamp(value: datetime) -> str:
if value.tzinfo is None:
normalized = value.replace(tzinfo=UTC)
else:
normalized = value.astimezone(UTC)
return normalized.strftime("%Y-%m-%d %H:%M:%S UTC")
def _build_from_header(smtp_config: SMTPConfig) -> str:
if smtp_config.from_name:
return formataddr((smtp_config.from_name, smtp_config.from_address))
return smtp_config.from_address
+191
View File
@@ -0,0 +1,191 @@
from __future__ import annotations
import ipaddress
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Callable, Literal
import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models.public_ip import PublicIPHistory, PublicIPState
from app.services.config_page import build_runtime_settings
from app.services.email import EmailConfigurationError, EmailDeliveryError, send_public_ip_changed_email
logger = logging.getLogger(__name__)
PUBLIC_IP_PROVIDER_NAME = "ipify"
PUBLIC_IP_PROVIDER_URL = "https://api.ipify.org"
PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS = 5.0
PublicIPResultStatus = Literal["first_seen", "unchanged", "changed", "error"]
PublicIPv4Fetcher = Callable[[], str]
class PublicIPCheckError(RuntimeError):
"""Raised when the public IPv4 provider cannot return a valid IPv4."""
@dataclass(slots=True)
class PublicIPCheckResult:
status: PublicIPResultStatus
checked_at: datetime
changed: bool
previous_ipv4: str | None = None
current_ipv4: str | None = None
def check_public_ipv4(
session: Session,
*,
fetch_public_ipv4: PublicIPv4Fetcher | None = None,
provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
checked_at = _utc_now()
state = session.scalar(select(PublicIPState).where(PublicIPState.id == 1).limit(1))
try:
raw_ipv4 = (fetch_public_ipv4 or fetch_public_ipv4_from_provider)()
current_ipv4 = _validate_ipv4(raw_ipv4)
except PublicIPCheckError as exc:
logger.warning("Public IPv4 check failed: %s", exc)
if state is not None:
state.last_checked_at = checked_at
state.last_check_status = "error"
state.last_check_error = str(exc)
state.last_provider = provider_name
session.commit()
return PublicIPCheckResult(status="error", checked_at=checked_at, changed=False)
if state is None:
state = PublicIPState(
id=1,
current_ipv4=current_ipv4,
previous_ipv4=None,
first_seen_at=checked_at,
last_checked_at=checked_at,
last_changed_at=None,
last_check_status="first_seen",
last_check_error=None,
last_provider=provider_name,
)
session.add(state)
session.add(
PublicIPHistory(
ipv4=current_ipv4,
observed_at=checked_at,
change_type="first_seen",
provider=provider_name,
)
)
session.commit()
return PublicIPCheckResult(
status="first_seen",
checked_at=checked_at,
changed=False,
current_ipv4=current_ipv4,
)
if state.current_ipv4 == current_ipv4:
state.last_checked_at = checked_at
state.last_check_status = "unchanged"
state.last_check_error = None
state.last_provider = provider_name
session.commit()
return PublicIPCheckResult(
status="unchanged",
checked_at=checked_at,
changed=False,
current_ipv4=current_ipv4,
)
previous_ipv4 = state.current_ipv4
state.previous_ipv4 = previous_ipv4
state.current_ipv4 = current_ipv4
state.last_checked_at = checked_at
state.last_changed_at = checked_at
state.last_check_status = "changed"
state.last_check_error = None
state.last_provider = provider_name
session.add(
PublicIPHistory(
ipv4=current_ipv4,
observed_at=checked_at,
change_type="changed",
provider=provider_name,
)
)
session.commit()
return PublicIPCheckResult(
status="changed",
checked_at=checked_at,
changed=True,
previous_ipv4=previous_ipv4,
current_ipv4=current_ipv4,
)
def check_public_ipv4_and_notify(
session: Session,
*,
bootstrap_settings: Settings,
fetch_public_ipv4: PublicIPv4Fetcher | None = None,
provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
result = check_public_ipv4(
session,
fetch_public_ipv4=fetch_public_ipv4,
provider_name=provider_name,
)
if result.status != "changed" or result.previous_ipv4 is None or result.current_ipv4 is None:
return result
runtime_settings = build_runtime_settings(session, bootstrap_settings)
try:
send_public_ip_changed_email(
runtime_settings,
previous_ipv4=result.previous_ipv4,
current_ipv4=result.current_ipv4,
detected_at=result.checked_at,
)
except (EmailConfigurationError, EmailDeliveryError) as exc:
logger.warning("Public IPv4 change notification failed: %s", exc)
return result
def fetch_public_ipv4_from_provider() -> str:
try:
response = httpx.get(
PUBLIC_IP_PROVIDER_URL,
params={"format": "text"},
timeout=PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS,
)
response.raise_for_status()
except httpx.HTTPError as exc:
raise PublicIPCheckError(f"provider request failed: {exc}") from exc
return response.text.strip()
def _validate_ipv4(raw_value: str) -> str:
if not raw_value:
raise PublicIPCheckError("provider returned an empty response")
try:
parsed = ipaddress.ip_address(raw_value)
except ValueError as exc:
raise PublicIPCheckError("provider returned an invalid IPv4 value") from exc
if parsed.version != 4:
raise PublicIPCheckError("provider returned a non-IPv4 value")
return str(parsed)
def _utc_now() -> datetime:
return datetime.now(UTC)
+22
View File
@@ -33,6 +33,14 @@
<div class="notice">{{ ticktick_oauth_notice }}</div>
{% endif %}
{% if smtp_test_error %}
<div class="alert">{{ smtp_test_error }}</div>
{% endif %}
{% if smtp_test_notice %}
<div class="notice">{{ smtp_test_notice }}</div>
{% endif %}
<div class="meta single-column">
<div>
<dt>当前用户</dt>
@@ -102,6 +110,20 @@
{% endif %}
</div>
{% endif %}
{% if section.name == "SMTP" %}
<div class="integration-action-row">
<div>
<p class="integration-action-title">SMTP Test Email</p>
<p class="integration-action-copy">Save the SMTP settings first, then send a simple plaintext test email to the configured recipient.</p>
</div>
{% if smtp_test_ready %}
<button type="submit" formaction="/config/smtp/test" formmethod="post">Send SMTP Test</button>
{% else %}
<span class="button-link disabled" aria-disabled="true">Send SMTP Test</span>
{% endif %}
</div>
{% endif %}
</fieldset>
{% endfor %}
+13 -7
View File
@@ -8,15 +8,17 @@ alembic==1.18.4
# via -r requirements.in
annotated-types==0.7.0
# via pydantic
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
anyio==4.13.0
# via
# httpx
# starlette
# watchfiles
apscheduler==3.11.2
# via -r requirements.in
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
build==1.4.3
# via pip-tools
certifi==2026.2.25
@@ -42,7 +44,9 @@ httpcore==1.0.9
httptools==0.7.1
# via uvicorn
httpx==0.28.1
# via -r dev-requirements.in
# via
# -r dev-requirements.in
# -r requirements.in
idna==3.11
# via
# anyio
@@ -66,6 +70,8 @@ pip-tools==7.5.3
# via -r dev-requirements.in
pluggy==1.6.0
# via pytest
pycparser==2.23
# via cffi
pydantic==2.13.2
# via
# fastapi
@@ -88,8 +94,6 @@ python-dotenv==1.2.2
# uvicorn
python-multipart==0.0.26
# via -r requirements.in
pycparser==2.23
# via cffi
pyyaml==6.0.3
# via
# -r requirements.in
@@ -112,6 +116,8 @@ typing-inspection==0.4.2
# via
# pydantic
# pydantic-settings
tzlocal==5.3.1
# via apscheduler
uvicorn[standard]==0.44.0
# via -r requirements.in
uvloop==0.22.1
+2
View File
@@ -1,6 +1,8 @@
alembic>=1.14,<2.0
apscheduler>=3.10,<4.0
argon2-cffi>=25.1,<26.0
fastapi>=0.115,<0.116
httpx>=0.28,<1.0
jinja2>=3.1,<4.0
pydantic-settings>=2.6,<3.0
python-multipart>=0.0.12,<1.0
+24 -7
View File
@@ -8,14 +8,21 @@ alembic==1.18.4
# via -r requirements.in
annotated-types==0.7.0
# via pydantic
anyio==4.13.0
# via
# httpx
# starlette
# watchfiles
apscheduler==3.11.2
# via -r requirements.in
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
anyio==4.13.0
certifi==2026.4.22
# via
# starlette
# watchfiles
# httpcore
# httpx
cffi==2.0.0
# via argon2-cffi-bindings
click==8.3.2
@@ -25,11 +32,19 @@ fastapi==0.115.14
greenlet==3.4.0
# via sqlalchemy
h11==0.16.0
# via uvicorn
# via
# httpcore
# uvicorn
httpcore==1.0.9
# via httpx
httptools==0.7.1
# via uvicorn
httpx==0.28.1
# via -r requirements.in
idna==3.11
# via anyio
# via
# anyio
# httpx
jinja2==3.1.6
# via -r requirements.in
mako==1.3.11
@@ -38,6 +53,8 @@ markupsafe==3.0.3
# via
# jinja2
# mako
pycparser==2.23
# via cffi
pydantic==2.13.2
# via
# fastapi
@@ -52,8 +69,6 @@ python-dotenv==1.2.2
# uvicorn
python-multipart==0.0.26
# via -r requirements.in
pycparser==2.23
# via cffi
pyyaml==6.0.3
# via
# -r requirements.in
@@ -76,6 +91,8 @@ typing-inspection==0.4.2
# via
# pydantic
# pydantic-settings
tzlocal==5.3.1
# via apscheduler
uvicorn[standard]==0.44.0
# via -r requirements.in
uvloop==0.22.1
+1 -1
View File
@@ -15,7 +15,7 @@ if str(PROJECT_ROOT) not in sys.path:
from app.config import get_settings
APP_BASELINE_REVISION = "20260420_04_app_config_table"
APP_BASELINE_REVISION = "20260429_05_public_ip_monitor"
class AppDatabaseAdoptionError(RuntimeError):
+24
View File
@@ -53,3 +53,27 @@ def test_settings_derive_development_ticktick_redirect_uri(monkeypatch) -> None:
assert settings.app_base_url == "http://localhost:11001"
assert settings.ticktick_redirect_uri == "http://localhost:11001/ticktick/auth/code"
def test_settings_support_smtp_fields(monkeypatch) -> None:
monkeypatch.setenv("SMTP_ENABLED", "true")
monkeypatch.setenv("SMTP_HOST", "smtp.example.com")
monkeypatch.setenv("SMTP_PORT", "2525")
monkeypatch.setenv("SMTP_USERNAME", "smtp-user")
monkeypatch.setenv("SMTP_PASSWORD", "smtp-password")
monkeypatch.setenv("SMTP_FROM_NAME", "Home Automation")
monkeypatch.setenv("SMTP_FROM_ADDRESS", "sender@example.com")
monkeypatch.setenv("SMTP_TO_ADDRESS", "recipient@example.com")
monkeypatch.setenv("SMTP_USE_STARTTLS", "false")
settings = Settings(_env_file=None)
assert settings.smtp_enabled is True
assert settings.smtp_host == "smtp.example.com"
assert settings.smtp_port == 2525
assert settings.smtp_username == "smtp-user"
assert settings.smtp_password == "smtp-password"
assert settings.smtp_from_name == "Home Automation"
assert settings.smtp_from_address == "sender@example.com"
assert settings.smtp_to_address == "recipient@example.com"
assert settings.smtp_use_starttls is False
+258
View File
@@ -0,0 +1,258 @@
from datetime import UTC, datetime
import re
import sqlite3
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from app.config import Settings
from app.services.email import EmailDeliveryError
from app.services.public_ip import PublicIPCheckResult, check_public_ipv4, check_public_ipv4_and_notify
def _make_session(database_url: str) -> Session:
engine = create_engine(database_url, connect_args={"check_same_thread": False})
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
return session_local()
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
finally:
session.close()
assert result.status == "first_seen"
assert result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state"
).fetchone()
history = conn.execute(
"SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id"
).fetchall()
finally:
conn.close()
assert state == ("203.0.113.10", None, "first_seen", None, "ipify")
assert history == [("203.0.113.10", "first_seen", "ipify")]
def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
first_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
unchanged_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
finally:
session.close()
assert first_result.status == "first_seen"
assert unchanged_result.status == "unchanged"
assert unchanged_result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state"
).fetchone()
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
finally:
conn.close()
assert state == ("203.0.113.10", None, "unchanged")
assert history_count == 1
def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25")
finally:
session.close()
assert result.status == "changed"
assert result.changed is True
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state"
).fetchone()
history = conn.execute(
"SELECT ipv4, change_type FROM public_ip_history ORDER BY id"
).fetchall()
finally:
conn.close()
assert state[0:3] == ("198.51.100.25", "203.0.113.10", "changed")
assert state[3] is not None
assert history == [("203.0.113.10", "first_seen"), ("198.51.100.25", "changed")]
def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip")
finally:
session.close()
assert result.status == "error"
assert result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state"
).fetchone()
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
finally:
conn.close()
assert state[0:3] == ("203.0.113.10", None, "error")
assert state[3] is not None
assert history_count == 1
def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None:
response = client.get("/public-ip/check")
assert response.status_code == 401
assert response.json() == {"detail": "authentication required"}
def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None:
from app.api.routes import public_ip as public_ip_route
fixed_checked_at = datetime(2026, 4, 29, 12, 0, tzinfo=UTC)
monkeypatch.setattr(
public_ip_route,
"check_public_ipv4_and_notify",
lambda session, bootstrap_settings: PublicIPCheckResult(
status="changed",
checked_at=fixed_checked_at,
changed=True,
),
)
_login(client)
response = client.get("/public-ip/check")
assert response.status_code == 200
assert response.json() == {
"status": "changed",
"checked_at": "2026-04-29T12:00:00Z",
"changed": True,
}
assert "current_ipv4" not in response.text
assert "previous_ipv4" not in response.text
assert "203.0.113.10" not in response.text
def _notification_settings() -> Settings:
return Settings(
_env_file=None,
app_env="development",
app_hostname="localhost:8000",
app_database_url="sqlite:///./data/app.db",
location_database_url="sqlite:///./data/locationRecorder.db",
poo_database_url="sqlite:///./data/pooRecorder.db",
auth_bootstrap_username="admin",
auth_bootstrap_password="secret-password",
smtp_enabled=True,
smtp_host="smtp.example.com",
smtp_port=587,
smtp_username="smtp-user",
smtp_password="super-secret-password",
smtp_from_address="sender@example.com",
smtp_to_address="recipient@example.com",
smtp_use_starttls=True,
)
def test_public_ip_notification_sends_only_when_changed(auth_database, monkeypatch) -> None:
session = _make_session(auth_database["app_url"])
sent = []
monkeypatch.setattr(
"app.services.public_ip.send_public_ip_changed_email",
lambda settings, *, previous_ipv4, current_ipv4, detected_at: sent.append(
(previous_ipv4, current_ipv4, detected_at)
),
)
try:
first_seen = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "203.0.113.10",
)
unchanged = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "203.0.113.10",
)
changed = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "198.51.100.25",
)
finally:
session.close()
assert first_seen.status == "first_seen"
assert unchanged.status == "unchanged"
assert changed.status == "changed"
assert len(sent) == 1
assert sent[0][0] == "203.0.113.10"
assert sent[0][1] == "198.51.100.25"
assert sent[0][2] == changed.checked_at
def test_public_ip_notification_failure_does_not_break_changed_result(auth_database, monkeypatch) -> None:
session = _make_session(auth_database["app_url"])
monkeypatch.setattr(
"app.services.public_ip.send_public_ip_changed_email",
lambda settings, *, previous_ipv4, current_ipv4, detected_at: (_ for _ in ()).throw(
EmailDeliveryError("smtp down")
),
)
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "198.51.100.25",
)
finally:
session.close()
assert result.status == "changed"
assert result.changed is True
assert result.previous_ipv4 == "203.0.113.10"
assert result.current_ipv4 == "198.51.100.25"
+397
View File
@@ -0,0 +1,397 @@
import re
import sqlite3
import smtplib
from fastapi.testclient import TestClient
from app.config import Settings
from app.services.email import (
EmailDeliveryError,
get_smtp_config,
is_smtp_ready,
send_public_ip_changed_email,
send_smtp_test_email,
)
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
def _smtp_settings(**overrides) -> Settings:
payload = {
"app_env": "development",
"app_hostname": "localhost:8000",
"app_database_url": "sqlite:///./data/app.db",
"location_database_url": "sqlite:///./data/locationRecorder.db",
"poo_database_url": "sqlite:///./data/pooRecorder.db",
"auth_bootstrap_username": "admin",
"auth_bootstrap_password": "secret-password",
"smtp_enabled": True,
"smtp_host": "smtp.example.com",
"smtp_port": 587,
"smtp_username": "smtp-user",
"smtp_password": "super-secret-password",
"smtp_from_name": "Home Automation",
"smtp_from_address": "sender@example.com",
"smtp_to_address": "recipient@example.com",
"smtp_use_starttls": True,
}
payload.update(overrides)
return Settings(_env_file=None, **payload)
def test_get_smtp_config_reads_runtime_values() -> None:
settings = _smtp_settings(smtp_port=2525, smtp_use_starttls=False)
smtp_config = get_smtp_config(settings)
assert smtp_config.host == "smtp.example.com"
assert smtp_config.port == 2525
assert smtp_config.username == "smtp-user"
assert smtp_config.password == "super-secret-password"
assert smtp_config.from_name == "Home Automation"
assert smtp_config.from_address == "sender@example.com"
assert smtp_config.to_address == "recipient@example.com"
assert smtp_config.use_starttls is False
def test_smtp_test_readiness_does_not_require_smtp_enabled() -> None:
settings = _smtp_settings(smtp_enabled=False)
assert is_smtp_ready(settings) is True
def test_send_smtp_test_email_success(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
sent["port"] = port
sent["timeout"] = timeout
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
sent["ehlo"] = sent.get("ehlo", 0) + 1
def starttls(self):
sent["starttls"] = True
def login(self, username, password):
sent["username"] = username
sent["password"] = password
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["from"] = message["From"]
sent["to"] = message["To"]
sent["body"] = message.get_content()
sent["envelope_from"] = from_addr
sent["envelope_to"] = to_addrs
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
send_smtp_test_email(_smtp_settings())
assert sent["host"] == "smtp.example.com"
assert sent["port"] == 587
assert sent["timeout"] == 10
assert sent["starttls"] is True
assert sent["username"] == "smtp-user"
assert sent["password"] == "super-secret-password"
assert sent["subject"] == "Home Automation SMTP Test"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["to"] == "recipient@example.com"
assert sent["envelope_from"] == "sender@example.com"
assert sent["envelope_to"] == ["recipient@example.com"]
assert "This is a test email" in sent["body"]
def test_send_smtp_test_email_does_not_require_smtp_enabled(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
return None
def login(self, username, password):
return None
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["from"] = message["From"]
sent["envelope_from"] = from_addr
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
send_smtp_test_email(_smtp_settings(smtp_enabled=False))
assert sent["host"] == "smtp.example.com"
assert sent["subject"] == "Home Automation SMTP Test"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["envelope_from"] == "sender@example.com"
def test_send_smtp_test_email_failure_sanitizes_password(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, host, port, timeout):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
raise smtplib.SMTPException("authentication failed for super-secret-password")
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
try:
send_smtp_test_email(_smtp_settings())
assert False, "expected EmailDeliveryError"
except EmailDeliveryError as exc:
assert "super-secret-password" not in str(exc)
assert "[redacted]" in str(exc)
def test_send_public_ip_changed_email_contains_expected_english_content(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
return None
def login(self, username, password):
return None
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["body"] = message.get_content()
sent["from"] = message["From"]
sent["envelope_from"] = from_addr
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
send_public_ip_changed_email(
_smtp_settings(),
previous_ipv4="203.0.113.10",
current_ipv4="198.51.100.25",
detected_at=__import__("datetime").datetime(2026, 4, 29, 10, 0, tzinfo=__import__("datetime").UTC),
)
assert sent["subject"] == "Public IP changed"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["envelope_from"] == "sender@example.com"
assert "Your public IPv4 address has changed." in sent["body"]
assert "Previous IP: 203.0.113.10" in sent["body"]
assert "Current IP: 198.51.100.25" in sent["body"]
assert "Detected at: 2026-04-29 10:00:00 UTC" in sent["body"]
assert "update the trusted IP manually" in sent["body"]
def test_config_update_does_not_clear_existing_smtp_password(
client: TestClient, test_database_urls
) -> None:
_login(client)
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Test",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "persist-me",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Updated",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["SMTP_PASSWORD"] == "persist-me"
assert rows["APP_NAME"] == "SMTP Config Updated"
def test_smtp_test_endpoint_requires_authentication(client: TestClient) -> None:
response = client.post("/config/smtp/test", data={"csrf_token": "ignored"}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
def test_smtp_test_endpoint_success_and_failure_do_not_expose_password(
client: TestClient, monkeypatch
) -> None:
from app.api.routes import pages
_login(client)
config_page = client.get("/config")
csrf_token = _extract_csrf_token(config_page.text)
monkeypatch.setattr(pages, "send_smtp_test_email", lambda settings: None)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=success"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test email sent successfully." in follow_up.text
assert "super-secret-password" not in follow_up.text
monkeypatch.setattr(
pages,
"send_smtp_test_email",
lambda settings: (_ for _ in ()).throw(EmailDeliveryError("smtp auth failed for [redacted]")),
)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=failed"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test failed. Check saved SMTP settings and server reachability." in follow_up.text
assert "super-secret-password" not in follow_up.text
def test_config_page_renders_smtp_test_button_with_formaction(
client: TestClient, test_database_urls
) -> None:
_login(client)
conn = sqlite3.connect(test_database_urls["app_path"])
try:
conn.executemany(
"INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at",
[
("SMTP_ENABLED", "true"),
("SMTP_HOST", "smtp.example.com"),
("SMTP_PORT", "587"),
("SMTP_FROM_ADDRESS", "sender@example.com"),
("SMTP_TO_ADDRESS", "recipient@example.com"),
],
)
conn.commit()
finally:
conn.close()
response = client.get("/config")
assert response.status_code == 200
assert 'formaction="/config/smtp/test"' in response.text