Commit 1b4ba77

Add an ec2_instance_missing_max_count config option for clustermgtd to be robust to eventual EC2 consistency (#613)

Adds a clustermgtd config option, ec2_instance_missing_max_count, to allow more time for describe-instances data to reach eventual consistency with run-instances. The max count and a per-node miss-count map are passed to is_healthy() and is_bootstrap_failure() for static and dynamic nodes so the count can be evaluated for individual instances.
1 parent 853f48d commit 1b4ba77
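
For context, the new option lives in the [clustermgtd] section of the clustermgtd config file, as the parsing code below confirms. A minimal sketch of how it might be set; the value 5 is purely illustrative, and the code's default is 2:

    [clustermgtd]
    # Number of clustermgtd iterations to tolerate a node whose EC2 instance
    # has not yet appeared in describe-instances before marking it unhealthy
    ec2_instance_missing_max_count = 5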

File tree

7 files changed: +275 −37 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@ This file is used to list changes made in each version of the aws-parallelcluste
 ------
 
 **ENHANCEMENTS**
+- Add a clustermgtd config option `ec2_instance_missing_max_count` to allow a configurable amount of retries for eventual EC2
+  describe instances consistency with run instances
 
 **CHANGES**
 

src/slurm_plugin/cluster_event_publisher.py

Lines changed: 8 additions & 2 deletions
@@ -317,12 +317,18 @@ def detail_supplier(node_names):
     #     }
     # }
     @log_exception(logger, "publish_unhealthy_node_events", catch_exception=Exception, raise_on_error=False)
-    def publish_unhealthy_node_events(self, unhealthy_nodes: List[SlurmNode]):
+    def publish_unhealthy_node_events(
+        self, unhealthy_nodes: List[SlurmNode], ec2_instance_missing_max_count, nodes_without_backing_instance_count_map
+    ):
         """Publish events for unhealthy nodes without a backing instance and for nodes that are not responding."""
         timestamp = ClusterEventPublisher.timestamp()
 
         nodes_with_invalid_backing_instance = [
-            node for node in unhealthy_nodes if not node.is_backing_instance_valid(log_warn_if_unhealthy=False)
+            node
+            for node in unhealthy_nodes
+            if not node.is_backing_instance_valid(
+                ec2_instance_missing_max_count, nodes_without_backing_instance_count_map, log_warn_if_unhealthy=False
+            )
         ]
         self.publish_event(
             logging.WARNING if nodes_with_invalid_backing_instance else logging.DEBUG,

src/slurm_plugin/clustermgtd.py

Lines changed: 22 additions & 5 deletions
@@ -150,6 +150,7 @@ class ClustermgtdConfig:
         "terminate_drain_nodes": True,
         "terminate_down_nodes": True,
         "orphaned_instance_timeout": 300,
+        "ec2_instance_missing_max_count": 2,
         # Health check configs
         "disable_ec2_health_check": False,
         "disable_scheduled_event_health_check": False,

@@ -304,6 +305,11 @@ def _get_terminate_config(self, config):
         self.insufficient_capacity_timeout = config.getfloat(
             "clustermgtd", "insufficient_capacity_timeout", fallback=self.DEFAULTS.get("insufficient_capacity_timeout")
         )
+        self.ec2_instance_missing_max_count = config.getint(
+            "clustermgtd",
+            "ec2_instance_missing_max_count",
+            fallback=self.DEFAULTS.get("ec2_instance_missing_max_count"),
+        )
         self.disable_nodes_on_insufficient_capacity = self.insufficient_capacity_timeout > 0
 
     def _get_dns_config(self, config):
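
Worth noting: config.getint(..., fallback=...) makes the option optional, so existing configs keep working unchanged. A minimal, self-contained sketch of that behavior (a standalone ConfigParser demo, not the real clustermgtd config loading):

    from configparser import ConfigParser

    config = ConfigParser()
    config.read_string("[clustermgtd]\n")  # option not set by the user
    # Falls back to the DEFAULTS value (2) when the option is absent
    print(config.getint("clustermgtd", "ec2_instance_missing_max_count", fallback=2))  # -> 2

    config.read_string("[clustermgtd]\nec2_instance_missing_max_count = 5\n")
    # An explicit setting overrides the fallback
    print(config.getint("clustermgtd", "ec2_instance_missing_max_count", fallback=2))  # -> 5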
@@ -384,6 +390,7 @@ def __init__(self, config):
         self._insufficient_capacity_compute_resources = {}
         self._static_nodes_in_replacement = set()
         self._partitions_protected_failure_count_map = {}
+        self._nodes_without_backing_instance_count_map = {}
         self._compute_fleet_status = ComputeFleetStatus.RUNNING
         self._current_time = None
         self._config = None

@@ -492,7 +499,10 @@ def _handle_successfully_launched_nodes(self, partitions_name_map):
         partitions_protected_failure_count_map = self._partitions_protected_failure_count_map.copy()
         for partition, failures_per_compute_resource in partitions_protected_failure_count_map.items():
             partition_online_compute_resources = partitions_name_map[partition].get_online_node_by_type(
-                self._config.terminate_drain_nodes, self._config.terminate_down_nodes
+                self._config.terminate_drain_nodes,
+                self._config.terminate_down_nodes,
+                self._config.ec2_instance_missing_max_count,
+                self._nodes_without_backing_instance_count_map,
             )
             for compute_resource in failures_per_compute_resource.keys():
                 if compute_resource in partition_online_compute_resources:

@@ -762,6 +772,8 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes):
             if not node.is_healthy(
                 consider_drain_as_unhealthy=self._config.terminate_drain_nodes,
                 consider_down_as_unhealthy=self._config.terminate_down_nodes,
+                ec2_instance_missing_max_count=self._config.ec2_instance_missing_max_count,
+                nodes_without_backing_instance_count_map=self._nodes_without_backing_instance_count_map,
                 log_warn_if_unhealthy=node.name not in reserved_nodenames,
             ):
                 if not self._config.disable_capacity_blocks_management and node.name in reserved_nodenames:

@@ -778,7 +790,11 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes):
                 ).append(node)
             else:
                 unhealthy_dynamic_nodes.append(node)
-        self._event_publisher.publish_unhealthy_node_events(all_unhealthy_nodes)
+        self._event_publisher.publish_unhealthy_node_events(
+            all_unhealthy_nodes,
+            self._config.ec2_instance_missing_max_count,
+            self._nodes_without_backing_instance_count_map,
+        )
         return (
             unhealthy_dynamic_nodes,
             unhealthy_static_nodes,

@@ -1167,11 +1183,12 @@ def _is_node_replacement_timeout(self, node):
         """Check if a static node is in replacement but replacement time is expired."""
         return self._is_node_in_replacement_valid(node, check_node_is_valid=False)
 
-    @staticmethod
-    def _find_bootstrap_failure_nodes(slurm_nodes):
+    def _find_bootstrap_failure_nodes(self, slurm_nodes):
         bootstrap_failure_nodes = []
         for node in slurm_nodes:
-            if node.is_bootstrap_failure():
+            if node.is_bootstrap_failure(
+                self._config.ec2_instance_missing_max_count, self._nodes_without_backing_instance_count_map
+            ):
                 bootstrap_failure_nodes.append(node)
         return bootstrap_failure_nodes
 
src/slurm_plugin/slurm_resources.py

Lines changed: 95 additions & 15 deletions
@@ -63,12 +63,24 @@ def is_inactive(self):
     def has_running_job(self):
         return any(node.is_running_job() for node in self.slurm_nodes)
 
-    def get_online_node_by_type(self, terminate_drain_nodes, terminate_down_nodes):
+    def get_online_node_by_type(
+        self,
+        terminate_drain_nodes,
+        terminate_down_nodes,
+        ec2_instance_missing_max_count,
+        nodes_without_backing_instance_count_map,
+    ):
         online_compute_resources = set()
         if not self.state == "INACTIVE":
             for node in self.slurm_nodes:
                 if (
-                    node.is_healthy(terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=False)
+                    node.is_healthy(
+                        terminate_drain_nodes,
+                        terminate_down_nodes,
+                        ec2_instance_missing_max_count,
+                        nodes_without_backing_instance_count_map,
+                        log_warn_if_unhealthy=False,
+                    )
                     and node.is_online()
                 ):
                     logger.debug("Currently online node: %s, node state: %s", node.name, node.state_string)

@@ -233,6 +245,7 @@ def __init__(
         self.is_failing_health_check = False
         self.error_code = self._parse_error_code()
         self.queue_name, self._node_type, self.compute_resource_name = parse_nodename(name)
+        self.ec2_backing_instance_valid = None
 
     def is_nodeaddr_set(self):
         """Check if nodeaddr(private ip) for the node is set."""

@@ -378,7 +391,7 @@ def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealt
         pass
 
     @abstractmethod
-    def is_bootstrap_failure(self):
+    def is_bootstrap_failure(self, ec2_instance_missing_max_count, nodes_without_backing_instance_count_map: dict):
         """
         Check if a slurm node has boostrap failure.
 

@@ -394,7 +407,14 @@ def is_bootstrap_timeout(self):
         pass
 
     @abstractmethod
-    def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
+    def is_healthy(
+        self,
+        consider_drain_as_unhealthy,
+        consider_down_as_unhealthy,
+        ec2_instance_missing_max_count,
+        nodes_without_backing_instance_count_map: dict,
+        log_warn_if_unhealthy=True,
+    ):
         """Check if a slurm node is considered healthy."""
         pass
 

@@ -404,8 +424,18 @@ def is_powering_down_with_nodeaddr(self):
         # for example because of a short SuspendTimeout
         return self.is_nodeaddr_set() and (self.is_power() or self.is_powering_down())
 
-    def is_backing_instance_valid(self, log_warn_if_unhealthy=True):
+    def is_backing_instance_valid(
+        self,
+        ec2_instance_missing_max_count,
+        nodes_without_backing_instance_count_map: dict,
+        log_warn_if_unhealthy=True,
+    ):
         """Check if a slurm node's addr is set, it points to a valid instance in EC2."""
+        # Perform this logic only once and return the result thereafter
+        if self.ec2_backing_instance_valid is not None:
+            return self.ec2_backing_instance_valid
+        # Set ec2_backing_instance_valid to True since it will be the result most often
+        self.ec2_backing_instance_valid = True
         if self.is_nodeaddr_set():
             if not self.instance:
                 if log_warn_if_unhealthy:

@@ -414,8 +444,30 @@ def is_backing_instance_valid(self, log_warn_if_unhealthy=True):
                         self,
                         self.state_string,
                     )
-                return False
-        return True
+                # Allow a few iterations for the eventual consistency of EC2 data
+                logger.debug(f"Map of slurm nodes without backing instances {nodes_without_backing_instance_count_map}")
+                missing_instance_loop_count = nodes_without_backing_instance_count_map.get(self.name, 0)
+                # If the loop count has been reached, the instance is unhealthy and will be terminated
+                if missing_instance_loop_count >= ec2_instance_missing_max_count:
+                    if log_warn_if_unhealthy:
+                        logger.warning(f"EC2 instance availability for node {self.name} has timed out.")
+                    # Remove the slurm node from the map since a new instance will be launched
+                    nodes_without_backing_instance_count_map.pop(self.name, None)
+                    self.ec2_backing_instance_valid = False
+                else:
+                    nodes_without_backing_instance_count_map[self.name] = missing_instance_loop_count + 1
+                    if log_warn_if_unhealthy:
+                        logger.warning(
+                            f"Incrementing missing EC2 instance count for node {self.name} to "
+                            f"{nodes_without_backing_instance_count_map[self.name]}."
+                        )
+            else:
+                # Remove the slurm node from the map since the instance is healthy
+                nodes_without_backing_instance_count_map.pop(self.name, None)
+        else:
+            # Remove the slurm node from the map since the instance is healthy
+            nodes_without_backing_instance_count_map.pop(self.name, None)
+        return self.ec2_backing_instance_valid
 
     @abstractmethod
     def needs_reset_when_inactive(self):
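
To make the retry mechanics above concrete, here is a minimal, self-contained sketch of the counting scheme (a simplified standalone function, not the actual SlurmNode method; the per-node result caching via ec2_backing_instance_valid is omitted, and the node name is illustrative). A shared map counts consecutive clustermgtd iterations in which a node has an address but no backing instance, and the node is only declared invalid once the count reaches ec2_instance_missing_max_count:

    def backing_instance_valid(node_name, instance, max_count, missing_count_map):
        """Return False only after max_count iterations without a backing instance."""
        if instance is None:
            count = missing_count_map.get(node_name, 0)
            if count >= max_count:
                # Budget exhausted: clear the entry (a new instance will be launched)
                missing_count_map.pop(node_name, None)
                return False
            # Give describe-instances another iteration to catch up with run-instances
            missing_count_map[node_name] = count + 1
            return True
        # Instance found: clear any pending count
        missing_count_map.pop(node_name, None)
        return True

    # Simulated clustermgtd loops with max_count=2 (the default): the node survives
    # two iterations without a backing instance and is flagged on the third.
    counts = {}
    for instance in [None, None, None]:
        print(backing_instance_valid("queue1-st-compute-1", instance, 2, counts))
    # -> True, True, False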
@@ -478,11 +530,22 @@ def __init__(
             reservation_name=reservation_name,
         )
 
-    def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
+    def is_healthy(
+        self,
+        consider_drain_as_unhealthy,
+        consider_down_as_unhealthy,
+        ec2_instance_missing_max_count,
+        nodes_without_backing_instance_count_map: dict,
+        log_warn_if_unhealthy=True,
+    ):
         """Check if a slurm node is considered healthy."""
         return (
             self._is_static_node_ip_configuration_valid(log_warn_if_unhealthy=log_warn_if_unhealthy)
-            and self.is_backing_instance_valid(log_warn_if_unhealthy=log_warn_if_unhealthy)
+            and self.is_backing_instance_valid(
+                ec2_instance_missing_max_count=ec2_instance_missing_max_count,
+                nodes_without_backing_instance_count_map=nodes_without_backing_instance_count_map,
+                log_warn_if_unhealthy=log_warn_if_unhealthy,
+            )
             and self.is_state_healthy(
                 consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=log_warn_if_unhealthy
             )

@@ -533,9 +596,13 @@ def _is_static_node_ip_configuration_valid(self, log_warn_if_unhealthy=True):
             return False
         return True
 
-    def is_bootstrap_failure(self):
+    def is_bootstrap_failure(self, ec2_instance_missing_max_count, nodes_without_backing_instance_count_map: dict):
         """Check if a slurm node has boostrap failure."""
-        if self.is_static_nodes_in_replacement and not self.is_backing_instance_valid(log_warn_if_unhealthy=False):
+        if self.is_static_nodes_in_replacement and not self.is_backing_instance_valid(
+            ec2_instance_missing_max_count=ec2_instance_missing_max_count,
+            nodes_without_backing_instance_count_map=nodes_without_backing_instance_count_map,
+            log_warn_if_unhealthy=False,
+        ):
             # Node is currently in replacement and no backing instance
             logger.warning(
                 "Node bootstrap error: Node %s is currently in replacement and no backing instance, node state: %s",

@@ -618,17 +685,30 @@ def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealt
             return False
         return True
 
-    def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
+    def is_healthy(
+        self,
+        consider_drain_as_unhealthy,
+        consider_down_as_unhealthy,
+        ec2_instance_missing_max_count,
+        nodes_without_backing_instance_count_map,
+        log_warn_if_unhealthy=True,
+    ):
         """Check if a slurm node is considered healthy."""
-        return self.is_backing_instance_valid(log_warn_if_unhealthy=log_warn_if_unhealthy) and self.is_state_healthy(
+        return self.is_backing_instance_valid(
+            ec2_instance_missing_max_count=ec2_instance_missing_max_count,
+            nodes_without_backing_instance_count_map=nodes_without_backing_instance_count_map,
+            log_warn_if_unhealthy=log_warn_if_unhealthy,
+        ) and self.is_state_healthy(
             consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=log_warn_if_unhealthy
         )
 
-    def is_bootstrap_failure(self):
+    def is_bootstrap_failure(self, ec2_instance_missing_max_count, nodes_without_backing_instance_count_map: dict):
         """Check if a slurm node has boostrap failure."""
         # no backing instance + [working state]# in node state
         if (self.is_configuring_job() or self.is_powering_up_idle()) and not self.is_backing_instance_valid(
-            log_warn_if_unhealthy=False
+            ec2_instance_missing_max_count=ec2_instance_missing_max_count,
+            nodes_without_backing_instance_count_map=nodes_without_backing_instance_count_map,
+            log_warn_if_unhealthy=False,
         ):
             logger.warning(
                 "Node bootstrap error: Node %s is in power up state without valid backing instance, node state: %s",
