Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 0 additions & 52 deletions python/ray/tests/test_placement_group_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,57 +646,5 @@ def check_bundle_leaks():
wait_for_condition(check_bundle_leaks)


def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
    """Run actors inside a placement group while resource reports are delayed.

    Background: https://github.com/ray-project/ray/pull/19911
    """
    with monkeypatch.context() as env:
        # Lengthen the reporting period so that the broadcast node resources
        # reach the raylet only after all local resources have been allocated.
        env.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
        env.setenv("RAY_grpc_based_resource_broadcast", "true")
        cluster = ray_start_cluster

        # Two 16-CPU nodes; only the second one carries a GPU.
        cluster.add_node(num_cpus=16, object_store_memory=1e9)
        cluster.wait_for_nodes()
        cluster.add_node(num_cpus=16, num_gpus=1)
        cluster.wait_for_nodes()
        num_cpu_bundles = 30

        @ray.remote(num_cpus=1)
        class Worker:
            def __init__(self, index):
                self.index = index

            def work(self):
                time.sleep(0.1)
                print("work ", self.index)

        @ray.remote(num_cpus=1, num_gpus=1)
        class Trainer:
            def __init__(self, index):
                self.index = index

            def train(self):
                time.sleep(0.2)
                print("train ", self.index)

        ray.init(address="auto")

        # One CPU+GPU bundle followed by the CPU-only bundles.
        gpu_bundle = [{"CPU": 1, "GPU": 1}]
        cpu_bundles = [{"CPU": 1} for _ in range(num_cpu_bundles)]
        pg = placement_group(gpu_bundle + cpu_bundles, strategy="PACK")
        ray.get(pg.ready())

        # Local resources get allocated here; the point is to make sure the
        # local view stays consistent and node resource updates are discarded.
        workers = []
        for index in range(num_cpu_bundles):
            workers.append(Worker.options(placement_group=pg).remote(index))
        trainer = Trainer.options(placement_group=pg).remote(0)

        pending_work = [worker.work.remote() for worker in workers]
        ray.get(pending_work)
        ray.get(trainer.train.remote())


if __name__ == "__main__":
    # Run this file's tests under pytest and use its result as the exit code.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
for (const auto &entry : request.resources()) {
changed_resources->emplace(entry.first, entry.second.resource_capacity());
Expand Down
40 changes: 18 additions & 22 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -891,9 +891,6 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
<< " with created or updated resources: "
<< createUpdatedResources.ToString() << ". Updating resource map.";
if (node_id == self_node_id_) {
return;
}

// Update local_available_resources_ and SchedulingResources
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
Expand All @@ -903,7 +900,11 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
cluster_task_manager_->ScheduleAndDispatchTasks();

if (node_id == self_node_id_) {
// The resource update is on the local node, check if we can reschedule tasks.
cluster_task_manager_->ScheduleAndDispatchTasks();
}
}

void NodeManager::ResourceDeleted(const NodeID &node_id,
Expand Down Expand Up @@ -1473,44 +1474,39 @@ void NodeManager::HandleUpdateResourceUsage(
rpc::SendReplyCallback send_reply_callback) {
rpc::ResourceUsageBroadcastData resource_usage_batch;
resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());
// When next_resource_seq_no_ == 0 it means it just started.
// TODO: Fetch a snapshot from gcs for lightweight resource broadcasting
if (next_resource_seq_no_ != 0 &&
resource_usage_batch.seq_no() != next_resource_seq_no_) {
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.

if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
RAY_LOG(WARNING)
<< "Raylet may have missed a resource broadcast. This either means that GCS has "
"restarted, the network is heavily congested and is dropping, reordering, or "
"duplicating packets. Expected seq#: "
<< next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no() << ".";
if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
RAY_LOG(WARNING) << "Discard the the resource update since local version is newer";
return;
}
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.
}
next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;

for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
if (resource_change_or_data.has_data()) {
const auto &resource_usage = resource_change_or_data.data();
auto node_id = NodeID::FromBinary(resource_usage.node_id());
// Skip messages from self.
if (node_id != self_node_id_) {
UpdateResourceUsage(node_id, resource_usage);
const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
if (node_id == self_node_id_) {
// Skip messages from self.
continue;
}
UpdateResourceUsage(node_id, resource_usage);
} else if (resource_change_or_data.has_change()) {
const auto &resource_notification = resource_change_or_data.change();
auto node_id = NodeID::FromBinary(resource_notification.node_id());
auto id = NodeID::FromBinary(resource_notification.node_id());
if (resource_notification.updated_resources_size() != 0) {
ResourceSet resource_set(
MapFromProtobuf(resource_notification.updated_resources()));
ResourceCreateUpdated(node_id, resource_set);
ResourceCreateUpdated(id, resource_set);
}

if (resource_notification.deleted_resources_size() != 0) {
ResourceDeleted(node_id,
ResourceDeleted(id,
VectorFromProtobuf(resource_notification.deleted_resources()));
}
}
Expand Down