change adoption to separate step
This commit is contained in:
+31
-3
@@ -6,6 +6,8 @@ from pathlib import Path
|
||||
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from alembic.script import ScriptDirectory
|
||||
from alembic.util.exc import CommandError
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(PROJECT_ROOT) not in sys.path:
|
||||
@@ -35,6 +37,24 @@ def _make_alembic_config(database_url: str) -> Config:
|
||||
return config
|
||||
|
||||
|
||||
def _expected_head_revision(alembic_config: Config) -> str:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
heads = script.get_heads()
|
||||
if len(heads) != 1:
|
||||
raise AppDatabaseAdoptionError(
|
||||
f"Expected exactly one Alembic head for app DB, got {len(heads)}"
|
||||
)
|
||||
return heads[0]
|
||||
|
||||
|
||||
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
try:
|
||||
return script.get_revision(revision) is not None
|
||||
except CommandError:
|
||||
return False
|
||||
|
||||
|
||||
def _alembic_version_table_exists(database_path: Path) -> bool:
|
||||
conn = sqlite3.connect(database_path)
|
||||
try:
|
||||
@@ -75,6 +95,8 @@ def _list_user_tables(database_path: Path) -> list[str]:
|
||||
|
||||
def validate_app_runtime_db(database_url: str) -> None:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
if not database_path.exists():
|
||||
raise AppDatabaseAdoptionError(
|
||||
"App DB file was not found. Run 'python scripts/app_db_adopt.py' first to "
|
||||
@@ -88,22 +110,28 @@ def validate_app_runtime_db(database_url: str) -> None:
|
||||
)
|
||||
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision != APP_BASELINE_REVISION:
|
||||
if current_revision != expected_revision:
|
||||
raise AppDatabaseAdoptionError(
|
||||
"App DB revision mismatch. Refusing to start the app: "
|
||||
f"expected {APP_BASELINE_REVISION}, got {current_revision}"
|
||||
f"expected {expected_revision}, got {current_revision}"
|
||||
)
|
||||
|
||||
|
||||
def adopt_or_initialize_app_db(database_url: str) -> str:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
|
||||
if database_path.exists():
|
||||
if _alembic_version_table_exists(database_path):
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision == APP_BASELINE_REVISION:
|
||||
if current_revision == expected_revision:
|
||||
return "already_managed"
|
||||
if not _is_known_revision(alembic_config, current_revision):
|
||||
raise AppDatabaseAdoptionError(
|
||||
"App DB is already Alembic-managed but revision does not match "
|
||||
f"a known migration revision: got {current_revision}"
|
||||
)
|
||||
command.upgrade(alembic_config, "head")
|
||||
return "upgraded"
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@ from pathlib import Path
|
||||
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from alembic.script import ScriptDirectory
|
||||
from alembic.util.exc import CommandError
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(PROJECT_ROOT) not in sys.path:
|
||||
@@ -43,6 +45,24 @@ def _make_alembic_config(database_url: str) -> Config:
|
||||
return config
|
||||
|
||||
|
||||
def _expected_head_revision(alembic_config: Config) -> str:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
heads = script.get_heads()
|
||||
if len(heads) != 1:
|
||||
raise LocationDatabaseAdoptionError(
|
||||
f"Expected exactly one Alembic head for location DB, got {len(heads)}"
|
||||
)
|
||||
return heads[0]
|
||||
|
||||
|
||||
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
try:
|
||||
return script.get_revision(revision) is not None
|
||||
except CommandError:
|
||||
return False
|
||||
|
||||
|
||||
def _location_table_exists(database_path: Path) -> bool:
|
||||
conn = sqlite3.connect(database_path)
|
||||
try:
|
||||
@@ -117,6 +137,8 @@ def validate_legacy_location_db(database_url: str) -> None:
|
||||
|
||||
def validate_location_runtime_db(database_url: str) -> None:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
if not database_path.exists():
|
||||
raise LocationDatabaseAdoptionError(
|
||||
"Location DB file was not found. Run 'python scripts/location_db_adopt.py' "
|
||||
@@ -131,30 +153,36 @@ def validate_location_runtime_db(database_url: str) -> None:
|
||||
)
|
||||
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision != LOCATION_BASELINE_REVISION:
|
||||
if current_revision != expected_revision:
|
||||
raise LocationDatabaseAdoptionError(
|
||||
"Location DB revision mismatch. Refusing to start the app: "
|
||||
f"expected {LOCATION_BASELINE_REVISION}, got {current_revision}"
|
||||
f"expected {expected_revision}, got {current_revision}"
|
||||
)
|
||||
|
||||
|
||||
def adopt_or_initialize_location_db(database_url: str) -> str:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
|
||||
if database_path.exists():
|
||||
if _alembic_version_table_exists(database_path):
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision != LOCATION_BASELINE_REVISION:
|
||||
if current_revision == expected_revision:
|
||||
return "already_managed"
|
||||
if not _is_known_revision(alembic_config, current_revision):
|
||||
raise LocationDatabaseAdoptionError(
|
||||
"Location DB is already Alembic-managed but revision does not match "
|
||||
f"the expected baseline: expected {LOCATION_BASELINE_REVISION}, "
|
||||
f"got {current_revision}"
|
||||
f"a known migration revision: got {current_revision}"
|
||||
)
|
||||
return "already_managed"
|
||||
command.upgrade(alembic_config, "head")
|
||||
return "upgraded"
|
||||
|
||||
validate_legacy_location_db(database_url)
|
||||
command.stamp(alembic_config, LOCATION_BASELINE_REVISION)
|
||||
if LOCATION_BASELINE_REVISION != expected_revision:
|
||||
command.upgrade(alembic_config, "head")
|
||||
return "upgraded"
|
||||
return "adopted"
|
||||
|
||||
database_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
+34
-6
@@ -6,6 +6,8 @@ from pathlib import Path
|
||||
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from alembic.script import ScriptDirectory
|
||||
from alembic.util.exc import CommandError
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(PROJECT_ROOT) not in sys.path:
|
||||
@@ -42,6 +44,24 @@ def _make_alembic_config(database_url: str) -> Config:
|
||||
return config
|
||||
|
||||
|
||||
def _expected_head_revision(alembic_config: Config) -> str:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
heads = script.get_heads()
|
||||
if len(heads) != 1:
|
||||
raise PooDatabaseAdoptionError(
|
||||
f"Expected exactly one Alembic head for poo DB, got {len(heads)}"
|
||||
)
|
||||
return heads[0]
|
||||
|
||||
|
||||
def _is_known_revision(alembic_config: Config, revision: str) -> bool:
|
||||
script = ScriptDirectory.from_config(alembic_config)
|
||||
try:
|
||||
return script.get_revision(revision) is not None
|
||||
except CommandError:
|
||||
return False
|
||||
|
||||
|
||||
def _poo_table_exists(database_path: Path) -> bool:
|
||||
conn = sqlite3.connect(database_path)
|
||||
try:
|
||||
@@ -112,6 +132,8 @@ def validate_legacy_poo_db(database_url: str) -> None:
|
||||
|
||||
def validate_poo_runtime_db(database_url: str) -> None:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
if not database_path.exists():
|
||||
raise PooDatabaseAdoptionError(
|
||||
"Poo DB file was not found. Run 'python scripts/poo_db_adopt.py' first to "
|
||||
@@ -126,30 +148,36 @@ def validate_poo_runtime_db(database_url: str) -> None:
|
||||
)
|
||||
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision != POO_BASELINE_REVISION:
|
||||
if current_revision != expected_revision:
|
||||
raise PooDatabaseAdoptionError(
|
||||
"Poo DB revision mismatch. Refusing to start the app: "
|
||||
f"expected {POO_BASELINE_REVISION}, got {current_revision}"
|
||||
f"expected {expected_revision}, got {current_revision}"
|
||||
)
|
||||
|
||||
|
||||
def adopt_or_initialize_poo_db(database_url: str) -> str:
|
||||
database_path = _database_path_from_url(database_url)
|
||||
alembic_config = _make_alembic_config(database_url)
|
||||
expected_revision = _expected_head_revision(alembic_config)
|
||||
|
||||
if database_path.exists():
|
||||
if _alembic_version_table_exists(database_path):
|
||||
current_revision = _fetch_alembic_revision(database_path)
|
||||
if current_revision != POO_BASELINE_REVISION:
|
||||
if current_revision == expected_revision:
|
||||
return "already_managed"
|
||||
if not _is_known_revision(alembic_config, current_revision):
|
||||
raise PooDatabaseAdoptionError(
|
||||
"Poo DB is already Alembic-managed but revision does not match "
|
||||
f"the expected baseline: expected {POO_BASELINE_REVISION}, "
|
||||
f"got {current_revision}"
|
||||
f"a known migration revision: got {current_revision}"
|
||||
)
|
||||
return "already_managed"
|
||||
command.upgrade(alembic_config, "head")
|
||||
return "upgraded"
|
||||
|
||||
validate_legacy_poo_db(database_url)
|
||||
command.stamp(alembic_config, POO_BASELINE_REVISION)
|
||||
if POO_BASELINE_REVISION != expected_revision:
|
||||
command.upgrade(alembic_config, "head")
|
||||
return "upgraded"
|
||||
return "adopted"
|
||||
|
||||
database_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.config import get_settings
|
||||
from scripts.app_db_adopt import adopt_or_initialize_app_db
|
||||
from scripts.location_db_adopt import adopt_or_initialize_location_db
|
||||
from scripts.poo_db_adopt import adopt_or_initialize_poo_db
|
||||
|
||||
|
||||
def run_all_migrations() -> dict[str, str]:
|
||||
settings = get_settings()
|
||||
return {
|
||||
"app": adopt_or_initialize_app_db(settings.app_database_url),
|
||||
"location": adopt_or_initialize_location_db(settings.location_database_url),
|
||||
"poo": adopt_or_initialize_poo_db(settings.poo_database_url),
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
results = run_all_migrations()
|
||||
for database_name, result in results.items():
|
||||
print(f"{database_name}: {result}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user