1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
require 'thread'
require 'set'
require 'rake/promise'
module Rake
  # A pool of worker threads that executes queued promises ("futures").
  #
  # Workers are spawned lazily, one per call to #future, up to the
  # configured maximum; each worker exits as soon as it finds the queue
  # empty. Optional history events can be recorded for diagnostics (see
  # #gather_history and #history).
  class ThreadPool # :nodoc: all
    # Creates a ThreadPool object. The +thread_count+ parameter is the size
    # of the pool. Negative values are clamped to zero.
    def initialize(thread_count)
      @max_active_threads = [thread_count, 0].max
      @threads = Set.new
      @threads_mon = Monitor.new
      @queue = Queue.new
      @join_cond = @threads_mon.new_cond

      # History is only recorded once @history_start_time is set.
      @history_start_time = nil
      @history = []
      @history_mon = Monitor.new
      @total_threads_in_play = 0
    end

    # Creates a future executed by the +ThreadPool+.
    #
    # The args are passed to the block when executing (similarly to
    # Thread#new). The return value is the Promise object that was created
    # and added to the pool's queue.
    def future(*args, &block)
      promise = Promise.new(args, &block)
      promise.recorder = lambda { |*stats| stat(*stats) }

      @queue.enq promise
      stat :queued, :item_id => promise.object_id
      start_thread
      promise
    end

    # Waits until all worker threads of the pool have exited.
    #
    # If the wait is interrupted by any exception, diagnostic information
    # about the queue and every pool thread is written to $stderr and the
    # exception is re-raised.
    def join
      @threads_mon.synchronize do
        begin
          stat :joining
          # Re-check the predicate after every wakeup: a condition wait may
          # return spuriously, and a new worker may have been spawned
          # between the broadcast and this thread re-acquiring the monitor.
          @join_cond.wait_until { @threads.empty? }
          stat :joined
        rescue Exception => e
          # Exception (not StandardError) is rescued on purpose: this branch
          # only prints diagnostics and always re-raises below.
          stat :joined
          $stderr.puts e
          $stderr.print "Queue contains #{@queue.size} items. " \
            "Thread pool contains #{@threads.count} threads\n"
          $stderr.print "Current Thread #{Thread.current} status = " \
            "#{Thread.current.status}\n"
          $stderr.puts e.backtrace.join("\n")
          @threads.each do |t|
            $stderr.print "Thread #{t} status = #{t.status}\n"
            # 1.8 doesn't support Thread#backtrace
            $stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
          end
          raise
        end
      end
    end

    # Enable the gathering of history events. Calling it again does not
    # reset the start time.
    def gather_history #:nodoc:
      @history_start_time ||= Time.now
    end

    # Return an array of history events for the thread pool, sorted by
    # time, with each event's +:time+ expressed relative to the moment
    # history gathering was enabled.
    #
    # History gathering must be enabled to be able to see the events
    # (see #gather_history). Best to call this when the job is
    # complete (i.e. after ThreadPool#join is called).
    #
    # The returned hashes are copies, so the recorded events are left
    # untouched and the method can safely be called more than once.
    def history # :nodoc:
      snapshot = @history_mon.synchronize { @history.dup }
      snapshot.
        sort_by { |event| event[:time] }.
        map { |event| event.merge(:time => event[:time] - @history_start_time) }
    end

    # Return a hash of always collected statistics for the thread pool.
    def statistics # :nodoc:
      {
        :total_threads_in_play => @total_threads_in_play,
        :max_active_threads => @max_active_threads,
      }
    end

    private

    # Processes one item on the queue. Returns true if there was an
    # item to process, false if there was no item.
    def process_queue_item #:nodoc:
      return false if @queue.empty?

      # Even though we just asked if the queue was empty, another thread
      # may have taken the last item in the meantime. Passing true to
      # Queue#deq makes it raise ThreadError instead of sleeping
      # indefinitely on an empty queue.
      promise = @queue.deq(true)
      stat :dequeued, :item_id => promise.object_id
      promise.work
      true
    rescue ThreadError # this means the queue is empty
      false
    end

    # Number of live pool threads, read while holding the threads monitor.
    def safe_thread_count
      @threads_mon.synchronize { @threads.count }
    end

    # Spawns one more worker thread unless the pool is already at its
    # maximum size. The worker drains the queue and removes itself from
    # the pool when it finds nothing left to do.
    def start_thread # :nodoc:
      @threads_mon.synchronize do
        next unless @threads.count < @max_active_threads

        t = Thread.new do
          begin
            while safe_thread_count <= @max_active_threads
              break unless process_queue_item
            end
          ensure
            @threads_mon.synchronize do
              @threads.delete Thread.current
              stat :ended, :thread_count => @threads.count
              # Wake up #join once the last worker is gone.
              @join_cond.broadcast if @threads.empty?
            end
          end
        end

        @threads << t
        stat(
          :spawned,
          :new_thread => t.object_id,
          :thread_count => @threads.count)
        # Track the high-water mark of concurrently registered threads.
        @total_threads_in_play = [@total_threads_in_play, @threads.count].max
      end
    end

    # Records a history event; does nothing unless history gathering has
    # been enabled.
    def stat(event, data=nil) # :nodoc:
      return if @history_start_time.nil?

      info = {
        :event => event,
        :data => data,
        :time => Time.now,
        :thread => Thread.current.object_id,
      }
      @history_mon.synchronize { @history << info }
    end

    # for testing only
    def __queue__ # :nodoc:
      @queue
    end
  end
end
|