add search function

This commit is contained in:
2026-04-19 13:00:11 +02:00
parent 5fdf3f4ab2
commit ea73b0c165
8 changed files with 380 additions and 1 deletions
+114
View File
@@ -4,6 +4,7 @@ from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, Upload
from fastapi.responses import RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.db import get_db, init_db
@@ -81,6 +82,100 @@ def _image_response_or_404(target) -> Response:
return Response(content=target.image_blob, media_type=target.image_mime_type)
def _build_search_results(db: Session, query: str) -> list[dict]:
keyword = f"%{query.lower()}%"
results: list[dict] = []
box_matches = (
db.query(Box)
.filter(
or_(
func.lower(Box.name).like(keyword),
func.lower(func.coalesce(Box.note, "")).like(keyword),
)
)
.order_by(Box.id.desc())
.all()
)
for box in box_matches:
results.append(
{
"type": "Box",
"name": box.name,
"note": box.note,
"detail_url": f"/boxes/{box.id}",
"detail_label": "查看箱子",
"secondary_url": None,
"secondary_label": None,
"path": "顶层箱子",
"is_container": None,
"image_url": f"/boxes/{box.id}/image" if box.image_blob else None,
}
)
item_matches = (
db.query(Item)
.join(Item.box)
.filter(
or_(
func.lower(Item.name).like(keyword),
func.lower(func.coalesce(Item.note, "")).like(keyword),
)
)
.order_by(Item.id.desc())
.all()
)
for item in item_matches:
results.append(
{
"type": "Item",
"name": item.name,
"note": item.note,
"detail_url": f"/items/{item.id}",
"detail_label": "查看物品",
"secondary_url": f"/boxes/{item.box.id}",
"secondary_label": "查看所属箱子",
"path": f"位于箱子:{item.box.name}",
"is_container": item.is_container,
"image_url": f"/items/{item.id}/image" if item.image_blob else None,
}
)
subitem_matches = (
db.query(SubItem)
.join(SubItem.parent_item)
.join(Item.box)
.filter(
or_(
func.lower(SubItem.name).like(keyword),
func.lower(func.coalesce(SubItem.note, "")).like(keyword),
)
)
.order_by(SubItem.id.desc())
.all()
)
for subitem in subitem_matches:
results.append(
{
"type": "SubItem",
"name": subitem.name,
"note": subitem.note,
"detail_url": f"/items/{subitem.parent_item.id}",
"detail_label": "查看所属物品",
"secondary_url": f"/boxes/{subitem.parent_item.box.id}",
"secondary_label": "查看所属箱子",
"path": (
f"位于物品:{subitem.parent_item.name} / "
f"箱子:{subitem.parent_item.box.name}"
),
"is_container": None,
"image_url": f"/subitems/{subitem.id}/image" if subitem.image_blob else None,
}
)
return results
def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -94,6 +189,25 @@ def create_app() -> FastAPI:
def root() -> RedirectResponse:
return RedirectResponse(url="/boxes", status_code=status.HTTP_302_FOUND)
@app.get("/search")
def search_page(
request: Request,
q: str | None = None,
db: Session = Depends(get_db),
):
query = (q or "").strip()
results = _build_search_results(db, query) if query else []
return templates.TemplateResponse(
request=request,
name="search/index.html",
context={
"page_title": "搜索",
"query": query,
"results": results,
"searched": bool(query),
},
)
@app.get("/boxes")
def list_boxes(request: Request, db: Session = Depends(get_db)):
boxes = db.query(Box).order_by(Box.id.desc()).all()