Skip to content

Commit 877fc4a

Browse files
authored
Merge pull request #4224 from zac-nixon/main
refactor deferred reconciler to always process tgb on start up
2 parents 1dbb7bd + b58d2a9 commit 877fc4a

File tree

3 files changed

+60
-52
lines changed

3 files changed

+60
-52
lines changed

controllers/elbv2/targetgroupbinding_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
146146
if deferred {
147147
r.deferredTargetGroupBindingReconciler.Enqueue(tgb)
148148
return nil
149+
} else {
150+
r.deferredTargetGroupBindingReconciler.MarkProcessed(tgb)
149151
}
150152

151153
updateTargetGroupBindingStatusFn := func() {

controllers/elbv2/targetgroupbinding_deferred_reconciler.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package controllers
22

33
import (
44
"context"
5+
"k8s.io/apimachinery/pkg/util/cache"
56
"math/rand"
7+
"sync"
68
"time"
79

810
"github.com/go-logr/logr"
@@ -28,39 +30,54 @@ const (
2830

2931
type DeferredTargetGroupBindingReconciler interface {
3032
Enqueue(tgb *elbv2api.TargetGroupBinding)
33+
MarkProcessed(tgb *elbv2api.TargetGroupBinding)
3134
Run()
3235
}
3336

3437
type deferredTargetGroupBindingReconcilerImpl struct {
3538
delayQueue workqueue.DelayingInterface
36-
syncPeriod time.Duration
3739
k8sClient client.Client
3840
logger logr.Logger
3941

42+
processedTGBCache *cache.Expiring
43+
processedTGBCacheTTL time.Duration
44+
processedTGBCacheMutex sync.RWMutex
45+
4046
delayedReconcileTime time.Duration
4147
maxJitter time.Duration
4248
}
4349

4450
func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler {
4551
return &deferredTargetGroupBindingReconcilerImpl{
46-
syncPeriod: syncPeriod,
47-
logger: logger,
48-
delayQueue: delayQueue,
49-
k8sClient: k8sClient,
52+
logger: logger,
53+
delayQueue: delayQueue,
54+
k8sClient: k8sClient,
55+
processedTGBCache: cache.NewExpiring(),
56+
processedTGBCacheTTL: syncPeriod,
5057

5158
delayedReconcileTime: defaultDelayedReconcileTime,
5259
maxJitter: defaultMaxJitter,
5360
}
5461
}
5562

63+
// Enqueue enqueues a TGB iff it's not been processed recently.
5664
func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) {
5765
nsn := k8s.NamespacedName(tgb)
58-
if d.isEligibleForDefer(tgb) {
66+
if !d.tgbInCache(tgb) {
5967
d.enqueue(nsn)
6068
d.logger.Info("enqueued new deferred TGB", "tgb", nsn.Name)
6169
}
6270
}
6371

72+
// MarkProcessed updates the local cache to signify that the TGB has been processed recently and is not eligible to be deferred.
73+
func (d *deferredTargetGroupBindingReconcilerImpl) MarkProcessed(tgb *elbv2api.TargetGroupBinding) {
74+
if d.tgbInCache(tgb) {
75+
return
76+
}
77+
d.updateCache(k8s.NamespacedName(tgb))
78+
}
79+
80+
// Run starts a loop to process deferred items off the delaying queue.
6481
func (d *deferredTargetGroupBindingReconcilerImpl) Run() {
6582
var item interface{}
6683
shutDown := false
@@ -87,12 +104,6 @@ func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.
87104
return
88105
}
89106

90-
// Re-check that this tgb hasn't been updated since it was enqueued
91-
if !d.isEligibleForDefer(tgb) {
92-
d.logger.Info("TGB not eligible for deferral", "tgb", nsn)
93-
return
94-
}
95-
96107
tgbOld := tgb.DeepCopy()
97108
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash)
98109

@@ -111,12 +122,16 @@ func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn t
111122
}
112123
}
113124

114-
func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool {
115-
then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0)
116-
return time.Now().Sub(then) > d.syncPeriod
125+
func (d *deferredTargetGroupBindingReconcilerImpl) tgbInCache(tgb *elbv2api.TargetGroupBinding) bool {
126+
d.processedTGBCacheMutex.RLock()
127+
defer d.processedTGBCacheMutex.RUnlock()
128+
129+
_, exists := d.processedTGBCache.Get(k8s.NamespacedName(tgb))
130+
return exists
117131
}
118132

119133
func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) {
134+
d.updateCache(nsn)
120135
delayedTime := d.jitterReconcileTime()
121136
d.delayQueue.AddAfter(nsn, delayedTime)
122137
}
@@ -129,3 +144,9 @@ func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Du
129144

130145
return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter)))
131146
}
147+
148+
func (d *deferredTargetGroupBindingReconcilerImpl) updateCache(nsn types.NamespacedName) {
149+
d.processedTGBCacheMutex.Lock()
150+
defer d.processedTGBCacheMutex.Unlock()
151+
d.processedTGBCache.Set(nsn, true, d.processedTGBCacheTTL)
152+
}

controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
99
"k8s.io/apimachinery/pkg/runtime"
1010
"k8s.io/apimachinery/pkg/types"
11+
"k8s.io/apimachinery/pkg/util/cache"
1112
"k8s.io/apimachinery/pkg/util/sets"
1213
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
1314
"k8s.io/client-go/util/workqueue"
1415
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
1516
"sigs.k8s.io/aws-load-balancer-controller/pkg/annotations"
17+
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
1618
testclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
1719
"sigs.k8s.io/controller-runtime/pkg/log"
1820
"strconv"
@@ -31,15 +33,14 @@ func TestDeferredReconcilerConstructor(t *testing.T) {
3133

3234
deferredReconciler := d.(*deferredTargetGroupBindingReconcilerImpl)
3335
assert.Equal(t, dq, deferredReconciler.delayQueue)
34-
assert.Equal(t, syncPeriod, deferredReconciler.syncPeriod)
3536
assert.Equal(t, k8sClient, deferredReconciler.k8sClient)
3637
assert.Equal(t, logger, deferredReconciler.logger)
3738
}
3839

3940
func TestDeferredReconcilerEnqueue(t *testing.T) {
40-
syncPeriod := 5 * time.Minute
4141
testCases := []struct {
4242
name string
43+
tgbInCache []*elbv2api.TargetGroupBinding
4344
tgbToEnqueue []*elbv2api.TargetGroupBinding
4445
expectedQueueEntries sets.Set[types.NamespacedName]
4546
}{
@@ -59,8 +60,8 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
5960
}),
6061
},
6162
{
62-
name: "sync period too short, do not enqueue",
63-
tgbToEnqueue: []*elbv2api.TargetGroupBinding{
63+
name: "item in cache, no enqueue",
64+
tgbInCache: []*elbv2api.TargetGroupBinding{
6465
{
6566
ObjectMeta: metav1.ObjectMeta{
6667
Name: "tgb1",
@@ -71,25 +72,18 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
7172
},
7273
},
7374
},
74-
expectedQueueEntries: make(sets.Set[types.NamespacedName]),
75-
},
76-
{
77-
name: "sync period too long, do enqueue",
7875
tgbToEnqueue: []*elbv2api.TargetGroupBinding{
7976
{
8077
ObjectMeta: metav1.ObjectMeta{
8178
Name: "tgb1",
8279
Namespace: "ns",
8380
Annotations: map[string]string{
84-
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10),
81+
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10),
8582
},
8683
},
8784
},
8885
},
89-
expectedQueueEntries: sets.New(types.NamespacedName{
90-
Name: "tgb1",
91-
Namespace: "ns",
92-
}),
86+
expectedQueueEntries: make(sets.Set[types.NamespacedName]),
9387
},
9488
{
9589
name: "multiple tgb",
@@ -196,15 +190,20 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
196190
Build()
197191

198192
impl := deferredTargetGroupBindingReconcilerImpl{
199-
delayQueue: dq,
200-
syncPeriod: syncPeriod,
201-
k8sClient: k8sClient,
202-
logger: logr.New(&log.NullLogSink{}),
193+
delayQueue: dq,
194+
k8sClient: k8sClient,
195+
logger: logr.New(&log.NullLogSink{}),
196+
processedTGBCache: cache.NewExpiring(),
197+
processedTGBCacheTTL: 1 * time.Minute,
203198

204199
delayedReconcileTime: 0 * time.Millisecond,
205200
maxJitter: 0 * time.Millisecond,
206201
}
207202

203+
for _, tgb := range tc.tgbInCache {
204+
impl.processedTGBCache.Set(k8s.NamespacedName(tgb), true, time.Minute*1)
205+
}
206+
208207
for _, tgb := range tc.tgbToEnqueue {
209208
impl.Enqueue(tgb)
210209
}
@@ -261,10 +260,12 @@ func TestDeferredReconcilerRun(t *testing.T) {
261260

262261
impl := deferredTargetGroupBindingReconcilerImpl{
263262
delayQueue: dq,
264-
syncPeriod: 5 * time.Minute,
265263
k8sClient: k8sClient,
266264
logger: logr.New(&log.NullLogSink{}),
267265

266+
processedTGBCache: cache.NewExpiring(),
267+
processedTGBCacheTTL: 1 * time.Minute,
268+
268269
delayedReconcileTime: 0 * time.Millisecond,
269270
maxJitter: 0 * time.Millisecond,
270271
}
@@ -292,24 +293,6 @@ func TestHandleDeferredItem(t *testing.T) {
292293
Namespace: "ns",
293294
},
294295
},
295-
{
296-
name: "not eligible",
297-
nsn: types.NamespacedName{
298-
Name: "name",
299-
Namespace: "ns",
300-
},
301-
storedTGB: &elbv2api.TargetGroupBinding{
302-
ObjectMeta: metav1.ObjectMeta{
303-
Name: "name",
304-
Namespace: "ns",
305-
Annotations: map[string]string{
306-
annotations.AnnotationCheckPoint: "foo",
307-
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10),
308-
},
309-
},
310-
},
311-
expectedCheckPoint: aws.String("foo"),
312-
},
313296
{
314297
name: "eligible",
315298
nsn: types.NamespacedName{
@@ -355,10 +338,12 @@ func TestHandleDeferredItem(t *testing.T) {
355338

356339
impl := deferredTargetGroupBindingReconcilerImpl{
357340
delayQueue: dq,
358-
syncPeriod: syncPeriod,
359341
k8sClient: k8sClient,
360342
logger: logr.New(&log.NullLogSink{}),
361343

344+
processedTGBCache: cache.NewExpiring(),
345+
processedTGBCacheTTL: 1 * time.Minute,
346+
362347
delayedReconcileTime: 0 * time.Millisecond,
363348
maxJitter: 0 * time.Millisecond,
364349
}

0 commit comments

Comments
 (0)