Files
tliu93 779e160b95
pytest / test (push) Successful in 57s
pytest / test (pull_request) Successful in 54s
add ip change notification and refine sender display
2026-04-29 13:03:12 +02:00

259 lines
8.8 KiB
Python

from datetime import UTC, datetime
import re
import sqlite3
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from app.config import Settings
from app.services.email import EmailDeliveryError
from app.services.public_ip import PublicIPCheckResult, check_public_ipv4, check_public_ipv4_and_notify
def _make_session(database_url: str) -> Session:
    """Open a fresh ORM session bound to a new engine for ``database_url``.

    The engine is created with ``check_same_thread=False`` in its connect
    arguments, and the session factory disables autoflush and autocommit.
    Callers in this module close the returned session themselves.
    """
    factory = sessionmaker(
        bind=create_engine(
            database_url,
            connect_args={"check_same_thread": False},
        ),
        class_=Session,
        autocommit=False,
        autoflush=False,
    )
    return factory()
def _extract_csrf_token(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _login(client: TestClient) -> None:
    """Log ``client`` in through the ``/login`` form.

    Fetches the login page to obtain a CSRF token, posts the credentials
    without following redirects, and asserts the response status is 303.
    """
    token = _extract_csrf_token(client.get("/login").text)
    form = {
        "username": "admin",
        "password": "test-password",
        "csrf_token": token,
    }
    login_response = client.post("/login", data=form, follow_redirects=False)
    assert login_response.status_code == 303
def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
    """The very first check reports ``first_seen`` and persists state plus one history row."""
    state_query = "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state"
    history_query = "SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id"

    session = _make_session(auth_database["app_url"])
    try:
        outcome = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
    finally:
        session.close()

    assert outcome.status == "first_seen"
    assert outcome.changed is False

    # Inspect the database through a separate raw connection.
    db = sqlite3.connect(auth_database["app_path"])
    try:
        state_row = db.execute(state_query).fetchone()
        history_rows = db.execute(history_query).fetchall()
    finally:
        db.close()

    assert state_row == ("203.0.113.10", None, "first_seen", None, "ipify")
    assert history_rows == [("203.0.113.10", "first_seen", "ipify")]
def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None:
    """Seeing the same address twice reports ``unchanged`` and leaves history at one row."""
    session = _make_session(auth_database["app_url"])
    try:
        initial = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        repeat = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
    finally:
        session.close()

    assert initial.status == "first_seen"
    assert repeat.status == "unchanged"
    assert repeat.changed is False

    # Inspect the database through a separate raw connection.
    db = sqlite3.connect(auth_database["app_path"])
    try:
        state_row = db.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state"
        ).fetchone()
        (history_total,) = db.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()
    finally:
        db.close()

    assert state_row == ("203.0.113.10", None, "unchanged")
    assert history_total == 1
def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None:
    """A different address on the second check reports ``changed`` and appends history."""
    old_ip = "203.0.113.10"
    new_ip = "198.51.100.25"

    session = _make_session(auth_database["app_url"])
    try:
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        outcome = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25")
    finally:
        session.close()

    assert outcome.status == "changed"
    assert outcome.changed is True

    # Inspect the database through a separate raw connection.
    db = sqlite3.connect(auth_database["app_path"])
    try:
        state_row = db.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state"
        ).fetchone()
        history_rows = db.execute(
            "SELECT ipv4, change_type FROM public_ip_history ORDER BY id"
        ).fetchall()
    finally:
        db.close()

    current, previous, status, changed_at = state_row
    assert (current, previous, status) == (new_ip, old_ip, "changed")
    # The change timestamp must have been recorded.
    assert changed_at is not None
    assert history_rows == [(old_ip, "first_seen"), (new_ip, "changed")]
def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None:
    """An invalid fetched value reports ``error``, keeps the stored IP and adds no history."""
    session = _make_session(auth_database["app_url"])
    try:
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        # The second fetch yields a value that is not an IP address.
        outcome = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip")
    finally:
        session.close()

    assert outcome.status == "error"
    assert outcome.changed is False

    # Inspect the database through a separate raw connection.
    db = sqlite3.connect(auth_database["app_path"])
    try:
        state_row = db.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state"
        ).fetchone()
        (history_total,) = db.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()
    finally:
        db.close()

    current, previous, status, error_text = state_row
    assert (current, previous, status) == ("203.0.113.10", None, "error")
    # Some error description must have been stored.
    assert error_text is not None
    assert history_total == 1
def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None:
    """A GET of ``/public-ip/check`` without logging in yields 401 and a JSON detail."""
    expected_body = {"detail": "authentication required"}
    unauthenticated = client.get("/public-ip/check")
    assert unauthenticated.status_code == 401
    assert unauthenticated.json() == expected_body
def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None:
    """The check endpoint returns status fields only and no IP-related text."""
    from app.api.routes import public_ip as public_ip_route

    frozen_time = datetime(2026, 4, 29, 12, 0, tzinfo=UTC)

    def fake_check(session, bootstrap_settings):
        # Stand-in for the service call: always reports a change at a fixed time.
        return PublicIPCheckResult(
            status="changed",
            checked_at=frozen_time,
            changed=True,
        )

    monkeypatch.setattr(public_ip_route, "check_public_ipv4_and_notify", fake_check)

    _login(client)
    response = client.get("/public-ip/check")

    assert response.status_code == 200
    assert response.json() == {
        "status": "changed",
        "checked_at": "2026-04-29T12:00:00Z",
        "changed": True,
    }
    # Neither IP field names nor an address may appear anywhere in the body.
    for forbidden in ("current_ipv4", "previous_ipv4", "203.0.113.10"):
        assert forbidden not in response.text
def _notification_settings() -> Settings:
    """Build a ``Settings`` instance with ``_env_file=None`` and SMTP enabled."""
    app_fields = {
        "app_env": "development",
        "app_hostname": "localhost:8000",
        "app_database_url": "sqlite:///./data/app.db",
        "location_database_url": "sqlite:///./data/locationRecorder.db",
        "poo_database_url": "sqlite:///./data/pooRecorder.db",
        "auth_bootstrap_username": "admin",
        "auth_bootstrap_password": "secret-password",
    }
    smtp_fields = {
        "smtp_enabled": True,
        "smtp_host": "smtp.example.com",
        "smtp_port": 587,
        "smtp_username": "smtp-user",
        "smtp_password": "super-secret-password",
        "smtp_from_address": "sender@example.com",
        "smtp_to_address": "recipient@example.com",
        "smtp_use_starttls": True,
    }
    return Settings(_env_file=None, **app_fields, **smtp_fields)
def test_public_ip_notification_sends_only_when_changed(auth_database, monkeypatch) -> None:
    """Across first-seen, unchanged and changed checks, exactly one email is sent."""
    session = _make_session(auth_database["app_url"])
    deliveries = []

    def record_email(settings, *, previous_ipv4, current_ipv4, detected_at):
        # Capture the arguments instead of sending anything.
        deliveries.append((previous_ipv4, current_ipv4, detected_at))

    monkeypatch.setattr(
        "app.services.public_ip.send_public_ip_changed_email",
        record_email,
    )

    def run_check(address):
        # Each check gets its own freshly built settings object.
        return check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: address,
        )

    try:
        first_seen = run_check("203.0.113.10")
        unchanged = run_check("203.0.113.10")
        changed = run_check("198.51.100.25")
    finally:
        session.close()

    assert first_seen.status == "first_seen"
    assert unchanged.status == "unchanged"
    assert changed.status == "changed"

    assert len(deliveries) == 1
    previous_ip, current_ip, detected_at = deliveries[0]
    assert previous_ip == "203.0.113.10"
    assert current_ip == "198.51.100.25"
    assert detected_at == changed.checked_at
def test_public_ip_notification_failure_does_not_break_changed_result(auth_database, monkeypatch) -> None:
    """A failing notification email must not alter the ``changed`` check result.

    The email sender is replaced with a stub that raises
    ``EmailDeliveryError``. After seeding an initial address, a check that
    sees a different address must still return a ``changed`` result carrying
    both the previous and the current address.
    """
    session = _make_session(auth_database["app_url"])

    def failing_send(settings, *, previous_ipv4, current_ipv4, detected_at):
        # A named function can raise directly, so no generator ``.throw()``
        # trick is needed to raise from a lambda expression.
        raise EmailDeliveryError("smtp down")

    monkeypatch.setattr(
        "app.services.public_ip.send_public_ip_changed_email",
        failing_send,
    )

    try:
        # Seed the stored address without going through the notifying path.
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        result = check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: "198.51.100.25",
        )
    finally:
        session.close()

    assert result.status == "changed"
    assert result.changed is True
    assert result.previous_ipv4 == "203.0.113.10"
    assert result.current_ipv4 == "198.51.100.25"