Skip to content

Commit 13dde43

Browse files
authored
Merge pull request #8320 from adrianmoisey/fix-race
Add some race-condition protection to VPA recommender
2 parents d84c982 + 44e0f63 commit 13dde43

File tree

8 files changed

+176
-12
lines changed

8 files changed

+176
-12
lines changed

vertical-pod-autoscaler/docs/flags.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ This document is auto-generated from the flag definitions in the VPA recommender
125125
| `storage` | string | | Specifies storage mode. Supported values: prometheus, checkpoint |
126126
| `target-cpu-percentile` | float | 0.9 | CPU usage percentile that will be used as a base for CPU target recommendation. Doesn't affect CPU lower bound, CPU upper bound nor memory recommendations. |
127127
| `target-memory-percentile` | float | 0.9 | Memory usage percentile that will be used as a base for memory target recommendation. Doesn't affect memory lower bound nor memory upper bound. |
128-
| `update-worker-count` | | 10 | kube-api-qps Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (kube-api-qps and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. |
128+
| `update-worker-count` | int | 10 | Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. |
129129
| `use-external-metrics` | | | ALPHA. Use an external metrics provider instead of metrics_server. |
130130
| `username` | string | | The username used in the prometheus server basic auth |
131131
| `v` | Level | 4 | Set the log level verbosity (also available as `--v`) |

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ var (
6666
address = flag.String("address", ":8942", "The address to expose Prometheus metrics.")
6767
storage = flag.String("storage", "", `Specifies storage mode. Supported values: prometheus, checkpoint (default)`)
6868
memorySaver = flag.Bool("memory-saver", false, `If true, only track pods which have an associated VPA`)
69-
updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (`kube-api-qps` and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.")
69+
updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.")
7070
)
7171

7272
// Prometheus history provider flags

vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ package model
3737

3838
import (
3939
"fmt"
40+
"sync"
4041
"time"
4142

4243
corev1 "k8s.io/api/core/v1"
@@ -105,16 +106,27 @@ type AggregateContainerState struct {
105106
// we want to know if it needs recommendation, if the recommendation
106107
// is present and if the automatic updates are on (are we able to
107108
// apply the recommendation to the pods).
108-
LastRecommendation corev1.ResourceList
109+
lastRecommendation corev1.ResourceList
109110
IsUnderVPA bool
110111
UpdateMode *vpa_types.UpdateMode
111112
ScalingMode *vpa_types.ContainerScalingMode
112113
ControlledResources *[]ResourceName
114+
115+
mutex sync.RWMutex
113116
}
114117

115-
// GetLastRecommendation returns last recorded recommendation.
118+
// GetLastRecommendation returns last recorded recommendation in a thread-safe manner.
116119
func (a *AggregateContainerState) GetLastRecommendation() corev1.ResourceList {
117-
return a.LastRecommendation
120+
a.mutex.RLock()
121+
defer a.mutex.RUnlock()
122+
return a.lastRecommendation
123+
}
124+
125+
// SetLastRecommendation sets the last recorded recommendation in a thread-safe manner.
126+
func (a *AggregateContainerState) SetLastRecommendation(recommendation corev1.ResourceList) {
127+
a.mutex.Lock()
128+
defer a.mutex.Unlock()
129+
a.lastRecommendation = recommendation
118130
}
119131

120132
// NeedsRecommendation returns true if the state should have recommendation calculated.
@@ -147,7 +159,7 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
147159
// a VPA object.
148160
func (a *AggregateContainerState) MarkNotAutoscaled() {
149161
a.IsUnderVPA = false
150-
a.LastRecommendation = nil
162+
a.SetLastRecommendation(nil)
151163
a.UpdateMode = nil
152164
a.ScalingMode = nil
153165
a.ControlledResources = nil

vertical-pod-autoscaler/pkg/recommender/model/cluster.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package model
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
apiv1 "k8s.io/api/core/v1"
@@ -70,6 +71,7 @@ type clusterState struct {
7071
// VPA objects in the cluster that have no recommendation mapped to the first
7172
// time we've noticed the recommendation missing or last time we logged
7273
// a warning about it.
74+
// TODO consider switching to a sync.Map for emptyVPAs
7375
emptyVPAs map[VpaID]time.Time
7476
// Observed VPAs. Used to check if there are updates needed.
7577
observedVPAs []*vpa_types.VerticalPodAutoscaler
@@ -82,6 +84,9 @@ type clusterState struct {
8284

8385
lastAggregateContainerStateGC time.Time
8486
gcInterval time.Duration
87+
88+
// Mutex protecting concurrent access to the emptyVPAs map.
89+
mutex sync.RWMutex
8590
}
8691

8792
// StateMapSize is the number of pods being tracked by the VPA
@@ -319,6 +324,8 @@ func (cluster *clusterState) DeleteVpa(vpaID VpaID) error {
319324
state.MarkNotAutoscaled()
320325
}
321326
delete(cluster.vpas, vpaID)
327+
cluster.mutex.Lock()
328+
defer cluster.mutex.Unlock()
322329
delete(cluster.emptyVPAs, vpaID)
323330
return nil
324331
}
@@ -464,6 +471,8 @@ func (cluster *clusterState) getContributiveAggregateStateKeys(ctx context.Conte
464471
// keep track of empty recommendations and log information about them
465472
// periodically.
466473
func (cluster *clusterState) RecordRecommendation(vpa *Vpa, now time.Time) error {
474+
cluster.mutex.Lock()
475+
defer cluster.mutex.Unlock()
467476
if vpa.Recommendation != nil && len(vpa.Recommendation.ContainerRecommendations) > 0 {
468477
delete(cluster.emptyVPAs, vpa.ID)
469478
return nil

vertical-pod-autoscaler/pkg/recommender/model/vpa.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ func (vpa *Vpa) UpdateRecommendation(recommendation *vpa_types.RecommendedPodRes
166166
for _, containerRecommendation := range recommendation.ContainerRecommendations {
167167
for container, state := range vpa.aggregateContainerStates {
168168
if container.ContainerName() == containerRecommendation.ContainerName {
169-
metrics_quality.ObserveRecommendationChange(state.LastRecommendation, containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount)
170-
state.LastRecommendation = containerRecommendation.UncappedTarget
169+
metrics_quality.ObserveRecommendationChange(state.GetLastRecommendation(), containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount)
170+
state.SetLastRecommendation(containerRecommendation.UncappedTarget)
171171
}
172172
}
173173
}

vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func TestUpdateRecommendation(t *testing.T) {
193193
for container, rec := range tc.containers {
194194
state := &AggregateContainerState{}
195195
if rec != nil {
196-
state.LastRecommendation = corev1.ResourceList{
196+
state.lastRecommendation = corev1.ResourceList{
197197
corev1.ResourceCPU: resource.MustParse(rec.cpu),
198198
corev1.ResourceMemory: resource.MustParse(rec.mem),
199199
}
@@ -209,9 +209,9 @@ func TestUpdateRecommendation(t *testing.T) {
209209
for key, state := range vpa.aggregateContainerStates {
210210
expected, ok := tc.expectedLast[key.ContainerName()]
211211
if !ok {
212-
assert.Nil(t, state.LastRecommendation)
212+
assert.Nil(t, state.lastRecommendation)
213213
} else {
214-
assert.Equal(t, expected, state.LastRecommendation)
214+
assert.Equal(t, expected, state.lastRecommendation)
215215
}
216216
}
217217
})
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routines
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/labels"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
31+
v1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
32+
vpa_fake "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/fake"
33+
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/logic"
34+
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
35+
metrics_recommender "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/recommender"
36+
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
37+
)
38+
39+
type mockPodResourceRecommender struct{}
40+
41+
func (m *mockPodResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) logic.RecommendedPodResources {
42+
return logic.RecommendedPodResources{}
43+
}
44+
45+
// TestProcessUpdateVPAsConcurrency runs processVPAUpdate from several concurrent workers to surface race conditions.
46+
func TestProcessUpdateVPAsConcurrency(t *testing.T) {
47+
updateWorkerCount := 10
48+
49+
vpaCount := 1000
50+
vpas := make(map[model.VpaID]*model.Vpa, vpaCount)
51+
apiObjectVPAs := make([]*v1.VerticalPodAutoscaler, vpaCount)
52+
fakedClient := make([]runtime.Object, vpaCount)
53+
54+
for i := range vpaCount {
55+
vpaName := fmt.Sprintf("test-vpa-%d", i)
56+
vpaID := model.VpaID{
57+
Namespace: "default",
58+
VpaName: vpaName,
59+
}
60+
selector, err := labels.Parse("app=test")
61+
assert.NoError(t, err, "Failed to parse label selector")
62+
vpas[vpaID] = model.NewVpa(vpaID, selector, time.Now())
63+
64+
apiObjectVPAs[i] = test.VerticalPodAutoscaler().
65+
WithName(vpaName).
66+
WithNamespace("default").
67+
WithContainer("test-container").
68+
Get()
69+
70+
fakedClient[i] = apiObjectVPAs[i]
71+
}
72+
73+
fakeClient := vpa_fake.NewSimpleClientset(fakedClient...).AutoscalingV1()
74+
r := &recommender{
75+
clusterState: model.NewClusterState(time.Minute),
76+
vpaClient: fakeClient,
77+
podResourceRecommender: &mockPodResourceRecommender{},
78+
recommendationPostProcessor: []RecommendationPostProcessor{},
79+
}
80+
81+
labelSelector, err := metav1.ParseToLabelSelector("app=test")
82+
assert.NoError(t, err, "Failed to parse label selector")
83+
parsedSelector, err := metav1.LabelSelectorAsSelector(labelSelector)
84+
assert.NoError(t, err, "Failed to convert label selector to selector")
85+
86+
// Inject into clusterState
87+
for _, vpa := range apiObjectVPAs {
88+
err := r.clusterState.AddOrUpdateVpa(vpa, parsedSelector)
89+
assert.NoError(t, err, "Failed to add or update VPA in cluster state")
90+
}
91+
r.clusterState.SetObservedVPAs(apiObjectVPAs)
92+
93+
// Run processVPAUpdate concurrently for all VPAs
94+
var wg sync.WaitGroup
95+
96+
cnt := metrics_recommender.NewObjectCounter()
97+
defer cnt.Observe()
98+
99+
// Create a channel to send VPA updates to workers
100+
vpaUpdates := make(chan *v1.VerticalPodAutoscaler, len(apiObjectVPAs))
101+
102+
var counter int64
103+
104+
// Start workers
105+
for range updateWorkerCount {
106+
wg.Add(1)
107+
go func() {
108+
defer wg.Done()
109+
for observedVpa := range vpaUpdates {
110+
key := model.VpaID{
111+
Namespace: observedVpa.Namespace,
112+
VpaName: observedVpa.Name,
113+
}
114+
115+
vpa, found := r.clusterState.VPAs()[key]
116+
if !found {
117+
return
118+
}
119+
120+
atomic.AddInt64(&counter, 1)
121+
122+
processVPAUpdate(r, vpa, observedVpa)
123+
cnt.Add(vpa)
124+
}
125+
}()
126+
}
127+
128+
// Send VPA updates to the workers
129+
for _, observedVpa := range apiObjectVPAs {
130+
vpaUpdates <- observedVpa
131+
}
132+
133+
close(vpaUpdates)
134+
wg.Wait()
135+
136+
assert.Equal(t, int64(vpaCount), atomic.LoadInt64(&counter), "Not all VPAs were processed")
137+
}

vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net/http"
2323
"strconv"
24+
"sync"
2425
"time"
2526

2627
"github.com/prometheus/client_golang/prometheus"
@@ -119,7 +120,8 @@ type objectCounterKey struct {
119120

120121
// ObjectCounter helps split all VPA objects into buckets
121122
type ObjectCounter struct {
122-
cnt map[objectCounterKey]int
123+
cnt map[objectCounterKey]int
124+
mutex sync.RWMutex
123125
}
124126

125127
// Register initializes all metrics for VPA Recommender
@@ -189,11 +191,15 @@ func (oc *ObjectCounter) Add(vpa *model.Vpa) {
189191
matchesPods: vpa.HasMatchedPods(),
190192
unsupportedConfig: vpa.Conditions.ConditionActive(vpa_types.ConfigUnsupported),
191193
}
194+
oc.mutex.Lock()
192195
oc.cnt[key]++
196+
oc.mutex.Unlock()
193197
}
194198

195199
// Observe passes all the computed bucket values to metrics
196200
func (oc *ObjectCounter) Observe() {
201+
oc.mutex.RLock()
202+
defer oc.mutex.RUnlock()
197203
for k, v := range oc.cnt {
198204
vpaObjectCount.WithLabelValues(
199205
k.mode,

0 commit comments

Comments
 (0)