Files
smart-rgb-esp32/lib/communication/mqtt.cpp

108 lines
3.8 KiB
C++
Raw Permalink Normal View History

2025-08-03 00:34:05 +02:00
#include <PubSubClient.h>
#include <WiFiClient.h>
2025-08-29 19:17:09 +02:00
#include "debugutil.hpp"
2025-08-03 00:34:05 +02:00
#include "mqtt.h"
constexpr uint16_t BUFFER_SIZE = 2048;
static WiFiClient wifiClient = WiFiClient();
static PubSubClient mqttClient = PubSubClient(wifiClient);
std::string Mqtt::brokerIp;
uint16_t Mqtt::brokerPort;
std::string Mqtt::clientId;
std::string Mqtt::username;
std::string Mqtt::password;
std::map<std::string, MqttCallback> Mqtt::callbacks;
bool Mqtt::initialized = false;
bool Mqtt::isConnected = false;
void Mqtt::mqttCb(char* topic, uint8_t* payload, unsigned int length) {
std::string topicStr(topic);
if (callbacks.find(topicStr) != callbacks.end()) {
callbacks[topicStr](payload, length);
}
}
void Mqtt::subscribe(const std::string& topic, MqttCallback callback) {
if (mqttClient.connected()) {
if (mqttClient.subscribe(topic.c_str())) {
callbacks[topic] = callback;
2025-08-29 19:17:09 +02:00
Debug::printf("Subscribed to topic: %s\n", topic.c_str());
2025-08-03 00:34:05 +02:00
} else {
2025-08-29 19:17:09 +02:00
Debug::printf("Failed to subscribe to topic: %s\n", topic.c_str());
2025-08-03 00:34:05 +02:00
}
} else {
2025-08-29 19:17:09 +02:00
Debug::println("MQTT client is not connected. Cannot subscribe.");
}
}
void Mqtt::unsubscribe(const std::string& topic) {
if (mqttClient.connected()) {
if (mqttClient.unsubscribe(topic.c_str())) {
Debug::printf("Unsubscribed from topic: %s\n", topic.c_str());
} else {
Debug::printf("Failed to unsubscribe from topic: %s\n", topic.c_str());
}
} else {
Debug::println("MQTT client is not connected. Unsubscribe skipped.");
}
if (callbacks.find(topic) != callbacks.end()) {
callbacks.erase(topic);
2025-08-03 00:34:05 +02:00
}
}
void Mqtt::publish(const std::string& topic, const std::string& payload, bool retain) {
if (mqttClient.connected()) {
if (mqttClient.publish(topic.c_str(), payload.c_str(), retain)) {
} else {
2025-08-29 19:17:09 +02:00
Debug::printf("Failed to publish to topic: %s\n", topic.c_str(), payload.c_str());
2025-08-03 00:34:05 +02:00
}
} else {
2025-08-29 19:17:09 +02:00
Debug::println("MQTT client is not connected. Cannot publish.");
2025-08-03 00:34:05 +02:00
}
}
void Mqtt::poll() {
if (mqttClient.connected()) {
mqttClient.loop(); // Process incoming messages
} else {
2025-08-29 19:17:09 +02:00
Debug::println("MQTT client is not connected. Polling skipped.");
2025-08-03 00:34:05 +02:00
}
}
void Mqtt::checkConnection() {
if (!mqttClient.connected()) {
2025-08-29 19:17:09 +02:00
Debug::println("MQTT client is not connected. Attempting to reconnect...");
2025-08-03 00:34:05 +02:00
if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
2025-08-29 19:17:09 +02:00
Debug::println("Reconnected to MQTT broker successfully.");
2025-08-03 00:34:05 +02:00
for (const auto& callback : Mqtt::callbacks) {
mqttClient.subscribe(callback.first.c_str());
}
Mqtt::isConnected = true;
} else {
2025-08-29 19:17:09 +02:00
Debug::printf("Failed to reconnect to MQTT broker, rc=%d\n", mqttClient.state());
2025-08-03 00:34:05 +02:00
Mqtt::isConnected = false;
}
}
}
void Mqtt::connect(std::string brokerIp, uint16_t brokerPort, std::string clientId, std::string username, std::string password) {
Mqtt::brokerIp = brokerIp;
Mqtt::brokerPort = brokerPort;
Mqtt::clientId = clientId;
Mqtt::username = username;
Mqtt::password = password;
mqttClient.setServer(Mqtt::brokerIp.c_str(), Mqtt::brokerPort);
mqttClient.setKeepAlive(60);
mqttClient.setCallback(mqttCb);
mqttClient.setBufferSize(BUFFER_SIZE);
if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
2025-08-29 19:17:09 +02:00
Debug::println("Connected to MQTT broker");
2025-08-03 00:34:05 +02:00
Mqtt::initialized = true;
Mqtt::isConnected = true;
} else {
2025-08-29 19:17:09 +02:00
Debug::printf("Failed to connect to MQTT broker, rc=%d\n", mqttClient.state());
2025-08-03 00:34:05 +02:00
}
}