require 'thread'
require 'timeout'
######################## Parallel Map engine ######################################
class ParallelMap
  # Producer/consumer "parallel map" engine.
  #
  # options:
  #   :nbThread  - number of worker threads (default 4); must be >= 1,
  #                otherwise queued work would never be consumed
  #   :generator - proc called once with the work Queue; pushes work items
  #   :mapper    - proc called as mapper.call(result_queue, item) for each
  #                item; pushes zero or more results onto result_queue
  #   :reducer   - proc called with the Array of collected results; its
  #                return value is what #get_result returns (default: identity)
  #
  # Threads are started immediately by the constructor; #get_result waits
  # for them to finish.
  #
  # @raise [ArgumentError] if :nbThread is < 1
  # @raise [RuntimeError] if :generator or :mapper is missing
  def initialize(options)
    @nbThread = options[:nbThread] || 4
    raise ArgumentError, "nbThread must be >= 1" if @nbThread < 1
    @generator = options[:generator] || raise("missing generator proc")
    @mapper = options[:mapper] || raise("missing mapper proc")
    @reducer = options[:reducer] || proc { |r| r }
    @query = Queue.new
    @result = Queue.new
    # Each worker gets a 1-based number, used only to tag error reports.
    @lthread = (1..@nbThread).map { |no| Thread.new(no) { |n| mapping(@query, @result, n) } }
    @th = Thread.new { generating }
  end

  # Runs the generator to fill the work queue, then shuts the workers down.
  #
  # The end-of-work sentinels are pushed and the workers joined in +ensure+.
  # If the generator raises, the workers therefore still terminate instead of
  # blocking forever on the queue. The generator's exception is then
  # re-raised from #get_result via Thread#join.
  def generating
    @generator.call(@query)
  ensure
    @nbThread.times { @query << :eend }
    @lthread.each(&:join)
  end

  # Waits for all work to complete, drains the result queue into an Array
  # and returns the reducer's value for it.
  #
  # @raise whatever the generator raised, if it failed
  def get_result
    @th.join
    res = []
    # All producers have finished here, so size/empty? is stable.
    res << @result.pop until @result.empty?
    @reducer.call(res)
  end

  # Worker loop: takes items from +queue+ until the :eend sentinel and applies
  # the mapper to each one. A StandardError raised by the mapper is reported
  # as a "<no> ERROR <item.inspect> <message>" string on +result+ rather than
  # killing the worker.
  def mapping(queue, result, no)
    loop do
      mess = queue.shift
      return if mess == :eend
      begin
        @mapper.call(result, mess)
      rescue StandardError => e
        result << [no.to_s, "ERROR", mess.inspect, e.to_s].join(" ")
      end
    end
  end
end
############################# invoke a block for each file name matching the filter
# Recursively walks +root+ and calls +blk+ with the path of every regular
# entry whose lower-cased base name matches +filter+ (a File.fnmatch pattern).
#
# Hidden entries (names starting with ".") are skipped, and hidden
# directories are not descended into. A +root+ that does not exist or cannot
# be read yields nothing.
#
# Directory names are never interpreted as glob patterns, so names
# containing "[", "{", "*" or "?" are walked correctly.
#
# @param root [String] directory to walk
# @param filter [String] fnmatch pattern, compared against the lower-cased base name
# @yieldparam path [String] File.join(dir, name) of each matching file
def rfind(root, filter, &blk)
  # NOTE(review): reset on every call/recursion; nothing visible in this file reads it.
  $nbfile = 0
  begin
    names = Dir.entries(root)
  rescue SystemCallError
    # Mirrors the previous Dir.glob behavior: missing/unreadable dir => no results.
    return
  end
  names.sort.each do |bn|
    next if bn.start_with?('.') # ".", ".." and dotfiles, as glob "*" excluded them
    en = File.join(root, bn)
    if File.directory?(en)
      rfind(en, filter, &blk)
    elsif File.fnmatch(filter, bn.downcase)
      blk.call(en)
    end
  end
end
####################### Map : grep on one file #####################
# Map step: pushes onto +out+ one "path:NNNNNNNNN:line" entry for every line
# of +file+ that +matcher+ matches.
#
# The line number is 1-based (as in grep) and zero-padded to 9 digits, so
# that a plain string sort orders entries of the same file by line. The
# line keeps its trailing newline, if any.
#
# @param out [#<<] sink for matching entries (a Queue in this script)
# @param matcher [Regexp] pattern tested with =~ against each line
# @param file [String] path of the file to scan
def selectLine(out, matcher, file)
  File.open(file, "r") do |f|
    # Stream line by line instead of loading the whole file with readlines.
    f.each_line.with_index(1) do |line, nol|
      out << "%s:%09d:%s" % [file, nol, line] if matcher =~ line
    end
  end
end
####################### Reduce : sort results by file name / line number
# Reduce step: sorts the collected entries and strips the zero padding from
# the "path:NNNNNNNNN:line" line-number field ("a.rb:000000012:x" becomes
# "a.rb:12:x").
#
# Entries without such a field (e.g. "N ERROR ..." reports from the mapper)
# are kept unchanged instead of crashing. Only the first ":<9+ digits>:"
# field is rewritten, so paths with a drive letter ("C:/...") are handled.
#
# @param l [Array<String>] map results
# @return [Array<String>] sorted, de-padded entries
def reduce(l)
  l.sort.map do |s|
    s.sub(/:(\d{9,}):/) { ":#{Regexp.last_match(1).to_i}:" }
  end
end
####################################################################################
# M A I N #
####################################################################################
# Usage: pgrep REGEXP PATH EXT
# Recursively greps every file under PATH whose lower-cased name matches
# "*.EXT" (EXT is lower-cased too) for REGEXP, using 5 worker threads.
# Prints the matches sorted by file name and line number, followed by the
# elapsed time.
abort("Usage : > pgrep regexp path 'file-filter'") if ARGV.length != 3
begin
  query = Regexp.new(ARGV[0])
rescue RegexpError => e
  abort("pgrep: invalid regexp: #{e.message}")
end
path = ARGV[1]
ext = ARGV[2]
# Monotonic clock: wall-clock adjustments cannot skew the measured duration.
starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = ParallelMap.new(
  :nbThread => 5,
  :generator => proc { |res| rfind(path, "*." + ext.downcase) { |file| res << file } },
  :mapper => proc { |out, in_file_name| selectLine(out, query, in_file_name) },
  :reducer => proc { |rr| reduce(rr) }
).get_result
ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result.each { |s| puts s }
puts "\n Duration: #{ending - starting} secs"
# Read more: http://feeds.dzone.com/~r/dzone/snippets/~3/8_D86-Av4aw/13263