summaryrefslogtreecommitdiff
path: root/jni/ruby/lib/rinda
diff options
context:
space:
mode:
authorJari Vetoniemi <jari.vetoniemi@indooratlas.com>2020-03-16 18:49:26 +0900
committerJari Vetoniemi <jari.vetoniemi@indooratlas.com>2020-03-30 00:39:06 +0900
commitfcbf63e62c627deae76c1b8cb8c0876c536ed811 (patch)
tree64cb17de3f41a2b6fef2368028fbd00349946994 /jni/ruby/lib/rinda
Fresh start
Diffstat (limited to 'jni/ruby/lib/rinda')
-rw-r--r--jni/ruby/lib/rinda/rinda.rb327
-rw-r--r--jni/ruby/lib/rinda/ring.rb480
-rw-r--r--jni/ruby/lib/rinda/tuplespace.rb642
3 files changed, 1449 insertions, 0 deletions
diff --git a/jni/ruby/lib/rinda/rinda.rb b/jni/ruby/lib/rinda/rinda.rb
new file mode 100644
index 0000000..d9cd378
--- /dev/null
+++ b/jni/ruby/lib/rinda/rinda.rb
@@ -0,0 +1,327 @@
+require 'drb/drb'
+require 'thread'
+
+##
+# A module to implement the Linda distributed computing paradigm in Ruby.
+#
+# Rinda is part of DRb (dRuby).
+#
+# == Example(s)
+#
+# See the sample/drb/ directory in the Ruby distribution, from 1.8.2 onwards.
+#
+#--
+# TODO
+# == Introduction to Linda/rinda?
+#
+# == Why is this library separate from DRb?
+
+module Rinda
+
+ ##
+ # Rinda error base class
+
+ class RindaError < RuntimeError; end
+
+ ##
+ # Raised when a hash-based tuple has an invalid key.
+
+ class InvalidHashTupleKey < RindaError; end
+
+ ##
+ # Raised when trying to use a canceled tuple.
+
+ class RequestCanceledError < ThreadError; end
+
+ ##
+ # Raised when trying to use an expired tuple.
+
+ class RequestExpiredError < ThreadError; end
+
+ ##
+ # A tuple is the elementary object in Rinda programming.
+ # Tuples may be matched against templates if the tuple and
+ # the template are the same size.
+
+ class Tuple
+
+ ##
+ # Creates a new Tuple from +ary_or_hash+ which must be an Array or Hash.
+
+ def initialize(ary_or_hash)
+ if hash?(ary_or_hash)
+ init_with_hash(ary_or_hash)
+ else
+ init_with_ary(ary_or_hash)
+ end
+ end
+
+ ##
+ # The number of elements in the tuple.
+
+ def size
+ @tuple.size
+ end
+
+ ##
+ # Accessor method for elements of the tuple.
+
+ def [](k)
+ @tuple[k]
+ end
+
+ ##
+ # Fetches item +k+ from the tuple.
+
+ def fetch(k)
+ @tuple.fetch(k)
+ end
+
+ ##
+ # Iterate through the tuple, yielding the index or key, and the
+ # value, thus ensuring arrays are iterated similarly to hashes.
+
+ def each # FIXME
+ if Hash === @tuple
+ @tuple.each { |k, v| yield(k, v) }
+ else
+ @tuple.each_with_index { |v, k| yield(k, v) }
+ end
+ end
+
+ ##
+ # Return the tuple itself
+ def value
+ @tuple
+ end
+
+ private
+
+ def hash?(ary_or_hash)
+ ary_or_hash.respond_to?(:keys)
+ end
+
+ ##
+ # Munges +ary+ into a valid Tuple.
+
+ def init_with_ary(ary)
+ @tuple = Array.new(ary.size)
+ @tuple.size.times do |i|
+ @tuple[i] = ary[i]
+ end
+ end
+
+ ##
+ # Ensures +hash+ is a valid Tuple.
+
+ def init_with_hash(hash)
+ @tuple = Hash.new
+ hash.each do |k, v|
+ raise InvalidHashTupleKey unless String === k
+ @tuple[k] = v
+ end
+ end
+
+ end
+
+ ##
+ # Templates are used to match tuples in Rinda.
+
+ class Template < Tuple
+
+ ##
+ # Matches this template against +tuple+. The +tuple+ must be the same
+ # size as the template. An element with a +nil+ value in a template acts
+ # as a wildcard, matching any value in the corresponding position in the
+ # tuple. Elements of the template match the +tuple+ if the are #== or
+ # #===.
+ #
+ # Template.new([:foo, 5]).match Tuple.new([:foo, 5]) # => true
+ # Template.new([:foo, nil]).match Tuple.new([:foo, 5]) # => true
+ # Template.new([String]).match Tuple.new(['hello']) # => true
+ #
+ # Template.new([:foo]).match Tuple.new([:foo, 5]) # => false
+ # Template.new([:foo, 6]).match Tuple.new([:foo, 5]) # => false
+ # Template.new([:foo, nil]).match Tuple.new([:foo]) # => false
+ # Template.new([:foo, 6]).match Tuple.new([:foo]) # => false
+
+ def match(tuple)
+ return false unless tuple.respond_to?(:size)
+ return false unless tuple.respond_to?(:fetch)
+ return false unless self.size == tuple.size
+ each do |k, v|
+ begin
+ it = tuple.fetch(k)
+ rescue
+ return false
+ end
+ next if v.nil?
+ next if v == it
+ next if v === it
+ return false
+ end
+ return true
+ end
+
+ ##
+ # Alias for #match.
+
+ def ===(tuple)
+ match(tuple)
+ end
+
+ end
+
+ ##
+ # <i>Documentation?</i>
+
+ class DRbObjectTemplate
+
+ ##
+ # Creates a new DRbObjectTemplate that will match against +uri+ and +ref+.
+
+ def initialize(uri=nil, ref=nil)
+ @drb_uri = uri
+ @drb_ref = ref
+ end
+
+ ##
+ # This DRbObjectTemplate matches +ro+ if the remote object's drburi and
+ # drbref are the same. +nil+ is used as a wildcard.
+
+ def ===(ro)
+ return true if super(ro)
+ unless @drb_uri.nil?
+ return false unless (@drb_uri === ro.__drburi rescue false)
+ end
+ unless @drb_ref.nil?
+ return false unless (@drb_ref === ro.__drbref rescue false)
+ end
+ true
+ end
+
+ end
+
+ ##
+ # TupleSpaceProxy allows a remote Tuplespace to appear as local.
+
+ class TupleSpaceProxy
+ ##
+ # A Port ensures that a moved tuple arrives properly at its destination
+ # and does not get lost.
+ #
+ # See https://bugs.ruby-lang.org/issues/8125
+
+ class Port # :nodoc:
+ attr_reader :value
+
+ def self.deliver
+ port = new
+
+ begin
+ yield(port)
+ ensure
+ port.close
+ end
+
+ port.value
+ end
+
+ def initialize
+ @open = true
+ @value = nil
+ end
+
+ ##
+ # Don't let the DRb thread push to it when remote sends tuple
+
+ def close
+ @open = false
+ end
+
+ ##
+ # Stores +value+ and ensure it does not get marshaled multiple times.
+
+ def push value
+ raise 'port closed' unless @open
+
+ @value = value
+
+ nil # avoid Marshal
+ end
+ end
+
+ ##
+ # Creates a new TupleSpaceProxy to wrap +ts+.
+
+ def initialize(ts)
+ @ts = ts
+ end
+
+ ##
+ # Adds +tuple+ to the proxied TupleSpace. See TupleSpace#write.
+
+ def write(tuple, sec=nil)
+ @ts.write(tuple, sec)
+ end
+
+ ##
+ # Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
+
+ def take(tuple, sec=nil, &block)
+ Port.deliver do |port|
+ @ts.move(DRbObject.new(port), tuple, sec, &block)
+ end
+ end
+
+ ##
+ # Reads +tuple+ from the proxied TupleSpace. See TupleSpace#read.
+
+ def read(tuple, sec=nil, &block)
+ @ts.read(tuple, sec, &block)
+ end
+
+ ##
+ # Reads all tuples matching +tuple+ from the proxied TupleSpace. See
+ # TupleSpace#read_all.
+
+ def read_all(tuple)
+ @ts.read_all(tuple)
+ end
+
+ ##
+ # Registers for notifications of event +ev+ on the proxied TupleSpace.
+ # See TupleSpace#notify
+
+ def notify(ev, tuple, sec=nil)
+ @ts.notify(ev, tuple, sec)
+ end
+
+ end
+
+ ##
+ # An SimpleRenewer allows a TupleSpace to check if a TupleEntry is still
+ # alive.
+
+ class SimpleRenewer
+
+ include DRbUndumped
+
+ ##
+ # Creates a new SimpleRenewer that keeps an object alive for another +sec+
+ # seconds.
+
+ def initialize(sec=180)
+ @sec = sec
+ end
+
+ ##
+ # Called by the TupleSpace to check if the object is still alive.
+
+ def renew
+ @sec
+ end
+ end
+
+end
+
diff --git a/jni/ruby/lib/rinda/ring.rb b/jni/ruby/lib/rinda/ring.rb
new file mode 100644
index 0000000..fe33420
--- /dev/null
+++ b/jni/ruby/lib/rinda/ring.rb
@@ -0,0 +1,480 @@
+#
+# Note: Rinda::Ring API is unstable.
+#
+require 'drb/drb'
+require 'rinda/rinda'
+require 'thread'
+require 'ipaddr'
+
+module Rinda
+
+ ##
+ # The default port Ring discovery will use.
+
+ Ring_PORT = 7647
+
+ ##
+ # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
+ # Default service location uses the following steps:
+ #
+ # 1. A RingServer begins listening on the network broadcast UDP address.
+ # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
+ # listen for a reply.
+ # 3. The RingServer receives the UDP packet and connects back to the
+ # provided DRb URI with the DRb service.
+ #
+ # A RingServer requires a TupleSpace:
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new
+ #
+ # RingServer can also listen on multicast addresses for announcements. This
+ # allows multiple RingServers to run on the same host. To use network
+ # broadcast and multicast:
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1]
+
+ class RingServer
+
+ include DRbUndumped
+
+ ##
+ # Special renewer for the RingServer to allow shutdown
+
+ class Renewer # :nodoc:
+ include DRbUndumped
+
+ ##
+ # Set to false to shutdown future requests using this Renewer
+
+ attr_writer :renew
+
+ def initialize # :nodoc:
+ @renew = true
+ end
+
+ def renew # :nodoc:
+ @renew ? 1 : true
+ end
+ end
+
+ ##
+ # Advertises +ts+ on the given +addresses+ at +port+.
+ #
+ # If +addresses+ is omitted only the UDP broadcast address is used.
+ #
+ # +addresses+ can contain multiple addresses. If a multicast address is
+ # given in +addresses+ then the RingServer will listen for multicast
+ # queries.
+ #
+ # If you use IPv4 multicast you may need to set an address of the inbound
+ # interface which joins a multicast group.
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])
+ #
+ # You can set addresses as an Array Object. The first element of the
+ # Array is a multicast address and the second is an inbound interface
+ # address. If the second is omitted then '0.0.0.0' is used.
+ #
+ # If you use IPv6 multicast you may need to set both the local interface
+ # address and the inbound interface index:
+ #
+ # rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])
+ #
+ # The first element is a multicast address and the second is an inbound
+ # interface address. The third is an inbound interface index.
+ #
+ # At this time there is no easy way to get an interface index by name.
+ #
+ # If the second is omitted then '::1' is used.
+ # If the third is omitted then 0 (default interface) is used.
+
+ def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
+ @port = port
+
+ if Integer === addresses then
+ addresses, @port = [Socket::INADDR_ANY], addresses
+ end
+
+ @renewer = Renewer.new
+
+ @ts = ts
+ @sockets = []
+ addresses.each do |address|
+ if Array === address
+ make_socket(*address)
+ else
+ make_socket(address)
+ end
+ end
+
+ @w_services = write_services
+ @r_service = reply_service
+ end
+
+ ##
+ # Creates a socket at +address+
+ #
+ # If +address+ is multicast address then +interface_address+ and
+ # +multicast_interface+ can be set as optional.
+ #
+ # A created socket is bound to +interface_address+. If you use IPv4
+ # multicast then the interface of +interface_address+ is used as the
+ # inbound interface. If +interface_address+ is omitted or nil then
+ # '0.0.0.0' or '::1' is used.
+ #
+ # If you use IPv6 multicast then +multicast_interface+ is used as the
+ # inbound interface. +multicast_interface+ is a network interface index.
+ # If +multicast_interface+ is omitted then 0 (default interface) is used.
+
+ def make_socket(address, interface_address=nil, multicast_interface=0)
+ addrinfo = Addrinfo.udp(address, @port)
+
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
+ addrinfo.protocol)
+ @sockets << socket
+
+ if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
+ if Socket.const_defined?(:SO_REUSEPORT) then
+ socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
+ else
+ socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
+ end
+
+ if addrinfo.ipv4_multicast? then
+ interface_address = '0.0.0.0' if interface_address.nil?
+ socket.bind(Addrinfo.udp(interface_address, @port))
+
+ mreq = IPAddr.new(addrinfo.ip_address).hton +
+ IPAddr.new(interface_address).hton
+
+ socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
+ else
+ interface_address = '::1' if interface_address.nil?
+ socket.bind(Addrinfo.udp(interface_address, @port))
+
+ mreq = IPAddr.new(addrinfo.ip_address).hton +
+ [multicast_interface].pack('I')
+
+ socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
+ end
+ else
+ socket.bind(addrinfo)
+ end
+
+ socket
+ end
+
+ ##
+ # Creates threads that pick up UDP packets and passes them to do_write for
+ # decoding.
+
+ def write_services
+ @sockets.map do |s|
+ Thread.new(s) do |socket|
+ loop do
+ msg = socket.recv(1024)
+ do_write(msg)
+ end
+ end
+ end
+ end
+
+ ##
+ # Extracts the response URI from +msg+ and adds it to TupleSpace where it
+ # will be picked up by +reply_service+ for notification.
+
+ def do_write(msg)
+ Thread.new do
+ begin
+ tuple, sec = Marshal.load(msg)
+ @ts.write(tuple, sec)
+ rescue
+ end
+ end
+ end
+
+ ##
+ # Creates a thread that notifies waiting clients from the TupleSpace.
+
+ def reply_service
+ Thread.new do
+ loop do
+ do_reply
+ end
+ end
+ end
+
+ ##
+ # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
+ # address of the local TupleSpace.
+
+ def do_reply
+ tuple = @ts.take([:lookup_ring, nil], @renewer)
+ Thread.new { tuple[1].call(@ts) rescue nil}
+ rescue
+ end
+
+ ##
+ # Shuts down the RingServer
+
+ def shutdown
+ @renewer.renew = false
+
+ @w_services.each do |thread|
+ thread.kill
+ thread.join
+ end
+
+ @sockets.each do |socket|
+ socket.close
+ end
+
+ @r_service.kill
+ @r_service.join
+ end
+
+ end
+
+ ##
+ # RingFinger is used by RingServer clients to discover the RingServer's
+ # TupleSpace. Typically, all a client needs to do is call
+ # RingFinger.primary to retrieve the remote TupleSpace, which it can then
+ # begin using.
+ #
+ # To find the first available remote TupleSpace:
+ #
+ # Rinda::RingFinger.primary
+ #
+ # To create a RingFinger that broadcasts to a custom list:
+ #
+ # rf = Rinda::RingFinger.new ['localhost', '192.0.2.1']
+ # rf.primary
+ #
+ # Rinda::RingFinger also understands multicast addresses and sets them up
+ # properly. This allows you to run multiple RingServers on the same host:
+ #
+ # rf = Rinda::RingFinger.new ['239.0.0.1']
+ # rf.primary
+ #
+ # You can set the hop count (or TTL) for multicast searches using
+ # #multicast_hops.
+ #
+ # If you use IPv6 multicast you may need to set both an address and the
+ # outbound interface index:
+ #
+ # rf = Rinda::RingFinger.new ['ff02::1']
+ # rf.multicast_interface = 1
+ # rf.primary
+ #
+ # At this time there is no easy way to get an interface index by name.
+
+ class RingFinger
+
+ @@broadcast_list = ['<broadcast>', 'localhost']
+
+ @@finger = nil
+
+ ##
+ # Creates a singleton RingFinger and looks for a RingServer. Returns the
+ # created RingFinger.
+
+ def self.finger
+ unless @@finger
+ @@finger = self.new
+ @@finger.lookup_ring_any
+ end
+ @@finger
+ end
+
+ ##
+ # Returns the first advertised TupleSpace.
+
+ def self.primary
+ finger.primary
+ end
+
+ ##
+ # Contains all discovered TupleSpaces except for the primary.
+
+ def self.to_a
+ finger.to_a
+ end
+
+ ##
+ # The list of addresses where RingFinger will send query packets.
+
+ attr_accessor :broadcast_list
+
+ ##
+ # Maximum number of hops for sent multicast packets (if using a multicast
+ # address in the broadcast list). The default is 1 (same as UDP
+ # broadcast).
+
+ attr_accessor :multicast_hops
+
+ ##
+ # The interface index to send IPv6 multicast packets from.
+
+ attr_accessor :multicast_interface
+
+ ##
+ # The port that RingFinger will send query packets to.
+
+ attr_accessor :port
+
+ ##
+ # Contain the first advertised TupleSpace after lookup_ring_any is called.
+
+ attr_accessor :primary
+
+ ##
+ # Creates a new RingFinger that will look for RingServers at +port+ on
+ # the addresses in +broadcast_list+.
+ #
+ # If +broadcast_list+ contains a multicast address then multicast queries
+ # will be made using the given multicast_hops and multicast_interface.
+
+ def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
+ @broadcast_list = broadcast_list || ['localhost']
+ @port = port
+ @primary = nil
+ @rings = []
+
+ @multicast_hops = 1
+ @multicast_interface = 0
+ end
+
+ ##
+ # Contains all discovered TupleSpaces except for the primary.
+
+ def to_a
+ @rings
+ end
+
+ ##
+ # Iterates over all discovered TupleSpaces starting with the primary.
+
+ def each
+ lookup_ring_any unless @primary
+ return unless @primary
+ yield(@primary)
+ @rings.each { |x| yield(x) }
+ end
+
+ ##
+ # Looks up RingServers waiting +timeout+ seconds. RingServers will be
+ # given +block+ as a callback, which will be called with the remote
+ # TupleSpace.
+
+ def lookup_ring(timeout=5, &block)
+ return lookup_ring_any(timeout) unless block_given?
+
+ msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
+ @broadcast_list.each do |it|
+ send_message(it, msg)
+ end
+ sleep(timeout)
+ end
+
+ ##
+ # Returns the first found remote TupleSpace. Any further recovered
+ # TupleSpaces can be found by calling +to_a+.
+
+ def lookup_ring_any(timeout=5)
+ queue = Queue.new
+
+ Thread.new do
+ self.lookup_ring(timeout) do |ts|
+ queue.push(ts)
+ end
+ queue.push(nil)
+ end
+
+ @primary = queue.pop
+ raise('RingNotFound') if @primary.nil?
+
+ Thread.new do
+ while it = queue.pop
+ @rings.push(it)
+ end
+ end
+
+ @primary
+ end
+
+ ##
+ # Creates a socket for +address+ with the appropriate multicast options
+ # for multicast addresses.
+
+ def make_socket(address) # :nodoc:
+ addrinfo = Addrinfo.udp(address, @port)
+
+ soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+ begin
+ if addrinfo.ipv4_multicast? then
+ soc.setsockopt(Socket::Option.ipv4_multicast_loop(1))
+ soc.setsockopt(Socket::Option.ipv4_multicast_ttl(@multicast_hops))
+ elsif addrinfo.ipv6_multicast? then
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
+ [@multicast_hops].pack('I'))
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
+ [@multicast_interface].pack('I'))
+ else
+ soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
+ end
+
+ soc.connect(addrinfo)
+ rescue Exception
+ soc.close
+ raise
+ end
+
+ soc
+ end
+
+ def send_message(address, message) # :nodoc:
+ soc = make_socket(address)
+
+ soc.send(message, 0)
+ rescue
+ nil
+ ensure
+ soc.close if soc
+ end
+
+ end
+
+ ##
+ # RingProvider uses a RingServer advertised TupleSpace as a name service.
+ # TupleSpace clients can register themselves with the remote TupleSpace and
+ # look up other provided services via the remote TupleSpace.
+ #
+ # Services are registered with a tuple of the format [:name, klass,
+ # DRbObject, description].
+
+ class RingProvider
+
+ ##
+ # Creates a RingProvider that will provide a +klass+ service running on
+ # +front+, with a +description+. +renewer+ is optional.
+
+ def initialize(klass, front, desc, renewer = nil)
+ @tuple = [:name, klass, front, desc]
+ @renewer = renewer || Rinda::SimpleRenewer.new
+ end
+
+ ##
+ # Advertises this service on the primary remote TupleSpace.
+
+ def provide
+ ts = Rinda::RingFinger.primary
+ ts.write(@tuple, @renewer)
+ end
+
+ end
+
+end
diff --git a/jni/ruby/lib/rinda/tuplespace.rb b/jni/ruby/lib/rinda/tuplespace.rb
new file mode 100644
index 0000000..11532fd
--- /dev/null
+++ b/jni/ruby/lib/rinda/tuplespace.rb
@@ -0,0 +1,642 @@
+require 'monitor'
+require 'thread'
+require 'drb/drb'
+require 'rinda/rinda'
+require 'enumerator'
+require 'forwardable'
+
+module Rinda
+
+ ##
+ # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
+ # together with expiry and cancellation data.
+
+ class TupleEntry
+
+ include DRbUndumped
+
+ attr_accessor :expires
+
+ ##
+ # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
+ # time +sec+.
+ #
+ # A renewer must implement the +renew+ method which returns a Numeric,
+ # nil, or true to indicate when the tuple has expired.
+
+ def initialize(ary, sec=nil)
+ @cancel = false
+ @expires = nil
+ @tuple = make_tuple(ary)
+ @renewer = nil
+ renew(sec)
+ end
+
+ ##
+ # Marks this TupleEntry as canceled.
+
+ def cancel
+ @cancel = true
+ end
+
+ ##
+ # A TupleEntry is dead when it is canceled or expired.
+
+ def alive?
+ !canceled? && !expired?
+ end
+
+ ##
+ # Return the object which makes up the tuple itself: the Array
+ # or Hash.
+
+ def value; @tuple.value; end
+
+ ##
+ # Returns the canceled status.
+
+ def canceled?; @cancel; end
+
+ ##
+ # Has this tuple expired? (true/false).
+ #
+ # A tuple has expired when its expiry timer based on the +sec+ argument to
+ # #initialize runs out.
+
+ def expired?
+ return true unless @expires
+ return false if @expires > Time.now
+ return true if @renewer.nil?
+ renew(@renewer)
+ return true unless @expires
+ return @expires < Time.now
+ end
+
+ ##
+ # Reset the expiry time according to +sec_or_renewer+.
+ #
+ # +nil+:: it is set to expire in the far future.
+ # +true+:: it has expired.
+ # Numeric:: it will expire in that many seconds.
+ #
+ # Otherwise the argument refers to some kind of renewer object
+ # which will reset its expiry time.
+
+ def renew(sec_or_renewer)
+ sec, @renewer = get_renewer(sec_or_renewer)
+ @expires = make_expires(sec)
+ end
+
+ ##
+ # Returns an expiry Time based on +sec+ which can be one of:
+ # Numeric:: +sec+ seconds into the future
+ # +true+:: the expiry time is the start of 1970 (i.e. expired)
+ # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
+ # UNIX clocks will die)
+
+ def make_expires(sec=nil)
+ case sec
+ when Numeric
+ Time.now + sec
+ when true
+ Time.at(1)
+ when nil
+ Time.at(2**31-1)
+ end
+ end
+
+ ##
+ # Retrieves +key+ from the tuple.
+
+ def [](key)
+ @tuple[key]
+ end
+
+ ##
+ # Fetches +key+ from the tuple.
+
+ def fetch(key)
+ @tuple.fetch(key)
+ end
+
+ ##
+ # The size of the tuple.
+
+ def size
+ @tuple.size
+ end
+
+ ##
+ # Creates a Rinda::Tuple for +ary+.
+
+ def make_tuple(ary)
+ Rinda::Tuple.new(ary)
+ end
+
+ private
+
+ ##
+ # Returns a valid argument to make_expires and the renewer or nil.
+ #
+ # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
+ # renewer). Otherwise it returns an expiry value from calling +it.renew+
+ # and the renewer.
+
+ def get_renewer(it)
+ case it
+ when Numeric, true, nil
+ return it, nil
+ else
+ begin
+ return it.renew, it
+ rescue Exception
+ return it, nil
+ end
+ end
+ end
+
+ end
+
+ ##
+ # A TemplateEntry is a Template together with expiry and cancellation data.
+
+ class TemplateEntry < TupleEntry
+ ##
+ # Matches this TemplateEntry against +tuple+. See Template#match for
+ # details on how a Template matches a Tuple.
+
+ def match(tuple)
+ @tuple.match(tuple)
+ end
+
+ alias === match
+
+ def make_tuple(ary) # :nodoc:
+ Rinda::Template.new(ary)
+ end
+
+ end
+
+ ##
+ # <i>Documentation?</i>
+
+ class WaitTemplateEntry < TemplateEntry
+
+ attr_reader :found
+
+ def initialize(place, ary, expires=nil)
+ super(ary, expires)
+ @place = place
+ @cond = place.new_cond
+ @found = nil
+ end
+
+ def cancel
+ super
+ signal
+ end
+
+ def wait
+ @cond.wait
+ end
+
+ def read(tuple)
+ @found = tuple
+ signal
+ end
+
+ def signal
+ @place.synchronize do
+ @cond.signal
+ end
+ end
+
+ end
+
+ ##
+ # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
+ # TupleSpace changes. You may receive either your subscribed event or the
+ # 'close' event when iterating over notifications.
+ #
+ # See TupleSpace#notify_event for valid notification types.
+ #
+ # == Example
+ #
+ # ts = Rinda::TupleSpace.new
+ # observer = ts.notify 'write', [nil]
+ #
+ # Thread.start do
+ # observer.each { |t| p t }
+ # end
+ #
+ # 3.times { |i| ts.write [i] }
+ #
+ # Outputs:
+ #
+ # ['write', [0]]
+ # ['write', [1]]
+ # ['write', [2]]
+
+ class NotifyTemplateEntry < TemplateEntry
+
+ ##
+ # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
+ # match +tuple+.
+
+ def initialize(place, event, tuple, expires=nil)
+ ary = [event, Rinda::Template.new(tuple)]
+ super(ary, expires)
+ @queue = Queue.new
+ @done = false
+ end
+
+ ##
+ # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
+
+ def notify(ev)
+ @queue.push(ev)
+ end
+
+ ##
+ # Retrieves a notification. Raises RequestExpiredError when this
+ # NotifyTemplateEntry expires.
+
+ def pop
+ raise RequestExpiredError if @done
+ it = @queue.pop
+ @done = true if it[0] == 'close'
+ return it
+ end
+
+ ##
+ # Yields event/tuple pairs until this NotifyTemplateEntry expires.
+
+ def each # :yields: event, tuple
+ while !@done
+ it = pop
+ yield(it)
+ end
+ rescue
+ ensure
+ cancel
+ end
+
+ end
+
+ ##
+ # TupleBag is an unordered collection of tuples. It is the basis
+ # of Tuplespace.
+
+ class TupleBag
+ class TupleBin
+ extend Forwardable
+ def_delegators '@bin', :find_all, :delete_if, :each, :empty?
+
+ def initialize
+ @bin = []
+ end
+
+ def add(tuple)
+ @bin.push(tuple)
+ end
+
+ def delete(tuple)
+ idx = @bin.rindex(tuple)
+ @bin.delete_at(idx) if idx
+ end
+
+ def find
+ @bin.reverse_each do |x|
+ return x if yield(x)
+ end
+ nil
+ end
+ end
+
+ def initialize # :nodoc:
+ @hash = {}
+ @enum = enum_for(:each_entry)
+ end
+
+ ##
+ # +true+ if the TupleBag to see if it has any expired entries.
+
+ def has_expires?
+ @enum.find do |tuple|
+ tuple.expires
+ end
+ end
+
+ ##
+ # Add +tuple+ to the TupleBag.
+
+ def push(tuple)
+ key = bin_key(tuple)
+ @hash[key] ||= TupleBin.new
+ @hash[key].add(tuple)
+ end
+
+ ##
+ # Removes +tuple+ from the TupleBag.
+
+ def delete(tuple)
+ key = bin_key(tuple)
+ bin = @hash[key]
+ return nil unless bin
+ bin.delete(tuple)
+ @hash.delete(key) if bin.empty?
+ tuple
+ end
+
+ ##
+ # Finds all live tuples that match +template+.
+ def find_all(template)
+ bin_for_find(template).find_all do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ ##
+ # Finds a live tuple that matches +template+.
+
+ def find(template)
+ bin_for_find(template).find do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ ##
+ # Finds all tuples in the TupleBag which when treated as templates, match
+ # +tuple+ and are alive.
+
+ def find_all_template(tuple)
+ @enum.find_all do |template|
+ template.alive? && template.match(tuple)
+ end
+ end
+
+ ##
+ # Delete tuples which dead tuples from the TupleBag, returning the deleted
+ # tuples.
+
+ def delete_unless_alive
+ deleted = []
+ @hash.each do |key, bin|
+ bin.delete_if do |tuple|
+ if tuple.alive?
+ false
+ else
+ deleted.push(tuple)
+ true
+ end
+ end
+ end
+ deleted
+ end
+
+ private
+ def each_entry(&blk)
+ @hash.each do |k, v|
+ v.each(&blk)
+ end
+ end
+
+ def bin_key(tuple)
+ head = tuple[0]
+ if head.class == Symbol
+ return head
+ else
+ false
+ end
+ end
+
+ def bin_for_find(template)
+ key = bin_key(template)
+ key ? @hash.fetch(key, []) : @enum
+ end
+ end
+
+ ##
+ # The Tuplespace manages access to the tuples it contains,
+ # ensuring mutual exclusion requirements are met.
+ #
+ # The +sec+ option for the write, take, move, read and notify methods may
+ # either be a number of seconds or a Renewer object.
+
+ class TupleSpace
+
+ include DRbUndumped
+ include MonitorMixin
+
+ ##
+ # Creates a new TupleSpace. +period+ is used to control how often to look
+ # for dead tuples after modifications to the TupleSpace.
+ #
+ # If no dead tuples are found +period+ seconds after the last
+ # modification, the TupleSpace will stop looking for dead tuples.
+
+ def initialize(period=60)
+ super()
+ @bag = TupleBag.new
+ @read_waiter = TupleBag.new
+ @take_waiter = TupleBag.new
+ @notify_waiter = TupleBag.new
+ @period = period
+ @keeper = nil
+ end
+
+ ##
+ # Adds +tuple+
+
+ def write(tuple, sec=nil)
+ entry = create_entry(tuple, sec)
+ synchronize do
+ if entry.expired?
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ notify_event('write', entry.value)
+ notify_event('delete', entry.value)
+ else
+ @bag.push(entry)
+ start_keeper if entry.expires
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ @take_waiter.find_all_template(entry).each do |template|
+ template.signal
+ end
+ notify_event('write', entry.value)
+ end
+ end
+ entry
+ end
+
+ ##
+ # Removes +tuple+
+
+ def take(tuple, sec=nil, &block)
+ move(nil, tuple, sec, &block)
+ end
+
+ ##
+ # Moves +tuple+ to +port+.
+
+ def move(port, tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return port ? nil : entry.value
+ end
+ raise RequestExpiredError if template.expired?
+
+ begin
+ @take_waiter.push(template)
+ start_keeper if template.expires
+ while true
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return port ? nil : entry.value
+ end
+ template.wait
+ end
+ ensure
+ @take_waiter.delete(template)
+ end
+ end
+ end
+
+ ##
+ # Reads +tuple+, but does not remove it.
+
+ def read(tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ return entry.value if entry
+ raise RequestExpiredError if template.expired?
+
+ begin
+ @read_waiter.push(template)
+ start_keeper if template.expires
+ template.wait
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ return template.found
+ ensure
+ @read_waiter.delete(template)
+ end
+ end
+ end
+
+ ##
+ # Returns all tuples matching +tuple+. Does not remove the found tuples.
+
+ def read_all(tuple)
+ template = WaitTemplateEntry.new(self, tuple, nil)
+ synchronize do
+ entry = @bag.find_all(template)
+ entry.collect do |e|
+ e.value
+ end
+ end
+ end
+
+ ##
+ # Registers for notifications of +event+. Returns a NotifyTemplateEntry.
+ # See NotifyTemplateEntry for examples of how to listen for notifications.
+ #
+ # +event+ can be:
+ # 'write':: A tuple was added
+ # 'take':: A tuple was taken or moved
+ # 'delete':: A tuple was lost after being overwritten or expiring
+ #
+ # The TupleSpace will also notify you of the 'close' event when the
+ # NotifyTemplateEntry has expired.
+
+ def notify(event, tuple, sec=nil)
+ template = NotifyTemplateEntry.new(self, event, tuple, sec)
+ synchronize do
+ @notify_waiter.push(template)
+ end
+ template
+ end
+
+ private
+
+ def create_entry(tuple, sec)
+ TupleEntry.new(tuple, sec)
+ end
+
+ ##
+ # Removes dead tuples.
+
+ def keep_clean
+ synchronize do
+ @read_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @take_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @notify_waiter.delete_unless_alive.each do |e|
+ e.notify(['close'])
+ end
+ @bag.delete_unless_alive.each do |e|
+ notify_event('delete', e.value)
+ end
+ end
+ end
+
+ ##
+ # Notifies all registered listeners for +event+ of a status change of
+ # +tuple+.
+
+ def notify_event(event, tuple)
+ ev = [event, tuple]
+ @notify_waiter.find_all_template(ev).each do |template|
+ template.notify(ev)
+ end
+ end
+
+ ##
+ # Creates a thread that scans the tuplespace for expired tuples.
+
+ def start_keeper
+ return if @keeper && @keeper.alive?
+ @keeper = Thread.new do
+ while true
+ sleep(@period)
+ synchronize do
+ break unless need_keeper?
+ keep_clean
+ end
+ end
+ end
+ end
+
+ ##
+ # Checks the tuplespace to see if it needs cleaning.
+
+ def need_keeper?
+ return true if @bag.has_expires?
+ return true if @read_waiter.has_expires?
+ return true if @take_waiter.has_expires?
+ return true if @notify_waiter.has_expires?
+ end
+
+ end
+
+end
+