Files
2026-moving-helper/app/main.py
T
tliu93 70b0cf08ee
test / pytest (push) Successful in 1m20s
docker-image / build-and-push (push) Successful in 5m6s
Add AI search query expansion
2026-06-01 21:28:29 +02:00

840 lines
30 KiB
Python

from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi.responses import FileResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, false, or_
from sqlalchemy.orm import Session
from app.db import get_db, init_db
from app.images import process_upload
from app.llm import expand_query, is_configured, test_connection
from app.llm import LLMResult
from app.models import Box, Item, SubItem
from app.settings_store import LLMConfig, get_app_settings, save_app_settings
# Template renderer shared by every HTML route; templates are looked up under
# the relative directory "app/templates".
templates = Jinja2Templates(directory="app/templates")
# Relative directory of static assets; the manifest and service-worker routes
# build their file paths from it.
STATIC_DIR = Path("app/static")
def _clean_text(value: str | None) -> str | None:
if value is None:
return None
cleaned = value.strip()
return cleaned or None
def _parse_quantity(value: str | None) -> int | None:
    """Convert a raw quantity form field into an integer.

    Args:
        value: The submitted text, or ``None`` when the field was absent.

    Returns:
        ``None`` when the field is missing or blank, otherwise the parsed
        integer.

    Raises:
        HTTPException: 400 when the text is not a valid integer. Without
            this, the ``ValueError`` from ``int()`` would escape the route
            handler and surface as a 500 for what is a client input error.
    """
    cleaned = _clean_text(value)
    if cleaned is None:
        return None
    try:
        return int(cleaned)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail="Quantity must be an integer",
        ) from exc
def _is_checked(value: str | None) -> bool:
return value == "on"
def _get_box_or_404(db: Session, box_id: int) -> Box:
    """Fetch the box with primary key *box_id*.

    Raises:
        HTTPException: 404 when no such box exists.
    """
    found = db.get(Box, box_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Box not found")
def _get_item_or_404(db: Session, item_id: int) -> Item:
    """Fetch the item with primary key *item_id*.

    Raises:
        HTTPException: 404 when no such item exists.
    """
    found = db.get(Item, item_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Item not found")
def _get_subitem_or_404(db: Session, subitem_id: int) -> SubItem:
    """Fetch the sub-item with primary key *subitem_id*.

    Raises:
        HTTPException: 404 when no such sub-item exists.
    """
    found = db.get(SubItem, subitem_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Sub-item not found")
def _require_container_item(item: Item) -> None:
    """Guard for sub-item routes: *item* must be flagged as a container.

    Raises:
        HTTPException: 400 when ``item.is_container`` is falsy.
    """
    if item.is_container:
        return
    raise HTTPException(status_code=400, detail="Only container items can have sub-items")
def _set_image_fields(target, processed_image) -> None:
    """Copy a processed upload onto *target*'s four image columns.

    Does nothing when *processed_image* is ``None``, so an existing image
    on *target* is left untouched.
    """
    if processed_image is not None:
        # (attribute on target, attribute on processed_image)
        field_map = (
            ("image_blob", "blob"),
            ("image_mime_type", "mime_type"),
            ("image_width", "width"),
            ("image_height", "height"),
        )
        for target_attr, source_attr in field_map:
            setattr(target, target_attr, getattr(processed_image, source_attr))
def _clear_image_fields(target) -> None:
    """Reset all four image columns on *target* to ``None``."""
    for attr in ("image_blob", "image_mime_type", "image_width", "image_height"):
        setattr(target, attr, None)
def _image_response_or_404(target) -> Response:
    """Serve *target*'s stored image bytes with their recorded MIME type.

    Raises:
        HTTPException: 404 when either the blob or the MIME type is missing.
    """
    has_image = target.image_blob is not None and target.image_mime_type is not None
    if not has_image:
        raise HTTPException(status_code=404, detail="Image not found")
    return Response(content=target.image_blob, media_type=target.image_mime_type)
def _wants_add_next(submit_action: str | None) -> bool:
return submit_action == "save_and_add_next"
def _validate_settings_origin(request: Request) -> str | None:
    """Check that a settings request originates from this site.

    The ``Origin`` header is preferred; ``Referer`` is consulted only when
    ``Origin`` is absent or empty. The host[:port] part of whichever header
    is used must equal the request's ``Host`` header. The comparison is
    case-insensitive because hostnames are case-insensitive.

    Requests carrying neither header (e.g. curl, API calls) are allowed
    for now.

    Args:
        request: The incoming request; only its headers are read.

    Returns:
        A user-facing error message when the source host does not match,
        otherwise ``None``.
    """
    from urllib.parse import urlparse

    headers = request.headers
    source = headers.get("origin") or headers.get("referer")
    if not source:
        return None

    # Origin/Referer carry a scheme; Host is just host[:port], so compare
    # against the parsed network location only.
    source_host = urlparse(source).netloc
    expected_host = headers.get("host", "")
    if source_host.lower() != expected_host.lower():
        return "请求来源与当前站点不一致,操作被拒绝。"
    return None
def _validate_base_url_scheme(base_url: str) -> str | None:
"""Return an error message if base_url scheme is not allowed, else None."""
from urllib.parse import urlparse
parsed = urlparse(base_url)
if parsed.scheme not in ("https", "http"):
return "Base URL 必须以 http:// 或 https:// 开头。"
return None
def _format_average(total: int, divisor: int) -> str:
    """Render ``total / divisor`` with one decimal place.

    A zero divisor yields ``"0.0"`` instead of raising.
    """
    average = total / divisor if divisor != 0 else 0.0
    return f"{average:.1f}"
def _build_boxes_overview_summary(db: Session) -> dict[str, int | str]:
    """Collect the aggregate figures shown on the box overview page.

    Returns a dict with the box count, the item count, the combined item and
    sub-item count, and two pre-formatted averages: items per box, and
    sub-items per container item.
    """

    def _count_query(id_column):
        # COUNT over the given primary-key column; callers may add filters.
        return db.query(func.count(id_column))

    boxes = _count_query(Box.id).scalar() or 0
    items = _count_query(Item.id).scalar() or 0
    subitems = _count_query(SubItem.id).scalar() or 0
    containers = _count_query(Item.id).filter(Item.is_container.is_(True)).scalar() or 0

    summary: dict[str, int | str] = {}
    summary["box_count"] = boxes
    summary["item_count"] = items
    summary["item_and_subitem_count"] = items + subitems
    summary["avg_items_per_box"] = _format_average(items, boxes)
    summary["avg_subitems_per_container_item"] = _format_average(subitems, containers)
    return summary
def _build_search_results(db: Session, query: str | list[str]) -> list[dict]:
    """Search Box / Item / SubItem by name and note using case-insensitive LIKE.

    Accepts either a single query string or a list of keywords. Multiple
    keywords are combined with OR: a match on *any* keyword, in either the
    name or the note column, is sufficient.

    Keywords are matched literally. ``%``, ``_`` and ``\\`` typed by the user
    are escaped so they are not interpreted as LIKE wildcards. Blank keywords
    are ignored, since they would otherwise become the pattern ``%%`` and
    match every row.

    Args:
        db: Open database session.
        query: One search string, or a list of keywords.

    Returns:
        Result dicts for the template, ordered boxes first, then items, then
        sub-items, each group by descending id. An empty list is returned
        when no usable keyword remains.
    """
    raw_keywords = [query] if isinstance(query, str) else query
    keywords = [kw for kw in raw_keywords if kw and kw.strip()]
    if not keywords:
        return []

    def _contains_pattern(keyword: str) -> str:
        """Build a literal 'contains' LIKE pattern using backslash as escape."""
        escaped = (
            keyword.lower()
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        return f"%{escaped}%"

    patterns = [_contains_pattern(kw) for kw in keywords]

    def _or_like(column, note_column):
        """Build an OR filter that matches any pattern on either column."""
        conditions = []
        for pat in patterns:
            conditions.append(func.lower(column).like(pat, escape="\\"))
            conditions.append(
                # A NULL note is treated as an empty string.
                func.lower(func.coalesce(note_column, "")).like(pat, escape="\\")
            )
        return or_(false(), *conditions)

    results: list[dict] = []
    seen_ids: set[tuple[str, int]] = set()

    def _add(result_type: str, obj_id: int, entry: dict) -> None:
        """Append *entry* unless the same (type, id) pair was already added."""
        key = (result_type, obj_id)
        if key not in seen_ids:
            seen_ids.add(key)
            results.append(entry)

    box_matches = (
        db.query(Box)
        .filter(_or_like(Box.name, Box.note))
        .order_by(Box.id.desc())
        .all()
    )
    for box in box_matches:
        _add("Box", box.id, {
            "type": "Box",
            "name": box.name,
            "note": box.note,
            "detail_url": f"/boxes/{box.id}",
            "detail_label": "查看箱子",
            "secondary_url": None,
            "secondary_label": None,
            "path": "顶层箱子",
            "is_container": None,
            "image_url": f"/boxes/{box.id}/image" if box.image_blob else None,
        })

    item_matches = (
        db.query(Item)
        .join(Item.box)
        .filter(_or_like(Item.name, Item.note))
        .order_by(Item.id.desc())
        .all()
    )
    for item in item_matches:
        _add("Item", item.id, {
            "type": "Item",
            "name": item.name,
            "note": item.note,
            "detail_url": f"/items/{item.id}",
            "detail_label": "查看物品",
            "secondary_url": f"/boxes/{item.box.id}",
            "secondary_label": "查看所属箱子",
            "path": f"位于箱子:{item.box.name}",
            "is_container": item.is_container,
            "image_url": f"/items/{item.id}/image" if item.image_blob else None,
        })

    subitem_matches = (
        db.query(SubItem)
        .join(SubItem.parent_item)
        .join(Item.box)
        .filter(_or_like(SubItem.name, SubItem.note))
        .order_by(SubItem.id.desc())
        .all()
    )
    for subitem in subitem_matches:
        # Sub-items have no detail page of their own; link to the parent item.
        _add("SubItem", subitem.id, {
            "type": "SubItem",
            "name": subitem.name,
            "note": subitem.note,
            "detail_url": f"/items/{subitem.parent_item.id}",
            "detail_label": "查看所属物品",
            "secondary_url": f"/boxes/{subitem.parent_item.box.id}",
            "secondary_label": "查看所属箱子",
            "path": (
                f"位于物品:{subitem.parent_item.name} / "
                f"箱子:{subitem.parent_item.box.name}"
            ),
            "is_container": None,
            "image_url": f"/subitems/{subitem.id}/image" if subitem.image_blob else None,
        })

    return results
def _ai_search(db: Session, cfg: "LLMConfig", query: str) -> tuple[list[str], list[dict], str | None]:
    """Run a search whose keyword set is broadened by the LLM.

    Returns ``(expanded_terms, results, error_message)``:

    - Expansion succeeded with terms: the terms, results for the original
      query OR-ed with those terms, and ``None``.
    - Expansion reported an error: no terms, plain results for the original
      query, and the reported error message.
    - Expansion returned no terms and no error: no terms, plain results,
      and ``None``.
    """
    expansion = expand_query(cfg, query, extra_hints=cfg.ai_search_extra_hints)

    if expansion.error or not expansion.terms:
        # Either a real failure or a legitimately empty expansion: both fall
        # back to the plain search; only a failure carries a message.
        plain_results = _build_search_results(db, query)
        return [], plain_results, expansion.error or None

    # dict.fromkeys drops duplicates while keeping first-seen order, so the
    # original query always stays first.
    combined_terms = list(dict.fromkeys([query] + expansion.terms))
    broadened_results = _build_search_results(db, combined_terms)
    return expansion.terms, broadened_results, None
# Application factory: builds the FastAPI instance, mounts the static files
# and registers every route defined below, then returns the app.
def create_app() -> FastAPI:
# Lifespan hook passed to FastAPI: the code before `yield` (init_db) runs at
# startup; nothing is done after the `yield`.
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
yield
app = FastAPI(title="搬家助手", lifespan=lifespan)
# Expose the files under app/static at the /static URL prefix.
app.mount("/static", StaticFiles(directory="app/static"), name="static")
@app.get("/", include_in_schema=False)
def root() -> RedirectResponse:
    # The site root has no page of its own; send visitors to the box list.
    landing_url = "/boxes"
    return RedirectResponse(url=landing_url, status_code=status.HTTP_302_FOUND)
@app.get("/manifest.webmanifest", include_in_schema=False)
def manifest() -> FileResponse:
    # Serve the web app manifest from the static directory at a root-level URL.
    manifest_path = STATIC_DIR / "manifest.webmanifest"
    return FileResponse(path=manifest_path, media_type="application/manifest+json")
@app.get("/service-worker.js", include_in_schema=False)
def service_worker() -> FileResponse:
    # Serve the service worker script from the static directory at a
    # root-level URL.
    worker_path = STATIC_DIR / "service-worker.js"
    return FileResponse(path=worker_path, media_type="application/javascript")
@app.get("/search")
def search_page(
    request: Request,
    q: str | None = None,
    ai: str | None = None,
    db: Session = Depends(get_db),
):
    # Render the search page.
    #
    # q:  raw search text; it is stripped, and an empty value renders the page
    #     with no results.
    # ai: "1" requests AI query expansion. It is honoured only when AI search
    #     is enabled in the settings and the LLM is configured.
    #
    # Any exception raised by the AI path is logged with its traceback and the
    # request falls back to the plain search, so the page still renders.
    import logging

    query = (q or "").strip()
    cfg = get_app_settings(db)
    ai_requested = ai == "1"
    ai_available = cfg.ai_search_enabled and is_configured(cfg)

    expanded_terms: list[str] = []
    ai_error: str | None = None
    results: list[dict] = []

    if query and ai_requested and ai_available:
        try:
            expanded_terms, results, ai_error = _ai_search(db, cfg, query)
        except Exception:  # noqa: BLE001 — graceful degradation
            # Previously swallowed silently; keep the fallback but leave a
            # trace so operators can see why AI search is failing.
            logging.getLogger(__name__).exception(
                "AI search failed; falling back to plain search"
            )
            ai_error = "AI 搜索暂时不可用,已回退到普通搜索。"
            results = _build_search_results(db, query)
    elif query:
        results = _build_search_results(db, query)

    return templates.TemplateResponse(
        request=request,
        name="search/index.html",
        context={
            "page_title": "搜索",
            "query": query,
            "results": results,
            "searched": bool(query),
            "ai_activated": ai_requested and ai_available and bool(query),
            "expanded_terms": expanded_terms,
            "ai_error": ai_error,
            "ai_available": ai_available,
        },
    )
@app.get("/boxes")
def list_boxes(request: Request, db: Session = Depends(get_db)):
    # Overview page: every box by descending id, plus the aggregate summary.
    newest_first = db.query(Box).order_by(Box.id.desc())
    page_context = {
        "page_title": "箱子",
        "boxes": newest_first.all(),
        "summary": _build_boxes_overview_summary(db),
    }
    return templates.TemplateResponse(
        request=request,
        name="boxes/index.html",
        context=page_context,
    )
# ------------------------------------------------------------------
# Settings
# ------------------------------------------------------------------
@app.get("/settings")
def settings_page(request: Request, db: Session = Depends(get_db)):
    # Show the settings form filled with the stored configuration. The template
    # also receives a flag saying whether a key is stored, and no test result.
    current = get_app_settings(db)
    page_context = {
        "page_title": "设置",
        "config": current,
        "api_key_configured": bool(current.api_key),
        "test_result": None,
    }
    return templates.TemplateResponse(
        request=request,
        name="settings/form.html",
        context=page_context,
    )
@app.post("/settings")
def save_settings(
    request: Request,
    enabled: str | None = Form(default=None),
    base_url: str | None = Form(default=None),
    model: str | None = Form(default=None),
    api_key: str | None = Form(default=None),
    ai_search_enabled: str | None = Form(default=None),
    ai_search_extra_hints: str | None = Form(default=None),
    db: Session = Depends(get_db),
) -> Response:
    # Persist the LLM / AI-search settings submitted from the settings form.
    #
    # Rejections, in order: 403 when the Origin/Referer check fails, 400 when
    # the base URL is not acceptable. If the base URL differs from the stored
    # one and no API key was submitted, nothing is saved and the form is
    # re-rendered with an explanatory message.

    # Origin/Referer check for browser requests.
    rejection = _validate_settings_origin(request)
    if rejection:
        raise HTTPException(status_code=403, detail=rejection)

    # A blank base URL falls back to the OpenAI endpoint.
    target_url = _clean_text(base_url) or "https://api.openai.com/v1"
    url_problem = _validate_base_url_scheme(target_url)
    if url_problem:
        raise HTTPException(status_code=400, detail=url_problem)

    target_model = _clean_text(model) or ""
    llm_on = enabled == "on"
    ai_search_on = ai_search_enabled == "on"
    extra_hints = _clean_text(ai_search_extra_hints) or ""

    stored = get_app_settings(db)
    # _clean_text maps a blank field to None, so None here means "no key
    # submitted".
    new_key = _clean_text(api_key)

    # Only a base_url change counts as an endpoint change; switching models
    # under the same base_url does not require a new key.
    if new_key is None and target_url != stored.base_url:
        draft = LLMConfig(
            enabled=llm_on,
            base_url=target_url,
            model=target_model,
            api_key=stored.api_key,
            ai_search_enabled=ai_search_on,
            ai_search_extra_hints=extra_hints,
        )
        refusal = LLMResult(
            success=False,
            message="Base URL 已变更,请重新输入 API Key 后保存。",
        )
        return templates.TemplateResponse(
            request=request,
            name="settings/form.html",
            context={
                "page_title": "设置",
                "config": draft,
                "api_key_configured": bool(stored.api_key),
                "test_result": refusal,
            },
        )

    # new_key may be None; the original comment says the old key is then kept,
    # which is decided inside save_app_settings (not shown here).
    save_app_settings(
        db,
        enabled=llm_on,
        base_url=target_url,
        model=target_model,
        api_key=new_key,
        ai_search_enabled=ai_search_on,
        ai_search_extra_hints=extra_hints,
    )
    return RedirectResponse(url="/settings", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/settings/test")
def test_settings_connection(
    request: Request,
    enabled: str | None = Form(default=None),
    base_url: str | None = Form(default=None),
    model: str | None = Form(default=None),
    api_key: str | None = Form(default=None),
    ai_search_enabled: str | None = Form(default=None),
    ai_search_extra_hints: str | None = Form(default=None),
    db: Session = Depends(get_db),
):
    # Try the submitted LLM settings without saving them, then re-render the
    # settings form with the outcome.
    #
    # Rejections, in order: 403 when the Origin/Referer check fails, 400 when
    # the base URL is not acceptable. The stored API key is reused only when
    # the base URL equals the saved one; a changed base URL with no key is
    # refused.

    # Origin/Referer check for browser requests.
    rejection = _validate_settings_origin(request)
    if rejection:
        raise HTTPException(status_code=403, detail=rejection)

    # A blank base URL falls back to the OpenAI endpoint.
    target_url = _clean_text(base_url) or "https://api.openai.com/v1"
    url_problem = _validate_base_url_scheme(target_url)
    if url_problem:
        raise HTTPException(status_code=400, detail=url_problem)

    target_model = _clean_text(model) or ""
    llm_on = enabled == "on"
    ai_search_on = ai_search_enabled == "on"
    extra_hints = _clean_text(ai_search_extra_hints) or ""

    stored = get_app_settings(db)
    new_key = _clean_text(api_key)
    same_endpoint = target_url == stored.base_url

    if new_key is None and not same_endpoint:
        # base_url changed but no key provided: refuse to test.
        blank_key_cfg = LLMConfig(
            enabled=llm_on,
            base_url=target_url,
            model=target_model,
            api_key="",
            ai_search_enabled=ai_search_on,
            ai_search_extra_hints=extra_hints,
        )
        refusal = LLMResult(
            success=False,
            message="Base URL 已变更,请重新输入 API Key 后再测试。",
        )
        return templates.TemplateResponse(
            request=request,
            name="settings/form.html",
            context={
                "page_title": "设置",
                "config": blank_key_cfg,
                "api_key_configured": bool(stored.api_key),
                "test_result": refusal,
            },
        )

    # A submitted key always wins; otherwise the endpoint is unchanged and the
    # stored key may be reused.
    key_to_use = new_key if new_key is not None else stored.api_key

    test_cfg = LLMConfig(
        enabled=llm_on,
        base_url=target_url,
        model=target_model,
        api_key=key_to_use or "",
        ai_search_enabled=ai_search_on,
        ai_search_extra_hints=extra_hints,
    )
    outcome = test_connection(test_cfg)
    return templates.TemplateResponse(
        request=request,
        name="settings/form.html",
        context={
            "page_title": "设置",
            "config": test_cfg,
            "api_key_configured": bool(test_cfg.api_key),
            "test_result": outcome,
        },
    )
@app.get("/boxes/new")
def new_box_page(request: Request):
    # Blank "create box" form; it posts to /boxes.
    page_context = {
        "page_title": "新建箱子",
        "box": None,
        "form_action": "/boxes",
        "submit_label": "创建箱子",
    }
    return templates.TemplateResponse(
        request=request,
        name="boxes/form.html",
        context=page_context,
    )
@app.post("/boxes")
def create_box(
    name: str = Form(...),
    note: str | None = Form(default=None),
    room: str | None = Form(default=None),
    status_text: str | None = Form(default=None, alias="status"),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Create a box from the submitted form, attach the processed upload (if
    # any), then redirect to the new box's page.
    # The form field is named "status"; it is aliased to status_text so it
    # does not shadow the imported fastapi `status` module.
    new_box = Box(
        name=name.strip(),
        note=_clean_text(note),
        room=_clean_text(room),
        status=_clean_text(status_text),
    )
    processed = process_upload(image_file)
    _set_image_fields(new_box, processed)

    db.add(new_box)
    db.commit()
    # Reload so the id is available for the redirect.
    db.refresh(new_box)

    destination = f"/boxes/{new_box.id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.get("/boxes/{box_id}")
def show_box(box_id: int, request: Request, db: Session = Depends(get_db)):
    # Detail page for one box; 404 when the id is unknown.
    current_box = _get_box_or_404(db, box_id)
    page_context = {"page_title": current_box.name, "box": current_box}
    return templates.TemplateResponse(
        request=request,
        name="boxes/show.html",
        context=page_context,
    )
@app.get("/boxes/{box_id}/image")
def get_box_image(box_id: int, db: Session = Depends(get_db)) -> Response:
    # Stream the box's stored image; 404 for an unknown box or a missing image.
    owner = _get_box_or_404(db, box_id)
    return _image_response_or_404(owner)
@app.get("/boxes/{box_id}/edit")
def edit_box_page(box_id: int, request: Request, db: Session = Depends(get_db)):
    # Edit form pre-filled with an existing box; 404 when the id is unknown.
    current_box = _get_box_or_404(db, box_id)
    page_context = {
        "page_title": f"编辑箱子:{current_box.name}",
        "box": current_box,
        "form_action": f"/boxes/{current_box.id}/update",
        "submit_label": "保存箱子",
    }
    return templates.TemplateResponse(
        request=request,
        name="boxes/form.html",
        context=page_context,
    )
@app.post("/boxes/{box_id}/update")
def update_box(
    box_id: int,
    name: str = Form(...),
    note: str | None = Form(default=None),
    room: str | None = Form(default=None),
    status_text: str | None = Form(default=None, alias="status"),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Overwrite a box's text fields from the form. The image is replaced only
    # when process_upload returns one; otherwise the current image is kept.
    current_box = _get_box_or_404(db, box_id)

    submitted_fields = (
        ("name", name.strip()),
        ("note", _clean_text(note)),
        ("room", _clean_text(room)),
        ("status", _clean_text(status_text)),
    )
    for attr, new_value in submitted_fields:
        setattr(current_box, attr, new_value)

    processed = process_upload(image_file)
    _set_image_fields(current_box, processed)
    db.commit()

    destination = f"/boxes/{current_box.id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/boxes/{box_id}/image/delete")
def delete_box_image(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Remove the stored image from a box and return to its edit form.
    current_box = _get_box_or_404(db, box_id)
    _clear_image_fields(current_box)
    db.commit()
    destination = f"/boxes/{current_box.id}/edit"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/boxes/{box_id}/delete")
def delete_box(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Delete a box (404 when unknown) and go back to the overview.
    doomed = _get_box_or_404(db, box_id)
    db.delete(doomed)
    db.commit()
    overview_url = "/boxes"
    return RedirectResponse(url=overview_url, status_code=status.HTTP_303_SEE_OTHER)
@app.get("/boxes/{box_id}/items/new")
def new_item_page(box_id: int, request: Request, db: Session = Depends(get_db)):
    # Blank "add item" form for the given box; 404 when the box is unknown.
    parent_box = _get_box_or_404(db, box_id)
    page_context = {
        "page_title": f"为箱子“{parent_box.name}”添加物品",
        "box": parent_box,
        "item": None,
        "form_action": f"/boxes/{parent_box.id}/items",
        "submit_label": "创建物品",
    }
    return templates.TemplateResponse(
        request=request,
        name="items/form.html",
        context=page_context,
    )
@app.post("/boxes/{box_id}/items")
def create_item(
    box_id: int,
    name: str = Form(...),
    note: str | None = Form(default=None),
    quantity: str | None = Form(default=None),
    is_container: str | None = Form(default=None),
    submit_action: str | None = Form(default=None),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Create an item inside a box.
    #
    # Redirect target afterwards:
    #   - "save and add next" -> a fresh item form for the same box
    #   - container item      -> the item page
    #   - anything else       -> the box page
    parent_box = _get_box_or_404(db, box_id)
    new_item = Item(
        box=parent_box,
        name=name.strip(),
        note=_clean_text(note),
        quantity=_parse_quantity(quantity),
        is_container=_is_checked(is_container),
    )
    processed = process_upload(image_file)
    _set_image_fields(new_item, processed)

    db.add(new_item)
    db.commit()
    # Reload so the id is available for the redirect.
    db.refresh(new_item)

    if _wants_add_next(submit_action):
        destination = f"/boxes/{parent_box.id}/items/new"
    elif new_item.is_container:
        destination = f"/items/{new_item.id}"
    else:
        destination = f"/boxes/{parent_box.id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.get("/items/{item_id}")
def show_item(item_id: int, request: Request, db: Session = Depends(get_db)):
    # Detail page for one item; 404 when the id is unknown.
    current_item = _get_item_or_404(db, item_id)
    page_context = {"page_title": current_item.name, "item": current_item}
    return templates.TemplateResponse(
        request=request,
        name="items/show.html",
        context=page_context,
    )
@app.get("/items/{item_id}/image")
def get_item_image(item_id: int, db: Session = Depends(get_db)) -> Response:
    # Stream the item's stored image; 404 for an unknown item or a missing image.
    owner = _get_item_or_404(db, item_id)
    return _image_response_or_404(owner)
@app.get("/items/{item_id}/edit")
def edit_item_page(item_id: int, request: Request, db: Session = Depends(get_db)):
    # Edit form pre-filled with an existing item; 404 when the id is unknown.
    current_item = _get_item_or_404(db, item_id)
    page_context = {
        "page_title": f"编辑物品:{current_item.name}",
        "box": current_item.box,
        "item": current_item,
        "form_action": f"/items/{current_item.id}/update",
        "submit_label": "保存物品",
    }
    return templates.TemplateResponse(
        request=request,
        name="items/form.html",
        context=page_context,
    )
@app.post("/items/{item_id}/update")
def update_item(
    item_id: int,
    name: str = Form(...),
    note: str | None = Form(default=None),
    quantity: str | None = Form(default=None),
    is_container: str | None = Form(default=None),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Overwrite an item's fields from the form. The image is replaced only
    # when process_upload returns one; otherwise the current image is kept.
    current_item = _get_item_or_404(db, item_id)

    current_item.name = name.strip()
    current_item.note = _clean_text(note)
    current_item.quantity = _parse_quantity(quantity)

    still_container = _is_checked(is_container)
    current_item.is_container = still_container
    if not still_container:
        # An item that is not a container cannot hold sub-items, so its
        # sub-item collection is emptied.
        current_item.subitems.clear()

    processed = process_upload(image_file)
    _set_image_fields(current_item, processed)
    db.commit()

    destination = f"/items/{current_item.id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/items/{item_id}/image/delete")
def delete_item_image(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Remove the stored image from an item and return to its edit form.
    current_item = _get_item_or_404(db, item_id)
    _clear_image_fields(current_item)
    db.commit()
    destination = f"/items/{current_item.id}/edit"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/items/{item_id}/delete")
def delete_item(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Delete an item (404 when unknown) and go back to the box it was in.
    doomed = _get_item_or_404(db, item_id)
    # Read the owning box id before the delete; it is needed for the redirect.
    parent_box_id = doomed.box_id
    db.delete(doomed)
    db.commit()
    destination = f"/boxes/{parent_box_id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.get("/items/{item_id}/subitems/new")
def new_subitem_page(item_id: int, request: Request, db: Session = Depends(get_db)):
    # Blank "add sub-item" form. 404 when the item is unknown, 400 when the
    # item is not a container.
    parent = _get_item_or_404(db, item_id)
    _require_container_item(parent)
    page_context = {
        "page_title": f"为“{parent.name}”添加子物品",
        "item": parent,
        "subitem": None,
        "form_action": f"/items/{parent.id}/subitems",
        "submit_label": "创建子物品",
    }
    return templates.TemplateResponse(
        request=request,
        name="subitems/form.html",
        context=page_context,
    )
@app.post("/items/{item_id}/subitems")
def create_subitem(
    item_id: int,
    name: str = Form(...),
    note: str | None = Form(default=None),
    quantity: str | None = Form(default=None),
    submit_action: str | None = Form(default=None),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Create a sub-item under a container item. 404 when the item is unknown,
    # 400 when it is not a container. "Save and add next" returns to a fresh
    # sub-item form; otherwise the parent item page is shown.
    parent = _get_item_or_404(db, item_id)
    _require_container_item(parent)

    new_subitem = SubItem(
        parent_item=parent,
        name=name.strip(),
        note=_clean_text(note),
        quantity=_parse_quantity(quantity),
    )
    processed = process_upload(image_file)
    _set_image_fields(new_subitem, processed)

    db.add(new_subitem)
    db.commit()
    db.refresh(new_subitem)

    destination = f"/items/{parent.id}"
    if _wants_add_next(submit_action):
        destination = f"/items/{parent.id}/subitems/new"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.get("/subitems/{subitem_id}/image")
def get_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> Response:
    # Stream the sub-item's stored image; 404 for an unknown sub-item or a
    # missing image.
    owner = _get_subitem_or_404(db, subitem_id)
    return _image_response_or_404(owner)
@app.get("/subitems/{subitem_id}/edit")
def edit_subitem_page(subitem_id: int, request: Request, db: Session = Depends(get_db)):
    # Edit form pre-filled with an existing sub-item; 404 when the id is unknown.
    current_subitem = _get_subitem_or_404(db, subitem_id)
    page_context = {
        "page_title": f"编辑子物品:{current_subitem.name}",
        "item": current_subitem.parent_item,
        "subitem": current_subitem,
        "form_action": f"/subitems/{current_subitem.id}/update",
        "submit_label": "保存子物品",
    }
    return templates.TemplateResponse(
        request=request,
        name="subitems/form.html",
        context=page_context,
    )
@app.post("/subitems/{subitem_id}/update")
def update_subitem(
    subitem_id: int,
    name: str = Form(...),
    note: str | None = Form(default=None),
    quantity: str | None = Form(default=None),
    image_file: UploadFile | None = File(default=None),
    db: Session = Depends(get_db),
) -> RedirectResponse:
    # Overwrite a sub-item's fields from the form, then show its parent item.
    # The image is replaced only when process_upload returns one.
    current_subitem = _get_subitem_or_404(db, subitem_id)

    current_subitem.name = name.strip()
    current_subitem.note = _clean_text(note)
    current_subitem.quantity = _parse_quantity(quantity)

    processed = process_upload(image_file)
    _set_image_fields(current_subitem, processed)
    db.commit()

    destination = f"/items/{current_subitem.parent_item_id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/subitems/{subitem_id}/image/delete")
def delete_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Remove the stored image from a sub-item and return to its edit form.
    current_subitem = _get_subitem_or_404(db, subitem_id)
    _clear_image_fields(current_subitem)
    db.commit()
    destination = f"/subitems/{current_subitem.id}/edit"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/subitems/{subitem_id}/delete")
def delete_subitem(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
    # Delete a sub-item (404 when unknown) and go back to its parent item.
    doomed = _get_subitem_or_404(db, subitem_id)
    # Read the parent id before the delete; it is needed for the redirect.
    parent_item_id = doomed.parent_item_id
    db.delete(doomed)
    db.commit()
    destination = f"/items/{parent_item_id}"
    return RedirectResponse(url=destination, status_code=status.HTTP_303_SEE_OTHER)
# End of create_app(): hand the configured application back to the caller.
return app
# Module-level application instance, built when this module is imported.
# NOTE(review): presumably the object the ASGI server is pointed at
# (app.main:app) — confirm against the deployment configuration.
app = create_app()