@@ -1951,57 +1951,128 @@ def then_ask(actor)
19511951 include ActorIntegration
19521952 end
19531953
1954- ### Experimental features follow
1954+ class Channel < Concurrent ::Synchronization ::Object
1955+ safe_initialization!
19551956
1956- module FactoryMethods
1957+ # Default size of the Channel, makes it accept unlimited number of messages.
1958+ UNLIMITED = Object . new
1959+ UNLIMITED . singleton_class . class_eval do
1960+ include Comparable
19571961
1958- # @!visibility private
1962+ def <=>( other )
1963+ 1
1964+ end
19591965
1960- module ChannelIntegration
1966+ def to_s
1967+ 'unlimited'
1968+ end
1969+ end
1970+
1971+ # A channel to pass messages between promises. The size is limited to support back pressure.
1972+ # @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.
1973+ def initialize ( size = UNLIMITED )
1974+ super ( )
1975+ @Size = size
1976+ # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
1977+ @Mutex = Mutex . new
1978+ @Probes = [ ]
1979+ @Messages = [ ]
1980+ @PendingPush = [ ]
1981+ end
19611982
1962- # @!visibility private
19631983
1964- # only proof of concept
1965- # @return [Future]
1966- def select ( *channels )
1967- # TODO (pitr-ch 26-Mar-2016): re-do, has to be non-blocking
1968- future do
1969- # noinspection RubyArgCount
1970- Channel . select do |s |
1971- channels . each do |ch |
1972- s . take ( ch ) { |value | [ value , ch ] }
1984+ # Returns future which will fulfill when the message is added to the channel. Its value is the message.
1985+ # @param [Object] message
1986+ # @return [Future]
1987+ def push ( message )
1988+ @Mutex . synchronize do
1989+ while true
1990+ if @Probes . empty?
1991+ if @Size > @Messages . size
1992+ @Messages . push message
1993+ return Promises . fulfilled_future message
1994+ else
1995+ pushed = Promises . resolvable_future
1996+ @PendingPush . push [ message , pushed ]
1997+ return pushed . with_hidden_resolvable
1998+ end
1999+ else
2000+ probe = @Probes . shift
2001+ if probe . fulfill [ self , message ] , false
2002+ return Promises . fulfilled_future ( message )
19732003 end
19742004 end
19752005 end
19762006 end
19772007 end
19782008
1979- include ChannelIntegration
2009+ # Returns a future witch will become fulfilled with a value from the channel when one is available.
2010+ # @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
2011+ # @return [Future] the probe, its value will be the message when available.
2012+ def pop ( probe = Concurrent ::Promises . resolvable_future )
2013+ # TODO (pitr-ch 26-Dec-2016): improve performance
2014+ pop_for_select ( probe ) . then ( &:last )
2015+ end
2016+
2017+ # @!visibility private
2018+ def pop_for_select ( probe = Concurrent ::Promises . resolvable_future )
2019+ @Mutex . synchronize do
2020+ if @Messages . empty?
2021+ @Probes . push probe
2022+ else
2023+ message = @Messages . shift
2024+ probe . fulfill [ self , message ]
2025+
2026+ unless @PendingPush . empty?
2027+ message , pushed = @PendingPush . shift
2028+ @Messages . push message
2029+ pushed . fulfill message
2030+ end
2031+ end
2032+ end
2033+ probe
2034+ end
2035+
2036+ # @return [String] Short string representation.
2037+ def to_s
2038+ format '<#%s:0x%x size:%s>' , self . class , object_id << 1 , @Size
2039+ end
2040+
2041+ alias_method :inspect , :to_s
19802042 end
19812043
19822044 class Future < AbstractEventFuture
2045+ module NewChannelIntegration
19832046
1984- # @!visibility private
2047+ # @param [Channel] channel to push to.
2048+ # @return [Future] a future which is fulfilled after the message is pushed to the channel.
2049+ # May take a moment if the channel is full.
2050+ def then_push_channel ( channel )
2051+ self . then { |value | channel . push value } . flat_future
2052+ end
2053+
2054+ # TODO (pitr-ch 26-Dec-2016): does it make sense to have rescue an chain variants as well, check other integrations as well
2055+ end
19852056
1986- module ChannelIntegration
2057+ include NewChannelIntegration
2058+ end
19872059
1988- # @!visibility private
2060+ module FactoryMethods
19892061
1990- # Zips with selected value form the suplied channels
1991- # @return [Future]
1992- def then_select ( *channels )
1993- future = Concurrent ::Promises . select ( *channels )
1994- ZipFuturesPromise . new_blocked_by2 ( self , future , @DefaultExecutor ) . future
1995- end
2062+ module NewChannelIntegration
19962063
1997- # @note may block
1998- # @note only proof of concept
1999- def then_put ( channel )
2000- on_fulfillment_using ( :io , channel ) { |value , channel | channel . put value }
2064+ # Selects a channel which is ready to be read from.
2065+ # @param [Channel] channels
2066+ # @return [Future] a future which is fulfilled with pair [channel, message] when one of the channels is
2067+ # available for reading
2068+ def select_channel ( *channels )
2069+ probe = Promises . resolvable_future
2070+ channels . each { |ch | ch . pop_for_select probe }
2071+ probe
20012072 end
20022073 end
20032074
2004- include ChannelIntegration
2075+ include NewChannelIntegration
20052076 end
20062077
20072078 end
0 commit comments