summaryrefslogtreecommitdiff
path: root/jni/ruby/lib/rake/thread_pool.rb
blob: d2ac6e7ac25832a75c85d071fb2c10275f510ff1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
require 'thread'
require 'set'

require 'rake/promise'

module Rake

  class ThreadPool # :nodoc: all

    # Creates a ThreadPool object.  The +thread_count+ parameter is the size
    # of the pool.
    def initialize(thread_count)
      @max_active_threads = [thread_count, 0].max
      @threads = Set.new
      @threads_mon = Monitor.new
      @queue = Queue.new
      @join_cond = @threads_mon.new_cond

      @history_start_time = nil
      @history = []
      @history_mon = Monitor.new
      @total_threads_in_play = 0
    end

    # Creates a future executed by the +ThreadPool+.
    #
    # The args are passed to the block when executing (similarly to
    # Thread#new) The return value is an object representing
    # a future which has been created and added to the queue in the
    # pool. Sending #value to the object will sleep the
    # current thread until the future is finished and will return the
    # result (or raise an exception thrown from the future)
    def future(*args, &block)
      promise = Promise.new(args, &block)
      promise.recorder = lambda { |*stats| stat(*stats) }

      @queue.enq promise
      stat :queued, :item_id => promise.object_id
      start_thread
      promise
    end

    # Waits until the queue of futures is empty and all threads have exited.
    def join
      @threads_mon.synchronize do
        begin
          stat :joining
          @join_cond.wait unless @threads.empty?
          stat :joined
        rescue Exception => e
          stat :joined
          $stderr.puts e
          $stderr.print "Queue contains #{@queue.size} items. " +
            "Thread pool contains #{@threads.count} threads\n"
          $stderr.print "Current Thread #{Thread.current} status = " +
            "#{Thread.current.status}\n"
          $stderr.puts e.backtrace.join("\n")
          @threads.each do |t|
            $stderr.print "Thread #{t} status = #{t.status}\n"
            # 1.8 doesn't support Thread#backtrace
            $stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
          end
          raise e
        end
      end
    end

    # Enable the gathering of history events.
    def gather_history          #:nodoc:
      @history_start_time = Time.now if @history_start_time.nil?
    end

    # Return a array of history events for the thread pool.
    #
    # History gathering must be enabled to be able to see the events
    # (see #gather_history). Best to call this when the job is
    # complete (i.e. after ThreadPool#join is called).
    def history                 # :nodoc:
      @history_mon.synchronize { @history.dup }.
        sort_by { |i| i[:time] }.
        each { |i| i[:time] -= @history_start_time }
    end

    # Return a hash of always collected statistics for the thread pool.
    def statistics              #  :nodoc:
      {
        :total_threads_in_play => @total_threads_in_play,
        :max_active_threads => @max_active_threads,
      }
    end

    private

    # processes one item on the queue. Returns true if there was an
    # item to process, false if there was no item
    def process_queue_item      #:nodoc:
      return false if @queue.empty?

      # Even though we just asked if the queue was empty, it
      # still could have had an item which by this statement
      # is now gone. For this reason we pass true to Queue#deq
      # because we will sleep indefinitely if it is empty.
      promise = @queue.deq(true)
      stat :dequeued, :item_id => promise.object_id
      promise.work
      return true

      rescue ThreadError # this means the queue is empty
      false
    end

    def safe_thread_count
      @threads_mon.synchronize do
        @threads.count
      end
    end

    def start_thread # :nodoc:
      @threads_mon.synchronize do
        next unless @threads.count < @max_active_threads

        t = Thread.new do
          begin
            while safe_thread_count <= @max_active_threads
              break unless process_queue_item
            end
          ensure
            @threads_mon.synchronize do
              @threads.delete Thread.current
              stat :ended, :thread_count => @threads.count
              @join_cond.broadcast if @threads.empty?
            end
          end
        end

        @threads << t
        stat(
          :spawned,
          :new_thread   => t.object_id,
          :thread_count => @threads.count)
        @total_threads_in_play = @threads.count if
          @threads.count > @total_threads_in_play
      end
    end

    def stat(event, data=nil) # :nodoc:
      return if @history_start_time.nil?
      info = {
        :event  => event,
        :data   => data,
        :time   => Time.now,
        :thread => Thread.current.object_id,
      }
      @history_mon.synchronize { @history << info }
    end

    # for testing only

    def __queue__ # :nodoc:
      @queue
    end
  end

end