Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ ray {

// See src/ray/ray_config_def.h for options.
config {
// Maximum size of an inline object (bytes).
inline_object_max_size_bytes: 0
}
}

Expand Down
58 changes: 1 addition & 57 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,14 +1151,8 @@ def test_object_transfer_dump(ray_start_cluster):
cluster = ray_start_cluster

num_nodes = 3
# Set the inline object size to 0 to force all objects to be written to
# plasma.
config = json.dumps({"inline_object_max_size_bytes": 0})
for i in range(num_nodes):
cluster.add_node(
resources={str(i): 1},
object_store_memory=10**9,
_internal_config=config)
cluster.add_node(resources={str(i): 1}, object_store_memory=10**9)
ray.init(redis_address=cluster.redis_address)

@ray.remote
Expand Down Expand Up @@ -2659,56 +2653,6 @@ def f():
assert len(ready_ids) == 1


def test_inline_objects(shutdown_only):
config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200})
ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config)

@ray.remote
class Actor(object):
def create_inline_object(self):
return "inline"

def create_non_inline_object(self):
return 10000 * [1]

def get(self):
return

a = Actor.remote()
# Count the number of objects that were successfully inlined.
inlined = 0
for _ in range(100):
inline_object = a.create_inline_object.remote()
ray.get(inline_object)
plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary())
ray.worker.global_worker.plasma_client.delete([plasma_id])
# Make sure we can still get an inlined object created by an actor even
# after it has been evicted.
try:
value = ray.get(inline_object)
assert value == "inline"
inlined += 1
except ray.exceptions.UnreconstructableError:
pass
# Make sure some objects were inlined. Some of them may not get inlined
# because we evict the object soon after creating it.
assert inlined > 0

# Non-inlined objects are not able to be recreated after eviction.
for _ in range(10):
non_inline_object = a.create_non_inline_object.remote()
ray.get(non_inline_object)
plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary())
# This while loop is necessary because sometimes the object is still
# there immediately after plasma_client.delete.
while ray.worker.global_worker.plasma_client.contains(plasma_id):
ray.worker.global_worker.plasma_client.delete([plasma_id])
# Objects created by an actor that were evicted and larger than the
# maximum inline object size cannot be retrieved or reconstructed.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(non_inline_object) == 10000 * [1]


def test_ray_setproctitle(shutdown_only):
ray.init(num_cpus=2)

Expand Down
7 changes: 6 additions & 1 deletion python/ray/tests/test_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def set_weights(self, x):
def test_object_transfer_retry(ray_start_empty_cluster):
cluster = ray_start_empty_cluster

repeated_push_delay = 10
repeated_push_delay = 4

# Force the sending object manager to allow duplicate pushes again sooner.
# Also, force the receiving object manager to retry the Pull sooner. We
Expand Down Expand Up @@ -262,6 +262,11 @@ def f(size):
ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)

end_time = time.time()
# Make sure that the first time the objects get transferred, it happens
# quickly.
assert end_time - start_time < repeated_push_delay

# Get the objects again and make sure they get transferred.
xs = ray.get(x_ids)
end_transfer_time = time.time()
Expand Down
8 changes: 0 additions & 8 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,10 @@ table FunctionTableData {
table ObjectTableData {
// The size of the object.
object_size: long;
// Is object in-lined? Inline objects are objects whose data and metadata are
// inlined in the GCS object table entry, which normally only specifies
// the object location.
inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// In-line object data.
inline_object_data: [ubyte];
// In-line object metadata.
inline_object_metadata: [ubyte];
}

table TaskReconstructionData {
Expand Down
112 changes: 25 additions & 87 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,41 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,

namespace {

/// Process a suffix of the object table log.
/// If object is inlined (inline_object_flag = TRUE), its data and metadata are
/// stored with the object's entry so we read them into inline_object_data, and
/// inline_object_metadata, respectively.
/// If object is not inlined, store the result in client_ids.
/// This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix.
/// This function also stores a bool in has_been_created indicating whether the
/// object has ever been created before.
/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *inline_object_flag,
std::vector<uint8_t> *inline_object_data,
std::string *inline_object_metadata, bool *has_been_created) {
bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
// client1.is_eviction = true
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
//
// If object is inlined each entry contains both the object's data and metadata,
// so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
*has_been_created = true;
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (object_table_data.inline_object_flag) {
if (!*inline_object_flag) {
// This is the first time we're receiving the inline object data. Read
// object's data from the GCS entry.
*inline_object_flag = object_table_data.inline_object_flag;
inline_object_data->assign(object_table_data.inline_object_data.begin(),
object_table_data.inline_object_data.end());
inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
object_table_data.inline_object_metadata.end());
}
// We got the data and metadata of the object so exit the loop.
break;
}

if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}

if (!*inline_object_flag) {
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
}
}
Expand All @@ -88,8 +62,6 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.inline_object_flag, &it->second.inline_object_data,
&it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
Expand All @@ -102,8 +74,6 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.inline_object_flag, it->second.inline_object_data,
it->second.inline_object_metadata,
it->second.has_been_created);
}
};
Expand All @@ -114,25 +84,13 @@ void ObjectDirectory::RegisterBackend() {

ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
const object_manager::protocol::ObjectInfoT &object_info) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->object_size = object_info.data_size;
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(
plasma_buffer.data->data(),
plasma_buffer.data->data() + plasma_buffer.data->size());
data->inline_object_metadata.assign(
plasma_buffer.metadata->data(),
plasma_buffer.metadata->data() + plasma_buffer.metadata->size());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
Expand Down Expand Up @@ -184,19 +142,16 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations(
{}, gcs_client_->client_table(), &listener.second.current_object_locations,
&listener.second.inline_object_flag, &listener.second.inline_object_data,
&listener.second.inline_object_metadata, &listener.second.has_been_created);
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(
object_id, listener.second.current_object_locations,
listener.second.inline_object_flag, listener.second.inline_object_data,
listener.second.inline_object_metadata, listener.second.has_been_created);
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
}
}
}
Expand All @@ -222,14 +177,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
auto inline_object_flag = listener_state.inline_object_flag;
const auto &inline_object_data = listener_state.inline_object_data;
const auto &inline_object_metadata = listener_state.inline_object_metadata;
io_service_.post([callback, locations, inline_object_flag, inline_object_data,
inline_object_metadata, object_id]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata,
/*has_been_created=*/true);
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
});
}
return status;
Expand Down Expand Up @@ -262,31 +211,20 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool inline_object_flag = false;
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &inline_object_flag, &inline_object_data,
&inline_object_metadata, &has_been_created);
&client_ids, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
callback(object_id, client_ids, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
// If object inlined, we already have the object's data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
bool inline_object_flag = it->second.inline_object_flag;
const auto &inline_object_data = it->second.inline_object_data;
const auto &inline_object_metadata = it->second.inline_object_metadata;
io_service_.post([callback, object_id, locations, inline_object_flag,
inline_object_data, inline_object_metadata, has_been_created]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
});
}
return status;
Expand Down
29 changes: 7 additions & 22 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::ClientID> &, bool,
const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;

/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
Expand Down Expand Up @@ -101,14 +101,10 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param plasma_buffer Object data and metadata from plasma. This data is
/// only valid for inlined objects (i.e., when inline_object_flag=true).
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) = 0;
const object_manager::protocol::ObjectInfoT &object_info) = 0;

/// Report objects removed from this client's store to the object directory.
///
Expand Down Expand Up @@ -160,11 +156,9 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;

ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) override;

ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;

Expand All @@ -182,15 +176,6 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// Specify whether the object is inlined. The data and the metadata of
/// an inlined object are stored in the object's GCS entry. In this flag
/// (i.e., the object is inlined) the content of current_object_locations
/// can be ignored.
bool inline_object_flag;
/// Inlined object data, if inline_object_flag == true.
std::vector<uint8_t> inline_object_data;
/// Inlined object metadata, if inline_object_flag == true.
std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
Expand Down
Loading