MqttClient.cpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. #include <stddef.h>
  2. #include <MqttClient.h>
  3. #include <TokenIterator.h>
  4. #include <UrlTokenBindings.h>
  5. #include <IntParsing.h>
  6. #include <ArduinoJson.h>
  7. #include <WiFiClient.h>
  8. #include <MiLightRadioConfig.h>
  9. #include <AboutHelper.h>
  10. static const char* STATUS_CONNECTED = "connected";
  11. static const char* STATUS_DISCONNECTED = "disconnected_clean";
  12. static const char* STATUS_LWT_DISCONNECTED = "disconnected_unclean";
  13. MqttClient::MqttClient(Settings& settings, MiLightClient*& milightClient)
  14. : mqttClient(tcpClient),
  15. milightClient(milightClient),
  16. settings(settings),
  17. lastConnectAttempt(0),
  18. connected(false)
  19. {
  20. String strDomain = settings.mqttServer();
  21. this->domain = new char[strDomain.length() + 1];
  22. strcpy(this->domain, strDomain.c_str());
  23. }
  24. MqttClient::~MqttClient() {
  25. String aboutStr = generateConnectionStatusMessage(STATUS_DISCONNECTED);
  26. mqttClient.publish(settings.mqttClientStatusTopic.c_str(), aboutStr.c_str(), true);
  27. mqttClient.disconnect();
  28. delete this->domain;
  29. }
  30. void MqttClient::onConnect(OnConnectFn fn) {
  31. this->onConnectFn = fn;
  32. }
  33. void MqttClient::begin() {
  34. #ifdef MQTT_DEBUG
  35. printf_P(
  36. PSTR("MqttClient - Connecting to: %s\nparsed:%s:%u\n"),
  37. settings._mqttServer.c_str(),
  38. settings.mqttServer().c_str(),
  39. settings.mqttPort()
  40. );
  41. #endif
  42. mqttClient.setServer(this->domain, settings.mqttPort());
  43. mqttClient.setCallback(
  44. [this](char* topic, byte* payload, int length) {
  45. this->publishCallback(topic, payload, length);
  46. }
  47. );
  48. reconnect();
  49. }
  50. bool MqttClient::connect() {
  51. char nameBuffer[30];
  52. sprintf_P(nameBuffer, PSTR("milight-hub-%u"), ESP.getChipId());
  53. #ifdef MQTT_DEBUG
  54. Serial.println(F("MqttClient - connecting"));
  55. #endif
  56. if (settings.mqttUsername.length() > 0 && settings.mqttClientStatusTopic.length() > 0) {
  57. return mqttClient.connect(
  58. nameBuffer,
  59. settings.mqttUsername.c_str(),
  60. settings.mqttPassword.c_str(),
  61. settings.mqttClientStatusTopic.c_str(),
  62. 2,
  63. true,
  64. generateConnectionStatusMessage(STATUS_LWT_DISCONNECTED).c_str()
  65. );
  66. } else if (settings.mqttUsername.length() > 0) {
  67. return mqttClient.connect(
  68. nameBuffer,
  69. settings.mqttUsername.c_str(),
  70. settings.mqttPassword.c_str()
  71. );
  72. } else if (settings.mqttClientStatusTopic.length() > 0) {
  73. return mqttClient.connect(
  74. nameBuffer,
  75. settings.mqttClientStatusTopic.c_str(),
  76. 2,
  77. true,
  78. generateConnectionStatusMessage(STATUS_LWT_DISCONNECTED).c_str()
  79. );
  80. } else {
  81. return mqttClient.connect(nameBuffer);
  82. }
  83. }
  84. void MqttClient::sendBirthMessage() {
  85. if (settings.mqttClientStatusTopic.length() > 0) {
  86. String aboutStr = generateConnectionStatusMessage(STATUS_CONNECTED);
  87. mqttClient.publish(settings.mqttClientStatusTopic.c_str(), aboutStr.c_str(), true);
  88. }
  89. }
  90. void MqttClient::reconnect() {
  91. if (lastConnectAttempt > 0 && (millis() - lastConnectAttempt) < MQTT_CONNECTION_ATTEMPT_FREQUENCY) {
  92. return;
  93. }
  94. if (! mqttClient.connected()) {
  95. if (connect()) {
  96. subscribe();
  97. sendBirthMessage();
  98. #ifdef MQTT_DEBUG
  99. Serial.println(F("MqttClient - Successfully connected to MQTT server"));
  100. #endif
  101. } else {
  102. Serial.println(F("ERROR: Failed to connect to MQTT server"));
  103. }
  104. }
  105. lastConnectAttempt = millis();
  106. }
  107. void MqttClient::handleClient() {
  108. reconnect();
  109. mqttClient.loop();
  110. if (!connected && mqttClient.connected()) {
  111. this->connected = true;
  112. this->onConnectFn();
  113. } else if (!mqttClient.connected()) {
  114. this->connected = false;
  115. }
  116. }
  117. void MqttClient::sendUpdate(const MiLightRemoteConfig& remoteConfig, uint16_t deviceId, uint16_t groupId, const char* update) {
  118. publish(settings.mqttUpdateTopicPattern, remoteConfig, deviceId, groupId, update);
  119. }
  120. void MqttClient::sendState(const MiLightRemoteConfig& remoteConfig, uint16_t deviceId, uint16_t groupId, const char* update) {
  121. publish(settings.mqttStateTopicPattern, remoteConfig, deviceId, groupId, update, true);
  122. }
  123. void MqttClient::subscribe() {
  124. String topic = settings.mqttTopicPattern;
  125. topic.replace(":device_id", "+");
  126. topic.replace(":hex_device_id", "+");
  127. topic.replace(":dec_device_id", "+");
  128. topic.replace(":group_id", "+");
  129. topic.replace(":device_type", "+");
  130. topic.replace(":device_alias", "+");
  131. #ifdef MQTT_DEBUG
  132. printf_P(PSTR("MqttClient - subscribing to topic: %s\n"), topic.c_str());
  133. #endif
  134. mqttClient.subscribe(topic.c_str());
  135. }
  136. void MqttClient::send(const char* topic, const char* message, const bool retain) {
  137. size_t len = strlen(message);
  138. size_t topicLen = strlen(topic);
  139. if ((topicLen + len + 10) < MQTT_MAX_PACKET_SIZE ) {
  140. mqttClient.publish(topic, message, retain);
  141. } else {
  142. const uint8_t* messageBuffer = reinterpret_cast<const uint8_t*>(message);
  143. mqttClient.beginPublish(topic, len, retain);
  144. #ifdef MQTT_DEBUG
  145. Serial.printf_P(PSTR("Printing message in parts because it's too large for the packet buffer (%d bytes)"), len);
  146. #endif
  147. for (size_t i = 0; i < len; i += MQTT_PACKET_CHUNK_SIZE) {
  148. size_t toWrite = std::min(static_cast<size_t>(MQTT_PACKET_CHUNK_SIZE), len - i);
  149. mqttClient.write(messageBuffer+i, toWrite);
  150. #ifdef MQTT_DEBUG
  151. Serial.printf_P(PSTR(" Wrote %d bytes\n"), toWrite);
  152. #endif
  153. }
  154. mqttClient.endPublish();
  155. }
  156. }
  157. void MqttClient::publish(
  158. const String& _topic,
  159. const MiLightRemoteConfig &remoteConfig,
  160. uint16_t deviceId,
  161. uint16_t groupId,
  162. const char* message,
  163. const bool retain
  164. ) {
  165. if (_topic.length() == 0) {
  166. return;
  167. }
  168. BulbId bulbId(deviceId, groupId, remoteConfig.type);
  169. String topic = bindTopicString(_topic, bulbId);
  170. #ifdef MQTT_DEBUG
  171. printf("MqttClient - publishing update to %s\n", topic.c_str());
  172. #endif
  173. send(topic.c_str(), message, retain);
  174. }
  175. void MqttClient::publishCallback(char* topic, byte* payload, int length) {
  176. uint16_t deviceId = 0;
  177. uint8_t groupId = 0;
  178. const MiLightRemoteConfig* config = &FUT092Config;
  179. char cstrPayload[length + 1];
  180. cstrPayload[length] = 0;
  181. memcpy(cstrPayload, payload, sizeof(byte)*length);
  182. #ifdef MQTT_DEBUG
  183. printf("MqttClient - Got message on topic: %s\n%s\n", topic, cstrPayload);
  184. #endif
  185. char topicPattern[settings.mqttTopicPattern.length()];
  186. strcpy(topicPattern, settings.mqttTopicPattern.c_str());
  187. TokenIterator patternIterator(topicPattern, settings.mqttTopicPattern.length(), '/');
  188. TokenIterator topicIterator(topic, strlen(topic), '/');
  189. UrlTokenBindings tokenBindings(patternIterator, topicIterator);
  190. if (tokenBindings.hasBinding("device_alias")) {
  191. String alias = tokenBindings.get("device_alias");
  192. auto itr = settings.groupIdAliases.find(alias);
  193. if (itr == settings.groupIdAliases.end()) {
  194. Serial.printf_P(PSTR("MqttClient - WARNING: could not find device alias: `%s'. Ignoring packet.\n"), alias.c_str());
  195. return;
  196. } else {
  197. BulbId bulbId = itr->second;
  198. deviceId = bulbId.deviceId;
  199. config = MiLightRemoteConfig::fromType(bulbId.deviceType);
  200. groupId = bulbId.groupId;
  201. }
  202. } else {
  203. if (tokenBindings.hasBinding("device_id")) {
  204. deviceId = parseInt<uint16_t>(tokenBindings.get("device_id"));
  205. } else if (tokenBindings.hasBinding("hex_device_id")) {
  206. deviceId = parseInt<uint16_t>(tokenBindings.get("hex_device_id"));
  207. } else if (tokenBindings.hasBinding("dec_device_id")) {
  208. deviceId = parseInt<uint16_t>(tokenBindings.get("dec_device_id"));
  209. }
  210. if (tokenBindings.hasBinding("group_id")) {
  211. groupId = parseInt<uint16_t>(tokenBindings.get("group_id"));
  212. }
  213. if (tokenBindings.hasBinding("device_type")) {
  214. config = MiLightRemoteConfig::fromType(tokenBindings.get("device_type"));
  215. } else {
  216. Serial.println(F("MqttClient - WARNING: could not find device_type token. Defaulting to FUT092.\n"));
  217. }
  218. }
  219. if (config == NULL) {
  220. Serial.println(F("MqttClient - ERROR: unknown device_type specified"));
  221. return;
  222. }
  223. StaticJsonDocument<400> buffer;
  224. deserializeJson(buffer, cstrPayload);
  225. JsonObject obj = buffer.as<JsonObject>();
  226. #ifdef MQTT_DEBUG
  227. printf("MqttClient - device %04X, group %u\n", deviceId, groupId);
  228. #endif
  229. milightClient->prepare(config, deviceId, groupId);
  230. milightClient->update(obj);
  231. }
  232. String MqttClient::bindTopicString(const String& topicPattern, const BulbId& bulbId) {
  233. String boundTopic = topicPattern;
  234. String deviceIdHex = bulbId.getHexDeviceId();
  235. boundTopic.replace(":device_id", deviceIdHex);
  236. boundTopic.replace(":hex_device_id", deviceIdHex);
  237. boundTopic.replace(":dec_device_id", String(bulbId.deviceId));
  238. boundTopic.replace(":group_id", String(bulbId.groupId));
  239. boundTopic.replace(":device_type", MiLightRemoteTypeHelpers::remoteTypeToString(bulbId.deviceType));
  240. auto it = settings.findAlias(bulbId.deviceType, bulbId.deviceId, bulbId.groupId);
  241. if (it != settings.groupIdAliases.end()) {
  242. boundTopic.replace(":device_alias", it->first);
  243. } else {
  244. boundTopic.replace(":device_alias", "__unnamed_group");
  245. }
  246. return boundTopic;
  247. }
  248. String MqttClient::generateConnectionStatusMessage(const char* connectionStatus) {
  249. if (settings.simpleMqttClientStatus) {
  250. // Don't expand disconnect type for simple status
  251. if (0 == strcmp(connectionStatus, STATUS_CONNECTED)) {
  252. return connectionStatus;
  253. } else {
  254. return "disconnected";
  255. }
  256. } else {
  257. StaticJsonDocument<1024> json;
  258. json["status"] = connectionStatus;
  259. // Fill other fields
  260. AboutHelper::generateAboutObject(json, true);
  261. String response;
  262. serializeJson(json, response);
  263. return response;
  264. }
  265. }