Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions hack/e2e-util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
export LOG_LEVEL=${TEST_LOG_LEVEL:-2}
export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-"true"}
export CLUSTER_CONTEXT="--name test"
export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0"
export IMAGE_BUSY_BOX_LATEST="quay.io/project-codeflare/busybox:latest"
export KIND_OPT=${KIND_OPT:=" --config ${ROOT_DIR}/hack/kind-config.yaml"}
export KA_BIN=_output/bin
export WAIT_TIME="20s"
export KUTTL_VERSION=0.15.0
export KUBEFLOW_VERSION=v1.7.0
export CERTMANAGER_VERSION=v1.13.3
DUMP_LOGS="true"

# These are images used by the e2e tests.
# Pull and kind load to avoid long delays during testing
export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0"
export IMAGE_BUSY_BOX_LATEST="quay.io/project-codeflare/busybox:latest"

function update_test_host {

local arch="$(go env GOARCH)"
Expand Down Expand Up @@ -109,19 +114,15 @@ function check_prerequisites {
}

function pull_images {
docker pull ${IMAGE_ECHOSERVER}
if [ $? -ne 0 ]
then
echo "Failed to pull ${IMAGE_ECHOSERVER}"
exit 1
fi

docker pull ${IMAGE_BUSY_BOX_LATEST}
if [ $? -ne 0 ]
then
echo "Failed to pull ${IMAGE_BUSY_BOX_LATEST}"
exit 1
fi
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST}
do
docker pull $image
if [ $? -ne 0 ]
then
echo "Failed to pull $image"
exit 1
fi
done

docker images
}
Expand Down Expand Up @@ -149,14 +150,26 @@ function kind_up_cluster {

function configure_cluster {
echo "Installing cert-manager"
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.3/cert-manager.yaml
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$CERTMANAGER_VERSION/cert-manager.yaml

# sleep to ensure cert-manager is fully functional
echo "Waiting for pod in the cert-manager namespace to become ready"
while [[ $(kubectl get pods -n cert-manager -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]]
do
echo -n "." && sleep 1;
done
echo ""

echo "Installing Kubeflow operator version $KUBEFLOW_VERSION"
kubectl apply -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=$KUBEFLOW_VERSION"

# Sleep until the kubeflow operator is running
echo "Waiting for pods in the kueueflow namespace to become ready"
while [[ $(kubectl get pods -n kubeflow -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]]
do
echo -n "." && sleep 1;
done
echo ""
}

function wait_for_appwrapper_controller {
Expand Down
2 changes: 1 addition & 1 deletion hack/kueue/kueue-manifests-v0.6.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11063,7 +11063,7 @@ data:
#pprofBindAddress: :8082
#waitForPodsReady:
# enable: true
manageJobsWithoutQueueName: true
# manageJobsWithoutQueueName: true
#internalCertManagement:
# enable: false
# webhookServiceName: ""
Expand Down
21 changes: 18 additions & 3 deletions internal/controller/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
)
Expand Down Expand Up @@ -315,16 +317,29 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa
if err != nil {
return err, true // fatal
}

ref := &metav1.OwnerReference{APIVersion: GVK.GroupVersion().String(), Kind: GVK.Kind, Name: aw.Name, UID: aw.UID}
myWorkloadName, err := jobframework.GetWorkloadNameForOwnerRef(ref)
if err != nil {
return err, true
}

for _, obj := range objects {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[constants.ParentWorkloadAnnotation] = myWorkloadName
obj.SetAnnotations(annotations)
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
return err, true
}
if err := r.Create(ctx, obj); err != nil {
if apierrors.IsAlreadyExists(err) {
continue // ignore existing component
}
return err, meta.IsNoMatchError(err) || apierrors.IsInvalid(err) // fatal
}
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
return err, true
}
}
return nil, false
}
Expand Down
4 changes: 2 additions & 2 deletions samples/wrapped-deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: workload.codeflare.dev/v1beta2
kind: AppWrapper
metadata:
name: sample2
name: sample-deployment
annotations:
kueue.x-k8s.io/queue-name: user-queue
spec:
Expand All @@ -14,7 +14,7 @@ spec:
apiVersion: apps/v1
kind: Deployment
metadata:
name: test
name: sample-deployment-deployment
labels:
app: test
spec:
Expand Down
28 changes: 28 additions & 0 deletions samples/wrapped-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: workload.codeflare.dev/v1beta2
kind: AppWrapper
metadata:
name: sample-job
annotations:
kueue.x-k8s.io/queue-name: user-queue
spec:
suspend: true
components:
- podSets:
- replicas: 1
path: template.spec.template
template:
apiVersion: batch/v1
kind: Job
metadata:
name: sample-job-job
spec:
template:
spec:
restartPolicy: Never
containers:
- name: busybox
image: quay.io/project-codeflare/busybox:1.36
command: ["sh", "-c", "sleep 600"]
resources:
requests:
cpu: 1
4 changes: 2 additions & 2 deletions samples/wrapped-pod.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: workload.codeflare.dev/v1beta2
kind: AppWrapper
metadata:
name: sample
name: sample-pod
annotations:
kueue.x-k8s.io/queue-name: user-queue
spec:
Expand All @@ -14,7 +14,7 @@ spec:
apiVersion: v1
kind: Pod
metadata:
name: sample
name: sample-pod-pod
spec:
restartPolicy: Never
initContainers:
Expand Down
51 changes: 51 additions & 0 deletions samples/wrapped-pytorch-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
apiVersion: workload.codeflare.dev/v1beta2
kind: AppWrapper
metadata:
name: sample-pytorch-job
annotations:
kueue.x-k8s.io/queue-name: user-queue
spec:
suspend: true
components:
- podSets:
- replicas: 1
path: template.spec.pytorchReplicaSpecs.Master.template
- replicas: 1
path: template.spec.pytorchReplicaSpecs.Worker.template
template:
apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
name: pytorch-simple
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=1"
resources:
requests:
cpu: 1
Worker:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=1"
resources:
requests:
cpu: 1
29 changes: 27 additions & 2 deletions test/e2e/appwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,28 @@ var _ = Describe("AppWrapper E2E Test", func() {
appwrappers = append(appwrappers, aw)
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
})
// TODO: Batch v1.Jobs
It("Batch Jobs", func() {
aw := createAppWrapper(ctx, batchjob(250))
appwrappers = append(appwrappers, aw)
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
})

It("Mixed Basic Resources", func() {
aw := createAppWrapper(ctx, pod(100), deployment(2, 100), statefulset(2, 100), service())
aw := createAppWrapper(ctx, pod(100), deployment(2, 100), statefulset(2, 100), service(), batchjob(100))
appwrappers = append(appwrappers, aw)
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
})
})

Describe("Creation of Kubeflow Training Operator GVKs", func() {
It("PyTorch Jobs", func() {
aw := createAppWrapper(ctx, pytorchjob(1, 100, 2, 250))
appwrappers = append(appwrappers, aw)
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
})

// TODO: Additional Kubeflow Training Operator GVKs of interest

})

Describe("Error Handling for Invalid Resources", func() {
Expand Down Expand Up @@ -90,6 +106,15 @@ var _ = Describe("AppWrapper E2E Test", func() {

})

Describe("Recognition of Child Jobs", func() {
// TODO: Test scenarios where the AW "just fits" in the quota and
// contains components that Kueue might try to queue
// but should not in this case because they are using the parent workload's quota
// 1. batch v1 jobs
// 2. pytorch jobs (which themself contain child Jobs)

})

Describe("Detection of Completion Status", func() {

})
Expand Down
Loading