"""Tests for the public IPv4 check service, its HTTP endpoint, and change notifications."""

from collections.abc import Iterator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
import re
import sqlite3

from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

from app.config import Settings
from app.services.email import EmailDeliveryError
from app.services.public_ip import (
    PublicIPCheckResult,
    check_public_ipv4,
    check_public_ipv4_and_notify,
)

# Matches the hidden CSRF input rendered on the login page and captures its value.
_CSRF_TOKEN_PATTERN = re.compile(r'name="csrf_token" value="([^"]+)"')


def _make_session(database_url: str) -> Session:
    """Return a new ORM session bound to a freshly created engine.

    The caller owns the returned session. Closing the session alone does not
    dispose the engine; use ``_session_scope`` when both should be released.
    """
    engine = create_engine(database_url, connect_args={"check_same_thread": False})
    session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
    return session_factory()


@contextmanager
def _session_scope(database_url: str) -> Iterator[Session]:
    """Yield a session for ``database_url`` and release it together with its engine.

    Every call to ``_make_session`` creates a new engine, so the engine is
    disposed on exit to avoid leaving pooled connections open between tests.
    """
    session = _make_session(database_url)
    engine = session.get_bind()
    try:
        yield session
    finally:
        try:
            session.close()
        finally:
            engine.dispose()


def _extract_csrf_token(html: str) -> str:
    """Return the value of the ``csrf_token`` form field found in ``html``.

    Fails the calling test with an ``AssertionError`` when no such field exists.
    """
    match = _CSRF_TOKEN_PATTERN.search(html)
    assert match is not None
    return match.group(1)


def _login(client: TestClient) -> None:
    """Log ``client`` in as the ``admin`` user through the HTML login form.

    Fetches the login page first to obtain a CSRF token, then posts the
    credentials and expects a 303 redirect.
    """
    login_page = client.get("/login")
    form_data = {
        "username": "admin",
        "password": "test-password",
        "csrf_token": _extract_csrf_token(login_page.text),
    }
    response = client.post("/login", data=form_data, follow_redirects=False)
    assert response.status_code == 303


def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
    """The first successful check records state and a single ``first_seen`` history row."""
    with _session_scope(auth_database["app_url"]) as session:
        result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")

    assert result.status == "first_seen"
    assert result.changed is False

    # Read back through a raw sqlite3 connection, independent of the ORM session.
    with closing(sqlite3.connect(auth_database["app_path"])) as conn:
        state = conn.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state"
        ).fetchone()
        history = conn.execute(
            "SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id"
        ).fetchall()

    assert state == ("203.0.113.10", None, "first_seen", None, "ipify")
    assert history == [("203.0.113.10", "first_seen", "ipify")]


def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None:
    """A repeated identical address updates the status but adds no history row."""
    with _session_scope(auth_database["app_url"]) as session:
        first_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        unchanged_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")

    assert first_result.status == "first_seen"
    assert unchanged_result.status == "unchanged"
    assert unchanged_result.changed is False

    with closing(sqlite3.connect(auth_database["app_path"])) as conn:
        state = conn.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state"
        ).fetchone()
        history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]

    assert state == ("203.0.113.10", None, "unchanged")
    assert history_count == 1


def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None:
    """A different address rotates current/previous and appends a ``changed`` history row."""
    with _session_scope(auth_database["app_url"]) as session:
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        result = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25")

    assert result.status == "changed"
    assert result.changed is True

    with closing(sqlite3.connect(auth_database["app_path"])) as conn:
        state = conn.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state"
        ).fetchone()
        history = conn.execute(
            "SELECT ipv4, change_type FROM public_ip_history ORDER BY id"
        ).fetchall()

    assert state[0:3] == ("198.51.100.25", "203.0.113.10", "changed")
    assert state[3] is not None
    assert history == [("203.0.113.10", "first_seen"), ("198.51.100.25", "changed")]


def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None:
    """An invalid fetched value is recorded as an error without touching the stored address."""
    with _session_scope(auth_database["app_url"]) as session:
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        result = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip")

    assert result.status == "error"
    assert result.changed is False

    with closing(sqlite3.connect(auth_database["app_path"])) as conn:
        state = conn.execute(
            "SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state"
        ).fetchone()
        history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]

    assert state[0:3] == ("203.0.113.10", None, "error")
    assert state[3] is not None
    assert history_count == 1


def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None:
    """An unauthenticated request to the check endpoint is rejected with 401."""
    response = client.get("/public-ip/check")

    assert response.status_code == 401
    assert response.json() == {"detail": "authentication required"}


def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None:
    """The endpoint response exposes status, timestamp and changed flag only."""
    from app.api.routes import public_ip as public_ip_route

    fixed_checked_at = datetime(2026, 4, 29, 12, 0, tzinfo=UTC)

    def fake_check(session, bootstrap_settings):
        # NOTE(review): this stub carries no addresses, so the literal-IP
        # assertion below cannot fail because of it; consider populating the
        # stub's address fields to make the hiding check meaningful.
        return PublicIPCheckResult(
            status="changed",
            checked_at=fixed_checked_at,
            changed=True,
        )

    monkeypatch.setattr(public_ip_route, "check_public_ipv4_and_notify", fake_check)
    _login(client)

    response = client.get("/public-ip/check")

    assert response.status_code == 200
    assert response.json() == {
        "status": "changed",
        "checked_at": "2026-04-29T12:00:00Z",
        "changed": True,
    }
    assert "current_ipv4" not in response.text
    assert "previous_ipv4" not in response.text
    assert "203.0.113.10" not in response.text


def _notification_settings() -> Settings:
    """Build settings with SMTP enabled, without reading any env file."""
    return Settings(
        _env_file=None,
        app_env="development",
        app_hostname="localhost:8000",
        app_database_url="sqlite:///./data/app.db",
        location_database_url="sqlite:///./data/locationRecorder.db",
        poo_database_url="sqlite:///./data/pooRecorder.db",
        auth_bootstrap_username="admin",
        auth_bootstrap_password="secret-password",
        smtp_enabled=True,
        smtp_host="smtp.example.com",
        smtp_port=587,
        smtp_username="smtp-user",
        smtp_password="super-secret-password",
        smtp_from_address="sender@example.com",
        smtp_to_address="recipient@example.com",
        smtp_use_starttls=True,
    )


def test_public_ip_notification_sends_only_when_changed(auth_database, monkeypatch) -> None:
    """Exactly one email is sent across first-seen, unchanged and changed checks."""
    sent = []

    def record_email(settings, *, previous_ipv4, current_ipv4, detected_at):
        sent.append((previous_ipv4, current_ipv4, detected_at))

    # Patch before opening the session so a patch failure cannot leak it.
    monkeypatch.setattr("app.services.public_ip.send_public_ip_changed_email", record_email)

    with _session_scope(auth_database["app_url"]) as session:
        first_seen = check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: "203.0.113.10",
        )
        unchanged = check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: "203.0.113.10",
        )
        changed = check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: "198.51.100.25",
        )

    assert first_seen.status == "first_seen"
    assert unchanged.status == "unchanged"
    assert changed.status == "changed"
    assert len(sent) == 1
    assert sent[0][0] == "203.0.113.10"
    assert sent[0][1] == "198.51.100.25"
    assert sent[0][2] == changed.checked_at


def test_public_ip_notification_failure_does_not_break_changed_result(auth_database, monkeypatch) -> None:
    """An email delivery error does not alter the ``changed`` result returned to the caller."""

    def fail_email(settings, *, previous_ipv4, current_ipv4, detected_at):
        raise EmailDeliveryError("smtp down")

    monkeypatch.setattr("app.services.public_ip.send_public_ip_changed_email", fail_email)

    with _session_scope(auth_database["app_url"]) as session:
        check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
        result = check_public_ipv4_and_notify(
            session,
            bootstrap_settings=_notification_settings(),
            fetch_public_ipv4=lambda: "198.51.100.25",
        )

    assert result.status == "changed"
    assert result.changed is True
    assert result.previous_ipv4 == "203.0.113.10"
    assert result.current_ipv4 == "198.51.100.25"