This commit is contained in:
2025-08-29 19:17:09 +02:00
parent d630a7ea5f
commit 8a76e869cb
10 changed files with 70 additions and 44 deletions

View File

@@ -1,5 +1,6 @@
#include <PubSubClient.h>
#include <WiFiClient.h>
#include "debugutil.hpp"
#include "mqtt.h"
constexpr uint16_t BUFFER_SIZE = 2048;
@@ -26,12 +27,27 @@ void Mqtt::subscribe(const std::string& topic, MqttCallback callback) {
if (mqttClient.connected()) {
if (mqttClient.subscribe(topic.c_str())) {
callbacks[topic] = callback;
Serial.printf("Subscribed to topic: %s\n", topic.c_str());
Debug::printf("Subscribed to topic: %s\n", topic.c_str());
} else {
Serial.printf("Failed to subscribe to topic: %s\n", topic.c_str());
Debug::printf("Failed to subscribe to topic: %s\n", topic.c_str());
}
} else {
Serial.println("MQTT client is not connected. Cannot subscribe.");
Debug::println("MQTT client is not connected. Cannot subscribe.");
}
}
void Mqtt::unsubscribe(const std::string& topic) {
if (mqttClient.connected()) {
if (mqttClient.unsubscribe(topic.c_str())) {
Debug::printf("Unsubscribed from topic: %s\n", topic.c_str());
} else {
Debug::printf("Failed to unsubscribe from topic: %s\n", topic.c_str());
}
} else {
Debug::println("MQTT client is not connected. Unsubscribe skipped.");
}
if (callbacks.find(topic) != callbacks.end()) {
callbacks.erase(topic);
}
}
@@ -39,10 +55,10 @@ void Mqtt::publish(const std::string& topic, const std::string& payload, bool re
if (mqttClient.connected()) {
if (mqttClient.publish(topic.c_str(), payload.c_str(), retain)) {
} else {
Serial.printf("Failed to publish to topic: %s\n", topic.c_str(), payload.c_str());
Debug::printf("Failed to publish to topic: %s\n", topic.c_str(), payload.c_str());
}
} else {
Serial.println("MQTT client is not connected. Cannot publish.");
Debug::println("MQTT client is not connected. Cannot publish.");
}
}
@@ -50,21 +66,21 @@ void Mqtt::poll() {
if (mqttClient.connected()) {
mqttClient.loop(); // Process incoming messages
} else {
Serial.println("MQTT client is not connected. Polling skipped.");
Debug::println("MQTT client is not connected. Polling skipped.");
}
}
void Mqtt::checkConnection() {
if (!mqttClient.connected()) {
Serial.println("MQTT client is not connected. Attempting to reconnect...");
Debug::println("MQTT client is not connected. Attempting to reconnect...");
if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
Serial.println("Reconnected to MQTT broker successfully.");
Debug::println("Reconnected to MQTT broker successfully.");
for (const auto& callback : Mqtt::callbacks) {
mqttClient.subscribe(callback.first.c_str());
}
Mqtt::isConnected = true;
} else {
Serial.printf("Failed to reconnect to MQTT broker, rc=%d\n", mqttClient.state());
Debug::printf("Failed to reconnect to MQTT broker, rc=%d\n", mqttClient.state());
Mqtt::isConnected = false;
}
}
@@ -82,10 +98,10 @@ void Mqtt::connect(std::string brokerIp, uint16_t brokerPort, std::string client
mqttClient.setBufferSize(BUFFER_SIZE);
if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
Serial.println("Connected to MQTT broker");
Debug::println("Connected to MQTT broker");
Mqtt::initialized = true;
Mqtt::isConnected = true;
} else {
Serial.printf("Failed to connect to MQTT broker, rc=%d\n", mqttClient.state());
Debug::printf("Failed to connect to MQTT broker, rc=%d\n", mqttClient.state());
}
}