10 Commits

- tliu93 779e160b95 "add ip change notification and refine sender display" (2026-04-29 13:03:12 +02:00)
  CI: pytest / test (push) Successful in 57s; pytest / test (pull_request) Successful in 54s
- tliu93 3ea3498e58 "add smtp module and testing" (2026-04-29 12:11:10 +02:00)
- tliu93 5a420bd37b "add get public and storage feature" (2026-04-29 11:45:49 +02:00)
- tliu93 a24e402d47 "add grafana provisioning" (2026-04-23 00:12:51 +02:00)
  CI: pytest / test (push) Successful in 46s
- tliu93 8565534b73 "Merge pull request 'fix ci test' (#5) from feature/add_separate_migration_container into main" (2026-04-22 13:35:40 +02:00)
  Reviewed-on: #5
  CI: pytest / test (push) Successful in 45s; docker-image / build-and-push (push) Successful in 3m40s
- tliu93 4acdd2dc60 "fix ci test" (2026-04-22 13:31:26 +02:00)
  CI: pytest / test (push) Successful in 45s; pytest / test (pull_request) Successful in 44s
- tliu93 c9af7530e5 "Merge pull request 'change adoption to separate step' (#4) from feature/add_separate_migration_container into main" (2026-04-22 13:28:30 +02:00)
  Reviewed-on: #4
  CI: pytest / test (push) Failing after 44s; docker-image / build-and-push (push) Successful in 3m40s
- tliu93 a76d6bfb71 "change adoption to separate step" (2026-04-22 13:28:00 +02:00)
  CI: pytest / test (push) Failing after 46s; pytest / test (pull_request) Failing after 45s
- tliu93 35aee79d93 "Restore legacy poo inbound dispatch" (2026-04-20 23:33:57 +02:00)
  CI: pytest / test (push) Successful in 43s; docker-image / build-and-push (push) Successful in 3m38s
- tliu93 b9e7f51d51 "Split compose dev build from registry deploy" (2026-04-20 23:16:13 +02:00)
  CI: pytest / test (push) Successful in 44s
39 changed files with 2623 additions and 93 deletions
+1
@@ -23,3 +23,4 @@ RUN mkdir -p /app/data
EXPOSE 8000
ENTRYPOINT ["/app/docker/entrypoint.sh"]
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
+70 -4
@@ -107,9 +107,7 @@ cp .env.example .env
3. Initialize the database
```bash
python scripts/app_db_adopt.py
python scripts/location_db_adopt.py
python scripts/poo_db_adopt.py
python -m scripts.run_migrations
```
4. Start the service
@@ -141,6 +139,7 @@ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
- App Alembic environment: `alembic_app.ini` + `alembic_app/`
- Location Alembic environment: `alembic_location.ini` + `alembic_location/`
- Poo Alembic environment: `alembic_poo.ini` + `alembic_poo/`
- Unified migration job: `python -m scripts.run_migrations`
- App DB initialization: `python scripts/app_db_adopt.py`
- Location DB adoption / initialization: `python scripts/location_db_adopt.py`
- Poo DB adoption / initialization: `python scripts/poo_db_adopt.py`
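The unified migration job above can be sketched as a thin orchestrator that runs `alembic upgrade head` once per environment. This is a hypothetical illustration of `scripts/run_migrations`, not the repo's actual implementation; only the ini file names are taken from the list above.

```python
import subprocess

# The three Alembic environments listed above, migrated in order.
ALEMBIC_INIS = ("alembic_app.ini", "alembic_location.ini", "alembic_poo.ini")


def migration_commands() -> list[list[str]]:
    # One `alembic upgrade head` invocation per environment.
    return [["alembic", "-c", ini, "upgrade", "head"] for ini in ALEMBIC_INIS]


def run_all_migrations() -> None:
    for cmd in migration_commands():
        # check=True aborts the job on the first failing environment.
        subprocess.run(cmd, check=True)
```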
@@ -217,18 +216,81 @@ python scripts/export_openapi.py
The default Compose service name is currently `app`, and the container name is fixed as `home-automation-app`.
How to start: Compose is currently split into two layers:
- `docker-compose.yml`: uses the registry image by default, suited to deployment / production pulls
- `docker-compose.override.yml`: adds `build: .` for local development only
Start for local development:
```bash
docker compose up -d --build
```
The command above automatically layers in `docker-compose.override.yml`, so local runs still rebuild from the current working directory.
To pull from the registry and start the production way instead, explicitly use only the base compose file:
```bash
docker compose -f docker-compose.yml pull
docker compose -f docker-compose.yml up -d
```
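For reference, the local-development override layer described above might look roughly like this; the service name `app` is quoted in this README, but the exact file contents here are an assumption, not the repo's actual file:

```yaml
# docker-compose.override.yml (sketch): layered on top of docker-compose.yml
# automatically by `docker compose up`, so local runs build from the workdir.
services:
  app:
    build: .
```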
Follow the logs:
```bash
docker compose logs -f app
```
## Grafana Provisioning
The repository supports loading the SQLite datasources and the in-repo dashboard export files automatically via Grafana provisioning.
The following file paths must be kept:
- `grafana/provisioning/datasources/locationrecorder.yaml`
- `grafana/provisioning/datasources/poorecorder.yaml`
- `grafana/provisioning/dashboards/provider.yaml`
- `grafana/dashboards/locationrecorder.json`
- `grafana/dashboards/poorecorder.json`
The responsibilities of these files are:
- `grafana/provisioning/datasources/locationrecorder.yaml`: declares the `locationrecorder` SQLite datasource, pointing at `/data/home-automation/locationRecorder.db`
- `grafana/provisioning/datasources/poorecorder.yaml`: declares the `poorecorder` SQLite datasource, pointing at `/data/home-automation/pooRecorder.db`
- `grafana/provisioning/dashboards/provider.yaml`: tells Grafana to scan `/var/lib/grafana/dashboards` and load the dashboard JSON files there
- `grafana/dashboards/locationrecorder.json`: the location recorder dashboard export; its contents need no rewriting in compose
- `grafana/dashboards/poorecorder.json`: the poo recorder dashboard export; its contents need no rewriting in compose
In the current `docker-compose.yml`, the Grafana service must mount the following directories:
- `./grafana/provisioning -> /etc/grafana/provisioning:ro`
- `./grafana/dashboards -> /var/lib/grafana/dashboards:ro`
Also keep the existing named volume `homeautomation_grafana_storage:/var/lib/grafana` as storage for Grafana's runtime data.
Before a one-command startup, at least the following files must already exist:
- `grafana/provisioning/datasources/locationrecorder.yaml`
- `grafana/provisioning/datasources/poorecorder.yaml`
- `grafana/provisioning/dashboards/provider.yaml`
- `grafana/dashboards/locationrecorder.json`
- `grafana/dashboards/poorecorder.json`
How to start:
```bash
docker compose up -d
```
What happens after startup:
- The Grafana container installs the `frser-sqlite-datasource` plugin
- Grafana reads the datasource YAML files under `/etc/grafana/provisioning/datasources/`
- Grafana reads `/etc/grafana/provisioning/dashboards/provider.yaml`
- Grafana automatically imports the two dashboard JSON files from `/var/lib/grafana/dashboards/`
- The existing Grafana named volume keeps storing Grafana's runtime data and does not overwrite the repo's dashboard and provisioning files
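As an illustration, one of the SQLite datasource provisioning files listed above might look roughly like this; the provisioning schema is standard Grafana, but the exact option names accepted by the `frser-sqlite-datasource` plugin are an assumption here:

```yaml
# grafana/provisioning/datasources/locationrecorder.yaml (sketch)
apiVersion: 1
datasources:
  - name: locationrecorder
    type: frser-sqlite-datasource
    jsonData:
      # Database path as seen from inside the Grafana container.
      path: /data/home-automation/locationRecorder.db
```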
## Container Image CI
The project provides a release image workflow:
@@ -238,6 +300,10 @@ docker compose logs -f app
- registry`code.wanderingbadger.dev`
- image`code.wanderingbadger.dev/<owner>/<repo>`
`docker-compose.yml` 中生产默认使用的 app image 当前为:
- `code.wanderingbadger.dev/tliu93/home-automation:latest`
当前 workflow 不再把 image name 硬编码到特定 user package 路径,而是直接使用当前仓库标识生成镜像路径:
- `code.wanderingbadger.dev/${github.repository}:${tag}`
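The image-path derivation above can be illustrated with a small helper; the function name is hypothetical, while the registry host and repository identity come from this section:

```python
REGISTRY = "code.wanderingbadger.dev"


def image_ref(repository: str, tag: str) -> str:
    # Mirrors `code.wanderingbadger.dev/${github.repository}:${tag}`.
    return f"{REGISTRY}/{repository}:{tag}"


# prints code.wanderingbadger.dev/tliu93/home-automation:latest
print(image_ref("tliu93/home-automation", "latest"))
```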
+1
@@ -7,6 +7,7 @@ from app.auth_db import AuthBase
from app.config import get_settings
from app.models.config import AppConfigEntry # noqa: F401
from app.models.auth import AuthSession, AuthUser # noqa: F401
from app.models.public_ip import PublicIPHistory, PublicIPState # noqa: F401
config = context.config
@@ -0,0 +1,55 @@
"""public ip monitor tables
Revision ID: 20260429_05_public_ip_monitor
Revises: 20260420_04_app_config_table
Create Date: 2026-04-29 00:00:01.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260429_05_public_ip_monitor"
down_revision: Union[str, None] = "20260420_04_app_config_table"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"public_ip_history",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("ipv4", sa.String(length=45), nullable=False),
sa.Column("observed_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("change_type", sa.String(length=32), nullable=False),
sa.Column("provider", sa.String(length=64), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_public_ip_history_observed_at",
"public_ip_history",
["observed_at"],
unique=False,
)
op.create_table(
"public_ip_state",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("current_ipv4", sa.String(length=45), nullable=False),
sa.Column("previous_ipv4", sa.String(length=45), nullable=True),
sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_checked_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_changed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_check_status", sa.String(length=32), nullable=False),
sa.Column("last_check_error", sa.String(length=255), nullable=True),
sa.Column("last_provider", sa.String(length=64), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
def downgrade() -> None:
op.drop_table("public_ip_state")
op.drop_index("ix_public_ip_history_observed_at", table_name="public_ip_history")
op.drop_table("public_ip_history")
+32 -4
@@ -6,7 +6,19 @@ from fastapi.responses import PlainTextResponse, Response
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_ticktick_client
from app.config import Settings
from app.dependencies import (
get_app_settings,
get_db,
get_homeassistant_client,
get_poo_db,
get_ticktick_client,
)
from app.integrations.homeassistant import (
HomeAssistantClient,
HomeAssistantConfigError,
HomeAssistantRequestError,
)
from app.integrations.ticktick import TickTickClient, TickTickConfigError, TickTickRequestError
from app.schemas.homeassistant import HomeAssistantPublishEnvelope
from app.services.homeassistant_inbound import (
@@ -24,13 +36,23 @@ INTERNAL_SERVER_ERROR_MESSAGE = "internal server error"
async def publish_from_homeassistant(
request: Request,
db: Session = Depends(get_db),
poo_db: Session = Depends(get_poo_db),
settings: Settings = Depends(get_app_settings),
homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client),
ticktick_client: TickTickClient = Depends(get_ticktick_client),
) -> Response:
try:
raw_payload = await request.body()
data = json.loads(raw_payload)
envelope = HomeAssistantPublishEnvelope.model_validate(data)
handle_homeassistant_message(db, envelope, ticktick_client)
handle_homeassistant_message(
db,
envelope,
ticktick_client=ticktick_client,
poo_session=poo_db,
settings=settings,
homeassistant_client=homeassistant_client,
)
except json.JSONDecodeError as exc:
logger.warning("Rejected Home Assistant publish request due to invalid JSON: %s", exc)
return PlainTextResponse(BAD_REQUEST_MESSAGE, status_code=status.HTTP_400_BAD_REQUEST)
@@ -45,8 +67,14 @@ async def publish_from_homeassistant(
INTERNAL_SERVER_ERROR_MESSAGE,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
except (TickTickConfigError, TickTickRequestError, RuntimeError) as exc:
logger.warning("Home Assistant publish request failed during TickTick handling: %s", exc)
except (
TickTickConfigError,
TickTickRequestError,
HomeAssistantConfigError,
HomeAssistantRequestError,
RuntimeError,
) as exc:
logger.warning("Home Assistant publish request failed during integration handling: %s", exc)
return PlainTextResponse(
INTERNAL_SERVER_ERROR_MESSAGE,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+135 -46
@@ -14,6 +14,7 @@ from app.services.config_page import (
is_ticktick_oauth_ready,
save_config_updates,
)
from app.services.email import EmailConfigurationError, EmailDeliveryError, is_smtp_ready, send_smtp_test_email
from sqlalchemy.orm import Session
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[2] / "templates"))
@@ -33,6 +34,49 @@ def _ticktick_oauth_notice(status_value: str | None) -> tuple[str | None, str |
return None, None
def _smtp_test_notice(status_value: str | None) -> tuple[str | None, str | None]:
if status_value == "success":
return "SMTP test email sent successfully.", None
if status_value == "config-error":
return None, "SMTP test failed. Check required SMTP settings before sending a test email."
if status_value == "failed":
return None, "SMTP test failed. Check saved SMTP settings and server reachability."
return None, None
def _build_config_context(
*,
auth_db_session: Session,
settings: Settings,
current_auth: AuthenticatedSession,
config_saved: bool,
config_error: str | None,
password_change_error: str | None,
ticktick_oauth_notice: str | None,
ticktick_oauth_error: str | None,
smtp_test_notice: str | None,
smtp_test_error: str | None,
) -> dict[str, object]:
return {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": password_change_error,
"config_error": config_error,
"config_saved": config_saved,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": ticktick_oauth_notice,
"ticktick_oauth_error": ticktick_oauth_error,
"smtp_test_ready": is_smtp_ready(settings),
"smtp_test_notice": smtp_test_notice,
"smtp_test_error": smtp_test_error,
}
@router.get("/", response_class=HTMLResponse)
def home(
request: Request,
@@ -66,22 +110,19 @@ def config_page(
ticktick_oauth_notice, ticktick_oauth_error = _ticktick_oauth_notice(
request.query_params.get("ticktick_oauth")
)
context = {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": None,
"config_saved": request.query_params.get("saved") == "1",
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": ticktick_oauth_notice,
"ticktick_oauth_error": ticktick_oauth_error,
}
smtp_test_notice, smtp_test_error = _smtp_test_notice(request.query_params.get("smtp_test"))
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=request.query_params.get("saved") == "1",
config_error=None,
password_change_error=None,
ticktick_oauth_notice=ticktick_oauth_notice,
ticktick_oauth_error=ticktick_oauth_error,
smtp_test_notice=smtp_test_notice,
smtp_test_error=smtp_test_error,
)
return templates.TemplateResponse(request, "config.html", context)
@@ -99,21 +140,18 @@ async def config_submit(
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected config update due to CSRF validation failure")
context = {
"app_name": settings.app_name,
"app_env": settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": "invalid config update request",
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(settings),
"ticktick_redirect_uri": settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
}
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config update request",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
@@ -126,21 +164,18 @@ async def config_submit(
except ConfigSaveError:
logger.warning("Rejected config update due to invalid submitted values")
refreshed_settings = get_settings()
context = {
"app_name": refreshed_settings.app_name,
"app_env": refreshed_settings.app_env,
"current_username": current_auth.user.username,
"csrf_token": current_auth.session.csrf_token,
"force_password_change": current_auth.user.force_password_change,
"password_change_error": None,
"config_error": "invalid config submission",
"config_saved": False,
"config_sections": build_config_sections(auth_db_session, refreshed_settings),
"ticktick_oauth_ready": is_ticktick_oauth_ready(refreshed_settings),
"ticktick_redirect_uri": refreshed_settings.ticktick_redirect_uri,
"ticktick_oauth_notice": None,
"ticktick_oauth_error": None,
}
context = _build_config_context(
auth_db_session=auth_db_session,
settings=refreshed_settings,
current_auth=current_auth,
config_saved=False,
config_error="invalid config submission",
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error=None,
)
return templates.TemplateResponse(
request,
"config.html",
@@ -149,3 +184,57 @@ async def config_submit(
)
return RedirectResponse(url="/config?saved=1", status_code=status.HTTP_303_SEE_OTHER)
@router.post("/config/smtp/test", response_class=HTMLResponse)
async def smtp_test_submit(
request: Request,
auth_db_session: Session = Depends(get_auth_db),
settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response:
if current_auth is None:
return RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
form = await request.form()
csrf_token = form.get("csrf_token")
if csrf_token != current_auth.session.csrf_token:
logger.warning("Rejected SMTP test due to CSRF validation failure")
context = _build_config_context(
auth_db_session=auth_db_session,
settings=settings,
current_auth=current_auth,
config_saved=False,
config_error=None,
password_change_error=None,
ticktick_oauth_notice=None,
ticktick_oauth_error=None,
smtp_test_notice=None,
smtp_test_error="invalid SMTP test request",
)
return templates.TemplateResponse(
request,
"config.html",
context,
status_code=status.HTTP_400_BAD_REQUEST,
)
try:
send_smtp_test_email(settings)
except EmailConfigurationError as exc:
logger.warning("SMTP test email rejected due to configuration: %s", exc)
return RedirectResponse(
url="/config?smtp_test=config-error",
status_code=status.HTTP_303_SEE_OTHER,
)
except EmailDeliveryError as exc:
logger.warning("SMTP test email failed: %s", exc)
return RedirectResponse(
url="/config?smtp_test=failed",
status_code=status.HTTP_303_SEE_OTHER,
)
return RedirectResponse(
url="/config?smtp_test=success",
status_code=status.HTTP_303_SEE_OTHER,
)
+26
@@ -0,0 +1,26 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.dependencies import get_auth_db, get_current_auth_session
from app.schemas.public_ip import PublicIPCheckResponse
from app.config import get_settings
from app.services.auth import AuthenticatedSession
from app.services.public_ip import check_public_ipv4_and_notify
router = APIRouter(tags=["public-ip"])
@router.get("/public-ip/check", response_model=PublicIPCheckResponse)
def run_public_ip_check(
session: Session = Depends(get_auth_db),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> PublicIPCheckResponse:
if current_auth is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="authentication required")
result = check_public_ipv4_and_notify(session, bootstrap_settings=get_settings())
return PublicIPCheckResponse(
status=result.status,
checked_at=result.checked_at,
changed=result.changed,
)
+9
@@ -23,6 +23,15 @@ class Settings(BaseSettings):
home_assistant_auth_token: str = ""
home_assistant_timeout_seconds: float = 1.0
home_assistant_action_task_project_id: str = ""
smtp_enabled: bool = False
smtp_host: str = ""
smtp_port: int = 587
smtp_username: str = ""
smtp_password: str = ""
smtp_from_name: str = ""
smtp_from_address: str = ""
smtp_to_address: str = ""
smtp_use_starttls: bool = True
poo_webhook_id: str = ""
poo_sensor_entity_name: str = "sensor.test_poo_status"
poo_sensor_friendly_name: str = "Poo Status"
+25
@@ -3,6 +3,8 @@ from pathlib import Path
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.orm import Session
from app import models # noqa: F401
@@ -12,15 +14,26 @@ import app.auth_db as auth_db
from app.api.routes.homeassistant import router as homeassistant_router
from app.api.routes.location import router as location_router
from app.api.routes.poo import router as poo_router
from app.api.routes.public_ip import router as public_ip_router
from app.api.routes.ticktick import router as ticktick_router
from app.config import get_settings
from app.services.auth import AuthBootstrapError, initialize_auth_schema
from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap
from app.services.public_ip import check_public_ipv4_and_notify
from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db
from scripts.location_db_adopt import LocationDatabaseAdoptionError, validate_location_runtime_db
from scripts.poo_db_adopt import PooDatabaseAdoptionError, validate_poo_runtime_db
def _run_scheduled_public_ip_check() -> None:
session_local = auth_db.get_auth_session_local()
session: Session = session_local()
try:
check_public_ipv4_and_notify(session, bootstrap_settings=get_settings())
finally:
session.close()
def ensure_auth_db_ready() -> None:
session_local = auth_db.get_auth_session_local()
session: Session = session_local()
@@ -72,7 +85,18 @@ async def lifespan(_: FastAPI):
ensure_auth_db_ready()
ensure_location_db_ready()
ensure_poo_db_ready()
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(
_run_scheduled_public_ip_check,
trigger=IntervalTrigger(hours=4),
id="public-ip-check",
replace_existing=True,
max_instances=1,
coalesce=True,
)
scheduler.start()
yield
scheduler.shutdown(wait=False)
def create_app() -> FastAPI:
@@ -97,6 +121,7 @@ def create_app() -> FastAPI:
app.include_router(homeassistant_router)
app.include_router(location_router)
app.include_router(poo_router)
app.include_router(public_ip_router)
app.include_router(ticktick_router)
return app
+9 -1
@@ -3,5 +3,13 @@
from app.models.auth import AuthSession, AuthUser
from app.models.config import AppConfigEntry
from app.models.location import Location
from app.models.public_ip import PublicIPHistory, PublicIPState
__all__ = ["AppConfigEntry", "AuthSession", "AuthUser", "Location"]
__all__ = [
"AppConfigEntry",
"AuthSession",
"AuthUser",
"Location",
"PublicIPHistory",
"PublicIPState",
]
+30
@@ -0,0 +1,30 @@
from datetime import datetime
from sqlalchemy import DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app.auth_db import AuthBase
class PublicIPState(AuthBase):
__tablename__ = "public_ip_state"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
current_ipv4: Mapped[str] = mapped_column(String(45), nullable=False)
previous_ipv4: Mapped[str | None] = mapped_column(String(45), nullable=True)
first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_checked_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_changed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_check_status: Mapped[str] = mapped_column(String(32), nullable=False)
last_check_error: Mapped[str | None] = mapped_column(String(255), nullable=True)
last_provider: Mapped[str | None] = mapped_column(String(64), nullable=True)
class PublicIPHistory(AuthBase):
__tablename__ = "public_ip_history"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
ipv4: Mapped[str] = mapped_column(String(45), nullable=False)
observed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
change_type: Mapped[str] = mapped_column(String(32), nullable=False)
provider: Mapped[str | None] = mapped_column(String(64), nullable=True)
+13
@@ -0,0 +1,13 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
PublicIPCheckStatus = Literal["first_seen", "unchanged", "changed", "error"]
class PublicIPCheckResponse(BaseModel):
status: PublicIPCheckStatus
checked_at: datetime
changed: bool
+18
@@ -27,6 +27,15 @@ CONFIG_FIELDS: tuple[ConfigField, ...] = (
ConfigField("System", "APP_ENV", "app_env", "App Env"),
ConfigField("System", "APP_DEBUG", "app_debug", "App Debug"),
ConfigField("System", "APP_HOSTNAME", "app_hostname", "App Hostname"),
ConfigField("SMTP", "SMTP_ENABLED", "smtp_enabled", "SMTP Enabled"),
ConfigField("SMTP", "SMTP_HOST", "smtp_host", "SMTP Host"),
ConfigField("SMTP", "SMTP_PORT", "smtp_port", "SMTP Port"),
ConfigField("SMTP", "SMTP_USERNAME", "smtp_username", "SMTP Username"),
ConfigField("SMTP", "SMTP_PASSWORD", "smtp_password", "SMTP Password", secret=True),
ConfigField("SMTP", "SMTP_FROM_NAME", "smtp_from_name", "SMTP From Name"),
ConfigField("SMTP", "SMTP_FROM_ADDRESS", "smtp_from_address", "SMTP From Address"),
ConfigField("SMTP", "SMTP_TO_ADDRESS", "smtp_to_address", "SMTP To Address"),
ConfigField("SMTP", "SMTP_USE_STARTTLS", "smtp_use_starttls", "SMTP Use STARTTLS"),
ConfigField(
"Authentication",
"AUTH_SESSION_COOKIE_NAME",
@@ -260,6 +269,15 @@ def _settings_payload(settings: Settings) -> dict[str, Any]:
"home_assistant_auth_token": settings.home_assistant_auth_token,
"home_assistant_timeout_seconds": settings.home_assistant_timeout_seconds,
"home_assistant_action_task_project_id": settings.home_assistant_action_task_project_id,
"smtp_enabled": settings.smtp_enabled,
"smtp_host": settings.smtp_host,
"smtp_port": settings.smtp_port,
"smtp_username": settings.smtp_username,
"smtp_password": settings.smtp_password,
"smtp_from_name": settings.smtp_from_name,
"smtp_from_address": settings.smtp_from_address,
"smtp_to_address": settings.smtp_to_address,
"smtp_use_starttls": settings.smtp_use_starttls,
"poo_webhook_id": settings.poo_webhook_id,
"poo_sensor_entity_name": settings.poo_sensor_entity_name,
"poo_sensor_friendly_name": settings.poo_sensor_friendly_name,
+149
@@ -0,0 +1,149 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from email.message import EmailMessage
from email.utils import formataddr
import smtplib
from app.config import Settings
class EmailConfigurationError(ValueError):
"""Raised when SMTP settings are incomplete or disabled."""
class EmailDeliveryError(RuntimeError):
"""Raised when sending email fails."""
@dataclass(frozen=True, slots=True)
class SMTPConfig:
host: str
port: int
username: str
password: str
from_name: str
from_address: str
to_address: str
use_starttls: bool
def get_smtp_config(settings: Settings, *, require_enabled: bool = True) -> SMTPConfig:
if require_enabled and not settings.smtp_enabled:
raise EmailConfigurationError("SMTP is disabled")
if not settings.smtp_host:
raise EmailConfigurationError("SMTP host is required")
if settings.smtp_port <= 0:
raise EmailConfigurationError("SMTP port must be greater than zero")
if not settings.smtp_from_address:
raise EmailConfigurationError("SMTP from address is required")
if not settings.smtp_to_address:
raise EmailConfigurationError("SMTP to address is required")
return SMTPConfig(
host=settings.smtp_host,
port=settings.smtp_port,
username=settings.smtp_username,
password=settings.smtp_password,
from_name=settings.smtp_from_name,
from_address=settings.smtp_from_address,
to_address=settings.smtp_to_address,
use_starttls=settings.smtp_use_starttls,
)
def is_smtp_ready(settings: Settings) -> bool:
try:
get_smtp_config(settings, require_enabled=False)
except EmailConfigurationError:
return False
return True
def send_plaintext_email(
settings: Settings,
*,
subject: str,
body: str,
recipient: str | None = None,
require_enabled: bool = True,
) -> None:
smtp_config = get_smtp_config(settings, require_enabled=require_enabled)
message = EmailMessage()
message["Subject"] = subject
message["From"] = _build_from_header(smtp_config)
message["To"] = recipient or smtp_config.to_address
message.set_content(body)
try:
with smtplib.SMTP(smtp_config.host, smtp_config.port, timeout=10) as smtp:
smtp.ehlo()
if smtp_config.use_starttls:
smtp.starttls()
smtp.ehlo()
if smtp_config.username:
smtp.login(smtp_config.username, smtp_config.password)
smtp.send_message(
message,
from_addr=smtp_config.from_address,
to_addrs=[recipient or smtp_config.to_address],
)
except (OSError, smtplib.SMTPException) as exc:
error_message = _sanitize_error_message(str(exc), smtp_config.password)
raise EmailDeliveryError(error_message or "SMTP delivery failed") from exc
def send_smtp_test_email(settings: Settings) -> None:
send_plaintext_email(
settings,
subject="Home Automation SMTP Test",
body="This is a test email from Home Automation SMTP settings.",
require_enabled=False,
)
def send_public_ip_changed_email(
settings: Settings,
*,
previous_ipv4: str,
current_ipv4: str,
detected_at: datetime,
) -> None:
send_plaintext_email(
settings,
subject="Public IP changed",
body=(
"Your public IPv4 address has changed.\n\n"
f"Previous IP: {previous_ipv4}\n"
f"Current IP: {current_ipv4}\n"
f"Detected at: {_format_utc_timestamp(detected_at)}\n\n"
"If you use Namecheap API trusted IP restrictions, you may need to "
"update the trusted IP manually.\n"
),
)
def _sanitize_error_message(message: str, password: str) -> str:
sanitized = message
if password:
sanitized = sanitized.replace(password, "[redacted]")
return sanitized
def _format_utc_timestamp(value: datetime) -> str:
if value.tzinfo is None:
normalized = value.replace(tzinfo=UTC)
else:
normalized = value.astimezone(UTC)
return normalized.strftime("%Y-%m-%d %H:%M:%S UTC")
def _build_from_header(smtp_config: SMTPConfig) -> str:
if smtp_config.from_name:
return formataddr((smtp_config.from_name, smtp_config.from_address))
return smtp_config.from_address
+37
@@ -4,11 +4,14 @@ import json
from datetime import UTC, datetime, time, timedelta
from sqlalchemy.orm import Session
from app.config import Settings
from app.integrations.homeassistant import HomeAssistantClient
from app.integrations.ticktick import TICKTICK_DATETIME_FORMAT, TickTickClient, TickTickTask
from app.schemas.homeassistant import HomeAssistantPublishEnvelope
from app.schemas.location import LocationRecordRequest
from app.schemas.ticktick import TickTickActionTaskRequest
from app.services.location import record_location
from app.services.poo import publish_latest_poo_status
class UnsupportedHomeAssistantMessage(RuntimeError):
@@ -19,11 +22,23 @@ def handle_homeassistant_message(
session: Session,
envelope: HomeAssistantPublishEnvelope,
ticktick_client: TickTickClient | None = None,
poo_session: Session | None = None,
settings: Settings | None = None,
homeassistant_client: HomeAssistantClient | None = None,
) -> None:
if envelope.target == "location_recorder":
_handle_location_message(session, envelope)
return
if envelope.target == "poo_recorder":
_handle_poo_message(
envelope,
poo_session=poo_session,
settings=settings,
homeassistant_client=homeassistant_client,
)
return
if envelope.target == "ticktick":
_handle_ticktick_message(envelope, ticktick_client)
return
@@ -44,6 +59,28 @@ def _handle_location_message(session: Session, envelope: HomeAssistantPublishEnv
record_location(session, payload)
def _handle_poo_message(
envelope: HomeAssistantPublishEnvelope,
*,
poo_session: Session | None,
settings: Settings | None,
homeassistant_client: HomeAssistantClient | None,
) -> None:
if envelope.action != "get_latest":
raise UnsupportedHomeAssistantMessage(
f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
)
if poo_session is None or settings is None or homeassistant_client is None:
raise RuntimeError("Poo recorder integration is unavailable")
publish_latest_poo_status(
session=poo_session,
settings=settings,
homeassistant_client=homeassistant_client,
)
def _handle_ticktick_message(
envelope: HomeAssistantPublishEnvelope,
ticktick_client: TickTickClient | None,
+191
@@ -0,0 +1,191 @@
from __future__ import annotations
import ipaddress
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Callable, Literal
import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models.public_ip import PublicIPHistory, PublicIPState
from app.services.config_page import build_runtime_settings
from app.services.email import EmailConfigurationError, EmailDeliveryError, send_public_ip_changed_email
logger = logging.getLogger(__name__)
PUBLIC_IP_PROVIDER_NAME = "ipify"
PUBLIC_IP_PROVIDER_URL = "https://api.ipify.org"
PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS = 5.0
PublicIPResultStatus = Literal["first_seen", "unchanged", "changed", "error"]
PublicIPv4Fetcher = Callable[[], str]
class PublicIPCheckError(RuntimeError):
"""Raised when the public IPv4 provider cannot return a valid IPv4."""
@dataclass(slots=True)
class PublicIPCheckResult:
status: PublicIPResultStatus
checked_at: datetime
changed: bool
previous_ipv4: str | None = None
current_ipv4: str | None = None
def check_public_ipv4(
session: Session,
*,
fetch_public_ipv4: PublicIPv4Fetcher | None = None,
provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
checked_at = _utc_now()
state = session.scalar(select(PublicIPState).where(PublicIPState.id == 1).limit(1))
try:
raw_ipv4 = (fetch_public_ipv4 or fetch_public_ipv4_from_provider)()
current_ipv4 = _validate_ipv4(raw_ipv4)
except PublicIPCheckError as exc:
logger.warning("Public IPv4 check failed: %s", exc)
if state is not None:
state.last_checked_at = checked_at
state.last_check_status = "error"
state.last_check_error = str(exc)
state.last_provider = provider_name
session.commit()
return PublicIPCheckResult(status="error", checked_at=checked_at, changed=False)
if state is None:
state = PublicIPState(
id=1,
current_ipv4=current_ipv4,
previous_ipv4=None,
first_seen_at=checked_at,
last_checked_at=checked_at,
last_changed_at=None,
last_check_status="first_seen",
last_check_error=None,
last_provider=provider_name,
)
session.add(state)
session.add(
PublicIPHistory(
ipv4=current_ipv4,
observed_at=checked_at,
change_type="first_seen",
provider=provider_name,
)
)
session.commit()
return PublicIPCheckResult(
status="first_seen",
checked_at=checked_at,
changed=False,
current_ipv4=current_ipv4,
)
if state.current_ipv4 == current_ipv4:
state.last_checked_at = checked_at
state.last_check_status = "unchanged"
state.last_check_error = None
state.last_provider = provider_name
session.commit()
return PublicIPCheckResult(
status="unchanged",
checked_at=checked_at,
changed=False,
current_ipv4=current_ipv4,
)
previous_ipv4 = state.current_ipv4
state.previous_ipv4 = previous_ipv4
state.current_ipv4 = current_ipv4
state.last_checked_at = checked_at
state.last_changed_at = checked_at
state.last_check_status = "changed"
state.last_check_error = None
state.last_provider = provider_name
session.add(
PublicIPHistory(
ipv4=current_ipv4,
observed_at=checked_at,
change_type="changed",
provider=provider_name,
)
)
session.commit()
return PublicIPCheckResult(
status="changed",
checked_at=checked_at,
changed=True,
previous_ipv4=previous_ipv4,
current_ipv4=current_ipv4,
)
def check_public_ipv4_and_notify(
session: Session,
*,
bootstrap_settings: Settings,
fetch_public_ipv4: PublicIPv4Fetcher | None = None,
provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
result = check_public_ipv4(
session,
fetch_public_ipv4=fetch_public_ipv4,
provider_name=provider_name,
)
if result.status != "changed" or result.previous_ipv4 is None or result.current_ipv4 is None:
return result
runtime_settings = build_runtime_settings(session, bootstrap_settings)
try:
send_public_ip_changed_email(
runtime_settings,
previous_ipv4=result.previous_ipv4,
current_ipv4=result.current_ipv4,
detected_at=result.checked_at,
)
except (EmailConfigurationError, EmailDeliveryError) as exc:
logger.warning("Public IPv4 change notification failed: %s", exc)
return result
def fetch_public_ipv4_from_provider() -> str:
try:
response = httpx.get(
PUBLIC_IP_PROVIDER_URL,
params={"format": "text"},
timeout=PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS,
)
response.raise_for_status()
except httpx.HTTPError as exc:
raise PublicIPCheckError(f"provider request failed: {exc}") from exc
return response.text.strip()
def _validate_ipv4(raw_value: str) -> str:
if not raw_value:
raise PublicIPCheckError("provider returned an empty response")
try:
parsed = ipaddress.ip_address(raw_value)
except ValueError as exc:
raise PublicIPCheckError("provider returned an invalid IPv4 value") from exc
if parsed.version != 4:
raise PublicIPCheckError("provider returned a non-IPv4 value")
return str(parsed)
def _utc_now() -> datetime:
return datetime.now(UTC)
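The `_validate_ipv4` helper above can be exercised in isolation; a minimal standalone sketch of the same checks, with a plain `ValueError` standing in for `PublicIPCheckError`:

```python
import ipaddress


def validate_ipv4(raw_value: str) -> str:
    """Return the normalized dotted-quad form, or raise ValueError."""
    if not raw_value:
        raise ValueError("empty response")
    parsed = ipaddress.ip_address(raw_value)  # raises ValueError on garbage input
    if parsed.version != 4:
        raise ValueError("not an IPv4 address")
    return str(parsed)


print(validate_ipv4("203.0.113.7"))  # prints 203.0.113.7
```

Note that `ipaddress.ip_address` accepts IPv6 strings too, which is why the explicit `version != 4` check exists: a provider returning `"::1"` is rejected rather than stored.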
+22
@@ -33,6 +33,14 @@
<div class="notice">{{ ticktick_oauth_notice }}</div>
{% endif %}
{% if smtp_test_error %}
<div class="alert">{{ smtp_test_error }}</div>
{% endif %}
{% if smtp_test_notice %}
<div class="notice">{{ smtp_test_notice }}</div>
{% endif %}
<div class="meta single-column">
<div>
<dt>当前用户</dt>
@@ -102,6 +110,20 @@
{% endif %}
</div>
{% endif %}
{% if section.name == "SMTP" %}
<div class="integration-action-row">
<div>
<p class="integration-action-title">SMTP Test Email</p>
<p class="integration-action-copy">Save the SMTP settings first, then send a simple plaintext test email to the configured recipient.</p>
</div>
{% if smtp_test_ready %}
<button type="submit" formaction="/config/smtp/test" formmethod="post">Send SMTP Test</button>
{% else %}
<span class="button-link disabled" aria-disabled="true">Send SMTP Test</span>
{% endif %}
</div>
{% endif %}
</fieldset>
{% endfor %}
+13 -7
@@ -8,15 +8,17 @@ alembic==1.18.4
# via -r requirements.in
annotated-types==0.7.0
# via pydantic
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
anyio==4.13.0
# via
# httpx
# starlette
# watchfiles
apscheduler==3.11.2
# via -r requirements.in
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
build==1.4.3
# via pip-tools
certifi==2026.2.25
@@ -42,7 +44,9 @@ httpcore==1.0.9
httptools==0.7.1
# via uvicorn
httpx==0.28.1
# via -r dev-requirements.in
# via
# -r dev-requirements.in
# -r requirements.in
idna==3.11
# via
# anyio
@@ -66,6 +70,8 @@ pip-tools==7.5.3
# via -r dev-requirements.in
pluggy==1.6.0
# via pytest
pycparser==2.23
# via cffi
pydantic==2.13.2
# via
# fastapi
@@ -88,8 +94,6 @@ python-dotenv==1.2.2
# uvicorn
python-multipart==0.0.26
# via -r requirements.in
pycparser==2.23
# via cffi
pyyaml==6.0.3
# via
# -r requirements.in
@@ -112,6 +116,8 @@ typing-inspection==0.4.2
# via
# pydantic
# pydantic-settings
tzlocal==5.3.1
# via apscheduler
uvicorn[standard]==0.44.0
# via -r requirements.in
uvloop==0.22.1
+6
@@ -0,0 +1,6 @@
services:
migration:
build: .
app:
build: .
+18 -1
@@ -1,10 +1,24 @@
services:
migration:
container_name: home-automation-migration
image: code.wanderingbadger.dev/tliu93/home-automation:latest
user: "1000:1000"
restart: "no"
init: true
command: ["python", "-m", "scripts.run_migrations"]
volumes:
- ./data:/app/data
- ./.env:/app/.env:ro
app:
container_name: home-automation-app
build: .
image: code.wanderingbadger.dev/tliu93/home-automation:latest
user: "1000:1000"
restart: unless-stopped
init: true
depends_on:
migration:
condition: service_completed_successfully
ports:
- "127.0.0.1:8881:8000"
volumes:
@@ -23,7 +37,10 @@ services:
GF_PLUGINS_PREINSTALL: frser-sqlite-datasource
volumes:
- ./data:/data/home-automation:ro
- ./grafana/provisioning:/etc/grafana/provisioning:ro
- ./grafana/dashboards:/var/lib/grafana/dashboards:ro
- homeautomation_grafana_storage:/var/lib/grafana
volumes:
homeautomation_grafana_storage:
name: homeautomation_grafana_storage
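With migrations split into their own one-shot service, the job can also be run by hand before starting the app; a sketch of the assumed workflow (service names as defined in the compose file above):

```shell
# Run the migration job once; it exits after adopting/upgrading the databases.
docker compose run --rm migration

# Starting the app alone also works: depends_on with
# condition service_completed_successfully runs the migration
# container first and only starts the app if it exits with status 0.
docker compose up -d app
```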
+1 -5
@@ -2,8 +2,4 @@
set -eu
python scripts/app_db_adopt.py
python scripts/location_db_adopt.py
python scripts/poo_db_adopt.py
exec uvicorn app.main:app --host 0.0.0.0 --port 8000
exec "$@"
+288
@@ -0,0 +1,288 @@
{
"apiVersion": "dashboard.grafana.app/v2",
"kind": "Dashboard",
"metadata": {
"name": "adzr6rv",
"namespace": "default",
"uid": "c5fc57e5-7fb5-4104-9861-023710ada568",
"resourceVersion": "1776634346371016",
"generation": 19,
"creationTimestamp": "2026-04-18T19:05:57Z",
"labels": {
"grafana.app/deprecatedInternalID": "945374452785152"
},
"annotations": {
"grafana.app/createdBy": "user:ffjhknvgkvhtsc",
"grafana.app/folder": "",
"grafana.app/saved-from-ui": "Grafana v13.0.1 (a100054f)",
"grafana.app/updatedBy": "user:ffjhknvgkvhtsc",
"grafana.app/updatedTimestamp": "2026-04-19T21:32:26Z"
}
},
"spec": {
"annotations": [
{
"kind": "AnnotationQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "grafana",
"version": "v0",
"datasource": {
"name": "-- Grafana --"
},
"spec": {}
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"builtIn": true
}
}
],
"cursorSync": "Off",
"editable": true,
"elements": {
"panel-1": {
"kind": "Panel",
"spec": {
"id": 1,
"title": "轨迹",
"description": "",
"links": [],
"data": {
"kind": "QueryGroup",
"spec": {
"queries": [
{
"kind": "PanelQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhr941d5iwwf"
},
"spec": {
"queryText": "SELECT\n datetime AS time,\n latitude,\n longitude,\n altitude\nFROM location\nWHERE person = 'Jiangxue'\n AND datetime >= '2021-04-19T21:29:57.036Z'\n AND datetime <= '2026-04-19T21:29:57.036Z'\n AND latitude != 0\n AND longitude != 0\nORDER BY datetime;\n",
"queryType": "table",
"rawQueryText": "SELECT\n datetime AS time,\n latitude,\n longitude,\n altitude\nFROM location\nWHERE person = '$person'\n AND datetime >= '${__from:date:iso}'\n AND datetime <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY datetime;\n",
"timeColumns": [
"time",
"ts"
]
}
},
"refId": "A",
"hidden": false
}
}
],
"transformations": [],
"queryOptions": {}
}
},
"vizConfig": {
"kind": "VizConfig",
"group": "geomap",
"version": "13.0.1",
"spec": {
"options": {
"basemap": {
"config": {
"server": "streets"
},
"name": "Layer 0",
"noRepeat": false,
"type": "default"
},
"controls": {
"mouseWheelZoom": true,
"showAttribution": true,
"showDebug": false,
"showMeasure": false,
"showScale": false,
"showZoom": true
},
"layers": [
{
"config": {
"showLegend": false,
"style": {
"color": {
"fixed": "blue"
},
"opacity": 0.7,
"rotation": {
"fixed": 0,
"max": 360,
"min": -360,
"mode": "mod"
},
"size": {
"fixed": 3,
"max": 15,
"min": 2
},
"symbol": {
"fixed": "img/icons/marker/circle.svg",
"mode": "fixed"
},
"symbolAlign": {
"horizontal": "center",
"vertical": "center"
},
"textConfig": {
"fontSize": 12,
"offsetX": 0,
"offsetY": 0,
"textAlign": "center",
"textBaseline": "middle"
}
}
},
"layer-tooltip": true,
"name": "path",
"tooltip": true,
"type": "markers"
}
],
"tooltip": {
"mode": "details"
},
"view": {
"allLayers": true,
"dashboardVariable": false,
"id": "fit",
"lat": 0,
"lon": 0,
"noRepeat": false,
"shared": false,
"zoom": 15
}
},
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{
"value": 0,
"color": "green"
}
]
},
"color": {
"mode": "thresholds"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
}
},
"overrides": []
}
}
}
}
}
},
"layout": {
"kind": "GridLayout",
"spec": {
"items": [
{
"kind": "GridLayoutItem",
"spec": {
"x": 0,
"y": 0,
"width": 24,
"height": 18,
"element": {
"kind": "ElementReference",
"name": "panel-1"
}
}
}
]
}
},
"links": [],
"liveNow": false,
"preload": false,
"tags": [],
"timeSettings": {
"timezone": "browser",
"from": "now-5y",
"to": "now",
"autoRefresh": "",
"autoRefreshIntervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"hideTimepicker": false,
"fiscalYearStartMonth": 0
},
"title": "轨迹",
"variables": [
{
"kind": "QueryVariable",
"spec": {
"name": "person",
"current": {
"text": "Jiangxue",
"value": "Jiangxue"
},
"label": "person",
"hide": "dontHide",
"refresh": "onDashboardLoad",
"skipUrlSync": false,
"description": "",
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhr941d5iwwf"
},
"spec": {
"__legacyStringValue": "SELECT DISTINCT person\nFROM location\nORDER BY person;\n"
}
},
"regex": "",
"regexApplyTo": "value",
"sort": "disabled",
"definition": "SELECT DISTINCT person\nFROM location\nORDER BY person;\n",
"options": [],
"multi": false,
"includeAll": false,
"allowCustomValue": true
}
}
],
"preferences": {
"layout": {
"kind": "AutoGridLayout",
"spec": {
"maxColumnCount": 3,
"columnWidthMode": "standard",
"rowHeightMode": "standard",
"items": []
}
}
}
}
}
+231
@@ -0,0 +1,231 @@
{
"apiVersion": "dashboard.grafana.app/v2",
"kind": "Dashboard",
"metadata": {
"name": "adl5sjt",
"namespace": "default",
"uid": "d4c72406-9fc5-4b85-844b-be1250f1fa8b",
"resourceVersion": "1776606363367013",
"generation": 6,
"creationTimestamp": "2026-04-18T20:07:34Z",
"labels": {
"grafana.app/deprecatedInternalID": "960882027798528"
},
"annotations": {
"grafana.app/createdBy": "user:ffjhknvgkvhtsc",
"grafana.app/folder": "",
"grafana.app/saved-from-ui": "Grafana v13.0.1 (a100054f)",
"grafana.app/updatedBy": "user:ffjhknvgkvhtsc",
"grafana.app/updatedTimestamp": "2026-04-19T13:46:03Z"
}
},
"spec": {
"annotations": [
{
"kind": "AnnotationQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "grafana",
"version": "v0",
"datasource": {
"name": "-- Grafana --"
},
"spec": {}
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"builtIn": true
}
}
],
"cursorSync": "Off",
"editable": true,
"elements": {
"panel-1": {
"kind": "Panel",
"spec": {
"id": 1,
"title": "Mika Poo",
"description": "Mika's poo",
"links": [],
"data": {
"kind": "QueryGroup",
"spec": {
"queries": [
{
"kind": "PanelQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhkuu4hc3y8e"
},
"spec": {
"queryText": "SELECT\n latitude,\n longitude,\n timestamp\nFROM poo_records\nWHERE timestamp >= '${__from:date:iso}'\n AND timestamp <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY timestamp;\n",
"queryType": "table",
"rawQueryText": "SELECT\n latitude,\n longitude,\n timestamp\nFROM poo_records\nWHERE timestamp >= '${__from:date:iso}'\n AND timestamp <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY timestamp;\n",
"timeColumns": [
"time",
"ts"
]
}
},
"refId": "A",
"hidden": false
}
}
],
"transformations": [],
"queryOptions": {}
}
},
"vizConfig": {
"kind": "VizConfig",
"group": "geomap",
"version": "13.0.1",
"spec": {
"options": {
"basemap": {
"config": {},
"name": "Layer 0",
"noRepeat": false,
"type": "default"
},
"controls": {
"mouseWheelZoom": true,
"showAttribution": true,
"showDebug": false,
"showMeasure": false,
"showScale": false,
"showZoom": true
},
"layers": [
{
"config": {
"blur": 15,
"radius": 5,
"weight": {
"fixed": 1,
"max": 1,
"min": 0
}
},
"filterData": {
"id": "byRefId",
"options": "A"
},
"location": {
"mode": "auto"
},
"name": "Poo",
"tooltip": true,
"type": "heatmap"
}
],
"tooltip": {
"mode": "details"
},
"view": {
"allLayers": true,
"dashboardVariable": false,
"id": "zero",
"lat": 0,
"lon": 0,
"noRepeat": false,
"zoom": 1
}
},
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{
"value": 0,
"color": "green"
},
{
"value": 80,
"color": "red"
}
]
},
"color": {
"mode": "thresholds"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
}
},
"overrides": []
}
}
}
}
}
},
"layout": {
"kind": "GridLayout",
"spec": {
"items": [
{
"kind": "GridLayoutItem",
"spec": {
"x": 0,
"y": 0,
"width": 24,
"height": 19,
"element": {
"kind": "ElementReference",
"name": "panel-1"
}
}
}
]
}
},
"links": [],
"liveNow": false,
"preload": false,
"tags": [],
"timeSettings": {
"timezone": "browser",
"from": "now-5y",
"to": "now",
"autoRefresh": "",
"autoRefreshIntervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"hideTimepicker": false,
"fiscalYearStartMonth": 0
},
"title": "Mika Poo",
"variables": [],
"preferences": {
"layout": {
"kind": "GridLayout",
"spec": {
"items": []
}
}
}
}
}
@@ -0,0 +1,13 @@
apiVersion: 1
providers:
- name: home-automation-dashboards
orgId: 1
folder: ""
type: file
disableDeletion: false
allowUiUpdates: false
updateIntervalSeconds: 30
options:
path: /var/lib/grafana/dashboards
foldersFromFilesStructure: false
@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: locationrecorder
uid: ffjhr941d5iwwf
type: frser-sqlite-datasource
access: proxy
isDefault: false
editable: false
jsonData:
path: /data/home-automation/locationRecorder.db
@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: poorecorder
uid: ffjhkuu4hc3y8e
type: frser-sqlite-datasource
access: proxy
isDefault: false
editable: false
jsonData:
path: /data/home-automation/pooRecorder.db
+2
@@ -1,6 +1,8 @@
alembic>=1.14,<2.0
apscheduler>=3.10,<4.0
argon2-cffi>=25.1,<26.0
fastapi>=0.115,<0.116
httpx>=0.28,<1.0
jinja2>=3.1,<4.0
pydantic-settings>=2.6,<3.0
python-multipart>=0.0.12,<1.0
+24 -7
@@ -8,14 +8,21 @@ alembic==1.18.4
# via -r requirements.in
annotated-types==0.7.0
# via pydantic
anyio==4.13.0
# via
# httpx
# starlette
# watchfiles
apscheduler==3.11.2
# via -r requirements.in
argon2-cffi==25.1.0
# via -r requirements.in
argon2-cffi-bindings==25.1.0
# via argon2-cffi
anyio==4.13.0
certifi==2026.4.22
# via
# starlette
# watchfiles
# httpcore
# httpx
cffi==2.0.0
# via argon2-cffi-bindings
click==8.3.2
@@ -25,11 +32,19 @@ fastapi==0.115.14
greenlet==3.4.0
# via sqlalchemy
h11==0.16.0
# via uvicorn
# via
# httpcore
# uvicorn
httpcore==1.0.9
# via httpx
httptools==0.7.1
# via uvicorn
httpx==0.28.1
# via -r requirements.in
idna==3.11
# via anyio
# via
# anyio
# httpx
jinja2==3.1.6
# via -r requirements.in
mako==1.3.11
@@ -38,6 +53,8 @@ markupsafe==3.0.3
# via
# jinja2
# mako
pycparser==2.23
# via cffi
pydantic==2.13.2
# via
# fastapi
@@ -52,8 +69,6 @@ python-dotenv==1.2.2
# uvicorn
python-multipart==0.0.26
# via -r requirements.in
pycparser==2.23
# via cffi
pyyaml==6.0.3
# via
# -r requirements.in
@@ -76,6 +91,8 @@ typing-inspection==0.4.2
# via
# pydantic
# pydantic-settings
tzlocal==5.3.1
# via apscheduler
uvicorn[standard]==0.44.0
# via -r requirements.in
uvloop==0.22.1
+32 -4
@@ -6,6 +6,8 @@ from pathlib import Path
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.util.exc import CommandError
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
@@ -13,7 +15,7 @@ if str(PROJECT_ROOT) not in sys.path:
from app.config import get_settings
APP_BASELINE_REVISION = "20260420_04_app_config_table"
APP_BASELINE_REVISION = "20260429_05_public_ip_monitor"
class AppDatabaseAdoptionError(RuntimeError):
@@ -35,6 +37,24 @@ def _make_alembic_config(database_url: str) -> Config:
return config
def _expected_head_revision(alembic_config: Config) -> str:
script = ScriptDirectory.from_config(alembic_config)
heads = script.get_heads()
if len(heads) != 1:
raise AppDatabaseAdoptionError(
f"Expected exactly one Alembic head for app DB, got {len(heads)}"
)
return heads[0]
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
script = ScriptDirectory.from_config(alembic_config)
try:
return script.get_revision(revision) is not None
except CommandError:
return False
def _alembic_version_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
@@ -75,6 +95,8 @@ def _list_user_tables(database_path: Path) -> list[str]:
def validate_app_runtime_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if not database_path.exists():
raise AppDatabaseAdoptionError(
"App DB file was not found. Run 'python scripts/app_db_adopt.py' first to "
@@ -88,22 +110,28 @@ def validate_app_runtime_db(database_url: str) -> None:
)
current_revision = _fetch_alembic_revision(database_path)
if current_revision != APP_BASELINE_REVISION:
if current_revision != expected_revision:
raise AppDatabaseAdoptionError(
"App DB revision mismatch. Refusing to start the app: "
f"expected {APP_BASELINE_REVISION}, got {current_revision}"
f"expected {expected_revision}, got {current_revision}"
)
def adopt_or_initialize_app_db(database_url: str) -> str:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if database_path.exists():
if _alembic_version_table_exists(database_path):
current_revision = _fetch_alembic_revision(database_path)
if current_revision == APP_BASELINE_REVISION:
if current_revision == expected_revision:
return "already_managed"
if not _is_known_revision(alembic_config, current_revision):
raise AppDatabaseAdoptionError(
"App DB is already Alembic-managed but revision does not match "
f"a known migration revision: got {current_revision}"
)
command.upgrade(alembic_config, "head")
return "upgraded"
+34 -6
@@ -6,6 +6,8 @@ from pathlib import Path
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.util.exc import CommandError
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
@@ -43,6 +45,24 @@ def _make_alembic_config(database_url: str) -> Config:
return config
def _expected_head_revision(alembic_config: Config) -> str:
script = ScriptDirectory.from_config(alembic_config)
heads = script.get_heads()
if len(heads) != 1:
raise LocationDatabaseAdoptionError(
f"Expected exactly one Alembic head for location DB, got {len(heads)}"
)
return heads[0]
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
script = ScriptDirectory.from_config(alembic_config)
try:
return script.get_revision(revision) is not None
except CommandError:
return False
def _location_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
@@ -117,6 +137,8 @@ def validate_legacy_location_db(database_url: str) -> None:
def validate_location_runtime_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if not database_path.exists():
raise LocationDatabaseAdoptionError(
"Location DB file was not found. Run 'python scripts/location_db_adopt.py' "
@@ -131,30 +153,36 @@ def validate_location_runtime_db(database_url: str) -> None:
)
current_revision = _fetch_alembic_revision(database_path)
if current_revision != LOCATION_BASELINE_REVISION:
if current_revision != expected_revision:
raise LocationDatabaseAdoptionError(
"Location DB revision mismatch. Refusing to start the app: "
f"expected {LOCATION_BASELINE_REVISION}, got {current_revision}"
f"expected {expected_revision}, got {current_revision}"
)
def adopt_or_initialize_location_db(database_url: str) -> str:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if database_path.exists():
if _alembic_version_table_exists(database_path):
current_revision = _fetch_alembic_revision(database_path)
if current_revision != LOCATION_BASELINE_REVISION:
if current_revision == expected_revision:
return "already_managed"
if not _is_known_revision(alembic_config, current_revision):
raise LocationDatabaseAdoptionError(
"Location DB is already Alembic-managed but revision does not match "
f"the expected baseline: expected {LOCATION_BASELINE_REVISION}, "
f"got {current_revision}"
f"a known migration revision: got {current_revision}"
)
return "already_managed"
command.upgrade(alembic_config, "head")
return "upgraded"
validate_legacy_location_db(database_url)
command.stamp(alembic_config, LOCATION_BASELINE_REVISION)
if LOCATION_BASELINE_REVISION != expected_revision:
command.upgrade(alembic_config, "head")
return "upgraded"
return "adopted"
database_path.parent.mkdir(parents=True, exist_ok=True)
+34 -6
@@ -6,6 +6,8 @@ from pathlib import Path
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.util.exc import CommandError
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
@@ -42,6 +44,24 @@ def _make_alembic_config(database_url: str) -> Config:
return config
def _expected_head_revision(alembic_config: Config) -> str:
script = ScriptDirectory.from_config(alembic_config)
heads = script.get_heads()
if len(heads) != 1:
raise PooDatabaseAdoptionError(
f"Expected exactly one Alembic head for poo DB, got {len(heads)}"
)
return heads[0]
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
script = ScriptDirectory.from_config(alembic_config)
try:
return script.get_revision(revision) is not None
except CommandError:
return False
def _poo_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
@@ -112,6 +132,8 @@ def validate_legacy_poo_db(database_url: str) -> None:
def validate_poo_runtime_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if not database_path.exists():
raise PooDatabaseAdoptionError(
"Poo DB file was not found. Run 'python scripts/poo_db_adopt.py' first to "
@@ -126,30 +148,36 @@ def validate_poo_runtime_db(database_url: str) -> None:
)
current_revision = _fetch_alembic_revision(database_path)
if current_revision != POO_BASELINE_REVISION:
if current_revision != expected_revision:
raise PooDatabaseAdoptionError(
"Poo DB revision mismatch. Refusing to start the app: "
f"expected {POO_BASELINE_REVISION}, got {current_revision}"
f"expected {expected_revision}, got {current_revision}"
)
def adopt_or_initialize_poo_db(database_url: str) -> str:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if database_path.exists():
if _alembic_version_table_exists(database_path):
current_revision = _fetch_alembic_revision(database_path)
if current_revision != POO_BASELINE_REVISION:
if current_revision == expected_revision:
return "already_managed"
if not _is_known_revision(alembic_config, current_revision):
raise PooDatabaseAdoptionError(
"Poo DB is already Alembic-managed but revision does not match "
f"the expected baseline: expected {POO_BASELINE_REVISION}, "
f"got {current_revision}"
f"a known migration revision: got {current_revision}"
)
return "already_managed"
command.upgrade(alembic_config, "head")
return "upgraded"
validate_legacy_poo_db(database_url)
command.stamp(alembic_config, POO_BASELINE_REVISION)
if POO_BASELINE_REVISION != expected_revision:
command.upgrade(alembic_config, "head")
return "upgraded"
return "adopted"
database_path.parent.mkdir(parents=True, exist_ok=True)
+25
@@ -0,0 +1,25 @@
from __future__ import annotations
from app.config import get_settings
from scripts.app_db_adopt import adopt_or_initialize_app_db
from scripts.location_db_adopt import adopt_or_initialize_location_db
from scripts.poo_db_adopt import adopt_or_initialize_poo_db
def run_all_migrations() -> dict[str, str]:
settings = get_settings()
return {
"app": adopt_or_initialize_app_db(settings.app_database_url),
"location": adopt_or_initialize_location_db(settings.location_database_url),
"poo": adopt_or_initialize_poo_db(settings.poo_database_url),
}
def main() -> None:
results = run_all_migrations()
for database_name, result in results.items():
print(f"{database_name}: {result}")
if __name__ == "__main__":
main()
+4 -1
@@ -37,12 +37,13 @@ def test_status_endpoint(client: TestClient) -> None:
def test_app_start_fails_when_app_db_missing(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None:
missing_app_path = tmp_path / "missing_app.db"
poo_database_path = tmp_path / "poo_ready.db"
location_database_path = tmp_path / "location_ready.db"
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
command.upgrade(_make_alembic_config(f"sqlite:///{location_database_path}"), "head")
monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{tmp_path / 'missing_app.db'}")
monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{missing_app_path}")
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_database_path}")
@@ -54,6 +55,8 @@ def test_app_start_fails_when_app_db_missing(tmp_path, monkeypatch: pytest.Monke
with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"):
anyio.run(_run_lifespan, app)
assert not missing_app_path.exists()
get_settings.cache_clear()
reset_auth_db_caches()
+24
@@ -53,3 +53,27 @@ def test_settings_derive_development_ticktick_redirect_uri(monkeypatch) -> None:
assert settings.app_base_url == "http://localhost:11001"
assert settings.ticktick_redirect_uri == "http://localhost:11001/ticktick/auth/code"
def test_settings_support_smtp_fields(monkeypatch) -> None:
monkeypatch.setenv("SMTP_ENABLED", "true")
monkeypatch.setenv("SMTP_HOST", "smtp.example.com")
monkeypatch.setenv("SMTP_PORT", "2525")
monkeypatch.setenv("SMTP_USERNAME", "smtp-user")
monkeypatch.setenv("SMTP_PASSWORD", "smtp-password")
monkeypatch.setenv("SMTP_FROM_NAME", "Home Automation")
monkeypatch.setenv("SMTP_FROM_ADDRESS", "sender@example.com")
monkeypatch.setenv("SMTP_TO_ADDRESS", "recipient@example.com")
monkeypatch.setenv("SMTP_USE_STARTTLS", "false")
settings = Settings(_env_file=None)
assert settings.smtp_enabled is True
assert settings.smtp_host == "smtp.example.com"
assert settings.smtp_port == 2525
assert settings.smtp_username == "smtp-user"
assert settings.smtp_password == "smtp-password"
assert settings.smtp_from_name == "Home Automation"
assert settings.smtp_from_address == "sender@example.com"
assert settings.smtp_to_address == "recipient@example.com"
assert settings.smtp_use_starttls is False
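The "refine sender display" part of the commit can be illustrated with the stdlib `email` module; a minimal sketch of how the `SMTP_FROM_NAME`/`SMTP_FROM_ADDRESS` pair might map onto a plaintext message (the actual `send_public_ip_changed_email` implementation is not shown in this diff, so the field mapping here is an assumption):

```python
from email.message import EmailMessage
from email.utils import formataddr

# Hypothetical values mirroring the SMTP_* settings exercised in the test above.
smtp_from_name = "Home Automation"
smtp_from_address = "sender@example.com"
smtp_to_address = "recipient@example.com"

message = EmailMessage()
# formataddr renders a display-name header: "Home Automation <sender@example.com>"
message["From"] = formataddr((smtp_from_name, smtp_from_address))
message["To"] = smtp_to_address
message["Subject"] = "SMTP test"
message.set_content("This is a plaintext test email.")

print(message["From"])  # Home Automation <sender@example.com>
```

`formataddr` also handles RFC 2047 encoding when the display name contains non-ASCII characters, which is one reason to prefer it over string concatenation.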
+215
@@ -0,0 +1,215 @@
from pathlib import Path
import sqlite3
import anyio
import pytest
import yaml
from alembic import command
from app.auth_db import reset_auth_db_caches
from app.config import get_settings
from app.main import create_app
from scripts.app_db_adopt import APP_BASELINE_REVISION
from scripts.location_db_adopt import EXPECTED_USER_VERSION as LOCATION_USER_VERSION
from scripts.location_db_adopt import LOCATION_BASELINE_REVISION
from scripts.poo_db_adopt import EXPECTED_USER_VERSION as POO_USER_VERSION
from scripts.poo_db_adopt import POO_BASELINE_REVISION
from scripts.run_migrations import run_all_migrations
from tests.conftest import _make_alembic_config, _make_poo_alembic_config
PROJECT_ROOT = Path(__file__).resolve().parents[1]
def _read_yaml(path: str) -> dict:
return yaml.safe_load((PROJECT_ROOT / path).read_text())
async def _run_lifespan(app) -> None:
async with app.router.lifespan_context(app):
return None
def _configure_database_env(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> dict[str, Path | str]:
app_path = tmp_path / "app.db"
location_path = tmp_path / "location.db"
poo_path = tmp_path / "poo.db"
monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{app_path}")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_path}")
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("AUTH_COOKIE_SECURE_OVERRIDE", "false")
get_settings.cache_clear()
reset_auth_db_caches()
return {
"app_path": app_path,
"app_url": f"sqlite:///{app_path}",
"location_path": location_path,
"location_url": f"sqlite:///{location_path}",
"poo_path": poo_path,
"poo_url": f"sqlite:///{poo_path}",
}
def _create_legacy_location_db(database_path: Path) -> None:
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute(
"INSERT INTO location (person, datetime, latitude, longitude, altitude) VALUES (?, ?, ?, ?, ?)",
("alice", "2026-04-22T10:00:00Z", 1.23, 4.56, 7.89),
)
conn.execute(f"PRAGMA user_version = {LOCATION_USER_VERSION}")
conn.commit()
conn.close()
def _create_legacy_poo_db(database_path: Path) -> None:
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.execute(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) VALUES (?, ?, ?, ?)",
("2026-04-22T11:00:00Z", "complete", 9.87, 6.54),
)
conn.execute(f"PRAGMA user_version = {POO_USER_VERSION}")
conn.commit()
conn.close()
def test_compose_uses_migration_job_before_app() -> None:
compose = _read_yaml("docker-compose.yml")
override = _read_yaml("docker-compose.override.yml")
migration_service = compose["services"]["migration"]
app_service = compose["services"]["app"]
assert migration_service["command"] == ["python", "-m", "scripts.run_migrations"]
assert migration_service["restart"] == "no"
assert app_service["depends_on"]["migration"]["condition"] == "service_completed_successfully"
assert override["services"]["migration"]["build"] == "."
assert override["services"]["app"]["build"] == "."
def test_image_defaults_to_uvicorn_only() -> None:
dockerfile = (PROJECT_ROOT / "Dockerfile").read_text()
entrypoint = (PROJECT_ROOT / "docker/entrypoint.sh").read_text()
assert 'CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]' in dockerfile
assert 'exec "$@"' in entrypoint
assert "app_db_adopt" not in entrypoint
assert "location_db_adopt" not in entrypoint
assert "poo_db_adopt" not in entrypoint
def test_migration_runner_initializes_and_is_idempotent(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
database_urls = _configure_database_env(tmp_path, monkeypatch)
first_run = run_all_migrations()
second_run = run_all_migrations()
assert first_run == {"app": "initialized", "location": "initialized", "poo": "initialized"}
assert second_run == {
"app": "already_managed",
"location": "already_managed",
"poo": "already_managed",
}
conn = sqlite3.connect(database_urls["app_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == APP_BASELINE_REVISION
tables = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
).fetchall()
}
finally:
conn.close()
assert {"auth_users", "auth_sessions", "app_config", "alembic_version"} <= tables
conn = sqlite3.connect(database_urls["location_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == LOCATION_BASELINE_REVISION
finally:
conn.close()
conn = sqlite3.connect(database_urls["poo_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == POO_BASELINE_REVISION
finally:
conn.close()
get_settings.cache_clear()
reset_auth_db_caches()
def test_migration_runner_adopts_legacy_sqlite_without_data_loss(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
database_urls = _configure_database_env(tmp_path, monkeypatch)
_create_legacy_location_db(database_urls["location_path"])
_create_legacy_poo_db(database_urls["poo_path"])
results = run_all_migrations()
assert results == {"app": "initialized", "location": "adopted", "poo": "adopted"}
conn = sqlite3.connect(database_urls["location_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == LOCATION_BASELINE_REVISION
assert conn.execute("SELECT COUNT(*) FROM location").fetchone()[0] == 1
finally:
conn.close()
conn = sqlite3.connect(database_urls["poo_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == POO_BASELINE_REVISION
assert conn.execute("SELECT COUNT(*) FROM poo_records").fetchone()[0] == 1
finally:
conn.close()
get_settings.cache_clear()
reset_auth_db_caches()
def test_app_startup_still_fails_closed_without_running_adoption(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
database_urls = _configure_database_env(tmp_path, monkeypatch)
missing_app_path = database_urls["app_path"]
command.upgrade(_make_alembic_config(database_urls["location_url"]), "head")
command.upgrade(_make_poo_alembic_config(database_urls["poo_url"]), "head")
app = create_app()
with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"):
anyio.run(_run_lifespan, app)
assert not Path(missing_app_path).exists()
get_settings.cache_clear()
reset_auth_db_caches()
+158
@@ -1,5 +1,21 @@
from sqlalchemy import text
import app.db as app_db
import app.poo_db as poo_db
from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_homeassistant_client
from app.main import create_app
class _FakeHomeAssistantClient:
def __init__(self) -> None:
self.sensor_calls: list[dict] = []
def publish_sensor(self, *, entity_id: str, state: str, attributes: dict | None = None) -> None:
self.sensor_calls.append(
{"entity_id": entity_id, "state": state, "attributes": attributes or {}}
)
def test_homeassistant_publish_records_location(location_client) -> None:
client, engine = location_client
@@ -141,6 +157,148 @@ def test_homeassistant_publish_rejects_invalid_ticktick_content(location_client)
assert response.text == "bad request"
def test_homeassistant_publish_poo_get_latest_publishes_latest_status(
ready_location_database,
ready_poo_database,
auth_database,
monkeypatch,
) -> None:
location_engine = app_db.create_engine(
ready_location_database["location_url"],
connect_args={"check_same_thread": False},
)
location_session_local = app_db.sessionmaker(
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient()
settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status",
)
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings
with poo_engine.begin() as conn:
conn.execute(
text(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) "
"VALUES (:timestamp, :status, :latitude, :longitude)"
),
{
"timestamp": "2026-04-20T10:05Z",
"status": "done",
"latitude": 1.23,
"longitude": 4.56,
},
)
try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client:
response = client.post(
"/homeassistant/publish",
json={
"target": "poo_recorder",
"action": "get_latest",
"content": "",
},
)
assert response.status_code == 200
assert response.text == ""
assert len(fake_ha.sensor_calls) == 1
assert fake_ha.sensor_calls[0]["entity_id"] == "sensor.test_poo_status"
assert fake_ha.sensor_calls[0]["state"] == "done"
assert fake_ha.sensor_calls[0]["attributes"]["friendly_name"] == "Poo Status"
assert fake_ha.sensor_calls[0]["attributes"]["last_poo"]
finally:
test_app.dependency_overrides.clear()
get_settings.cache_clear()
location_engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_internal_error_for_unknown_poo_action(
ready_location_database,
ready_poo_database,
auth_database,
monkeypatch,
) -> None:
location_engine = app_db.create_engine(
ready_location_database["location_url"],
connect_args={"check_same_thread": False},
)
location_session_local = app_db.sessionmaker(
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient()
settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status",
)
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings
try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client:
response = client.post(
"/homeassistant/publish",
json={
"target": "poo_recorder",
"action": "unknown_action",
"content": "",
},
)
assert response.status_code == 500
assert response.text == "internal server error"
assert fake_ha.sensor_calls == []
finally:
test_app.dependency_overrides.clear()
get_settings.cache_clear()
location_engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_not_implemented_for_unknown_location_action(
location_client,
) -> None:
+1 -1
@@ -343,7 +343,7 @@ def test_location_db_adoption_fails_closed_on_alembic_revision_mismatch(
conn.commit()
conn.close()
-    with pytest.raises(LocationDatabaseAdoptionError, match="revision does not match"):
+    with pytest.raises(LocationDatabaseAdoptionError, match="known migration revision"):
adopt_or_initialize_location_db(f"sqlite:///{database_path}")
+258
@@ -0,0 +1,258 @@
from datetime import UTC, datetime
import re
import sqlite3
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from app.config import Settings
from app.services.email import EmailDeliveryError
from app.services.public_ip import PublicIPCheckResult, check_public_ipv4, check_public_ipv4_and_notify
def _make_session(database_url: str) -> Session:
engine = create_engine(database_url, connect_args={"check_same_thread": False})
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
return session_local()
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
finally:
session.close()
assert result.status == "first_seen"
assert result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state"
).fetchone()
history = conn.execute(
"SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id"
).fetchall()
finally:
conn.close()
assert state == ("203.0.113.10", None, "first_seen", None, "ipify")
assert history == [("203.0.113.10", "first_seen", "ipify")]
def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
first_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
unchanged_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
finally:
session.close()
assert first_result.status == "first_seen"
assert unchanged_result.status == "unchanged"
assert unchanged_result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state"
).fetchone()
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
finally:
conn.close()
assert state == ("203.0.113.10", None, "unchanged")
assert history_count == 1
def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25")
finally:
session.close()
assert result.status == "changed"
assert result.changed is True
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state"
).fetchone()
history = conn.execute(
"SELECT ipv4, change_type FROM public_ip_history ORDER BY id"
).fetchall()
finally:
conn.close()
assert state[0:3] == ("198.51.100.25", "203.0.113.10", "changed")
assert state[3] is not None
assert history == [("203.0.113.10", "first_seen"), ("198.51.100.25", "changed")]
def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None:
session = _make_session(auth_database["app_url"])
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip")
finally:
session.close()
assert result.status == "error"
assert result.changed is False
conn = sqlite3.connect(auth_database["app_path"])
try:
state = conn.execute(
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state"
).fetchone()
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
finally:
conn.close()
assert state[0:3] == ("203.0.113.10", None, "error")
assert state[3] is not None
assert history_count == 1
def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None:
response = client.get("/public-ip/check")
assert response.status_code == 401
assert response.json() == {"detail": "authentication required"}
def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None:
from app.api.routes import public_ip as public_ip_route
fixed_checked_at = datetime(2026, 4, 29, 12, 0, tzinfo=UTC)
monkeypatch.setattr(
public_ip_route,
"check_public_ipv4_and_notify",
lambda session, bootstrap_settings: PublicIPCheckResult(
status="changed",
checked_at=fixed_checked_at,
changed=True,
),
)
_login(client)
response = client.get("/public-ip/check")
assert response.status_code == 200
assert response.json() == {
"status": "changed",
"checked_at": "2026-04-29T12:00:00Z",
"changed": True,
}
assert "current_ipv4" not in response.text
assert "previous_ipv4" not in response.text
assert "203.0.113.10" not in response.text
def _notification_settings() -> Settings:
return Settings(
_env_file=None,
app_env="development",
app_hostname="localhost:8000",
app_database_url="sqlite:///./data/app.db",
location_database_url="sqlite:///./data/locationRecorder.db",
poo_database_url="sqlite:///./data/pooRecorder.db",
auth_bootstrap_username="admin",
auth_bootstrap_password="secret-password",
smtp_enabled=True,
smtp_host="smtp.example.com",
smtp_port=587,
smtp_username="smtp-user",
smtp_password="super-secret-password",
smtp_from_address="sender@example.com",
smtp_to_address="recipient@example.com",
smtp_use_starttls=True,
)
def test_public_ip_notification_sends_only_when_changed(auth_database, monkeypatch) -> None:
session = _make_session(auth_database["app_url"])
sent = []
monkeypatch.setattr(
"app.services.public_ip.send_public_ip_changed_email",
lambda settings, *, previous_ipv4, current_ipv4, detected_at: sent.append(
(previous_ipv4, current_ipv4, detected_at)
),
)
try:
first_seen = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "203.0.113.10",
)
unchanged = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "203.0.113.10",
)
changed = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "198.51.100.25",
)
finally:
session.close()
assert first_seen.status == "first_seen"
assert unchanged.status == "unchanged"
assert changed.status == "changed"
assert len(sent) == 1
assert sent[0][0] == "203.0.113.10"
assert sent[0][1] == "198.51.100.25"
assert sent[0][2] == changed.checked_at
def test_public_ip_notification_failure_does_not_break_changed_result(auth_database, monkeypatch) -> None:
session = _make_session(auth_database["app_url"])
monkeypatch.setattr(
"app.services.public_ip.send_public_ip_changed_email",
lambda settings, *, previous_ipv4, current_ipv4, detected_at: (_ for _ in ()).throw(
EmailDeliveryError("smtp down")
),
)
try:
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
result = check_public_ipv4_and_notify(
session,
bootstrap_settings=_notification_settings(),
fetch_public_ipv4=lambda: "198.51.100.25",
)
finally:
session.close()
assert result.status == "changed"
assert result.changed is True
assert result.previous_ipv4 == "203.0.113.10"
assert result.current_ipv4 == "198.51.100.25"
+397
@@ -0,0 +1,397 @@
import re
import sqlite3
import smtplib
from fastapi.testclient import TestClient
from app.config import Settings
from app.services.email import (
EmailDeliveryError,
get_smtp_config,
is_smtp_ready,
send_public_ip_changed_email,
send_smtp_test_email,
)
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
login_page = client.get("/login")
csrf_token = _extract_csrf_token(login_page.text)
response = client.post(
"/login",
data={
"username": "admin",
"password": "test-password",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert response.status_code == 303
def _smtp_settings(**overrides) -> Settings:
payload = {
"app_env": "development",
"app_hostname": "localhost:8000",
"app_database_url": "sqlite:///./data/app.db",
"location_database_url": "sqlite:///./data/locationRecorder.db",
"poo_database_url": "sqlite:///./data/pooRecorder.db",
"auth_bootstrap_username": "admin",
"auth_bootstrap_password": "secret-password",
"smtp_enabled": True,
"smtp_host": "smtp.example.com",
"smtp_port": 587,
"smtp_username": "smtp-user",
"smtp_password": "super-secret-password",
"smtp_from_name": "Home Automation",
"smtp_from_address": "sender@example.com",
"smtp_to_address": "recipient@example.com",
"smtp_use_starttls": True,
}
payload.update(overrides)
return Settings(_env_file=None, **payload)
def test_get_smtp_config_reads_runtime_values() -> None:
settings = _smtp_settings(smtp_port=2525, smtp_use_starttls=False)
smtp_config = get_smtp_config(settings)
assert smtp_config.host == "smtp.example.com"
assert smtp_config.port == 2525
assert smtp_config.username == "smtp-user"
assert smtp_config.password == "super-secret-password"
assert smtp_config.from_name == "Home Automation"
assert smtp_config.from_address == "sender@example.com"
assert smtp_config.to_address == "recipient@example.com"
assert smtp_config.use_starttls is False
def test_smtp_test_readiness_does_not_require_smtp_enabled() -> None:
settings = _smtp_settings(smtp_enabled=False)
assert is_smtp_ready(settings) is True
def test_send_smtp_test_email_success(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
sent["port"] = port
sent["timeout"] = timeout
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
sent["ehlo"] = sent.get("ehlo", 0) + 1
def starttls(self):
sent["starttls"] = True
def login(self, username, password):
sent["username"] = username
sent["password"] = password
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["from"] = message["From"]
sent["to"] = message["To"]
sent["body"] = message.get_content()
sent["envelope_from"] = from_addr
sent["envelope_to"] = to_addrs
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
send_smtp_test_email(_smtp_settings())
assert sent["host"] == "smtp.example.com"
assert sent["port"] == 587
assert sent["timeout"] == 10
assert sent["starttls"] is True
assert sent["username"] == "smtp-user"
assert sent["password"] == "super-secret-password"
assert sent["subject"] == "Home Automation SMTP Test"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["to"] == "recipient@example.com"
assert sent["envelope_from"] == "sender@example.com"
assert sent["envelope_to"] == ["recipient@example.com"]
assert "This is a test email" in sent["body"]
def test_send_smtp_test_email_does_not_require_smtp_enabled(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
return None
def login(self, username, password):
return None
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["from"] = message["From"]
sent["envelope_from"] = from_addr
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
send_smtp_test_email(_smtp_settings(smtp_enabled=False))
assert sent["host"] == "smtp.example.com"
assert sent["subject"] == "Home Automation SMTP Test"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["envelope_from"] == "sender@example.com"
def test_send_smtp_test_email_failure_sanitizes_password(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, host, port, timeout):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
raise smtplib.SMTPException("authentication failed for super-secret-password")
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
try:
send_smtp_test_email(_smtp_settings())
        raise AssertionError("expected EmailDeliveryError")
except EmailDeliveryError as exc:
assert "super-secret-password" not in str(exc)
assert "[redacted]" in str(exc)
def test_send_public_ip_changed_email_contains_expected_english_content(monkeypatch) -> None:
sent = {}
class FakeSMTP:
def __init__(self, host, port, timeout):
sent["host"] = host
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return None
def ehlo(self):
return None
def starttls(self):
return None
def login(self, username, password):
return None
def send_message(self, message, from_addr=None, to_addrs=None):
sent["subject"] = message["Subject"]
sent["body"] = message.get_content()
sent["from"] = message["From"]
sent["envelope_from"] = from_addr
monkeypatch.setattr("app.services.email.smtplib.SMTP", FakeSMTP)
    from datetime import UTC, datetime

    send_public_ip_changed_email(
        _smtp_settings(),
        previous_ipv4="203.0.113.10",
        current_ipv4="198.51.100.25",
        detected_at=datetime(2026, 4, 29, 10, 0, tzinfo=UTC),
    )
assert sent["subject"] == "Public IP changed"
assert sent["from"] == "Home Automation <sender@example.com>"
assert sent["envelope_from"] == "sender@example.com"
assert "Your public IPv4 address has changed." in sent["body"]
assert "Previous IP: 203.0.113.10" in sent["body"]
assert "Current IP: 198.51.100.25" in sent["body"]
assert "Detected at: 2026-04-29 10:00:00 UTC" in sent["body"]
assert "update the trusted IP manually" in sent["body"]
def test_config_update_does_not_clear_existing_smtp_password(
client: TestClient, test_database_urls
) -> None:
_login(client)
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Test",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "persist-me",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
config_page = client.get("/config")
config_csrf_token = _extract_csrf_token(config_page.text)
response = client.post(
"/config",
data={
"csrf_token": config_csrf_token,
"APP_NAME": "SMTP Config Updated",
"APP_ENV": "development",
"APP_DEBUG": "true",
"APP_HOSTNAME": "localhost:8000",
"SMTP_ENABLED": "true",
"SMTP_HOST": "smtp.example.com",
"SMTP_PORT": "587",
"SMTP_USERNAME": "smtp-user",
"SMTP_PASSWORD": "",
"SMTP_FROM_ADDRESS": "sender@example.com",
"SMTP_TO_ADDRESS": "recipient@example.com",
"SMTP_USE_STARTTLS": "true",
"AUTH_SESSION_COOKIE_NAME": "home_automation_session",
"AUTH_SESSION_TTL_HOURS": "12",
"AUTH_COOKIE_SECURE_OVERRIDE": "false",
"POO_WEBHOOK_ID": "",
"POO_SENSOR_ENTITY_NAME": "sensor.test_poo_status",
"POO_SENSOR_FRIENDLY_NAME": "Poo Status",
"TICKTICK_CLIENT_ID": "",
"TICKTICK_CLIENT_SECRET": "",
"TICKTICK_TOKEN": "",
"HOME_ASSISTANT_BASE_URL": "",
"HOME_ASSISTANT_AUTH_TOKEN": "",
"HOME_ASSISTANT_TIMEOUT_SECONDS": "1.0",
"HOME_ASSISTANT_ACTION_TASK_PROJECT_ID": "",
},
follow_redirects=False,
)
assert response.status_code == 303
conn = sqlite3.connect(test_database_urls["app_path"])
try:
rows = dict(conn.execute("SELECT key, value FROM app_config").fetchall())
finally:
conn.close()
assert rows["SMTP_PASSWORD"] == "persist-me"
assert rows["APP_NAME"] == "SMTP Config Updated"
def test_smtp_test_endpoint_requires_authentication(client: TestClient) -> None:
response = client.post("/config/smtp/test", data={"csrf_token": "ignored"}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/login"
def test_smtp_test_endpoint_success_and_failure_do_not_expose_password(
client: TestClient, monkeypatch
) -> None:
from app.api.routes import pages
_login(client)
config_page = client.get("/config")
csrf_token = _extract_csrf_token(config_page.text)
monkeypatch.setattr(pages, "send_smtp_test_email", lambda settings: None)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=success"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test email sent successfully." in follow_up.text
assert "super-secret-password" not in follow_up.text
monkeypatch.setattr(
pages,
"send_smtp_test_email",
lambda settings: (_ for _ in ()).throw(EmailDeliveryError("smtp auth failed for [redacted]")),
)
response = client.post("/config/smtp/test", data={"csrf_token": csrf_token}, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/config?smtp_test=failed"
follow_up = client.get(response.headers["location"])
assert follow_up.status_code == 200
assert "SMTP test failed. Check saved SMTP settings and server reachability." in follow_up.text
assert "super-secret-password" not in follow_up.text
def test_config_page_renders_smtp_test_button_with_formaction(
client: TestClient, test_database_urls
) -> None:
_login(client)
conn = sqlite3.connect(test_database_urls["app_path"])
try:
conn.executemany(
"INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at",
[
("SMTP_ENABLED", "true"),
("SMTP_HOST", "smtp.example.com"),
("SMTP_PORT", "587"),
("SMTP_FROM_ADDRESS", "sender@example.com"),
("SMTP_TO_ADDRESS", "recipient@example.com"),
],
)
conn.commit()
finally:
conn.close()
response = client.get("/config")
assert response.status_code == 200
assert 'formaction="/config/smtp/test"' in response.text