@@ -56,12 +56,12 @@ def post(executor = :fast, &job)
5656
5757 # @return [Future]
5858 def future ( executor = :fast , &block )
59- Future . execute executor , &block
59+ Immediate . new ( executor , &block ) . future
6060 end
6161
6262 # @return [Delay]
6363 def delay ( executor = :fast , &block )
64- Delay . new ( executor , &block )
64+ Delay . new ( nil , executor , &block ) . future
6565 end
6666
6767 alias_method :async , :future
@@ -144,21 +144,15 @@ module FutureHelpers
144144 # @return [Future]
145145 def join ( *futures )
146146 countdown = Concurrent ::AtomicFixnum . new futures . size
147- promise = Promise . new . add_blocked_by ( * futures ) # TODO add injectable executor
147+ promise = ExternalPromise . new ( futures )
148148 futures . each { |future | future . add_callback :join , countdown , promise , *futures }
149149 promise . future
150150 end
151-
152- # @return [Future]
153- def execute ( executor = :fast , &block )
154- promise = Promise . new ( executor )
155- Next . executor ( executor ) . post { promise . evaluate_to &block }
156- promise . future
157- end
158151 end
159152
160153 class Future < SynchronizedObject
161154 extend FutureHelpers
155+ extend Shortcuts
162156
163157 singleton_class . send :alias_method , :dataflow , :join
164158
@@ -264,26 +258,47 @@ def exception(*args)
264258 reason . exception ( *args )
265259 end
266260
267- # TODO add #then_delay { ... } and such to be able to chain delayed evaluations
261+ # TODO needs better name
262+ def connect ( executor = default_executor )
263+ ConnectedPromise . new ( self , executor ) . future
264+ end
268265
269266 # @yield [success, value, reason] of the parent
270267 def chain ( executor = default_executor , &callback )
271- add_callback :chain_callback , executor , promise = Promise . new ( default_executor ) . add_blocked_by ( self ) , callback
268+ add_callback :chain_callback , executor , promise = ExternalPromise . new ( [ self ] , default_executor ) , callback
272269 promise . future
273270 end
274271
275272 # @yield [value] executed only on parent success
276273 def then ( executor = default_executor , &callback )
277- add_callback :then_callback , executor , promise = Promise . new ( default_executor ) . add_blocked_by ( self ) , callback
274+ add_callback :then_callback , executor , promise = ExternalPromise . new ( [ self ] , default_executor ) , callback
278275 promise . future
279276 end
280277
281278 # @yield [reason] executed only on parent failure
282279 def rescue ( executor = default_executor , &callback )
283- add_callback :rescue_callback , executor , promise = Promise . new ( default_executor ) . add_blocked_by ( self ) , callback
280+ add_callback :rescue_callback , executor , promise = ExternalPromise . new ( [ self ] , default_executor ) , callback
284281 promise . future
285282 end
286283
284+ # lazy version of #chain
285+ def chain_delay ( executor = default_executor , &callback )
286+ delay = Delay . new ( self , executor ) { callback_on_completion callback }
287+ delay . future
288+ end
289+
290+ # lazy version of #then
291+ def then_delay ( executor = default_executor , &callback )
292+ delay = Delay . new ( self , executor ) { conditioned_callback callback }
293+ delay . future
294+ end
295+
296+ # lazy version of #rescue
297+ def rescue_delay ( executor = default_executor , &callback )
298+ delay = Delay . new ( self , executor ) { callback_on_failure callback }
299+ delay . future
300+ end
301+
287302 # @yield [success, value, reason] executed async on `executor` when completed
288303 # @return self
289304 def on_completion ( executor = default_executor , &callback )
@@ -399,27 +414,15 @@ def with_promise(promise, &block)
399414 end
400415
401416 def chain_callback ( executor , promise , callback )
402- with_async ( executor ) do
403- with_promise ( promise ) do
404- callback_on_completion callback
405- end
406- end
417+ with_async ( executor ) { with_promise ( promise ) { callback_on_completion callback } }
407418 end
408419
409420 def then_callback ( executor , promise , callback )
410- with_async ( executor ) do
411- with_promise ( promise ) do
412- success? ? callback . call ( value ) : raise ( reason )
413- end
414- end
421+ with_async ( executor ) { with_promise ( promise ) { conditioned_callback callback } }
415422 end
416423
417424 def rescue_callback ( executor , promise , callback )
418- with_async ( executor ) do
419- with_promise ( promise ) do
420- callback_on_failure callback
421- end
422- end
425+ with_async ( executor ) { with_promise ( promise ) { callback_on_failure callback } }
423426 end
424427
425428 def with_async ( executor )
@@ -450,20 +453,20 @@ def callback_on_failure(callback)
450453 callback . call reason if failed?
451454 end
452455
456+ def conditioned_callback ( callback )
457+ self . success? ? callback . call ( value ) : raise ( reason )
458+ end
459+
453460 def call_callback ( method , *args )
454461 self . send method , *args
455462 end
456463 end
457464
458465 class Promise < SynchronizedObject
459466 # @api private
460- def initialize ( executor_or_future = :fast )
467+ def initialize ( executor = :fast )
461468 super ( )
462- future = if Future === executor_or_future
463- executor_or_future
464- else
465- Future . new ( self , executor_or_future )
466- end
469+ future = Future . new ( self , executor )
467470
468471 synchronize do
469472 @future = future
@@ -480,6 +483,48 @@ def blocked_by
480483 synchronize { @blocked_by }
481484 end
482485
486+ def state
487+ future . state
488+ end
489+
490+ def touch
491+ blocked_by . each ( &:touch ) if synchronize { @touched ? false : ( @touched = true ) }
492+ end
493+
494+ def to_s
495+ "<##{ self . class } :0x#{ '%x' % ( object_id << 1 ) } #{ state } >"
496+ end
497+
498+ def inspect
499+ "#{ to_s [ 0 ..-2 ] } blocked_by:[#{ synchronize { @blocked_by } . map ( &:to_s ) . join ( ', ' ) } ]>"
500+ end
501+
502+ private
503+
504+ def add_blocked_by ( *futures )
505+ synchronize { @blocked_by += futures }
506+ self
507+ end
508+
509+ def complete ( success , value , reason , raise = true )
510+ future . complete ( success , value , reason , raise )
511+ synchronize { @blocked_by . clear }
512+ end
513+
514+ # @return [Future]
515+ def evaluate_to ( &block ) # TODO for parent
516+ complete true , block . call , nil
517+ rescue => error
518+ complete false , nil , error
519+ end
520+ end
521+
522+ class ExternalPromise < Promise
523+ def initialize ( blocked_by_futures , executor = :fast )
524+ super executor
525+ add_blocked_by *blocked_by_futures
526+ end
527+
483528 # Set the `IVar` to a value and wake or notify all threads waiting on it.
484529 #
485530 # @param [Object] value the value to store in the `IVar`
@@ -506,76 +551,69 @@ def try_fail(reason = StandardError.new)
506551 !!complete ( false , nil , reason , false )
507552 end
508553
509- def complete ( success , value , reason , raise = true )
510- future . complete ( success , value , reason , raise )
511- synchronize { @blocked_by . clear }
512- end
513-
514- def state
515- future . state
516- end
517-
518- # @return [Future]
519- def evaluate_to ( &block )
520- success block . call
521- rescue => error
522- fail error
523- end
554+ public :evaluate_to
524555
525556 # @return [Future]
526557 def evaluate_to! ( &block )
527558 evaluate_to ( &block ) . no_error!
528559 end
560+ end
561+
562+ class ConnectedPromise < Promise
563+ def initialize ( future , executor = :fast )
564+ super ( executor )
565+ connect_to future
566+ end
567+
568+ # @api private
569+ public :complete
570+
571+ private
529572
530573 # @return [Future]
531574 def connect_to ( future )
532575 add_blocked_by future
533576 future . add_callback :set_promise_on_completion , self
534577 self . future
535578 end
579+ end
536580
537- def touch
538- blocked_by . each ( &:touch ) if synchronize { @touched ? false : ( @touched = true ) }
539- end
540-
541- def to_s
542- "<##{ self . class } :0x#{ '%x' % ( object_id << 1 ) } #{ state } >"
543- end
544-
545- def inspect
546- "#{ to_s [ 0 ..-2 ] } blocked_by:[#{ synchronize { @blocked_by } . map ( &:to_s ) . join ( ', ' ) } ]>"
547- end
548-
549- # @api private
550- def add_blocked_by ( *futures )
551- synchronize { @blocked_by += futures }
552- self
581+ class Immediate < Promise
582+ def initialize ( executor = :fast , &task )
583+ super ( executor )
584+ Next . executor ( executor ) . post { evaluate_to &task }
553585 end
554586 end
555587
556- class Delay < Future
557-
558- def initialize ( default_executor = :fast , &block )
559- super ( Promise . new ( self ) , default_executor )
560- raise ArgumentError . new ( 'no block given' ) unless block_given?
588+ class Delay < Promise
589+ def initialize ( blocked_by_future , executor = :fast , &task )
590+ super ( executor )
561591 synchronize do
592+ @task = task
562593 @computing = false
563- @task = block
564594 end
595+ add_blocked_by blocked_by_future if blocked_by_future
565596 end
566597
567- def wait ( timeout = nil )
568- touch
569- super timeout
598+ def touch
599+ if blocked_by . all? ( &:completed? )
600+ execute_once
601+ else
602+ blocked_by . each { |f | f . on_success! { self . touch } unless synchronize { @touched } }
603+ super
604+ end
570605 end
571606
572- # starts executing the value without blocking
573- def touch
607+ private
608+
609+ def execute_once
574610 execute , task = synchronize do
575611 [ ( @computing = true unless @computing ) , @task ]
576612 end
577613
578- Next . executor ( default_executor ) . post { promise . evaluate_to &task } if execute
614+ if execute
615+ Next . executor ( future . default_executor ) . post { evaluate_to &task }
616+ end
579617 self
580618 end
581619 end
@@ -610,7 +648,7 @@ def touch
610648future2 = future1 . then { |v | v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR
611649future3 = future1 . rescue { |err | err . message } # executed on default FAST_EXECUTOR
612650future4 = future0 . chain { |success , value , reason | success } # executed on default FAST_EXECUTOR
613- future5 = Promise . new ( :io ) . connect_to ( future3 )
651+ future5 = future3 . connect ( :io ) # connects new future with different executor, the new future is completed when future3 is
614652future6 = future5 . then ( &:capitalize ) # executes on IO_EXECUTOR because default was set to :io on future5
615653future7 = Future . join ( future0 , future3 )
616654
@@ -642,8 +680,9 @@ def touch
642680puts '-- promise like tree'
643681
644682# if head of the tree is not constructed with #future but with #delay it does not start execute,
645- # it's triggered later by calling wait or value on any of the depedent futures or the delay itself
646- tree = ( head = delay { 1 } ) . then { |v | v . succ } . then ( &:succ ) . then ( &:succ )
683+ # it's triggered later by calling wait or value on any of the dependent futures or the delay itself
684+ three = ( head = delay { 1 } ) . then { |v | v . succ } . then ( &:succ )
685+ four = three . then_delay ( &:succ )
647686
648687# meaningful to_s and inspect defined for Future and Promise
649688puts head
@@ -652,12 +691,65 @@ def touch
652691# <#Concurrent::Next::Delay:7f89b4bccc68 pending [<#Concurrent::Next::Promise:7f89b4bccb00 pending>]]>
653692p head . callbacks
654693# [[:then_callback, :fast, <#Concurrent::Next::Promise:0x7fa54b31d218 pending [<#Concurrent::Next::Delay:0x7fa54b31d380 pending>]>, #<Proc:0x007fa54b31d290>]]
655- p tree . value
694+
695+ # evaluates only up to three, four is left unevaluated
696+ p three . value # 3
697+ p four , four . promise
698+ # until value is called on four
699+ p four . value # 4
700+
701+ # futures hidden behind two delays trigger evaluation of both
702+ double_delay = delay { 1 } . then_delay ( &:succ )
703+ p double_delay . value # 2
704+
705+ puts '-- graph'
706+
707+ head = future { 1 }
708+ branch1 = head . then ( &:succ ) . then ( &:succ )
709+ branch2 = head . then ( &:succ ) . then_delay ( &:succ )
710+ result = Future . join ( branch1 , branch2 ) . then { |b1 , b2 | b1 + b2 }
711+
712+ sleep 0.1
713+ p branch1 . completed? , branch2 . completed? # true, false
714+ # force evaluation of whole graph
715+ p result . value # 6
656716
657717puts '-- bench'
658718require 'benchmark'
659719
660- Benchmark . bmbm ( 20 ) do |b |
720+ module Benchmark
721+ def self . bmbmbm ( rehearsals , width )
722+ job = Job . new ( width )
723+ yield ( job )
724+ width = job . width + 1
725+ sync = STDOUT . sync
726+ STDOUT . sync = true
727+
728+ # rehearsal
729+ rehearsals . times do
730+ puts 'Rehearsal ' . ljust ( width +CAPTION . length , '-' )
731+ ets = job . list . inject ( Tms . new ) { |sum , ( label , item ) |
732+ print label . ljust ( width )
733+ res = Benchmark . measure ( &item )
734+ print res . format
735+ sum + res
736+ } . format ( "total: %tsec" )
737+ print " #{ ets } \n \n " . rjust ( width +CAPTION . length +2 , '-' )
738+ end
739+
740+ # take
741+ print ' ' *width + CAPTION
742+ job . list . map { |label , item |
743+ GC . start
744+ print label . ljust ( width )
745+ Benchmark . measure ( label , &item ) . tap { |res | print res }
746+ }
747+ ensure
748+ STDOUT . sync = sync unless sync . nil?
749+ end
750+ end
751+
752+ Benchmark . bmbmbm ( 20 , 20 ) do |b |
661753
662754 parents = [ RubySynchronizedObject , ( JavaSynchronizedObject if defined? JavaSynchronizedObject ) ] . compact
663755 classes = parents . map do |parent |
0 commit comments