add support for heic images
This commit is contained in:
@@ -1,9 +1,21 @@
|
||||
from dataclasses import dataclass
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import subprocess
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from fastapi import HTTPException, UploadFile
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
|
||||
try:
|
||||
from pillow_heif import register_heif_opener
|
||||
|
||||
register_heif_opener()
|
||||
HEIF_SUPPORT_ENABLED = True
|
||||
except ImportError:
|
||||
HEIF_SUPPORT_ENABLED = False
|
||||
|
||||
|
||||
MAX_IMAGE_SIDE = 1600
|
||||
JPEG_QUALITY = 80
|
||||
@@ -22,6 +34,8 @@ def process_upload(file: UploadFile | None) -> ProcessedImage | None:
|
||||
if file is None or not file.filename:
|
||||
return None
|
||||
|
||||
suffix = Path(file.filename).suffix.lower()
|
||||
|
||||
try:
|
||||
raw_bytes = file.file.read()
|
||||
if not raw_bytes:
|
||||
@@ -30,6 +44,9 @@ def process_upload(file: UploadFile | None) -> ProcessedImage | None:
|
||||
with Image.open(BytesIO(raw_bytes)) as source_image:
|
||||
processed_image = _prepare_image(source_image)
|
||||
except UnidentifiedImageError as exc:
|
||||
if suffix in {".heic", ".heif"}:
|
||||
processed_image = _process_heic_with_fallback(raw_bytes, suffix)
|
||||
return processed_image
|
||||
raise HTTPException(status_code=400, detail="上传的文件不是合法图片") from exc
|
||||
except HTTPException:
|
||||
raise
|
||||
@@ -71,3 +88,34 @@ def _strip_metadata_and_convert(source_image: Image.Image) -> Image.Image:
|
||||
return source_image.convert("RGB")
|
||||
|
||||
return source_image.copy()
|
||||
|
||||
|
||||
def _process_heic_with_fallback(raw_bytes: bytes, suffix: str) -> ProcessedImage:
|
||||
if shutil.which("sips") is None:
|
||||
if HEIF_SUPPORT_ENABLED:
|
||||
raise HTTPException(status_code=400, detail="HEIC/HEIF 图片处理失败,请尝试更换图片")
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="当前环境无法处理 HEIC/HEIF,请先转换为 JPG 或 PNG 后再上传",
|
||||
)
|
||||
|
||||
with TemporaryDirectory() as temp_dir:
|
||||
source_path = Path(temp_dir) / f"upload{suffix}"
|
||||
output_path = Path(temp_dir) / "converted.jpg"
|
||||
source_path.write_bytes(raw_bytes)
|
||||
|
||||
result = subprocess.run(
|
||||
["sips", "-s", "format", "jpeg", str(source_path), "--out", str(output_path)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
if result.returncode != 0 or not output_path.exists():
|
||||
raise HTTPException(status_code=400, detail="HEIC/HEIF 图片处理失败,请尝试更换图片")
|
||||
|
||||
try:
|
||||
with Image.open(output_path) as converted_image:
|
||||
return _prepare_image(converted_image)
|
||||
except UnidentifiedImageError as exc:
|
||||
raise HTTPException(status_code=400, detail="HEIC/HEIF 图片处理失败,请尝试更换图片") from exc
|
||||
|
||||
@@ -2,7 +2,9 @@ from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
import app.db as db_module
|
||||
import app.images as images_module
|
||||
from PIL import Image
|
||||
from fastapi import UploadFile
|
||||
|
||||
from app.models import Box, Item, SubItem
|
||||
|
||||
@@ -572,6 +574,24 @@ def test_broken_image_processing_returns_400_and_keeps_image_fields_empty(client
|
||||
assert updated_box.image_height is None
|
||||
|
||||
|
||||
def test_heic_upload_returns_clear_error_if_heif_support_is_unavailable(monkeypatch):
|
||||
heic_file = UploadFile(filename="sample.heic", file=BytesIO(b"not-a-real-heic"))
|
||||
|
||||
def fake_open(*args, **kwargs):
|
||||
raise images_module.UnidentifiedImageError("cannot identify image file")
|
||||
|
||||
monkeypatch.setattr(images_module, "HEIF_SUPPORT_ENABLED", False)
|
||||
monkeypatch.setattr(images_module.Image, "open", fake_open)
|
||||
monkeypatch.setattr(images_module.shutil, "which", lambda command: None)
|
||||
|
||||
try:
|
||||
images_module.process_upload(heic_file)
|
||||
assert False, "Expected HEIC upload to raise HTTPException"
|
||||
except Exception as exc:
|
||||
assert getattr(exc, "status_code", None) == 400
|
||||
assert "HEIC/HEIF" in getattr(exc, "detail", "")
|
||||
|
||||
|
||||
def test_can_search_box_by_name(client, db_session):
|
||||
box = Box(name="冬季衣物箱")
|
||||
db_session.add(box)
|
||||
|
||||
Reference in New Issue
Block a user