from contextlib import asynccontextmanager from pathlib import Path from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status from fastapi.responses import FileResponse, RedirectResponse, Response from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy import func, or_ from sqlalchemy.orm import Session from app.db import get_db, init_db from app.images import process_upload from app.models import Box, Item, SubItem templates = Jinja2Templates(directory="app/templates") STATIC_DIR = Path("app/static") def _clean_text(value: str | None) -> str | None: if value is None: return None cleaned = value.strip() return cleaned or None def _parse_quantity(value: str | None) -> int | None: cleaned = _clean_text(value) if cleaned is None: return None return int(cleaned) def _is_checked(value: str | None) -> bool: return value == "on" def _get_box_or_404(db: Session, box_id: int) -> Box: box = db.get(Box, box_id) if box is None: raise HTTPException(status_code=404, detail="Box not found") return box def _get_item_or_404(db: Session, item_id: int) -> Item: item = db.get(Item, item_id) if item is None: raise HTTPException(status_code=404, detail="Item not found") return item def _get_subitem_or_404(db: Session, subitem_id: int) -> SubItem: subitem = db.get(SubItem, subitem_id) if subitem is None: raise HTTPException(status_code=404, detail="Sub-item not found") return subitem def _require_container_item(item: Item) -> None: if not item.is_container: raise HTTPException(status_code=400, detail="Only container items can have sub-items") def _set_image_fields(target, processed_image) -> None: if processed_image is None: return target.image_blob = processed_image.blob target.image_mime_type = processed_image.mime_type target.image_width = processed_image.width target.image_height = processed_image.height def _clear_image_fields(target) -> None: target.image_blob = None target.image_mime_type = None target.image_width = None target.image_height = None def _image_response_or_404(target) -> Response: if target.image_blob is None or target.image_mime_type is None: raise HTTPException(status_code=404, detail="Image not found") return Response(content=target.image_blob, media_type=target.image_mime_type) def _wants_add_next(submit_action: str | None) -> bool: return submit_action == "save_and_add_next" def _format_average(total: int, divisor: int) -> str: if divisor == 0: return "0.0" return f"{total / divisor:.1f}" def _build_boxes_overview_summary(db: Session) -> dict[str, int | str]: box_count = db.query(func.count(Box.id)).scalar() or 0 item_count = db.query(func.count(Item.id)).scalar() or 0 subitem_count = db.query(func.count(SubItem.id)).scalar() or 0 container_item_count = ( db.query(func.count(Item.id)) .filter(Item.is_container.is_(True)) .scalar() or 0 ) return { "box_count": box_count, "item_count": item_count, "item_and_subitem_count": item_count + subitem_count, "avg_items_per_box": _format_average(item_count, box_count), "avg_subitems_per_container_item": _format_average( subitem_count, container_item_count, ), } def _build_search_results(db: Session, query: str) -> list[dict]: keyword = f"%{query.lower()}%" results: list[dict] = [] box_matches = ( db.query(Box) .filter( or_( func.lower(Box.name).like(keyword), func.lower(func.coalesce(Box.note, "")).like(keyword), ) ) .order_by(Box.id.desc()) .all() ) for box in box_matches: results.append( { "type": "Box", "name": box.name, "note": box.note, "detail_url": f"/boxes/{box.id}", "detail_label": "查看箱子", "secondary_url": None, "secondary_label": None, "path": "顶层箱子", "is_container": None, "image_url": f"/boxes/{box.id}/image" if box.image_blob else None, } ) item_matches = ( db.query(Item) .join(Item.box) .filter( or_( func.lower(Item.name).like(keyword), func.lower(func.coalesce(Item.note, "")).like(keyword), ) ) .order_by(Item.id.desc()) .all() ) for item in item_matches: results.append( { "type": "Item", "name": item.name, "note": item.note, "detail_url": f"/items/{item.id}", "detail_label": "查看物品", "secondary_url": f"/boxes/{item.box.id}", "secondary_label": "查看所属箱子", "path": f"位于箱子:{item.box.name}", "is_container": item.is_container, "image_url": f"/items/{item.id}/image" if item.image_blob else None, } ) subitem_matches = ( db.query(SubItem) .join(SubItem.parent_item) .join(Item.box) .filter( or_( func.lower(SubItem.name).like(keyword), func.lower(func.coalesce(SubItem.note, "")).like(keyword), ) ) .order_by(SubItem.id.desc()) .all() ) for subitem in subitem_matches: results.append( { "type": "SubItem", "name": subitem.name, "note": subitem.note, "detail_url": f"/items/{subitem.parent_item.id}", "detail_label": "查看所属物品", "secondary_url": f"/boxes/{subitem.parent_item.box.id}", "secondary_label": "查看所属箱子", "path": ( f"位于物品:{subitem.parent_item.name} / " f"箱子:{subitem.parent_item.box.name}" ), "is_container": None, "image_url": f"/subitems/{subitem.id}/image" if subitem.image_blob else None, } ) return results def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): init_db() yield app = FastAPI(title="搬家助手", lifespan=lifespan) app.mount("/static", StaticFiles(directory="app/static"), name="static") @app.get("/", include_in_schema=False) def root() -> RedirectResponse: return RedirectResponse(url="/boxes", status_code=status.HTTP_302_FOUND) @app.get("/manifest.webmanifest", include_in_schema=False) def manifest() -> FileResponse: return FileResponse( path=STATIC_DIR / "manifest.webmanifest", media_type="application/manifest+json", ) @app.get("/service-worker.js", include_in_schema=False) def service_worker() -> FileResponse: return FileResponse( path=STATIC_DIR / "service-worker.js", media_type="application/javascript", ) @app.get("/search") def search_page( request: Request, q: str | None = None, db: Session = Depends(get_db), ): query = (q or "").strip() results = _build_search_results(db, query) if query else [] return templates.TemplateResponse( request=request, name="search/index.html", context={ "page_title": "搜索", "query": query, "results": results, "searched": bool(query), }, ) @app.get("/boxes") def list_boxes(request: Request, db: Session = Depends(get_db)): boxes = db.query(Box).order_by(Box.id.desc()).all() summary = _build_boxes_overview_summary(db) return templates.TemplateResponse( request=request, name="boxes/index.html", context={"page_title": "箱子", "boxes": boxes, "summary": summary}, ) @app.get("/boxes/new") def new_box_page(request: Request): return templates.TemplateResponse( request=request, name="boxes/form.html", context={ "page_title": "新建箱子", "box": None, "form_action": "/boxes", "submit_label": "创建箱子", }, ) @app.post("/boxes") def create_box( name: str = Form(...), note: str | None = Form(default=None), room: str | None = Form(default=None), status_text: str | None = Form(default=None, alias="status"), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: box = Box( name=name.strip(), note=_clean_text(note), room=_clean_text(room), status=_clean_text(status_text), ) _set_image_fields(box, process_upload(image_file)) db.add(box) db.commit() db.refresh(box) return RedirectResponse(url=f"/boxes/{box.id}", status_code=status.HTTP_303_SEE_OTHER) @app.get("/boxes/{box_id}") def show_box(box_id: int, request: Request, db: Session = Depends(get_db)): box = _get_box_or_404(db, box_id) return templates.TemplateResponse( request=request, name="boxes/show.html", context={"page_title": box.name, "box": box}, ) @app.get("/boxes/{box_id}/image") def get_box_image(box_id: int, db: Session = Depends(get_db)) -> Response: return _image_response_or_404(_get_box_or_404(db, box_id)) @app.get("/boxes/{box_id}/edit") def edit_box_page(box_id: int, request: Request, db: Session = Depends(get_db)): box = _get_box_or_404(db, box_id) return templates.TemplateResponse( request=request, name="boxes/form.html", context={ "page_title": f"编辑箱子:{box.name}", "box": box, "form_action": f"/boxes/{box.id}/update", "submit_label": "保存箱子", }, ) @app.post("/boxes/{box_id}/update") def update_box( box_id: int, name: str = Form(...), note: str | None = Form(default=None), room: str | None = Form(default=None), status_text: str | None = Form(default=None, alias="status"), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: box = _get_box_or_404(db, box_id) box.name = name.strip() box.note = _clean_text(note) box.room = _clean_text(room) box.status = _clean_text(status_text) _set_image_fields(box, process_upload(image_file)) db.commit() return RedirectResponse(url=f"/boxes/{box.id}", status_code=status.HTTP_303_SEE_OTHER) @app.post("/boxes/{box_id}/image/delete") def delete_box_image(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse: box = _get_box_or_404(db, box_id) _clear_image_fields(box) db.commit() return RedirectResponse(url=f"/boxes/{box.id}/edit", status_code=status.HTTP_303_SEE_OTHER) @app.post("/boxes/{box_id}/delete") def delete_box(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse: box = _get_box_or_404(db, box_id) db.delete(box) db.commit() return RedirectResponse(url="/boxes", status_code=status.HTTP_303_SEE_OTHER) @app.get("/boxes/{box_id}/items/new") def new_item_page(box_id: int, request: Request, db: Session = Depends(get_db)): box = _get_box_or_404(db, box_id) return templates.TemplateResponse( request=request, name="items/form.html", context={ "page_title": f"为箱子“{box.name}”添加物品", "box": box, "item": None, "form_action": f"/boxes/{box.id}/items", "submit_label": "创建物品", }, ) @app.post("/boxes/{box_id}/items") def create_item( box_id: int, name: str = Form(...), note: str | None = Form(default=None), quantity: str | None = Form(default=None), is_container: str | None = Form(default=None), submit_action: str | None = Form(default=None), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: box = _get_box_or_404(db, box_id) item = Item( box=box, name=name.strip(), note=_clean_text(note), quantity=_parse_quantity(quantity), is_container=_is_checked(is_container), ) _set_image_fields(item, process_upload(image_file)) db.add(item) db.commit() db.refresh(item) if _wants_add_next(submit_action): redirect_url = f"/boxes/{box.id}/items/new" elif item.is_container: redirect_url = f"/items/{item.id}" else: redirect_url = f"/boxes/{box.id}" return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) @app.get("/items/{item_id}") def show_item(item_id: int, request: Request, db: Session = Depends(get_db)): item = _get_item_or_404(db, item_id) return templates.TemplateResponse( request=request, name="items/show.html", context={"page_title": item.name, "item": item}, ) @app.get("/items/{item_id}/image") def get_item_image(item_id: int, db: Session = Depends(get_db)) -> Response: return _image_response_or_404(_get_item_or_404(db, item_id)) @app.get("/items/{item_id}/edit") def edit_item_page(item_id: int, request: Request, db: Session = Depends(get_db)): item = _get_item_or_404(db, item_id) return templates.TemplateResponse( request=request, name="items/form.html", context={ "page_title": f"编辑物品:{item.name}", "box": item.box, "item": item, "form_action": f"/items/{item.id}/update", "submit_label": "保存物品", }, ) @app.post("/items/{item_id}/update") def update_item( item_id: int, name: str = Form(...), note: str | None = Form(default=None), quantity: str | None = Form(default=None), is_container: str | None = Form(default=None), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: item = _get_item_or_404(db, item_id) item.name = name.strip() item.note = _clean_text(note) item.quantity = _parse_quantity(quantity) item.is_container = _is_checked(is_container) if not item.is_container: item.subitems.clear() _set_image_fields(item, process_upload(image_file)) db.commit() return RedirectResponse(url=f"/items/{item.id}", status_code=status.HTTP_303_SEE_OTHER) @app.post("/items/{item_id}/image/delete") def delete_item_image(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse: item = _get_item_or_404(db, item_id) _clear_image_fields(item) db.commit() return RedirectResponse(url=f"/items/{item.id}/edit", status_code=status.HTTP_303_SEE_OTHER) @app.post("/items/{item_id}/delete") def delete_item(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse: item = _get_item_or_404(db, item_id) box_id = item.box_id db.delete(item) db.commit() return RedirectResponse(url=f"/boxes/{box_id}", status_code=status.HTTP_303_SEE_OTHER) @app.get("/items/{item_id}/subitems/new") def new_subitem_page(item_id: int, request: Request, db: Session = Depends(get_db)): item = _get_item_or_404(db, item_id) _require_container_item(item) return templates.TemplateResponse( request=request, name="subitems/form.html", context={ "page_title": f"为“{item.name}”添加子物品", "item": item, "subitem": None, "form_action": f"/items/{item.id}/subitems", "submit_label": "创建子物品", }, ) @app.post("/items/{item_id}/subitems") def create_subitem( item_id: int, name: str = Form(...), note: str | None = Form(default=None), quantity: str | None = Form(default=None), submit_action: str | None = Form(default=None), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: item = _get_item_or_404(db, item_id) _require_container_item(item) subitem = SubItem( parent_item=item, name=name.strip(), note=_clean_text(note), quantity=_parse_quantity(quantity), ) _set_image_fields(subitem, process_upload(image_file)) db.add(subitem) db.commit() db.refresh(subitem) if _wants_add_next(submit_action): redirect_url = f"/items/{item.id}/subitems/new" else: redirect_url = f"/items/{item.id}" return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) @app.get("/subitems/{subitem_id}/image") def get_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> Response: return _image_response_or_404(_get_subitem_or_404(db, subitem_id)) @app.get("/subitems/{subitem_id}/edit") def edit_subitem_page(subitem_id: int, request: Request, db: Session = Depends(get_db)): subitem = _get_subitem_or_404(db, subitem_id) return templates.TemplateResponse( request=request, name="subitems/form.html", context={ "page_title": f"编辑子物品:{subitem.name}", "item": subitem.parent_item, "subitem": subitem, "form_action": f"/subitems/{subitem.id}/update", "submit_label": "保存子物品", }, ) @app.post("/subitems/{subitem_id}/update") def update_subitem( subitem_id: int, name: str = Form(...), note: str | None = Form(default=None), quantity: str | None = Form(default=None), image_file: UploadFile | None = File(default=None), db: Session = Depends(get_db), ) -> RedirectResponse: subitem = _get_subitem_or_404(db, subitem_id) subitem.name = name.strip() subitem.note = _clean_text(note) subitem.quantity = _parse_quantity(quantity) _set_image_fields(subitem, process_upload(image_file)) db.commit() return RedirectResponse( url=f"/items/{subitem.parent_item_id}", status_code=status.HTTP_303_SEE_OTHER, ) @app.post("/subitems/{subitem_id}/image/delete") def delete_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse: subitem = _get_subitem_or_404(db, subitem_id) _clear_image_fields(subitem) db.commit() return RedirectResponse(url=f"/subitems/{subitem.id}/edit", status_code=status.HTTP_303_SEE_OTHER) @app.post("/subitems/{subitem_id}/delete") def delete_subitem(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse: subitem = _get_subitem_or_404(db, subitem_id) item_id = subitem.parent_item_id db.delete(subitem) db.commit() return RedirectResponse(url=f"/items/{item_id}", status_code=status.HTTP_303_SEE_OTHER) return app app = create_app()