"""Track the host's public IPv4: fetch it, persist state/history, notify on change."""

from __future__ import annotations

import ipaddress
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Callable, Literal

import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import Settings
from app.models.public_ip import PublicIPHistory, PublicIPState
from app.services.config_page import build_runtime_settings
from app.services.email import (
    EmailConfigurationError,
    EmailDeliveryError,
    send_public_ip_changed_email,
)

logger = logging.getLogger(__name__)

PUBLIC_IP_PROVIDER_NAME = "ipify"
PUBLIC_IP_PROVIDER_URL = "https://api.ipify.org"
PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS = 5.0

# The state table is queried and written with this single fixed primary key.
_STATE_ROW_ID = 1

PublicIPResultStatus = Literal["first_seen", "unchanged", "changed", "error"]
PublicIPv4Fetcher = Callable[[], str]


class PublicIPCheckError(RuntimeError):
    """Raised when the public IPv4 provider cannot return a valid IPv4."""


@dataclass(slots=True)
class PublicIPCheckResult:
    """Outcome of one public IPv4 check.

    ``changed`` is True only for the ``"changed"`` status. ``previous_ipv4``
    is populated only for ``"changed"``; ``current_ipv4`` is populated for
    every status except ``"error"``.
    """

    status: PublicIPResultStatus
    checked_at: datetime
    changed: bool
    previous_ipv4: str | None = None
    current_ipv4: str | None = None


def check_public_ipv4(
    session: Session,
    *,
    fetch_public_ipv4: PublicIPv4Fetcher | None = None,
    provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
    """Fetch the current public IPv4 and reconcile it with the stored state.

    Args:
        session: SQLAlchemy session used to read and commit the state row and
            history rows.
        fetch_public_ipv4: Optional callable returning the raw address text;
            defaults to ``fetch_public_ipv4_from_provider``.
        provider_name: Label stored on the state and history rows.

    Returns:
        A result whose status is ``"first_seen"`` (no state row existed),
        ``"unchanged"``, ``"changed"`` or ``"error"``.

    Only ``PublicIPCheckError`` is converted into an ``"error"`` result; any
    other exception raised by the fetcher or the session propagates.
    """
    checked_at = _utc_now()
    state = session.scalar(
        select(PublicIPState).where(PublicIPState.id == _STATE_ROW_ID).limit(1)
    )

    try:
        raw_ipv4 = (fetch_public_ipv4 or fetch_public_ipv4_from_provider)()
        current_ipv4 = _validate_ipv4(raw_ipv4)
    except PublicIPCheckError as exc:
        logger.warning("Public IPv4 check failed: %s", exc)
        # A failure is only persisted when a state row already exists; a
        # failed very first check leaves the database untouched.
        if state is not None:
            _record_check(
                state,
                checked_at=checked_at,
                status="error",
                provider_name=provider_name,
                error=str(exc),
            )
            session.commit()
        return PublicIPCheckResult(status="error", checked_at=checked_at, changed=False)

    if state is None:
        # First successful observation: create the state row and seed history.
        state = PublicIPState(
            id=_STATE_ROW_ID,
            current_ipv4=current_ipv4,
            previous_ipv4=None,
            first_seen_at=checked_at,
            last_checked_at=checked_at,
            last_changed_at=None,
            last_check_status="first_seen",
            last_check_error=None,
            last_provider=provider_name,
        )
        session.add(state)
        session.add(_history_row(current_ipv4, checked_at, "first_seen", provider_name))
        session.commit()
        return PublicIPCheckResult(
            status="first_seen",
            checked_at=checked_at,
            changed=False,
            current_ipv4=current_ipv4,
        )

    if state.current_ipv4 == current_ipv4:
        _record_check(
            state,
            checked_at=checked_at,
            status="unchanged",
            provider_name=provider_name,
        )
        session.commit()
        return PublicIPCheckResult(
            status="unchanged",
            checked_at=checked_at,
            changed=False,
            current_ipv4=current_ipv4,
        )

    # The address differs from the stored one: rotate current -> previous.
    previous_ipv4 = state.current_ipv4
    state.previous_ipv4 = previous_ipv4
    state.current_ipv4 = current_ipv4
    state.last_changed_at = checked_at
    _record_check(
        state,
        checked_at=checked_at,
        status="changed",
        provider_name=provider_name,
    )
    session.add(_history_row(current_ipv4, checked_at, "changed", provider_name))
    session.commit()
    return PublicIPCheckResult(
        status="changed",
        checked_at=checked_at,
        changed=True,
        previous_ipv4=previous_ipv4,
        current_ipv4=current_ipv4,
    )


def check_public_ipv4_and_notify(
    session: Session,
    *,
    bootstrap_settings: Settings,
    fetch_public_ipv4: PublicIPv4Fetcher | None = None,
    provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
    """Run ``check_public_ipv4`` and email a notification when the IP changed.

    Args:
        session: SQLAlchemy session passed to the check and to
            ``build_runtime_settings``.
        bootstrap_settings: Settings passed to ``build_runtime_settings`` to
            build the settings used for sending the email.
        fetch_public_ipv4: Optional fetcher forwarded to ``check_public_ipv4``.
        provider_name: Provider label forwarded to ``check_public_ipv4``.

    Returns:
        The result of the underlying check, unmodified. Email configuration
        and delivery errors are logged as warnings and never raised.
    """
    result = check_public_ipv4(
        session,
        fetch_public_ipv4=fetch_public_ipv4,
        provider_name=provider_name,
    )
    if result.status != "changed" or result.previous_ipv4 is None or result.current_ipv4 is None:
        return result

    runtime_settings = build_runtime_settings(session, bootstrap_settings)
    try:
        send_public_ip_changed_email(
            runtime_settings,
            previous_ipv4=result.previous_ipv4,
            current_ipv4=result.current_ipv4,
            detected_at=result.checked_at,
        )
    except (EmailConfigurationError, EmailDeliveryError) as exc:
        # Best effort: the change is already committed, so only log the failure.
        logger.warning("Public IPv4 change notification failed: %s", exc)
    return result


def fetch_public_ipv4_from_provider() -> str:
    """Return the stripped response body from the configured IP provider.

    Raises:
        PublicIPCheckError: If the request fails or returns an error status.
    """
    try:
        response = httpx.get(
            PUBLIC_IP_PROVIDER_URL,
            params={"format": "text"},
            timeout=PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except httpx.HTTPError as exc:
        raise PublicIPCheckError(f"provider request failed: {exc}") from exc
    return response.text.strip()


def _record_check(
    state: PublicIPState,
    *,
    checked_at: datetime,
    status: PublicIPResultStatus,
    provider_name: str,
    error: str | None = None,
) -> None:
    """Stamp the bookkeeping fields that every check of an existing row updates."""
    state.last_checked_at = checked_at
    state.last_check_status = status
    state.last_check_error = error
    state.last_provider = provider_name


def _history_row(
    ipv4: str,
    observed_at: datetime,
    change_type: str,
    provider_name: str,
) -> PublicIPHistory:
    """Build (without adding to a session) a history row for an observation."""
    return PublicIPHistory(
        ipv4=ipv4,
        observed_at=observed_at,
        change_type=change_type,
        provider=provider_name,
    )


def _validate_ipv4(raw_value: str) -> str:
    """Return the canonical text form of *raw_value* if it is an IPv4 address.

    Surrounding whitespace is ignored so that a custom fetcher returning e.g.
    ``"1.2.3.4\\n"`` is treated the same as the built-in fetcher, which strips
    its response.

    Raises:
        PublicIPCheckError: If the value is empty, unparseable, or not IPv4.
    """
    candidate = raw_value.strip() if isinstance(raw_value, str) else raw_value
    if not candidate:
        raise PublicIPCheckError("provider returned an empty response")

    try:
        parsed = ipaddress.ip_address(candidate)
    except ValueError as exc:
        raise PublicIPCheckError("provider returned an invalid IPv4 value") from exc

    # ip_address() also accepts IPv6; reject it with a distinct message.
    if parsed.version != 4:
        raise PublicIPCheckError("provider returned a non-IPv4 value")
    return str(parsed)


def _utc_now() -> datetime:
    """Return the current timezone-aware UTC time."""
    return datetime.now(UTC)