Blocking.pm 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. ##############################################
  2. # $Id: Blocking.pm 12648 2016-11-24 12:15:25Z rudolfkoenig $
  3. package main;
  4. =pod
  5. ### Usage:
  6. Define the following in your 99_myUtils.pm
  7. sub TestBlocking($){ BlockingCall("DoSleep", shift, "SleepDone", 5, "AbortFn", "AbortArg"); }
  8. sub DoSleep($) { sleep(shift); return "I'm done"; }
  9. sub SleepDone($) { Log 1, "SleepDone: " . shift; }
  10. sub AbortFn($) { Log 1, "Aborted: " . shift; }
  11. Then call from the fhem prompt
  12. { TestBlocking(3) }
  13. { TestBlocking(6) }
  14. and watch the fhem log.
  15. =cut
  16. use strict;
  17. use warnings;
  18. use IO::Socket::INET;
# Forward declarations, so the prototypes are known wherever the functions
# are called further down in this file.
sub BlockingCall($$@);
sub BlockingExit();
sub BlockingKill($);
sub BlockingInformParent($;$$);
sub BlockingStart(;$);

# Name of the telnet device the child connects back to; (re)computed by
# BC_searchTelnet on each BlockingStart call.
our $BC_telnetDevice;
# Job table: bc_pid => hash with pid, fn, arg, finishFn, timeout, abortFn,
# abortArg and bc_pid, filled by BlockingCall.
our %BC_hash;
# Socket to the telnet device, opened on demand by BlockingInformParent and
# closed by BlockingExit.
my $telnetClient;
# Counter used by BlockingCall to generate the keys of %BC_hash.
my $bc_pid = 0;
  28. sub
  29. BC_searchTelnet($)
  30. {
  31. my ($blockingFn) = @_;
  32. $BC_telnetDevice = undef;
  33. foreach my $d (sort keys %defs) { #
  34. my $h = $defs{$d};
  35. next if(!$h->{TYPE} || $h->{TYPE} ne "telnet" || $h->{SNAME});
  36. next if($attr{$d}{SSL} ||
  37. AttrVal($d, "allowfrom", "127.0.0.1") ne "127.0.0.1");
  38. next if($h->{DEF} !~ m/^\d+( global)?$/);
  39. next if($h->{DEF} =~ m/IPV6/);
  40. my %cDev = ( SNAME=>$d, TYPE=>$h->{TYPE}, NAME=>$d.time() );
  41. next if(Authenticate(\%cDev, undef) == 2); # Needs password
  42. $BC_telnetDevice = $d;
  43. last;
  44. }
  45. # If not suitable telnet device found, create a temporary one
  46. if(!$BC_telnetDevice) {
  47. $BC_telnetDevice = "telnetForBlockingFn_".time();
  48. my $ret = CommandDefine(undef, "-temporary $BC_telnetDevice telnet 0");
  49. if($ret) {
  50. $ret = "BlockingCall ($blockingFn): ".
  51. "No telnet port found and cannot create one: $ret";
  52. Log 1, $ret;
  53. return undef;
  54. }
  55. $attr{$BC_telnetDevice}{room} = "hidden"; # no red ?, Forum #46640
  56. $attr{$BC_telnetDevice}{allowfrom} = "127.0.0.1";
  57. }
  58. }
  59. sub
  60. BlockingCall($$@)
  61. {
  62. my ($blockingFn, $arg, $finishFn, $timeout, $abortFn, $abortArg) = @_;
  63. my %h = ( pid=>'WAITING:', fn=>$blockingFn, arg=>$arg, finishFn=>$finishFn,
  64. timeout=>$timeout, abortFn=>$abortFn, abortArg=>$abortArg );
  65. $BC_hash{++$bc_pid} = \%h;
  66. $h{bc_pid} = $bc_pid;
  67. return BlockingStart(\%h);
  68. }
  69. sub
  70. BlockingStart(;$)
  71. {
  72. my ($curr) = @_;
  73. if($curr && $curr =~ m/^\d+$/) {
  74. $BC_hash{$curr}{terminated} = 1 if($BC_hash{$curr});
  75. $curr = undef;
  76. }
  77. # Look for the telnetport. Must be done before forking to be able to create a
  78. # temporary device. Do it each time, as the old telnet may got a password
  79. BC_searchTelnet($curr && $curr->{fn} ? $curr->{fn}: "BlockingStart");
  80. my $chld_alive = 0;
  81. my $max = AttrVal('global', 'blockingCallMax', 0);
  82. for my $bpid (sort { $a <=> $b} keys %BC_hash) {
  83. my $h = $BC_hash{$bpid};
  84. if($h->{pid} && $h->{pid} =~ m/^-?\d+$/) { # Windows threads are negative
  85. if($^O =~ m/Win/) {
  86. # MaxNr of concurrent forked processes @Win is 64, and must use wait as
  87. # $SIG{CHLD} = 'IGNORE' does not work.
  88. wait;
  89. } else {
  90. use POSIX ":sys_wait_h";
  91. waitpid(-1, WNOHANG); # Forum #58867
  92. }
  93. if(!kill(0, $h->{pid})) {
  94. $h->{pid} = "DEAD:$h->{pid}";
  95. delete($BC_hash{$bpid});
  96. } else {
  97. $chld_alive++;
  98. }
  99. next;
  100. }
  101. if(!$h->{fn}) { # Deleted by the module in finishFn?
  102. delete($BC_hash{$bpid});
  103. next;
  104. }
  105. if($max && $chld_alive >= $max) {
  106. if($curr && $curr->{fn}) {
  107. Log 4, "BlockingCall ($curr->{fn}) enqueue: ".
  108. "limit (blockingCallMax=$max) reached";
  109. }
  110. RemoveInternalTimer(\%BC_hash);
  111. InternalTimer(gettimeofday()+5, "BlockingStart", \%BC_hash, 0);
  112. return $curr;
  113. }
  114. # do fork
  115. my $pid = fhemFork;
  116. if(!defined($pid)) {
  117. Log 1, "Cannot fork: $!";
  118. return $curr;
  119. }
  120. if($pid) {
  121. Log 4, "BlockingCall ($h->{fn}): created child ($pid), ".
  122. "uses $BC_telnetDevice to connect back";
  123. $h->{pid} = $pid;
  124. InternalTimer(gettimeofday()+$h->{timeout}, "BlockingKill", $h, 0)
  125. if($h->{timeout});
  126. $chld_alive++;
  127. next;
  128. }
  129. # Child here
  130. no strict "refs";
  131. my $ret = &{$h->{fn}}($h->{arg});
  132. use strict "refs";
  133. BlockingInformParent("BlockingStart", $h->{bc_pid}, 0);
  134. BlockingExit() if(!$h->{finishFn});
  135. # Write the data back, calling the function
  136. BlockingInformParent($h->{finishFn}, $ret, 0);
  137. BlockingExit();
  138. }
  139. return $curr;
  140. }
  141. sub
  142. BlockingInformParent($;$$)
  143. {
  144. my ($informFn, $param, $waitForRead) = @_;
  145. my $ret = undef;
  146. $waitForRead = 1 if (!defined($waitForRead));
  147. # Write the data back, calling the function
  148. if(!$telnetClient) {
  149. my $addr = "localhost:$defs{$BC_telnetDevice}{PORT}";
  150. $telnetClient = IO::Socket::INET->new(PeerAddr => $addr);
  151. if(!$telnetClient) {
  152. Log 1, "BlockingInformParent ($informFn): Can't connect to $addr: $@";
  153. return;
  154. }
  155. }
  156. if(defined($param)) {
  157. if(ref($param) eq "ARRAY") {
  158. $param = join(",", map { $_ =~ s/'/\\'/g; "'$_'" } @{$param});
  159. } else {
  160. $param =~ s/'/\\'/g;
  161. $param = "'$param'"
  162. }
  163. } else {
  164. $param = "";
  165. }
  166. $param =~ s/;/;;/g;
  167. syswrite($telnetClient, "{$informFn($param)}\n");
  168. if ($waitForRead) {
  169. my $len = sysread($telnetClient, $ret, 4096);
  170. chop($ret);
  171. $ret = undef if(!defined($len));
  172. } else {
  173. # if data is available read anyway to keep input stream clear
  174. my $rin = '';
  175. vec($rin, $telnetClient->fileno(), 1) = 1;
  176. if (select($rin, undef, undef, 0) > 0) {
  177. sysread($telnetClient, $ret, 4096);
  178. $ret = undef;
  179. }
  180. }
  181. return $ret;
  182. }
  183. # Parent
  184. sub
  185. BlockingKill($)
  186. {
  187. my $h = shift;
  188. return if($h->{terminated});
  189. if($^O !~ m/Win/) {
  190. if($h->{pid} && $h->{pid} !~ m/:/ && kill(9, $h->{pid})) {
  191. Log 1, "Timeout for $h->{fn} reached, terminated process $h->{pid}";
  192. if($h->{abortFn}) {
  193. no strict "refs";
  194. my $ret = &{$h->{abortFn}}($h->{abortArg});
  195. use strict "refs";
  196. } elsif($h->{finishFn}) {
  197. no strict "refs";
  198. my $ret = &{$h->{finishFn}}();
  199. use strict "refs";
  200. }
  201. InternalTimer(gettimeofday()+1, "BlockingStart", \%BC_hash, 0)
  202. if(kill(0, $h->{pid})); # Forum #58867
  203. }
  204. }
  205. BlockingStart();
  206. }
  207. # Child
  208. sub
  209. BlockingExit()
  210. {
  211. close($telnetClient) if($telnetClient);
  212. if($^O =~ m/Win/) {
  213. eval "require threads;";
  214. threads->exit();
  215. } else {
  216. POSIX::_exit(0);
  217. }
  218. }
  219. 1;