Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
)", EXPERIMENTAL) \
\

Expand Down
5 changes: 5 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.3.3.20000",
{
// Altinity Antalya modifications atop of 25.3
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.2.1.20000",
{
// Altinity Antalya modifications atop of 25.2
Expand Down
36 changes: 36 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>


namespace DB
{
Expand Down Expand Up @@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage)
}
}

RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
Poco::JSON::Parser parser;
try
{
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
if (!json)
return;

successfully_parsed = true;

if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
return;
}
}

std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

}
32 changes: 28 additions & 4 deletions src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,37 @@ struct ObjectMetadata

struct RelativePathWithMetadata
{
class CommandInTaskResponse
{
public:
CommandInTaskResponse() {}
CommandInTaskResponse(const std::string & task);

bool is_parsed() const { return successfully_parsed; }
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }

std::string to_string() const;

std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }

private:
bool successfully_parsed = false;
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
};

String relative_path;
std::optional<ObjectMetadata> metadata;
CommandInTaskResponse command;

RelativePathWithMetadata() = default;

explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: relative_path(std::move(relative_path_))
, metadata(std::move(metadata_))
{}
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: metadata(std::move(metadata_))
, command(task_string)
{
if (!command.is_parsed())
relative_path = task_string;
}

virtual ~RelativePathWithMetadata() = default;

Expand All @@ -85,6 +107,8 @@ struct RelativePathWithMetadata
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

void loadMetadata(ObjectStoragePtr object_storage);

const CommandInTaskResponse & getCommand() const { return command; }
};

struct ObjectKeyWithMetadata
Expand Down
19 changes: 18 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ namespace Setting
{
extern const SettingsBool use_hive_partitioning;
extern const SettingsString object_storage_cluster;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
}

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FUNCTION;
extern const int NOT_IMPLEMENTED;
extern const int INVALID_SETTING_VALUE;
}


Expand Down Expand Up @@ -386,7 +388,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
}
}

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];

/// Check value to avoid negative result after conversion in microseconds.
/// Poco::Timestamp::TimeDiff is signed int 64.
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
lock_object_storage_task_distribution_ms,
lock_object_storage_task_distribution_ms_max
);

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
iterator,
ids_of_hosts,
lock_object_storage_task_distribution_ms);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
Expand Down
25 changes: 23 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/VirtualColumnUtils.h>
Expand Down Expand Up @@ -430,16 +431,36 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
ObjectInfoPtr object_info;
auto query_settings = configuration->getQuerySettings(context_);

bool not_a_path = false;

do
{
not_a_path = false;
object_info = file_iterator->next(processor);

if (!object_info || object_info->getPath().empty())
if (!object_info)
return {};

if (object_info->getCommand().is_parsed())
{
auto retry_after_us = object_info->getCommand().get_retry_after_us();
if (retry_after_us.has_value())
{
not_a_path = true;
/// TODO: Make asyncronous waiting without sleep in thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// TODO: Make asyncronous waiting without sleep in thread

When do you plan to implement this TODO?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need more deep understanding how to do it, plan to make separate PR later.
In any case need to do it before sending to upstream, now it is just experimental level of code quality.

/// Now this sleep is on executor node in worker thread
/// Does not block query initiator
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
continue;
}
}

if (object_info->getPath().empty())
return {};

object_info->loadMetadata(object_storage);
}
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));

QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ namespace DB

StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_)
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_)
: iterator(std::move(iterator_))
, connection_to_files(ids_of_nodes_.size())
, ids_of_nodes(ids_of_nodes_)
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poco::Timestamp is in microseconds, so here convert from milliseconds to microseconds to avoid multiplications or divides later.

Copy link
Member

@Enmk Enmk Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight chance of overflow with large positive value (mot likely unintentionally, due to misconfiguration or error) becoming a negative one, since Poco::Timestamp::TimeDiff is signed int64.

IDK if it is even worth it to do some overflow checks here, but please makes sure that negative values do not break anything down the road.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add checks for lock_object_storage_task_distribution_ms in getTaskIteratorExtension before constructor called.

, iterator_exhausted(false)
{
}
Expand All @@ -24,6 +26,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
number_of_current_replica
);

saveLastNodeActivity(number_of_current_replica);

// 1. Check pre-queued files first
if (auto file = getPreQueuedFile(number_of_current_replica))
return file;
Expand Down Expand Up @@ -148,7 +152,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
// Queue file for its assigned replica
{
std::lock_guard lock(mutex);
unprocessed_files.insert(file_path);
unprocessed_files[file_path] = number_of_current_replica;
connection_to_files[file_replica_idx].push_back(file_path);
}
}
Expand All @@ -158,25 +162,64 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile

std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
{
/// Limit time of node activity to keep task in queue
Poco::Timestamp activity_limit;
Poco::Timestamp oldest_activity;
if (lock_object_storage_task_distribution_us > 0)
activity_limit -= lock_object_storage_task_distribution_us;

std::lock_guard lock(mutex);

if (!unprocessed_files.empty())
{
auto it = unprocessed_files.begin();
String next_file = *it;
unprocessed_files.erase(it);

while (it != unprocessed_files.end())
{
auto last_activity = last_node_activity.find(it->second);
if (lock_object_storage_task_distribution_us <= 0
|| last_activity == last_node_activity.end()
|| activity_limit > last_activity->second)
{
String next_file = it->first;
unprocessed_files.erase(it);

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
);

return next_file;
}

oldest_activity = std::min(oldest_activity, last_activity->second);
++it;
}

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
"No unprocessed file for replica {}, need to retry after {} us",
number_of_current_replica,
oldest_activity - activity_limit
);

return next_file;
/// All unprocessed files owned by alive replicas with recenlty activity
/// Need to retry after (oldest_activity - activity_limit) microseconds
RelativePathWithMetadata::CommandInTaskResponse response;
response.set_retry_after_us(oldest_activity - activity_limit);
return response.to_string();
}

return std::nullopt;
}

void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
{
Poco::Timestamp now;
std::lock_guard lock(mutex);
last_node_activity[number_of_current_replica] = now;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
#include <Interpreters/Cluster.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>

#include <Poco/Timestamp.h>

#include <unordered_set>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <memory>
Expand All @@ -18,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor
public:
StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_);
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_);

std::optional<String> getNextTask(size_t number_of_current_replica);

Expand All @@ -28,12 +33,17 @@ class StorageObjectStorageStableTaskDistributor
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);

void saveLastNodeActivity(size_t number_of_current_replica);

std::shared_ptr<IObjectIterator> iterator;

std::vector<std::vector<String>> connection_to_files;
std::unordered_set<String> unprocessed_files;
/// Map of unprocessed files in format filename => number of prefetched replica
std::unordered_map<String, size_t> unprocessed_files;

std::vector<std::string> ids_of_nodes;
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;

std::mutex mutex;
bool iterator_exhausted = false;
Expand Down
Loading
Loading