Merge pull request 'Feature/m1 db consolidation' (#7) from feature/m1-db-consolidation into main
pytest / test (push) Has been cancelled
docker-image / build-and-push (push) Has been cancelled

Reviewed-on: #7
This commit was merged in pull request #7.
This commit is contained in:
2026-06-12 20:33:34 +02:00
59 changed files with 1067 additions and 2266 deletions
-2
View File
@@ -4,8 +4,6 @@ APP_NAME=Home Automation Backend (Python)
APP_ENV=production APP_ENV=production
APP_HOSTNAME=home-automation.example.com APP_HOSTNAME=home-automation.example.com
APP_DATABASE_URL=sqlite:////app/data/app.db APP_DATABASE_URL=sqlite:////app/data/app.db
LOCATION_DATABASE_URL=sqlite:////app/data/locationRecorder.db
POO_DATABASE_URL=sqlite:////app/data/pooRecorder.db
AUTH_BOOTSTRAP_USERNAME=admin AUTH_BOOTSTRAP_USERNAME=admin
AUTH_BOOTSTRAP_PASSWORD=change-me AUTH_BOOTSTRAP_PASSWORD=change-me
+35 -88
View File
@@ -5,7 +5,7 @@
当前系统已经包含: 当前系统已经包含:
- FastAPI Web 应用与服务端模板页面 - FastAPI Web 应用与服务端模板页面
- SQLite + SQLAlchemy + Alembic 的库结构 - SQLite + SQLAlchemy + Alembic 的库结构
- username/password + server-side session 鉴权 - username/password + server-side session 鉴权
- runtime config 页面与 app DB 持久化 - runtime config 页面与 app DB 持久化
- public IPv4 monitor、历史持久化与定时检查 - public IPv4 monitor、历史持久化与定时检查
@@ -23,41 +23,32 @@
## 当前配置现实 ## 当前配置现实
当前系统仍然是三个独立的 SQLite 数据库文件,而不是单一数据库 当前系统使用单一 SQLite 数据库文件`app.db`),所有数据表都在其中
- `app` 级共享数据使用自己的 DB 文件 - auth(单个 admin 用户、server-side session
- `location` 模块使用自己的 DB 文件 - runtime config 持久化(`app_config` 表)
- `poo` 模块使用自己的 DB 文件 - public IPv4 当前状态与变化历史
- location 记录(`location` 表)
- poo 记录(`poo_records` 表)
当前阶段明确不借这次重构把这些 DB 合并。配置层已经显式反映这一点 配置层只保留一个数据库环境变量
- `APP_DATABASE_URL` - `APP_DATABASE_URL`
- `LOCATION_DATABASE_URL`
- `POO_DATABASE_URL`
目前 auth、`location``poo` 都已经接到各自独立的数据库文件。 `app.db` 不会在应用启动时自动创建,需要先运行:
其中 `app` 级共享 DB 当前主要用于: ```bash
python -m scripts.run_migrations
```
- 单个 admin 用户 该命令会通过 Alembic 将 `app.db` 初始化或升级到最新 head(含 `location` / `poo_records` 表)。
- server-side session
- runtime config 持久化
- public IPv4 当前状态与变化历史
这部分现在也使用 Alembic 管理:
- `app db` 不会在应用启动时自动创建
- 需要先运行 `python scripts/app_db_adopt.py`
- 这个脚本会创建新 DB 并建好 schema
## 当前目录 ## 当前目录
主要目录如下: 主要目录如下:
- `app/`: FastAPI 应用代码 - `app/`: FastAPI 应用代码
- `alembic_app/`: App DB 的 Alembic migration 环境 - `alembic_app/`: App DB 的 Alembic migration 环境(同时管理 `location` / `poo_records` 表)
- `alembic_location/`: Location DB 的 Alembic migration 环境
- `alembic_poo/`: Poo DB 的 Alembic migration 环境
- `tests/`: pytest 测试 - `tests/`: pytest 测试
- `docs/`: 当前系统说明文档 - `docs/`: 当前系统说明文档
- `scripts/`: 辅助脚本,例如 OpenAPI 导出 - `scripts/`: 辅助脚本,例如 OpenAPI 导出
@@ -128,24 +119,22 @@ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
## 数据库与 Alembic ## 数据库与 Alembic
当前默认使用 SQLite,并区分三个数据库文件: 当前使用单一 SQLite 数据库文件:
- App DB`sqlite:///./data/app.db` - App DB`sqlite:///./data/app.db`
- Location DB`sqlite:///./data/locationRecorder.db`
- Poo DB`sqlite:///./data/pooRecorder.db`
- 数据目录:`./data/` - 数据目录:`./data/`
初始化 migration 环境后,可继续添加模型并生成迁移 所有模型(auth / config / public_ip / location / poo)共用同一个 `Base`,均通过单一 Alembic 链管理
当前 `app``location` `poo` 都已经有各自独立的 Alembic 链路。 - Alembic 环境:`alembic_app.ini` + `alembic_app/`
- App Alembic 环境:`alembic_app.ini` + `alembic_app/`
- Location Alembic 环境:`alembic_location.ini` + `alembic_location/`
- Poo Alembic 环境:`alembic_poo.ini` + `alembic_poo/`
- 统一 migration job`python -m scripts.run_migrations` - 统一 migration job`python -m scripts.run_migrations`
- App DB 初始化:`python scripts/app_db_adopt.py` - App DB 接管 / 初始化:`python scripts/app_db_adopt.py`
- Location DB 接管 / 初始化:`python scripts/location_db_adopt.py`
- Poo DB 接管 / 初始化:`python scripts/poo_db_adopt.py` 历史 location / poo 数据(旧版本遗留的独立 DB 文件)已通过以下脚本一次性迁移至 `app.db`(幂等,不删除旧文件):
```bash
python -m scripts.migrate_legacy_data
```
## 基础鉴权 ## 基础鉴权
@@ -197,7 +186,7 @@ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
这意味着: 这意味着:
- location / poo / app DB 地址仍然属于 bootstrap 范畴 - app DB 地址`APP_DATABASE_URL`仍然属于 bootstrap 范畴
- 运行时可编辑配置主要通过 `app_config` 表持久化 - 运行时可编辑配置主要通过 `app_config` 表持久化
- token / secret 这类运行时必须可取回的配置,目前允许明文存储在 config 表中 - token / secret 这类运行时必须可取回的配置,目前允许明文存储在 config 表中
- 登录密码仍然单独使用 Argon2 哈希,不走 config 表明文存储 - 登录密码仍然单独使用 Argon2 哈希,不走 config 表明文存储
@@ -318,55 +307,6 @@ docker compose -f docker-compose.yml up -d
docker compose logs -f app docker compose logs -f app
``` ```
## Grafana Provisioning
当前仓库支持通过 Grafana provisioning 自动加载 SQLite datasource 和 repo 内的 dashboard 导出文件。
需要保留的文件路径如下:
- `grafana/provisioning/datasources/locationrecorder.yaml`
- `grafana/provisioning/datasources/poorecorder.yaml`
- `grafana/provisioning/dashboards/provider.yaml`
- `grafana/dashboards/locationrecorder.json`
- `grafana/dashboards/poorecorder.json`
这些文件的职责分别是:
- `grafana/provisioning/datasources/locationrecorder.yaml`:声明 `locationrecorder` SQLite datasource,并指向 `/data/home-automation/locationRecorder.db`
- `grafana/provisioning/datasources/poorecorder.yaml`:声明 `poorecorder` SQLite datasource,并指向 `/data/home-automation/pooRecorder.db`
- `grafana/provisioning/dashboards/provider.yaml`:告诉 Grafana 从 `/var/lib/grafana/dashboards` 扫描并加载 dashboard JSON
- `grafana/dashboards/locationrecorder.json`location recorder dashboard 导出文件,内容本身不需要在 compose 中改写
- `grafana/dashboards/poorecorder.json`poo recorder dashboard 导出文件,内容本身不需要在 compose 中改写
当前 `docker-compose.yml` 中,Grafana service 需要挂载以下目录:
- `./grafana/provisioning -> /etc/grafana/provisioning:ro`
- `./grafana/dashboards -> /var/lib/grafana/dashboards:ro`
同时保留现有 named volume `homeautomation_grafana_storage:/var/lib/grafana` 作为 Grafana 运行态数据存储。
一键启动前,至少需要以下文件已经存在:
- `grafana/provisioning/datasources/locationrecorder.yaml`
- `grafana/provisioning/datasources/poorecorder.yaml`
- `grafana/provisioning/dashboards/provider.yaml`
- `grafana/dashboards/locationrecorder.json`
- `grafana/dashboards/poorecorder.json`
启动方式:
```bash
docker compose up -d
```
启动后会发生的事情:
- Grafana 容器会安装 `frser-sqlite-datasource` 插件
- Grafana 会读取 `/etc/grafana/provisioning/datasources/` 下的 datasource YAML
- Grafana 会读取 `/etc/grafana/provisioning/dashboards/provider.yaml`
- Grafana 会从 `/var/lib/grafana/dashboards/` 自动导入两个 dashboard JSON
- 现有 Grafana named volume 继续负责保存 Grafana 运行态数据,不会覆盖 repo 内的 dashboard 与 provisioning 文件
## Container Image CI ## Container Image CI
项目提供了一个 release image workflow 项目提供了一个 release image workflow
@@ -411,9 +351,16 @@ pytest
当前测试包含: 当前测试包含:
- app 基本启动测试 - app 启动与 `/status` 检查
- `/status` endpoint 测试 - 登录 / session / 鉴权流程
- 登录 / session 基础流程测试 - runtime config 读写
- public IPv4 monitor
- SMTP 配置与测试发信
- location / poo recorder 端点
- Home Assistant inbound 集成
- TickTick OAuth
- 部署与迁移(`run_migrations`
- legacy 数据迁移脚本(`migrate_legacy_data`
## OpenAPI 导出 ## OpenAPI 导出
+4 -2
View File
@@ -3,11 +3,13 @@ from logging.config import fileConfig
from alembic import context from alembic import context
from sqlalchemy import engine_from_config, pool from sqlalchemy import engine_from_config, pool
from app.auth_db import AuthBase
from app.config import get_settings from app.config import get_settings
from app.db import Base
from app.models.config import AppConfigEntry # noqa: F401 from app.models.config import AppConfigEntry # noqa: F401
from app.models.auth import AuthSession, AuthUser # noqa: F401 from app.models.auth import AuthSession, AuthUser # noqa: F401
from app.models.public_ip import PublicIPHistory, PublicIPState # noqa: F401 from app.models.public_ip import PublicIPHistory, PublicIPState # noqa: F401
from app.models.location import Location # noqa: F401
from app.models.poo import PooRecord # noqa: F401
config = context.config config = context.config
@@ -19,7 +21,7 @@ configured_url = config.get_main_option("sqlalchemy.url")
if not configured_url or configured_url == "sqlite:///./data/app.db": if not configured_url or configured_url == "sqlite:///./data/app.db":
config.set_main_option("sqlalchemy.url", settings.app_database_url) config.set_main_option("sqlalchemy.url", settings.app_database_url)
target_metadata = AuthBase.metadata target_metadata = Base.metadata
def run_migrations_offline() -> None: def run_migrations_offline() -> None:
@@ -0,0 +1,43 @@
"""merge location and poo_records tables into app chain
Revision ID: 20260611_06_merge_location_poo_tables
Revises: 20260429_05_public_ip_monitor
Create Date: 2026-06-11 00:00:01.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260611_06_merge_location_poo_tables"
down_revision: Union[str, None] = "20260429_05_public_ip_monitor"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"location",
sa.Column("person", sa.Text(), nullable=False),
sa.Column("datetime", sa.Text(), nullable=False),
sa.Column("latitude", sa.REAL(), nullable=False),
sa.Column("longitude", sa.REAL(), nullable=False),
sa.Column("altitude", sa.REAL(), nullable=True),
sa.PrimaryKeyConstraint("person", "datetime"),
)
op.create_table(
"poo_records",
sa.Column("timestamp", sa.Text(), nullable=False),
sa.Column("status", sa.Text(), nullable=False),
sa.Column("latitude", sa.REAL(), nullable=False),
sa.Column("longitude", sa.REAL(), nullable=False),
sa.PrimaryKeyConstraint("timestamp"),
)
def downgrade() -> None:
op.drop_table("poo_records")
op.drop_table("location")
-37
View File
@@ -1,37 +0,0 @@
[alembic]
script_location = alembic_location
prepend_sys_path = .
path_separator = os
sqlalchemy.url = sqlite:///./data/locationRecorder.db
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
-2
View File
@@ -1,2 +0,0 @@
This directory contains the Alembic migration environment for the Python rewrite skeleton.
-48
View File
@@ -1,48 +0,0 @@
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from app.config import get_settings
from app.models import Location # noqa: F401
from app.models.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
settings = get_settings()
configured_url = config.get_main_option("sqlalchemy.url")
if not configured_url or configured_url == "sqlite:///./data/locationRecorder.db":
config.set_main_option("sqlalchemy.url", settings.location_database_url)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
-26
View File
@@ -1,26 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
-1
View File
@@ -1 +0,0 @@
@@ -1,33 +0,0 @@
"""location baseline
Revision ID: 20260419_01_location_baseline
Revises:
Create Date: 2026-04-19 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260419_01_location_baseline"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"location",
sa.Column("person", sa.Text(), nullable=False),
sa.Column("datetime", sa.Text(), nullable=False),
sa.Column("latitude", sa.Float(), nullable=False),
sa.Column("longitude", sa.Float(), nullable=False),
sa.Column("altitude", sa.Float(), nullable=True),
sa.PrimaryKeyConstraint("person", "datetime"),
)
def downgrade() -> None:
op.drop_table("location")
-37
View File
@@ -1,37 +0,0 @@
[alembic]
script_location = alembic_poo
prepend_sys_path = .
path_separator = os
sqlalchemy.url = sqlite:///./data/pooRecorder.db
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers = console
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
-48
View File
@@ -1,48 +0,0 @@
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from app.config import get_settings
from app.models.poo import PooRecord # noqa: F401
from app.poo_db import PooBase
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
settings = get_settings()
configured_url = config.get_main_option("sqlalchemy.url")
if not configured_url or configured_url == "sqlite:///./data/pooRecorder.db":
config.set_main_option("sqlalchemy.url", settings.poo_database_url)
target_metadata = PooBase.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
@@ -1,32 +0,0 @@
"""poo baseline
Revision ID: 20260420_01_poo_baseline
Revises:
Create Date: 2026-04-20 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260420_01_poo_baseline"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"poo_records",
sa.Column("timestamp", sa.Text(), nullable=False),
sa.Column("status", sa.Text(), nullable=False),
sa.Column("latitude", sa.Float(), nullable=False),
sa.Column("longitude", sa.Float(), nullable=False),
sa.PrimaryKeyConstraint("timestamp"),
)
def downgrade() -> None:
op.drop_table("poo_records")
+4 -4
View File
@@ -7,7 +7,7 @@ from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.config import Settings from app.config import Settings
from app.dependencies import get_app_settings, get_auth_db, get_current_auth_session from app.dependencies import get_app_settings, get_db, get_current_auth_session
from app.services.auth import ( from app.services.auth import (
AuthenticatedSession, AuthenticatedSession,
authenticate_user, authenticate_user,
@@ -57,7 +57,7 @@ def login_submit(
username: str = Form(), username: str = Form(),
password: str = Form(), password: str = Form(),
csrf_token: str = Form(), csrf_token: str = Form(),
session: Session = Depends(get_auth_db), session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
) -> Response: ) -> Response:
cookie_csrf_token = request.cookies.get(LOGIN_CSRF_COOKIE_NAME) cookie_csrf_token = request.cookies.get(LOGIN_CSRF_COOKIE_NAME)
@@ -102,7 +102,7 @@ def change_password_submit(
new_password: str = Form(), new_password: str = Form(),
confirm_password: str = Form(), confirm_password: str = Form(),
csrf_token: str = Form(), csrf_token: str = Form(),
session: Session = Depends(get_auth_db), session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response: ) -> Response:
@@ -151,7 +151,7 @@ def change_password_submit(
def logout( def logout(
request: Request, request: Request,
csrf_token: str = Form(), csrf_token: str = Form(),
session: Session = Depends(get_auth_db), session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> RedirectResponse: ) -> RedirectResponse:
+1 -3
View File
@@ -11,7 +11,6 @@ from app.dependencies import (
get_app_settings, get_app_settings,
get_db, get_db,
get_homeassistant_client, get_homeassistant_client,
get_poo_db,
get_ticktick_client, get_ticktick_client,
) )
from app.integrations.homeassistant import ( from app.integrations.homeassistant import (
@@ -36,7 +35,6 @@ INTERNAL_SERVER_ERROR_MESSAGE = "internal server error"
async def publish_from_homeassistant( async def publish_from_homeassistant(
request: Request, request: Request,
db: Session = Depends(get_db), db: Session = Depends(get_db),
poo_db: Session = Depends(get_poo_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client), homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client),
ticktick_client: TickTickClient = Depends(get_ticktick_client), ticktick_client: TickTickClient = Depends(get_ticktick_client),
@@ -49,7 +47,7 @@ async def publish_from_homeassistant(
db, db,
envelope, envelope,
ticktick_client=ticktick_client, ticktick_client=ticktick_client,
poo_session=poo_db, poo_session=db,
settings=settings, settings=settings,
homeassistant_client=homeassistant_client, homeassistant_client=homeassistant_client,
) )
+4 -4
View File
@@ -6,7 +6,7 @@ from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_auth_db, get_current_auth_session from app.dependencies import get_app_settings, get_db, get_current_auth_session
from app.services.auth import AuthenticatedSession from app.services.auth import AuthenticatedSession
from app.services.config_page import ( from app.services.config_page import (
ConfigSaveError, ConfigSaveError,
@@ -100,7 +100,7 @@ def admin_redirect(
@router.get("/config", response_class=HTMLResponse) @router.get("/config", response_class=HTMLResponse)
def config_page( def config_page(
request: Request, request: Request,
auth_db_session: Session = Depends(get_auth_db), auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response: ) -> Response:
@@ -129,7 +129,7 @@ def config_page(
@router.post("/config", response_class=HTMLResponse) @router.post("/config", response_class=HTMLResponse)
async def config_submit( async def config_submit(
request: Request, request: Request,
auth_db_session: Session = Depends(get_auth_db), auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response: ) -> Response:
@@ -189,7 +189,7 @@ async def config_submit(
@router.post("/config/smtp/test", response_class=HTMLResponse) @router.post("/config/smtp/test", response_class=HTMLResponse)
async def smtp_test_submit( async def smtp_test_submit(
request: Request, request: Request,
auth_db_session: Session = Depends(get_auth_db), auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> Response: ) -> Response:
+3 -3
View File
@@ -7,7 +7,7 @@ from pydantic import ValidationError
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.config import Settings from app.config import Settings
from app.dependencies import get_app_settings, get_homeassistant_client, get_poo_db from app.dependencies import get_app_settings, get_homeassistant_client, get_db
from app.integrations.homeassistant import HomeAssistantClient from app.integrations.homeassistant import HomeAssistantClient
from app.schemas.poo import PooRecordRequest from app.schemas.poo import PooRecordRequest
from app.services.poo import publish_latest_poo_status, record_poo from app.services.poo import publish_latest_poo_status, record_poo
@@ -21,7 +21,7 @@ INTERNAL_SERVER_ERROR_MESSAGE = "internal server error"
@router.post("/poo/record") @router.post("/poo/record")
async def create_poo_record( async def create_poo_record(
request: Request, request: Request,
db: Session = Depends(get_poo_db), db: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client), homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client),
) -> Response: ) -> Response:
@@ -56,7 +56,7 @@ async def create_poo_record(
@router.get("/poo/latest") @router.get("/poo/latest")
def notify_latest_poo( def notify_latest_poo(
db: Session = Depends(get_poo_db), db: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client), homeassistant_client: HomeAssistantClient = Depends(get_homeassistant_client),
) -> Response: ) -> Response:
+2 -2
View File
@@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.dependencies import get_auth_db, get_current_auth_session from app.dependencies import get_db, get_current_auth_session
from app.schemas.public_ip import PublicIPCheckResponse from app.schemas.public_ip import PublicIPCheckResponse
from app.config import get_settings from app.config import get_settings
from app.services.auth import AuthenticatedSession from app.services.auth import AuthenticatedSession
@@ -12,7 +12,7 @@ router = APIRouter(tags=["public-ip"])
@router.get("/public-ip/check", response_model=PublicIPCheckResponse) @router.get("/public-ip/check", response_model=PublicIPCheckResponse)
def run_public_ip_check( def run_public_ip_check(
session: Session = Depends(get_auth_db), session: Session = Depends(get_db),
current_auth: AuthenticatedSession | None = Depends(get_current_auth_session), current_auth: AuthenticatedSession | None = Depends(get_current_auth_session),
) -> PublicIPCheckResponse: ) -> PublicIPCheckResponse:
if current_auth is None: if current_auth is None:
+2 -2
View File
@@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
from app.config import Settings from app.config import Settings
from app.dependencies import ( from app.dependencies import (
get_app_settings, get_app_settings,
get_auth_db, get_db,
get_current_auth_session, get_current_auth_session,
get_ticktick_client, get_ticktick_client,
) )
@@ -39,7 +39,7 @@ def start_ticktick_auth(
@router.get("/ticktick/auth/code") @router.get("/ticktick/auth/code")
def handle_ticktick_auth_code( def handle_ticktick_auth_code(
request: Request, request: Request,
auth_db_session: Session = Depends(get_auth_db), auth_db_session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
ticktick_client: TickTickClient = Depends(get_ticktick_client), ticktick_client: TickTickClient = Depends(get_ticktick_client),
) -> Response: ) -> Response:
-53
View File
@@ -1,53 +0,0 @@
from collections.abc import Generator
from functools import lru_cache
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import get_settings
class AuthBase(DeclarativeBase):
pass
def _build_connect_args(database_url: str) -> dict[str, object]:
connect_args: dict[str, object] = {}
if database_url.startswith("sqlite"):
connect_args["check_same_thread"] = False
return connect_args
@lru_cache
def _get_auth_engine(database_url: str):
return create_engine(database_url, connect_args=_build_connect_args(database_url))
@lru_cache
def _get_auth_session_local(database_url: str):
engine = _get_auth_engine(database_url)
return sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
def get_auth_engine():
settings = get_settings()
return _get_auth_engine(settings.app_database_url)
def get_auth_session_local():
settings = get_settings()
return _get_auth_session_local(settings.app_database_url)
def reset_auth_db_caches() -> None:
_get_auth_session_local.cache_clear()
_get_auth_engine.cache_clear()
def get_auth_db_session() -> Generator[Session, None, None]:
session_local = get_auth_session_local()
session = session_local()
try:
yield session
finally:
session.close()
-13
View File
@@ -12,9 +12,6 @@ class Settings(BaseSettings):
app_hostname: str = "localhost:8000" app_hostname: str = "localhost:8000"
app_database_url: str = "sqlite:///./data/app.db" app_database_url: str = "sqlite:///./data/app.db"
location_database_url: str = "sqlite:///./data/locationRecorder.db"
poo_database_url: str = "sqlite:///./data/pooRecorder.db"
ticktick_client_id: str = "" ticktick_client_id: str = ""
ticktick_client_secret: str = "" ticktick_client_secret: str = ""
ticktick_token: str = "" ticktick_token: str = ""
@@ -77,21 +74,11 @@ class Settings(BaseSettings):
raw_path = database_url[len(prefix) :] raw_path = database_url[len(prefix) :]
return Path(raw_path) return Path(raw_path)
@computed_field
@property
def location_sqlite_path(self) -> Path | None:
return self._sqlite_path_from_url(self.location_database_url)
@computed_field @computed_field
@property @property
def app_sqlite_path(self) -> Path | None: def app_sqlite_path(self) -> Path | None:
return self._sqlite_path_from_url(self.app_database_url) return self._sqlite_path_from_url(self.app_database_url)
@computed_field
@property
def poo_sqlite_path(self) -> Path | None:
return self._sqlite_path_from_url(self.poo_database_url)
@computed_field @computed_field
@property @property
def auth_cookie_secure(self) -> bool: def auth_cookie_secure(self) -> bool:
+41 -8
View File
@@ -1,6 +1,8 @@
from collections.abc import Generator from collections.abc import Generator
from functools import lru_cache
from sqlalchemy import create_engine from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import get_settings from app.config import get_settings
@@ -10,18 +12,49 @@ class Base(DeclarativeBase):
pass pass
settings = get_settings() def _build_connect_args(database_url: str) -> dict[str, object]:
connect_args: dict[str, object] = {}
if database_url.startswith("sqlite"):
connect_args["check_same_thread"] = False
return connect_args
connect_args: dict[str, object] = {}
if settings.location_database_url.startswith("sqlite"):
connect_args["check_same_thread"] = False
engine = create_engine(settings.location_database_url, connect_args=connect_args) @lru_cache
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session) def _get_engine(database_url: str) -> Engine:
engine = create_engine(database_url, connect_args=_build_connect_args(database_url))
if database_url.startswith("sqlite"):
@event.listens_for(engine, "connect")
def _enable_sqlite_wal(dbapi_connection, _connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.close()
return engine
@lru_cache
def _get_session_local(database_url: str) -> sessionmaker:
engine = _get_engine(database_url)
return sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
def get_engine() -> Engine:
return _get_engine(get_settings().app_database_url)
def get_session_local() -> sessionmaker:
return _get_session_local(get_settings().app_database_url)
def reset_db_caches() -> None:
_get_session_local.cache_clear()
_get_engine.cache_clear()
def get_db_session() -> Generator[Session, None, None]: def get_db_session() -> Generator[Session, None, None]:
session = SessionLocal() session_local = get_session_local()
session = session_local()
try: try:
yield session yield session
finally: finally:
+3 -13
View File
@@ -3,30 +3,20 @@ from collections.abc import Generator
from fastapi import Depends, Request from fastapi import Depends, Request
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.auth_db import get_auth_db_session
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.db import get_db_session from app.db import get_db_session
from app.integrations.homeassistant import HomeAssistantClient from app.integrations.homeassistant import HomeAssistantClient
from app.integrations.ticktick import TickTickClient from app.integrations.ticktick import TickTickClient
from app.poo_db import get_poo_db_session
from app.services.auth import AuthenticatedSession, get_authenticated_session from app.services.auth import AuthenticatedSession, get_authenticated_session
from app.services.config_page import build_runtime_settings from app.services.config_page import build_runtime_settings
def get_auth_db() -> Generator[Session, None, None]:
yield from get_auth_db_session()
def get_app_settings(session: Session = Depends(get_auth_db)) -> Settings:
return build_runtime_settings(session, get_settings())
def get_db() -> Generator[Session, None, None]: def get_db() -> Generator[Session, None, None]:
yield from get_db_session() yield from get_db_session()
def get_poo_db() -> Generator[Session, None, None]: def get_app_settings(session: Session = Depends(get_db)) -> Settings:
yield from get_poo_db_session() return build_runtime_settings(session, get_settings())
def get_homeassistant_client(settings: Settings = Depends(get_app_settings)) -> HomeAssistantClient: def get_homeassistant_client(settings: Settings = Depends(get_app_settings)) -> HomeAssistantClient:
@@ -39,7 +29,7 @@ def get_ticktick_client(settings: Settings = Depends(get_app_settings)) -> TickT
def get_current_auth_session( def get_current_auth_session(
request: Request, request: Request,
session: Session = Depends(get_auth_db), session: Session = Depends(get_db),
settings: Settings = Depends(get_app_settings), settings: Settings = Depends(get_app_settings),
) -> AuthenticatedSession | None: ) -> AuthenticatedSession | None:
raw_token = request.cookies.get(settings.auth_session_cookie_name) raw_token = request.cookies.get(settings.auth_session_cookie_name)
+5 -32
View File
@@ -10,7 +10,7 @@ from sqlalchemy.orm import Session
from app import models # noqa: F401 from app import models # noqa: F401
from app.api.routes.auth import router as auth_router from app.api.routes.auth import router as auth_router
from app.api.routes import pages, status from app.api.routes import pages, status
import app.auth_db as auth_db from app.db import get_session_local
from app.api.routes.homeassistant import router as homeassistant_router from app.api.routes.homeassistant import router as homeassistant_router
from app.api.routes.location import router as location_router from app.api.routes.location import router as location_router
from app.api.routes.poo import router as poo_router from app.api.routes.poo import router as poo_router
@@ -21,12 +21,10 @@ from app.services.auth import AuthBootstrapError, initialize_auth_schema
from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap from app.services.config_page import seed_missing_config_from_bootstrap, sync_app_hostname_from_bootstrap
from app.services.public_ip import check_public_ipv4_and_notify from app.services.public_ip import check_public_ipv4_and_notify
from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db from scripts.app_db_adopt import AppDatabaseAdoptionError, validate_app_runtime_db
from scripts.location_db_adopt import LocationDatabaseAdoptionError, validate_location_runtime_db
from scripts.poo_db_adopt import PooDatabaseAdoptionError, validate_poo_runtime_db
def _run_scheduled_public_ip_check() -> None: def _run_scheduled_public_ip_check() -> None:
session_local = auth_db.get_auth_session_local() session_local = get_session_local()
session: Session = session_local() session: Session = session_local()
try: try:
check_public_ipv4_and_notify(session, bootstrap_settings=get_settings()) check_public_ipv4_and_notify(session, bootstrap_settings=get_settings())
@@ -35,7 +33,7 @@ def _run_scheduled_public_ip_check() -> None:
def ensure_auth_db_ready() -> None: def ensure_auth_db_ready() -> None:
session_local = auth_db.get_auth_session_local() session_local = get_session_local()
session: Session = session_local() session: Session = session_local()
try: try:
validate_app_runtime_db(get_settings().app_database_url) validate_app_runtime_db(get_settings().app_database_url)
@@ -50,41 +48,16 @@ def ensure_auth_db_ready() -> None:
session.close() session.close()
def ensure_location_db_ready() -> None:
settings = get_settings()
if settings.location_sqlite_path is None:
return
try:
validate_location_runtime_db(settings.location_database_url)
except LocationDatabaseAdoptionError as exc:
raise RuntimeError(str(exc)) from exc
def ensure_poo_db_ready() -> None:
settings = get_settings()
if settings.poo_sqlite_path is None:
return
try:
validate_poo_runtime_db(settings.poo_database_url)
except PooDatabaseAdoptionError as exc:
raise RuntimeError(str(exc)) from exc
def ensure_runtime_dirs() -> None: def ensure_runtime_dirs() -> None:
settings = get_settings() settings = get_settings()
for path in (settings.app_sqlite_path, settings.location_sqlite_path, settings.poo_sqlite_path): if settings.app_sqlite_path is not None:
if path is not None: settings.app_sqlite_path.parent.mkdir(parents=True, exist_ok=True)
path.parent.mkdir(parents=True, exist_ok=True)
@asynccontextmanager @asynccontextmanager
async def lifespan(_: FastAPI): async def lifespan(_: FastAPI):
ensure_runtime_dirs() ensure_runtime_dirs()
ensure_auth_db_ready() ensure_auth_db_ready()
ensure_location_db_ready()
ensure_poo_db_ready()
scheduler = BackgroundScheduler(timezone="UTC") scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job( scheduler.add_job(
_run_scheduled_public_ip_check, _run_scheduled_public_ip_check,
+2
View File
@@ -3,6 +3,7 @@
from app.models.auth import AuthSession, AuthUser from app.models.auth import AuthSession, AuthUser
from app.models.config import AppConfigEntry from app.models.config import AppConfigEntry
from app.models.location import Location from app.models.location import Location
from app.models.poo import PooRecord
from app.models.public_ip import PublicIPHistory, PublicIPState from app.models.public_ip import PublicIPHistory, PublicIPState
__all__ = [ __all__ = [
@@ -10,6 +11,7 @@ __all__ = [
"AuthSession", "AuthSession",
"AuthUser", "AuthUser",
"Location", "Location",
"PooRecord",
"PublicIPHistory", "PublicIPHistory",
"PublicIPState", "PublicIPState",
] ]
+3 -3
View File
@@ -3,10 +3,10 @@ from datetime import datetime
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.auth_db import AuthBase from app.db import Base
class AuthUser(AuthBase): class AuthUser(Base):
__tablename__ = "auth_users" __tablename__ = "auth_users"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
@@ -19,7 +19,7 @@ class AuthUser(AuthBase):
sessions: Mapped[list["AuthSession"]] = relationship(back_populates="user") sessions: Mapped[list["AuthSession"]] = relationship(back_populates="user")
class AuthSession(AuthBase): class AuthSession(Base):
__tablename__ = "auth_sessions" __tablename__ = "auth_sessions"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
-4
View File
@@ -1,4 +0,0 @@
from app.db import Base
__all__ = ["Base"]
+2 -2
View File
@@ -3,10 +3,10 @@ from datetime import datetime
from sqlalchemy import DateTime, Integer, String from sqlalchemy import DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.auth_db import AuthBase from app.db import Base
class AppConfigEntry(AuthBase): class AppConfigEntry(Base):
__tablename__ = "app_config" __tablename__ = "app_config"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+2 -2
View File
@@ -1,10 +1,10 @@
from sqlalchemy import Float, String from sqlalchemy import Float, String
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.poo_db import PooBase from app.db import Base
class PooRecord(PooBase): class PooRecord(Base):
__tablename__ = "poo_records" __tablename__ = "poo_records"
timestamp: Mapped[str] = mapped_column(String, primary_key=True) timestamp: Mapped[str] = mapped_column(String, primary_key=True)
+3 -3
View File
@@ -3,10 +3,10 @@ from datetime import datetime
from sqlalchemy import DateTime, Integer, String from sqlalchemy import DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.auth_db import AuthBase from app.db import Base
class PublicIPState(AuthBase): class PublicIPState(Base):
__tablename__ = "public_ip_state" __tablename__ = "public_ip_state"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
@@ -20,7 +20,7 @@ class PublicIPState(AuthBase):
last_provider: Mapped[str | None] = mapped_column(String(64), nullable=True) last_provider: Mapped[str | None] = mapped_column(String(64), nullable=True)
class PublicIPHistory(AuthBase): class PublicIPHistory(Base):
__tablename__ = "public_ip_history" __tablename__ = "public_ip_history"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
-28
View File
@@ -1,28 +0,0 @@
from collections.abc import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import get_settings
class PooBase(DeclarativeBase):
pass
settings = get_settings()
connect_args: dict[str, object] = {}
if settings.poo_database_url.startswith("sqlite"):
connect_args["check_same_thread"] = False
poo_engine = create_engine(settings.poo_database_url, connect_args=connect_args)
PooSessionLocal = sessionmaker(bind=poo_engine, autoflush=False, autocommit=False, class_=Session)
def get_poo_db_session() -> Generator[Session, None, None]:
session = PooSessionLocal()
try:
yield session
finally:
session.close()
+4 -6
View File
@@ -7,7 +7,7 @@ from typing import Any
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.models.config import AppConfigEntry from app.models.config import AppConfigEntry
@@ -127,7 +127,7 @@ def sync_app_hostname_from_bootstrap(session: Session, bootstrap_settings: Setti
current_values["APP_HOSTNAME"] = bootstrap_hostname current_values["APP_HOSTNAME"] = bootstrap_hostname
_persist_config_values(session, current_values) _persist_config_values(session, current_values)
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def build_runtime_settings(session: Session, bootstrap_settings: Settings) -> Settings: def build_runtime_settings(session: Session, bootstrap_settings: Settings) -> Settings:
@@ -184,7 +184,7 @@ def save_config_updates(session: Session, form_data: dict[str, str], bootstrap_s
_validate_config_values(merged_values, bootstrap_settings) _validate_config_values(merged_values, bootstrap_settings)
_persist_config_values(session, merged_values) _persist_config_values(session, merged_values)
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def save_config_value( def save_config_value(
@@ -199,7 +199,7 @@ def save_config_value(
_validate_config_values(current_values, bootstrap_settings) _validate_config_values(current_values, bootstrap_settings)
_persist_config_values(session, current_values) _persist_config_values(session, current_values)
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def is_ticktick_oauth_ready(settings: Settings) -> bool: def is_ticktick_oauth_ready(settings: Settings) -> bool:
@@ -260,8 +260,6 @@ def _settings_payload(settings: Settings) -> dict[str, Any]:
"app_debug": settings.app_debug, "app_debug": settings.app_debug,
"app_hostname": settings.app_hostname, "app_hostname": settings.app_hostname,
"app_database_url": settings.app_database_url, "app_database_url": settings.app_database_url,
"location_database_url": settings.location_database_url,
"poo_database_url": settings.poo_database_url,
"ticktick_client_id": settings.ticktick_client_id, "ticktick_client_id": settings.ticktick_client_id,
"ticktick_client_secret": settings.ticktick_client_secret, "ticktick_client_secret": settings.ticktick_client_secret,
"ticktick_token": settings.ticktick_token, "ticktick_token": settings.ticktick_token,
-19
View File
@@ -25,22 +25,3 @@ services:
- ./data:/app/data - ./data:/app/data
- ./.env:/app/.env:ro - ./.env:/app/.env:ro
grafana:
image: grafana/grafana:latest
container_name: home-automation-grafana
depends_on:
- app
restart: unless-stopped
ports:
- "10.238.75.70:8882:3000"
environment:
GF_PLUGINS_PREINSTALL: frser-sqlite-datasource
volumes:
- ./data:/data/home-automation:ro
- ./grafana/provisioning:/etc/grafana/provisioning:ro
- ./grafana/dashboards:/var/lib/grafana/dashboards:ro
- homeautomation_grafana_storage:/var/lib/grafana
volumes:
homeautomation_grafana_storage:
name: homeautomation_grafana_storage
+3 -13
View File
@@ -23,10 +23,8 @@
- 基础路由注册 - 基础路由注册
- `config.py` - `config.py`
- 环境变量驱动的 settings - 环境变量驱动的 settings
- `auth_db.py`
- app 级共享 auth 数据库
- `db.py` - `db.py`
- SQLAlchemy engine / session / Base - 统一数据层:一个 `Base`、一个绑定 `app_database_url` 的 cached engineSQLite WAL)、`get_engine` / `get_session_local` / `reset_db_caches` / `get_db_session`
- `dependencies.py` - `dependencies.py`
- 通用依赖注入 - 通用依赖注入
- `api/` - `api/`
@@ -37,7 +35,7 @@
- 当前已迁入 `POST /poo/record``GET /poo/latest` - 当前已迁入 `POST /poo/record``GET /poo/latest`
- `models/` - `models/`
- SQLAlchemy models - SQLAlchemy models
- 当前 `auth``location``poo` 使用各自独立的数据库 base - 所有模型(auth / config / public_ip / location / poo)共用同一个 `Base`,均落在单一 `app.db`
- `schemas/` - `schemas/`
- Pydantic schemas - Pydantic schemas
- `services/` - `services/`
@@ -53,17 +51,9 @@
- `static/` - `static/`
- 极简静态资源 - 极简静态资源
### `alembic_location/`
Location DB 的 migration 基础设施。
### `alembic_app/` ### `alembic_app/`
App DB 的 migration 基础设施 App DB 的唯一 Alembic migration 链,同时管理 `location` / `poo_records` 表。M1 将三个独立 DB 合并进 `app.db` 后,`alembic_location/``alembic_poo/` 已退役,全部由此链统一管理
### `alembic_poo/`
Poo DB 的 migration 基础设施。
### `tests/` ### `tests/`
+2 -2
View File
@@ -34,7 +34,7 @@
| 里程碑 | 主题 | 一句话 | | 里程碑 | 主题 | 一句话 |
| --- | --- | --- | | --- | --- | --- |
| **M1** | 单库化地基 | 把三库合并成单一 `app.db`,清理散落数据层,删掉 Grafana | | **M1** | 单库化地基 | 把三库合并成单一 `app.db`,清理散落数据层,删掉 Grafana |
| **M2** | 前端 v2 | React SPA 取代 Jinja,承载 config + 可视化 + 记录增删改 | | **M2** | 前端 v2 | React SPA 取代 Jinja,承载 config + 可视化 + 记录增删改 |
| **M3** | 开放与移动端(远期试水) | token 鉴权 + React Native 移动端 | | **M3** | 开放与移动端(远期试水) | token 鉴权 + React Native 移动端 |
@@ -42,7 +42,7 @@
--- ---
## M1 — 单库化地基 ## M1 — 单库化地基(✅ 已完成)
### 目标 ### 目标
-288
View File
@@ -1,288 +0,0 @@
{
"apiVersion": "dashboard.grafana.app/v2",
"kind": "Dashboard",
"metadata": {
"name": "adzr6rv",
"namespace": "default",
"uid": "c5fc57e5-7fb5-4104-9861-023710ada568",
"resourceVersion": "1776634346371016",
"generation": 19,
"creationTimestamp": "2026-04-18T19:05:57Z",
"labels": {
"grafana.app/deprecatedInternalID": "945374452785152"
},
"annotations": {
"grafana.app/createdBy": "user:ffjhknvgkvhtsc",
"grafana.app/folder": "",
"grafana.app/saved-from-ui": "Grafana v13.0.1 (a100054f)",
"grafana.app/updatedBy": "user:ffjhknvgkvhtsc",
"grafana.app/updatedTimestamp": "2026-04-19T21:32:26Z"
}
},
"spec": {
"annotations": [
{
"kind": "AnnotationQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "grafana",
"version": "v0",
"datasource": {
"name": "-- Grafana --"
},
"spec": {}
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"builtIn": true
}
}
],
"cursorSync": "Off",
"editable": true,
"elements": {
"panel-1": {
"kind": "Panel",
"spec": {
"id": 1,
"title": "轨迹",
"description": "",
"links": [],
"data": {
"kind": "QueryGroup",
"spec": {
"queries": [
{
"kind": "PanelQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhr941d5iwwf"
},
"spec": {
"queryText": "SELECT\n datetime AS time,\n latitude,\n longitude,\n altitude\nFROM location\nWHERE person = 'Jiangxue'\n AND datetime >= '2021-04-19T21:29:57.036Z'\n AND datetime <= '2026-04-19T21:29:57.036Z'\n AND latitude != 0\n AND longitude != 0\nORDER BY datetime;\n",
"queryType": "table",
"rawQueryText": "SELECT\n datetime AS time,\n latitude,\n longitude,\n altitude\nFROM location\nWHERE person = '$person'\n AND datetime >= '${__from:date:iso}'\n AND datetime <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY datetime;\n",
"timeColumns": [
"time",
"ts"
]
}
},
"refId": "A",
"hidden": false
}
}
],
"transformations": [],
"queryOptions": {}
}
},
"vizConfig": {
"kind": "VizConfig",
"group": "geomap",
"version": "13.0.1",
"spec": {
"options": {
"basemap": {
"config": {
"server": "streets"
},
"name": "Layer 0",
"noRepeat": false,
"type": "default"
},
"controls": {
"mouseWheelZoom": true,
"showAttribution": true,
"showDebug": false,
"showMeasure": false,
"showScale": false,
"showZoom": true
},
"layers": [
{
"config": {
"showLegend": false,
"style": {
"color": {
"fixed": "blue"
},
"opacity": 0.7,
"rotation": {
"fixed": 0,
"max": 360,
"min": -360,
"mode": "mod"
},
"size": {
"fixed": 3,
"max": 15,
"min": 2
},
"symbol": {
"fixed": "img/icons/marker/circle.svg",
"mode": "fixed"
},
"symbolAlign": {
"horizontal": "center",
"vertical": "center"
},
"textConfig": {
"fontSize": 12,
"offsetX": 0,
"offsetY": 0,
"textAlign": "center",
"textBaseline": "middle"
}
}
},
"layer-tooltip": true,
"name": "path",
"tooltip": true,
"type": "markers"
}
],
"tooltip": {
"mode": "details"
},
"view": {
"allLayers": true,
"dashboardVariable": false,
"id": "fit",
"lat": 0,
"lon": 0,
"noRepeat": false,
"shared": false,
"zoom": 15
}
},
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{
"value": 0,
"color": "green"
}
]
},
"color": {
"mode": "thresholds"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
}
},
"overrides": []
}
}
}
}
}
},
"layout": {
"kind": "GridLayout",
"spec": {
"items": [
{
"kind": "GridLayoutItem",
"spec": {
"x": 0,
"y": 0,
"width": 24,
"height": 18,
"element": {
"kind": "ElementReference",
"name": "panel-1"
}
}
}
]
}
},
"links": [],
"liveNow": false,
"preload": false,
"tags": [],
"timeSettings": {
"timezone": "browser",
"from": "now-5y",
"to": "now",
"autoRefresh": "",
"autoRefreshIntervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"hideTimepicker": false,
"fiscalYearStartMonth": 0
},
"title": "轨迹",
"variables": [
{
"kind": "QueryVariable",
"spec": {
"name": "person",
"current": {
"text": "Jiangxue",
"value": "Jiangxue"
},
"label": "person",
"hide": "dontHide",
"refresh": "onDashboardLoad",
"skipUrlSync": false,
"description": "",
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhr941d5iwwf"
},
"spec": {
"__legacyStringValue": "SELECT DISTINCT person\nFROM location\nORDER BY person;\n"
}
},
"regex": "",
"regexApplyTo": "value",
"sort": "disabled",
"definition": "SELECT DISTINCT person\nFROM location\nORDER BY person;\n",
"options": [],
"multi": false,
"includeAll": false,
"allowCustomValue": true
}
}
],
"preferences": {
"layout": {
"kind": "AutoGridLayout",
"spec": {
"maxColumnCount": 3,
"columnWidthMode": "standard",
"rowHeightMode": "standard",
"items": []
}
}
}
}
}
-231
View File
@@ -1,231 +0,0 @@
{
"apiVersion": "dashboard.grafana.app/v2",
"kind": "Dashboard",
"metadata": {
"name": "adl5sjt",
"namespace": "default",
"uid": "d4c72406-9fc5-4b85-844b-be1250f1fa8b",
"resourceVersion": "1776606363367013",
"generation": 6,
"creationTimestamp": "2026-04-18T20:07:34Z",
"labels": {
"grafana.app/deprecatedInternalID": "960882027798528"
},
"annotations": {
"grafana.app/createdBy": "user:ffjhknvgkvhtsc",
"grafana.app/folder": "",
"grafana.app/saved-from-ui": "Grafana v13.0.1 (a100054f)",
"grafana.app/updatedBy": "user:ffjhknvgkvhtsc",
"grafana.app/updatedTimestamp": "2026-04-19T13:46:03Z"
}
},
"spec": {
"annotations": [
{
"kind": "AnnotationQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "grafana",
"version": "v0",
"datasource": {
"name": "-- Grafana --"
},
"spec": {}
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"builtIn": true
}
}
],
"cursorSync": "Off",
"editable": true,
"elements": {
"panel-1": {
"kind": "Panel",
"spec": {
"id": 1,
"title": "Mika Poo",
"description": "Mika's poo",
"links": [],
"data": {
"kind": "QueryGroup",
"spec": {
"queries": [
{
"kind": "PanelQuery",
"spec": {
"query": {
"kind": "DataQuery",
"group": "frser-sqlite-datasource",
"version": "v0",
"datasource": {
"name": "ffjhkuu4hc3y8e"
},
"spec": {
"queryText": "SELECT\n latitude,\n longitude,\n timestamp\nFROM poo_records\nWHERE timestamp >= '${__from:date:iso}'\n AND timestamp <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY timestamp;\n",
"queryType": "table",
"rawQueryText": "SELECT\n latitude,\n longitude,\n timestamp\nFROM poo_records\nWHERE timestamp >= '${__from:date:iso}'\n AND timestamp <= '${__to:date:iso}'\n AND latitude != 0\n AND longitude != 0\nORDER BY timestamp;\n",
"timeColumns": [
"time",
"ts"
]
}
},
"refId": "A",
"hidden": false
}
}
],
"transformations": [],
"queryOptions": {}
}
},
"vizConfig": {
"kind": "VizConfig",
"group": "geomap",
"version": "13.0.1",
"spec": {
"options": {
"basemap": {
"config": {},
"name": "Layer 0",
"noRepeat": false,
"type": "default"
},
"controls": {
"mouseWheelZoom": true,
"showAttribution": true,
"showDebug": false,
"showMeasure": false,
"showScale": false,
"showZoom": true
},
"layers": [
{
"config": {
"blur": 15,
"radius": 5,
"weight": {
"fixed": 1,
"max": 1,
"min": 0
}
},
"filterData": {
"id": "byRefId",
"options": "A"
},
"location": {
"mode": "auto"
},
"name": "Poo",
"tooltip": true,
"type": "heatmap"
}
],
"tooltip": {
"mode": "details"
},
"view": {
"allLayers": true,
"dashboardVariable": false,
"id": "zero",
"lat": 0,
"lon": 0,
"noRepeat": false,
"zoom": 1
}
},
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{
"value": 0,
"color": "green"
},
{
"value": 80,
"color": "red"
}
]
},
"color": {
"mode": "thresholds"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
}
},
"overrides": []
}
}
}
}
}
},
"layout": {
"kind": "GridLayout",
"spec": {
"items": [
{
"kind": "GridLayoutItem",
"spec": {
"x": 0,
"y": 0,
"width": 24,
"height": 19,
"element": {
"kind": "ElementReference",
"name": "panel-1"
}
}
}
]
}
},
"links": [],
"liveNow": false,
"preload": false,
"tags": [],
"timeSettings": {
"timezone": "browser",
"from": "now-5y",
"to": "now",
"autoRefresh": "",
"autoRefreshIntervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"hideTimepicker": false,
"fiscalYearStartMonth": 0
},
"title": "Mika Poo",
"variables": [],
"preferences": {
"layout": {
"kind": "GridLayout",
"spec": {
"items": []
}
}
}
}
}
@@ -1,13 +0,0 @@
apiVersion: 1
providers:
- name: home-automation-dashboards
orgId: 1
folder: ""
type: file
disableDeletion: false
allowUiUpdates: false
updateIntervalSeconds: 30
options:
path: /var/lib/grafana/dashboards
foldersFromFilesStructure: false
@@ -1,11 +0,0 @@
apiVersion: 1
datasources:
- name: locationrecorder
uid: ffjhr941d5iwwf
type: frser-sqlite-datasource
access: proxy
isDefault: false
editable: false
jsonData:
path: /data/home-automation/locationRecorder.db
@@ -1,11 +0,0 @@
apiVersion: 1
datasources:
- name: poorecorder
uid: ffjhkuu4hc3y8e
type: frser-sqlite-datasource
access: proxy
isDefault: false
editable: false
jsonData:
path: /data/home-automation/pooRecorder.db
+72
View File
@@ -249,6 +249,27 @@
} }
} }
}, },
"/config/smtp/test": {
"post": {
"tags": [
"pages"
],
"summary": "Smtp Test Submit",
"operationId": "smtp_test_submit_config_smtp_test_post",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/html": {
"schema": {
"type": "string"
}
}
}
}
}
}
},
"/homeassistant/publish": { "/homeassistant/publish": {
"post": { "post": {
"tags": [ "tags": [
@@ -325,6 +346,27 @@
} }
} }
}, },
"/public-ip/check": {
"get": {
"tags": [
"public-ip"
],
"summary": "Run Public Ip Check",
"operationId": "run_public_ip_check_public_ip_check_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/PublicIPCheckResponse"
}
}
}
}
}
}
},
"/ticktick/auth/start": { "/ticktick/auth/start": {
"get": { "get": {
"tags": [ "tags": [
@@ -443,6 +485,36 @@
"type": "object", "type": "object",
"title": "HTTPValidationError" "title": "HTTPValidationError"
}, },
"PublicIPCheckResponse": {
"properties": {
"status": {
"type": "string",
"enum": [
"first_seen",
"unchanged",
"changed",
"error"
],
"title": "Status"
},
"checked_at": {
"type": "string",
"format": "date-time",
"title": "Checked At"
},
"changed": {
"type": "boolean",
"title": "Changed"
}
},
"type": "object",
"required": [
"status",
"checked_at",
"changed"
],
"title": "PublicIPCheckResponse"
},
"StatusResponse": { "StatusResponse": {
"properties": { "properties": {
"status": { "status": {
+49
View File
@@ -155,6 +155,19 @@ paths:
text/html: text/html:
schema: schema:
type: string type: string
/config/smtp/test:
post:
tags:
- pages
summary: Smtp Test Submit
operationId: smtp_test_submit_config_smtp_test_post
responses:
'200':
description: Successful Response
content:
text/html:
schema:
type: string
/homeassistant/publish: /homeassistant/publish:
post: post:
tags: tags:
@@ -203,6 +216,19 @@ paths:
content: content:
application/json: application/json:
schema: {} schema: {}
/public-ip/check:
get:
tags:
- public-ip
summary: Run Public Ip Check
operationId: run_public_ip_check_public_ip_check_get
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/PublicIPCheckResponse'
/ticktick/auth/start: /ticktick/auth/start:
get: get:
tags: tags:
@@ -285,6 +311,29 @@ components:
title: Detail title: Detail
type: object type: object
title: HTTPValidationError title: HTTPValidationError
PublicIPCheckResponse:
properties:
status:
type: string
enum:
- first_seen
- unchanged
- changed
- error
title: Status
checked_at:
type: string
format: date-time
title: Checked At
changed:
type: boolean
title: Changed
type: object
required:
- status
- checked_at
- changed
title: PublicIPCheckResponse
StatusResponse: StatusResponse:
properties: properties:
status: status:
+1 -1
View File
@@ -15,7 +15,7 @@ if str(PROJECT_ROOT) not in sys.path:
from app.config import get_settings from app.config import get_settings
APP_BASELINE_REVISION = "20260429_05_public_ip_monitor" APP_BASELINE_REVISION = "20260611_06_merge_location_poo_tables"
class AppDatabaseAdoptionError(RuntimeError): class AppDatabaseAdoptionError(RuntimeError):
-205
View File
@@ -1,205 +0,0 @@
from __future__ import annotations
import sqlite3
import sys
from pathlib import Path
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.util.exc import CommandError
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from app.config import get_settings
LOCATION_BASELINE_REVISION = "20260419_01_location_baseline"
EXPECTED_USER_VERSION = 2
EXPECTED_LOCATION_TABLE_INFO = [
(0, "person", "TEXT", 1, None, 1),
(1, "datetime", "TEXT", 1, None, 2),
(2, "latitude", "REAL", 1, None, 0),
(3, "longitude", "REAL", 1, None, 0),
(4, "altitude", "REAL", 0, None, 0),
]
class LocationDatabaseAdoptionError(RuntimeError):
"""Raised when a legacy location database does not match the expected baseline."""
def _database_path_from_url(database_url: str) -> Path:
prefix = "sqlite:///"
if not database_url.startswith(prefix):
raise LocationDatabaseAdoptionError(
f"Only sqlite URLs are supported for location DB adoption, got: {database_url}"
)
return Path(database_url[len(prefix) :])
def _make_alembic_config(database_url: str) -> Config:
config = Config("alembic_location.ini")
config.set_main_option("sqlalchemy.url", database_url)
return config
def _expected_head_revision(alembic_config: Config) -> str:
script = ScriptDirectory.from_config(alembic_config)
heads = script.get_heads()
if len(heads) != 1:
raise LocationDatabaseAdoptionError(
f"Expected exactly one Alembic head for location DB, got {len(heads)}"
)
return heads[0]
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
script = ScriptDirectory.from_config(alembic_config)
try:
return script.get_revision(revision) is not None
except CommandError:
return False
def _location_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'location'"
).fetchone()
return row is not None
finally:
conn.close()
def _alembic_version_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'alembic_version'"
).fetchone()
return row is not None
finally:
conn.close()
def _fetch_alembic_revision(database_path: Path) -> str:
conn = sqlite3.connect(database_path)
try:
row = conn.execute("SELECT version_num FROM alembic_version").fetchone()
if row is None:
raise LocationDatabaseAdoptionError(
"Alembic version table exists but contains no revision"
)
return row[0]
finally:
conn.close()
def _fetch_location_table_info(database_path: Path) -> list[tuple]:
conn = sqlite3.connect(database_path)
try:
return list(conn.execute("PRAGMA table_info(location)"))
finally:
conn.close()
def _fetch_user_version(database_path: Path) -> int:
conn = sqlite3.connect(database_path)
try:
return conn.execute("PRAGMA user_version").fetchone()[0]
finally:
conn.close()
def validate_legacy_location_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
if not database_path.exists():
raise LocationDatabaseAdoptionError(f"Location DB file does not exist: {database_path}")
if not _location_table_exists(database_path):
raise LocationDatabaseAdoptionError("Expected table 'location' was not found in the DB")
table_info = _fetch_location_table_info(database_path)
if table_info != EXPECTED_LOCATION_TABLE_INFO:
raise LocationDatabaseAdoptionError(
"Location table schema does not match the expected baseline schema"
)
user_version = _fetch_user_version(database_path)
if user_version != EXPECTED_USER_VERSION:
raise LocationDatabaseAdoptionError(
f"Expected PRAGMA user_version = {EXPECTED_USER_VERSION}, got {user_version}"
)
def validate_location_runtime_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if not database_path.exists():
raise LocationDatabaseAdoptionError(
"Location DB file was not found. Run 'python scripts/location_db_adopt.py' "
"first to initialize or adopt the location DB before starting the app."
)
if not _alembic_version_table_exists(database_path):
raise LocationDatabaseAdoptionError(
"Location DB exists but is not yet Alembic-managed. Run "
"'python scripts/location_db_adopt.py' first to adopt the legacy DB "
"before starting the app."
)
current_revision = _fetch_alembic_revision(database_path)
if current_revision != expected_revision:
raise LocationDatabaseAdoptionError(
"Location DB revision mismatch. Refusing to start the app: "
f"expected {expected_revision}, got {current_revision}"
)
def adopt_or_initialize_location_db(database_url: str) -> str:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if database_path.exists():
if _alembic_version_table_exists(database_path):
current_revision = _fetch_alembic_revision(database_path)
if current_revision == expected_revision:
return "already_managed"
if not _is_known_revision(alembic_config, current_revision):
raise LocationDatabaseAdoptionError(
"Location DB is already Alembic-managed but revision does not match "
f"a known migration revision: got {current_revision}"
)
command.upgrade(alembic_config, "head")
return "upgraded"
validate_legacy_location_db(database_url)
command.stamp(alembic_config, LOCATION_BASELINE_REVISION)
if LOCATION_BASELINE_REVISION != expected_revision:
command.upgrade(alembic_config, "head")
return "upgraded"
return "adopted"
database_path.parent.mkdir(parents=True, exist_ok=True)
command.upgrade(alembic_config, "head")
return "initialized"
def main() -> None:
settings = get_settings()
result = adopt_or_initialize_location_db(settings.location_database_url)
if result == "initialized":
print("Initialized a new location DB via Alembic upgrade head.")
elif result == "already_managed":
print("Location DB is already Alembic-managed at the expected baseline revision.")
else:
print("Validated legacy location DB and stamped Alembic baseline successfully.")
if __name__ == "__main__":
main()
+267
View File
@@ -0,0 +1,267 @@
"""One-time idempotent data migration: copy rows from legacy locationRecorder.db /
pooRecorder.db into the unified app DB's location / poo_records tables.
NOT part of the Alembic chain. Run manually, once, during production cut-over:
python -m scripts.migrate_legacy_data \\
--app-db sqlite:///./data/app.db \\
--location-db sqlite:///./data/locationRecorder.db \\
--poo-db sqlite:///./data/pooRecorder.db
Or rely on environment variables:
APP_DATABASE_URL, LOCATION_DATABASE_URL, POO_DATABASE_URL
Add --dry-run to preview row counts without writing anything.
Return value of migrate_legacy_data(): a dict shaped like:
{
"location": {"source": N, "copied": C, "skipped": bool, "final": F},
"poo_records": {"source": N, "copied": C, "skipped": bool, "final": F},
}
where:
source - rows in the legacy DB (0 when skipped)
copied - rows inserted by this run (0 when dry_run or skipped)
skipped - True when the legacy file was absent
final - rows present in the app table after the run (0 when dry_run)
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _sqlite_path_from_url(url: str) -> Path:
"""Extract the filesystem path from a sqlite:///... URL.
If *url* does not start with 'sqlite:///', it is treated as a plain path.
"""
prefix = "sqlite:///"
if url.startswith(prefix):
return Path(url[len(prefix):])
return Path(url)
def _reconcile(
conn: sqlite3.Connection,
table: str,
columns: list[str],
source_count: int,
) -> int:
"""Verify every legacy source row is present in the main (app) table.
Matches on ALL columns using SQLite's NULL-safe IS operator so that nullable
columns (e.g. altitude) compare correctly. A row that was silently skipped
by INSERT OR IGNORE due to a value difference will NOT satisfy this predicate
even if its primary key is present in the target.
Returns the count of source rows whose full-row data is present in main.
Raises RuntimeError if any rows are missing or differ in value.
"""
join_cond = " AND ".join(f"m.{col} IS l.{col}" for col in columns)
sql = (
f"SELECT COUNT(*) FROM legacy.{table} l "
f"WHERE EXISTS (SELECT 1 FROM main.{table} m WHERE {join_cond})"
)
(present,) = conn.execute(sql).fetchone()
if present < source_count:
missing = source_count - present
raise RuntimeError(
f"Reconciliation failed for table '{table}': "
f"{missing} of {source_count} source rows are missing or differing in the app DB."
)
return present
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def migrate_legacy_data(
app_url: str,
location_url: str | None,
poo_url: str | None,
*,
dry_run: bool = False,
) -> dict:
"""Copy rows from legacy DBs into the app DB's location / poo_records tables.
Parameters
----------
app_url: sqlite:///... URL (or plain path) for the unified app DB.
location_url: sqlite:///... URL (or plain path) for the legacy location DB,
or None to skip that table.
poo_url: sqlite:///... URL (or plain path) for the legacy poo DB,
or None to skip that table.
dry_run: When True, gather counts only; perform no writes.
Returns a dict with per-table stats (see module docstring).
Raises RuntimeError on reconciliation failure (non-zero rows missing).
"""
app_path = _sqlite_path_from_url(app_url)
results: dict[str, dict] = {}
# --- location table ---
results["location"] = _migrate_table(
app_path=app_path,
legacy_url=location_url,
table="location",
columns=["person", "datetime", "latitude", "longitude", "altitude"],
dry_run=dry_run,
)
# --- poo_records table ---
results["poo_records"] = _migrate_table(
app_path=app_path,
legacy_url=poo_url,
table="poo_records",
columns=["timestamp", "status", "latitude", "longitude"],
dry_run=dry_run,
)
return results
def _migrate_table(
*,
app_path: Path,
legacy_url: str | None,
table: str,
columns: list[str],
dry_run: bool,
) -> dict:
"""Migrate a single table from a legacy DB into the app DB.
Returns a per-table stats dict.
"""
# If the caller passed None → treat as absent
if legacy_url is None:
return {"source": 0, "copied": 0, "skipped": True, "final": 0}
legacy_path = _sqlite_path_from_url(legacy_url)
# If the file doesn't exist → safe no-op
if not legacy_path.exists():
return {"source": 0, "copied": 0, "skipped": True, "final": 0}
col_list = ", ".join(columns)
conn = sqlite3.connect(app_path)
try:
conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),))
# Count source rows
(source_count,) = conn.execute(f"SELECT COUNT(*) FROM legacy.{table}").fetchone()
if dry_run:
conn.execute("DETACH DATABASE legacy")
return {
"source": source_count,
"copied": 0,
"skipped": False,
"final": 0,
}
# Count rows already in the target before this run
(before_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone()
# Idempotent insert — PK conflict → skip
conn.execute(
f"INSERT OR IGNORE INTO main.{table} ({col_list}) "
f"SELECT {col_list} FROM legacy.{table}"
)
conn.commit()
# Count rows now
(after_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone()
copied = after_count - before_count
# Reconciliation: every source row must be present with matching values
_reconcile(conn, table, columns, source_count)
conn.execute("DETACH DATABASE legacy")
finally:
conn.close()
return {
"source": source_count,
"copied": copied,
"skipped": False,
"final": after_count,
}
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Migrate legacy location/poo data into the unified app DB."
)
parser.add_argument(
"--app-db",
default=os.environ.get("APP_DATABASE_URL"),
help="sqlite:///... URL or path for the app DB "
"(default: $APP_DATABASE_URL)",
)
parser.add_argument(
"--location-db",
default=os.environ.get("LOCATION_DATABASE_URL"),
help="sqlite:///... URL or path for the legacy location DB "
"(default: $LOCATION_DATABASE_URL). Omit to skip location table.",
)
parser.add_argument(
"--poo-db",
default=os.environ.get("POO_DATABASE_URL"),
help="sqlite:///... URL or path for the legacy poo DB "
"(default: $POO_DATABASE_URL). Omit to skip poo_records table.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Report counts only; do not write any rows.",
)
args = parser.parse_args()
if not args.app_db:
parser.error(
"App DB not specified. Pass --app-db or set APP_DATABASE_URL."
)
try:
results = migrate_legacy_data(
app_url=args.app_db,
location_url=args.location_db,
poo_url=args.poo_db,
dry_run=args.dry_run,
)
except RuntimeError as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(1)
prefix = "[DRY RUN] " if args.dry_run else ""
print(f"{prefix}Migration results:")
for table_name, stats in results.items():
if stats["skipped"]:
print(f" {table_name}: SKIPPED (legacy file absent or not provided)")
else:
print(
f" {table_name}: source={stats['source']}, "
f"copied={stats['copied']}, final={stats['final']}"
)
if __name__ == "__main__":
main()
-200
View File
@@ -1,200 +0,0 @@
from __future__ import annotations
import sqlite3
import sys
from pathlib import Path
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.util.exc import CommandError
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from app.config import get_settings
POO_BASELINE_REVISION = "20260420_01_poo_baseline"
EXPECTED_USER_VERSION = 1
EXPECTED_POO_TABLE_INFO = [
(0, "timestamp", "TEXT", 1, None, 1),
(1, "status", "TEXT", 1, None, 0),
(2, "latitude", "REAL", 1, None, 0),
(3, "longitude", "REAL", 1, None, 0),
]
class PooDatabaseAdoptionError(RuntimeError):
"""Raised when a legacy poo database does not match the expected baseline."""
def _database_path_from_url(database_url: str) -> Path:
prefix = "sqlite:///"
if not database_url.startswith(prefix):
raise PooDatabaseAdoptionError(
f"Only sqlite URLs are supported for poo DB adoption, got: {database_url}"
)
return Path(database_url[len(prefix) :])
def _make_alembic_config(database_url: str) -> Config:
config = Config("alembic_poo.ini")
config.set_main_option("sqlalchemy.url", database_url)
return config
def _expected_head_revision(alembic_config: Config) -> str:
script = ScriptDirectory.from_config(alembic_config)
heads = script.get_heads()
if len(heads) != 1:
raise PooDatabaseAdoptionError(
f"Expected exactly one Alembic head for poo DB, got {len(heads)}"
)
return heads[0]
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
script = ScriptDirectory.from_config(alembic_config)
try:
return script.get_revision(revision) is not None
except CommandError:
return False
def _poo_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'poo_records'"
).fetchone()
return row is not None
finally:
conn.close()
def _alembic_version_table_exists(database_path: Path) -> bool:
conn = sqlite3.connect(database_path)
try:
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'alembic_version'"
).fetchone()
return row is not None
finally:
conn.close()
def _fetch_alembic_revision(database_path: Path) -> str:
conn = sqlite3.connect(database_path)
try:
row = conn.execute("SELECT version_num FROM alembic_version").fetchone()
if row is None:
raise PooDatabaseAdoptionError("Alembic version table exists but contains no revision")
return row[0]
finally:
conn.close()
def _fetch_poo_table_info(database_path: Path) -> list[tuple]:
conn = sqlite3.connect(database_path)
try:
return list(conn.execute("PRAGMA table_info(poo_records)"))
finally:
conn.close()
def _fetch_user_version(database_path: Path) -> int:
conn = sqlite3.connect(database_path)
try:
return conn.execute("PRAGMA user_version").fetchone()[0]
finally:
conn.close()
def validate_legacy_poo_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
if not database_path.exists():
raise PooDatabaseAdoptionError(f"Poo DB file does not exist: {database_path}")
if not _poo_table_exists(database_path):
raise PooDatabaseAdoptionError("Expected table 'poo_records' was not found in the DB")
table_info = _fetch_poo_table_info(database_path)
if table_info != EXPECTED_POO_TABLE_INFO:
raise PooDatabaseAdoptionError("Poo table schema does not match the expected baseline")
user_version = _fetch_user_version(database_path)
if user_version != EXPECTED_USER_VERSION:
raise PooDatabaseAdoptionError(
f"Expected PRAGMA user_version = {EXPECTED_USER_VERSION}, got {user_version}"
)
def validate_poo_runtime_db(database_url: str) -> None:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if not database_path.exists():
raise PooDatabaseAdoptionError(
"Poo DB file was not found. Run 'python scripts/poo_db_adopt.py' first to "
"initialize or adopt the poo DB before starting the app."
)
if not _alembic_version_table_exists(database_path):
raise PooDatabaseAdoptionError(
"Poo DB exists but is not yet Alembic-managed. Run "
"'python scripts/poo_db_adopt.py' first to adopt the legacy DB "
"before starting the app."
)
current_revision = _fetch_alembic_revision(database_path)
if current_revision != expected_revision:
raise PooDatabaseAdoptionError(
"Poo DB revision mismatch. Refusing to start the app: "
f"expected {expected_revision}, got {current_revision}"
)
def adopt_or_initialize_poo_db(database_url: str) -> str:
database_path = _database_path_from_url(database_url)
alembic_config = _make_alembic_config(database_url)
expected_revision = _expected_head_revision(alembic_config)
if database_path.exists():
if _alembic_version_table_exists(database_path):
current_revision = _fetch_alembic_revision(database_path)
if current_revision == expected_revision:
return "already_managed"
if not _is_known_revision(alembic_config, current_revision):
raise PooDatabaseAdoptionError(
"Poo DB is already Alembic-managed but revision does not match "
f"a known migration revision: got {current_revision}"
)
command.upgrade(alembic_config, "head")
return "upgraded"
validate_legacy_poo_db(database_url)
command.stamp(alembic_config, POO_BASELINE_REVISION)
if POO_BASELINE_REVISION != expected_revision:
command.upgrade(alembic_config, "head")
return "upgraded"
return "adopted"
database_path.parent.mkdir(parents=True, exist_ok=True)
command.upgrade(alembic_config, "head")
return "initialized"
def main() -> None:
settings = get_settings()
result = adopt_or_initialize_poo_db(settings.poo_database_url)
if result == "initialized":
print("Initialized a new poo DB via Alembic upgrade head.")
elif result == "already_managed":
print("Poo DB is already Alembic-managed at the expected baseline revision.")
else:
print("Validated legacy poo DB and stamped Alembic baseline successfully.")
if __name__ == "__main__":
main()
-4
View File
@@ -2,16 +2,12 @@ from __future__ import annotations
from app.config import get_settings from app.config import get_settings
from scripts.app_db_adopt import adopt_or_initialize_app_db from scripts.app_db_adopt import adopt_or_initialize_app_db
from scripts.location_db_adopt import adopt_or_initialize_location_db
from scripts.poo_db_adopt import adopt_or_initialize_poo_db
def run_all_migrations() -> dict[str, str]: def run_all_migrations() -> dict[str, str]:
settings = get_settings() settings = get_settings()
return { return {
"app": adopt_or_initialize_app_db(settings.app_database_url), "app": adopt_or_initialize_app_db(settings.app_database_url),
"location": adopt_or_initialize_location_db(settings.location_database_url),
"poo": adopt_or_initialize_poo_db(settings.poo_database_url),
} }
+12 -74
View File
@@ -5,10 +5,8 @@ from alembic import command
from alembic.config import Config from alembic.config import Config
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
import app.db as app_db
from app.config import get_settings from app.config import get_settings
from app.main import create_app from app.main import create_app
@@ -19,74 +17,40 @@ def _make_app_alembic_config(database_url: str) -> Config:
return config return config
def _make_alembic_config(database_url: str) -> Config:
config = Config("alembic_location.ini")
config.set_main_option("sqlalchemy.url", database_url)
return config
def _make_poo_alembic_config(database_url: str) -> Config:
config = Config("alembic_poo.ini")
config.set_main_option("sqlalchemy.url", database_url)
return config
@pytest.fixture @pytest.fixture
def test_database_urls(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): def test_database_urls(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
app_database_path = tmp_path / "app_test.db" app_database_path = tmp_path / "app_test.db"
location_database_path = tmp_path / "location_test.db"
poo_database_path = tmp_path / "poo_placeholder.db"
app_database_url = f"sqlite:///{app_database_path}" app_database_url = f"sqlite:///{app_database_path}"
location_database_url = f"sqlite:///{location_database_path}"
poo_database_url = f"sqlite:///{poo_database_path}"
monkeypatch.setenv("APP_DATABASE_URL", app_database_url) monkeypatch.setenv("APP_DATABASE_URL", app_database_url)
monkeypatch.setenv("LOCATION_DATABASE_URL", location_database_url)
monkeypatch.setenv("POO_DATABASE_URL", poo_database_url)
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin") monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password") monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("AUTH_COOKIE_SECURE_OVERRIDE", "false") monkeypatch.setenv("AUTH_COOKIE_SECURE_OVERRIDE", "false")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
try: try:
yield { yield {
"app_path": app_database_path, "app_path": app_database_path,
"app_url": app_database_url, "app_url": app_database_url,
"location_path": location_database_path,
"location_url": location_database_url,
"poo_path": poo_database_path,
"poo_url": poo_database_url,
} }
finally: finally:
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
@pytest.fixture
def ready_location_database(test_database_urls):
command.upgrade(_make_alembic_config(test_database_urls["location_url"]), "head")
return test_database_urls
@pytest.fixture
def ready_poo_database(test_database_urls):
command.upgrade(_make_poo_alembic_config(test_database_urls["poo_url"]), "head")
return test_database_urls
@pytest.fixture @pytest.fixture
def auth_database(test_database_urls, monkeypatch: pytest.MonkeyPatch): def auth_database(test_database_urls, monkeypatch: pytest.MonkeyPatch):
database_url = test_database_urls["app_url"] database_url = test_database_urls["app_url"]
command.upgrade(_make_app_alembic_config(database_url), "head") command.upgrade(_make_app_alembic_config(database_url), "head")
reset_auth_db_caches() reset_db_caches()
yield test_database_urls yield test_database_urls
reset_auth_db_caches() reset_db_caches()
@pytest.fixture @pytest.fixture
def app(ready_location_database, ready_poo_database, auth_database): def app(auth_database):
yield create_app() yield create_app()
@@ -97,46 +61,20 @@ def client(app):
@pytest.fixture @pytest.fixture
def location_client( def location_client(auth_database):
ready_location_database, app_url = auth_database["app_url"]
ready_poo_database, engine = create_engine(app_url, connect_args={"check_same_thread": False})
auth_database,
monkeypatch: pytest.MonkeyPatch,
):
database_url = ready_location_database["location_url"]
engine = create_engine(database_url, connect_args={"check_same_thread": False})
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False)
monkeypatch.setattr(app_db, "engine", engine)
monkeypatch.setattr(app_db, "SessionLocal", session_local)
fastapi_app = create_app() fastapi_app = create_app()
with TestClient(fastapi_app) as client: with TestClient(fastapi_app) as client:
yield client, engine yield client, engine
engine.dispose() engine.dispose()
@pytest.fixture @pytest.fixture
def poo_client( def poo_client(auth_database):
ready_location_database, app_url = auth_database["app_url"]
ready_poo_database, engine = create_engine(app_url, connect_args={"check_same_thread": False})
auth_database,
monkeypatch: pytest.MonkeyPatch,
):
database_url = ready_poo_database["poo_url"]
engine = create_engine(database_url, connect_args={"check_same_thread": False})
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False)
import app.poo_db as poo_db
monkeypatch.setattr(poo_db, "poo_engine", engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", session_local)
fastapi_app = create_app() fastapi_app = create_app()
with TestClient(fastapi_app) as client: with TestClient(fastapi_app) as client:
yield client, engine yield client, engine
engine.dispose() engine.dispose()
+8 -121
View File
@@ -5,11 +5,11 @@ import pytest
from alembic import command from alembic import command
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
from app.config import get_settings from app.config import get_settings
from app.main import create_app from app.main import create_app
from scripts.app_db_adopt import APP_BASELINE_REVISION, adopt_or_initialize_app_db from scripts.app_db_adopt import APP_BASELINE_REVISION, adopt_or_initialize_app_db
from tests.conftest import _make_alembic_config, _make_app_alembic_config, _make_poo_alembic_config from tests.conftest import _make_app_alembic_config
async def _run_lifespan(app) -> None: async def _run_lifespan(app) -> None:
@@ -38,18 +38,12 @@ def test_status_endpoint(client: TestClient) -> None:
def test_app_start_fails_when_app_db_missing(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None: def test_app_start_fails_when_app_db_missing(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None:
missing_app_path = tmp_path / "missing_app.db" missing_app_path = tmp_path / "missing_app.db"
poo_database_path = tmp_path / "poo_ready.db"
location_database_path = tmp_path / "location_ready.db"
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
command.upgrade(_make_alembic_config(f"sqlite:///{location_database_path}"), "head")
monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{missing_app_path}") monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{missing_app_path}")
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin") monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password") monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
app = create_app() app = create_app()
with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"): with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"):
@@ -58,7 +52,7 @@ def test_app_start_fails_when_app_db_missing(tmp_path, monkeypatch: pytest.Monke
assert not missing_app_path.exists() assert not missing_app_path.exists()
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def test_app_db_adoption_initializes_new_database(tmp_path) -> None: def test_app_db_adoption_initializes_new_database(tmp_path) -> None:
@@ -86,10 +80,6 @@ def test_app_start_seeds_missing_config_from_env_without_overwriting_existing_va
tmp_path, monkeypatch: pytest.MonkeyPatch tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None: ) -> None:
app_database_url = _prepare_app_db(tmp_path) app_database_url = _prepare_app_db(tmp_path)
location_database_path = tmp_path / "location_ready.db"
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_alembic_config(f"sqlite:///{location_database_path}"), "head")
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
app_database_path = tmp_path / "app_ready.db" app_database_path = tmp_path / "app_ready.db"
conn = sqlite3.connect(app_database_path) conn = sqlite3.connect(app_database_path)
@@ -105,10 +95,8 @@ def test_app_start_seeds_missing_config_from_env_without_overwriting_existing_va
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password") monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("APP_NAME", "Bootstrap Name") monkeypatch.setenv("APP_NAME", "Bootstrap Name")
monkeypatch.setenv("HOME_ASSISTANT_BASE_URL", "http://bootstrap-ha.local:8123") monkeypatch.setenv("HOME_ASSISTANT_BASE_URL", "http://bootstrap-ha.local:8123")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
app = create_app() app = create_app()
anyio.run(_run_lifespan, app) anyio.run(_run_lifespan, app)
@@ -124,17 +112,13 @@ def test_app_start_seeds_missing_config_from_env_without_overwriting_existing_va
assert rows["AUTH_SESSION_COOKIE_NAME"] == "home_automation_session" assert rows["AUTH_SESSION_COOKIE_NAME"] == "home_automation_session"
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def test_app_start_syncs_app_hostname_from_env_even_when_db_has_old_value( def test_app_start_syncs_app_hostname_from_env_even_when_db_has_old_value(
tmp_path, monkeypatch: pytest.MonkeyPatch tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None: ) -> None:
app_database_url = _prepare_app_db(tmp_path) app_database_url = _prepare_app_db(tmp_path)
location_database_path = tmp_path / "location_ready.db"
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_alembic_config(f"sqlite:///{location_database_path}"), "head")
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
app_database_path = tmp_path / "app_ready.db" app_database_path = tmp_path / "app_ready.db"
conn = sqlite3.connect(app_database_path) conn = sqlite3.connect(app_database_path)
@@ -149,10 +133,8 @@ def test_app_start_syncs_app_hostname_from_env_even_when_db_has_old_value(
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin") monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password") monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("APP_HOSTNAME", "new.example.com") monkeypatch.setenv("APP_HOSTNAME", "new.example.com")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
app = create_app() app = create_app()
anyio.run(_run_lifespan, app) anyio.run(_run_lifespan, app)
@@ -166,99 +148,4 @@ def test_app_start_syncs_app_hostname_from_env_even_when_db_has_old_value(
assert rows["APP_HOSTNAME"] == "new.example.com" assert rows["APP_HOSTNAME"] == "new.example.com"
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def test_app_start_fails_when_location_db_missing(
tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
app_database_url = _prepare_app_db(tmp_path)
monkeypatch.setenv("APP_DATABASE_URL", app_database_url)
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{tmp_path / 'missing.db'}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear()
reset_auth_db_caches()
app = create_app()
with pytest.raises(RuntimeError, match="Run 'python scripts/location_db_adopt.py' first"):
anyio.run(_run_lifespan, app)
get_settings.cache_clear()
reset_auth_db_caches()
def test_app_start_fails_when_location_db_exists_but_is_not_adopted(
tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
app_database_url = _prepare_app_db(tmp_path)
monkeypatch.setenv("APP_DATABASE_URL", app_database_url)
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
database_path = tmp_path / "legacy_only.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute("PRAGMA user_version = 2")
conn.commit()
conn.close()
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear()
reset_auth_db_caches()
app = create_app()
with pytest.raises(RuntimeError, match="is not yet Alembic-managed"):
anyio.run(_run_lifespan, app)
get_settings.cache_clear()
reset_auth_db_caches()
def test_app_start_fails_when_location_db_revision_mismatches(
tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
app_database_url = _prepare_app_db(tmp_path)
monkeypatch.setenv("APP_DATABASE_URL", app_database_url)
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
poo_database_path = tmp_path / "poo_ready.db"
command.upgrade(_make_poo_alembic_config(f"sqlite:///{poo_database_path}"), "head")
database_path = tmp_path / "wrong_revision.db"
command.upgrade(_make_alembic_config(f"sqlite:///{database_path}"), "head")
conn = sqlite3.connect(database_path)
conn.execute("UPDATE alembic_version SET version_num = 'wrong_revision'")
conn.commit()
conn.close()
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{database_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_database_path}")
get_settings.cache_clear()
reset_auth_db_caches()
app = create_app()
with pytest.raises(RuntimeError, match="Location DB revision mismatch"):
anyio.run(_run_lifespan, app)
get_settings.cache_clear()
reset_auth_db_caches()
+2 -5
View File
@@ -4,7 +4,7 @@ from pathlib import Path
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
from app.config import get_settings from app.config import get_settings
from app.main import create_app from app.main import create_app
@@ -194,9 +194,6 @@ def test_config_page_update_persists_to_database(
def test_config_page_shows_ticktick_oauth_link_when_ticktick_is_configured( def test_config_page_shows_ticktick_oauth_link_when_ticktick_is_configured(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch, monkeypatch,
) -> None: ) -> None:
@@ -205,7 +202,7 @@ def test_config_page_shows_ticktick_oauth_link_when_ticktick_is_configured(
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
with TestClient(create_app()) as client: with TestClient(create_app()) as client:
login_page = client.get("/login") login_page = client.get("/login")
+1 -9
View File
@@ -1,10 +1,8 @@
from app.config import Settings from app.config import Settings
def test_settings_support_two_independent_database_urls(monkeypatch) -> None: def test_settings_load_core_fields_from_env(monkeypatch) -> None:
monkeypatch.setenv("APP_DATABASE_URL", "sqlite:///./data/app.db") monkeypatch.setenv("APP_DATABASE_URL", "sqlite:///./data/app.db")
monkeypatch.setenv("LOCATION_DATABASE_URL", "sqlite:///./data/locationRecorder.db")
monkeypatch.setenv("POO_DATABASE_URL", "sqlite:///./data/pooRecorder.db")
monkeypatch.setenv("APP_HOSTNAME", "home.example.com") monkeypatch.setenv("APP_HOSTNAME", "home.example.com")
monkeypatch.setenv("POO_WEBHOOK_ID", "poo-hook") monkeypatch.setenv("POO_WEBHOOK_ID", "poo-hook")
monkeypatch.setenv("POO_SENSOR_ENTITY_NAME", "sensor.test_poo_status") monkeypatch.setenv("POO_SENSOR_ENTITY_NAME", "sensor.test_poo_status")
@@ -21,8 +19,6 @@ def test_settings_support_two_independent_database_urls(monkeypatch) -> None:
settings = Settings(_env_file=None) settings = Settings(_env_file=None)
assert settings.app_database_url == "sqlite:///./data/app.db" assert settings.app_database_url == "sqlite:///./data/app.db"
assert settings.location_database_url == "sqlite:///./data/locationRecorder.db"
assert settings.poo_database_url == "sqlite:///./data/pooRecorder.db"
assert settings.poo_webhook_id == "poo-hook" assert settings.poo_webhook_id == "poo-hook"
assert settings.poo_sensor_entity_name == "sensor.test_poo_status" assert settings.poo_sensor_entity_name == "sensor.test_poo_status"
assert settings.poo_sensor_friendly_name == "Poo Status" assert settings.poo_sensor_friendly_name == "Poo Status"
@@ -36,12 +32,8 @@ def test_settings_support_two_independent_database_urls(monkeypatch) -> None:
assert settings.auth_bootstrap_password == "secret" assert settings.auth_bootstrap_password == "secret"
assert settings.auth_session_cookie_name == "auth_cookie" assert settings.auth_session_cookie_name == "auth_cookie"
assert settings.auth_session_ttl_hours == 8 assert settings.auth_session_ttl_hours == 8
assert settings.location_sqlite_path is not None
assert settings.location_sqlite_path.name == "locationRecorder.db"
assert settings.app_sqlite_path is not None assert settings.app_sqlite_path is not None
assert settings.app_sqlite_path.name == "app.db" assert settings.app_sqlite_path.name == "app.db"
assert settings.poo_sqlite_path is not None
assert settings.poo_sqlite_path.name == "pooRecorder.db"
assert settings.auth_cookie_secure is True assert settings.auth_cookie_secure is True
+9 -113
View File
@@ -4,18 +4,12 @@ import sqlite3
import anyio import anyio
import pytest import pytest
import yaml import yaml
from alembic import command
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
from app.config import get_settings from app.config import get_settings
from app.main import create_app from app.main import create_app
from scripts.app_db_adopt import APP_BASELINE_REVISION from scripts.app_db_adopt import APP_BASELINE_REVISION
from scripts.location_db_adopt import EXPECTED_USER_VERSION as LOCATION_USER_VERSION
from scripts.location_db_adopt import LOCATION_BASELINE_REVISION
from scripts.poo_db_adopt import EXPECTED_USER_VERSION as POO_USER_VERSION
from scripts.poo_db_adopt import POO_BASELINE_REVISION
from scripts.run_migrations import run_all_migrations from scripts.run_migrations import run_all_migrations
from tests.conftest import _make_alembic_config, _make_poo_alembic_config
PROJECT_ROOT = Path(__file__).resolve().parents[1] PROJECT_ROOT = Path(__file__).resolve().parents[1]
@@ -31,73 +25,20 @@ async def _run_lifespan(app) -> None:
def _configure_database_env(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> dict[str, Path | str]: def _configure_database_env(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> dict[str, Path | str]:
app_path = tmp_path / "app.db" app_path = tmp_path / "app.db"
location_path = tmp_path / "location.db"
poo_path = tmp_path / "poo.db"
monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{app_path}") monkeypatch.setenv("APP_DATABASE_URL", f"sqlite:///{app_path}")
monkeypatch.setenv("LOCATION_DATABASE_URL", f"sqlite:///{location_path}")
monkeypatch.setenv("POO_DATABASE_URL", f"sqlite:///{poo_path}")
monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin") monkeypatch.setenv("AUTH_BOOTSTRAP_USERNAME", "admin")
monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password") monkeypatch.setenv("AUTH_BOOTSTRAP_PASSWORD", "test-password")
monkeypatch.setenv("AUTH_COOKIE_SECURE_OVERRIDE", "false") monkeypatch.setenv("AUTH_COOKIE_SECURE_OVERRIDE", "false")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
return { return {
"app_path": app_path, "app_path": app_path,
"app_url": f"sqlite:///{app_path}", "app_url": f"sqlite:///{app_path}",
"location_path": location_path,
"location_url": f"sqlite:///{location_path}",
"poo_path": poo_path,
"poo_url": f"sqlite:///{poo_path}",
} }
def _create_legacy_location_db(database_path: Path) -> None:
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute(
"INSERT INTO location (person, datetime, latitude, longitude, altitude) VALUES (?, ?, ?, ?, ?)",
("alice", "2026-04-22T10:00:00Z", 1.23, 4.56, 7.89),
)
conn.execute(f"PRAGMA user_version = {LOCATION_USER_VERSION}")
conn.commit()
conn.close()
def _create_legacy_poo_db(database_path: Path) -> None:
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.execute(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) VALUES (?, ?, ?, ?)",
("2026-04-22T11:00:00Z", "complete", 9.87, 6.54),
)
conn.execute(f"PRAGMA user_version = {POO_USER_VERSION}")
conn.commit()
conn.close()
def test_compose_uses_migration_job_before_app() -> None: def test_compose_uses_migration_job_before_app() -> None:
compose = _read_yaml("docker-compose.yml") compose = _read_yaml("docker-compose.yml")
override = _read_yaml("docker-compose.override.yml") override = _read_yaml("docker-compose.override.yml")
@@ -131,12 +72,8 @@ def test_migration_runner_initializes_and_is_idempotent(
first_run = run_all_migrations() first_run = run_all_migrations()
second_run = run_all_migrations() second_run = run_all_migrations()
assert first_run == {"app": "initialized", "location": "initialized", "poo": "initialized"} assert first_run == {"app": "initialized"}
assert second_run == { assert second_run == {"app": "already_managed"}
"app": "already_managed",
"location": "already_managed",
"poo": "already_managed",
}
conn = sqlite3.connect(database_urls["app_path"]) conn = sqlite3.connect(database_urls["app_path"])
try: try:
@@ -150,51 +87,12 @@ def test_migration_runner_initializes_and_is_idempotent(
finally: finally:
conn.close() conn.close()
assert {"auth_users", "auth_sessions", "app_config", "alembic_version"} <= tables assert {
"auth_users", "auth_sessions", "app_config", "alembic_version", "location", "poo_records"
conn = sqlite3.connect(database_urls["location_path"]) } <= tables
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == LOCATION_BASELINE_REVISION
finally:
conn.close()
conn = sqlite3.connect(database_urls["poo_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == POO_BASELINE_REVISION
finally:
conn.close()
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
def test_migration_runner_adopts_legacy_sqlite_without_data_loss(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
database_urls = _configure_database_env(tmp_path, monkeypatch)
_create_legacy_location_db(database_urls["location_path"])
_create_legacy_poo_db(database_urls["poo_path"])
results = run_all_migrations()
assert results == {"app": "initialized", "location": "adopted", "poo": "adopted"}
conn = sqlite3.connect(database_urls["location_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == LOCATION_BASELINE_REVISION
assert conn.execute("SELECT COUNT(*) FROM location").fetchone()[0] == 1
finally:
conn.close()
conn = sqlite3.connect(database_urls["poo_path"])
try:
assert conn.execute("SELECT version_num FROM alembic_version").fetchone()[0] == POO_BASELINE_REVISION
assert conn.execute("SELECT COUNT(*) FROM poo_records").fetchone()[0] == 1
finally:
conn.close()
get_settings.cache_clear()
reset_auth_db_caches()
def test_app_startup_still_fails_closed_without_running_adoption( def test_app_startup_still_fails_closed_without_running_adoption(
@@ -202,8 +100,6 @@ def test_app_startup_still_fails_closed_without_running_adoption(
) -> None: ) -> None:
database_urls = _configure_database_env(tmp_path, monkeypatch) database_urls = _configure_database_env(tmp_path, monkeypatch)
missing_app_path = database_urls["app_path"] missing_app_path = database_urls["app_path"]
command.upgrade(_make_alembic_config(database_urls["location_url"]), "head")
command.upgrade(_make_poo_alembic_config(database_urls["poo_url"]), "head")
app = create_app() app = create_app()
with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"): with pytest.raises(RuntimeError, match="Run 'python scripts/app_db_adopt.py' first"):
@@ -212,4 +108,4 @@ def test_app_startup_still_fails_closed_without_running_adoption(
assert not Path(missing_app_path).exists() assert not Path(missing_app_path).exists()
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
+10 -63
View File
@@ -1,7 +1,5 @@
from sqlalchemy import text from sqlalchemy import text
import app.db as app_db
import app.poo_db as poo_db
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_homeassistant_client from app.dependencies import get_app_settings, get_homeassistant_client
from app.main import create_app from app.main import create_app
@@ -158,45 +156,25 @@ def test_homeassistant_publish_rejects_invalid_ticktick_content(location_client)
def test_homeassistant_publish_poo_get_latest_publishes_latest_status( def test_homeassistant_publish_poo_get_latest_publishes_latest_status(
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch,
) -> None: ) -> None:
location_engine = app_db.create_engine( from fastapi.testclient import TestClient
ready_location_database["location_url"], from sqlalchemy import create_engine
connect_args={"check_same_thread": False},
) app_url = auth_database["app_url"]
location_session_local = app_db.sessionmaker( engine = create_engine(app_url, connect_args={"check_same_thread": False})
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient() fake_ha = _FakeHomeAssistantClient()
settings = Settings( settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status", poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status", poo_sensor_friendly_name="Poo Status",
) )
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app() test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings test_app.dependency_overrides[get_app_settings] = lambda: settings
with poo_engine.begin() as conn: with engine.begin() as conn:
conn.execute( conn.execute(
text( text(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) " "INSERT INTO poo_records (timestamp, status, latitude, longitude) "
@@ -211,8 +189,6 @@ def test_homeassistant_publish_poo_get_latest_publishes_latest_status(
) )
try: try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client: with TestClient(test_app) as client:
response = client.post( response = client.post(
"/homeassistant/publish", "/homeassistant/publish",
@@ -233,52 +209,25 @@ def test_homeassistant_publish_poo_get_latest_publishes_latest_status(
finally: finally:
test_app.dependency_overrides.clear() test_app.dependency_overrides.clear()
get_settings.cache_clear() get_settings.cache_clear()
location_engine.dispose() engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_internal_error_for_unknown_poo_action( def test_homeassistant_publish_returns_internal_error_for_unknown_poo_action(
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch,
) -> None: ) -> None:
location_engine = app_db.create_engine( from fastapi.testclient import TestClient
ready_location_database["location_url"],
connect_args={"check_same_thread": False},
)
location_session_local = app_db.sessionmaker(
bind=location_engine,
autoflush=False,
autocommit=False,
)
poo_engine = poo_db.create_engine(
ready_poo_database["poo_url"],
connect_args={"check_same_thread": False},
)
poo_session_local = poo_db.sessionmaker(
bind=poo_engine,
autoflush=False,
autocommit=False,
)
fake_ha = _FakeHomeAssistantClient() fake_ha = _FakeHomeAssistantClient()
settings = Settings( settings = Settings(
poo_sensor_entity_name="sensor.test_poo_status", poo_sensor_entity_name="sensor.test_poo_status",
poo_sensor_friendly_name="Poo Status", poo_sensor_friendly_name="Poo Status",
) )
monkeypatch.setattr(app_db, "engine", location_engine)
monkeypatch.setattr(app_db, "SessionLocal", location_session_local)
monkeypatch.setattr(poo_db, "poo_engine", poo_engine)
monkeypatch.setattr(poo_db, "PooSessionLocal", poo_session_local)
test_app = create_app() test_app = create_app()
test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha test_app.dependency_overrides[get_homeassistant_client] = lambda: fake_ha
test_app.dependency_overrides[get_app_settings] = lambda: settings test_app.dependency_overrides[get_app_settings] = lambda: settings
try: try:
from fastapi.testclient import TestClient
with TestClient(test_app) as client: with TestClient(test_app) as client:
response = client.post( response = client.post(
"/homeassistant/publish", "/homeassistant/publish",
@@ -295,8 +244,6 @@ def test_homeassistant_publish_returns_internal_error_for_unknown_poo_action(
finally: finally:
test_app.dependency_overrides.clear() test_app.dependency_overrides.clear()
get_settings.cache_clear() get_settings.cache_clear()
location_engine.dispose()
poo_engine.dispose()
def test_homeassistant_publish_returns_not_implemented_for_unknown_location_action( def test_homeassistant_publish_returns_not_implemented_for_unknown_location_action(
+1 -214
View File
@@ -1,28 +1,8 @@
from datetime import datetime from datetime import datetime
from pathlib import Path
import sqlite3
import pytest import pytest
from alembic import command from sqlalchemy import text
from alembic.config import Config
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import app.db as app_db
from app.main import create_app
from scripts.location_db_adopt import (
EXPECTED_USER_VERSION,
LOCATION_BASELINE_REVISION,
LocationDatabaseAdoptionError,
adopt_or_initialize_location_db,
)
from tests.conftest import _make_app_alembic_config, _make_poo_alembic_config
def _make_alembic_config(database_url: str) -> Config:
config = Config("alembic_location.ini")
config.set_main_option("sqlalchemy.url", database_url)
return config
def test_location_record_endpoint_writes_row(location_client) -> None: def test_location_record_endpoint_writes_row(location_client) -> None:
client, engine = location_client client, engine = location_client
@@ -197,196 +177,3 @@ def test_location_record_endpoint_defaults_invalid_altitude_to_zero(location_cli
assert row.altitude == pytest.approx(0.0) assert row.altitude == pytest.approx(0.0)
def test_legacy_style_location_db_can_be_stamped_and_adopted(
test_database_urls, monkeypatch: pytest.MonkeyPatch
) -> None:
app_database_url = test_database_urls["app_url"]
database_path = test_database_urls["location_path"]
database_url = test_database_urls["location_url"]
poo_database_url = test_database_urls["poo_url"]
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute("PRAGMA user_version = 2")
conn.commit()
conn.close()
command.upgrade(_make_app_alembic_config(app_database_url), "head")
command.stamp(_make_alembic_config(database_url), LOCATION_BASELINE_REVISION)
command.upgrade(_make_poo_alembic_config(poo_database_url), "head")
engine = create_engine(database_url, connect_args={"check_same_thread": False})
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False)
monkeypatch.setattr(app_db, "engine", engine)
monkeypatch.setattr(app_db, "SessionLocal", session_local)
from fastapi.testclient import TestClient
fastapi_app = create_app()
with TestClient(fastapi_app) as client:
response = client.post(
"/location/record",
json={
"person": "legacy-user",
"latitude": "12.3",
"longitude": "45.6",
"altitude": "7.8",
},
)
assert response.status_code == 200
with engine.connect() as db_conn:
revision = db_conn.execute(text("SELECT version_num FROM alembic_version")).scalar_one()
row_count = db_conn.execute(text("SELECT COUNT(*) FROM location")).scalar_one()
assert revision == LOCATION_BASELINE_REVISION
assert row_count == 1
engine.dispose()
def test_location_db_adoption_initializes_new_db(tmp_path: Path) -> None:
database_path = tmp_path / "new_location.db"
result = adopt_or_initialize_location_db(f"sqlite:///{database_path}")
assert result == "initialized"
assert database_path.exists()
conn = sqlite3.connect(database_path)
try:
revision = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
location_table = conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'location'"
).fetchone()
finally:
conn.close()
assert revision == LOCATION_BASELINE_REVISION
assert location_table is not None
def test_location_db_adoption_validates_and_stamps_legacy_db(tmp_path: Path) -> None:
database_path = tmp_path / "legacy_location.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute(f"PRAGMA user_version = {EXPECTED_USER_VERSION}")
conn.commit()
conn.close()
result = adopt_or_initialize_location_db(f"sqlite:///{database_path}")
assert result == "adopted"
conn = sqlite3.connect(database_path)
try:
revision = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
finally:
conn.close()
assert revision == LOCATION_BASELINE_REVISION
def test_location_db_adoption_accepts_already_managed_matching_revision(
tmp_path: Path,
) -> None:
database_path = tmp_path / "managed_location.db"
command.upgrade(_make_alembic_config(f"sqlite:///{database_path}"), "head")
result = adopt_or_initialize_location_db(f"sqlite:///{database_path}")
assert result == "already_managed"
def test_location_db_adoption_fails_closed_on_alembic_revision_mismatch(
tmp_path: Path,
) -> None:
database_path = tmp_path / "wrong_revision.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute("CREATE TABLE alembic_version (version_num VARCHAR(32) NOT NULL)")
conn.execute("INSERT INTO alembic_version (version_num) VALUES ('wrong_revision')")
conn.execute(f"PRAGMA user_version = {EXPECTED_USER_VERSION}")
conn.commit()
conn.close()
with pytest.raises(LocationDatabaseAdoptionError, match="known migration revision"):
adopt_or_initialize_location_db(f"sqlite:///{database_path}")
def test_location_db_adoption_fails_closed_on_schema_mismatch(tmp_path: Path) -> None:
database_path = tmp_path / "bad_schema.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute(f"PRAGMA user_version = {EXPECTED_USER_VERSION}")
conn.commit()
conn.close()
with pytest.raises(LocationDatabaseAdoptionError, match="schema does not match"):
adopt_or_initialize_location_db(f"sqlite:///{database_path}")
def test_location_db_adoption_fails_closed_on_user_version_mismatch(tmp_path: Path) -> None:
database_path = tmp_path / "bad_user_version.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.execute("PRAGMA user_version = 999")
conn.commit()
conn.close()
with pytest.raises(LocationDatabaseAdoptionError, match="Expected PRAGMA user_version"):
adopt_or_initialize_location_db(f"sqlite:///{database_path}")
+460
View File
@@ -0,0 +1,460 @@
"""Tests for scripts/migrate_legacy_data.py (M1-T02).
Uses pytest tmp_path for all temp files. The app DB is brought to head via
alembic_app.ini (the same approach used by conftest._make_app_alembic_config),
so it has the location and poo_records tables created in T01.
Legacy DBs are built by hand with real columns matching the legacy baseline schema.
"""
from __future__ import annotations
import sqlite3
import sys
from pathlib import Path
import pytest
from alembic import command
from alembic.config import Config
from scripts.migrate_legacy_data import (
_reconcile,
_sqlite_path_from_url,
migrate_legacy_data,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_app_alembic_config(database_url: str) -> Config:
cfg = Config("alembic_app.ini")
cfg.set_main_option("sqlalchemy.url", database_url)
return cfg
def _upgraded_app_db(tmp_path: Path, name: str = "app_test.db") -> tuple[Path, str]:
"""Create and upgrade an app DB to head; return (path, url)."""
db_path = tmp_path / name
db_url = f"sqlite:///{db_path}"
command.upgrade(_make_app_alembic_config(db_url), "head")
return db_path, db_url
def _make_legacy_location_db(db_path: Path, rows: list[tuple]) -> None:
"""Create a legacy location DB and insert given rows.
Each row is a tuple: (person, datetime, latitude, longitude, altitude).
altitude may be None.
"""
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE location (
person TEXT NOT NULL,
datetime TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
PRIMARY KEY (person, datetime)
)
"""
)
conn.executemany(
"INSERT INTO location (person, datetime, latitude, longitude, altitude) "
"VALUES (?, ?, ?, ?, ?)",
rows,
)
conn.commit()
conn.close()
def _make_legacy_poo_db(db_path: Path, rows: list[tuple]) -> None:
"""Create a legacy poo DB and insert given rows.
Each row is a tuple: (timestamp, status, latitude, longitude).
"""
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.executemany(
"INSERT INTO poo_records (timestamp, status, latitude, longitude) "
"VALUES (?, ?, ?, ?)",
rows,
)
conn.commit()
conn.close()
def _count_rows(db_path: Path, table: str) -> int:
conn = sqlite3.connect(db_path)
try:
(count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
return count
finally:
conn.close()
# ---------------------------------------------------------------------------
# Sample data
# ---------------------------------------------------------------------------
LOCATION_ROWS = [
("alice", "2026-01-01T10:00:00Z", 1.23, 4.56, 7.89),
("bob", "2026-01-02T10:00:00Z", 2.34, 5.67, None),
("alice", "2026-01-03T10:00:00Z", 3.45, 6.78, 9.01),
]
POO_ROWS = [
("2026-01-01T08:00:00Z", "complete", 10.0, 20.0),
("2026-01-02T08:00:00Z", "urgent", 11.0, 21.0),
]
# ---------------------------------------------------------------------------
# Test 1: Idempotency
# ---------------------------------------------------------------------------
def test_location_migration_is_idempotent(tmp_path: Path) -> None:
"""N source rows → app table has N rows; run again → still N rows."""
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
# First run
result1 = migrate_legacy_data(app_url, legacy_url, None)
assert result1["location"]["source"] == len(LOCATION_ROWS)
assert result1["location"]["copied"] == len(LOCATION_ROWS)
assert result1["location"]["skipped"] is False
assert result1["location"]["final"] == len(LOCATION_ROWS)
assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
# Second run — idempotent, no dupes, no error
result2 = migrate_legacy_data(app_url, legacy_url, None)
assert result2["location"]["source"] == len(LOCATION_ROWS)
assert result2["location"]["copied"] == 0 # nothing new
assert result2["location"]["skipped"] is False
assert result2["location"]["final"] == len(LOCATION_ROWS)
assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
def test_poo_migration_is_idempotent(tmp_path: Path) -> None:
"""N poo source rows → app table has N rows; run again → still N rows."""
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "pooRecorder.db"
_make_legacy_poo_db(legacy_path, POO_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
result1 = migrate_legacy_data(app_url, None, legacy_url)
assert result1["poo_records"]["source"] == len(POO_ROWS)
assert result1["poo_records"]["copied"] == len(POO_ROWS)
assert result1["poo_records"]["skipped"] is False
assert result1["poo_records"]["final"] == len(POO_ROWS)
assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
result2 = migrate_legacy_data(app_url, None, legacy_url)
assert result2["poo_records"]["copied"] == 0
assert result2["poo_records"]["final"] == len(POO_ROWS)
assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
def test_both_tables_migration_is_idempotent(tmp_path: Path) -> None:
"""Migrating both tables at once is idempotent."""
app_path, app_url = _upgraded_app_db(tmp_path)
loc_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(loc_path, LOCATION_ROWS)
loc_url = f"sqlite:///{loc_path}"
poo_path = tmp_path / "pooRecorder.db"
_make_legacy_poo_db(poo_path, POO_ROWS)
poo_url = f"sqlite:///{poo_path}"
result1 = migrate_legacy_data(app_url, loc_url, poo_url)
assert result1["location"]["final"] == len(LOCATION_ROWS)
assert result1["poo_records"]["final"] == len(POO_ROWS)
result2 = migrate_legacy_data(app_url, loc_url, poo_url)
assert result2["location"]["copied"] == 0
assert result2["poo_records"]["copied"] == 0
assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
# ---------------------------------------------------------------------------
# Test 2: Missing legacy file
# ---------------------------------------------------------------------------
def test_missing_location_file_is_skipped(tmp_path: Path) -> None:
"""Absent location DB → table result is skipped, no exception, app table empty."""
app_path, app_url = _upgraded_app_db(tmp_path)
nonexistent = f"sqlite:///{tmp_path / 'does_not_exist_location.db'}"
result = migrate_legacy_data(app_url, nonexistent, None)
assert result["location"]["skipped"] is True
assert result["location"]["source"] == 0
assert result["location"]["copied"] == 0
assert _count_rows(app_path, "location") == 0
def test_missing_poo_file_is_skipped(tmp_path: Path) -> None:
"""Absent poo DB → table result is skipped, no exception, app table empty."""
app_path, app_url = _upgraded_app_db(tmp_path)
nonexistent = f"sqlite:///{tmp_path / 'does_not_exist_poo.db'}"
result = migrate_legacy_data(app_url, None, nonexistent)
assert result["poo_records"]["skipped"] is True
assert result["poo_records"]["source"] == 0
assert result["poo_records"]["copied"] == 0
assert _count_rows(app_path, "poo_records") == 0
def test_none_location_url_is_skipped(tmp_path: Path) -> None:
"""Passing None for location_url → skipped, no exception."""
_, app_url = _upgraded_app_db(tmp_path)
result = migrate_legacy_data(app_url, None, None)
assert result["location"]["skipped"] is True
assert result["poo_records"]["skipped"] is True
# ---------------------------------------------------------------------------
# Test 3: Reconciliation failure
# ---------------------------------------------------------------------------
def test_reconcile_raises_on_missing_rows(tmp_path: Path) -> None:
"""_reconcile() raises RuntimeError when source rows are missing from target."""
# Build an app DB and a legacy DB with 3 rows
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
# Only insert 1 row into the app DB manually (simulate partial migration)
conn = sqlite3.connect(app_path)
conn.execute(
"INSERT INTO location (person, datetime, latitude, longitude, altitude) "
"VALUES (?, ?, ?, ?, ?)",
LOCATION_ROWS[0],
)
conn.commit()
# ATTACH legacy to run _reconcile
conn.execute(f"ATTACH DATABASE '{legacy_path}' AS legacy")
with pytest.raises(RuntimeError, match="Reconciliation failed"):
_reconcile(
conn,
table="location",
columns=["person", "datetime", "latitude", "longitude", "altitude"],
source_count=len(LOCATION_ROWS),
)
conn.execute("DETACH DATABASE legacy")
conn.close()
def test_migrate_reconciliation_failure_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""If a row goes missing after INSERT, migrate_legacy_data raises RuntimeError."""
import scripts.migrate_legacy_data as mod
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
def _always_fail(conn, table, columns, source_count):
# Simulate a scenario where reconciliation finds rows missing
raise RuntimeError(
f"Reconciliation failed for table '{table}': "
f"1 of {source_count} source rows are missing from the app DB."
)
monkeypatch.setattr(mod, "_reconcile", _always_fail)
with pytest.raises(RuntimeError, match="Reconciliation failed"):
migrate_legacy_data(app_url, legacy_url, None)
def test_cli_exits_nonzero_on_reconciliation_failure(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""CLI main() exits non-zero when reconciliation raises."""
import scripts.migrate_legacy_data as mod
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
# Patch _reconcile to always raise
def _always_fail(conn, table, columns, source_count):
raise RuntimeError(
f"Reconciliation failed for table '{table}': 1 row missing."
)
monkeypatch.setattr(mod, "_reconcile", _always_fail)
# Patch sys.argv so main() picks up the right args
monkeypatch.setattr(
sys,
"argv",
[
"migrate_legacy_data",
"--app-db", app_url,
"--location-db", legacy_url,
],
)
with pytest.raises(SystemExit) as exc_info:
mod.main()
assert exc_info.value.code != 0
def test_reconcile_catches_value_mismatch_not_just_pk(tmp_path: Path) -> None:
"""Full-row reconciliation catches value mismatch that PK-only check would miss.
Scenario: the app DB is PRE-POPULATED with a row that shares the same PK as
a legacy source row but has DIFFERENT non-PK column values. INSERT OR IGNORE
skips the source row (PK conflict), so the target retains the stale data.
The old PK-only reconciliation would have incorrectly reported success.
The new full-row reconciliation must detect the mismatch and raise.
"""
app_path, app_url = _upgraded_app_db(tmp_path)
# Legacy source has a row: person="alice", datetime="2026-01-01T10:00:00Z",
# latitude=1.23, longitude=4.56, altitude=7.89
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, [("alice", "2026-01-01T10:00:00Z", 1.23, 4.56, 7.89)])
legacy_url = f"sqlite:///{legacy_path}"
# App DB is pre-populated with the SAME PK but DIFFERENT non-PK values
# (latitude/longitude/altitude all differ from the source row)
conn = sqlite3.connect(app_path)
conn.execute(
"INSERT INTO location (person, datetime, latitude, longitude, altitude) "
"VALUES (?, ?, ?, ?, ?)",
("alice", "2026-01-01T10:00:00Z", 99.0, 99.0, 99.0),
)
conn.commit()
conn.close()
# migrate_legacy_data must raise: the source row's data is NOT in the target
# (INSERT OR IGNORE skipped it because of PK conflict, retaining the 99.0 values)
with pytest.raises(RuntimeError, match="Reconciliation failed"):
migrate_legacy_data(app_url, legacy_url, None)
def test_full_row_reconciliation_idempotent_on_identical_data(tmp_path: Path) -> None:
"""Second run on already-migrated data still reconciles cleanly.
When the target already holds identical rows (from the first run), the full-row
IS predicate matches every column and reconciliation passes (no raise).
"""
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
# First run: migrate all rows
result1 = migrate_legacy_data(app_url, legacy_url, None)
assert result1["location"]["copied"] == len(LOCATION_ROWS)
# Second run: rows already present, INSERT OR IGNORE skips all, full-row
# reconciliation must still pass because values are identical
result2 = migrate_legacy_data(app_url, legacy_url, None)
assert result2["location"]["copied"] == 0
assert result2["location"]["final"] == len(LOCATION_ROWS)
# No exception raised — idempotency holds under full-row reconciliation
# ---------------------------------------------------------------------------
# Test 4: dry_run
# ---------------------------------------------------------------------------
def test_dry_run_does_not_write_location_rows(tmp_path: Path) -> None:
"""dry_run=True reports source counts but writes nothing."""
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(legacy_path, LOCATION_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
result = migrate_legacy_data(app_url, legacy_url, None, dry_run=True)
assert result["location"]["source"] == len(LOCATION_ROWS)
assert result["location"]["copied"] == 0
assert result["location"]["skipped"] is False
# dry_run returns final=0 (no actual query on app side)
assert result["location"]["final"] == 0
# App table must still be empty
assert _count_rows(app_path, "location") == 0
def test_dry_run_does_not_write_poo_rows(tmp_path: Path) -> None:
"""dry_run=True for poo_records: source reported, nothing written."""
app_path, app_url = _upgraded_app_db(tmp_path)
legacy_path = tmp_path / "pooRecorder.db"
_make_legacy_poo_db(legacy_path, POO_ROWS)
legacy_url = f"sqlite:///{legacy_path}"
result = migrate_legacy_data(app_url, None, legacy_url, dry_run=True)
assert result["poo_records"]["source"] == len(POO_ROWS)
assert result["poo_records"]["copied"] == 0
assert result["poo_records"]["skipped"] is False
assert result["poo_records"]["final"] == 0
assert _count_rows(app_path, "poo_records") == 0
def test_dry_run_both_tables(tmp_path: Path) -> None:
"""dry_run=True for both tables: both reported, nothing written."""
app_path, app_url = _upgraded_app_db(tmp_path)
loc_path = tmp_path / "locationRecorder.db"
_make_legacy_location_db(loc_path, LOCATION_ROWS)
loc_url = f"sqlite:///{loc_path}"
poo_path = tmp_path / "pooRecorder.db"
_make_legacy_poo_db(poo_path, POO_ROWS)
poo_url = f"sqlite:///{poo_path}"
result = migrate_legacy_data(app_url, loc_url, poo_url, dry_run=True)
assert result["location"]["source"] == len(LOCATION_ROWS)
assert result["location"]["copied"] == 0
assert result["poo_records"]["source"] == len(POO_ROWS)
assert result["poo_records"]["copied"] == 0
assert _count_rows(app_path, "location") == 0
assert _count_rows(app_path, "poo_records") == 0
# ---------------------------------------------------------------------------
# Test: _sqlite_path_from_url helper
# ---------------------------------------------------------------------------
def test_sqlite_path_from_url_parses_url() -> None:
path = _sqlite_path_from_url("sqlite:///./data/app.db")
# Path normalises './' away, but the tail should remain
assert path == Path("data/app.db")
def test_sqlite_path_from_url_treats_plain_path_as_path() -> None:
path = _sqlite_path_from_url("/tmp/some.db")
assert str(path) == "/tmp/some.db"
-102
View File
@@ -1,17 +1,8 @@
from pathlib import Path
import sqlite3
import pytest import pytest
from sqlalchemy import text from sqlalchemy import text
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.dependencies import get_app_settings, get_homeassistant_client from app.dependencies import get_app_settings, get_homeassistant_client
from scripts.poo_db_adopt import (
EXPECTED_USER_VERSION,
POO_BASELINE_REVISION,
PooDatabaseAdoptionError,
adopt_or_initialize_poo_db,
)
class _FakeHomeAssistantClient: class _FakeHomeAssistantClient:
@@ -153,96 +144,3 @@ def test_poo_latest_endpoint_returns_ok_when_no_record_exists(poo_client_with_ov
assert response.text == "" assert response.text == ""
def test_poo_db_adoption_initializes_new_db(tmp_path: Path) -> None:
database_path = tmp_path / "new_poo.db"
result = adopt_or_initialize_poo_db(f"sqlite:///{database_path}")
assert result == "initialized"
assert database_path.exists()
conn = sqlite3.connect(database_path)
try:
revision = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
poo_table = conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'poo_records'"
).fetchone()
finally:
conn.close()
assert revision == POO_BASELINE_REVISION
assert poo_table is not None
def test_poo_db_adoption_validates_and_stamps_legacy_db(tmp_path: Path) -> None:
database_path = tmp_path / "legacy_poo.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.execute(f"PRAGMA user_version = {EXPECTED_USER_VERSION}")
conn.commit()
conn.close()
result = adopt_or_initialize_poo_db(f"sqlite:///{database_path}")
assert result == "adopted"
conn = sqlite3.connect(database_path)
try:
revision = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
finally:
conn.close()
assert revision == POO_BASELINE_REVISION
def test_poo_db_adoption_fails_closed_on_schema_mismatch(tmp_path: Path) -> None:
database_path = tmp_path / "bad_poo_schema.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.execute(f"PRAGMA user_version = {EXPECTED_USER_VERSION}")
conn.commit()
conn.close()
with pytest.raises(PooDatabaseAdoptionError, match="schema does not match"):
adopt_or_initialize_poo_db(f"sqlite:///{database_path}")
def test_poo_db_adoption_fails_closed_on_user_version_mismatch(tmp_path: Path) -> None:
database_path = tmp_path / "bad_poo_user_version.db"
conn = sqlite3.connect(database_path)
conn.execute(
"""
CREATE TABLE poo_records (
timestamp TEXT NOT NULL,
status TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
PRIMARY KEY (timestamp)
)
"""
)
conn.execute("PRAGMA user_version = 999")
conn.commit()
conn.close()
with pytest.raises(PooDatabaseAdoptionError, match="Expected PRAGMA user_version"):
adopt_or_initialize_poo_db(f"sqlite:///{database_path}")
-2
View File
@@ -182,8 +182,6 @@ def _notification_settings() -> Settings:
app_env="development", app_env="development",
app_hostname="localhost:8000", app_hostname="localhost:8000",
app_database_url="sqlite:///./data/app.db", app_database_url="sqlite:///./data/app.db",
location_database_url="sqlite:///./data/locationRecorder.db",
poo_database_url="sqlite:///./data/pooRecorder.db",
auth_bootstrap_username="admin", auth_bootstrap_username="admin",
auth_bootstrap_password="secret-password", auth_bootstrap_password="secret-password",
smtp_enabled=True, smtp_enabled=True,
-2
View File
@@ -40,8 +40,6 @@ def _smtp_settings(**overrides) -> Settings:
"app_env": "development", "app_env": "development",
"app_hostname": "localhost:8000", "app_hostname": "localhost:8000",
"app_database_url": "sqlite:///./data/app.db", "app_database_url": "sqlite:///./data/app.db",
"location_database_url": "sqlite:///./data/locationRecorder.db",
"poo_database_url": "sqlite:///./data/pooRecorder.db",
"auth_bootstrap_username": "admin", "auth_bootstrap_username": "admin",
"auth_bootstrap_password": "secret-password", "auth_bootstrap_password": "secret-password",
"smtp_enabled": True, "smtp_enabled": True,
+7 -22
View File
@@ -6,7 +6,7 @@ from urllib.parse import parse_qs, urlparse
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from app.auth_db import reset_auth_db_caches from app.db import reset_db_caches
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.integrations.ticktick import ( from app.integrations.ticktick import (
AUTH_SCOPE, AUTH_SCOPE,
@@ -209,9 +209,6 @@ def test_create_task_posts_expected_payload(monkeypatch: pytest.MonkeyPatch) ->
def test_homeassistant_publish_creates_ticktick_action_task( def test_homeassistant_publish_creates_ticktick_action_task(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
@@ -221,7 +218,7 @@ def test_homeassistant_publish_creates_ticktick_action_task(
monkeypatch.setenv("TICKTICK_TOKEN", "ticktick-access-token") monkeypatch.setenv("TICKTICK_TOKEN", "ticktick-access-token")
monkeypatch.setenv("HOME_ASSISTANT_ACTION_TASK_PROJECT_ID", "project-123") monkeypatch.setenv("HOME_ASSISTANT_ACTION_TASK_PROJECT_ID", "project-123")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
captured = {"calls": []} captured = {"calls": []}
@@ -255,9 +252,6 @@ def test_homeassistant_publish_creates_ticktick_action_task(
def test_ticktick_auth_start_redirects_authenticated_user( def test_ticktick_auth_start_redirects_authenticated_user(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
@@ -265,7 +259,7 @@ def test_ticktick_auth_start_redirects_authenticated_user(
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
monkeypatch.setattr("app.integrations.ticktick.secrets.token_hex", lambda _: "state-redirect") monkeypatch.setattr("app.integrations.ticktick.secrets.token_hex", lambda _: "state-redirect")
with TestClient(create_app()) as client: with TestClient(create_app()) as client:
@@ -291,9 +285,6 @@ def test_ticktick_auth_start_redirects_authenticated_user(
def test_ticktick_auth_callback_persists_token( def test_ticktick_auth_callback_persists_token(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
@@ -301,7 +292,7 @@ def test_ticktick_auth_callback_persists_token(
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
default_auth_state_store.pending_state = "callback-state" default_auth_state_store.pending_state = "callback-state"
def fake_urlopen(req, timeout): def fake_urlopen(req, timeout):
@@ -318,7 +309,7 @@ def test_ticktick_auth_callback_persists_token(
assert response.status_code == 303 assert response.status_code == 303
assert response.headers["location"] == "/config?ticktick_oauth=success" assert response.headers["location"] == "/config?ticktick_oauth=success"
conn = sqlite3.connect(test_database_urls["app_path"]) conn = sqlite3.connect(auth_database["app_path"])
try: try:
row = conn.execute( row = conn.execute(
"SELECT value FROM app_config WHERE key = ?", "SELECT value FROM app_config WHERE key = ?",
@@ -332,9 +323,6 @@ def test_ticktick_auth_callback_persists_token(
def test_ticktick_auth_callback_redirects_on_invalid_state( def test_ticktick_auth_callback_redirects_on_invalid_state(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
@@ -342,7 +330,7 @@ def test_ticktick_auth_callback_redirects_on_invalid_state(
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
default_auth_state_store.pending_state = "expected-state" default_auth_state_store.pending_state = "expected-state"
with TestClient(create_app()) as client: with TestClient(create_app()) as client:
@@ -356,9 +344,6 @@ def test_ticktick_auth_callback_redirects_on_invalid_state(
def test_ticktick_auth_callback_redirects_when_token_exchange_fails( def test_ticktick_auth_callback_redirects_when_token_exchange_fails(
test_database_urls,
ready_location_database,
ready_poo_database,
auth_database, auth_database,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
@@ -366,7 +351,7 @@ def test_ticktick_auth_callback_redirects_when_token_exchange_fails(
monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id") monkeypatch.setenv("TICKTICK_CLIENT_ID", "ticktick-client-id")
monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret") monkeypatch.setenv("TICKTICK_CLIENT_SECRET", "ticktick-client-secret")
get_settings.cache_clear() get_settings.cache_clear()
reset_auth_db_caches() reset_db_caches()
default_auth_state_store.pending_state = "callback-state" default_auth_state_store.pending_state = "callback-state"
def fake_urlopen(req, timeout): def fake_urlopen(req, timeout):