mqtt_client.rb 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. require 'mqtt'
  2. require 'timeout'
  3. require 'json'
  # Thin convenience wrapper around MQTT::Client for publishing device
  # commands and listening (with a timeout) for update/state messages.
  #
  # Every listener runs in its own background thread; call
  # #wait_for_listeners to block until all registered listeners have finished.
  class MqttClient
    # Raised inside a listener to leave the blocking MQTT::Client#get loop once
    # the caller's block reports (via a truthy return value) that it is done.
    BreakListenLoopError = Class.new(StandardError)

    # Keys of the id params hash that must match for a message to be delivered.
    ID_MATCH_KEYS = %i[id type group_id].freeze

    # Connects to the broker immediately.
    #
    # NOTE(review): username/password are interpolated into the URI without
    # escaping, so credentials containing URI-reserved characters (e.g. "@" or
    # "/") would presumably break the connection string - confirm with callers.
    #
    # @param server [String] broker address placed after the "@" in the URI
    # @param username [String]
    # @param password [String]
    # @param topic_prefix [String] prepended verbatim to command/update/state topics
    def initialize(server, username, password, topic_prefix)
      @client = MQTT::Client.connect("mqtt://#{username}:#{password}@#{server}")
      @topic_prefix = topic_prefix
      @listen_threads = []
    end

    # Closes the broker connection.
    def disconnect
      @client.disconnect
    end

    # Drops the current broker connection and opens it again.
    def reconnect
      @client.disconnect
      @client.connect
    end

    # Blocks until one message arrives on +topic+.
    #
    # The listener block returns true so that the first received message ends
    # the listen loop. (An empty block returns nil, which never breaks the loop
    # and therefore always ends in a timeout.)
    #
    # @param topic [String] topic (or topic filter) to listen on
    # @param timeout [Numeric] seconds to wait before giving up
    # @raise [Timeout::Error] when no message arrives within +timeout+
    def wait_for_message(topic, timeout = 10)
      on_message(topic, timeout) { |_topic, _message| true }
      wait_for_listeners
    end

    # Builds the "<id>/<type>/<group_id>" topic suffix for a device.
    #
    # @param params [Hash, nil] uses :id, :type, :group_id and optionally
    #   :id_format ('decimal' renders the id with to_s; anything else renders it
    #   as 0x-prefixed, zero-padded, upper-case hex)
    # @return [String] the suffix, or the wildcard "+/+/+" when +params+ is nil/false
    def id_topic_suffix(params)
      return '+/+/+' unless params

      str_id =
        if params[:id_format] == 'decimal'
          params[:id].to_s
        else
          format('0x%04X', params[:id])
        end

      "#{str_id}/#{params[:type]}/#{params[:group_id]}"
    end

    # Listens on the "updates" topics. See #on_id_message.
    def on_update(id_params = nil, timeout = 10, &block)
      on_id_message('updates', id_params, timeout, &block)
    end

    # Listens on the "state" topics. See #on_id_message.
    def on_state(id_params = nil, timeout = 10, &block)
      on_id_message('state', id_params, timeout, &block)
    end

    # Listens on "<topic_prefix><path>/+/+/+" and yields messages whose topic
    # matches +id_params+ (all messages when +id_params+ is nil).
    #
    # The subscription always uses the wildcard suffix; filtering is done here
    # after parsing the received topic.
    #
    # NOTE(review): the id/type/group_id are read from topic segments 2, 3 and 4,
    # which assumes the topic prefix contributes exactly one "/"-separated
    # segment - confirm against the configured prefix.
    # NOTE(review): the id segment is always parsed as hexadecimal, so topics
    # carrying a decimal-formatted id would presumably be misread - confirm.
    #
    # @param path [String] topic segment following the prefix
    # @param id_params [Hash, nil] expected :id, :type and :group_id
    # @param timeout [Numeric] seconds to listen before timing out
    # @yieldparam topic_id_params [Hash] :id, :type, :group_id, :unparsed_id
    # @yieldparam message [Object] parsed JSON, or the raw payload if it is not JSON
    # @yieldreturn [Object] truthy to stop listening
    def on_id_message(path, id_params, timeout, &block)
      sub_topic = "#{@topic_prefix}#{path}/#{id_topic_suffix(nil)}"

      on_message(sub_topic, timeout) do |topic, message|
        topic_parts = topic.split('/')
        topic_id_params = {
          id: topic_parts[2].to_i(16),
          type: topic_parts[3],
          group_id: topic_parts[4].to_i,
          unparsed_id: topic_parts[2]
        }

        # A nil result keeps the listen loop running for non-matching devices.
        next unless !id_params || ID_MATCH_KEYS.all? { |key| topic_id_params[key] == id_params[key] }

        payload =
          begin
            JSON.parse(message)
          rescue JSON::ParserError
            message # deliberate fallback: hand non-JSON payloads through untouched
          end

        yield(topic_id_params, payload)
      end
    end

    # Starts a background thread that feeds every message received on +topic+
    # to the given block until the block returns a truthy value or +timeout+
    # seconds elapse.
    #
    # @param topic [String] topic (or topic filter) passed to MQTT::Client#get
    # @param timeout [Numeric] seconds to listen
    # @param raise_error [Boolean] when true the timeout is re-raised inside the
    #   thread, so it surfaces from Thread#join (see #wait_for_listeners)
    # @yieldparam topic [String] topic the message arrived on
    # @yieldparam message [String] raw payload
    # @yieldreturn [Object] truthy to stop listening
    # @return [Array<Thread>] the list of registered listener threads
    def on_message(topic, timeout = 10, raise_error = true, &block)
      @listen_threads << Thread.new do
        begin
          Timeout.timeout(timeout) do
            @client.get(topic) do |received_topic, message|
              finished = yield(received_topic, message)
              # The exception is the only way out of the blocking get loop.
              raise BreakListenLoopError if finished
            end
          end
        rescue Timeout::Error => e
          puts "Timed out listening for message on: #{topic}"
          raise e if raise_error
        rescue BreakListenLoopError
          # Normal termination requested by the caller's block.
        end
      end
    end

    # Publishes +state+ as JSON to an arbitrary topic.
    #
    # @param topic [String]
    # @param state [#to_json]
    def publish(topic, state = {})
      @client.publish(topic, state.to_json)
    end

    # Publishes +state+ as JSON to the device's "commands" topic.
    #
    # @param id_params [Hash, nil] see #id_topic_suffix
    # @param state [#to_json]
    def patch_state(id_params, state = {})
      @client.publish(
        "#{@topic_prefix}commands/#{id_topic_suffix(id_params)}",
        state.to_json
      )
    end

    # Joins every registered listener thread and forgets them.
    #
    # The registry is emptied before joining and every thread is joined even if
    # an earlier one failed; otherwise a failed listener would stay registered
    # and re-raise its stale error on every later call.
    #
    # @raise [StandardError] the first error raised by a listener thread
    #   (typically Timeout::Error)
    # @return [Array] the (now empty) listener registry
    def wait_for_listeners
      pending = @listen_threads.dup
      @listen_threads.clear

      first_error = nil
      pending.each do |thread|
        begin
          thread.join
        rescue StandardError => e
          first_error ||= e
        end
      end

      raise first_error if first_error

      @listen_threads
    end
  end