diff --git a/hack/e2e-util.sh b/hack/e2e-util.sh index cc594ac..d9b12c7 100755 --- a/hack/e2e-util.sh +++ b/hack/e2e-util.sh @@ -15,14 +15,19 @@ export LOG_LEVEL=${TEST_LOG_LEVEL:-2} export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-"true"} export CLUSTER_CONTEXT="--name test" -export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0" -export IMAGE_BUSY_BOX_LATEST="quay.io/project-codeflare/busybox:latest" export KIND_OPT=${KIND_OPT:=" --config ${ROOT_DIR}/hack/kind-config.yaml"} export KA_BIN=_output/bin export WAIT_TIME="20s" export KUTTL_VERSION=0.15.0 +export KUBEFLOW_VERSION=v1.7.0 +export CERTMANAGER_VERSION=v1.13.3 DUMP_LOGS="true" +# These are images used by the e2e tests. +# Pull and kind load to avoid long delays during testing +export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0" +export IMAGE_BUSY_BOX_LATEST="quay.io/project-codeflare/busybox:latest" + function update_test_host { local arch="$(go env GOARCH)" @@ -109,19 +114,15 @@ function check_prerequisites { } function pull_images { - docker pull ${IMAGE_ECHOSERVER} - if [ $? -ne 0 ] - then - echo "Failed to pull ${IMAGE_ECHOSERVER}" - exit 1 - fi - - docker pull ${IMAGE_BUSY_BOX_LATEST} - if [ $? -ne 0 ] - then - echo "Failed to pull ${IMAGE_BUSY_BOX_LATEST}" - exit 1 - fi + for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} + do + docker pull $image + if [ $? 
-ne 0 ] + then + echo "Failed to pull $image" + exit 1 + fi + done docker images } @@ -149,7 +150,7 @@ function kind_up_cluster { function configure_cluster { echo "Installing cert-manager" - kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.3/cert-manager.yaml + kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$CERTMANAGER_VERSION/cert-manager.yaml # sleep to ensure cert-manager is fully functional echo "Waiting for pod in the cert-manager namespace to become ready" @@ -157,6 +158,18 @@ function configure_cluster { do echo -n "." && sleep 1; done + echo "" + + echo "Installing Kubeflow operator version $KUBEFLOW_VERSION" + kubectl apply -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=$KUBEFLOW_VERSION" + + # Sleep until the kubeflow operator is running + echo "Waiting for pods in the kubeflow namespace to become ready" + while [[ $(kubectl get pods -n kubeflow -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]] + do + echo -n "." 
&& sleep 1; + done + echo "" } function wait_for_appwrapper_controller { diff --git a/hack/kueue/kueue-manifests-v0.6.0.yaml b/hack/kueue/kueue-manifests-v0.6.0.yaml index e7bd72a..ee2dd4b 100644 --- a/hack/kueue/kueue-manifests-v0.6.0.yaml +++ b/hack/kueue/kueue-manifests-v0.6.0.yaml @@ -11063,7 +11063,7 @@ data: #pprofBindAddress: :8082 #waitForPodsReady: # enable: true - manageJobsWithoutQueueName: true + # manageJobsWithoutQueueName: true #internalCertManagement: # enable: false # webhookServiceName: "" diff --git a/internal/controller/appwrapper_controller.go b/internal/controller/appwrapper_controller.go index cad4243..4f5a521 100644 --- a/internal/controller/appwrapper_controller.go +++ b/internal/controller/appwrapper_controller.go @@ -34,6 +34,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2" ) @@ -315,16 +317,29 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa if err != nil { return err, true // fatal } + + ref := &metav1.OwnerReference{APIVersion: GVK.GroupVersion().String(), Kind: GVK.Kind, Name: aw.Name, UID: aw.UID} + myWorkloadName, err := jobframework.GetWorkloadNameForOwnerRef(ref) + if err != nil { + return err, true + } + for _, obj := range objects { + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[constants.ParentWorkloadAnnotation] = myWorkloadName + obj.SetAnnotations(annotations) + if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil { + return err, true + } if err := r.Create(ctx, obj); err != nil { if apierrors.IsAlreadyExists(err) { continue // ignore existing component } return err, meta.IsNoMatchError(err) || 
apierrors.IsInvalid(err) // fatal } - if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil { - return err, true - } } return nil, false } diff --git a/samples/wrapped-deployment.yaml b/samples/wrapped-deployment.yaml index 9885888..65c257b 100644 --- a/samples/wrapped-deployment.yaml +++ b/samples/wrapped-deployment.yaml @@ -1,7 +1,7 @@ apiVersion: workload.codeflare.dev/v1beta2 kind: AppWrapper metadata: - name: sample2 + name: sample-deployment annotations: kueue.x-k8s.io/queue-name: user-queue spec: @@ -14,7 +14,7 @@ spec: apiVersion: apps/v1 kind: Deployment metadata: - name: test + name: sample-deployment-deployment labels: app: test spec: diff --git a/samples/wrapped-job.yaml b/samples/wrapped-job.yaml new file mode 100644 index 0000000..dc7b33b --- /dev/null +++ b/samples/wrapped-job.yaml @@ -0,0 +1,28 @@ +apiVersion: workload.codeflare.dev/v1beta2 +kind: AppWrapper +metadata: + name: sample-job + annotations: + kueue.x-k8s.io/queue-name: user-queue +spec: + suspend: true + components: + - podSets: + - replicas: 1 + path: template.spec.template + template: + apiVersion: batch/v1 + kind: Job + metadata: + name: sample-job-job + spec: + template: + spec: + restartPolicy: Never + containers: + - name: busybox + image: quay.io/project-codeflare/busybox:1.36 + command: ["sh", "-c", "sleep 600"] + resources: + requests: + cpu: 1 diff --git a/samples/wrapped-pod.yaml b/samples/wrapped-pod.yaml index cc4c485..db5c517 100644 --- a/samples/wrapped-pod.yaml +++ b/samples/wrapped-pod.yaml @@ -1,7 +1,7 @@ apiVersion: workload.codeflare.dev/v1beta2 kind: AppWrapper metadata: - name: sample + name: sample-pod annotations: kueue.x-k8s.io/queue-name: user-queue spec: @@ -14,7 +14,7 @@ spec: apiVersion: v1 kind: Pod metadata: - name: sample + name: sample-pod-pod spec: restartPolicy: Never initContainers: diff --git a/samples/wrapped-pytorch-job.yaml b/samples/wrapped-pytorch-job.yaml new file mode 100644 index 0000000..4dfdb79 --- /dev/null +++ 
b/samples/wrapped-pytorch-job.yaml @@ -0,0 +1,51 @@ +apiVersion: workload.codeflare.dev/v1beta2 +kind: AppWrapper +metadata: + name: sample-pytorch-job + annotations: + kueue.x-k8s.io/queue-name: user-queue +spec: + suspend: true + components: + - podSets: + - replicas: 1 + path: template.spec.pytorchReplicaSpecs.Master.template + - replicas: 1 + path: template.spec.pytorchReplicaSpecs.Worker.template + template: + apiVersion: "kubeflow.org/v1" + kind: PyTorchJob + metadata: + name: pytorch-simple + spec: + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + requests: + cpu: 1 + Worker: + replicas: 1 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + requests: + cpu: 1 diff --git a/test/e2e/appwrapper_test.go b/test/e2e/appwrapper_test.go index 13ae29c..c24eb1b 100644 --- a/test/e2e/appwrapper_test.go +++ b/test/e2e/appwrapper_test.go @@ -53,12 +53,28 @@ var _ = Describe("AppWrapper E2E Test", func() { appwrappers = append(appwrappers, aw) Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) }) - // TODO: Batch v1.Jobs + It("Batch Jobs", func() { + aw := createAppWrapper(ctx, batchjob(250)) + appwrappers = append(appwrappers, aw) + Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) + }) + It("Mixed Basic Resources", func() { - aw := createAppWrapper(ctx, pod(100), deployment(2, 100), statefulset(2, 100), service()) + aw := createAppWrapper(ctx, pod(100), deployment(2, 100), statefulset(2, 100), service(), batchjob(100)) + appwrappers = append(appwrappers, aw) + Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) + }) + }) + + Describe("Creation of Kubeflow 
Training Operator GVKs", func() { It("PyTorch Jobs", func() { aw := createAppWrapper(ctx, pytorchjob(1, 100, 2, 250)) appwrappers = append(appwrappers, aw) Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) }) + + // TODO: Additional Kubeflow Training Operator GVKs of interest + }) Describe("Error Handling for Invalid Resources", func() { @@ -90,6 +106,15 @@ var _ = Describe("AppWrapper E2E Test", func() { }) + Describe("Recognition of Child Jobs", func() { + // TODO: Test scenarios where the AW "just fits" in the quota and + // contains components that Kueue might try to queue + // but should not in this case because they are using the parent workload's quota + // 1. batch v1 jobs + // 2. pytorch jobs (which themselves contain child Jobs) + + }) + Describe("Detection of Completion Status", func() { }) diff --git a/test/e2e/fixtures_test.go b/test/e2e/fixtures_test.go index cbe5dce..c1cee3f 100644 --- a/test/e2e/fixtures_test.go +++ b/test/e2e/fixtures_test.go @@ -48,6 +48,7 @@ metadata: name: %v spec: restartPolicy: Never + terminationGracePeriodSeconds: 0 containers: - name: busybox image: quay.io/project-codeflare/busybox:1.36 @@ -174,3 +175,98 @@ func statefulset(replicaCount int, milliCPU int64) workloadv1beta2.AppWrapperCom Template: runtime.RawExtension{Raw: jsonBytes}, } } + +const batchJobYAML = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: %v +spec: + template: + spec: + restartPolicy: Never + terminationGracePeriodSeconds: 0 + containers: + - name: busybox + image: quay.io/project-codeflare/busybox:1.36 + command: ["sh", "-c", "sleep 600"] + resources: + requests: + cpu: %v +` + +func batchjob(milliCPU int64) workloadv1beta2.AppWrapperComponent { + yamlString := fmt.Sprintf(batchJobYAML, + randName("batchjob"), + resource.NewMilliQuantity(milliCPU, resource.DecimalSI)) + + jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString)) + Expect(err).NotTo(HaveOccurred()) + replicas := int32(1) + return workloadv1beta2.AppWrapperComponent{ + PodSets: 
[]workloadv1beta2.AppWrapperPodSet{{Replicas: &replicas, Path: "template.spec.template"}}, + Template: runtime.RawExtension{Raw: jsonBytes}, + } +} + +const pytorchYAML = ` +apiVersion: "kubeflow.org/v1" +kind: PyTorchJob +metadata: + name: %v +spec: + pytorchReplicaSpecs: + Master: + replicas: %v + restartPolicy: OnFailure + template: + spec: + terminationGracePeriodSeconds: 0 + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + requests: + cpu: %v + Worker: + replicas: %v + restartPolicy: OnFailure + template: + spec: + terminationGracePeriodSeconds: 0 + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + requests: + cpu: %v +` + +func pytorchjob(replicasMaster int, milliCPUMaster int64, replicasWorker int, milliCPUWorker int64) workloadv1beta2.AppWrapperComponent { + yamlString := fmt.Sprintf(pytorchYAML, + randName("pytorchjob"), + replicasMaster, + resource.NewMilliQuantity(milliCPUMaster, resource.DecimalSI), + replicasWorker, + resource.NewMilliQuantity(milliCPUWorker, resource.DecimalSI), + ) + jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString)) + Expect(err).NotTo(HaveOccurred()) + masters := int32(replicasMaster) + workers := int32(replicasWorker) + return workloadv1beta2.AppWrapperComponent{ + PodSets: []workloadv1beta2.AppWrapperPodSet{ + {Replicas: &masters, Path: "template.spec.pytorchReplicaSpecs.Master.template"}, + {Replicas: &workers, Path: "template.spec.pytorchReplicaSpecs.Worker.template"}, + }, + Template: runtime.RawExtension{Raw: jsonBytes}, + } +} diff --git a/test/e2e/util_test.go b/test/e2e/util_test.go index 19cad2a..6ae7f82 100644 --- a/test/e2e/util_test.go +++ b/test/e2e/util_test.go @@ -199,7 +199,7 @@ func waitAWPodsDeleted(ctx context.Context, awNamespace 
string, awName string) e func waitAWPodsReady(ctx context.Context, aw *workloadv1beta2.AppWrapper) error { numExpected := controller.ExpectedPodCount(aw) phases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded} - return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 90*time.Second, true, podsInPhase(aw.Namespace, aw.Name, phases, numExpected)) + return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, true, podsInPhase(aw.Namespace, aw.Name, phases, numExpected)) } func AppWrapperPhase(ctx context.Context, aw *workloadv1beta2.AppWrapper) func(g Gomega) workloadv1beta2.AppWrapperPhase {