summaryrefslogtreecommitdiff
path: root/jni/ruby/lib/rinda/ring.rb
diff options
context:
space:
mode:
Diffstat (limited to 'jni/ruby/lib/rinda/ring.rb')
-rw-r--r--jni/ruby/lib/rinda/ring.rb480
1 files changed, 480 insertions, 0 deletions
diff --git a/jni/ruby/lib/rinda/ring.rb b/jni/ruby/lib/rinda/ring.rb
new file mode 100644
index 0000000..fe33420
--- /dev/null
+++ b/jni/ruby/lib/rinda/ring.rb
@@ -0,0 +1,480 @@
+#
+# Note: Rinda::Ring API is unstable.
+#
+require 'drb/drb'
+require 'rinda/rinda'
+require 'thread'
+require 'ipaddr'
+
+module Rinda
+
+ ##
+ # The default port Ring discovery will use.
+
+ Ring_PORT = 7647
+
+ ##
+ # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
+ # Default service location uses the following steps:
+ #
+ # 1. A RingServer begins listening on the network broadcast UDP address.
+ # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
+ # listen for a reply.
+ # 3. The RingServer receives the UDP packet and connects back to the
+ # provided DRb URI with the DRb service.
+ #
+ # A RingServer requires a TupleSpace:
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new
+ #
+ # RingServer can also listen on multicast addresses for announcements. This
+ # allows multiple RingServers to run on the same host. To use network
+ # broadcast and multicast:
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1]
+
+ class RingServer
+
+ include DRbUndumped
+
+ ##
+ # Special renewer for the RingServer to allow shutdown
+
+ class Renewer # :nodoc:
+ include DRbUndumped
+
+ ##
+ # Set to false to shutdown future requests using this Renewer
+
+ attr_writer :renew
+
+ def initialize # :nodoc:
+ @renew = true
+ end
+
+ def renew # :nodoc:
+ @renew ? 1 : true
+ end
+ end
+
+ ##
+ # Advertises +ts+ on the given +addresses+ at +port+.
+ #
+ # If +addresses+ is omitted only the UDP broadcast address is used.
+ #
+ # +addresses+ can contain multiple addresses. If a multicast address is
+ # given in +addresses+ then the RingServer will listen for multicast
+ # queries.
+ #
+ # If you use IPv4 multicast you may need to set an address of the inbound
+ # interface which joins a multicast group.
+ #
+ # ts = Rinda::TupleSpace.new
+ # rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])
+ #
+ # You can set addresses as an Array Object. The first element of the
+ # Array is a multicast address and the second is an inbound interface
+ # address. If the second is omitted then '0.0.0.0' is used.
+ #
+ # If you use IPv6 multicast you may need to set both the local interface
+ # address and the inbound interface index:
+ #
+ # rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])
+ #
+ # The first element is a multicast address and the second is an inbound
+ # interface address. The third is an inbound interface index.
+ #
+ # At this time there is no easy way to get an interface index by name.
+ #
+ # If the second is omitted then '::1' is used.
+ # If the third is omitted then 0 (default interface) is used.
+
+ def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
+ @port = port
+
+ if Integer === addresses then
+ addresses, @port = [Socket::INADDR_ANY], addresses
+ end
+
+ @renewer = Renewer.new
+
+ @ts = ts
+ @sockets = []
+ addresses.each do |address|
+ if Array === address
+ make_socket(*address)
+ else
+ make_socket(address)
+ end
+ end
+
+ @w_services = write_services
+ @r_service = reply_service
+ end
+
+ ##
+ # Creates a socket at +address+
+ #
+ # If +address+ is multicast address then +interface_address+ and
+ # +multicast_interface+ can be set as optional.
+ #
+ # A created socket is bound to +interface_address+. If you use IPv4
+ # multicast then the interface of +interface_address+ is used as the
+ # inbound interface. If +interface_address+ is omitted or nil then
+ # '0.0.0.0' or '::1' is used.
+ #
+ # If you use IPv6 multicast then +multicast_interface+ is used as the
+ # inbound interface. +multicast_interface+ is a network interface index.
+ # If +multicast_interface+ is omitted then 0 (default interface) is used.
+
+ def make_socket(address, interface_address=nil, multicast_interface=0)
+ addrinfo = Addrinfo.udp(address, @port)
+
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
+ addrinfo.protocol)
+ @sockets << socket
+
+ if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
+ if Socket.const_defined?(:SO_REUSEPORT) then
+ socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
+ else
+ socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
+ end
+
+ if addrinfo.ipv4_multicast? then
+ interface_address = '0.0.0.0' if interface_address.nil?
+ socket.bind(Addrinfo.udp(interface_address, @port))
+
+ mreq = IPAddr.new(addrinfo.ip_address).hton +
+ IPAddr.new(interface_address).hton
+
+ socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
+ else
+ interface_address = '::1' if interface_address.nil?
+ socket.bind(Addrinfo.udp(interface_address, @port))
+
+ mreq = IPAddr.new(addrinfo.ip_address).hton +
+ [multicast_interface].pack('I')
+
+ socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
+ end
+ else
+ socket.bind(addrinfo)
+ end
+
+ socket
+ end
+
+ ##
+ # Creates threads that pick up UDP packets and passes them to do_write for
+ # decoding.
+
+ def write_services
+ @sockets.map do |s|
+ Thread.new(s) do |socket|
+ loop do
+ msg = socket.recv(1024)
+ do_write(msg)
+ end
+ end
+ end
+ end
+
+ ##
+ # Extracts the response URI from +msg+ and adds it to TupleSpace where it
+ # will be picked up by +reply_service+ for notification.
+
+ def do_write(msg)
+ Thread.new do
+ begin
+ tuple, sec = Marshal.load(msg)
+ @ts.write(tuple, sec)
+ rescue
+ end
+ end
+ end
+
+ ##
+ # Creates a thread that notifies waiting clients from the TupleSpace.
+
+ def reply_service
+ Thread.new do
+ loop do
+ do_reply
+ end
+ end
+ end
+
+ ##
+ # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
+ # address of the local TupleSpace.
+
+ def do_reply
+ tuple = @ts.take([:lookup_ring, nil], @renewer)
+ Thread.new { tuple[1].call(@ts) rescue nil}
+ rescue
+ end
+
+ ##
+ # Shuts down the RingServer
+
+ def shutdown
+ @renewer.renew = false
+
+ @w_services.each do |thread|
+ thread.kill
+ thread.join
+ end
+
+ @sockets.each do |socket|
+ socket.close
+ end
+
+ @r_service.kill
+ @r_service.join
+ end
+
+ end
+
+ ##
+ # RingFinger is used by RingServer clients to discover the RingServer's
+ # TupleSpace. Typically, all a client needs to do is call
+ # RingFinger.primary to retrieve the remote TupleSpace, which it can then
+ # begin using.
+ #
+ # To find the first available remote TupleSpace:
+ #
+ # Rinda::RingFinger.primary
+ #
+ # To create a RingFinger that broadcasts to a custom list:
+ #
+ # rf = Rinda::RingFinger.new ['localhost', '192.0.2.1']
+ # rf.primary
+ #
+ # Rinda::RingFinger also understands multicast addresses and sets them up
+ # properly. This allows you to run multiple RingServers on the same host:
+ #
+ # rf = Rinda::RingFinger.new ['239.0.0.1']
+ # rf.primary
+ #
+ # You can set the hop count (or TTL) for multicast searches using
+ # #multicast_hops.
+ #
+ # If you use IPv6 multicast you may need to set both an address and the
+ # outbound interface index:
+ #
+ # rf = Rinda::RingFinger.new ['ff02::1']
+ # rf.multicast_interface = 1
+ # rf.primary
+ #
+ # At this time there is no easy way to get an interface index by name.
+
+ class RingFinger
+
+ @@broadcast_list = ['<broadcast>', 'localhost']
+
+ @@finger = nil
+
+ ##
+ # Creates a singleton RingFinger and looks for a RingServer. Returns the
+ # created RingFinger.
+
+ def self.finger
+ unless @@finger
+ @@finger = self.new
+ @@finger.lookup_ring_any
+ end
+ @@finger
+ end
+
+ ##
+ # Returns the first advertised TupleSpace.
+
+ def self.primary
+ finger.primary
+ end
+
+ ##
+ # Contains all discovered TupleSpaces except for the primary.
+
+ def self.to_a
+ finger.to_a
+ end
+
+ ##
+ # The list of addresses where RingFinger will send query packets.
+
+ attr_accessor :broadcast_list
+
+ ##
+ # Maximum number of hops for sent multicast packets (if using a multicast
+ # address in the broadcast list). The default is 1 (same as UDP
+ # broadcast).
+
+ attr_accessor :multicast_hops
+
+ ##
+ # The interface index to send IPv6 multicast packets from.
+
+ attr_accessor :multicast_interface
+
+ ##
+ # The port that RingFinger will send query packets to.
+
+ attr_accessor :port
+
+ ##
+ # Contain the first advertised TupleSpace after lookup_ring_any is called.
+
+ attr_accessor :primary
+
+ ##
+ # Creates a new RingFinger that will look for RingServers at +port+ on
+ # the addresses in +broadcast_list+.
+ #
+ # If +broadcast_list+ contains a multicast address then multicast queries
+ # will be made using the given multicast_hops and multicast_interface.
+
+ def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
+ @broadcast_list = broadcast_list || ['localhost']
+ @port = port
+ @primary = nil
+ @rings = []
+
+ @multicast_hops = 1
+ @multicast_interface = 0
+ end
+
+ ##
+ # Contains all discovered TupleSpaces except for the primary.
+
+ def to_a
+ @rings
+ end
+
+ ##
+ # Iterates over all discovered TupleSpaces starting with the primary.
+
+ def each
+ lookup_ring_any unless @primary
+ return unless @primary
+ yield(@primary)
+ @rings.each { |x| yield(x) }
+ end
+
+ ##
+ # Looks up RingServers waiting +timeout+ seconds. RingServers will be
+ # given +block+ as a callback, which will be called with the remote
+ # TupleSpace.
+
+ def lookup_ring(timeout=5, &block)
+ return lookup_ring_any(timeout) unless block_given?
+
+ msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
+ @broadcast_list.each do |it|
+ send_message(it, msg)
+ end
+ sleep(timeout)
+ end
+
+ ##
+ # Returns the first found remote TupleSpace. Any further recovered
+ # TupleSpaces can be found by calling +to_a+.
+
+ def lookup_ring_any(timeout=5)
+ queue = Queue.new
+
+ Thread.new do
+ self.lookup_ring(timeout) do |ts|
+ queue.push(ts)
+ end
+ queue.push(nil)
+ end
+
+ @primary = queue.pop
+ raise('RingNotFound') if @primary.nil?
+
+ Thread.new do
+ while it = queue.pop
+ @rings.push(it)
+ end
+ end
+
+ @primary
+ end
+
+ ##
+ # Creates a socket for +address+ with the appropriate multicast options
+ # for multicast addresses.
+
+ def make_socket(address) # :nodoc:
+ addrinfo = Addrinfo.udp(address, @port)
+
+ soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+ begin
+ if addrinfo.ipv4_multicast? then
+ soc.setsockopt(Socket::Option.ipv4_multicast_loop(1))
+ soc.setsockopt(Socket::Option.ipv4_multicast_ttl(@multicast_hops))
+ elsif addrinfo.ipv6_multicast? then
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
+ [@multicast_hops].pack('I'))
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
+ [@multicast_interface].pack('I'))
+ else
+ soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
+ end
+
+ soc.connect(addrinfo)
+ rescue Exception
+ soc.close
+ raise
+ end
+
+ soc
+ end
+
+ def send_message(address, message) # :nodoc:
+ soc = make_socket(address)
+
+ soc.send(message, 0)
+ rescue
+ nil
+ ensure
+ soc.close if soc
+ end
+
+ end
+
+ ##
+ # RingProvider uses a RingServer advertised TupleSpace as a name service.
+ # TupleSpace clients can register themselves with the remote TupleSpace and
+ # look up other provided services via the remote TupleSpace.
+ #
+ # Services are registered with a tuple of the format [:name, klass,
+ # DRbObject, description].
+
+ class RingProvider
+
+ ##
+ # Creates a RingProvider that will provide a +klass+ service running on
+ # +front+, with a +description+. +renewer+ is optional.
+
+ def initialize(klass, front, desc, renewer = nil)
+ @tuple = [:name, klass, front, desc]
+ @renewer = renewer || Rinda::SimpleRenewer.new
+ end
+
+ ##
+ # Advertises this service on the primary remote TupleSpace.
+
+ def provide
+ ts = Rinda::RingFinger.primary
+ ts.write(@tuple, @renewer)
+ end
+
+ end
+
+end