Blocking.pm 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. ##############################################
  2. # $Id: Blocking.pm 15412 2017-11-09 14:34:29Z rudolfkoenig $
  3. package main;
  4. =pod
  5. ### Usage:
  6. Define the following in your 99_myUtils.pm
  7. sub TestBlocking($){ BlockingCall("DoSleep", shift, "SleepDone", 5, "AbortFn", "AbortArg"); }
  8. sub DoSleep($) { sleep(shift); return "I'm done"; }
  9. sub SleepDone($) { Log 1, "SleepDone: " . shift; }
  10. sub AbortFn($) { Log 1, "Aborted: " . shift; }
  11. Then call from the fhem prompt
  12. { TestBlocking(3) }
  13. { TestBlocking(6) }
  14. and watch the fhem log.
  15. =cut
  16. use strict;
  17. use warnings;
  18. use IO::Socket::INET;
  # Forward declarations, so the prototyped subs can be called before their
  # definition further down in this file.
  sub BlockingStart(;$);
  sub BlockingCall($$@);
  sub BlockingInformParent($;$$$);
  sub BlockingKill($);
  sub BlockingExit();

  # Name of the telnet device the children use to connect back to the parent.
  our $BC_telnetDevice;
  # Optional user override for the telnet device name, Forum #68477
  use vars qw($BC_telnet);

  # All queued/running jobs, keyed by the counter value taken from $bc_pid.
  our %BC_hash;
  # Counter used to create the keys of %BC_hash.
  my $bc_pid = 0;

  # Socket to the telnet port, opened on demand by BlockingInformParent.
  my $telnetClient;
  # Determine the telnet device used by the children to talk to the parent and
  # store its name in $BC_telnetDevice.
  #   $blockingFn - name of the calling function, only used in the log message.
  # Order of preference: the user supplied $BC_telnet, then the first suitable
  # defined telnet device, then a freshly created temporary one.
  sub
  BC_searchTelnet($)
  {
    my ($blockingFn) = @_;

    # An explicit setting by the user is taken without any further check.
    if($BC_telnet) {
      $BC_telnetDevice = $BC_telnet;
      return;
    }

    $BC_telnetDevice = undef;
    for my $name (sort keys %defs) {
      my $dev = $defs{$name};

      # Only telnet server definitions: client connections carry an SNAME.
      next if(!$dev->{TYPE} || $dev->{TYPE} ne "telnet" || $dev->{SNAME});
      # No SSL, and not restricted to anything other than 127.0.0.1
      next if(AttrVal($name, "SSL", undef));
      next if(AttrVal($name, "allowfrom", "127.0.0.1") ne "127.0.0.1");
      # The definition must be a plain port number, optionally global.
      next if($dev->{DEF} !~ m/^\d+( global)?$/);
      next if($dev->{DEF} =~ m/IPV6/);

      # Probe with a dummy client: a return value of 2 means a password is
      # needed, such a device cannot be used.
      my %probe = ( SNAME=>$name, TYPE=>$dev->{TYPE}, NAME=>$name.time() );
      next if(Authenticate(\%probe, undef) == 2);

      $BC_telnetDevice = $name;
      last;
    }

    # No suitable telnet device found: create a temporary one
    if(!$BC_telnetDevice) {
      $BC_telnetDevice = "telnetForBlockingFn_".time();
      my $ret = CommandDefine(undef, "-temporary $BC_telnetDevice telnet 0");
      if($ret) {
        $ret = "BlockingCall ($blockingFn): ".
               "No telnet port found and cannot create one: $ret";
        Log(1, $ret);
        return undef;
      }
      $attr{$BC_telnetDevice}{room} = "hidden"; # no red ?, Forum #46640
      $attr{$BC_telnetDevice}{allowfrom} = "127.0.0.1";
    }
  }
  # Return a human readable list of the BlockingCall jobs stored in %BC_hash,
  # one line per job, or a fixed text if there is none.
  # The arguments are not used. BlockingCall registers this function as the
  # "blockinginfo" command, so they are presumably the usual command arguments
  # - TODO confirm against the command dispatcher.
  # Jobs marked as terminated or without a pid entry are skipped. Jobs still
  # waiting for a free slot are shown with the pid WAITING:
  sub
  BlockingInfo($$@)
  {
    my @ret;

    # Sorted by the queue id, to get a stable output order.
    foreach my $bpid (sort { $a <=> $b } keys %BC_hash) {
      my $h = $BC_hash{$bpid};
      next if($h->{terminated} || !$h->{pid});

      # Show references (e.g. code or hash refs) by their type only.
      my $fn   = (ref($h->{fn})  ? ref($h->{fn})  : $h->{fn});
      my $arg  = (ref($h->{arg}) ? ref($h->{arg}) : $h->{arg});
      # BlockingCall may be used without an argument: avoid the "uninitialized
      # value" warning when building the line below.
      $fn  = "" if(!defined($fn));
      $arg = "" if(!defined($arg));

      my $to   = ($h->{timeout} ? $h->{timeout} : "N/A");
      my $conn = ($h->{telnet}  ? $h->{telnet}  : "N/A");
      push @ret, "Pid:$h->{pid} Fn:$fn Arg:$arg Timeout:$to ConnectedVia:$conn";
    }

    push @ret, "No BlockingCall processes running currently" if(!@ret);
    return join("\n", @ret);
  }
  # Queue a function for execution in a forked child process.
  #   $blockingFn - function executed in the child, called with $arg
  #   $arg        - the argument for $blockingFn
  #   $finishFn   - optional, called in the parent with the return value of
  #                 $blockingFn
  #   $timeout    - optional, seconds after which the child is killed
  #   $abortFn    - optional, called with $abortArg and a reason text if the
  #   $abortArg     child was killed or died
  # Returns the result of BlockingStart for the new job entry.
  sub
  BlockingCall($$@)
  {
    # (Re-)register the blockinginfo command on each call.
    $cmds{blockinginfo} = {
      Fn  => "BlockingInfo",
      Hlp => ",show info about processes started by BlockingCall"
    };

    my ($blockingFn, $arg, $finishFn, $timeout, $abortFn, $abortArg) = @_;

    # The job is queued as WAITING:, BlockingStart decides when to fork.
    my $id = ++$bc_pid;
    my $job = {
      pid      => 'WAITING:',
      fn       => $blockingFn,
      arg      => $arg,
      finishFn => $finishFn,
      timeout  => $timeout,
      abortFn  => $abortFn,
      abortArg => $abortArg,
    };
    $BC_hash{$id} = $job;
    $job->{bc_pid} = $id;

    return BlockingStart($job);
  }
  # Walk through the job queue in %BC_hash: clean up the entries of finished
  # children and fork the waiting ones, as long as the blockingCallMax limit
  # of the global device allows it.
  #   $curr - optional. Either the job hash just queued by BlockingCall, or a
  #           numeric queue id, sent by a child after its blocking function
  #           returned: this entry is then marked as terminated.
  # Returns $curr (undef if a queue id was given). In the forked child this
  # function does not return, the child ends via BlockingExit.
  sub
  BlockingStart(;$)
  {
    my ($curr) = @_;

    if($curr && $curr =~ m/^\d+$/) {
      $BC_hash{$curr}{terminated} = 1 if($BC_hash{$curr});
      $curr = undef;
    }

    # Look for the telnet port. Must be done before forking, to be able to
    # create a temporary device. Repeated on each call, as the old telnet
    # device may have got a password in the meantime.
    BC_searchTelnet($curr && $curr->{fn} ? $curr->{fn}: "BlockingStart");

    my $isWindows = ($^O =~ m/Win/ ? 1 : 0);
    my $max = AttrVal('global', 'blockingCallMax', 0);
    my $alive = 0;                      # number of children still running

    for my $id (sort { $a <=> $b} keys %BC_hash) {
      my $h = $BC_hash{$id};

      # Already forked: the pid is a number (Windows threads are negative)
      if($h->{pid} && $h->{pid} =~ m/^-?\d+$/) {
        if($isWindows) {
          # MaxNr of concurrent forked processes @Win is 64, and must use wait
          # as $SIG{CHLD} = 'IGNORE' does not work.
          wait if(!$h->{telnet} || !$defs{$h->{telnet}});
        } else {
          use POSIX ":sys_wait_h";
          waitpid(-1, WNOHANG); # Forum #58867
        }

        # Treated as gone only if the process does not exist anymore and
        # there is no registered telnet connection of the child either.
        if(!kill(0, $h->{pid}) &&
           !($h->{telnet} && $defs{$h->{telnet}})) {
          $h->{pid} = "DEAD:$h->{pid}";
          if(!$h->{terminated} && $h->{abortFn}) {
            no strict "refs";           # abortFn may be a function name
            my $ret = $h->{abortFn}->($h->{abortArg},"Process died prematurely");
          }
          delete($BC_hash{$id});
          RemoveInternalTimer($h) if($h->{timeout});
        } else {
          $alive++;
        }
        next;
      }

      # Deleted by the module in finishFn?
      if(!$h->{fn}) {
        delete($BC_hash{$id});
        RemoveInternalTimer($h) if($h->{timeout});
        next;
      }

      # Limit reached: leave the remaining jobs queued and retry in 5 seconds
      if($max && $alive >= $max) {
        if($curr && $curr->{fn}) {
          Log(4, "BlockingCall ($curr->{fn}) enqueue: ".
                 "limit (blockingCallMax=$max) reached");
        }
        RemoveInternalTimer(\%BC_hash);
        InternalTimer(gettimeofday()+5, "BlockingStart", \%BC_hash, 0);
        return $curr;
      }

      # do fork
      my $pid = fhemFork();
      if(!defined($pid)) {
        Log(1, "Cannot fork: $!");
        return $curr;
      }

      # Parent: remember the pid and arm the timeout, if there is one
      if($pid) {
        Log(4, "BlockingCall ($h->{fn}): created child ($pid), ".
               "uses $BC_telnetDevice to connect back");
        $h->{pid} = $pid;
        InternalTimer(gettimeofday()+$h->{timeout}, "BlockingKill", $h, 0)
          if($h->{timeout});
        $alive++;
        next;
      }

      # Child here
      # Tell the parent which telnet connection belongs to this job, the
      # parameter is sent unescaped so that \$cl is evaluated by the parent.
      BlockingInformParent("BlockingRegisterTelnet", "\$cl,$h->{bc_pid}", 1, 1)
        if($h->{abortFn} && !$isWindows);

      my $ret;
      {
        no strict "refs";               # fn may be a function name
        $ret = $h->{fn}->($h->{arg});
      }

      # Report the end of the blocking function, then write the data back by
      # calling the finish function in the parent, if there is one.
      BlockingInformParent("BlockingStart", $h->{bc_pid}, 0);
      BlockingExit() if(!$h->{finishFn});
      BlockingInformParent($h->{finishFn}, $ret, 0);
      BlockingExit();
    }

    return $curr;
  }
  # Connect a job with the telnet connection of its child.
  #   $cl  - hash of the telnet client connection, must have a NAME which is
  #          known in %defs
  #   $idx - key of the job in %BC_hash
  # Stores the connection name in the job (telnet) and the function name of
  # the job in the connection (BlockingCall).
  # Returns 1 on success, 0 if one of the arguments is not usable.
  sub
  BlockingRegisterTelnet($$)
  {
    my ($cl,$idx) = @_;

    return 0 if(ref($cl) ne "HASH");

    my $name = $cl->{NAME};
    return 0 if(!$name || !$defs{$name});

    my $job = $BC_hash{$idx};
    return 0 if(!$job);

    $job->{telnet} = $name;
    $defs{$name}{BlockingCall} = $job->{fn};
    return 1;
  }
  # Called in the child: execute a function in the parent, by sending the
  # perl expression {informFn(param)} over the telnet connection.
  #   $informFn    - name of the function to call in the parent
  #   $param       - optional, a scalar or an array reference. Each value is
  #                  put in single quotes, with embedded single quotes escaped
  #   $waitForRead - optional, default 1: wait for the answer and return it
  #   $noEscape    - optional: send $param as it is, without quoting
  # Returns the answer without its last character (the newline) if
  # $waitForRead is set, undef otherwise or if the communication failed.
  # The connection is opened on the first call and reused afterwards.
  # NOTE(review): backslashes and newlines in $param are sent unchanged, so
  # the parent may not see exactly the same value. Left as it is, as callers
  # may depend on it.
  sub
  BlockingInformParent($;$$$)
  {
    my ($informFn, $param, $waitForRead, $noEscape) = @_;
    my $ret = undef;
    $waitForRead = 1 if (!defined($waitForRead));

    # Write the data back, calling the function
    if(!$telnetClient) {
      my $addr = "localhost:$defs{$BC_telnetDevice}{PORT}";
      $telnetClient = IO::Socket::INET->new(PeerAddr => $addr);
      if(!$telnetClient) {
        Log(1, "BlockingInformParent ($informFn): Can't connect to $addr: $@");
        return;
      }
    }

    if(defined($param)) {
      if(!$noEscape) {
        # Escape copies of the values: modifying $_ inside of map would
        # change the array of the caller.
        my @quoted;
        foreach my $value (ref($param) eq "ARRAY" ? @{$param} : ($param)) {
          my $copy = $value;
          $copy =~ s/'/\\'/g;
          push @quoted, "'$copy'";
        }
        $param = join(",", @quoted);
      }
    } else {
      $param = "";
    }
    $param =~ s/;/;;/g;   # a single ; would separate commands for the parent

    my $written = syswrite($telnetClient, "{$informFn($param)}\n");
    if(!defined($written)) {
      # Nothing was sent, so do not wait for an answer which will not come.
      Log(1, "BlockingInformParent ($informFn): Can't write to parent: $!");
      return $ret;
    }

    if ($waitForRead) {
      my $len = sysread($telnetClient, $ret, 4096);
      if(defined($len)) {
        chop($ret);
      } else {
        $ret = undef;     # read error
      }
    } else {
      # if data is available read anyway to keep input stream clear
      my $rin = '';
      vec($rin, $telnetClient->fileno(), 1) = 1;
      if (select($rin, undef, undef, 0) > 0) {
        sysread($telnetClient, $ret, 4096);
        $ret = undef;
      }
    }
    return $ret;
  }
  # Parent
  # Kill the child of a job. Scheduled by BlockingStart as timer callback if
  # the job has a timeout.
  #   $h - the job hash
  # Does nothing if the job is already marked as terminated. After a
  # successful kill abortFn is called with abortArg and a reason text, or, if
  # there is no abortFn, finishFn without arguments. The job is then removed
  # from %BC_hash. Unless the job was already terminated, BlockingStart is
  # called at the end to look after the remaining queue.
  sub
  BlockingKill($)
  {
    my ($h) = @_;
    return if($h->{terminated});

    # A pid with a colon (WAITING:, DEAD:...) is not a running process.
    my $killed = ($h->{pid} && $h->{pid} !~ m/:/ && kill(9, $h->{pid}));
    if($killed) {
      my $ll = (defined($h->{loglevel}) ? $h->{loglevel} : 1); # Forum #77057
      Log($ll, "Timeout for $h->{fn} reached, terminated process $h->{pid}");
      $h->{terminated} = 1;

      {
        no strict "refs";               # the callbacks may be function names
        if($h->{abortFn}) {
          my $ret = $h->{abortFn}->($h->{abortArg},
                                    "Timeout: process terminated");
        } elsif($h->{finishFn}) {
          my $ret = $h->{finishFn}->();
        }
      }

      delete($BC_hash{$h->{bc_pid}});
      # Process still there: check the queue again in a second, Forum #58867
      InternalTimer(gettimeofday()+1, "BlockingStart", \%BC_hash, 0)
        if(looks_like_number($h->{pid}) && kill(0, $h->{pid}));
    }

    BlockingStart();
  }
  # Child
  # End the child: close the connection to the parent if there is one, then
  # leave via POSIX::_exit, or, on Windows, by ending the current thread.
  sub
  BlockingExit()
  {
    close($telnetClient) if($telnetClient);

    if($^O !~ m/Win/) {
      POSIX::_exit(0);
    } else {
      # Load threads only here, where it is needed.
      eval { require threads; };
      threads->exit();
    }
  }
  270. 1;