import inspect
import queue
from dataclasses import dataclass
from typing import Any, Callable, ClassVar, Optional

from fastapi_mqtt import FastMQTT, MQTTConfig


@dataclass
class MQTTSubscription:
    """A topic registered through ``MQTT.subscribe`` and the callback bound to it."""

    topic: str
    # Invoked by ``MQTT.on_message`` with the raw payload; ``None`` means "no handler".
    callback: Optional[Callable[[bytes], Any]]
    # True once the subscribe request has been handed to the MQTT client.
    subscribed: bool


@dataclass
class MQTTPendingMessage:
    """A publish request buffered until the client reports a connection."""

    topic: str
    payload: dict
    retain: bool


class MQTT:
    """Process-wide singleton wrapper around a ``FastMQTT`` client.

    ``MQTT()`` always returns the same object. Subscriptions and publishes
    requested before the first connection are stored and replayed from
    ``on_connect``.

    NOTE(review): ``_connected`` is set to True in ``on_connect`` and never
    reset; no disconnect handler is registered here, so publishes issued while
    the link is down after a first connect go straight to the client rather
    than into the queue. Confirm whether that is intended.
    """

    _instance: ClassVar[Optional["MQTT"]] = None

    def __new__(cls, *args, **kwargs):  # noqa: ANN002, ANN003, ANN204
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            # object.__new__ rejects extra arguments, so they are not forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Build the client and the bookkeeping state exactly once.

        ``__init__`` runs on every ``MQTT()`` call even though ``__new__``
        returns the existing object; without the guard each call would drop
        all registered subscriptions and queued messages and replace the
        running client.
        """
        if getattr(self, "_initialized", False):
            return

        self._mqtt_config = MQTTConfig(username="mqtt", password="mqtt", reconnect_retries=-1)  # noqa: S106
        self._mqtt = FastMQTT(config=self._mqtt_config, client_id="home_automation_backend")

        self._mqtt.mqtt_handlers.user_connect_handler = self.on_connect
        self._mqtt.mqtt_handlers.user_message_handler = self.on_message

        self._connected = False
        self._subscribed_topic: dict[str, MQTTSubscription] = {}
        self._queued_message: queue.Queue[MQTTPendingMessage] = queue.Queue()

        self._initialized = True

    async def start(self) -> None:
        """Start the underlying FastMQTT client."""
        print("MQTT Starting...")
        await self._mqtt.mqtt_startup()

    async def stop(self) -> None:
        """Shut down the underlying FastMQTT client."""
        print("MQTT Stopping...")
        await self._mqtt.mqtt_shutdown()

    def on_connect(self, client, flags, rc, properties) -> None:  # noqa: ANN001, ARG002
        """Connect handler: flush queued publishes and (re)subscribe every topic."""
        print("Connected")
        self._connected = True

        # Drain everything that was published before the connection existed.
        while True:
            try:
                pending = self._queued_message.get_nowait()
            except queue.Empty:
                break
            self.publish(pending.topic, pending.payload, retain=pending.retain)

        # Subscribe all registered topics, not only the ones still marked as
        # pending: this handler is also the hook available after a reconnect,
        # and topics subscribed on an earlier connection would otherwise never
        # be requested again. Iterate over a snapshot because subscribe()
        # rewrites the entries of the same dict.
        for topic, subscription in list(self._subscribed_topic.items()):
            self.subscribe(topic, subscription.callback)

    async def on_message(self, client, topic: str, payload: bytes, qos: int, properties: Any) -> None:  # noqa: ANN001, ARG002
        """Message handler: pass ``payload`` to the callback registered for ``topic``.

        The lookup is an exact match on the topic string; messages for topics
        without a registered callback are ignored.
        """
        print("On message")
        subscription = self._subscribed_topic.get(topic)
        if subscription is None or subscription.callback is None:
            return

        outcome = subscription.callback(payload)
        # Accept plain functions as well as coroutine functions.
        if inspect.isawaitable(outcome):
            await outcome

    def subscribe(self, topic: str, callback: Optional[Callable[[bytes], Any]]) -> None:
        """Register ``callback`` for ``topic``, subscribing immediately when connected.

        When not yet connected the subscription is only recorded; ``on_connect``
        performs the actual subscribe. A later call for the same topic replaces
        the previous callback.
        """
        if self._connected:
            print("Subscribe to topic: ", topic)
            self._mqtt.client.subscribe(topic)
        self._subscribed_topic[topic] = MQTTSubscription(topic, callback, subscribed=self._connected)

    def publish(self, topic: str, payload: dict, *, retain: bool) -> None:
        """Publish ``payload`` to ``topic``, or queue it until ``on_connect`` runs."""
        if self._connected:
            self._mqtt.publish(topic, payload=payload, retain=retain)
        else:
            self._queued_message.put(MQTTPendingMessage(topic, payload, retain=retain))