
Commit a06a94b

Implementing Task Gov. feature for SDK flow
1 parent f571859 commit a06a94b

10 files changed: +570 −39 lines changed

hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py

Lines changed: 16 additions & 11 deletions

@@ -20,7 +20,6 @@
     'topology.k8s.aws/network-node-layer-2',
     'topology.k8s.aws/network-node-layer-3'
 }
-from .quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits
 
 class VolumeConfig(BaseModel):
     model_config = ConfigDict(extra="forbid")
@@ -111,7 +110,7 @@ class PyTorchJobConfig(BaseModel):
         min_length=1
     )
     node_count: Optional[int] = Field(
-        default=1,
+        default=None,
         alias="node_count",
         description="Number of nodes",
         ge=1
@@ -286,21 +285,27 @@ def to_domain(self) -> Dict:
         """
         Convert flat config to domain model (HyperPodPytorchJobSpec)
         """
-
-        valid, error = _is_valid(
-            self.vcpu, self.memory, self.accelerators, self.node_count, self.instance_type
-        )
-
-        if not valid:
-            raise ValueError(error)
 
         # Create container with required fields
         if self.instance_type is None:
             requests_value = {"nvidia.com/gpu": "0"}
             limits_value = {"nvidia.com/gpu": "0"}
         else:
-            requests_value = _get_resources_from_compute_quotas(self.instance_type, self.vcpu, self.memory, self.accelerators) or _get_resources_from_instance(self.instance_type, self.node_count)
-            limits_value = _get_limits(self.instance_type, self.vcpu_limit, self.memory_limit, self.accelerators_limit)
+            requests_value = {}
+            if self.accelerators is not None:
+                requests_value["accelerators"] = str(self.accelerators)
+            if self.vcpu is not None:
+                requests_value["vcpu"] = str(self.vcpu)
+            if self.memory is not None:
+                requests_value["memory"] = str(self.memory)
+
+            limits_value = {}
+            if self.accelerators_limit is not None:
+                limits_value["accelerators"] = str(self.accelerators_limit)
+            if self.vcpu_limit is not None:
+                limits_value["vcpu"] = str(self.vcpu_limit)
+            if self.memory_limit is not None:
+                limits_value["memory"] = str(self.memory_limit)
 
         # Create container with required fields
         container_kwargs = {
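
Note on the change above: the template model no longer resolves quotas itself; to_domain now passes the raw request/limit values through as strings and leaves resolution to the SDK (see allocate_quotas_if_applicable further down in this commit). A minimal sketch of the new else-branch, with hypothetical sample values:

    from typing import Optional, Tuple, Dict

    def build_resource_dicts(accelerators: Optional[int], vcpu: Optional[float], memory: Optional[float],
                             accelerators_limit: Optional[int], vcpu_limit: Optional[float],
                             memory_limit: Optional[float]) -> Tuple[Dict[str, str], Dict[str, str]]:
        """Mirror of the new branch in to_domain: only keys the user provided are set."""
        requests, limits = {}, {}
        if accelerators is not None:
            requests["accelerators"] = str(accelerators)
        if vcpu is not None:
            requests["vcpu"] = str(vcpu)
        if memory is not None:
            requests["memory"] = str(memory)
        if accelerators_limit is not None:
            limits["accelerators"] = str(accelerators_limit)
        if vcpu_limit is not None:
            limits["vcpu"] = str(vcpu_limit)
        if memory_limit is not None:
            limits["memory"] = str(memory_limit)
        return requests, limits

    # Hypothetical input: 1 accelerator and 8 vCPUs requested, memory and all limits unset
    print(build_resource_dicts(1, 8.0, None, None, None, None))
    # -> ({'accelerators': '1', 'vcpu': '8.0'}, {})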

hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json

Lines changed: 1 addition & 1 deletion

@@ -202,7 +202,7 @@
         {
           "type": "null"
         }
       ],
-      "default": 1,
+      "default": null,
       "description": "Number of nodes",
       "title": "Node Count"
     },

src/sagemaker/hyperpod/cli/constants/command_constants.py

Lines changed: 1 addition & 0 deletions

@@ -44,6 +44,7 @@
 SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX = "-clusterqueue"
 SAGEMAKER_TRAINING_LAUNCHER_DIR = str(Path(__file__).parent.parent / "sagemaker_hyperpod_recipes")
 NVIDIA_GPU_RESOURCE_LIMIT_KEY = "nvidia.com/gpu"
+NEURON_RESOURCE_LIMIT_KEY = "aws.amazon.com/neurondevice"
 AVAILABLE_ACCELERATOR_DEVICES_KEY = "AvailableAcceleratorDevices"
 TOTAL_ACCELERATOR_DEVICES_KEY = "TotalAcceleratorDevices"
 USER_NAME_LABEL_KEY = "sagemaker.user/created-by"

src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py

Lines changed: 1 addition & 1 deletion

@@ -2979,7 +2979,7 @@ class ReplicaSpec(BaseModel):
 
     name: str = Field(description="The name for the replica set")
     replicas: Optional[int] = Field(
-        default=1,
+        default=0,
         description="Replicas is the desired number of replicas of the given template.",
    )
     spares: Optional[int] = Field(
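
Dropping the default from 1 to 0 turns replicas into an "unset" sentinel: the SDK can now tell "user asked for one replica" apart from "user said nothing", and create() (changed below) only bumps a leftover 0 up to 1 after quota allocation has run. A toy sketch of that behavior, using a hypothetical ReplicaSpecSketch stand-in for the real model:

    from typing import Optional
    from pydantic import BaseModel, Field

    class ReplicaSpecSketch(BaseModel):
        """Toy stand-in for ReplicaSpec, showing only the changed field."""
        name: str = Field(description="The name for the replica set")
        replicas: Optional[int] = Field(
            default=0,
            description="Replicas is the desired number of replicas of the given template.",
        )

    spec = ReplicaSpecSketch(name="pod")
    assert spec.replicas == 0  # sentinel meaning "not explicitly set"
    # HyperPodPytorchJob.create() (see below) replaces a remaining 0 with 1 before submitting.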

src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py

Lines changed: 93 additions & 0 deletions

@@ -1,4 +1,6 @@
 from pydantic import ConfigDict, Field
+
+from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL
 from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
     _HyperPodPytorchJob, HyperPodPytorchJobStatus
 )
@@ -18,6 +20,9 @@
 import yaml
 import logging
 
+from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits
+
+
 
 TRAINING_GROUP = "sagemaker.amazonaws.com"
 API_VERSION = "v1"
@@ -52,6 +57,88 @@ def verify_kube_config(cls):
 
         # Verify Kubernetes version compatibility
         verify_kubernetes_version_compatibility(cls.get_logger())
+    @classmethod
+    def _extract_numeric_value(cls, value):
+        """Extract numeric value from strings like '1.5Gi' -> 1.5"""
+        if not value:
+            return None
+        import re
+        match = re.match(r'^([0-9]*\.?[0-9]+)', str(value))
+        return float(match.group(1)) if match else None
+
+    @classmethod
+    def _process_replica_resources(cls, data):
+        """Process and validate replica resource configuration."""
+        try:
+            node_count = data.get('replicas', None)
+
+            # Extract nested configuration with validation
+            template = data.get('template', {})
+            spec = template.get('spec', {})
+            node_selector = spec.get('nodeSelector', {})
+            instance_type = node_selector.get(INSTANCE_TYPE_LABEL) if node_selector else None
+            if not instance_type:
+                return None
+
+            containers = spec.get('containers', [])
+
+            if not containers:
+                raise ValueError("No containers found in template spec")
+
+            container = containers[0]
+            resources = container.get('resources', {})
+            requests = resources.get('requests', {})
+            limits = resources.get('limits', {})
+
+            # Extract resource values
+            vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None
+            memory = cls._extract_numeric_value(requests.get('memory'))
+            accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None
+            memory_limit = cls._extract_numeric_value(limits.get('memory'))
+            vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None
+            accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None
+
+            # Validate configuration
+            valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type)
+            if not valid:
+                raise ValueError(error)
+
+            # Calculate resource values
+            requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators)
+                              or _get_resources_from_instance(instance_type, node_count=1))
+            limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit)
+
+            # Update data with calculated values
+            data['template']['spec']['containers'][0]['resources']['requests'] = requests_value
+            data['template']['spec']['containers'][0]['resources']['limits'] = limits_value
+            return data
+        except KeyError as e:
+            raise ValueError(f"Missing required configuration key: {str(e)}")
+
+    @classmethod
+    def _get_container_resources(cls, replica_spec):
+        """Extract container resources from replica spec."""
+        container_resources = replica_spec['template']['spec']['containers'][0]['resources']
+        return container_resources['requests'], container_resources['limits']
+
+    @classmethod
+    def allocate_quotas_if_applicable(cls, spec):
+        try:
+            spec_dict = spec.model_dump()
+            replica_spec = spec_dict['replicaSpecs'][0]
+            cls._process_replica_resources(replica_spec)
+
+            # Update the original spec object directly
+            requests, limits = cls._get_container_resources(replica_spec)
+            spec.replicaSpecs[0].template.spec.containers[0].resources.requests = requests
+            spec.replicaSpecs[0].template.spec.containers[0].resources.limits = limits
+
+            return spec
+        except ValueError as e:
+            raise ValueError(e)
+        except Exception as e:
+            # In case of any other exception, return original spec
+            return spec
 
     @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob")
     def create(self, debug=False):
@@ -65,6 +152,10 @@ def create(self, debug=False):
         if not self.metadata.namespace:
             self.metadata.namespace = get_default_namespace()
 
+        spec = self.allocate_quotas_if_applicable(spec)
+        if spec.replicaSpecs[0].replicas == 0 :
+            spec.replicaSpecs[0].replicas = 1 # default value
+
         config = {
             "apiVersion": f"{TRAINING_GROUP}/{API_VERSION}",
             "kind": KIND,
@@ -91,6 +182,8 @@ def create(self, debug=False):
             logger.error(f"Failed to create HyperPodPytorchJob {self.metadata.name}!")
             handle_exception(e, self.metadata.name, self.metadata.namespace)
 
+
+
     @classmethod
     @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs")
     def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
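
The new quota-allocation path in the SDK flow works on the plain dict produced by spec.model_dump(). Below is a hedged sketch of the input shape _process_replica_resources() looks for, with hypothetical resource values; the trailing comments restate what the methods added above do with it:

    # Shape of one replicaSpecs entry as a plain dict; all values here are hypothetical.
    from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL

    replica_spec = {
        "replicas": 0,  # 0 now means the user did not set a node count
        "template": {
            "spec": {
                "nodeSelector": {INSTANCE_TYPE_LABEL: "ml.g5.8xlarge"},
                "containers": [{
                    "resources": {
                        "requests": {"vcpu": "4", "memory": "16Gi", "accelerators": "1"},
                        "limits": {"memory": "32Gi"},
                    },
                }],
            },
        },
    }

    # _process_replica_resources() parses the raw strings (e.g. "16Gi" -> 16.0 via
    # _extract_numeric_value), validates the combination with _is_valid(), and rewrites
    # requests/limits with whatever the quota_allocation_util helpers return.
    # allocate_quotas_if_applicable() then copies the rewritten requests/limits back onto
    # the original pydantic spec, and create() submits the job (defaulting replicas 0 -> 1).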
Lines changed: 3 additions & 6 deletions

@@ -10,7 +10,7 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
-
+from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY
 from sagemaker.hyperpod.cli.utils import (
     setup_logger
 )
@@ -247,9 +247,6 @@ def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerator
         return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support."
 
     if instance_type is not None:
-        #neither specified
-        if (not has_gpu_quota_allocation and not node_specified):
-            return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
         #both resources and node count specified
         if (has_gpu_quota_allocation and node_specified):
             return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
@@ -268,10 +265,10 @@ def _get_accelerator_type_and_count(instance_type: str) -> Tuple[Optional[str],
 
     # Determine the appropriate key based on instance type
     if trainium_count > 0:
-        accelerator_key = "aws.amazon.com/neurondevice"
+        accelerator_key = NEURON_RESOURCE_LIMIT_KEY
         instance_accelerator_count = trainium_count
     elif gpu_count > 0:
-        accelerator_key = "nvidia.com/gpu"
+        accelerator_key = NVIDIA_GPU_RESOURCE_LIMIT_KEY
         instance_accelerator_count = gpu_count
 
     if instance_accelerator_count is not None:
test/integration_tests/training/cli/test_gpu_quota_allocation.py

Lines changed: 4 additions & 6 deletions

@@ -220,11 +220,11 @@ def test_invalid_node_count_accelerators_parameter(self, test_job_name):
             text=True
         )
         assert result.returncode != 0
-        assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout
+        assert "Either node-count or a combination of accelerators, vcpu, " in result.stdout
         assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout
 
     def test_invalid_no_node_count_or_quota_parameter(self, test_job_name):
-        """Test that invalid case where both node-count and any of the quota parameters are provided"""
+        """Test that case where both node-count and any of the quota parameters are provided"""
         # Test with no node-count, no accelerators/vcpu/memory parameters
         create_cmd = [
             "hyp", "create", "hyp-pytorch-job",
@@ -242,9 +242,7 @@ def test_invalid_no_node_count_or_quota_parameter(self, test_job_name):
             capture_output=True,
             text=True
         )
-        assert result.returncode != 0
-        assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout
-        assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout
+        assert result.returncode == 0
 
     def test_invalid_instance_type_parameter(self, test_job_name):
         """Test case where invalid instance type parameter is provided"""
@@ -274,5 +272,5 @@ def test_invalid_instance_type_parameter(self, test_job_name):
             text=True
         )
         assert result.returncode != 0
-        assert "ValueError: Invalid instance-type ml.n5.8xlarge" in result.stdout
+        assert "Invalid instance-type ml.n5.8xlarge" in result.stdout
         logger.info("Successfully verified invalid instance type error")
