Skip to content

Commit 05b97ef

Browse files
authored
Merge pull request #7900 from omerap12/ctx-propagation-recommender
Refactor recommender to use context
2 parents 8657345 + 10c2a35 commit 05b97ef

File tree

6 files changed: +51 -39 lines changed

6 files changed: +51 -39 lines changed

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go

Lines changed: 29 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ type ClusterStateFeeder interface {
6161
InitFromHistoryProvider(historyProvider history.HistoryProvider)
6262

6363
// InitFromCheckpoints loads historical checkpoints into clusterState.
64-
InitFromCheckpoints()
64+
InitFromCheckpoints(ctx context.Context)
6565

6666
// LoadVPAs updates clusterState with current state of VPAs.
6767
LoadVPAs(ctx context.Context)
@@ -70,10 +70,10 @@ type ClusterStateFeeder interface {
7070
LoadPods()
7171

7272
// LoadRealTimeMetrics updates clusterState with current usage metrics of containers.
73-
LoadRealTimeMetrics()
73+
LoadRealTimeMetrics(ctx context.Context)
7474

7575
// GarbageCollectCheckpoints removes historical checkpoints that don't have a matching VPA.
76-
GarbageCollectCheckpoints()
76+
GarbageCollectCheckpoints(ctx context.Context)
7777
}
7878

7979
// ClusterStateFeederFactory makes instances of ClusterStateFeeder.
@@ -113,26 +113,32 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
113113
}
114114

115115
// WatchEvictionEventsWithRetries watches new Events with reason=Evicted and passes them to the observer.
116-
func WatchEvictionEventsWithRetries(kubeClient kube_client.Interface, observer oom.Observer, namespace string) {
116+
func WatchEvictionEventsWithRetries(ctx context.Context, kubeClient kube_client.Interface, observer oom.Observer, namespace string) {
117117
go func() {
118118
options := metav1.ListOptions{
119119
FieldSelector: "reason=Evicted",
120120
}
121121

122122
watchEvictionEventsOnce := func() {
123-
watchInterface, err := kubeClient.CoreV1().Events(namespace).Watch(context.TODO(), options)
123+
watchInterface, err := kubeClient.CoreV1().Events(namespace).Watch(ctx, options)
124124
if err != nil {
125125
klog.ErrorS(err, "Cannot initialize watching events")
126126
return
127127
}
128+
defer watchInterface.Stop()
128129
watchEvictionEvents(watchInterface.ResultChan(), observer)
129130
}
130131
for {
131-
watchEvictionEventsOnce()
132-
// Wait between attempts, retrying too often breaks API server.
133-
waitTime := wait.Jitter(evictionWatchRetryWait, evictionWatchJitterFactor)
134-
klog.V(1).InfoS("An attempt to watch eviction events finished", "waitTime", waitTime)
135-
time.Sleep(waitTime)
132+
select {
133+
case <-ctx.Done():
134+
return
135+
default:
136+
watchEvictionEventsOnce()
137+
// Wait between attempts, retrying too often breaks API server.
138+
waitTime := wait.Jitter(evictionWatchRetryWait, evictionWatchJitterFactor)
139+
klog.V(1).InfoS("An attempt to watch eviction events finished", "waitTime", waitTime)
140+
time.Sleep(waitTime)
141+
}
136142
}
137143
}()
138144
}
@@ -188,10 +194,10 @@ func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.
188194
}
189195

190196
// NewPodListerAndOOMObserver creates pair of pod lister and OOM observer.
191-
func NewPodListerAndOOMObserver(kubeClient kube_client.Interface, namespace string, stopCh <-chan struct{}) (v1lister.PodLister, oom.Observer) {
197+
func NewPodListerAndOOMObserver(ctx context.Context, kubeClient kube_client.Interface, namespace string, stopCh <-chan struct{}) (v1lister.PodLister, oom.Observer) {
192198
oomObserver := oom.NewObserver()
193199
podLister := newPodClients(kubeClient, oomObserver, namespace, stopCh)
194-
WatchEvictionEventsWithRetries(kubeClient, oomObserver, namespace)
200+
WatchEvictionEventsWithRetries(ctx, kubeClient, oomObserver, namespace)
195201
return podLister, oomObserver
196202
}
197203

@@ -258,9 +264,9 @@ func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.Vertica
258264
return nil
259265
}
260266

261-
func (feeder *clusterStateFeeder) InitFromCheckpoints() {
267+
func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
262268
klog.V(3).InfoS("Initializing VPA from checkpoints")
263-
feeder.LoadVPAs(context.TODO())
269+
feeder.LoadVPAs(ctx)
264270

265271
namespaces := make(map[string]bool)
266272
for _, v := range feeder.clusterState.VPAs() {
@@ -269,7 +275,7 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {
269275

270276
for namespace := range namespaces {
271277
klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace)
272-
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
278+
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
273279
if err != nil {
274280
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
275281
}
@@ -285,7 +291,7 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {
285291
}
286292
}
287293

288-
func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
294+
func (feeder *clusterStateFeeder) GarbageCollectCheckpoints(ctx context.Context) {
289295
klog.V(3).InfoS("Starting garbage collection of checkpoints")
290296

291297
allVPAKeys := map[model.VpaID]bool{}
@@ -303,7 +309,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
303309
allVPAKeys[vpaID] = true
304310
}
305311

306-
namespaceList, err := feeder.coreClient.Namespaces().List(context.TODO(), metav1.ListOptions{})
312+
namespaceList, err := feeder.coreClient.Namespaces().List(ctx, metav1.ListOptions{})
307313
if err != nil {
308314
klog.ErrorS(err, "Cannot list namespaces")
309315
return
@@ -319,7 +325,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
319325
klog.V(3).InfoS("Skipping namespace; it does not meet cleanup criteria", "namespace", namespace, "vpaObjectNamespace", feeder.vpaObjectNamespace, "ignoredNamespaces", feeder.ignoredNamespaces)
320326
continue
321327
}
322-
err := feeder.cleanupCheckpointsForNamespace(namespace, allVPAKeys)
328+
err := feeder.cleanupCheckpointsForNamespace(ctx, namespace, allVPAKeys)
323329
if err != nil {
324330
klog.ErrorS(err, "error cleanining checkpoints")
325331
}
@@ -338,16 +344,16 @@ func (feeder *clusterStateFeeder) shouldIgnoreNamespace(namespace string) bool {
338344
return false
339345
}
340346

341-
func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(namespace string, allVPAKeys map[model.VpaID]bool) error {
347+
func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(ctx context.Context, namespace string, allVPAKeys map[model.VpaID]bool) error {
342348
var err error
343-
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
349+
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
344350
if err != nil {
345351
return err
346352
}
347353
for _, checkpoint := range checkpointList.Items {
348354
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
349355
if !allVPAKeys[vpaID] {
350-
if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil {
356+
if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(ctx, checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil {
351357
err = fmt.Errorf("failed to delete orphaned checkpoint %s: %w", klog.KRef(namespace, checkpoint.Name), err)
352358
continue
353359
}
@@ -484,8 +490,8 @@ func (feeder *clusterStateFeeder) LoadPods() {
484490
}
485491
}
486492

487-
func (feeder *clusterStateFeeder) LoadRealTimeMetrics() {
488-
containersMetrics, err := feeder.metricsClient.GetContainersMetrics()
493+
func (feeder *clusterStateFeeder) LoadRealTimeMetrics(ctx context.Context) {
494+
containersMetrics, err := feeder.metricsClient.GetContainersMetrics(ctx)
489495
if err != nil {
490496
klog.ErrorS(err, "Cannot get ContainerMetricsSnapshot from MetricsClient")
491497
}

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/golang/mock/gomock"
2626
"github.com/stretchr/testify/assert"
2727
core "k8s.io/client-go/testing"
28+
"k8s.io/klog/v2/ktesting"
2829

2930
autoscalingv1 "k8s.io/api/autoscaling/v1"
3031
v1 "k8s.io/api/core/v1"
@@ -831,9 +832,10 @@ func TestFilterVPAsIgnoreNamespaces(t *testing.T) {
831832
}
832833

833834
func TestCanCleanupCheckpoints(t *testing.T) {
835+
_, tctx := ktesting.NewTestContext(t)
834836
client := fake.NewSimpleClientset()
835837

836-
_, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{})
838+
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{})
837839
assert.NoError(t, err)
838840

839841
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace("testNamespace").WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
@@ -898,7 +900,7 @@ func TestCanCleanupCheckpoints(t *testing.T) {
898900
recommenderName: "default",
899901
}
900902

901-
feeder.GarbageCollectCheckpoints()
903+
feeder.GarbageCollectCheckpoints(tctx)
902904

903905
assert.Contains(t, deletedCheckpoints, "nonExistentVPA")
904906

vertical-pod-autoscaler/pkg/recommender/input/metrics/metrics_client.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ type ContainerMetricsSnapshot struct {
4545
type MetricsClient interface {
4646
// GetContainersMetrics returns an array of ContainerMetricsSnapshots,
4747
// representing resource usage for every running container in the cluster
48-
GetContainersMetrics() ([]*ContainerMetricsSnapshot, error)
48+
GetContainersMetrics(ctx context.Context) ([]*ContainerMetricsSnapshot, error)
4949
}
5050

5151
type metricsClient struct {
@@ -64,10 +64,10 @@ func NewMetricsClient(source PodMetricsLister, namespace, clientName string) Met
6464
}
6565
}
6666

67-
func (c *metricsClient) GetContainersMetrics() ([]*ContainerMetricsSnapshot, error) {
67+
func (c *metricsClient) GetContainersMetrics(ctx context.Context) ([]*ContainerMetricsSnapshot, error) {
6868
var metricsSnapshots []*ContainerMetricsSnapshot
6969

70-
podMetricsList, err := c.source.List(context.TODO(), c.namespace, metav1.ListOptions{})
70+
podMetricsList, err := c.source.List(ctx, c.namespace, metav1.ListOptions{})
7171
recommender_metrics.RecordMetricsServerResponse(err, c.clientName)
7272
if err != nil {
7373
return nil, err

vertical-pod-autoscaler/pkg/recommender/input/metrics/metrics_client_test.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,23 +20,26 @@ import (
2020
"testing"
2121

2222
"github.com/stretchr/testify/assert"
23+
"k8s.io/klog/v2/ktesting"
2324
)
2425

2526
func TestGetContainersMetricsReturnsEmptyList(t *testing.T) {
27+
_, tctx := ktesting.NewTestContext(t)
2628
tc := newEmptyMetricsClientTestCase()
2729
emptyMetricsClient := tc.createFakeMetricsClient()
2830

29-
containerMetricsSnapshots, err := emptyMetricsClient.GetContainersMetrics()
31+
containerMetricsSnapshots, err := emptyMetricsClient.GetContainersMetrics(tctx)
3032

3133
assert.NoError(t, err)
3234
assert.Empty(t, containerMetricsSnapshots, "should be empty for empty MetricsGetter")
3335
}
3436

3537
func TestGetContainersMetricsReturnsResults(t *testing.T) {
38+
_, tctx := ktesting.NewTestContext(t)
3639
tc := newMetricsClientTestCase()
3740
fakeMetricsClient := tc.createFakeMetricsClient()
3841

39-
snapshots, err := fakeMetricsClient.GetContainersMetrics()
42+
snapshots, err := fakeMetricsClient.GetContainersMetrics(tctx)
4043

4144
assert.NoError(t, err)
4245
assert.Len(t, snapshots, len(tc.getAllSnaps()), "It should return right number of snapshots")

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -138,14 +138,15 @@ func main() {
138138
klog.ErrorS(nil, "--vpa-object-namespace and --ignored-vpa-object-namespaces are mutually exclusive and can't be set together.")
139139
os.Exit(255)
140140
}
141+
ctx := context.Background()
141142

142143
healthCheck := metrics.NewHealthCheck(*metricsFetcherInterval * 5)
143144
metrics_recommender.Register()
144145
metrics_quality.Register()
145146
server.Initialize(&commonFlags.EnableProfiling, healthCheck, address)
146147

147148
if !leaderElection.LeaderElect {
148-
run(healthCheck, commonFlags)
149+
run(ctx, healthCheck, commonFlags)
149150
} else {
150151
id, err := os.Hostname()
151152
if err != nil {
@@ -172,15 +173,15 @@ func main() {
172173
os.Exit(255)
173174
}
174175

175-
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
176+
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
176177
Lock: lock,
177178
LeaseDuration: leaderElection.LeaseDuration.Duration,
178179
RenewDeadline: leaderElection.RenewDeadline.Duration,
179180
RetryPeriod: leaderElection.RetryPeriod.Duration,
180181
ReleaseOnCancel: true,
181182
Callbacks: leaderelection.LeaderCallbacks{
182183
OnStartedLeading: func(_ context.Context) {
183-
run(healthCheck, commonFlags)
184+
run(ctx, healthCheck, commonFlags)
184185
},
185186
OnStoppedLeading: func() {
186187
klog.Fatal("lost master")
@@ -209,7 +210,7 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
209210
}
210211
}
211212

212-
func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
213+
func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
213214
// Create a stop channel that will be used to signal shutdown
214215
stopCh := make(chan struct{})
215216
defer close(stopCh)
@@ -218,7 +219,7 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
218219
clusterState := model.NewClusterState(aggregateContainerStateGCInterval)
219220
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.VpaObjectNamespace))
220221
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
221-
podLister, oomObserver := input.NewPodListerAndOOMObserver(kubeClient, commonFlag.VpaObjectNamespace, stopCh)
222+
podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh)
222223

223224
model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))
224225

@@ -266,7 +267,7 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
266267
IgnoredNamespaces: ignoredNamespaces,
267268
VpaObjectNamespace: commonFlag.VpaObjectNamespace,
268269
}.Make()
269-
controllerFetcher.Start(context.Background(), scaleCacheLoopPeriod)
270+
controllerFetcher.Start(ctx, scaleCacheLoopPeriod)
270271

271272
recommender := routines.RecommenderFactory{
272273
ClusterState: clusterState,
@@ -287,7 +288,7 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
287288
}
288289

289290
if useCheckpoints {
290-
recommender.GetClusterStateFeeder().InitFromCheckpoints()
291+
recommender.GetClusterStateFeeder().InitFromCheckpoints(ctx)
291292
} else {
292293
config := history.PrometheusHistoryProviderConfig{
293294
Address: *prometheusAddress,

vertical-pod-autoscaler/pkg/recommender/routines/recommender.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -134,7 +134,7 @@ func (r *recommender) MaintainCheckpoints(ctx context.Context, minCheckpointsPer
134134
}
135135
if time.Since(r.lastCheckpointGC) > r.checkpointsGCInterval {
136136
r.lastCheckpointGC = now
137-
r.clusterStateFeeder.GarbageCollectCheckpoints()
137+
r.clusterStateFeeder.GarbageCollectCheckpoints(ctx)
138138
}
139139
}
140140
}
@@ -153,7 +153,7 @@ func (r *recommender) RunOnce() {
153153
r.clusterStateFeeder.LoadPods()
154154
timer.ObserveStep("LoadPods")
155155

156-
r.clusterStateFeeder.LoadRealTimeMetrics()
156+
r.clusterStateFeeder.LoadRealTimeMetrics(ctx)
157157
timer.ObserveStep("LoadMetrics")
158158
klog.V(3).InfoS("ClusterState is tracking", "pods", len(r.clusterState.Pods()), "vpas", len(r.clusterState.VPAs()))
159159

0 commit comments

Comments
 (0)