from __future__ import annotations import sqlite3 import sys from pathlib import Path from alembic import command from alembic.config import Config from alembic.script import ScriptDirectory from alembic.util.exc import CommandError PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from app.config import get_settings POO_BASELINE_REVISION = "20260420_01_poo_baseline" EXPECTED_USER_VERSION = 1 EXPECTED_POO_TABLE_INFO = [ (0, "timestamp", "TEXT", 1, None, 1), (1, "status", "TEXT", 1, None, 0), (2, "latitude", "REAL", 1, None, 0), (3, "longitude", "REAL", 1, None, 0), ] class PooDatabaseAdoptionError(RuntimeError): """Raised when a legacy poo database does not match the expected baseline.""" def _database_path_from_url(database_url: str) -> Path: prefix = "sqlite:///" if not database_url.startswith(prefix): raise PooDatabaseAdoptionError( f"Only sqlite URLs are supported for poo DB adoption, got: {database_url}" ) return Path(database_url[len(prefix) :]) def _make_alembic_config(database_url: str) -> Config: config = Config("alembic_poo.ini") config.set_main_option("sqlalchemy.url", database_url) return config def _expected_head_revision(alembic_config: Config) -> str: script = ScriptDirectory.from_config(alembic_config) heads = script.get_heads() if len(heads) != 1: raise PooDatabaseAdoptionError( f"Expected exactly one Alembic head for poo DB, got {len(heads)}" ) return heads[0] def _is_known_revision(alembic_config: Config, revision: str) -> bool: script = ScriptDirectory.from_config(alembic_config) try: return script.get_revision(revision) is not None except CommandError: return False def _poo_table_exists(database_path: Path) -> bool: conn = sqlite3.connect(database_path) try: row = conn.execute( "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'poo_records'" ).fetchone() return row is not None finally: conn.close() def _alembic_version_table_exists(database_path: Path) -> bool: conn = sqlite3.connect(database_path) try: row = conn.execute( "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'alembic_version'" ).fetchone() return row is not None finally: conn.close() def _fetch_alembic_revision(database_path: Path) -> str: conn = sqlite3.connect(database_path) try: row = conn.execute("SELECT version_num FROM alembic_version").fetchone() if row is None: raise PooDatabaseAdoptionError("Alembic version table exists but contains no revision") return row[0] finally: conn.close() def _fetch_poo_table_info(database_path: Path) -> list[tuple]: conn = sqlite3.connect(database_path) try: return list(conn.execute("PRAGMA table_info(poo_records)")) finally: conn.close() def _fetch_user_version(database_path: Path) -> int: conn = sqlite3.connect(database_path) try: return conn.execute("PRAGMA user_version").fetchone()[0] finally: conn.close() def validate_legacy_poo_db(database_url: str) -> None: database_path = _database_path_from_url(database_url) if not database_path.exists(): raise PooDatabaseAdoptionError(f"Poo DB file does not exist: {database_path}") if not _poo_table_exists(database_path): raise PooDatabaseAdoptionError("Expected table 'poo_records' was not found in the DB") table_info = _fetch_poo_table_info(database_path) if table_info != EXPECTED_POO_TABLE_INFO: raise PooDatabaseAdoptionError("Poo table schema does not match the expected baseline") user_version = _fetch_user_version(database_path) if user_version != EXPECTED_USER_VERSION: raise PooDatabaseAdoptionError( f"Expected PRAGMA user_version = {EXPECTED_USER_VERSION}, got {user_version}" ) def validate_poo_runtime_db(database_url: str) -> None: database_path = _database_path_from_url(database_url) alembic_config = _make_alembic_config(database_url) expected_revision = _expected_head_revision(alembic_config) if not database_path.exists(): raise PooDatabaseAdoptionError( "Poo DB file was not found. Run 'python scripts/poo_db_adopt.py' first to " "initialize or adopt the poo DB before starting the app." ) if not _alembic_version_table_exists(database_path): raise PooDatabaseAdoptionError( "Poo DB exists but is not yet Alembic-managed. Run " "'python scripts/poo_db_adopt.py' first to adopt the legacy DB " "before starting the app." ) current_revision = _fetch_alembic_revision(database_path) if current_revision != expected_revision: raise PooDatabaseAdoptionError( "Poo DB revision mismatch. Refusing to start the app: " f"expected {expected_revision}, got {current_revision}" ) def adopt_or_initialize_poo_db(database_url: str) -> str: database_path = _database_path_from_url(database_url) alembic_config = _make_alembic_config(database_url) expected_revision = _expected_head_revision(alembic_config) if database_path.exists(): if _alembic_version_table_exists(database_path): current_revision = _fetch_alembic_revision(database_path) if current_revision == expected_revision: return "already_managed" if not _is_known_revision(alembic_config, current_revision): raise PooDatabaseAdoptionError( "Poo DB is already Alembic-managed but revision does not match " f"a known migration revision: got {current_revision}" ) command.upgrade(alembic_config, "head") return "upgraded" validate_legacy_poo_db(database_url) command.stamp(alembic_config, POO_BASELINE_REVISION) if POO_BASELINE_REVISION != expected_revision: command.upgrade(alembic_config, "head") return "upgraded" return "adopted" database_path.parent.mkdir(parents=True, exist_ok=True) command.upgrade(alembic_config, "head") return "initialized" def main() -> None: settings = get_settings() result = adopt_or_initialize_poo_db(settings.poo_database_url) if result == "initialized": print("Initialized a new poo DB via Alembic upgrade head.") elif result == "already_managed": print("Poo DB is already Alembic-managed at the expected baseline revision.") else: print("Validated legacy poo DB and stamped Alembic baseline successfully.") if __name__ == "__main__": main()