collectord 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068
  1. #!/usr/bin/perl
  2. #############################################################################
  3. # $Id: collectord 13121 2017-01-17 11:17:03Z markusbloch $
  4. ##############################################################################
  5. #
  6. # collectord
  7. # Connects to several presenced instances to check for multiple bluetooth devices
  8. # for their presence state and report a summary state to the 73_PRESENCE.pm module
  9. #
  10. # Copyright by Markus Bloch
  11. # e-mail: Notausstieg0309@googlemail.com
  12. #
  13. # This file is part of fhem.
  14. #
  15. # Fhem is free software: you can redistribute it and/or modify
  16. # it under the terms of the GNU General Public License as published by
  17. # the Free Software Foundation, either version 2 of the License, or
  18. # (at your option) any later version.
  19. #
  20. # Fhem is distributed in the hope that it will be useful,
  21. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  22. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  23. # GNU General Public License for more details.
  24. #
  25. # You should have received a copy of the GNU General Public License
  26. # along with fhem. If not, see <http://www.gnu.org/licenses/>.
  27. #
  28. ##############################################################################
  29. use IO::Socket;
  30. use IO::Select;
  31. use POSIX;
  32. #use Data::Dumper;
  33. use File::Basename;
  34. use Getopt::Long;
  35. use threads;
  36. use Thread::Queue;
  37. use Time::HiRes;
  38. use Time::HiRes qw(gettimeofday);
  39. use warnings;
  40. use strict;
  41. use Digest::MD5;
  42. my $new_client;
  43. my $server;
  44. my $client;
  45. my $buf;
  46. sub Log($$);
  47. sub parseParams($;$);
  48. my $opt_d;
  49. my $opt_h;
  50. my $opt_v = 0;
  51. my $opt_p = 5222;
  52. my $opt_P = "/var/run/".basename($0).".pid";
  53. my $opt_l;
  54. my $opt_c;
  55. my %config;
  56. my %queues;
  57. my $thread_counter = 0;
  58. my %state;
  59. my %handle;
  60. my %socket_to_handle;
  61. $SIG{__DIE__} = sub {
  62. my ($msg) = @_;
  63. Log 1, "PERL ERROR: $msg";
  64. };
  65. $SIG{__WARN__} = sub {
  66. my ($msg) = @_;
  67. Log 1, "PERL WARN: $msg";
  68. };
  69. Getopt::Long::Configure('bundling');
  70. GetOptions(
  71. "d" => \$opt_d, "daemon" => \$opt_d,
  72. "v+" => \$opt_v, "verbose+" => \$opt_v,
  73. "l=s" => \$opt_l, "logfile=s" => \$opt_l,
  74. "c=s" => \$opt_c, "configfile=s" => \$opt_c,
  75. "p=i" => \$opt_p, "port=i" => \$opt_p,
  76. "P=s" => \$opt_P, "pid-file=s" => \$opt_P,
  77. "h" => \$opt_h, "help" => \$opt_h
  78. );
  79. Log 0, "=================================================" if($opt_l);
  80. sub print_usage () {
  81. print "Usage:\n";
  82. print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
  83. print " collectord [-h | --help]\n";
  84. print "\n\nOptions:\n";
  85. print " -c, --configfile <configfile>\n";
  86. print " The config file which contains the room and timeout definitions\n";
  87. print " -p, --port\n";
  88. print " TCP Port which should be used (Default: 5222)\n";
  89. print " -P, --pid-file\n";
  90. print " PID file for storing the local process id (Default: /var/run/".basename($0).".pid)\n";
  91. print " -d, --daemon\n";
  92. print " detach from terminal and run as background daemon\n";
  93. print " -v, --verbose\n";
  94. print " Print detailed log output (can be used multiple times to increase the loglevel, max. 2 times)\n";
  95. print " -l, --logfile <logfile>\n";
  96. print " log to the given logfile\n";
  97. print " -h, --help\n";
  98. print " Print detailed help screen\n";
  99. }
  100. if($opt_h)
  101. {
  102. print_usage();
  103. exit;
  104. }
  105. if(-e "$opt_P")
  106. {
  107. print STDERR timestamp()." another process already running (PID file found at $opt_P)\n";
  108. print STDERR timestamp()." aborted...\n";
  109. exit 1;
  110. }
  111. if(not $opt_c)
  112. {
  113. print STDERR "no config file provided\n\n";
  114. print_usage();
  115. exit 1;
  116. }
  117. if(not -e "$opt_c" or not -r "$opt_c")
  118. {
  119. print STDERR "config-file $opt_c could not be loaded\n";
  120. exit 1;
  121. }
  122. Log 0, "started with PID $$";
  123. readConfig($opt_c);
  124. if($opt_d)
  125. {
  126. daemonize();
  127. }
  128. # Write PID file
  129. open(PIDFILE, ">$opt_P");
  130. print PIDFILE $$."\n";
  131. close PIDFILE;
  132. $server = new IO::Socket::INET (
  133. LocalPort => $opt_p,
  134. Proto => 'tcp',
  135. Listen => 5,
  136. Reuse => 1,
  137. Type => SOCK_STREAM,
  138. KeepAlive => 1,
  139. Blocking => 0
  140. ) or die "error while creating socket: $!\n";
  141. Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();
  142. my $listener = IO::Select->new();
  143. $listener->add($server);
  144. my @new_handles;
  145. my %child_handles;
  146. my %child_config;
  147. my $address;
  148. my $name;
  149. my $timeout;
  150. my $write_handle;
  151. my $server_pid;
  152. my @threads;
  153. my $sig_received = undef;
  154. $SIG{HUP} = sub { $sig_received = "SIGHUP"; };
  155. $SIG{INT} = sub { $sig_received = "SIGINT"; };
  156. $SIG{TERM} = sub { $sig_received = "SIGTERM"; };
  157. $SIG{KILL} = sub { $sig_received = "SIGKILL"; };
  158. $SIG{QUIT} = sub { $sig_received = "SIGQUIT"; };
  159. $SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
  160. $server_pid = $$ unless(defined($server_pid));
  161. my $status_queue = Thread::Queue->new();
  162. my $log_queue = Thread::Queue->new();
  163. Log 2, "finished initialization. entering main loop";
  164. while(1)
  165. {
  166. # Cleaning up the status hash for obsolete devices
  167. foreach my $uuid (keys %state)
  168. {
  169. my %handle_to_socket = reverse %socket_to_handle;
  170. unless(exists($handle_to_socket{$uuid}))
  171. {
  172. Log 2, "cleaning up status values (UUID: $uuid)";
  173. delete $state{$uuid};
  174. }
  175. }
  176. # process all status messages from all threads via status queue
  177. while($status_queue->pending)
  178. {
  179. my ($uuid,$room,$value,$data) = split(";", $status_queue->dequeue, 4);
  180. Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : "");
  181. if(not $value =~ /^(absence|present)$/)
  182. {
  183. $handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client}));
  184. if($value eq "socket_closed")
  185. {
  186. delete($state{$uuid}{rooms}{$room});
  187. }
  188. }
  189. else
  190. {
  191. $state{$uuid}{rooms}{$room}{state} = $value;
  192. if(defined($data))
  193. {
  194. $state{$uuid}{rooms}{$room}{data} = $data;
  195. }
  196. else
  197. {
  198. delete $state{$uuid}{rooms}{$room}{data};
  199. }
  200. my $result = aggregateRooms($state{$uuid}{rooms});
  201. if(defined($result))
  202. {
  203. if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "$result"))
  204. {
  205. if(defined($handle{$uuid}{client}))
  206. {
  207. $handle{$uuid}{client}->send("$result\n");
  208. $state{$uuid}{lastresult}{value} = "$result";
  209. $state{$uuid}{lastresult}{timestamp} = time();
  210. }
  211. }
  212. }
  213. }
  214. #print Dumper(\%state);
  215. }
  216. # If a thread has something reported via Log Queue, print it out if verbose is activated
  217. while($log_queue->pending)
  218. {
  219. Log 2, $log_queue->dequeue;
  220. }
  221. # If a INET socket has anything to report
  222. if(@new_handles = $listener->can_read(1))
  223. {
  224. foreach my $client (@new_handles)
  225. {
  226. # if the socket is the server socket, accept new client and add it to the socket selector
  227. if($client == $server)
  228. {
  229. $new_client = $server->accept();
  230. $listener->add($new_client);
  231. Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport();
  232. }
  233. else # else is must be a client, so read the message and process it
  234. {
  235. $buf = '';
  236. $buf = <$client>;
  237. # if the message is defined, it is a real message, else the connection is closed (EOF)
  238. if($buf)
  239. {
  240. # replace leading and trailing white spaces
  241. $buf =~ s/(^\s*|\s*$)//g;
  242. # if the message is a new command, accept the command and create threads for all rooms to process the command
  243. if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
  244. {
  245. # send the acknowledgment back to the sender
  246. $client->send("command accepted\n");
  247. Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf";
  248. # Split the message into bluetooth address and the timeout value
  249. # (timeout is ignored within the collectord, as it is given by configuration)
  250. ($address, $timeout) = split("\\|", $buf);
  251. # remove any containing white spaces
  252. $address =~ s/\s*//g;
  253. $timeout =~ s/\s*//g;
  254. # if the client has already a request running, stop at first the old request
  255. if(defined($socket_to_handle{$client}))
  256. {
  257. my $uuid = $socket_to_handle{$client};
  258. # get all threads for this socket and send them a termination signal
  259. foreach my $room (keys %{$handle{$uuid}{threads}})
  260. {
  261. Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room";
  262. $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address");
  263. $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
  264. delete($state{$uuid}{rooms}{$room}{data});
  265. }
  266. $handle{$uuid}{timeout} = $timeout;
  267. $state{$uuid}{lastresult}{timestamp} = 0;
  268. }
  269. else
  270. {
  271. # create a new uuid if not exist for socket
  272. if(not defined($socket_to_handle{$client}))
  273. {
  274. $socket_to_handle{$client} = generateUUID();
  275. Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
  276. }
  277. my $uuid = $socket_to_handle{$client};
  278. $handle{$uuid}{address} = $address;
  279. $handle{$uuid}{client} = $client;
  280. $handle{$uuid}{timeout} = $timeout;
  281. $state{$uuid}{lastresult}{value} = "absence";
  282. $state{$uuid}{lastresult}{timestamp} = 0;
  283. # create a new reqester thread for each configured room to perform the query
  284. while (my ($room, $value) = each %config)
  285. {
  286. $thread_counter++;
  287. $queues{$thread_counter} = Thread::Queue->new();
  288. my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
  289. Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";
  290. # detach from the thread, so the thread starts processing independantly
  291. $new_thread->detach();
  292. # save the socket/room relationship to know which thread belongs to which client request (for stop command)
  293. $handle{$uuid}{threads}{$room} = $new_thread;
  294. $state{$uuid}{rooms}{$room}{state} = "";
  295. delete($state{$uuid}{rooms}{$room}{data});
  296. }
  297. }
  298. }
  299. elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server
  300. {
  301. Log 2, "received now command from client ".$client->peerhost();
  302. # just to be sure if the client has really a running request
  303. if(defined($socket_to_handle{$client}))
  304. {
  305. my $uuid = $socket_to_handle{$client};
  306. # get all threads for this socket and send them a termination signal
  307. foreach my $room (keys %{$handle{$uuid}{threads}})
  308. {
  309. Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost();
  310. $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now");
  311. $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
  312. delete($state{$uuid}{rooms}{$room}{data});
  313. }
  314. delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult}));
  315. $client->send("command accepted\n");
  316. }
  317. else
  318. {
  319. # if there is no command running, just tell the client he's wrong
  320. $client->send("no command running\n");
  321. }
  322. }
  323. elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped
  324. {
  325. Log 1, "received stop command from client ".$client->peerhost();
  326. # just to be sure if the client has really a running request
  327. if(defined($socket_to_handle{$client}))
  328. {
  329. my $uuid = $socket_to_handle{$client};
  330. # get all threads for this socket and send them a termination signal
  331. foreach my $room (keys %{$handle{$uuid}{threads}})
  332. {
  333. Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
  334. $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
  335. delete($handle{$uuid}{threads}{$room});
  336. }
  337. # when all threads are signaled, delete all relationship entry for this client
  338. delete($handle{$uuid});
  339. delete($socket_to_handle{$client});
  340. $client->send("command accepted\n");
  341. }
  342. else
  343. {
  344. # if there is no command running, just tell the client he's wrong
  345. $client->send("no command running\n");
  346. }
  347. }
  348. else
  349. { # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging.
  350. $client->send("command rejected\n");
  351. Log 1, "received invalid command >>$buf<< from client ".$client->peerhost();
  352. }
  353. }
  354. else # if the message is not defined (EOF) the connection was closed. Now let's clean up
  355. {
  356. # make a log entry and remove the socket from the socket selector
  357. Log 1, "closed connection from ".$client->peerhost();
  358. $listener->remove($client);
  359. # if there is a running command, stop it first and clean up (same as stop command, see above)
  360. if(defined($socket_to_handle{$client}))
  361. {
  362. my $uuid = $socket_to_handle{$client};
  363. # get all threads for this socket and send them a termination signal
  364. foreach my $room (keys %{$handle{$uuid}{threads}})
  365. {
  366. Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
  367. $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
  368. delete($handle{$uuid}{threads}{$room});
  369. }
  370. # when all threads are signaled, delete all relationship entry for this client
  371. delete($handle{$uuid});
  372. delete($socket_to_handle{$client});
  373. }
  374. # now close the socket, that's it
  375. close $client;
  376. }
  377. }
  378. }
  379. }
  380. # in case we have received a process signal, remove the pid file and shutdown
  381. if(defined($sig_received))
  382. {
  383. Log 1, "Caught $sig_received exiting";
  384. unlink($opt_P);
  385. Log 1, "removed PID-File $opt_P";
  386. Log 1, "server shutdown";
  387. exit;
  388. }
  389. }
  390. Log 2, "leaving main loop";
  391. ########################################################################################################################
  392. #
  393. # Subroutine definitions
  394. #
  395. ########################################################################################################################
  396. # to fork the process from the terminal
  397. sub daemonize
  398. {
  399. POSIX::setsid or die "setsid $!";
  400. my $pid = fork();
  401. if($pid < 0)
  402. {
  403. print STDERR "cannot fork: $!\n";
  404. exit 1;
  405. }
  406. elsif($pid)
  407. {
  408. Log 0, "forked with PID $pid";
  409. exit 0;
  410. }
  411. chdir "/";
  412. umask 0;
  413. foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_ }
  414. # cut off any input and output
  415. open (STDIN, "</dev/null");
  416. open (STDOUT, ">/dev/null");
  417. open (STDERR, ">&STDOUT");
  418. }
  419. # the thread subroutine which performs a request for a specific room
  420. sub doQuery($$$)
  421. {
  422. my ($do_config, $do_room, $do_address, $do_uuid) = @_;
  423. my $return;
  424. my $socket;
  425. my %values = %{$do_config};
  426. my $selector;
  427. my $run = 1;
  428. my @client_handle;
  429. my $reconnect_count = 0;
  430. my $client_socket = undef;
  431. my $last_contact = gettimeofday();
  432. my $cmd;
  433. my $previous_state = "absence";
  434. my $current_state = "absence";
  435. $client_socket = new IO::Socket::INET (
  436. PeerHost => $values{address},
  437. PeerPort => $values{port},
  438. Proto => 'tcp',
  439. Type => SOCK_STREAM,
  440. KeepAlive => 1,
  441. Blocking => 1
  442. ) or ( $log_queue->enqueue(threads->tid()."|$do_room : could not create socket to ".$values{address}." - $! -"));
  443. $selector = IO::Select->new($client_socket);
  444. if(defined($client_socket))
  445. {
  446. # send the given address to the presence daemon
  447. $client_socket->send($do_address."|".$values{absence_timeout}."\n");
  448. }
  449. else
  450. {
  451. $selector->remove($client_socket);
  452. $client_socket = undef;
  453. }
  454. # thread main loop
  455. THREADLOOP: while($run)
  456. {
  457. if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
  458. {
  459. $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
  460. $selector->remove($client_socket);
  461. $client_socket->shutdown(2);
  462. close($client_socket);
  463. $client_socket = undef;
  464. }
  465. if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
  466. {
  467. $cmd = $queues{threads->tid()}->dequeue;
  468. $log_queue->enqueue(threads->tid()."|received command: $cmd");
  469. if($cmd eq "now")
  470. {
  471. $log_queue->enqueue(threads->tid()."|sending \"now\" command to ".$values{address}.":".$values{port});
  472. $client_socket->send("now\n") if(defined($client_socket));
  473. }
  474. elsif($cmd eq "stop")
  475. {
  476. $log_queue->enqueue(threads->tid()."|$do_room terminating thread ".threads->tid()." for ".$values{address});
  477. $client_socket->shutdown(2) if(defined($client_socket));
  478. $selector->remove($client_socket) if(defined($selector));
  479. close($client_socket) if(defined($client_socket));
  480. $client_socket = undef;
  481. delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
  482. $run = 0;
  483. last THREADLOOP;
  484. }
  485. elsif($cmd =~ /^new\|/)
  486. {
  487. ($cmd, $do_address) = split("\\|", $cmd);
  488. $log_queue->enqueue(threads->tid()."|sending new address $do_address to ".$values{address}.":".$values{port});
  489. if($current_state eq "present")
  490. {
  491. $client_socket->send($do_address."|".$values{presence_timeout}."\n") if(defined($client_socket));
  492. }
  493. else
  494. {
  495. $client_socket->send($do_address."|".$values{absence_timeout}."\n") if(defined($client_socket));
  496. }
  497. }
  498. }
  499. if(not defined($client_socket))
  500. {
  501. # if it's the first occurance
  502. if(!$reconnect_count)
  503. {
  504. # Tell this the client;
  505. $status_queue->enqueue("$do_uuid;$do_room;socket_closed");
  506. # create a log message
  507. $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
  508. }
  509. # now try to re-establish the connection
  510. $client_socket = new IO::Socket::INET (
  511. PeerHost => $values{address},
  512. PeerPort => $values{port},
  513. Proto => 'tcp',
  514. Type => SOCK_STREAM,
  515. KeepAlive => 1,
  516. Blocking => 1
  517. ) or ( $reconnect_count++ );
  518. if(defined($client_socket))
  519. {
  520. # give a success message
  521. $log_queue->enqueue(threads->tid()."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
  522. $status_queue->enqueue("$do_uuid;$do_room;socket_reconnected");
  523. # reset the reconnect counter
  524. $reconnect_count = 0;
  525. # set the last contact date to now
  526. $last_contact = gettimeofday();
  527. # add the new established socket to the IO selector for incoming data monitoring.
  528. $selector->add($client_socket);
  529. # send the given address to the presence daemon
  530. $client_socket->send($do_address."|".$values{absence_timeout}."\n");
  531. }
  532. else
  533. {
  534. sleep(9);
  535. }
  536. }
  537. # if the socket has a message available
  538. if(@client_handle = $selector->can_read(1))
  539. {
  540. # get all socket handles which has a message available
  541. foreach my $local_client (@client_handle)
  542. {
  543. # get the message from the socket handle
  544. $return = <$local_client>;
  545. # if the message is defined (not EOF) handle the message...
  546. if($return)
  547. {
  548. # set the last contact date
  549. $last_contact = gettimeofday();
  550. # remove trailing whitespaces and newlines
  551. chomp($return);
  552. # if the message is "command accepted"
  553. if($return =~ /command accepted/)
  554. {
  555. # log this to the thread log queue
  556. $log_queue->enqueue(threads->tid()."|$do_room accepted command for $do_address");
  557. }
  558. elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
  559. {
  560. $log_queue->enqueue(threads->tid()."|$do_room REJECTED command for $do_address");
  561. }
  562. else # else its a status message
  563. {
  564. # put the message to the status queue with uuid for identification and the room name
  565. $status_queue->enqueue("$do_uuid;$do_room;".$return);
  566. # if the state changes from present to absence
  567. if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
  568. {
  569. # log the timout change to the log queue
  570. $log_queue->enqueue(threads->tid()."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
  571. $current_state = "absence";
  572. # send the new command with the configured absence timeout
  573. $local_client->send($do_address."|".$values{absence_timeout}."\n");
  574. }
  575. elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
  576. {
  577. $log_queue->enqueue(threads->tid()."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
  578. $current_state = "present";
  579. # if the state changes from absence to present, set the presence timeout
  580. $local_client->send($do_address."|".$values{presence_timeout}."\n");
  581. }
  582. # set the previous state to the current state
  583. ($previous_state, undef) = split(";", lc($return));
  584. }
  585. }
  586. else # the socket is EOF which means the connection was closed
  587. {
  588. $selector->remove($local_client);
  589. $local_client->shutdown(2);
  590. close($local_client);
  591. $client_socket = undef;
  592. }
  593. }
  594. }
  595. }
  596. $log_queue->enqueue(threads->tid()."|exiting thread");
  597. }
  598. sub readConfig
  599. {
  600. my ($ini) = @_;
  601. my $section;
  602. my $keyword;
  603. my $value;
  604. my $errorcount = 0;
  605. Log 1, "reading configuration file";
  606. %config = ();
  607. open (INI, "$ini") or (print STDERR timestamp()."Can't open $ini: $!\n" and exit(1));
  608. while (<INI>) {
  609. chomp;
  610. if (/^\s*?\[([^\]\n\r]+?)\]/) {
  611. $section = $1;
  612. }
  613. if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/ and defined($section)) {
  614. $keyword = $1;
  615. $value = $2 ;
  616. # put them into hash
  617. $config{$section}{$keyword} = $value;
  618. }
  619. }
  620. close (INI);
  621. # validating config
  622. foreach my $room (keys %config)
  623. {
  624. if(not exists($config{$room}{address}))
  625. {
  626. Log 0, "room $room has no value for address configured";
  627. $errorcount++;
  628. }
  629. else
  630. {
  631. if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
  632. {
  633. Log 0, "no valid address for room $room found: ".$config{$room}{address};
  634. $errorcount++;
  635. }
  636. }
  637. if(not exists($config{$room}{port}))
  638. {
  639. Log 0, "room >>$room<< has no value for >>port<< configured";
  640. $errorcount++;
  641. }
  642. else
  643. {
  644. if(not $config{$room}{port} =~ /^\d+$/)
  645. {
  646. Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
  647. $errorcount++;
  648. }
  649. }
  650. if(not exists($config{$room}{absence_timeout}))
  651. {
  652. Log 0, "room >>$room<< has no value for >>absence_timeout<< configured";
  653. $errorcount++;
  654. }
  655. else
  656. {
  657. if(not $config{$room}{absence_timeout} =~ /^\d+$/)
  658. {
  659. Log 0, "value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout};
  660. $errorcount++;
  661. }
  662. }
  663. if(not exists($config{$room}{presence_timeout}))
  664. {
  665. Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";
  666. $errorcount++;
  667. }
  668. else
  669. {
  670. if(not $config{$room}{presence_timeout} =~ /^\d+$/)
  671. {
  672. Log 0, "value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout};
  673. $errorcount++;
  674. }
  675. }
  676. foreach my $param (keys %{$config{$room}})
  677. {
  678. if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
  679. {
  680. Log 0, "invalid parameter $param in room $room";
  681. $errorcount++;
  682. }
  683. }
  684. }
  685. if($errorcount)
  686. {
  687. print STDERR timestamp()." found $errorcount config errors. exiting....\n";
  688. exit 2;
  689. }
  690. else
  691. {
  692. Log 0, "no config errors found";
  693. }
  694. }
  695. sub aggregateRooms
  696. {
  697. my ($hash) = @_;
  698. my $previous = "absence";
  699. my @rooms;
  700. my $key;
  701. my $first_key;
  702. # get all present rooms
  703. foreach $key (keys %$hash)
  704. {
  705. my $room_hash = $hash->{$key};
  706. if(defined($room_hash->{state}) and $room_hash->{state} ne "")
  707. {
  708. my ($value, $data) = split(";", $hash->{$key});
  709. if($room_hash->{state} eq "present")
  710. {
  711. push @rooms, $key;
  712. }
  713. }
  714. else
  715. {
  716. # if one room has no result return undef
  717. return undef;
  718. }
  719. }
  720. # if multiple rooms are present, try selection by highest RSSI
  721. if(@rooms > 0)
  722. {
  723. my $rssi_addon_data_key = "rssi";
  724. my $rssi_available = 1;
  725. my $highest_value;
  726. my $highest_key;
  727. foreach $key (@rooms)
  728. {
  729. my $data = $hash->{$key}{data};
  730. if(defined($data))
  731. {
  732. my ($a,$h) = parseParams($data,';');
  733. if(@{$a} == 1 and keys(%{$h}) == 0) # old presenced device name => convert to new style
  734. {
  735. $hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'";
  736. $rssi_available = 0;
  737. }
  738. elsif(@{$a} == 0 and keys(%{$h}) > 0) # new addon data style
  739. {
  740. # check rssi
  741. if($rssi_available and exists($h->{$rssi_addon_data_key}) and $h->{$rssi_addon_data_key} =~ /^-?(?:\d+\.)\d+$/)
  742. {
  743. if(!defined($highest_value) or (defined($highest_value) and $h->{$rssi_addon_data_key} > $highest_value))
  744. {
  745. $highest_value = $h->{rssi};
  746. $highest_key = $key;
  747. }
  748. }
  749. else
  750. {
  751. $rssi_available = 0;
  752. }
  753. }
  754. else
  755. {
  756. Log 1, "invalid addon data received from room $key: $data";
  757. }
  758. }
  759. }
  760. if($rssi_available and defined($highest_key))
  761. {
  762. Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1);
  763. @rooms = ($highest_key);
  764. }
  765. }
  766. if(@rooms > 0)
  767. {
  768. return "present;rooms='".join(",",sort @rooms).(defined($hash->{$rooms[0]}{data}) ? "';".$hash->{$rooms[0]}{data} : "");
  769. }
  770. else
  771. {
  772. return "absence";
  773. }
  774. }
  775. sub generateUUID
  776. {
  777. my $uuid = Digest::MD5::md5_hex(rand);
  778. while(defined($handle{$uuid}))
  779. {
  780. $uuid = Digest::MD5::md5_hex(rand);
  781. }
  782. return $uuid;
  783. }
  784. sub timestamp
  785. {
  786. return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime);
  787. }
  788. sub Log($$)
  789. {
  790. my ($loglevel, $message) = @_;
  791. my $thread = 0;
  792. if($message =~ /^\d+\|/)
  793. {
  794. ($thread, $message) = split("\\|", $message);
  795. }
  796. if($loglevel <= $opt_v)
  797. {
  798. if($opt_l)
  799. {
  800. open(LOGFILE, ">>$opt_l") or die ("could not open logfile: $opt_l");
  801. }
  802. else
  803. {
  804. open (LOGFILE, ">&STDOUT") or die("cannot open STDOUT");
  805. }
  806. print LOGFILE ($opt_l?"":"\r").timestamp()." - ".($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n";
  807. close(LOGFILE);
  808. }
  809. }
  810. #####################################
  811. # parseParams() from fhem.pl by justme1968
  812. sub parseParams($;$)
  813. {
  814. my($cmd, $separator) = @_;
  815. $separator = ' ' if( !$separator );
  816. my(@a, %h);
  817. my @params;
  818. if( ref($cmd) eq 'ARRAY' ) {
  819. @params = @{$cmd};
  820. } else {
  821. @params = split($separator, $cmd);
  822. }
  823. while (@params) {
  824. my $param = shift(@params);
  825. my ($key, $value) = split( '=', $param, 2 );
  826. if( !defined( $value ) ) {
  827. $value = $key;
  828. $key = undef;
  829. }
  830. #collect all parts until the closing ' or "
  831. while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
  832. my $next = shift(@params);
  833. last if( !defined($next) );
  834. $value .= $separator . $next;
  835. }
  836. #remove matching ' or " from the start and end
  837. if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
  838. $value =~ s/^.(.*).$/$1/;
  839. }
  840. #collext all parts until opening { and closing } are matched
  841. if( $value =~ m/^{/ ) { # } for match
  842. my $count = 0;
  843. for my $i (0..length($value)-1) {
  844. my $c = substr($value, $i, 1);
  845. ++$count if( $c eq '{' );
  846. --$count if( $c eq '}' );
  847. }
  848. while( $param && $count != 0 ) {
  849. my $next = shift(@params);
  850. last if( !defined($next) );
  851. $value .= $separator . $next;
  852. for my $i (0..length($next)-1) {
  853. my $c = substr($next, $i, 1);
  854. ++$count if( $c eq '{' );
  855. --$count if( $c eq '}' );
  856. }
  857. }
  858. }
  859. if( defined($key) ) {
  860. $h{$key} = $value;
  861. } else {
  862. push @a, $value;
  863. }
  864. }
  865. return(\@a, \%h);
  866. }