-
Notifications
You must be signed in to change notification settings - Fork 8
Antalya 25.3: lock_object_storage_task_distribution_ms setting #866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
258bbbb
56af128
27fb928
fd36f73
d851e30
d1f4b4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,10 +8,12 @@ namespace DB | |
|
||
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( | ||
std::shared_ptr<IObjectIterator> iterator_, | ||
std::vector<std::string> ids_of_nodes_) | ||
std::vector<std::string> ids_of_nodes_, | ||
uint64_t lock_object_storage_task_distribution_ms_) | ||
: iterator(std::move(iterator_)) | ||
, connection_to_files(ids_of_nodes_.size()) | ||
, ids_of_nodes(ids_of_nodes_) | ||
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Poco::Timestamp is in microseconds, so here convert from milliseconds to microseconds to avoid multiplications or divides later.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a slight chance of overflow with large positive value (mot likely unintentionally, due to misconfiguration or error) becoming a negative one, since IDK if it is even worth it to do some overflow checks here, but please makes sure that negative values do not break anything down the road. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add checks for |
||
, iterator_exhausted(false) | ||
{ | ||
} | ||
|
@@ -24,6 +26,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz | |
number_of_current_replica | ||
); | ||
|
||
saveLastNodeActivity(number_of_current_replica); | ||
|
||
// 1. Check pre-queued files first | ||
if (auto file = getPreQueuedFile(number_of_current_replica)) | ||
return file; | ||
|
@@ -148,7 +152,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile | |
// Queue file for its assigned replica | ||
{ | ||
std::lock_guard lock(mutex); | ||
unprocessed_files.insert(file_path); | ||
unprocessed_files[file_path] = number_of_current_replica; | ||
connection_to_files[file_replica_idx].push_back(file_path); | ||
} | ||
} | ||
|
@@ -158,25 +162,64 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile | |
|
||
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) | ||
{ | ||
/// Limit time of node activity to keep task in queue | ||
Poco::Timestamp activity_limit; | ||
Poco::Timestamp oldest_activity; | ||
if (lock_object_storage_task_distribution_us > 0) | ||
activity_limit -= lock_object_storage_task_distribution_us; | ||
|
||
std::lock_guard lock(mutex); | ||
|
||
if (!unprocessed_files.empty()) | ||
{ | ||
auto it = unprocessed_files.begin(); | ||
String next_file = *it; | ||
unprocessed_files.erase(it); | ||
|
||
while (it != unprocessed_files.end()) | ||
{ | ||
auto last_activity = last_node_activity.find(it->second); | ||
if (lock_object_storage_task_distribution_us <= 0 | ||
|| last_activity == last_node_activity.end() | ||
|| activity_limit > last_activity->second) | ||
{ | ||
String next_file = it->first; | ||
unprocessed_files.erase(it); | ||
|
||
LOG_TRACE( | ||
log, | ||
"Iterator exhausted. Assigning unprocessed file {} to replica {}", | ||
next_file, | ||
number_of_current_replica | ||
); | ||
|
||
return next_file; | ||
} | ||
|
||
oldest_activity = std::min(oldest_activity, last_activity->second); | ||
++it; | ||
} | ||
|
||
LOG_TRACE( | ||
log, | ||
"Iterator exhausted. Assigning unprocessed file {} to replica {}", | ||
next_file, | ||
number_of_current_replica | ||
"No unprocessed file for replica {}, need to retry after {} us", | ||
number_of_current_replica, | ||
oldest_activity - activity_limit | ||
); | ||
|
||
return next_file; | ||
/// All unprocessed files owned by alive replicas with recenlty activity | ||
/// Need to retry after (oldest_activity - activity_limit) microseconds | ||
RelativePathWithMetadata::CommandInTaskResponse response; | ||
response.set_retry_after_us(oldest_activity - activity_limit); | ||
return response.to_string(); | ||
} | ||
|
||
return std::nullopt; | ||
} | ||
|
||
void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) | ||
{ | ||
Poco::Timestamp now; | ||
std::lock_guard lock(mutex); | ||
last_node_activity[number_of_current_replica] = now; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When do you plan to implement this TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need more deep understanding how to do it, plan to make separate PR later.
In any case need to do it before sending to upstream, now it is just experimental level of code quality.