add import script from notion

This commit is contained in:
2026-04-19 14:28:00 +02:00
parent bda23909bf
commit ef058765de
5 changed files with 605 additions and 0 deletions
+47
View File
@@ -393,3 +393,50 @@ python -m pytest
- 全局搜索 name / note
- 创建后的重定向行为
- 关键页面结构和 UX 文案
## 一次性 Notion 导入
项目内附带了一个一次性迁移脚本:
```bash
python scripts/import_notion.py --dry-run
python scripts/import_notion.py --apply
```
说明:
- 这是一次性 migration / import 工具,不是长期同步功能
- 运行时会交互要求输入:
  - Notion API token
  - Notion 页面完整 URL
- `--dry-run` 只读取和解析,不写数据库
- `--apply` 会真正写入当前 SQLite 数据库
- 建议导入前先备份 `data/app.db`
### 当前支持的 Notion 结构映射
- `heading_2` -> `Box`
- 某个 `heading_2` 下的一级 bullet -> `Item`
- 如果一级 bullet 下还有二级 bullet:
  - 一级 bullet -> 容器型 `Item`
  - 二级 bullet -> `SubItem`
当前最大只处理到这个层级:
```text
heading_2
└── 一级 bullet
    └── 二级 bullet
```
更深层级会在日志中提示,但不会继续扩展成无限树。
### 这一版不导入图片
这一版导入脚本:
- 不下载图片
- 不导入图片
- 遇到图片或其他媒体 block 时会提示已跳过
图片后续可以在应用里手动补录。
+305
View File
@@ -0,0 +1,305 @@
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import urlparse
import requests
from requests import Response
from sqlalchemy.orm import Session
from app.db import init_db
from app.models import Box, Item, SubItem
NOTION_VERSION = "2026-03-11"
NOTION_API_BASE = "https://api.notion.com/v1"
@dataclass(slots=True)
class ParsedSubItem:
    """A second-level bullet parsed from Notion; written out as a ``SubItem`` row."""

    # Text of the bullet.
    name: str
    # Optional free-form note; the parser in this module never fills it in.
    note: str | None = None
@dataclass(slots=True)
class ParsedItem:
    """A first-level bullet under a heading, plus any second-level bullets below it."""

    # Text of the bullet.
    name: str
    # Optional free-form note; the parser in this module never fills it in.
    note: str | None = None
    # Set by the parser to True when at least one sub-item was collected.
    is_container: bool = False
    # Second-level bullets, in document order.
    subitems: list[ParsedSubItem] = field(default_factory=list)
@dataclass(slots=True)
class ParsedBox:
    """A ``heading_2`` block and the items parsed from the bullets that follow it."""

    # Text of the heading.
    name: str
    # Optional free-form note; the parser in this module never fills it in.
    note: str | None = None
    # First-level bullets that appeared after this heading, in document order.
    items: list[ParsedItem] = field(default_factory=list)
@dataclass(slots=True)
class ImportSummary:
boxes: list[ParsedBox]
warnings: list[str] = field(default_factory=list)
@property
def box_count(self) -> int:
return len(self.boxes)
@property
def item_count(self) -> int:
return sum(len(box.items) for box in self.boxes)
@property
def container_item_count(self) -> int:
return sum(1 for box in self.boxes for item in box.items if item.is_container)
@property
def subitem_count(self) -> int:
return sum(len(item.subitems) for box in self.boxes for item in box.items)
class NotionClient:
    """Small Notion REST client that lists the children of a block."""

    def __init__(self, token: str):
        """Create an HTTP session that sends the bearer token and API version.

        Args:
            token: Notion API token, sent as ``Authorization: Bearer <token>``.
        """
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {token}",
                "Notion-Version": NOTION_VERSION,
            }
        )

    def list_block_children(self, block_id: str) -> list[dict[str, Any]]:
        """Return every direct child of ``block_id``, following pagination.

        Pages of up to 100 results are requested until the API reports there
        are no more, or stops handing out a cursor.

        Args:
            block_id: Id of the block (or page) whose children are listed.

        Returns:
            The raw child block dicts, in the order the API returned them.

        Raises:
            RuntimeError: If any request returns an HTTP error status.
        """
        results: list[dict[str, Any]] = []
        next_cursor: str | None = None

        while True:
            # Values are mixed (int page size, str cursor), hence ``Any``.
            params: dict[str, Any] = {"page_size": 100}
            if next_cursor:
                params["start_cursor"] = next_cursor

            response = self.session.get(
                f"{NOTION_API_BASE}/blocks/{block_id}/children",
                params=params,
                timeout=30,
            )
            self._raise_for_status(response)
            payload = response.json()
            results.extend(payload.get("results", []))

            next_cursor = payload.get("next_cursor")
            # Stop when the API says so, and also when it claims more data but
            # gives no cursor: without one the next request would restart at
            # the first page and this loop would never end.
            if not payload.get("has_more") or not next_cursor:
                break

        return results

    def _raise_for_status(self, response: Response) -> None:
        """Turn an HTTP error response into ``RuntimeError`` carrying the body text."""
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            message = response.text
            raise RuntimeError(f"Notion API 请求失败: {response.status_code} {message}") from exc
def extract_page_id(page_url: str) -> str:
    """Extract the page id from a Notion page URL.

    The URL path segments and, if present, the fragment are scanned for either
    a 32-character hex id or a dashed UUID. The last match wins.

    Args:
        page_url: Notion page URL; surrounding whitespace is ignored.

    Returns:
        The id in lowercase, dashed 8-4-4-4-12 form.

    Raises:
        ValueError: If no id-like token is found.
    """
    id_pattern = re.compile(
        r"([0-9a-fA-F]{32}|[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
    )

    url_parts = urlparse(page_url.strip())
    segments = [piece for piece in url_parts.path.split("/") if piece]
    if url_parts.fragment:
        segments.append(url_parts.fragment)

    found = [hit for segment in segments for hit in id_pattern.findall(segment)]
    if not found:
        raise ValueError("无法从 Notion 页面 URL 中提取 page id")

    # Normalise to bare lowercase hex, then re-insert dashes at fixed offsets.
    compact = found[-1].replace("-", "").lower()
    groups = (compact[:8], compact[8:12], compact[12:16], compact[16:20], compact[20:])
    return "-".join(groups)
def fetch_page_blocks(token: str, page_id: str) -> list[dict[str, Any]]:
    """Fetch all blocks of a page, with nested children attached.

    Args:
        token: Notion API token used to build the client.
        page_id: Id of the page whose block tree is fetched.

    Returns:
        The page's top-level blocks; each carries its descendants under the
        ``"_children"`` key.
    """
    return _fetch_block_tree(NotionClient(token), page_id)
def _fetch_block_tree(client: NotionClient, block_id: str) -> list[dict[str, Any]]:
blocks = client.list_block_children(block_id)
for block in blocks:
if block.get("has_children"):
block["_children"] = _fetch_block_tree(client, block["id"])
else:
block["_children"] = []
return blocks
def parse_notion_blocks(blocks: list[dict[str, Any]]) -> ImportSummary:
    """Turn a page's top-level blocks into boxes, items and warnings.

    Each non-empty ``heading_2`` starts a new box. Each following
    ``bulleted_list_item`` becomes an item of the most recent box. Anything
    else, or a bullet that appears before any heading, is reported as a
    warning and skipped.

    Args:
        blocks: Top-level blocks, each optionally carrying nested blocks under
            the ``"_children"`` key.

    Returns:
        An ``ImportSummary`` with the parsed boxes and collected warnings.
    """
    boxes: list[ParsedBox] = []
    warnings: list[str] = []
    current_box: ParsedBox | None = None

    for block in blocks:
        block_type = block.get("type")

        if block_type == "heading_2":
            heading_text = extract_block_text(block)
            if not heading_text:
                # current_box is left untouched, so later bullets still attach
                # to the previous heading.
                warnings.append("发现空的 heading_2,已跳过")
                continue
            current_box = ParsedBox(name=heading_text)
            boxes.append(current_box)
            continue

        if block_type == "bulleted_list_item":
            if current_box is None:
                # Separate the bullet text from the message with ":", as the
                # other warnings do, so it is not glued onto the word "bullet".
                warnings.append(
                    f"发现未归属到任何 heading_2 的一级 bullet:{extract_block_text(block) or '[空文本]'}"
                )
                continue
            parsed_item = _parse_item_block(block, warnings, level=1)
            if parsed_item is not None:
                current_box.items.append(parsed_item)
            continue

        warnings.extend(_warning_for_unsupported_block(block, level=0))

    return ImportSummary(boxes=boxes, warnings=warnings)
def _parse_item_block(
    block: dict[str, Any],
    warnings: list[str],
    *,
    level: int,
) -> ParsedItem | None:
    """Parse one bullet block, and its nested bullets, into a ``ParsedItem``.

    Args:
        block: The bullet block; nested blocks are read from ``"_children"``.
        warnings: List that skip/ignore messages are appended to in place.
        level: Nesting level, used only in the empty-bullet warning text.

    Returns:
        The parsed item, or ``None`` when the bullet has no text.
    """
    item_name = extract_block_text(block)
    if not item_name:
        warnings.append(f"发现空的 bullet(层级 {level}),已跳过")
        return None

    collected: list[ParsedSubItem] = []
    for nested in block.get("_children", []):
        # Only nested bullets become sub-items; everything else is reported.
        if nested.get("type") != "bulleted_list_item":
            warnings.extend(
                _warning_for_unsupported_block(nested, level=2, parent_name=item_name)
            )
            continue

        nested_name = extract_block_text(nested)
        if not nested_name:
            warnings.append(f"发现空的二级 bullet(父项:{item_name}),已跳过")
            continue

        collected.append(ParsedSubItem(name=nested_name))

        deeper_blocks = nested.get("_children")
        if not deeper_blocks:
            continue

        # Third-level content is not imported; it only produces warnings.
        warnings.append(
            f"发现超出支持层级的三级内容(父项:{item_name} -> 子项:{nested_name}),已忽略更深层级"
        )
        for deeper in deeper_blocks:
            warnings.extend(_warning_for_unsupported_block(deeper, level=3))

    return ParsedItem(
        name=item_name,
        is_container=bool(collected),
        subitems=collected,
    )
def _warning_for_unsupported_block(
    block: dict[str, Any],
    *,
    level: int,
    parent_name: str | None = None,
) -> list[str]:
    """Build the warning for a block that the importer skips.

    Args:
        block: The skipped block.
        level: Nesting level to mention in the message.
        parent_name: Name of the parent item, mentioned when given.

    Returns:
        A one-element list with the warning text. Media block types get a
        message saying media is not imported in this version.
    """
    block_type = block.get("type", "unknown")
    text = extract_block_text(block) or "[无文本]"

    prefix = f"层级 {level} block"
    if parent_name:
        # Close the parenthesis opened here; it was previously left unbalanced.
        prefix += f"(父项:{parent_name})"

    if block_type in {"image", "file", "video", "audio", "pdf"}:
        return [f"{prefix} 类型 {block_type} 已跳过(这版不导入图片或媒体):{text}"]
    return [f"{prefix} 类型 {block_type} 未按导入规则处理,已跳过:{text}"]
def extract_block_text(block: dict[str, Any]) -> str:
    """Return the concatenated, stripped plain text of a block.

    The rich-text fragments are read from the payload stored under the key
    named by the block's ``"type"``. A block with no type, no payload or no
    ``rich_text`` yields an empty string.
    """
    kind = block.get("type")
    if kind:
        payload = block.get(kind, {})
    else:
        payload = {}

    fragments = [piece.get("plain_text", "") for piece in payload.get("rich_text", [])]
    return "".join(fragments).strip()
def print_summary(summary: ImportSummary) -> None:
    """Print the totals, a per-box breakdown and any warnings to stdout."""
    print()
    print("解析结果摘要")
    print(f"- Box: {summary.box_count}")
    print(f"- Item: {summary.item_count}")
    print(f"- 其中容器型 Item: {summary.container_item_count}")
    print(f"- SubItem: {summary.subitem_count}")
    print(f"- Warnings: {len(summary.warnings)}")
    print()

    for box in summary.boxes:
        containers = [item for item in box.items if item.is_container]
        print(f"[Box] {box.name}")
        print(f" - Item 数量: {len(box.items)}")
        if not containers:
            continue
        # Containers get one overview line, then one line each.
        print(f" - 容器型 Item: {', '.join(item.name for item in containers)}")
        for item in containers:
            print(f" * {item.name} -> SubItem {len(item.subitems)}")

    if not summary.warnings:
        return
    print()
    print("Warnings")
    for message in summary.warnings:
        print(f"- {message}")
def apply_import(summary: ImportSummary, db: Session) -> dict[str, int]:
    """Write the parsed boxes, items and sub-items through ``db`` and commit.

    Args:
        summary: Parsed structure to persist.
        db: Session used for the writes; committed once at the end.

    Returns:
        Counts of created rows under the keys ``"boxes"``, ``"items"`` and
        ``"subitems"``.
    """
    init_db()

    counts = {"boxes": 0, "items": 0, "subitems": 0}

    for box_data in summary.boxes:
        box_row = Box(name=box_data.name, note=box_data.note)
        db.add(box_row)
        db.flush()
        counts["boxes"] += 1

        for item_data in box_data.items:
            item_row = Item(
                box=box_row,
                name=item_data.name,
                note=item_data.note,
                quantity=1,
                is_container=item_data.is_container,
            )
            db.add(item_row)
            db.flush()
            counts["items"] += 1

            for sub_data in item_data.subitems:
                db.add(
                    SubItem(
                        parent_item=item_row,
                        name=sub_data.name,
                        note=sub_data.note,
                        quantity=1,
                    )
                )
                counts["subitems"] += 1

    db.commit()
    return counts
+1
View File
@@ -4,5 +4,6 @@ jinja2==3.1.6
sqlalchemy==2.0.43
python-multipart==0.0.20
pillow==11.2.1
requests==2.32.3
pytest==8.4.1
httpx==0.28.1
+109
View File
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import getpass
import sys
from pathlib import Path
# Put the directory one level above this script's folder on sys.path so the
# `app.*` imports below resolve when the file is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from app.db import SessionLocal, configure_database
from app.notion_import import (
apply_import,
extract_page_id,
fetch_page_blocks,
parse_notion_blocks,
print_summary,
)
def main() -> int:
    """Run the interactive one-off Notion import and return an exit code.

    Prompts for the Notion token and page URL, fetches and parses the page,
    and prints the summary. Only in apply mode, and only after the user types
    "yes", is the result written to the database.

    Returns:
        0 on success, after a dry run, or when the user declines the import;
        1 on missing input, an unrecognised URL, a fetch failure or a failed
        import.
    """
    parser = argparse.ArgumentParser(description="一次性导入 Notion 搬家记录到当前 SQLite 数据库")
    parser.add_argument("--dry-run", action="store_true", help="只解析,不写数据库")
    parser.add_argument("--apply", action="store_true", help="真正写入数据库")
    args = parser.parse_args()
    mode = _resolve_mode(args)

    # getpass keeps the token from being echoed to the terminal.
    token = getpass.getpass("请输入 Notion API token: ").strip()
    if not token:
        print("未输入 token,已退出")
        return 1

    page_url = input("请输入 Notion 页面完整 URL: ").strip()
    if not page_url:
        print("未输入页面 URL,已退出")
        return 1

    try:
        page_id = extract_page_id(page_url)
    except ValueError as exc:
        print(f"页面 URL 无法识别: {exc}")
        return 1

    print()
    print(f"正在读取 Notion page: {page_id}")
    # Top-level boundary: any fetch failure is reported and becomes exit code 1.
    try:
        blocks = fetch_page_blocks(token, page_id)
    except Exception as exc:
        print(f"读取 Notion page 失败: {exc}")
        return 1

    print(f"已读取顶层及嵌套 blocks,总数约 {count_blocks(blocks)}")
    print("正在解析页面结构...")
    summary = parse_notion_blocks(blocks)
    print_summary(summary)

    # Dry run stops here, before any database object is created.
    if mode == "dry-run":
        print()
        print("dry-run 完成,未写入数据库。")
        return 0

    print()
    print("这是一次性导入脚本,不建议在同一数据库上重复执行。")
    print("建议先备份当前 SQLite 数据库,再继续。")
    # Require the literal word "yes" (case-insensitive) before writing.
    confirmed = input("确认执行导入?输入 yes 继续: ").strip().lower()
    if confirmed != "yes":
        print("已取消导入。")
        return 0

    configure_database()
    db = SessionLocal()
    try:
        counts = apply_import(summary, db)
    except Exception as exc:
        # Roll back whatever was flushed before the failure.
        db.rollback()
        print(f"导入失败,已回滚: {exc}")
        return 1
    finally:
        db.close()

    print()
    print("导入完成")
    print(f"- 写入 Box: {counts['boxes']}")
    print(f"- 写入 Item: {counts['items']}")
    print(f"- 写入 SubItem: {counts['subitems']}")
    return 0
def _resolve_mode(args: argparse.Namespace) -> str:
if args.apply and args.dry_run:
raise SystemExit("请只选择一种模式:--dry-run 或 --apply")
if args.apply:
return "apply"
return "dry-run"
def count_blocks(blocks: list[dict]) -> int:
    """Count the given blocks plus everything nested under ``"_children"``."""
    return sum(1 + count_blocks(block.get("_children", [])) for block in blocks)
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
+143
View File
@@ -0,0 +1,143 @@
from app.models import Box, Item, SubItem
from app.notion_import import (
ImportSummary,
ParsedBox,
ParsedItem,
ParsedSubItem,
apply_import,
extract_page_id,
parse_notion_blocks,
)
def make_heading_2(text: str) -> dict:
    """Build a minimal ``heading_2`` block with ``text`` and no children."""
    rich_text = [{"plain_text": text}]
    block = {"type": "heading_2"}
    block["heading_2"] = {"rich_text": rich_text}
    block["_children"] = []
    return block
def make_bullet(text: str, children: list[dict] | None = None) -> dict:
return {
"type": "bulleted_list_item",
"bulleted_list_item": {"rich_text": [{"plain_text": text}]},
"_children": children or [],
}
def make_image_block() -> dict:
    """Build a minimal ``image`` block with an empty payload and no children."""
    block = dict(type="image", image={})
    block["_children"] = []
    return block
def test_extract_page_id_from_notion_url():
    """A slug-style Notion URL yields the dashed, lowercase page id."""
    notion_url = "https://www.notion.so/workspace/My-Page-1234567890abcdef1234567890abcdef?pvs=4"
    expected_id = "12345678-90ab-cdef-1234-567890abcdef"
    assert extract_page_id(notion_url) == expected_id
def test_parse_heading_2_as_box():
    """A single heading_2 block becomes exactly one box with the same name."""
    heading = make_heading_2("厨房箱")
    parsed = parse_notion_blocks([heading])
    assert parsed.box_count == 1
    first_box = parsed.boxes[0]
    assert first_box.name == "厨房箱"
def test_parse_first_level_bullet_as_item():
    """A childless bullet under a heading becomes a non-container item."""
    parsed = parse_notion_blocks([make_heading_2("客厅箱"), make_bullet("锅具")])
    assert parsed.item_count == 1
    first_item = parsed.boxes[0].items[0]
    assert first_item.name == "锅具"
    assert first_item.is_container is False
def test_parse_bullet_with_children_as_container_item_and_subitems():
    """A bullet with nested bullets becomes a container item with ordered sub-items."""
    nested_bullets = [make_bullet("USB 线"), make_bullet("转接头")]
    page = [make_heading_2("电子箱"), make_bullet("配件盒", children=nested_bullets)]

    parsed = parse_notion_blocks(page)
    container = parsed.boxes[0].items[0]

    assert container.name == "配件盒"
    assert container.is_container is True
    subitem_names = [entry.name for entry in container.subitems]
    assert subitem_names == ["USB 线", "转接头"]
def test_parse_second_level_bullets_as_subitems():
    """Second-level bullets are counted and stored as sub-items in order."""
    folder = make_bullet("文件袋", children=[make_bullet("合同"), make_bullet("护照复印件")])
    parsed = parse_notion_blocks([make_heading_2("文件箱"), folder])

    assert parsed.subitem_count == 2
    second_subitem = parsed.boxes[0].items[0].subitems[1]
    assert second_subitem.name == "护照复印件"
def test_parse_deeper_than_supported_levels_adds_warning():
    """Third-level content keeps the container item and adds a depth warning."""
    third_level = make_bullet("更深一层")
    second_level = make_bullet("内层物品", children=[third_level])
    first_level = make_bullet("外层袋", children=[second_level])

    parsed = parse_notion_blocks([make_heading_2("测试箱"), first_level])

    assert parsed.container_item_count == 1
    depth_warnings = [message for message in parsed.warnings if "超出支持层级" in message]
    assert depth_warnings
def test_parse_non_text_media_block_adds_skip_warning():
    """An image block is skipped with the "media not imported" warning."""
    parsed = parse_notion_blocks([make_heading_2("照片箱"), make_image_block()])
    media_warnings = [message for message in parsed.warnings if "这版不导入图片或媒体" in message]
    assert media_warnings
def test_dry_run_parse_does_not_write_database(db_session):
    """Parsing alone leaves the Box, Item and SubItem tables empty."""
    parsed = parse_notion_blocks([make_heading_2("厨房箱"), make_bullet("")])
    assert parsed.box_count == 1
    for model in (Box, Item, SubItem):
        assert db_session.query(model).count() == 0
def test_apply_import_writes_expected_structure(db_session):
    """apply_import persists one box, two items and two sub-items and reports the counts."""
    clothes = ParsedItem(name="衣服", is_container=False)
    storage_bag = ParsedItem(
        name="收纳袋",
        is_container=True,
        subitems=[ParsedSubItem(name="袜子"), ParsedSubItem(name="围巾")],
    )
    bedroom_box = ParsedBox(name="主卧箱", items=[clothes, storage_bag])
    summary = ImportSummary(boxes=[bedroom_box])

    counts = apply_import(summary, db_session)

    assert counts == {"boxes": 1, "items": 2, "subitems": 2}
    assert db_session.query(Box).count() == 1
    assert db_session.query(Item).count() == 2
    assert db_session.query(SubItem).count() == 2

    stored_container = db_session.query(Item).filter_by(name="收纳袋").one()
    assert stored_container.is_container is True