This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Any, TypeVar, cast
|
||||
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlmodel import Session, select
|
||||
|
||||
@@ -10,9 +11,14 @@ from trading_journal import models
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Mapping
|
||||
from enum import Enum
|
||||
|
||||
|
||||
def _check_enum(enum_cls: any, value: any, field_name: str) -> any:
|
||||
# Generic enum member type
|
||||
T = TypeVar("T", bound="Enum")
|
||||
|
||||
|
||||
def _check_enum(enum_cls: type[T], value: object, field_name: str) -> T:
|
||||
if value is None:
|
||||
raise ValueError(f"{field_name} is required")
|
||||
# already an enum member
|
||||
@@ -27,19 +33,41 @@ def _check_enum(enum_cls: any, value: any, field_name: str) -> any:
|
||||
raise ValueError(f"Invalid {field_name!s}: {value!r}. Allowed: {allowed}")
|
||||
|
||||
|
||||
def _allowed_columns(model: type[models.SQLModel]) -> set[str]:
|
||||
tbl = cast("models.SQLModel", model).__table__ # type: ignore[attr-defined]
|
||||
return {c.name for c in tbl.columns}
|
||||
|
||||
|
||||
AnyModel = Any
|
||||
|
||||
|
||||
def _data_to_dict(data: AnyModel) -> dict[str, AnyModel]:
|
||||
if isinstance(data, BaseModel):
|
||||
return data.model_dump(exclude_unset=True)
|
||||
if hasattr(data, "dict"):
|
||||
return data.dict(exclude_unset=True)
|
||||
return dict(data)
|
||||
|
||||
|
||||
# Trades
|
||||
def create_trade(session: Session, trade_data: Mapping) -> models.Trades:
|
||||
if hasattr(trade_data, "dict"):
|
||||
data = trade_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(trade_data)
|
||||
allowed = {c.name for c in models.Trades.__table__.columns}
|
||||
def create_trade(session: Session, trade_data: Mapping[str, Any] | BaseModel) -> models.Trades:
|
||||
data = _data_to_dict(trade_data)
|
||||
allowed = _allowed_columns(models.Trades)
|
||||
payload = {k: v for k, v in data.items() if k in allowed}
|
||||
cycle_id = payload.get("cycle_id")
|
||||
if "symbol" not in payload:
|
||||
raise ValueError("symbol is required")
|
||||
if "exchange_id" not in payload and cycle_id is None:
|
||||
raise ValueError("exchange_id is required when no cycle is attached")
|
||||
# If an exchange_id is provided (and no cycle is attached), ensure the exchange exists
|
||||
# and belongs to the same user as the trade (if user_id is provided).
|
||||
if cycle_id is None and "exchange_id" in payload:
|
||||
ex = session.get(models.Exchanges, payload["exchange_id"])
|
||||
if ex is None:
|
||||
raise ValueError("exchange_id does not exist")
|
||||
user_id = payload.get("user_id")
|
||||
if user_id is not None and ex.user_id != user_id:
|
||||
raise ValueError("exchange.user_id does not match trade.user_id")
|
||||
if "underlying_currency" not in payload:
|
||||
raise ValueError("underlying_currency is required")
|
||||
payload["underlying_currency"] = _check_enum(models.UnderlyingCurrency, payload["underlying_currency"], "underlying_currency")
|
||||
@@ -132,7 +160,7 @@ def get_trades_by_user_id(session: Session, user_id: int) -> list[models.Trades]
|
||||
statement = select(models.Trades).where(
|
||||
models.Trades.user_id == user_id,
|
||||
)
|
||||
return session.exec(statement).all()
|
||||
return list(session.exec(statement).all())
|
||||
|
||||
|
||||
def update_trade_note(session: Session, trade_id: int, note: str) -> models.Trades:
|
||||
@@ -168,23 +196,17 @@ def invalidate_trade(session: Session, trade_id: int) -> models.Trades:
|
||||
return trade
|
||||
|
||||
|
||||
def replace_trade(session: Session, old_trade_id: int, new_trade_data: Mapping) -> models.Trades:
|
||||
def replace_trade(session: Session, old_trade_id: int, new_trade_data: Mapping[str, Any] | BaseModel) -> models.Trades:
|
||||
invalidate_trade(session, old_trade_id)
|
||||
if hasattr(new_trade_data, "dict"):
|
||||
data = new_trade_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(new_trade_data)
|
||||
data = _data_to_dict(new_trade_data)
|
||||
data["replaced_by_trade_id"] = old_trade_id
|
||||
return create_trade(session, data)
|
||||
|
||||
|
||||
# Cycles
|
||||
def create_cycle(session: Session, cycle_data: Mapping) -> models.Cycles:
|
||||
if hasattr(cycle_data, "dict"):
|
||||
data = cycle_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(cycle_data)
|
||||
allowed = {c.name for c in models.Cycles.__table__.columns}
|
||||
def create_cycle(session: Session, cycle_data: Mapping[str, Any] | BaseModel) -> models.Cycles:
|
||||
data = _data_to_dict(cycle_data)
|
||||
allowed = _allowed_columns(models.Cycles)
|
||||
payload = {k: v for k, v in data.items() if k in allowed}
|
||||
if "user_id" not in payload:
|
||||
raise ValueError("user_id is required")
|
||||
@@ -192,6 +214,12 @@ def create_cycle(session: Session, cycle_data: Mapping) -> models.Cycles:
|
||||
raise ValueError("symbol is required")
|
||||
if "exchange_id" not in payload:
|
||||
raise ValueError("exchange_id is required")
|
||||
# ensure the exchange exists and belongs to the same user
|
||||
ex = session.get(models.Exchanges, payload["exchange_id"])
|
||||
if ex is None:
|
||||
raise ValueError("exchange_id does not exist")
|
||||
if ex.user_id != payload.get("user_id"):
|
||||
raise ValueError("exchange.user_id does not match cycle.user_id")
|
||||
if "underlying_currency" not in payload:
|
||||
raise ValueError("underlying_currency is required")
|
||||
payload["underlying_currency"] = _check_enum(models.UnderlyingCurrency, payload["underlying_currency"], "underlying_currency")
|
||||
@@ -215,21 +243,26 @@ def create_cycle(session: Session, cycle_data: Mapping) -> models.Cycles:
|
||||
IMMUTABLE_CYCLE_FIELDS = {"id", "user_id", "start_date", "created_at"}
|
||||
|
||||
|
||||
def update_cycle(session: Session, cycle_id: int, update_data: Mapping) -> models.Cycles:
|
||||
def update_cycle(session: Session, cycle_id: int, update_data: Mapping[str, Any] | BaseModel) -> models.Cycles:
|
||||
cycle: models.Cycles | None = session.get(models.Cycles, cycle_id)
|
||||
if cycle is None:
|
||||
raise ValueError("cycle_id does not exist")
|
||||
if hasattr(update_data, "dict"):
|
||||
data = update_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(update_data)
|
||||
data = _data_to_dict(update_data)
|
||||
|
||||
allowed = {c.name for c in models.Cycles.__table__.columns}
|
||||
allowed = _allowed_columns(models.Cycles)
|
||||
for k, v in data.items():
|
||||
if k in IMMUTABLE_CYCLE_FIELDS:
|
||||
raise ValueError(f"field {k!r} is immutable")
|
||||
if k not in allowed:
|
||||
continue
|
||||
# If trying to change exchange_id, ensure the new exchange exists and belongs to
|
||||
# the same user as the cycle.
|
||||
if k == "exchange_id":
|
||||
ex = session.get(models.Exchanges, v)
|
||||
if ex is None:
|
||||
raise ValueError("exchange_id does not exist")
|
||||
if ex.user_id != cycle.user_id:
|
||||
raise ValueError("exchange.user_id does not match cycle.user_id")
|
||||
if k == "underlying_currency":
|
||||
v = _check_enum(models.UnderlyingCurrency, v, "underlying_currency") # noqa: PLW2901
|
||||
if k == "status":
|
||||
@@ -249,12 +282,9 @@ def update_cycle(session: Session, cycle_id: int, update_data: Mapping) -> model
|
||||
IMMUTABLE_EXCHANGE_FIELDS = {"id"}
|
||||
|
||||
|
||||
def create_exchange(session: Session, exchange_data: Mapping) -> models.Exchanges:
|
||||
if hasattr(exchange_data, "dict"):
|
||||
data = exchange_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(exchange_data)
|
||||
allowed = {c.name for c in models.Exchanges.__table__.columns}
|
||||
def create_exchange(session: Session, exchange_data: Mapping[str, Any] | BaseModel) -> models.Exchanges:
|
||||
data = _data_to_dict(exchange_data)
|
||||
allowed = _allowed_columns(models.Exchanges)
|
||||
payload = {k: v for k, v in data.items() if k in allowed}
|
||||
if "name" not in payload:
|
||||
raise ValueError("name is required")
|
||||
@@ -284,25 +314,22 @@ def get_exchange_by_name_and_user_id(session: Session, name: str, user_id: int)
|
||||
|
||||
def get_all_exchanges(session: Session) -> list[models.Exchanges]:
|
||||
statement = select(models.Exchanges)
|
||||
return session.exec(statement).all()
|
||||
return list(session.exec(statement).all())
|
||||
|
||||
|
||||
def get_all_exchanges_by_user_id(session: Session, user_id: int) -> list[models.Exchanges]:
|
||||
statement = select(models.Exchanges).where(
|
||||
models.Exchanges.user_id == user_id,
|
||||
)
|
||||
return session.exec(statement).all()
|
||||
return list(session.exec(statement).all())
|
||||
|
||||
|
||||
def update_exchange(session: Session, exchange_id: int, update_data: Mapping) -> models.Exchanges:
|
||||
def update_exchange(session: Session, exchange_id: int, update_data: Mapping[str, Any] | BaseModel) -> models.Exchanges:
|
||||
exchange: models.Exchanges | None = session.get(models.Exchanges, exchange_id)
|
||||
if exchange is None:
|
||||
raise ValueError("exchange_id does not exist")
|
||||
if hasattr(update_data, "dict"):
|
||||
data = update_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(update_data)
|
||||
allowed = {c.name for c in models.Exchanges.__table__.columns}
|
||||
data = _data_to_dict(update_data)
|
||||
allowed = _allowed_columns(models.Exchanges)
|
||||
for k, v in data.items():
|
||||
if k in IMMUTABLE_EXCHANGE_FIELDS:
|
||||
raise ValueError(f"field {k!r} is immutable")
|
||||
@@ -334,12 +361,9 @@ def delete_exchange(session: Session, exchange_id: int) -> None:
|
||||
IMMUTABLE_USER_FIELDS = {"id", "username", "created_at"}
|
||||
|
||||
|
||||
def create_user(session: Session, user_data: Mapping) -> models.Users:
|
||||
if hasattr(user_data, "dict"):
|
||||
data = user_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(user_data)
|
||||
allowed = {c.name for c in models.Users.__table__.columns}
|
||||
def create_user(session: Session, user_data: Mapping[str, Any] | BaseModel) -> models.Users:
|
||||
data = _data_to_dict(user_data)
|
||||
allowed = _allowed_columns(models.Users)
|
||||
payload = {k: v for k, v in data.items() if k in allowed}
|
||||
if "username" not in payload:
|
||||
raise ValueError("username is required")
|
||||
@@ -368,15 +392,12 @@ def get_user_by_username(session: Session, username: str) -> models.Users | None
|
||||
return session.exec(statement).first()
|
||||
|
||||
|
||||
def update_user(session: Session, user_id: int, update_data: Mapping) -> models.Users:
|
||||
def update_user(session: Session, user_id: int, update_data: Mapping[str, Any] | BaseModel) -> models.Users:
|
||||
user: models.Users | None = session.get(models.Users, user_id)
|
||||
if user is None:
|
||||
raise ValueError("user_id does not exist")
|
||||
if hasattr(update_data, "dict"):
|
||||
data = update_data.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(update_data)
|
||||
allowed = {c.name for c in models.Users.__table__.columns}
|
||||
data = _data_to_dict(update_data)
|
||||
allowed = _allowed_columns(models.Users)
|
||||
for k, v in data.items():
|
||||
if k in IMMUTABLE_USER_FIELDS:
|
||||
raise ValueError(f"field {k!r} is immutable")
|
||||
@@ -405,10 +426,11 @@ def create_login_session(
|
||||
user: models.Users | None = session.get(models.Users, user_id)
|
||||
if user is None:
|
||||
raise ValueError("user_id does not exist")
|
||||
user_id_val = cast("int", user.id)
|
||||
now = datetime.now(timezone.utc)
|
||||
expires_at = now + timedelta(seconds=session_length_seconds)
|
||||
s = models.Sessions(
|
||||
user_id=user.id,
|
||||
user_id=user_id_val,
|
||||
session_token_hash=session_token_hash,
|
||||
created_at=now,
|
||||
expires_at=expires_at,
|
||||
@@ -449,7 +471,7 @@ def get_login_session_by_token_hash(session: Session, session_token_hash: str) -
|
||||
IMMUTABLE_SESSION_FIELDS = {"id", "user_id", "session_token_hash", "created_at"}
|
||||
|
||||
|
||||
def update_login_session(session: Session, session_token_hashed: str, update_session: Mapping) -> models.Sessions | None:
|
||||
def update_login_session(session: Session, session_token_hashed: str, update_session: Mapping[str, Any] | BaseModel) -> models.Sessions | None:
|
||||
login_session: models.Sessions | None = session.exec(
|
||||
select(models.Sessions).where(
|
||||
models.Sessions.session_token_hash == session_token_hashed,
|
||||
@@ -458,11 +480,8 @@ def update_login_session(session: Session, session_token_hashed: str, update_ses
|
||||
).first()
|
||||
if login_session is None:
|
||||
return None
|
||||
if hasattr(update_session, "dict"):
|
||||
data = update_session.dict(exclude_unset=True)
|
||||
else:
|
||||
data = dict(update_session)
|
||||
allowed = {c.name for c in models.Sessions.__table__.columns}
|
||||
data = _data_to_dict(update_session)
|
||||
allowed = _allowed_columns(models.Sessions)
|
||||
for k, v in data.items():
|
||||
if k in allowed and k not in IMMUTABLE_SESSION_FIELDS:
|
||||
setattr(login_session, k, v)
|
||||
|
||||
Reference in New Issue
Block a user