diff --git a/scripts/migrate_legacy_data.py b/scripts/migrate_legacy_data.py
new file mode 100644
index 0000000..d36bfae
--- /dev/null
+++ b/scripts/migrate_legacy_data.py
@@ -0,0 +1,290 @@
+"""One-time idempotent data migration: copy rows from legacy locationRecorder.db /
+pooRecorder.db into the unified app DB's location / poo_records tables.
+
+NOT part of the Alembic chain. Run manually, once, during production cut-over:
+
+    python -m scripts.migrate_legacy_data \\
+        --app-db sqlite:///./data/app.db \\
+        --location-db sqlite:///./data/locationRecorder.db \\
+        --poo-db sqlite:///./data/pooRecorder.db
+
+Or rely on environment variables:
+    APP_DATABASE_URL, LOCATION_DATABASE_URL, POO_DATABASE_URL
+
+Add --dry-run to preview row counts without writing anything.
+
+Return value of migrate_legacy_data(): a dict shaped like:
+    {
+        "location": {"source": N, "copied": C, "skipped": bool, "final": F},
+        "poo_records": {"source": N, "copied": C, "skipped": bool, "final": F},
+    }
+where:
+    source - rows in the legacy DB (0 when skipped)
+    copied - rows inserted by this run (0 when dry_run or skipped)
+    skipped - True when the legacy URL was not provided or its file was absent
+    final - rows present in the app table after the run (0 when dry_run or skipped)
+
+Each table is copied in its own transaction that is committed only after
+reconciliation succeeds, so a failed run leaves that table unchanged.
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import sqlite3
+import sys
+from pathlib import Path
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+
+def _sqlite_path_from_url(url: str) -> Path:
+    """Extract the filesystem path from a sqlite:///... URL.
+
+    If *url* does not start with 'sqlite:///', it is treated as a plain path.
+    """
+    prefix = "sqlite:///"
+    if url.startswith(prefix):
+        return Path(url[len(prefix):])
+    return Path(url)
+
+
+def _reconcile(
+    conn: sqlite3.Connection,
+    table: str,
+    pk_cols: list[str],
+    source_count: int,
+) -> int:
+    """Verify every legacy source row is present in the main (app) table.
+
+    *conn* must already have the legacy DB attached as schema "legacy";
+    rows are matched on *pk_cols* only.
+
+    Returns the count of source rows present in main.
+    Raises RuntimeError if any rows are missing.
+    """
+    join_cond = " AND ".join(
+        f"m.{col} = l.{col}" for col in pk_cols
+    )
+    sql = (
+        f"SELECT COUNT(*) FROM legacy.{table} l "
+        f"WHERE EXISTS (SELECT 1 FROM main.{table} m WHERE {join_cond})"
+    )
+    (present,) = conn.execute(sql).fetchone()
+    if present < source_count:
+        missing = source_count - present
+        raise RuntimeError(
+            f"Reconciliation failed for table '{table}': "
+            f"{missing} of {source_count} source rows are missing from the app DB."
+        )
+    return present
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+
+def migrate_legacy_data(
+    app_url: str,
+    location_url: str | None,
+    poo_url: str | None,
+    *,
+    dry_run: bool = False,
+) -> dict:
+    """Copy rows from legacy DBs into the app DB's location / poo_records tables.
+
+    Parameters
+    ----------
+    app_url: sqlite:///... URL (or plain path) for the unified app DB.
+    location_url: sqlite:///... URL (or plain path) for the legacy location DB,
+                  or None to skip that table.
+    poo_url: sqlite:///... URL (or plain path) for the legacy poo DB,
+             or None to skip that table.
+    dry_run: When True, gather counts only; perform no writes.
+
+    Returns a dict with per-table stats (see module docstring).
+    Raises RuntimeError on reconciliation failure (non-zero rows missing), or
+    when a legacy DB file is present but the app DB file does not exist.
+    """
+    app_path = _sqlite_path_from_url(app_url)
+
+    results: dict[str, dict] = {}
+
+    # --- location table ---
+    results["location"] = _migrate_table(
+        app_path=app_path,
+        legacy_url=location_url,
+        table="location",
+        columns=["person", "datetime", "latitude", "longitude", "altitude"],
+        pk_cols=["person", "datetime"],
+        dry_run=dry_run,
+    )
+
+    # --- poo_records table ---
+    results["poo_records"] = _migrate_table(
+        app_path=app_path,
+        legacy_url=poo_url,
+        table="poo_records",
+        columns=["timestamp", "status", "latitude", "longitude"],
+        pk_cols=["timestamp"],
+        dry_run=dry_run,
+    )
+
+    return results
+
+
+def _migrate_table(
+    *,
+    app_path: Path,
+    legacy_url: str | None,
+    table: str,
+    columns: list[str],
+    pk_cols: list[str],
+    dry_run: bool,
+) -> dict:
+    """Migrate a single table from a legacy DB into the app DB.
+
+    Returns a per-table stats dict.
+    Raises RuntimeError if the app DB file is missing or reconciliation fails.
+    """
+    # If the caller passed None → treat as absent
+    if legacy_url is None:
+        return {"source": 0, "copied": 0, "skipped": True, "final": 0}
+
+    legacy_path = _sqlite_path_from_url(legacy_url)
+
+    # If the file doesn't exist → safe no-op
+    if not legacy_path.exists():
+        return {"source": 0, "copied": 0, "skipped": True, "final": 0}
+
+    # sqlite3.connect() silently creates a missing file; fail fast instead of
+    # leaving a stray empty DB behind (even in dry-run) on a mistyped path.
+    if not app_path.exists():
+        raise RuntimeError(f"App DB not found: {app_path}")
+
+    col_list = ", ".join(columns)
+
+    conn = sqlite3.connect(app_path)
+    try:
+        conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),))
+
+        # Count source rows
+        (source_count,) = conn.execute(f"SELECT COUNT(*) FROM legacy.{table}").fetchone()
+
+        if dry_run:
+            conn.execute("DETACH DATABASE legacy")
+            return {
+                "source": source_count,
+                "copied": 0,
+                "skipped": False,
+                "final": 0,
+            }
+
+        # Count rows already in the target before this run
+        (before_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone()
+
+        # Idempotent insert — PK conflict → skip
+        conn.execute(
+            f"INSERT OR IGNORE INTO main.{table} ({col_list}) "
+            f"SELECT {col_list} FROM legacy.{table}"
+        )
+
+        # Count rows now (this connection sees its own uncommitted inserts)
+        (after_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone()
+        copied = after_count - before_count
+
+        # Reconciliation: every source row must be present. Verify BEFORE
+        # committing: on failure the exception skips commit() and close()
+        # discards the open transaction, leaving the app table untouched.
+        _reconcile(conn, table, pk_cols, source_count)
+        conn.commit()
+
+        # DETACH fails inside a transaction, so it must follow the commit.
+        conn.execute("DETACH DATABASE legacy")
+    finally:
+        conn.close()
+
+    return {
+        "source": source_count,
+        "copied": copied,
+        "skipped": False,
+        "final": after_count,
+    }
+
+
+# ---------------------------------------------------------------------------
+# CLI entry point
+# ---------------------------------------------------------------------------
+
+
+def main() -> None:
+    """Parse CLI arguments, run the migration and print a per-table summary.
+
+    Exits with status 1 (message on stderr) if the migration raises
+    RuntimeError or sqlite3.Error.
+    """
+    parser = argparse.ArgumentParser(
+        description="Migrate legacy location/poo data into the unified app DB."
+    )
+    parser.add_argument(
+        "--app-db",
+        default=os.environ.get("APP_DATABASE_URL"),
+        help="sqlite:///... URL or path for the app DB "
+             "(default: $APP_DATABASE_URL)",
+    )
+    parser.add_argument(
+        "--location-db",
+        default=os.environ.get("LOCATION_DATABASE_URL"),
+        help="sqlite:///... URL or path for the legacy location DB "
+             "(default: $LOCATION_DATABASE_URL). Omit to skip location table.",
+    )
+    parser.add_argument(
+        "--poo-db",
+        default=os.environ.get("POO_DATABASE_URL"),
+        help="sqlite:///... URL or path for the legacy poo DB "
+             "(default: $POO_DATABASE_URL). Omit to skip poo_records table.",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Report counts only; do not write any rows.",
+    )
+    args = parser.parse_args()
+
+    if not args.app_db:
+        parser.error(
+            "App DB not specified. Pass --app-db or set APP_DATABASE_URL."
+        )
+
+    try:
+        results = migrate_legacy_data(
+            app_url=args.app_db,
+            location_url=args.location_db,
+            poo_url=args.poo_db,
+            dry_run=args.dry_run,
+        )
+    except (RuntimeError, sqlite3.Error) as exc:
+        # sqlite3.Error: e.g. missing table or unreadable DB file; report it
+        # like any other failure instead of dumping a traceback.
+        print(f"ERROR: {exc}", file=sys.stderr)
+        sys.exit(1)
+
+    prefix = "[DRY RUN] " if args.dry_run else ""
+    print(f"{prefix}Migration results:")
+    for table_name, stats in results.items():
+        if stats["skipped"]:
+            print(f" {table_name}: SKIPPED (legacy file absent or not provided)")
+        else:
+            print(
+                f" {table_name}: source={stats['source']}, "
+                f"copied={stats['copied']}, final={stats['final']}"
+            )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_migrate_legacy_data.py b/tests/test_migrate_legacy_data.py
new file mode 100644
index 0000000..bd6fa47
--- /dev/null
+++ b/tests/test_migrate_legacy_data.py
@@ -0,0 +1,403 @@
+"""Tests for scripts/migrate_legacy_data.py (M1-T02).
+
+Uses pytest tmp_path for all temp files. The app DB is brought to head via
+alembic_app.ini (the same approach used by conftest._make_app_alembic_config),
+so it has the location and poo_records tables created in T01.
+
+Legacy DBs are built by hand with real columns matching the legacy baseline schema.
+"""
+
+from __future__ import annotations
+
+import sqlite3
+import sys
+from pathlib import Path
+
+import pytest
+from alembic import command
+from alembic.config import Config
+
+from scripts.migrate_legacy_data import (
+    _reconcile,
+    _sqlite_path_from_url,
+    migrate_legacy_data,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_app_alembic_config(database_url: str) -> Config:
+    cfg = Config("alembic_app.ini")
+    cfg.set_main_option("sqlalchemy.url", database_url)
+    return cfg
+
+
+def _upgraded_app_db(tmp_path: Path, name: str = "app_test.db") -> tuple[Path, str]:
+    """Create and upgrade an app DB to head; return (path, url)."""
+    db_path = tmp_path / name
+    db_url = f"sqlite:///{db_path}"
+    command.upgrade(_make_app_alembic_config(db_url), "head")
+    return db_path, db_url
+
+
+def _make_legacy_location_db(db_path: Path, rows: list[tuple]) -> None:
+    """Create a legacy location DB and insert given rows.
+
+    Each row is a tuple: (person, datetime, latitude, longitude, altitude).
+    altitude may be None.
+    """
+    conn = sqlite3.connect(db_path)
+    conn.execute(
+        """
+        CREATE TABLE location (
+            person TEXT NOT NULL,
+            datetime TEXT NOT NULL,
+            latitude REAL NOT NULL,
+            longitude REAL NOT NULL,
+            altitude REAL,
+            PRIMARY KEY (person, datetime)
+        )
+        """
+    )
+    conn.executemany(
+        "INSERT INTO location (person, datetime, latitude, longitude, altitude) "
+        "VALUES (?, ?, ?, ?, ?)",
+        rows,
+    )
+    conn.commit()
+    conn.close()
+
+
+def _make_legacy_poo_db(db_path: Path, rows: list[tuple]) -> None:
+    """Create a legacy poo DB and insert given rows.
+
+    Each row is a tuple: (timestamp, status, latitude, longitude).
+    """
+    conn = sqlite3.connect(db_path)
+    conn.execute(
+        """
+        CREATE TABLE poo_records (
+            timestamp TEXT NOT NULL,
+            status TEXT NOT NULL,
+            latitude REAL NOT NULL,
+            longitude REAL NOT NULL,
+            PRIMARY KEY (timestamp)
+        )
+        """
+    )
+    conn.executemany(
+        "INSERT INTO poo_records (timestamp, status, latitude, longitude) "
+        "VALUES (?, ?, ?, ?)",
+        rows,
+    )
+    conn.commit()
+    conn.close()
+
+
+def _count_rows(db_path: Path, table: str) -> int:
+    conn = sqlite3.connect(db_path)
+    try:
+        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
+        return count
+    finally:
+        conn.close()
+
+
+# ---------------------------------------------------------------------------
+# Sample data
+# ---------------------------------------------------------------------------
+
+LOCATION_ROWS = [
+    ("alice", "2026-01-01T10:00:00Z", 1.23, 4.56, 7.89),
+    ("bob", "2026-01-02T10:00:00Z", 2.34, 5.67, None),
+    ("alice", "2026-01-03T10:00:00Z", 3.45, 6.78, 9.01),
+]
+
+POO_ROWS = [
+    ("2026-01-01T08:00:00Z", "complete", 10.0, 20.0),
+    ("2026-01-02T08:00:00Z", "urgent", 11.0, 21.0),
+]
+
+
+# ---------------------------------------------------------------------------
+# Test 1: Idempotency
+# ---------------------------------------------------------------------------
+
+
+def test_location_migration_is_idempotent(tmp_path: Path) -> None:
+    """N source rows → app table has N rows; run again → still N rows."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    # First run
+    result1 = migrate_legacy_data(app_url, legacy_url, None)
+    assert result1["location"]["source"] == len(LOCATION_ROWS)
+    assert result1["location"]["copied"] == len(LOCATION_ROWS)
+    assert result1["location"]["skipped"] is False
+    assert result1["location"]["final"] == len(LOCATION_ROWS)
+    assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
+
+    # Second run — idempotent, no dupes, no error
+    result2 = migrate_legacy_data(app_url, legacy_url, None)
+    assert result2["location"]["source"] == len(LOCATION_ROWS)
+    assert result2["location"]["copied"] == 0  # nothing new
+    assert result2["location"]["skipped"] is False
+    assert result2["location"]["final"] == len(LOCATION_ROWS)
+    assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
+
+
+def test_poo_migration_is_idempotent(tmp_path: Path) -> None:
+    """N poo source rows → app table has N rows; run again → still N rows."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "pooRecorder.db"
+    _make_legacy_poo_db(legacy_path, POO_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    result1 = migrate_legacy_data(app_url, None, legacy_url)
+    assert result1["poo_records"]["source"] == len(POO_ROWS)
+    assert result1["poo_records"]["copied"] == len(POO_ROWS)
+    assert result1["poo_records"]["skipped"] is False
+    assert result1["poo_records"]["final"] == len(POO_ROWS)
+    assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
+
+    result2 = migrate_legacy_data(app_url, None, legacy_url)
+    assert result2["poo_records"]["copied"] == 0
+    assert result2["poo_records"]["final"] == len(POO_ROWS)
+    assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
+
+
+def test_both_tables_migration_is_idempotent(tmp_path: Path) -> None:
+    """Migrating both tables at once is idempotent."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+
+    loc_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(loc_path, LOCATION_ROWS)
+    loc_url = f"sqlite:///{loc_path}"
+
+    poo_path = tmp_path / "pooRecorder.db"
+    _make_legacy_poo_db(poo_path, POO_ROWS)
+    poo_url = f"sqlite:///{poo_path}"
+
+    result1 = migrate_legacy_data(app_url, loc_url, poo_url)
+    assert result1["location"]["final"] == len(LOCATION_ROWS)
+    assert result1["poo_records"]["final"] == len(POO_ROWS)
+
+    result2 = migrate_legacy_data(app_url, loc_url, poo_url)
+    assert result2["location"]["copied"] == 0
+    assert result2["poo_records"]["copied"] == 0
+    assert _count_rows(app_path, "location") == len(LOCATION_ROWS)
+    assert _count_rows(app_path, "poo_records") == len(POO_ROWS)
+
+
+# ---------------------------------------------------------------------------
+# Test 2: Missing legacy file
+# ---------------------------------------------------------------------------
+
+
+def test_missing_location_file_is_skipped(tmp_path: Path) -> None:
+    """Absent location DB → table result is skipped, no exception, app table empty."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    nonexistent = f"sqlite:///{tmp_path / 'does_not_exist_location.db'}"
+
+    result = migrate_legacy_data(app_url, nonexistent, None)
+
+    assert result["location"]["skipped"] is True
+    assert result["location"]["source"] == 0
+    assert result["location"]["copied"] == 0
+    assert _count_rows(app_path, "location") == 0
+
+
+def test_missing_poo_file_is_skipped(tmp_path: Path) -> None:
+    """Absent poo DB → table result is skipped, no exception, app table empty."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    nonexistent = f"sqlite:///{tmp_path / 'does_not_exist_poo.db'}"
+
+    result = migrate_legacy_data(app_url, None, nonexistent)
+
+    assert result["poo_records"]["skipped"] is True
+    assert result["poo_records"]["source"] == 0
+    assert result["poo_records"]["copied"] == 0
+    assert _count_rows(app_path, "poo_records") == 0
+
+
+def test_none_location_url_is_skipped(tmp_path: Path) -> None:
+    """Passing None for location_url → skipped, no exception."""
+    _, app_url = _upgraded_app_db(tmp_path)
+
+    result = migrate_legacy_data(app_url, None, None)
+
+    assert result["location"]["skipped"] is True
+    assert result["poo_records"]["skipped"] is True
+
+
+# ---------------------------------------------------------------------------
+# Test 3: Reconciliation failure
+# ---------------------------------------------------------------------------
+
+
+def test_reconcile_raises_on_missing_rows(tmp_path: Path) -> None:
+    """_reconcile() raises RuntimeError when source rows are missing from target."""
+    # Build an app DB and a legacy DB with 3 rows
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
+
+    # Only insert 1 row into the app DB manually (simulate partial migration)
+    conn = sqlite3.connect(app_path)
+    conn.execute(
+        "INSERT INTO location (person, datetime, latitude, longitude, altitude) "
+        "VALUES (?, ?, ?, ?, ?)",
+        LOCATION_ROWS[0],
+    )
+    conn.commit()
+    # ATTACH legacy to run _reconcile (bound parameter: safe for any path)
+    conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),))
+    with pytest.raises(RuntimeError, match="Reconciliation failed"):
+        _reconcile(
+            conn,
+            table="location",
+            pk_cols=["person", "datetime"],
+            source_count=len(LOCATION_ROWS),
+        )
+    conn.execute("DETACH DATABASE legacy")
+    conn.close()
+
+
+def test_migrate_reconciliation_failure_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    """If a row goes missing after INSERT, migrate_legacy_data raises RuntimeError."""
+    import scripts.migrate_legacy_data as mod
+
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    def _always_fail(conn, table, pk_cols, source_count):
+        # Simulate a scenario where reconciliation finds rows missing
+        raise RuntimeError(
+            f"Reconciliation failed for table '{table}': "
+            f"1 of {source_count} source rows are missing from the app DB."
+        )
+
+    monkeypatch.setattr(mod, "_reconcile", _always_fail)
+
+    with pytest.raises(RuntimeError, match="Reconciliation failed"):
+        migrate_legacy_data(app_url, legacy_url, None)
+
+
+def test_cli_exits_nonzero_on_reconciliation_failure(
+    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    """CLI main() exits non-zero when reconciliation raises."""
+    import scripts.migrate_legacy_data as mod
+
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    # Patch _reconcile to always raise
+    def _always_fail(conn, table, pk_cols, source_count):
+        raise RuntimeError(
+            f"Reconciliation failed for table '{table}': 1 row missing."
+        )
+
+    monkeypatch.setattr(mod, "_reconcile", _always_fail)
+
+    # Patch sys.argv so main() picks up the right args
+    monkeypatch.setattr(
+        sys,
+        "argv",
+        [
+            "migrate_legacy_data",
+            "--app-db", app_url,
+            "--location-db", legacy_url,
+        ],
+    )
+
+    with pytest.raises(SystemExit) as exc_info:
+        mod.main()
+
+    assert exc_info.value.code != 0
+
+
+# ---------------------------------------------------------------------------
+# Test 4: dry_run
+# ---------------------------------------------------------------------------
+
+
+def test_dry_run_does_not_write_location_rows(tmp_path: Path) -> None:
+    """dry_run=True reports source counts but writes nothing."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(legacy_path, LOCATION_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    result = migrate_legacy_data(app_url, legacy_url, None, dry_run=True)
+
+    assert result["location"]["source"] == len(LOCATION_ROWS)
+    assert result["location"]["copied"] == 0
+    assert result["location"]["skipped"] is False
+    # dry_run returns final=0 (no actual query on app side)
+    assert result["location"]["final"] == 0
+    # App table must still be empty
+    assert _count_rows(app_path, "location") == 0
+
+
+def test_dry_run_does_not_write_poo_rows(tmp_path: Path) -> None:
+    """dry_run=True for poo_records: source reported, nothing written."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+    legacy_path = tmp_path / "pooRecorder.db"
+    _make_legacy_poo_db(legacy_path, POO_ROWS)
+    legacy_url = f"sqlite:///{legacy_path}"
+
+    result = migrate_legacy_data(app_url, None, legacy_url, dry_run=True)
+
+    assert result["poo_records"]["source"] == len(POO_ROWS)
+    assert result["poo_records"]["copied"] == 0
+    assert result["poo_records"]["skipped"] is False
+    assert result["poo_records"]["final"] == 0
+    assert _count_rows(app_path, "poo_records") == 0
+
+
+def test_dry_run_both_tables(tmp_path: Path) -> None:
+    """dry_run=True for both tables: both reported, nothing written."""
+    app_path, app_url = _upgraded_app_db(tmp_path)
+
+    loc_path = tmp_path / "locationRecorder.db"
+    _make_legacy_location_db(loc_path, LOCATION_ROWS)
+    loc_url = f"sqlite:///{loc_path}"
+
+    poo_path = tmp_path / "pooRecorder.db"
+    _make_legacy_poo_db(poo_path, POO_ROWS)
+    poo_url = f"sqlite:///{poo_path}"
+
+    result = migrate_legacy_data(app_url, loc_url, poo_url, dry_run=True)
+
+    assert result["location"]["source"] == len(LOCATION_ROWS)
+    assert result["location"]["copied"] == 0
+    assert result["poo_records"]["source"] == len(POO_ROWS)
+    assert result["poo_records"]["copied"] == 0
+    assert _count_rows(app_path, "location") == 0
+    assert _count_rows(app_path, "poo_records") == 0
+
+
+# ---------------------------------------------------------------------------
+# Test: _sqlite_path_from_url helper
+# ---------------------------------------------------------------------------
+
+
+def test_sqlite_path_from_url_parses_url() -> None:
+    path = _sqlite_path_from_url("sqlite:///./data/app.db")
+    # Path normalises './' away, but the tail should remain
+    assert path == Path("data/app.db")
+
+
+def test_sqlite_path_from_url_treats_plain_path_as_path() -> None:
+    path = _sqlite_path_from_url("/tmp/some.db")
+    assert path == Path("/tmp/some.db")