109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
from urllib import error, parse, request
|
|
|
|
from app.config import Settings
|
|
|
|
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# HTTP statuses accepted as a successful outbound call. Any other status
# that reaches the check in HomeAssistantClient._post_json is an error.
SUCCESS_STATUS_CODES = {200, 201}
|
|
|
|
|
|
class HomeAssistantConfigError(RuntimeError):
    """Signal that the outbound Home Assistant settings are incomplete.

    Raised before any request is attempted when the base URL or the auth
    token is not set.
    """
|
|
|
|
|
|
class HomeAssistantRequestError(RuntimeError):
    """Signal that an outbound HTTP call to Home Assistant did not succeed.

    Covers HTTP error responses, transport failures, and responses whose
    status code is not an accepted success status.
    """
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class HomeAssistantClient:
|
|
settings: Settings
|
|
timeout_seconds: float | None = field(default=None)
|
|
|
|
def __post_init__(self) -> None:
|
|
if self.timeout_seconds is None:
|
|
self.timeout_seconds = self.settings.home_assistant_timeout_seconds
|
|
|
|
def is_configured(self) -> bool:
|
|
return bool(self.settings.home_assistant_base_url and self.settings.home_assistant_auth_token)
|
|
|
|
def publish_sensor(
|
|
self,
|
|
*,
|
|
entity_id: str,
|
|
state: str,
|
|
attributes: dict[str, Any] | None = None,
|
|
) -> None:
|
|
self._require_config()
|
|
if not entity_id:
|
|
raise ValueError("entity_id must not be empty")
|
|
|
|
payload = {
|
|
"entity_id": entity_id,
|
|
"state": state,
|
|
"attributes": attributes or {},
|
|
}
|
|
self._post_json(f"/api/states/{entity_id}", payload, operation="publish_sensor")
|
|
|
|
def trigger_webhook(self, *, webhook_id: str, body: Any) -> None:
|
|
self._require_config()
|
|
if not webhook_id:
|
|
raise ValueError("webhook_id must not be empty")
|
|
|
|
self._post_json(f"/api/webhook/{webhook_id}", body, operation="trigger_webhook")
|
|
|
|
def _require_config(self) -> None:
|
|
if self.is_configured():
|
|
return
|
|
raise HomeAssistantConfigError(
|
|
"Home Assistant outbound integration is not configured. "
|
|
"Set HOME_ASSISTANT_BASE_URL and HOME_ASSISTANT_AUTH_TOKEN."
|
|
)
|
|
|
|
def _post_json(self, path: str, payload: Any, *, operation: str) -> None:
|
|
url = self._build_url(path)
|
|
body = json.dumps(payload).encode("utf-8")
|
|
req = request.Request(url, data=body, method="POST")
|
|
req.add_header("Content-Type", "application/json")
|
|
req.add_header("Authorization", f"Bearer {self.settings.home_assistant_auth_token}")
|
|
|
|
try:
|
|
with request.urlopen(req, timeout=self.timeout_seconds) as response:
|
|
status_code = response.getcode()
|
|
except error.HTTPError as exc:
|
|
logger.warning(
|
|
"Home Assistant outbound %s failed with HTTP %s for %s",
|
|
operation,
|
|
exc.code,
|
|
url,
|
|
)
|
|
raise HomeAssistantRequestError(
|
|
f"Home Assistant outbound {operation} failed with HTTP {exc.code}"
|
|
) from exc
|
|
except error.URLError as exc:
|
|
logger.warning("Home Assistant outbound %s failed for %s: %s", operation, url, exc)
|
|
raise HomeAssistantRequestError(
|
|
f"Home Assistant outbound {operation} failed to reach Home Assistant"
|
|
) from exc
|
|
|
|
if status_code not in SUCCESS_STATUS_CODES:
|
|
logger.warning(
|
|
"Home Assistant outbound %s returned unexpected status %s for %s",
|
|
operation,
|
|
status_code,
|
|
url,
|
|
)
|
|
raise HomeAssistantRequestError(
|
|
f"Home Assistant outbound {operation} returned unexpected status {status_code}"
|
|
)
|
|
|
|
def _build_url(self, path: str) -> str:
|
|
base_url = self.settings.home_assistant_base_url.rstrip("/")
|
|
quoted_path = parse.quote(path.lstrip("/"), safe="/")
|
|
return f"{base_url}/{quoted_path}"
|