mqtt_client.rb 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. require 'mqtt'
  2. require 'timeout'
  3. require 'json'
  # Small helper around an MQTT::Client connection that subscribes to
  # per-device topics of the form
  # "<topic_prefix><path>/<hex id>/<type>/<group_id>" and publishes JSON
  # commands to the matching "commands" topics.
  #
  # Listeners run in background threads; call #wait_for_listeners to block
  # until every listener has either timed out or been stopped by its block.
  class MqttClient
    # Raised inside a listener thread to leave the message loop once the
    # listener block returns a truthy value. Rescued inside #on_message.
    BreakListenLoopError = Class.new(StandardError)

    # Connects to the broker immediately.
    #
    # NOTE(review): the credentials are interpolated into the URI verbatim,
    # so characters such as "@", ":" or "/" in +username+ / +password+ would
    # probably produce a malformed URI -- confirm against the callers.
    #
    # @param server [String] host (optionally "host:port") of the broker
    # @param username [String] broker user name
    # @param password [String] broker password
    # @param topic_prefix [String] prefix prepended verbatim to every topic;
    #   include the trailing "/" if one is wanted
    def initialize(server, username, password, topic_prefix)
      @client = MQTT::Client.connect("mqtt://#{username}:#{password}@#{server}")
      @topic_prefix = topic_prefix
      @listen_threads = []
    end

    # Closes the connection to the broker.
    def disconnect
      @client.disconnect
    end

    # Drops the current connection and opens it again.
    def reconnect
      @client.disconnect
      @client.connect
    end

    # Builds the "<id>/<type>/<group_id>" tail of a device topic.
    #
    # @param params [Hash, nil] hash with +:id+ (Integer), +:type+ and
    #   +:group_id+; when nil (or false) a wildcard tail is returned
    # @return [String] e.g. "0x00AB/rgb_cct/3", or "+/+/+" without params
    def id_topic_suffix(params)
      return '+/+/+' unless params

      "#{format('0x%04X', params[:id])}/#{params[:type]}/#{params[:group_id]}"
    end

    # Listens on the "updates" topic; see #on_id_message.
    def on_update(id_params = nil, timeout = 10, &block)
      on_id_message('updates', id_params, timeout, &block)
    end

    # Listens on the "state" topic; see #on_id_message.
    def on_state(id_params = nil, timeout = 10, &block)
      on_id_message('state', id_params, timeout, &block)
    end

    # Subscribes to "<topic_prefix><path>/<id suffix>" and hands every
    # message to the block as a decoded device hash plus the parsed JSON body.
    #
    # @param path [String] topic segment after the prefix ("updates", "state")
    # @param id_params [Hash, nil] device to listen for; nil means any device
    # @param timeout [Numeric] seconds after which the listener gives up
    # @yieldparam device [Hash] +{ id: Integer, type: String, group_id: Integer }+
    # @yieldparam payload [Object] result of JSON.parse on the message body
    # @yieldreturn [Boolean] truthy to stop listening
    def on_id_message(path, id_params, timeout, &block)
      subscription = "#{@topic_prefix}#{path}/#{id_topic_suffix(id_params)}"

      on_message(subscription, timeout) do |received_topic, message|
        # The device identifiers are always the LAST three segments of the
        # topic. Reading them from the end (rather than from fixed indices)
        # keeps the parsing correct for prefixes with any number of segments.
        device_id, type, group_id = received_topic.split('/').last(3)

        block.call(
          { id: device_id.to_i(16), type: type, group_id: group_id.to_i },
          JSON.parse(message)
        )
      end
    end

    # Starts a background thread that receives messages for +topic+ and
    # passes each one to the block until the block returns a truthy value or
    # +timeout+ seconds have elapsed.
    #
    # @param topic [String] topic (wildcards allowed) handed to the client
    # @param timeout [Numeric] seconds after which the listener gives up
    # @yieldparam topic [String] topic the message arrived on
    # @yieldparam message [String] raw message body
    # @yieldreturn [Boolean] truthy to stop listening
    # @return [Array<Thread>] the internal list of listener threads
    def on_message(topic, timeout = 10, &block)
      @listen_threads << Thread.new do
        begin
          Timeout.timeout(timeout) do
            @client.get(topic) do |received_topic, message|
              raise BreakListenLoopError if block.call(received_topic, message)
            end
          end
        rescue Timeout::Error, BreakListenLoopError
          # Both are the expected ways for a listener to finish: either
          # nothing (more) arrived in time, or the block asked to stop.
        end
      end
    end

    # Publishes +state+ as JSON to the device's "commands" topic.
    #
    # @param id_params [Hash, nil] target device; see #id_topic_suffix
    # @param state [Hash] attributes to send; serialized with #to_json
    def patch_state(id_params, state = {})
      @client.publish(
        "#{@topic_prefix}commands/#{id_topic_suffix(id_params)}",
        state.to_json
      )
    end

    # Blocks until every listener started so far has finished, then forgets
    # them so the next call only waits for listeners started afterwards.
    def wait_for_listeners
      @listen_threads.each(&:join)
      @listen_threads.clear
    end
  end