Files
2026-moving-helper/app/main.py
T
tliu93 ed1e3311a5
test / pytest (push) Successful in 37s
Add minimal installable PWA support
- serve manifest and service worker from the app root for install compatibility
- add manifest metadata, service worker registration, and Apple touch icon links to the base template
- add install icon assets for Android, iOS, and desktop install flows
- document deployment and validation notes for the new PWA support
- cover the new endpoints and template output with tests
2026-04-23 15:23:20 +02:00

546 lines
19 KiB
Python

from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi.responses import FileResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.db import get_db, init_db
from app.images import process_upload
from app.models import Box, Item, SubItem
# Resolve resource directories relative to this module instead of the current
# working directory, so the app also starts when launched from another folder.
_APP_DIR = Path(__file__).resolve().parent

# Jinja2 environment used by every HTML-rendering route.
templates = Jinja2Templates(directory=_APP_DIR / "templates")

# Directory holding static assets, the web manifest and the service worker.
STATIC_DIR = _APP_DIR / "static"
def _clean_text(value: str | None) -> str | None:
if value is None:
return None
cleaned = value.strip()
return cleaned or None
def _parse_quantity(value: str | None) -> int | None:
    """Convert an optional form field into an integer quantity.

    Args:
        value: Raw text submitted by the form, or ``None`` when absent.

    Returns:
        ``None`` when the field is missing or blank, otherwise the parsed int.

    Raises:
        HTTPException: 422 when the text is not a valid integer. Without this
            the ``ValueError`` from ``int()`` would surface as a 500 response.
    """
    cleaned = _clean_text(value)
    if cleaned is None:
        return None
    try:
        return int(cleaned)
    except ValueError:
        # Bad user input is a client error, not a server failure.
        raise HTTPException(
            status_code=422,
            detail="Quantity must be a whole number",
        ) from None
def _is_checked(value: str | None) -> bool:
return value == "on"
def _get_box_or_404(db: Session, box_id: int) -> Box:
    """Load the ``Box`` with primary key *box_id*.

    Raises:
        HTTPException: 404 when no such box exists.
    """
    found = db.get(Box, box_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Box not found")
def _get_item_or_404(db: Session, item_id: int) -> Item:
    """Load the ``Item`` with primary key *item_id*.

    Raises:
        HTTPException: 404 when no such item exists.
    """
    found = db.get(Item, item_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Item not found")
def _get_subitem_or_404(db: Session, subitem_id: int) -> SubItem:
    """Load the ``SubItem`` with primary key *subitem_id*.

    Raises:
        HTTPException: 404 when no such sub-item exists.
    """
    found = db.get(SubItem, subitem_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Sub-item not found")
def _require_container_item(item: Item) -> None:
    """Ensure *item* is flagged as a container.

    Raises:
        HTTPException: 400 when ``item.is_container`` is falsy.
    """
    if item.is_container:
        return
    raise HTTPException(status_code=400, detail="Only container items can have sub-items")
def _set_image_fields(target, processed_image) -> None:
if processed_image is None:
return
target.image_blob = processed_image.blob
target.image_mime_type = processed_image.mime_type
target.image_width = processed_image.width
target.image_height = processed_image.height
def _clear_image_fields(target) -> None:
target.image_blob = None
target.image_mime_type = None
target.image_width = None
target.image_height = None
def _image_response_or_404(target) -> Response:
    """Build a response carrying the image bytes stored on *target*.

    Raises:
        HTTPException: 404 when either the blob or its MIME type is missing.
    """
    blob = target.image_blob
    if blob is not None:
        mime_type = target.image_mime_type
        if mime_type is not None:
            return Response(content=blob, media_type=mime_type)
    raise HTTPException(status_code=404, detail="Image not found")
def _wants_add_next(submit_action: str | None) -> bool:
return submit_action == "save_and_add_next"
def _build_search_results(db: Session, query: str) -> list[dict]:
    """Return display-ready search hits for boxes, items and sub-items.

    A record matches when its name or note contains ``query``; both sides are
    lower-cased before comparison. ``%`` and ``_`` typed by the user are
    matched literally instead of acting as SQL ``LIKE`` wildcards.

    Args:
        db: Session used to run the three queries.
        query: Text to look for. An empty string matches every record.

    Returns:
        One dict per hit: boxes first, then items, then sub-items, each group
        in descending id order. Every dict has the keys ``type``, ``name``,
        ``note``, ``detail_url``, ``detail_label``, ``secondary_url``,
        ``secondary_label``, ``path``, ``is_container`` and ``image_url``.
    """
    needle = query.lower()

    def name_or_note_contains(model):
        """Build the filter "name or note contains the needle" for *model*."""
        # autoescape=True escapes "%" and "_" in the bound value, so a query
        # such as "50%" no longer behaves like a wildcard pattern.
        return or_(
            func.lower(model.name).contains(needle, autoescape=True),
            func.lower(func.coalesce(model.note, "")).contains(needle, autoescape=True),
        )

    box_matches = (
        db.query(Box)
        .filter(name_or_note_contains(Box))
        .order_by(Box.id.desc())
        .all()
    )
    item_matches = (
        db.query(Item)
        .join(Item.box)
        .filter(name_or_note_contains(Item))
        .order_by(Item.id.desc())
        .all()
    )
    subitem_matches = (
        db.query(SubItem)
        .join(SubItem.parent_item)
        .join(Item.box)
        .filter(name_or_note_contains(SubItem))
        .order_by(SubItem.id.desc())
        .all()
    )

    box_results = [
        {
            "type": "Box",
            "name": box.name,
            "note": box.note,
            "detail_url": f"/boxes/{box.id}",
            "detail_label": "查看箱子",
            "secondary_url": None,
            "secondary_label": None,
            "path": "顶层箱子",
            "is_container": None,
            "image_url": f"/boxes/{box.id}/image" if box.image_blob else None,
        }
        for box in box_matches
    ]
    item_results = [
        {
            "type": "Item",
            "name": item.name,
            "note": item.note,
            "detail_url": f"/items/{item.id}",
            "detail_label": "查看物品",
            "secondary_url": f"/boxes/{item.box.id}",
            "secondary_label": "查看所属箱子",
            "path": f"位于箱子:{item.box.name}",
            "is_container": item.is_container,
            "image_url": f"/items/{item.id}/image" if item.image_blob else None,
        }
        for item in item_matches
    ]
    subitem_results = [
        {
            "type": "SubItem",
            "name": subitem.name,
            "note": subitem.note,
            # Sub-items are linked to their parent item's detail page.
            "detail_url": f"/items/{subitem.parent_item.id}",
            "detail_label": "查看所属物品",
            "secondary_url": f"/boxes/{subitem.parent_item.box.id}",
            "secondary_label": "查看所属箱子",
            "path": (
                f"位于物品:{subitem.parent_item.name} / "
                f"箱子:{subitem.parent_item.box.name}"
            ),
            "is_container": None,
            "image_url": f"/subitems/{subitem.id}/image" if subitem.image_blob else None,
        }
        for subitem in subitem_matches
    ]
    return box_results + item_results + subitem_results
def create_app() -> FastAPI:
    """Build and configure the FastAPI application.

    Registers the ``/static`` mount, the web manifest and service worker
    served from the site root, the search page, and the create / read /
    update / delete routes for boxes, items and sub-items.

    Returns:
        The configured application. ``init_db()`` runs when its lifespan
        starts.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Runs once at startup, before the application begins serving.
        init_db()
        yield

    def _require_name(raw_name: str) -> str:
        """Return *raw_name* stripped, rejecting names that are blank."""
        cleaned = raw_name.strip()
        if not cleaned:
            # ``Form(...)`` only guarantees the field was sent; without this
            # check a whitespace-only name would be stored as "".
            raise HTTPException(status_code=422, detail="Name must not be blank")
        return cleaned

    app = FastAPI(title="搬家助手", lifespan=lifespan)
    # Use the shared constant so the mount and the root-level PWA routes
    # below always read from the same directory.
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

    @app.get("/", include_in_schema=False)
    def root() -> RedirectResponse:
        """Redirect the site root to the box list."""
        return RedirectResponse(url="/boxes", status_code=status.HTTP_302_FOUND)

    @app.get("/manifest.webmanifest", include_in_schema=False)
    def manifest() -> FileResponse:
        """Serve the web app manifest from the site root."""
        return FileResponse(
            path=STATIC_DIR / "manifest.webmanifest",
            media_type="application/manifest+json",
        )

    @app.get("/service-worker.js", include_in_schema=False)
    def service_worker() -> FileResponse:
        """Serve the service worker script from the site root."""
        return FileResponse(
            path=STATIC_DIR / "service-worker.js",
            media_type="application/javascript",
        )

    @app.get("/search")
    def search_page(
        request: Request,
        q: str | None = None,
        db: Session = Depends(get_db),
    ):
        """Render the search page; the database is queried only for a non-blank ``q``."""
        query = (q or "").strip()
        results = _build_search_results(db, query) if query else []
        return templates.TemplateResponse(
            request=request,
            name="search/index.html",
            context={
                "page_title": "搜索",
                "query": query,
                "results": results,
                "searched": bool(query),
            },
        )

    @app.get("/boxes")
    def list_boxes(request: Request, db: Session = Depends(get_db)):
        """Render all boxes in descending id order."""
        boxes = db.query(Box).order_by(Box.id.desc()).all()
        return templates.TemplateResponse(
            request=request,
            name="boxes/index.html",
            context={"page_title": "箱子", "boxes": boxes},
        )

    # Registered before "/boxes/{box_id}" so "new" is not parsed as a box id.
    @app.get("/boxes/new")
    def new_box_page(request: Request):
        """Render the empty form for creating a box."""
        return templates.TemplateResponse(
            request=request,
            name="boxes/form.html",
            context={
                "page_title": "新建箱子",
                "box": None,
                "form_action": "/boxes",
                "submit_label": "创建箱子",
            },
        )

    @app.post("/boxes")
    def create_box(
        name: str = Form(...),
        note: str | None = Form(default=None),
        room: str | None = Form(default=None),
        status_text: str | None = Form(default=None, alias="status"),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Create a box from the submitted form and redirect to its page.

        Raises:
            HTTPException: 422 when ``name`` is blank.
        """
        box = Box(
            name=_require_name(name),
            note=_clean_text(note),
            room=_clean_text(room),
            status=_clean_text(status_text),
        )
        _set_image_fields(box, process_upload(image_file))
        db.add(box)
        db.commit()
        db.refresh(box)
        return RedirectResponse(url=f"/boxes/{box.id}", status_code=status.HTTP_303_SEE_OTHER)

    @app.get("/boxes/{box_id}")
    def show_box(box_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the detail page of one box (404 when it does not exist)."""
        box = _get_box_or_404(db, box_id)
        return templates.TemplateResponse(
            request=request,
            name="boxes/show.html",
            context={"page_title": box.name, "box": box},
        )

    @app.get("/boxes/{box_id}/image")
    def get_box_image(box_id: int, db: Session = Depends(get_db)) -> Response:
        """Return the stored image of a box (404 when box or image is missing)."""
        return _image_response_or_404(_get_box_or_404(db, box_id))

    @app.get("/boxes/{box_id}/edit")
    def edit_box_page(box_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the edit form for an existing box."""
        box = _get_box_or_404(db, box_id)
        return templates.TemplateResponse(
            request=request,
            name="boxes/form.html",
            context={
                "page_title": f"编辑箱子:{box.name}",
                "box": box,
                "form_action": f"/boxes/{box.id}/update",
                "submit_label": "保存箱子",
            },
        )

    @app.post("/boxes/{box_id}/update")
    def update_box(
        box_id: int,
        name: str = Form(...),
        note: str | None = Form(default=None),
        room: str | None = Form(default=None),
        status_text: str | None = Form(default=None, alias="status"),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Overwrite a box's fields and redirect to its page.

        The stored image is replaced only when ``process_upload`` returns a
        processed image; otherwise it is left as is.

        Raises:
            HTTPException: 404 for an unknown box, 422 when ``name`` is blank.
        """
        box = _get_box_or_404(db, box_id)
        box.name = _require_name(name)
        box.note = _clean_text(note)
        box.room = _clean_text(room)
        box.status = _clean_text(status_text)
        _set_image_fields(box, process_upload(image_file))
        db.commit()
        return RedirectResponse(url=f"/boxes/{box.id}", status_code=status.HTTP_303_SEE_OTHER)

    @app.post("/boxes/{box_id}/image/delete")
    def delete_box_image(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Remove a box's image and return to its edit form."""
        box = _get_box_or_404(db, box_id)
        _clear_image_fields(box)
        db.commit()
        return RedirectResponse(url=f"/boxes/{box.id}/edit", status_code=status.HTTP_303_SEE_OTHER)

    @app.post("/boxes/{box_id}/delete")
    def delete_box(box_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Delete a box and redirect to the box list."""
        box = _get_box_or_404(db, box_id)
        db.delete(box)
        db.commit()
        return RedirectResponse(url="/boxes", status_code=status.HTTP_303_SEE_OTHER)

    @app.get("/boxes/{box_id}/items/new")
    def new_item_page(box_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the empty form for adding an item to a box."""
        box = _get_box_or_404(db, box_id)
        return templates.TemplateResponse(
            request=request,
            name="items/form.html",
            context={
                "page_title": f"为箱子“{box.name}”添加物品",
                "box": box,
                "item": None,
                "form_action": f"/boxes/{box.id}/items",
                "submit_label": "创建物品",
            },
        )

    @app.post("/boxes/{box_id}/items")
    def create_item(
        box_id: int,
        name: str = Form(...),
        note: str | None = Form(default=None),
        quantity: str | None = Form(default=None),
        is_container: str | None = Form(default=None),
        submit_action: str | None = Form(default=None),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Create an item inside a box and redirect.

        Redirect target: the new-item form again for "save and add next",
        the item page for a container item, otherwise the box page.

        Raises:
            HTTPException: 404 for an unknown box, 422 when ``name`` is blank.
        """
        box = _get_box_or_404(db, box_id)
        item = Item(
            box=box,
            name=_require_name(name),
            note=_clean_text(note),
            quantity=_parse_quantity(quantity),
            is_container=_is_checked(is_container),
        )
        _set_image_fields(item, process_upload(image_file))
        db.add(item)
        db.commit()
        db.refresh(item)
        if _wants_add_next(submit_action):
            redirect_url = f"/boxes/{box.id}/items/new"
        elif item.is_container:
            redirect_url = f"/items/{item.id}"
        else:
            redirect_url = f"/boxes/{box.id}"
        return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER)

    @app.get("/items/{item_id}")
    def show_item(item_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the detail page of one item (404 when it does not exist)."""
        item = _get_item_or_404(db, item_id)
        return templates.TemplateResponse(
            request=request,
            name="items/show.html",
            context={"page_title": item.name, "item": item},
        )

    @app.get("/items/{item_id}/image")
    def get_item_image(item_id: int, db: Session = Depends(get_db)) -> Response:
        """Return the stored image of an item (404 when item or image is missing)."""
        return _image_response_or_404(_get_item_or_404(db, item_id))

    @app.get("/items/{item_id}/edit")
    def edit_item_page(item_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the edit form for an existing item."""
        item = _get_item_or_404(db, item_id)
        return templates.TemplateResponse(
            request=request,
            name="items/form.html",
            context={
                "page_title": f"编辑物品:{item.name}",
                "box": item.box,
                "item": item,
                "form_action": f"/items/{item.id}/update",
                "submit_label": "保存物品",
            },
        )

    @app.post("/items/{item_id}/update")
    def update_item(
        item_id: int,
        name: str = Form(...),
        note: str | None = Form(default=None),
        quantity: str | None = Form(default=None),
        is_container: str | None = Form(default=None),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Overwrite an item's fields and redirect to its page.

        Raises:
            HTTPException: 404 for an unknown item, 422 when ``name`` is blank.
        """
        item = _get_item_or_404(db, item_id)
        item.name = _require_name(name)
        item.note = _clean_text(note)
        item.quantity = _parse_quantity(quantity)
        item.is_container = _is_checked(is_container)
        if not item.is_container:
            # An item that is no longer a container drops its sub-items.
            # NOTE(review): whether the rows are deleted depends on the
            # relationship's cascade settings in app.models; confirm there.
            item.subitems.clear()
        _set_image_fields(item, process_upload(image_file))
        db.commit()
        return RedirectResponse(url=f"/items/{item.id}", status_code=status.HTTP_303_SEE_OTHER)

    @app.post("/items/{item_id}/image/delete")
    def delete_item_image(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Remove an item's image and return to its edit form."""
        item = _get_item_or_404(db, item_id)
        _clear_image_fields(item)
        db.commit()
        return RedirectResponse(url=f"/items/{item.id}/edit", status_code=status.HTTP_303_SEE_OTHER)

    @app.post("/items/{item_id}/delete")
    def delete_item(item_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Delete an item and redirect to the box that contained it."""
        item = _get_item_or_404(db, item_id)
        # Read the parent id before the delete; it is needed for the redirect.
        box_id = item.box_id
        db.delete(item)
        db.commit()
        return RedirectResponse(url=f"/boxes/{box_id}", status_code=status.HTTP_303_SEE_OTHER)

    @app.get("/items/{item_id}/subitems/new")
    def new_subitem_page(item_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the empty form for adding a sub-item (400 for non-container items)."""
        item = _get_item_or_404(db, item_id)
        _require_container_item(item)
        return templates.TemplateResponse(
            request=request,
            name="subitems/form.html",
            context={
                "page_title": f"为“{item.name}”添加子物品",
                "item": item,
                "subitem": None,
                "form_action": f"/items/{item.id}/subitems",
                "submit_label": "创建子物品",
            },
        )

    @app.post("/items/{item_id}/subitems")
    def create_subitem(
        item_id: int,
        name: str = Form(...),
        note: str | None = Form(default=None),
        quantity: str | None = Form(default=None),
        submit_action: str | None = Form(default=None),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Create a sub-item under a container item and redirect.

        Redirects to the new-sub-item form again for "save and add next",
        otherwise to the parent item page.

        Raises:
            HTTPException: 404 for an unknown item, 400 when the item is not
                a container, 422 when ``name`` is blank.
        """
        item = _get_item_or_404(db, item_id)
        _require_container_item(item)
        subitem = SubItem(
            parent_item=item,
            name=_require_name(name),
            note=_clean_text(note),
            quantity=_parse_quantity(quantity),
        )
        _set_image_fields(subitem, process_upload(image_file))
        db.add(subitem)
        db.commit()
        db.refresh(subitem)
        if _wants_add_next(submit_action):
            redirect_url = f"/items/{item.id}/subitems/new"
        else:
            redirect_url = f"/items/{item.id}"
        return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER)

    @app.get("/subitems/{subitem_id}/image")
    def get_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> Response:
        """Return the stored image of a sub-item (404 when sub-item or image is missing)."""
        return _image_response_or_404(_get_subitem_or_404(db, subitem_id))

    @app.get("/subitems/{subitem_id}/edit")
    def edit_subitem_page(subitem_id: int, request: Request, db: Session = Depends(get_db)):
        """Render the edit form for an existing sub-item."""
        subitem = _get_subitem_or_404(db, subitem_id)
        return templates.TemplateResponse(
            request=request,
            name="subitems/form.html",
            context={
                "page_title": f"编辑子物品:{subitem.name}",
                "item": subitem.parent_item,
                "subitem": subitem,
                "form_action": f"/subitems/{subitem.id}/update",
                "submit_label": "保存子物品",
            },
        )

    @app.post("/subitems/{subitem_id}/update")
    def update_subitem(
        subitem_id: int,
        name: str = Form(...),
        note: str | None = Form(default=None),
        quantity: str | None = Form(default=None),
        image_file: UploadFile | None = File(default=None),
        db: Session = Depends(get_db),
    ) -> RedirectResponse:
        """Overwrite a sub-item's fields and redirect to its parent item page.

        Raises:
            HTTPException: 404 for an unknown sub-item, 422 when ``name`` is
                blank.
        """
        subitem = _get_subitem_or_404(db, subitem_id)
        subitem.name = _require_name(name)
        subitem.note = _clean_text(note)
        subitem.quantity = _parse_quantity(quantity)
        _set_image_fields(subitem, process_upload(image_file))
        db.commit()
        return RedirectResponse(
            url=f"/items/{subitem.parent_item_id}",
            status_code=status.HTTP_303_SEE_OTHER,
        )

    @app.post("/subitems/{subitem_id}/image/delete")
    def delete_subitem_image(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Remove a sub-item's image and return to its edit form."""
        subitem = _get_subitem_or_404(db, subitem_id)
        _clear_image_fields(subitem)
        db.commit()
        return RedirectResponse(url=f"/subitems/{subitem.id}/edit", status_code=status.HTTP_303_SEE_OTHER)

    @app.post("/subitems/{subitem_id}/delete")
    def delete_subitem(subitem_id: int, db: Session = Depends(get_db)) -> RedirectResponse:
        """Delete a sub-item and redirect to its parent item page."""
        subitem = _get_subitem_or_404(db, subitem_id)
        # Read the parent id before the delete; it is needed for the redirect.
        item_id = subitem.parent_item_id
        db.delete(subitem)
        db.commit()
        return RedirectResponse(url=f"/items/{item_id}", status_code=status.HTTP_303_SEE_OTHER)

    return app
# Module-level application instance, built once at import time by create_app().
app = create_app()