00_MQTT.pm 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040
  1. ##############################################
  2. #
  3. # fhem bridge to mqtt (see http://mqtt.org)
  4. #
  5. # Copyright (C) 2018 Alexander Schulz
  6. # Copyright (C) 2017 Stephan Eisler
  7. # Copyright (C) 2014 - 2016 Norbert Truchsess
  8. #
  9. # This file is part of fhem.
  10. #
  11. # Fhem is free software: you can redistribute it and/or modify
  12. # it under the terms of the GNU General Public License as published by
  13. # the Free Software Foundation, either version 2 of the License, or
  14. # (at your option) any later version.
  15. #
  16. # Fhem is distributed in the hope that it will be useful,
  17. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  19. # GNU General Public License for more details.
  20. #
  21. # You should have received a copy of the GNU General Public License
  22. # along with fhem. If not, see <http://www.gnu.org/licenses/>.
  23. #
  24. # $Id: 00_MQTT.pm 17362 2018-09-17 12:57:29Z hexenmeister $
  25. #
  26. ##############################################
  # Sub-commands accepted by Set(); only the keys are consulted, the values
  # are empty placeholders.
  my %sets = map { $_ => "" } qw(connect disconnect publish);

  # Declared alongside %sets; not referenced anywhere in the visible code.
  my %gets = map { $_ => "" } qw(version);

  # Module names that MQTT_Initialize() joins into $hash->{Clients}.
  my @clients = ("MQTT_DEVICE", "MQTT_BRIDGE");
  # Module registration hook: loads DevIo.pm from the configured modpath and
  # fills the module hash with the client list, the names of the MQTT::
  # callbacks and the attribute list.
  sub MQTT_Initialize($) {
    my ($hash) = @_;

    require "$main::attr{global}{modpath}/FHEM/DevIo.pm";

    # Provider
    $hash->{Clients} = join(':', @clients);
    @{$hash}{qw(ReadyFn ReadFn)} = ("MQTT::Ready", "MQTT::Read");

    # Consumer
    @{$hash}{qw(DefFn UndefFn DeleteFn RenameFn ShutdownFn SetFn NotifyFn AttrFn)} = (
      "MQTT::Define",
      "MQTT::Undef",
      "MQTT::Delete",
      "MQTT::Rename",
      "MQTT::Shutdown",
      "MQTT::Set",
      "MQTT::Notify",
      "MQTT::Attr",
    );

    # Module attributes followed by the global reading attributes.
    $hash->{AttrList} = join(" ",
      qw(keep-alive last-will client-id on-connect on-disconnect on-timeout),
      $main::readingFnAttributes);
  }
  57. package MQTT;
  58. use Exporter ('import');
  59. @EXPORT = ();
  60. @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
  61. %EXPORT_TAGS = (all => [@EXPORT_OK]);
  62. use strict;
  63. use warnings;
  64. use GPUtils qw(:all);
  65. use Net::MQTT::Constants;
  66. use Net::MQTT::Message;
  67. our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
  68. BEGIN {GP_Import(qw(
  69. gettimeofday
  70. readingsSingleUpdate
  71. DevIo_OpenDev
  72. DevIo_SimpleWrite
  73. DevIo_SimpleRead
  74. DevIo_CloseDev
  75. RemoveInternalTimer
  76. InternalTimer
  77. AttrVal
  78. ReadingsVal
  79. Log3
  80. AssignIoPort
  81. getKeyValue
  82. setKeyValue
  83. CallFn
  84. defs
  85. modules
  86. looks_like_number
  87. fhem
  88. ))};
  # DefFn: initialises the device hash from its definition
  # "<host> [<username>] [<password>]".
  # The credentials are written to the key/value store only when no value is
  # stored yet for this device name; afterwards DEF is reduced to the host.
  # Returns the result of Start() once FHEM is initialised, undef otherwise.
  sub Define($$) {
    my ($hash, $def) = @_;
    my $name = $hash->{NAME};

    @{$hash}{qw(NOTIFYDEV msgid timeout)} = ("global", 1, 60);
    $hash->{messages} = {};

    my ($host, $username, $password) = split("[ \t]+", $hash->{DEF});
    $hash->{DeviceName} = $host;

    my $stored_user = getKeyValue($name."_user");
    my $stored_pass = getKeyValue($name."_pass");
    setKeyValue($name."_user", $username) if (!defined($stored_user));
    setKeyValue($name."_pass", $password) if (!defined($stored_pass));

    $hash->{DEF} = $host;
    #readingsSingleUpdate($hash,"connection","disconnected",0);

    return undef unless $main::init_done;
    return Start($hash);
  }
  # UndefFn: stops the connection via Stop(); always returns undef.
  sub Undef($) {
    my ($hash) = @_;
    Stop($hash);
    return undef;
  }
  # DeleteFn: clears the "<name>_user" and "<name>_pass" entries by setting
  # them to undef in the key/value store; always returns undef.
  sub Delete($$) {
    my ($hash, $name) = @_;
    foreach my $suffix ("_user", "_pass") {
      setKeyValue($name.$suffix, undef);
    }
    return undef;
  }
  # ShutdownFn: stops the connection and logs the shutdown at level 1.
  sub Shutdown($) {
    my ($hash) = @_;
    Stop($hash);
    Log3($hash->{NAME}, 1, "Shutdown executed");
    return undef;
  }
  # Runs the "on-connect" attribute of the device through process_event()
  # and returns its result.
  sub onConnect($) {
    my ($hash) = @_;
    my $cmdstr = AttrVal($hash->{NAME}, "on-connect", undef);
    return process_event($hash, $cmdstr);
  }
  # Runs the "on-disconnect" attribute of the device through process_event()
  # and returns its result.
  sub onDisconnect($) {
    my ($hash) = @_;
    my $cmdstr = AttrVal($hash->{NAME}, "on-disconnect", undef);
    return process_event($hash, $cmdstr);
  }
  # Evaluates the Perl code stored in the "on-timeout" attribute, if any.
  # $hash and $name are lexicals in scope of the evaluated code, so their
  # names must not change.
  # NOTE(review): string eval of an attribute value - the attribute must only
  # be settable by trusted users.
  sub onTimeout($) {
    my ($hash) = @_;
    my $name = $hash->{NAME};
    my $cmdstr = AttrVal($name, "on-timeout", undef);
    return eval($cmdstr) if ($cmdstr);
  }
  # Returns 1 when the "connection" reading is "connected" or "active",
  # undef otherwise.
  sub isConnected($) {
    my ($hash) = @_;
    my $state = ReadingsVal($hash->{NAME}, "connection", "");
    return ($state eq "connected" || $state eq "active") ? 1 : undef;
  }
  # Handles an event command string of the form
  # "[{perl expression}] [qos:?] [retain:?] <topic> <message>".
  # An optional expression is evaluated first; the message is published only
  # if the expression yields a true value or undef, and only if a topic was
  # parsed.
  # The lexicals $hash, $name, $qos, $retain, $topic and $message are visible
  # to the evaluated expression, so their names must not change.
  sub process_event($$) {
    my ($hash, $str) = @_;
    my ($qos, $retain, $topic, $message, $cmd) = parsePublishCmdStr($str);

    my $do = 1;
    if ($cmd) {
      my $name = $hash->{NAME};
      $do = eval($cmd);
      # an undefined result counts as "go ahead"
      $do = 1 unless (defined($do));
    }

    if ($do && defined($topic)) {
      $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
      send_publish($hash,
        topic   => $topic,
        message => $message,
        qos     => $qos,
        retain  => $retain);
    }
  }
  # SetFn: implements "set <name> connect|disconnect|publish ...".
  #
  #   connect     - calls Start()
  #   disconnect  - calls Stop()
  #   publish     - "[qos:?] [retain:?] <topic> <value1> [<value2>]..."; the
  #                 arguments are parsed by parsePublishCmd() and sent with
  #                 send_publish()
  #
  # Returns an error/usage string when the arguments are rejected and undef
  # otherwise.
  sub Set($@) {
    my ($hash, @a) = @_;
    return "Need at least one parameters" if(@a < 2);
    return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
      if(!defined($sets{$a[1]}));

    my $command = $a[1];

    if ($command eq "connect") {
      Start($hash);
    }
    elsif ($command eq "disconnect") {
      Stop($hash);
    }
    elsif ($command eq "publish") {
      # everything after "<name> publish" belongs to the publish command
      my ($qos, $retain, $topic, $value) = parsePublishCmd(@a[2 .. $#a]);
      return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
      # a bare wildcard is not a valid topic to publish to
      return "wrong parameter. topic may not be '#' or '+'" if ($topic eq '#' or $topic eq '+');
      $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
      send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
    }

    # Explicit success value instead of whatever the last statement yielded.
    return undef;
  }
  # Splits a command into positional values and "key<sep>value" pairs.
  #
  #   $cmd               - string (split on $separator) or array ref of tokens
  #   $separator         - token separator, default ' '
  #   $joiner            - string used to glue tokens back together, default
  #                        $separator (needed if the separator is a regexp)
  #   $keyvalueseparator - separator between key and value, default ':'
  #   $acceptedkeys      - optional hash ref; when given, only keys with a
  #                        defined entry are treated as keys
  #
  # Tokens starting with ' or " are joined up to the matching closing quote
  # and the quotes are removed; tokens starting with { are joined until the
  # braces balance. Returns (\@positional, \%named).
  sub parseParams($;$$$$) {
    my ( $cmd, $separator, $joiner, $keyvalueseparator, $acceptedkeys ) = @_;

    $separator         ||= ' ';
    $joiner            ||= $separator;    # needed if separator is a regexp
    $keyvalueseparator ||= ':';

    my @pending = ref($cmd) eq 'ARRAY' ? @{$cmd} : split( $separator, $cmd );
    my ( @positional, %named );

    while (@pending) {
      my $param = shift(@pending);
      next if ( $param eq "" );

      my ( $key, $value ) = split( $keyvalueseparator, $param, 2 );
      if ( !defined($value) ) {
        # no key/value separator present: the whole token is a value
        ( $key, $value ) = ( undef, $key );
      }
      elsif ( $key =~ m/^\s*{/                   # } a perl expression, not a key
        || $key =~ m/^\s*('|")/                  # a quoted string, not a key
        || ( defined($acceptedkeys) and !defined( $acceptedkeys->{$key} ) ) )
      {
        ( $key, $value ) = ( undef, $param );
      }

      # collect all parts until the closing ' or "
      while ( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
        my $next = shift(@pending);
        last if ( !defined($next) );
        $value .= $joiner . $next;
      }

      # remove matching ' or " from the start and end
      if ( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
        $value =~ s/^.(.*).$/$1/;
      }

      # collect all parts until opening { and closing } are matched
      if ( $value =~ m/^\s*{/ ) {    # } for match
        my $depth = ( $value =~ tr/{// ) - ( $value =~ tr/}// );
        while ( $param && $depth != 0 ) {
          my $next = shift(@pending);
          last if ( !defined($next) );
          $value .= $joiner . $next;
          $depth += ( $next =~ tr/{// ) - ( $next =~ tr/}// );
        }
      }

      if ( defined($key) ) {
        $named{$key} = $value;
      }
      else {
        push @positional, $value;
      }
    }

    return ( \@positional, \%named );
  }
  # Splits a publish command string on runs of blanks/tabs and hands the
  # words to parsePublishCmd(). Returns undef for an undefined string.
  sub parsePublishCmdStr($) {
    my ($str) = @_;
    return undef unless defined($str);
    my @words = split("[ \t]+", $str);
    return parsePublishCmd(@words);
  }
  # Parses the words of a publish command:
  #   [{expression}] [qos:<n>] [retain:<n>] <topic> [<value words>...]
  #
  # Returns ($qos, $retain, $topic, $value, $expression), where qos and
  # retain default to 0 and the value defaults to "\0" when no value words
  # are present. The first word of the form {...} that is not the last word
  # is taken as the expression. Returns undef when neither a topic nor an
  # expression was found.
  sub parsePublishCmd(@) {
    my @words = @_;
    my ( $positional, $named ) =
      parseParams( \@words, undef, undef, undef, { qos => 1, retain => 1 } );

    my $qos    = exists( $named->{'qos'} )    ? $named->{'qos'}    : 0;
    my $retain = exists( $named->{'retain'} ) ? $named->{'retain'} : 0;

    my $expression = undef;
    my @rest       = ();
    my $last_index = $#{$positional};
    for my $idx ( 0 .. $last_index ) {
      my $word = $positional->[$idx];
      # only the first {...} word counts, and only if something follows it
      if ( !defined($expression) and $word =~ /^\{.*\}$/ and $idx < $last_index ) {
        $expression = $word;
      }
      else {
        push @rest, $word;
      }
    }

    my $topic = shift(@rest);
    my $value = scalar(@rest) > 0 ? join( " ", @rest ) : "\0";

    return undef unless $topic || $expression;
    return ( $qos, $retain, $topic, $value, $expression );
  }
  # NotifyFn: starts the connection when the notifying device reports an
  # INITIALIZED or REREADCFG event. A SAVE event is recognised but needs no
  # action.
  sub Notify($$) {
    my ($hash, $dev) = @_;
    if (grep { m/^(INITIALIZED|REREADCFG)$/ } @{$dev->{CHANGED}}) {
      Start($hash);
    }
    elsif (grep { m/^SAVE$/ } @{$dev->{CHANGED}}) {
      # nothing to do
    }
  }
  # AttrFn: reacts to changes of the "keep-alive" and "last-will" attributes.
  #
  #   keep-alive - on "set" the value becomes $hash->{timeout}; any other
  #                command restores the default of 60. Once FHEM is
  #                initialised, the keep-alive timer is restarted.
  #   last-will  - unless STATE is "disconnected", the connection is stopped
  #                and MQTT::Start is scheduled one second later.
  #
  # Returns an error string when the keep-alive value is rejected, undef
  # otherwise.
  sub Attr($$$$) {
    my ($command,$name,$attribute,$value) = @_;
    my $hash = $main::defs{$name};

    if ($attribute eq "keep-alive") {
      if ($command eq "set") {
        # Timer() re-arms itself after $hash->{timeout} seconds; a
        # non-numeric or zero value would make it fire again with no delay.
        return "keep-alive must be a positive whole number of seconds"
          unless (defined($value) && $value =~ m/^\d+$/ && $value > 0);
        $hash->{timeout} = $value;
      } else {
        $hash->{timeout} = 60;
      }
      if ($main::init_done) {
        # pretend a ping answer arrived so the restart is not seen as timeout
        $hash->{ping_received} = 1;
        Timer($hash);
      }
    }
    elsif ($attribute eq "last-will") {
      # an undefined STATE is treated like any other non-"disconnected" state
      my $state = defined($hash->{STATE}) ? $hash->{STATE} : "";
      if ($state ne "disconnected") {
        Stop($hash);
        InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
      }
    }

    return undef;
  }
  361. #sub Reconnect($){
  362. # my $hash = shift;
  363. # Stop($hash);
  364. # Start($hash);
  365. #}
  # Opens the connection through DevIo with MQTT::Init as the init callback.
  # On the first call a marker is stored in the hash; on later calls nothing
  # happens (undef is returned) unless the "connection" reading is
  # "disconnected" or "timed-out".
  sub Start($) {
    my ($hash) = @_;

    if (defined($hash->{".cinitmark"})) {
      my $cstate = ReadingsVal($hash->{NAME}, "connection", "");
      return undef unless ($cstate eq "disconnected" || $cstate eq "timed-out");
    }
    else {
      $hash->{".cinitmark"} = 1;
    }

    DevIo_CloseDev($hash);
    return DevIo_OpenDev($hash, 0, "MQTT::Init");
  }
  # Closes the connection: sends DISCONNECT, closes the device, removes the
  # timers registered for $hash and sets the "connection" reading to
  # "disconnected". Returns undef without doing anything when the reading is
  # already "disconnected" or "timed-out".
  sub Stop($) {
    my ($hash) = @_;
    my $cstate = ReadingsVal($hash->{NAME}, "connection", "");
    return undef if (grep { $cstate eq $_ } ("disconnected", "timed-out"));

    send_disconnect($hash);
    DevIo_CloseDev($hash);
    RemoveInternalTimer($hash);
    readingsSingleUpdate($hash, "connection", "disconnected", 1);
  }
  # ReadyFn: while STATE is "disconnected", tries to reopen the device
  # (second DevIo_OpenDev argument 1) with MQTT::Init as the init callback.
  sub Ready($) {
    my ($hash) = @_;
    if ($hash->{STATE} eq "disconnected") {
      return DevIo_OpenDev($hash, 1, "MQTT::Init");
    }
  }
  # RenameFn: moves the stored credentials from the old device name to the
  # new one. Takes the new and the old name and always returns undef.
  #
  # The prototype now matches the two arguments the sub actually reads (it
  # used to be empty). Each stored value is read into a scalar first, so the
  # value passed on does not depend on the context getKeyValue() is called
  # in.
  sub Rename($$) {
    my ($new,$old) = @_;
    foreach my $suffix ("_user", "_pass") {
      my $stored = getKeyValue($old.$suffix);
      setKeyValue($new.$suffix, $stored);
      setKeyValue($old.$suffix, undef);
    }
    return undef;
  }
  # Init callback passed to DevIo_OpenDev: sends CONNECT, sets the
  # "connection" reading to "connecting" and starts the keep-alive timer.
  sub Init($) {
    my ($hash) = @_;

    send_connect($hash);
    readingsSingleUpdate($hash, "connection", "connecting", 1);

    # start the timer with a clean slate so the first run is not a timeout
    $hash->{ping_received} = 1;
    Timer($hash);

    return undef;
  }
  # Keep-alive timer. If no ping response was flagged since the previous
  # run, the on-timeout handler is executed, the "connection" reading becomes
  # "timed-out" and all clients are notified. Afterwards the flag is cleared,
  # the timer is re-armed for $hash->{timeout} seconds and a ping is sent.
  sub Timer($) {
    my ($hash) = @_;
    RemoveInternalTimer($hash);

    if (!$hash->{ping_received}) {
      onTimeout($hash);
      readingsSingleUpdate($hash, "connection", "timed-out", 1);
      GP_ForallClients($hash, \&notify_client_connection_timeout);
    }

    $hash->{ping_received} = 0;
    my $next_run = gettimeofday()+$hash->{timeout};
    InternalTimer($next_run, "MQTT::Timer", $hash, 0);
    send_ping($hash);
  }
  # Updates every client that is waiting for $message_id: sets its
  # "transmission-state" reading to $state and, when $forget is true, removes
  # the id from the client's pending message ids.
  sub _clients_message_update($$$$) {
    my ($hash, $message_id, $state, $forget) = @_;
    GP_ForallClients($hash, sub {
      my $client = shift;
      if ($client->{message_ids}->{$message_id}) {
        readingsSingleUpdate($client, "transmission-state", $state, 1);
        delete $client->{message_ids}->{$message_id} if ($forget);
      }
    }, undef);
  }

  # ReadFn: appends the received bytes to $hash->{buf} and handles every
  # complete MQTT message that can be parsed from the buffer.
  #
  #   CONNACK  - marks the connection as connected, runs on-connect, starts
  #              and notifies all clients, and re-sends all messages stored
  #              in $hash->{messages} with the dup flag set
  #   PUBLISH  - dispatches the message to every client with a matching
  #              subscription and acknowledges it according to its QoS
  #   PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK
  #            - update the waiting clients and the stored messages
  #   PINGRESP - flags the ping as answered, connection becomes "active"
  #
  # Always returns undef.
  sub Read {
    my ($hash) = @_;
    my $name = $hash->{NAME};
    my $buf = DevIo_SimpleRead($hash);
    return undef unless $buf;
    $hash->{buf} .= $buf;

    while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
      my $message_type = $mqtt->message_type();
      Log3($name,5,"MQTT $name message received: ".$mqtt->string());

      if ($message_type == MQTT_CONNACK) {
        readingsSingleUpdate($hash,"connection","connected",1);
        onConnect($hash);
        GP_ForallClients($hash,\&client_start);
        GP_ForallClients($hash,\&notify_client_connected);
        # re-send everything that is still stored, marked as duplicate
        foreach my $message_id (keys %{$hash->{messages}}) {
          my $msg = $hash->{messages}->{$message_id}->{message};
          $msg->{dup} = 1;
          DevIo_SimpleWrite($hash,$msg->bytes,undef);
        }
      }
      elsif ($message_type == MQTT_PUBLISH) {
        my $topic = $mqtt->topic();
        GP_ForallClients($hash,sub {
          my $client = shift;
          Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
          if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
            readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
            my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
            if($fn) {
              CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
            } elsif ($client->{TYPE} eq "MQTT_DEVICE") {
              MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
            } elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
              MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
            } else {
              Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
            }
          };
        },undef);
        # Fetch the QoS first and compare afterwards: ">" binds tighter than
        # "=", so assigning inside the condition would store the comparison
        # result (1) and every QoS > 0 would be answered with PUBACK.
        my $qos = $mqtt->qos();
        if ($qos > MQTT_QOS_AT_MOST_ONCE) {
          my $message_id = $mqtt->message_id();
          my $reply_type = ($qos == MQTT_QOS_AT_LEAST_ONCE) ? MQTT_PUBACK : MQTT_PUBREC;
          send_message($hash, message_type => $reply_type, message_id => $message_id);
        }
      }
      elsif ($message_type == MQTT_PUBACK) {
        my $message_id = $mqtt->message_id();
        _clients_message_update($hash, $message_id, "outgoing publish acknowledged", 1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_PUBREC) {
        my $message_id = $mqtt->message_id();
        # the id stays pending on the client until PUBCOMP arrives
        _clients_message_update($hash, $message_id, "outgoing publish received", 0);
        send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
      }
      elsif ($message_type == MQTT_PUBREL) {
        my $message_id = $mqtt->message_id();
        _clients_message_update($hash, $message_id, "incoming publish released", 1);
        send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
        delete $hash->{messages}->{$message_id};
      }
      elsif ($message_type == MQTT_PUBCOMP) {
        my $message_id = $mqtt->message_id();
        _clients_message_update($hash, $message_id, "outgoing publish completed", 1);
        delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
      }
      elsif ($message_type == MQTT_SUBACK) {
        my $message_id = $mqtt->message_id();
        _clients_message_update($hash, $message_id, "subscription acknowledged", 1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_UNSUBACK) {
        my $message_id = $mqtt->message_id();
        _clients_message_update($hash, $message_id, "unsubscription acknowledged", 1);
        delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
      }
      elsif ($message_type == MQTT_PINGRESP) {
        $hash->{ping_received} = 1;
        readingsSingleUpdate($hash,"connection","active",1);
      }
      else {
        Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
      }
    }
    return undef;
  }
  # Sends the CONNECT message. User name and password come from the
  # key/value store, the client id from the "client-id" attribute and the
  # will settings from the parsed "last-will" attribute; the keep-alive timer
  # is $hash->{timeout}. Returns the result of send_message().
  sub send_connect($) {
    my ($hash) = @_;
    my $name = $hash->{NAME};

    my $user     = getKeyValue($name."_user");
    my $pass     = getKeyValue($name."_pass");
    my $lw       = AttrVal($name, "last-will", undef);
    my $clientId = AttrVal($name, "client-id", undef);
    my ($willqos, $willretain, $willtopic, $willmessage) = parsePublishCmdStr($lw);

    my @connect = (
      message_type     => MQTT_CONNECT,
      keep_alive_timer => $hash->{timeout},
      user_name        => $user,
      password         => $pass,
      client_id        => $clientId,
      will_topic       => $willtopic,
      will_message     => $willmessage,
      will_retain      => $willretain,
      will_qos         => $willqos,
    );
    return send_message($hash, @connect);
  }
  # Sends a PUBLISH message built from the given key/value pairs (topic,
  # message, qos, retain, ...). For QoS "at most once" no message id is used
  # and undef is returned; otherwise the next id from $hash->{msgid} is
  # attached and returned.
  sub send_publish($@) {
    my ($hash, %msg) = @_;

    if ($msg{qos} != MQTT_QOS_AT_MOST_ONCE) {
      my $msgid = $hash->{msgid}++;
      send_message($hash, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
      return $msgid;
    }

    send_message($hash, message_type => MQTT_PUBLISH, %msg);
    return undef;
  }
  # Sends a SUBSCRIBE message with QoS "at least once" and the next message
  # id from $hash->{msgid}; the remaining arguments are passed through as
  # message fields. Returns the message id used.
  sub send_subscribe($@) {
    my ($hash, @fields) = @_;
    my $msgid = $hash->{msgid}++;
    send_message($hash,
      message_type => MQTT_SUBSCRIBE,
      message_id   => $msgid,
      qos          => MQTT_QOS_AT_LEAST_ONCE,
      @fields);
    return $msgid;
  }
  # Sends an UNSUBSCRIBE message with QoS "at least once" and the next
  # message id from $hash->{msgid}; the remaining arguments are passed
  # through as message fields. Returns the message id used.
  sub send_unsubscribe($@) {
    my ($hash, @fields) = @_;
    my $msgid = $hash->{msgid}++;
    send_message($hash,
      message_type => MQTT_UNSUBSCRIBE,
      message_id   => $msgid,
      qos          => MQTT_QOS_AT_LEAST_ONCE,
      @fields);
    return $msgid;
  }
  # Sends a PINGREQ message; returns the result of send_message().
  sub send_ping($) {
    my ($hash) = @_;
    return send_message($hash, message_type => MQTT_PINGREQ);
  }
  # Runs the on-disconnect handler, notifies all clients about the
  # disconnect and then sends the DISCONNECT message. Returns the result of
  # send_message().
  sub send_disconnect($) {
    my ($hash) = @_;
    onDisconnect($hash);
    GP_ForallClients($hash, \&notify_client_disconnected);
    return send_message($hash, message_type => MQTT_DISCONNECT);
  }
  # Builds a Net::MQTT::Message from the given key/value pairs, logs it at
  # level 5 and writes its bytes to the device.
  #
  # A message carrying a message id is remembered in $hash->{messages}
  # (together with a timeout stamp) so that it can be re-sent after a
  # reconnect and removed once the matching reply arrives. PUBACK and PUBCOMP
  # are not remembered: they are the final step of their exchange, so no
  # reply ever arrives for them and a stored copy would only be re-sent on
  # the next CONNACK.
  sub send_message($$$@) {
    my ($hash,%msg) = @_;
    my $name = $hash->{NAME};
    my $message = Net::MQTT::Message->new(%msg);
    Log3($name,5,"MQTT $name message sent: ".$message->string());

    my $type = $msg{message_type};
    my $is_final_reply = defined($type) && ($type == MQTT_PUBACK || $type == MQTT_PUBCOMP);

    if (defined($msg{message_id}) && !$is_final_reply) {
      $hash->{messages}->{$msg{message_id}} = {
        message => $message,
        timeout => gettimeofday()+$hash->{timeout},
      };
    }
    DevIo_SimpleWrite($hash,$message->bytes,undef);
  }
  # Converts an MQTT topic filter into the text of an anchored regular
  # expression ("^...$").
  sub topic_to_regexp($) {
    my ($pattern) = @_;

    for ($pattern) {
      s|#$|.\*|;                 # trailing '#' becomes '.*'
      s|\$|\\\$|g;               # escape every '$'
      s|\/\.\*$|.\*|;            # drop the '/' in front of a trailing '.*'
      s|\/|\\\/|g;               # escape every remaining '/'
      s|(\+)([^+]*$)|(+)$2|;     # put the last '+' into a capture group
      s|\+|[^\/]+|g;             # every '+' matches one topic level
    }

    return "^$pattern\$";
  }
  # Registers $topic on the client: adds it to the subscribe list, records
  # the requested QoS and adds the matching regular expression (each only
  # once). Once FHEM is initialised and the client has an IODev, a SUBSCRIBE
  # is sent through it and its message id is tracked on the client.
  # $qos falls back to the client's ".qos" entry for "*"; $retain is accepted
  # but not used.
  sub client_subscribe_topic($$;$$) {
    my ($client,$topic,$qos,$retain) = @_;
    my $expr = topic_to_regexp($topic);

    unless (grep { $_ eq $topic } @{$client->{subscribe}}) {
      push @{$client->{subscribe}}, $topic;
    }
    $client->{subscribeQos}->{$topic} = $qos;
    unless (grep { $_ eq $expr } @{$client->{subscribeExpr}}) {
      push @{$client->{subscribeExpr}}, $expr;
    }

    if ($main::init_done) {
      if (my $mqtt = $client->{IODev}) {
        $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
        $retain = 0 unless defined $retain; # not supported yet
        my @topics = ([$topic => $qos || MQTT_QOS_AT_MOST_ONCE]);
        my $msgid = send_subscribe($mqtt, topics => \@topics);
        $client->{message_ids}->{$msgid}++;
        readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
      }
    }
  }
  # Removes $topic from the client's subscribe list, QoS map and regular
  # expression list. Once FHEM is initialised and the client has an IODev, an
  # UNSUBSCRIBE is sent through it and its message id is tracked on the
  # client.
  sub client_unsubscribe_topic($$) {
    my ($client,$topic) = @_;
    my $expr = topic_to_regexp($topic);

    my @kept_topics = grep { $_ ne $topic } @{$client->{subscribe}};
    $client->{subscribe} = \@kept_topics;
    delete $client->{subscribeQos}->{$topic};
    my @kept_exprs = grep { $_ ne $expr } @{$client->{subscribeExpr}};
    $client->{subscribeExpr} = \@kept_exprs;

    if ($main::init_done) {
      if (my $mqtt = $client->{IODev}) {
        my $msgid = send_unsubscribe($mqtt, topics => [$topic]);
        $client->{message_ids}->{$msgid}++;
        readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
      }
    }
  }
  # Common define handling for client devices: sets NOTIFYDEV from DEF (when
  # DEF is set), initialises the default qos/retain maps and the empty
  # subscription structures, and assigns an IO device. Returns the result of
  # client_start() once FHEM is initialised, undef otherwise.
  sub Client_Define($$) {
    my ($client, $def) = @_;

    $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};

    $client->{".qos"}->{'*'}    = 0;
    $client->{".retain"}->{'*'} = "0";

    $client->{subscribe}     = [];
    $client->{subscribeQos}  = {};
    $client->{subscribeExpr} = [];

    AssignIoPort($client);

    return undef unless $main::init_done;
    return client_start($client);
  }
  # Common undefine handling for client devices: stops the client via
  # client_stop(); always returns undef.
  sub Client_Undefine($) {
    my ($client) = @_;
    client_stop($client);
    return undef;
  }
  675. #use Data::Dumper;
  # Splits one "[<reading>:]<value>" token of a qos/retain attribute value.
  # A token without a reading name applies to "*" when it is the only token
  # ($single true, backward compatibility) and to "" otherwise.
  # Returns ($reading, $value); a missing value is returned as "".
  sub _client_attr_split($$) {
    my ($token, $single) = @_;
    my ($rname, $rvalue) = split(":", $token);
    if (!defined($rvalue)) {
      $rvalue = $rname;
      $rname  = $single ? "*" : "";
    }
    $rvalue = "" unless defined($rvalue);
    return ($rname, $rvalue);
  }

  # Renders a reading => value map as "name:value " pairs sorted by name;
  # the empty reading name is shown as "[state]".
  sub _client_attr_show($) {
    my ($map) = @_;
    my $show = "";
    foreach my $rname (sort keys %{$map}) {
      $show .= ($rname eq "" ? "[state]" : $rname) . ':' . $map->{$rname} . ' ';
    }
    return $show;
  }

  # Common attribute handling for client devices.
  #
  #   qos    - "[<reading>|*:]0|1|2 ..." (the names in %MQTT::qos are also
  #            accepted); stored in $client->{".qos"}, display form in
  #            $client->{qos}
  #   retain - "[<reading>|*:]0|1 ..."; stored in $client->{".retain"},
  #            display form in $client->{retain}
  #   IODev  - once FHEM is initialised: stops the client and, on "set",
  #            stores the new IODev attribute and restarts the client
  #
  # Any command other than "set" resets qos/retain to { "*" => "0" }.
  # The new map is built completely before it replaces the old one, so a
  # rejected value leaves the current configuration untouched.
  # Returns an error string for a rejected value, the result of
  # client_start() when the IODev is set, and undef otherwise.
  sub client_attr($$$$$) {
    my ($client,$command,$name,$attribute,$value) = @_;

    if ($attribute eq "qos") {
      my %qos_for = ("*" => "0");
      if ($command eq "set") {
        if(!defined($value) || $value=~/^[ \t]*$/) {
          return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
        }
        %qos_for = ();
        my @values = split("[ \t]+",$value);
        foreach my $set (@values) {
          my ($rname,$rvalue) = _client_attr_split($set, scalar(@values)==1);
          # map the symbolic names to their numeric value
          $rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue}));
          if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") {
            return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
          }
          $qos_for{$rname} = $rvalue;
        }
      }
      $client->{".qos"} = \%qos_for;
      $client->{"qos"}  = _client_attr_show(\%qos_for);
    }
    elsif ($attribute eq "retain") {
      my %retain_for = ("*" => "0");
      if ($command eq "set") {
        if(!defined($value) || $value=~/^[ \t]*$/) {
          return "retain value may not be empty. Format: [<reading>|*:]0|1";
        }
        %retain_for = ();
        my @values = split("[ \t]+",$value);
        foreach my $set (@values) {
          my ($rname,$rvalue) = _client_attr_split($set, scalar(@values)==1);
          if($rvalue ne "0" && $rvalue ne "1") {
            return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
          }
          $retain_for{$rname} = $rvalue;
        }
      }
      $client->{".retain"} = \%retain_for;
      $client->{"retain"}  = _client_attr_show(\%retain_for);
    }
    elsif ($attribute eq "IODev") {
      if ($main::init_done) {
        client_stop($client);
        if ($command eq "set") {
          $main::attr{$name}{IODev} = $value;
          return client_start($client);
        }
      }
    }

    return undef;
  }
  # Calls the client's OnClientConnectFn callback with the client hash.
  sub notify_client_connected($) {
    my ($client) = @_;
    CallFn($client->{NAME}, "OnClientConnectFn", ($client));
  }
  # Calls the client's OnClientDisconnectFn callback with the client hash.
  sub notify_client_disconnected($) {
    my ($client) = @_;
    CallFn($client->{NAME}, "OnClientDisconnectFn", ($client));
  }
  # Calls the client's OnClientConnectionTimeoutFn callback with the client
  # hash.
  sub notify_client_connection_timeout($) {
    my ($client) = @_;
    CallFn($client->{NAME}, "OnClientConnectionTimeoutFn", ($client));
  }
  # Starts a client: calls its OnClientStartFn callback and, if the client
  # has subscriptions, sends one SUBSCRIBE for all of them through its IODev
  # (QoS from subscribeQos, falling back to "at most once") and tracks the
  # message id on the client.
  # Returns an error string when the client hash, its IODev or its subscribe
  # list is missing, undef otherwise.
  sub client_start($) {
    my ($client) = @_;

    # probably internal failure
    if (!defined($client)) {
      Log3("MQTT IODev",1,"no client device hash provided");
      return "no client device hash provided";
    }

    # client device without IODev. probably internal failure
    if (!defined($client->{IODev})) {
      Log3("MQTT IODev",1,"client device hash no IODev provided");
      return "client device hash no IODev provided";
    }

    CallFn($client->{NAME},"OnClientStartFn",($client));

    if (ref($client->{subscribe}) ne "ARRAY") {
      Log3($client->{NAME},1,"unknown client or client initialization error");
      return "unknown client or client initialization error";
    }

    my @topics = @{$client->{subscribe}};
    if (@topics) {
      my @with_qos = map { [$_ => $client->{subscribeQos}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @topics;
      my $msgid = send_subscribe($client->{IODev}, topics => \@with_qos);
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
    }

    return undef;
  }
  # Stops a client: if it has subscriptions, sends one UNSUBSCRIBE for all of
  # them through its IODev and tracks the message id on the client; then
  # calls the client's OnClientStopFn callback.
  #
  # The UNSUBSCRIBE is skipped when the client has no IODev or no subscribe
  # list, mirroring the guards in client_start() and
  # client_unsubscribe_topic(); the callback is still called.
  sub client_stop($) {
    my $client = shift;
    my $topics = $client->{subscribe};

    if (defined($client->{IODev}) && ref($topics) eq "ARRAY" && @{$topics}) {
      my $msgid = send_unsubscribe($client->{IODev},
        topics => [@{$topics}],
      );
      $client->{message_ids}->{$msgid}++;
      readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
    }

    CallFn($client->{NAME},"OnClientStopFn",($client));
  }
  828. 1;
  829. =pod
  830. =item [device]
  831. =item summary connects fhem to MQTT
  832. =begin html
  833. <a name="MQTT"></a>
  834. <h3>MQTT</h3>
  835. <ul>
  836. <p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
  837. <p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
  838. Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge between an fhem device and mqtt.<br/>
  839. Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
  840. <a name="MQTTdefine"></a>
  841. <p><b>Define</b></p>
  842. <ul>
  843. <p><code>define &lt;name&gt; MQTT &lt;ip:port&gt; [&lt;username&gt;] [&lt;password&gt;]</code></p>
  844. <p>Specifies the MQTT device.</p>
  845. </ul>
  846. <a name="MQTTset"></a>
  847. <p><b>Set</b></p>
  848. <ul>
  849. <li>
  850. <p><code>set &lt;name&gt; connect</code><br/>
  851. (re-)connects the MQTT-device to the mqtt-broker</p>
  852. </li>
  853. <li>
  854. <p><code>set &lt;name&gt; disconnect</code><br/>
  855. disconnects the MQTT-device from the mqtt-broker</p>
  856. </li>
  857. <li>
  858. <p><code>set &lt;name&gt; publish [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  859. sends message to the specified topic</p>
  860. </li>
  861. </ul>
  862. <a name="MQTTattr"></a>
  863. <p><b>Attributes</b></p>
  864. <ul>
  865. <li>
  866. <p>keep-alive<br/>
  867. sets the keep-alive time (in seconds).</p>
  868. </li>
  869. <li>
  870. <p><code>attr &lt;name&gt; last-will [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  871. Support for MQTT feature "last will"
  872. </p>
  873. <p>example:<br/>
  874. <code>attr mqtt last-will /fhem/status crashed</code>
  875. </p>
  876. </li>
  877. <li>
  878. <p><code>attr &lt;name&gt; client-id client id</code><br/>
  879. redefines client id
  880. </p>
  881. <p>example:<br/>
  882. <code>attr mqtt client-id fhem1234567</code>
  883. </p>
  884. </li>
  885. <li>
  886. <p>on-connect, on-disconnect<br/>
  887. <code>attr &lt;name&gt; on-connect {Perl-expression} &lt;topic&gt; &lt;message&gt;</code><br/>
  888. Publishes the specified message to a topic on connect / disconnect (the counterpart to last-will) and/or evaluates a Perl expression.<br/>
  889. If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
  890. The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
  891. </p>
  892. <p>examples:<br/>
  893. <code>attr mqtt on-connect /topic/status connected</code><br/>
  894. <code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
  895. </p>
  896. </li>
  897. <li>
  898. <p>on-timeout<br/>
  899. <code>attr &lt;name&gt; on-timeout {Perl-expression}</code><br/>
  900. Evaluates the given Perl expression on timeout.<br/>
  901. </p>
  902. </li>
  903. </ul>
  904. </ul>
  905. =end html
  906. =cut