add crud for exchange

This commit is contained in:
2025-09-22 14:39:33 +02:00
parent 1fbc93353d
commit 76ed38e9af
2 changed files with 150 additions and 0 deletions

View File

@@ -245,6 +245,83 @@ def update_cycle(session: Session, cycle_id: int, update_data: Mapping) -> model
return cycle
# Exchanges
IMMUTABLE_EXCHANGE_FIELDS = {"id"}
def create_exchange(session: Session, exchange_data: Mapping) -> models.Exchanges:
if hasattr(exchange_data, "dict"):
data = exchange_data.dict(exclude_unset=True)
else:
data = dict(exchange_data)
allowed = {c.name for c in models.Exchanges.__table__.columns}
payload = {k: v for k, v in data.items() if k in allowed}
if "name" not in payload:
raise ValueError("name is required")
e = models.Exchanges(**payload)
session.add(e)
try:
session.flush()
except IntegrityError as e:
session.rollback()
raise ValueError("create_exchange integrity error") from e
session.refresh(e)
return e
def get_exchange_by_id(session: Session, exchange_id: int) -> models.Exchanges | None:
return session.get(models.Exchanges, exchange_id)
def get_exchange_by_name(session: Session, name: str) -> models.Exchanges | None:
statement = select(models.Exchanges).where(
models.Exchanges.name == name,
)
return session.exec(statement).first()
def get_all_exchanges(session: Session) -> list[models.Exchanges]:
statement = select(models.Exchanges)
return session.exec(statement).all()
def update_exchange(session: Session, exchange_id: int, update_data: Mapping) -> models.Exchanges:
exchange: models.Exchanges | None = session.get(models.Exchanges, exchange_id)
if exchange is None:
raise ValueError("exchange_id does not exist")
if hasattr(update_data, "dict"):
data = update_data.dict(exclude_unset=True)
else:
data = dict(update_data)
allowed = {c.name for c in models.Exchanges.__table__.columns}
for k, v in data.items():
if k in IMMUTABLE_EXCHANGE_FIELDS:
raise ValueError(f"field {k!r} is immutable")
if k in allowed:
setattr(exchange, k, v)
session.add(exchange)
try:
session.flush()
except IntegrityError as e:
session.rollback()
raise ValueError("update_exchange integrity error") from e
session.refresh(exchange)
return exchange
def delete_exchange(session: Session, exchange_id: int) -> None:
exchange: models.Exchanges | None = session.get(models.Exchanges, exchange_id)
if exchange is None:
return
session.delete(exchange)
try:
session.flush()
except IntegrityError as e:
session.rollback()
raise ValueError("delete_exchange integrity error") from e
# Users
IMMUTABLE_USER_FIELDS = {"id", "username", "created_at"}