Skip to content

Commit 99034f5

Browse files
authored
Revert "Revert "[core] Fix wrong local resource view in raylet (#19911)"" (#19996)
This reverts commit f1eedb1. ## Why are these changes needed? The self node should avoid reading any node-resource updates from the GCS, because the raylet maintains its own authoritative local view of its resources. ## Related issue number #19438
1 parent 398d4cb commit 99034f5

File tree

3 files changed

+89
-21
lines changed

3 files changed

+89
-21
lines changed

python/ray/tests/test_placement_group_3.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,5 +646,57 @@ def check_bundle_leaks():
646646
wait_for_condition(check_bundle_leaks)
647647

648648

649+
def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
    """Verify the raylet keeps a consistent local resource view.

    Regression test: the raylet must ignore GCS resource broadcasts about
    itself, otherwise a stale update can overwrite local allocations and
    leave tasks hanging. Please refer to
    https://github.com/ray-project/ray/pull/19911 for more details.
    """
    with monkeypatch.context() as m:
        # Increase broadcasting interval so that node resource will arrive
        # at raylet after local resource all being allocated.
        m.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
        m.setenv("RAY_grpc_based_resource_broadcast", "true")
        cluster = ray_start_cluster

        # Two-node cluster: the second node owns the single GPU, so the
        # PACK'd {"CPU": 1, "GPU": 1} bundle must land there.
        cluster.add_node(num_cpus=16, object_store_memory=1e9)
        cluster.wait_for_nodes()
        cluster.add_node(num_cpus=16, num_gpus=1)
        cluster.wait_for_nodes()
        NUM_CPU_BUNDLES = 30

        @ray.remote(num_cpus=1)
        class Worker(object):
            def __init__(self, i):
                self.i = i

            def work(self):
                time.sleep(0.1)
                print("work ", self.i)

        @ray.remote(num_cpus=1, num_gpus=1)
        class Trainer(object):
            def __init__(self, i):
                self.i = i

            def train(self):
                time.sleep(0.2)
                print("train ", self.i)

        ray.init(address="auto")
        bundles = [{"CPU": 1, "GPU": 1}]
        bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]
        pg = placement_group(bundles, strategy="PACK")
        ray.get(pg.ready())

        # Local resource will be allocated and here we are to ensure
        # local view is consistent and node resource updates are discarded.
        # If the raylet applied the delayed broadcast, these gets would hang.
        workers = [
            Worker.options(placement_group=pg).remote(i)
            for i in range(NUM_CPU_BUNDLES)
        ]
        trainer = Trainer.options(placement_group=pg).remote(0)
        ray.get([workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)])
        ray.get(trainer.train.remote())
700+
649701
# Allow running this test module directly (outside the CI test runner);
# propagate pytest's exit code to the shell.
if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__]))

src/ray/gcs/gcs_server/gcs_resource_manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
5858
const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
5959
rpc::SendReplyCallback send_reply_callback) {
6060
NodeID node_id = NodeID::FromBinary(request.node_id());
61-
RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
61+
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
6262
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
6363
for (const auto &entry : request.resources()) {
6464
changed_resources->emplace(entry.first, entry.second.resource_capacity());

src/ray/raylet/node_manager.cc

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,15 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
890890
const ResourceSet &createUpdatedResources) {
891891
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
892892
<< " with created or updated resources: "
893-
<< createUpdatedResources.ToString() << ". Updating resource map.";
893+
<< createUpdatedResources.ToString() << ". Updating resource map."
894+
<< " skip=" << (node_id == self_node_id_);
895+
896+
// Skip updating local node since local node always has the latest information.
897+
// Updating local node could result in a inconsistence view in cluster resource
898+
// scheduler which could make task hang.
899+
if (node_id == self_node_id_) {
900+
return;
901+
}
894902

895903
// Update local_available_resources_ and SchedulingResources
896904
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
@@ -900,11 +908,7 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
900908
new_resource_capacity);
901909
}
902910
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
903-
904-
if (node_id == self_node_id_) {
905-
// The resource update is on the local node, check if we can reschedule tasks.
906-
cluster_task_manager_->ScheduleAndDispatchTasks();
907-
}
911+
cluster_task_manager_->ScheduleAndDispatchTasks();
908912
}
909913

910914
void NodeManager::ResourceDeleted(const NodeID &node_id,
@@ -916,7 +920,14 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
916920
}
917921
RAY_LOG(DEBUG) << "[ResourceDeleted] received callback from node id " << node_id
918922
<< " with deleted resources: " << oss.str()
919-
<< ". Updating resource map.";
923+
<< ". Updating resource map. skip=" << (node_id == self_node_id_);
924+
}
925+
926+
// Skip updating local node since local node always has the latest information.
927+
// Updating local node could result in a inconsistence view in cluster resource
928+
// scheduler which could make task hang.
929+
if (node_id == self_node_id_) {
930+
return;
920931
}
921932

922933
// Update local_available_resources_ and SchedulingResources
@@ -1474,39 +1485,44 @@ void NodeManager::HandleUpdateResourceUsage(
14741485
rpc::SendReplyCallback send_reply_callback) {
14751486
rpc::ResourceUsageBroadcastData resource_usage_batch;
14761487
resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());
1477-
1478-
if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
1488+
// When next_resource_seq_no_ == 0 it means it just started.
1489+
// TODO: Fetch a snapshot from gcs for lightweight resource broadcasting
1490+
if (next_resource_seq_no_ != 0 &&
1491+
resource_usage_batch.seq_no() != next_resource_seq_no_) {
1492+
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
1493+
// pull a full resource "snapshot" from gcs to make sure our state doesn't
1494+
// diverge from GCS.
14791495
RAY_LOG(WARNING)
14801496
<< "Raylet may have missed a resource broadcast. This either means that GCS has "
14811497
"restarted, the network is heavily congested and is dropping, reordering, or "
14821498
"duplicating packets. Expected seq#: "
14831499
<< next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no() << ".";
1484-
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
1485-
// pull a full resource "snapshot" from gcs to make sure our state doesn't
1486-
// diverge from GCS.
1500+
if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
1501+
RAY_LOG(WARNING) << "Discard the the resource update since local version is newer";
1502+
return;
1503+
}
14871504
}
14881505
next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;
14891506

14901507
for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
14911508
if (resource_change_or_data.has_data()) {
14921509
const auto &resource_usage = resource_change_or_data.data();
1493-
const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
1494-
if (node_id == self_node_id_) {
1495-
// Skip messages from self.
1496-
continue;
1510+
auto node_id = NodeID::FromBinary(resource_usage.node_id());
1511+
// Skip messages from self.
1512+
if (node_id != self_node_id_) {
1513+
UpdateResourceUsage(node_id, resource_usage);
14971514
}
1498-
UpdateResourceUsage(node_id, resource_usage);
14991515
} else if (resource_change_or_data.has_change()) {
15001516
const auto &resource_notification = resource_change_or_data.change();
1501-
auto id = NodeID::FromBinary(resource_notification.node_id());
1517+
auto node_id = NodeID::FromBinary(resource_notification.node_id());
15021518
if (resource_notification.updated_resources_size() != 0) {
15031519
ResourceSet resource_set(
15041520
MapFromProtobuf(resource_notification.updated_resources()));
1505-
ResourceCreateUpdated(id, resource_set);
1521+
ResourceCreateUpdated(node_id, resource_set);
15061522
}
15071523

15081524
if (resource_notification.deleted_resources_size() != 0) {
1509-
ResourceDeleted(id,
1525+
ResourceDeleted(node_id,
15101526
VectorFromProtobuf(resource_notification.deleted_resources()));
15111527
}
15121528
}

0 commit comments

Comments
 (0)