ソースを参照

update MQTT test

Christopher Mullins 6 年 前
コミット
f42d3a3c7e
共有2 個のファイルを変更した167 個の追加35 個の削除を含む
  1. 81 0
      test/remote/lib/mqtt_client.rb
  2. 86 35
      test/remote/spec/mqtt_spec.rb

+ 81 - 0
test/remote/lib/mqtt_client.rb

@@ -0,0 +1,81 @@
+require 'mqtt'
+require 'timeout'
+require 'json'
+
+class MqttClient
+  BreakListenLoopError = Class.new(StandardError)
+
+  def initialize(server, username, password, topic_prefix)
+    @client = MQTT::Client.connect("mqtt://#{username}:#{password}@#{server}")
+    @topic_prefix = topic_prefix
+    @listen_threads = []
+  end
+
+  def disconnect
+    @client.disconnect
+  end
+  
+  def reconnect
+    @client.disconnect
+    @client.connect
+  end
+
+  def id_topic_suffix(params)
+    if params
+      "#{sprintf '0x%04X', params[:id]}/#{params[:type]}/#{params[:group_id]}"
+    else
+      "+/+/+"
+    end
+  end
+
+  def on_update(id_params = nil, timeout = 10, &block)
+    on_id_message('updates', id_params, timeout, &block)
+  end
+
+  def on_state(id_params = nil, timeout = 10, &block)
+    on_id_message('state', id_params, timeout, &block)
+  end
+
+  def on_id_message(path, id_params, timeout, &block)
+    topic = "#{@topic_prefix}#{path}/#{id_topic_suffix(id_params)}"
+
+    on_message(topic, timeout) do |topic, message|
+      topic_parts = topic.split('/')
+
+      yield(
+        {
+          id: topic_parts[2].to_i(16),
+          type: topic_parts[3],
+          group_id: topic_parts[4].to_i
+        },
+        JSON.parse(message)
+      )
+    end
+  end
+
+  def on_message(topic, timeout = 10, &block)
+    @listen_threads << Thread.new do
+      begin
+        Timeout.timeout(timeout) do
+          @client.get(topic) do |topic, message|
+            raise BreakListenLoopError if yield(topic, message)
+          end
+        end
+      rescue Timeout::Error
+      rescue BreakListenLoopError
+      end
+    end
+  end
+
+  def patch_state(id_params, state = {})
+    @client.publish(
+      "#{@topic_prefix}commands/#{id_topic_suffix(id_params)}",
+      state.to_json
+    )
+  end
+
+  def wait_for_listeners
+    @listen_threads.each(&:join)
+    @listen_threads.clear
+  end
+end

+ 86 - 35
test/remote/spec/mqtt_spec.rb

@@ -1,64 +1,115 @@
-require 'mqtt'
 require 'api_client'
+require 'mqtt_client'
 
 RSpec.describe 'State' do
   before(:all) do
     @client = ApiClient.new(ENV.fetch('ESPMH_HOSTNAME'), ENV.fetch('ESPMH_TEST_DEVICE_ID_BASE'))
     @client.upload_json('/settings', 'settings.json')
 
+    @topic_prefix = ENV.fetch('ESPMH_MQTT_TOPIC_PREFIX')
+
     @client.put(
       '/settings', 
       mqtt_server: ENV.fetch('ESPMH_MQTT_SERVER'),
       mqtt_username: ENV.fetch('ESPMH_MQTT_USERNAME'),
       mqtt_password: ENV.fetch('ESPMH_MQTT_PASSWORD'),
+      mqtt_topic_pattern: "#{@topic_prefix}commands/:device_id/:device_type/:group_id",
+      mqtt_state_topic_pattern: "#{@topic_prefix}state/:device_id/:device_type/:group_id",
+      mqtt_update_topic_pattern: "#{@topic_prefix}updates/:device_id/:device_type/:group_id",
     )
 
-    @topic_prefix = ENV.fetch('ESPMH_MQTT_TOPIC_PREFIX')
+    @mqtt_client = MqttClient.new(
+      *%w(SERVER USERNAME PASSWORD).map { |x| ENV.fetch("ESPMH_MQTT_#{x}") } << @topic_prefix
+    )
+  end
+
+  after(:all) do
+    @mqtt_client.disconnect
+  end
+
+  before(:each) do
+    @id_params = {
+      id: @client.generate_id,
+      type: 'rgb_cct',
+      group_id: 1
+    }
   end
 
   context 'birth and LWT' do
-    it 'should send birth and LWT messages when configured' do
-      lwt_topic = "#{@topic_prefix}lwt"
+    # Unfortunately, no way to easily simulate an unclean disconnect, so only test birth
+    it 'should send birth message when configured' do
       birth_topic = "#{@topic_prefix}birth"
 
+      @client.put(
+        '/settings',
+        mqtt_birth_topic: birth_topic
+      )
+
       seen_birth = false
-      seen_lwt = false
-
-      MQTT::Client.connect("mqtt://#{ENV.fetch('ESPMH_MQTT_USERNAME')}:#{ENV.fetch('ESPMH_MQTT_PASSWORD')}@#{ENV.fetch('ESPMH_MQTT_SERVER')}") do |c|
-        birth_listen_thread = Thread.new do
-          begin
-            Timeout.timeout(10) do
-              c.get(birth_topic)
-              seen_birth = true
-            end
-          rescue Timeout::Error 
-          end
-        end
 
-        lwt_listen_thread = Thread.new do
-          begin
-            Timeout.timeout(10) do
-              c.get(lwt_topic)
-              seen_lwt = true
-            end
-          rescue Timeout::Error
-          end
+      @mqtt_client.on_message(birth_topic) do |topic, message|
+        seen_birth = true
+      end
+
+      # Force MQTT reconnect by updating settings
+      @client.put('/settings', fakekey: 'fakevalue')
+
+      @mqtt_client.wait_for_listeners
+
+      expect(seen_birth).to be(true)
+    end
+  end
+
+  context 'commands and state' do
+    # Check state using HTTP
+    it 'should affect state' do
+      @client.patch_state({level: 50, status: 'off'}, @id_params)
+
+      @mqtt_client.patch_state(@id_params, status: 'on', level: 70)
+      state = @client.get_state(@id_params)
+
+      expect(state.keys).to      include(*%w(level status))
+      expect(state['status']).to eq('ON')
+      expect(state['level']).to  eq(70)
+    end
+
+    it 'should publish to state topics' do
+      desired_state = {'status' => 'ON', 'level' => 80}
+      seen_state = false
+
+      @client.patch_state({status: 'off'}, @id_params)
+
+      @mqtt_client.on_state(@id_params) do |id, message|
+        seen_state = (id == @id_params && desired_state.all? { |k,v| v == message[k] })
+      end
+
+      @mqtt_client.patch_state(@id_params, desired_state)
+      @mqtt_client.wait_for_listeners
+
+      expect(seen_state).to be(true)
+    end
+
+    it 'should publish an update message for each new command' do
+      tweak_params = {'hue' => 49, 'brightness' => 128, 'saturation' => 50}
+      desired_state = {'state' => 'ON'}.merge(tweak_params)
+
+      init_state = desired_state.merge(Hash[
+        tweak_params.map do |k, v|
+          [k, v + 10]
         end
+      ])
 
-        @client.put(
-          '/settings',
-          mqtt_lwt_topic: lwt_topic,
-          mqtt_lwt_message: 'disconnected',
-          mqtt_birth_topic: birth_topic
-        )
-        @client.post('/system', command: 'restart')
+      @client.patch_state(@id_params, init_state)
 
-        lwt_listen_thread.join
-        birth_listen_thread.join
+      accumulated_state = {}
+      @mqtt_client.on_update(@id_params) do |id, message|
+        desired_state == accumulated_state.merge!(message)
       end
 
-      expect(seen_birth).to be(true)
-      expect(seen_lwt).to be(true)
+      @mqtt_client.patch_state(@id_params, desired_state)
+      @mqtt_client.wait_for_listeners
+
+      expect(accumulated_state).to eq(desired_state)
     end
   end
 end