@dataclass(slots=True)
class ParsedItem:
    """A first-level bullet parsed from the Notion page.

    Built by ``_parse_item_block`` and later written to the database as an
    ``Item`` row by ``apply_import``.
    """

    # Bullet text; produced by extract_block_text, so already stripped.
    name: str
    # Optional note copied onto the Item row. The parser in this module
    # never sets it, so parsed items carry None here.
    note: str | None = None
    # True when at least one second-level bullet was parsed under this item.
    is_container: bool = False
    # Second-level bullets, kept in the order they appear on the page.
    subitems: list[ParsedSubItem] = field(default_factory=list)
class NotionClient:
    """Minimal Notion REST client used by the one-off import.

    Wraps a ``requests.Session`` that sends the bearer token and the
    ``Notion-Version`` header with every request.
    """

    def __init__(self, token: str):
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {token}",
                "Notion-Version": NOTION_VERSION,
            }
        )

    def list_block_children(self, block_id: str) -> list[dict[str, Any]]:
        """Return all direct children of ``block_id``, following pagination.

        Args:
            block_id: Id of the page or block whose children are listed.

        Returns:
            The concatenated ``results`` entries of every page of the listing.

        Raises:
            RuntimeError: If the API answers with an HTTP error status, with
                a body that is not valid JSON, or with ``has_more`` set but
                no ``next_cursor`` to continue from.
        """
        results: list[dict[str, Any]] = []
        # Holds an int page size and, after the first page, a str cursor.
        params: dict[str, Any] = {"page_size": 100}

        while True:
            response = self.session.get(
                f"{NOTION_API_BASE}/blocks/{block_id}/children",
                params=params,
                timeout=30,
            )
            self._raise_for_status(response)
            try:
                payload = response.json()
            except ValueError as exc:
                # Keep the error type consistent with _raise_for_status.
                raise RuntimeError("Notion API 返回了无法解析的 JSON 响应") from exc
            results.extend(payload.get("results", []))

            if not payload.get("has_more"):
                return results

            next_cursor = payload.get("next_cursor")
            if not next_cursor:
                # Without a cursor the next request would fetch the first
                # page again, looping forever and duplicating results.
                raise RuntimeError(
                    "Notion API 分页响应异常: has_more 为真但缺少 next_cursor"
                )
            params["start_cursor"] = next_cursor

    def _raise_for_status(self, response: Response) -> None:
        """Convert an HTTP error response into a ``RuntimeError``.

        The message includes the status code and the raw response body so
        the caller can print it directly.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            message = response.text
            raise RuntimeError(f"Notion API 请求失败: {response.status_code} {message}") from exc
matches[-1].replace("-", "").lower() + return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:]}" + + +def fetch_page_blocks(token: str, page_id: str) -> list[dict[str, Any]]: + client = NotionClient(token) + return _fetch_block_tree(client, page_id) + + +def _fetch_block_tree(client: NotionClient, block_id: str) -> list[dict[str, Any]]: + blocks = client.list_block_children(block_id) + for block in blocks: + if block.get("has_children"): + block["_children"] = _fetch_block_tree(client, block["id"]) + else: + block["_children"] = [] + return blocks + + +def parse_notion_blocks(blocks: list[dict[str, Any]]) -> ImportSummary: + boxes: list[ParsedBox] = [] + warnings: list[str] = [] + current_box: ParsedBox | None = None + + for block in blocks: + block_type = block.get("type") + + if block_type == "heading_2": + heading_text = extract_block_text(block) + if not heading_text: + warnings.append("发现空的 heading_2,已跳过") + continue + current_box = ParsedBox(name=heading_text) + boxes.append(current_box) + continue + + if block_type == "bulleted_list_item": + if current_box is None: + warnings.append( + f"发现未归属到任何 heading_2 的一级 bullet:{extract_block_text(block) or '[空文本]'}" + ) + continue + parsed_item = _parse_item_block(block, warnings, level=1) + if parsed_item is not None: + current_box.items.append(parsed_item) + continue + + warnings.extend(_warning_for_unsupported_block(block, level=0)) + + return ImportSummary(boxes=boxes, warnings=warnings) + + +def _parse_item_block( + block: dict[str, Any], + warnings: list[str], + *, + level: int, +) -> ParsedItem | None: + item_name = extract_block_text(block) + if not item_name: + warnings.append(f"发现空的 bullet(层级 {level}),已跳过") + return None + + child_blocks = block.get("_children", []) + subitems: list[ParsedSubItem] = [] + + for child in child_blocks: + child_type = child.get("type") + if child_type == "bulleted_list_item": + child_name = extract_block_text(child) + if not child_name: + warnings.append(f"发现空的二级 
def extract_block_text(block: dict[str, Any]) -> str:
    """Return the stripped plain text of a block's ``rich_text`` payload.

    The payload is looked up under the key named by the block's ``type``
    (for example ``block["heading_2"]["rich_text"]``) and the
    ``plain_text`` of every fragment is concatenated.

    Args:
        block: A Notion block dict.

    Returns:
        The joined text with surrounding whitespace removed, or ``""`` when
        the block has no type, no payload for that type, no ``rich_text``,
        or any of those values is ``None`` / not a mapping.
    """
    block_type = block.get("type")
    block_data = block.get(block_type) if block_type else None
    if not isinstance(block_data, dict):
        # A missing or null payload carries no text.
        return ""

    # `or` also covers keys that are present but explicitly null.
    rich_text = block_data.get("rich_text") or []
    return "".join(part.get("plain_text") or "" for part in rich_text).strip()
def apply_import(summary: ImportSummary, db: Session) -> dict[str, int]:
    """Write the parsed boxes, items and sub-items to the database.

    Calls ``init_db()`` first, then adds one ``Box`` per parsed box, one
    ``Item`` per parsed item and one ``SubItem`` per parsed sub-item (items
    and sub-items get ``quantity=1``). The session is flushed after each
    box and each item and committed once at the end. No rollback is done
    here; exceptions propagate to the caller.

    Returns:
        A dict with the number of rows created under the keys ``boxes``,
        ``items`` and ``subitems``.
    """
    init_db()

    counts = {"boxes": 0, "items": 0, "subitems": 0}

    for box_spec in summary.boxes:
        box_row = Box(name=box_spec.name, note=box_spec.note)
        db.add(box_row)
        db.flush()
        counts["boxes"] += 1

        for item_spec in box_spec.items:
            item_row = Item(
                box=box_row,
                name=item_spec.name,
                note=item_spec.note,
                quantity=1,
                is_container=item_spec.is_container,
            )
            db.add(item_row)
            db.flush()
            counts["items"] += 1

            for sub_spec in item_spec.subitems:
                db.add(
                    SubItem(
                        parent_item=item_row,
                        name=sub_spec.name,
                        note=sub_spec.note,
                        quantity=1,
                    )
                )
                counts["subitems"] += 1

    db.commit()
    return counts
def count_blocks(blocks: list[dict]) -> int:
    """Count every block in the tree, including all nested ``_children``."""
    return sum(1 + count_blocks(node.get("_children", [])) for node in blocks)
+++ b/tests/test_notion_import.py @@ -0,0 +1,143 @@ +from app.models import Box, Item, SubItem +from app.notion_import import ( + ImportSummary, + ParsedBox, + ParsedItem, + ParsedSubItem, + apply_import, + extract_page_id, + parse_notion_blocks, +) + + +def make_heading_2(text: str) -> dict: + return { + "type": "heading_2", + "heading_2": {"rich_text": [{"plain_text": text}]}, + "_children": [], + } + + +def make_bullet(text: str, children: list[dict] | None = None) -> dict: + return { + "type": "bulleted_list_item", + "bulleted_list_item": {"rich_text": [{"plain_text": text}]}, + "_children": children or [], + } + + +def make_image_block() -> dict: + return {"type": "image", "image": {}, "_children": []} + + +def test_extract_page_id_from_notion_url(): + url = "https://www.notion.so/workspace/My-Page-1234567890abcdef1234567890abcdef?pvs=4" + + page_id = extract_page_id(url) + + assert page_id == "12345678-90ab-cdef-1234-567890abcdef" + + +def test_parse_heading_2_as_box(): + summary = parse_notion_blocks([make_heading_2("厨房箱")]) + + assert summary.box_count == 1 + assert summary.boxes[0].name == "厨房箱" + + +def test_parse_first_level_bullet_as_item(): + blocks = [make_heading_2("客厅箱"), make_bullet("锅具")] + + summary = parse_notion_blocks(blocks) + + assert summary.item_count == 1 + assert summary.boxes[0].items[0].name == "锅具" + assert summary.boxes[0].items[0].is_container is False + + +def test_parse_bullet_with_children_as_container_item_and_subitems(): + blocks = [ + make_heading_2("电子箱"), + make_bullet("配件盒", children=[make_bullet("USB 线"), make_bullet("转接头")]), + ] + + summary = parse_notion_blocks(blocks) + + item = summary.boxes[0].items[0] + assert item.name == "配件盒" + assert item.is_container is True + assert [subitem.name for subitem in item.subitems] == ["USB 线", "转接头"] + + +def test_parse_second_level_bullets_as_subitems(): + blocks = [ + make_heading_2("文件箱"), + make_bullet("文件袋", children=[make_bullet("合同"), make_bullet("护照复印件")]), + ] + + summary 
def test_parse_non_text_media_block_adds_skip_warning():
    """An image block following a heading is skipped with a media warning."""
    parsed = parse_notion_blocks([make_heading_2("照片箱"), make_image_block()])

    media_warnings = [
        message for message in parsed.warnings if "这版不导入图片或媒体" in message
    ]
    assert media_warnings