108 lines
3.8 KiB
C++
108 lines
3.8 KiB
C++
#include <PubSubClient.h>
|
|
#include <WiFiClient.h>
|
|
#include "debugutil.hpp"
|
|
#include "mqtt.h"
|
|
|
|
// Packet buffer size handed to PubSubClient::setBufferSize() in Mqtt::connect().
constexpr uint16_t BUFFER_SIZE{2048};
|
|
|
|
static WiFiClient wifiClient = WiFiClient();
|
|
static PubSubClient mqttClient = PubSubClient(wifiClient);
|
|
std::string Mqtt::brokerIp;
|
|
uint16_t Mqtt::brokerPort;
|
|
std::string Mqtt::clientId;
|
|
std::string Mqtt::username;
|
|
std::string Mqtt::password;
|
|
std::map<std::string, MqttCallback> Mqtt::callbacks;
|
|
bool Mqtt::initialized = false;
|
|
bool Mqtt::isConnected = false;
|
|
|
|
void Mqtt::mqttCb(char* topic, uint8_t* payload, unsigned int length) {
|
|
std::string topicStr(topic);
|
|
if (callbacks.find(topicStr) != callbacks.end()) {
|
|
callbacks[topicStr](payload, length);
|
|
}
|
|
}
|
|
|
|
void Mqtt::subscribe(const std::string& topic, MqttCallback callback) {
|
|
if (mqttClient.connected()) {
|
|
if (mqttClient.subscribe(topic.c_str())) {
|
|
callbacks[topic] = callback;
|
|
Debug::printf("Subscribed to topic: %s\n", topic.c_str());
|
|
} else {
|
|
Debug::printf("Failed to subscribe to topic: %s\n", topic.c_str());
|
|
}
|
|
} else {
|
|
Debug::println("MQTT client is not connected. Cannot subscribe.");
|
|
}
|
|
}
|
|
|
|
void Mqtt::unsubscribe(const std::string& topic) {
|
|
if (mqttClient.connected()) {
|
|
if (mqttClient.unsubscribe(topic.c_str())) {
|
|
Debug::printf("Unsubscribed from topic: %s\n", topic.c_str());
|
|
} else {
|
|
Debug::printf("Failed to unsubscribe from topic: %s\n", topic.c_str());
|
|
}
|
|
} else {
|
|
Debug::println("MQTT client is not connected. Unsubscribe skipped.");
|
|
}
|
|
if (callbacks.find(topic) != callbacks.end()) {
|
|
callbacks.erase(topic);
|
|
}
|
|
}
|
|
|
|
void Mqtt::publish(const std::string& topic, const std::string& payload, bool retain) {
|
|
if (mqttClient.connected()) {
|
|
if (mqttClient.publish(topic.c_str(), payload.c_str(), retain)) {
|
|
} else {
|
|
Debug::printf("Failed to publish to topic: %s\n", topic.c_str(), payload.c_str());
|
|
}
|
|
} else {
|
|
Debug::println("MQTT client is not connected. Cannot publish.");
|
|
}
|
|
}
|
|
|
|
void Mqtt::poll() {
|
|
if (mqttClient.connected()) {
|
|
mqttClient.loop(); // Process incoming messages
|
|
} else {
|
|
Debug::println("MQTT client is not connected. Polling skipped.");
|
|
}
|
|
}
|
|
|
|
void Mqtt::checkConnection() {
|
|
if (!mqttClient.connected()) {
|
|
Debug::println("MQTT client is not connected. Attempting to reconnect...");
|
|
if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
|
|
Debug::println("Reconnected to MQTT broker successfully.");
|
|
for (const auto& callback : Mqtt::callbacks) {
|
|
mqttClient.subscribe(callback.first.c_str());
|
|
}
|
|
Mqtt::isConnected = true;
|
|
} else {
|
|
Debug::printf("Failed to reconnect to MQTT broker, rc=%d\n", mqttClient.state());
|
|
Mqtt::isConnected = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * @brief Stores the connection settings, configures the client and connects.
 *
 * The arguments are copied into the corresponding static members first; the
 * client is then configured from those members (server, keep-alive of 60,
 * message callback, buffer size BUFFER_SIZE) and a connection attempt is made.
 *
 * On success Mqtt::initialized and Mqtt::isConnected are set to true. On
 * failure the client state code is logged and Mqtt::isConnected is set to
 * false; Mqtt::initialized is left unchanged.
 *
 * @param brokerIp   Broker address, passed to setServer() as a C string.
 * @param brokerPort Broker TCP port.
 * @param clientId   Client identifier used for this and later reconnects.
 * @param username   User name used for this and later reconnects.
 * @param password   Password used for this and later reconnects.
 */
void Mqtt::connect(std::string brokerIp, uint16_t brokerPort, std::string clientId, std::string username, std::string password) {
    // The parameters shadow the static members, hence the explicit Mqtt::
    // qualification on every assignment target.
    Mqtt::brokerIp = brokerIp;
    Mqtt::brokerPort = brokerPort;
    Mqtt::clientId = clientId;
    Mqtt::username = username;
    Mqtt::password = password;

    // Configure from the static members, not the by-value parameters: the
    // parameters are destroyed when this function returns.
    // NOTE(review): presumably setServer() keeps the pointer it is given - confirm.
    mqttClient.setServer(Mqtt::brokerIp.c_str(), Mqtt::brokerPort);
    mqttClient.setKeepAlive(60);
    mqttClient.setCallback(mqttCb);
    mqttClient.setBufferSize(BUFFER_SIZE);

    if (mqttClient.connect(Mqtt::clientId.c_str(), Mqtt::username.c_str(), Mqtt::password.c_str())) {
        Debug::println("Connected to MQTT broker");
        Mqtt::initialized = true;
        Mqtt::isConnected = true;
    } else {
        Debug::printf("Failed to connect to MQTT broker, rc=%d\n", mqttClient.state());
        // Match checkConnection(): a failed attempt must not leave a stale
        // "connected" flag from an earlier successful call.
        Mqtt::isConnected = false;
    }
}
|