fix test teardown
All checks were successful
Backend CI / unit-test (push) Successful in 24s

2025-09-14 17:17:48 +02:00
parent 1d215c8032
commit 5753ad3767
3 changed files with 167 additions and 126 deletions

View File

@@ -11,14 +11,19 @@ from trading_journal import crud, models
 @pytest.fixture
-def engine() -> Engine:
+def engine() -> Generator[Engine, None, None]:
     e = create_engine(
         "sqlite:///:memory:",
         connect_args={"check_same_thread": False},
         poolclass=StaticPool,
     )
     SQLModel.metadata.create_all(e)
-    return e
+    try:
+        yield e
+    finally:
+        SQLModel.metadata.drop_all(e)
+        SQLModel.metadata.clear()
+        e.dispose()
 
 
 @pytest.fixture
@@ -41,7 +46,7 @@ def make_cycle(session, user_id: int, friendly_name: str = "Test Cycle") -> int:
         friendly_name=friendly_name,
         symbol="AAPL",
         underlying_currency="USD",
-        status="open",
+        status=models.CycleStatus.OPEN,
         start_date=datetime.now().date(),
     )
     session.add(cycle)
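
For reference, a minimal sketch (not part of the commit) of a test consuming the reworked fixture: pytest runs the fixture up to its yield, injects the engine, and executes the finally block even when the test fails. The test name and query are illustrative only:

from sqlalchemy import text
from sqlmodel import Session


def test_engine_fixture_is_isolated(engine) -> None:
    # `engine` is the value yielded by the fixture above.
    with Session(engine) as session:
        names = session.exec(
            text("SELECT name FROM sqlite_master WHERE type='table'")
        ).all()
    # Tables created by SQLModel.metadata.create_all(e) are visible here;
    # the fixture's finally block drops them again after the test returns.
    assert names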

View File

@@ -3,7 +3,7 @@ from contextlib import contextmanager, suppress
 import pytest
 from sqlalchemy import text
-from sqlmodel import Session
+from sqlmodel import Session, SQLModel
 
 from trading_journal.db import Database, create_database
@@ -27,46 +27,75 @@ def session_ctx(db: Database) -> Generator[Session, None, None]:
         # Normal completion: advance generator to let it commit/close.
         with suppress(StopIteration):
             next(gen)
+    finally:
+        # close the generator but DO NOT dispose the engine here
+        gen.close()
+
+
+@contextmanager
+def database_ctx(db: Database) -> Generator[Database, None, None]:
+    """
+    Test-scoped context manager to ensure the Database (engine) is disposed at test end.
+
+    Use this to wrap test logic that needs the same in-memory engine across multiple sessions.
+    """
+    try:
+        yield db
+    finally:
+        db.dispose()
+        SQLModel.metadata.clear()
 
 
 def test_select_one_executes() -> None:
     db = create_database(None)  # in-memory by default
-    with session_ctx(db) as session:
-        val = session.exec(text("SELECT 1")).scalar_one()
-    assert int(val) == 1
+    with database_ctx(db):
+        with session_ctx(db) as session:
+            val = session.exec(text("SELECT 1")).scalar_one()
+        assert int(val) == 1
 
 
 def test_in_memory_persists_across_sessions_when_using_staticpool() -> None:
     db = create_database(None)  # in-memory with StaticPool
-    with session_ctx(db) as s1:
-        s1.exec(text("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, val TEXT);"))
-        s1.exec(text("INSERT INTO t (val) VALUES (:v)").bindparams(v="hello"))
-    with session_ctx(db) as s2:
-        got = s2.exec(text("SELECT val FROM t")).scalar_one()
-    assert got == "hello"
+    with database_ctx(db):
+        with session_ctx(db) as s1:
+            s1.exec(
+                text("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, val TEXT);")
+            )
+            s1.exec(text("INSERT INTO t (val) VALUES (:v)").bindparams(v="hello"))
+        with session_ctx(db) as s2:
+            got = s2.exec(text("SELECT val FROM t")).scalar_one()
+        assert got == "hello"
 
 
 def test_sqlite_pragmas_applied() -> None:
     db = create_database(None)
-    # PRAGMA returns integer 1 when foreign_keys ON
-    with session_ctx(db) as session:
-        fk = session.exec(text("PRAGMA foreign_keys")).scalar_one()
-    assert int(fk) == 1
+    with database_ctx(db):
+        # PRAGMA returns integer 1 when foreign_keys ON
+        with session_ctx(db) as session:
+            fk = session.exec(text("PRAGMA foreign_keys")).scalar_one()
+        assert int(fk) == 1
 
 
 def test_rollback_on_exception() -> None:
     db = create_database(None)
     db.init_db()
-    # Create table then insert and raise inside the same session to force rollback
-    with pytest.raises(RuntimeError):  # noqa: PT012, SIM117
-        with session_ctx(db) as s:
-            s.exec(text("CREATE TABLE IF NOT EXISTS t_rb (id INTEGER PRIMARY KEY, val TEXT);"))
-            s.exec(text("INSERT INTO t_rb (val) VALUES (:v)").bindparams(v="will_rollback"))
-            # simulate handler error -> should trigger rollback in get_session
-            raise RuntimeError("simulated failure")
-    # New session should not see the inserted row
-    with session_ctx(db) as s2:
-        rows = list(s2.exec(text("SELECT val FROM t_rb")).scalars())
-    assert rows == []
+    with database_ctx(db):
+        # Create table then insert and raise inside the same session to force rollback
+        with pytest.raises(RuntimeError):  # noqa: PT012, SIM117
+            with session_ctx(db) as s:
+                s.exec(
+                    text(
+                        "CREATE TABLE IF NOT EXISTS t_rb (id INTEGER PRIMARY KEY, val TEXT);"
+                    )
+                )
+                s.exec(
+                    text("INSERT INTO t_rb (val) VALUES (:v)").bindparams(
                        v="will_rollback"
+                    )
+                )
+                # simulate handler error -> should trigger rollback in get_session
+                raise RuntimeError("simulated failure")
+        # New session should not see the inserted row
+        with session_ctx(db) as s2:
+            rows = list(s2.exec(text("SELECT val FROM t_rb")).scalars())
+        assert rows == []
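
The head of session_ctx sits outside this hunk; based on its visible tail and the in-test comment pointing at get_session, it plausibly drives a generator dependency like the following. This is a sketch under that assumption, not the committed code; only get_session is attested by the diff:

from collections.abc import Generator
from contextlib import contextmanager, suppress

from sqlmodel import Session

from trading_journal.db import Database


@contextmanager
def session_ctx(db: Database) -> Generator[Session, None, None]:
    gen = db.get_session()  # assumed: generator yielding a Session, committing on success
    session = next(gen)  # run get_session up to its yield
    try:
        yield session
    except BaseException as exc:
        # Throw the test's exception into get_session so its rollback path runs,
        # then let the exception propagate for pytest.raises.
        with suppress(StopIteration):
            gen.throw(exc)
        raise
    else:
        # Normal completion: advance generator to let it commit/close.
        with suppress(StopIteration):
            next(gen)
    finally:
        # close the generator but DO NOT dispose the engine here
        gen.close()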

View File

@@ -1,7 +1,7 @@
 import pytest
 from sqlalchemy import text
 from sqlalchemy.pool import StaticPool
-from sqlmodel import create_engine
+from sqlmodel import SQLModel, create_engine
 
 from trading_journal import db_migration
@@ -18,107 +18,114 @@ def test_run_migrations_0_to_1(monkeypatch: pytest.MonkeyPatch) -> None:
         connect_args={"check_same_thread": False},
         poolclass=StaticPool,
     )
-    monkeypatch.setattr(db_migration, "LATEST_VERSION", 1)
-    final_version = db_migration.run_migrations(engine)
-    assert final_version == 1
-
-    expected_schema = {
-        "users": {
-            "id": ("INTEGER", 1, 1),
-            "username": ("TEXT", 1, 0),
-            "password_hash": ("TEXT", 1, 0),
-            "is_active": ("BOOLEAN", 1, 0),
-        },
-        "cycles": {
-            "id": ("INTEGER", 1, 1),
-            "user_id": ("INTEGER", 1, 0),
-            "friendly_name": ("TEXT", 0, 0),
-            "symbol": ("TEXT", 1, 0),
-            "underlying_currency": ("TEXT", 1, 0),
-            "status": ("TEXT", 1, 0),
-            "funding_source": ("TEXT", 0, 0),
-            "capital_exposure_cents": ("INTEGER", 0, 0),
-            "loan_amount_cents": ("INTEGER", 0, 0),
-            "loan_interest_rate_bps": ("INTEGER", 0, 0),
-            "start_date": ("DATE", 1, 0),
-            "end_date": ("DATE", 0, 0),
-        },
-        "trades": {
-            "id": ("INTEGER", 1, 1),
-            "user_id": ("INTEGER", 1, 0),
-            "friendly_name": ("TEXT", 0, 0),
-            "symbol": ("TEXT", 1, 0),
-            "underlying_currency": ("TEXT", 1, 0),
-            "trade_type": ("TEXT", 1, 0),
-            "trade_strategy": ("TEXT", 1, 0),
-            "trade_time_utc": ("DATETIME", 1, 0),
-            "expiry_date": ("DATE", 0, 0),
-            "strike_price_cents": ("INTEGER", 0, 0),
-            "quantity": ("INTEGER", 1, 0),
-            "price_cents": ("INTEGER", 1, 0),
-            "gross_cash_flow_cents": ("INTEGER", 1, 0),
-            "commission_cents": ("INTEGER", 1, 0),
-            "net_cash_flow_cents": ("INTEGER", 1, 0),
-            "cycle_id": ("INTEGER", 0, 0),
-        },
-    }
-
-    expected_fks = {
-        "trades": [
-            {"table": "cycles", "from": "cycle_id", "to": "id"},
-            {"table": "users", "from": "user_id", "to": "id"},
-        ],
-        "cycles": [
-            {"table": "users", "from": "user_id", "to": "id"},
-        ],
-    }
-
-    with engine.connect() as conn:
-        # check tables exist
-        rows = conn.execute(
-            text("SELECT name FROM sqlite_master WHERE type='table'")
-        ).fetchall()
-        found_tables = {r[0] for r in rows}
-        assert set(expected_schema.keys()).issubset(found_tables), (
-            f"missing tables: {set(expected_schema.keys()) - found_tables}"
-        )
-
-        # check user_version
-        uv = conn.execute(text("PRAGMA user_version")).fetchone()
-        assert uv is not None
-        assert int(uv[0]) == 1
-
-        # validate each table columns
-        for tbl_name, cols in expected_schema.items():
-            info_rows = conn.execute(text(f"PRAGMA table_info({tbl_name})")).fetchall()
-            # map: name -> (type, notnull, pk)
-            actual = {
-                r[1]: ((r[2] or "").upper(), int(r[3]), int(r[5])) for r in info_rows
-            }
-            for colname, (exp_type, exp_notnull, exp_pk) in cols.items():
-                assert colname in actual, f"{tbl_name}: missing column {colname}"
-                act_type, act_notnull, act_pk = actual[colname]
-                # compare base type (e.g. VARCHAR(13) -> VARCHAR)
-                if act_type:
-                    act_base = _base_type_of(act_type)
-                else:
-                    act_base = ""
-                assert exp_type in act_base or act_base in exp_type, (
-                    f"type mismatch {tbl_name}.{colname}: expected {exp_type}, got {act_base}"
-                )
-                assert act_notnull == exp_notnull, (
-                    f"notnull mismatch {tbl_name}.{colname}: expected {exp_notnull}, got {act_notnull}"
-                )
-                assert act_pk == exp_pk, (
-                    f"pk mismatch {tbl_name}.{colname}: expected {exp_pk}, got {act_pk}"
-                )
-
-        for tbl_name, fks in expected_fks.items():
-            fk_rows = conn.execute(
-                text(f"PRAGMA foreign_key_list('{tbl_name}')")
-            ).fetchall()
-            # fk_rows columns: (id, seq, table, from, to, on_update, on_delete, match)
-            actual_fk_list = [
-                {"table": r[2], "from": r[3], "to": r[4]} for r in fk_rows
-            ]
-            for efk in fks:
-                assert efk in actual_fk_list, f"missing FK on {tbl_name}: {efk}"
+    try:
+        monkeypatch.setattr(db_migration, "LATEST_VERSION", 1)
+        final_version = db_migration.run_migrations(engine)
+        assert final_version == 1
+
+        expected_schema = {
+            "users": {
+                "id": ("INTEGER", 1, 1),
+                "username": ("TEXT", 1, 0),
+                "password_hash": ("TEXT", 1, 0),
+                "is_active": ("BOOLEAN", 1, 0),
+            },
+            "cycles": {
+                "id": ("INTEGER", 1, 1),
+                "user_id": ("INTEGER", 1, 0),
+                "friendly_name": ("TEXT", 0, 0),
+                "symbol": ("TEXT", 1, 0),
+                "underlying_currency": ("TEXT", 1, 0),
+                "status": ("TEXT", 1, 0),
+                "funding_source": ("TEXT", 0, 0),
+                "capital_exposure_cents": ("INTEGER", 0, 0),
+                "loan_amount_cents": ("INTEGER", 0, 0),
+                "loan_interest_rate_bps": ("INTEGER", 0, 0),
+                "start_date": ("DATE", 1, 0),
+                "end_date": ("DATE", 0, 0),
+            },
+            "trades": {
+                "id": ("INTEGER", 1, 1),
+                "user_id": ("INTEGER", 1, 0),
+                "friendly_name": ("TEXT", 0, 0),
+                "symbol": ("TEXT", 1, 0),
+                "underlying_currency": ("TEXT", 1, 0),
+                "trade_type": ("TEXT", 1, 0),
+                "trade_strategy": ("TEXT", 1, 0),
+                "trade_time_utc": ("DATETIME", 1, 0),
+                "expiry_date": ("DATE", 0, 0),
+                "strike_price_cents": ("INTEGER", 0, 0),
+                "quantity": ("INTEGER", 1, 0),
+                "price_cents": ("INTEGER", 1, 0),
+                "gross_cash_flow_cents": ("INTEGER", 1, 0),
+                "commission_cents": ("INTEGER", 1, 0),
+                "net_cash_flow_cents": ("INTEGER", 1, 0),
+                "cycle_id": ("INTEGER", 0, 0),
+            },
+        }
+
+        expected_fks = {
+            "trades": [
+                {"table": "cycles", "from": "cycle_id", "to": "id"},
+                {"table": "users", "from": "user_id", "to": "id"},
+            ],
+            "cycles": [
+                {"table": "users", "from": "user_id", "to": "id"},
+            ],
+        }
+
+        with engine.connect() as conn:
+            # check tables exist
+            rows = conn.execute(
+                text("SELECT name FROM sqlite_master WHERE type='table'")
+            ).fetchall()
+            found_tables = {r[0] for r in rows}
+            assert set(expected_schema.keys()).issubset(found_tables), (
+                f"missing tables: {set(expected_schema.keys()) - found_tables}"
+            )
+
+            # check user_version
+            uv = conn.execute(text("PRAGMA user_version")).fetchone()
+            assert uv is not None
+            assert int(uv[0]) == 1
+
+            # validate each table columns
+            for tbl_name, cols in expected_schema.items():
+                info_rows = conn.execute(
+                    text(f"PRAGMA table_info({tbl_name})")
+                ).fetchall()
+                # map: name -> (type, notnull, pk)
+                actual = {
+                    r[1]: ((r[2] or "").upper(), int(r[3]), int(r[5]))
+                    for r in info_rows
+                }
+                for colname, (exp_type, exp_notnull, exp_pk) in cols.items():
+                    assert colname in actual, f"{tbl_name}: missing column {colname}"
+                    act_type, act_notnull, act_pk = actual[colname]
+                    # compare base type (e.g. VARCHAR(13) -> VARCHAR)
+                    if act_type:
+                        act_base = _base_type_of(act_type)
+                    else:
+                        act_base = ""
+                    assert exp_type in act_base or act_base in exp_type, (
+                        f"type mismatch {tbl_name}.{colname}: expected {exp_type}, got {act_base}"
+                    )
+                    assert act_notnull == exp_notnull, (
+                        f"notnull mismatch {tbl_name}.{colname}: expected {exp_notnull}, got {act_notnull}"
+                    )
+                    assert act_pk == exp_pk, (
+                        f"pk mismatch {tbl_name}.{colname}: expected {exp_pk}, got {act_pk}"
+                    )
+
+            for tbl_name, fks in expected_fks.items():
+                fk_rows = conn.execute(
+                    text(f"PRAGMA foreign_key_list('{tbl_name}')")
+                ).fetchall()
+                # fk_rows columns: (id, seq, table, from, to, on_update, on_delete, match)
+                actual_fk_list = [
+                    {"table": r[2], "from": r[3], "to": r[4]} for r in fk_rows
+                ]
+                for efk in fks:
+                    assert efk in actual_fk_list, f"missing FK on {tbl_name}: {efk}"
+    finally:
+        engine.dispose()
+        SQLModel.metadata.clear()
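
_base_type_of is an existing helper in this test module that the hunk only references; given the "VARCHAR(13) -> VARCHAR" comment above, a minimal implementation would presumably look like this (hypothetical, not shown in the commit):

def _base_type_of(sql_type: str) -> str:
    # Strip a parenthesized length/precision, e.g. "VARCHAR(13)" -> "VARCHAR".
    return sql_type.split("(", 1)[0].strip()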