Skip to content

Commit 2da2e3c

Browse files
Merge pull request #107826 from smarterclayton/context_wait
wait: Introduce new methods that allow detection of context cancellation Kubernetes-commit: 5469b170fe8717cb9fae8f12498cd1afd5586891
2 parents 1281665 + 831cf05 commit 2da2e3c

File tree

10 files changed

+1537
-166
lines changed

10 files changed

+1537
-166
lines changed

pkg/util/wait/backoff.go

Lines changed: 188 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package wait
1919
import (
2020
"context"
2121
"math"
22+
"sync"
2223
"time"
2324

2425
"k8s.io/apimachinery/pkg/util/runtime"
@@ -51,33 +52,104 @@ type Backoff struct {
5152
Cap time.Duration
5253
}
5354

54-
// Step (1) returns an amount of time to sleep determined by the
55-
// original Duration and Jitter and (2) mutates the provided Backoff
56-
// to update its Steps and Duration.
55+
// Step returns an amount of time to sleep determined by the original
56+
// Duration and Jitter. The backoff is mutated to update its Steps and
57+
// Duration. A nil Backoff always has a zero-duration step.
5758
func (b *Backoff) Step() time.Duration {
58-
if b.Steps < 1 {
59-
if b.Jitter > 0 {
60-
return Jitter(b.Duration, b.Jitter)
61-
}
62-
return b.Duration
59+
if b == nil {
60+
return 0
6361
}
64-
b.Steps--
62+
var nextDuration time.Duration
63+
nextDuration, b.Duration, b.Steps = delay(b.Steps, b.Duration, b.Cap, b.Factor, b.Jitter)
64+
return nextDuration
65+
}
6566

67+
// DelayFunc returns a function that will compute the next interval to
68+
// wait given the arguments in b. It does not mutate the original backoff
69+
// but the function is safe to use only from a single goroutine.
70+
func (b Backoff) DelayFunc() DelayFunc {
71+
steps := b.Steps
6672
duration := b.Duration
73+
cap := b.Cap
74+
factor := b.Factor
75+
jitter := b.Jitter
76+
77+
return func() time.Duration {
78+
var nextDuration time.Duration
79+
// jitter is applied per step and is not cumulative over multiple steps
80+
nextDuration, duration, steps = delay(steps, duration, cap, factor, jitter)
81+
return nextDuration
82+
}
83+
}
6784

68-
// calculate the next step
69-
if b.Factor != 0 {
70-
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
71-
if b.Cap > 0 && b.Duration > b.Cap {
72-
b.Duration = b.Cap
73-
b.Steps = 0
85+
// Timer returns a timer implementation appropriate to this backoff's parameters
86+
// for use with wait functions.
87+
func (b Backoff) Timer() Timer {
88+
if b.Steps > 1 || b.Jitter != 0 {
89+
return &variableTimer{new: internalClock.NewTimer, fn: b.DelayFunc()}
90+
}
91+
if b.Duration > 0 {
92+
return &fixedTimer{new: internalClock.NewTicker, interval: b.Duration}
93+
}
94+
return newNoopTimer()
95+
}
96+
97+
// delay implements the core delay algorithm used in this package.
98+
func delay(steps int, duration, cap time.Duration, factor, jitter float64) (_ time.Duration, next time.Duration, nextSteps int) {
99+
// when steps is non-positive, do not alter the base duration
100+
if steps < 1 {
101+
if jitter > 0 {
102+
return Jitter(duration, jitter), duration, 0
74103
}
104+
return duration, duration, 0
105+
}
106+
steps--
107+
108+
// calculate the next step's interval
109+
if factor != 0 {
110+
next = time.Duration(float64(duration) * factor)
111+
if cap > 0 && next > cap {
112+
next = cap
113+
steps = 0
114+
}
115+
} else {
116+
next = duration
75117
}
76118

77-
if b.Jitter > 0 {
78-
duration = Jitter(duration, b.Jitter)
119+
// add jitter for this step
120+
if jitter > 0 {
121+
duration = Jitter(duration, jitter)
79122
}
80-
return duration
123+
124+
return duration, next, steps
125+
126+
}
127+
128+
// DelayWithReset returns a DelayFunc that will return the appropriate next interval to
129+
// wait. Every resetInterval the backoff parameters are reset to their initial state.
130+
// This method is safe to invoke from multiple goroutines, but all calls will advance
131+
// the backoff state when Factor is set. If Factor is zero, this method is the same as
132+
// invoking b.DelayFunc() since Steps has no impact without Factor. If resetInterval is
133+
// zero no backoff will be performed as the same calling DelayFunc with a zero factor
134+
// and steps.
135+
func (b Backoff) DelayWithReset(c clock.Clock, resetInterval time.Duration) DelayFunc {
136+
if b.Factor <= 0 {
137+
return b.DelayFunc()
138+
}
139+
if resetInterval <= 0 {
140+
b.Steps = 0
141+
b.Factor = 0
142+
return b.DelayFunc()
143+
}
144+
return (&backoffManager{
145+
backoff: b,
146+
initialBackoff: b,
147+
resetInterval: resetInterval,
148+
149+
clock: c,
150+
lastStart: c.Now(),
151+
timer: nil,
152+
}).Step
81153
}
82154

83155
// Until loops until stop channel is closed, running f every period.
@@ -187,15 +259,65 @@ func JitterUntilWithContext(ctx context.Context, f func(context.Context), period
187259
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
188260
}
189261

190-
// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
191-
// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
192-
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
193-
// undetermined behavior.
194-
// The BackoffManager is supposed to be called in a single-threaded environment.
262+
// backoffManager provides simple backoff behavior in a threadsafe manner to a caller.
263+
type backoffManager struct {
264+
backoff Backoff
265+
initialBackoff Backoff
266+
resetInterval time.Duration
267+
268+
clock clock.Clock
269+
270+
lock sync.Mutex
271+
lastStart time.Time
272+
timer clock.Timer
273+
}
274+
275+
// Step returns the expected next duration to wait.
276+
func (b *backoffManager) Step() time.Duration {
277+
b.lock.Lock()
278+
defer b.lock.Unlock()
279+
280+
switch {
281+
case b.resetInterval == 0:
282+
b.backoff = b.initialBackoff
283+
case b.clock.Now().Sub(b.lastStart) > b.resetInterval:
284+
b.backoff = b.initialBackoff
285+
b.lastStart = b.clock.Now()
286+
}
287+
return b.backoff.Step()
288+
}
289+
290+
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer
291+
// for exponential backoff. The returned timer must be drained before calling Backoff() the second
292+
// time.
293+
func (b *backoffManager) Backoff() clock.Timer {
294+
b.lock.Lock()
295+
defer b.lock.Unlock()
296+
if b.timer == nil {
297+
b.timer = b.clock.NewTimer(b.Step())
298+
} else {
299+
b.timer.Reset(b.Step())
300+
}
301+
return b.timer
302+
}
303+
304+
// Timer returns a new Timer instance that shares the clock and the reset behavior with all other
305+
// timers.
306+
func (b *backoffManager) Timer() Timer {
307+
return DelayFunc(b.Step).Timer(b.clock)
308+
}
309+
310+
// BackoffManager manages backoff with a particular scheme based on its underlying implementation.
195311
type BackoffManager interface {
312+
// Backoff returns a shared clock.Timer that is Reset on every invocation. This method is not
313+
// safe for use from multiple threads. It returns a timer for backoff, and caller shall backoff
314+
// until Timer.C() drains. If the second Backoff() is called before the timer from the first
315+
// Backoff() call finishes, the first timer will NOT be drained and result in undetermined
316+
// behavior.
196317
Backoff() clock.Timer
197318
}
198319

320+
// Deprecated: Will be removed when the legacy polling functions are removed.
199321
type exponentialBackoffManagerImpl struct {
200322
backoff *Backoff
201323
backoffTimer clock.Timer
@@ -208,6 +330,27 @@ type exponentialBackoffManagerImpl struct {
208330
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
209331
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
210332
// This backoff manager is used to reduce load during upstream unhealthiness.
333+
//
334+
// Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
335+
// Backoff struct, use DelayWithReset() to get a DelayFunc that periodically resets itself, and then
336+
// invoke Timer() when calling wait.BackoffUntil.
337+
//
338+
// Instead of:
339+
//
340+
// bm := wait.NewExponentialBackoffManager(init, max, reset, factor, jitter, clock)
341+
// ...
342+
// wait.BackoffUntil(..., bm.Backoff, ...)
343+
//
344+
// Use:
345+
//
346+
// delayFn := wait.Backoff{
347+
// Duration: init,
348+
// Cap: max,
349+
// Steps: int(math.Ceil(float64(max) / float64(init))), // now a required argument
350+
// Factor: factor,
351+
// Jitter: jitter,
352+
// }.DelayWithReset(reset, clock)
353+
// wait.BackoffUntil(..., delayFn.Timer(), ...)
211354
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
212355
return &exponentialBackoffManagerImpl{
213356
backoff: &Backoff{
@@ -248,6 +391,7 @@ func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
248391
return b.backoffTimer
249392
}
250393

394+
// Deprecated: Will be removed when the legacy polling functions are removed.
251395
type jitteredBackoffManagerImpl struct {
252396
clock clock.Clock
253397
duration time.Duration
@@ -257,6 +401,19 @@ type jitteredBackoffManagerImpl struct {
257401

258402
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
259403
// is negative, backoff will not be jittered.
404+
//
405+
// Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
406+
// Backoff struct and invoke Timer() when calling wait.BackoffUntil.
407+
//
408+
// Instead of:
409+
//
410+
// bm := wait.NewJitteredBackoffManager(duration, jitter, clock)
411+
// ...
412+
// wait.BackoffUntil(..., bm.Backoff, ...)
413+
//
414+
// Use:
415+
//
416+
// wait.BackoffUntil(..., wait.Backoff{Duration: duration, Jitter: jitter}.Timer(), ...)
260417
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
261418
return &jitteredBackoffManagerImpl{
262419
clock: c,
@@ -296,6 +453,9 @@ func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
296453
// 3. a sleep truncated by the cap on duration has been completed.
297454
// In case (1) the returned error is what the condition function returned.
298455
// In all other cases, ErrWaitTimeout is returned.
456+
//
457+
// Since backoffs are often subject to cancellation, we recommend using
458+
// ExponentialBackoffWithContext and passing a context to the method.
299459
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
300460
for backoff.Steps > 0 {
301461
if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
@@ -309,8 +469,11 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
309469
return ErrWaitTimeout
310470
}
311471

312-
// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never
313-
// exceeds the deadline specified by the request context.
472+
// ExponentialBackoffWithContext repeats a condition check with exponential backoff.
473+
// It immediately returns an error if the condition returns an error, the context is cancelled
474+
// or hits the deadline, or if the maximum attempts defined in backoff is exceeded (ErrWaitTimeout).
475+
// If an error is returned by the condition the backoff stops immediately. The condition will
476+
// never be invoked more than backoff.Steps times.
314477
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
315478
for backoff.Steps > 0 {
316479
select {

pkg/util/wait/delay.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package wait
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
24+
"k8s.io/utils/clock"
25+
)
26+
27+
// DelayFunc returns the next time interval to wait.
28+
type DelayFunc func() time.Duration
29+
30+
// Timer takes an arbitrary delay function and returns a timer that can handle arbitrary interval changes.
31+
// Use Backoff{...}.Timer() for simple delays and more efficient timers.
32+
func (fn DelayFunc) Timer(c clock.Clock) Timer {
33+
return &variableTimer{fn: fn, new: c.NewTimer}
34+
}
35+
36+
// Until takes an arbitrary delay function and runs until cancelled or the condition indicates exit. This
37+
// offers all of the functionality of the methods in this package.
38+
func (fn DelayFunc) Until(ctx context.Context, immediate, sliding bool, condition ConditionWithContextFunc) error {
39+
return loopConditionUntilContext(ctx, &variableTimer{fn: fn, new: internalClock.NewTimer}, immediate, sliding, condition)
40+
}
41+
42+
// Concurrent returns a version of this DelayFunc that is safe for use by multiple goroutines that
43+
// wish to share a single delay timer.
44+
func (fn DelayFunc) Concurrent() DelayFunc {
45+
var lock sync.Mutex
46+
return func() time.Duration {
47+
lock.Lock()
48+
defer lock.Unlock()
49+
return fn()
50+
}
51+
}

0 commit comments

Comments
 (0)