Files
home-automation/app/services/public_ip.py
T

192 lines
5.7 KiB
Python
Raw Normal View History

2026-04-29 11:45:49 +02:00
from __future__ import annotations
import ipaddress
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Callable, Literal
import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
2026-04-29 11:45:49 +02:00
from app.models.public_ip import PublicIPHistory, PublicIPState
from app.services.config_page import build_runtime_settings
from app.services.email import EmailConfigurationError, EmailDeliveryError, send_public_ip_changed_email
2026-04-29 11:45:49 +02:00
# Module-level logger; check failures and notification failures are reported through it.
logger = logging.getLogger(__name__)
# Default provider label; used as the default ``provider_name`` of the check
# functions and stored on the state and history rows they write.
PUBLIC_IP_PROVIDER_NAME = "ipify"
# Endpoint queried by ``fetch_public_ipv4_from_provider``.
PUBLIC_IP_PROVIDER_URL = "https://api.ipify.org"
# Passed as the ``timeout`` argument of the provider HTTP request.
PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS = 5.0
# Closed set of outcomes reported in ``PublicIPCheckResult.status``.
PublicIPResultStatus = Literal["first_seen", "unchanged", "changed", "error"]
# Zero-argument callable returning the raw IPv4 text; the check validates its result.
PublicIPv4Fetcher = Callable[[], str]
class PublicIPCheckError(RuntimeError):
    """Signal that a public IPv4 lookup did not yield a usable IPv4 address.

    Raised when the provider request fails and when the returned value is
    empty, unparsable, or not an IPv4 address.
    """
@dataclass(slots=True)
class PublicIPCheckResult:
status: PublicIPResultStatus
checked_at: datetime
changed: bool
previous_ipv4: str | None = None
current_ipv4: str | None = None
2026-04-29 11:45:49 +02:00
def check_public_ipv4(
    session: Session,
    *,
    fetch_public_ipv4: PublicIPv4Fetcher | None = None,
    provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
    """Fetch the current public IPv4 and reconcile it with the stored state row.

    Args:
        session: SQLAlchemy session used to read and persist the state row
            (``PublicIPState`` with ``id == 1``) and the history rows.
        fetch_public_ipv4: Optional callable returning the raw IPv4 text;
            ``fetch_public_ipv4_from_provider`` is used when omitted.
        provider_name: Label stored alongside the state and history rows.

    Returns:
        A ``PublicIPCheckResult`` whose status is ``"error"``,
        ``"first_seen"``, ``"unchanged"`` or ``"changed"``. ``changed`` is
        True only for the ``"changed"`` outcome.

    Only ``PublicIPCheckError`` is handled here; any other exception raised
    by the fetcher propagates to the caller.
    """
    now = _utc_now()

    def stamp(
        record: PublicIPState,
        status: PublicIPResultStatus,
        error: str | None = None,
    ) -> None:
        # Bookkeeping fields rewritten on every outcome that touches the row.
        record.last_checked_at = now
        record.last_check_status = status
        record.last_check_error = error
        record.last_provider = provider_name

    stored = session.scalar(select(PublicIPState).where(PublicIPState.id == 1).limit(1))
    fetcher = fetch_public_ipv4 or fetch_public_ipv4_from_provider

    try:
        observed = _validate_ipv4(fetcher())
    except PublicIPCheckError as exc:
        logger.warning("Public IPv4 check failed: %s", exc)
        # A failure is only recorded when a state row already exists; no row
        # is created for a check that never produced an address.
        if stored is not None:
            stamp(stored, "error", str(exc))
            session.commit()
        return PublicIPCheckResult(status="error", checked_at=now, changed=False)

    if stored is None:
        # First successful check: create the singleton state row and seed
        # the history with the initial observation.
        session.add(
            PublicIPState(
                id=1,
                current_ipv4=observed,
                previous_ipv4=None,
                first_seen_at=now,
                last_checked_at=now,
                last_changed_at=None,
                last_check_status="first_seen",
                last_check_error=None,
                last_provider=provider_name,
            )
        )
        session.add(
            PublicIPHistory(
                ipv4=observed,
                observed_at=now,
                change_type="first_seen",
                provider=provider_name,
            )
        )
        session.commit()
        return PublicIPCheckResult(
            status="first_seen",
            checked_at=now,
            changed=False,
            current_ipv4=observed,
        )

    if stored.current_ipv4 != observed:
        # The address moved: remember the old one, store the new one and
        # append a history row for the change.
        replaced = stored.current_ipv4
        stored.previous_ipv4 = replaced
        stored.current_ipv4 = observed
        stored.last_changed_at = now
        stamp(stored, "changed")
        session.add(
            PublicIPHistory(
                ipv4=observed,
                observed_at=now,
                change_type="changed",
                provider=provider_name,
            )
        )
        session.commit()
        return PublicIPCheckResult(
            status="changed",
            checked_at=now,
            changed=True,
            previous_ipv4=replaced,
            current_ipv4=observed,
        )

    # Same address as before: only the bookkeeping fields are refreshed.
    stamp(stored, "unchanged")
    session.commit()
    return PublicIPCheckResult(
        status="unchanged",
        checked_at=now,
        changed=False,
        current_ipv4=observed,
    )
def check_public_ipv4_and_notify(
    session: Session,
    *,
    bootstrap_settings: Settings,
    fetch_public_ipv4: PublicIPv4Fetcher | None = None,
    provider_name: str = PUBLIC_IP_PROVIDER_NAME,
) -> PublicIPCheckResult:
    """Run a public IPv4 check and email a notification when the address changed.

    Args:
        session: SQLAlchemy session forwarded to ``check_public_ipv4`` and
            to ``build_runtime_settings``.
        bootstrap_settings: Settings passed to ``build_runtime_settings`` to
            obtain the settings used for sending the email.
        fetch_public_ipv4: Optional fetcher forwarded to ``check_public_ipv4``.
        provider_name: Provider label forwarded to ``check_public_ipv4``.

    Returns:
        The ``PublicIPCheckResult`` produced by ``check_public_ipv4``,
        unchanged, whether or not a notification was sent.
    """
    result = check_public_ipv4(
        session,
        fetch_public_ipv4=fetch_public_ipv4,
        provider_name=provider_name,
    )

    should_notify = (
        result.status == "changed"
        and result.previous_ipv4 is not None
        and result.current_ipv4 is not None
    )
    if should_notify:
        runtime_settings = build_runtime_settings(session, bootstrap_settings)
        try:
            send_public_ip_changed_email(
                runtime_settings,
                previous_ipv4=result.previous_ipv4,
                current_ipv4=result.current_ipv4,
                detected_at=result.checked_at,
            )
        except (EmailConfigurationError, EmailDeliveryError) as exc:
            # Notification is best-effort: the check result is still returned.
            logger.warning("Public IPv4 change notification failed: %s", exc)

    return result
2026-04-29 11:45:49 +02:00
def fetch_public_ipv4_from_provider() -> str:
    """Request the public IPv4 from the configured provider as plain text.

    Returns:
        The response body with surrounding whitespace removed. The value is
        not validated here.

    Raises:
        PublicIPCheckError: If the request fails or the provider answers
            with an error status (any ``httpx.HTTPError``).
    """
    query = {"format": "text"}
    try:
        reply = httpx.get(
            PUBLIC_IP_PROVIDER_URL,
            params=query,
            timeout=PUBLIC_IP_PROVIDER_TIMEOUT_SECONDS,
        )
        reply.raise_for_status()
    except httpx.HTTPError as exc:
        raise PublicIPCheckError(f"provider request failed: {exc}") from exc
    else:
        return reply.text.strip()
def _validate_ipv4(raw_value: str) -> str:
    """Validate a fetcher's raw response and return it as a canonical IPv4 string.

    Surrounding whitespace is ignored, so a fetcher that forgets to strip a
    trailing newline still yields a valid result.

    Args:
        raw_value: Text returned by a public IPv4 fetcher.

    Returns:
        The address in canonical dotted-quad form.

    Raises:
        PublicIPCheckError: If the value is empty (or whitespace only), is
            not text, cannot be parsed as an IP address, or is not IPv4.
    """
    candidate = raw_value.strip() if isinstance(raw_value, str) else raw_value
    if not candidate:
        raise PublicIPCheckError("provider returned an empty response")
    # ipaddress.ip_address() also accepts ints and packed bytes, which would
    # silently turn e.g. 1 into "0.0.0.1"; only textual input is meaningful here.
    if not isinstance(candidate, str):
        raise PublicIPCheckError("provider returned an invalid IPv4 value")
    try:
        parsed = ipaddress.ip_address(candidate)
    except ValueError as exc:
        raise PublicIPCheckError("provider returned an invalid IPv4 value") from exc
    if parsed.version != 4:
        raise PublicIPCheckError("provider returned a non-IPv4 value")
    return str(parsed)
def _utc_now() -> datetime:
return datetime.now(UTC)