Explorar el Código

implement state updates rate limiting

Chris Mullins hace 8 años
padre
commit
8db6253332

+ 50 - 0
lib/MQTT/BulbStateUpdater.cpp

@@ -0,0 +1,50 @@
+#include <BulbStateUpdater.h>
+
+BulbStateUpdater::BulbStateUpdater(Settings& settings, MqttClient& mqttClient, GroupStateStore& stateStore)
+  : settings(settings),
+    mqttClient(mqttClient),
+    stateStore(stateStore),
+    lastFlush(0)
+{ }
+
+void BulbStateUpdater::enqueueUpdate(GroupId groupId, GroupState& groupState) {
+  // If can flush immediately, do so (avoids lookup of group state later).
+  if (canFlush()) {
+    flushGroup(groupId, groupState);
+  } else {
+    staleGroups.push(groupId);
+  }
+}
+
+void BulbStateUpdater::loop() {
+  while (canFlush() && staleGroups.size() > 0) {
+    GroupId groupId = staleGroups.shift();
+    GroupState& groupState = stateStore.get(groupId);
+
+    if (groupState.isMqttDirty()) {
+      flushGroup(groupId, groupState);
+      groupState.clearMqttDirty();
+    }
+  }
+}
+
+inline void BulbStateUpdater::flushGroup(GroupId groupId, GroupState& state) {
+  char buffer[200];
+  StaticJsonBuffer<200> jsonBuffer;
+  JsonObject& message = jsonBuffer.createObject();
+  state.applyState(message);
+  message.printTo(buffer);
+
+  mqttClient.sendState(
+    *MiLightRemoteConfig::fromType(groupId.deviceType),
+    groupId.deviceId,
+    groupId.groupId,
+    buffer
+  );
+
+  lastFlush = millis();
+}
+
+inline bool BulbStateUpdater::canFlush() const {
+  return (millis() > (lastFlush + settings.mqttStateRateLimit));
+}

+ 31 - 0
lib/MQTT/BulbStateUpdater.h

@@ -0,0 +1,31 @@
+/**
+ * Enqueues updated bulb states and flushes them at the configured interval.
+ */
+
+#include <stddef.h>
+#include <MqttClient.h>
+#include <CircularBuffer.h>
+#include <Settings.h>
+
+#ifndef BULB_STATE_UPDATER
+#define BULB_STATE_UPDATER
+
+class BulbStateUpdater {
+public:
+  BulbStateUpdater(Settings& settings, MqttClient& mqttClient, GroupStateStore& stateStore);
+
+  void enqueueUpdate(GroupId groupId, GroupState& groupState);
+  void loop();
+
+private:
+  Settings& settings;
+  MqttClient& mqttClient;
+  GroupStateStore& stateStore;
+  CircularBuffer<GroupId, MILIGHT_MAX_STALE_MQTT_GROUPS> staleGroups;
+  unsigned long lastFlush;
+
+  inline void flushGroup(GroupId groupId, GroupState& state);
+  inline bool canFlush() const;
+};
+
+#endif

+ 8 - 1
lib/MiLightState/GroupState.cpp

@@ -73,6 +73,7 @@ GroupState::GroupState() {
   state.fields._isSetKelvin          = 0;
   state.fields._isSetBulbMode        = 0;
   state.fields._dirty                = 1;
+  state.fields._mqttDirty            = 0;
 }
 
 bool GroupState::isSetState() const { return state.fields._isSetState; }
@@ -227,9 +228,15 @@ bool GroupState::setBulbMode(BulbMode bulbMode) {
 }
 
 bool GroupState::isDirty() const { return state.fields._dirty; }
-inline bool GroupState::setDirty() { state.fields._dirty = 1; }
+inline bool GroupState::setDirty() {
+  state.fields._dirty = 1;
+  state.fields._mqttDirty = 1;
+}
 bool GroupState::clearDirty() { state.fields._dirty = 0; }
 
+bool GroupState::isMqttDirty() const { return state.fields._mqttDirty; }
+bool GroupState::clearMqttDirty() { state.fields._mqttDirty = 0; }
+
 void GroupState::load(Stream& stream) {
   for (size_t i = 0; i < DATA_BYTES; i++) {
     stream.readBytes(reinterpret_cast<uint8_t*>(&state.data[i]), 4);

+ 7 - 2
lib/MiLightState/GroupState.h

@@ -73,9 +73,13 @@ public:
   bool setBulbMode(BulbMode mode);
 
   bool isDirty() const;
-  bool setDirty();
+  inline bool setDirty();
   bool clearDirty();
 
+  bool isMqttDirty() const;
+  inline bool setMqttDirty();
+  bool clearMqttDirty();
+
   bool patch(const JsonObject& state);
   void applyState(JsonObject& state);
 
@@ -110,7 +114,8 @@ private:
         _isSetBrightnessColor : 1,
         _isSetBrightnessMode  : 1,
         _dirty                : 1,
-                              : 5;
+        _mqttDirty            : 1,
+                              : 4;
     } fields;
   };
 

+ 2 - 0
lib/Settings/Settings.cpp

@@ -81,6 +81,7 @@ void Settings::patch(JsonObject& parsedSettings) {
     this->setIfPresent(parsedSettings, "discovery_port", discoveryPort);
     this->setIfPresent(parsedSettings, "listen_repeats", listenRepeats);
     this->setIfPresent(parsedSettings, "state_flush_interval", stateFlushInterval);
+    this->setIfPresent(parsedSettings, "mqtt_state_rate_limit", mqttStateRateLimit);
 
     if (parsedSettings.containsKey("radio_interface_type")) {
       this->radioInterfaceType = Settings::typeFromString(parsedSettings["radio_interface_type"]);
@@ -149,6 +150,7 @@ void Settings::serialize(Stream& stream, const bool prettyPrint) {
   root["discovery_port"] = this->discoveryPort;
   root["listen_repeats"] = this->listenRepeats;
   root["state_flush_interval"] = this->stateFlushInterval;
+  root["mqtt_state_rate_limit"] = this->mqttStateRateLimit;
 
   if (this->deviceIds) {
     JsonArray& arr = jsonBuffer.createArray();

+ 8 - 2
lib/Settings/Settings.h

@@ -17,7 +17,11 @@
 #endif
 
 #ifndef MILIGHT_MAX_STATE_ITEMS
-#define MILIGHT_MAX_STATE_ITEMS 10
+#define MILIGHT_MAX_STATE_ITEMS 100
+#endif
+
+#ifndef MILIGHT_MAX_STALE_MQTT_GROUPS
+#define MILIGHT_MAX_STALE_MQTT_GROUPS 10
 #endif
 
 #define SETTINGS_FILE  "/config.json"
@@ -69,7 +73,8 @@ public:
     listenRepeats(3),
     _autoRestartPeriod(0),
     discoveryPort(48899),
-    stateFlushInterval(10)
+    stateFlushInterval(10),
+    mqttStateRateLimit(500)
   { }
 
   ~Settings() {
@@ -118,6 +123,7 @@ public:
   uint16_t discoveryPort;
   uint8_t listenRepeats;
   size_t stateFlushInterval;
+  size_t mqttStateRateLimit;
 
 protected:
   size_t _autoRestartPeriod;

+ 1 - 0
platformio.ini

@@ -20,6 +20,7 @@ lib_deps_external =
   https://github.com/ratkins/RGBConverter
   Hash
   WebSockets
+  CircularBuffer
 extra_scripts =
   pre:.build_web.py
 build_flags = !python .get_version.py -DMQTT_MAX_PACKET_SIZE=200 -Idist -Ilib/DataStructures

+ 42 - 17
src/main.cpp

@@ -18,6 +18,7 @@
 #include <RGBConverter.h>
 #include <MiLightDiscoveryServer.h>
 #include <MiLightClient.h>
+#include <BulbStateUpdater.h>
 
 WiFiManager wifiManager;
 
@@ -29,13 +30,19 @@ MiLightHttpServer *httpServer = NULL;
 MqttClient* mqttClient = NULL;
 MiLightDiscoveryServer* discoveryServer = NULL;
 uint8_t currentRadioType = 0;
+
+// For tracking and managing group state
 GroupStateStore stateStore(MILIGHT_MAX_STATE_ITEMS);
+BulbStateUpdater* bulbStateUpdater;
 size_t lastFlush = 0;
 
 int numUdpServers = 0;
 MiLightUdpServer** udpServers;
 WiFiUDP udpSeder;
 
+/**
+ * Set up UDP servers (both v5 and v6).  Clean up old ones if necessary.
+ */
 void initMilightUdpServers() {
   if (udpServers) {
     for (int i = 0; i < numUdpServers; i++) {
@@ -69,39 +76,45 @@ void initMilightUdpServers() {
   }
 }
 
+/**
+ * Milight RF packet handler.
+ *
+ * Called both when a packet is sent locally, and when an intercepted packet
+ * is read.
+ */
 void onPacketSentHandler(uint8_t* packet, const MiLightRemoteConfig& config) {
   StaticJsonBuffer<200> buffer;
   JsonObject& result = buffer.createObject();
-  config.packetFormatter->parsePacket(packet, result, &stateStore);
+  GroupId groupId = config.packetFormatter->parsePacket(packet, result, &stateStore);
 
-  if (!result.containsKey("device_id")
-    ||!result.containsKey("group_id")
-    ||!result.containsKey("device_type")) {
-    Serial.println(F("Skipping update because packet formatter didn't supply necessary information."));
+  if (&groupId == &DEFAULT_GROUP_ID) {
+    Serial.println(F("Skipping packet handler because packet was not decoded"));
     return;
   }
 
-  if (mqttClient) {
-    uint16_t deviceId = result["device_id"];
-    uint16_t groupId = result["group_id"];
-    const MiLightRemoteConfig* remoteConfig = MiLightRemoteConfig::fromType(result.get<String>("device_type"));
+  const MiLightRemoteConfig& remoteConfig =
+    *MiLightRemoteConfig::fromType(groupId.deviceType);
 
-    GroupId bulbId(deviceId, groupId, remoteConfig->type);
-    GroupState& groupState = stateStore.get(bulbId);
-    bool changes = groupState.patch(result);
+  if (mqttClient) {
+    GroupState& groupState = stateStore.get(groupId);
+    groupState.patch(result);
 
+    // Sends the state delta derived from the raw packet
     char output[200];
     result.printTo(output);
+    mqttClient->sendUpdate(remoteConfig, groupId.deviceId, groupId.groupId, output);
 
-    mqttClient->sendUpdate(config, deviceId, groupId, output);
-    groupState.applyState(result);
-    result.printTo(output);
-    mqttClient->sendState(config, deviceId, groupId, output);
+    // Sends the entire state
+    bulbStateUpdater->enqueueUpdate(groupId, groupState);
   }
 
-  httpServer->handlePacketSent(packet, config);
+  httpServer->handlePacketSent(packet, remoteConfig);
 }
 
+/**
+ * Listen for packets on one radio config.  Cycles through all configs as its
+ * called.
+ */
 void handleListen() {
   if (! settings.listenRepeats) {
     return;
@@ -130,6 +143,9 @@ void handleListen() {
   }
 }
 
+/**
+ * Apply what's in the Settings object.
+ */
 void applySettings() {
   if (milightClient) {
     delete milightClient;
@@ -139,6 +155,7 @@ void applySettings() {
   }
   if (mqttClient) {
     delete mqttClient;
+    delete bulbStateUpdater;
   }
 
   radioFactory = MiLightRadioFactory::fromSettings(settings);
@@ -154,6 +171,7 @@ void applySettings() {
   if (settings.mqttServer().length() > 0) {
     mqttClient = new MqttClient(settings, milightClient);
     mqttClient->begin();
+    bulbStateUpdater = new BulbStateUpdater(settings, *mqttClient, stateStore);
   }
 
   initMilightUdpServers();
@@ -168,6 +186,9 @@ void applySettings() {
   }
 }
 
+/**
+ *
+ */
 bool shouldRestart() {
   if (! settings.isAutoRestartEnabled()) {
     return false;
@@ -176,6 +197,9 @@ bool shouldRestart() {
   return settings.getAutoRestartPeriod()*60*1000 < millis();
 }
 
+/**
+ * Checks if group states should be flushed from memory onto flash
+ */
 bool shouldFlush() {
   return ((lastFlush + (settings.stateFlushInterval*1000) < millis())
     || lastFlush > millis()); // in case millis() wraps
@@ -216,6 +240,7 @@ void loop() {
 
   if (mqttClient) {
     mqttClient->handleClient();
+    bulbStateUpdater->loop();
   }
 
   if (udpServers) {