RPCQueue.pm 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package RPCQueue;
  2. use strict;
  3. use IO::File;
  4. use Fcntl 'SEEK_END', 'SEEK_SET', 'O_CREAT', 'O_RDWR';
  5. use Carp qw(carp croak);
  6. our $VERSION = '1.01';
  # Constructor: open (creating them if necessary) the queue data file
  # "<File>.dat" and the index file "<File>.idx", then return a blessed handle.
  #
  # Parameters are name => value pairs; names are matched case-insensitively:
  #   File      - base path of the queue (required)
  #   Mode      - permission bits used when the files are created
  #               (default 0600); a string with a leading zero, such as
  #               '0600', is interpreted as octal
  #   BlockSize - number of bytes requested per sysread (default 64)
  #   Seperator - string written after every element (default "\n")
  #
  # Croaks on an odd-sized parameter list, a missing File parameter, a
  # separator longer than BlockSize, or a failure to open/read a file.
  sub new
  {
      my $class = shift;
      my $mi    = $class . '->new()';

      croak "$mi requires an even number of parameters" if (@_ & 1);

      # Fold parameter names to lower case. Walking the pairs in argument
      # order makes "last one wins" deterministic for duplicate names.
      my %params;
      my @args = @_;
      while (@args) {
          my ($key, $val) = splice(@args, 0, 2);
          $params{ lc($key) } = $val;
      }

      croak "$mi needs an File parameter" unless exists $params{file};

      my $base       = delete $params{file};
      my $idx_file   = $base . '.idx';
      my $queue_file = $base . '.dat';

      # IO::File hands the third argument to sysopen() as numeric permission
      # bits, so the string '0600' would be taken as decimal 600. Use a real
      # octal default and convert octal-looking strings explicitly.
      my $mode = delete $params{mode} || 0600;
      $mode = oct($mode) if $mode =~ /\A0[0-7]+\z/;

      my $self = {};
      $self->{block_size} = delete $params{blocksize} || 64;
      $self->{seperator}  = delete $params{seperator} || "\n";
      $self->{sep_length} = length $self->{seperator};
      croak "Seperator length cannot be greater than BlockSize"
          if ($self->{sep_length} > $self->{block_size});

      $self->{queue_file} = $queue_file;
      $self->{idx_file}   = $idx_file;
      $self->{queue} = IO::File->new($queue_file, O_CREAT | O_RDWR, $mode) or croak $!;
      $self->{idx}   = IO::File->new($idx_file,   O_CREAT | O_RDWR, $mode) or croak $!;

      ### Default ptr to 0, replace it with value in idx file if one exists
      $self->{idx}->sysseek(0, SEEK_SET);
      my $stored = '';
      defined $self->{idx}->sysread($stored, 1024)
          or croak "Could not sysread from idx: $!";
      # Only the leading digits count; anything else is treated as offset 0.
      $self->{ptr} = ($stored =~ /\A\s*(\d+)/) ? $1 + 0 : 0;

      # -s yields a false value for an empty file; normalise that to 0.
      my $queue_size = (-s $queue_file) || 0;
      if ($self->{ptr} > $queue_size)
      {
          carp "Ptr is greater than queue file size, resetting ptr to '0'";
          # Reset the in-memory pointer as well as the persisted one,
          # otherwise the next read would still seek to the stale offset.
          $self->{ptr} = 0;
          $self->{idx}->truncate(0) or croak "Could not truncate idx: $!";
          $self->{idx}->sysseek(0, SEEK_SET);
          $self->{idx}->syswrite('0') or croak "Could not syswrite to idx: $!";
      }

      return bless $self, $class;
  }
  # Append one element, followed by the separator, to the end of the queue
  # file.
  #
  #   $element - plain scalar to store; references are rejected
  #
  # Any occurrence of the separator inside $element is removed (with a
  # warning) so that stored elements stay unambiguous.
  # Returns the value returned by syswrite (number of bytes written).
  # Croaks if $element is a reference or if the seek/write fails.
  sub enq
  {
      my ($self, $element) = @_;

      croak 'Cannot handle references' if ref $element;

      my $sep = $self->{seperator};

      # The separator is literal text, not a pattern, so it must be quoted:
      # unquoted, a separator such as '.' or '|' would match far more than
      # intended. Repeat until stable because removing one occurrence can
      # join the surrounding text into a new one ("aabb" minus "ab" is "ab").
      my $stripped = 0;
      $stripped = 1 while $element =~ s/\Q$sep\E//g;
      carp "Removed illegal seperator(s) from $element" if $stripped;

      # sysseek returns "0 but true" for offset 0, so a false value is an error.
      $self->{queue}->sysseek(0, SEEK_END)
          or croak "Could not sysseek in queue: $!";

      my $written = $self->{queue}->syswrite("$element$sep")
          or croak "Could not syswrite to queue: $!";
      return $written;
  }
  # Remove and return the element at the current read pointer.
  #
  # The queue file is read from $self->{ptr} in requests of block_size bytes
  # until the separator is found. The pointer is then moved past the
  # separator and written to the idx file. When the pointer reaches the end
  # of the queue file, the file is truncated and the pointer reset to 0.
  #
  # Returns the element without its separator, or undef if nothing is
  # queued. Trailing data that is not terminated by a separator is returned
  # whole and consumed.
  # Croaks if a seek, read, truncate or write on either file fails.
  sub deq
  {
      my $self = shift;

      my $queue   = $self->{queue};
      my $sep     = $self->{seperator};
      my $sep_len = $self->{sep_length};
      my $start   = $self->{ptr};

      # sysseek returns "0 but true" for offset 0, so false means failure.
      $queue->sysseek($start, SEEK_SET)
          or croak "Could not sysseek in queue: $!";

      my $element;
      my $chunk;                # lexical read buffer; leaves $_ untouched
      my $pending     = '';     # everything read since $start
      my $search_from = 0;      # where the next separator search begins
      my $terminated  = 0;

      while (1)
      {
          my $got = $queue->sysread($chunk, $self->{block_size});
          croak "Could not sysread from queue: $!" unless defined $got;
          last if $got == 0;

          $pending .= $chunk;
          my $at = index($pending, $sep, $search_from);
          if ($at != -1)
          {
              $element     = substr($pending, 0, $at);
              $self->{ptr} = $start + $at + $sep_len;
              $terminated  = 1;
              last;
          }

          ## A separator may straddle two reads, so resume the search
          ## sep_length - 1 characters before the end of what has been seen
          $search_from = length($pending) - $sep_len + 1;
          $search_from = 0 if $search_from < 0;
      }

      # End of file without a separator: hand back the remainder intact.
      if (!$terminated && length $pending)
      {
          $element     = $pending;
          $self->{ptr} = $start + length $pending;
      }

      ## If queue seek pointer is at the EOF, truncate the queue file
      $queue->sysseek($self->{ptr}, SEEK_SET)
          or croak "Could not sysseek in queue: $!";
      my $probe = $queue->sysread($chunk, 1);
      # A read error (undef) must not be mistaken for EOF, or the queue
      # would be truncated while it may still hold data.
      croak "Could not sysread from queue: $!" unless defined $probe;
      if ($probe == 0)
      {
          $queue->truncate(0) or croak "Could not truncate queue: $!";
          $self->{ptr} = 0;
          $queue->sysseek(0, SEEK_SET);
      }

      ## Set idx file contents to point to the current seek position in queue file
      $self->{idx}->truncate(0) or croak "Could not truncate idx: $!";
      $self->{idx}->sysseek(0, SEEK_SET);
      $self->{idx}->syswrite($self->{ptr}) or croak "Could not syswrite to idx: $!";

      return $element;
  }
  # Return up to $count elements from the front of the queue without
  # removing them; $self->{ptr} and the idx file are left unchanged.
  #
  #   $count - maximum number of elements to return; must be greater than 0
  #
  # Returns an array reference of elements (oldest first), or undef when no
  # complete, separator-terminated element is available.
  # Croaks on an invalid $count or if seeking/reading the queue file fails.
  sub peek
  {
      my ($self, $count) = @_;
      croak "Invalid argument to peek ($count)" unless $count > 0;

      my $queue = $self->{queue};

      # The separator is literal text; quote it so regex metacharacters such
      # as '|' or '.' are not interpreted as a pattern by split.
      my $sep      = $self->{seperator};
      my $split_re = qr/\Q$sep\E/;

      # sysseek returns "0 but true" for offset 0, so false means failure.
      $queue->sysseek($self->{ptr}, SEEK_SET)
          or croak "Could not sysseek in queue: $!";

      my $elements;
      my $chunk;               # lexical read buffer; leaves $_ untouched
      my $pending = '';        # text after the last separator seen so far

      GATHER:
      while (1)
      {
          my $got = $queue->sysread($chunk, $self->{block_size});
          croak "Could not sysread from queue: $!" unless defined $got;
          last GATHER if $got == 0;

          $pending .= $chunk;

          # Limit -1 keeps empty fields, so the final field is always the
          # (possibly empty) text following the last separator.
          my @complete = split $split_re, $pending, -1;
          $pending = pop @complete;

          foreach my $item (@complete)
          {
              push @{$elements}, $item;
              last GATHER if $count == @{$elements};
          }
      }

      return $elements;
  }
  # Rewind the queue: store '0' in the idx file, set the in-memory pointer
  # to 0 and position the queue file handle at its beginning. The queue
  # file's contents are not modified.
  # Returns the result of the final sysseek on the queue handle.
  # Croaks if the idx file cannot be truncated or written.
  sub reset
  {
      my ($self) = @_;
      my $idx = $self->{idx};

      # Persist the new position first ...
      $idx->truncate(0) or croak "Could not truncate idx: $!";
      $idx->sysseek(0, SEEK_SET);
      $idx->syswrite('0') or croak "Could not syswrite to idx: $!";

      # ... then rewind the in-memory pointer and the queue handle.
      $self->{ptr} = 0;
      return $self->{queue}->sysseek(0, SEEK_SET);
  }
  # Close both underlying file handles, the idx file first and then the
  # queue file. Returns the result of closing the queue file handle.
  sub close
  {
      my ($self) = @_;
      my ($idx_fh, $queue_fh) = @{$self}{qw(idx queue)};

      $idx_fh->close();
      return $queue_fh->close();
  }
  # Close the queue and remove both of its files from disk: the queue data
  # file first, then the idx file. The result of the first unlink is not
  # checked; the sub returns the result of unlinking the idx file.
  sub delete
  {
      my ($self) = @_;

      # Handles are closed before the files are removed.
      $self->close();

      my ($queue_path, $idx_path) = @{$self}{qw(queue_file idx_file)};
      unlink $queue_path;
      return unlink $idx_path;
  }
  143. 1;