
Commit 6b0f412

pmendelski authored and abdelrahman882 committed
Add async node group creation
1 parent 616930b commit 6b0f412

11 files changed: +302 additions, −20 deletions

cluster-autoscaler/cloudprovider/test/test_cloud_provider.go

Lines changed: 24 additions & 1 deletion
@@ -216,14 +216,15 @@ func (tcp *TestCloudProvider) NewNodeGroup(machineType string, labels map[string
 
 // NewNodeGroupWithId creates a new node group with custom ID suffix.
 func (tcp *TestCloudProvider) NewNodeGroupWithId(machineType string, labels map[string]string, systemLabels map[string]string,
-	taints []apiv1.Taint, extraResources map[string]resource.Quantity, id string) (cloudprovider.NodeGroup, error) {
+	taints []apiv1.Taint, extraResources map[string]resource.Quantity, upcoming bool, id string) (cloudprovider.NodeGroup, error) {
 	return &TestNodeGroup{
 		cloudProvider:   tcp,
 		id:              "autoprovisioned-" + machineType + "-" + id,
 		minSize:         0,
 		maxSize:         1000,
 		targetSize:      0,
 		exist:           false,
+		upcoming:        upcoming,
 		autoprovisioned: true,
 		machineType:     machineType,
 		labels:          labels,
@@ -254,12 +255,34 @@ func (tcp *TestCloudProvider) BuildNodeGroup(id string, min, max, size int, auto
 	}
 }
 
+// BuildUpcomingNodeGroup returns an upcoming test node group.
+func (tcp *TestCloudProvider) BuildUpcomingNodeGroup(id string, min, max, size int, autoprovisioned bool, machineType string, opts *config.NodeGroupAutoscalingOptions) *TestNodeGroup {
+	return &TestNodeGroup{
+		cloudProvider:   tcp,
+		id:              id,
+		minSize:         min,
+		maxSize:         max,
+		targetSize:      size,
+		exist:           false,
+		upcoming:        true,
+		autoprovisioned: autoprovisioned,
+		machineType:     machineType,
+		opts:            opts,
+	}
+}
+
 // AddNodeGroup adds node group to test cloud provider.
 func (tcp *TestCloudProvider) AddNodeGroup(id string, min int, max int, size int) {
 	nodeGroup := tcp.BuildNodeGroup(id, min, max, size, false, "", nil)
 	tcp.InsertNodeGroup(nodeGroup)
 }
 
+// AddUpcomingNodeGroup adds upcoming node group to test cloud provider.
+func (tcp *TestCloudProvider) AddUpcomingNodeGroup(id string, min int, max int, size int) {
+	nodeGroup := tcp.BuildUpcomingNodeGroup(id, min, max, size, false, "", nil)
+	tcp.InsertNodeGroup(nodeGroup)
+}
+
 // AddNodeGroupWithCustomOptions adds node group with custom options
 // to test cloud provider.
 func (tcp *TestCloudProvider) AddNodeGroupWithCustomOptions(id string, min int, max int, size int, opts *config.NodeGroupAutoscalingOptions) {
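Note: the upcoming flag threaded through these helpers models a node group that has been requested but not yet materialized by the cloud provider. The accessor below is a sketch of what the IsUpcoming() calls elsewhere in this commit imply for TestNodeGroup; the method itself is added in one of the changed files not shown in this excerpt, so treat the exact shape as an assumption.

// Sketch (assumed): TestNodeGroup exposes its upcoming state through the
// cloudprovider.NodeGroup interface, matching the IsUpcoming() calls in
// clusterstate.go and executor.go below.
func (tng *TestNodeGroup) IsUpcoming() bool {
	return tng.upcoming
}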

cluster-autoscaler/clusterstate/clusterstate.go

Lines changed: 12 additions & 0 deletions
@@ -684,6 +684,11 @@ func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.
 		klog.Warningf("Acceptable range for node group %s not found", nodeGroup.Id())
 		continue
 	}
+	if nodeGroup.IsUpcoming() {
+		// Nodes for upcoming node groups reside in-memory and wait for node group to be fully
+		// created. There is no need to mark their sizes incorrect.
+		continue
+	}
 	readiness, found := csr.perNodeGroupReadiness[nodeGroup.Id()]
 	if !found {
 		// if MinNodes == 0 node group has been scaled to 0 and everything's fine
@@ -981,6 +986,13 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]i
 	registeredNodeNames = map[string][]string{}
 	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
 		id := nodeGroup.Id()
+		if nodeGroup.IsUpcoming() {
+			size, err := nodeGroup.TargetSize()
+			if size >= 0 || err != nil {
+				upcomingCounts[id] = size
+			}
+			continue
+		}
 		readiness := csr.perNodeGroupReadiness[id]
 		ar := csr.acceptableRanges[id]
 		// newNodes is the number of nodes that
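The effect of this branch: for an upcoming node group, the whole target size is reported as upcoming and nothing as registered, because its nodes live only in CA's memory until the group is actually created. Condensed from the test that follows (same helpers, same values):

provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddUpcomingNodeGroup("ng", 1, 10, 2) // min 1, max 10, target size 2

// After UpdateNodes, GetUpcomingNodes reports the full target size as upcoming:
upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes()
// upcoming["ng"] == 2, upcomingRegistered["ng"] is empty.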

cluster-autoscaler/clusterstate/clusterstate_test.go

Lines changed: 24 additions & 0 deletions
@@ -1380,3 +1380,27 @@ func TestTruncateIfExceedMaxSize(t *testing.T) {
 	})
 }
 }
+
+func TestUpcomingNodesFromUpcomingNodeGroups(t *testing.T) {
+	now := time.Now()
+
+	ng := BuildTestNode("ng", 1000, 1000)
+
+	provider := testprovider.NewTestCloudProvider(nil, nil)
+	provider.AddUpcomingNodeGroup("ng", 1, 10, 2)
+
+	assert.NotNil(t, provider)
+	fakeClient := &fake.Clientset{}
+	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "some-map")
+	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
+		MaxTotalUnreadyPercentage: 10,
+		OkTotalUnreadyCount:       1,
+	}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
+	err := clusterstate.UpdateNodes([]*apiv1.Node{ng}, nil, now)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().Unready))
+	assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
+	upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes()
+	assert.Equal(t, 2, upcoming["ng"])
+	assert.Empty(t, upcomingRegistered["ng"])
+}

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -295,6 +295,8 @@ type AutoscalingOptions struct {
 	BypassedSchedulers map[string]bool
 	// ProvisioningRequestEnabled tells if CA processes ProvisioningRequest.
 	ProvisioningRequestEnabled bool
+	// AsyncNodeGroupsEnabled tells if CA creates/deletes node groups asynchronously.
+	AsyncNodeGroupsEnabled bool
 }
 
 // KubeClientOptions specify options for kube client
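The new option is presumably surfaced as a command-line flag in main.go, one of the changed files not shown in this excerpt. A hypothetical wiring sketch; the flag name and help text are assumptions:

// Hypothetical sketch only; the real flag registration is not shown here.
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false,
	"Whether node groups are created and deleted asynchronously. "+
		"Requires a cloud provider that supports async node group operations.")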
cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package orchestrator
+
+import (
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+type asyncNodeGroupInitializer struct {
+	nodeGroup              cloudprovider.NodeGroup
+	nodeInfo               *framework.NodeInfo
+	scaleUpExecutor        *scaleUpExecutor
+	taintConfig            taints.TaintConfig
+	daemonSets             []*appsv1.DaemonSet
+	scaleUpStatusProcessor status.ScaleUpStatusProcessor
+	context                *context.AutoscalingContext
+	atomicScaleUp          bool
+}
+
+func newAsyncNodeGroupInitializer(
+	nodeGroup cloudprovider.NodeGroup,
+	nodeInfo *framework.NodeInfo,
+	scaleUpExecutor *scaleUpExecutor,
+	taintConfig taints.TaintConfig,
+	daemonSets []*appsv1.DaemonSet,
+	scaleUpStatusProcessor status.ScaleUpStatusProcessor,
+	context *context.AutoscalingContext,
+	atomicScaleUp bool,
+) *asyncNodeGroupInitializer {
+	return &asyncNodeGroupInitializer{
+		nodeGroup,
+		nodeInfo,
+		scaleUpExecutor,
+		taintConfig,
+		daemonSets,
+		scaleUpStatusProcessor,
+		context,
+		atomicScaleUp,
+	}
+}
+
+func (s *asyncNodeGroupInitializer) InitializeNodeGroup(result nodegroups.AsyncNodeGroupCreationResult) {
+	if result.Error != nil {
+		klog.Errorf("Async node group creation failed. Async scale-up is cancelled. %v", result.Error)
+		scaleUpStatus, _ := status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, result.Error))
+		s.scaleUpStatusProcessor.Process(s.context, scaleUpStatus)
+		return
+	}
+	mainCreatedNodeGroup := result.CreationResult.MainCreatedNodeGroup
+	// If possible, replace the candidate node info with node info based on the created node
+	// group. The latter should be more in line with the nodes the node group will create.
+	nodeInfo, aErr := utils.GetNodeInfoFromTemplate(mainCreatedNodeGroup, s.daemonSets, s.taintConfig)
+	if aErr != nil {
+		klog.Warningf("Cannot build node info for newly created main node group %s. Using fallback. Error: %v", mainCreatedNodeGroup.Id(), aErr)
+		nodeInfo = s.nodeInfo
+	}
+
+	nodeInfos := make(map[string]*framework.NodeInfo)
+	var scaleUpInfos []nodegroupset.ScaleUpInfo
+	for _, nodeGroup := range result.CreationResult.AllCreatedNodeGroups() {
+		if targetSize := result.TargetSizes[nodeGroup.Id()]; targetSize > 0 {
+			nodeInfos[nodeGroup.Id()] = nodeInfo
+			scaleUpInfo := nodegroupset.ScaleUpInfo{
+				Group:       nodeGroup,
+				CurrentSize: 0,
+				NewSize:     targetSize,
+				MaxSize:     nodeGroup.MaxSize(),
+			}
+			scaleUpInfos = append(scaleUpInfos, scaleUpInfo)
+		}
+	}
+	klog.Infof("Starting initial scale-up for async created node groups. Scale ups: %v", scaleUpInfos)
+	err, failedNodeGroups := s.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, time.Now(), s.atomicScaleUp)
+	if err != nil {
+		var failedNodeGroupIds []string
+		for _, failedNodeGroup := range failedNodeGroups {
+			failedNodeGroupIds = append(failedNodeGroupIds, failedNodeGroup.Id())
+		}
+		klog.Errorf("Async scale-up for asynchronously created node group failed: %v (node groups: %v)", err, failedNodeGroupIds)
+		return
+	}
+	klog.Infof("Initial scale-up succeeded. Scale ups: %v", scaleUpInfos)
+}
+
+func nodeGroupIds(nodeGroups []cloudprovider.NodeGroup) []string {
+	var result []string
+	for _, ng := range nodeGroups {
+		result = append(result, ng.Id())
+	}
+	return result
+}
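The initializer is a callback: a node group manager that supports async creation is expected to return an upcoming placeholder group immediately and invoke InitializeNodeGroup once the cloud provider finishes, at which point the initial scale-up above runs. A standalone sketch of that contract, using local stand-in types; none of these names are the autoscaler's API:

package main

import "fmt"

// Stand-in for nodegroups.AsyncNodeGroupCreationResult (assumed shape).
type creationResult struct {
	err         error
	targetSizes map[string]int // node group id -> desired initial size
}

// Stand-in for the initializer interface implied by CreateNodeGroupAsync.
type initializer interface {
	InitializeNodeGroup(result creationResult)
}

type printingInitializer struct{}

func (printingInitializer) InitializeNodeGroup(r creationResult) {
	if r.err != nil {
		fmt.Println("creation failed, async scale-up cancelled:", r.err)
		return
	}
	for id, size := range r.targetSizes {
		fmt.Printf("initial scale-up: %s -> %d nodes\n", id, size)
	}
}

// createNodeGroupAsync models the manager side: creation happens in the
// background and the initializer is called back with the outcome.
func createNodeGroupAsync(init initializer) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		// A real manager would call the cloud provider here.
		init.InitializeNodeGroup(creationResult{targetSizes: map[string]int{"ng-1": 2}})
	}()
	return done
}

func main() {
	<-createNodeGroupAsync(printingInitializer{})
}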

cluster-autoscaler/core/scaleup/orchestrator/executor.go

Lines changed: 5 additions & 0 deletions
@@ -172,6 +172,11 @@ func (e *scaleUpExecutor) executeScaleUp(
 	if increase < 0 {
 		return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase))
 	}
+	if info.Group.IsUpcoming() {
+		// Don't emit scale up event for upcoming node group as it will be generated after
+		// the node group is created, during initial scale up.
+		return nil
+	}
 	e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
 	metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
 	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
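The IsUpcoming() calls added throughout this commit imply an extension of the cloudprovider.NodeGroup interface; the interface change itself is in one of the changed files not shown here. A hedged reconstruction of what it would look like:

// Assumed addition to the cloudprovider.NodeGroup interface (the doc comment
// wording is a guess based on the usage in this commit):
type NodeGroup interface {
	// ... existing methods (MaxSize, MinSize, TargetSize, Exist, ...) ...

	// IsUpcoming returns true if the node group has been requested but not yet
	// created by the cloud provider; its nodes exist only in CA's memory.
	IsUpcoming() bool
}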

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go

Lines changed: 11 additions & 3 deletions
@@ -214,7 +214,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 
 	// If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
 	createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
-	if !bestOption.NodeGroup.Exist() {
+	if !bestOption.NodeGroup.Exist() && !bestOption.NodeGroup.IsUpcoming() {
 		if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
 			klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
 			// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
@@ -223,7 +223,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 			return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
 		}
 		var scaleUpStatus *status.ScaleUpStatus
-		createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
+		createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, allOrNothing)
 		if aErr != nil {
 			return scaleUpStatus, aErr
 		}
@@ -503,11 +503,19 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
 	schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
 	podEquivalenceGroups []*equivalence.PodGroup,
 	daemonSets []*appsv1.DaemonSet,
+	allOrNothing bool,
 ) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
 	createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
 
 	oldId := initialOption.NodeGroup.Id()
-	createNodeGroupResult, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
+	var createNodeGroupResult nodegroups.CreateNodeGroupResult
+	var aErr errors.AutoscalerError
+	if o.autoscalingContext.AsyncNodeGroupsEnabled {
+		initializer := newAsyncNodeGroupInitializer(initialOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
+		createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
+	} else {
+		createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
+	}
 	if aErr != nil {
 		status, err := status.UpdateScaleUpError(
 			&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{initialOption.NodeGroup}, PodsTriggeredScaleUp: initialOption.Pods},
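The CreateNodeGroupAsync call above implies that the NodeGroupManager processor gains an async counterpart to CreateNodeGroup; its declaration lives in processors/nodegroups, outside this excerpt. A sketch of the signature reconstructed from the call site, with the initializer parameter's interface name being an assumption:

// Assumed shape, inferred from the CreateNodeGroupAsync call site above.
type NodeGroupManager interface {
	CreateNodeGroup(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (CreateNodeGroupResult, errors.AutoscalerError)
	CreateNodeGroupAsync(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup,
		initializer AsyncNodeGroupInitializer) (CreateNodeGroupResult, errors.AutoscalerError)
	// ... other existing methods ...
}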
