00_MQTT.pm 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. ##############################################
  2. #
  3. # fhem bridge to mqtt (see http://mqtt.org)
  4. #
  5. # Copyright (C) 2017 Stephan Eisler
  6. # Copyright (C) 2014 - 2016 Norbert Truchsess
  7. #
  8. # This file is part of fhem.
  9. #
  10. # Fhem is free software: you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation, either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # Fhem is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License
  21. # along with fhem. If not, see <http://www.gnu.org/licenses/>.
  22. #
  23. # $Id: 00_MQTT.pm 13318 2017-02-03 09:42:52Z eisler $
  24. #
  25. ##############################################
  26. my %sets = (
  27. "connect" => "",
  28. "disconnect" => "",
  29. );
  30. my %gets = (
  31. "version" => ""
  32. );
  33. my @clients = qw(
  34. MQTT_DEVICE
  35. MQTT_BRIDGE
  36. );
  37. sub MQTT_Initialize($) {
  38. my $hash = shift @_;
  39. require "$main::attr{global}{modpath}/FHEM/DevIo.pm";
  40. # Provider
  41. $hash->{Clients} = join (':',@clients);
  42. $hash->{ReadyFn} = "MQTT::Ready";
  43. $hash->{ReadFn} = "MQTT::Read";
  44. # Consumer
  45. $hash->{DefFn} = "MQTT::Define";
  46. $hash->{UndefFn} = "MQTT::Undef";
  47. $hash->{SetFn} = "MQTT::Set";
  48. $hash->{NotifyFn} = "MQTT::Notify";
  49. $hash->{AttrList} = "keep-alive ".$main::readingFnAttributes;
  50. }
  51. package MQTT;
  52. use Exporter ('import');
  53. @EXPORT = ();
  54. @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp);
  55. %EXPORT_TAGS = (all => [@EXPORT_OK]);
  56. use strict;
  57. use warnings;
  58. use GPUtils qw(:all);
  59. use Net::MQTT::Constants;
  60. use Net::MQTT::Message;
  61. our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
  62. BEGIN {GP_Import(qw(
  63. gettimeofday
  64. readingsSingleUpdate
  65. DevIo_OpenDev
  66. DevIo_SimpleWrite
  67. DevIo_SimpleRead
  68. DevIo_CloseDev
  69. RemoveInternalTimer
  70. InternalTimer
  71. AttrVal
  72. Log3
  73. AssignIoPort
  74. getKeyValue
  75. setKeyValue
  76. ))};
  77. sub Define($$) {
  78. my ( $hash, $def ) = @_;
  79. $hash->{NOTIFYDEV} = "global";
  80. $hash->{msgid} = 1;
  81. $hash->{timeout} = 60;
  82. $hash->{messages} = {};
  83. my ($host,$username,$password) = split("[ \t]+", $hash->{DEF});
  84. $hash->{DeviceName} = $host;
  85. my $name = $hash->{NAME};
  86. my $user = getKeyValue($name."_user");
  87. my $pass = getKeyValue($name."_pass");
  88. setKeyValue($name."_user",$username) unless(defined($user));
  89. setKeyValue($name."_pass",$password) unless(defined($pass));
  90. $hash->{DEF} = $host;
  91. if ($main::init_done) {
  92. return Start($hash);
  93. } else {
  94. return undef;
  95. }
  96. }
  97. sub Undef($) {
  98. my $hash = shift;
  99. Stop($hash);
  100. my $name = $hash->{NAME};
  101. setKeyValue($name."_user",undef);
  102. setKeyValue($name."_pass",undef);
  103. return undef;
  104. }
  105. sub Set($@) {
  106. my ($hash, @a) = @_;
  107. return "Need at least one parameters" if(@a < 2);
  108. return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
  109. if(!defined($sets{$a[1]}));
  110. my $command = $a[1];
  111. my $value = $a[2];
  112. COMMAND_HANDLER: {
  113. $command eq "connect" and do {
  114. Start($hash);
  115. last;
  116. };
  117. $command eq "disconnect" and do {
  118. Stop($hash);
  119. last;
  120. };
  121. };
  122. }
  123. sub Notify($$) {
  124. my ($hash,$dev) = @_;
  125. if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
  126. Start($hash);
  127. } elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
  128. }
  129. }
  130. sub Attr($$$$) {
  131. my ($command,$name,$attribute,$value) = @_;
  132. my $hash = $main::defs{$name};
  133. ATTRIBUTE_HANDLER: {
  134. $attribute eq "keep-alive" and do {
  135. if ($command eq "set") {
  136. $hash->{timeout} = $value;
  137. } else {
  138. $hash->{timeout} = 60;
  139. }
  140. if ($main::init_done) {
  141. $hash->{ping_received}=1;
  142. Timer($hash);
  143. };
  144. last;
  145. };
  146. };
  147. }
  148. sub Start($) {
  149. my $hash = shift;
  150. DevIo_CloseDev($hash);
  151. return DevIo_OpenDev($hash, 0, "MQTT::Init");
  152. }
  153. sub Stop($) {
  154. my $hash = shift;
  155. send_disconnect($hash);
  156. DevIo_CloseDev($hash);
  157. RemoveInternalTimer($hash);
  158. readingsSingleUpdate($hash,"connection","disconnected",1);
  159. }
  160. sub Ready($) {
  161. my $hash = shift;
  162. return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected");
  163. }
  164. sub Rename() {
  165. my ($new,$old) = @_;
  166. setKeyValue($new."_user",getKeyValue($old."_user"));
  167. setKeyValue($new."_pass",getKeyValue($old."_pass"));
  168. setKeyValue($old."_user",undef);
  169. setKeyValue($old."_pass",undef);
  170. return undef;
  171. }
  172. sub Init($) {
  173. my $hash = shift;
  174. send_connect($hash);
  175. readingsSingleUpdate($hash,"connection","connecting",1);
  176. $hash->{ping_received}=1;
  177. Timer($hash);
  178. return undef;
  179. }
  180. sub Timer($) {
  181. my $hash = shift;
  182. RemoveInternalTimer($hash);
  183. readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received};
  184. $hash->{ping_received} = 0;
  185. InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
  186. send_ping($hash);
  187. }
  188. sub Read {
  189. my ($hash) = @_;
  190. my $name = $hash->{NAME};
  191. my $buf = DevIo_SimpleRead($hash);
  192. return undef unless $buf;
  193. $hash->{buf} .= $buf;
  194. while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
  195. my $message_type = $mqtt->message_type();
  196. Log3($name,5,"MQTT $name message received: ".$mqtt->string());
  197. MESSAGE_TYPE: {
  198. $message_type == MQTT_CONNACK and do {
  199. readingsSingleUpdate($hash,"connection","connected",1);
  200. GP_ForallClients($hash,\&client_start);
  201. foreach my $message_id (keys %{$hash->{messages}}) {
  202. my $msg = $hash->{messages}->{$message_id}->{message};
  203. $msg->{dup} = 1;
  204. DevIo_SimpleWrite($hash,$msg->bytes,undef);
  205. }
  206. last;
  207. };
  208. $message_type == MQTT_PUBLISH and do {
  209. my $topic = $mqtt->topic();
  210. GP_ForallClients($hash,sub {
  211. my $client = shift;
  212. Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
  213. if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
  214. readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
  215. if ($client->{TYPE} eq "MQTT_DEVICE") {
  216. MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
  217. } else {
  218. MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
  219. }
  220. };
  221. },undef);
  222. if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) {
  223. my $message_id = $mqtt->message_id();
  224. if ($qos == MQTT_QOS_AT_LEAST_ONCE) {
  225. send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id);
  226. } else {
  227. send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id);
  228. }
  229. }
  230. last;
  231. };
  232. $message_type == MQTT_PUBACK and do {
  233. my $message_id = $mqtt->message_id();
  234. GP_ForallClients($hash,sub {
  235. my $client = shift;
  236. if ($client->{message_ids}->{$message_id}) {
  237. readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1);
  238. delete $client->{message_ids}->{$message_id};
  239. };
  240. },undef);
  241. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  242. last;
  243. };
  244. $message_type == MQTT_PUBREC and do {
  245. my $message_id = $mqtt->message_id();
  246. GP_ForallClients($hash,sub {
  247. my $client = shift;
  248. if ($client->{message_ids}->{$message_id}) {
  249. readingsSingleUpdate($client,"transmission-state","outgoing publish received",1);
  250. };
  251. },undef);
  252. send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
  253. last;
  254. };
  255. $message_type == MQTT_PUBREL and do {
  256. my $message_id = $mqtt->message_id();
  257. GP_ForallClients($hash,sub {
  258. my $client = shift;
  259. if ($client->{message_ids}->{$message_id}) {
  260. readingsSingleUpdate($client,"transmission-state","incoming publish released",1);
  261. delete $client->{message_ids}->{$message_id};
  262. };
  263. },undef);
  264. send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
  265. delete $hash->{messages}->{$message_id};
  266. last;
  267. };
  268. $message_type == MQTT_PUBCOMP and do {
  269. my $message_id = $mqtt->message_id();
  270. GP_ForallClients($hash,sub {
  271. my $client = shift;
  272. if ($client->{message_ids}->{$message_id}) {
  273. readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1);
  274. delete $client->{message_ids}->{$message_id};
  275. };
  276. },undef);
  277. delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
  278. last;
  279. };
  280. $message_type == MQTT_SUBACK and do {
  281. my $message_id = $mqtt->message_id();
  282. GP_ForallClients($hash,sub {
  283. my $client = shift;
  284. if ($client->{message_ids}->{$message_id}) {
  285. readingsSingleUpdate($client,"transmission-state","subscription acknowledged",1);
  286. delete $client->{message_ids}->{$message_id};
  287. };
  288. },undef);
  289. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  290. last;
  291. };
  292. $message_type == MQTT_UNSUBACK and do {
  293. my $message_id = $mqtt->message_id();
  294. GP_ForallClients($hash,sub {
  295. my $client = shift;
  296. if ($client->{message_ids}->{$message_id}) {
  297. readingsSingleUpdate($client,"transmission-state","unsubscription acknowledged",1);
  298. delete $client->{message_ids}->{$message_id};
  299. };
  300. },undef);
  301. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  302. last;
  303. };
  304. $message_type == MQTT_PINGRESP and do {
  305. $hash->{ping_received} = 1;
  306. readingsSingleUpdate($hash,"connection","active",1);
  307. last;
  308. };
  309. Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
  310. }
  311. }
  312. return undef;
  313. };
  314. sub send_connect($) {
  315. my $hash = shift;
  316. my $name = $hash->{NAME};
  317. my $user = getKeyValue($name."_user");
  318. my $pass = getKeyValue($name."_pass");
  319. return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass);
  320. };
  321. sub send_publish($@) {
  322. my ($hash,%msg) = @_;
  323. if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) {
  324. send_message(shift, message_type => MQTT_PUBLISH, %msg);
  325. return undef;
  326. } else {
  327. my $msgid = $hash->{msgid}++;
  328. send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
  329. return $msgid;
  330. }
  331. };
  332. sub send_subscribe($@) {
  333. my $hash = shift;
  334. my $msgid = $hash->{msgid}++;
  335. send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
  336. return $msgid;
  337. };
  338. sub send_unsubscribe($@) {
  339. my $hash = shift;
  340. my $msgid = $hash->{msgid}++;
  341. send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
  342. return $msgid;
  343. };
  344. sub send_ping($) {
  345. return send_message(shift, message_type => MQTT_PINGREQ);
  346. };
  347. sub send_disconnect($) {
  348. return send_message(shift, message_type => MQTT_DISCONNECT);
  349. };
  350. sub send_message($$$@) {
  351. my ($hash,%msg) = @_;
  352. my $name = $hash->{NAME};
  353. my $message = Net::MQTT::Message->new(%msg);
  354. Log3($name,5,"MQTT $name message sent: ".$message->string());
  355. if (defined $msg{message_id}) {
  356. $hash->{messages}->{$msg{message_id}} = {
  357. message => $message,
  358. timeout => gettimeofday()+$hash->{timeout},
  359. };
  360. }
  361. DevIo_SimpleWrite($hash,$message->bytes,undef);
  362. };
  363. sub topic_to_regexp($) {
  364. my $t = shift;
  365. $t =~ s|#$|.\*|;
  366. $t =~ s|\/\.\*$|.\*|;
  367. $t =~ s|\/|\\\/|g;
  368. $t =~ s|(\+)([^+]*$)|(+)$2|;
  369. $t =~ s|\+|[^\/]+|g;
  370. return "^$t\$";
  371. }
  372. sub client_subscribe_topic($$) {
  373. my ($client,$topic) = @_;
  374. push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
  375. my $expr = topic_to_regexp($topic);
  376. push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
  377. if ($main::init_done) {
  378. if (my $mqtt = $client->{IODev}) {;
  379. my $msgid = send_subscribe($mqtt,
  380. topics => [[$topic => $client->{qos} || MQTT_QOS_AT_MOST_ONCE]],
  381. );
  382. $client->{message_ids}->{$msgid}++;
  383. readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
  384. }
  385. }
  386. };
  387. sub client_unsubscribe_topic($$) {
  388. my ($client,$topic) = @_;
  389. $client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}];
  390. my $expr = topic_to_regexp($topic);
  391. $client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
  392. if ($main::init_done) {
  393. if (my $mqtt = $client->{IODev}) {;
  394. my $msgid = send_unsubscribe($mqtt,
  395. topics => [$topic],
  396. );
  397. $client->{message_ids}->{$msgid}++;
  398. readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
  399. }
  400. }
  401. };
  402. sub Client_Define($$) {
  403. my ( $client, $def ) = @_;
  404. $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
  405. $client->{qos} = MQTT_QOS_AT_MOST_ONCE;
  406. $client->{retain} = 0;
  407. $client->{subscribe} = [];
  408. $client->{subscribeExpr} = [];
  409. AssignIoPort($client);
  410. if ($main::init_done) {
  411. return client_start($client);
  412. } else {
  413. return undef;
  414. }
  415. };
  416. sub Client_Undefine($) {
  417. client_stop(shift);
  418. return undef;
  419. };
  420. sub client_attr($$$$$) {
  421. my ($client,$command,$name,$attribute,$value) = @_;
  422. ATTRIBUTE_HANDLER: {
  423. $attribute eq "qos" and do {
  424. if ($command eq "set") {
  425. $client->{qos} = $MQTT::qos{$value};
  426. } else {
  427. $client->{qos} = MQTT_QOS_AT_MOST_ONCE;
  428. }
  429. last;
  430. };
  431. $attribute eq "retain" and do {
  432. if ($command eq "set") {
  433. $client->{retain} = $value;
  434. } else {
  435. $client->{retain} = 0;
  436. }
  437. last;
  438. };
  439. $attribute eq "IODev" and do {
  440. if ($main::init_done) {
  441. if ($command eq "set") {
  442. client_stop($client);
  443. $main::attr{$name}{IODev} = $value;
  444. client_start($client);
  445. } else {
  446. client_stop($client);
  447. }
  448. }
  449. last;
  450. };
  451. }
  452. };
  453. sub client_start($) {
  454. my $client = shift;
  455. my $name = $client->{NAME};
  456. if (! (defined AttrVal($name,"stateFormat",undef))) {
  457. $main::attr{$name}{stateFormat} = "transmission-state";
  458. }
  459. if (@{$client->{subscribe}}) {
  460. my $msgid = send_subscribe($client->{IODev},
  461. topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
  462. );
  463. $client->{message_ids}->{$msgid}++;
  464. readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
  465. }
  466. };
  467. sub client_stop($) {
  468. my $client = shift;
  469. if (@{$client->{subscribe}}) {
  470. my $msgid = send_unsubscribe($client->{IODev},
  471. topics => [@{$client->{subscribe}}],
  472. );
  473. $client->{message_ids}->{$msgid}++;
  474. readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
  475. }
  476. };
  477. 1;
  478. =pod
  479. =item [device]
  480. =item summary connects fhem to MQTT
  481. =begin html
  482. <a name="MQTT"></a>
  483. <h3>MQTT</h3>
  484. <ul>
  485. <p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
  486. <p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
  487. Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
  488. Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
  489. <a name="MQTTdefine"></a>
  490. <p><b>Define</b></p>
  491. <ul>
  492. <p><code>define &lt;name&gt; MQTT &lt;ip:port&gt; [&lt;username&gt;] [&lt;password&gt;]</code></p>
  493. <p>Specifies the MQTT device.</p>
  494. </ul>
  495. <a name="MQTTset"></a>
  496. <p><b>Set</b></p>
  497. <ul>
  498. <li>
  499. <p><code>set &lt;name&gt; connect</code><br/>
  500. (re-)connects the MQTT-device to the mqtt-broker</p>
  501. </li>
  502. <li>
  503. <p><code>set &lt;name&gt; disconnect</code><br/>
  504. disconnects the MQTT-device from the mqtt-broker</p>
  505. </li>
  506. </ul>
  507. <a name="MQTTattr"></a>
  508. <p><b>Attributes</b></p>
  509. <ul>
  510. <li>
  511. <p>keep-alive<br/>
  512. sets the keep-alive time (in seconds).</p>
  513. </li>
  514. </ul>
  515. </ul>
  516. =end html
  517. =cut