Files
home-automation/app/services/homeassistant_inbound.py
T
tliu93 35aee79d93
pytest / test (push) Successful in 43s
docker-image / build-and-push (push) Successful in 3m38s
Restore legacy poo inbound dispatch
2026-04-20 23:33:57 +02:00

117 lines
4.1 KiB
Python

from __future__ import annotations
import json
from datetime import UTC, datetime, time, timedelta
from sqlalchemy.orm import Session
from app.config import Settings
from app.integrations.homeassistant import HomeAssistantClient
from app.integrations.ticktick import TICKTICK_DATETIME_FORMAT, TickTickClient, TickTickTask
from app.schemas.homeassistant import HomeAssistantPublishEnvelope
from app.schemas.location import LocationRecordRequest
from app.schemas.ticktick import TickTickActionTaskRequest
from app.services.location import record_location
from app.services.poo import publish_latest_poo_status
class UnsupportedHomeAssistantMessage(RuntimeError):
    """Signal that an inbound Home Assistant message cannot be handled.

    The handlers in this module raise it when an envelope's target/action
    pair has no matching handler, and when a TickTick message arrives
    without a TickTick client to serve it.
    """
def handle_homeassistant_message(
session: Session,
envelope: HomeAssistantPublishEnvelope,
ticktick_client: TickTickClient | None = None,
poo_session: Session | None = None,
settings: Settings | None = None,
homeassistant_client: HomeAssistantClient | None = None,
) -> None:
if envelope.target == "location_recorder":
_handle_location_message(session, envelope)
return
if envelope.target == "poo_recorder":
_handle_poo_message(
envelope,
poo_session=poo_session,
settings=settings,
homeassistant_client=homeassistant_client,
)
return
if envelope.target == "ticktick":
_handle_ticktick_message(envelope, ticktick_client)
return
raise UnsupportedHomeAssistantMessage(
f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
)
def _handle_location_message(session: Session, envelope: HomeAssistantPublishEnvelope) -> None:
    """Validate a ``location_recorder/record`` message and record the location.

    Args:
        session: Database session passed through to ``record_location``.
        envelope: Inbound message whose ``content`` holds the location payload
            as JSON text (double-quoted, or single-quoted as a fallback).

    Raises:
        UnsupportedHomeAssistantMessage: If the action is not ``record``.
        json.JSONDecodeError: If the content is not parseable as JSON, even
            after the single-quote fallback.

    Errors raised by ``LocationRecordRequest.model_validate`` or
    ``record_location`` propagate unchanged.
    """
    if envelope.action != "record":
        raise UnsupportedHomeAssistantMessage(
            f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
        )

    raw_content = envelope.content
    try:
        # Parse the content exactly as sent first: swapping quotes up front
        # corrupts valid JSON whose string values contain an apostrophe.
        content = json.loads(raw_content)
    except json.JSONDecodeError:
        # Fall back to the previous behaviour for payloads written with
        # single quotes instead of JSON double quotes.
        content = json.loads(raw_content.replace("'", '"'))

    payload = LocationRecordRequest.model_validate(content)
    record_location(session, payload)
def _handle_poo_message(
    envelope: HomeAssistantPublishEnvelope,
    *,
    poo_session: Session | None,
    settings: Settings | None,
    homeassistant_client: HomeAssistantClient | None,
) -> None:
    """Serve a ``poo_recorder/get_latest`` message.

    Delegates to ``publish_latest_poo_status`` once the action has been
    checked and all three collaborators are present.

    Args:
        envelope: Inbound message; only its ``action`` (and ``target`` for
            the error text) is read here.
        poo_session: Session passed to ``publish_latest_poo_status``.
        settings: Settings passed to ``publish_latest_poo_status``.
        homeassistant_client: Client passed to ``publish_latest_poo_status``.

    Raises:
        UnsupportedHomeAssistantMessage: If the action is not ``get_latest``.
        RuntimeError: If any of the three collaborators is ``None``.
    """
    if envelope.action != "get_latest":
        raise UnsupportedHomeAssistantMessage(
            f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
        )

    # Every collaborator is required; a single missing one disables the integration.
    collaborators = (poo_session, settings, homeassistant_client)
    if any(item is None for item in collaborators):
        raise RuntimeError("Poo recorder integration is unavailable")

    publish_latest_poo_status(
        session=poo_session,
        settings=settings,
        homeassistant_client=homeassistant_client,
    )
def _handle_ticktick_message(
    envelope: HomeAssistantPublishEnvelope,
    ticktick_client: TickTickClient | None,
) -> None:
    """Create a TickTick task from a ``ticktick/create_action_task`` message.

    The task title is the payload's ``action`` and its due date is derived
    from the payload's ``due_hour`` via ``build_action_task_due_date``.

    Args:
        envelope: Inbound message whose ``content`` holds the task payload as
            JSON text (double-quoted, or single-quoted as a fallback).
        ticktick_client: Client used to create the task; its settings supply
            the target project id.

    Raises:
        UnsupportedHomeAssistantMessage: If the action is not
            ``create_action_task`` or no TickTick client was supplied.
        json.JSONDecodeError: If the content is not parseable as JSON, even
            after the single-quote fallback.
        RuntimeError: If the client's settings carry no action-task project id.

    Errors raised by ``TickTickActionTaskRequest.model_validate`` or
    ``create_task`` propagate unchanged.
    """
    if envelope.action != "create_action_task":
        raise UnsupportedHomeAssistantMessage(
            f"Unsupported Home Assistant target/action: {envelope.target}/{envelope.action}"
        )
    if ticktick_client is None:
        raise UnsupportedHomeAssistantMessage("TickTick client is unavailable")

    raw_content = envelope.content
    try:
        # Parse the content exactly as sent first: swapping quotes up front
        # corrupts valid JSON whose strings contain an apostrophe, which is
        # easy to hit here because the task title is free text.
        content = json.loads(raw_content)
    except json.JSONDecodeError:
        # Fall back to the previous behaviour for payloads written with
        # single quotes instead of JSON double quotes.
        content = json.loads(raw_content.replace("'", '"'))

    payload = TickTickActionTaskRequest.model_validate(content)

    project_id = ticktick_client.settings.home_assistant_action_task_project_id
    if not project_id:
        raise RuntimeError(
            "TickTick action task integration is missing HOME_ASSISTANT_ACTION_TASK_PROJECT_ID"
        )

    ticktick_client.create_task(
        TickTickTask(
            projectId=project_id,
            title=payload.action,
            dueDate=build_action_task_due_date(datetime.now().astimezone(), payload.due_hour),
        )
    )
def build_action_task_due_date(now: datetime, due_hour: int) -> str:
    """Return the due date of an action task as a formatted UTC timestamp.

    The task falls due at the local midnight that closes the calendar day
    (in the system time zone) on which ``now`` plus ``due_hour`` hours lands.

    Args:
        now: Reference instant. A naive value is interpreted as system local
            time, as ``datetime.astimezone`` does.
        due_hour: Number of hours after ``now`` at which the action is due.

    Returns:
        The due instant converted to UTC and rendered with
        ``TICKTICK_DATETIME_FORMAT``.
    """
    # Add the delay on the UTC timeline so the elapsed time is exact, then read
    # the local calendar date using the offset in force at that instant.
    due_local = (now.astimezone(UTC) + timedelta(hours=due_hour)).astimezone()

    # Build the closing midnight as naive wall-clock time. astimezone() treats a
    # naive value as system local time and applies the offset valid on that
    # date; reusing the fixed offset captured from `now` would be an hour off
    # whenever a DST change falls between `now` and that midnight.
    next_midnight = datetime.combine(due_local.date() + timedelta(days=1), time.min)
    return next_midnight.astimezone(UTC).strftime(TICKTICK_DATETIME_FORMAT)