00_MQTT.pm 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904
  1. ##############################################
  2. #
  3. # fhem bridge to mqtt (see http://mqtt.org)
  4. #
  5. # Copyright (C) 2017 Stephan Eisler
  6. # Copyright (C) 2014 - 2016 Norbert Truchsess
  7. #
  8. # This file is part of fhem.
  9. #
  10. # Fhem is free software: you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation, either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # Fhem is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License
  21. # along with fhem. If not, see <http://www.gnu.org/licenses/>.
  22. #
  23. # $Id: 00_MQTT.pm 15126 2017-09-24 07:43:17Z eisler $
  24. #
  25. ##############################################
  26. my %sets = (
  27. "connect" => "",
  28. "disconnect" => "",
  29. "publish" => "",
  30. );
  31. my %gets = (
  32. "version" => ""
  33. );
  34. my @clients = qw(
  35. MQTT_DEVICE
  36. MQTT_BRIDGE
  37. );
  38. sub MQTT_Initialize($) {
  39. my $hash = shift @_;
  40. require "$main::attr{global}{modpath}/FHEM/DevIo.pm";
  41. # Provider
  42. $hash->{Clients} = join (':',@clients);
  43. $hash->{ReadyFn} = "MQTT::Ready";
  44. $hash->{ReadFn} = "MQTT::Read";
  45. # Consumer
  46. $hash->{DefFn} = "MQTT::Define";
  47. $hash->{UndefFn} = "MQTT::Undef";
  48. $hash->{DeleteFn} = "MQTT::Delete";
  49. $hash->{ShutdownFn} = "MQTT::Shutdown";
  50. $hash->{SetFn} = "MQTT::Set";
  51. $hash->{NotifyFn} = "MQTT::Notify";
  52. $hash->{AttrFn} = "MQTT::Attr";
  53. $hash->{AttrList} = "keep-alive "."last-will "."on-connect on-disconnect on-timeout ".$main::readingFnAttributes;
  54. }
  55. package MQTT;
  56. use Exporter ('import');
  57. @EXPORT = ();
  58. @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp);
  59. %EXPORT_TAGS = (all => [@EXPORT_OK]);
  60. use strict;
  61. use warnings;
  62. use GPUtils qw(:all);
  63. use Net::MQTT::Constants;
  64. use Net::MQTT::Message;
  65. our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
  66. BEGIN {GP_Import(qw(
  67. gettimeofday
  68. readingsSingleUpdate
  69. DevIo_OpenDev
  70. DevIo_SimpleWrite
  71. DevIo_SimpleRead
  72. DevIo_CloseDev
  73. RemoveInternalTimer
  74. InternalTimer
  75. AttrVal
  76. ReadingsVal
  77. Log3
  78. AssignIoPort
  79. getKeyValue
  80. setKeyValue
  81. CallFn
  82. defs
  83. modules
  84. looks_like_number
  85. fhem
  86. ))};
  87. sub Define($$) {
  88. my ( $hash, $def ) = @_;
  89. $hash->{NOTIFYDEV} = "global";
  90. $hash->{msgid} = 1;
  91. $hash->{timeout} = 60;
  92. $hash->{messages} = {};
  93. my ($host,$username,$password) = split("[ \t]+", $hash->{DEF});
  94. $hash->{DeviceName} = $host;
  95. my $name = $hash->{NAME};
  96. my $user = getKeyValue($name."_user");
  97. my $pass = getKeyValue($name."_pass");
  98. setKeyValue($name."_user",$username) unless(defined($user));
  99. setKeyValue($name."_pass",$password) unless(defined($pass));
  100. $hash->{DEF} = $host;
  101. #readingsSingleUpdate($hash,"connection","disconnected",0);
  102. if ($main::init_done) {
  103. return Start($hash);
  104. } else {
  105. return undef;
  106. }
  107. }
  108. sub Undef($) {
  109. my $hash = shift;
  110. Stop($hash);
  111. return undef;
  112. }
  113. sub Delete($$) {
  114. my ($hash, $name) = @_;
  115. setKeyValue($name."_user",undef);
  116. setKeyValue($name."_pass",undef);
  117. return undef;
  118. }
  119. sub Shutdown($) {
  120. my $hash = shift;
  121. Stop($hash);
  122. my $name = $hash->{NAME};
  123. Log3($name,1,"Shutdown executed");
  124. return undef;
  125. }
  126. sub onConnect($) {
  127. my $hash = shift;
  128. my $name = $hash->{NAME};
  129. my $cmdstr = AttrVal($name,"on-connect",undef);
  130. return process_event($hash,$cmdstr);
  131. }
  132. sub onDisconnect($) {
  133. my $hash = shift;
  134. my $name = $hash->{NAME};
  135. my $cmdstr = AttrVal($name,"on-disconnect",undef);
  136. return process_event($hash,$cmdstr);
  137. }
  138. sub onTimeout($) {
  139. my $hash = shift;
  140. my $name = $hash->{NAME};
  141. my $cmdstr = AttrVal($name,"on-timeout",undef);
  142. if($cmdstr) {
  143. return eval($cmdstr);
  144. }
  145. }
  146. sub process_event($$) {
  147. my $hash = shift;
  148. my $str = shift;
  149. my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str);
  150. my $do=1;
  151. if($cmd) {
  152. my $name = $hash->{NAME};
  153. $do=eval($cmd);
  154. $do=1 if (!defined($do));
  155. #no strict "refs";
  156. #my $ret = &{$hash->{WBCallback}}($hash);
  157. #use strict "refs";
  158. }
  159. if($do && defined($topic)) {
  160. $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
  161. send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain);
  162. }
  163. }
  164. sub Set($@) {
  165. my ($hash, @a) = @_;
  166. return "Need at least one parameters" if(@a < 2);
  167. return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
  168. if(!defined($sets{$a[1]}));
  169. my $command = $a[1];
  170. my $value = $a[2];
  171. COMMAND_HANDLER: {
  172. $command eq "connect" and do {
  173. Start($hash);
  174. last;
  175. };
  176. $command eq "disconnect" and do {
  177. Stop($hash);
  178. last;
  179. };
  180. $command eq "publish" and do {
  181. shift(@a);
  182. shift(@a);
  183. #if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";}
  184. #my $qos=0;
  185. #my $retain=0;
  186. #if(looks_like_number ($a[0])) {
  187. # $qos = int($a[0]);
  188. # $qos = 0 if $qos>1;
  189. # shift(@a);
  190. # if(looks_like_number ($a[0])) {
  191. # $retain = int($a[0]);
  192. # $retain = 0 if $retain>2;
  193. # shift(@a);
  194. # }
  195. #}
  196. #if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";}
  197. #my $topic = shift(@a);
  198. #my $value = join (" ", @a);
  199. my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
  200. return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
  201. return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+');
  202. $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
  203. my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
  204. last;
  205. }
  206. };
  207. }
  208. sub parsePublishCmdStr($) {
  209. my ($str) = @_;
  210. if(defined($str) && $str=~m/\s*(?:({.*})\s+)?(.*)/) {
  211. my $exp = $1;
  212. my $rest = $2;
  213. if ($rest){
  214. my @lwa = split("[ \t]+",$rest);
  215. unshift (@lwa,$exp) if($exp);
  216. return parsePublishCmd(@lwa);
  217. }
  218. }
  219. return undef;
  220. }
  221. sub parsePublishCmd(@) {
  222. my @a = @_;
  223. # [qos:?] [retain:?] topic value
  224. return undef if(!@a);
  225. return undef if(scalar(@a)<1);
  226. my $qos = 0;
  227. my $retain = 0;
  228. my $topic = undef;
  229. my $value = "\0";
  230. my $expression = undef;
  231. while (scalar(@a)>0) {
  232. my $av = shift(@a);
  233. if($av =~ /\{.*\}/) {
  234. $expression = $av;
  235. next;
  236. }
  237. my ($pn,$pv) = split(":",$av);
  238. if(defined($pv)) {
  239. if($pn eq "qos") {
  240. if($pv >=0 && $pv <=2) {
  241. $qos = $pv;
  242. }
  243. } elsif($pn eq "retain") {
  244. if($pv >=0 && $pv <=1) {
  245. $retain = $pv;
  246. }
  247. } else {
  248. # ignore
  249. next;
  250. }
  251. } else {
  252. $topic = $av;
  253. last;
  254. }
  255. }
  256. if(scalar(@a)>0) {
  257. $value = join(" ", @a);
  258. }
  259. return undef unless $topic || $expression;
  260. return ($qos, $retain,$topic, $value, $expression);
  261. }
  262. sub Notify($$) {
  263. my ($hash,$dev) = @_;
  264. if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
  265. Start($hash);
  266. } elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
  267. }
  268. }
  269. sub Attr($$$$) {
  270. my ($command,$name,$attribute,$value) = @_;
  271. my $hash = $main::defs{$name};
  272. ATTRIBUTE_HANDLER: {
  273. $attribute eq "keep-alive" and do {
  274. if ($command eq "set") {
  275. $hash->{timeout} = $value;
  276. } else {
  277. $hash->{timeout} = 60;
  278. }
  279. if ($main::init_done) {
  280. $hash->{ping_received}=1;
  281. Timer($hash);
  282. };
  283. last;
  284. };
  285. $attribute eq "last-will" and do {
  286. if($hash->{STATE} ne "disconnected") {
  287. Stop($hash);
  288. InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
  289. }
  290. last;
  291. };
  292. };
  293. }
  294. #sub Reconnect($){
  295. # my $hash = shift;
  296. # Stop($hash);
  297. # Start($hash);
  298. #}
  299. sub Start($) {
  300. my $hash = shift;
  301. my $firsttime = $hash->{".cinitmark"};
  302. if(defined($firsttime)) {
  303. my $cstate=ReadingsVal($hash->{NAME},"connection","");
  304. if($cstate ne "disconnected" && $cstate ne "timed-out") {
  305. return undef;
  306. }
  307. } else {
  308. $hash->{".cinitmark"} = 1;
  309. }
  310. DevIo_CloseDev($hash);
  311. return DevIo_OpenDev($hash, 0, "MQTT::Init");
  312. }
  313. sub Stop($) {
  314. my $hash = shift;
  315. my $cstate=ReadingsVal($hash->{NAME},"connection","");
  316. if($cstate eq "disconnected" || $cstate eq "timed-out") {
  317. return undef;
  318. }
  319. send_disconnect($hash);
  320. DevIo_CloseDev($hash);
  321. RemoveInternalTimer($hash);
  322. readingsSingleUpdate($hash,"connection","disconnected",1);
  323. }
  324. sub Ready($) {
  325. my $hash = shift;
  326. return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected");
  327. }
  328. sub Rename() {
  329. my ($new,$old) = @_;
  330. setKeyValue($new."_user",getKeyValue($old."_user"));
  331. setKeyValue($new."_pass",getKeyValue($old."_pass"));
  332. setKeyValue($old."_user",undef);
  333. setKeyValue($old."_pass",undef);
  334. return undef;
  335. }
  336. sub Init($) {
  337. my $hash = shift;
  338. send_connect($hash);
  339. readingsSingleUpdate($hash,"connection","connecting",1);
  340. $hash->{ping_received}=1;
  341. Timer($hash);
  342. return undef;
  343. }
  344. sub Timer($) {
  345. my $hash = shift;
  346. RemoveInternalTimer($hash);
  347. unless ($hash->{ping_received}) {
  348. onTimeout($hash);
  349. readingsSingleUpdate($hash,"connection","timed-out",1) ;#unless $hash->{ping_received};
  350. }
  351. $hash->{ping_received} = 0;
  352. InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
  353. send_ping($hash);
  354. }
  355. sub Read {
  356. my ($hash) = @_;
  357. my $name = $hash->{NAME};
  358. my $buf = DevIo_SimpleRead($hash);
  359. return undef unless $buf;
  360. $hash->{buf} .= $buf;
  361. while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
  362. my $message_type = $mqtt->message_type();
  363. Log3($name,5,"MQTT $name message received: ".$mqtt->string());
  364. MESSAGE_TYPE: {
  365. $message_type == MQTT_CONNACK and do {
  366. readingsSingleUpdate($hash,"connection","connected",1);
  367. onConnect($hash);
  368. GP_ForallClients($hash,\&client_start);
  369. foreach my $message_id (keys %{$hash->{messages}}) {
  370. my $msg = $hash->{messages}->{$message_id}->{message};
  371. $msg->{dup} = 1;
  372. DevIo_SimpleWrite($hash,$msg->bytes,undef);
  373. }
  374. last;
  375. };
  376. $message_type == MQTT_PUBLISH and do {
  377. my $topic = $mqtt->topic();
  378. GP_ForallClients($hash,sub {
  379. my $client = shift;
  380. Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
  381. if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
  382. readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
  383. my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
  384. if($fn) {
  385. CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
  386. } elsif ($client->{TYPE} eq "MQTT_DEVICE") {
  387. MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
  388. } elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
  389. MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
  390. } else {
  391. Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
  392. }
  393. };
  394. },undef);
  395. if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) {
  396. my $message_id = $mqtt->message_id();
  397. if ($qos == MQTT_QOS_AT_LEAST_ONCE) {
  398. send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id);
  399. } else {
  400. send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id);
  401. }
  402. }
  403. last;
  404. };
  405. $message_type == MQTT_PUBACK and do {
  406. my $message_id = $mqtt->message_id();
  407. GP_ForallClients($hash,sub {
  408. my $client = shift;
  409. if ($client->{message_ids}->{$message_id}) {
  410. readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1);
  411. delete $client->{message_ids}->{$message_id};
  412. };
  413. },undef);
  414. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  415. last;
  416. };
  417. $message_type == MQTT_PUBREC and do {
  418. my $message_id = $mqtt->message_id();
  419. GP_ForallClients($hash,sub {
  420. my $client = shift;
  421. if ($client->{message_ids}->{$message_id}) {
  422. readingsSingleUpdate($client,"transmission-state","outgoing publish received",1);
  423. };
  424. },undef);
  425. send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
  426. last;
  427. };
  428. $message_type == MQTT_PUBREL and do {
  429. my $message_id = $mqtt->message_id();
  430. GP_ForallClients($hash,sub {
  431. my $client = shift;
  432. if ($client->{message_ids}->{$message_id}) {
  433. readingsSingleUpdate($client,"transmission-state","incoming publish released",1);
  434. delete $client->{message_ids}->{$message_id};
  435. };
  436. },undef);
  437. send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
  438. delete $hash->{messages}->{$message_id};
  439. last;
  440. };
  441. $message_type == MQTT_PUBCOMP and do {
  442. my $message_id = $mqtt->message_id();
  443. GP_ForallClients($hash,sub {
  444. my $client = shift;
  445. if ($client->{message_ids}->{$message_id}) {
  446. readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1);
  447. delete $client->{message_ids}->{$message_id};
  448. };
  449. },undef);
  450. delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
  451. last;
  452. };
  453. $message_type == MQTT_SUBACK and do {
  454. my $message_id = $mqtt->message_id();
  455. GP_ForallClients($hash,sub {
  456. my $client = shift;
  457. if ($client->{message_ids}->{$message_id}) {
  458. readingsSingleUpdate($client,"transmission-state","subscription acknowledged",1);
  459. delete $client->{message_ids}->{$message_id};
  460. };
  461. },undef);
  462. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  463. last;
  464. };
  465. $message_type == MQTT_UNSUBACK and do {
  466. my $message_id = $mqtt->message_id();
  467. GP_ForallClients($hash,sub {
  468. my $client = shift;
  469. if ($client->{message_ids}->{$message_id}) {
  470. readingsSingleUpdate($client,"transmission-state","unsubscription acknowledged",1);
  471. delete $client->{message_ids}->{$message_id};
  472. };
  473. },undef);
  474. delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
  475. last;
  476. };
  477. $message_type == MQTT_PINGRESP and do {
  478. $hash->{ping_received} = 1;
  479. readingsSingleUpdate($hash,"connection","active",1);
  480. last;
  481. };
  482. Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
  483. }
  484. }
  485. return undef;
  486. };
  487. sub send_connect($) {
  488. my $hash = shift;
  489. my $name = $hash->{NAME};
  490. my $user = getKeyValue($name."_user");
  491. my $pass = getKeyValue($name."_pass");
  492. my $lw = AttrVal($name,"last-will",undef);
  493. my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lw);
  494. return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass, will_topic => $willtopic, will_message => $willmessage, will_retain => $willretain, will_qos => $willqos);
  495. };
  496. sub send_publish($@) {
  497. my ($hash,%msg) = @_;
  498. if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) {
  499. send_message(shift, message_type => MQTT_PUBLISH, %msg);
  500. return undef;
  501. } else {
  502. my $msgid = $hash->{msgid}++;
  503. send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
  504. return $msgid;
  505. }
  506. };
  507. sub send_subscribe($@) {
  508. my $hash = shift;
  509. my $msgid = $hash->{msgid}++;
  510. send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
  511. return $msgid;
  512. };
  513. sub send_unsubscribe($@) {
  514. my $hash = shift;
  515. my $msgid = $hash->{msgid}++;
  516. send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
  517. return $msgid;
  518. };
  519. sub send_ping($) {
  520. return send_message(shift, message_type => MQTT_PINGREQ);
  521. };
  522. sub send_disconnect($) {
  523. my $hash = shift;
  524. onDisconnect($hash);
  525. return send_message($hash, message_type => MQTT_DISCONNECT);
  526. };
  527. sub send_message($$$@) {
  528. my ($hash,%msg) = @_;
  529. my $name = $hash->{NAME};
  530. my $message = Net::MQTT::Message->new(%msg);
  531. Log3($name,5,"MQTT $name message sent: ".$message->string());
  532. if (defined $msg{message_id}) {
  533. $hash->{messages}->{$msg{message_id}} = {
  534. message => $message,
  535. timeout => gettimeofday()+$hash->{timeout},
  536. };
  537. }
  538. DevIo_SimpleWrite($hash,$message->bytes,undef);
  539. };
  540. sub topic_to_regexp($) {
  541. my $t = shift;
  542. $t =~ s|#$|.\*|;
  543. $t =~ s|\/\.\*$|.\*|;
  544. $t =~ s|\/|\\\/|g;
  545. $t =~ s|(\+)([^+]*$)|(+)$2|;
  546. $t =~ s|\+|[^\/]+|g;
  547. return "^$t\$";
  548. }
  549. sub client_subscribe_topic($$;$$) {
  550. my ($client,$topic,$qos,$retain) = @_;
  551. push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
  552. my $expr = topic_to_regexp($topic);
  553. push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
  554. if ($main::init_done) {
  555. if (my $mqtt = $client->{IODev}) {;
  556. $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
  557. $retain = 0 unless defined $retain; # not supported yet
  558. my $msgid = send_subscribe($mqtt,
  559. topics => [[$topic => $qos || MQTT_QOS_AT_MOST_ONCE]],
  560. );
  561. $client->{message_ids}->{$msgid}++;
  562. readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
  563. }
  564. }
  565. };
  566. sub client_unsubscribe_topic($$) {
  567. my ($client,$topic) = @_;
  568. $client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}];
  569. my $expr = topic_to_regexp($topic);
  570. $client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
  571. if ($main::init_done) {
  572. if (my $mqtt = $client->{IODev}) {;
  573. my $msgid = send_unsubscribe($mqtt,
  574. topics => [$topic],
  575. );
  576. $client->{message_ids}->{$msgid}++;
  577. readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
  578. }
  579. }
  580. };
  581. sub Client_Define($$) {
  582. my ( $client, $def ) = @_;
  583. $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
  584. #$client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
  585. $client->{".qos"}->{'*'} = 0;
  586. $client->{".retain"}->{'*'} = "0";
  587. $client->{subscribe} = [];
  588. $client->{subscribeExpr} = [];
  589. AssignIoPort($client);
  590. if ($main::init_done) {
  591. return client_start($client);
  592. } else {
  593. return undef;
  594. }
  595. };
  596. sub Client_Undefine($) {
  597. client_stop(shift);
  598. return undef;
  599. };
  600. #use Data::Dumper;
  601. sub client_attr($$$$$) {
  602. my ($client,$command,$name,$attribute,$value) = @_;
  603. ATTRIBUTE_HANDLER: {
  604. $attribute eq "qos" and do {
  605. #if ($command eq "set") {
  606. # $client->{qos} = $MQTT::qos{$value}; ### ALT
  607. #} else {
  608. # $client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
  609. #}
  610. delete($client->{".qos"});
  611. if ($command ne "set") {
  612. delete($client->{".qos"});
  613. $client->{".qos"}->{"*"} = "0";
  614. } else {
  615. my @values = ();
  616. if(!defined($value) || $value=~/^[ \t]*$/) {
  617. return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
  618. }
  619. @values = split("[ \t]+",$value);
  620. foreach my $set (@values) {
  621. my($rname,$rvalue) = split(":",$set);
  622. if(!defined($rvalue)) {
  623. $rvalue=$rname;
  624. $rname="";
  625. $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
  626. }
  627. #if ($command eq "set") {
  628. # Map constants
  629. #$rvalue = MQTT_QOS_AT_MOST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_MOST_ONCE));
  630. #$rvalue = MQTT_QOS_AT_LEAST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_LEAST_ONCE));
  631. #$rvalue = MQTT_QOS_EXACTLY_ONCE if($rvalue eq qos_string(MQTT_QOS_EXACTLY_ONCE));
  632. $rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue}));
  633. if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") {
  634. return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
  635. }
  636. #$rvalue="1" unless ($rvalue eq "0");
  637. $client->{".qos"}->{$rname} = $rvalue;
  638. #} else {
  639. # delete($client->{".qos"}->{$rname});
  640. # $client->{".qos"}->{"*"} = "0" if($rname eq "*");
  641. #}
  642. }
  643. }
  644. my $showqos = "";
  645. if(defined($client->{".qos"})) {
  646. foreach my $rname (sort keys %{$client->{".qos"}}) {
  647. my $rvalue = $client->{".qos"}->{$rname};
  648. $rname="[state]" if ($rname eq "");
  649. $showqos.=$rname.':'.$rvalue.' ';
  650. }
  651. }
  652. $client->{"qos"} = $showqos;
  653. last;
  654. };
  655. $attribute eq "retain" and do {
  656. delete($client->{".retain"});
  657. if ($command ne "set") {
  658. delete($client->{".retain"});
  659. $client->{".retain"}->{"*"} = "0";
  660. } else {
  661. my @values = ();
  662. if(!defined($value) || $value=~/^[ \t]*$/) {
  663. return "retain value may not be empty. Format: [<reading>|*:]0|1";
  664. }
  665. @values = split("[ \t]+",$value);
  666. foreach my $set (@values) {
  667. my($rname,$rvalue) = split(":",$set);
  668. if(!defined($rvalue)) {
  669. $rvalue=$rname;
  670. $rname="";
  671. $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
  672. }
  673. if($rvalue ne "0" && $rvalue ne "1") {
  674. return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
  675. }
  676. $client->{".retain"}->{$rname} = $rvalue;
  677. }
  678. }
  679. my $showretain = "";
  680. if(defined($client->{".retain"})) {
  681. foreach my $rname (sort keys %{$client->{".retain"}}) {
  682. my $rvalue = $client->{".retain"}->{$rname};
  683. $rname="[state]" if ($rname eq "");
  684. $showretain.=$rname.':'.$rvalue.' ';
  685. }
  686. }
  687. $client->{"retain"} = $showretain;
  688. last;
  689. };
  690. $attribute eq "IODev" and do {
  691. if ($main::init_done) {
  692. if ($command eq "set") {
  693. client_stop($client);
  694. $main::attr{$name}{IODev} = $value;
  695. client_start($client);
  696. } else {
  697. client_stop($client);
  698. }
  699. }
  700. last;
  701. };
  702. }
  703. };
  704. sub client_start($) {
  705. my $client = shift;
  706. my $name = $client->{NAME};
  707. if (! (defined AttrVal($name,"stateFormat",undef))) {
  708. $main::attr{$name}{stateFormat} = "transmission-state";
  709. }
  710. if (@{$client->{subscribe}}) {
  711. my $msgid = send_subscribe($client->{IODev},
  712. topics => [map { [$_ => $client->{".qos"}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
  713. );
  714. $client->{message_ids}->{$msgid}++;
  715. readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
  716. }
  717. };
  718. sub client_stop($) {
  719. my $client = shift;
  720. if (@{$client->{subscribe}}) {
  721. my $msgid = send_unsubscribe($client->{IODev},
  722. topics => [@{$client->{subscribe}}],
  723. );
  724. $client->{message_ids}->{$msgid}++;
  725. readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
  726. }
  727. };
  728. 1;
  729. =pod
  730. =item [device]
  731. =item summary connects fhem to MQTT
  732. =begin html
  733. <a name="MQTT"></a>
  734. <h3>MQTT</h3>
  735. <ul>
  736. <p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
  737. <p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
  738. Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
  739. Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
  740. <a name="MQTTdefine"></a>
  741. <p><b>Define</b></p>
  742. <ul>
  743. <p><code>define &lt;name&gt; MQTT &lt;ip:port&gt; [&lt;username&gt;] [&lt;password&gt;]</code></p>
  744. <p>Specifies the MQTT device.</p>
  745. </ul>
  746. <a name="MQTTset"></a>
  747. <p><b>Set</b></p>
  748. <ul>
  749. <li>
  750. <p><code>set &lt;name&gt; connect</code><br/>
  751. (re-)connects the MQTT-device to the mqtt-broker</p>
  752. </li>
  753. <li>
  754. <p><code>set &lt;name&gt; disconnect</code><br/>
  755. disconnects the MQTT-device from the mqtt-broker</p>
  756. </li>
  757. <li>
  758. <p><code>set &lt;name&gt; publish [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  759. sends message to the specified topic</p>
  760. </li>
  761. </ul>
  762. <a name="MQTTattr"></a>
  763. <p><b>Attributes</b></p>
  764. <ul>
  765. <li>
  766. <p>keep-alive<br/>
  767. sets the keep-alive time (in seconds).</p>
  768. </li>
  769. <li>
  770. <p><code>attr &lt;name&gt; last-will [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
  771. Support for MQTT feature "last will"
  772. </p>
  773. <p>example:<br/>
  774. <code>attr mqtt last-will /fhem/status crashed</code>
  775. </p>
  776. </li>
  777. <li>
  778. <p>on-connect, on-disconnect<br/>
  779. <code>attr &lt;name&gt; on-connect {Perl-expression} &lt;topic&gt; &lt;message&gt;</code><br/>
  780. Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression<br/>
  781. If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
  782. The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
  783. </p>
  784. <p>examples:<br/>
  785. <code>attr mqtt on-connect /topic/status connected</code><br/>
  786. <code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
  787. </p>
  788. </li>
  789. <li>
  790. <p>on-timeout<br/>
  791. <code>attr &lt;name&gt; on-timeout {Perl-expression}</code>
  792. evaluate the given Perl expression on timeout<br/>
  793. </p>
  794. </li>
  795. </ul>
  796. </ul>
  797. =end html
  798. =cut