Skip to content

Commit 92bd90e

Browse files
ddeiddademartinofra
authored andcommitted
Support development instance_types_data configuration
This commit integrates the support for the new `instance_types_data` configuration parameter, allowing custom instance types data to be used from ParallelCluster daemons. Signed-off-by: ddeidda <[email protected]>
1 parent b12bb84 commit 92bd90e

File tree

5 files changed

+153
-33
lines changed

5 files changed

+153
-33
lines changed

src/common/utils.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# See the License for the specific language governing permissions and limitations under the License.
1212
import collections
1313
import itertools
14+
import json
1415
import logging
1516
import os
1617
import pwd
@@ -274,20 +275,29 @@ def _fetch_instance_info(region, proxy_config, instance_type):
274275
raise CriticalError(emsg)
275276

276277

277-
def _get_instance_info(region, proxy_config, instance_type):
278+
def _get_instance_info(region, proxy_config, instance_type, additional_instance_types_data=None):
278279
"""
279280
Call the DescribeInstanceTypes to get number of vcpus and gpus for the given instance type.
280281
281282
:return: (the number of vcpus or -1 if the instance type cannot be found,
282283
number of gpus or None if the instance does not have gpu)
283284
"""
284-
log.debug("Fetching info for instance_type {0}".format(instance_type))
285-
instance_info = _fetch_instance_info(region, proxy_config, instance_type)
286-
log.debug("Received the following information for instance type {0}: {1}".format(instance_type, instance_info))
285+
instance_info = None
286+
287+
# First attempt to describe the instance is from configuration data, if present
288+
if additional_instance_types_data:
289+
instance_info = additional_instance_types_data.get(instance_type, None)
290+
291+
# If no data is provided from configuration we retrieve it from ec2
292+
if not instance_info:
293+
log.debug("Fetching info for instance_type {0}".format(instance_type))
294+
instance_info = _fetch_instance_info(region, proxy_config, instance_type)
295+
log.debug("Received the following information for instance type {0}: {1}".format(instance_type, instance_info))
296+
287297
return _get_vcpus_from_instance_info(instance_info), _get_gpus_from_instance_info(instance_info)
288298

289299

290-
def get_instance_properties(region, proxy_config, instance_type):
300+
def get_instance_properties(region, proxy_config, instance_type, additional_instance_types_data=None):
291301
"""
292302
Get instance properties for the given instance type, according to the cfn_scheduler_slots configuration parameter.
293303
@@ -299,7 +309,7 @@ def get_instance_properties(region, proxy_config, instance_type):
299309

300310
if instance_type not in get_instance_properties.cache:
301311
# get vcpus and gpus from the pricing file, gpus = 0 if instance does not have GPU
302-
vcpus, gpus = _get_instance_info(region, proxy_config, instance_type)
312+
vcpus, gpus = _get_instance_info(region, proxy_config, instance_type, additional_instance_types_data)
303313

304314
try:
305315
cfnconfig_params = _read_cfnconfig()
@@ -421,3 +431,30 @@ def grouper(iterable, n):
421431
if not chunk:
422432
return
423433
yield chunk
434+
435+
436+
def load_additional_instance_types_data(config, section):
437+
"""Load instance types data from configuration, if set; an empty dict is returned otherwise."""
438+
instance_types_data = {}
439+
if config.has_option(section, "instance_types_data"):
440+
instance_types_data_str = config.get(section, "instance_types_data")
441+
if instance_types_data_str:
442+
try:
443+
instance_types_data_str = str(instance_types_data_str).strip()
444+
445+
# Load json value if not empty
446+
if instance_types_data_str:
447+
instance_types_data = json.loads(instance_types_data_str)
448+
449+
# Fallback to empty dict if value is None
450+
if not instance_types_data:
451+
instance_types_data = {}
452+
453+
log.info(
454+
"Additional instance types data loaded for instance types '{0}': {1}".format(
455+
instance_types_data.keys(), instance_types_data
456+
)
457+
)
458+
except Exception as e:
459+
raise CriticalError("Error loading instance types data from configuration: {0}".format(e))
460+
return instance_types_data

src/jobwatcher/jobwatcher.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
get_asg_settings,
2626
get_compute_instance_type,
2727
get_instance_properties,
28+
load_additional_instance_types_data,
2829
load_module,
2930
sleep_remaining_loop_time,
3031
)
@@ -36,7 +37,7 @@
3637

3738

3839
JobwatcherConfig = collections.namedtuple(
39-
"JobwatcherConfig", ["region", "scheduler", "stack_name", "pcluster_dir", "proxy_config"]
40+
"JobwatcherConfig", ["region", "scheduler", "stack_name", "pcluster_dir", "proxy_config", "instance_types_data"]
4041
)
4142

4243

@@ -59,21 +60,23 @@ def _get_config():
5960
scheduler = config.get("jobwatcher", "scheduler")
6061
stack_name = config.get("jobwatcher", "stack_name")
6162
pcluster_dir = config.get("jobwatcher", "cfncluster_dir")
63+
instance_types_data = load_additional_instance_types_data(config, "jobwatcher")
6264

6365
_proxy = config.get("jobwatcher", "proxy")
6466
proxy_config = Config()
6567
if _proxy != "NONE":
6668
proxy_config = Config(proxies={"https": _proxy})
6769

6870
log.info(
69-
"Configured parameters: region=%s scheduler=%s stack_name=%s pcluster_dir=%s proxy=%s",
71+
"Configured parameters: region=%s scheduler=%s stack_name=%s pcluster_dir=%s proxy=%s instance_types_data=%s",
7072
region,
7173
scheduler,
7274
stack_name,
7375
pcluster_dir,
7476
_proxy,
77+
instance_types_data,
7578
)
76-
return JobwatcherConfig(region, scheduler, stack_name, pcluster_dir, proxy_config)
79+
return JobwatcherConfig(region, scheduler, stack_name, pcluster_dir, proxy_config, instance_types_data)
7780

7881

7982
def _poll_scheduler_status(config, asg_name, scheduler_module):
@@ -99,7 +102,9 @@ def _poll_scheduler_status(config, asg_name, scheduler_module):
99102
)
100103
if new_instance_type != instance_type:
101104
instance_type = new_instance_type
102-
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
105+
instance_properties = get_instance_properties(
106+
config.region, config.proxy_config, instance_type, config.instance_types_data
107+
)
103108
update_instance_properties_timer += LOOP_TIME
104109

105110
# get current limits

src/nodewatcher/nodewatcher.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
get_asg_name,
3232
get_asg_settings,
3333
get_instance_properties,
34+
load_additional_instance_types_data,
3435
load_module,
3536
retrieve_max_cluster_size,
3637
sleep_remaining_loop_time,
@@ -48,7 +49,8 @@
4849
CLUSTER_PROPERTIES_REFRESH_INTERVAL = 300
4950

5051
NodewatcherConfig = collections.namedtuple(
51-
"NodewatcherConfig", ["region", "scheduler", "stack_name", "scaledown_idletime", "proxy_config"]
52+
"NodewatcherConfig",
53+
["region", "scheduler", "stack_name", "scaledown_idletime", "proxy_config", "instance_types_data"],
5254
)
5355

5456

@@ -71,21 +73,24 @@ def _get_config():
7173
scheduler = config.get("nodewatcher", "scheduler")
7274
stack_name = config.get("nodewatcher", "stack_name")
7375
scaledown_idletime = int(config.get("nodewatcher", "scaledown_idletime"))
76+
instance_types_data = load_additional_instance_types_data(config, "nodewatcher")
7477

7578
_proxy = config.get("nodewatcher", "proxy")
7679
proxy_config = Config()
7780
if _proxy != "NONE":
7881
proxy_config = Config(proxies={"https": _proxy})
7982

8083
log.info(
81-
"Configured parameters: region=%s scheduler=%s stack_name=%s scaledown_idletime=%s proxy=%s",
84+
"Configured parameters: region=%s scheduler=%s stack_name=%s scaledown_idletime=%s proxy=%s "
85+
"instance_types_data=%s",
8286
region,
8387
scheduler,
8488
stack_name,
8589
scaledown_idletime,
8690
_proxy,
91+
instance_types_data,
8792
)
88-
return NodewatcherConfig(region, scheduler, stack_name, scaledown_idletime, proxy_config)
93+
return NodewatcherConfig(region, scheduler, stack_name, scaledown_idletime, proxy_config, instance_types_data)
8994

9095

9196
def _get_metadata(metadata_path):
@@ -328,7 +333,9 @@ def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance
328333
_terminate_if_down(scheduler_module, config, asg_name, instance_id, INITIAL_TERMINATE_TIMEOUT)
329334

330335
idletime = _init_idletime()
331-
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
336+
instance_properties = get_instance_properties(
337+
config.region, config.proxy_config, instance_type, config.instance_types_data
338+
)
332339
start_time = None
333340
while True:
334341
sleep_remaining_loop_time(LOOP_TIME, start_time)

src/sqswatcher/sqswatcher.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
get_asg_name,
3232
get_compute_instance_type,
3333
get_instance_properties,
34+
load_additional_instance_types_data,
3435
load_module,
3536
retrieve_max_cluster_size,
3637
sleep_remaining_loop_time,
@@ -59,6 +60,7 @@ class QueryConfigError(Exception):
5960
"proxy_config",
6061
"stack_name",
6162
"max_processed_messages",
63+
"instance_types_data",
6264
],
6365
)
6466

@@ -84,6 +86,7 @@ def _get_config():
8486
table_name = config.get("sqswatcher", "table_name")
8587
cluster_user = config.get("sqswatcher", "cluster_user")
8688
stack_name = config.get("sqswatcher", "stack_name")
89+
instance_types_data = load_additional_instance_types_data(config, "sqswatcher")
8790
max_processed_messages = int(
8891
config.get("sqswatcher", "max_processed_messages", fallback=DEFAULT_MAX_PROCESSED_MESSAGES)
8992
)
@@ -95,7 +98,7 @@ def _get_config():
9598

9699
log.info(
97100
"Configured parameters: region=%s scheduler=%s sqsqueue=%s table_name=%s cluster_user=%s "
98-
"proxy=%s stack_name=%s max_processed_messages=%s",
101+
"proxy=%s stack_name=%s max_processed_messages=%s instance_types_data=%s",
99102
region,
100103
scheduler,
101104
sqsqueue,
@@ -104,9 +107,18 @@ def _get_config():
104107
_proxy,
105108
stack_name,
106109
max_processed_messages,
110+
instance_types_data,
107111
)
108112
return SQSWatcherConfig(
109-
region, scheduler, sqsqueue, table_name, cluster_user, proxy_config, stack_name, max_processed_messages
113+
region,
114+
scheduler,
115+
sqsqueue,
116+
table_name,
117+
cluster_user,
118+
proxy_config,
119+
stack_name,
120+
max_processed_messages,
121+
instance_types_data,
110122
)
111123

112124

@@ -226,7 +238,7 @@ def _retrieve_all_sqs_messages(queue, max_processed_messages):
226238
return messages
227239

228240

229-
def _parse_sqs_messages(sqs_config_region, sqs_config_proxy, messages, table, queue):
241+
def _parse_sqs_messages(sqs_config_region, sqs_config_proxy, messages, table, queue, additional_instance_types_data):
230242
add_events = []
231243
remove_events = []
232244
for message in messages:
@@ -240,7 +252,9 @@ def _parse_sqs_messages(sqs_config_region, sqs_config_proxy, messages, table, qu
240252
continue
241253

242254
if event_type == "parallelcluster:COMPUTE_READY":
243-
add_event = _process_compute_ready_event(sqs_config_region, sqs_config_proxy, message_attrs, message, table)
255+
add_event = _process_compute_ready_event(
256+
sqs_config_region, sqs_config_proxy, message_attrs, message, table, additional_instance_types_data
257+
)
244258
if add_event:
245259
add_events.append(add_event)
246260
elif event_type == "autoscaling:EC2_INSTANCE_TERMINATE":
@@ -266,12 +280,16 @@ def _parse_sqs_messages(sqs_config_region, sqs_config_proxy, messages, table, qu
266280
return update_events.values()
267281

268282

269-
def _process_compute_ready_event(sqs_config_region, sqs_config_proxy, message_attrs, message, table):
283+
def _process_compute_ready_event(
284+
sqs_config_region, sqs_config_proxy, message_attrs, message, table, additional_instance_types_data
285+
):
270286
instance_id = message_attrs.get("EC2InstanceId")
271287
instance_type = message_attrs.get("EC2InstanceType")
272288
# Get instances properties for each event because instance types
273289
# from instance and CloudFormation could be out-of-sync
274-
instance_properties = get_instance_properties(sqs_config_region, sqs_config_proxy, instance_type)
290+
instance_properties = get_instance_properties(
291+
sqs_config_region, sqs_config_proxy, instance_type, additional_instance_types_data
292+
)
275293
gpus = instance_properties["gpus"]
276294
slots = message_attrs.get("Slots")
277295
hostname = message_attrs.get("LocalHostname").split(".")[0]
@@ -392,12 +410,16 @@ def _poll_queue(sqs_config, queue, table, asg_name):
392410
force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type
393411
if new_instance_type != instance_type:
394412
instance_type = new_instance_type
395-
instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type)
413+
instance_properties = get_instance_properties(
414+
sqs_config.region, sqs_config.proxy_config, instance_type, sqs_config.instance_types_data
415+
)
396416
max_cluster_size = new_max_cluster_size
397417
cluster_properties_refresh_timer += LOOP_TIME
398418

399419
messages = _retrieve_all_sqs_messages(queue, sqs_config.max_processed_messages)
400-
update_events = _parse_sqs_messages(sqs_config.region, sqs_config.proxy_config, messages, table, queue)
420+
update_events = _parse_sqs_messages(
421+
sqs_config.region, sqs_config.proxy_config, messages, table, queue, sqs_config.instance_types_data
422+
)
401423
_process_sqs_messages(
402424
update_events,
403425
scheduler_module,

0 commit comments

Comments
 (0)