1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
|
require 'monitor'
require 'thread'
require 'drb/drb'
require 'rinda/rinda'
require 'enumerator'
require 'forwardable'
module Rinda
##
# A TupleEntry pairs a Rinda::Tuple (a candidate member of a TupleSpace)
# with the bookkeeping needed to retire it: an expiry time, an optional
# renewer and a cancellation flag.
class TupleEntry
  include DRbUndumped

  attr_accessor :expires

  ##
  # Builds an entry around +ary+. +sec+ is either an expiry value (Numeric,
  # +true+ or +nil+, see #make_expires) or a renewer object.
  #
  # A renewer must respond to +renew+ and return a Numeric, +nil+ or +true+
  # describing the next expiry.
  def initialize(ary, sec=nil)
    @cancel = false
    @expires = nil
    @tuple = make_tuple(ary)
    @renewer = nil
    renew(sec)
  end

  ##
  # Flags this entry as canceled.
  def cancel
    @cancel = true
  end

  ##
  # Returns the cancellation flag.
  def canceled?
    @cancel
  end

  ##
  # An entry is alive while it is neither canceled nor expired. The expiry
  # check (which may call the renewer) is skipped for canceled entries.
  def alive?
    !(canceled? || expired?)
  end

  ##
  # Has this entry expired?
  #
  # An entry without an expiry time counts as expired. Once the expiry time
  # has passed, a renewer (if any) is asked once for a new expiry before the
  # entry is declared expired.
  def expired?
    if @expires && @expires > Time.now
      false
    elsif !@expires || @renewer.nil?
      true
    else
      renew(@renewer)
      !@expires || @expires < Time.now
    end
  end

  ##
  # Recomputes the expiry time from +sec_or_renewer+.
  #
  # +nil+:: expire in the far future.
  # +true+:: already expired.
  # Numeric:: expire that many seconds from now.
  #
  # Any other object is treated as a renewer: it is remembered and its
  # +renew+ result is used as the expiry value.
  def renew(sec_or_renewer)
    seconds, renewer = get_renewer(sec_or_renewer)
    @renewer = renewer
    @expires = make_expires(seconds)
  end

  ##
  # Converts +sec+ into an expiry Time:
  #
  # Numeric:: +sec+ seconds after the current time
  # +true+:: one second after the epoch (i.e. already expired)
  # +nil+:: 2**31-1 seconds after the epoch (the end of 32-bit UNIX time)
  #
  # Any other value yields +nil+.
  def make_expires(sec=nil)
    case sec
    when nil     then Time.at(2**31 - 1)
    when true    then Time.at(1)
    when Numeric then Time.now + sec
    end
  end

  ##
  # The underlying object held by the wrapped tuple (the Array or Hash).
  def value
    @tuple.value
  end

  ##
  # Looks up +key+ in the wrapped tuple.
  def [](key)
    @tuple[key]
  end

  ##
  # Fetches +key+ from the wrapped tuple.
  def fetch(key)
    @tuple.fetch(key)
  end

  ##
  # Number of elements in the wrapped tuple.
  def size
    @tuple.size
  end

  ##
  # Wraps +ary+ in a Rinda::Tuple. Subclasses override this to choose a
  # different wrapper.
  def make_tuple(ary)
    Rinda::Tuple.new(ary)
  end

  private

  ##
  # Splits +it+ into <tt>[expiry_value, renewer]</tt>.
  #
  # Numeric, +true+ and +nil+ are plain expiry values and carry no renewer.
  # Anything else is asked to +renew+; if that call raises, +it+ itself is
  # returned as the expiry value with no renewer.
  def get_renewer(it)
    return [it, nil] if Numeric === it || true === it || nil === it

    begin
      [it.renew, it]
    rescue Exception
      [it, nil]
    end
  end
end
##
# A TemplateEntry is a TupleEntry whose payload is a Rinda::Template, so it
# carries the same expiry and cancellation data but can be matched against
# tuples.
class TemplateEntry < TupleEntry
  def make_tuple(ary) # :nodoc:
    Rinda::Template.new(ary)
  end

  ##
  # Tests +tuple+ against the wrapped template. See Template#match for the
  # matching rules.
  def match(tuple)
    @tuple.match(tuple)
  end

  # Lets a TemplateEntry be used directly in +case+/+when+.
  alias_method :===, :match
end
##
# A WaitTemplateEntry is a TemplateEntry a caller can block on. It is built
# around +place+, an object that provides +new_cond+ and +synchronize+
# (TupleSpace passes itself), and owns one condition variable created from
# it.
class WaitTemplateEntry < TemplateEntry
  # The tuple handed over by #read, or +nil+ if none has been delivered.
  attr_reader :found

  ##
  # Creates a waiting template for +ary+ bound to +place+. +expires+ is
  # passed through to TupleEntry#initialize.
  def initialize(place, ary, expires=nil)
    super(ary, expires)
    @place = place
    @cond = place.new_cond
    @found = nil
  end

  ##
  # Waits on this entry's condition variable. TupleSpace calls this from
  # inside +synchronize+.
  def wait
    @cond.wait
  end

  ##
  # Signals the condition variable while holding +place+'s lock.
  def signal
    @place.synchronize { @cond.signal }
  end

  ##
  # Records +tuple+ as the result and signals the condition variable.
  def read(tuple)
    @found = tuple
    signal
  end

  ##
  # Marks the entry as canceled and signals the condition variable.
  def cancel
    super
    signal
  end
end
##
# A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
# TupleSpace changes. You may receive either your subscribed event or the
# 'close' event when iterating over notifications.
#
# See TupleSpace#notify_event for valid notification types.
#
# == Example
#
#   ts = Rinda::TupleSpace.new
#   observer = ts.notify 'write', [nil]
#
#   Thread.start do
#     observer.each { |t| p t }
#   end
#
#   3.times { |i| ts.write [i] }
#
# Outputs:
#
#   ['write', [0]]
#   ['write', [1]]
#   ['write', [2]]
class NotifyTemplateEntry < TemplateEntry
  ##
  # Creates a new NotifyTemplateEntry that watches for +event+s whose tuple
  # matches +tuple+. +place+ is accepted but not used here; +expires+ is
  # passed through to TupleEntry#initialize.
  def initialize(place, event, tuple, expires=nil)
    super([event, Rinda::Template.new(tuple)], expires)
    @queue = Queue.new
    @done = false
  end

  ##
  # Called by TupleSpace to deliver the event +ev+ to this entry's queue.
  def notify(ev)
    @queue.push(ev)
  end

  ##
  # Retrieves the next notification, blocking until one is queued. Once a
  # 'close' notification has been returned, every later call raises
  # RequestExpiredError.
  def pop
    raise RequestExpiredError if @done

    @queue.pop.tap do |event|
      @done = true if event[0] == 'close'
    end
  end

  ##
  # Yields notifications until the 'close' notification has been yielded.
  # Any StandardError raised along the way ends the iteration silently, and
  # the entry is always canceled afterwards.
  def each # :yields: event, tuple
    yield(pop) until @done
  rescue
    # Errors end the iteration quietly; cleanup happens below.
  ensure
    cancel
  end
end
##
# TupleBag is an unordered collection of tuples. It is the basis
# of Tuplespace.
#
# Entries are grouped into bins keyed by their first element when that
# element is a Symbol; all other entries share the bin keyed by +false+.
class TupleBag
  ##
  # A TupleBin holds the entries that share one bin key.
  class TupleBin
    extend Forwardable
    def_delegators '@bin', :find_all, :delete_if, :each, :empty?

    def initialize
      @bin = []
    end

    ##
    # Appends +tuple+ to the bin.
    def add(tuple)
      @bin.push(tuple)
    end

    ##
    # Removes the most recently added element equal to +tuple+, if any.
    def delete(tuple)
      idx = @bin.rindex(tuple)
      @bin.delete_at(idx) if idx
    end

    ##
    # Returns the most recently added element for which the block is true,
    # or +nil+ when there is none.
    def find
      @bin.reverse_each do |x|
        return x if yield(x)
      end
      nil
    end
  end

  def initialize # :nodoc:
    @hash = {}
    @enum = enum_for(:each_entry)
  end

  ##
  # Returns the first entry that has an expiry time set (a truthy
  # +expires+), or +nil+ if no entry has one.
  def has_expires?
    @enum.find do |tuple|
      tuple.expires
    end
  end

  ##
  # Add +tuple+ to the TupleBag.
  def push(tuple)
    key = bin_key(tuple)
    @hash[key] ||= TupleBin.new
    @hash[key].add(tuple)
  end

  ##
  # Removes +tuple+ from the TupleBag. Returns +tuple+, or +nil+ when there
  # is no bin for its key. A bin left empty is dropped.
  def delete(tuple)
    key = bin_key(tuple)
    bin = @hash[key]
    return nil unless bin
    bin.delete(tuple)
    @hash.delete(key) if bin.empty?
    tuple
  end

  ##
  # Finds all live tuples that match +template+.
  def find_all(template)
    bin_for_find(template).find_all do |tuple|
      tuple.alive? && template.match(tuple)
    end
  end

  ##
  # Finds a live tuple that matches +template+.
  def find(template)
    bin_for_find(template).find do |tuple|
      tuple.alive? && template.match(tuple)
    end
  end

  ##
  # Finds all entries in the TupleBag which, when treated as templates, are
  # alive and match +tuple+.
  def find_all_template(tuple)
    @enum.find_all do |template|
      template.alive? && template.match(tuple)
    end
  end

  ##
  # Deletes dead tuples from the TupleBag, returning the deleted tuples.
  #
  # Bins that end up empty are dropped from the index as well, the same way
  # #delete does, so stale keys do not accumulate.
  def delete_unless_alive
    deleted = []
    @hash.delete_if do |_key, bin|
      bin.delete_if do |tuple|
        if tuple.alive?
          false
        else
          deleted.push(tuple)
          true
        end
      end
      bin.empty?
    end
    deleted
  end

  private

  # Yields every entry of every bin.
  def each_entry(&blk)
    @hash.each do |k, v|
      v.each(&blk)
    end
  end

  # The bin key for +tuple+: its first element when that is a Symbol,
  # +false+ otherwise.
  def bin_key(tuple)
    head = tuple[0]
    head.class == Symbol ? head : false
  end

  # The collection to search for +template+: the matching bin (or an empty
  # Array) when the template starts with a Symbol, every entry otherwise.
  def bin_for_find(template)
    key = bin_key(template)
    key ? @hash.fetch(key, []) : @enum
  end
end
##
# The Tuplespace manages access to the tuples it contains,
# ensuring mutual exclusion requirements are met.
#
# The +sec+ option for the write, take, move, read and notify methods may
# either be a number of seconds or a Renewer object.
class TupleSpace
  include DRbUndumped
  include MonitorMixin

  ##
  # Creates a new TupleSpace. +period+ is the number of seconds the keeper
  # thread sleeps between scans for dead tuples.
  #
  # The keeper thread is started on demand by write, take, move and read.
  # It stops once a scan finds that neither the tuple bag nor any of the
  # waiter bags hold an entry with an expiry time.
  def initialize(period=60)
    super()
    @bag = TupleBag.new
    @read_waiter = TupleBag.new
    @take_waiter = TupleBag.new
    @notify_waiter = TupleBag.new
    @period = period
    @keeper = nil
  end

  ##
  # Adds +tuple+ and returns the TupleEntry created for it.
  #
  # Waiting readers whose template matches are handed the tuple, and 'write'
  # listeners are notified. A tuple that is already expired is not stored;
  # 'delete' listeners are notified for it right away. Otherwise matching
  # takers are woken so they can retry.
  def write(tuple, sec=nil)
    entry = create_entry(tuple, sec)
    synchronize do
      if entry.expired?
        @read_waiter.find_all_template(entry).each do |template|
          template.read(tuple)
        end
        notify_event('write', entry.value)
        notify_event('delete', entry.value)
      else
        @bag.push(entry)
        start_keeper if entry.expires
        @read_waiter.find_all_template(entry).each do |template|
          template.read(tuple)
        end
        @take_waiter.find_all_template(entry).each do |template|
          template.signal
        end
        notify_event('write', entry.value)
      end
    end
    entry
  end

  ##
  # Removes a tuple matching +tuple+ and returns it. Equivalent to #move
  # with no port.
  def take(tuple, sec=nil, &block)
    move(nil, tuple, sec, &block)
  end

  ##
  # Removes a tuple matching +tuple+, pushing its value onto +port+ when a
  # port is given. Returns +nil+ when a port was given and the tuple value
  # otherwise.
  #
  # If a block is given it is called with the WaitTemplateEntry before the
  # search starts. When nothing matches, the call blocks until a matching
  # tuple is written. Raises RequestCanceledError if the template is
  # canceled and RequestExpiredError if it expires first.
  def move(port, tuple, sec=nil)
    template = WaitTemplateEntry.new(self, tuple, sec)
    yield(template) if block_given?
    synchronize do
      # An immediate match wins even when the template is already expired.
      entry = @bag.find(template)
      if entry
        port.push(entry.value) if port
        @bag.delete(entry)
        notify_event('take', entry.value)
        return port ? nil : entry.value
      end
      raise RequestExpiredError if template.expired?

      begin
        @take_waiter.push(template)
        start_keeper if template.expires
        while true
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          entry = @bag.find(template)
          if entry
            port.push(entry.value) if port
            @bag.delete(entry)
            notify_event('take', entry.value)
            return port ? nil : entry.value
          end
          template.wait
        end
      ensure
        @take_waiter.delete(template)
      end
    end
  end

  ##
  # Reads a tuple matching +tuple+, but does not remove it.
  #
  # If a block is given it is called with the WaitTemplateEntry before the
  # search starts. When nothing matches, the call blocks until the template
  # is signalled. Raises RequestCanceledError if the template was canceled
  # and RequestExpiredError if it expired.
  def read(tuple, sec=nil)
    template = WaitTemplateEntry.new(self, tuple, sec)
    yield(template) if block_given?
    synchronize do
      entry = @bag.find(template)
      return entry.value if entry
      raise RequestExpiredError if template.expired?

      begin
        @read_waiter.push(template)
        start_keeper if template.expires
        template.wait
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        return template.found
      ensure
        @read_waiter.delete(template)
      end
    end
  end

  ##
  # Returns all tuples matching +tuple+. Does not remove the found tuples.
  def read_all(tuple)
    template = WaitTemplateEntry.new(self, tuple, nil)
    synchronize do
      entry = @bag.find_all(template)
      entry.collect do |e|
        e.value
      end
    end
  end

  ##
  # Registers for notifications of +event+. Returns a NotifyTemplateEntry.
  # See NotifyTemplateEntry for examples of how to listen for notifications.
  #
  # +event+ can be:
  # 'write'::  A tuple was added
  # 'take'::   A tuple was taken or moved
  # 'delete':: A tuple was lost after being overwritten or expiring
  #
  # The TupleSpace will also notify you of the 'close' event when the
  # NotifyTemplateEntry has expired.
  def notify(event, tuple, sec=nil)
    template = NotifyTemplateEntry.new(self, event, tuple, sec)
    synchronize do
      @notify_waiter.push(template)
    end
    template
  end

  private

  # Wraps +tuple+ in a TupleEntry with expiry +sec+.
  def create_entry(tuple, sec)
    TupleEntry.new(tuple, sec)
  end

  ##
  # Removes dead entries from every bag: dead readers and takers are
  # signalled, dead listeners receive 'close', and dead tuples produce a
  # 'delete' event.
  def keep_clean
    synchronize do
      @read_waiter.delete_unless_alive.each do |e|
        e.signal
      end
      @take_waiter.delete_unless_alive.each do |e|
        e.signal
      end
      @notify_waiter.delete_unless_alive.each do |e|
        e.notify(['close'])
      end
      @bag.delete_unless_alive.each do |e|
        notify_event('delete', e.value)
      end
    end
  end

  ##
  # Notifies all registered listeners for +event+ of a status change of
  # +tuple+.
  def notify_event(event, tuple)
    ev = [event, tuple]
    @notify_waiter.find_all_template(ev).each do |template|
      template.notify(ev)
    end
  end

  ##
  # Starts the keeper thread unless one is already running. The thread
  # sleeps for the configured period, then cleans the tuplespace, and ends
  # once nothing needs cleaning.
  #
  # Every caller in this class invokes it while holding the monitor. The
  # thread clears +@keeper+ under the same monitor at the moment it decides
  # to stop, so a later call always sees either a keeper that will run
  # another scan or no keeper at all.
  def start_keeper
    return if @keeper && @keeper.alive?
    @keeper = Thread.new do
      while true
        sleep(@period)
        # A +break+ inside the synchronize block would only leave that
        # block, so the decision is returned and acted on outside it.
        still_needed = synchronize do
          if need_keeper?
            keep_clean
            true
          else
            @keeper = nil
            false
          end
        end
        break unless still_needed
      end
    end
  end

  ##
  # Checks the tuplespace to see if it needs cleaning: true when any bag
  # holds an entry with an expiry time.
  def need_keeper?
    [@bag, @read_waiter, @take_waiter, @notify_waiter].any? do |bag|
      bag.has_expires?
    end
  end
end
end
|