Skip to content

Commit d488bdb

Browse files
authored
Merge pull request #6792 from k8s-infra-cherrypick-robot/cherry-pick-6749-to-release-1.1
[release-1.1] ✨ clusterctl: migrate CRDs during clusterctl upgrade
2 parents 2eb2188 + a39f8c9 commit d488bdb

File tree

4 files changed

+533
-0
lines changed

4 files changed

+533
-0
lines changed

cmd/clusterctl/client/cluster/cert_manager.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,12 @@ func (cm *certManagerClient) EnsureLatestVersion() error {
253253
return nil
254254
}
255255

256+
// Migrate CRs to latest CRD storage version, if necessary.
257+
// Note: We have to do this before cert-manager is deleted so conversion webhooks still work.
258+
if err := cm.migrateCRDs(); err != nil {
259+
return err
260+
}
261+
256262
// delete the cert-manager version currently installed (because it should be upgraded);
257263
// NOTE: CRDs, and namespace are preserved in order to avoid deletion of user objects;
258264
// web-hooks are preserved to avoid a user attempting to CREATE a cert-manager resource while the upgrade is in progress.
@@ -265,6 +271,30 @@ func (cm *certManagerClient) EnsureLatestVersion() error {
265271
return cm.install()
266272
}
267273

274+
func (cm *certManagerClient) migrateCRDs() error {
275+
config, err := cm.configClient.CertManager().Get()
276+
if err != nil {
277+
return err
278+
}
279+
280+
// Gets the new cert-manager components from the repository.
281+
objs, err := cm.getManifestObjs(config)
282+
if err != nil {
283+
return err
284+
}
285+
286+
c, err := cm.proxy.NewClient()
287+
if err != nil {
288+
return err
289+
}
290+
291+
if err := newCRDMigrator(c).Run(ctx, objs); err != nil {
292+
return err
293+
}
294+
295+
return nil
296+
}
297+
268298
func (cm *certManagerClient) deleteObjs(objs []unstructured.Unstructured) error {
269299
deleteCertManagerBackoff := newWriteBackoff()
270300
for i := range objs {
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
"github.com/pkg/errors"
26+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/runtime/schema"
30+
"k8s.io/apimachinery/pkg/util/rand"
31+
"k8s.io/apimachinery/pkg/util/sets"
32+
"sigs.k8s.io/controller-runtime/pkg/client"
33+
34+
"sigs.k8s.io/cluster-api/cmd/clusterctl/internal/scheme"
35+
logf "sigs.k8s.io/cluster-api/cmd/clusterctl/log"
36+
)
37+
38+
// crdMigrator migrates CRs to the storage version of new CRDs.
39+
// This is necessary when the new CRD drops a version which
40+
// was previously used as a storage version.
41+
type crdMigrator struct {
42+
Client client.Client
43+
}
44+
45+
// newCRDMigrator creates a new CRD migrator.
46+
func newCRDMigrator(client client.Client) *crdMigrator {
47+
return &crdMigrator{
48+
Client: client,
49+
}
50+
}
51+
52+
// Run migrates CRs to the storage version of new CRDs.
53+
// This is necessary when the new CRD drops a version which
54+
// was previously used as a storage version.
55+
func (m *crdMigrator) Run(ctx context.Context, objs []unstructured.Unstructured) error {
56+
for i := range objs {
57+
obj := objs[i]
58+
59+
if obj.GetKind() == "CustomResourceDefinition" {
60+
crd := &apiextensionsv1.CustomResourceDefinition{}
61+
if err := scheme.Scheme.Convert(&obj, crd, nil); err != nil {
62+
return errors.Wrapf(err, "failed to convert CRD %q", obj.GetName())
63+
}
64+
65+
if _, err := m.run(ctx, crd); err != nil {
66+
return err
67+
}
68+
}
69+
}
70+
return nil
71+
}
72+
73+
// run migrates CRs of a new CRD.
74+
// This is necessary when the new CRD drops a version which
75+
// was previously used as a storage version.
76+
func (m *crdMigrator) run(ctx context.Context, newCRD *apiextensionsv1.CustomResourceDefinition) (bool, error) {
77+
log := logf.Log
78+
79+
// Gets the list of version supported by the new CRD
80+
newVersions := sets.NewString()
81+
for _, version := range newCRD.Spec.Versions {
82+
newVersions.Insert(version.Name)
83+
}
84+
85+
// Get the current CRD.
86+
currentCRD := &apiextensionsv1.CustomResourceDefinition{}
87+
if err := retryWithExponentialBackoff(newReadBackoff(), func() error {
88+
return m.Client.Get(ctx, client.ObjectKeyFromObject(newCRD), currentCRD)
89+
}); err != nil {
90+
// Return if the CRD doesn't exist yet. We only have to migrate if the CRD exists already.
91+
if apierrors.IsNotFound(err) {
92+
return false, nil
93+
}
94+
return false, err
95+
}
96+
97+
// Get the storage version of the current CRD.
98+
currentStorageVersion, err := storageVersionForCRD(currentCRD)
99+
if err != nil {
100+
return false, err
101+
}
102+
103+
// Return an error, if the current storage version has been dropped in the new CRD.
104+
if !newVersions.Has(currentStorageVersion) {
105+
return false, errors.Errorf("unable to upgrade CRD %q because the new CRD does not contain the storage version %q of the current CRD, thus not allowing CR migration", newCRD.Name, currentStorageVersion)
106+
}
107+
108+
currentStatusStoredVersions := sets.NewString(currentCRD.Status.StoredVersions...)
109+
110+
// If the new CRD still contains all current stored versions, nothing to do
111+
// as no previous storage version will be dropped.
112+
if newVersions.HasAll(currentStatusStoredVersions.List()...) {
113+
log.V(2).Info("CRD migration check passed", "name", newCRD.Name)
114+
return false, nil
115+
}
116+
117+
// Otherwise a version that has been used as storage version will be dropped, so it is necessary to migrate all the
118+
// objects and drop the storage version from the current CRD status before installing the new CRD.
119+
// Ref https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definition-versioning/#writing-reading-and-updating-versioned-customresourcedefinition-objects
120+
// Note: We are simply migrating all CR objects independent of the version in which they are actually stored in etcd.
121+
// This way we can make sure that all CR objects are now stored in the current storage version.
122+
// Alternatively, we would have to figure out which objects are stored in which version but this information is not
123+
// exposed by the apiserver.
124+
storedVersionsToDelete := currentStatusStoredVersions.Difference(newVersions)
125+
storedVersionsToPreserve := currentStatusStoredVersions.Intersection(newVersions)
126+
log.Info("CR migration required", "kind", newCRD.Spec.Names.Kind, "storedVersionsToDelete", strings.Join(storedVersionsToDelete.List(), ","), "storedVersionsToPreserve", strings.Join(storedVersionsToPreserve.List(), ","))
127+
128+
if err := m.migrateResourcesForCRD(ctx, currentCRD, currentStorageVersion); err != nil {
129+
return false, err
130+
}
131+
132+
if err := m.patchCRDStoredVersions(ctx, currentCRD, currentStorageVersion); err != nil {
133+
return false, err
134+
}
135+
136+
return true, nil
137+
}
138+
139+
func (m *crdMigrator) migrateResourcesForCRD(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, currentStorageVersion string) error {
140+
log := logf.Log
141+
log.Info("Migrating CRs, this operation may take a while...", "kind", crd.Spec.Names.Kind)
142+
143+
list := &unstructured.UnstructuredList{}
144+
list.SetGroupVersionKind(schema.GroupVersionKind{
145+
Group: crd.Spec.Group,
146+
Version: currentStorageVersion,
147+
Kind: crd.Spec.Names.ListKind,
148+
})
149+
150+
var i int
151+
for {
152+
if err := retryWithExponentialBackoff(newReadBackoff(), func() error {
153+
return m.Client.List(ctx, list, client.Continue(list.GetContinue()))
154+
}); err != nil {
155+
return errors.Wrapf(err, "failed to list %q", list.GetKind())
156+
}
157+
158+
for i := range list.Items {
159+
obj := list.Items[i]
160+
161+
log.V(5).Info("Migrating", logf.UnstructuredToValues(obj)...)
162+
if err := retryWithExponentialBackoff(newWriteBackoff(), func() error {
163+
return handleMigrateErr(m.Client.Update(ctx, &obj))
164+
}); err != nil {
165+
return errors.Wrapf(err, "failed to migrate %s/%s", obj.GetNamespace(), obj.GetName())
166+
}
167+
168+
// Add some random delays to avoid pressure on the API server.
169+
i++
170+
if i%10 == 0 {
171+
log.V(2).Info(fmt.Sprintf("%d objects migrated", i))
172+
time.Sleep(time.Duration(rand.IntnRange(50*int(time.Millisecond), 250*int(time.Millisecond))))
173+
}
174+
}
175+
176+
if list.GetContinue() == "" {
177+
break
178+
}
179+
}
180+
181+
log.V(2).Info(fmt.Sprintf("CR migration completed: migrated %d objects", i), "kind", crd.Spec.Names.Kind)
182+
return nil
183+
}
184+
185+
func (m *crdMigrator) patchCRDStoredVersions(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, currentStorageVersion string) error {
186+
crd.Status.StoredVersions = []string{currentStorageVersion}
187+
if err := retryWithExponentialBackoff(newWriteBackoff(), func() error {
188+
return m.Client.Status().Update(ctx, crd)
189+
}); err != nil {
190+
return errors.Wrapf(err, "failed to update status.storedVersions for CRD %q", crd.Name)
191+
}
192+
return nil
193+
}
194+
195+
// handleMigrateErr will absorb certain types of errors that we know can be skipped/passed on
196+
// during a migration of a particular object.
197+
func handleMigrateErr(err error) error {
198+
if err == nil {
199+
return nil
200+
}
201+
202+
// If the resource no longer exists, don't return the error as the object no longer
203+
// needs updating to the new API version.
204+
if apierrors.IsNotFound(err) {
205+
return nil
206+
}
207+
208+
// If there was a conflict, another client must have written the object already which
209+
// means we don't need to force an update.
210+
if apierrors.IsConflict(err) {
211+
return nil
212+
}
213+
return err
214+
}
215+
216+
// storageVersionForCRD discovers the storage version for a given CRD.
217+
func storageVersionForCRD(crd *apiextensionsv1.CustomResourceDefinition) (string, error) {
218+
for _, v := range crd.Spec.Versions {
219+
if v.Storage {
220+
return v.Name, nil
221+
}
222+
}
223+
return "", errors.Errorf("could not find storage version for CRD %q", crd.Name)
224+
}

0 commit comments

Comments
 (0)