diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 613b6f7..ddd7946 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -90,6 +90,18 @@ rules: - patch - update - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - kubeflow.org resources: diff --git a/hack/e2e-util.sh b/hack/e2e-util.sh index 691a676..eec6a31 100755 --- a/hack/e2e-util.sh +++ b/hack/e2e-util.sh @@ -29,6 +29,9 @@ export IMAGE_KUBEFLOW_OPERATOR="docker.io/kubeflow/training-operator:v1-855e096" export KUBERAY_VERSION=1.1.0 export IMAGE_KUBERAY_OPERATOR="quay.io/kuberay/operator:v1.1.1" +export JOBSET_VERSION=v0.7.3 +export IMAGE_JOBSET_OPERATOR="registry.k8s.io/jobset/jobset:${JOBSET_VERSION}" + # These are small images used by the e2e tests. # Pull and kind load to avoid long delays during testing export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0" @@ -142,7 +145,7 @@ function check_prerequisites { } function pull_images { - for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} + for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR} do docker pull $image if [ $? -ne 0 ] @@ -238,7 +241,7 @@ function kind_up_cluster { } function kind_load_images { - for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} + for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR} do kind load docker-image ${image} ${CLUSTER_CONTEXT} if [ $? -ne 0 ] @@ -267,6 +270,15 @@ function configure_cluster { echo -n "." 
&& sleep 1; done echo "" + + echo "Installing JobSet operator version $JOBSET_VERSION" + kubectl apply --server-side -f "https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml" + echo "Waiting for pods in the jobset namespace to become ready" + while [[ $(kubectl get pods -n jobset-system -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]] + do + echo -n "." && sleep 1; + done + echo "" } function wait_for_appwrapper_controller { diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go index 2256f04..eb01a0f 100644 --- a/internal/controller/appwrapper/appwrapper_controller.go +++ b/internal/controller/appwrapper/appwrapper_controller.go @@ -80,7 +80,7 @@ type componentStatusSummary struct { //+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/finalizers,verbs=update -// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters +// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters, jobsets //+kubebuilder:rbac:groups="",resources=pods;services,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments;statefulsets,verbs=get;list;watch;create;update;patch;delete @@ -89,6 +89,7 @@ type componentStatusSummary struct { //+kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=podgroups,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ray.io,resources=rayclusters;rayjobs,verbs=get;list;watch;create;update;patch;delete 
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete // Reconcile reconciles an appwrapper // Please see [aw-states] for documentation of this method. diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index be1892d..be74b10 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -492,6 +492,27 @@ func InferPodSets(obj *unstructured.Unstructured) ([]workloadv1beta2.AppWrapperP } podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: "template.spec.template"}) + case schema.GroupVersionKind{Group: "jobset.x-k8s.io", Version: "v1alpha2", Kind: "JobSet"}: + if jobs, err := getValueAtPath(obj.UnstructuredContent(), "template.spec.replicatedJobs"); err == nil { + if jobs, ok := jobs.([]interface{}); ok { + for i := range jobs { + jobSpecPrefix := fmt.Sprintf("template.spec.replicatedJobs[%v].", i) + // validate path to replica template + if _, err := getValueAtPath(obj.UnstructuredContent(), jobSpecPrefix+"template"); err == nil { + var replicas int32 = 1 + if parallelism, err := GetReplicas(obj, jobSpecPrefix+"template.spec.parallelism"); err == nil { + replicas = parallelism + } + if completions, err := GetReplicas(obj, jobSpecPrefix+"template.spec.completions"); err == nil && completions < replicas { + replicas = completions + } + // infer replica count + podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: jobSpecPrefix + "template.spec.template"}) + } + } + } + } + case schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}: for _, replicaType := range []string{"Master", "Worker"} { prefix := "template.spec.pytorchReplicaSpecs." + replicaType + "." 
diff --git a/samples/wrapped-jobset.yaml b/samples/wrapped-jobset.yaml new file mode 100644 index 0000000..898fafd --- /dev/null +++ b/samples/wrapped-jobset.yaml @@ -0,0 +1,47 @@ +apiVersion: workload.codeflare.dev/v1beta2 +kind: AppWrapper +metadata: + name: sample-jobset + labels: + kueue.x-k8s.io/queue-name: default-queue +spec: + components: + - template: + apiVersion: jobset.x-k8s.io/v1alpha2 + kind: JobSet + metadata: + name: sample-jobset + spec: + replicatedJobs: + - name: workers + template: + spec: + parallelism: 4 + completions: 4 + backoffLimit: 0 + template: + spec: + restartPolicy: Never + containers: + - name: sleep + image: quay.io/project-codeflare/busybox:1.36 + command: ["sh", "-c", "sleep 100"] + resources: + requests: + cpu: 100m + - name: driver + template: + spec: + parallelism: 1 + completions: 1 + backoffLimit: 0 + template: + spec: + restartPolicy: Never + containers: + - name: sleep + image: quay.io/project-codeflare/busybox:1.36 + command: ["sh", "-c", "sleep 100"] + resources: + requests: + cpu: 100m diff --git a/test/e2e/appwrapper_test.go b/test/e2e/appwrapper_test.go index 577dde0..d2a1572 100644 --- a/test/e2e/appwrapper_test.go +++ b/test/e2e/appwrapper_test.go @@ -104,7 +104,16 @@ var _ = Describe("AppWrapper E2E Test", func() { }) }) - // TODO: JobSets (would have to deploy JobSet controller on e2e test cluster) + Describe("Creation of JobSet GVKs", Label("Kueue", "Standalone"), func() { + It("JobSet", func() { + aw := createAppWrapper(ctx, jobset(500)) + appwrappers = append(appwrappers, aw) + // TODO: Need dev versions of kueue/jobset to get correct handling of ownership + // Once those are released change the test to: + // Expect(waitAWPodsReady(ctx, aw)).Should(Succeed()) + Eventually(AppWrapperPhase(ctx, aw), 15*time.Second).Should(Equal(workloadv1beta2.AppWrapperResuming)) + }) + }) Describe("Webhook Enforces AppWrapper Invariants", Label("Webhook"), func() { Context("Structural Invariants", func() { diff --git 
a/test/e2e/fixtures_test.go b/test/e2e/fixtures_test.go index 87aac20..9a87109 100644 --- a/test/e2e/fixtures_test.go +++ b/test/e2e/fixtures_test.go @@ -336,6 +336,44 @@ func stuckInitBatchjob(milliCPU int64) workloadv1beta2.AppWrapperComponent { } } +const jobsetYAML = ` +apiVersion: jobset.x-k8s.io/v1alpha2 +kind: JobSet +metadata: + generateName: %v +spec: + replicatedJobs: + - name: driver + template: + spec: + parallelism: 1 + completions: 1 + backoffLimit: 0 + template: + spec: + restartPolicy: Never + containers: + - name: sleep + image: quay.io/project-codeflare/busybox:1.36 + command: ["sh", "-c", "sleep 100"] + resources: + requests: + cpu: %v +` + +func jobset(milliCPU int64) workloadv1beta2.AppWrapperComponent { + yamlString := fmt.Sprintf(jobsetYAML, + "jobset-", + resource.NewMilliQuantity(milliCPU, resource.DecimalSI)) + + jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString)) + Expect(err).NotTo(HaveOccurred()) + return workloadv1beta2.AppWrapperComponent{ + DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Path: "template.spec.replicatedJobs[0].template.spec.template"}}, + Template: runtime.RawExtension{Raw: jsonBytes}, + } +} + // This is not a useful PyTorchJob: // 1. Using a dummy busybox image to avoid pulling a large & rate-limited image from dockerhub // 2. We avoid needing the injected sidecar (alpine:3.10 from dockerhub) by not specifying a Master