Commit 57f0db7

[Test] Add integration tests to validate support for GB200.
1 parent fff826e commit 57f0db7

10 files changed: +702, -1 lines changed

tests/integration-tests/clusters_factory.py

Lines changed: 41 additions & 0 deletions
@@ -16,6 +16,9 @@
 import subprocess
 import time
 
+from tests.common.utils import read_remote_file
+from remote_command_executor import RemoteCommandExecutor
+
 import boto3
 import yaml
 from framework.credential_providers import run_pcluster_command
@@ -34,6 +37,13 @@
     retry_if_subprocess_error,
 )
 
+TAG_CLUSTER_NAME = "parallelcluster:cluster-name"
+TAG_NODE_TYPE = "parallelcluster:node-type"
+TAG_QUEUE_NAME = "parallelcluster:queue-name"
+TAG_QCOMPUTE_RESOURCE_NAME = "parallelcluster:compute-resource-name"
+
+LAUNCH_TEMPLATES_CONFIG_FILE = "/opt/parallelcluster/shared/launch-templates-config.json"
+
 
 def suppress_and_log_exception(func):
     @functools.wraps(func)
@@ -253,6 +263,37 @@ def describe_cluster_instances(self, node_type=None, queue_name=None):
             logging.error("Failed when getting cluster instances with error:\n%s\nand output:\n%s", e.stderr, e.stdout)
             raise
 
+    def get_compute_nodes(self, queue_name: str = None, compute_resource_name: str = None, status: str = "running"):
+        """Return the EC2 instance details for compute nodes matching the provided criteria."""
+        ec2 = boto3.client("ec2", region_name=self.region)
+        filters = [
+            {"Name": f"tag:{TAG_CLUSTER_NAME}", "Values": [self.cfn_name]},
+            {"Name": f"tag:{TAG_NODE_TYPE}", "Values": ["Compute"]},
+            {"Name": "instance-state-name", "Values": [status]},
+        ]
+
+        if queue_name:
+            filters.append({"Name": f"tag:{TAG_QUEUE_NAME}", "Values": [queue_name]})
+        if compute_resource_name:
+            filters.append({"Name": f"tag:{TAG_QCOMPUTE_RESOURCE_NAME}", "Values": [compute_resource_name]})
+
+        return ec2.describe_instances(Filters=filters).get("Reservations")[0].get("Instances")
+
+    def get_compute_nodes_private_ip(self, queue_name: str = None, compute_resource_name: str = None, status: str = "running"):
+        """Return the private IP addresses of compute nodes matching the provided criteria."""
+        return [i.get("PrivateIpAddress") for i in self.get_compute_nodes(queue_name, compute_resource_name, status)]
+
+    def get_compute_nodes_launch_template_logical_id(self, queue_name: str, compute_resource_name: str):
+        """Return the launch template logical id of compute nodes matching the provided criteria."""
+        launch_templates_config = json.loads(read_remote_file(RemoteCommandExecutor(self), LAUNCH_TEMPLATES_CONFIG_FILE))
+        return (launch_templates_config
+                .get("Queues", {})
+                .get(queue_name, {})
+                .get("ComputeResources", {})
+                .get(compute_resource_name, {})
+                .get("LaunchTemplate", {})
+                .get("LogicalId"))
+
     def get_cluster_instance_ids(self, node_type=None, queue_name=None):
         """Run pcluster describe-cluster-instances and collect instance ids."""
         instances = self.describe_cluster_instances(node_type=node_type, queue_name=queue_name)
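
For context, a minimal usage sketch of the new Cluster helpers added above. It assumes cluster is a Cluster object returned by clusters_factory; the queue and compute-resource names are illustrative, not values fixed by this commit.

# Illustrative usage of the helpers introduced in this file.
nodes = cluster.get_compute_nodes(queue_name="q1", compute_resource_name="cr1")
instance_ids = [node.get("InstanceId") for node in nodes]
private_ips = cluster.get_compute_nodes_private_ip("q1", "cr1")
launch_template_logical_id = cluster.get_compute_nodes_launch_template_logical_id("q1", "cr1")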

tests/integration-tests/configs/develop.yaml

Lines changed: 7 additions & 0 deletions
@@ -280,6 +280,13 @@ test-suites:
         instances: [{{ common.instance("instance_type_1") }}]
         oss: [{{ OS_X86_6 }}]
         schedulers: [ "slurm" ]
+  gb200:
+    test_gb200.py::test_gb200:
+      dimensions:
+        - regions: [ "us-east-1" ]
+          instances: [ "g4dn.2xlarge" ]
+          oss: [ "alinux2023" ]
+          schedulers: [ "slurm" ]
   health_checks:
     test_gpu_health_checks.py::test_cluster_with_gpu_health_checks:
       dimensions:

tests/integration-tests/conftest.py

Lines changed: 25 additions & 0 deletions
@@ -578,6 +578,31 @@ def test_datadir(request, datadir):
     class_name = request.cls.__name__
     return datadir / "{0}/{1}".format(class_name, function_name)
 
+@pytest.fixture()
+def file_reader(test_datadir, request, vpc_stack):
+    """
+    Define a fixture to render file templates associated with the running test.
+
+    The template file for a given test is a generic file stored in the configs_datadir folder.
+    The template can be written by using the Jinja2 template engine.
+
+    :return: a _file_renderer(**kwargs) function which gets as input a dictionary of values to replace in the template
+    """
+
+    def _file_renderer(input_file: str = "script.sh", output_file: str = "script_rendered.sh", **kwargs):
+        input_file_path = test_datadir / input_file
+        if not os.path.isfile(input_file_path):
+            raise FileNotFoundError(f"Input file not found in the expected dir {input_file_path}")
+        output_file_path = test_datadir / output_file if output_file else input_file_path
+        default_values = _get_default_template_values(vpc_stack, request)
+        file_loader = FileSystemLoader(str(test_datadir))
+        env = SandboxedEnvironment(loader=file_loader)
+        rendered_template = env.get_template(input_file).render(**{**default_values, **kwargs})
+        output_file_path.write_text(rendered_template)
+        return output_file_path
+
+    return _file_renderer
+
 
 @pytest.fixture()
 def pcluster_config_reader(test_datadir, vpc_stack, request, region, instance, architecture):
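
A minimal sketch of how a test can consume the new file_reader fixture. The template file name and the bucket_name variable below are illustrative; the fixture looks up the input file under the test's data directory and renders it with Jinja2.

def test_render_example(file_reader):
    # Renders <test_datadir>/head_node_start.sh, substituting the extra keyword
    # arguments as template variables, and returns the path of the rendered file.
    rendered_path = file_reader(
        input_file="head_node_start.sh",
        output_file="head_node_start.sh.rendered",
        bucket_name="my-test-bucket",  # hypothetical template variable
    )
    assert rendered_path.exists()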

tests/integration-tests/tests/common/assertions.py

Lines changed: 10 additions & 1 deletion
@@ -9,6 +9,7 @@
 # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import re
 import time
 from typing import List, Union
 
@@ -19,6 +20,8 @@
 from remote_command_executor import RemoteCommandExecutor
 from retrying import RetryError, retry
 from time_utils import minutes, seconds
+from clusters_factory import Cluster
+
 from utils import (
     get_cfn_resources,
     get_cluster_nodes_instance_ids,
@@ -28,7 +31,7 @@
 )
 
 from tests.common.scaling_common import get_compute_nodes_allocation
-from tests.common.utils import get_ddb_item
+from tests.common.utils import get_ddb_item, read_remote_file
 
 
 @retry(wait_fixed=seconds(20), stop_max_delay=minutes(6))
@@ -422,3 +425,9 @@ def _assert_build_image_stack_deleted(stack_name, region, timeout_seconds=600, p
         time.sleep(poll_interval)
 
     pytest.fail(f"Timed-out waiting for stack {stack_name} deletion (last status: {last_status})")
+
+
+def assert_regex_in_file(cluster: Cluster, compute_node_ip: str, file_name: str, pattern: str):
+    rce = RemoteCommandExecutor(cluster, compute_node_ip)
+    file_content = read_remote_file(rce, file_name)
+    assert_that(bool(re.search(pattern, file_content, re.IGNORECASE))).is_true()
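
For reference, the new assert_regex_in_file helper reads a file from the given compute node and passes when re.search finds the pattern anywhere in its content (case-insensitive). A hedged usage sketch with hypothetical node IP, file path, and pattern:

# Hypothetical values; the assertion fails the test if no match is found.
assert_regex_in_file(
    cluster,
    "192.0.2.10",               # compute node private IP (illustrative)
    "/var/log/my-service.log",  # remote file to inspect (illustrative)
    r"service started",         # case-insensitive pattern (illustrative)
)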

tests/integration-tests/tests/common/utils.py

Lines changed: 8 additions & 0 deletions
@@ -536,3 +536,11 @@ def write_file(dirname, filename, content):
         f.write(content)
     logging.info(f"File written: {filepath}")
     return filepath
+
+
+def terminate_nodes_manually(instance_ids, region):
+    ec2_client = boto3.client("ec2", region_name=region)
+    for instance_id in instance_ids:
+        instance_states = ec2_client.terminate_instances(InstanceIds=[instance_id]).get("TerminatingInstances")[0]
+        assert_that(instance_states.get("InstanceId")).is_equal_to(instance_id)
+        assert_that(instance_states.get("CurrentState").get("Name")).is_in("shutting-down", "terminated")
+    logging.info("Terminated nodes: {}".format(instance_ids))

tests/integration-tests/tests/gb200/test_gb200.py

Lines changed: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
+# Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file.
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+import json
+import logging
+import re
+
+import boto3
+import pytest
+from assertpy import assert_that
+from remote_command_executor import RemoteCommandExecutor
+from clusters_factory import Cluster
+
+from tests.common.schedulers_common import SlurmCommands
+from tests.common.utils import read_remote_file, terminate_nodes_manually
+from tests.common.assertions import assert_regex_in_file
+
+def submit_job_imex_status(rce: RemoteCommandExecutor, launch_template_id: str, queue_name: str, max_nodes: int = 1):
+    logging.info("Submitting job to check IMEX status")
+    slurm_commands = SlurmCommands(rce)
+    job_id = slurm_commands.submit_command_and_assert_job_accepted(
+        submit_command_args={
+            "command": f"/usr/bin/nvidia-imex-ctl -a -q -c /opt/parallelcluster/shared/nvidia-imex/config_{launch_template_id}.cfg",
+            "partition": queue_name,
+            "nodes": max_nodes,
+        }
+    )
+    slurm_commands.wait_job_completed(job_id)
+    slurm_commands.assert_job_succeeded(job_id)
+    return job_id
+
+def assert_imex_node_config(rce: RemoteCommandExecutor, launch_template_id: str, expected_ips: list):
+    logging.info(f"Checking IMEX nodes config contains the expected nodes: {expected_ips}")
+    imex_nodes_config_file = f"/opt/parallelcluster/shared/nvidia-imex/nodes_config_{launch_template_id}.cfg"
+    imex_config_content = read_remote_file(rce, imex_nodes_config_file)
+    actual_ips = [ip.strip() for ip in imex_config_content.strip().split('\n')]
+    assert_that(actual_ips).contains_only(*expected_ips)
+
+
+def assert_imex_healthy(cluster: Cluster, queue_name: str, compute_resource_name: str, max_nodes: int = 1):
+    rce = RemoteCommandExecutor(cluster)
+
+    launch_template_id = cluster.get_compute_nodes_launch_template_logical_id(queue_name, compute_resource_name)
+    logging.info(f"Launch template for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {launch_template_id}")
+
+    ips = cluster.get_compute_nodes_private_ip(queue_name, compute_resource_name)
+    logging.info(f"Private IP addresses for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {ips}")
+
+    job_id = submit_job_imex_status(rce, launch_template_id, queue_name, max_nodes)
+
+    assert_imex_node_config(rce, launch_template_id, ips)
+
+    job_stdout = rce.run_remote_command(f"cat slurm-{job_id}.out").stdout
+    logging.info(f"IMEX status output: {job_stdout}")
+    imex_status = json.loads(job_stdout)
+
+    logging.info(f"Checking that IMEX only sees the expected nodes: {ips}")
+    assert_that(
+        all(node['hostname'] in ips for node in imex_status['nodeStatus'])
+    ).is_true()
+
+    logging.info(f"Checking that IMEX sees every node ready: {ips}")
+    assert_that(
+        all(
+            any(node['hostname'] == ip and node['message'] == 'READY' for node in imex_status['nodeStatus'])
+            for ip in ips
+        )
+    ).is_true()
+
+    for compute_node_ip in cluster.get_compute_nodes_private_ip(queue_name, compute_resource_name):
+        for file_name in ["/var/log/nvidia-imex-verbose.log", "/var/log/parallelcluster/nvidia-imex-prolog.log"]:
+            logging.info(f"Checking file {file_name} log does not contain any error")
+            assert_regex_in_file(cluster, compute_node_ip, file_name, r'^(?!.*(?:err|warn|fail)).*$')
+
+def assert_imex_not_configured(cluster: Cluster, queue_name: str, compute_resource_name: str, max_nodes: int = 1):
+    rce = RemoteCommandExecutor(cluster)
+
+    launch_template_id = cluster.get_compute_nodes_launch_template_logical_id(queue_name, compute_resource_name)
+    logging.info(f"Launch template for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {launch_template_id}")
+
+    submit_job_imex_status(rce, launch_template_id, queue_name, max_nodes)
+
+    assert_imex_node_config(rce, launch_template_id, ["0.0.0.0", "0.0.0.0"])
+
+
+@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
+def test_gb200(pcluster_config_reader, file_reader, clusters_factory, test_datadir, s3_bucket_factory, region):
+    """
+    Test automated configuration of NVIDIA IMEX.
+
+    This test creates a cluster with the necessary custom actions to configure NVIDIA IMEX and verifies the following:
+    1. On the compute resource supporting IMEX (q1-cr1), the IMEX nodes file is configured by the prolog,
+       the IMEX service is healthy and no errors are reported in IMEX's or the prolog's logs.
+       Also, IMEX gets reconfigured when nodes belonging to the same compute resource get replaced.
+    2. On the compute resource not supporting IMEX (q1-cr2), the IMEX nodes file is not configured by the prolog,
+       keeping the default values, and IMEX is not started.
+
+    The test prints the full IMEX status in the test log to facilitate troubleshooting.
+    The test uses instance type g4dn to simulate a p6e-gb200 instance.
+    This is a reasonable approximation because the focus of the test is on IMEX configuration,
+    which can be executed on g4dn as well.
+    """
+    max_queue_size = 2
+
+    # Create an S3 bucket for custom action scripts
+    bucket_name = s3_bucket_factory()
+    bucket = boto3.resource("s3", region_name=region).Bucket(bucket_name)
+
+    # Upload files to test bucket
+    headnode_start_filename = "head_node_start.sh"
+    prolog_filename = "nvidia-imex.prolog.sh"
+    bucket.upload_file(str(test_datadir / prolog_filename), prolog_filename)
+    head_node_start_script_rendered = file_reader(
+        input_file=headnode_start_filename,
+        output_file=f"{headnode_start_filename}.rendered",
+        bucket_name=bucket_name,
+        prolog_key=prolog_filename,
+    )
+    bucket.upload_file(head_node_start_script_rendered, headnode_start_filename)
+
+    # TODO: Remove after testing: BEGIN: added compute custom action to force the configuration of IMEX
+    compute_configured_filename = "compute_node_configured.sh"
+    bucket.upload_file(str(test_datadir / compute_configured_filename), compute_configured_filename)
+    # TODO: Remove after testing: END
+
+    queue_name = "q1"
+    compute_resource_with_imex = "cr1"
+    compute_resource_without_imex = "cr2"
+
+    cluster_config = pcluster_config_reader(
+        bucket_name=bucket_name,
+        head_node_start_script=headnode_start_filename,
+        compute_node_configured_script=compute_configured_filename,
+        max_queue_size=max_queue_size,
+        queue_name=queue_name,
+        compute_resource_with_imex=compute_resource_with_imex,
+        compute_resource_without_imex=compute_resource_without_imex,
+    )
+    cluster = clusters_factory(cluster_config)
+
+    assert_imex_healthy(cluster, queue_name, compute_resource_with_imex, max_queue_size)
+
+    # IMEX is not configured on compute resources that do not support it
+    assert_imex_not_configured(cluster, queue_name, compute_resource_without_imex)
+
+    # Forcefully terminate a compute node in the compute resource supporting IMEX
+    # to simulate an outage that forces the replacement of the node and consequently the IMEX reconfiguration.
+    terminate_nodes_manually([cluster.get_compute_nodes(queue_name, compute_resource_with_imex)[0].get("InstanceId")], region)
+    assert_imex_healthy(cluster, queue_name, compute_resource_with_imex, max_queue_size)
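
For reference, assert_imex_healthy above assumes the nvidia-imex-ctl output is JSON containing a nodeStatus list whose entries expose hostname and message fields. A minimal sketch of the shape the assertions consume, with illustrative values inferred from the parsing code rather than taken from actual IMEX output:

# Illustrative structure only, mirroring the checks in assert_imex_healthy.
imex_status = {
    "nodeStatus": [
        {"hostname": "192.0.2.10", "message": "READY"},
        {"hostname": "192.0.2.11", "message": "READY"},
    ]
}
ips = ["192.0.2.10", "192.0.2.11"]
# Every node reported by IMEX is an expected compute node...
assert all(node["hostname"] in ips for node in imex_status["nodeStatus"])
# ...and every expected compute node is reported as READY.
assert all(
    any(node["hostname"] == ip and node["message"] == "READY" for node in imex_status["nodeStatus"])
    for ip in ips
)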
