00_MQTT.pm 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986
  1. ##############################################
  2. #
  3. # fhem bridge to mqtt (see http://mqtt.org)
  4. #
  5. # Copyright (C) 2017 Stephan Eisler
  6. # Copyright (C) 2014 - 2016 Norbert Truchsess
  7. #
  8. # This file is part of fhem.
  9. #
  10. # Fhem is free software: you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation, either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # Fhem is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License
  21. # along with fhem. If not, see <http://www.gnu.org/licenses/>.
  22. #
  23. # $Id: 00_MQTT.pm 16252 2018-02-23 21:32:59Z eisler $
  24. #
  25. ##############################################
  # Commands accepted by MQTT::Set; Set() checks that the key is defined and
  # lists the sorted keys in its "choose one of" message.
  my %sets = map { $_ => "" } qw(connect disconnect publish);

  # Table of "get" readings; not referenced by any code visible in this file.
  my %gets = ( "version" => "" );

  # Module types that may attach to an MQTT device as clients
  # (joined with ':' into $hash->{Clients} by MQTT_Initialize).
  my @clients = ( "MQTT_DEVICE", "MQTT_BRIDGE" );
  # FHEM module entry point: registers the callbacks and the attribute list of
  # the MQTT module in the module hash passed in by FHEM.
  #
  # $hash - module hash to populate.
  # Returns the value of the last assignment (the attribute list string).
  sub MQTT_Initialize($) {
    my ($hash) = @_;

    require "$main::attr{global}{modpath}/FHEM/DevIo.pm";

    # Provider
    $hash->{Clients} = join (':',@clients);
    $hash->{ReadyFn} = "MQTT::Ready";
    $hash->{ReadFn} = "MQTT::Read";

    # Consumer
    $hash->{DefFn} = "MQTT::Define";
    $hash->{UndefFn} = "MQTT::Undef";
    $hash->{DeleteFn} = "MQTT::Delete";
    # MQTT::Rename moves the stored credentials to the new device name; it has
    # to be registered here, otherwise it is never invoked on a rename.
    $hash->{RenameFn} = "MQTT::Rename";
    $hash->{ShutdownFn} = "MQTT::Shutdown";
    $hash->{SetFn} = "MQTT::Set";
    $hash->{NotifyFn} = "MQTT::Notify";
    $hash->{AttrFn} = "MQTT::Attr";
    $hash->{AttrList} = "keep-alive "."last-will client-id "."on-connect on-disconnect on-timeout ".$main::readingFnAttributes;
  }
  55. package MQTT;
  56. use Exporter ('import');
  57. @EXPORT = ();
  58. @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
  59. %EXPORT_TAGS = (all => [@EXPORT_OK]);
  60. use strict;
  61. use warnings;
  62. use GPUtils qw(:all);
  63. use Net::MQTT::Constants;
  64. use Net::MQTT::Message;
  65. our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
  66. BEGIN {GP_Import(qw(
  67. gettimeofday
  68. readingsSingleUpdate
  69. DevIo_OpenDev
  70. DevIo_SimpleWrite
  71. DevIo_SimpleRead
  72. DevIo_CloseDev
  73. RemoveInternalTimer
  74. InternalTimer
  75. AttrVal
  76. ReadingsVal
  77. Log3
  78. AssignIoPort
  79. getKeyValue
  80. setKeyValue
  81. CallFn
  82. defs
  83. modules
  84. looks_like_number
  85. fhem
  86. ))};
  # DefFn: initialises the device hash from "<host> [<username>] [<password>]".
  #
  # Username and password are written to the key/value store only when no
  # value is stored yet for this device name, and are stripped from DEF.
  # Returns the result of Start() once FHEM is initialised, undef before that.
  sub Define($$) {
    my ( $hash, $def ) = @_;

    @{$hash}{qw(NOTIFYDEV msgid timeout)} = ( "global", 1, 60 );
    $hash->{messages} = {};

    my ($host,$username,$password) = split("[ \t]+", $hash->{DEF});
    $hash->{DeviceName} = $host;

    my $name = $hash->{NAME};
    my $stored_user = getKeyValue($name."_user");
    my $stored_pass = getKeyValue($name."_pass");
    if (!defined($stored_user)) {
      setKeyValue($name."_user",$username);
    }
    if (!defined($stored_pass)) {
      setKeyValue($name."_pass",$password);
    }

    # keep only the host in the definition
    $hash->{DEF} = $host;

    return undef unless $main::init_done;
    return Start($hash);
  }
  # UndefFn: stops the connection of the device. Always returns undef.
  sub Undef($) {
    my ($hash) = @_;
    Stop($hash);
    return undef;
  }
  # DeleteFn: clears the stored user name and password of device $name.
  # Always returns undef.
  sub Delete($$) {
    my ($hash, $name) = @_;
    foreach my $suffix ("_user", "_pass") {
      setKeyValue($name.$suffix,undef);
    }
    return undef;
  }
  # ShutdownFn: stops the connection and logs the shutdown at level 1.
  # Always returns undef.
  sub Shutdown($) {
    my ($hash) = @_;
    Stop($hash);
    Log3($hash->{NAME},1,"Shutdown executed");
    return undef;
  }
  # Runs the command configured in the "on-connect" attribute (if any) through
  # process_event() and returns its result.
  sub onConnect($) {
    my ($hash) = @_;
    my $configured = AttrVal($hash->{NAME},"on-connect",undef);
    return process_event($hash,$configured);
  }
  # Runs the command configured in the "on-disconnect" attribute (if any)
  # through process_event() and returns its result.
  sub onDisconnect($) {
    my ($hash) = @_;
    my $configured = AttrVal($hash->{NAME},"on-disconnect",undef);
    return process_event($hash,$configured);
  }
  # Evaluates the Perl expression stored in the "on-timeout" attribute.
  #
  # The expression is run with string eval, so it can see the lexicals $hash,
  # $name and $cmdstr of this sub; their names must therefore stay stable.
  # NOTE(review): the expression comes from device configuration and is
  # executed as code - it must never be fed from untrusted input.
  #
  # Returns the value of the expression, or undef when no expression is set.
  sub onTimeout($) {
    my $hash = shift;
    my $name = $hash->{NAME};
    my $cmdstr = AttrVal($name,"on-timeout",undef);
    return undef unless $cmdstr;

    my $result = eval($cmdstr);
    # A broken expression used to fail silently; report it instead.
    Log3($name,1,"MQTT $name on-timeout: error evaluating '$cmdstr': $@") if $@;
    return $result;
  }
  # Handles an on-connect / on-disconnect command string of the form
  # "[{Perl-expression}] [qos:?] [retain:?] <topic> <message>".
  #
  # When an expression is given it is evaluated first; the message is only
  # published if the expression yields a true value or undef. The expression
  # is run with string eval and can see the lexicals $hash, $name, $qos,
  # $retain, $topic and $message, so these names must not be changed.
  #
  # $hash - MQTT device hash
  # $str  - command string (may be undef)
  # Returns the result of send_publish() when a message was sent, else undef.
  sub process_event($$) {
    my $hash = shift;
    my $str = shift;
    my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str);

    my $do=1;
    if($cmd) {
      my $name = $hash->{NAME};
      $do=eval($cmd);
      # A failing expression used to be swallowed silently; log it. The
      # publish decision itself is unchanged (undef still means "send").
      Log3($name,1,"MQTT $name: error evaluating '$cmd': $@") if $@;
      $do=1 if (!defined($do));
    }

    if($do && defined($topic)) {
      $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
      return send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain);
    }
    return undef;
  }
  # SetFn: handles "set <name> connect|disconnect|publish ...".
  #
  # @a holds the device name followed by the command and its arguments.
  # Returns an error/usage string on bad input and undef on success (the
  # explicit undef matters: the value of a sub that ends by leaving a bare
  # block with "last" is unspecified in Perl).
  sub Set($@) {
    my ($hash, @a) = @_;
    return "Need at least one parameters" if(@a < 2);
    return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
      if(!defined($sets{$a[1]}));

    my $command = $a[1];

    if ($command eq "connect") {
      Start($hash);
    }
    elsif ($command eq "disconnect") {
      Stop($hash);
    }
    elsif ($command eq "publish") {
      # everything after "<name> publish"
      my @args = @a[2..$#a];
      my ($qos, $retain,$topic, $value) = parsePublishCmd(@args);
      return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
      # wildcards are only meaningful for subscriptions
      return "wrong parameter. topic may not be '#' or '+'" if ($topic eq '#' or $topic eq '+');
      $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
      send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
    }

    return undef;
  }
  # Splits a command into positional values and "key:value" pairs.
  #
  # $cmd       - command string, or an array reference of already split parts
  # $separator - split pattern for a string $cmd (default: ' ')
  # $joiner    - string used to glue parts back together (default: $separator)
  #
  # Parts belonging to one quoted ('...' / "...") value or to one {...} block
  # are merged again; the enclosing quotes of a quoted value are removed.
  # Returns ( \@positional, \%named ).
  sub parseParams($;$$) {
    my ( $cmd, $separator, $joiner ) = @_;
    $separator = ' ' unless $separator;
    $joiner = $separator unless $joiner;    # needed if separator is a regexp

    my @tokens = ( ref($cmd) eq 'ARRAY' ) ? @{$cmd} : split( $separator, $cmd );
    my ( @positional, %named );

    # number of opening minus number of closing braces in a string
    my $brace_balance = sub {
      my ($text) = @_;
      return ( $text =~ tr/{// ) - ( $text =~ tr/}// );
    };

    while (@tokens) {
      my $token = shift(@tokens);
      next if ( $token eq "" );

      my ( $key, $value ) = split( ':', $token, 2 );
      if ( !defined($value) ) {
        # no colon: purely positional
        ( $key, $value ) = ( undef, $key );
      }
      elsif ( $key =~ m/^\s*{/ ) {
        # a "key" starting with { is really a perl expression
        ( $key, $value ) = ( undef, $token );
      }

      # gather the following parts until the opening quote is closed
      if ( $token && $value =~ m/^('|")/ ) {
        my $quote = $1;
        while ( $value !~ m/$quote$/ ) {
          my $next = shift(@tokens);
          last if ( !defined($next) );
          $value .= $joiner . $next;
        }
      }

      # drop a matching pair of quotes around the value
      if ( $value =~ m/^('|")/ ) {
        my $quote = $1;
        $value =~ s/^.(.*).$/$1/ if ( $value =~ m/$quote$/ );
      }

      # gather the following parts until all braces are balanced
      if ( $value =~ m/^\s*{/ ) {
        my $depth = $brace_balance->($value);
        while ( $token && $depth != 0 ) {
          my $next = shift(@tokens);
          last if ( !defined($next) );
          $value .= $joiner . $next;
          $depth += $brace_balance->($next);
        }
      }

      if ( defined($key) ) {
        $named{$key} = $value;
      }
      else {
        push @positional, $value;
      }
    }

    return ( \@positional, \%named );
  }
  # Parses a publish command given as a single string, optionally prefixed by
  # a {...} expression, and hands the whitespace-separated words to
  # parsePublishCmd().
  #
  # Returns what parsePublishCmd() returns, or undef when $str is undefined
  # or nothing follows the optional expression.
  sub parsePublishCmdStr($) {
    my ($str) = @_;
    return undef unless defined($str);
    return undef unless $str=~m/\s*(?:({.*})\s+)?(.*)/;

    my ( $expression, $remainder ) = ( $1, $2 );
    return undef unless $remainder;

    my @words = split("[ \t]+",$remainder);
    unshift (@words,$expression) if($expression);
    return parsePublishCmd(@words);
  }
  # Interprets the words of a publish command.
  #
  # "qos:<n>" and "retain:<n>" pairs set qos/retain (both default to 0), any
  # word containing {...} is taken as the expression, the first remaining word
  # is the topic and the rest, joined by blanks, is the value ("\0" when no
  # value is given).
  #
  # Returns ( $qos, $retain, $topic, $value, $expression ), or undef when
  # neither a topic nor an expression was found.
  sub parsePublishCmd(@) {
    my @args = @_;
    my ( $positional, $named ) = parseParams(\@args);

    my $qos    = exists( $named->{'qos'} )    ? $named->{'qos'}    : 0;
    my $retain = exists( $named->{'retain'} ) ? $named->{'retain'} : 0;

    my $expression = undef;
    my @plain = ();
    foreach my $word ( @{$positional} ) {
      if ( $word =~ /\{.*\}/ ) {
        $expression = $word;
      }
      else {
        push @plain, $word;
      }
    }

    my $topic = shift(@plain);
    my $value = ( scalar(@plain) > 0 ) ? join( " ", @plain ) : "\0";

    return undef unless $topic || $expression;
    return ( $qos, $retain, $topic, $value, $expression );
  }
  # NotifyFn: (re)starts the connection when the notifying device reports
  # INITIALIZED or REREADCFG.
  #
  # $hash - MQTT device hash
  # $dev  - hash of the notifying device; its events are read from {CHANGED}
  # Returns the result of Start() when a start was triggered, undef otherwise.
  # (The former empty SAVE branch made the sub return its grep count.)
  sub Notify($$) {
    my ($hash,$dev) = @_;
    my $events = $dev->{CHANGED};
    return undef unless $events;

    if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$events}) ) {
      return Start($hash);
    }
    return undef;
  }
  # AttrFn: reacts to changes of the "keep-alive" and "last-will" attributes.
  #
  # $command   - "set" when the attribute is being set, anything else is
  #              treated as removal
  # $name      - device name
  # $attribute - attribute name
  # $value     - new value (for "set")
  #
  # Always returns undef, i.e. the change is accepted. The return is explicit
  # because the value of a sub ending in "last" out of a bare block is
  # unspecified.
  sub Attr($$$$) {
    my ($command,$name,$attribute,$value) = @_;
    my $hash = $main::defs{$name};

    if ($attribute eq "keep-alive") {
      # fall back to the default of 60 seconds when the attribute is removed
      $hash->{timeout} = ($command eq "set") ? $value : 60;
      if ($main::init_done) {
        # pretend the last ping was answered and restart the ping timer
        $hash->{ping_received}=1;
        Timer($hash);
      }
    }
    elsif ($attribute eq "last-will") {
      # The last will is only transmitted in send_connect(), so an open
      # connection is closed and re-opened one second later. An undefined
      # STATE is treated like "not disconnected", as before, but no longer
      # triggers an "uninitialized" warning.
      my $state = $hash->{STATE};
      if(!defined($state) || $state ne "disconnected") {
        Stop($hash);
        InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
      }
    }

    return undef;
  }
  349. #sub Reconnect($){
  350. # my $hash = shift;
  351. # Stop($hash);
  352. # Start($hash);
  353. #}
  # Opens the connection to the broker.
  #
  # On the first call for a device hash the ".cinitmark" flag is set and the
  # device is opened unconditionally. On later calls nothing happens (undef
  # is returned) unless the "connection" reading is "disconnected" or
  # "timed-out". Returns the result of DevIo_OpenDev() otherwise.
  sub Start($) {
    my ($hash) = @_;

    if (defined($hash->{".cinitmark"})) {
      my $connection = ReadingsVal($hash->{NAME},"connection","");
      return undef unless ($connection eq "disconnected" || $connection eq "timed-out");
    }
    else {
      $hash->{".cinitmark"} = 1;
    }

    DevIo_CloseDev($hash);
    return DevIo_OpenDev($hash, 0, "MQTT::Init");
  }
  # Closes the connection to the broker: sends DISCONNECT, closes the device,
  # removes the pending timers and sets the "connection" reading to
  # "disconnected". Returns undef without doing anything when the reading
  # already is "disconnected" or "timed-out".
  sub Stop($) {
    my ($hash) = @_;

    my $connection = ReadingsVal($hash->{NAME},"connection","");
    return undef if grep { $connection eq $_ } ("disconnected", "timed-out");

    send_disconnect($hash);
    DevIo_CloseDev($hash);
    RemoveInternalTimer($hash);
    readingsSingleUpdate($hash,"connection","disconnected",1);
  }
  # ReadyFn: tries to re-open the device (DevIo_OpenDev with its second
  # argument set to 1) while STATE is "disconnected".
  sub Ready($) {
    my ($hash) = @_;
    if ($hash->{STATE} eq "disconnected") {
      return DevIo_OpenDev($hash, 1, "MQTT::Init");
    }
  }
  # Moves the stored user name and password from the old to the new device
  # name and clears the entries of the old name.
  #
  # $new - new device name
  # $old - previous device name
  # Always returns undef.
  #
  # The prototype used to be "()" although two arguments are expected, which
  # would reject any direct call with arguments.
  sub Rename($$) {
    my ($new,$old) = @_;

    # read in scalar context explicitly rather than relying on the prototype
    # of setKeyValue to impose it
    my $user = getKeyValue($old."_user");
    my $pass = getKeyValue($old."_pass");

    setKeyValue($new."_user",$user);
    setKeyValue($new."_pass",$pass);
    setKeyValue($old."_user",undef);
    setKeyValue($old."_pass",undef);
    return undef;
  }
  # Callback passed to DevIo_OpenDev: sends CONNECT, sets the "connection"
  # reading to "connecting" and starts the keep-alive timer.
  # Always returns undef.
  sub Init($) {
    my ($hash) = @_;

    send_connect($hash);
    readingsSingleUpdate($hash,"connection","connecting",1);

    # start the ping cycle as if the previous ping had been answered
    $hash->{ping_received} = 1;
    Timer($hash);

    return undef;
  }
  # Keep-alive cycle: if the previous ping was not answered, runs onTimeout()
  # and sets the "connection" reading to "timed-out"; then re-arms itself
  # $hash->{timeout} seconds ahead and sends the next PINGREQ.
  sub Timer($) {
    my ($hash) = @_;

    RemoveInternalTimer($hash);

    if (!$hash->{ping_received}) {
      onTimeout($hash);
      readingsSingleUpdate($hash,"connection","timed-out",1);
    }

    # set again by Read() when the PINGRESP arrives
    $hash->{ping_received} = 0;

    InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
    send_ping($hash);
  }
  # Helper for Read(): sets the "transmission-state" reading to $state on
  # every client that is waiting for $message_id and, when $forget is true,
  # removes the id from the client's list of pending message ids.
  sub _ack_clients {
    my ($hash,$message_id,$state,$forget) = @_;
    GP_ForallClients($hash,sub {
      my $client = shift;
      if ($client->{message_ids}->{$message_id}) {
        readingsSingleUpdate($client,"transmission-state",$state,1);
        delete $client->{message_ids}->{$message_id} if $forget;
      };
    },undef);
  }

  # ReadFn: appends the received bytes to $hash->{buf} and processes every
  # complete MQTT message found in it.
  #
  #   CONNACK   - marks the connection as connected, runs on-connect, starts
  #               all clients and re-sends the stored pending messages with
  #               the dup flag set
  #   PUBLISH   - dispatches the message to every client with a matching
  #               subscription and answers with PUBACK (QoS 1) or PUBREC
  #               (QoS 2)
  #   PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK
  #             - update the waiting clients and the pending message table
  #   PINGRESP  - marks the keep-alive ping as answered
  #
  # Always returns undef.
  sub Read {
    my ($hash) = @_;
    my $name = $hash->{NAME};
    my $buf = DevIo_SimpleRead($hash);
    return undef unless $buf;
    $hash->{buf} .= $buf;

    while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
      my $message_type = $mqtt->message_type();
      Log3($name,5,"MQTT $name message received: ".$mqtt->string());

      if ($message_type == MQTT_CONNACK) {
        readingsSingleUpdate($hash,"connection","connected",1);
        onConnect($hash);
        GP_ForallClients($hash,\&client_start);
        # re-send everything that is still stored as pending
        foreach my $message_id (keys %{$hash->{messages}}) {
          my $msg = $hash->{messages}->{$message_id}->{message};
          $msg->{dup} = 1;
          DevIo_SimpleWrite($hash,$msg->bytes,undef);
        }
      }
      elsif ($message_type == MQTT_PUBLISH) {
        my $topic = $mqtt->topic();
        GP_ForallClients($hash,sub {
          my $client = shift;
          Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
          if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
            readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
            my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
            if($fn) {
              CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
            } elsif ($client->{TYPE} eq "MQTT_DEVICE") {
              MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
            } elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
              MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
            } else {
              Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
            }
          };
        },undef);
        # Keep the QoS level itself. The former
        # "my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE" stored the result of
        # the comparison, so QoS 2 messages were answered with PUBACK.
        my $qos = $mqtt->qos();
        if ($qos > MQTT_QOS_AT_MOST_ONCE) {
          my $message_id = $mqtt->message_id();
          my $reply = ($qos == MQTT_QOS_AT_LEAST_ONCE) ? MQTT_PUBACK : MQTT_PUBREC;
          send_message($hash, message_type => $reply, message_id => $message_id);
        }
      }
      elsif ($message_type == MQTT_PUBACK) {
        my $message_id = $mqtt->message_id();
        _ack_clients($hash,$message_id,"outgoing publish acknowledged",1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_PUBREC) {
        my $message_id = $mqtt->message_id();
        # the id stays pending on the client until PUBCOMP arrives
        _ack_clients($hash,$message_id,"outgoing publish received",0);
        send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
      }
      elsif ($message_type == MQTT_PUBREL) {
        my $message_id = $mqtt->message_id();
        _ack_clients($hash,$message_id,"incoming publish released",1);
        send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
        delete $hash->{messages}->{$message_id};
      }
      elsif ($message_type == MQTT_PUBCOMP) {
        my $message_id = $mqtt->message_id();
        _ack_clients($hash,$message_id,"outgoing publish completed",1);
        delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
      }
      elsif ($message_type == MQTT_SUBACK) {
        my $message_id = $mqtt->message_id();
        _ack_clients($hash,$message_id,"subscription acknowledged",1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_UNSUBACK) {
        my $message_id = $mqtt->message_id();
        _ack_clients($hash,$message_id,"unsubscription acknowledged",1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_PINGRESP) {
        $hash->{ping_received} = 1;
        readingsSingleUpdate($hash,"connection","active",1);
      }
      else {
        Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
      }
    }
    return undef;
  }
  # Sends the CONNECT message: keep-alive time from $hash->{timeout}, the
  # stored credentials, the optional "client-id" attribute and the last will
  # parsed from the "last-will" attribute.
  # Returns the result of send_message().
  sub send_connect($) {
    my ($hash) = @_;
    my $name = $hash->{NAME};

    my $user = getKeyValue($name."_user");
    my $pass = getKeyValue($name."_pass");

    my $lastwill = AttrVal($name,"last-will",undef);
    my $clientId = AttrVal($name,"client-id",undef);
    my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lastwill);

    my @connect = (
      message_type     => MQTT_CONNECT,
      keep_alive_timer => $hash->{timeout},
      user_name        => $user,
      password         => $pass,
      client_id        => $clientId,
      will_topic       => $willtopic,
      will_message     => $willmessage,
      will_retain      => $willretain,
      will_qos         => $willqos,
    );
    return send_message($hash, @connect);
  };
  # Sends a PUBLISH message.
  #
  # $hash - MQTT device hash
  # %msg  - message fields (topic, message, qos, retain, ...); a missing qos
  #         is treated as "at most once"
  #
  # Returns undef for QoS 0, otherwise the message id used for the packet.
  # Message ids are kept within 1..65535 because the packet identifier is a
  # 16 bit field; an id outside that range could never be matched against the
  # id in the acknowledgement.
  sub send_publish($@) {
    my ($hash,%msg) = @_;
    my $qos = defined($msg{qos}) ? $msg{qos} : MQTT_QOS_AT_MOST_ONCE;

    if ($qos == MQTT_QOS_AT_MOST_ONCE) {
      send_message($hash, message_type => MQTT_PUBLISH, %msg);
      return undef;
    }

    # wrap around (or initialise) before taking the next id
    $hash->{msgid} = 1 if (!$hash->{msgid} || $hash->{msgid} > 65535);
    my $msgid = $hash->{msgid}++;
    send_message($hash, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
    return $msgid;
  };
  # Sends a SUBSCRIBE message with the given fields (e.g. topics => [...]).
  #
  # Returns the message id used. Ids are kept within 1..65535 because the
  # packet identifier is a 16 bit field; a larger id could never be matched
  # against the id in the SUBACK.
  sub send_subscribe($@) {
    my $hash = shift;
    # wrap around (or initialise) before taking the next id
    $hash->{msgid} = 1 if (!$hash->{msgid} || $hash->{msgid} > 65535);
    my $msgid = $hash->{msgid}++;
    send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
    return $msgid;
  };
  # Sends an UNSUBSCRIBE message with the given fields (e.g. topics => [...]).
  #
  # Returns the message id used. Ids are kept within 1..65535 because the
  # packet identifier is a 16 bit field; a larger id could never be matched
  # against the id in the UNSUBACK.
  sub send_unsubscribe($@) {
    my $hash = shift;
    # wrap around (or initialise) before taking the next id
    $hash->{msgid} = 1 if (!$hash->{msgid} || $hash->{msgid} > 65535);
    my $msgid = $hash->{msgid}++;
    send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
    return $msgid;
  };
  # Sends a PINGREQ message and returns the result of send_message().
  sub send_ping($) {
    my ($hash) = @_;
    return send_message($hash, message_type => MQTT_PINGREQ);
  };
  # Runs the on-disconnect handling while the connection is still open, then
  # sends DISCONNECT. Returns the result of send_message().
  sub send_disconnect($) {
    my ($hash) = @_;
    onDisconnect($hash);
    return send_message($hash, message_type => MQTT_DISCONNECT);
  };
  # Builds a Net::MQTT::Message from %msg, logs it at level 5 and writes it
  # to the device.
  #
  # Messages that carry a message id are remembered in $hash->{messages}
  # (Read() re-sends the stored messages after a CONNACK) until their
  # acknowledgement arrives. PUBACK and PUBCOMP are final replies to the
  # broker: nothing will ever acknowledge them, so they are not stored.
  # Storing them left PUBACK entries behind forever and could overwrite an
  # own pending packet that happens to use the same id.
  #
  # Returns the result of DevIo_SimpleWrite().
  sub send_message($$$@) {
    my ($hash,%msg) = @_;
    my $name = $hash->{NAME};
    my $message = Net::MQTT::Message->new(%msg);
    Log3($name,5,"MQTT $name message sent: ".$message->string());

    my $type = $msg{message_type};
    my $final_reply = defined($type) && ($type == MQTT_PUBACK || $type == MQTT_PUBCOMP);
    if (defined $msg{message_id} && !$final_reply) {
      $hash->{messages}->{$msg{message_id}} = {
        message => $message,
        timeout => gettimeofday()+$hash->{timeout},
      };
    }
    DevIo_SimpleWrite($hash,$message->bytes,undef);
  };
  # Converts an MQTT topic filter into an anchored regular expression string.
  #
  #   - a trailing '#' (together with the '/' in front of it) becomes '.*'
  #   - every '+' becomes '[^/]+'; the last '+' is a capturing group
  #   - all other text is matched literally
  #
  # The literal parts are escaped with quotemeta. Without that, topics such
  # as '$SYS/...' could never match ('$' acted as an anchor) and '.' matched
  # any character.
  #
  # Returns the expression as a string, e.g. 'a/+/b' => '^a\/([^/]+)\/b$'.
  sub topic_to_regexp($) {
    my $t = shift;

    # multi-level wildcard at the end of the filter
    my $tail = '';
    if ($t =~ s/#$//) {
      $tail = '.*';
      $t =~ s/\/$//;
    }

    # literal pieces between the single-level wildcards
    my @literal = split(/\+/, $t, -1);
    my $expr = '';
    foreach my $i (0 .. $#literal) {
      $expr .= quotemeta($literal[$i]);
      next if $i == $#literal;
      # only the last '+' is captured
      $expr .= ($i == $#literal - 1) ? '([^/]+)' : '[^/]+';
    }

    return "^$expr$tail\$";
  }
  # Registers $topic (and its regular expression) on the client unless it is
  # already known and, once FHEM is initialised and the client has an IODev,
  # sends a SUBSCRIBE for it.
  #
  # $client - client device hash
  # $topic  - topic filter
  # $qos    - optional QoS; defaults to the client's '*' entry in ".qos"
  # $retain - optional, not supported yet
  sub client_subscribe_topic($$;$$) {
    my ($client,$topic,$qos,$retain) = @_;

    if (!grep {$_ eq $topic} @{$client->{subscribe}}) {
      push @{$client->{subscribe}},$topic;
    }

    my $expr = topic_to_regexp($topic);
    if (!grep {$_ eq $expr} @{$client->{subscribeExpr}}) {
      push @{$client->{subscribeExpr}},$expr;
    }

    if ($main::init_done && (my $mqtt = $client->{IODev})) {
      $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
      $retain = 0 unless defined $retain; # not supported yet
      my @subscription = ( [$topic => $qos || MQTT_QOS_AT_MOST_ONCE] );
      my $msgid = send_subscribe($mqtt, topics => \@subscription);
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
    }
  };
  # Removes $topic (and its regular expression) from the client and, once
  # FHEM is initialised and the client has an IODev, sends an UNSUBSCRIBE.
  sub client_unsubscribe_topic($$) {
    my ($client,$topic) = @_;

    my @remaining_topics = grep { $_ ne $topic } @{$client->{subscribe}};
    $client->{subscribe} = \@remaining_topics;

    my $expr = topic_to_regexp($topic);
    my @remaining_exprs = grep { $_ ne $expr} @{$client->{subscribeExpr}};
    $client->{subscribeExpr} = \@remaining_exprs;

    if ($main::init_done && (my $mqtt = $client->{IODev})) {
      my $msgid = send_unsubscribe($mqtt, topics => [$topic]);
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
    }
  };
  # Common define handling for client modules: sets NOTIFYDEV from DEF (when
  # given), initialises the qos/retain defaults and the subscription lists
  # and assigns an IO device.
  # Returns the result of client_start() once FHEM is initialised, undef
  # before that.
  sub Client_Define($$) {
    my ( $client, $def ) = @_;

    if ($client->{DEF}) {
      $client->{NOTIFYDEV} = $client->{DEF};
    }

    # '*' is the default entry used for all readings
    $client->{".qos"}{'*'} = 0;
    $client->{".retain"}{'*'} = "0";

    $client->{subscribe} = [];
    $client->{subscribeExpr} = [];

    AssignIoPort($client);

    return undef unless $main::init_done;
    return client_start($client);
  };
  # Common undefine handling for client modules: unsubscribes the client via
  # client_stop(). Always returns undef.
  sub Client_Undefine($) {
    my ($client) = @_;
    client_stop($client);
    return undef;
  };
  656. #use Data::Dumper;
  # Helper for client_attr(): splits an attribute value of the form
  # "[<reading>|*:]<value> ..." into a list of [reading, value] pairs.
  # A value without reading name is stored under "" - or under "*" when it is
  # the only entry (backward compatibility: a single value applies to all).
  sub _split_reading_values {
    my ($value) = @_;
    my @values = split("[ \t]+",$value);
    my @pairs;
    foreach my $set (@values) {
      my($rname,$rvalue) = split(":",$set);
      if(!defined($rvalue)) {
        $rvalue=$rname;
        $rname = (scalar(@values)==1) ? "*" : "";
      }
      push @pairs, [$rname,$rvalue];
    }
    return @pairs;
  }

  # Helper for client_attr(): renders a per-reading map as
  # "<reading>:<value> " entries sorted by reading name; the entry stored
  # under "" is shown as "[state]".
  sub _format_reading_values {
    my ($map) = @_;
    my $shown = "";
    foreach my $rname (sort keys %{$map}) {
      my $label = ($rname eq "") ? "[state]" : $rname;
      $shown .= $label.':'.$map->{$rname}.' ';
    }
    return $shown;
  }

  # Attribute handling shared by the client modules for "qos", "retain" and
  # "IODev".
  #
  # $client    - client device hash
  # $command   - "set" when the attribute is being set, anything else is
  #              treated as removal
  # $name      - device name
  # $attribute - attribute name
  # $value     - new value (for "set")
  #
  # Returns an error string when a qos/retain value is invalid and undef
  # otherwise. The new value is validated completely before anything is
  # changed; previously the existing settings were deleted first, so a
  # rejected value left the client without (or with partial) settings.
  sub client_attr($$$$$) {
    my ($client,$command,$name,$attribute,$value) = @_;

    if ($attribute eq "qos") {
      # removal restores the default: QoS 0 for everything
      my %parsed = ("*" => "0");
      if ($command eq "set") {
        if(!defined($value) || $value=~/^[ \t]*$/) {
          return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
        }
        %parsed = ();
        foreach my $pair (_split_reading_values($value)) {
          my ($rname,$rvalue) = @{$pair};
          # symbolic names (the qos_string() of each level) are accepted too
          $rvalue=$MQTT::qos{$rvalue} if(defined($rvalue) && defined($MQTT::qos{$rvalue}));
          if(!defined($rvalue) || ($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2")) {
            $rvalue = "" unless defined($rvalue);
            return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
          }
          $parsed{$rname} = $rvalue;
        }
      }
      $client->{".qos"} = \%parsed;
      # human readable copy of the settings
      $client->{"qos"} = _format_reading_values(\%parsed);
      return undef;
    }

    if ($attribute eq "retain") {
      # removal restores the default: no retain for everything
      my %parsed = ("*" => "0");
      if ($command eq "set") {
        if(!defined($value) || $value=~/^[ \t]*$/) {
          return "retain value may not be empty. Format: [<reading>|*:]0|1";
        }
        %parsed = ();
        foreach my $pair (_split_reading_values($value)) {
          my ($rname,$rvalue) = @{$pair};
          if(!defined($rvalue) || ($rvalue ne "0" && $rvalue ne "1")) {
            return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
          }
          $parsed{$rname} = $rvalue;
        }
      }
      $client->{".retain"} = \%parsed;
      # human readable copy of the settings
      $client->{"retain"} = _format_reading_values(\%parsed);
      return undef;
    }

    if ($attribute eq "IODev") {
      if ($main::init_done) {
        # unsubscribe via the old IO device, then subscribe via the new one
        client_stop($client);
        if ($command eq "set") {
          $main::attr{$name}{IODev} = $value;
          client_start($client);
        }
      }
      return undef;
    }

    return undef;
  };
  # Starts a client: sets the default stateFormat ("transmission-state") when
  # none is configured and subscribes to all topics of the client.
  #
  # The QoS of each subscription is the client's ".qos" entry for the topic,
  # falling back to the '*' entry (the default that client_subscribe_topic()
  # uses as well) and finally to "at most once". Nothing is sent when the
  # client has no IODev. Always returns undef.
  sub client_start($) {
    my $client = shift;
    my $name = $client->{NAME};

    if (! (defined AttrVal($name,"stateFormat",undef))) {
      $main::attr{$name}{stateFormat} = "transmission-state";
    }

    my $io = $client->{IODev};
    my $topics = $client->{subscribe};
    if ($io && $topics && @{$topics}) {
      my $qos_for = $client->{".qos"} || {};
      my $default = $qos_for->{"*"} || MQTT_QOS_AT_MOST_ONCE;
      my $msgid = send_subscribe($io,
        topics => [map { [$_ => $qos_for->{$_} || $default] } @{$topics}],
      );
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
    }
    return undef;
  };
  # Stops a client: unsubscribes from all topics of the client.
  #
  # Nothing is sent when the client has no IODev or no topic list; without
  # that guard send_unsubscribe() would work on an autovivified, empty hash.
  # Always returns undef.
  sub client_stop($) {
    my $client = shift;

    my $io = $client->{IODev};
    my $topics = $client->{subscribe};
    if ($io && $topics && @{$topics}) {
      my $msgid = send_unsubscribe($io,
        topics => [@{$topics}],
      );
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
    }
    return undef;
  };
  785. 1;
  786. =pod
  787. =item [device]
  788. =item summary connects fhem to MQTT
  789. =begin html
  790. <a name="MQTT"></a>
  791. <h3>MQTT</h3>
  792. <ul>
  793. <p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
  794. <p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
  795. Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
  796. Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
  797. <a name="MQTTdefine"></a>
  798. <p><b>Define</b></p>
  799. <ul>
  800. <p><code>define &lt;name&gt; MQTT &lt;ip:port&gt; [&lt;username&gt;] [&lt;password&gt;]</code></p>
  801. <p>Specifies the MQTT device.</p>
  802. </ul>
  803. <a name="MQTTset"></a>
  804. <p><b>Set</b></p>
  805. <ul>
  806. <li>
  807. <p><code>set &lt;name&gt; connect</code><br/>
  808. (re-)connects the MQTT-device to the mqtt-broker</p>
  809. </li>
  810. <li>
  811. <p><code>set &lt;name&gt; disconnect</code><br/>
  812. disconnects the MQTT-device from the mqtt-broker</p>
  813. </li>
  814. <li>
  815. <p><code>set &lt;name&gt; publish [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  816. sends message to the specified topic</p>
  817. </li>
  818. </ul>
  819. <a name="MQTTattr"></a>
  820. <p><b>Attributes</b></p>
  821. <ul>
  822. <li>
  823. <p>keep-alive<br/>
  824. sets the keep-alive time (in seconds).</p>
  825. </li>
  826. <li>
  827. <p><code>attr &lt;name&gt; last-will [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  828. Support for MQTT feature "last will"
  829. </p>
  830. <p>example:<br/>
  831. <code>attr mqtt last-will /fhem/status crashed</code>
  832. </p>
  833. </li>
  834. <li>
  835. <p><code>attr &lt;name&gt; client-id client id</code><br/>
  836. redefines client id
  837. </p>
  838. <p>example:<br/>
  839. <code>attr mqtt client-id fhem1234567</code>
  840. </p>
  841. </li>
  842. <li>
  843. <p>on-connect, on-disconnect<br/>
  844. <code>attr &lt;name&gt; on-connect {Perl-expression} &lt;topic&gt; &lt;message&gt;</code><br/>
  845. Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression<br/>
  846. If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
  847. The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
  848. </p>
  849. <p>examples:<br/>
  850. <code>attr mqtt on-connect /topic/status connected</code><br/>
  851. <code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
  852. </p>
  853. </li>
  854. <li>
  855. <p>on-timeout<br/>
  856. <code>attr &lt;name&gt; on-timeout {Perl-expression}</code>
  857. evaluate the given Perl expression on timeout<br/>
  858. </p>
  859. </li>
  860. </ul>
  861. </ul>
  862. =end html
  863. =cut