diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c02cf2e89e8b..fc988feccf89 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -83,6 +83,7 @@ #include #include #include +#include #include #include #include @@ -338,6 +339,9 @@ namespace ServerSetting extern const ServerSettingsBool abort_on_logical_error; extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes; extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_size; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries; } namespace ErrorCodes @@ -348,6 +352,9 @@ namespace ErrorCodes namespace FileCacheSetting { extern const FileCacheSettingsBool load_metadata_asynchronously; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_size; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl; } } @@ -2520,6 +2527,10 @@ try if (dns_cache_updater) dns_cache_updater->start(); + ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]); + ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]); + ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]); + auto replicas_reconnector = ReplicasReconnector::init(global_context); /// Set current database name before loading tables and databases because diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 13a9911c702e..b257119c8c6a 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -320,6 +320,7 @@ enum class AccessType : uint8_t M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \ M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \ M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index e6dee19233e0..532ae5d2950d 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1135,7 +1135,10 @@ The server successfully detected this situation and will download merged part fr M(AsyncLoggingErrorFileLogDroppedMessages, "How many messages have been dropped from error file log due to the async log queue being full", ValueType::Number) \ M(AsyncLoggingSyslogDroppedMessages, "How many messages have been dropped from the syslog due to the async log queue being full", ValueType::Number) \ M(AsyncLoggingTextLogDroppedMessages, "How many messages have been dropped from text_log due to the async log queue being full", ValueType::Number) \ - + M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \ + M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \ + M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \ + M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Common/TTLCachePolicy.h b/src/Common/TTLCachePolicy.h index 79e34dc2ebd5..140dcc59f1b8 100644 --- a/src/Common/TTLCachePolicy.h +++ b/src/Common/TTLCachePolicy.h @@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy; Cache cache; - +private: /// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators /// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a /// binary search on the sorted container and erase all left of the found key. diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 004f8a16098c..65fff78c5692 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1139,8 +1139,9 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ DECLARE(UInt64, threadpool_local_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from local filesystem.)", 0) \ DECLARE(NonZeroUInt64, threadpool_remote_fs_reader_pool_size, 250, R"(Number of threads in the Thread pool used for reading from remote filesystem when `remote_filesystem_read_method = 'threadpool'`.)", 0) \ DECLARE(UInt64, threadpool_remote_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from remote filesystem.)", 0) \ - - + DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \ + DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \ + DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) // clang-format on /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 497760a1325d..fde58d5499d5 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6985,6 +6985,9 @@ Write full paths (including s3://) into iceberg metadata files. )", EXPERIMENTAL) \ DECLARE(String, iceberg_metadata_compression_method, "", R"( Method to compress `.metadata.json` file. +)", EXPERIMENTAL) \ + DECLARE(Bool, use_object_storage_list_objects_cache, false, R"( +Cache the list of objects returned by list objects calls in object storage )", EXPERIMENTAL) \ DECLARE(Bool, make_distributed_plan, false, R"( Make distributed query plan. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 1ba731c37bbf..6f69b76c8125 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -110,6 +110,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_lightweight_update", false, true, "Lightweight updates were moved to Beta."}, {"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"}, {"iceberg_metadata_log_level", "none", "none", "New setting."}, + {"use_object_storage_list_objects_cache", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.7", { diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index 5a23deb9b65b..a958907771cc 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage const String & description_, const String & common_key_prefix_); + bool supportsListObjectsCache() override { return true; } + void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; /// Sanitizer build may crash with max_keys=1; this looks like a false positive. diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 1f3a4278f135..daf5e286012f 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -327,6 +327,14 @@ class IObjectStorage } virtual std::shared_ptr tryGetS3StorageClient() { return nullptr; } #endif + + + virtual bool supportsListObjectsCache() { return false; } + +private: + mutable std::mutex throttlers_mutex; + ThrottlerPtr remote_read_throttler; + ThrottlerPtr remote_write_throttler; }; using ObjectStoragePtr = std::shared_ptr; diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index 7b64d3b0f9fe..a7efb0809984 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -61,6 +61,8 @@ class S3ObjectStorage : public IObjectStorage ObjectStorageType getType() const override { return ObjectStorageType::S3; } + bool supportsListObjectsCache() override { return true; } + bool exists(const StoredObject & object) const override; std::unique_ptr readObject( /// NOLINT diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 76a09ca9ff9e..e3f06dc6fc71 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -436,6 +437,12 @@ BlockIO InterpreterSystemQuery::execute() getContext()->clearQueryResultCache(query.query_result_cache_tag); break; } + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: + { + getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE); + ObjectStorageListObjectsCache::instance().clear(); + break; + } case Type::DROP_COMPILED_EXPRESSION_CACHE: #if USE_EMBEDDED_COMPILER getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE); @@ -1575,6 +1582,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_PAGE_CACHE: case Type::DROP_SCHEMA_CACHE: case Type::DROP_FORMAT_SCHEMA_CACHE: + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: case Type::DROP_S3_CLIENT_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 9cdd034f2ca3..e600289b8863 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -484,6 +484,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_COMPILED_EXPRESSION_CACHE: case Type::DROP_S3_CLIENT_CACHE: case Type::DROP_ICEBERG_METADATA_CACHE: + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: case Type::RESET_COVERAGE: case Type::RESTART_REPLICAS: case Type::JEMALLOC_PURGE: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index cb21d3d12ba2..179fda501177 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -41,6 +41,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster DROP_SCHEMA_CACHE, DROP_FORMAT_SCHEMA_CACHE, DROP_S3_CLIENT_CACHE, + DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, STOP_LISTEN, START_LISTEN, RESTART_REPLICAS, diff --git a/src/Storages/Cache/ObjectStorageListObjectsCache.cpp b/src/Storages/Cache/ObjectStorageListObjectsCache.cpp new file mode 100644 index 000000000000..a7aec57d9161 --- /dev/null +++ b/src/Storages/Cache/ObjectStorageListObjectsCache.cpp @@ -0,0 +1,210 @@ +#include +#include +#include +#include + +namespace ProfileEvents +{ +extern const Event ObjectStorageListObjectsCacheHits; +extern const Event ObjectStorageListObjectsCacheMisses; +extern const Event ObjectStorageListObjectsCacheExactMatchHits; +extern const Event ObjectStorageListObjectsCachePrefixMatchHits; +} + +namespace DB +{ + +template +class ObjectStorageListObjectsCachePolicy : public TTLCachePolicy +{ +public: + using BasePolicy = TTLCachePolicy; + using typename BasePolicy::MappedPtr; + using typename BasePolicy::KeyMapped; + using BasePolicy::cache; + + ObjectStorageListObjectsCachePolicy() + : BasePolicy(CurrentMetrics::end(), CurrentMetrics::end(), std::make_unique()) + { + } + + std::optional getWithKey(const Key & key) override + { + if (const auto it = cache.find(key); it != cache.end()) + { + if (!IsStaleFunction()(it->first)) + { + return std::make_optional({it->first, it->second}); + } + // found a stale entry, remove it but don't return. We still want to perform the prefix matching search + BasePolicy::remove(it->first); + } + + if (const auto it = findBestMatchingPrefixAndRemoveExpiredEntries(key); it != cache.end()) + { + return std::make_optional({it->first, it->second}); + } + + return std::nullopt; + } + +private: + auto findBestMatchingPrefixAndRemoveExpiredEntries(Key key) + { + while (!key.prefix.empty()) + { + if (const auto it = cache.find(key); it != cache.end()) + { + if (IsStaleFunction()(it->first)) + { + BasePolicy::remove(it->first); + } + else + { + return it; + } + } + + key.prefix.pop_back(); + } + + return cache.end(); + } +}; + +ObjectStorageListObjectsCache::Key::Key( + const String & storage_description_, + const String & bucket_, + const String & prefix_, + const std::chrono::steady_clock::time_point & expires_at_, + std::optional user_id_) + : storage_description(storage_description_), bucket(bucket_), prefix(prefix_), expires_at(expires_at_), user_id(user_id_) {} + +bool ObjectStorageListObjectsCache::Key::operator==(const Key & other) const +{ + return storage_description == other.storage_description && bucket == other.bucket && prefix == other.prefix; +} + +size_t ObjectStorageListObjectsCache::KeyHasher::operator()(const Key & key) const +{ + std::size_t seed = 0; + + boost::hash_combine(seed, key.storage_description); + boost::hash_combine(seed, key.bucket); + boost::hash_combine(seed, key.prefix); + + return seed; +} + +bool ObjectStorageListObjectsCache::IsStale::operator()(const Key & key) const +{ + return key.expires_at < std::chrono::steady_clock::now(); +} + +size_t ObjectStorageListObjectsCache::WeightFunction::operator()(const Value & value) const +{ + std::size_t weight = 0; + + for (const auto & object : value) + { + const auto object_metadata = object->metadata; + weight += object->relative_path.capacity() + sizeof(object_metadata); + + // variable size + if (object_metadata) + { + weight += object_metadata->etag.capacity(); + weight += object_metadata->attributes.size() * (sizeof(std::string) * 2); + + for (const auto & [k, v] : object_metadata->attributes) + { + weight += k.capacity() + v.capacity(); + } + } + } + + return weight; +} + +ObjectStorageListObjectsCache::ObjectStorageListObjectsCache() + : cache(std::make_unique>()) +{ +} + +void ObjectStorageListObjectsCache::set( + const Key & key, + const std::shared_ptr & value) +{ + auto key_with_ttl = key; + key_with_ttl.expires_at = std::chrono::steady_clock::now() + std::chrono::seconds(ttl_in_seconds); + + cache.set(key_with_ttl, value); +} + +void ObjectStorageListObjectsCache::clear() +{ + cache.clear(); +} + +std::optional ObjectStorageListObjectsCache::get(const Key & key, bool filter_by_prefix) +{ + const auto pair = cache.getWithKey(key); + + if (!pair) + { + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheMisses); + return {}; + } + + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheHits); + + if (pair->key == key) + { + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheExactMatchHits); + return *pair->mapped; + } + + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCachePrefixMatchHits); + + if (!filter_by_prefix) + { + return *pair->mapped; + } + + Value filtered_objects; + + filtered_objects.reserve(pair->mapped->size()); + + for (const auto & object : *pair->mapped) + { + if (object->relative_path.starts_with(key.prefix)) + { + filtered_objects.push_back(object); + } + } + + return filtered_objects; +} + +void ObjectStorageListObjectsCache::setMaxSizeInBytes(std::size_t size_in_bytes_) +{ + cache.setMaxSizeInBytes(size_in_bytes_); +} + +void ObjectStorageListObjectsCache::setMaxCount(std::size_t count) +{ + cache.setMaxCount(count); +} + +void ObjectStorageListObjectsCache::setTTL(std::size_t ttl_in_seconds_) +{ + ttl_in_seconds = ttl_in_seconds_; +} + +ObjectStorageListObjectsCache & ObjectStorageListObjectsCache::instance() +{ + static ObjectStorageListObjectsCache instance; + return instance; +} + +} diff --git a/src/Storages/Cache/ObjectStorageListObjectsCache.h b/src/Storages/Cache/ObjectStorageListObjectsCache.h new file mode 100644 index 000000000000..6cb6c3694d93 --- /dev/null +++ b/src/Storages/Cache/ObjectStorageListObjectsCache.h @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class ObjectStorageListObjectsCache +{ + friend class ObjectStorageListObjectsCacheTest; +public: + ObjectStorageListObjectsCache(const ObjectStorageListObjectsCache &) = delete; + ObjectStorageListObjectsCache(ObjectStorageListObjectsCache &&) noexcept = delete; + + ObjectStorageListObjectsCache& operator=(const ObjectStorageListObjectsCache &) = delete; + ObjectStorageListObjectsCache& operator=(ObjectStorageListObjectsCache &&) noexcept = delete; + + static ObjectStorageListObjectsCache & instance(); + + struct Key + { + Key( + const String & storage_description_, + const String & bucket_, + const String & prefix_, + const std::chrono::steady_clock::time_point & expires_at_ = std::chrono::steady_clock::now(), + std::optional user_id_ = std::nullopt); + + std::string storage_description; + std::string bucket; + std::string prefix; + std::chrono::steady_clock::time_point expires_at; + std::optional user_id; + + bool operator==(const Key & other) const; + }; + + using Value = StorageObjectStorage::ObjectInfos; + struct KeyHasher + { + size_t operator()(const Key & key) const; + }; + + struct IsStale + { + bool operator()(const Key & key) const; + }; + + struct WeightFunction + { + size_t operator()(const Value & value) const; + }; + + using Cache = CacheBase; + + void set( + const Key & key, + const std::shared_ptr & value); + + std::optional get(const Key & key, bool filter_by_prefix = true); + + void clear(); + + void setMaxSizeInBytes(std::size_t size_in_bytes_); + void setMaxCount(std::size_t count); + void setTTL(std::size_t ttl_in_seconds_); + +private: + ObjectStorageListObjectsCache(); + + Cache cache; + size_t ttl_in_seconds {0}; +}; + +} diff --git a/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp b/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp new file mode 100644 index 000000000000..3b719d4df3e3 --- /dev/null +++ b/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp @@ -0,0 +1,160 @@ +#include +#include +#include +#include + +namespace DB +{ + +class ObjectStorageListObjectsCacheTest : public ::testing::Test +{ +protected: + void SetUp() override + { + cache = std::unique_ptr(new ObjectStorageListObjectsCache()); + cache->setTTL(3); + cache->setMaxCount(100); + cache->setMaxSizeInBytes(1000000); + } + + std::unique_ptr cache; + static ObjectStorageListObjectsCache::Key default_key; + + static std::shared_ptr createTestValue(const std::vector& paths) + { + auto value = std::make_shared(); + for (const auto & path : paths) + { + value->push_back(std::make_shared(path)); + } + return value; + } +}; + +ObjectStorageListObjectsCache::Key ObjectStorageListObjectsCacheTest::default_key {"default", "test-bucket", "test-prefix/"}; + +TEST_F(ObjectStorageListObjectsCacheTest, BasicSetAndGet) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt", "test-prefix/file2.txt"}); + + cache->set(default_key, value); + + auto result = cache->get(default_key).value(); + + ASSERT_EQ(result.size(), 2); + EXPECT_EQ(result[0]->getPath(), "test-prefix/file1.txt"); + EXPECT_EQ(result[1]->getPath(), "test-prefix/file2.txt"); +} + +TEST_F(ObjectStorageListObjectsCacheTest, CacheMiss) +{ + cache->clear(); + + EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, ClearCache) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt", "test-prefix/file2.txt"}); + + cache->set(default_key, value); + cache->clear(); + + EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, PrefixMatching) +{ + cache->clear(); + + auto short_prefix_key = default_key; + short_prefix_key.prefix = "parent/"; + + auto mid_prefix_key = default_key; + mid_prefix_key.prefix = "parent/child/"; + + auto long_prefix_key = default_key; + long_prefix_key.prefix = "parent/child/grandchild/"; + + auto value = createTestValue( + { + "parent/child/grandchild/file1.txt", + "parent/child/grandchild/file2.txt"}); + + cache->set(mid_prefix_key, value); + + auto result1 = cache->get(mid_prefix_key).value(); + EXPECT_EQ(result1.size(), 2); + + auto result2 = cache->get(long_prefix_key).value(); + EXPECT_EQ(result2.size(), 2); + + EXPECT_FALSE(cache->get(short_prefix_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, PrefixFiltering) +{ + cache->clear(); + + auto key_with_short_prefix = default_key; + key_with_short_prefix.prefix = "parent/"; + + auto key_with_mid_prefix = default_key; + key_with_mid_prefix.prefix = "parent/child1/"; + + auto value = createTestValue({ + "parent/file1.txt", + "parent/child1/file2.txt", + "parent/child2/file3.txt" + }); + + cache->set(key_with_short_prefix, value); + + auto result = cache->get(key_with_mid_prefix, true).value(); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0]->getPath(), "parent/child1/file2.txt"); +} + +TEST_F(ObjectStorageListObjectsCacheTest, TTLExpiration) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt"}); + + cache->set(default_key, value); + + // Verify we can get it immediately + auto result1 = cache->get(default_key).value(); + EXPECT_EQ(result1.size(), 1); + + std::this_thread::sleep_for(std::chrono::seconds(4)); + + EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, BestPrefixMatch) +{ + cache->clear(); + + auto short_prefix_key = default_key; + short_prefix_key.prefix = "a/b/"; + + auto mid_prefix_key = default_key; + mid_prefix_key.prefix = "a/b/c/"; + + auto long_prefix_key = default_key; + long_prefix_key.prefix = "a/b/c/d/"; + + auto short_prefix = createTestValue({"a/b/c/d/file1.txt", "a/b/c/file1.txt", "a/b/file2.txt"}); + auto mid_prefix = createTestValue({"a/b/c/d/file1.txt", "a/b/c/file1.txt"}); + + cache->set(short_prefix_key, short_prefix); + cache->set(mid_prefix_key, mid_prefix); + + // should pick mid_prefix, which has size 2. filter_by_prefix=false so we can assert by size + auto result = cache->get(long_prefix_key, false).value(); + EXPECT_EQ(result.size(), 2u); +} + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index db47a7fc7945..b62cfa1f3ca0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -65,6 +65,7 @@ namespace Setting extern const SettingsBool use_iceberg_partition_pruning; extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsBool table_engine_read_through_distributed_cache; + extern const SettingsBool use_object_storage_list_objects_cache; } namespace ErrorCodes @@ -182,11 +183,36 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( query_settings.ignore_non_existent_file, skip_object_metadata, file_progress_callback); } else + { + std::shared_ptr object_iterator = nullptr; + std::unique_ptr cache_ptr = nullptr; + + if (local_context->getSettingsRef()[Setting::use_object_storage_list_objects_cache] && object_storage->supportsListObjectsCache()) + { + auto & cache = ObjectStorageListObjectsCache::instance(); + ObjectStorageListObjectsCache::Key cache_key {object_storage->getDescription(), configuration->getNamespace(), configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix())}; + + if (auto objects_info = cache.get(cache_key, /*filter_by_prefix=*/ false)) + { + object_iterator = std::make_shared(std::move(*objects_info)); + } + else + { + cache_ptr = std::make_unique(cache, cache_key); + object_iterator = object_storage->iterate(configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix()), query_settings.list_object_keys_size); + } + } + else + { + object_iterator = object_storage->iterate(configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix()), query_settings.list_object_keys_size); + } + /// Iterate through disclosed globs and make a source for each file iterator = std::make_unique( - object_storage, configuration, predicate, virtual_columns, hive_columns, - local_context, is_archive ? nullptr : read_keys, query_settings.list_object_keys_size, - query_settings.throw_on_zero_files_match, file_progress_callback); + object_iterator, configuration, predicate, virtual_columns, hive_columns, + local_context, is_archive ? nullptr : read_keys, + query_settings.throw_on_zero_files_match, file_progress_callback, std::move(cache_ptr)); + } } else if (configuration->supportsFileIterator()) { @@ -805,18 +831,18 @@ std::unique_ptr createReadBuffer( } StorageObjectStorageSource::GlobIterator::GlobIterator( - ObjectStoragePtr object_storage_, - StorageObjectStorageConfigurationPtr configuration_, + const ObjectStorageIteratorPtr & object_storage_iterator_, + ConfigurationPtr configuration_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns_, const NamesAndTypesList & hive_columns_, ContextPtr context_, ObjectInfos * read_keys_, - size_t list_object_keys_size, bool throw_on_zero_files_match_, - std::function file_progress_callback_) + std::function file_progress_callback_, + std::unique_ptr list_cache_) : WithContext(context_) - , object_storage(object_storage_) + , object_storage_iterator(object_storage_iterator_) , configuration(configuration_) , virtual_columns(virtual_columns_) , hive_columns(hive_columns_) @@ -825,6 +851,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( , read_keys(read_keys_) , local_context(context_) , file_progress_callback(file_progress_callback_) + , list_cache(std::move(list_cache_)) { const auto & reading_path = configuration->getPathForRead(); if (reading_path.hasGlobs()) @@ -832,8 +859,6 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( const auto & key_with_globs = reading_path; const auto key_prefix = reading_path.cutGlobs(configuration->supportsPartialPathPrefix()); - object_storage_iterator = object_storage->iterate(key_prefix, list_object_keys_size); - matcher = std::make_unique(makeRegexpPatternFromGlobs(key_with_globs.path)); if (!matcher->ok()) { @@ -897,11 +922,21 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne auto result = object_storage_iterator->getCurrentBatchAndScheduleNext(); if (!result.has_value()) { + if (list_cache) + { + list_cache->set(std::move(object_list)); + } is_finished = true; return {}; } new_batch = std::move(result.value()); + + if (list_cache) + { + object_list.insert(object_list.end(), new_batch.begin(), new_batch.end()); + } + for (auto it = new_batch.begin(); it != new_batch.end();) { if (!recursive && !re2::RE2::FullMatch((*it)->getPath(), *matcher)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 8145105a09e2..afb7a5f2e8fc 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -10,6 +10,9 @@ #include #include #include +#include + + namespace DB { @@ -175,17 +178,33 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithContext { public: + struct ListObjectsCacheWithKey + { + ListObjectsCacheWithKey(ObjectStorageListObjectsCache & cache_, const ObjectStorageListObjectsCache::Key & key_) : cache(cache_), key(key_) {} + + void set(ObjectStorageListObjectsCache::Value && value) const + { + cache.set(key, std::make_shared(std::move(value))); + } + + private: + ObjectStorageListObjectsCache & cache; + ObjectStorageListObjectsCache::Key key; + }; + + using ConfigurationPtr = std::shared_ptr; + GlobIterator( - ObjectStoragePtr object_storage_, - StorageObjectStorageConfigurationPtr configuration_, + const ObjectStorageIteratorPtr & object_storage_iterator_, + ConfigurationPtr configuration_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns_, const NamesAndTypesList & hive_columns_, ContextPtr context_, ObjectInfos * read_keys_, - size_t list_object_keys_size, bool throw_on_zero_files_match_, - std::function file_progress_callback_ = {}); + std::function file_progress_callback_ = {}, + std::unique_ptr list_cache_ = nullptr); ~GlobIterator() override = default; @@ -198,7 +217,7 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon void createFilterAST(const String & any_key); void fillBufferForKey(const std::string & uri_key); - const ObjectStoragePtr object_storage; + ObjectStorageIteratorPtr object_storage_iterator; const StorageObjectStorageConfigurationPtr configuration; const NamesAndTypesList virtual_columns; const NamesAndTypesList hive_columns; @@ -210,7 +229,6 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon ObjectInfos object_infos; ObjectInfos * read_keys; ExpressionActionsPtr filter_expr; - ObjectStorageIteratorPtr object_storage_iterator; bool recursive{false}; std::vector expanded_keys; std::vector::iterator expanded_keys_iter; @@ -223,6 +241,8 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon const ContextPtr local_context; std::function file_progress_callback; + std::unique_ptr list_cache; + ObjectInfos object_list; }; class StorageObjectStorageSource::KeysIterator : public IObjectIterator diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 716d0cd00634..55f18b7525fd 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -133,6 +133,7 @@ SYSTEM DROP PAGE CACHE ['SYSTEM DROP PAGE CACHE','DROP PAGE CACHE'] GLOBAL SYSTE SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE +SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE ['SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD SYSTEM RELOAD USERS ['RELOAD USERS'] GLOBAL SYSTEM RELOAD diff --git a/tests/queries/0_stateless/03377_object_storage_list_objects_cache.reference b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.reference new file mode 100644 index 000000000000..76535ad25106 --- /dev/null +++ b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.reference @@ -0,0 +1,103 @@ +-- { echoOn } + +-- The cached key should be `dir_`, and that includes all three files: 1, 2 and 3. Cache should return all three, but ClickHouse should filter out the third. +SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{1..2}.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1; +test/dir_a/dir_b/t_03377_sample_1.parquet 1 +test/dir_a/dir_b/t_03377_sample_2.parquet 2 +-- Make sure the filtering did not interfere with the cached values +SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_*.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1; +test/dir_a/dir_b/t_03377_sample_1.parquet 1 +test/dir_a/dir_b/t_03377_sample_2.parquet 2 +test/dir_a/dir_b/t_03377_sample_3.parquet 3 +SYSTEM FLUSH LOGS; +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'cold_list_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +0 +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +0 +SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'even_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +0 +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'even_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'still_exact_match_after_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit +FROM system.query_log +where log_comment = 'still_exact_match_after_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'after_drop' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +0 +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'after_drop' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; +1 diff --git a/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql new file mode 100644 index 000000000000..9638faa88d23 --- /dev/null +++ b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql @@ -0,0 +1,115 @@ +-- Tags: no-parallel, no-fasttest + +SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE; + +INSERT INTO TABLE FUNCTION s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{_partition_id}.parquet', format='Parquet', structure='id UInt64') PARTITION BY id SETTINGS s3_truncate_on_insert=1 VALUES (1), (2), (3); + +SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='cold_list_cache'; +SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_exact_cache'; +SELECT * FROM s3(s3_conn, filename='dir_a/dir_b**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_prefix_match_cache'; +SELECT * FROM s3(s3_conn, filename='dirr_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_cache_miss'; -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE } +SELECT * FROM s3(s3_conn, filename='d**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='even_shorter_prefix'; +SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='still_exact_match_after_shorter_prefix'; +SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE; +SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='after_drop'; + +-- { echoOn } + +-- The cached key should be `dir_`, and that includes all three files: 1, 2 and 3. Cache should return all three, but ClickHouse should filter out the third. +SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{1..2}.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1; + +-- Make sure the filtering did not interfere with the cached values +SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_*.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1; + +SYSTEM FLUSH LOGS; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'cold_list_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit +FROM system.query_log +where log_comment = 'warm_list_exact_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit +FROM system.query_log +where log_comment = 'warm_list_prefix_match_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'even_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'even_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'still_exact_match_after_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit +FROM system.query_log +where log_comment = 'still_exact_match_after_shorter_prefix' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit +FROM system.query_log +where log_comment = 'after_drop' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss +FROM system.query_log +where log_comment = 'after_drop' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1;