From fcbf63e62c627deae76c1b8cb8c0876c536ed811 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Mon, 16 Mar 2020 18:49:26 +0900 Subject: Fresh start --- jni/ruby/lib/rinda/rinda.rb | 327 ++++++++++++++++++++ jni/ruby/lib/rinda/ring.rb | 480 +++++++++++++++++++++++++++++ jni/ruby/lib/rinda/tuplespace.rb | 642 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 1449 insertions(+) create mode 100644 jni/ruby/lib/rinda/rinda.rb create mode 100644 jni/ruby/lib/rinda/ring.rb create mode 100644 jni/ruby/lib/rinda/tuplespace.rb (limited to 'jni/ruby/lib/rinda') diff --git a/jni/ruby/lib/rinda/rinda.rb b/jni/ruby/lib/rinda/rinda.rb new file mode 100644 index 0000000..d9cd378 --- /dev/null +++ b/jni/ruby/lib/rinda/rinda.rb @@ -0,0 +1,327 @@ +require 'drb/drb' +require 'thread' + +## +# A module to implement the Linda distributed computing paradigm in Ruby. +# +# Rinda is part of DRb (dRuby). +# +# == Example(s) +# +# See the sample/drb/ directory in the Ruby distribution, from 1.8.2 onwards. +# +#-- +# TODO +# == Introduction to Linda/rinda? +# +# == Why is this library separate from DRb? + +module Rinda + + ## + # Rinda error base class + + class RindaError < RuntimeError; end + + ## + # Raised when a hash-based tuple has an invalid key. + + class InvalidHashTupleKey < RindaError; end + + ## + # Raised when trying to use a canceled tuple. + + class RequestCanceledError < ThreadError; end + + ## + # Raised when trying to use an expired tuple. + + class RequestExpiredError < ThreadError; end + + ## + # A tuple is the elementary object in Rinda programming. + # Tuples may be matched against templates if the tuple and + # the template are the same size. + + class Tuple + + ## + # Creates a new Tuple from +ary_or_hash+ which must be an Array or Hash. + + def initialize(ary_or_hash) + if hash?(ary_or_hash) + init_with_hash(ary_or_hash) + else + init_with_ary(ary_or_hash) + end + end + + ## + # The number of elements in the tuple. + + def size + @tuple.size + end + + ## + # Accessor method for elements of the tuple. + + def [](k) + @tuple[k] + end + + ## + # Fetches item +k+ from the tuple. + + def fetch(k) + @tuple.fetch(k) + end + + ## + # Iterate through the tuple, yielding the index or key, and the + # value, thus ensuring arrays are iterated similarly to hashes. + + def each # FIXME + if Hash === @tuple + @tuple.each { |k, v| yield(k, v) } + else + @tuple.each_with_index { |v, k| yield(k, v) } + end + end + + ## + # Return the tuple itself + def value + @tuple + end + + private + + def hash?(ary_or_hash) + ary_or_hash.respond_to?(:keys) + end + + ## + # Munges +ary+ into a valid Tuple. + + def init_with_ary(ary) + @tuple = Array.new(ary.size) + @tuple.size.times do |i| + @tuple[i] = ary[i] + end + end + + ## + # Ensures +hash+ is a valid Tuple. + + def init_with_hash(hash) + @tuple = Hash.new + hash.each do |k, v| + raise InvalidHashTupleKey unless String === k + @tuple[k] = v + end + end + + end + + ## + # Templates are used to match tuples in Rinda. + + class Template < Tuple + + ## + # Matches this template against +tuple+. The +tuple+ must be the same + # size as the template. An element with a +nil+ value in a template acts + # as a wildcard, matching any value in the corresponding position in the + # tuple. Elements of the template match the +tuple+ if the are #== or + # #===. + # + # Template.new([:foo, 5]).match Tuple.new([:foo, 5]) # => true + # Template.new([:foo, nil]).match Tuple.new([:foo, 5]) # => true + # Template.new([String]).match Tuple.new(['hello']) # => true + # + # Template.new([:foo]).match Tuple.new([:foo, 5]) # => false + # Template.new([:foo, 6]).match Tuple.new([:foo, 5]) # => false + # Template.new([:foo, nil]).match Tuple.new([:foo]) # => false + # Template.new([:foo, 6]).match Tuple.new([:foo]) # => false + + def match(tuple) + return false unless tuple.respond_to?(:size) + return false unless tuple.respond_to?(:fetch) + return false unless self.size == tuple.size + each do |k, v| + begin + it = tuple.fetch(k) + rescue + return false + end + next if v.nil? + next if v == it + next if v === it + return false + end + return true + end + + ## + # Alias for #match. + + def ===(tuple) + match(tuple) + end + + end + + ## + # Documentation? + + class DRbObjectTemplate + + ## + # Creates a new DRbObjectTemplate that will match against +uri+ and +ref+. + + def initialize(uri=nil, ref=nil) + @drb_uri = uri + @drb_ref = ref + end + + ## + # This DRbObjectTemplate matches +ro+ if the remote object's drburi and + # drbref are the same. +nil+ is used as a wildcard. + + def ===(ro) + return true if super(ro) + unless @drb_uri.nil? + return false unless (@drb_uri === ro.__drburi rescue false) + end + unless @drb_ref.nil? + return false unless (@drb_ref === ro.__drbref rescue false) + end + true + end + + end + + ## + # TupleSpaceProxy allows a remote Tuplespace to appear as local. + + class TupleSpaceProxy + ## + # A Port ensures that a moved tuple arrives properly at its destination + # and does not get lost. + # + # See https://bugs.ruby-lang.org/issues/8125 + + class Port # :nodoc: + attr_reader :value + + def self.deliver + port = new + + begin + yield(port) + ensure + port.close + end + + port.value + end + + def initialize + @open = true + @value = nil + end + + ## + # Don't let the DRb thread push to it when remote sends tuple + + def close + @open = false + end + + ## + # Stores +value+ and ensure it does not get marshaled multiple times. + + def push value + raise 'port closed' unless @open + + @value = value + + nil # avoid Marshal + end + end + + ## + # Creates a new TupleSpaceProxy to wrap +ts+. + + def initialize(ts) + @ts = ts + end + + ## + # Adds +tuple+ to the proxied TupleSpace. See TupleSpace#write. + + def write(tuple, sec=nil) + @ts.write(tuple, sec) + end + + ## + # Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take. + + def take(tuple, sec=nil, &block) + Port.deliver do |port| + @ts.move(DRbObject.new(port), tuple, sec, &block) + end + end + + ## + # Reads +tuple+ from the proxied TupleSpace. See TupleSpace#read. + + def read(tuple, sec=nil, &block) + @ts.read(tuple, sec, &block) + end + + ## + # Reads all tuples matching +tuple+ from the proxied TupleSpace. See + # TupleSpace#read_all. + + def read_all(tuple) + @ts.read_all(tuple) + end + + ## + # Registers for notifications of event +ev+ on the proxied TupleSpace. + # See TupleSpace#notify + + def notify(ev, tuple, sec=nil) + @ts.notify(ev, tuple, sec) + end + + end + + ## + # An SimpleRenewer allows a TupleSpace to check if a TupleEntry is still + # alive. + + class SimpleRenewer + + include DRbUndumped + + ## + # Creates a new SimpleRenewer that keeps an object alive for another +sec+ + # seconds. + + def initialize(sec=180) + @sec = sec + end + + ## + # Called by the TupleSpace to check if the object is still alive. + + def renew + @sec + end + end + +end + diff --git a/jni/ruby/lib/rinda/ring.rb b/jni/ruby/lib/rinda/ring.rb new file mode 100644 index 0000000..fe33420 --- /dev/null +++ b/jni/ruby/lib/rinda/ring.rb @@ -0,0 +1,480 @@ +# +# Note: Rinda::Ring API is unstable. +# +require 'drb/drb' +require 'rinda/rinda' +require 'thread' +require 'ipaddr' + +module Rinda + + ## + # The default port Ring discovery will use. + + Ring_PORT = 7647 + + ## + # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. + # Default service location uses the following steps: + # + # 1. A RingServer begins listening on the network broadcast UDP address. + # 2. A RingFinger sends a UDP packet containing the DRb URI where it will + # listen for a reply. + # 3. The RingServer receives the UDP packet and connects back to the + # provided DRb URI with the DRb service. + # + # A RingServer requires a TupleSpace: + # + # ts = Rinda::TupleSpace.new + # rs = Rinda::RingServer.new + # + # RingServer can also listen on multicast addresses for announcements. This + # allows multiple RingServers to run on the same host. To use network + # broadcast and multicast: + # + # ts = Rinda::TupleSpace.new + # rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1] + + class RingServer + + include DRbUndumped + + ## + # Special renewer for the RingServer to allow shutdown + + class Renewer # :nodoc: + include DRbUndumped + + ## + # Set to false to shutdown future requests using this Renewer + + attr_writer :renew + + def initialize # :nodoc: + @renew = true + end + + def renew # :nodoc: + @renew ? 1 : true + end + end + + ## + # Advertises +ts+ on the given +addresses+ at +port+. + # + # If +addresses+ is omitted only the UDP broadcast address is used. + # + # +addresses+ can contain multiple addresses. If a multicast address is + # given in +addresses+ then the RingServer will listen for multicast + # queries. + # + # If you use IPv4 multicast you may need to set an address of the inbound + # interface which joins a multicast group. + # + # ts = Rinda::TupleSpace.new + # rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']]) + # + # You can set addresses as an Array Object. The first element of the + # Array is a multicast address and the second is an inbound interface + # address. If the second is omitted then '0.0.0.0' is used. + # + # If you use IPv6 multicast you may need to set both the local interface + # address and the inbound interface index: + # + # rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]]) + # + # The first element is a multicast address and the second is an inbound + # interface address. The third is an inbound interface index. + # + # At this time there is no easy way to get an interface index by name. + # + # If the second is omitted then '::1' is used. + # If the third is omitted then 0 (default interface) is used. + + def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) + @port = port + + if Integer === addresses then + addresses, @port = [Socket::INADDR_ANY], addresses + end + + @renewer = Renewer.new + + @ts = ts + @sockets = [] + addresses.each do |address| + if Array === address + make_socket(*address) + else + make_socket(address) + end + end + + @w_services = write_services + @r_service = reply_service + end + + ## + # Creates a socket at +address+ + # + # If +address+ is multicast address then +interface_address+ and + # +multicast_interface+ can be set as optional. + # + # A created socket is bound to +interface_address+. If you use IPv4 + # multicast then the interface of +interface_address+ is used as the + # inbound interface. If +interface_address+ is omitted or nil then + # '0.0.0.0' or '::1' is used. + # + # If you use IPv6 multicast then +multicast_interface+ is used as the + # inbound interface. +multicast_interface+ is a network interface index. + # If +multicast_interface+ is omitted then 0 (default interface) is used. + + def make_socket(address, interface_address=nil, multicast_interface=0) + addrinfo = Addrinfo.udp(address, @port) + + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, + addrinfo.protocol) + @sockets << socket + + if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then + if Socket.const_defined?(:SO_REUSEPORT) then + socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) + else + socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) + end + + if addrinfo.ipv4_multicast? then + interface_address = '0.0.0.0' if interface_address.nil? + socket.bind(Addrinfo.udp(interface_address, @port)) + + mreq = IPAddr.new(addrinfo.ip_address).hton + + IPAddr.new(interface_address).hton + + socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) + else + interface_address = '::1' if interface_address.nil? + socket.bind(Addrinfo.udp(interface_address, @port)) + + mreq = IPAddr.new(addrinfo.ip_address).hton + + [multicast_interface].pack('I') + + socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) + end + else + socket.bind(addrinfo) + end + + socket + end + + ## + # Creates threads that pick up UDP packets and passes them to do_write for + # decoding. + + def write_services + @sockets.map do |s| + Thread.new(s) do |socket| + loop do + msg = socket.recv(1024) + do_write(msg) + end + end + end + end + + ## + # Extracts the response URI from +msg+ and adds it to TupleSpace where it + # will be picked up by +reply_service+ for notification. + + def do_write(msg) + Thread.new do + begin + tuple, sec = Marshal.load(msg) + @ts.write(tuple, sec) + rescue + end + end + end + + ## + # Creates a thread that notifies waiting clients from the TupleSpace. + + def reply_service + Thread.new do + loop do + do_reply + end + end + end + + ## + # Pulls lookup tuples out of the TupleSpace and sends their DRb object the + # address of the local TupleSpace. + + def do_reply + tuple = @ts.take([:lookup_ring, nil], @renewer) + Thread.new { tuple[1].call(@ts) rescue nil} + rescue + end + + ## + # Shuts down the RingServer + + def shutdown + @renewer.renew = false + + @w_services.each do |thread| + thread.kill + thread.join + end + + @sockets.each do |socket| + socket.close + end + + @r_service.kill + @r_service.join + end + + end + + ## + # RingFinger is used by RingServer clients to discover the RingServer's + # TupleSpace. Typically, all a client needs to do is call + # RingFinger.primary to retrieve the remote TupleSpace, which it can then + # begin using. + # + # To find the first available remote TupleSpace: + # + # Rinda::RingFinger.primary + # + # To create a RingFinger that broadcasts to a custom list: + # + # rf = Rinda::RingFinger.new ['localhost', '192.0.2.1'] + # rf.primary + # + # Rinda::RingFinger also understands multicast addresses and sets them up + # properly. This allows you to run multiple RingServers on the same host: + # + # rf = Rinda::RingFinger.new ['239.0.0.1'] + # rf.primary + # + # You can set the hop count (or TTL) for multicast searches using + # #multicast_hops. + # + # If you use IPv6 multicast you may need to set both an address and the + # outbound interface index: + # + # rf = Rinda::RingFinger.new ['ff02::1'] + # rf.multicast_interface = 1 + # rf.primary + # + # At this time there is no easy way to get an interface index by name. + + class RingFinger + + @@broadcast_list = ['', 'localhost'] + + @@finger = nil + + ## + # Creates a singleton RingFinger and looks for a RingServer. Returns the + # created RingFinger. + + def self.finger + unless @@finger + @@finger = self.new + @@finger.lookup_ring_any + end + @@finger + end + + ## + # Returns the first advertised TupleSpace. + + def self.primary + finger.primary + end + + ## + # Contains all discovered TupleSpaces except for the primary. + + def self.to_a + finger.to_a + end + + ## + # The list of addresses where RingFinger will send query packets. + + attr_accessor :broadcast_list + + ## + # Maximum number of hops for sent multicast packets (if using a multicast + # address in the broadcast list). The default is 1 (same as UDP + # broadcast). + + attr_accessor :multicast_hops + + ## + # The interface index to send IPv6 multicast packets from. + + attr_accessor :multicast_interface + + ## + # The port that RingFinger will send query packets to. + + attr_accessor :port + + ## + # Contain the first advertised TupleSpace after lookup_ring_any is called. + + attr_accessor :primary + + ## + # Creates a new RingFinger that will look for RingServers at +port+ on + # the addresses in +broadcast_list+. + # + # If +broadcast_list+ contains a multicast address then multicast queries + # will be made using the given multicast_hops and multicast_interface. + + def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT) + @broadcast_list = broadcast_list || ['localhost'] + @port = port + @primary = nil + @rings = [] + + @multicast_hops = 1 + @multicast_interface = 0 + end + + ## + # Contains all discovered TupleSpaces except for the primary. + + def to_a + @rings + end + + ## + # Iterates over all discovered TupleSpaces starting with the primary. + + def each + lookup_ring_any unless @primary + return unless @primary + yield(@primary) + @rings.each { |x| yield(x) } + end + + ## + # Looks up RingServers waiting +timeout+ seconds. RingServers will be + # given +block+ as a callback, which will be called with the remote + # TupleSpace. + + def lookup_ring(timeout=5, &block) + return lookup_ring_any(timeout) unless block_given? + + msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) + @broadcast_list.each do |it| + send_message(it, msg) + end + sleep(timeout) + end + + ## + # Returns the first found remote TupleSpace. Any further recovered + # TupleSpaces can be found by calling +to_a+. + + def lookup_ring_any(timeout=5) + queue = Queue.new + + Thread.new do + self.lookup_ring(timeout) do |ts| + queue.push(ts) + end + queue.push(nil) + end + + @primary = queue.pop + raise('RingNotFound') if @primary.nil? + + Thread.new do + while it = queue.pop + @rings.push(it) + end + end + + @primary + end + + ## + # Creates a socket for +address+ with the appropriate multicast options + # for multicast addresses. + + def make_socket(address) # :nodoc: + addrinfo = Addrinfo.udp(address, @port) + + soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + begin + if addrinfo.ipv4_multicast? then + soc.setsockopt(Socket::Option.ipv4_multicast_loop(1)) + soc.setsockopt(Socket::Option.ipv4_multicast_ttl(@multicast_hops)) + elsif addrinfo.ipv6_multicast? then + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, + [@multicast_hops].pack('I')) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, + [@multicast_interface].pack('I')) + else + soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) + end + + soc.connect(addrinfo) + rescue Exception + soc.close + raise + end + + soc + end + + def send_message(address, message) # :nodoc: + soc = make_socket(address) + + soc.send(message, 0) + rescue + nil + ensure + soc.close if soc + end + + end + + ## + # RingProvider uses a RingServer advertised TupleSpace as a name service. + # TupleSpace clients can register themselves with the remote TupleSpace and + # look up other provided services via the remote TupleSpace. + # + # Services are registered with a tuple of the format [:name, klass, + # DRbObject, description]. + + class RingProvider + + ## + # Creates a RingProvider that will provide a +klass+ service running on + # +front+, with a +description+. +renewer+ is optional. + + def initialize(klass, front, desc, renewer = nil) + @tuple = [:name, klass, front, desc] + @renewer = renewer || Rinda::SimpleRenewer.new + end + + ## + # Advertises this service on the primary remote TupleSpace. + + def provide + ts = Rinda::RingFinger.primary + ts.write(@tuple, @renewer) + end + + end + +end diff --git a/jni/ruby/lib/rinda/tuplespace.rb b/jni/ruby/lib/rinda/tuplespace.rb new file mode 100644 index 0000000..11532fd --- /dev/null +++ b/jni/ruby/lib/rinda/tuplespace.rb @@ -0,0 +1,642 @@ +require 'monitor' +require 'thread' +require 'drb/drb' +require 'rinda/rinda' +require 'enumerator' +require 'forwardable' + +module Rinda + + ## + # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace) + # together with expiry and cancellation data. + + class TupleEntry + + include DRbUndumped + + attr_accessor :expires + + ## + # Creates a TupleEntry based on +ary+ with an optional renewer or expiry + # time +sec+. + # + # A renewer must implement the +renew+ method which returns a Numeric, + # nil, or true to indicate when the tuple has expired. + + def initialize(ary, sec=nil) + @cancel = false + @expires = nil + @tuple = make_tuple(ary) + @renewer = nil + renew(sec) + end + + ## + # Marks this TupleEntry as canceled. + + def cancel + @cancel = true + end + + ## + # A TupleEntry is dead when it is canceled or expired. + + def alive? + !canceled? && !expired? + end + + ## + # Return the object which makes up the tuple itself: the Array + # or Hash. + + def value; @tuple.value; end + + ## + # Returns the canceled status. + + def canceled?; @cancel; end + + ## + # Has this tuple expired? (true/false). + # + # A tuple has expired when its expiry timer based on the +sec+ argument to + # #initialize runs out. + + def expired? + return true unless @expires + return false if @expires > Time.now + return true if @renewer.nil? + renew(@renewer) + return true unless @expires + return @expires < Time.now + end + + ## + # Reset the expiry time according to +sec_or_renewer+. + # + # +nil+:: it is set to expire in the far future. + # +true+:: it has expired. + # Numeric:: it will expire in that many seconds. + # + # Otherwise the argument refers to some kind of renewer object + # which will reset its expiry time. + + def renew(sec_or_renewer) + sec, @renewer = get_renewer(sec_or_renewer) + @expires = make_expires(sec) + end + + ## + # Returns an expiry Time based on +sec+ which can be one of: + # Numeric:: +sec+ seconds into the future + # +true+:: the expiry time is the start of 1970 (i.e. expired) + # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when + # UNIX clocks will die) + + def make_expires(sec=nil) + case sec + when Numeric + Time.now + sec + when true + Time.at(1) + when nil + Time.at(2**31-1) + end + end + + ## + # Retrieves +key+ from the tuple. + + def [](key) + @tuple[key] + end + + ## + # Fetches +key+ from the tuple. + + def fetch(key) + @tuple.fetch(key) + end + + ## + # The size of the tuple. + + def size + @tuple.size + end + + ## + # Creates a Rinda::Tuple for +ary+. + + def make_tuple(ary) + Rinda::Tuple.new(ary) + end + + private + + ## + # Returns a valid argument to make_expires and the renewer or nil. + # + # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual + # renewer). Otherwise it returns an expiry value from calling +it.renew+ + # and the renewer. + + def get_renewer(it) + case it + when Numeric, true, nil + return it, nil + else + begin + return it.renew, it + rescue Exception + return it, nil + end + end + end + + end + + ## + # A TemplateEntry is a Template together with expiry and cancellation data. + + class TemplateEntry < TupleEntry + ## + # Matches this TemplateEntry against +tuple+. See Template#match for + # details on how a Template matches a Tuple. + + def match(tuple) + @tuple.match(tuple) + end + + alias === match + + def make_tuple(ary) # :nodoc: + Rinda::Template.new(ary) + end + + end + + ## + # Documentation? + + class WaitTemplateEntry < TemplateEntry + + attr_reader :found + + def initialize(place, ary, expires=nil) + super(ary, expires) + @place = place + @cond = place.new_cond + @found = nil + end + + def cancel + super + signal + end + + def wait + @cond.wait + end + + def read(tuple) + @found = tuple + signal + end + + def signal + @place.synchronize do + @cond.signal + end + end + + end + + ## + # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of + # TupleSpace changes. You may receive either your subscribed event or the + # 'close' event when iterating over notifications. + # + # See TupleSpace#notify_event for valid notification types. + # + # == Example + # + # ts = Rinda::TupleSpace.new + # observer = ts.notify 'write', [nil] + # + # Thread.start do + # observer.each { |t| p t } + # end + # + # 3.times { |i| ts.write [i] } + # + # Outputs: + # + # ['write', [0]] + # ['write', [1]] + # ['write', [2]] + + class NotifyTemplateEntry < TemplateEntry + + ## + # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that + # match +tuple+. + + def initialize(place, event, tuple, expires=nil) + ary = [event, Rinda::Template.new(tuple)] + super(ary, expires) + @queue = Queue.new + @done = false + end + + ## + # Called by TupleSpace to notify this NotifyTemplateEntry of a new event. + + def notify(ev) + @queue.push(ev) + end + + ## + # Retrieves a notification. Raises RequestExpiredError when this + # NotifyTemplateEntry expires. + + def pop + raise RequestExpiredError if @done + it = @queue.pop + @done = true if it[0] == 'close' + return it + end + + ## + # Yields event/tuple pairs until this NotifyTemplateEntry expires. + + def each # :yields: event, tuple + while !@done + it = pop + yield(it) + end + rescue + ensure + cancel + end + + end + + ## + # TupleBag is an unordered collection of tuples. It is the basis + # of Tuplespace. + + class TupleBag + class TupleBin + extend Forwardable + def_delegators '@bin', :find_all, :delete_if, :each, :empty? + + def initialize + @bin = [] + end + + def add(tuple) + @bin.push(tuple) + end + + def delete(tuple) + idx = @bin.rindex(tuple) + @bin.delete_at(idx) if idx + end + + def find + @bin.reverse_each do |x| + return x if yield(x) + end + nil + end + end + + def initialize # :nodoc: + @hash = {} + @enum = enum_for(:each_entry) + end + + ## + # +true+ if the TupleBag to see if it has any expired entries. + + def has_expires? + @enum.find do |tuple| + tuple.expires + end + end + + ## + # Add +tuple+ to the TupleBag. + + def push(tuple) + key = bin_key(tuple) + @hash[key] ||= TupleBin.new + @hash[key].add(tuple) + end + + ## + # Removes +tuple+ from the TupleBag. + + def delete(tuple) + key = bin_key(tuple) + bin = @hash[key] + return nil unless bin + bin.delete(tuple) + @hash.delete(key) if bin.empty? + tuple + end + + ## + # Finds all live tuples that match +template+. + def find_all(template) + bin_for_find(template).find_all do |tuple| + tuple.alive? && template.match(tuple) + end + end + + ## + # Finds a live tuple that matches +template+. + + def find(template) + bin_for_find(template).find do |tuple| + tuple.alive? && template.match(tuple) + end + end + + ## + # Finds all tuples in the TupleBag which when treated as templates, match + # +tuple+ and are alive. + + def find_all_template(tuple) + @enum.find_all do |template| + template.alive? && template.match(tuple) + end + end + + ## + # Delete tuples which dead tuples from the TupleBag, returning the deleted + # tuples. + + def delete_unless_alive + deleted = [] + @hash.each do |key, bin| + bin.delete_if do |tuple| + if tuple.alive? + false + else + deleted.push(tuple) + true + end + end + end + deleted + end + + private + def each_entry(&blk) + @hash.each do |k, v| + v.each(&blk) + end + end + + def bin_key(tuple) + head = tuple[0] + if head.class == Symbol + return head + else + false + end + end + + def bin_for_find(template) + key = bin_key(template) + key ? @hash.fetch(key, []) : @enum + end + end + + ## + # The Tuplespace manages access to the tuples it contains, + # ensuring mutual exclusion requirements are met. + # + # The +sec+ option for the write, take, move, read and notify methods may + # either be a number of seconds or a Renewer object. + + class TupleSpace + + include DRbUndumped + include MonitorMixin + + ## + # Creates a new TupleSpace. +period+ is used to control how often to look + # for dead tuples after modifications to the TupleSpace. + # + # If no dead tuples are found +period+ seconds after the last + # modification, the TupleSpace will stop looking for dead tuples. + + def initialize(period=60) + super() + @bag = TupleBag.new + @read_waiter = TupleBag.new + @take_waiter = TupleBag.new + @notify_waiter = TupleBag.new + @period = period + @keeper = nil + end + + ## + # Adds +tuple+ + + def write(tuple, sec=nil) + entry = create_entry(tuple, sec) + synchronize do + if entry.expired? + @read_waiter.find_all_template(entry).each do |template| + template.read(tuple) + end + notify_event('write', entry.value) + notify_event('delete', entry.value) + else + @bag.push(entry) + start_keeper if entry.expires + @read_waiter.find_all_template(entry).each do |template| + template.read(tuple) + end + @take_waiter.find_all_template(entry).each do |template| + template.signal + end + notify_event('write', entry.value) + end + end + entry + end + + ## + # Removes +tuple+ + + def take(tuple, sec=nil, &block) + move(nil, tuple, sec, &block) + end + + ## + # Moves +tuple+ to +port+. + + def move(port, tuple, sec=nil) + template = WaitTemplateEntry.new(self, tuple, sec) + yield(template) if block_given? + synchronize do + entry = @bag.find(template) + if entry + port.push(entry.value) if port + @bag.delete(entry) + notify_event('take', entry.value) + return port ? nil : entry.value + end + raise RequestExpiredError if template.expired? + + begin + @take_waiter.push(template) + start_keeper if template.expires + while true + raise RequestCanceledError if template.canceled? + raise RequestExpiredError if template.expired? + entry = @bag.find(template) + if entry + port.push(entry.value) if port + @bag.delete(entry) + notify_event('take', entry.value) + return port ? nil : entry.value + end + template.wait + end + ensure + @take_waiter.delete(template) + end + end + end + + ## + # Reads +tuple+, but does not remove it. + + def read(tuple, sec=nil) + template = WaitTemplateEntry.new(self, tuple, sec) + yield(template) if block_given? + synchronize do + entry = @bag.find(template) + return entry.value if entry + raise RequestExpiredError if template.expired? + + begin + @read_waiter.push(template) + start_keeper if template.expires + template.wait + raise RequestCanceledError if template.canceled? + raise RequestExpiredError if template.expired? + return template.found + ensure + @read_waiter.delete(template) + end + end + end + + ## + # Returns all tuples matching +tuple+. Does not remove the found tuples. + + def read_all(tuple) + template = WaitTemplateEntry.new(self, tuple, nil) + synchronize do + entry = @bag.find_all(template) + entry.collect do |e| + e.value + end + end + end + + ## + # Registers for notifications of +event+. Returns a NotifyTemplateEntry. + # See NotifyTemplateEntry for examples of how to listen for notifications. + # + # +event+ can be: + # 'write':: A tuple was added + # 'take':: A tuple was taken or moved + # 'delete':: A tuple was lost after being overwritten or expiring + # + # The TupleSpace will also notify you of the 'close' event when the + # NotifyTemplateEntry has expired. + + def notify(event, tuple, sec=nil) + template = NotifyTemplateEntry.new(self, event, tuple, sec) + synchronize do + @notify_waiter.push(template) + end + template + end + + private + + def create_entry(tuple, sec) + TupleEntry.new(tuple, sec) + end + + ## + # Removes dead tuples. + + def keep_clean + synchronize do + @read_waiter.delete_unless_alive.each do |e| + e.signal + end + @take_waiter.delete_unless_alive.each do |e| + e.signal + end + @notify_waiter.delete_unless_alive.each do |e| + e.notify(['close']) + end + @bag.delete_unless_alive.each do |e| + notify_event('delete', e.value) + end + end + end + + ## + # Notifies all registered listeners for +event+ of a status change of + # +tuple+. + + def notify_event(event, tuple) + ev = [event, tuple] + @notify_waiter.find_all_template(ev).each do |template| + template.notify(ev) + end + end + + ## + # Creates a thread that scans the tuplespace for expired tuples. + + def start_keeper + return if @keeper && @keeper.alive? + @keeper = Thread.new do + while true + sleep(@period) + synchronize do + break unless need_keeper? + keep_clean + end + end + end + end + + ## + # Checks the tuplespace to see if it needs cleaning. + + def need_keeper? + return true if @bag.has_expires? + return true if @read_waiter.has_expires? + return true if @take_waiter.has_expires? + return true if @notify_waiter.has_expires? + end + + end + +end + -- cgit v1.2.3