diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go index 41f1515..6cf5b69 100644 --- a/internal/controller/appwrapper/appwrapper_controller.go +++ b/internal/controller/appwrapper/appwrapper_controller.go @@ -23,6 +23,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -61,6 +62,11 @@ type podStatusSummary struct { failed int32 } +type componentStatusSummary struct { + expected int32 + deployed int32 +} + // permission to fully control appwrappers //+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch @@ -202,7 +208,26 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) if aw.Spec.Suspend { return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // begin undeployment } - podStatus, err := r.workloadStatus(ctx, aw) + + // First, check the Component-level status of the workload + compStatus, err := r.getComponentStatus(ctx, aw) + if err != nil { + return ctrl.Result{}, err + } + + // Detect externally deleted components and transition to Failed with no GracePeriod or retry + if compStatus.deployed != compStatus.expected { + meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{ + Type: string(workloadv1beta2.Unhealthy), + Status: metav1.ConditionTrue, + Reason: "MissingComponent", + Message: fmt.Sprintf("Only found %v deployed components, but was expecting %v", compStatus.deployed, compStatus.expected), + }) + return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed) + } + + // Second, check the Pod-level status of the workload + podStatus, err := r.getPodStatus(ctx, aw) if err != nil { return ctrl.Result{}, err } @@ -430,7 +455,7 @@ func (r *AppWrapperReconciler) resetOrFail(ctx context.Context, aw *workloadv1be } } -func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*podStatusSummary, error) { +func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*podStatusSummary, error) { pods := &v1.PodList{} if err := r.List(ctx, pods, client.InNamespace(aw.Namespace), @@ -455,6 +480,30 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv return summary, nil } +func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*componentStatusSummary, error) { + summary := &componentStatusSummary{expected: int32(len(aw.Status.ComponentStatus))} + + for componentIdx := range aw.Status.ComponentStatus { + cs := &aw.Status.ComponentStatus[componentIdx] + obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}} + if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { + summary.deployed += 1 + } else { + if apierrors.IsNotFound(err) { + meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{ + Type: string(workloadv1beta2.Unhealthy), + Status: metav1.ConditionTrue, + Reason: "ComponentNotFound", + }) + } else { + return nil, err + } + } + } + + return summary, nil +} + func (r *AppWrapperReconciler) limitDuration(desired time.Duration) time.Duration { if desired < 0 { return 0 * time.Second diff --git a/internal/controller/appwrapper/appwrapper_controller_test.go b/internal/controller/appwrapper/appwrapper_controller_test.go index e206cb1..7755cb3 100644 --- a/internal/controller/appwrapper/appwrapper_controller_test.go +++ b/internal/controller/appwrapper/appwrapper_controller_test.go @@ -104,7 +104,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw))) @@ -121,7 +121,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse()) - podStatus, err = awReconciler.workloadStatus(ctx, aw) + podStatus, err = awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw) - 1)) } @@ -142,7 +142,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).PodsReady()).Should(BeTrue()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw))) _, finished := (*workload.AppWrapper)(aw).Finished() @@ -161,7 +161,7 @@ var _ = Describe("AppWrapper Controller", func() { _, err = awReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: awName}) // see deletion has completed Expect(err).NotTo(HaveOccurred()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.failed + podStatus.succeeded + podStatus.running + podStatus.pending).Should(Equal(int32(0))) }) @@ -184,7 +184,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.QuotaReserved))).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw) - 1)) Expect(podStatus.succeeded).Should(Equal(int32(1))) @@ -239,7 +239,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.QuotaReserved))).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeTrue()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.failed + podStatus.succeeded + podStatus.running + podStatus.pending).Should(Equal(int32(0))) }) @@ -296,7 +296,7 @@ var _ = Describe("AppWrapper Controller", func() { Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse()) Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue()) Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse()) - podStatus, err := awReconciler.workloadStatus(ctx, aw) + podStatus, err := awReconciler.getPodStatus(ctx, aw) Expect(err).NotTo(HaveOccurred()) Expect(podStatus.pending).Should(Equal(int32(1))) }) diff --git a/test/e2e/appwrapper_test.go b/test/e2e/appwrapper_test.go index 536363d..e21a0ec 100644 --- a/test/e2e/appwrapper_test.go +++ b/test/e2e/appwrapper_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -276,6 +277,19 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) Eventually(AppWrapperPhase(ctx, aw), 90*time.Second).Should(Equal(workloadv1beta2.AppWrapperFailed)) }) + + It("Deleting a Running Component yields a failed AppWrapper", func() { + aw := createAppWrapper(ctx, pytorchjob(2, 500)) + appwrappers = append(appwrappers, aw) + Eventually(AppWrapperPhase(ctx, aw), 60*time.Second).Should(Equal(workloadv1beta2.AppWrapperRunning)) + aw = getAppWrapper(ctx, types.NamespacedName{Name: aw.Name, Namespace: aw.Namespace}) + toDelete := &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{Kind: aw.Status.ComponentStatus[0].Kind, APIVersion: aw.Status.ComponentStatus[0].APIVersion}, + ObjectMeta: metav1.ObjectMeta{Name: aw.Status.ComponentStatus[0].Name, Namespace: aw.Namespace}, + } + Expect(getClient(ctx).Delete(ctx, toDelete)).Should(Succeed()) + Eventually(AppWrapperPhase(ctx, aw), 60*time.Second).Should(Equal(workloadv1beta2.AppWrapperFailed)) + }) }) Describe("Load Testing", Label("slow"), Label("Kueue", "Standalone"), func() {