Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_f
}
}

Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id,
const ClientID &node_id) const {
Lineage LineageCache::GetUncommittedLineageOrDie(const TaskID &task_id,
const ClientID &node_id) const {
Lineage uncommitted_lineage;
// Add all uncommitted ancestors from the lineage cache to the uncommitted
// lineage of the requested task.
Expand Down Expand Up @@ -445,7 +445,7 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
UnsubscribeTask(task_id);
}

const Task &LineageCache::GetTask(const TaskID &task_id) const {
const Task &LineageCache::GetTaskOrDie(const TaskID &task_id) const {
const auto &entries = lineage_.GetEntries();
auto it = entries.find(task_id);
RAY_CHECK(it != entries.end());
Expand Down
8 changes: 5 additions & 3 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,13 @@ class LineageCache {
/// The uncommitted lineage consists of all tasks in the given task's lineage
/// that have not been committed in the GCS, as far as we know.
///
/// \param task_id The ID of the task to get the uncommitted lineage for.
/// \param task_id The ID of the task to get the uncommitted lineage for. It is
/// a fatal error if the task is not found.
/// \param node_id The ID of the receiving node.
/// \return The uncommitted, unforwarded lineage of the task. The returned lineage
/// includes the entry for the requested entry_id.
Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const;
Lineage GetUncommittedLineageOrDie(const TaskID &task_id,
const ClientID &node_id) const;

/// Handle the commit of a task entry in the GCS. This attempts to evict the
/// task if possible.
Expand All @@ -279,7 +281,7 @@ class LineageCache {
///
/// \param task_id The ID of the task to get.
/// \return A const reference to the task data.
const Task &GetTask(const TaskID &task_id) const;
const Task &GetTaskOrDie(const TaskID &task_id) const;

/// Get whether the lineage cache contains the task.
///
Expand Down
40 changes: 20 additions & 20 deletions src/ray/raylet/lineage_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ std::vector<ObjectID> InsertTaskChain(LineageCache &lineage_cache,
return arguments;
}

TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) {
// Insert two independent chains of tasks.
std::vector<Task> tasks1;
auto return_values1 =
Expand All @@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {

// Get the uncommitted lineage for the last task (the leaf) of one of the chains.
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(task_ids1.back(), ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil());
// Check that the uncommitted lineage is exactly equal to the first chain of tasks.
ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size());
for (auto &task_id : task_ids1) {
Expand All @@ -180,8 +180,8 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
}

// Get the uncommitted lineage for the inserted task.
uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(combined_task_ids.back(), ClientID::nil());
uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(
combined_task_ids.back(), ClientID::nil());
// Check that the uncommitted lineage is exactly equal to the entire set of
// tasks inserted so far.
ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size());
Expand All @@ -207,9 +207,9 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id);

auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id);
lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id);
auto uncommitted_lineage_all =
lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id2);
lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id2);

ASSERT_EQ(1, uncommitted_lineage.GetEntries().size());
ASSERT_EQ(4, uncommitted_lineage_all.GetEntries().size());
Expand All @@ -218,7 +218,7 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
// Check that lineage of requested task includes itself, regardless of whether
// it has been forwarded before.
auto uncommitted_lineage_forwarded =
lineage_cache_.GetUncommittedLineage(forwarded_task_id, node_id);
lineage_cache_.GetUncommittedLineageOrDie(forwarded_task_id, node_id);
ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size());
}

Expand Down Expand Up @@ -284,8 +284,8 @@ TEST_F(LineageCacheTest, TestEvictChain) {
// the flushed task, but its lineage should not be evicted yet.
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
tasks.size());
Expand All @@ -297,8 +297,8 @@ TEST_F(LineageCacheTest, TestEvictChain) {
mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data));
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
tasks.size());
Expand Down Expand Up @@ -334,8 +334,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
.GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
total_tasks);
Expand All @@ -350,8 +350,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
// since the parent tasks have no dependencies.
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
.GetUncommittedLineageOrDie(
child_task.GetTaskSpecification().TaskId(), ClientID::nil())
.GetEntries()
.size(),
total_tasks);
Expand All @@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) {
const auto task_id = it->GetTaskSpecification().TaskId();
// Simulate removing the task and forwarding it to another node.
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
// Simulate receiving the task again. Make sure we can add the task back.
flatbuffers::FlatBufferBuilder fbb;
Expand All @@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) {
tasks.erase(it);
auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove));
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3);

Expand Down Expand Up @@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);

// Simulate executing the first task on a remote node and adding it to the
Expand Down Expand Up @@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
// The remaining task should have no uncommitted lineage.
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1);
Expand All @@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size);

Expand Down
16 changes: 12 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
"allocation via "
<< "ray.init(redis_max_memory=<max_memory_bytes>).";
// Use a copy of the cached task spec to re-execute the task.
const Task task = lineage_cache_.GetTask(task_id);
const Task task = lineage_cache_.GetTaskOrDie(task_id);
Copy link
Contributor Author

@ericl ericl Jan 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't check fail in this function. However, the issue now is that you can't fail a task without the TaskSpec.

Is there a function from TaskID -> ReturnIDs?

Copy link
Contributor

@stephanie-wang stephanie-wang Jan 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use ComputeReturnId. Unfortunately, we can't know how many return values to put the error for without the task spec...I guess the safest option for now is to just put one return value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I returned like 10? Could that cause an issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For actor tasks, the last return value is the dummy object, which isn't supposed to have any value in the object store...this could potentially break other parts of the raylet, but I'm not totally sure.

ResubmitTask(task);

}));
Expand Down Expand Up @@ -1903,9 +1903,17 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
auto task_id = spec.TaskId();

// Get and serialize the task's unforwarded, uncommitted lineage.
auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id);
Task &lineage_cache_entry_task =
uncommitted_lineage.GetEntryMutable(task_id)->TaskDataMutable();
Lineage uncommitted_lineage;
if (lineage_cache_.ContainsTask(task_id)) {
uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(task_id, node_id);
} else {
// TODO: We expected the lineage to be in cache, but it was evicted (#3813).
// This is a bug but is not fatal to the application.
RAY_DCHECK(false) << "No lineage cache entry found for task " << task_id;
uncommitted_lineage.SetEntry(task, GcsStatus::NONE);
}
auto entry = uncommitted_lineage.GetEntryMutable(task_id);
Task &lineage_cache_entry_task = entry->TaskDataMutable();

// Increment forward count for the forwarded task.
lineage_cache_entry_task.IncrementNumForwards();
Expand Down
9 changes: 5 additions & 4 deletions src/ray/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ enum class RayLogLevel { DEBUG = -1, INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3

#ifdef NDEBUG

#define RAY_DCHECK(condition) \
RAY_IGNORE_EXPR(condition); \
while (false) ::ray::RayLogBase()

#define RAY_DCHECK(condition) \
(condition) ? RAY_IGNORE_EXPR(0) \
: ::ray::Voidify() & \
::ray::RayLog(__FILE__, __LINE__, ray::RayLogLevel::ERROR) \
<< " Debug check failed: " #condition " "
#else

#define RAY_DCHECK(condition) RAY_CHECK(condition)
Expand Down