Migrate location recorder and refine db config
This commit is contained in:
+2
-2
@@ -3,7 +3,8 @@ APP_ENV=development
|
||||
APP_DEBUG=true
|
||||
APP_HOST=0.0.0.0
|
||||
APP_PORT=8000
|
||||
DATABASE_URL=sqlite:///./data/app.db
|
||||
LOCATION_DATABASE_URL=sqlite:///./data/locationRecorder.db
|
||||
POO_DATABASE_URL=sqlite:///./data/pooRecorder.db
|
||||
TICKTICK_CLIENT_ID=
|
||||
TICKTICK_CLIENT_SECRET=
|
||||
TICKTICK_REDIRECT_URI=http://localhost:8000/ticktick/auth/callback
|
||||
@@ -11,4 +12,3 @@ TICKTICK_TOKEN=
|
||||
HOME_ASSISTANT_BASE_URL=http://localhost:8123
|
||||
HOME_ASSISTANT_AUTH_TOKEN=
|
||||
HOME_ASSISTANT_ACTION_TASK_PROJECT_ID=
|
||||
|
||||
|
||||
+1
-1
@@ -1,3 +1,4 @@
|
||||
.codex
|
||||
.env
|
||||
.pytest_cache/
|
||||
.venv/
|
||||
@@ -5,4 +6,3 @@ __pycache__/
|
||||
*.pyc
|
||||
data/
|
||||
openapi/
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
- 环境变量配置体系
|
||||
- SQLite + SQLAlchemy + Alembic 基础设施
|
||||
- 极简 server-side templates
|
||||
- location recorder 第一版迁移
|
||||
- pytest 测试基础
|
||||
- OpenAPI 导出脚本
|
||||
- Docker / Compose 基础骨架
|
||||
@@ -19,7 +20,6 @@
|
||||
- TickTick 业务逻辑迁移
|
||||
- Home Assistant 业务逻辑迁移
|
||||
- poo records 业务迁移
|
||||
- location / life trajectory 业务迁移
|
||||
- Notion 模块
|
||||
|
||||
Notion 在 Go 版本中仍然存在,但已被明确视为 legacy / removed scope,不进入新的 Python 系统目标。
|
||||
@@ -30,6 +30,20 @@ Notion 在 Go 版本中仍然存在,但已被明确视为 legacy / removed sco
|
||||
- `legacy/go-backend/helper/`
|
||||
- `legacy/go-backend/.github/workflows/`
|
||||
|
||||
## 当前配置现实
|
||||
|
||||
当前系统仍然是两个独立的 SQLite 数据库文件,而不是单一数据库:
|
||||
|
||||
- `location` 模块使用自己的 DB 文件
|
||||
- `poo` 模块未来也将使用自己的 DB 文件
|
||||
|
||||
当前阶段明确不借这次重构把两个 DB 合并。配置层已经显式反映这一点:
|
||||
|
||||
- `LOCATION_DATABASE_URL`
|
||||
- `POO_DATABASE_URL`
|
||||
|
||||
目前真正接入的是 `location` 对应的数据库;`poo` 先保留配置占位,等模块迁入时再接上。
|
||||
|
||||
## 当前目录
|
||||
|
||||
Python 骨架的主要目录如下:
|
||||
@@ -100,9 +114,10 @@ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
|
||||
|
||||
## 数据库与 Alembic
|
||||
|
||||
当前默认数据库使用 SQLite。
|
||||
当前默认仍使用 SQLite,但要明确区分两个数据库文件:
|
||||
|
||||
- 默认数据库地址:`sqlite:///./data/app.db`
|
||||
- Location DB:`sqlite:///./data/locationRecorder.db`
|
||||
- Poo DB:`sqlite:///./data/pooRecorder.db`
|
||||
- 数据目录:`./data/`
|
||||
|
||||
初始化 migration 环境后,可继续添加模型并生成迁移:
|
||||
@@ -112,7 +127,7 @@ alembic revision --autogenerate -m "init tables"
|
||||
alembic upgrade head
|
||||
```
|
||||
|
||||
这一轮尚未引入业务表,因此 Alembic 目前主要是基础设施就绪状态。
|
||||
当前 Alembic 只接管 `location` 这条链路;`poo` 相关数据库与 migration 还没有迁入。
|
||||
|
||||
## 运行测试
|
||||
|
||||
@@ -163,15 +178,15 @@ SQLite 持久化目录:
|
||||
|
||||
## 后续迁移建议
|
||||
|
||||
后续可以在当前骨架上逐步迁移这些模块:
|
||||
后续可以在当前骨架上继续迁移这些模块:
|
||||
|
||||
- TickTick integration
|
||||
- Home Assistant integration
|
||||
- poo records
|
||||
- location / life trajectory
|
||||
|
||||
建议继续参考:
|
||||
|
||||
- [当前系统盘点](docs/current-system-inventory.md)
|
||||
- [Python 重构方案](docs/python-rewrite-plan.md)
|
||||
- [迁移风险清单](docs/migration-risks.md)
|
||||
- [Location Recorder 接管说明](docs/location-recorder.md)
|
||||
|
||||
+2
-2
@@ -1,7 +1,8 @@
|
||||
[alembic]
|
||||
script_location = alembic
|
||||
prepend_sys_path = .
|
||||
sqlalchemy.url = sqlite:///./data/app.db
|
||||
path_separator = os
|
||||
sqlalchemy.url = sqlite:///./data/locationRecorder.db
|
||||
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
@@ -34,4 +35,3 @@ formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
|
||||
|
||||
+4
-2
@@ -4,6 +4,7 @@ from alembic import context
|
||||
from sqlalchemy import engine_from_config, pool
|
||||
|
||||
from app.config import get_settings
|
||||
from app.models import Location # noqa: F401
|
||||
from app.models.base import Base
|
||||
|
||||
config = context.config
|
||||
@@ -12,7 +13,9 @@ if config.config_file_name is not None:
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
settings = get_settings()
|
||||
config.set_main_option("sqlalchemy.url", settings.database_url)
|
||||
configured_url = config.get_main_option("sqlalchemy.url")
|
||||
if not configured_url or configured_url == "sqlite:///./data/locationRecorder.db":
|
||||
config.set_main_option("sqlalchemy.url", settings.location_database_url)
|
||||
|
||||
target_metadata = Base.metadata
|
||||
|
||||
@@ -43,4 +46,3 @@ if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
"""location baseline
|
||||
|
||||
Revision ID: 20260419_01_location_baseline
|
||||
Revises:
|
||||
Create Date: 2026-04-19 00:00:00.000000
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
revision: str = "20260419_01_location_baseline"
|
||||
down_revision: Union[str, None] = None
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.create_table(
|
||||
"location",
|
||||
sa.Column("person", sa.Text(), nullable=False),
|
||||
sa.Column("datetime", sa.Text(), nullable=False),
|
||||
sa.Column("latitude", sa.Float(), nullable=False),
|
||||
sa.Column("longitude", sa.Float(), nullable=False),
|
||||
sa.Column("altitude", sa.Float(), nullable=True),
|
||||
sa.PrimaryKeyConstraint("person", "datetime"),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_table("location")
|
||||
@@ -0,0 +1,28 @@
|
||||
import json
|
||||
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from fastapi.responses import PlainTextResponse, Response
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.dependencies import get_db
|
||||
from app.schemas.location import LocationRecordRequest
|
||||
from app.services.location import record_location
|
||||
|
||||
router = APIRouter(tags=["location"])
|
||||
|
||||
|
||||
@router.post("/location/record")
|
||||
async def create_location_record(request: Request, db: Session = Depends(get_db)) -> Response:
|
||||
try:
|
||||
raw_payload = await request.body()
|
||||
data = json.loads(raw_payload)
|
||||
payload = LocationRecordRequest.model_validate(data)
|
||||
except json.JSONDecodeError as exc:
|
||||
return PlainTextResponse(str(exc), status_code=400)
|
||||
except ValidationError as exc:
|
||||
return PlainTextResponse(str(exc), status_code=400)
|
||||
|
||||
record_location(db, payload)
|
||||
return Response(status_code=200)
|
||||
|
||||
+17
-8
@@ -12,7 +12,8 @@ class Settings(BaseSettings):
|
||||
app_host: str = "0.0.0.0"
|
||||
app_port: int = 8000
|
||||
|
||||
database_url: str = "sqlite:///./data/app.db"
|
||||
location_database_url: str = "sqlite:///./data/locationRecorder.db"
|
||||
poo_database_url: str = "sqlite:///./data/pooRecorder.db"
|
||||
|
||||
ticktick_client_id: str = ""
|
||||
ticktick_client_secret: str = ""
|
||||
@@ -34,17 +35,25 @@ class Settings(BaseSettings):
|
||||
def is_development(self) -> bool:
|
||||
return self.app_env.lower() == "development"
|
||||
|
||||
@staticmethod
|
||||
def _sqlite_path_from_url(database_url: str) -> Path | None:
|
||||
prefix = "sqlite:///"
|
||||
if not database_url.startswith(prefix):
|
||||
return None
|
||||
raw_path = database_url[len(prefix) :]
|
||||
return Path(raw_path)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def sqlite_path(self) -> Path | None:
|
||||
prefix = "sqlite:///"
|
||||
if not self.database_url.startswith(prefix):
|
||||
return None
|
||||
raw_path = self.database_url[len(prefix) :]
|
||||
return Path(raw_path)
|
||||
def location_sqlite_path(self) -> Path | None:
|
||||
return self._sqlite_path_from_url(self.location_database_url)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def poo_sqlite_path(self) -> Path | None:
|
||||
return self._sqlite_path_from_url(self.poo_database_url)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_settings() -> Settings:
|
||||
return Settings()
|
||||
|
||||
|
||||
@@ -13,10 +13,10 @@ class Base(DeclarativeBase):
|
||||
settings = get_settings()
|
||||
|
||||
connect_args: dict[str, object] = {}
|
||||
if settings.database_url.startswith("sqlite"):
|
||||
if settings.location_database_url.startswith("sqlite"):
|
||||
connect_args["check_same_thread"] = False
|
||||
|
||||
engine = create_engine(settings.database_url, connect_args=connect_args)
|
||||
engine = create_engine(settings.location_database_url, connect_args=connect_args)
|
||||
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
|
||||
|
||||
|
||||
@@ -26,4 +26,3 @@ def get_db_session() -> Generator[Session, None, None]:
|
||||
yield session
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
+6
-3
@@ -4,14 +4,17 @@ from pathlib import Path
|
||||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from app import models # noqa: F401
|
||||
from app.api.routes import pages, status
|
||||
from app.api.routes.location import router as location_router
|
||||
from app.config import get_settings
|
||||
|
||||
|
||||
def ensure_runtime_dirs() -> None:
|
||||
settings = get_settings()
|
||||
if settings.sqlite_path is not None:
|
||||
settings.sqlite_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
for path in (settings.location_sqlite_path, settings.poo_sqlite_path):
|
||||
if path is not None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@@ -38,8 +41,8 @@ def create_app() -> FastAPI:
|
||||
|
||||
app.include_router(status.router)
|
||||
app.include_router(pages.router)
|
||||
app.include_router(location_router)
|
||||
return app
|
||||
|
||||
|
||||
app = create_app()
|
||||
|
||||
|
||||
@@ -1,2 +1,5 @@
|
||||
"""SQLAlchemy models package."""
|
||||
|
||||
from app.models.location import Location
|
||||
|
||||
__all__ = ["Location"]
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
from sqlalchemy import Float, String
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.db import Base
|
||||
|
||||
|
||||
class Location(Base):
|
||||
__tablename__ = "location"
|
||||
|
||||
person: Mapped[str] = mapped_column(String, primary_key=True)
|
||||
datetime: Mapped[str] = mapped_column(String, primary_key=True)
|
||||
latitude: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
longitude: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
altitude: Mapped[float | None] = mapped_column(Float, nullable=True)
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class LocationRecordRequest(BaseModel):
|
||||
person: str
|
||||
latitude: str
|
||||
longitude: str
|
||||
altitude: str = ""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.location import Location
|
||||
from app.schemas.location import LocationRecordRequest
|
||||
|
||||
|
||||
def _parse_float_compat(value: str) -> float:
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
return 0.0
|
||||
|
||||
|
||||
def _utc_now_rfc3339() -> str:
|
||||
now = datetime.now(timezone.utc).replace(microsecond=0)
|
||||
return now.isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def record_location(session: Session, payload: LocationRecordRequest) -> None:
|
||||
stmt = (
|
||||
insert(Location)
|
||||
.prefix_with("OR IGNORE")
|
||||
.values(
|
||||
person=payload.person,
|
||||
datetime=_utc_now_rfc3339(),
|
||||
latitude=_parse_float_compat(payload.latitude),
|
||||
longitude=_parse_float_compat(payload.longitude),
|
||||
altitude=_parse_float_compat(payload.altitude),
|
||||
)
|
||||
)
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
|
||||
+2
-2
@@ -8,9 +8,9 @@ services:
|
||||
env_file:
|
||||
- .env
|
||||
environment:
|
||||
DATABASE_URL: sqlite:////app/data/app.db
|
||||
LOCATION_DATABASE_URL: sqlite:////app/data/locationRecorder.db
|
||||
POO_DATABASE_URL: sqlite:////app/data/pooRecorder.db
|
||||
APP_HOST: 0.0.0.0
|
||||
APP_PORT: 8000
|
||||
volumes:
|
||||
- ./data:/app/data
|
||||
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
# Location Recorder
|
||||
|
||||
本文档说明 `location recorder` 在 Python 项目中的当前数据库接管策略。
|
||||
|
||||
## Legacy 事实基线
|
||||
|
||||
当前 legacy SQLite 中 `location` 表的真实 schema 为:
|
||||
|
||||
```sql
|
||||
CREATE TABLE location (
|
||||
person TEXT NOT NULL,
|
||||
datetime TEXT NOT NULL,
|
||||
latitude REAL NOT NULL,
|
||||
longitude REAL NOT NULL,
|
||||
altitude REAL,
|
||||
PRIMARY KEY (person, datetime)
|
||||
);
|
||||
```
|
||||
|
||||
历史上 legacy Go 实现使用:
|
||||
|
||||
```sql
|
||||
PRAGMA user_version = 2;
|
||||
```
|
||||
|
||||
这代表旧系统曾依赖 `user_version` 管理 location 数据库版本,但这不再是 Python 项目的长期 migration 机制。
|
||||
|
||||
## 当前策略
|
||||
|
||||
当前采用的最小必要接管方案是:
|
||||
|
||||
1. 把上述 `location` schema 视为 Alembic baseline
|
||||
2. 新数据库通过 Alembic `upgrade head` 初始化
|
||||
3. 已有 legacy SQLite 数据库,只要确认 schema 与 baseline 一致,就通过 `alembic stamp` 接管
|
||||
4. 未来不再以 `PRAGMA user_version` 作为主 migration 机制
|
||||
|
||||
当前 baseline revision 是:
|
||||
|
||||
- `20260419_01_location_baseline`
|
||||
|
||||
## 新数据库初始化
|
||||
|
||||
对于一个全新 SQLite 数据库,执行:
|
||||
|
||||
```bash
|
||||
alembic upgrade head
|
||||
```
|
||||
|
||||
这会创建与 legacy 相同的 `location` 表结构,并在库中建立 Alembic revision 记录。
|
||||
|
||||
## 旧数据库接管
|
||||
|
||||
对于已经存在的 legacy SQLite 数据库:
|
||||
|
||||
1. 先确认其 `location` 表 schema 与 baseline 一致
|
||||
2. 旧库里的 `PRAGMA user_version = 2` 仅视为历史事实,不再继续沿用
|
||||
3. 确认无误后,对该数据库执行 `stamp`,而不是重新跑创建表 migration
|
||||
|
||||
示例:
|
||||
|
||||
```bash
|
||||
LOCATION_DATABASE_URL=sqlite:///./data/locationRecorder.db alembic stamp 20260419_01_location_baseline
|
||||
```
|
||||
|
||||
这样做的含义是:
|
||||
|
||||
- 告诉 Alembic:这个数据库已经处于 baseline 结构
|
||||
- 不修改已有 `location` 表数据
|
||||
- 后续 migration 由 Alembic 接管
|
||||
|
||||
## 关于 `data/locationRecorder.db`
|
||||
|
||||
你本地放在 `data/locationRecorder.db` 的 legacy 样本库,可以用于:
|
||||
|
||||
- 人工核对 schema
|
||||
- 手动验证 `stamp` 接管流程
|
||||
- 做开发时的兼容性确认
|
||||
|
||||
但当前代码不应硬依赖这个文件存在。
|
||||
|
||||
## 测试样本的安全使用方式
|
||||
|
||||
如果要用 legacy SQLite 样本做测试或验证,应遵守:
|
||||
|
||||
1. 不直接在原始样本文件上跑测试
|
||||
2. 先复制到临时路径
|
||||
3. 所有 `stamp`、写入、实验性 migration 都只针对副本执行
|
||||
|
||||
自动化测试里当前采用的方式是:
|
||||
|
||||
- 构造一个“legacy 风格”的临时 SQLite 文件
|
||||
- 建出同样的 `location` 表
|
||||
- 设置 `PRAGMA user_version = 2`
|
||||
- 再执行 Alembic `stamp`
|
||||
|
||||
这样可以验证接管路径,同时不污染真实样本库。
|
||||
+41
-2
@@ -11,16 +11,56 @@
|
||||
- 建立 pytest 基础设施
|
||||
- 建立 Docker / Compose 基础骨架
|
||||
- 建立 OpenAPI 导出脚本
|
||||
- 迁入 `location recorder` 第一版
|
||||
|
||||
## 数据库配置现状
|
||||
|
||||
当前系统在配置层上已明确保留两个独立 SQLite DB 文件:
|
||||
|
||||
- `LOCATION_DATABASE_URL`
|
||||
- `POO_DATABASE_URL`
|
||||
|
||||
当前阶段不打算把这两个数据库合并。
|
||||
|
||||
其中:
|
||||
|
||||
- `location` 模块已经实际接到 `LOCATION_DATABASE_URL`
|
||||
- `poo` 目前只保留 `POO_DATABASE_URL` 配置占位,等待模块迁入
|
||||
|
||||
## 当前阶段未做内容
|
||||
|
||||
- 未迁移 TickTick 业务逻辑
|
||||
- 未迁移 Home Assistant 业务逻辑
|
||||
- 未迁移 poo records
|
||||
- 未迁移 location / life trajectory
|
||||
- 未实现真实 OAuth 流程
|
||||
- 未做数据迁移
|
||||
|
||||
## Location recorder 说明
|
||||
|
||||
当前 Python 项目已经接入 `POST /location/record`,并对齐 legacy SQLite schema:
|
||||
|
||||
```sql
|
||||
CREATE TABLE location (
|
||||
person TEXT NOT NULL,
|
||||
datetime TEXT NOT NULL,
|
||||
latitude REAL NOT NULL,
|
||||
longitude REAL NOT NULL,
|
||||
altitude REAL,
|
||||
PRIMARY KEY (person, datetime)
|
||||
);
|
||||
```
|
||||
|
||||
当前已经补上最小 Alembic baseline / 接管策略:
|
||||
|
||||
- `location` 当前 schema 被视为 Alembic baseline
|
||||
- 新数据库通过 `alembic upgrade head` 初始化
|
||||
- 已有 legacy SQLite 数据库通过 `alembic stamp` 接管
|
||||
- `PRAGMA user_version = 2` 仅保留为历史事实,不再作为新的主 migration 机制
|
||||
|
||||
详见:
|
||||
|
||||
- [location-recorder.md](location-recorder.md)
|
||||
|
||||
## 后续建议顺序
|
||||
|
||||
建议继续沿用既有迁移文档中的顺序:
|
||||
@@ -37,4 +77,3 @@
|
||||
- 不要把旧 Python 版本当作设计基线
|
||||
- 不要重新引入 Notion 作为 Python 主系统能力
|
||||
- 在迁业务模块时,优先补 contract tests
|
||||
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
from app.config import Settings
|
||||
|
||||
|
||||
def test_settings_support_two_independent_database_urls(monkeypatch) -> None:
|
||||
monkeypatch.setenv("LOCATION_DATABASE_URL", "sqlite:///./data/locationRecorder.db")
|
||||
monkeypatch.setenv("POO_DATABASE_URL", "sqlite:///./data/pooRecorder.db")
|
||||
|
||||
settings = Settings()
|
||||
|
||||
assert settings.location_database_url == "sqlite:///./data/locationRecorder.db"
|
||||
assert settings.poo_database_url == "sqlite:///./data/pooRecorder.db"
|
||||
assert settings.location_sqlite_path is not None
|
||||
assert settings.location_sqlite_path.name == "locationRecorder.db"
|
||||
assert settings.poo_sqlite_path is not None
|
||||
assert settings.poo_sqlite_path.name == "pooRecorder.db"
|
||||
@@ -0,0 +1,172 @@
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import sqlite3
|
||||
|
||||
import pytest
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
import app.db
|
||||
from app.main import create_app
|
||||
|
||||
LOCATION_BASELINE_REVISION = "20260419_01_location_baseline"
|
||||
|
||||
|
||||
def _make_alembic_config(database_url: str) -> Config:
|
||||
config = Config("alembic.ini")
|
||||
config.set_main_option("sqlalchemy.url", database_url)
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def location_client(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
|
||||
database_path = tmp_path / "location_test.db"
|
||||
database_url = f"sqlite:///{database_path}"
|
||||
|
||||
command.upgrade(_make_alembic_config(database_url), "head")
|
||||
|
||||
engine = create_engine(database_url, connect_args={"check_same_thread": False})
|
||||
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
||||
|
||||
monkeypatch.setattr(app.db, "engine", engine)
|
||||
monkeypatch.setattr(app.db, "SessionLocal", session_local)
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
fastapi_app = create_app()
|
||||
with TestClient(fastapi_app) as client:
|
||||
yield client, engine
|
||||
|
||||
engine.dispose()
|
||||
|
||||
|
||||
def test_location_record_endpoint_writes_row(location_client) -> None:
|
||||
client, engine = location_client
|
||||
|
||||
response = client.post(
|
||||
"/location/record",
|
||||
json={
|
||||
"person": "tianyu",
|
||||
"latitude": "1.23",
|
||||
"longitude": "4.56",
|
||||
"altitude": "7.89",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == ""
|
||||
|
||||
with engine.connect() as conn:
|
||||
row = conn.execute(
|
||||
text(
|
||||
"SELECT person, datetime, latitude, longitude, altitude "
|
||||
"FROM location ORDER BY datetime DESC LIMIT 1"
|
||||
)
|
||||
).one()
|
||||
|
||||
assert row.person == "tianyu"
|
||||
assert row.latitude == pytest.approx(1.23)
|
||||
assert row.longitude == pytest.approx(4.56)
|
||||
assert row.altitude == pytest.approx(7.89)
|
||||
datetime.fromisoformat(row.datetime.replace("Z", "+00:00"))
|
||||
|
||||
|
||||
def test_location_record_endpoint_rejects_unknown_fields(location_client) -> None:
|
||||
client, _ = location_client
|
||||
|
||||
response = client.post(
|
||||
"/location/record",
|
||||
json={
|
||||
"person": "tianyu",
|
||||
"latitude": "1.23",
|
||||
"longitude": "4.56",
|
||||
"extra": "not-allowed",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_location_record_endpoint_keeps_legacy_lenient_number_parsing(location_client) -> None:
|
||||
client, engine = location_client
|
||||
|
||||
response = client.post(
|
||||
"/location/record",
|
||||
json={
|
||||
"person": "tianyu",
|
||||
"latitude": "bad-lat",
|
||||
"longitude": "bad-long",
|
||||
"altitude": "bad-alt",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
with engine.connect() as conn:
|
||||
row = conn.execute(
|
||||
text(
|
||||
"SELECT latitude, longitude, altitude "
|
||||
"FROM location ORDER BY datetime DESC LIMIT 1"
|
||||
)
|
||||
).one()
|
||||
|
||||
assert row.latitude == pytest.approx(0.0)
|
||||
assert row.longitude == pytest.approx(0.0)
|
||||
assert row.altitude == pytest.approx(0.0)
|
||||
|
||||
|
||||
def test_legacy_style_location_db_can_be_stamped_and_adopted(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
database_path = tmp_path / "legacy_location.db"
|
||||
conn = sqlite3.connect(database_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE location (
|
||||
person TEXT NOT NULL,
|
||||
datetime TEXT NOT NULL,
|
||||
latitude REAL NOT NULL,
|
||||
longitude REAL NOT NULL,
|
||||
altitude REAL,
|
||||
PRIMARY KEY (person, datetime)
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute("PRAGMA user_version = 2")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
database_url = f"sqlite:///{database_path}"
|
||||
command.stamp(_make_alembic_config(database_url), LOCATION_BASELINE_REVISION)
|
||||
|
||||
engine = create_engine(database_url, connect_args={"check_same_thread": False})
|
||||
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
||||
monkeypatch.setattr(app.db, "engine", engine)
|
||||
monkeypatch.setattr(app.db, "SessionLocal", session_local)
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
fastapi_app = create_app()
|
||||
with TestClient(fastapi_app) as client:
|
||||
response = client.post(
|
||||
"/location/record",
|
||||
json={
|
||||
"person": "legacy-user",
|
||||
"latitude": "12.3",
|
||||
"longitude": "45.6",
|
||||
"altitude": "7.8",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
with engine.connect() as db_conn:
|
||||
revision = db_conn.execute(text("SELECT version_num FROM alembic_version")).scalar_one()
|
||||
row_count = db_conn.execute(text("SELECT COUNT(*) FROM location")).scalar_one()
|
||||
|
||||
assert revision == LOCATION_BASELINE_REVISION
|
||||
assert row_count == 1
|
||||
|
||||
engine.dispose()
|
||||
Reference in New Issue
Block a user