175 lines
5.8 KiB
Python
175 lines
5.8 KiB
Python
|
|
from datetime import UTC, datetime
|
||
|
|
import re
|
||
|
|
import sqlite3
|
||
|
|
|
||
|
|
from fastapi.testclient import TestClient
|
||
|
|
from sqlalchemy import create_engine
|
||
|
|
from sqlalchemy.orm import Session, sessionmaker
|
||
|
|
|
||
|
|
from app.services.public_ip import PublicIPCheckResult, check_public_ipv4
|
||
|
|
|
||
|
|
|
||
|
|
def _make_session(database_url: str) -> Session:
|
||
|
|
engine = create_engine(database_url, connect_args={"check_same_thread": False})
|
||
|
|
session_local = sessionmaker(bind=engine, autoflush=False, autocommit=False, class_=Session)
|
||
|
|
return session_local()
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_csrf_token(html: str) -> str:
|
||
|
|
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
|
||
|
|
assert match is not None
|
||
|
|
return match.group(1)
|
||
|
|
|
||
|
|
|
||
|
|
def _login(client: TestClient) -> None:
|
||
|
|
login_page = client.get("/login")
|
||
|
|
csrf_token = _extract_csrf_token(login_page.text)
|
||
|
|
response = client.post(
|
||
|
|
"/login",
|
||
|
|
data={
|
||
|
|
"username": "admin",
|
||
|
|
"password": "test-password",
|
||
|
|
"csrf_token": csrf_token,
|
||
|
|
},
|
||
|
|
follow_redirects=False,
|
||
|
|
)
|
||
|
|
assert response.status_code == 303
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_first_seen_persists_state_and_history(auth_database) -> None:
|
||
|
|
session = _make_session(auth_database["app_url"])
|
||
|
|
try:
|
||
|
|
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
|
||
|
|
finally:
|
||
|
|
session.close()
|
||
|
|
|
||
|
|
assert result.status == "first_seen"
|
||
|
|
assert result.changed is False
|
||
|
|
|
||
|
|
conn = sqlite3.connect(auth_database["app_path"])
|
||
|
|
try:
|
||
|
|
state = conn.execute(
|
||
|
|
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error, last_provider FROM public_ip_state"
|
||
|
|
).fetchone()
|
||
|
|
history = conn.execute(
|
||
|
|
"SELECT ipv4, change_type, provider FROM public_ip_history ORDER BY id"
|
||
|
|
).fetchall()
|
||
|
|
finally:
|
||
|
|
conn.close()
|
||
|
|
|
||
|
|
assert state == ("203.0.113.10", None, "first_seen", None, "ipify")
|
||
|
|
assert history == [("203.0.113.10", "first_seen", "ipify")]
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_unchanged_updates_state_without_adding_history(auth_database) -> None:
|
||
|
|
session = _make_session(auth_database["app_url"])
|
||
|
|
try:
|
||
|
|
first_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
|
||
|
|
unchanged_result = check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
|
||
|
|
finally:
|
||
|
|
session.close()
|
||
|
|
|
||
|
|
assert first_result.status == "first_seen"
|
||
|
|
assert unchanged_result.status == "unchanged"
|
||
|
|
assert unchanged_result.changed is False
|
||
|
|
|
||
|
|
conn = sqlite3.connect(auth_database["app_path"])
|
||
|
|
try:
|
||
|
|
state = conn.execute(
|
||
|
|
"SELECT current_ipv4, previous_ipv4, last_check_status FROM public_ip_state"
|
||
|
|
).fetchone()
|
||
|
|
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
|
||
|
|
finally:
|
||
|
|
conn.close()
|
||
|
|
|
||
|
|
assert state == ("203.0.113.10", None, "unchanged")
|
||
|
|
assert history_count == 1
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_changed_updates_state_and_adds_history(auth_database) -> None:
|
||
|
|
session = _make_session(auth_database["app_url"])
|
||
|
|
try:
|
||
|
|
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
|
||
|
|
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "198.51.100.25")
|
||
|
|
finally:
|
||
|
|
session.close()
|
||
|
|
|
||
|
|
assert result.status == "changed"
|
||
|
|
assert result.changed is True
|
||
|
|
|
||
|
|
conn = sqlite3.connect(auth_database["app_path"])
|
||
|
|
try:
|
||
|
|
state = conn.execute(
|
||
|
|
"SELECT current_ipv4, previous_ipv4, last_check_status, last_changed_at FROM public_ip_state"
|
||
|
|
).fetchone()
|
||
|
|
history = conn.execute(
|
||
|
|
"SELECT ipv4, change_type FROM public_ip_history ORDER BY id"
|
||
|
|
).fetchall()
|
||
|
|
finally:
|
||
|
|
conn.close()
|
||
|
|
|
||
|
|
assert state[0:3] == ("198.51.100.25", "203.0.113.10", "changed")
|
||
|
|
assert state[3] is not None
|
||
|
|
assert history == [("203.0.113.10", "first_seen"), ("198.51.100.25", "changed")]
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_error_keeps_existing_ip_and_does_not_add_history(auth_database) -> None:
|
||
|
|
session = _make_session(auth_database["app_url"])
|
||
|
|
try:
|
||
|
|
check_public_ipv4(session, fetch_public_ipv4=lambda: "203.0.113.10")
|
||
|
|
result = check_public_ipv4(session, fetch_public_ipv4=lambda: "not-an-ip")
|
||
|
|
finally:
|
||
|
|
session.close()
|
||
|
|
|
||
|
|
assert result.status == "error"
|
||
|
|
assert result.changed is False
|
||
|
|
|
||
|
|
conn = sqlite3.connect(auth_database["app_path"])
|
||
|
|
try:
|
||
|
|
state = conn.execute(
|
||
|
|
"SELECT current_ipv4, previous_ipv4, last_check_status, last_check_error FROM public_ip_state"
|
||
|
|
).fetchone()
|
||
|
|
history_count = conn.execute("SELECT COUNT(*) FROM public_ip_history").fetchone()[0]
|
||
|
|
finally:
|
||
|
|
conn.close()
|
||
|
|
|
||
|
|
assert state[0:3] == ("203.0.113.10", None, "error")
|
||
|
|
assert state[3] is not None
|
||
|
|
assert history_count == 1
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_check_endpoint_requires_authentication(client: TestClient) -> None:
|
||
|
|
response = client.get("/public-ip/check")
|
||
|
|
|
||
|
|
assert response.status_code == 401
|
||
|
|
assert response.json() == {"detail": "authentication required"}
|
||
|
|
|
||
|
|
|
||
|
|
def test_public_ip_check_endpoint_hides_ip_values(client: TestClient, monkeypatch) -> None:
|
||
|
|
from app.api.routes import public_ip as public_ip_route
|
||
|
|
|
||
|
|
fixed_checked_at = datetime(2026, 4, 29, 12, 0, tzinfo=UTC)
|
||
|
|
|
||
|
|
monkeypatch.setattr(
|
||
|
|
public_ip_route,
|
||
|
|
"check_public_ipv4",
|
||
|
|
lambda session: PublicIPCheckResult(
|
||
|
|
status="changed",
|
||
|
|
checked_at=fixed_checked_at,
|
||
|
|
changed=True,
|
||
|
|
),
|
||
|
|
)
|
||
|
|
|
||
|
|
_login(client)
|
||
|
|
response = client.get("/public-ip/check")
|
||
|
|
|
||
|
|
assert response.status_code == 200
|
||
|
|
assert response.json() == {
|
||
|
|
"status": "changed",
|
||
|
|
"checked_at": "2026-04-29T12:00:00Z",
|
||
|
|
"changed": True,
|
||
|
|
}
|
||
|
|
assert "current_ipv4" not in response.text
|
||
|
|
assert "previous_ipv4" not in response.text
|
||
|
|
assert "203.0.113.10" not in response.text
|