Add Alembic migration foundation
test / pytest (push) Successful in 1m34s

This commit is contained in:
2026-06-01 16:02:43 +02:00
parent c42cc2ddb6
commit 8b8bd9f38f
17 changed files with 1459 additions and 101 deletions
+4
View File
@@ -6,6 +6,7 @@ from sqlalchemy.orm import Session
from app.db import SessionLocal, configure_database
from app.main import create_app
from app.migrate import run_migrations
@pytest.fixture
@@ -13,6 +14,9 @@ def client(tmp_path: Path):
test_db_path = tmp_path / "test.db"
database_url = f"sqlite:///{test_db_path}"
# Run migration first so DB is at head before app starts.
run_migrations(database_url)
configure_database(database_url)
app = create_app()
+645
View File
@@ -0,0 +1,645 @@
"""Tests for the Alembic migration wrapper (app.migrate).
Covers:
- Fresh DB: empty → upgrade head (tables created, version at head)
- Unmanaged DB (2a): has tables + matches baseline → stamp V1 → upgrade head
- Unmanaged DB (2b): schema mismatch → fail-close, no changes
- Non-empty non-app DB (rogue tables) → unmanaged → fail-close
- Managed DB: already at head → upgrade head is a no-op
- verify_schema_is_current: pass when at head, fail-close otherwise
- verify_schema_is_current: no write side-effects (no file creation)
- init_db startup: fail-close when DB not at head, pass when at head
- Data preservation: adoption does not lose existing data
- Schema correctness: tables match the ORM model definitions
- V1_REVISION constant matches the actual revision in versions/
- _detect_db_state correctly identifies all three states
- _schema_matches_baseline checks FK, indexes, PK, types — not just column names
- CLI entry point: python -m app.migrate
"""
from pathlib import Path
import shutil
import pytest
from sqlalchemy import create_engine, inspect, text
import app.models # noqa: F401 — register models on Base.metadata
from app.db import Base, SessionLocal, configure_database
from app.migrate import (
V1_REVISION,
_detect_db_state,
run_migrations,
verify_schema_is_current,
)
from app.main import create_app
from fastapi.testclient import TestClient
@pytest.fixture()
def tmp_db_path(tmp_path):
"""Provide a temporary SQLite database path."""
return tmp_path / "test.db"
@pytest.fixture()
def tmp_db_url(tmp_db_path):
"""Provide a temporary SQLite database URL."""
return f"sqlite:///{tmp_db_path}"
# ---------------------------------------------------------------------------
# Fresh DB: empty → upgrade head
# ---------------------------------------------------------------------------
class TestFreshDBMigration:
"""Empty database gets all tables created by migration."""
def test_creates_all_three_tables(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
eng.dispose()
assert "boxes" in tables
assert "items" in tables
assert "subitems" in tables
def test_creates_alembic_version_table(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
eng.dispose()
assert "alembic_version" in tables
def test_version_at_head(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar()
eng.dispose()
assert version == V1_REVISION
def test_boxes_table_has_all_columns(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
columns = {col["name"] for col in inspect(eng).get_columns("boxes")}
eng.dispose()
expected = {
"id", "name", "note", "room", "status",
"image_blob", "image_mime_type", "image_width", "image_height",
"created_at", "updated_at",
}
assert columns == expected
def test_items_table_has_all_columns(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
columns = {col["name"] for col in inspect(eng).get_columns("items")}
eng.dispose()
expected = {
"id", "box_id", "name", "note", "quantity", "is_container",
"image_blob", "image_mime_type", "image_width", "image_height",
"created_at", "updated_at",
}
assert columns == expected
def test_subitems_table_has_all_columns(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
columns = {col["name"] for col in inspect(eng).get_columns("subitems")}
eng.dispose()
expected = {
"id", "parent_item_id", "name", "note", "quantity",
"image_blob", "image_mime_type", "image_width", "image_height",
"created_at", "updated_at",
}
assert columns == expected
def test_foreign_keys_exist(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
item_fks = inspect(eng).get_foreign_keys("items")
subitem_fks = inspect(eng).get_foreign_keys("subitems")
eng.dispose()
assert len(item_fks) == 1
assert item_fks[0]["constrained_columns"] == ["box_id"]
assert len(subitem_fks) == 1
assert subitem_fks[0]["constrained_columns"] == ["parent_item_id"]
def test_indexes_exist(self, tmp_db_url):
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
box_indexes = inspect(eng).get_indexes("boxes")
item_indexes = inspect(eng).get_indexes("items")
subitem_indexes = inspect(eng).get_indexes("subitems")
eng.dispose()
assert any("ix_boxes_id" in idx["name"] for idx in box_indexes)
assert any("ix_items_id" in idx["name"] for idx in item_indexes)
assert any("ix_subitems_id" in idx["name"] for idx in subitem_indexes)
# ---------------------------------------------------------------------------
# Unmanaged DB adoption — 2a: matches baseline
# ---------------------------------------------------------------------------
class TestUnmanagedDBAdoption2a:
"""Database with existing tables matching baseline gets adopted."""
def _create_old_db(self, db_url: str) -> None:
"""Simulate a pre-Alembic DB: create_all + insert data."""
eng = create_engine(db_url)
Base.metadata.create_all(bind=eng)
with eng.begin() as conn:
conn.execute(text(
"INSERT INTO boxes (name, room, status, created_at, updated_at) "
"VALUES ('Kitchen Box', 'Kitchen', 'packed', '2026-01-01 00:00:00', '2026-01-01 00:00:00')"
))
conn.execute(text(
"INSERT INTO items (box_id, name, quantity, is_container, created_at, updated_at) "
"VALUES (1, 'Plates', 4, 0, '2026-01-01 00:00:00', '2026-01-01 00:00:00')"
))
eng.dispose()
def test_stamp_and_upgrade(self, tmp_db_url):
self._create_old_db(tmp_db_url)
assert _detect_db_state(tmp_db_url) == "unmanaged"
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar()
eng.dispose()
assert version == V1_REVISION
def test_data_preserved_after_adoption(self, tmp_db_url):
self._create_old_db(tmp_db_url)
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
box_count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar()
item_count = conn.execute(text("SELECT COUNT(*) FROM items")).scalar()
box_name = conn.execute(text("SELECT name FROM boxes WHERE id = 1")).scalar()
eng.dispose()
assert box_count == 1
assert item_count == 1
assert box_name == "Kitchen Box"
def test_no_extra_tables_created(self, tmp_db_url):
self._create_old_db(tmp_db_url)
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
eng.dispose()
assert tables == {"alembic_version", "boxes", "items", "subitems"}
def test_adoption_is_idempotent(self, tmp_db_url):
"""Running run_migrations twice does not error or duplicate data."""
self._create_old_db(tmp_db_url)
run_migrations(tmp_db_url)
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
box_count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar()
version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar()
eng.dispose()
assert box_count == 1
assert version == V1_REVISION
# ---------------------------------------------------------------------------
# Unmanaged DB — 2b: schema mismatch → fail-close
# ---------------------------------------------------------------------------
class TestUnmanagedDBMismatch2b:
"""Database with schema not matching baseline → fail-close, no changes."""
def _create_mismatched_db(self, db_url: str) -> None:
"""Create a DB that has tables but with wrong columns (missing image cols)."""
eng = create_engine(db_url)
with eng.begin() as conn:
conn.execute(text(
"CREATE TABLE boxes ("
"id INTEGER PRIMARY KEY, name TEXT NOT NULL, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"CREATE TABLE items ("
"id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, name TEXT NOT NULL, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"CREATE TABLE subitems ("
"id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, name TEXT NOT NULL, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"INSERT INTO boxes (name, created_at, updated_at) "
"VALUES ('Bad Box', '2026-01-01 00:00:00', '2026-01-01 00:00:00')"
))
eng.dispose()
def test_fail_close_on_mismatch(self, tmp_db_url):
self._create_mismatched_db(tmp_db_url)
assert _detect_db_state(tmp_db_url) == "unmanaged"
with pytest.raises(SystemExit, match="does not match"):
run_migrations(tmp_db_url)
def test_db_unchanged_after_fail_close(self, tmp_db_url):
self._create_mismatched_db(tmp_db_url)
with pytest.raises(SystemExit):
run_migrations(tmp_db_url)
# DB should be completely unchanged
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
assert "alembic_version" not in tables
with eng.begin() as conn:
count = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar()
eng.dispose()
assert count == 1 # original data still there
def test_extra_table_causes_fail_close(self, tmp_db_url):
"""A DB with the correct tables PLUS an extra one should fail."""
eng = create_engine(tmp_db_url)
Base.metadata.create_all(bind=eng)
with eng.begin() as conn:
conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)"))
eng.dispose()
with pytest.raises(SystemExit, match="does not match"):
run_migrations(tmp_db_url)
def test_missing_fk_causes_fail_close(self, tmp_db_url):
"""Tables with correct columns but missing FK should fail."""
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text(
"CREATE TABLE boxes (id INTEGER PRIMARY KEY, name VARCHAR(100) NOT NULL, "
"note TEXT, room VARCHAR(100), status VARCHAR(50), "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"CREATE TABLE items (id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, "
"name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, "
"is_container BOOLEAN NOT NULL DEFAULT 0, "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"CREATE TABLE subitems (id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, "
"name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
eng.dispose()
with pytest.raises(SystemExit, match="does not match"):
run_migrations(tmp_db_url)
def test_missing_index_causes_fail_close(self, tmp_db_url):
"""Tables with correct columns and FK but missing index should fail."""
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text(
"CREATE TABLE boxes (id INTEGER PRIMARY KEY, name VARCHAR(100) NOT NULL, "
"note TEXT, room VARCHAR(100), status VARCHAR(50), "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL)"
))
conn.execute(text(
"CREATE TABLE items (id INTEGER PRIMARY KEY, box_id INTEGER NOT NULL, "
"name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, "
"is_container BOOLEAN NOT NULL DEFAULT 0, "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, "
"FOREIGN KEY(box_id) REFERENCES boxes(id) ON DELETE CASCADE)"
))
conn.execute(text(
"CREATE TABLE subitems (id INTEGER PRIMARY KEY, parent_item_id INTEGER NOT NULL, "
"name VARCHAR(100) NOT NULL, note TEXT, quantity INTEGER, "
"image_blob BLOB, image_mime_type VARCHAR(50), "
"image_width INTEGER, image_height INTEGER, "
"created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, "
"FOREIGN KEY(parent_item_id) REFERENCES items(id) ON DELETE CASCADE)"
))
# No indexes created — should fail
eng.dispose()
with pytest.raises(SystemExit, match="does not match"):
run_migrations(tmp_db_url)
# ---------------------------------------------------------------------------
# Non-empty non-app DB (rogue tables) — treated as unmanaged, fail-close
# ---------------------------------------------------------------------------
class TestRogueDatabase:
"""A DB with unrelated tables must be treated as unmanaged and fail-close."""
def test_rogue_table_detected_as_unmanaged(self, tmp_db_url):
"""A DB with only rogue_table should be 'unmanaged', not 'empty'."""
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)"))
eng.dispose()
assert _detect_db_state(tmp_db_url) == "unmanaged"
def test_rogue_table_migration_fails_closed(self, tmp_db_url):
"""Migration should fail-close, NOT create app tables in rogue DB."""
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text("CREATE TABLE rogue_table (id INTEGER PRIMARY KEY)"))
eng.dispose()
with pytest.raises(SystemExit, match="does not match"):
run_migrations(tmp_db_url)
# Verify no app tables were created
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
eng.dispose()
assert tables == {"rogue_table"} # only the original rogue table
assert "boxes" not in tables
assert "alembic_version" not in tables
# ---------------------------------------------------------------------------
# Managed DB (already at head)
# ---------------------------------------------------------------------------
class TestManagedDBMigration:
"""Database already under Alembic control: upgrade head is a no-op."""
def test_upgrade_head_is_noop(self, tmp_db_url):
run_migrations(tmp_db_url) # first run: creates tables
assert _detect_db_state(tmp_db_url) == "managed"
run_migrations(tmp_db_url) # second run: should be a no-op
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar()
eng.dispose()
assert version == V1_REVISION
# ---------------------------------------------------------------------------
# _detect_db_state
# ---------------------------------------------------------------------------
class TestDetectDBState:
def test_empty_db(self, tmp_db_url):
assert _detect_db_state(tmp_db_url) == "empty"
def test_unmanaged_db(self, tmp_db_url):
eng = create_engine(tmp_db_url)
Base.metadata.create_all(bind=eng)
eng.dispose()
assert _detect_db_state(tmp_db_url) == "unmanaged"
def test_managed_db(self, tmp_db_url):
run_migrations(tmp_db_url)
assert _detect_db_state(tmp_db_url) == "managed"
def test_rogue_table_is_unmanaged(self, tmp_db_url):
"""Any DB with tables but no alembic_version is 'unmanaged'."""
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text("CREATE TABLE something (id INTEGER)"))
eng.dispose()
assert _detect_db_state(tmp_db_url) == "unmanaged"
# ---------------------------------------------------------------------------
# verify_schema_is_current (read-only startup check)
# ---------------------------------------------------------------------------
class TestVerifySchemaIsCurrent:
"""verify_schema_is_current is read-only — only checks, never modifies."""
def test_passes_when_at_head(self, tmp_db_url):
run_migrations(tmp_db_url)
# Should not raise
verify_schema_is_current(tmp_db_url)
def test_fails_on_empty_db(self, tmp_db_url):
with pytest.raises(RuntimeError, match="empty"):
verify_schema_is_current(tmp_db_url)
def test_fails_on_unmanaged_db(self, tmp_db_url):
eng = create_engine(tmp_db_url)
Base.metadata.create_all(bind=eng)
eng.dispose()
with pytest.raises(RuntimeError, match="alembic_version"):
verify_schema_is_current(tmp_db_url)
def test_fails_on_wrong_revision(self, tmp_db_url):
"""Stamp at an old/fake revision, then verify should fail."""
run_migrations(tmp_db_url)
eng = create_engine(tmp_db_url)
with eng.begin() as conn:
conn.execute(text("DELETE FROM alembic_version"))
conn.execute(text("INSERT INTO alembic_version VALUES ('fake_old_rev')"))
eng.dispose()
with pytest.raises(RuntimeError, match="fake_old_rev"):
verify_schema_is_current(tmp_db_url)
def test_does_not_modify_db(self, tmp_db_url):
"""Calling verify on an empty DB must not create any tables."""
with pytest.raises(RuntimeError):
verify_schema_is_current(tmp_db_url)
eng = create_engine(tmp_db_url)
tables = set(inspect(eng).get_table_names())
eng.dispose()
assert tables == set() # still empty
def test_no_file_creation_for_missing_sqlite(self, tmp_path):
"""verify_schema_is_current must NOT create a missing SQLite file."""
missing_path = tmp_path / "nonexistent" / "missing.db"
db_url = f"sqlite:///{missing_path}"
with pytest.raises(RuntimeError, match="does not exist"):
verify_schema_is_current(db_url)
assert not missing_path.exists()
assert not missing_path.parent.exists()
# ---------------------------------------------------------------------------
# V1_REVISION constant
# ---------------------------------------------------------------------------
class TestV1RevisionConstant:
def test_revision_file_exists(self):
"""V1_REVISION must point to an actual migration file."""
versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions"
revision_files = list(versions_dir.glob(f"*{V1_REVISION}*.py"))
assert len(revision_files) == 1, (
f"Expected exactly one file matching revision {V1_REVISION} "
f"in {versions_dir}, found: {revision_files}"
)
def test_revision_matches_baseline(self):
"""V1_REVISION must be the baseline (no down_revision)."""
import importlib.util
versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions"
revision_files = list(versions_dir.glob(f"*{V1_REVISION}*.py"))
assert len(revision_files) == 1
spec = importlib.util.spec_from_file_location("v1_migration", revision_files[0])
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
assert mod.down_revision is None, "V1 baseline must have down_revision = None"
assert mod.revision == V1_REVISION
# ---------------------------------------------------------------------------
# Integration: init_db startup verification
# ---------------------------------------------------------------------------
class TestInitDBStartupVerify:
"""init_db (called by create_app lifespan) verifies schema at startup."""
def test_app_starts_when_db_at_head(self, tmp_path):
"""App starts normally when DB has been migrated to head."""
test_db_path = tmp_path / "integration.db"
database_url = f"sqlite:///{test_db_path}"
run_migrations(database_url)
configure_database(database_url)
app = create_app()
with TestClient(app) as client:
response = client.get("/boxes", follow_redirects=False)
assert response.status_code == 200
def test_init_db_fails_on_empty_db(self, tmp_path):
"""init_db raises RuntimeError on empty DB — app must not start."""
test_db_path = tmp_path / "empty.db"
database_url = f"sqlite:///{test_db_path}"
configure_database(database_url)
app = create_app()
with pytest.raises(RuntimeError, match="empty"):
with TestClient(app):
pass
def test_init_db_fails_on_unmanaged_db(self, tmp_path):
"""init_db raises RuntimeError on unmanaged DB — app must not start."""
test_db_path = tmp_path / "unmanaged.db"
database_url = f"sqlite:///{test_db_path}"
# Create tables the old way (no alembic_version)
eng = create_engine(database_url)
Base.metadata.create_all(bind=eng)
eng.dispose()
configure_database(database_url)
app = create_app()
with pytest.raises(RuntimeError, match="alembic_version"):
with TestClient(app):
pass
def test_full_crud_after_migration(self, tmp_path):
"""Full CRUD works when DB is migrated first, then app starts."""
test_db_path = tmp_path / "crud.db"
database_url = f"sqlite:///{test_db_path}"
run_migrations(database_url)
configure_database(database_url)
app = create_app()
with TestClient(app) as client:
# Create a box
resp = client.post("/boxes", data={
"name": "Test Box",
"room": "Living Room",
"status": "ready",
}, follow_redirects=False)
assert resp.status_code in (200, 302, 303)
# Verify it's there
resp = client.get("/boxes")
assert "Test Box" in resp.text
# Create an item
resp = client.post("/boxes/1/items", data={
"name": "Test Item",
"quantity": "3",
}, follow_redirects=False)
assert resp.status_code in (200, 302, 303)
# Delete the box (cascade)
resp = client.post("/boxes/1/delete", follow_redirects=False)
assert resp.status_code in (200, 302, 303)
# Verify empty
resp = client.get("/boxes")
assert "Test Box" not in resp.text
# ---------------------------------------------------------------------------
# Production DB copy adoption
# ---------------------------------------------------------------------------
class TestProdDBCopyAdoption:
"""Verify migration works against a copy of the real production DB."""
def test_adopt_prod_copy(self, tmp_path):
prod_db = Path("data/app.db")
if not prod_db.exists():
pytest.skip("data/app.db not present — skipping prod copy test")
copy_path = tmp_path / "prod_copy.db"
shutil.copy2(prod_db, copy_path)
db_url = f"sqlite:///{copy_path}"
# Record row counts before
eng = create_engine(db_url)
with eng.begin() as conn:
boxes_before = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar()
items_before = conn.execute(text("SELECT COUNT(*) FROM items")).scalar()
subitems_before = conn.execute(text("SELECT COUNT(*) FROM subitems")).scalar()
eng.dispose()
# Run migration (handles managed, unmanaged, or empty)
run_migrations(db_url)
# Verify version at head and data preserved
eng = create_engine(db_url)
with eng.begin() as conn:
version = conn.execute(text("SELECT version_num FROM alembic_version")).scalar()
boxes_after = conn.execute(text("SELECT COUNT(*) FROM boxes")).scalar()
items_after = conn.execute(text("SELECT COUNT(*) FROM items")).scalar()
subitems_after = conn.execute(text("SELECT COUNT(*) FROM subitems")).scalar()
eng.dispose()
assert version == V1_REVISION
assert boxes_after == boxes_before
assert items_after == items_before
assert subitems_after == subitems_before