Skip to content

Commit b3eff6d

Browse files
fossedihelmalvaroaleman
authored andcommitted
priority queue: properly sync the waiter manipulation
As described in #3363, there are some circumstances under which `GetWithPriority` is not returning the correct/expected element. This can happen when a `GetWithPriority` is executed and the `Ascend` of the queue is not completed yet, causing not all the items of the BTree to evaluate the same w.waiters.Load() value. Adding a lock to manipulate the waiters will solve the issue. Since the lock is required, there is no need to use an atomic.Int64 anymore. Signed-off-by: fossedihelm <[email protected]>
1 parent 88269f3 commit b3eff6d

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct {
124124
get chan item[T]
125125

126126
// waiters is the number of routines blocked in Get, we use it to determine
127-
// if we can push items.
128-
waiters atomic.Int64
127+
// if we can push items. Every manipulation has to be protected with the lock.
128+
waiters int64
129129

130130
// Configurable for testing
131131
now func() time.Time
@@ -269,15 +269,15 @@ func (w *priorityqueue[T]) spin() {
269269
}
270270
}
271271

272-
if w.waiters.Load() == 0 {
272+
if w.waiters == 0 {
273273
// Have to keep iterating here to ensure we update metrics
274274
// for further items that became ready and set nextReady.
275275
return true
276276
}
277277

278278
w.metrics.get(item.Key, item.Priority)
279279
w.locked.Insert(item.Key)
280-
w.waiters.Add(-1)
280+
w.waiters--
281281
delete(w.items, item.Key)
282282
toDelete = append(toDelete, item)
283283
w.becameReady.Delete(item.Key)
@@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
316316
return zero, 0, true
317317
}
318318

319-
w.waiters.Add(1)
319+
w.lock.Lock()
320+
w.waiters++
321+
w.lock.Unlock()
320322

321323
w.notifyItemOrWaiterAdded()
322324

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,12 @@ var _ = Describe("Controllerworkqueue", func() {
378378
}()
379379

380380
// Verify the go routine above is now waiting for an item.
381-
Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1)))
381+
Eventually(func() int64 {
382+
q.(*priorityqueue[string]).lock.Lock()
383+
defer q.(*priorityqueue[string]).lock.Unlock()
384+
385+
return q.(*priorityqueue[string]).waiters
386+
}).Should(Equal(int64(1)))
382387
Consistently(getUnblocked).ShouldNot(BeClosed())
383388

384389
// shut down

0 commit comments

Comments
 (0)