11
11
from hyperpod_cluster_stack_template .v1_0 .model import ClusterStackBase
12
12
13
13
from sagemaker .hyperpod import create_boto3_client
14
+ from sagemaker .hyperpod .common .telemetry import _hyperpod_telemetry_emitter
15
+ from sagemaker .hyperpod .common .telemetry .constants import Feature
14
16
15
17
CAPABILITIES_FOR_STACK_CREATION = [
16
18
'CAPABILITY_IAM' ,
@@ -33,7 +35,7 @@ class HpClusterStack(ClusterStackBase):
33
35
>>> # Create a cluster stack instance
34
36
>>> stack = HpClusterStack()
35
37
>>> response = stack.create(region="us-west-2")
36
- >>>
38
+ >>>
37
39
>>> # Check stack status
38
40
>>> status = stack.get_status()
39
41
>>> print(status)
@@ -46,17 +48,17 @@ class HpClusterStack(ClusterStackBase):
46
48
None ,
47
49
description = "CloudFormation stack name set after stack creation"
48
50
)
49
-
51
+
50
52
def __init__(self, **data):
    """Initialize the stack model from keyword field values.

    All keyword arguments are forwarded unchanged to the parent
    initializer; this method adds no behavior of its own.

    Args:
        **data: Field values passed through to the base class.
    """
    super().__init__(**data)
52
-
54
+
53
55
@field_validator('kubernetes_version', mode='before')
@classmethod
def validate_kubernetes_version(cls, v):
    """Coerce a supplied ``kubernetes_version`` to ``str`` before validation.

    Registered in ``mode='before'``, so it receives the raw input value.

    Args:
        v: The raw value provided for ``kubernetes_version``.

    Returns:
        ``None`` when no value was given, otherwise ``str(v)``.
    """
    # None is passed through untouched so an unset field stays unset.
    return v if v is None else str(v)
59
-
61
+
60
62
@field_validator ('availability_zone_ids' , 'nat_gateway_ids' , 'eks_private_subnet_ids' , 'security_group_ids' , 'private_route_table_ids' , 'private_subnet_ids' , 'instance_group_settings' , 'rig_settings' , 'tags' , mode = 'before' )
61
63
@classmethod
62
64
def validate_list_fields (cls , v ):
@@ -71,7 +73,7 @@ def validate_list_fields(cls, v):
71
73
v = ast .literal_eval (v )
72
74
except :
73
75
pass # Keep original value if parsing fails
74
-
76
+
75
77
if isinstance (v , list ) and len (v ) == 0 :
76
78
raise ValueError ('Empty lists [] are not allowed. Use proper YAML array format or leave field empty.' )
77
79
return v
@@ -80,14 +82,15 @@ def validate_list_fields(cls, v):
80
82
def get_template() -> str:
    """Load the packaged creation template and return it as a JSON string.

    Reads ``creation_template.yaml`` from the
    ``hyperpod_cluster_stack_template`` package, parses it with
    ``yaml.safe_load`` and re-serializes the result with ``json.dumps``
    (two-space indent, non-ASCII characters preserved).

    Returns:
        str: The template content serialized as JSON.

    Raises:
        RuntimeError: If reading, parsing or serializing the template
            fails. The underlying exception is attached as ``__cause__``.
    """
    try:
        template_content = importlib.resources.read_text(
            'hyperpod_cluster_stack_template',
            'creation_template.yaml'
        )
        yaml_data = yaml.safe_load(template_content)
        return json.dumps(yaml_data, indent=2, ensure_ascii=False)
    except Exception as e:
        # Chain explicitly so the root failure (missing package, bad YAML, ...)
        # is reported as the direct cause rather than incidental context.
        raise RuntimeError(f"Failed to load template from package: { e } ") from e
90
92
93
+ @_hyperpod_telemetry_emitter (Feature .HYPERPOD , "create_cluster_stack" )
91
94
def create (self ,
92
95
region : Optional [str ] = None ) -> str :
93
96
"""Creates a new HyperPod cluster CloudFormation stack.
@@ -121,7 +124,7 @@ def create(self,
121
124
>>> # Create stack in default region
122
125
>>> stack = HpClusterStack()
123
126
>>> response = stack.create()
124
- >>>
127
+ >>>
125
128
>>> # Create stack in specific region
126
129
>>> response = stack.create(region="us-east-1")
127
130
"""
@@ -178,12 +181,12 @@ def _create_parameters(self) -> List[Dict[str, str]]:
178
181
settings_list = json .loads (str (value ))
179
182
except (json .JSONDecodeError , TypeError ):
180
183
settings_list = []
181
-
184
+
182
185
for i , setting in enumerate (settings_list , 1 ):
183
186
formatted_setting = self ._convert_nested_keys (setting )
184
187
parameters .append ({
185
188
'ParameterKey' : f'InstanceGroupSettings{ i } ' ,
186
- 'ParameterValue' : "[" + json .dumps (formatted_setting ) + "]" if isinstance (formatted_setting , (dict , list )) else str (formatted_setting )
189
+ 'ParameterValue' : "[" + json .dumps (formatted_setting ) + "]" if isinstance (formatted_setting , (dict , list )) else str (formatted_setting )
187
190
})
188
191
elif field_name == 'rig_settings' :
189
192
# Handle both list and JSON string formats
@@ -195,7 +198,7 @@ def _create_parameters(self) -> List[Dict[str, str]]:
195
198
settings_list = json .loads (str (value ))
196
199
except (json .JSONDecodeError , TypeError ):
197
200
settings_list = []
198
-
201
+
199
202
for i , setting in enumerate (settings_list , 1 ):
200
203
formatted_setting = self ._convert_nested_keys (setting )
201
204
parameters .append ({
@@ -204,7 +207,7 @@ def _create_parameters(self) -> List[Dict[str, str]]:
204
207
})
205
208
else :
206
209
# Convert array fields to comma-separated strings
207
- if field_name in ['availability_zone_ids' , 'nat_gateway_ids' , 'eks_private_subnet_ids' ,
210
+ if field_name in ['availability_zone_ids' , 'nat_gateway_ids' , 'eks_private_subnet_ids' ,
208
211
'security_group_ids' , 'private_route_table_ids' , 'private_subnet_ids' ]:
209
212
if isinstance (value , list ):
210
213
value = ',' .join (str (item ) for item in value )
@@ -236,22 +239,22 @@ def _parse_tags(self) -> List[Dict[str, str]]:
236
239
"""Parse tags field and return proper CloudFormation tags format."""
237
240
if not self .tags :
238
241
return []
239
-
242
+
240
243
tags_list = self .tags
241
244
if isinstance (self .tags , str ):
242
245
try :
243
246
tags_list = json .loads (self .tags )
244
247
except (json .JSONDecodeError , TypeError ):
245
248
return []
246
-
249
+
247
250
# Convert array of strings to Key-Value format
248
251
if isinstance (tags_list , list ) and tags_list :
249
252
# Check if already in Key-Value format
250
253
if isinstance (tags_list [0 ], dict ) and 'Key' in tags_list [0 ]:
251
254
return tags_list
252
255
# Convert string array to Key-Value format
253
256
return [{'Key' : tag , 'Value' : '' } for tag in tags_list if isinstance (tag , str )]
254
-
257
+
255
258
return []
256
259
257
260
def _convert_nested_keys (self , obj : Any ) -> Any :
@@ -267,7 +270,7 @@ def _snake_to_pascal(snake_str: str) -> str:
267
270
"""Convert snake_case string to PascalCase."""
268
271
if not snake_str :
269
272
return snake_str
270
-
273
+
271
274
# Handle specific cases
272
275
mappings = {
273
276
"eks_cluster_name" : "EKSClusterName" ,
@@ -289,14 +292,14 @@ def _snake_to_pascal(snake_str: str) -> str:
289
292
"EbsVolumeConfig" : "EbsVolumeConfig" ,
290
293
"VolumeSizeInGB" : "VolumeSizeInGB"
291
294
}
292
-
295
+
293
296
if snake_str in mappings :
294
297
return mappings [snake_str ]
295
298
296
299
297
300
# Default case: capitalize each word
298
301
return '' .join (word .capitalize () for word in snake_str .split ('_' ))
299
-
302
+
300
303
def _snake_to_camel (self , snake_str : str ) -> str :
301
304
"""Convert snake_case string to camelCase for nested JSON keys."""
302
305
if not snake_str :
@@ -305,6 +308,7 @@ def _snake_to_camel(self, snake_str: str) -> str:
305
308
return words [0 ] + '' .join (word .capitalize () for word in words [1 :])
306
309
307
310
@staticmethod
311
+ @_hyperpod_telemetry_emitter (Feature .HYPERPOD , "describe_cluster_stack" )
308
312
def describe (stack_name , region : Optional [str ] = None ):
309
313
"""Describes a CloudFormation stack by name.
310
314
@@ -343,7 +347,7 @@ def describe(stack_name, region: Optional[str] = None):
343
347
344
348
>>> # Describe a stack by name
345
349
>>> response = HpClusterStack.describe("my-stack-name")
346
- >>>
350
+ >>>
347
351
>>> # Describe stack in specific region
348
352
>>> response = HpClusterStack.describe("my-stack", region="us-west-2")
349
353
"""
@@ -368,6 +372,7 @@ def describe(stack_name, region: Optional[str] = None):
368
372
raise RuntimeError ("Stack operation failed" )
369
373
370
374
@staticmethod
375
+ @_hyperpod_telemetry_emitter (Feature .HYPERPOD , "list_cluster_stack" )
371
376
def list (region : Optional [str ] = None , stack_status_filter : Optional [List [str ]] = None ):
372
377
"""Lists all CloudFormation stacks in the specified region.
373
378
@@ -403,7 +408,7 @@ def list(region: Optional[str] = None, stack_status_filter: Optional[List[str]]
403
408
404
409
>>> # List stacks in current region
405
410
>>> stacks = HpClusterStack.list()
406
- >>>
411
+ >>>
407
412
>>> # List stacks in specific region
408
413
>>> stacks = HpClusterStack.list(region="us-east-1")
409
414
"""
@@ -412,19 +417,19 @@ def list(region: Optional[str] = None, stack_status_filter: Optional[List[str]]
412
417
try :
413
418
# Prepare API call parameters
414
419
list_params = {}
415
-
420
+
416
421
if stack_status_filter is not None :
417
422
list_params ['StackStatusFilter' ] = stack_status_filter
418
-
423
+
419
424
response = cf .list_stacks (** list_params )
420
-
425
+
421
426
# Only filter DELETE_COMPLETE when no explicit filter is provided
422
427
if stack_status_filter is None and 'StackSummaries' in response :
423
428
response ['StackSummaries' ] = [
424
- stack for stack in response ['StackSummaries' ]
429
+ stack for stack in response ['StackSummaries' ]
425
430
if stack .get ('StackStatus' ) != 'DELETE_COMPLETE'
426
431
]
427
-
432
+
428
433
return response
429
434
except cf .exceptions .ClientError as e :
430
435
error_code = e .response ['Error' ]['Code' ]
0 commit comments