Files
home-automation/app/services/homeassistant_inbound.py
T

36 lines
1.2 KiB
Python

from __future__ import annotations
import json
from sqlalchemy.orm import Session
from app.schemas.homeassistant import HomeAssistantPublishEnvelope
from app.schemas.location import LocationRecordRequest
from app.services.location import record_location
class UnsupportedHomeAssistantMessage(RuntimeError):
    """Signal that an inbound Home Assistant message cannot be handled.

    The handlers in this module raise it when an envelope's target/action
    combination has no implementation yet.
    """
def handle_homeassistant_message(
    session: Session,
    envelope: HomeAssistantPublishEnvelope,
) -> None:
    """Route an inbound Home Assistant envelope to the handler for its target.

    Only the ``"location_recorder"`` target is recognised; it is forwarded to
    ``_handle_location_message`` together with the given session.

    Args:
        session: Database session passed through to the target handler.
        envelope: Inbound message; ``target`` selects the handler and
            ``action`` is included in the error text for unknown targets.

    Raises:
        UnsupportedHomeAssistantMessage: If ``envelope.target`` is not
            ``"location_recorder"`` (the location handler may also raise it
            for an unsupported action).
    """
    # Guard clause: reject unknown targets up front so the supported path
    # reads straight through.
    if envelope.target != "location_recorder":
        raise UnsupportedHomeAssistantMessage(
            f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
        )

    _handle_location_message(session, envelope)
def _handle_location_message(session: Session, envelope: HomeAssistantPublishEnvelope) -> None:
    """Parse, validate and record a ``location_recorder`` message.

    Only the ``"record"`` action is supported. ``envelope.content`` is decoded
    as JSON, validated via ``LocationRecordRequest.model_validate`` and the
    resulting payload is passed to ``record_location``.

    Args:
        session: Database session handed to ``record_location``.
        envelope: Inbound message whose ``content`` holds the JSON-encoded
            location payload.

    Raises:
        UnsupportedHomeAssistantMessage: If ``envelope.action`` is not
            ``"record"``.
        json.JSONDecodeError: If ``envelope.content`` cannot be decoded,
            neither as-is nor after the legacy quote normalization.

    Any error raised by ``LocationRecordRequest.model_validate`` or
    ``record_location`` propagates unchanged.
    """
    if envelope.action != "record":
        raise UnsupportedHomeAssistantMessage(
            f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
        )

    raw_content = envelope.content
    try:
        # Decode the content untouched first: blindly rewriting quotes breaks
        # valid JSON whose string values contain an apostrophe, e.g.
        # {"name": "Bob's phone"}.
        content = json.loads(raw_content)
    except json.JSONDecodeError:
        # Legacy fallback for single-quoted payloads such as {'lat': 1.0}
        # (presumably a templated dict repr from the sender; confirm against
        # the publisher). Every payload accepted before this change is still
        # accepted, and a second failure raises JSONDecodeError as before.
        content = json.loads(raw_content.replace("'", '"'))

    payload = LocationRecordRequest.model_validate(content)
    record_location(session, payload)