@@ -83,6 +83,7 @@ def initialize(promise, default_executor = :io)
8383 @DefaultExecutor = default_executor
8484 @Touched = AtomicBoolean . new ( false )
8585 @Callbacks = LockFreeStack . new
86+ @Waiters = LockFreeStack . new
8687 @State = AtomicReference . new :pending
8788 super ( )
8889 ensure_ivar_visibility!
@@ -174,7 +175,8 @@ def inspect
174175 # @api private
175176 def complete ( raise = true )
176177 if complete_state
177- synchronize { ns_broadcast }
178+ # go to synchronized block only if there were waiting threads
179+ synchronize { ns_broadcast } if @Waiters . clear
178180 call_callbacks
179181 else
180182 Concurrent ::MultipleAssignmentError . new ( 'multiple assignment' ) if raise
@@ -222,8 +224,19 @@ def touched
222224 private
223225
224226 def wait_until_complete ( timeout )
225- unless completed?
226- synchronize { ns_wait_until ( timeout ) { completed? } }
227+ lock = Synchronization ::Lock . new
228+
229+ while true
230+ last_waiter = @Waiters . peek # waiters' state before completion
231+ break if completed?
232+
233+ # synchronize so it cannot be signaled before it waits
234+ synchronize do
235+ # ok only if completing thread did not start signaling
236+ next unless @Waiters . compare_and_push last_waiter , lock
237+ ns_wait_until ( timeout ) { completed? }
238+ break
239+ end
227240 end
228241 self
229242 end
@@ -244,12 +257,11 @@ def pr_callback_on_completion(callback)
244257 callback . call
245258 end
246259
247- def pr_notify_blocked ( promise )
260+ def pr_callback_notify_blocked ( promise )
248261 promise . on_done self
249262 end
250263
251264 def call_callback ( method , *args )
252- # all methods has to be pure
253265 self . send method , *args
254266 end
255267
@@ -273,7 +285,7 @@ def to_s
273285 end
274286 end
275287
276- Failed = ImmutableStruct . new :reason do
288+ Failed = ImmutableStruct . new :reason do
277289 def value
278290 nil
279291 end
@@ -395,9 +407,15 @@ def on_failure!(&callback)
395407 add_callback :pr_callback_on_failure , callback
396408 end
397409
410+ # @api private
411+ def apply_value ( value , block )
412+ block . call value
413+ end
414+
398415 # @api private
399416 def complete ( success , value , reason , raise = true )
400417 if complete_state success , value , reason
418+ @Waiters . clear
401419 synchronize { ns_broadcast }
402420 call_callbacks success , value , reason
403421 else
@@ -414,6 +432,7 @@ def add_callback(method, *args)
414432 else
415433 @Callbacks . push [ method , *args ]
416434 state = self . state
435+ # take back if it was completed in the meanwhile
417436 call_callbacks success? ( state ) , state . value , state . reason if completed? ( state )
418437 end
419438 self
@@ -439,6 +458,10 @@ def call_callbacks(success, value, reason)
439458 end
440459 end
441460
461+ def call_callback ( method , success , value , reason , *args )
462+ self . send method , success , value , reason , *args
463+ end
464+
442465 def pr_async_callback_on_success ( success , value , reason , executor , callback )
443466 pr_with_async ( executor , success , value , reason , callback ) do |success , value , reason , callback |
444467 pr_callback_on_success success , value , reason , callback
@@ -452,7 +475,7 @@ def pr_async_callback_on_failure(success, value, reason, executor, callback)
452475 end
453476
454477 def pr_callback_on_success ( success , value , reason , callback )
455- callback . call value if success
478+ apply_value value , callback if success
456479 end
457480
458481 def pr_callback_on_failure ( success , value , reason , callback )
@@ -463,7 +486,7 @@ def pr_callback_on_completion(success, value, reason, callback)
463486 callback . call success , value , reason
464487 end
465488
466- def pr_notify_blocked ( success , value , reason , promise )
489+ def pr_callback_notify_blocked ( success , value , reason , promise )
467490 super ( promise )
468491 end
469492
@@ -506,11 +529,11 @@ def try_fail(reason = StandardError.new)
506529 end
507530
508531 def evaluate_to ( *args , &block )
509- promise . evaluate_to ( *args , & block )
532+ promise . evaluate_to ( *args , block )
510533 end
511534
512535 def evaluate_to! ( *args , &block )
513- promise . evaluate_to! ( *args , & block )
536+ promise . evaluate_to! ( *args , block )
514537 end
515538 end
516539
@@ -609,8 +632,8 @@ def try_fail(reason = StandardError.new)
609632 public :evaluate_to
610633
611634 # @return [Future]
612- def evaluate_to! ( *args , & block )
613- evaluate_to ( *args , & block ) . wait!
635+ def evaluate_to! ( *args , block )
636+ evaluate_to ( *args , block ) . wait!
614637 end
615638 end
616639
@@ -625,7 +648,7 @@ def initialize(future, blocked_by_futures, countdown, &block)
625648 @Countdown = AtomicFixnum . new countdown
626649
627650 super ( future )
628- blocked_by . each { |f | f . add_callback :pr_notify_blocked , self }
651+ blocked_by . each { |future | future . add_callback :pr_callback_notify_blocked , self }
629652 end
630653
631654 # @api private
@@ -705,7 +728,9 @@ def initialize(blocked_by_future, default_executor = :io, executor = default_exe
705728
706729 def on_completable ( done_future )
707730 if done_future . success?
708- Concurrent . post_on ( @Executor , done_future , @Task ) { |done_future , task | evaluate_to done_future . value , &task }
731+ Concurrent . post_on ( @Executor , done_future , @Task ) do |done_future , task |
732+ evaluate_to { done_future . apply_value done_future . value , task }
733+ end
709734 else
710735 complete false , nil , done_future . reason
711736 end
@@ -722,7 +747,7 @@ def initialize(blocked_by_future, default_executor = :io, executor = default_exe
722747
723748 def on_completable ( done_future )
724749 if done_future . failed?
725- Concurrent . post_on ( @Executor , done_future , @Task ) { |done_future , task | evaluate_to done_future . reason , &task }
750+ Concurrent . post_on ( @Executor , done_future . reason , @Task ) { |reason , task | evaluate_to reason , &task }
726751 else
727752 complete true , done_future . value , nil
728753 end
@@ -757,12 +782,12 @@ def blocked_by
757782
758783 def process_on_done ( future )
759784 countdown = super ( future )
760- value = future . value
785+ value = future . value
761786 if countdown . nonzero?
762787 case value
763788 when Future
764789 @BlockedBy . push value
765- value . add_callback :pr_notify_blocked , self
790+ value . add_callback :pr_callback_notify_blocked , self
766791 @Countdown . value
767792 when Event
768793 raise TypeError , 'cannot flatten to Event'
@@ -797,26 +822,57 @@ def clear_blocked_by!
797822
798823 # used internally to support #with_default_executor
799824 class AllPromise < BlockedPromise
825+
826+ class ArrayFuture < Future
827+ def apply_value ( value , block )
828+ block . call ( *value )
829+ end
830+ end
831+
800832 private
801833
802834 def initialize ( blocked_by_futures , default_executor = :io )
803- klass = blocked_by_futures . any? { |f | f . is_a? ( Future ) } ? Future : Event
835+ klass = Event
836+ blocked_by_futures . each do |f |
837+ if f . is_a? ( Future )
838+ if klass == Event
839+ klass = Future
840+ elsif klass == Future
841+ klass = ArrayFuture
842+ break
843+ end
844+ end
845+ end
846+
804847 # noinspection RubyArgCount
805848 super ( klass . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
806849 end
807850
808851 def on_completable ( done_future )
809- results = blocked_by . select { |f | f . is_a? ( Future ) } . map ( &:result )
810- if results . empty?
811- complete
812- else
813- if results . all? { |success , _ , _ | success }
814- params = results . map { |_ , value , _ | value }
815- complete ( true , params . size == 1 ? params . first : params , nil )
852+ all_success = true
853+ reason = nil
854+
855+ values = blocked_by . each_with_object ( [ ] ) do |future , values |
856+ next unless future . is_a? ( Future )
857+ success , value , reason = future . result
858+
859+ unless success
860+ all_success = false
861+ reason = reason
862+ break
863+ end
864+ values << value
865+ end
866+
867+ if all_success
868+ if values . empty?
869+ complete
816870 else
817- # TODO what about other reasons?
818- complete false , nil , results . find { |success , _ , _ | !success } . last
871+ complete ( true , values . size == 1 ? values . first : values , nil )
819872 end
873+ else
874+ # TODO what about other reasons?
875+ complete false , nil , reason
820876 end
821877 end
822878 end
0 commit comments