124 lines
3.9 KiB
Python
124 lines
3.9 KiB
Python
from dataclasses import dataclass
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
import shutil
|
|
import subprocess
|
|
from tempfile import TemporaryDirectory
|
|
|
|
from fastapi import HTTPException, UploadFile
|
|
from PIL import Image, ImageOps, UnidentifiedImageError
|
|
|
|
# HEIC/HEIF decoding is optional: it is only wired into Pillow when the
# pillow_heif package can be imported. HEIF_SUPPORT_ENABLED records the outcome
# so later code can pick an appropriate error message.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except ImportError:
    HEIF_SUPPORT_ENABLED = False
else:
    HEIF_SUPPORT_ENABLED = True
|
|
|
|
|
|
# Upper bound, in pixels, passed as both width and height when shrinking images.
MAX_IMAGE_SIDE: int = 1600

# Quality setting handed to the JPEG encoder.
JPEG_QUALITY: int = 80

# MIME type reported for every processed image (output is always JPEG).
JPEG_MIME_TYPE: str = "image/jpeg"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ProcessedImage:
|
|
blob: bytes
|
|
mime_type: str
|
|
width: int
|
|
height: int
|
|
|
|
|
|
def process_upload(file: UploadFile | None) -> ProcessedImage | None:
    """Read an uploaded file and convert it into a normalised JPEG.

    Args:
        file: The upload to process, or ``None`` when nothing was submitted.

    Returns:
        The processed image, or ``None`` when ``file`` is ``None`` or has no
        filename.

    Raises:
        HTTPException: Status 400 when the upload is empty, is not a
            recognisable image, or fails at any point during processing
            (including the HEIC/HEIF fallback conversion).
    """
    if file is None or not file.filename:
        return None

    suffix = Path(file.filename).suffix.lower()

    try:
        raw_bytes = file.file.read()
        if not raw_bytes:
            raise HTTPException(status_code=400, detail="上传的图片内容为空")

        try:
            with Image.open(BytesIO(raw_bytes)) as source_image:
                return _prepare_image(source_image)
        except UnidentifiedImageError as exc:
            if suffix not in {".heic", ".heif"}:
                raise HTTPException(status_code=400, detail="上传的文件不是合法图片") from exc
            # The fallback is invoked inside the outer ``try`` on purpose: an
            # exception raised from within an ``except`` clause is not seen by
            # sibling ``except`` clauses, so nesting is what lets unexpected
            # fallback failures reach the generic 400 mapping below.
            return _process_heic_with_fallback(raw_bytes, suffix)
    except HTTPException:
        # Already carries the intended status and message; pass it through.
        raise
    except Exception as exc:
        raise HTTPException(status_code=400, detail="图片处理失败,请尝试更换图片") from exc
    finally:
        # The underlying file object is closed on every path past the guard.
        file.file.close()
|
|
|
|
|
|
def _prepare_image(source_image: Image.Image) -> ProcessedImage:
    """Encode an opened image as a size-capped JPEG ``ProcessedImage``.

    The steps run in a fixed order: apply the EXIF orientation, convert to RGB
    via ``_strip_metadata_and_convert``, shrink with ``thumbnail`` using
    ``MAX_IMAGE_SIDE`` for both dimensions, then encode as JPEG with
    ``JPEG_QUALITY``.
    """
    # Orientation is normalised first, before the image is converted or resized.
    upright = ImageOps.exif_transpose(source_image)
    rgb_image = _strip_metadata_and_convert(upright)

    # thumbnail() modifies ``rgb_image`` itself; its return value is not used.
    bounding_box = (MAX_IMAGE_SIDE, MAX_IMAGE_SIDE)
    rgb_image.thumbnail(bounding_box)

    buffer = BytesIO()
    rgb_image.save(buffer, format="JPEG", quality=JPEG_QUALITY, optimize=True)

    final_width, final_height = rgb_image.size
    return ProcessedImage(
        blob=buffer.getvalue(),
        mime_type=JPEG_MIME_TYPE,
        width=final_width,
        height=final_height,
    )
|
|
|
|
|
|
def _strip_metadata_and_convert(source_image: Image.Image) -> Image.Image:
    """Return a new RGB image, flattening any transparency onto white.

    Images with an alpha channel (``RGBA``/``LA``) and palette images that
    declare a ``transparency`` entry are composited over a white background,
    so transparent areas do not end up with an arbitrary colour once the
    result is saved as JPEG. Every other mode is converted to RGB, and an
    image that is already RGB is copied so the caller can modify the result
    without touching the input.

    NOTE(review): the function name implies metadata removal; whether
    ``convert()``/``copy()`` carry ``info`` entries over should be confirmed
    against Pillow if that matters to callers.
    """
    palette_has_alpha = (
        source_image.mode == "P" and "transparency" in source_image.info
    )

    if source_image.mode in ("RGBA", "LA") or palette_has_alpha:
        # Going through RGBA gives every transparent variant a real "A"
        # channel to use as the paste mask.
        rgba_image = source_image.convert("RGBA")
        background = Image.new("RGB", rgba_image.size, (255, 255, 255))
        background.paste(rgba_image, mask=rgba_image.getchannel("A"))
        return background

    if source_image.mode != "RGB":
        return source_image.convert("RGB")

    return source_image.copy()
|
|
|
|
|
|
def _process_heic_with_fallback(raw_bytes: bytes, suffix: str) -> ProcessedImage:
    """Convert HEIC/HEIF bytes to JPEG with the ``sips`` command, then process.

    Args:
        raw_bytes: Content of the uploaded file.
        suffix: File extension (including the dot) used for the temporary
            input file.

    Returns:
        The processed image built from the converted JPEG.

    Raises:
        HTTPException: Status 400 when ``sips`` is not on PATH, when the
            conversion fails or produces no output file, or when the
            converted file cannot be identified as an image.
    """
    sips_missing = shutil.which("sips") is None
    if sips_missing:
        # The message depends on whether pillow_heif was importable at startup.
        detail = (
            "HEIC/HEIF 图片处理失败,请尝试更换图片"
            if HEIF_SUPPORT_ENABLED
            else "当前环境无法处理 HEIC/HEIF,请先转换为 JPG 或 PNG 后再上传"
        )
        raise HTTPException(status_code=400, detail=detail)

    with TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        heic_path = work_dir / f"upload{suffix}"
        jpeg_path = work_dir / "converted.jpg"
        heic_path.write_bytes(raw_bytes)

        command = [
            "sips",
            "-s",
            "format",
            "jpeg",
            str(heic_path),
            "--out",
            str(jpeg_path),
        ]
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False,
        )

        conversion_ok = completed.returncode == 0 and jpeg_path.exists()
        if not conversion_ok:
            raise HTTPException(status_code=400, detail="HEIC/HEIF 图片处理失败,请尝试更换图片")

        # The converted file is read while the temporary directory still exists.
        try:
            with Image.open(jpeg_path) as converted_image:
                return _prepare_image(converted_image)
        except UnidentifiedImageError as exc:
            raise HTTPException(status_code=400, detail="HEIC/HEIF 图片处理失败,请尝试更换图片") from exc
|