Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
'topology.k8s.aws/network-node-layer-2',
'topology.k8s.aws/network-node-layer-3'
}
from .quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits

class VolumeConfig(BaseModel):
model_config = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -111,7 +110,7 @@ class PyTorchJobConfig(BaseModel):
min_length=1
)
node_count: Optional[int] = Field(
default=1,
default=None,
alias="node_count",
description="Number of nodes",
ge=1
Expand Down Expand Up @@ -286,21 +285,27 @@ def to_domain(self) -> Dict:
"""
Convert flat config to domain model (HyperPodPytorchJobSpec)
"""

valid, error = _is_valid(
self.vcpu, self.memory, self.accelerators, self.node_count, self.instance_type
)

if not valid:
raise ValueError(error)

# Create container with required fields
if self.instance_type is None:
requests_value = {"nvidia.com/gpu": "0"}
limits_value = {"nvidia.com/gpu": "0"}
else:
requests_value = _get_resources_from_compute_quotas(self.instance_type, self.vcpu, self.memory, self.accelerators) or _get_resources_from_instance(self.instance_type, self.node_count)
limits_value = _get_limits(self.instance_type, self.vcpu_limit, self.memory_limit, self.accelerators_limit)
requests_value = {}
if self.accelerators is not None:
requests_value["accelerators"] = str(self.accelerators)
if self.vcpu is not None:
requests_value["vcpu"] = str(self.vcpu)
if self.memory is not None:
requests_value["memory"] = str(self.memory)

limits_value = {}
if self.accelerators_limit is not None:
limits_value["accelerators"] = str(self.accelerators_limit)
if self.vcpu_limit is not None:
limits_value["vcpu"] = str(self.vcpu_limit)
if self.memory_limit is not None:
limits_value["memory"] = str(self.memory_limit)

# Create container with required fields
container_kwargs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@
"type": "null"
}
],
"default": 1,
"default": null,
"description": "Number of nodes",
"title": "Node Count"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
# Suffix appended to build the Kueue ClusterQueue name for a SageMaker-managed cluster.
SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX = "-clusterqueue"
# Absolute path to the bundled sagemaker_hyperpod_recipes directory (two levels above this file).
SAGEMAKER_TRAINING_LAUNCHER_DIR = str(Path(__file__).parent.parent / "sagemaker_hyperpod_recipes")
# Kubernetes extended-resource key for NVIDIA GPUs.
NVIDIA_GPU_RESOURCE_LIMIT_KEY = "nvidia.com/gpu"
# Kubernetes extended-resource key for AWS Neuron (Trainium/Inferentia) devices.
NEURON_RESOURCE_LIMIT_KEY = "aws.amazon.com/neurondevice"
# Keys used when reporting accelerator capacity.
AVAILABLE_ACCELERATOR_DEVICES_KEY = "AvailableAcceleratorDevices"
TOTAL_ACCELERATOR_DEVICES_KEY = "TotalAcceleratorDevices"
# Label recording which user created a resource.
USER_NAME_LABEL_KEY = "sagemaker.user/created-by"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2979,7 +2979,7 @@ class ReplicaSpec(BaseModel):

name: str = Field(description="The name for the replica set")
replicas: Optional[int] = Field(
default=1,
default=0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is autogenerated from CRD and it might get overwritten if classes are generated again.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point — we will need to discuss with the training and task governance teams how to handle the default node-count for quota calculations.

description="Replicas is the desired number of replicas of the given template.",
)
spares: Optional[int] = Field(
Expand Down
93 changes: 93 additions & 0 deletions src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pydantic import ConfigDict, Field

from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL
from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
_HyperPodPytorchJob, HyperPodPytorchJobStatus
)
Expand All @@ -18,6 +20,9 @@
import yaml
import logging

from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits



TRAINING_GROUP = "sagemaker.amazonaws.com"
API_VERSION = "v1"
Expand Down Expand Up @@ -52,6 +57,88 @@ def verify_kube_config(cls):

# Verify Kubernetes version compatibility
verify_kubernetes_version_compatibility(cls.get_logger())
@classmethod
def _extract_numeric_value(cls, value):
"""Extract numeric value from strings like '1.5Gi' -> 1.5"""
if not value:
return None
import re
match = re.match(r'^([0-9]*\.?[0-9]+)', str(value))
return float(match.group(1)) if match else None

@classmethod
def _process_replica_resources(cls, data):
    """Compute and inject quota-derived resource requests/limits for a replica spec.

    Reads instance type, replica count, and any explicit vcpu/memory/accelerator
    requests from the replica-spec dict, validates the combination, and writes the
    calculated Kubernetes `requests`/`limits` back into the first container of
    *data* (mutated in place).

    Returns the mutated dict, or None when no instance type is set on the
    nodeSelector (quota allocation does not apply in that case).

    Raises ValueError when the configuration is invalid or required keys are missing.
    """
    try:
        node_count = data.get('replicas', None)

        # Extract nested configuration with validation.
        template = data.get('template', {})
        spec = template.get('spec', {})
        node_selector = spec.get('nodeSelector', {})
        instance_type = node_selector.get(INSTANCE_TYPE_LABEL) if node_selector else None
        if not instance_type:
            # No instance type selected: nothing to allocate.
            return None

        containers = spec.get('containers', [])
        if not containers:
            raise ValueError("No containers found in template spec")

        container = containers[0]
        # setdefault links the (possibly new) resources dict into *data*, so a
        # container declared without a `resources` section is filled in rather
        # than triggering a KeyError on write-back below.
        resources = container.setdefault('resources', {})
        requests = resources.get('requests', {})
        limits = resources.get('limits', {})

        # Extract resource values (None when unspecified).
        vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None
        memory = cls._extract_numeric_value(requests.get('memory'))
        accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None
        memory_limit = cls._extract_numeric_value(limits.get('memory'))
        vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None
        accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None

        # Validate configuration.
        valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type)
        if not valid:
            raise ValueError(error)

        # Calculate resource values. Requests/limits are per pod, so the
        # instance-based fallback uses node_count=1 (replicas handle scale-out).
        requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators)
                          or _get_resources_from_instance(instance_type, node_count=1))
        limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit)

        # Write calculated values back into *data* via the linked references.
        resources['requests'] = requests_value
        resources['limits'] = limits_value
        return data
    except KeyError as e:
        raise ValueError(f"Missing required configuration key: {str(e)}")

@classmethod
def _get_container_resources(cls, replica_spec):
"""Extract container resources from replica spec."""
container_resources = replica_spec['template']['spec']['containers'][0]['resources']
return container_resources['requests'], container_resources['limits']

@classmethod
def allocate_quotas_if_applicable(cls, spec):
    """Best-effort: apply quota-based resource allocation to *spec*'s first replica.

    Dumps the pydantic spec to a plain dict, lets _process_replica_resources
    compute requests/limits for it (in place), then copies the results back onto
    the original spec object. Invalid configurations raise ValueError; any other
    failure leaves *spec* unchanged and returns it as-is.
    """
    try:
        spec_dict = spec.model_dump()
        # _process_replica_resources mutates this dict in place when an
        # instance type is present; otherwise it leaves it untouched.
        replica_spec = spec_dict['replicaSpecs'][0]
        cls._process_replica_resources(replica_spec)

        # Update the original spec object directly
        requests, limits = cls._get_container_resources(replica_spec)
        spec.replicaSpecs[0].template.spec.containers[0].resources.requests = requests
        spec.replicaSpecs[0].template.spec.containers[0].resources.limits = limits

        return spec
    except ValueError as e:
        # Validation failures must surface to the caller.
        raise ValueError(e)
    except Exception as e:
        # In case of any other exception, return original spec
        # (deliberate best-effort: quota allocation is optional).
        return spec

@_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob")
def create(self, debug=False):
Expand All @@ -65,6 +152,10 @@ def create(self, debug=False):
if not self.metadata.namespace:
self.metadata.namespace = get_default_namespace()

spec = self.allocate_quotas_if_applicable(spec)
if spec.replicaSpecs[0].replicas is None or spec.replicaSpecs[0].replicas == 0:
spec.replicaSpecs[0].replicas = 1 # default value

config = {
"apiVersion": f"{TRAINING_GROUP}/{API_VERSION}",
"kind": KIND,
Expand All @@ -91,6 +182,8 @@ def create(self, debug=False):
logger.error(f"Failed to create HyperPodPytorchJob {self.metadata.name}!")
handle_exception(e, self.metadata.name, self.metadata.namespace)



@classmethod
@_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs")
def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY
from sagemaker.hyperpod.cli.utils import (
setup_logger
)
Expand Down Expand Up @@ -247,9 +247,6 @@ def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerator
return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support."

if instance_type is not None:
#neither specified
if (not has_gpu_quota_allocation and not node_specified):
return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
#both resources and node count specified
if (has_gpu_quota_allocation and node_specified):
return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
Expand All @@ -268,10 +265,10 @@ def _get_accelerator_type_and_count(instance_type: str) -> Tuple[Optional[str],

# Determine the appropriate key based on instance type
if trainium_count > 0:
accelerator_key = "aws.amazon.com/neurondevice"
accelerator_key = NEURON_RESOURCE_LIMIT_KEY
instance_accelerator_count = trainium_count
elif gpu_count > 0:
accelerator_key = "nvidia.com/gpu"
accelerator_key = NVIDIA_GPU_RESOURCE_LIMIT_KEY
instance_accelerator_count = gpu_count

if instance_accelerator_count is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ def test_invalid_node_count_accelerators_parameter(self, test_job_name):
text=True
)
assert result.returncode != 0
assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout
assert "Either node-count or a combination of accelerators, vcpu, " in result.stdout
assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout

def test_invalid_no_node_count_or_quota_parameter(self, test_job_name):
"""Test that invalid case where both node-count and any of the quota parameters are provided"""
"""Test that case where both node-count and any of the quota parameters are provided"""
# Test with no node-count, no accelerators/vcpu/memory parameters
create_cmd = [
"hyp", "create", "hyp-pytorch-job",
Expand All @@ -242,9 +242,7 @@ def test_invalid_no_node_count_or_quota_parameter(self, test_job_name):
capture_output=True,
text=True
)
assert result.returncode != 0
assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout
assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout
assert result.returncode == 0

def test_invalid_instance_type_parameter(self, test_job_name):
"""Test case where invalid instance type parameter is provided"""
Expand Down Expand Up @@ -274,5 +272,5 @@ def test_invalid_instance_type_parameter(self, test_job_name):
text=True
)
assert result.returncode != 0
assert "ValueError: Invalid instance-type ml.n5.8xlarge" in result.stdout
assert "Invalid instance-type ml.n5.8xlarge" in result.stdout
logger.info("Successfully verified invalid instance type error")
Loading
Loading