Files
2026-04-19 14:47:18 +02:00

124 lines
3.9 KiB
Python

from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
import shutil
import subprocess
from tempfile import TemporaryDirectory
from fastapi import HTTPException, UploadFile
from PIL import Image, ImageOps, UnidentifiedImageError
# HEIC/HEIF decoding is optional: it is enabled only when the pillow_heif
# package can be imported and its opener registered.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except ImportError:
    HEIF_SUPPORT_ENABLED = False
else:
    HEIF_SUPPORT_ENABLED = True

# Upper bound, in pixels, for both the width and the height of a stored image.
MAX_IMAGE_SIDE = 1600
# Quality setting passed to the JPEG encoder.
JPEG_QUALITY = 80
# MIME type reported for every processed image.
JPEG_MIME_TYPE = "image/jpeg"
@dataclass(slots=True)
class ProcessedImage:
blob: bytes
mime_type: str
width: int
height: int
def process_upload(file: UploadFile | None) -> ProcessedImage | None:
    """Read an uploaded image and return it normalized as a JPEG.

    Args:
        file: The upload to process. ``None`` or an upload without a
            filename is treated as "no image supplied".

    Returns:
        The processed image, or ``None`` when no image was supplied.

    Raises:
        HTTPException: Status 400 when the upload is empty, is not a
            recognizable image, or cannot be processed.
    """
    if file is None or not file.filename:
        return None

    suffix = Path(file.filename).suffix.lower()
    try:
        raw_bytes = file.file.read()
        if not raw_bytes:
            raise HTTPException(status_code=400, detail="上传的图片内容为空")

        # The decode attempt and its HEIC fallback live in an inner ``try`` so
        # that the outer handlers below also cover the fallback. An exception
        # raised inside an ``except`` clause is not seen by sibling clauses of
        # the same ``try``, so without the nesting an unexpected fallback
        # error would escape as an unhandled server error.
        try:
            with Image.open(BytesIO(raw_bytes)) as source_image:
                return _prepare_image(source_image)
        except UnidentifiedImageError as exc:
            # Pillow could not identify the data; files named .heic/.heif get
            # a second chance through the external converter.
            if suffix in {".heic", ".heif"}:
                return _process_heic_with_fallback(raw_bytes, suffix)
            raise HTTPException(status_code=400, detail="上传的文件不是合法图片") from exc
    except HTTPException:
        # Already a client-facing error; pass it through unchanged.
        raise
    except Exception as exc:
        raise HTTPException(status_code=400, detail="图片处理失败,请尝试更换图片") from exc
    finally:
        file.file.close()
def _prepare_image(source_image: Image.Image) -> ProcessedImage:
    """Orient, flatten, downscale and JPEG-encode an opened image.

    Args:
        source_image: An image already opened by Pillow.

    Returns:
        A ``ProcessedImage`` holding the JPEG bytes and the final dimensions.
    """
    # Apply the EXIF orientation first, before the image is converted and
    # resized by the steps below.
    upright = ImageOps.exif_transpose(source_image)
    rgb_image = _strip_metadata_and_convert(upright)

    # thumbnail() resizes in place to fit within the given bounding box.
    bounding_box = (MAX_IMAGE_SIDE, MAX_IMAGE_SIDE)
    rgb_image.thumbnail(bounding_box)

    buffer = BytesIO()
    rgb_image.save(buffer, format="JPEG", quality=JPEG_QUALITY, optimize=True)

    final_width, final_height = rgb_image.size
    return ProcessedImage(
        blob=buffer.getvalue(),
        mime_type=JPEG_MIME_TYPE,
        width=final_width,
        height=final_height,
    )
def _strip_metadata_and_convert(source_image: Image.Image) -> Image.Image:
    """Return a new RGB image, flattening any transparency onto white.

    Args:
        source_image: Image in any Pillow mode. It is not modified.

    Returns:
        A separate image object in mode ``"RGB"``.
    """
    # Palette images keep transparency as an entry in ``info`` rather than as
    # an alpha band. Expand them to RGBA so they are flattened onto white like
    # other transparent images, instead of exposing the raw palette colour of
    # the transparent index.
    if source_image.mode == "P" and "transparency" in source_image.info:
        source_image = source_image.convert("RGBA")

    if source_image.mode in ("RGBA", "LA"):
        background = Image.new("RGB", source_image.size, (255, 255, 255))
        alpha = source_image.getchannel("A")
        # Use the alpha band as the paste mask so transparent areas show white.
        background.paste(source_image.convert("RGBA"), mask=alpha)
        return background

    if source_image.mode != "RGB":
        return source_image.convert("RGB")

    # Already RGB: hand back a copy so later in-place edits leave the
    # caller's image untouched.
    return source_image.copy()
def _process_heic_with_fallback(raw_bytes: bytes, suffix: str) -> ProcessedImage:
    """Convert HEIC/HEIF bytes to JPEG with the ``sips`` tool, then process them.

    Args:
        raw_bytes: The uploaded file content.
        suffix: File extension (including the dot) used to name the temporary
            input file handed to ``sips``.

    Returns:
        The processed image produced from the converted JPEG.

    Raises:
        HTTPException: Status 400 when ``sips`` is not on PATH, when the
            conversion fails, or when the converted file cannot be identified
            as an image.
    """
    failure_detail = "HEIC/HEIF 图片处理失败,请尝试更换图片"

    if shutil.which("sips") is None:
        # Without the converter nothing can be done. The message depends on
        # whether in-process HEIF support was available.
        detail = (
            failure_detail
            if HEIF_SUPPORT_ENABLED
            else "当前环境无法处理 HEIC/HEIF,请先转换为 JPG 或 PNG 后再上传"
        )
        raise HTTPException(status_code=400, detail=detail)

    with TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)
        heic_file = workdir_path / f"upload{suffix}"
        jpeg_file = workdir_path / "converted.jpg"
        heic_file.write_bytes(raw_bytes)

        command = [
            "sips",
            "-s",
            "format",
            "jpeg",
            str(heic_file),
            "--out",
            str(jpeg_file),
        ]
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False,
        )

        # Success requires both a zero exit status and an output file.
        converted_ok = completed.returncode == 0 and jpeg_file.exists()
        if not converted_ok:
            raise HTTPException(status_code=400, detail=failure_detail)

        try:
            # Processing finishes inside the ``with`` block, while the
            # temporary directory still exists.
            with Image.open(jpeg_file) as converted_image:
                return _prepare_image(converted_image)
        except UnidentifiedImageError as exc:
            raise HTTPException(status_code=400, detail=failure_detail) from exc