Files

306 lines
9.2 KiB
Python
Raw Permalink Normal View History

2026-04-19 14:28:00 +02:00
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import urlparse
import requests
from requests import Response
from sqlalchemy.orm import Session
from app.db import init_db
from app.models import Box, Item, SubItem
# Value sent in the "Notion-Version" request header by NotionClient.
NOTION_VERSION = "2026-03-11"
# Base URL that Notion REST endpoint paths in this module are appended to.
NOTION_API_BASE = "https://api.notion.com/v1"
@dataclass(slots=True)
class ParsedSubItem:
name: str
note: str | None = None
@dataclass(slots=True)
class ParsedItem:
name: str
note: str | None = None
is_container: bool = False
subitems: list[ParsedSubItem] = field(default_factory=list)
@dataclass(slots=True)
class ParsedBox:
name: str
note: str | None = None
items: list[ParsedItem] = field(default_factory=list)
@dataclass(slots=True)
class ImportSummary:
boxes: list[ParsedBox]
warnings: list[str] = field(default_factory=list)
@property
def box_count(self) -> int:
return len(self.boxes)
@property
def item_count(self) -> int:
return sum(len(box.items) for box in self.boxes)
@property
def container_item_count(self) -> int:
return sum(1 for box in self.boxes for item in box.items if item.is_container)
@property
def subitem_count(self) -> int:
return sum(len(item.subitems) for box in self.boxes for item in box.items)
class NotionClient:
    """Small Notion REST client built on a single ``requests`` session."""

    def __init__(self, token: str):
        """Create a session whose requests carry the auth and version headers.

        Args:
            token: Notion integration token, sent as a Bearer credential.
        """
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {token}",
                "Notion-Version": NOTION_VERSION,
            }
        )

    def list_block_children(self, block_id: str) -> list[dict[str, Any]]:
        """Return every direct child of *block_id*, following pagination.

        Args:
            block_id: ID of the block (or page) whose children are listed.

        Returns:
            The concatenated ``results`` entries of all pages, in order.

        Raises:
            RuntimeError: If a request returns an HTTP error status, or if
                the API reports more pages but supplies no cursor for them.
        """
        results: list[dict[str, Any]] = []
        next_cursor: str | None = None
        while True:
            # Values are mixed (int page size, str cursor), hence ``Any``.
            params: dict[str, Any] = {"page_size": 100}
            if next_cursor:
                params["start_cursor"] = next_cursor
            response = self.session.get(
                f"{NOTION_API_BASE}/blocks/{block_id}/children",
                params=params,
                timeout=30,
            )
            self._raise_for_status(response)
            payload = response.json()
            results.extend(payload.get("results", []))
            if not payload.get("has_more"):
                break
            next_cursor = payload.get("next_cursor")
            if not next_cursor:
                # Without a cursor the next request would start again at the
                # first page, so the loop would never end and ``results``
                # would grow without bound.
                raise RuntimeError(
                    "Notion API 返回 has_more 但缺少 next_cursor,无法继续分页"
                )
        return results

    def _raise_for_status(self, response: Response) -> None:
        """Convert an HTTP error response into a ``RuntimeError``.

        The status code and response body are included in the message, and
        the original ``requests.HTTPError`` is chained as the cause.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            message = response.text
            raise RuntimeError(f"Notion API 请求失败: {response.status_code} {message}") from exc
def extract_page_id(page_url: str) -> str:
    """Extract a Notion ID from *page_url* and return it in dashed form.

    Every non-empty path segment and then the URL fragment are scanned for
    a 32-digit hex ID or a dashed 8-4-4-4-12 ID; the last match found wins.

    Args:
        page_url: Notion page URL; surrounding whitespace is ignored.

    Returns:
        The ID lower-cased and formatted as 8-4-4-4-12 hex groups.

    Raises:
        ValueError: If no ID-shaped token is found in the path or fragment.
    """
    url_parts = urlparse(page_url.strip())
    id_pattern = re.compile(
        r"([0-9a-fA-F]{32}|[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
    )
    # Path segments first, fragment last; an empty fragment yields no hits.
    chunks = [*filter(None, url_parts.path.split("/")), url_parts.fragment]
    found = [hit for chunk in chunks for hit in id_pattern.findall(chunk)]
    if not found:
        raise ValueError("无法从 Notion 页面 URL 中提取 page id")
    digits = found[-1].replace("-", "").lower()
    groups = (digits[:8], digits[8:12], digits[12:16], digits[16:20], digits[20:])
    return "-".join(groups)
def fetch_page_blocks(token: str, page_id: str) -> list[dict[str, Any]]:
    """Fetch the full block tree of a Notion page.

    Args:
        token: Notion integration token used to authenticate the requests.
        page_id: ID of the page whose blocks are fetched.

    Returns:
        The page's top-level blocks; each block carries its descendants
        under the ``"_children"`` key.
    """
    client = NotionClient(token)
    try:
        return _fetch_block_tree(client, page_id)
    finally:
        # The client is private to this call, so release its HTTP session
        # here instead of leaving the connections open until collection.
        client.session.close()
def _fetch_block_tree(client: NotionClient, block_id: str) -> list[dict[str, Any]]:
blocks = client.list_block_children(block_id)
for block in blocks:
if block.get("has_children"):
block["_children"] = _fetch_block_tree(client, block["id"])
else:
block["_children"] = []
return blocks
def parse_notion_blocks(blocks: list[dict[str, Any]]) -> ImportSummary:
    """Turn a page's top-level blocks into boxes, items and warnings.

    A non-empty ``heading_2`` starts a new box. Each ``bulleted_list_item``
    after it becomes an item of the most recently started box. Any other
    block type, an empty heading, or a bullet seen before the first box only
    produces a warning.

    Args:
        blocks: Top-level block dicts; nested blocks are read from each
            block's ``"_children"`` key.

    Returns:
        An ``ImportSummary`` holding the parsed boxes and all warnings.
    """
    boxes: list[ParsedBox] = []
    warnings: list[str] = []
    current_box: ParsedBox | None = None
    for block in blocks:
        block_type = block.get("type")
        if block_type == "heading_2":
            heading_text = extract_block_text(block)
            if not heading_text:
                # An empty heading does not open a new box; the previously
                # opened box (if any) stays current.
                warnings.append("发现空的 heading_2,已跳过")
                continue
            current_box = ParsedBox(name=heading_text)
            boxes.append(current_box)
            continue
        if block_type == "bulleted_list_item":
            if current_box is None:
                # A separator is needed here: without it the bullet text is
                # glued straight onto the word "bullet" in the message.
                warnings.append(
                    f"发现未归属到任何 heading_2 的一级 bullet:{extract_block_text(block) or '[空文本]'}"
                )
                continue
            parsed_item = _parse_item_block(block, warnings, level=1)
            if parsed_item is not None:
                current_box.items.append(parsed_item)
            continue
        warnings.extend(_warning_for_unsupported_block(block, level=0))
    return ImportSummary(boxes=boxes, warnings=warnings)
def _parse_item_block(
    block: dict[str, Any],
    warnings: list[str],
    *,
    level: int,
) -> ParsedItem | None:
    """Build a ``ParsedItem`` from a bullet block and its direct children.

    Child bullets become sub-items. Non-bullet children, empty child
    bullets, and anything nested below a child bullet are reported through
    *warnings* (mutated in place) and otherwise ignored.

    Args:
        block: The bullet block; children are read from ``"_children"``.
        warnings: List that warning messages are appended to.
        level: Nesting level, used only in the empty-bullet warning text.

    Returns:
        The parsed item, or ``None`` when the bullet has no text.
    """
    item_name = extract_block_text(block)
    if not item_name:
        warnings.append(f"发现空的 bullet(层级 {level}),已跳过")
        return None

    collected: list[ParsedSubItem] = []
    for child in block.get("_children", []):
        if child.get("type") != "bulleted_list_item":
            warnings.extend(
                _warning_for_unsupported_block(child, level=2, parent_name=item_name)
            )
            continue

        child_name = extract_block_text(child)
        if not child_name:
            warnings.append(f"发现空的二级 bullet(父项:{item_name}),已跳过")
            continue

        collected.append(ParsedSubItem(name=child_name))

        grandchildren = child.get("_children")
        if grandchildren:
            # Only two levels are imported; deeper content is just reported.
            warnings.append(
                f"发现超出支持层级的三级内容(父项:{item_name} -> 子项:{child_name}),已忽略更深层级"
            )
            for grandchild in grandchildren:
                warnings.extend(_warning_for_unsupported_block(grandchild, level=3))

    return ParsedItem(
        name=item_name,
        is_container=bool(collected),
        subitems=collected,
    )
def _warning_for_unsupported_block(
    block: dict[str, Any],
    *,
    level: int,
    parent_name: str | None = None,
) -> list[str]:
    """Build the warning for a block that the import rules do not handle.

    Args:
        block: The skipped block.
        level: Nesting level reported in the message.
        parent_name: Name of the parent item, added to the message if given.

    Returns:
        A one-element list holding the warning text. Media block types
        (image, file, video, audio, pdf) get a dedicated message.
    """
    block_type = block.get("type", "unknown")
    text = extract_block_text(block) or "[无文本]"
    prefix = f"层级 {level} block"
    if parent_name:
        # Close the parenthesis opened here so the message stays balanced.
        prefix += f"(父项:{parent_name})"
    if block_type in {"image", "file", "video", "audio", "pdf"}:
        return [f"{prefix} 类型 {block_type} 已跳过(这版不导入图片或媒体):{text}"]
    return [f"{prefix} 类型 {block_type} 未按导入规则处理,已跳过:{text}"]
def extract_block_text(block: dict[str, Any]) -> str:
    """Return the plain text of a block, stripped of surrounding whitespace.

    The text is the concatenation of every ``plain_text`` value found in the
    ``rich_text`` list stored under the key named by the block's ``type``.
    A block without a type, type payload or rich text yields ``""``.
    """
    kind = block.get("type")
    if not kind:
        return ""
    payload = block.get(kind, {})
    pieces = [span.get("plain_text", "") for span in payload.get("rich_text", [])]
    return "".join(pieces).strip()
def print_summary(summary: ImportSummary) -> None:
    """Print a human-readable report of *summary* to standard output.

    The report has overall counts, then one section per box that lists its
    container items, then the warnings (if any).
    """
    header = (
        "",
        "解析结果摘要",
        f"- Box: {summary.box_count}",
        f"- Item: {summary.item_count}",
        f"- 其中容器型 Item: {summary.container_item_count}",
        f"- SubItem: {summary.subitem_count}",
        f"- Warnings: {len(summary.warnings)}",
        "",
    )
    for line in header:
        print(line)

    for box in summary.boxes:
        containers = [entry for entry in box.items if entry.is_container]
        print(f"[Box] {box.name}")
        print(f" - Item 数量: {len(box.items)}")
        if not containers:
            continue
        joined_names = ", ".join(entry.name for entry in containers)
        print(f" - 容器型 Item: {joined_names}")
        for entry in containers:
            print(f" * {entry.name} -> SubItem {len(entry.subitems)}")

    if not summary.warnings:
        return
    print()
    print("Warnings")
    for message in summary.warnings:
        print(f"- {message}")
def apply_import(summary: ImportSummary, db: Session) -> dict[str, int]:
    """Persist the parsed boxes, items and sub-items through *db*.

    ``init_db()`` is called first. Every box and item is added and flushed
    individually, sub-items are added without a flush, and a single commit
    is issued at the end. All created rows get ``quantity=1`` where the
    model takes a quantity.

    Args:
        summary: Parsed import data to write.
        db: Session used for all adds, flushes and the final commit.

    Returns:
        Counts of created rows under the keys ``"boxes"``, ``"items"`` and
        ``"subitems"``.
    """
    init_db()
    counts = {"boxes": 0, "items": 0, "subitems": 0}

    for box_data in summary.boxes:
        box_row = Box(name=box_data.name, note=box_data.note)
        db.add(box_row)
        db.flush()
        counts["boxes"] += 1

        for item_data in box_data.items:
            item_row = Item(
                box=box_row,
                name=item_data.name,
                note=item_data.note,
                quantity=1,
                is_container=item_data.is_container,
            )
            db.add(item_row)
            db.flush()
            counts["items"] += 1

            for sub_data in item_data.subitems:
                db.add(
                    SubItem(
                        parent_item=item_row,
                        name=sub_data.name,
                        note=sub_data.note,
                        quantity=1,
                    )
                )
                counts["subitems"] += 1

    db.commit()
    return counts