"""One-time idempotent data migration: copy rows from legacy locationRecorder.db / pooRecorder.db into the unified app DB's location / poo_records tables. NOT part of the Alembic chain. Run manually, once, during production cut-over: python -m scripts.migrate_legacy_data \\ --app-db sqlite:///./data/app.db \\ --location-db sqlite:///./data/locationRecorder.db \\ --poo-db sqlite:///./data/pooRecorder.db Or rely on environment variables: APP_DATABASE_URL, LOCATION_DATABASE_URL, POO_DATABASE_URL Add --dry-run to preview row counts without writing anything. Return value of migrate_legacy_data(): a dict shaped like: { "location": {"source": N, "copied": C, "skipped": bool, "final": F}, "poo_records": {"source": N, "copied": C, "skipped": bool, "final": F}, } where: source - rows in the legacy DB (0 when skipped) copied - rows inserted by this run (0 when dry_run or skipped) skipped - True when the legacy file was absent final - rows present in the app table after the run (0 when dry_run) """ from __future__ import annotations import argparse import os import sqlite3 import sys from pathlib import Path # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _sqlite_path_from_url(url: str) -> Path: """Extract the filesystem path from a sqlite:///... URL. If *url* does not start with 'sqlite:///', it is treated as a plain path. """ prefix = "sqlite:///" if url.startswith(prefix): return Path(url[len(prefix):]) return Path(url) def _reconcile( conn: sqlite3.Connection, table: str, pk_cols: list[str], source_count: int, ) -> int: """Verify every legacy source row is present in the main (app) table. Returns the count of source rows present in main. Raises RuntimeError if any rows are missing. """ join_cond = " AND ".join( f"m.{col} = l.{col}" for col in pk_cols ) sql = ( f"SELECT COUNT(*) FROM legacy.{table} l " f"WHERE EXISTS (SELECT 1 FROM main.{table} m WHERE {join_cond})" ) (present,) = conn.execute(sql).fetchone() if present < source_count: missing = source_count - present raise RuntimeError( f"Reconciliation failed for table '{table}': " f"{missing} of {source_count} source rows are missing from the app DB." ) return present # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def migrate_legacy_data( app_url: str, location_url: str | None, poo_url: str | None, *, dry_run: bool = False, ) -> dict: """Copy rows from legacy DBs into the app DB's location / poo_records tables. Parameters ---------- app_url: sqlite:///... URL (or plain path) for the unified app DB. location_url: sqlite:///... URL (or plain path) for the legacy location DB, or None to skip that table. poo_url: sqlite:///... URL (or plain path) for the legacy poo DB, or None to skip that table. dry_run: When True, gather counts only; perform no writes. Returns a dict with per-table stats (see module docstring). Raises RuntimeError on reconciliation failure (non-zero rows missing). """ app_path = _sqlite_path_from_url(app_url) results: dict[str, dict] = {} # --- location table --- results["location"] = _migrate_table( app_path=app_path, legacy_url=location_url, table="location", columns=["person", "datetime", "latitude", "longitude", "altitude"], pk_cols=["person", "datetime"], dry_run=dry_run, ) # --- poo_records table --- results["poo_records"] = _migrate_table( app_path=app_path, legacy_url=poo_url, table="poo_records", columns=["timestamp", "status", "latitude", "longitude"], pk_cols=["timestamp"], dry_run=dry_run, ) return results def _migrate_table( *, app_path: Path, legacy_url: str | None, table: str, columns: list[str], pk_cols: list[str], dry_run: bool, ) -> dict: """Migrate a single table from a legacy DB into the app DB. Returns a per-table stats dict. """ # If the caller passed None → treat as absent if legacy_url is None: return {"source": 0, "copied": 0, "skipped": True, "final": 0} legacy_path = _sqlite_path_from_url(legacy_url) # If the file doesn't exist → safe no-op if not legacy_path.exists(): return {"source": 0, "copied": 0, "skipped": True, "final": 0} col_list = ", ".join(columns) conn = sqlite3.connect(app_path) try: conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),)) # Count source rows (source_count,) = conn.execute(f"SELECT COUNT(*) FROM legacy.{table}").fetchone() if dry_run: conn.execute("DETACH DATABASE legacy") return { "source": source_count, "copied": 0, "skipped": False, "final": 0, } # Count rows already in the target before this run (before_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone() # Idempotent insert — PK conflict → skip conn.execute( f"INSERT OR IGNORE INTO main.{table} ({col_list}) " f"SELECT {col_list} FROM legacy.{table}" ) conn.commit() # Count rows now (after_count,) = conn.execute(f"SELECT COUNT(*) FROM main.{table}").fetchone() copied = after_count - before_count # Reconciliation: every source row must be present _reconcile(conn, table, pk_cols, source_count) conn.execute("DETACH DATABASE legacy") finally: conn.close() return { "source": source_count, "copied": copied, "skipped": False, "final": after_count, } # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser( description="Migrate legacy location/poo data into the unified app DB." ) parser.add_argument( "--app-db", default=os.environ.get("APP_DATABASE_URL"), help="sqlite:///... URL or path for the app DB " "(default: $APP_DATABASE_URL)", ) parser.add_argument( "--location-db", default=os.environ.get("LOCATION_DATABASE_URL"), help="sqlite:///... URL or path for the legacy location DB " "(default: $LOCATION_DATABASE_URL). Omit to skip location table.", ) parser.add_argument( "--poo-db", default=os.environ.get("POO_DATABASE_URL"), help="sqlite:///... URL or path for the legacy poo DB " "(default: $POO_DATABASE_URL). Omit to skip poo_records table.", ) parser.add_argument( "--dry-run", action="store_true", help="Report counts only; do not write any rows.", ) args = parser.parse_args() if not args.app_db: parser.error( "App DB not specified. Pass --app-db or set APP_DATABASE_URL." ) try: results = migrate_legacy_data( app_url=args.app_db, location_url=args.location_db, poo_url=args.poo_db, dry_run=args.dry_run, ) except RuntimeError as exc: print(f"ERROR: {exc}", file=sys.stderr) sys.exit(1) prefix = "[DRY RUN] " if args.dry_run else "" print(f"{prefix}Migration results:") for table_name, stats in results.items(): if stats["skipped"]: print(f" {table_name}: SKIPPED (legacy file absent or not provided)") else: print( f" {table_name}: source={stats['source']}, " f"copied={stats['copied']}, final={stats['final']}" ) if __name__ == "__main__": main()