Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions api/v1beta2/appwrapper_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type AppWrapperComponent struct {
//+optional
Annotations map[string]string `json:"annotations,omitempty"`

// PodSets contained in the Component
// DeclaredPodSets for the Component (optional for known PodCreating GVKs)
//+optional
PodSets []AppWrapperPodSet `json:"podSets,omitempty"`
DeclaredPodSets []AppWrapperPodSet `json:"podSets,omitempty"`

// PodSetInfos assigned to the Component's PodSets by Kueue
//+optional
Expand Down Expand Up @@ -121,6 +121,9 @@ type AppWrapperComponentStatus struct {
// APIVersion is the APIVersion of the Component
APIVersion string `json:"apiVersion"`

// PodSets is the validated PodSets for the Component (either from AppWrapperComponent.DeclaredPodSets or inferred by the controller)
PodSets []AppWrapperPodSet `json:"podSets"`

// Conditions hold the latest available observations of the Component's current state.
//
// The type of the condition could be:
Expand Down
11 changes: 9 additions & 2 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion config/crd/bases/workload.codeflare.dev_appwrappers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ spec:
type: object
type: array
podSets:
description: PodSets contained in the Component
description: DeclaredPodSets for the Component (optional for
known PodCreating GVKs)
items:
description: AppWrapperPodSet describes an homogeneous set
of pods
Expand Down Expand Up @@ -264,10 +265,31 @@ spec:
name:
description: Name is the name of the Component
type: string
podSets:
description: PodSets is the validated PodSets for the Component
(either from AppWrapperComponent.DeclaredPodSets or inferred
by the controller)
items:
description: AppWrapperPodSet describes an homogeneous set
of pods
properties:
path:
description: Path is the path Component.Template to the
PodTemplateSpec for this PodSet
type: string
replicas:
description: Replicas is the number of pods in this PodSet
format: int32
type: integer
required:
- path
type: object
type: array
required:
- apiVersion
- kind
- name
- podSets
type: object
type: array
conditions:
Expand Down
9 changes: 8 additions & 1 deletion internal/controller/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
"github.com/project-codeflare/appwrapper/pkg/config"
"github.com/project-codeflare/appwrapper/pkg/utils"
)
Expand Down Expand Up @@ -150,13 +151,19 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
}
aw.Status.ComponentStatus = make([]workloadv1beta2.AppWrapperComponentStatus, len(aw.Spec.Components))

return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspended)

case workloadv1beta2.AppWrapperSuspended: // no components deployed
if aw.Spec.Suspend {
return ctrl.Result{}, nil // remain suspended
}

// Normally already done as a side-effect of Kueue calling PodSets(), but be absolutely certain before we start using it.
if err := awstatus.EnsureComponentStatusInitialized(ctx, aw); err != nil {
return ctrl.Result{}, err
}

// begin deployment
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
Type: string(workloadv1beta2.QuotaReserved),
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/appwrapper/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func pod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
return workloadv1beta2.AppWrapperComponent{
PodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
Template: runtime.RawExtension{Raw: jsonBytes},
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
Template: runtime.RawExtension{Raw: jsonBytes},
}
}

Expand All @@ -134,7 +134,7 @@ func malformedPod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
return workloadv1beta2.AppWrapperComponent{
PodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
Template: runtime.RawExtension{Raw: jsonBytes},
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
Template: runtime.RawExtension{Raw: jsonBytes},
}
}
13 changes: 7 additions & 6 deletions internal/controller/appwrapper/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,25 @@ import (
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
)

func parseComponent(aw *workloadv1beta2.AppWrapper, raw []byte) (*unstructured.Unstructured, error) {
func parseComponent(raw []byte, expectedNamespace string) (*unstructured.Unstructured, error) {
obj := &unstructured.Unstructured{}
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, obj); err != nil {
return nil, err
}
namespace := obj.GetNamespace()
if namespace == "" {
obj.SetNamespace(aw.Namespace)
} else if namespace != aw.Namespace {
obj.SetNamespace(expectedNamespace)
} else if namespace != expectedNamespace {
// Should not happen, namespace equality checked by validateAppWrapperInvariants
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, aw.Namespace)
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, expectedNamespace)
}
return obj, nil
}

//gocyclo:ignore
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (*unstructured.Unstructured, error, bool) {
component := aw.Spec.Components[componentIdx]
componentStatus := aw.Status.ComponentStatus[componentIdx]
toMap := func(x interface{}) map[string]string {
if x == nil {
return nil
Expand All @@ -77,7 +78,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}
}

obj, err := parseComponent(aw, component.Template.Raw)
obj, err := parseComponent(component.Template.Raw, aw.Namespace)
if err != nil {
return nil, err, true
}
Expand All @@ -88,7 +89,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}

awLabels := map[string]string{AppWrapperLabel: aw.Name}
for podSetsIdx, podSet := range component.PodSets {
for podSetsIdx, podSet := range componentStatus.PodSets {
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
if r.Config.EnableKueueIntegrations {
if podSetsIdx < len(component.PodSetInfos) {
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/appwrapper/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

Expand Down Expand Up @@ -100,6 +101,7 @@ var _ = BeforeSuite(func() {
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
awstatus.CacheClient(k8sClient)
})

var _ = AfterSuite(func() {
Expand Down
88 changes: 88 additions & 0 deletions internal/controller/awstatus/status_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 IBM Corporation.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package awstatus

import (
"context"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
cachedClient client.Client
)

const controllerName = "workload.codeflare.dev-appwrapper"

// CacheClient initializes cachedClient; must be called during startup
func CacheClient(k8sclient client.Client) {
cachedClient = k8sclient
}

// BaseSSAAppWrapper creates a new object based on the input AppWrapper that
// only contains the fields necessary to identify the original object.
// The object can be used as a base for Server-Side-Apply.
func BaseSSAAppWrapper(aw *workloadv1beta2.AppWrapper) *workloadv1beta2.AppWrapper {
patch := &workloadv1beta2.AppWrapper{
ObjectMeta: metav1.ObjectMeta{
UID: aw.UID,
Name: aw.Name,
Namespace: aw.Namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: workloadv1beta2.GroupVersion.String(),
Kind: "AppWrapper",
},
}
return patch
}

// EnsureComponentStatusInitialized initializes aw.Status.ComponenetStatus, including performing PodSet inference for known GVKs
func EnsureComponentStatusInitialized(ctx context.Context, aw *workloadv1beta2.AppWrapper) error {
if len(aw.Status.ComponentStatus) == len(aw.Spec.Components) {
return nil
}

// Construct definitive PodSets from the Spec + InferPodSets and cache in the Status (to avoid clashing with user updates to the Spec via apply)
compStatus := make([]workloadv1beta2.AppWrapperComponentStatus, len(aw.Spec.Components))
for idx := range aw.Spec.Components {
if len(aw.Spec.Components[idx].DeclaredPodSets) > 0 {
compStatus[idx].PodSets = aw.Spec.Components[idx].DeclaredPodSets
} else {
obj := &unstructured.Unstructured{}
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(aw.Spec.Components[idx].Template.Raw, nil, obj); err != nil {
// Transient error; Template.Raw was validated by our AdmissionController
return err
}
podSets, err := utils.InferPodSets(obj)
if err != nil {
// Transient error; InferPodSets was validated by our AdmissionController
return err
}
compStatus[idx].PodSets = podSets
}
}
aw.Status.ComponentStatus = compStatus

patch := BaseSSAAppWrapper(aw)
patch.Status.ComponentStatus = compStatus
return cachedClient.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(controllerName), client.ForceOwnership)
}
13 changes: 6 additions & 7 deletions internal/controller/workload/child_admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -70,20 +70,19 @@ func (r *ChildWorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Reques
!meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.Unhealthy)) {
admittedChildren := 0
childrenWithPods := 0
for componentIdx, component := range aw.Spec.Components {
if len(component.PodSets) > 0 {
for componentIdx, componentStatus := range aw.Status.ComponentStatus {
if len(componentStatus.PodSets) > 0 {
childrenWithPods += 1
unstruct := &unstructured.Unstructured{}
if _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, unstruct); err == nil {
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(unstruct.GetName(), *gvk)
if gv, err := schema.ParseGroupVersion(componentStatus.APIVersion); err == nil {
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(componentStatus.Name, gv.WithKind(componentStatus.Kind))
wl := &kueue.Workload{}
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: aw.Namespace, Name: wlName}, wl); err == nil {
if workload.IsAdmitted(wl) {
admittedChildren += 1
} else {
admission := kueue.Admission{
ClusterQueue: childJobQueueName,
PodSetAssignments: make([]kueue.PodSetAssignment, len(aw.Spec.Components[componentIdx].PodSets)),
PodSetAssignments: make([]kueue.PodSetAssignment, len(componentStatus.PodSets)),
}
for i := range admission.PodSetAssignments {
admission.PodSetAssignments[i].Name = wl.Spec.PodSets[i].Name
Expand Down
27 changes: 16 additions & 11 deletions internal/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package workload

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -32,6 +33,7 @@ import (
"sigs.k8s.io/kueue/pkg/podset"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
"github.com/project-codeflare/appwrapper/pkg/utils"
)

Expand Down Expand Up @@ -77,17 +79,21 @@ func (aw *AppWrapper) GVK() schema.GroupVersionKind {

func (aw *AppWrapper) PodSets() []kueue.PodSet {
podSets := []kueue.PodSet{}
for componentIdx, component := range aw.Spec.Components {
if len(component.PodSets) > 0 {
if err := awstatus.EnsureComponentStatusInitialized(context.Background(), (*workloadv1beta2.AppWrapper)(aw)); err != nil {
// Kueue will raise an error on zero length PodSet. Unfortunately, the Kueue API prevents propagating the actual error
return podSets
}
for idx := range aw.Status.ComponentStatus {
if len(aw.Status.ComponentStatus[idx].PodSets) > 0 {
obj := &unstructured.Unstructured{}
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, obj); err != nil {
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(aw.Spec.Components[idx].Template.Raw, nil, obj); err != nil {
continue // Should be unreachable; Template.Raw validated by our AdmissionController
}
for psIdx, podSet := range component.PodSets {
for psIdx, podSet := range aw.Status.ComponentStatus[idx].PodSets {
replicas := utils.Replicas(podSet)
if template, err := utils.GetPodTemplateSpec(obj, podSet.Path); err == nil {
podSets = append(podSets, kueue.PodSet{
Name: fmt.Sprintf("%s-%v-%v", aw.Name, componentIdx, psIdx),
Name: fmt.Sprintf("%s-%v-%v", aw.Name, idx, psIdx),
Template: *template,
Count: replicas,
})
Expand All @@ -101,17 +107,16 @@ func (aw *AppWrapper) PodSets() []kueue.PodSet {
// RunWithPodSetsInfo records the assigned PodSetInfos for each component and sets aw.spec.Suspend to false
func (aw *AppWrapper) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
podSetsInfoIndex := 0
for componentIdx := range aw.Spec.Components {
component := &aw.Spec.Components[componentIdx]
if len(component.PodSetInfos) != len(component.PodSets) {
component.PodSetInfos = make([]workloadv1beta2.AppWrapperPodSetInfo, len(component.PodSets))
for idx := range aw.Spec.Components {
if len(aw.Spec.Components[idx].PodSetInfos) != len(aw.Status.ComponentStatus[idx].PodSets) {
aw.Spec.Components[idx].PodSetInfos = make([]workloadv1beta2.AppWrapperPodSetInfo, len(aw.Status.ComponentStatus[idx].PodSets))
}
for podSetIdx := range component.PodSets {
for podSetIdx := range aw.Status.ComponentStatus[idx].PodSets {
podSetsInfoIndex += 1
if podSetsInfoIndex > len(podSetsInfo) {
continue // we will return an error below...continuing to get an accurate count for the error message
}
component.PodSetInfos[podSetIdx] = workloadv1beta2.AppWrapperPodSetInfo{
aw.Spec.Components[idx].PodSetInfos[podSetIdx] = workloadv1beta2.AppWrapperPodSetInfo{
Annotations: podSetsInfo[podSetIdx].Annotations,
Labels: podSetsInfo[podSetIdx].Labels,
NodeSelector: podSetsInfo[podSetIdx].NodeSelector,
Expand Down
Loading