From ed0dd46f676a06932f8d34de91fea0fa126e7c5e Mon Sep 17 00:00:00 2001 From: Amarjeet LNU Date: Fri, 22 Aug 2025 10:17:43 -0700 Subject: [PATCH] Implementing Task Gov. feature for SDK flow --- .../v1_1/model.py | 27 +- .../v1_1/schema.json | 2 +- .../cli/constants/command_constants.py | 1 + .../hyperpod_pytorch_job_unified_config.py | 2 +- .../hyperpod/training/hyperpod_pytorch_job.py | 93 ++++++ .../training}/quota_allocation_util.py | 9 +- .../training/cli/test_gpu_quota_allocation.py | 10 +- .../training/sdk/test_sdk_quota_allocation.py | 300 ++++++++++++++++++ .../sdk/test_sdk_resource_processing.py | 150 +++++++++ .../cli/test_quota_allocation_util.py | 15 +- 10 files changed, 570 insertions(+), 39 deletions(-) rename {hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1 => src/sagemaker/hyperpod/training}/quota_allocation_util.py (97%) create mode 100644 test/integration_tests/training/sdk/test_sdk_quota_allocation.py create mode 100644 test/integration_tests/training/sdk/test_sdk_resource_processing.py diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py index 010a6ad8..a0d3a144 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py +++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py @@ -20,7 +20,6 @@ 'topology.k8s.aws/network-node-layer-2', 'topology.k8s.aws/network-node-layer-3' } -from .quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits class VolumeConfig(BaseModel): model_config = ConfigDict(extra="forbid") @@ -111,7 +110,7 @@ class PyTorchJobConfig(BaseModel): min_length=1 ) node_count: Optional[int] = Field( - default=1, + default=None, alias="node_count", description="Number of nodes", ge=1 @@ -286,21 +285,27 @@ def to_domain(self) -> Dict: """ Convert flat config to domain model (HyperPodPytorchJobSpec) """ - - valid, error = _is_valid( - self.vcpu, self.memory, self.accelerators, self.node_count, self.instance_type - ) - - if not valid: - raise ValueError(error) # Create container with required fields if self.instance_type is None: requests_value = {"nvidia.com/gpu": "0"} limits_value = {"nvidia.com/gpu": "0"} else: - requests_value = _get_resources_from_compute_quotas(self.instance_type, self.vcpu, self.memory, self.accelerators) or _get_resources_from_instance(self.instance_type, self.node_count) - limits_value = _get_limits(self.instance_type, self.vcpu_limit, self.memory_limit, self.accelerators_limit) + requests_value = {} + if self.accelerators is not None: + requests_value["accelerators"] = str(self.accelerators) + if self.vcpu is not None: + requests_value["vcpu"] = str(self.vcpu) + if self.memory is not None: + requests_value["memory"] = str(self.memory) + + limits_value = {} + if self.accelerators_limit is not None: + limits_value["accelerators"] = str(self.accelerators_limit) + if self.vcpu_limit is not None: + limits_value["vcpu"] = str(self.vcpu_limit) + if self.memory_limit is not None: + limits_value["memory"] = str(self.memory_limit) # Create container with required fields container_kwargs = { diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json index 88caab3c..4b86c591 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json +++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json @@ -202,7 +202,7 @@ "type": "null" } ], - "default": 1, + "default": null, "description": "Number of nodes", "title": "Node Count" }, diff --git a/src/sagemaker/hyperpod/cli/constants/command_constants.py b/src/sagemaker/hyperpod/cli/constants/command_constants.py index c086179c..3fc96606 100644 --- a/src/sagemaker/hyperpod/cli/constants/command_constants.py +++ b/src/sagemaker/hyperpod/cli/constants/command_constants.py @@ -44,6 +44,7 @@ SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX = "-clusterqueue" SAGEMAKER_TRAINING_LAUNCHER_DIR = str(Path(__file__).parent.parent / "sagemaker_hyperpod_recipes") NVIDIA_GPU_RESOURCE_LIMIT_KEY = "nvidia.com/gpu" +NEURON_RESOURCE_LIMIT_KEY = "aws.amazon.com/neurondevice" AVAILABLE_ACCELERATOR_DEVICES_KEY = "AvailableAcceleratorDevices" TOTAL_ACCELERATOR_DEVICES_KEY = "TotalAcceleratorDevices" USER_NAME_LABEL_KEY = "sagemaker.user/created-by" diff --git a/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py b/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py index fbdb9584..a7855ef5 100644 --- a/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py +++ b/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py @@ -2979,7 +2979,7 @@ class ReplicaSpec(BaseModel): name: str = Field(description="The name for the replica set") replicas: Optional[int] = Field( - default=1, + default=0, description="Replicas is the desired number of replicas of the given template.", ) spares: Optional[int] = Field( diff --git a/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py b/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py index 38325109..6abd9314 100644 --- a/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py +++ b/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py @@ -1,4 +1,6 @@ from pydantic import ConfigDict, Field + +from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import ( _HyperPodPytorchJob, HyperPodPytorchJobStatus ) @@ -18,6 +20,9 @@ import yaml import logging +from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits + + TRAINING_GROUP = "sagemaker.amazonaws.com" API_VERSION = "v1" @@ -52,6 +57,88 @@ def verify_kube_config(cls): # Verify Kubernetes version compatibility verify_kubernetes_version_compatibility(cls.get_logger()) + @classmethod + def _extract_numeric_value(cls, value): + """Extract numeric value from strings like '1.5Gi' -> 1.5""" + if not value: + return None + import re + match = re.match(r'^([0-9]*\.?[0-9]+)', str(value)) + return float(match.group(1)) if match else None + + @classmethod + def _process_replica_resources(cls, data): + """Process and validate replica resource configuration.""" + try: + node_count = data.get('replicas', None) + + # Extract nested configuration with validation + template = data.get('template', {}) + spec = template.get('spec', {}) + node_selector = spec.get('nodeSelector', {}) + instance_type = node_selector.get(INSTANCE_TYPE_LABEL) if node_selector else None + if not instance_type: + return None + + containers = spec.get('containers', []) + + if not containers: + raise ValueError("No containers found in template spec") + + container = containers[0] + resources = container.get('resources', {}) + requests = resources.get('requests', {}) + limits = resources.get('limits', {}) + + # Extract resource values + vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None + memory = cls._extract_numeric_value(requests.get('memory')) + accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None + memory_limit = cls._extract_numeric_value(limits.get('memory')) + vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None + accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None + + # Validate configuration + valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type) + if not valid: + raise ValueError(error) + + # Calculate resource values + requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators) + or _get_resources_from_instance(instance_type, node_count=1)) + limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit) + + # Update data with calculated values + data['template']['spec']['containers'][0]['resources']['requests'] = requests_value + data['template']['spec']['containers'][0]['resources']['limits'] = limits_value + return data + except KeyError as e: + raise ValueError(f"Missing required configuration key: {str(e)}") + + @classmethod + def _get_container_resources(cls, replica_spec): + """Extract container resources from replica spec.""" + container_resources = replica_spec['template']['spec']['containers'][0]['resources'] + return container_resources['requests'], container_resources['limits'] + + @classmethod + def allocate_quotas_if_applicable(cls, spec): + try: + spec_dict = spec.model_dump() + replica_spec = spec_dict['replicaSpecs'][0] + cls._process_replica_resources(replica_spec) + + # Update the original spec object directly + requests, limits = cls._get_container_resources(replica_spec) + spec.replicaSpecs[0].template.spec.containers[0].resources.requests = requests + spec.replicaSpecs[0].template.spec.containers[0].resources.limits = limits + + return spec + except ValueError as e: + raise ValueError(e) + except Exception as e: + # In case of any other exception, return original spec + return spec @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob") def create(self, debug=False): @@ -65,6 +152,10 @@ def create(self, debug=False): if not self.metadata.namespace: self.metadata.namespace = get_default_namespace() + spec = self.allocate_quotas_if_applicable(spec) + if spec.replicaSpecs[0].replicas is None or spec.replicaSpecs[0].replicas == 0: + spec.replicaSpecs[0].replicas = 1 # default value + config = { "apiVersion": f"{TRAINING_GROUP}/{API_VERSION}", "kind": KIND, @@ -91,6 +182,8 @@ def create(self, debug=False): logger.error(f"Failed to create HyperPodPytorchJob {self.metadata.name}!") handle_exception(e, self.metadata.name, self.metadata.namespace) + + @classmethod @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs") def list(cls, namespace=None) -> List["HyperPodPytorchJob"]: diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/quota_allocation_util.py b/src/sagemaker/hyperpod/training/quota_allocation_util.py similarity index 97% rename from hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/quota_allocation_util.py rename to src/sagemaker/hyperpod/training/quota_allocation_util.py index c35e03b3..99aec20c 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/quota_allocation_util.py +++ b/src/sagemaker/hyperpod/training/quota_allocation_util.py @@ -10,7 +10,7 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. - +from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY from sagemaker.hyperpod.cli.utils import ( setup_logger ) @@ -247,9 +247,6 @@ def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerator return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support." if instance_type is not None: - #neither specified - if (not has_gpu_quota_allocation and not node_specified): - return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}" #both resources and node count specified if (has_gpu_quota_allocation and node_specified): return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}" @@ -268,10 +265,10 @@ def _get_accelerator_type_and_count(instance_type: str) -> Tuple[Optional[str], # Determine the appropriate key based on instance type if trainium_count > 0: - accelerator_key = "aws.amazon.com/neurondevice" + accelerator_key = NEURON_RESOURCE_LIMIT_KEY instance_accelerator_count = trainium_count elif gpu_count > 0: - accelerator_key = "nvidia.com/gpu" + accelerator_key = NVIDIA_GPU_RESOURCE_LIMIT_KEY instance_accelerator_count = gpu_count if instance_accelerator_count is not None: diff --git a/test/integration_tests/training/cli/test_gpu_quota_allocation.py b/test/integration_tests/training/cli/test_gpu_quota_allocation.py index 8324b5c1..dbc29d0f 100644 --- a/test/integration_tests/training/cli/test_gpu_quota_allocation.py +++ b/test/integration_tests/training/cli/test_gpu_quota_allocation.py @@ -220,11 +220,11 @@ def test_invalid_node_count_accelerators_parameter(self, test_job_name): text=True ) assert result.returncode != 0 - assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout + assert "Either node-count or a combination of accelerators, vcpu, " in result.stdout assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout def test_invalid_no_node_count_or_quota_parameter(self, test_job_name): - """Test that invalid case where both node-count and any of the quota parameters are provided""" + """Test that case where both node-count and any of the quota parameters are provided""" # Test with no node-count, no accelerators/vcpu/memory parameters create_cmd = [ "hyp", "create", "hyp-pytorch-job", @@ -242,9 +242,7 @@ def test_invalid_no_node_count_or_quota_parameter(self, test_job_name): capture_output=True, text=True ) - assert result.returncode != 0 - assert "ValueError: Either node-count or a combination of accelerators, vcpu, " in result.stdout - assert "memory-in-gib must be specified for instance-type ml.g5.8xlarge" in result.stdout + assert result.returncode == 0 def test_invalid_instance_type_parameter(self, test_job_name): """Test case where invalid instance type parameter is provided""" @@ -274,5 +272,5 @@ def test_invalid_instance_type_parameter(self, test_job_name): text=True ) assert result.returncode != 0 - assert "ValueError: Invalid instance-type ml.n5.8xlarge" in result.stdout + assert "Invalid instance-type ml.n5.8xlarge" in result.stdout logger.info("Successfully verified invalid instance type error") diff --git a/test/integration_tests/training/sdk/test_sdk_quota_allocation.py b/test/integration_tests/training/sdk/test_sdk_quota_allocation.py new file mode 100644 index 00000000..46eb237e --- /dev/null +++ b/test/integration_tests/training/sdk/test_sdk_quota_allocation.py @@ -0,0 +1,300 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import pytest +import time +from sagemaker.hyperpod.training import ( + HyperPodPytorchJob, + Containers, + ReplicaSpec, + Resources, + RunPolicy, + Spec, + Template, +) +from sagemaker.hyperpod.common.config import Metadata +from sagemaker.hyperpod.cli.utils import setup_logger + +logger = setup_logger(__name__) + +NAMESPACE = "hyperpod-ns-team1" +QUEUE = "hyperpod-ns-team1-localqueue" + + +class TestHyperPodSDKQuotaAllocation: + """Integration tests for HyperPod SDK quota allocation functionality.""" + + def test_create_job_with_quota_parameters(self, test_job_name, image_uri): + """Test creating a job with quota allocation parameters.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1", "cpu": "3", "memory": "1"}, + limits={"nvidia.com/gpu": "1", "cpu": "4", "memory": "2"}, + ), + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Create the job + pytorch_job.create() + logger.info(f"Created job with quota parameters: {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created with correct resource allocation + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_create_job_with_only_replicas_parameters(self, test_job_name, image_uri): + """Test creating a job with quota allocation parameters.""" + replica_specs = [ + ReplicaSpec( + name="pod", + replicas= 1, + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always" + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Create the job + pytorch_job.create() + logger.info(f"Created job with quota parameters: {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created with correct resource allocation + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + + def test_create_job_with_float_quota_parameters(self, test_job_name, image_uri): + """Test creating a job with float quota parameters.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1", "cpu": "3.6", "memory": "1"}, + limits={"nvidia.com/gpu": "1", "cpu": "4.8", "memory": "2.7"}, + ), + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Create the job + pytorch_job.create() + logger.info(f"Created job with float quota parameters: {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_create_job_with_only_accelerators(self, test_job_name, image_uri): + """Test creating a job with only accelerators parameter.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1"}, + limits={"nvidia.com/gpu": "1"}, + ), + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Create the job + pytorch_job.create() + logger.info(f"Created job with only accelerators: {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_quota_allocation_validation(self, test_job_name, image_uri): + """Test that quota allocation validation works correctly.""" + # Test with invalid instance type + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1"}, + limits={"nvidia.com/gpu": "1"}, + ), + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.invalid.type"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # This should raise a ValueError for invalid instance type + with pytest.raises(ValueError, match="Invalid instance-type"): + pytorch_job.create() + + def test_default_replicas_allocation(self, test_job_name, image_uri): + """Test that default replicas value is set to 1 when 0.""" + replica_specs = [ + ReplicaSpec( + name="pod", + replicas=0, # This should be set to 1 by default + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1"}, + limits={"nvidia.com/gpu": "1"}, + ), + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Create the job + pytorch_job.create() + logger.info(f"Created job with 0 replicas (should default to 1): {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") \ No newline at end of file diff --git a/test/integration_tests/training/sdk/test_sdk_resource_processing.py b/test/integration_tests/training/sdk/test_sdk_resource_processing.py new file mode 100644 index 00000000..3ecf8601 --- /dev/null +++ b/test/integration_tests/training/sdk/test_sdk_resource_processing.py @@ -0,0 +1,150 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import pytest +from sagemaker.hyperpod.training import HyperPodPytorchJob +from sagemaker.hyperpod.cli.utils import setup_logger + +logger = setup_logger(__name__) + + +class TestHyperPodSDKResourceProcessing: + """Integration tests for HyperPod SDK resource processing methods.""" + + def test_process_replica_resources_valid_config(self): + """Test _process_replica_resources with valid configuration.""" + data = { + 'name': 'pod', + 'template': { + 'spec': { + 'containers': [{ + 'name': 'container-name', + 'image': 'pytorch:latest', + 'resources': { + 'requests': { + 'nvidia.com/gpu': '1', + 'cpu': '3', + 'memory': '1' + }, + 'limits': { + 'nvidia.com/gpu': '1', + 'cpu': '4', + 'memory': '2' + } + } + }], + 'nodeSelector': { + 'node.kubernetes.io/instance-type': 'ml.g5.8xlarge' + } + } + } + } + + # Process the resources + processed_data = HyperPodPytorchJob._process_replica_resources(data) + + # Verify the data was processed + assert processed_data is not None + assert 'template' in processed_data + assert 'spec' in processed_data['template'] + assert 'containers' in processed_data['template']['spec'] + assert len(processed_data['template']['spec']['containers']) > 0 + + container = processed_data['template']['spec']['containers'][0] + assert 'resources' in container + assert 'requests' in container['resources'] + assert 'limits' in container['resources'] + + logger.info("Successfully processed replica resources with valid config") + + def test_process_replica_resources_missing_containers(self): + """Test _process_replica_resources with missing containers.""" + data = { + 'name': 'pod', + 'replicas': 1, + 'template': { + 'spec': { + 'containers': [], # Empty containers + 'nodeSelector': { + 'node.kubernetes.io/instance-type': 'ml.g5.8xlarge' + } + } + } + } + + # This should raise a ValueError + with pytest.raises(ValueError, match="No containers found"): + HyperPodPytorchJob._process_replica_resources(data) + + logger.info("Successfully caught missing containers error") + + def test_get_container_resources(self): + """Test _get_container_resources method.""" + replica_spec = { + 'template': { + 'spec': { + 'containers': [{ + 'resources': { + 'requests': {'cpu': '2', 'memory': '4Gi'}, + 'limits': {'cpu': '4', 'memory': '8Gi'} + } + }] + } + } + } + + requests, limits = HyperPodPytorchJob._get_container_resources(replica_spec) + + assert requests == {'cpu': '2', 'memory': '4Gi'} + assert limits == {'cpu': '4', 'memory': '8Gi'} + + logger.info("Successfully extracted container resources") + + def test_process_replica_resources_with_float_values(self): + """Test _process_replica_resources with float values.""" + data = { + 'name': 'pod', + 'template': { + 'spec': { + 'containers': [{ + 'name': 'container-name', + 'image': 'pytorch:latest', + 'resources': { + 'requests': { + 'nvidia.com/gpu': '1', + 'cpu': '3.6', + 'memory': '1.5' + }, + 'limits': { + 'nvidia.com/gpu': '1', + 'cpu': '4.8', + 'memory': '2.7' + } + } + }], + 'nodeSelector': { + 'node.kubernetes.io/instance-type': 'ml.g5.8xlarge' + } + } + } + } + + # Process the resources + processed_data = HyperPodPytorchJob._process_replica_resources(data) + + # Verify the data was processed + assert processed_data is not None + container = processed_data['template']['spec']['containers'][0] + assert 'resources' in container + + logger.info("Successfully processed replica resources with float values") diff --git a/test/unit_tests/cli/test_quota_allocation_util.py b/test/unit_tests/cli/test_quota_allocation_util.py index a1e7b6d4..e0fc4b36 100644 --- a/test/unit_tests/cli/test_quota_allocation_util.py +++ b/test/unit_tests/cli/test_quota_allocation_util.py @@ -12,10 +12,7 @@ # language governing permissions and limitations under the License. import pytest -import sys -import os -sys.path.append(os.path.join(os.path.dirname(__file__), 'hyperpod-pytorch-job-template')) -from hyperpod_pytorch_job_template.v1_1.quota_allocation_util import ( +from sagemaker.hyperpod.training.quota_allocation_util import ( _get_resources_from_instance, _get_limits, _is_valid, @@ -204,11 +201,6 @@ def test_is_valid_invalid_instance_type(self): assert not valid assert message == "Invalid instance-type ml-123. Please re-check the instance type and contact AWS for support." - def test_is_valid_neither_node_count_nor_resources(self): - valid, message = _is_valid(None, None, None, None, "ml.g5.xlarge") - assert not valid - assert message == "Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type ml.g5.xlarge" - def test_is_valid_both_node_count_and_resources(self): valid, message = _is_valid(4.0, None, None, 2, "ml.g5.xlarge") assert not valid @@ -234,11 +226,6 @@ def test_is_valid_single_resource(self): assert valid assert message == "" - def test_is_valid_limits_only(self): - valid, message = _is_valid(None, None, None, None, "ml.g5.xlarge") - assert not valid - assert message == "Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type ml.g5.xlarge" - # Test instance resources dictionary def test_instance_resources_structure(self): assert isinstance(INSTANCE_RESOURCES, dict)