M1-T02: add idempotent legacy data migration script
scripts/migrate_legacy_data.py copies rows from the legacy locationRecorder.db / pooRecorder.db into the unified app DB's location / poo_records tables using ATTACH + INSERT OR IGNORE (idempotent via PK-conflict skip; explicit columns, never SELECT *). After copy it reconciles every source row against the target and raises / exits non-zero on any shortfall. Missing legacy files are a safe no-op (skipped); --dry-run writes nothing. Not part of the Alembic chain; run manually once at cut-over. Never deletes or overwrites any file. Validated end-to-end on copies of the real production DBs: dry-run reported 75103 location + 874 poo rows and wrote nothing; the real run copied all rows with reconciliation passing; a second run copied 0 (idempotent).
This commit is contained in:
@@ -0,0 +1,267 @@
|
||||
"""One-time idempotent data migration: copy rows from legacy locationRecorder.db /
|
||||
pooRecorder.db into the unified app DB's location / poo_records tables.
|
||||
|
||||
NOT part of the Alembic chain. Run manually, once, during production cut-over:
|
||||
|
||||
python -m scripts.migrate_legacy_data \\
|
||||
--app-db sqlite:///./data/app.db \\
|
||||
--location-db sqlite:///./data/locationRecorder.db \\
|
||||
--poo-db sqlite:///./data/pooRecorder.db
|
||||
|
||||
Or rely on environment variables:
|
||||
APP_DATABASE_URL, LOCATION_DATABASE_URL, POO_DATABASE_URL
|
||||
|
||||
Add --dry-run to preview row counts without writing anything.
|
||||
|
||||
Return value of migrate_legacy_data(): a dict shaped like:
|
||||
{
|
||||
"location": {"source": N, "copied": C, "skipped": bool, "final": F},
|
||||
"poo_records": {"source": N, "copied": C, "skipped": bool, "final": F},
|
||||
}
|
||||
where:
|
||||
source - rows in the legacy DB (0 when skipped)
|
||||
copied - rows inserted by this run (0 when dry_run or skipped)
|
||||
skipped - True when the legacy file was absent
|
||||
final - rows present in the app table after the run (0 when dry_run or skipped)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _sqlite_path_from_url(url: str) -> Path:
    """Turn a ``sqlite:///...`` URL into the filesystem path it points at.

    Anything that does not begin with the ``sqlite:///`` marker is assumed to
    already be a plain path and is wrapped in a Path unchanged.
    """
    scheme = "sqlite:///"
    head, marker, remainder = url.partition(scheme)
    # Strip the marker only when it sits at the very start of the string;
    # an occurrence further in means the whole value is a plain path.
    if marker and not head:
        return Path(remainder)
    return Path(url)
|
||||
|
||||
|
||||
def _reconcile(
    conn: sqlite3.Connection,
    table: str,
    pk_cols: list[str],
    source_count: int,
) -> int:
    """Check that each legacy row has a primary-key match in the app table.

    The query reads ``legacy.<table>`` and ``main.<table>`` on *conn*, so the
    legacy database must already be attached under the schema name ``legacy``.
    *table* and *pk_cols* are interpolated into the SQL text as identifiers.

    Returns the number of legacy rows that have a match in ``main``.
    Raises RuntimeError when that number is lower than *source_count*.
    """
    pk_match = " AND ".join([f"m.{name} = l.{name}" for name in pk_cols])
    query = (
        f"SELECT COUNT(*) FROM legacy.{table} l "
        f"WHERE EXISTS (SELECT 1 FROM main.{table} m WHERE {pk_match})"
    )
    found = conn.execute(query).fetchone()[0]

    # Happy path first: nothing is missing.
    if found >= source_count:
        return found

    shortfall = source_count - found
    raise RuntimeError(
        f"Reconciliation failed for table '{table}': "
        f"{shortfall} of {source_count} source rows are missing from the app DB."
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def migrate_legacy_data(
    app_url: str,
    location_url: str | None,
    poo_url: str | None,
    *,
    dry_run: bool = False,
) -> dict:
    """Copy the legacy location and poo rows into the unified app DB.

    Parameters
    ----------
    app_url:      sqlite:///... URL (or plain path) of the unified app DB.
    location_url: sqlite:///... URL (or plain path) of the legacy location DB,
                  or None to skip the ``location`` table.
    poo_url:      sqlite:///... URL (or plain path) of the legacy poo DB,
                  or None to skip the ``poo_records`` table.
    dry_run:      Passed through to the per-table migration; when True only
                  counts are gathered.

    Returns a dict keyed by ``"location"`` and ``"poo_records"`` (in that
    order), each value being the per-table stats dict described in the module
    docstring.
    Any exception raised by the per-table migration (RuntimeError on a
    reconciliation shortfall) propagates to the caller.
    """
    target_path = _sqlite_path_from_url(app_url)

    # One entry per migrated table, processed in order:
    # (table name, legacy DB URL, columns to copy, primary-key columns).
    plan = (
        (
            "location",
            location_url,
            ["person", "datetime", "latitude", "longitude", "altitude"],
            ["person", "datetime"],
        ),
        (
            "poo_records",
            poo_url,
            ["timestamp", "status", "latitude", "longitude"],
            ["timestamp"],
        ),
    )

    stats: dict[str, dict] = {}
    for table_name, source_url, copy_cols, key_cols in plan:
        stats[table_name] = _migrate_table(
            app_path=target_path,
            legacy_url=source_url,
            table=table_name,
            columns=copy_cols,
            pk_cols=key_cols,
            dry_run=dry_run,
        )
    return stats
|
||||
|
||||
|
||||
def _migrate_table(
    *,
    app_path: Path,
    legacy_url: str | None,
    table: str,
    columns: list[str],
    pk_cols: list[str],
    dry_run: bool,
) -> dict:
    """Migrate a single table from a legacy DB into the app DB.

    Parameters
    ----------
    app_path:   Filesystem path of the app DB (the copy target).
    legacy_url: sqlite:///... URL or plain path of the legacy DB. None or an
                empty string means "not provided" and the table is skipped.
    table:      Table name; the same name is used in both databases.
    columns:    Explicit column list to copy.
    pk_cols:    Primary-key columns used for reconciliation.
    dry_run:    When True, only the legacy row count is gathered. The app DB
                is not opened at all, so nothing is written or created.

    *table*, *columns* and *pk_cols* are interpolated into SQL text as
    identifiers, so they must come from trusted, hard-coded values.

    Returns a stats dict with keys ``source``, ``copied``, ``skipped`` and
    ``final``.
    Raises RuntimeError (from ``_reconcile``) when source rows are missing
    from the app table after the copy. The copy is committed before
    reconciliation runs, so a failure does not roll the inserted rows back.
    """
    # None or "" (for example an environment variable that is set but empty)
    # both mean "not provided". The empty string must be caught here because
    # Path("") collapses to ".", which exists and would defeat the file check.
    if not legacy_url:
        return {"source": 0, "copied": 0, "skipped": True, "final": 0}

    legacy_path = _sqlite_path_from_url(legacy_url)

    # A missing legacy file is a safe no-op.
    if not legacy_path.exists():
        return {"source": 0, "copied": 0, "skipped": True, "final": 0}

    col_list = ", ".join(columns)

    # sqlite3.connect() creates the database file when it does not exist, so a
    # dry run uses a private in-memory main DB and never touches the app DB.
    conn = sqlite3.connect(":memory:" if dry_run else app_path)
    try:
        conn.execute("ATTACH DATABASE ? AS legacy", (str(legacy_path),))

        # Count source rows.
        (source_count,) = conn.execute(
            f"SELECT COUNT(*) FROM legacy.{table}"
        ).fetchone()

        if dry_run:
            conn.execute("DETACH DATABASE legacy")
            return {
                "source": source_count,
                "copied": 0,
                "skipped": False,
                "final": 0,
            }

        # Count rows already in the target before this run.
        (before_count,) = conn.execute(
            f"SELECT COUNT(*) FROM main.{table}"
        ).fetchone()

        # Idempotent insert: rows that conflict with an existing PK are skipped.
        conn.execute(
            f"INSERT OR IGNORE INTO main.{table} ({col_list}) "
            f"SELECT {col_list} FROM legacy.{table}"
        )
        conn.commit()

        # Rows added by this run = growth of the target table.
        (after_count,) = conn.execute(
            f"SELECT COUNT(*) FROM main.{table}"
        ).fetchone()
        copied = after_count - before_count

        # Reconciliation: every source row must be present in the target.
        _reconcile(conn, table, pk_cols, source_count)

        conn.execute("DETACH DATABASE legacy")
    finally:
        conn.close()

    return {
        "source": source_count,
        "copied": copied,
        "skipped": False,
        "final": after_count,
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main() -> None:
    """Command-line entry point: parse options, run the migration, print stats.

    Each database option defaults to its environment variable
    (APP_DATABASE_URL, LOCATION_DATABASE_URL, POO_DATABASE_URL). A missing or
    empty app DB value is reported through the parser's error exit. A
    RuntimeError from the migration is printed to stderr and the process
    exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="Migrate legacy location/poo data into the unified app DB."
    )

    # (flag, environment variable supplying the default, help text)
    url_options = (
        (
            "--app-db",
            "APP_DATABASE_URL",
            "sqlite:///... URL or path for the app DB (default: $APP_DATABASE_URL)",
        ),
        (
            "--location-db",
            "LOCATION_DATABASE_URL",
            "sqlite:///... URL or path for the legacy location DB "
            "(default: $LOCATION_DATABASE_URL). Omit to skip location table.",
        ),
        (
            "--poo-db",
            "POO_DATABASE_URL",
            "sqlite:///... URL or path for the legacy poo DB "
            "(default: $POO_DATABASE_URL). Omit to skip poo_records table.",
        ),
    )
    for flag, env_name, help_text in url_options:
        cli.add_argument(flag, default=os.environ.get(env_name), help=help_text)
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Report counts only; do not write any rows.",
    )
    opts = cli.parse_args()

    if not opts.app_db:
        cli.error("App DB not specified. Pass --app-db or set APP_DATABASE_URL.")

    try:
        outcome = migrate_legacy_data(
            app_url=opts.app_db,
            location_url=opts.location_db,
            poo_url=opts.poo_db,
            dry_run=opts.dry_run,
        )
    except RuntimeError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)

    tag = "[DRY RUN] " if opts.dry_run else ""
    print(f"{tag}Migration results:")
    for table_name, stats in outcome.items():
        if not stats["skipped"]:
            print(
                f" {table_name}: source={stats['source']}, "
                f"copied={stats['copied']}, final={stats['final']}"
            )
            continue
        print(f" {table_name}: SKIPPED (legacy file absent or not provided)")
|
||||
|
||||
|
||||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user