| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068 |
- #!/usr/bin/perl
- #############################################################################
- # $Id: collectord 13121 2017-01-17 11:17:03Z markusbloch $
- ##############################################################################
- #
- # collectord
- # Connects to several presenced instances to check for multiple bluetooth devices
- # for their presence state and report a summary state to the 73_PRESENCE.pm module
- #
- # Copyright by Markus Bloch
- # e-mail: Notausstieg0309@googlemail.com
- #
- # This file is part of fhem.
- #
- # Fhem is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 2 of the License, or
- # (at your option) any later version.
- #
- # Fhem is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with fhem. If not, see <http://www.gnu.org/licenses/>.
- #
- ##############################################################################
- use IO::Socket;
- use IO::Select;
- use POSIX;
- #use Data::Dumper;
- use File::Basename;
- use Getopt::Long;
- use threads;
- use Thread::Queue;
- use Time::HiRes;
- use Time::HiRes qw(gettimeofday);
- use warnings;
- use strict;
- use Digest::MD5;
- my $new_client;
- my $server;
- my $client;
- my $buf;
- sub Log($$);
- sub parseParams($;$);
- my $opt_d;
- my $opt_h;
- my $opt_v = 0;
- my $opt_p = 5222;
- my $opt_P = "/var/run/".basename($0).".pid";
- my $opt_l;
- my $opt_c;
- my %config;
- my %queues;
- my $thread_counter = 0;
- my %state;
- my %handle;
- my %socket_to_handle;
- $SIG{__DIE__} = sub {
- my ($msg) = @_;
- Log 1, "PERL ERROR: $msg";
- };
- $SIG{__WARN__} = sub {
- my ($msg) = @_;
- Log 1, "PERL WARN: $msg";
- };
- Getopt::Long::Configure('bundling');
- GetOptions(
- "d" => \$opt_d, "daemon" => \$opt_d,
- "v+" => \$opt_v, "verbose+" => \$opt_v,
- "l=s" => \$opt_l, "logfile=s" => \$opt_l,
- "c=s" => \$opt_c, "configfile=s" => \$opt_c,
- "p=i" => \$opt_p, "port=i" => \$opt_p,
- "P=s" => \$opt_P, "pid-file=s" => \$opt_P,
- "h" => \$opt_h, "help" => \$opt_h
- );
- Log 0, "=================================================" if($opt_l);
- sub print_usage () {
- print "Usage:\n";
- print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
- print " collectord [-h | --help]\n";
- print "\n\nOptions:\n";
- print " -c, --configfile <configfile>\n";
- print " The config file which contains the room and timeout definitions\n";
- print " -p, --port\n";
- print " TCP Port which should be used (Default: 5222)\n";
- print " -P, --pid-file\n";
- print " PID file for storing the local process id (Default: /var/run/".basename($0).".pid)\n";
- print " -d, --daemon\n";
- print " detach from terminal and run as background daemon\n";
- print " -v, --verbose\n";
- print " Print detailed log output (can be used multiple times to increase the loglevel, max. 2 times)\n";
- print " -l, --logfile <logfile>\n";
- print " log to the given logfile\n";
- print " -h, --help\n";
- print " Print detailed help screen\n";
- }
- if($opt_h)
- {
- print_usage();
- exit;
- }
- if(-e "$opt_P")
- {
- print STDERR timestamp()." another process already running (PID file found at $opt_P)\n";
- print STDERR timestamp()." aborted...\n";
- exit 1;
- }
- if(not $opt_c)
- {
- print STDERR "no config file provided\n\n";
- print_usage();
- exit 1;
- }
- if(not -e "$opt_c" or not -r "$opt_c")
- {
- print STDERR "config-file $opt_c could not be loaded\n";
- exit 1;
- }
- Log 0, "started with PID $$";
- readConfig($opt_c);
- if($opt_d)
- {
- daemonize();
- }
- # Write PID file
- open(PIDFILE, ">$opt_P");
- print PIDFILE $$."\n";
- close PIDFILE;
- $server = new IO::Socket::INET (
- LocalPort => $opt_p,
- Proto => 'tcp',
- Listen => 5,
- Reuse => 1,
- Type => SOCK_STREAM,
- KeepAlive => 1,
- Blocking => 0
- ) or die "error while creating socket: $!\n";
- Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();
- my $listener = IO::Select->new();
- $listener->add($server);
- my @new_handles;
- my %child_handles;
- my %child_config;
- my $address;
- my $name;
- my $timeout;
- my $write_handle;
- my $server_pid;
- my @threads;
- my $sig_received = undef;
- $SIG{HUP} = sub { $sig_received = "SIGHUP"; };
- $SIG{INT} = sub { $sig_received = "SIGINT"; };
- $SIG{TERM} = sub { $sig_received = "SIGTERM"; };
- $SIG{KILL} = sub { $sig_received = "SIGKILL"; };
- $SIG{QUIT} = sub { $sig_received = "SIGQUIT"; };
- $SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
- $server_pid = $$ unless(defined($server_pid));
- my $status_queue = Thread::Queue->new();
- my $log_queue = Thread::Queue->new();
- Log 2, "finished initialization. entering main loop";
- while(1)
- {
- # Cleaning up the status hash for obsolete devices
- foreach my $uuid (keys %state)
- {
- my %handle_to_socket = reverse %socket_to_handle;
- unless(exists($handle_to_socket{$uuid}))
- {
- Log 2, "cleaning up status values (UUID: $uuid)";
- delete $state{$uuid};
- }
-
- }
-
- # process all status messages from all threads via status queue
- while($status_queue->pending)
- {
- my ($uuid,$room,$value,$data) = split(";", $status_queue->dequeue, 4);
-
- Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : "");
-
- if(not $value =~ /^(absence|present)$/)
- {
- $handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client}));
-
- if($value eq "socket_closed")
- {
- delete($state{$uuid}{rooms}{$room});
- }
- }
- else
- {
- $state{$uuid}{rooms}{$room}{state} = $value;
-
- if(defined($data))
- {
- $state{$uuid}{rooms}{$room}{data} = $data;
- }
- else
- {
- delete $state{$uuid}{rooms}{$room}{data};
- }
-
- my $result = aggregateRooms($state{$uuid}{rooms});
-
- if(defined($result))
- {
- if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "$result"))
- {
- if(defined($handle{$uuid}{client}))
- {
- $handle{$uuid}{client}->send("$result\n");
- $state{$uuid}{lastresult}{value} = "$result";
- $state{$uuid}{lastresult}{timestamp} = time();
- }
- }
- }
-
- }
-
- #print Dumper(\%state);
- }
-
-
- # If a thread has something reported via Log Queue, print it out if verbose is activated
- while($log_queue->pending)
- {
- Log 2, $log_queue->dequeue;
- }
-
-
-
- # If a INET socket has anything to report
- if(@new_handles = $listener->can_read(1))
- {
- foreach my $client (@new_handles)
- {
- # if the socket is the server socket, accept new client and add it to the socket selector
- if($client == $server)
- {
- $new_client = $server->accept();
-
- $listener->add($new_client);
- Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport();
- }
- else # else is must be a client, so read the message and process it
- {
- $buf = '';
- $buf = <$client>;
-
- # if the message is defined, it is a real message, else the connection is closed (EOF)
- if($buf)
- {
- # replace leading and trailing white spaces
- $buf =~ s/(^\s*|\s*$)//g;
-
- # if the message is a new command, accept the command and create threads for all rooms to process the command
- if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
- {
- # send the acknowledgment back to the sender
- $client->send("command accepted\n");
- Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf";
-
- # Split the message into bluetooth address and the timeout value
- # (timeout is ignored within the collectord, as it is given by configuration)
- ($address, $timeout) = split("\\|", $buf);
- # remove any containing white spaces
- $address =~ s/\s*//g;
- $timeout =~ s/\s*//g;
-
- # if the client has already a request running, stop at first the old request
- if(defined($socket_to_handle{$client}))
- {
- my $uuid = $socket_to_handle{$client};
-
- # get all threads for this socket and send them a termination signal
- foreach my $room (keys %{$handle{$uuid}{threads}})
- {
- Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room";
- $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address");
- $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
- delete($state{$uuid}{rooms}{$room}{data});
- }
-
- $handle{$uuid}{timeout} = $timeout;
- $state{$uuid}{lastresult}{timestamp} = 0;
- }
- else
- {
- # create a new uuid if not exist for socket
- if(not defined($socket_to_handle{$client}))
- {
- $socket_to_handle{$client} = generateUUID();
- Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
- }
-
- my $uuid = $socket_to_handle{$client};
-
- $handle{$uuid}{address} = $address;
- $handle{$uuid}{client} = $client;
- $handle{$uuid}{timeout} = $timeout;
-
- $state{$uuid}{lastresult}{value} = "absence";
- $state{$uuid}{lastresult}{timestamp} = 0;
-
- # create a new reqester thread for each configured room to perform the query
- while (my ($room, $value) = each %config)
- {
- $thread_counter++;
- $queues{$thread_counter} = Thread::Queue->new();
- my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
- Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";
-
- # detach from the thread, so the thread starts processing independantly
- $new_thread->detach();
-
- # save the socket/room relationship to know which thread belongs to which client request (for stop command)
- $handle{$uuid}{threads}{$room} = $new_thread;
- $state{$uuid}{rooms}{$room}{state} = "";
- delete($state{$uuid}{rooms}{$room}{data});
- }
- }
- }
- elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server
- {
- Log 2, "received now command from client ".$client->peerhost();
-
- # just to be sure if the client has really a running request
- if(defined($socket_to_handle{$client}))
- {
- my $uuid = $socket_to_handle{$client};
-
- # get all threads for this socket and send them a termination signal
- foreach my $room (keys %{$handle{$uuid}{threads}})
- {
- Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost();
- $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now");
- $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
- delete($state{$uuid}{rooms}{$room}{data});
- }
-
- delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult}));
- $client->send("command accepted\n");
-
- }
- else
- {
- # if there is no command running, just tell the client he's wrong
- $client->send("no command running\n");
- }
- }
- elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped
- {
- Log 1, "received stop command from client ".$client->peerhost();
-
- # just to be sure if the client has really a running request
- if(defined($socket_to_handle{$client}))
- {
- my $uuid = $socket_to_handle{$client};
-
- # get all threads for this socket and send them a termination signal
- foreach my $room (keys %{$handle{$uuid}{threads}})
- {
- Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
- $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
- delete($handle{$uuid}{threads}{$room});
- }
-
- # when all threads are signaled, delete all relationship entry for this client
- delete($handle{$uuid});
- delete($socket_to_handle{$client});
-
- $client->send("command accepted\n");
-
- }
- else
- {
- # if there is no command running, just tell the client he's wrong
- $client->send("no command running\n");
- }
- }
- else
- { # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging.
- $client->send("command rejected\n");
- Log 1, "received invalid command >>$buf<< from client ".$client->peerhost();
- }
- }
- else # if the message is not defined (EOF) the connection was closed. Now let's clean up
- {
- # make a log entry and remove the socket from the socket selector
- Log 1, "closed connection from ".$client->peerhost();
- $listener->remove($client);
-
-
- # if there is a running command, stop it first and clean up (same as stop command, see above)
- if(defined($socket_to_handle{$client}))
- {
- my $uuid = $socket_to_handle{$client};
-
- # get all threads for this socket and send them a termination signal
- foreach my $room (keys %{$handle{$uuid}{threads}})
- {
- Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
- $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
- delete($handle{$uuid}{threads}{$room});
- }
-
- # when all threads are signaled, delete all relationship entry for this client
- delete($handle{$uuid});
- delete($socket_to_handle{$client});
- }
-
- # now close the socket, that's it
- close $client;
- }
- }
- }
- }
- # in case we have received a process signal, remove the pid file and shutdown
- if(defined($sig_received))
- {
- Log 1, "Caught $sig_received exiting";
- unlink($opt_P);
- Log 1, "removed PID-File $opt_P";
- Log 1, "server shutdown";
- exit;
- }
- }
- Log 2, "leaving main loop";
- ########################################################################################################################
- #
- # Subroutine definitions
- #
- ########################################################################################################################
- # to fork the process from the terminal
- sub daemonize
- {
- POSIX::setsid or die "setsid $!";
-
- my $pid = fork();
- if($pid < 0)
- {
- print STDERR "cannot fork: $!\n";
- exit 1;
- }
- elsif($pid)
- {
- Log 0, "forked with PID $pid";
- exit 0;
- }
- chdir "/";
- umask 0;
- foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_ }
- # cut off any input and output
- open (STDIN, "</dev/null");
- open (STDOUT, ">/dev/null");
- open (STDERR, ">&STDOUT");
- }
- # the thread subroutine which performs a request for a specific room
- sub doQuery($$$)
- {
- my ($do_config, $do_room, $do_address, $do_uuid) = @_;
- my $return;
- my $socket;
- my %values = %{$do_config};
- my $selector;
- my $run = 1;
- my @client_handle;
- my $reconnect_count = 0;
- my $client_socket = undef;
-
- my $last_contact = gettimeofday();
-
- my $cmd;
- my $previous_state = "absence";
- my $current_state = "absence";
-
- $client_socket = new IO::Socket::INET (
- PeerHost => $values{address},
- PeerPort => $values{port},
- Proto => 'tcp',
- Type => SOCK_STREAM,
- KeepAlive => 1,
- Blocking => 1
- ) or ( $log_queue->enqueue(threads->tid()."|$do_room : could not create socket to ".$values{address}." - $! -"));
- $selector = IO::Select->new($client_socket);
- if(defined($client_socket))
- {
- # send the given address to the presence daemon
- $client_socket->send($do_address."|".$values{absence_timeout}."\n");
- }
- else
- {
- $selector->remove($client_socket);
- $client_socket = undef;
- }
-
- # thread main loop
- THREADLOOP: while($run)
- {
-
- if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
- {
- $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
-
- $selector->remove($client_socket);
- $client_socket->shutdown(2);
- close($client_socket);
- $client_socket = undef;
- }
-
- if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
- {
- $cmd = $queues{threads->tid()}->dequeue;
- $log_queue->enqueue(threads->tid()."|received command: $cmd");
-
- if($cmd eq "now")
- {
- $log_queue->enqueue(threads->tid()."|sending \"now\" command to ".$values{address}.":".$values{port});
- $client_socket->send("now\n") if(defined($client_socket));
- }
- elsif($cmd eq "stop")
- {
- $log_queue->enqueue(threads->tid()."|$do_room terminating thread ".threads->tid()." for ".$values{address});
- $client_socket->shutdown(2) if(defined($client_socket));
- $selector->remove($client_socket) if(defined($selector));
- close($client_socket) if(defined($client_socket));
- $client_socket = undef;
- delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
- $run = 0;
- last THREADLOOP;
- }
- elsif($cmd =~ /^new\|/)
- {
- ($cmd, $do_address) = split("\\|", $cmd);
-
- $log_queue->enqueue(threads->tid()."|sending new address $do_address to ".$values{address}.":".$values{port});
-
- if($current_state eq "present")
- {
- $client_socket->send($do_address."|".$values{presence_timeout}."\n") if(defined($client_socket));
- }
- else
- {
- $client_socket->send($do_address."|".$values{absence_timeout}."\n") if(defined($client_socket));
- }
- }
- }
-
- if(not defined($client_socket))
- {
- # if it's the first occurance
- if(!$reconnect_count)
- {
- # Tell this the client;
- $status_queue->enqueue("$do_uuid;$do_room;socket_closed");
-
- # create a log message
- $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
- }
-
- # now try to re-establish the connection
- $client_socket = new IO::Socket::INET (
- PeerHost => $values{address},
- PeerPort => $values{port},
- Proto => 'tcp',
- Type => SOCK_STREAM,
- KeepAlive => 1,
- Blocking => 1
- ) or ( $reconnect_count++ );
-
- if(defined($client_socket))
- {
- # give a success message
- $log_queue->enqueue(threads->tid()."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
- $status_queue->enqueue("$do_uuid;$do_room;socket_reconnected");
-
- # reset the reconnect counter
- $reconnect_count = 0;
-
- # set the last contact date to now
- $last_contact = gettimeofday();
-
- # add the new established socket to the IO selector for incoming data monitoring.
- $selector->add($client_socket);
- # send the given address to the presence daemon
- $client_socket->send($do_address."|".$values{absence_timeout}."\n");
- }
- else
- {
- sleep(9);
- }
- }
-
- # if the socket has a message available
- if(@client_handle = $selector->can_read(1))
- {
-
- # get all socket handles which has a message available
- foreach my $local_client (@client_handle)
- {
- # get the message from the socket handle
- $return = <$local_client>;
-
- # if the message is defined (not EOF) handle the message...
- if($return)
- {
-
- # set the last contact date
- $last_contact = gettimeofday();
-
- # remove trailing whitespaces and newlines
- chomp($return);
-
- # if the message is "command accepted"
- if($return =~ /command accepted/)
- {
- # log this to the thread log queue
- $log_queue->enqueue(threads->tid()."|$do_room accepted command for $do_address");
- }
- elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
- {
- $log_queue->enqueue(threads->tid()."|$do_room REJECTED command for $do_address");
- }
- else # else its a status message
- {
- # put the message to the status queue with uuid for identification and the room name
- $status_queue->enqueue("$do_uuid;$do_room;".$return);
-
- # if the state changes from present to absence
- if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
- {
- # log the timout change to the log queue
- $log_queue->enqueue(threads->tid()."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
-
- $current_state = "absence";
-
- # send the new command with the configured absence timeout
- $local_client->send($do_address."|".$values{absence_timeout}."\n");
- }
- elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
- {
- $log_queue->enqueue(threads->tid()."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
-
- $current_state = "present";
-
- # if the state changes from absence to present, set the presence timeout
- $local_client->send($do_address."|".$values{presence_timeout}."\n");
- }
-
- # set the previous state to the current state
- ($previous_state, undef) = split(";", lc($return));
- }
- }
- else # the socket is EOF which means the connection was closed
- {
-
- $selector->remove($local_client);
-
- $local_client->shutdown(2);
- close($local_client);
- $client_socket = undef;
- }
- }
- }
- }
- $log_queue->enqueue(threads->tid()."|exiting thread");
- }
- sub readConfig
- {
- my ($ini) = @_;
- my $section;
- my $keyword;
- my $value;
- my $errorcount = 0;
- Log 1, "reading configuration file";
- %config = ();
- open (INI, "$ini") or (print STDERR timestamp()."Can't open $ini: $!\n" and exit(1));
- while (<INI>) {
- chomp;
- if (/^\s*?\[([^\]\n\r]+?)\]/) {
- $section = $1;
-
- }
- if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/ and defined($section)) {
- $keyword = $1;
- $value = $2 ;
- # put them into hash
- $config{$section}{$keyword} = $value;
- }
- }
- close (INI);
-
- # validating config
- foreach my $room (keys %config)
- {
- if(not exists($config{$room}{address}))
- {
- Log 0, "room $room has no value for address configured";
- $errorcount++;
- }
- else
- {
- if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
- {
- Log 0, "no valid address for room $room found: ".$config{$room}{address};
- $errorcount++;
- }
-
- }
-
- if(not exists($config{$room}{port}))
- {
- Log 0, "room >>$room<< has no value for >>port<< configured";
- $errorcount++;
- }
- else
- {
- if(not $config{$room}{port} =~ /^\d+$/)
- {
- Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
- $errorcount++;
- }
- }
-
- if(not exists($config{$room}{absence_timeout}))
- {
- Log 0, "room >>$room<< has no value for >>absence_timeout<< configured";
- $errorcount++;
- }
- else
- {
- if(not $config{$room}{absence_timeout} =~ /^\d+$/)
- {
- Log 0, "value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout};
- $errorcount++;
- }
- }
-
- if(not exists($config{$room}{presence_timeout}))
- {
- Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";
- $errorcount++;
- }
- else
- {
- if(not $config{$room}{presence_timeout} =~ /^\d+$/)
- {
- Log 0, "value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout};
- $errorcount++;
- }
- }
-
- foreach my $param (keys %{$config{$room}})
- {
- if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
- {
- Log 0, "invalid parameter $param in room $room";
- $errorcount++;
- }
- }
- }
- if($errorcount)
- {
- print STDERR timestamp()." found $errorcount config errors. exiting....\n";
- exit 2;
- }
- else
- {
- Log 0, "no config errors found";
- }
- }
- sub aggregateRooms
- {
- my ($hash) = @_;
- my $previous = "absence";
- my @rooms;
- my $key;
- my $first_key;
-
- # get all present rooms
- foreach $key (keys %$hash)
- {
- my $room_hash = $hash->{$key};
-
- if(defined($room_hash->{state}) and $room_hash->{state} ne "")
- {
- my ($value, $data) = split(";", $hash->{$key});
-
- if($room_hash->{state} eq "present")
- {
- push @rooms, $key;
- }
- }
- else
- {
- # if one room has no result return undef
- return undef;
- }
- }
-
-
- # if multiple rooms are present, try selection by highest RSSI
- if(@rooms > 0)
- {
- my $rssi_addon_data_key = "rssi";
- my $rssi_available = 1;
- my $highest_value;
- my $highest_key;
-
- foreach $key (@rooms)
- {
- my $data = $hash->{$key}{data};
-
- if(defined($data))
- {
- my ($a,$h) = parseParams($data,';');
-
- if(@{$a} == 1 and keys(%{$h}) == 0) # old presenced device name => convert to new style
- {
- $hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'";
- $rssi_available = 0;
- }
- elsif(@{$a} == 0 and keys(%{$h}) > 0) # new addon data style
- {
- # check rssi
- if($rssi_available and exists($h->{$rssi_addon_data_key}) and $h->{$rssi_addon_data_key} =~ /^-?(?:\d+\.)\d+$/)
- {
- if(!defined($highest_value) or (defined($highest_value) and $h->{$rssi_addon_data_key} > $highest_value))
- {
- $highest_value = $h->{rssi};
- $highest_key = $key;
- }
- }
- else
- {
- $rssi_available = 0;
- }
- }
- else
- {
- Log 1, "invalid addon data received from room $key: $data";
- }
- }
- }
-
- if($rssi_available and defined($highest_key))
- {
- Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1);
- @rooms = ($highest_key);
- }
- }
- if(@rooms > 0)
- {
- return "present;rooms='".join(",",sort @rooms).(defined($hash->{$rooms[0]}{data}) ? "';".$hash->{$rooms[0]}{data} : "");
- }
- else
- {
- return "absence";
- }
- }
- sub generateUUID
- {
- my $uuid = Digest::MD5::md5_hex(rand);
- while(defined($handle{$uuid}))
- {
- $uuid = Digest::MD5::md5_hex(rand);
- }
- return $uuid;
- }
- sub timestamp
- {
- return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime);
- }
- sub Log($$)
- {
- my ($loglevel, $message) = @_;
- my $thread = 0;
- if($message =~ /^\d+\|/)
- {
- ($thread, $message) = split("\\|", $message);
- }
-
- if($loglevel <= $opt_v)
- {
- if($opt_l)
- {
- open(LOGFILE, ">>$opt_l") or die ("could not open logfile: $opt_l");
- }
- else
- {
- open (LOGFILE, ">&STDOUT") or die("cannot open STDOUT");
- }
- print LOGFILE ($opt_l?"":"\r").timestamp()." - ".($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n";
- close(LOGFILE);
- }
- }
- #####################################
- # parseParams() from fhem.pl by justme1968
- sub parseParams($;$)
- {
- my($cmd, $separator) = @_;
- $separator = ' ' if( !$separator );
- my(@a, %h);
- my @params;
- if( ref($cmd) eq 'ARRAY' ) {
- @params = @{$cmd};
- } else {
- @params = split($separator, $cmd);
- }
- while (@params) {
- my $param = shift(@params);
- my ($key, $value) = split( '=', $param, 2 );
- if( !defined( $value ) ) {
- $value = $key;
- $key = undef;
- }
- #collect all parts until the closing ' or "
- while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
- my $next = shift(@params);
- last if( !defined($next) );
- $value .= $separator . $next;
- }
- #remove matching ' or " from the start and end
- if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
- $value =~ s/^.(.*).$/$1/;
- }
- #collext all parts until opening { and closing } are matched
- if( $value =~ m/^{/ ) { # } for match
- my $count = 0;
- for my $i (0..length($value)-1) {
- my $c = substr($value, $i, 1);
- ++$count if( $c eq '{' );
- --$count if( $c eq '}' );
- }
- while( $param && $count != 0 ) {
- my $next = shift(@params);
- last if( !defined($next) );
- $value .= $separator . $next;
- for my $i (0..length($next)-1) {
- my $c = substr($next, $i, 1);
- ++$count if( $c eq '{' );
- --$count if( $c eq '}' );
- }
- }
- }
- if( defined($key) ) {
- $h{$key} = $value;
- } else {
- push @a, $value;
- }
- }
- return(\@a, \%h);
- }
|