diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 380a2ab33e03..1aff9006c305 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
 )", EXPERIMENTAL) \
     DECLARE(Bool, object_storage_remote_initiator, false, R"(
 Execute request to object storage as remote on one of object_storage_cluster nodes.
+)", EXPERIMENTAL) \
+    DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
+In object storage cluster queries, do not distribute tasks to non-prefetched nodes while the node that prefetched the data is still active (i.e. requested a task within this many milliseconds).
 )", EXPERIMENTAL) \
     \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 987a1ee7598d..9fb07c7ef5b5 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -66,6 +66,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     /// controls new feature and it's 'true' by default, use 'false' as previous_value).
     /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
     /// Note: please check if the key already exists to prevent duplicate entries.
+    addSettingsChanges(settings_changes_history, "25.3.3.20000",
+    {
+        // Altinity Antalya modifications atop of 25.3
+        {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
+    });
     addSettingsChanges(settings_changes_history, "25.2.1.20000",
     {
         // Altinity Antalya modifications atop of 25.2
diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp
index f729b7ce6913..f8917e20e7de 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -8,6 +8,10 @@
 #include
 #include
 
+#include <Poco/JSON/Parser.h>
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Stringifier.h>
+
 namespace DB
 {
@@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage)
     }
 }
 
+RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
+{
+    Poco::JSON::Parser parser;
+    try
+    {
+        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
+        if (!json)
+            return;
+
+        successfully_parsed = true;
+
+        if (json->has("retry_after_us"))
+            retry_after_us = json->getValue<Poco::Timestamp::TimeDiff>("retry_after_us");
+    }
+    catch (const Poco::JSON::JSONException &)
+    { /// Not a JSON string
+        return;
+    }
+}
+
+std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
+{
+    Poco::JSON::Object json;
+    if (retry_after_us.has_value())
+        json.set("retry_after_us", retry_after_us.value());
+
+    std::ostringstream oss;
+    oss.exceptions(std::ios::failbit);
+    Poco::JSON::Stringifier::stringify(json, oss);
+    return oss.str();
+}
+
 }
diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h
index 417fa10e5212..15bacb1ca0eb 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.h
+++ b/src/Disks/ObjectStorages/IObjectStorage.h
@@ -66,15 +66,37 @@ struct ObjectMetadata
 
 struct RelativePathWithMetadata
 {
+    class CommandInTaskResponse
+    {
+    public:
+        CommandInTaskResponse() {}
+        CommandInTaskResponse(const std::string & task);
+
+        bool is_parsed() const { return successfully_parsed; }
+        void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
+
+        std::string to_string() const;
+
+        std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
+
+    private:
+        bool successfully_parsed = false;
+        std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
+    };
+
     String relative_path;
     std::optional<ObjectMetadata> metadata;
+    CommandInTaskResponse command;
 
     RelativePathWithMetadata() = default;
 
-    explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
-        : relative_path(std::move(relative_path_))
-        , metadata(std::move(metadata_))
-    {}
+    explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
+        : metadata(std::move(metadata_))
+        , command(task_string)
+    {
+        if (!command.is_parsed())
+            relative_path = task_string;
+    }
 
     virtual ~RelativePathWithMetadata() = default;
 
@@ -85,6 +107,8 @@ struct RelativePathWithMetadata
     virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
 
     void loadMetadata(ObjectStoragePtr object_storage);
+
+    const CommandInTaskResponse & getCommand() const { return command; }
 };
 
 struct ObjectKeyWithMetadata
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 6fdb2c0d4b9d..cdfad7f646cb 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -27,6 +27,7 @@ namespace Setting
 {
     extern const SettingsBool use_hive_partitioning;
     extern const SettingsString object_storage_cluster;
+    extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
 }
 
 namespace ErrorCodes
@@ -34,6 +35,7 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
     extern const int UNKNOWN_FUNCTION;
     extern const int NOT_IMPLEMENTED;
+    extern const int INVALID_SETTING_VALUE;
 }
 
@@ -386,7 +388,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
         }
     }
 
-    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
+    uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
+
+    /// Check the value to avoid a negative result after the conversion to microseconds.
+    /// Poco::Timestamp::TimeDiff is a signed 64-bit integer.
+    static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
+    if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
+            "Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
+            lock_object_storage_task_distribution_ms,
+            lock_object_storage_task_distribution_ms_max
+        );
+
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
+        iterator,
+        ids_of_hosts,
+        lock_object_storage_task_distribution_ms);
 
     auto callback = std::make_shared<TaskIterator>(
         [task_distributor](size_t number_of_current_replica) mutable -> String
         {
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index f51e0e5860f2..814b99d78032 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include <base/sleep.h>
 #include
 #include
 #include
@@ -430,16 +431,36 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     ObjectInfoPtr object_info;
     auto query_settings = configuration->getQuerySettings(context_);
 
+    bool not_a_path = false;
+
     do
     {
+        not_a_path = false;
         object_info = file_iterator->next(processor);
 
-        if (!object_info || object_info->getPath().empty())
+        if (!object_info)
+            return {};
+
+        if (object_info->getCommand().is_parsed())
+        {
+            auto retry_after_us = object_info->getCommand().get_retry_after_us();
+            if (retry_after_us.has_value())
+            {
+                not_a_path = true;
+                /// TODO: Make the waiting asynchronous, without sleeping in the thread.
+                /// For now the sleep happens in a worker thread on the executor node
+                /// and does not block the query initiator.
+                sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
+                continue;
+            }
+        }
+
+        if (object_info->getPath().empty())
             return {};
 
         object_info->loadMetadata(object_storage);
     }
-    while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
+    while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
 
     QueryPipelineBuilder builder;
     std::shared_ptr<ISource> source;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index d9ca7b344637..b9c6e1e3ce0b 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -8,10 +8,12 @@ namespace DB
 
 StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
     std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_,
-    std::vector<std::string> ids_of_nodes_)
+    std::vector<std::string> ids_of_nodes_,
+    uint64_t lock_object_storage_task_distribution_ms_)
     : iterator(std::move(iterator_))
     , connection_to_files(ids_of_nodes_.size())
     , ids_of_nodes(ids_of_nodes_)
+    , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
     , iterator_exhausted(false)
 {
 }
@@ -24,6 +26,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
         number_of_current_replica
     );
 
+    saveLastNodeActivity(number_of_current_replica);
+
     // 1. Check pre-queued files first
     if (auto file = getPreQueuedFile(number_of_current_replica))
         return file;
@@ -148,7 +152,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
             // Queue file for its assigned replica
             {
                 std::lock_guard lock(mutex);
-                unprocessed_files.insert(file_path);
+                unprocessed_files[file_path] = number_of_current_replica;
                 connection_to_files[file_replica_idx].push_back(file_path);
             }
         }
@@ -158,25 +162,64 @@
 std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
 {
+    /// Keep a task queued for its owner node while the owner's last activity is within this time window
+    Poco::Timestamp activity_limit;
+    Poco::Timestamp oldest_activity;
+    if (lock_object_storage_task_distribution_us > 0)
+        activity_limit -= lock_object_storage_task_distribution_us;
+
     std::lock_guard lock(mutex);
 
     if (!unprocessed_files.empty())
     {
         auto it = unprocessed_files.begin();
-        String next_file = *it;
-        unprocessed_files.erase(it);
+
+        while (it != unprocessed_files.end())
+        {
+            auto last_activity = last_node_activity.find(it->second);
+            if (lock_object_storage_task_distribution_us <= 0
+                || last_activity == last_node_activity.end()
+                || activity_limit > last_activity->second)
+            {
+                String next_file = it->first;
+                unprocessed_files.erase(it);
+
+                LOG_TRACE(
+                    log,
+                    "Iterator exhausted. Assigning unprocessed file {} to replica {}",
+                    next_file,
+                    number_of_current_replica
+                );
+
+                return next_file;
+            }
+
+            oldest_activity = std::min(oldest_activity, last_activity->second);
+            ++it;
+        }
 
         LOG_TRACE(
             log,
-            "Iterator exhausted. Assigning unprocessed file {} to replica {}",
-            next_file,
-            number_of_current_replica
+            "No unprocessed file for replica {}, need to retry after {} us",
+            number_of_current_replica,
+            oldest_activity - activity_limit
         );
 
-        return next_file;
+        /// All unprocessed files are owned by alive replicas with recent activity.
+        /// The caller needs to retry after (oldest_activity - activity_limit) microseconds.
+        RelativePathWithMetadata::CommandInTaskResponse response;
+        response.set_retry_after_us(oldest_activity - activity_limit);
+        return response.to_string();
     }
 
     return std::nullopt;
 }
 
+void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
+{
+    Poco::Timestamp now;
+    std::lock_guard lock(mutex);
+    last_node_activity[number_of_current_replica] = now;
+}
+
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
index 678ff4372f5f..2132ba95a752 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
@@ -5,7 +5,11 @@
 #include
 #include
 #include
+
+#include <Poco/Timestamp.h>
+
 #include
+#include <unordered_map>
 #include
 #include
 #include
@@ -18,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor
 public:
     StorageObjectStorageStableTaskDistributor(
         std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_,
-        std::vector<std::string> ids_of_nodes_);
+        std::vector<std::string> ids_of_nodes_,
+        uint64_t lock_object_storage_task_distribution_ms_);
 
     std::optional<String> getNextTask(size_t number_of_current_replica);
 
@@ -28,12 +33,17 @@ class StorageObjectStorageStableTaskDistributor
     std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
     std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
 
+    void saveLastNodeActivity(size_t number_of_current_replica);
+
    std::shared_ptr<StorageObjectStorageSource::IIterator> iterator;
 
     std::vector<std::vector<String>> connection_to_files;
-    std::unordered_set<String> unprocessed_files;
+    /// Unprocessed files, mapped as file name => index of the replica that prefetched it
+    std::unordered_map<String, size_t> unprocessed_files;
 
     std::vector<std::string> ids_of_nodes;
 
+    std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
+    Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
+
     std::mutex mutex;
 
     bool iterator_exhausted = false;
diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
index da667fad35b9..b7ce32c8b825 100644
--- a/tests/integration/test_s3_cache_locality/test.py
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -15,12 +15,12 @@
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 
 
-def create_buckets_s3(cluster):
+def create_buckets_s3(cluster, files=1000):
     minio = cluster.minio_client
 
     s3_data = []
 
-    for file_number in range(1000):
+    for file_number in range(files):
         file_name = f"data/generated/file_{file_number}.csv"
         os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
         s3_data.append(file_name)
@@ -60,6 +60,7 @@ def started_cluster():
             macros={"replica": f"clickhouse{i}"},
             with_minio=True,
             with_zookeeper=True,
+            stay_alive=True,
         )
 
         logging.info("Starting cluster...")
@@ -70,13 +71,22 @@ def started_cluster():
         yield cluster
     finally:
-        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
+        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
         cluster.shutdown()
 
 
-def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                  lock_object_storage_task_distribution_ms):
     for host in list(cluster.instances.values()):
-        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", timeout=30)
+        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)
+
+    settings = {
+        "enable_filesystem_cache": enable_filesystem_cache,
+        "filesystem_cache_name": "'raw_s3_cache'",
+    }
+
+    if lock_object_storage_task_distribution_ms > 0:
+        settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
 
     query_id_first = str(uuid.uuid4())
     result_first = node.query(
@@ -84,12 +94,9 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         SELECT count(*)
         FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
-        SETTINGS
-            enable_filesystem_cache={enable_filesystem_cache},
-            filesystem_cache_name='raw_s3_cache'
+        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
         """,
         query_id=query_id_first,
-        timeout=30,
     )
     assert result_first == expected_result
     query_id_second = str(uuid.uuid4())
@@ -98,18 +105,14 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         SELECT count(*)
         FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
-        SETTINGS
-            enable_filesystem_cache={enable_filesystem_cache},
-            filesystem_cache_name='raw_s3_cache'
+        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
         """,
         query_id=query_id_second,
-        timeout=30,
     )
     assert result_second == expected_result
 
-    node.query("SYSTEM FLUSH LOGS", timeout=30)
-    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}", timeout=30)
-    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}", timeout=30)
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}")
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}")
 
     s3_get_first = node.query(
         f"""
@@ -118,7 +121,6 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         WHERE type='QueryFinish'
           AND initial_query_id='{query_id_first}'
         """,
-        timeout=30,
     )
     s3_get_second = node.query(
         f"""
@@ -127,25 +129,26 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         WHERE type='QueryFinish'
           AND initial_query_id='{query_id_second}'
         """,
-        timeout=30,
     )
 
     return int(s3_get_first), int(s3_get_second)
 
 
-def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                         lock_object_storage_task_distribution_ms):
     # Repeat the test several times to get an averaged result
-    iterations = 10
+    iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10
     s3_get_first_sum = 0
     s3_get_second_sum = 0
     for _ in range(iterations):
-        (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache)
+        (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms)
         s3_get_first_sum += s3_get_first
        s3_get_second_sum += s3_get_second
     return s3_get_first_sum, s3_get_second_sum
 
 
-def test_cache_locality(started_cluster):
+@pytest.mark.parametrize("lock_object_storage_task_distribution_ms", [0, 30000])
+def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms):
     node = started_cluster.instances["clickhouse0"]
 
     expected_result = node.query(
@@ -157,36 +160,36 @@ def test_cache_locality(started_cluster):
     )
 
     # The algorithm does not give a 100% guarantee, so allow 10% dispersion
-    dispersion = 0.1
+    dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1
 
     # No cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms)
     assert s3_get_second == s3_get_first
 
     # With cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion
 
     # Different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion
 
     # No last node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.211 + dispersion)  # actual value - 24 for 100 files, 211 for 1000
 
     # No first node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)  # actual value - 12 for 100 files, 189 for 1000
 
     # No first node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)
 
     # Add new node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)
 
     # New node and old node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1)
-    assert s3_get_second <= s3_get_first * (0.4375 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.400 + dispersion)  # actual value - 36 for 100 files, 400 for 1000
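Reviewer note on the wire format introduced by this patch: a task string handed to a replica is either a plain relative object path or a JSON command such as {"retry_after_us":500000}. The sketch below is an illustration only, not part of the patch; it assumes nothing beyond Poco, and the free functions makeRetryTask/parseRetryTask are hypothetical names. It mirrors how CommandInTaskResponse::to_string() encodes the command and how the parsing constructor distinguishes a command from an ordinary path.

#include <iostream>
#include <optional>
#include <sstream>
#include <string>

#include <Poco/JSON/JSONException.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/Stringifier.h>
#include <Poco/Timestamp.h>

/// Encode a "retry later" command the same way CommandInTaskResponse::to_string() does.
std::string makeRetryTask(Poco::Timestamp::TimeDiff retry_after_us)
{
    Poco::JSON::Object json;
    json.set("retry_after_us", retry_after_us);
    std::ostringstream oss;
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}

/// Decode a task string: a JSON object with retry_after_us is a command,
/// anything that does not parse as JSON is treated as a plain relative path.
std::optional<Poco::Timestamp::TimeDiff> parseRetryTask(const std::string & task)
{
    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (json && json->has("retry_after_us"))
            return json->getValue<Poco::Timestamp::TimeDiff>("retry_after_us");
    }
    catch (const Poco::JSON::JSONException &)
    {
        /// Not JSON: the executor will use the string as an object path.
    }
    return std::nullopt;
}

int main()
{
    const std::string command = makeRetryTask(500000);       /// {"retry_after_us":500000}
    const std::string path = "data/generated/file_42.csv";   /// an ordinary task

    std::cout << command << " -> retry in " << *parseRetryTask(command) << " us\n";
    std::cout << path << " -> plain path: " << std::boolalpha << !parseRetryTask(path).has_value() << "\n";
    return 0;
}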
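The locking rule in getAnyUnprocessedFile() is a plain time-window comparison: the setting is converted from milliseconds to microseconds in the constructor, activity_limit is set to "now minus window", and a queued file owned by replica R is handed to another replica only when R's last recorded activity is older than activity_limit; otherwise the caller is told to retry after (oldest_activity - activity_limit) microseconds. A self-contained sketch of that arithmetic, with a hypothetical helper name and using only Poco:

#include <iostream>
#include <optional>
#include <unordered_map>

#include <Poco/Timestamp.h>

/// Returns std::nullopt when the file owned by `owner_replica` may be re-assigned now,
/// otherwise the number of microseconds the caller should wait before retrying.
std::optional<Poco::Timestamp::TimeDiff> microsecondsUntilReassignable(
    const std::unordered_map<size_t, Poco::Timestamp> & last_node_activity,
    size_t owner_replica,
    Poco::Timestamp::TimeDiff lock_window_us)
{
    Poco::Timestamp activity_limit;      /// "now" at construction
    activity_limit -= lock_window_us;    /// now - lock window

    auto it = last_node_activity.find(owner_replica);
    if (lock_window_us <= 0 || it == last_node_activity.end() || activity_limit > it->second)
        return std::nullopt;             /// owner has been silent long enough, re-assign immediately

    return it->second - activity_limit;  /// remaining part of the lock window
}

int main()
{
    std::unordered_map<size_t, Poco::Timestamp> activity;
    activity[1] = Poco::Timestamp();     /// replica 1 was active just now

    /// 30000 ms * 1000 = 30'000'000 us, the same conversion the distributor's constructor performs
    auto wait = microsecondsUntilReassignable(activity, 1, 30000 * 1000);
    if (wait)
        std::cout << "retry_after_us=" << *wait << "\n";
    else
        std::cout << "assign now\n";
    return 0;
}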
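On the executor side the reaction to such a command is deliberately simple: the worker thread sleeps for at most 100 ms per iteration (even when retry_after_us is larger) and then asks the initiator for a task again, so a revived prefetched node can pick the file up quickly. A minimal sketch of that back-off, using the standard <thread> sleep as a stand-in for the sleepForMicroseconds helper used in the tree:

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <optional>
#include <thread>

/// One polling step: sleep for min(100 ms, retry_after_us) and report whether the caller should ask again.
bool waitBeforeNextTask(std::optional<int64_t> retry_after_us)
{
    if (!retry_after_us)
        return false;   /// the task was a real path, no waiting needed

    const int64_t capped_us = std::min<int64_t>(100000, *retry_after_us);
    std::this_thread::sleep_for(std::chrono::microseconds(capped_us));
    return true;        /// loop around and request the next task
}

int main()
{
    int polls = 0;
    /// Simulate three "retry_after_us" responses followed by a real task.
    while (waitBeforeNextTask(polls < 3 ? std::optional<int64_t>(250000) : std::nullopt))
        ++polls;
    return polls;       /// 3
}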