mqtt_client.rb 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. require 'mqtt'
  2. require 'timeout'
  3. require 'json'
  4. class MqttClient
  5. BreakListenLoopError = Class.new(StandardError)
  6. def initialize(server, username, password, topic_prefix)
  7. @client = MQTT::Client.connect("mqtt://#{username}:#{password}@#{server}")
  8. @topic_prefix = topic_prefix
  9. @listen_threads = []
  10. end
  11. def disconnect
  12. @client.disconnect
  13. end
  14. def reconnect
  15. @client.disconnect
  16. @client.connect
  17. end
  18. def wait_for_message(topic, timeout = 10)
  19. on_message(topic, timeout) { |topic, message| }
  20. wait_for_listeners
  21. end
  22. def id_topic_suffix(params)
  23. if params
  24. "#{sprintf '0x%04X', params[:id]}/#{params[:type]}/#{params[:group_id]}"
  25. else
  26. "+/+/+"
  27. end
  28. end
  29. def on_update(id_params = nil, timeout = 10, &block)
  30. on_id_message('updates', id_params, timeout, &block)
  31. end
  32. def on_state(id_params = nil, timeout = 10, &block)
  33. on_id_message('state', id_params, timeout, &block)
  34. end
  35. def on_id_message(path, id_params, timeout, &block)
  36. sub_topic = "#{@topic_prefix}#{path}/#{id_topic_suffix(id_params)}"
  37. on_message(sub_topic, timeout) do |topic, message|
  38. topic_parts = topic.split('/')
  39. begin
  40. message = JSON.parse(message)
  41. rescue JSON::ParserError => e
  42. end
  43. yield(
  44. {
  45. id: topic_parts[2].to_i(16),
  46. type: topic_parts[3],
  47. group_id: topic_parts[4].to_i
  48. },
  49. message
  50. )
  51. end
  52. end
  53. def on_message(topic, timeout = 10, &block)
  54. @listen_threads << Thread.new do
  55. begin
  56. Timeout.timeout(timeout) do
  57. @client.get(topic) do |topic, message|
  58. ret_val = yield(topic, message)
  59. raise BreakListenLoopError if ret_val
  60. end
  61. end
  62. rescue Timeout::Error => e
  63. puts "Timed out listening for message on: #{topic}"
  64. raise e
  65. rescue BreakListenLoopError
  66. end
  67. end
  68. end
  69. def publish(topic, state = {})
  70. @client.publish(topic, state.to_json)
  71. end
  72. def patch_state(id_params, state = {})
  73. @client.publish(
  74. "#{@topic_prefix}commands/#{id_topic_suffix(id_params)}",
  75. state.to_json
  76. )
  77. end
  78. def wait_for_listeners
  79. @listen_threads.each(&:join)
  80. @listen_threads.clear
  81. end
  82. end