| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040 |
- ##############################################
- #
- # fhem bridge to mqtt (see http://mqtt.org)
- #
- # Copyright (C) 2018 Alexander Schulz
- # Copyright (C) 2017 Stephan Eisler
- # Copyright (C) 2014 - 2016 Norbert Truchsess
- #
- # This file is part of fhem.
- #
- # Fhem is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 2 of the License, or
- # (at your option) any later version.
- #
- # Fhem is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with fhem. If not, see <http://www.gnu.org/licenses/>.
- #
- # $Id: 00_MQTT.pm 17362 2018-09-17 12:57:29Z hexenmeister $
- #
- ##############################################
- my %sets = (
- "connect" => "",
- "disconnect" => "",
- "publish" => "",
- );
- my %gets = (
- "version" => ""
- );
- my @clients = qw(
- MQTT_DEVICE
- MQTT_BRIDGE
- );
- sub MQTT_Initialize($) {
- my $hash = shift @_;
- require "$main::attr{global}{modpath}/FHEM/DevIo.pm";
- # Provider
- $hash->{Clients} = join (':',@clients);
- $hash->{ReadyFn} = "MQTT::Ready";
- $hash->{ReadFn} = "MQTT::Read";
- # Consumer
- $hash->{DefFn} = "MQTT::Define";
- $hash->{UndefFn} = "MQTT::Undef";
- $hash->{DeleteFn} = "MQTT::Delete";
- $hash->{RenameFn} = "MQTT::Rename";
- $hash->{ShutdownFn} = "MQTT::Shutdown";
- $hash->{SetFn} = "MQTT::Set";
- $hash->{NotifyFn} = "MQTT::Notify";
- $hash->{AttrFn} = "MQTT::Attr";
- $hash->{AttrList} = "keep-alive "."last-will client-id "."on-connect on-disconnect on-timeout ".$main::readingFnAttributes;
- }
- package MQTT;
- use Exporter ('import');
- @EXPORT = ();
- @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
- %EXPORT_TAGS = (all => [@EXPORT_OK]);
- use strict;
- use warnings;
- use GPUtils qw(:all);
- use Net::MQTT::Constants;
- use Net::MQTT::Message;
- our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
- BEGIN {GP_Import(qw(
- gettimeofday
- readingsSingleUpdate
- DevIo_OpenDev
- DevIo_SimpleWrite
- DevIo_SimpleRead
- DevIo_CloseDev
- RemoveInternalTimer
- InternalTimer
- AttrVal
- ReadingsVal
- Log3
- AssignIoPort
- getKeyValue
- setKeyValue
- CallFn
- defs
- modules
- looks_like_number
- fhem
- ))};
- sub Define($$) {
- my ( $hash, $def ) = @_;
- $hash->{NOTIFYDEV} = "global";
- $hash->{msgid} = 1;
- $hash->{timeout} = 60;
- $hash->{messages} = {};
- my ($host,$username,$password) = split("[ \t]+", $hash->{DEF});
- $hash->{DeviceName} = $host;
-
- my $name = $hash->{NAME};
- my $user = getKeyValue($name."_user");
- my $pass = getKeyValue($name."_pass");
- setKeyValue($name."_user",$username) unless(defined($user));
- setKeyValue($name."_pass",$password) unless(defined($pass));
- $hash->{DEF} = $host;
-
- #readingsSingleUpdate($hash,"connection","disconnected",0);
- if ($main::init_done) {
- return Start($hash);
- } else {
- return undef;
- }
- }
- sub Undef($) {
- my $hash = shift;
- Stop($hash);
- return undef;
- }
- sub Delete($$) {
- my ($hash, $name) = @_;
- setKeyValue($name."_user",undef);
- setKeyValue($name."_pass",undef);
- return undef;
- }
- sub Shutdown($) {
- my $hash = shift;
- Stop($hash);
- my $name = $hash->{NAME};
- Log3($name,1,"Shutdown executed");
- return undef;
- }
- sub onConnect($) {
- my $hash = shift;
- my $name = $hash->{NAME};
- my $cmdstr = AttrVal($name,"on-connect",undef);
- return process_event($hash,$cmdstr);
- }
- sub onDisconnect($) {
- my $hash = shift;
- my $name = $hash->{NAME};
- my $cmdstr = AttrVal($name,"on-disconnect",undef);
- return process_event($hash,$cmdstr);
- }
- sub onTimeout($) {
- my $hash = shift;
- my $name = $hash->{NAME};
- my $cmdstr = AttrVal($name,"on-timeout",undef);
- if($cmdstr) {
- return eval($cmdstr);
- }
- }
- sub isConnected($) {
- my $hash = shift;
- my $cstate=ReadingsVal($hash->{NAME}, "connection", "");
- return 1 if($cstate eq "connected" || $cstate eq "active");
- return undef;
- }
- sub process_event($$) {
- my $hash = shift;
- my $str = shift;
- my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str);
-
- my $do=1;
- if($cmd) {
- my $name = $hash->{NAME};
- $do=eval($cmd);
- $do=1 if (!defined($do));
- #no strict "refs";
- #my $ret = &{$hash->{WBCallback}}($hash);
- #use strict "refs";
- }
-
- if($do && defined($topic)) {
- $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
- send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain);
- }
- }
- sub Set($@) {
- my ($hash, @a) = @_;
- return "Need at least one parameters" if(@a < 2);
- return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
- if(!defined($sets{$a[1]}));
- my $command = $a[1];
- my $value = $a[2];
- COMMAND_HANDLER: {
- $command eq "connect" and do {
- Start($hash);
- last;
- };
- $command eq "disconnect" and do {
- Stop($hash);
- last;
- };
- $command eq "publish" and do {
- shift(@a);
- shift(@a);
- #if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";}
- #my $qos=0;
- #my $retain=0;
- #if(looks_like_number ($a[0])) {
- # $qos = int($a[0]);
- # $qos = 0 if $qos>1;
- # shift(@a);
- # if(looks_like_number ($a[0])) {
- # $retain = int($a[0]);
- # $retain = 0 if $retain>2;
- # shift(@a);
- # }
- #}
- #if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";}
- #my $topic = shift(@a);
- #my $value = join (" ", @a);
-
- my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
- return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
- return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+');
- $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
- my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
- last;
- }
- };
- }
- sub parseParams($;$$$$) {
- my ( $cmd, $separator, $joiner, $keyvalueseparator, $acceptedkeys ) = @_;
- $separator = ' ' if ( !$separator );
- $joiner = $separator if ( !$joiner ); # needed if separator is a regexp
- $keyvalueseparator = ':' if(!$keyvalueseparator);
- my ( @a, %h );
- my @params;
- if ( ref($cmd) eq 'ARRAY' ) {
- @params = @{$cmd};
- }
- else {
- @params = split( $separator, $cmd );
- }
- while (@params) {
- my $param = shift(@params);
- next if ( $param eq "" );
- my ( $key, $value ) = split( $keyvalueseparator, $param, 2 );
- if ( !defined($value) ) {
- $value = $key;
- $key = undef;
- # the key can not start with a { -> it must be a perl expression # vim:}
- }
- elsif ( $key =~ m/^\s*{/ ) { # for vim: }
- $value = $param;
- $key = undef;
- }
- # the key can not start with a ' or "
- elsif ( $key =~ m/^\s*('|")/ ) {
- $value = $param;
- $key = undef;
- }
- # accept known keys only (if defined $acceptedkeys)
- elsif (defined($acceptedkeys) and !defined($acceptedkeys->{$key})) {
- $value = $param;
- $key = undef;
- }
- #collect all parts until the closing ' or "
- while ( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
- my $next = shift(@params);
- last if ( !defined($next) );
- $value .= $joiner . $next;
- }
- #remove matching ' or " from the start and end
- if ( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
- $value =~ s/^.(.*).$/$1/;
- }
- #collect all parts until opening { and closing } are matched
- if ( $value =~ m/^\s*{/ ) { # } for match
- my $count = 0;
- for my $i ( 0 .. length($value) - 1 ) {
- my $c = substr( $value, $i, 1 );
- ++$count if ( $c eq '{' );
- --$count if ( $c eq '}' );
- }
- while ( $param && $count != 0 ) {
- my $next = shift(@params);
- last if ( !defined($next) );
- $value .= $joiner . $next;
- for my $i ( 0 .. length($next) - 1 ) {
- my $c = substr( $next, $i, 1 );
- ++$count if ( $c eq '{' );
- --$count if ( $c eq '}' );
- }
- }
- }
- if ( defined($key) ) {
- $h{$key} = $value;
- }
- else {
- push @a, $value;
- }
- }
- return ( \@a, \%h );
- }
- sub parsePublishCmdStr($) {
- my ($str) = @_;
- return undef unless defined($str);
-
- my @lwa = split("[ \t]+",$str);
- return parsePublishCmd(@lwa);
- }
- sub parsePublishCmd(@) {
- my @a = @_;
- my ( $aa, $bb ) = parseParams(\@a,undef,undef,undef,{qos=>1,retain=>1});
-
- my $qos = 0;
- my $retain = 0;
- my $topic = undef;
- my $value = "\0";
- my $expression = undef;
- if ( exists( $bb->{'qos'} ) ) {
- $qos = $bb->{'qos'};
- }
- if ( exists $bb->{'retain'} ) {
- $retain = $bb->{'retain'};
- }
- my @aaa = ();
- my @xaa = @{$aa};
- while ( scalar(@xaa) > 0 ) {
- my $av = shift @xaa;
- if (!defined($expression) and $av =~ /^\{.*\}$/ and scalar(@xaa)>0) {
- $expression = $av;
- next;
- }
- else {
- push @aaa, $av;
- }
- }
- $topic = shift(@aaa);
- if ( scalar(@aaa) > 0 ) {
- $value = join( " ", @aaa );
- }
- return undef unless $topic || $expression;
- return ( $qos, $retain, $topic, $value, $expression );
- }
- sub Notify($$) {
- my ($hash,$dev) = @_;
- if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
- Start($hash);
- } elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
- }
- }
- sub Attr($$$$) {
- my ($command,$name,$attribute,$value) = @_;
- my $hash = $main::defs{$name};
- ATTRIBUTE_HANDLER: {
- $attribute eq "keep-alive" and do {
- if ($command eq "set") {
- $hash->{timeout} = $value;
- } else {
- $hash->{timeout} = 60;
- }
- if ($main::init_done) {
- $hash->{ping_received}=1;
- Timer($hash);
- };
- last;
- };
- $attribute eq "last-will" and do {
- if($hash->{STATE} ne "disconnected") {
- Stop($hash);
- InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
- }
- last;
- };
- };
- }
- #sub Reconnect($){
- # my $hash = shift;
- # Stop($hash);
- # Start($hash);
- #}
- sub Start($) {
- my $hash = shift;
- my $firsttime = $hash->{".cinitmark"};
-
- if(defined($firsttime)) {
- my $cstate=ReadingsVal($hash->{NAME},"connection","");
- if($cstate ne "disconnected" && $cstate ne "timed-out") {
- return undef;
- }
- } else {
- $hash->{".cinitmark"} = 1;
- }
-
- DevIo_CloseDev($hash);
- return DevIo_OpenDev($hash, 0, "MQTT::Init");
- }
- sub Stop($) {
- my $hash = shift;
-
- my $cstate=ReadingsVal($hash->{NAME},"connection","");
- if($cstate eq "disconnected" || $cstate eq "timed-out") {
- return undef;
- }
-
- send_disconnect($hash);
- DevIo_CloseDev($hash);
- RemoveInternalTimer($hash);
- readingsSingleUpdate($hash,"connection","disconnected",1);
- }
- sub Ready($) {
- my $hash = shift;
- return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected");
- }
- sub Rename() {
- my ($new,$old) = @_;
- setKeyValue($new."_user",getKeyValue($old."_user"));
- setKeyValue($new."_pass",getKeyValue($old."_pass"));
-
- setKeyValue($old."_user",undef);
- setKeyValue($old."_pass",undef);
- return undef;
- }
- sub Init($) {
- my $hash = shift;
- send_connect($hash);
- readingsSingleUpdate($hash,"connection","connecting",1);
- $hash->{ping_received}=1;
- Timer($hash);
- return undef;
- }
- sub Timer($) {
- my $hash = shift;
- RemoveInternalTimer($hash);
- unless ($hash->{ping_received}) {
- onTimeout($hash);
- readingsSingleUpdate($hash,"connection","timed-out",1) ;#unless $hash->{ping_received};
- GP_ForallClients($hash,\¬ify_client_connection_timeout);
- }
- $hash->{ping_received} = 0;
- InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
- send_ping($hash);
- }
- sub Read {
- my ($hash) = @_;
- my $name = $hash->{NAME};
- my $buf = DevIo_SimpleRead($hash);
- return undef unless $buf;
- $hash->{buf} .= $buf;
- while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
- my $message_type = $mqtt->message_type();
- Log3($name,5,"MQTT $name message received: ".$mqtt->string());
- MESSAGE_TYPE: {
- $message_type == MQTT_CONNACK and do {
- readingsSingleUpdate($hash,"connection","connected",1);
- onConnect($hash);
- GP_ForallClients($hash,\&client_start);
- GP_ForallClients($hash,\¬ify_client_connected);
- foreach my $message_id (keys %{$hash->{messages}}) {
- my $msg = $hash->{messages}->{$message_id}->{message};
- $msg->{dup} = 1;
- DevIo_SimpleWrite($hash,$msg->bytes,undef);
- }
- last;
- };
- $message_type == MQTT_PUBLISH and do {
- my $topic = $mqtt->topic();
- GP_ForallClients($hash,sub {
- my $client = shift;
- Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
- if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
- readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
- my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
- if($fn) {
- CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
- } elsif ($client->{TYPE} eq "MQTT_DEVICE") {
- MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
- } elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
- MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
- } else {
- Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
- }
- };
- },undef);
- if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) {
- my $message_id = $mqtt->message_id();
- if ($qos == MQTT_QOS_AT_LEAST_ONCE) {
- send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id);
- } else {
- send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id);
- }
- }
- last;
- };
- $message_type == MQTT_PUBACK and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1);
- delete $client->{message_ids}->{$message_id};
- };
- },undef);
- delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
- last;
- };
- $message_type == MQTT_PUBREC and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","outgoing publish received",1);
- };
- },undef);
- send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
- last;
- };
- $message_type == MQTT_PUBREL and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","incoming publish released",1);
- delete $client->{message_ids}->{$message_id};
- };
- },undef);
- send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
- delete $hash->{messages}->{$message_id};
- last;
- };
- $message_type == MQTT_PUBCOMP and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1);
- delete $client->{message_ids}->{$message_id};
- };
- },undef);
- delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
- last;
- };
- $message_type == MQTT_SUBACK and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","subscription acknowledged",1);
- delete $client->{message_ids}->{$message_id};
- };
- },undef);
- delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
- last;
- };
- $message_type == MQTT_UNSUBACK and do {
- my $message_id = $mqtt->message_id();
- GP_ForallClients($hash,sub {
- my $client = shift;
- if ($client->{message_ids}->{$message_id}) {
- readingsSingleUpdate($client,"transmission-state","unsubscription acknowledged",1);
- delete $client->{message_ids}->{$message_id};
- };
- },undef);
- delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
- last;
- };
- $message_type == MQTT_PINGRESP and do {
- $hash->{ping_received} = 1;
- readingsSingleUpdate($hash,"connection","active",1);
- last;
- };
- Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
- }
- }
- return undef;
- };
- sub send_connect($) {
- my $hash = shift;
- my $name = $hash->{NAME};
- my $user = getKeyValue($name."_user");
- my $pass = getKeyValue($name."_pass");
-
- my $lw = AttrVal($name,"last-will",undef);
- my $clientId = AttrVal($name,"client-id",undef);
- my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lw);
-
- return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass, client_id=>$clientId, will_topic => $willtopic, will_message => $willmessage, will_retain => $willretain, will_qos => $willqos);
- };
- sub send_publish($@) {
- my ($hash,%msg) = @_;
- if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) {
- send_message(shift, message_type => MQTT_PUBLISH, %msg);
- return undef;
- } else {
- my $msgid = $hash->{msgid}++;
- send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
- return $msgid;
- }
- };
- sub send_subscribe($@) {
- my $hash = shift;
- my $msgid = $hash->{msgid}++;
- send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
- return $msgid;
- };
- sub send_unsubscribe($@) {
- my $hash = shift;
- my $msgid = $hash->{msgid}++;
- send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
- return $msgid;
- };
- sub send_ping($) {
- return send_message(shift, message_type => MQTT_PINGREQ);
- };
- sub send_disconnect($) {
- my $hash = shift;
- onDisconnect($hash);
- GP_ForallClients($hash,\¬ify_client_disconnected);
- return send_message($hash, message_type => MQTT_DISCONNECT);
- };
- sub send_message($$$@) {
- my ($hash,%msg) = @_;
- my $name = $hash->{NAME};
- my $message = Net::MQTT::Message->new(%msg);
- Log3($name,5,"MQTT $name message sent: ".$message->string());
- if (defined $msg{message_id}) {
- $hash->{messages}->{$msg{message_id}} = {
- message => $message,
- timeout => gettimeofday()+$hash->{timeout},
- };
- }
-
- DevIo_SimpleWrite($hash,$message->bytes,undef);
- };
- sub topic_to_regexp($) {
- my $t = shift;
- $t =~ s|#$|.\*|;
- $t =~ s|\$|\\\$|g;
- $t =~ s|\/\.\*$|.\*|;
- $t =~ s|\/|\\\/|g;
- $t =~ s|(\+)([^+]*$)|(+)$2|;
- $t =~ s|\+|[^\/]+|g;
- return "^$t\$";
- }
- sub client_subscribe_topic($$;$$) {
- my ($client,$topic,$qos,$retain) = @_;
- push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
- $client->{subscribeQos}->{$topic}=$qos;
- my $expr = topic_to_regexp($topic);
- push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
- if ($main::init_done) {
- if (my $mqtt = $client->{IODev}) {;
- $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
- $retain = 0 unless defined $retain; # not supported yet
- my $msgid = send_subscribe($mqtt,
- topics => [[$topic => $qos || MQTT_QOS_AT_MOST_ONCE]],
- );
- $client->{message_ids}->{$msgid}++;
- readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
- }
- }
- };
- sub client_unsubscribe_topic($$) {
- my ($client,$topic) = @_;
- $client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}];
- delete $client->{subscribeQos}->{$topic};
- my $expr = topic_to_regexp($topic);
- $client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
- if ($main::init_done) {
- if (my $mqtt = $client->{IODev}) {;
- my $msgid = send_unsubscribe($mqtt,
- topics => [$topic],
- );
- $client->{message_ids}->{$msgid}++;
- readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
- }
- }
- };
- sub Client_Define($$) {
- my ( $client, $def ) = @_;
- $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
- #$client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
- $client->{".qos"}->{'*'} = 0;
- $client->{".retain"}->{'*'} = "0";
- $client->{subscribe} = [];
- $client->{subscribeQos} = {};
- $client->{subscribeExpr} = [];
- AssignIoPort($client);
- if ($main::init_done) {
- return client_start($client);
- } else {
- return undef;
- }
- };
- sub Client_Undefine($) {
- client_stop(shift);
- return undef;
- };
- #use Data::Dumper;
- sub client_attr($$$$$) {
- my ($client,$command,$name,$attribute,$value) = @_;
- ATTRIBUTE_HANDLER: {
- $attribute eq "qos" and do {
- #if ($command eq "set") {
- # $client->{qos} = $MQTT::qos{$value}; ### ALT
- #} else {
- # $client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
- #}
-
- delete($client->{".qos"});
-
- if ($command ne "set") {
- delete($client->{".qos"});
- $client->{".qos"}->{"*"} = "0";
- } else {
-
- my @values = ();
- if(!defined($value) || $value=~/^[ \t]*$/) {
- return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
- }
- @values = split("[ \t]+",$value);
-
- foreach my $set (@values) {
- my($rname,$rvalue) = split(":",$set);
- if(!defined($rvalue)) {
- $rvalue=$rname;
- $rname="";
- $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
- }
- #if ($command eq "set") {
- # Map constants
- #$rvalue = MQTT_QOS_AT_MOST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_MOST_ONCE));
- #$rvalue = MQTT_QOS_AT_LEAST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_LEAST_ONCE));
- #$rvalue = MQTT_QOS_EXACTLY_ONCE if($rvalue eq qos_string(MQTT_QOS_EXACTLY_ONCE));
- $rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue}));
- if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") {
- return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
- }
- #$rvalue="1" unless ($rvalue eq "0");
- $client->{".qos"}->{$rname} = $rvalue;
- #} else {
- # delete($client->{".qos"}->{$rname});
- # $client->{".qos"}->{"*"} = "0" if($rname eq "*");
- #}
- }
- }
-
- my $showqos = "";
- if(defined($client->{".qos"})) {
- foreach my $rname (sort keys %{$client->{".qos"}}) {
- my $rvalue = $client->{".qos"}->{$rname};
- $rname="[state]" if ($rname eq "");
- $showqos.=$rname.':'.$rvalue.' ';
- }
- }
- $client->{"qos"} = $showqos;
- last;
- };
- $attribute eq "retain" and do {
- delete($client->{".retain"});
-
- if ($command ne "set") {
- delete($client->{".retain"});
- $client->{".retain"}->{"*"} = "0";
- } else {
- my @values = ();
- if(!defined($value) || $value=~/^[ \t]*$/) {
- return "retain value may not be empty. Format: [<reading>|*:]0|1";
- }
- @values = split("[ \t]+",$value);
-
- foreach my $set (@values) {
- my($rname,$rvalue) = split(":",$set);
- if(!defined($rvalue)) {
- $rvalue=$rname;
- $rname="";
- $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
- }
- if($rvalue ne "0" && $rvalue ne "1") {
- return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
- }
- $client->{".retain"}->{$rname} = $rvalue;
- }
- }
-
- my $showretain = "";
- if(defined($client->{".retain"})) {
- foreach my $rname (sort keys %{$client->{".retain"}}) {
- my $rvalue = $client->{".retain"}->{$rname};
- $rname="[state]" if ($rname eq "");
- $showretain.=$rname.':'.$rvalue.' ';
- }
- }
- $client->{"retain"} = $showretain;
- last;
- };
- $attribute eq "IODev" and do {
- if ($main::init_done) {
- if ($command eq "set") {
- client_stop($client);
- $main::attr{$name}{IODev} = $value;
- return client_start($client);
- } else {
- client_stop($client);
- }
- }
- last;
- };
- }
- };
- sub notify_client_connected($) {
- my $client = shift;
- CallFn($client->{NAME},"OnClientConnectFn",($client));
- }
- sub notify_client_disconnected($) {
- my $client = shift;
- CallFn($client->{NAME},"OnClientDisconnectFn",($client));
- }
- sub notify_client_connection_timeout($) {
- my $client = shift;
- CallFn($client->{NAME},"OnClientConnectionTimeoutFn",($client));
- }
- sub client_start($) {
- my $client = shift;
- # probably internal failure
- unless (defined $client) {
- Log3("MQTT IODev",1,"no client device hash provided");
- return "no client device hash provided";
- }
- # client device without IODev. probably internal failure
- unless (defined $client->{IODev}) {
- Log3("MQTT IODev",1,"client device hash no IODev provided");
- return "client device hash no IODev provided";
- }
-
- CallFn($client->{NAME},"OnClientStartFn",($client));
-
- unless (ref($client->{subscribe}) eq "ARRAY") {
- Log3($client->{NAME},1,"unknown client or client initialization error");
- return "unknown client or client initialization error";
- }
- if (@{$client->{subscribe}}) {
- my $msgid = send_subscribe($client->{IODev},
- topics => [map { [$_ => $client->{subscribeQos}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
- );
- $client->{message_ids}->{$msgid}++;
- readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
- }
-
- return undef;
- };
- sub client_stop($) {
- my $client = shift;
-
- if (@{$client->{subscribe}}) {
- my $msgid = send_unsubscribe($client->{IODev},
- topics => [@{$client->{subscribe}}],
- );
- $client->{message_ids}->{$msgid}++;
- readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
- }
-
- CallFn($client->{NAME},"OnClientStopFn",($client));
- };
- 1;
- =pod
- =item [device]
- =item summary connects fhem to MQTT
- =begin html
- <a name="MQTT"></a>
- <h3>MQTT</h3>
- <ul>
- <p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
- <p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
- Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
- Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
- <a name="MQTTdefine"></a>
- <p><b>Define</b></p>
- <ul>
- <p><code>define <name> MQTT <ip:port> [<username>] [<password>]</code></p>
- <p>Specifies the MQTT device.</p>
- </ul>
- <a name="MQTTset"></a>
- <p><b>Set</b></p>
- <ul>
- <li>
- <p><code>set <name> connect</code><br/>
- (re-)connects the MQTT-device to the mqtt-broker</p>
- </li>
- <li>
- <p><code>set <name> disconnect</code><br/>
- disconnects the MQTT-device from the mqtt-broker</p>
- </li>
- <li>
- <p><code>set <name> publish [qos:?] [retain:?] <topic> <message></code><br/>
- sends message to the specified topic</p>
- </li>
- </ul>
- <a name="MQTTattr"></a>
- <p><b>Attributes</b></p>
- <ul>
- <li>
- <p>keep-alive<br/>
- sets the keep-alive time (in seconds).</p>
- </li>
- <li>
- <p><code>attr <name> last-will [qos:?] [retain:?] <topic> <message></code><br/>
- Support for MQTT feature "last will"
- </p>
- <p>example:<br/>
- <code>attr mqtt last-will /fhem/status crashed</code>
- </p>
- </li>
- <li>
- <p><code>attr <name> client-id client id</code><br/>
- redefines client id
- </p>
- <p>example:<br/>
- <code>attr mqtt client-id fhem1234567</code>
- </p>
- </li>
- <li>
- <p>on-connect, on-disconnect<br/>
- <code>attr <name> on-connect {Perl-expression} <topic> <message></code><br/>
- Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression<br/>
- If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
- The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
- </p>
- <p>examples:<br/>
- <code>attr mqtt on-connect /topic/status connected</code><br/>
- <code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
- </p>
- </li>
- <li>
- <p>on-timeout<br/>
- <code>attr <name> on-timeout {Perl-expression}</code>
- evaluate the given Perl expression on timeout<br/>
- </p>
- </li>
- </ul>
- </ul>
- =end html
- =cut
|