Files
home-automation/tests/test_migrate_legacy_data.py
T
tliu93 1cbe6c46d2 M1-rework: harden legacy-migration reconciliation to full-row equality
Audit finding (review-notes/M1-full-review-1.md, FINDING 1): _reconcile only
checked primary-key presence, so a source row skipped by INSERT OR IGNORE due
to a value difference against a pre-existing same-PK target row would
false-pass. Compare ALL columns with SQLite's NULL-safe IS operator instead,
so reconciliation is a true full-row guarantee (idempotent re-runs still pass
because the rows match column-for-column). Add tests for the value-mismatch
abort and for idempotency under full-row reconciliation. Remove the now-unused
pk_cols parameter.

pytest 97 passed; ruff clean (pre-existing only); data-safety grep still empty.
2026-06-12 19:05:56 +02:00

461 lines
17 KiB
Python

"""Tests for scripts/migrate_legacy_data.py (M1-T02).
Uses pytest tmp_path for all temp files. The app DB is brought to head via
alembic_app.ini (the same approach used by conftest._make_app_alembic_config),
so it has the location and poo_records tables created in T01.
Legacy DBs are built by hand with real columns matching the legacy baseline schema.
"""
from __future__ import annotations
import sqlite3
import sys
from pathlib import Path
import pytest
from alembic import command
from alembic.config import Config
from scripts.migrate_legacy_data import (
_reconcile,
_sqlite_path_from_url,
migrate_legacy_data,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_app_alembic_config(database_url: str) -> Config:
    """Return an Alembic ``Config`` loaded from ``alembic_app.ini`` and pointed at *database_url*."""
    alembic_config = Config("alembic_app.ini")
    # Override the ini file's connection string so migrations hit the test DB.
    alembic_config.set_main_option("sqlalchemy.url", database_url)
    return alembic_config
def _upgraded_app_db(tmp_path: Path, name: str = "app_test.db") -> tuple[Path, str]:
    """Build an app database under *tmp_path* and migrate it to Alembic head.

    Returns the database file path together with its ``sqlite:///`` URL.
    """
    target = tmp_path / name
    url = f"sqlite:///{target}"
    alembic_config = _make_app_alembic_config(url)
    command.upgrade(alembic_config, "head")
    return target, url
def _make_legacy_location_db(db_path: Path, rows: list[tuple]) -> None:
    """Create a legacy location DB at *db_path* and insert the given rows.

    Each row is a tuple ``(person, datetime, latitude, longitude, altitude)``;
    ``altitude`` may be ``None``. The primary key is ``(person, datetime)``.

    Any ``sqlite3`` error raised while creating the table or inserting rows
    propagates to the caller; the connection is closed in every case.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            """
            CREATE TABLE location (
                person TEXT NOT NULL,
                datetime TEXT NOT NULL,
                latitude REAL NOT NULL,
                longitude REAL NOT NULL,
                altitude REAL,
                PRIMARY KEY (person, datetime)
            )
            """
        )
        conn.executemany(
            "INSERT INTO location (person, datetime, latitude, longitude, altitude) "
            "VALUES (?, ?, ?, ?, ?)",
            rows,
        )
        conn.commit()
    finally:
        # Close even when the DDL/INSERT fails so a broken fixture does not
        # leak an open handle on the temp file.
        conn.close()
def _make_legacy_poo_db(db_path: Path, rows: list[tuple]) -> None:
    """Create a legacy poo DB at *db_path* and insert the given rows.

    Each row is a tuple ``(timestamp, status, latitude, longitude)``; the
    primary key is ``timestamp``.

    Any ``sqlite3`` error raised while creating the table or inserting rows
    propagates to the caller; the connection is closed in every case.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            """
            CREATE TABLE poo_records (
                timestamp TEXT NOT NULL,
                status TEXT NOT NULL,
                latitude REAL NOT NULL,
                longitude REAL NOT NULL,
                PRIMARY KEY (timestamp)
            )
            """
        )
        conn.executemany(
            "INSERT INTO poo_records (timestamp, status, latitude, longitude) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
        conn.commit()
    finally:
        # Close even when the DDL/INSERT fails so a broken fixture does not
        # leak an open handle on the temp file.
        conn.close()
def _count_rows(db_path: Path, table: str) -> int:
    """Return the number of rows currently stored in *table* of the SQLite file *db_path*."""
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.execute(f"SELECT COUNT(*) FROM {table}")
        # COUNT(*) always yields exactly one single-column row.
        (total,) = cursor.fetchone()
        return total
    finally:
        connection.close()
# ---------------------------------------------------------------------------
# Sample data
# ---------------------------------------------------------------------------
# Legacy location fixture rows: (person, datetime, latitude, longitude, altitude).
# The second row has altitude=None to exercise the nullable column.
LOCATION_ROWS = [
("alice", "2026-01-01T10:00:00Z", 1.23, 4.56, 7.89),
("bob", "2026-01-02T10:00:00Z", 2.34, 5.67, None),
("alice", "2026-01-03T10:00:00Z", 3.45, 6.78, 9.01),
]
# Legacy poo fixture rows: (timestamp, status, latitude, longitude).
POO_ROWS = [
("2026-01-01T08:00:00Z", "complete", 10.0, 20.0),
("2026-01-02T08:00:00Z", "urgent", 11.0, 21.0),
]
# ---------------------------------------------------------------------------
# Test 1: Idempotency
# ---------------------------------------------------------------------------
def test_location_migration_is_idempotent(tmp_path: Path) -> None:
    """Migrating N location rows twice leaves exactly N rows and copies nothing the second time."""
    expected = len(LOCATION_ROWS)
    app_path, app_url = _upgraded_app_db(tmp_path)
    source_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(source_db, LOCATION_ROWS)
    source_url = f"sqlite:///{source_db}"

    # Initial run: every source row is copied.
    first = migrate_legacy_data(app_url, source_url, None)
    first_loc = first["location"]
    assert first_loc["source"] == expected
    assert first_loc["copied"] == expected
    assert first_loc["skipped"] is False
    assert first_loc["final"] == expected
    assert _count_rows(app_path, "location") == expected

    # Repeat run: no duplicates, no error, nothing newly copied.
    second = migrate_legacy_data(app_url, source_url, None)
    second_loc = second["location"]
    assert second_loc["source"] == expected
    assert second_loc["copied"] == 0
    assert second_loc["skipped"] is False
    assert second_loc["final"] == expected
    assert _count_rows(app_path, "location") == expected
def test_poo_migration_is_idempotent(tmp_path: Path) -> None:
    """Migrating N poo rows twice leaves exactly N rows and copies nothing the second time."""
    expected = len(POO_ROWS)
    app_path, app_url = _upgraded_app_db(tmp_path)
    source_db = tmp_path / "pooRecorder.db"
    _make_legacy_poo_db(source_db, POO_ROWS)
    source_url = f"sqlite:///{source_db}"

    # Initial run: every source row is copied.
    first = migrate_legacy_data(app_url, None, source_url)
    first_poo = first["poo_records"]
    assert first_poo["source"] == expected
    assert first_poo["copied"] == expected
    assert first_poo["skipped"] is False
    assert first_poo["final"] == expected
    assert _count_rows(app_path, "poo_records") == expected

    # Repeat run: row count is unchanged and nothing is copied.
    second = migrate_legacy_data(app_url, None, source_url)
    second_poo = second["poo_records"]
    assert second_poo["copied"] == 0
    assert second_poo["final"] == expected
    assert _count_rows(app_path, "poo_records") == expected
def test_both_tables_migration_is_idempotent(tmp_path: Path) -> None:
    """Running the combined location + poo migration twice does not duplicate rows."""
    n_location = len(LOCATION_ROWS)
    n_poo = len(POO_ROWS)
    app_path, app_url = _upgraded_app_db(tmp_path)

    location_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(location_db, LOCATION_ROWS)
    location_url = f"sqlite:///{location_db}"

    poo_db = tmp_path / "pooRecorder.db"
    _make_legacy_poo_db(poo_db, POO_ROWS)
    poo_url = f"sqlite:///{poo_db}"

    first = migrate_legacy_data(app_url, location_url, poo_url)
    assert first["location"]["final"] == n_location
    assert first["poo_records"]["final"] == n_poo

    second = migrate_legacy_data(app_url, location_url, poo_url)
    assert second["location"]["copied"] == 0
    assert second["poo_records"]["copied"] == 0
    assert _count_rows(app_path, "location") == n_location
    assert _count_rows(app_path, "poo_records") == n_poo
# ---------------------------------------------------------------------------
# Test 2: Missing legacy file
# ---------------------------------------------------------------------------
def test_missing_location_file_is_skipped(tmp_path: Path) -> None:
    """A location URL pointing at a non-existent file is reported as skipped and writes nothing."""
    app_path, app_url = _upgraded_app_db(tmp_path)
    missing_url = f"sqlite:///{tmp_path / 'does_not_exist_location.db'}"

    outcome = migrate_legacy_data(app_url, missing_url, None)
    location = outcome["location"]

    assert location["skipped"] is True
    assert location["source"] == 0
    assert location["copied"] == 0
    assert _count_rows(app_path, "location") == 0
def test_missing_poo_file_is_skipped(tmp_path: Path) -> None:
    """A poo URL pointing at a non-existent file is reported as skipped and writes nothing."""
    app_path, app_url = _upgraded_app_db(tmp_path)
    missing_url = f"sqlite:///{tmp_path / 'does_not_exist_poo.db'}"

    outcome = migrate_legacy_data(app_url, None, missing_url)
    poo = outcome["poo_records"]

    assert poo["skipped"] is True
    assert poo["source"] == 0
    assert poo["copied"] == 0
    assert _count_rows(app_path, "poo_records") == 0
def test_none_location_url_is_skipped(tmp_path: Path) -> None:
    """With ``None`` given for both legacy URLs, both tables are reported as skipped."""
    app_url = _upgraded_app_db(tmp_path)[1]
    outcome = migrate_legacy_data(app_url, None, None)
    for table_name in ("location", "poo_records"):
        assert outcome[table_name]["skipped"] is True
# ---------------------------------------------------------------------------
# Test 3: Reconciliation failure
# ---------------------------------------------------------------------------
def test_reconcile_raises_on_missing_rows(tmp_path: Path) -> None:
    """_reconcile() raises RuntimeError when source rows are missing from target.

    The legacy DB holds three rows but only the first is inserted into the
    app DB (simulating a partial migration), so reconciliation must fail.
    """
    app_path, _ = _upgraded_app_db(tmp_path)
    legacy_path = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(legacy_path, LOCATION_ROWS)

    conn = sqlite3.connect(app_path)
    try:
        # Only insert 1 row into the app DB manually (simulate partial migration)
        conn.execute(
            "INSERT INTO location (person, datetime, latitude, longitude, altitude) "
            "VALUES (?, ?, ?, ?, ?)",
            LOCATION_ROWS[0],
        )
        conn.commit()
        # ATTACH legacy to run _reconcile. The filename is bound as a parameter
        # rather than interpolated, so a temp path containing a quote character
        # cannot break the statement.
        conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),))
        with pytest.raises(RuntimeError, match="Reconciliation failed"):
            _reconcile(
                conn,
                table="location",
                columns=["person", "datetime", "latitude", "longitude", "altitude"],
                source_count=len(LOCATION_ROWS),
            )
        conn.execute("DETACH DATABASE legacy")
    finally:
        # Always release the handle, including when the expected RuntimeError
        # is not raised and pytest.raises fails the test.
        conn.close()
def test_migrate_reconciliation_failure_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """If reconciliation fails, migrate_legacy_data propagates the RuntimeError.

    ``_reconcile`` is replaced by a stub that always raises, simulating rows
    going missing after the INSERT step.
    """
    import scripts.migrate_legacy_data as mod

    _, app_url = _upgraded_app_db(tmp_path)
    legacy_path = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
    legacy_url = f"sqlite:///{legacy_path}"

    # Parameter names mirror the current _reconcile signature (``columns``
    # replaced the removed ``pk_cols``) so the stub works whether the caller
    # passes arguments positionally or by keyword.
    def _always_fail(conn, table, columns, source_count):
        # Simulate a scenario where reconciliation finds rows missing
        raise RuntimeError(
            f"Reconciliation failed for table '{table}': "
            f"1 of {source_count} source rows are missing from the app DB."
        )

    monkeypatch.setattr(mod, "_reconcile", _always_fail)
    with pytest.raises(RuntimeError, match="Reconciliation failed"):
        migrate_legacy_data(app_url, legacy_url, None)
def test_cli_exits_nonzero_on_reconciliation_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """CLI main() exits non-zero when reconciliation raises."""
    import scripts.migrate_legacy_data as mod

    _, app_url = _upgraded_app_db(tmp_path)
    legacy_path = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
    legacy_url = f"sqlite:///{legacy_path}"

    # Patch _reconcile to always raise. Parameter names mirror the current
    # _reconcile signature (``columns`` replaced the removed ``pk_cols``) so
    # the stub works for positional and keyword calls alike.
    def _always_fail(conn, table, columns, source_count):
        raise RuntimeError(
            f"Reconciliation failed for table '{table}': 1 row missing."
        )

    monkeypatch.setattr(mod, "_reconcile", _always_fail)
    # Patch sys.argv so main() picks up the right args
    monkeypatch.setattr(
        sys,
        "argv",
        [
            "migrate_legacy_data",
            "--app-db", app_url,
            "--location-db", legacy_url,
        ],
    )
    with pytest.raises(SystemExit) as exc_info:
        mod.main()
    # SystemExit(None) is a *successful* exit (status 0), so rule it out
    # explicitly instead of relying on ``!= 0`` alone.
    assert exc_info.value.code not in (0, None)
def test_reconcile_catches_value_mismatch_not_just_pk(tmp_path: Path) -> None:
    """A same-PK row with different values in the target must fail reconciliation.

    The app DB is seeded with a row whose primary key matches the single
    legacy row but whose latitude/longitude/altitude all differ. The source
    row therefore never lands in the target (per the migration's
    INSERT OR IGNORE behaviour), and a PK-only check would wrongly pass.
    Full-row reconciliation has to notice the difference and raise.
    """
    shared_pk = ("alice", "2026-01-01T10:00:00Z")
    app_path, app_url = _upgraded_app_db(tmp_path)

    # Legacy source row: latitude=1.23, longitude=4.56, altitude=7.89.
    source_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(source_db, [(*shared_pk, 1.23, 4.56, 7.89)])
    source_url = f"sqlite:///{source_db}"

    # Seed the target with the same PK but conflicting non-PK values.
    seed_conn = sqlite3.connect(app_path)
    seed_conn.execute(
        "INSERT INTO location (person, datetime, latitude, longitude, altitude) "
        "VALUES (?, ?, ?, ?, ?)",
        (*shared_pk, 99.0, 99.0, 99.0),
    )
    seed_conn.commit()
    seed_conn.close()

    # The target still holds the 99.0 values, so the migration must abort.
    with pytest.raises(RuntimeError, match="Reconciliation failed"):
        migrate_legacy_data(app_url, source_url, None)
def test_full_row_reconciliation_idempotent_on_identical_data(tmp_path: Path) -> None:
    """A repeat run over already-migrated, identical rows reconciles without raising.

    After the first run the target holds the same rows as the source, so the
    second run copies nothing and full-row reconciliation still passes.
    """
    expected = len(LOCATION_ROWS)
    app_path, app_url = _upgraded_app_db(tmp_path)
    source_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(source_db, LOCATION_ROWS)
    source_url = f"sqlite:///{source_db}"

    # First pass migrates every row.
    first = migrate_legacy_data(app_url, source_url, None)
    assert first["location"]["copied"] == expected

    # Second pass: everything is already present with identical values, so
    # nothing is copied and no exception is raised.
    second = migrate_legacy_data(app_url, source_url, None)
    assert second["location"]["copied"] == 0
    assert second["location"]["final"] == expected
# ---------------------------------------------------------------------------
# Test 4: dry_run
# ---------------------------------------------------------------------------
def test_dry_run_does_not_write_location_rows(tmp_path: Path) -> None:
    """With dry_run=True the location source count is reported but the app table stays empty."""
    app_path, app_url = _upgraded_app_db(tmp_path)
    source_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(source_db, LOCATION_ROWS)
    source_url = f"sqlite:///{source_db}"

    outcome = migrate_legacy_data(app_url, source_url, None, dry_run=True)
    location = outcome["location"]

    assert location["source"] == len(LOCATION_ROWS)
    assert location["copied"] == 0
    assert location["skipped"] is False
    # A dry run reports final=0.
    assert location["final"] == 0
    # Nothing may have been written to the app table.
    assert _count_rows(app_path, "location") == 0
def test_dry_run_does_not_write_poo_rows(tmp_path: Path) -> None:
    """With dry_run=True the poo source count is reported but the app table stays empty."""
    app_path, app_url = _upgraded_app_db(tmp_path)
    source_db = tmp_path / "pooRecorder.db"
    _make_legacy_poo_db(source_db, POO_ROWS)
    source_url = f"sqlite:///{source_db}"

    outcome = migrate_legacy_data(app_url, None, source_url, dry_run=True)
    poo = outcome["poo_records"]

    assert poo["source"] == len(POO_ROWS)
    assert poo["copied"] == 0
    assert poo["skipped"] is False
    assert poo["final"] == 0
    assert _count_rows(app_path, "poo_records") == 0
def test_dry_run_both_tables(tmp_path: Path) -> None:
    """A dry run over both legacy DBs reports both source counts and writes to neither table."""
    app_path, app_url = _upgraded_app_db(tmp_path)

    location_db = tmp_path / "locationRecorder.db"
    _make_legacy_location_db(location_db, LOCATION_ROWS)
    location_url = f"sqlite:///{location_db}"

    poo_db = tmp_path / "pooRecorder.db"
    _make_legacy_poo_db(poo_db, POO_ROWS)
    poo_url = f"sqlite:///{poo_db}"

    outcome = migrate_legacy_data(app_url, location_url, poo_url, dry_run=True)

    assert outcome["location"]["source"] == len(LOCATION_ROWS)
    assert outcome["location"]["copied"] == 0
    assert outcome["poo_records"]["source"] == len(POO_ROWS)
    assert outcome["poo_records"]["copied"] == 0
    for table_name in ("location", "poo_records"):
        assert _count_rows(app_path, table_name) == 0
# ---------------------------------------------------------------------------
# Test: _sqlite_path_from_url helper
# ---------------------------------------------------------------------------
def test_sqlite_path_from_url_parses_url() -> None:
    """A ``sqlite:///`` URL is reduced to the filesystem path it names."""
    expected = Path("data/app.db")
    # Path drops the leading './', so only the tail is compared.
    assert _sqlite_path_from_url("sqlite:///./data/app.db") == expected
def test_sqlite_path_from_url_treats_plain_path_as_path() -> None:
    """A value without the ``sqlite:///`` prefix is treated as a plain filesystem path."""
    path = _sqlite_path_from_url("/tmp/some.db")
    # Compare Path objects rather than str(path): the string form uses the
    # platform's separator, so a POSIX literal would not match on Windows.
    assert path == Path("/tmp/some.db")