Skip to content

Commit 8484597

Browse files
committed
improved MVar to deal with spurious wake ups and fixed a potential issue in empty? and full? methods
1 parent f98d93a commit 8484597

File tree

3 files changed

+141
-39
lines changed

3 files changed

+141
-39
lines changed

lib/concurrent/mvar.rb

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,17 @@ class MVar
1212
def initialize(value = EMPTY, opts = {})
1313
@value = value
1414
@mutex = Mutex.new
15-
@empty_condition = ConditionVariable.new
16-
@full_condition = ConditionVariable.new
15+
@empty_condition = Condition.new
16+
@full_condition = Condition.new
1717
set_deref_options(opts)
1818
end
1919

2020
def take(timeout = nil)
2121
@mutex.synchronize do
22-
# If the value isn't empty, wait for full to be signalled
23-
@full_condition.wait(@mutex, timeout) if empty?
22+
wait_for_full(timeout)
2423

2524
# If we timed out we'll still be empty
26-
if full?
25+
if unlocked_full?
2726
value = @value
2827
@value = EMPTY
2928
@empty_condition.signal
@@ -36,11 +35,10 @@ def take(timeout = nil)
3635

3736
def put(value, timeout = nil)
3837
@mutex.synchronize do
39-
# Unless the value is empty, wait for empty to be signalled
40-
@empty_condition.wait(@mutex, timeout) if full?
38+
wait_for_empty(timeout)
4139

4240
# If we timed out we won't be empty
43-
if empty?
41+
if unlocked_empty?
4442
@value = value
4543
@full_condition.signal
4644
apply_deref_options(value)
@@ -54,11 +52,10 @@ def modify(timeout = nil)
5452
raise ArgumentError.new('no block given') unless block_given?
5553

5654
@mutex.synchronize do
57-
# If the value isn't empty, wait for full to be signalled
58-
@full_condition.wait(@mutex, timeout) if empty?
55+
wait_for_full(timeout)
5956

6057
# If we timed out we'll still be empty
61-
if full?
58+
if unlocked_full?
6259
value = @value
6360
@value = yield value
6461
@full_condition.signal
@@ -71,7 +68,7 @@ def modify(timeout = nil)
7168

7269
def try_take!
7370
@mutex.synchronize do
74-
if full?
71+
if unlocked_full?
7572
value = @value
7673
@value = EMPTY
7774
@empty_condition.signal
@@ -84,7 +81,7 @@ def try_take!
8481

8582
def try_put!(value)
8683
@mutex.synchronize do
87-
if empty?
84+
if unlocked_empty?
8885
@value = value
8986
@full_condition.signal
9087
true
@@ -103,13 +100,13 @@ def set!(value)
103100
end
104101
end
105102

106-
def modify!(timeout = nil)
103+
def modify!
107104
raise ArgumentError.new('no block given') unless block_given?
108105

109106
@mutex.synchronize do
110107
value = @value
111108
@value = yield value
112-
if @value == EMPTY
109+
if unlocked_empty?
113110
@empty_condition.signal
114111
else
115112
@full_condition.signal
@@ -119,13 +116,37 @@ def modify!(timeout = nil)
119116
end
120117

121118
def empty?
122-
@value == EMPTY
119+
@mutex.synchronize { @value == EMPTY }
123120
end
124121

125122
def full?
126123
not empty?
127124
end
128125

126+
private
127+
128+
def unlocked_empty?
129+
@value == EMPTY
130+
end
131+
132+
def unlocked_full?
133+
! unlocked_empty?
134+
end
135+
136+
def wait_for_full(timeout)
137+
remaining = Condition::Result.new(timeout)
138+
while unlocked_empty? && remaining.can_wait?
139+
remaining = @full_condition.wait(@mutex, remaining.remaining_time)
140+
end
141+
end
142+
143+
def wait_for_empty(timeout)
144+
remaining = Condition::Result.new(timeout)
145+
while unlocked_full? && remaining.can_wait?
146+
remaining = @empty_condition.wait(@mutex, remaining.remaining_time)
147+
end
148+
end
149+
129150
end
130151

131152
end

spec/concurrent/event_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ module Concurrent
148148
context 'spurious wake ups' do
149149

150150
before(:each) do
151-
def subject.wake_up
151+
def subject.simulate_spurious_wake_up
152152
@mutex.synchronize do
153153
@condition.signal
154154
@condition.broadcast
@@ -161,7 +161,7 @@ def subject.wake_up
161161
Thread.new { subject.wait; @expected = true }
162162

163163
sleep(0.1)
164-
subject.wake_up
164+
subject.simulate_spurious_wake_up
165165

166166
sleep(0.1)
167167
@expected.should be_false
@@ -172,7 +172,7 @@ def subject.wake_up
172172
Thread.new { subject.wait(0.5); @expected = true }
173173

174174
sleep(0.1)
175-
subject.wake_up
175+
subject.simulate_spurious_wake_up
176176

177177
sleep(0.1)
178178
@expected.should be_false

spec/concurrent/mvar_spec.rb

Lines changed: 101 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'spec_helper'
2+
require_relative 'dereferenceable_shared'
23

34
module Concurrent
45

@@ -16,7 +17,7 @@ def dereferenceable_subject(value, opts = {})
1617

1718
end
1819

19-
context '#initialize' do
20+
describe '#initialize' do
2021

2122
it 'accepts no initial value' do
2223
m = MVar.new
@@ -40,7 +41,7 @@ def dereferenceable_subject(value, opts = {})
4041

4142
end
4243

43-
context '#take' do
44+
describe '#take' do
4445

4546
it 'sets the MVar to empty' do
4647
m = MVar.new(14)
@@ -57,21 +58,21 @@ def dereferenceable_subject(value, opts = {})
5758
m = MVar.new
5859

5960
putter = Thread.new {
60-
sleep(0.5)
61-
m.put 14
61+
sleep(0.1)
62+
m.put 14
6263
}
6364

6465
m.take.should eq 14
6566
end
6667

6768
it 'returns TIMEOUT on timeout on an empty MVar' do
6869
m = MVar.new
69-
m.take(0.5).should eq MVar::TIMEOUT
70+
m.take(0.1).should eq MVar::TIMEOUT
7071
end
7172

7273
end
7374

74-
context '#put' do
75+
describe '#put' do
7576

7677
it 'sets the MVar to be empty' do
7778
m = MVar.new(14)
@@ -89,7 +90,7 @@ def dereferenceable_subject(value, opts = {})
8990
m = MVar.new(14)
9091

9192
putter = Thread.new {
92-
sleep(0.5)
93+
sleep(0.1)
9394
m.take
9495
}
9596

@@ -98,7 +99,7 @@ def dereferenceable_subject(value, opts = {})
9899

99100
it 'returns TIMEOUT on timeout on a full MVar' do
100101
m = MVar.new(14)
101-
m.put(14, 0.5).should eq MVar::TIMEOUT
102+
m.put(14, 0.1).should eq MVar::TIMEOUT
102103
end
103104

104105
it 'returns the value' do
@@ -108,7 +109,7 @@ def dereferenceable_subject(value, opts = {})
108109

109110
end
110111

111-
context '#empty?' do
112+
describe '#empty?' do
112113

113114
it 'returns true on an empty MVar' do
114115
m = MVar.new
@@ -122,7 +123,7 @@ def dereferenceable_subject(value, opts = {})
122123

123124
end
124125

125-
context '#full?' do
126+
describe '#full?' do
126127

127128
it 'returns false on an empty MVar' do
128129
m = MVar.new
@@ -136,7 +137,7 @@ def dereferenceable_subject(value, opts = {})
136137

137138
end
138139

139-
context '#modify' do
140+
describe '#modify' do
140141

141142
it 'raises an exception when no block given' do
142143
m = MVar.new(14)
@@ -158,7 +159,7 @@ def dereferenceable_subject(value, opts = {})
158159
m = MVar.new
159160

160161
putter = Thread.new {
161-
sleep(0.5)
162+
sleep(0.1)
162163
m.put 14
163164
}
164165

@@ -173,24 +174,24 @@ def dereferenceable_subject(value, opts = {})
173174

174175
modifier = Thread.new {
175176
m.modify do |v|
176-
sleep(1)
177+
sleep(0.5)
177178
1
178179
end
179180
}
180181

181-
sleep(0.5)
182-
m.put(2, 1).should eq MVar::TIMEOUT
182+
sleep(0.1)
183+
m.put(2, 0.5).should eq MVar::TIMEOUT
183184
m.take.should eq 1
184185
end
185186

186187
it 'returns TIMEOUT on timeout on an empty MVar' do
187188
m = MVar.new
188-
m.modify(0.5){ |v| v + 2 }.should eq MVar::TIMEOUT
189+
m.modify(0.1){ |v| v + 2 }.should eq MVar::TIMEOUT
189190
end
190191

191192
end
192193

193-
context '#try_put!' do
194+
describe '#try_put!' do
194195

195196
it 'returns true an empty MVar' do
196197
m = MVar.new
@@ -210,7 +211,7 @@ def dereferenceable_subject(value, opts = {})
210211

211212
end
212213

213-
context '#try_take!' do
214+
describe '#try_take!' do
214215

215216
it 'returns EMPTY an empty MVar' do
216217
m = MVar.new
@@ -230,7 +231,7 @@ def dereferenceable_subject(value, opts = {})
230231

231232
end
232233

233-
context '#set!' do
234+
describe '#set!' do
234235

235236
it 'sets an empty MVar to be full' do
236237
m = MVar.new
@@ -257,7 +258,7 @@ def dereferenceable_subject(value, opts = {})
257258

258259
end
259260

260-
context '#modify!' do
261+
describe '#modify!' do
261262

262263
it 'raises an exception when no block given' do
263264
m = MVar.new(14)
@@ -295,6 +296,86 @@ def dereferenceable_subject(value, opts = {})
295296

296297
end
297298

299+
context 'spurious wake ups' do
300+
301+
let(:m) { MVar.new }
302+
303+
before(:each) do
304+
def m.simulate_spurious_wake_up
305+
@mutex.synchronize do
306+
@full_condition.broadcast
307+
@empty_condition.broadcast
308+
end
309+
end
310+
end
311+
312+
describe '#take' do
313+
it 'waits for another thread to #put' do
314+
Thread.new { sleep(0.5); m.put 14 }
315+
Thread.new { sleep(0.1); m.simulate_spurious_wake_up }
316+
317+
m.take.should eq 14
318+
end
319+
320+
it 'returns TIMEOUT on timeout on an empty MVar' do
321+
result = nil
322+
Thread.new { result = m.take(0.3) }
323+
sleep(0.1)
324+
Thread.new { m.simulate_spurious_wake_up }
325+
sleep(0.1)
326+
result.should be_nil
327+
sleep(0.2)
328+
result.should eq MVar::TIMEOUT
329+
end
330+
end
331+
332+
describe '#modify' do
333+
334+
it 'waits for another thread to #put' do
335+
Thread.new { sleep(0.5); m.put 14 }
336+
Thread.new { sleep(0.1); m.simulate_spurious_wake_up }
337+
338+
m.modify{ |v| v + 2 }.should eq 14
339+
end
340+
341+
it 'returns TIMEOUT on timeout on an empty MVar' do
342+
result = nil
343+
Thread.new { result = m.modify(0.3){ |v| v + 2 } }
344+
sleep(0.1)
345+
Thread.new { m.simulate_spurious_wake_up }
346+
sleep(0.1)
347+
result.should be_nil
348+
sleep(0.2)
349+
result.should eq MVar::TIMEOUT
350+
end
351+
end
352+
353+
describe '#put' do
354+
355+
before(:each) { m.put(42) }
356+
357+
it 'waits for another thread to #take' do
358+
Thread.new { sleep(0.5); m.take }
359+
Thread.new { sleep(0.1); m.simulate_spurious_wake_up }
360+
361+
m.put(14).should eq 14
362+
end
363+
364+
it 'returns TIMEOUT on timeout on a full MVar' do
365+
result = nil
366+
Thread.new { result = m.put(14, 0.3) }
367+
sleep(0.1)
368+
Thread.new { m.simulate_spurious_wake_up }
369+
sleep(0.1)
370+
result.should be_nil
371+
sleep(0.2)
372+
result.should eq MVar::TIMEOUT
373+
end
374+
end
375+
376+
377+
end
378+
298379
end
299380

300381
end

0 commit comments

Comments
 (0)