From 5753ad376766c06ff585e47cb9a18769dabe136d Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sun, 14 Sep 2025 17:17:48 +0200 Subject: [PATCH] fix test teardown --- backend/tests/test_crud.py | 11 +- backend/tests/test_db.py | 73 +++++++--- backend/tests/test_db_migration.py | 209 +++++++++++++++-------------- 3 files changed, 167 insertions(+), 126 deletions(-) diff --git a/backend/tests/test_crud.py b/backend/tests/test_crud.py index 9f5bf1f..199d493 100644 --- a/backend/tests/test_crud.py +++ b/backend/tests/test_crud.py @@ -11,14 +11,19 @@ from trading_journal import crud, models @pytest.fixture -def engine() -> Engine: +def engine() -> Generator[Engine, None, None]: e = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) SQLModel.metadata.create_all(e) - return e + try: + yield e + finally: + SQLModel.metadata.drop_all(e) + SQLModel.metadata.clear() + e.dispose() @pytest.fixture @@ -41,7 +46,7 @@ def make_cycle(session, user_id: int, friendly_name: str = "Test Cycle") -> int: friendly_name=friendly_name, symbol="AAPL", underlying_currency="USD", - status="open", + status=models.CycleStatus.OPEN, start_date=datetime.now().date(), ) session.add(cycle) diff --git a/backend/tests/test_db.py b/backend/tests/test_db.py index 6d5c505..9660792 100644 --- a/backend/tests/test_db.py +++ b/backend/tests/test_db.py @@ -3,7 +3,7 @@ from contextlib import contextmanager, suppress import pytest from sqlalchemy import text -from sqlmodel import Session +from sqlmodel import Session, SQLModel from trading_journal.db import Database, create_database @@ -27,46 +27,75 @@ def session_ctx(db: Database) -> Generator[Session, None, None]: # Normal completion: advance generator to let it commit/close. with suppress(StopIteration): next(gen) + finally: + # close the generator but DO NOT dispose the engine here + gen.close() + + +@contextmanager +def database_ctx(db: Database) -> Generator[Database, None, None]: + """ + Test-scoped context manager to ensure the Database (engine) is disposed at test end. + Use this to wrap test logic that needs the same in-memory engine across multiple sessions. + """ + try: + yield db + finally: + db.dispose() + SQLModel.metadata.clear() def test_select_one_executes() -> None: db = create_database(None) # in-memory by default - with session_ctx(db) as session: - val = session.exec(text("SELECT 1")).scalar_one() + with database_ctx(db): + with session_ctx(db) as session: + val = session.exec(text("SELECT 1")).scalar_one() assert int(val) == 1 def test_in_memory_persists_across_sessions_when_using_staticpool() -> None: db = create_database(None) # in-memory with StaticPool - with session_ctx(db) as s1: - s1.exec(text("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, val TEXT);")) - s1.exec(text("INSERT INTO t (val) VALUES (:v)").bindparams(v="hello")) - with session_ctx(db) as s2: - got = s2.exec(text("SELECT val FROM t")).scalar_one() + with database_ctx(db): + with session_ctx(db) as s1: + s1.exec( + text("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, val TEXT);") + ) + s1.exec(text("INSERT INTO t (val) VALUES (:v)").bindparams(v="hello")) + with session_ctx(db) as s2: + got = s2.exec(text("SELECT val FROM t")).scalar_one() assert got == "hello" def test_sqlite_pragmas_applied() -> None: db = create_database(None) - # PRAGMA returns integer 1 when foreign_keys ON - with session_ctx(db) as session: - fk = session.exec(text("PRAGMA foreign_keys")).scalar_one() + with database_ctx(db): + # PRAGMA returns integer 1 when foreign_keys ON + with session_ctx(db) as session: + fk = session.exec(text("PRAGMA foreign_keys")).scalar_one() assert int(fk) == 1 def test_rollback_on_exception() -> None: db = create_database(None) db.init_db() - # Create table then insert and raise inside the same session to force rollback + with database_ctx(db): + # Create table then insert and raise inside the same session to force rollback + with pytest.raises(RuntimeError): # noqa: PT012, SIM117 + with session_ctx(db) as s: + s.exec( + text( + "CREATE TABLE IF NOT EXISTS t_rb (id INTEGER PRIMARY KEY, val TEXT);" + ) + ) + s.exec( + text("INSERT INTO t_rb (val) VALUES (:v)").bindparams( + v="will_rollback" + ) + ) + # simulate handler error -> should trigger rollback in get_session + raise RuntimeError("simulated failure") - with pytest.raises(RuntimeError): # noqa: PT012, SIM117 - with session_ctx(db) as s: - s.exec(text("CREATE TABLE IF NOT EXISTS t_rb (id INTEGER PRIMARY KEY, val TEXT);")) - s.exec(text("INSERT INTO t_rb (val) VALUES (:v)").bindparams(v="will_rollback")) - # simulate handler error -> should trigger rollback in get_session - raise RuntimeError("simulated failure") - - # New session should not see the inserted row - with session_ctx(db) as s2: - rows = list(s2.exec(text("SELECT val FROM t_rb")).scalars()) + # New session should not see the inserted row + with session_ctx(db) as s2: + rows = list(s2.exec(text("SELECT val FROM t_rb")).scalars()) assert rows == [] diff --git a/backend/tests/test_db_migration.py b/backend/tests/test_db_migration.py index 2ea7fee..1f7fc3d 100644 --- a/backend/tests/test_db_migration.py +++ b/backend/tests/test_db_migration.py @@ -1,7 +1,7 @@ import pytest from sqlalchemy import text from sqlalchemy.pool import StaticPool -from sqlmodel import create_engine +from sqlmodel import SQLModel, create_engine from trading_journal import db_migration @@ -18,107 +18,114 @@ def test_run_migrations_0_to_1(monkeypatch: pytest.MonkeyPatch) -> None: connect_args={"check_same_thread": False}, poolclass=StaticPool, ) - monkeypatch.setattr(db_migration, "LATEST_VERSION", 1) - final_version = db_migration.run_migrations(engine) - assert final_version == 1 + try: + monkeypatch.setattr(db_migration, "LATEST_VERSION", 1) + final_version = db_migration.run_migrations(engine) + assert final_version == 1 - expected_schema = { - "users": { - "id": ("INTEGER", 1, 1), - "username": ("TEXT", 1, 0), - "password_hash": ("TEXT", 1, 0), - "is_active": ("BOOLEAN", 1, 0), - }, - "cycles": { - "id": ("INTEGER", 1, 1), - "user_id": ("INTEGER", 1, 0), - "friendly_name": ("TEXT", 0, 0), - "symbol": ("TEXT", 1, 0), - "underlying_currency": ("TEXT", 1, 0), - "status": ("TEXT", 1, 0), - "funding_source": ("TEXT", 0, 0), - "capital_exposure_cents": ("INTEGER", 0, 0), - "loan_amount_cents": ("INTEGER", 0, 0), - "loan_interest_rate_bps": ("INTEGER", 0, 0), - "start_date": ("DATE", 1, 0), - "end_date": ("DATE", 0, 0), - }, - "trades": { - "id": ("INTEGER", 1, 1), - "user_id": ("INTEGER", 1, 0), - "friendly_name": ("TEXT", 0, 0), - "symbol": ("TEXT", 1, 0), - "underlying_currency": ("TEXT", 1, 0), - "trade_type": ("TEXT", 1, 0), - "trade_strategy": ("TEXT", 1, 0), - "trade_time_utc": ("DATETIME", 1, 0), - "expiry_date": ("DATE", 0, 0), - "strike_price_cents": ("INTEGER", 0, 0), - "quantity": ("INTEGER", 1, 0), - "price_cents": ("INTEGER", 1, 0), - "gross_cash_flow_cents": ("INTEGER", 1, 0), - "commission_cents": ("INTEGER", 1, 0), - "net_cash_flow_cents": ("INTEGER", 1, 0), - "cycle_id": ("INTEGER", 0, 0), - }, - } + expected_schema = { + "users": { + "id": ("INTEGER", 1, 1), + "username": ("TEXT", 1, 0), + "password_hash": ("TEXT", 1, 0), + "is_active": ("BOOLEAN", 1, 0), + }, + "cycles": { + "id": ("INTEGER", 1, 1), + "user_id": ("INTEGER", 1, 0), + "friendly_name": ("TEXT", 0, 0), + "symbol": ("TEXT", 1, 0), + "underlying_currency": ("TEXT", 1, 0), + "status": ("TEXT", 1, 0), + "funding_source": ("TEXT", 0, 0), + "capital_exposure_cents": ("INTEGER", 0, 0), + "loan_amount_cents": ("INTEGER", 0, 0), + "loan_interest_rate_bps": ("INTEGER", 0, 0), + "start_date": ("DATE", 1, 0), + "end_date": ("DATE", 0, 0), + }, + "trades": { + "id": ("INTEGER", 1, 1), + "user_id": ("INTEGER", 1, 0), + "friendly_name": ("TEXT", 0, 0), + "symbol": ("TEXT", 1, 0), + "underlying_currency": ("TEXT", 1, 0), + "trade_type": ("TEXT", 1, 0), + "trade_strategy": ("TEXT", 1, 0), + "trade_time_utc": ("DATETIME", 1, 0), + "expiry_date": ("DATE", 0, 0), + "strike_price_cents": ("INTEGER", 0, 0), + "quantity": ("INTEGER", 1, 0), + "price_cents": ("INTEGER", 1, 0), + "gross_cash_flow_cents": ("INTEGER", 1, 0), + "commission_cents": ("INTEGER", 1, 0), + "net_cash_flow_cents": ("INTEGER", 1, 0), + "cycle_id": ("INTEGER", 0, 0), + }, + } - expected_fks = { - "trades": [ - {"table": "cycles", "from": "cycle_id", "to": "id"}, - {"table": "users", "from": "user_id", "to": "id"}, - ], - "cycles": [ - {"table": "users", "from": "user_id", "to": "id"}, - ], - } + expected_fks = { + "trades": [ + {"table": "cycles", "from": "cycle_id", "to": "id"}, + {"table": "users", "from": "user_id", "to": "id"}, + ], + "cycles": [ + {"table": "users", "from": "user_id", "to": "id"}, + ], + } - with engine.connect() as conn: - # check tables exist - rows = conn.execute( - text("SELECT name FROM sqlite_master WHERE type='table'") - ).fetchall() - found_tables = {r[0] for r in rows} - assert set(expected_schema.keys()).issubset(found_tables), ( - f"missing tables: {set(expected_schema.keys()) - found_tables}" - ) - - # check user_version - uv = conn.execute(text("PRAGMA user_version")).fetchone() - assert uv is not None - assert int(uv[0]) == 1 - - # validate each table columns - for tbl_name, cols in expected_schema.items(): - info_rows = conn.execute(text(f"PRAGMA table_info({tbl_name})")).fetchall() - # map: name -> (type, notnull, pk) - actual = { - r[1]: ((r[2] or "").upper(), int(r[3]), int(r[5])) for r in info_rows - } - for colname, (exp_type, exp_notnull, exp_pk) in cols.items(): - assert colname in actual, f"{tbl_name}: missing column {colname}" - act_type, act_notnull, act_pk = actual[colname] - # compare base type (e.g. VARCHAR(13) -> VARCHAR) - if act_type: - act_base = _base_type_of(act_type) - else: - act_base = "" - assert exp_type in act_base or act_base in exp_type, ( - f"type mismatch {tbl_name}.{colname}: expected {exp_type}, got {act_base}" - ) - assert act_notnull == exp_notnull, ( - f"notnull mismatch {tbl_name}.{colname}: expected {exp_notnull}, got {act_notnull}" - ) - assert act_pk == exp_pk, ( - f"pk mismatch {tbl_name}.{colname}: expected {exp_pk}, got {act_pk}" - ) - for tbl_name, fks in expected_fks.items(): - fk_rows = conn.execute( - text(f"PRAGMA foreign_key_list('{tbl_name}')") + with engine.connect() as conn: + # check tables exist + rows = conn.execute( + text("SELECT name FROM sqlite_master WHERE type='table'") ).fetchall() - # fk_rows columns: (id, seq, table, from, to, on_update, on_delete, match) - actual_fk_list = [ - {"table": r[2], "from": r[3], "to": r[4]} for r in fk_rows - ] - for efk in fks: - assert efk in actual_fk_list, f"missing FK on {tbl_name}: {efk}" + found_tables = {r[0] for r in rows} + assert set(expected_schema.keys()).issubset(found_tables), ( + f"missing tables: {set(expected_schema.keys()) - found_tables}" + ) + + # check user_version + uv = conn.execute(text("PRAGMA user_version")).fetchone() + assert uv is not None + assert int(uv[0]) == 1 + + # validate each table columns + for tbl_name, cols in expected_schema.items(): + info_rows = conn.execute( + text(f"PRAGMA table_info({tbl_name})") + ).fetchall() + # map: name -> (type, notnull, pk) + actual = { + r[1]: ((r[2] or "").upper(), int(r[3]), int(r[5])) + for r in info_rows + } + for colname, (exp_type, exp_notnull, exp_pk) in cols.items(): + assert colname in actual, f"{tbl_name}: missing column {colname}" + act_type, act_notnull, act_pk = actual[colname] + # compare base type (e.g. VARCHAR(13) -> VARCHAR) + if act_type: + act_base = _base_type_of(act_type) + else: + act_base = "" + assert exp_type in act_base or act_base in exp_type, ( + f"type mismatch {tbl_name}.{colname}: expected {exp_type}, got {act_base}" + ) + assert act_notnull == exp_notnull, ( + f"notnull mismatch {tbl_name}.{colname}: expected {exp_notnull}, got {act_notnull}" + ) + assert act_pk == exp_pk, ( + f"pk mismatch {tbl_name}.{colname}: expected {exp_pk}, got {act_pk}" + ) + for tbl_name, fks in expected_fks.items(): + fk_rows = conn.execute( + text(f"PRAGMA foreign_key_list('{tbl_name}')") + ).fetchall() + # fk_rows columns: (id, seq, table, from, to, on_update, on_delete, match) + actual_fk_list = [ + {"table": r[2], "from": r[3], "to": r[4]} for r in fk_rows + ] + for efk in fks: + assert efk in actual_fk_list, f"missing FK on {tbl_name}: {efk}" + finally: + engine.dispose() + SQLModel.metadata.clear()