Blocking.pm 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. ##############################################
  2. # $Id: Blocking.pm 17553 2018-10-17 15:56:35Z rudolfkoenig $
  3. package main;
  4. =pod
  5. ### Usage:
  6. Define the following in your 99_myUtils.pm
  7. sub TestBlocking($){ BlockingCall("DoSleep", shift, "SleepDone", 5, "AbortFn", "AbortArg"); }
  8. sub DoSleep($) { sleep(shift); return "I'm done"; }
  9. sub SleepDone($) { Log 1, "SleepDone: " . shift; }
  10. sub AbortFn($) { Log 1, "Aborted: " . shift; }
  11. Then call from the fhem prompt
  12. { TestBlocking(3) }
  13. { TestBlocking(6) }
  14. and watch the fhem log.
  15. =cut
  16. use strict;
  17. use warnings;
  18. use IO::Socket::INET;
  19. sub BlockingCall($$@);
  20. sub BlockingExit();
  21. sub BlockingKill($);
  22. sub BlockingInformParent($;$$$);
  23. sub BlockingStart(;$);
  24. our $BC_telnetDevice;
  25. our %BC_hash;
  26. my $telnetClient;
  27. my $bc_pid = 0;
  28. use vars qw($BC_telnet); # set optionally by the user, Forum #68477
  # Select the telnet device a forked child uses to talk back to the parent
  # and store its name in $BC_telnetDevice.
  #
  # Order of preference:
  #  1. $BC_telnet, if it was set by the user (Forum #68477)
  #  2. the first device (sorted by name) with TYPE telnet and no SNAME, no SSL
  #     attribute, allowfrom unset or equal to 127.0.0.1, a DEF of the form
  #     "<digits>" or "<digits> global", and for which Authenticate() does not
  #     return 2 (password needed)
  #  3. a temporary device created with "define -temporary ... telnet 0"
  #
  # $blockingFn is only used in the log message written when step 3 fails.
  sub
  BC_searchTelnet($)
  {
    my ($blockingFn) = @_;

    # An explicitly configured device always wins
    if($BC_telnet) {
      $BC_telnetDevice = $BC_telnet;
      return;
    }

    $BC_telnetDevice = undef;
    foreach my $name (sort keys %defs) {
      my $dev = $defs{$name};

      next if(!$dev->{TYPE});
      next if($dev->{TYPE} ne "telnet");
      next if($dev->{SNAME});
      next if(AttrVal($name, "SSL", undef));
      next if(AttrVal($name, "allowfrom", "127.0.0.1") ne "127.0.0.1");
      next if($dev->{DEF} !~ m/^\d+( global)?$/);
      next if($dev->{DEF} =~ m/IPV6/);

      # Probe the device with a throw-away client hash
      my %probe = (
        SNAME => $name,
        TYPE  => $dev->{TYPE},
        NAME  => $name.time(),
      );
      next if(Authenticate(\%probe, undef) == 2);   # Needs password

      $BC_telnetDevice = $name;
      last;
    }

    # If no suitable telnet device was found, create a temporary one
    if(!$BC_telnetDevice) {
      $BC_telnetDevice = "telnetForBlockingFn_".time();
      my $err = CommandDefine(undef, "-temporary $BC_telnetDevice telnet 0");
      if($err) {
        $err = "BlockingCall ($blockingFn): ".
               "No telnet port found and cannot create one: $err";
        Log(1, $err);
        return undef;
      }
      $attr{$BC_telnetDevice}{room} = "hidden";  # no red ?, Forum #46640
      $attr{$BC_telnetDevice}{allowfrom} = "127.0.0.1";
    }
  }
  # Handler for the "blockinginfo" command (registered in BlockingCall).
  # The arguments are ignored.
  #
  # Returns one line per %BC_hash entry that is not flagged as terminated and
  # has a pid value (this includes entries still marked 'WAITING:'), or a
  # single line saying that nothing is running. Entries are listed in the
  # order of their numeric job id, so the output is stable between calls.
  sub
  BlockingInfo($$@)
  {
    my @ret;

    foreach my $id (sort { $a <=> $b } keys %BC_hash) {
      my $h = $BC_hash{$id};
      next if($h->{terminated} || !$h->{pid});

      # For references show the reference type instead of the address; an
      # undefined value is shown as empty string without raising a warning.
      my $fn  = (ref($h->{fn})  ? ref($h->{fn})  :
                 defined($h->{fn})  ? $h->{fn}  : "");
      my $arg = (ref($h->{arg}) ? ref($h->{arg}) :
                 defined($h->{arg}) ? $h->{arg} : "");
      my $to   = ($h->{timeout} ? $h->{timeout} : "N/A");
      my $conn = ($h->{telnet}  ? $h->{telnet}  : "N/A");

      push @ret, "Pid:$h->{pid} Fn:$fn Arg:$arg Timeout:$to ConnectedVia:$conn";
    }

    push @ret, "No BlockingCall processes running currently" if(!@ret);
    return join("\n", @ret);
  }
  # Queue a function for execution in a forked process.
  #
  # Parameters:
  #   $blockingFn - function to run in the child
  #   $arg        - its single argument
  #   $finishFn   - optional, called in the parent with the result
  #   $timeout    - optional, seconds after which BlockingKill is triggered
  #   $abortFn    - optional, called in the parent if the child is killed or
  #                 died
  #   $abortArg   - first argument for $abortFn
  #
  # The job is stored in %BC_hash under a fresh id and handed over to
  # BlockingStart, whose return value is returned. As a side effect the
  # blockinginfo command is (re-)registered in %cmds.
  sub
  BlockingCall($$@)
  {
    my ($blockingFn, $arg, $finishFn, $timeout, $abortFn, $abortArg) = @_;

    $cmds{blockinginfo} = {
      Fn  => "BlockingInfo",
      Hlp => ",show info about processes started by BlockingCall",
    };

    my $id = ++$bc_pid;
    my $job = {
      pid      => 'WAITING:',
      fn       => $blockingFn,
      arg      => $arg,
      finishFn => $finishFn,
      timeout  => $timeout,
      abortFn  => $abortFn,
      abortArg => $abortArg,
      bc_pid   => $id,
    };
    $BC_hash{$id} = $job;

    return BlockingStart($job);
  }
  # Work through the job queue in %BC_hash: clean up finished or dead
  # children and fork a process for each waiting job, up to the limit given by
  # the global attribute blockingCallMax (default 32, 0 disables the limit).
  #
  # Optional argument, one of:
  #  - a numeric job id: the matching %BC_hash entry is flagged as terminated
  #    (the child sends this through BlockingInformParent after its function
  #    returned)
  #  - a job hash as created by BlockingCall
  #  - anything else or nothing, e.g. \%BC_hash when triggered by a timer
  #
  # Returns the argument in the parent (undef if it was a numeric job id).
  # In the forked child the function does not return: it runs the job and
  # leaves via BlockingExit.
  sub
  BlockingStart(;$)
  {
    my ($curr) = @_;

    if($curr && $curr =~ m/^\d+$/) {
      $BC_hash{$curr}{terminated} = 1 if($BC_hash{$curr});
      $curr = undef;
    }

    # Look for the telnet port. Must be done before forking to be able to
    # create a temporary device. Do it each time, as the old telnet device may
    # have got a password in the meantime.
    BC_searchTelnet($curr && $curr->{fn} ? $curr->{fn}: "BlockingStart");

    my $chld_alive = 0;
    my $max = AttrVal('global', 'blockingCallMax', 32);

    for my $bpid (sort { $a <=> $b} keys %BC_hash) {
      my $h = $BC_hash{$bpid};

      # Job was already forked: reap zombies and check if it is still alive
      if($h->{pid} && $h->{pid} =~ m/^-?\d+$/) { # Windows threads are negative
        if($^O =~ m/Win/) {
          # MaxNr of concurrent forked processes @Win is 64, and must use wait
          # as $SIG{CHLD} = 'IGNORE' does not work.
          wait() if($h->{terminated});
        } else {
          use POSIX ":sys_wait_h";
          waitpid(-1, WNOHANG); # Forum #58867
        }

        # Gone only if the process does not exist AND its registered telnet
        # connection (if any) is no longer defined.
        if(!kill(0, $h->{pid}) &&
           (!$h->{telnet} || !$defs{$h->{telnet}})) {
          $h->{pid} = "DEAD:$h->{pid}";
          if(!$h->{terminated} && $h->{abortFn}) {
            no strict "refs";
            my $ret = &{$h->{abortFn}}($h->{abortArg},"Process died prematurely");
            use strict "refs";
          }
          delete($BC_hash{$bpid});
          RemoveInternalTimer($h) if($h->{timeout});
        } else {
          $chld_alive++;
        }
        next;
      }

      if(!$h->{fn}) {     # Deleted by the module in finishFn?
        delete($BC_hash{$bpid});
        RemoveInternalTimer($h) if($h->{timeout});
        next;
      }

      # Limit reached: leave the remaining jobs queued and retry in 5 seconds
      if($max && $chld_alive >= $max) {
        if($curr && $curr->{fn}) {
          Log(4, "BlockingCall ($curr->{fn}) enqueue: ".
                 "limit (blockingCallMax=$max) reached");
        }
        RemoveInternalTimer(\%BC_hash);
        InternalTimer(gettimeofday()+5, "BlockingStart", \%BC_hash, 0);
        return $curr;
      }

      # do fork
      my $pid = fhemFork();
      if(!defined($pid)) {
        Log(1, "Cannot fork: $!");
        # The attribute name was misspelled as "stracktrace" here, so the
        # trace could not be enabled with the stacktrace attribute.
        stacktrace() if(AttrVal("global", "stacktrace", 0));
        DoTrigger("global", "CANNOT_FORK", 1);
        return $curr;
      }

      # Parent: remember the pid and arm the timeout, then look at the next job
      if($pid) {
        Log(4, "BlockingCall ($h->{fn}): created child ($pid), ".
               "uses $BC_telnetDevice to connect back");
        $h->{pid} = $pid;
        InternalTimer(gettimeofday()+$h->{timeout}, "BlockingKill", $h, 0)
          if($h->{timeout});
        $chld_alive++;
        next;
      }

      # Child here
      # $cl is sent unescaped, so it is evaluated on the parent side
      BlockingInformParent("BlockingRegisterTelnet", "\$cl,$h->{bc_pid}", 1, 1)
        if($h->{abortFn} && $^O !~ m/Win/);
      no strict "refs";
      my $ret = &{$h->{fn}}($h->{arg});
      use strict "refs";

      # Tell the parent that this job is done, so it can start the next one
      BlockingInformParent("BlockingStart", $h->{bc_pid}, 0);
      BlockingExit() if(!$h->{finishFn});

      # Write the data back, calling the function
      BlockingInformParent($h->{finishFn}, $ret, 0);
      BlockingExit();
    }
    return $curr;
  }
  # Link a telnet client connection to a BlockingCall job.
  #
  # $cl  - client hash of the telnet connection; must be a HASH reference with
  #        a NAME that exists in %defs
  # $idx - job id, must exist in %BC_hash
  #
  # On success the connection name is stored in the job (key telnet), the
  # job's fn is stored in the connection's device hash (key BlockingCall) and
  # 1 is returned. Returns 0 without changing anything otherwise.
  sub
  BlockingRegisterTelnet($$)
  {
    my ($cl, $idx) = @_;

    return 0 if(ref($cl) ne "HASH");

    my $clName = $cl->{NAME};
    return 0 if(!$clName || !$defs{$clName} || !$BC_hash{$idx});

    my $job = $BC_hash{$idx};
    $job->{telnet} = $clName;
    $defs{$clName}{BlockingCall} = $job->{fn};
    return 1;
  }
  # Called in the child: send "{<informFn>(<param>)}" to the parent over the
  # telnet connection, opening the connection on first use.
  #
  # Parameters:
  #   $informFn    - name of the function to be called in the parent
  #   $param       - optional argument: a scalar or an ARRAY reference. Unless
  #                  $noEscape is set, each value is wrapped in single quotes
  #                  with embedded single quotes backslash-escaped; array
  #                  elements are joined by comma. The caller's array is not
  #                  modified.
  #   $waitForRead - optional, default 1: wait for the answer of the parent
  #   $noEscape    - optional: send $param as it is
  #
  # Returns the answer with its last character removed if $waitForRead is
  # set and the read succeeded, undef otherwise. Returns an empty list if the
  # connection cannot be established.
  sub
  BlockingInformParent($;$$$)
  {
    my ($informFn, $param, $waitForRead, $noEscape) = @_;
    my $ret = undef;

    $waitForRead = 1 if (!defined($waitForRead));

    # Write the data back, calling the function
    if(!$telnetClient) {
      my $addr = "localhost:$defs{$BC_telnetDevice}{PORT}";
      $telnetClient = IO::Socket::INET->new(PeerAddr => $addr);
      if(!$telnetClient) {
        Log(1, "BlockingInformParent ($informFn): Can't connect to $addr: $@");
        return;
      }
    }

    if(defined($param)) {
      if(!$noEscape) {
        if(ref($param) eq "ARRAY") {
          # Work on copies: substituting on the loop variable itself would
          # change the elements of the caller's array.
          my @quoted;
          foreach my $elem (@{$param}) {
            my $val = (defined($elem) ? $elem : "");
            $val =~ s/'/\\'/g;
            push @quoted, "'$val'";
          }
          $param = join(",", @quoted);
        } else {
          $param =~ s/'/\\'/g;
          $param = "'$param'"
        }
      }
    } else {
      $param = "";
    }
    # NOTE(review): presumably needed because a single ; separates commands
    # on the receiving side - confirm against the telnet command parser.
    $param =~ s/;/;;/g;

    if(!defined(syswrite($telnetClient, "{$informFn($param)}\n"))) {
      # Nothing was sent, so there is no answer to wait for
      Log(1, "BlockingInformParent ($informFn): write failed: $!");
      return $ret;
    }

    if ($waitForRead) {
      my $len = sysread($telnetClient, $ret, 4096);
      if(defined($len)) {
        chop($ret);       # remove the last character of the answer
      } else {
        $ret = undef;     # read error
      }
    } else {
      # if data is available read anyway to keep input stream clear
      my $rin = '';
      vec($rin, $telnetClient->fileno(), 1) = 1;
      if (select($rin, undef, undef, 0) > 0) {
        sysread($telnetClient, $ret, 4096);
        $ret = undef;
      }
    }

    return $ret;
  }
  # Parent
  # Kill the process of a job, typically triggered by the timeout timer set in
  # BlockingStart.
  #
  # $h is the job hash. Nothing happens if it is already flagged as
  # terminated. If the job has a real pid (no ":" marker like WAITING: or
  # DEAD:) and signal 9 could be delivered, the job is flagged as terminated,
  # abortFn (or, if there is none, finishFn without arguments) is called, and
  # the job is removed from %BC_hash. In every other case BlockingStart is
  # still called at the end to process the queue.
  sub
  BlockingKill($)
  {
    my $h = shift;

    return if($h->{terminated});

    # The check for a non-Windows OS is disabled here:
    # if($^O !~ m/Win/) {
    my $pid = $h->{pid};
    if($pid && $pid !~ m/:/ && kill(9, $pid)) {
      my $ll = 1;
      $ll = $h->{loglevel} if(defined($h->{loglevel})); # Forum #77057
      Log($ll, "Timeout for $h->{fn} reached, terminated process $h->{pid}");
      $h->{terminated} = 1;

      if($h->{abortFn}) {
        no strict "refs";
        my $ret = &{$h->{abortFn}}($h->{abortArg},
                                   "Timeout: process terminated");
      } elsif($h->{finishFn}) {
        no strict "refs";
        my $ret = &{$h->{finishFn}}();
      }

      delete($BC_hash{$h->{bc_pid}});

      # Process still exists after the kill: look at the queue again in 1s
      if(looks_like_number($h->{pid}) && kill(0, $h->{pid})) { # Forum #58867
        InternalTimer(gettimeofday()+1, "BlockingStart", \%BC_hash, 0);
      }
    }
    # }

    BlockingStart();
  }
  # Child
  # Leave the forked child: close the connection to the parent if one is
  # open, then end the thread (if $^O matches /Win/) or the process
  # (everywhere else, via POSIX::_exit). Does not return.
  sub
  BlockingExit()
  {
    close($telnetClient) if($telnetClient);

    POSIX::_exit(0) if($^O !~ m/Win/);

    # Only reached if $^O matches /Win/
    eval "require threads;";
    threads->exit();
  }
  272. 1;