Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions lib/concurrent/edge/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1141,17 +1141,24 @@ def blocked_by

def process_on_done(future)
countdown = super(future)
value = future.value!
if countdown.nonzero?
internal_state = future.internal_state

unless internal_state.success?
complete_with internal_state
return countdown
end

value = internal_state.value
case value
when Future
@BlockedBy.push value
value.add_callback :pr_callback_notify_blocked, self
@Countdown.value
when Event
raise TypeError, 'cannot flatten to Event'
evaluate_to(lambda { raise TypeError, 'cannot flatten to Event' })
else
raise TypeError, "returned value #{value.inspect} is not a Future"
evaluate_to(lambda { raise TypeError, "returned value #{value.inspect} is not a Future" })
end
end
countdown
Expand All @@ -1174,6 +1181,10 @@ def clear_blocked_by!
@BlockedBy.clear
nil
end

def completable?(countdown)
[email protected]_state.completed? && super(countdown)
end
end

# @!visibility private
Expand Down
29 changes: 25 additions & 4 deletions spec/concurrent/edge/future_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
describe 'Future' do
it 'has sync and async callbacks' do
callbacks_tester = ->(future) do
queue = Queue.new
queue = Queue.new
future.on_completion(:io) { |result| queue.push("async on_completion #{ result.inspect }") }
future.on_completion! { |result| queue.push("sync on_completion #{ result.inspect }") }
future.on_success(:io) { |value| queue.push("async on_success #{ value.inspect }") }
Expand Down Expand Up @@ -309,9 +309,30 @@
expect(Concurrent.zip(branch1, branch2).value!).to eq [2, 3]
end

it 'has flat map' do
f = Concurrent.future { Concurrent.future { 1 } }.flat.then(&:succ)
expect(f.value!).to eq 2
describe '#flat' do
it 'returns value of inner future' do
f = Concurrent.future { Concurrent.future { 1 } }.flat.then(&:succ)
expect(f.value!).to eq 2
end

it 'propagates failure of inner future' do
err = StandardError.new('boo')
f = Concurrent.future { Concurrent.failed_future(err) }.flat
expect(f.reason).to eq err
end

it 'it propagates failure of the future which was suppose to provide inner future' do
f = Concurrent.future { raise 'boo' }.flat
expect(f.reason.message).to eq 'boo'
end

it 'fails if inner value is not a future' do
f = Concurrent.future { 'boo' }.flat
expect(f.reason).to be_an_instance_of TypeError

f = Concurrent.future { Concurrent.completed_event }.flat
expect(f.reason).to be_an_instance_of TypeError
end
end
end

Expand Down