306 lines
9.2 KiB
Python
306 lines
9.2 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
from urllib.parse import urlparse
|
||
|
||
import requests
|
||
from requests import Response
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.db import init_db
|
||
from app.models import Box, Item, SubItem
|
||
|
||
# Value sent in the "Notion-Version" header of every request made by NotionClient.
NOTION_VERSION = "2026-03-11"
|
||
# Base URL that NotionClient prefixes to the endpoint paths it requests.
NOTION_API_BASE = "https://api.notion.com/v1"
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class ParsedSubItem:
|
||
name: str
|
||
note: str | None = None
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class ParsedItem:
|
||
name: str
|
||
note: str | None = None
|
||
is_container: bool = False
|
||
subitems: list[ParsedSubItem] = field(default_factory=list)
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class ParsedBox:
|
||
name: str
|
||
note: str | None = None
|
||
items: list[ParsedItem] = field(default_factory=list)
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class ImportSummary:
|
||
boxes: list[ParsedBox]
|
||
warnings: list[str] = field(default_factory=list)
|
||
|
||
@property
|
||
def box_count(self) -> int:
|
||
return len(self.boxes)
|
||
|
||
@property
|
||
def item_count(self) -> int:
|
||
return sum(len(box.items) for box in self.boxes)
|
||
|
||
@property
|
||
def container_item_count(self) -> int:
|
||
return sum(1 for box in self.boxes for item in box.items if item.is_container)
|
||
|
||
@property
|
||
def subitem_count(self) -> int:
|
||
return sum(len(item.subitems) for box in self.boxes for item in box.items)
|
||
|
||
|
||
class NotionClient:
    """Minimal Notion REST client that lists the children of a block."""

    def __init__(self, token: str):
        """Create an HTTP session that carries the auth and version headers.

        Args:
            token: Notion token, sent as a ``Bearer`` credential.
        """
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {token}",
                "Notion-Version": NOTION_VERSION,
            }
        )

    def list_block_children(self, block_id: str) -> list[dict[str, Any]]:
        """Return every direct child of ``block_id``, following pagination.

        Args:
            block_id: Identifier of the block (or page) whose children are listed.

        Returns:
            The concatenated ``results`` entries of all pages, in response order.

        Raises:
            RuntimeError: If a request fails with an HTTP error status, or if
                the API reports ``has_more`` without supplying a ``next_cursor``.
        """
        results: list[dict[str, Any]] = []
        next_cursor: str | None = None

        while True:
            params: dict[str, Any] = {"page_size": 100}
            if next_cursor:
                params["start_cursor"] = next_cursor

            response = self.session.get(
                f"{NOTION_API_BASE}/blocks/{block_id}/children",
                params=params,
                timeout=30,
            )
            self._raise_for_status(response)
            payload = response.json()
            results.extend(payload.get("results", []))

            if not payload.get("has_more"):
                break

            next_cursor = payload.get("next_cursor")
            if not next_cursor:
                # Without a cursor the next request would fetch the first page
                # again, looping forever and duplicating results.
                raise RuntimeError(
                    f"Notion API 分页响应异常: has_more 为真但缺少 next_cursor(block {block_id})"
                )

        return results

    def _raise_for_status(self, response: Response) -> None:
        """Raise ``RuntimeError`` with the status code and body on an HTTP error.

        Args:
            response: The response to check.

        Raises:
            RuntimeError: Chained from ``requests.HTTPError`` when
                ``response.raise_for_status()`` fails.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            message = response.text
            raise RuntimeError(f"Notion API 请求失败: {response.status_code} {message}") from exc
|
||
|
||
|
||
def extract_page_id(page_url: str) -> str:
    """Extract a Notion id from a URL and return it in dashed UUID form.

    Each non-empty path segment, followed by the URL fragment when present, is
    scanned for a 32-character hex run or an already-dashed UUID. The last
    match wins, so an id in the fragment takes precedence over one in the path.

    Args:
        page_url: The URL to inspect; surrounding whitespace is ignored.

    Returns:
        The id lower-cased and formatted as 8-4-4-4-12 hex groups.

    Raises:
        ValueError: If no id-like token is found.
    """
    url_parts = urlparse(page_url.strip())

    segments = [piece for piece in url_parts.path.split("/") if piece]
    if url_parts.fragment:
        segments.append(url_parts.fragment)

    id_pattern = re.compile(
        r"([0-9a-fA-F]{32}|[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
    )
    found = [hit for segment in segments for hit in id_pattern.findall(segment)]
    if not found:
        raise ValueError("无法从 Notion 页面 URL 中提取 page id")

    # Normalize to bare lower-case hex before re-inserting the dashes.
    compact = found[-1].replace("-", "").lower()
    groups = (compact[:8], compact[8:12], compact[12:16], compact[16:20], compact[20:])
    return "-".join(groups)
|
||
|
||
|
||
def fetch_page_blocks(token: str, page_id: str) -> list[dict[str, Any]]:
    """Fetch the full block tree of a page using a short-lived client.

    Args:
        token: Notion token used to build the client.
        page_id: Identifier of the page whose blocks are fetched.

    Returns:
        The page's top-level blocks, each with nested blocks under ``"_children"``.
    """
    client = NotionClient(token)
    try:
        return _fetch_block_tree(client, page_id)
    finally:
        # The client never leaves this function, so release its HTTP session
        # here instead of leaking pooled connections.
        client.session.close()
|
||
|
||
|
||
def _fetch_block_tree(client: NotionClient, block_id: str) -> list[dict[str, Any]]:
|
||
blocks = client.list_block_children(block_id)
|
||
for block in blocks:
|
||
if block.get("has_children"):
|
||
block["_children"] = _fetch_block_tree(client, block["id"])
|
||
else:
|
||
block["_children"] = []
|
||
return blocks
|
||
|
||
|
||
def parse_notion_blocks(blocks: list[dict[str, Any]]) -> ImportSummary:
    """Turn top-level page blocks into boxes, items and warnings.

    A ``heading_2`` with text starts a new box. Each following
    ``bulleted_list_item`` is parsed as an item of the most recently started
    box. Every other block type only produces a warning.

    Args:
        blocks: Top-level blocks, each optionally carrying ``"_children"``.

    Returns:
        An ``ImportSummary`` with the parsed boxes and the collected warnings.
    """
    parsed_boxes: list[ParsedBox] = []
    notes: list[str] = []
    active_box: ParsedBox | None = None

    for node in blocks:
        kind = node.get("type")

        if kind == "heading_2":
            title = extract_block_text(node)
            if title:
                active_box = ParsedBox(name=title)
                parsed_boxes.append(active_box)
            else:
                # An empty heading is skipped; the previously active box stays current.
                notes.append("发现空的 heading_2,已跳过")
        elif kind == "bulleted_list_item":
            if active_box is None:
                notes.append(
                    f"发现未归属到任何 heading_2 的一级 bullet:{extract_block_text(node) or '[空文本]'}"
                )
            else:
                item = _parse_item_block(node, notes, level=1)
                if item is not None:
                    active_box.items.append(item)
        else:
            notes.extend(_warning_for_unsupported_block(node, level=0))

    return ImportSummary(boxes=parsed_boxes, warnings=notes)
|
||
|
||
|
||
def _parse_item_block(
    block: dict[str, Any],
    warnings: list[str],
    *,
    level: int,
) -> ParsedItem | None:
    """Parse one bullet block into a ``ParsedItem`` with its nested bullets.

    Nested ``bulleted_list_item`` children with text become sub-items. Anything
    below them, and any other child type, is reported through ``warnings``.

    Args:
        block: The bullet block, optionally carrying ``"_children"``.
        warnings: List that receives warning messages; mutated in place.
        level: Nesting level, used only in the empty-bullet warning.

    Returns:
        The parsed item, or ``None`` when the bullet has no text.
    """
    item_name = extract_block_text(block)
    if not item_name:
        warnings.append(f"发现空的 bullet(层级 {level}),已跳过")
        return None

    collected: list[ParsedSubItem] = []

    for nested in block.get("_children", []):
        if nested.get("type") != "bulleted_list_item":
            warnings.extend(
                _warning_for_unsupported_block(nested, level=2, parent_name=item_name)
            )
            continue

        nested_name = extract_block_text(nested)
        if not nested_name:
            warnings.append(f"发现空的二级 bullet(父项:{item_name}),已跳过")
            continue

        collected.append(ParsedSubItem(name=nested_name))

        # Only two levels are imported; deeper content is reported, not parsed.
        too_deep = nested.get("_children")
        if too_deep:
            warnings.append(
                f"发现超出支持层级的三级内容(父项:{item_name} -> 子项:{nested_name}),已忽略更深层级"
            )
            for deep_node in too_deep:
                warnings.extend(_warning_for_unsupported_block(deep_node, level=3))

    return ParsedItem(
        name=item_name,
        is_container=bool(collected),
        subitems=collected,
    )
|
||
|
||
|
||
def _warning_for_unsupported_block(
    block: dict[str, Any],
    *,
    level: int,
    parent_name: str | None = None,
) -> list[str]:
    """Build the warning for a block the importer does not handle.

    Args:
        block: The skipped block.
        level: Nesting level shown in the message.
        parent_name: Name of the parent item, appended to the prefix when truthy.

    Returns:
        A one-element list with the message. Media-like types (image, file,
        video, audio, pdf) get a dedicated wording.
    """
    kind = block.get("type", "unknown")
    shown_text = extract_block_text(block) or "[无文本]"

    label = f"层级 {level} block"
    if parent_name:
        label += f"(父项:{parent_name})"

    if kind in {"image", "file", "video", "audio", "pdf"}:
        message = f"{label} 类型 {kind} 已跳过(这版不导入图片或媒体):{shown_text}"
    else:
        message = f"{label} 类型 {kind} 未按导入规则处理,已跳过:{shown_text}"
    return [message]
|
||
|
||
|
||
def extract_block_text(block: dict[str, Any]) -> str:
    """Return the concatenated plain text of a block's ``rich_text`` entries.

    The payload is looked up under the key named by the block's ``"type"``.
    A missing type, payload, ``rich_text`` or ``plain_text`` contributes nothing.

    Args:
        block: A block dict.

    Returns:
        The joined text with leading and trailing whitespace stripped.
    """
    kind = block.get("type")
    payload = block.get(kind, {}) if kind else {}
    fragments = [piece.get("plain_text", "") for piece in payload.get("rich_text", [])]
    return "".join(fragments).strip()
|
||
|
||
|
||
def print_summary(summary: ImportSummary) -> None:
    """Print a report of a parse result to standard output.

    The report has three parts: overall counts, a section per box that lists
    its container items, and the warnings when there are any.

    Args:
        summary: The parse result to report.
    """
    header_lines = [
        "解析结果摘要",
        f"- Box: {summary.box_count}",
        f"- Item: {summary.item_count}",
        f"- 其中容器型 Item: {summary.container_item_count}",
        f"- SubItem: {summary.subitem_count}",
        f"- Warnings: {len(summary.warnings)}",
    ]
    print()
    for line in header_lines:
        print(line)
    print()

    for box in summary.boxes:
        containers = [item for item in box.items if item.is_container]
        print(f"[Box] {box.name}")
        print(f" - Item 数量: {len(box.items)}")
        if containers:
            print(f" - 容器型 Item: {', '.join(item.name for item in containers)}")
        for item in containers:
            print(f" * {item.name} -> SubItem {len(item.subitems)} 个")

    if summary.warnings:
        print()
        print("Warnings")
        for warning in summary.warnings:
            print(f"- {warning}")
|
||
|
||
|
||
def apply_import(summary: ImportSummary, db: Session) -> dict[str, int]:
    """Persist a parse result as Box / Item / SubItem rows and commit.

    ``init_db()`` is called first (presumably to make sure the schema exists;
    confirm in ``app.db``). Every item and sub-item is stored with
    ``quantity=1``.

    Args:
        summary: The parse result to store.
        db: Session used for all adds, flushes and the final commit.

    Returns:
        Counts of created rows under the keys ``"boxes"``, ``"items"`` and
        ``"subitems"``.
    """
    init_db()

    counts = {"boxes": 0, "items": 0, "subitems": 0}

    for box_spec in summary.boxes:
        box_row = Box(name=box_spec.name, note=box_spec.note)
        db.add(box_row)
        db.flush()
        counts["boxes"] += 1

        for item_spec in box_spec.items:
            item_row = Item(
                box=box_row,
                name=item_spec.name,
                note=item_spec.note,
                quantity=1,
                is_container=item_spec.is_container,
            )
            db.add(item_row)
            db.flush()
            counts["items"] += 1

            for sub_spec in item_spec.subitems:
                # Sub-items are added without a flush of their own; the next
                # flush or the final commit writes them.
                sub_row = SubItem(
                    parent_item=item_row,
                    name=sub_spec.name,
                    note=sub_spec.note,
                    quantity=1,
                )
                db.add(sub_row)
                counts["subitems"] += 1

    db.commit()
    return counts
|