"""Tests for the Alembic migration wrapper (app.migrate). Covers: - Fresh DB: empty → upgrade head (tables created, version at head) - Unmanaged DB (2a): has tables + matches baseline → stamp V1 → upgrade head - Unmanaged DB (2b): schema mismatch → fail-close, no changes - Non-empty non-app DB (rogue tables) → unmanaged → fail-close - Managed DB: already at head → upgrade head is a no-op - verify_schema_is_current: pass when at head, fail-close otherwise - verify_schema_is_current: no write side-effects (no file creation) - init_db startup: fail-close when DB not at head, pass when at head - Data preservation: adoption does not lose existing data - Schema correctness: tables match the ORM model definitions - V1_REVISION constant matches the actual revision in versions/ - _detect_db_state correctly identifies all three states - _schema_matches_baseline checks FK, indexes, PK, types — not just column names - CLI entry point: python -m app.migrate """ from pathlib import Path import shutil import pytest from sqlalchemy import create_engine, inspect, text import app.models # noqa: F401 — register models on Base.metadata from app.db import Base, SessionLocal, configure_database from app.migrate import ( V1_REVISION, _detect_db_state, _make_alembic_config, run_migrations, verify_schema_is_current, ) from app.main import create_app from fastapi.testclient import TestClient def _get_head_revision() -> str: """Resolve the current Alembic head revision from migration scripts.""" from alembic.script import ScriptDirectory cfg = _make_alembic_config("sqlite:///") # URL is unused for script lookup script = ScriptDirectory.from_config(cfg) return script.get_current_head() HEAD_REVISION = _get_head_revision() @pytest.fixture() def tmp_db_path(tmp_path): """Provide a temporary SQLite database path.""" return tmp_path / "test.db" @pytest.fixture() def tmp_db_url(tmp_db_path): """Provide a temporary SQLite database URL.""" return f"sqlite:///{tmp_db_path}" # --------------------------------------------------------------------------- # Fresh DB: empty → upgrade head # --------------------------------------------------------------------------- class TestFreshDBMigration: """Empty database gets all tables created by migration.""" def test_creates_all_tables(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) eng.dispose() assert "boxes" in tables assert "items" in tables assert "subitems" in tables assert "app_settings" in tables def test_creates_alembic_version_table(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) eng.dispose() assert "alembic_version" in tables def test_version_at_head(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) with eng.begin() as conn: version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar() eng.dispose() assert version == HEAD_REVISION def test_boxes_table_has_all_columns(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) columns = {col["name"] for col in inspect(eng).get_columns("boxes")} eng.dispose() expected = { "id", "name", "note", "room", "status", "image_blob", "image_mime_type", "image_width", "image_height", "created_at", "updated_at", } assert columns == expected def test_items_table_has_all_columns(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) columns = {col["name"] for col in inspect(eng).get_columns("items")} eng.dispose() expected = { "id", "box_id", "name", "note", "quantity", "is_container", "image_blob", "image_mime_type", "image_width", "image_height", "created_at", "updated_at", } assert columns == expected def test_subitems_table_has_all_columns(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) columns = {col["name"] for col in inspect(eng).get_columns("subitems")} eng.dispose() expected = { "id", "parent_item_id", "name", "note", "quantity", "image_blob", "image_mime_type", "image_width", "image_height", "created_at", "updated_at", } assert columns == expected def test_foreign_keys_exist(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) item_fks = inspect(eng).get_foreign_keys("items") subitem_fks = inspect(eng).get_foreign_keys("subitems") eng.dispose() assert len(item_fks) == 1 assert item_fks[0]["constrained_columns"] == ["box_id"] assert len(subitem_fks) == 1 assert subitem_fks[0]["constrained_columns"] == ["parent_item_id"] def test_indexes_exist(self, tmp_db_url): run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) box_indexes = inspect(eng).get_indexes("boxes") item_indexes = inspect(eng).get_indexes("items") subitem_indexes = inspect(eng).get_indexes("subitems") eng.dispose() assert any("ix_boxes_id" in idx["name"] for idx in box_indexes) assert any("ix_items_id" in idx["name"] for idx in item_indexes) assert any("ix_subitems_id" in idx["name"] for idx in subitem_indexes) # --------------------------------------------------------------------------- # Unmanaged DB adoption — 2a: matches baseline # --------------------------------------------------------------------------- class TestUnmanagedDBAdoption2a: """Database with existing tables matching baseline gets adopted.""" def _create_old_db(self, db_url: str) -> None: """Simulate a pre-Alembic DB: create V1 tables only + insert data.""" eng = create_engine(db_url) # Only create V1 tables (boxes, items, subitems) — not app_settings for table_name in ("boxes", "items", "subitems"): Base.metadata.tables[table_name].create(bind=eng) with eng.begin() as conn: conn.execute(text( "INSERT INTO boxes (name, room, status, created_at, updated_at) " "VALUES ('Kitchen Box', 'Kitchen', 'packed', '2026-01-01 00:00:00', '2026-01-01 00:00:00')" )) conn.execute(text( "INSERT INTO items (box_id, name, quantity, is_container, created_at, updated_at) " "VALUES (1, 'Plates', 4, 0, '2026-01-01 00:00:00', '2026-01-01 00:00:00')" )) eng.dispose() def test_stamp_and_upgrade(self, tmp_db_url): self._create_old_db(tmp_db_url) assert _detect_db_state(tmp_db_url) == "unmanaged" run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) with eng.begin() as conn: version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar() eng.dispose() assert version == HEAD_REVISION def test_data_preserved_after_adoption(self, tmp_db_url): self._create_old_db(tmp_db_url) run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) with eng.begin() as conn: box_count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar() item_count = conn.execute(text("SELECT COUNT(*) FROM items")).scalar() box_name = conn.execute(text("SELECT name FROM boxes WHERE id = 1")).scalar() eng.dispose() assert box_count == 1 assert item_count == 1 assert box_name == "Kitchen Box" def test_no_extra_tables_beyond_migrations(self, tmp_db_url): self._create_old_db(tmp_db_url) run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) eng.dispose() assert tables == {"alembic_version", "boxes", "items", "subitems", "app_settings"} def test_adoption_is_idempotent(self, tmp_db_url): """Running run_migrations twice does not error or duplicate data.""" self._create_old_db(tmp_db_url) run_migrations(tmp_db_url) run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) with eng.begin() as conn: box_count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar() version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar() eng.dispose() assert box_count == 1 assert version == HEAD_REVISION # --------------------------------------------------------------------------- # Unmanaged DB — 2b: schema mismatch → fail-close # --------------------------------------------------------------------------- class TestUnmanagedDBMismatch2b: """Database with schema not matching baseline → fail-close, no changes.""" def _create_mismatched_db(self, db_url: str) -> None: """Create a DB that has tables but with wrong columns (missing image cols).""" eng = create_engine(db_url) with eng.begin() as conn: conn.execute(text( "CREATE TABLE boxes (" "id INTEGER PRIMARY KEY, name TEXT NOT NULL, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "CREATE TABLE items (" "id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, name TEXT NOT NULL, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "CREATE TABLE subitems (" "id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, name TEXT NOT NULL, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "INSERT INTO boxes (name, created_at, updated_at) " "VALUES ('Bad Box', '2026-01-01 00:00:00', '2026-01-01 00:00:00')" )) eng.dispose() def test_fail_close_on_mismatch(self, tmp_db_url): self._create_mismatched_db(tmp_db_url) assert _detect_db_state(tmp_db_url) == "unmanaged" with pytest.raises(SystemExit, match="does not match"): run_migrations(tmp_db_url) def test_db_unchanged_after_fail_close(self, tmp_db_url): self._create_mismatched_db(tmp_db_url) with pytest.raises(SystemExit): run_migrations(tmp_db_url) # DB should be completely unchanged eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) assert "alembic_version" not in tables with eng.begin() as conn: count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar() eng.dispose() assert count == 1 # original data still there def test_extra_table_causes_fail_close(self, tmp_db_url): """A DB with the correct tables PLUS an extra one should fail.""" eng = create_engine(tmp_db_url) Base.metadata.create_all(bind=eng) with eng.begin() as conn: conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)")) eng.dispose() with pytest.raises(SystemExit, match="does not match"): run_migrations(tmp_db_url) def test_missing_fk_causes_fail_close(self, tmp_db_url): """Tables with correct columns but missing FK should fail.""" eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text( "CREATE TABLE boxes (id INTEGER PRIMARY KEY, name VARCHAR(100) NOT NULL, " "note TEXT, room VARCHAR(100), status VARCHAR(50), " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "CREATE TABLE items (id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, " "name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, " "is_container BOOLEAN NOT NULL DEFAULT 0, " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "CREATE TABLE subitems (id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, " "name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) eng.dispose() with pytest.raises(SystemExit, match="does not match"): run_migrations(tmp_db_url) def test_missing_index_causes_fail_close(self, tmp_db_url): """Tables with correct columns and FK but missing index should fail.""" eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text( "CREATE TABLE boxes (id INTEGER PRIMARY KEY, name VARCHAR(100) NOT NULL, " "note TEXT, room VARCHAR(100), status VARCHAR(50), " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)" )) conn.execute(text( "CREATE TABLE items (id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, " "name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, " "is_container BOOLEAN NOT NULL DEFAULT 0, " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, " "FOREIGN KEY(box_id) REFERENCES boxes(id) ON DELETE CASCADE)" )) conn.execute(text( "CREATE TABLE subitems (id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, " "name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, " "image_blob BLOB, image_mime_type VARCHAR(50), " "image_width INTEGER, image_height INTEGER, " "created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, " "FOREIGN KEY(parent_item_id) REFERENCES items(id) ON DELETE CASCADE)" )) # No indexes created — should fail eng.dispose() with pytest.raises(SystemExit, match="does not match"): run_migrations(tmp_db_url) # --------------------------------------------------------------------------- # Non-empty non-app DB (rogue tables) — treated as unmanaged, fail-close # --------------------------------------------------------------------------- class TestRogueDatabase: """A DB with unrelated tables must be treated as unmanaged and fail-close.""" def test_rogue_table_detected_as_unmanaged(self, tmp_db_url): """A DB with only rogue_table should be 'unmanaged', not 'empty'.""" eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)")) eng.dispose() assert _detect_db_state(tmp_db_url) == "unmanaged" def test_rogue_table_migration_fails_closed(self, tmp_db_url): """Migration should fail-close, NOT create app tables in rogue DB.""" eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)")) eng.dispose() with pytest.raises(SystemExit, match="does not match"): run_migrations(tmp_db_url) # Verify no app tables were created eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) eng.dispose() assert tables == {"rogue_table"} # only the original rogue table assert "boxes" not in tables assert "alembic_version" not in tables # --------------------------------------------------------------------------- # Managed DB (already at head) # --------------------------------------------------------------------------- class TestManagedDBMigration: """Database already under Alembic control: upgrade head is a no-op.""" def test_upgrade_head_is_noop(self, tmp_db_url): run_migrations(tmp_db_url) # first run: creates tables assert _detect_db_state(tmp_db_url) == "managed" run_migrations(tmp_db_url) # second run: should be a no-op eng = create_engine(tmp_db_url) with eng.begin() as conn: version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar() eng.dispose() assert version == HEAD_REVISION # --------------------------------------------------------------------------- # _detect_db_state # --------------------------------------------------------------------------- class TestDetectDBState: def test_empty_db(self, tmp_db_url): assert _detect_db_state(tmp_db_url) == "empty" def test_unmanaged_db(self, tmp_db_url): eng = create_engine(tmp_db_url) Base.metadata.create_all(bind=eng) eng.dispose() assert _detect_db_state(tmp_db_url) == "unmanaged" def test_managed_db(self, tmp_db_url): run_migrations(tmp_db_url) assert _detect_db_state(tmp_db_url) == "managed" def test_rogue_table_is_unmanaged(self, tmp_db_url): """Any DB with tables but no alembic_version is 'unmanaged'.""" eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text("CREATE TABLE something (id INTEGER)")) eng.dispose() assert _detect_db_state(tmp_db_url) == "unmanaged" # --------------------------------------------------------------------------- # verify_schema_is_current (read-only startup check) # --------------------------------------------------------------------------- class TestVerifySchemaIsCurrent: """verify_schema_is_current is read-only — only checks, never modifies.""" def test_passes_when_at_head(self, tmp_db_url): run_migrations(tmp_db_url) # Should not raise verify_schema_is_current(tmp_db_url) def test_fails_on_empty_db(self, tmp_db_url): with pytest.raises(RuntimeError, match="empty"): verify_schema_is_current(tmp_db_url) def test_fails_on_unmanaged_db(self, tmp_db_url): eng = create_engine(tmp_db_url) Base.metadata.create_all(bind=eng) eng.dispose() with pytest.raises(RuntimeError, match="alembic_version"): verify_schema_is_current(tmp_db_url) def test_fails_on_wrong_revision(self, tmp_db_url): """Stamp at an old/fake revision, then verify should fail.""" run_migrations(tmp_db_url) eng = create_engine(tmp_db_url) with eng.begin() as conn: conn.execute(text("DELETE FROM alembic_version")) conn.execute(text("INSERT INTO alembic_version VALUES ('fake_old_rev')")) eng.dispose() with pytest.raises(RuntimeError, match="fake_old_rev"): verify_schema_is_current(tmp_db_url) def test_does_not_modify_db(self, tmp_db_url): """Calling verify on an empty DB must not create any tables.""" with pytest.raises(RuntimeError): verify_schema_is_current(tmp_db_url) eng = create_engine(tmp_db_url) tables = set(inspect(eng).get_table_names()) eng.dispose() assert tables == set() # still empty def test_no_file_creation_for_missing_sqlite(self, tmp_path): """verify_schema_is_current must NOT create a missing SQLite file.""" missing_path = tmp_path / "nonexistent" / "missing.db" db_url = f"sqlite:///{missing_path}" with pytest.raises(RuntimeError, match="does not exist"): verify_schema_is_current(db_url) assert not missing_path.exists() assert not missing_path.parent.exists() # --------------------------------------------------------------------------- # V1_REVISION constant # --------------------------------------------------------------------------- class TestV1RevisionConstant: def test_revision_file_exists(self): """V1_REVISION must point to an actual migration file.""" versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions" revision_files = list(versions_dir.glob(f"*{V1_REVISION}*.py")) assert len(revision_files) == 1, ( f"Expected exactly one file matching revision {V1_REVISION} " f"in {versions_dir}, found: {revision_files}" ) def test_revision_matches_baseline(self): """V1_REVISION must be the baseline (no down_revision).""" import importlib.util versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions" revision_files = list(versions_dir.glob(f"*{V1_REVISION}*.py")) assert len(revision_files) == 1 spec = importlib.util.spec_from_file_location("v1_migration", revision_files[0]) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) assert mod.down_revision is None, "V1 baseline must have down_revision = None" assert mod.revision == V1_REVISION # --------------------------------------------------------------------------- # Integration: init_db startup verification # --------------------------------------------------------------------------- class TestInitDBStartupVerify: """init_db (called by create_app lifespan) verifies schema at startup.""" def test_app_starts_when_db_at_head(self, tmp_path): """App starts normally when DB has been migrated to head.""" test_db_path = tmp_path / "integration.db" database_url = f"sqlite:///{test_db_path}" run_migrations(database_url) configure_database(database_url) app = create_app() with TestClient(app) as client: response = client.get("/boxes", follow_redirects=False) assert response.status_code == 200 def test_init_db_fails_on_empty_db(self, tmp_path): """init_db raises RuntimeError on empty DB — app must not start.""" test_db_path = tmp_path / "empty.db" database_url = f"sqlite:///{test_db_path}" configure_database(database_url) app = create_app() with pytest.raises(RuntimeError, match="empty"): with TestClient(app): pass def test_init_db_fails_on_unmanaged_db(self, tmp_path): """init_db raises RuntimeError on unmanaged DB — app must not start.""" test_db_path = tmp_path / "unmanaged.db" database_url = f"sqlite:///{test_db_path}" # Create tables the old way (no alembic_version) eng = create_engine(database_url) Base.metadata.create_all(bind=eng) eng.dispose() configure_database(database_url) app = create_app() with pytest.raises(RuntimeError, match="alembic_version"): with TestClient(app): pass def test_full_crud_after_migration(self, tmp_path): """Full CRUD works when DB is migrated first, then app starts.""" test_db_path = tmp_path / "crud.db" database_url = f"sqlite:///{test_db_path}" run_migrations(database_url) configure_database(database_url) app = create_app() with TestClient(app) as client: # Create a box resp = client.post("/boxes", data={ "name": "Test Box", "room": "Living Room", "status": "ready", }, follow_redirects=False) assert resp.status_code in (200, 302, 303) # Verify it's there resp = client.get("/boxes") assert "Test Box" in resp.text # Create an item resp = client.post("/boxes/1/items", data={ "name": "Test Item", "quantity": "3", }, follow_redirects=False) assert resp.status_code in (200, 302, 303) # Delete the box (cascade) resp = client.post("/boxes/1/delete", follow_redirects=False) assert resp.status_code in (200, 302, 303) # Verify empty resp = client.get("/boxes") assert "Test Box" not in resp.text # --------------------------------------------------------------------------- # Production DB copy adoption # --------------------------------------------------------------------------- class TestProdDBCopyAdoption: """Verify migration works against a copy of the real production DB.""" def test_adopt_prod_copy(self, tmp_path): prod_db = Path("data/app.db") if not prod_db.exists(): pytest.skip("data/app.db not present — skipping prod copy test") copy_path = tmp_path / "prod_copy.db" shutil.copy2(prod_db, copy_path) db_url = f"sqlite:///{copy_path}" # Record row counts before eng = create_engine(db_url) with eng.begin() as conn: boxes_before = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar() items_before = conn.execute(text("SELECT COUNT(*) FROM items")).scalar() subitems_before = conn.execute(text("SELECT COUNT(*) FROM subitems")).scalar() eng.dispose() # Run migration (handles managed, unmanaged, or empty) run_migrations(db_url) # Verify version at head and data preserved eng = create_engine(db_url) with eng.begin() as conn: version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar() boxes_after = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar() items_after = conn.execute(text("SELECT COUNT(*) FROM items")).scalar() subitems_after = conn.execute(text("SELECT COUNT(*) FROM subitems")).scalar() eng.dispose() assert version == HEAD_REVISION assert boxes_after == boxes_before assert items_after == items_before assert subitems_after == subitems_before