diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index cb2286cc28e8..904c100dc555 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -83,6 +83,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,10 @@ # include #endif +#if USE_PARQUET +# include +#endif + #include /// A minimal file used when the server is run without installation @@ -326,6 +331,11 @@ namespace ServerSetting extern const ServerSettingsUInt64 os_cpu_busy_time_threshold; extern const ServerSettingsFloat min_os_cpu_wait_time_ratio_to_drop_connection; extern const ServerSettingsFloat max_os_cpu_wait_time_ratio_to_drop_connection; + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_size; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries; + extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl; + } namespace ErrorCodes @@ -413,6 +423,7 @@ namespace ErrorCodes extern const int NETWORK_ERROR; extern const int CORRUPTED_DATA; extern const int BAD_ARGUMENTS; + extern const int STARTUP_SCRIPTS_ERROR; } @@ -2421,8 +2432,16 @@ try if (dns_cache_updater) dns_cache_updater->start(); + ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]); + ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]); + ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]); + auto replicas_reconnector = ReplicasReconnector::init(global_context); +#if USE_PARQUET + ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]); +#endif + /// Set current database name before loading tables and databases because /// system logs may copy global context. 
std::string default_database = server_settings[ServerSetting::default_database]; diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 961a90283160..6a951560586f 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -184,6 +184,8 @@ enum class AccessType : uint8_t M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \ M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \ M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 687969b5f678..14cadc3b3587 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1041,7 +1041,12 @@ The server successfully detected this situation and will download merged part fr M(IndexGenericExclusionSearchAlgorithm, "Number of times the generic exclusion search algorithm is used over the index marks", ValueType::Number) \ M(ParallelReplicasQueryCount, "Number of (sub)queries executed using parallel replicas during a query execution", ValueType::Number) \ M(DistributedConnectionReconnectCount, "Number of reconnects to other servers done during distributed query execution. It can happen when a stale connection has been acquired from connection pool", ValueType::Number) \ - + M(ParquetMetaDataCacheHits, "Number of times Parquet file metadata was found in the metadata cache.", ValueType::Number) \ + M(ParquetMetaDataCacheMisses, "Number of times Parquet file metadata was not found in the metadata cache and had to be read from the file.", ValueType::Number) \ + M(ObjectStorageListObjectsCacheHits, "Number of times an object storage list objects operation hit the cache.", ValueType::Number) \ + M(ObjectStorageListObjectsCacheMisses, "Number of times an object storage list objects operation missed the cache.", ValueType::Number) \ + M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times an object storage list objects operation hit the cache with an exact match.", ValueType::Number) \ + M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times an object storage list objects operation hit the cache via prefix matching.", ValueType::Number) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Common/TTLCachePolicy.h b/src/Common/TTLCachePolicy.h index 2070718c87c8..92d78bf7c0aa 100644 --- a/src/Common/TTLCachePolicy.h +++ b/src/Common/TTLCachePolicy.h @@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy; Cache cache; - +private: /// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators /// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a /// binary search on the sorted container and erase all left of the found key. 
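A minimal SQL sketch of how the pieces introduced above fit together: the SYSTEM statements, settings, and profile event names are the ones added by this patch, while the `s3_conn` named collection and the `data_*` glob are illustrative placeholders, not part of the patch.

-- Assumption: s3_conn is a preconfigured named collection pointing at Parquet files.
SELECT count()
FROM s3(s3_conn, filename = 'data_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache = 1,
         use_object_storage_list_objects_cache = 1,
         log_comment = 'cache_smoke_test';

SYSTEM FLUSH LOGS;

-- Inspect the hit counters recorded by the new profile events.
SELECT
    ProfileEvents['ParquetMetaDataCacheHits'],
    ProfileEvents['ObjectStorageListObjectsCacheHits']
FROM system.query_log
WHERE log_comment = 'cache_smoke_test' AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;

-- Both caches can be dropped via the new SYSTEM statements (guarded by the new grants).
SYSTEM DROP PARQUET METADATA CACHE;
SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE;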
diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index b46a51774847..2e356794d79d 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1351,8 +1351,7 @@ Limits the size of the blocks formed during data parsing in input formats in byt DECLARE(Bool, input_format_parquet_allow_geoparquet_parser, true, R"( Use geo column parser to convert Array(UInt8) into Point/Linestring/Polygon/MultiLineString/MultiPolygon types )", 0) \ - - + DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable Parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index fc964e21892f..c73cbfce17f2 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1064,7 +1064,10 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ See [Controlling behavior on server CPU overload](/operations/settings/server-overload) for more details. )", 0) \ DECLARE(Float, distributed_cache_keep_up_free_connections_ratio, 0.1f, "Soft limit for number of active connection distributed cache will try to keep free. After the number of free connections goes below distributed_cache_keep_up_free_connections_ratio * max_connections, connections with oldest activity will be closed until the number goes above the limit.", 0) \ - + DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of the Parquet file metadata cache in bytes", 0) \ + DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of the ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \ + DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum number of entries in the ObjectStorage list objects cache. Zero means disabled.", 0) \ + DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in the ObjectStorage list objects cache, in seconds. Zero means unlimited.", 0) // clang-format on diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 5070f4fbdfe6..7897de1176ee 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6865,6 +6865,9 @@ Default number of buckets for distributed shuffle-hash-join. )", EXPERIMENTAL) \ DECLARE(UInt64, distributed_plan_default_reader_bucket_count, 8, R"( Default number of tasks for parallel reading in distributed query. Tasks are spread across between replicas. +)", EXPERIMENTAL) \ + DECLARE(Bool, use_object_storage_list_objects_cache, false, R"( +Cache the list of objects returned by list objects calls to object storage )", EXPERIMENTAL) \ DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index ebc3d26dcf08..37b479108d0e 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -174,6 +174,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"parallel_hash_join_threshold", 0, 0, "New setting"}, {"function_date_trunc_return_type_behavior", 1, 0, "Change the result type for dateTrunc function for DateTime64/Date32 arguments to DateTime64/Date32 regardless of time unit to get correct result for negative values"}, /// Release closed. 
Please use 25.5 + // Altinity Antalya modifications atop of 25.2 + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, + {"use_object_storage_list_objects_cache", true, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.3", { @@ -192,6 +196,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"parallel_hash_join_threshold", 0, 0, "New setting"}, /// Release closed. Please use 25.4 }); + addSettingsChanges(settings_changes_history, "24.12.2.20000", + { + // Altinity Antalya modifications atop of 24.12 + {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586 + }); addSettingsChanges(settings_changes_history, "25.2", { /// Release closed. Please use 25.3 diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index 5a23deb9b65b..a958907771cc 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage const String & description_, const String & common_key_prefix_); + bool supportsListObjectsCache() override { return true; } + void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; /// Sanitizer build may crash with max_keys=1; this looks like a false positive. diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index bdaa391dd729..2e674d41e9a3 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -101,6 +101,23 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } +void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) +{ + if (!metadata) + { + const auto & path = isArchive() ? 
getPathToArchive() : getPath(); + + if (ignore_non_existent_file) + { + metadata = object_storage->tryGetObjectMetadata(path); + } + else + { + metadata = object_storage->getObjectMetadata(path); + } + } +} + RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) { Poco::JSON::Parser parser; diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 42734b52ec51..e4b1d48771d4 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -143,6 +143,7 @@ struct RelativePathWithMetadata virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } + void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file); const CommandInTaskResponse & getCommand() const { return command; } }; @@ -339,6 +340,8 @@ class IObjectStorage #endif + virtual bool supportsListObjectsCache() { return false; } + private: mutable std::mutex throttlers_mutex; ThrottlerPtr remote_read_throttler; diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index 7b64d3b0f9fe..a7efb0809984 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -61,6 +61,8 @@ class S3ObjectStorage : public IObjectStorage ObjectStorageType getType() const override { return ObjectStorageType::S3; } + bool supportsListObjectsCache() override { return true; } + bool exists(const StoredObject & object) const override; std::unique_ptr readObject( /// NOLINT diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index f11e9598308e..ca029ae076b0 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -79,6 +80,10 @@ #include #endif +#if USE_PARQUET +#include +#endif + #if USE_AWS_S3 #include #endif @@ -433,6 +438,21 @@ BlockIO InterpreterSystemQuery::execute() getContext()->clearQueryResultCache(query.query_result_cache_tag); break; } + case Type::DROP_PARQUET_METADATA_CACHE: +#if USE_PARQUET + getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE); + ParquetFileMetaDataCache::instance()->clear(); + break; +#else + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet"); +#endif + + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: + { + getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE); + ObjectStorageListObjectsCache::instance().clear(); + break; + } case Type::DROP_COMPILED_EXPRESSION_CACHE: #if USE_EMBEDDED_COMPILER getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE); @@ -1533,6 +1553,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_PAGE_CACHE: case Type::DROP_SCHEMA_CACHE: case Type::DROP_FORMAT_SCHEMA_CACHE: + case Type::DROP_PARQUET_METADATA_CACHE: + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: case Type::DROP_S3_CLIENT_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index fac766a01180..4c463107a0c3 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ 
b/src/Parsers/ASTSystemQuery.cpp @@ -467,6 +467,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_COMPILED_EXPRESSION_CACHE: case Type::DROP_S3_CLIENT_CACHE: case Type::DROP_ICEBERG_METADATA_CACHE: + case Type::DROP_PARQUET_METADATA_CACHE: + case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: case Type::RESET_COVERAGE: case Type::RESTART_REPLICAS: case Type::JEMALLOC_PURGE: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 5d650df64ff3..13af5fe76ea0 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -42,6 +42,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster DROP_SCHEMA_CACHE, DROP_FORMAT_SCHEMA_CACHE, DROP_S3_CLIENT_CACHE, + DROP_PARQUET_METADATA_CACHE, + DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, STOP_LISTEN, START_LISTEN, RESTART_REPLICAS, diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index c59f1e714038..70fdd9a3e23a 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -70,6 +70,9 @@ class IInputFormat : public SourceWithKeyCondition void needOnlyCount() { need_only_count = true; } + /// Set additional info/key/id related to underlying storage of the ReadBuffer + virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {} + protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 67994192a6b3..22bfc9b7d595 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -3,6 +3,9 @@ #if USE_PARQUET #include +#include +#include +#include #include #include #include @@ -29,6 +32,7 @@ #include #include #include +#include #include #include @@ -38,6 +42,8 @@ namespace ProfileEvents extern const Event ParquetFetchWaitTimeMicroseconds; extern const Event ParquetReadRowGroups; extern const Event ParquetPrunedRowGroups; + extern const Event ParquetMetaDataCacheHits; + extern const Event ParquetMetaDataCacheMisses; } namespace CurrentMetrics @@ -54,6 +60,16 @@ namespace CurrentMetrics namespace DB { +namespace Setting +{ + extern const SettingsBool input_format_parquet_use_metadata_cache; +} + +namespace ServerSetting +{ + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; +} + namespace ErrorCodes { extern const int BAD_ARGUMENTS; @@ -513,6 +529,49 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } +std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() +{ + createArrowFileIfNotCreated(); + return parquet::ReadMetaData(arrow_file); +} + +std::shared_ptr ParquetBlockInputFormat::getFileMetaData() +{ + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. 
Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + return readMetadataFromFile(); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( + metadata_cache.key, + [&]() + { + return readMetadataFromFile(); + } + ); + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + return parquet_file_metadata; +} + +void ParquetBlockInputFormat::createArrowFileIfNotCreated() +{ + if (arrow_file) + { + return; + } + + // Create arrow file adapter. + // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that + // we'll need to read (which we know in advance). Use max_download_threads for that. + arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); +} + std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn) { std::unordered_set column_keys; @@ -612,7 +671,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() if (is_stopped) return; - metadata = parquet::ReadMetaData(arrow_file); + metadata = getFileMetaData(); const bool prefetch_group = supportPrefetch(); std::shared_ptr schema; @@ -712,6 +771,8 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } + bool has_row_groups_to_read = false; + auto skip_row_group_based_on_filters = [&](int row_group) { if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down) @@ -763,7 +824,20 @@ void ParquetBlockInputFormat::initializeIfNeeded() row_group_batches.back().total_bytes_compressed += row_group_size; auto rows = adaptive_chunk_size(row_group); row_group_batches.back().adaptive_chunk_size = rows ? 
rows : format_settings.parquet.max_block_size; + + has_row_groups_to_read = true; } + + if (has_row_groups_to_read) + { + createArrowFileIfNotCreated(); + } +} + +void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index f941bb70ebce..a1b7eade4303 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -72,6 +72,8 @@ class ParquetBlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + private: Chunk read() override; @@ -90,6 +92,11 @@ class ParquetBlockInputFormat : public IInputFormat void threadFunction(size_t row_group_batch_idx); + void createArrowFileIfNotCreated(); + std::shared_ptr readMetadataFromFile(); + + std::shared_ptr getFileMetaData(); + inline bool supportPrefetch() const; // Data layout in the file: @@ -338,6 +345,13 @@ class ParquetBlockInputFormat : public IInputFormat std::exception_ptr background_exception = nullptr; std::atomic is_stopped{0}; bool is_initialized = false; + struct Cache + { + String key; + bool use_cache = false; + }; + + Cache metadata_cache; }; class ParquetSchemaReader : public ISchemaReader diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp new file mode 100644 index 000000000000..da8ad825f505 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp @@ -0,0 +1,20 @@ +#include + +#if USE_PARQUET + +namespace DB +{ + +ParquetFileMetaDataCache::ParquetFileMetaDataCache() + : CacheBase(CurrentMetrics::end(), CurrentMetrics::end(), 0) +{} + +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance() +{ + static ParquetFileMetaDataCache instance; + return &instance; +} + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h new file mode 100644 index 000000000000..fb5fc1bb0217 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h @@ -0,0 +1,30 @@ +#pragma once + +#include "config.h" + +#if USE_PARQUET + +namespace parquet +{ + +class FileMetaData; + +} + +#include + +namespace DB +{ + +class ParquetFileMetaDataCache : public CacheBase +{ +public: + static ParquetFileMetaDataCache * instance(); + +private: + ParquetFileMetaDataCache(); +}; + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp index d9b55b2fc79c..ccdaf4274ab4 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp @@ -22,8 +22,17 @@ #include #include "ArrowBufferedStreams.h" #include +#include +#include +#include +namespace ProfileEvents +{ +extern const Event ParquetMetaDataCacheHits; +extern const Event ParquetMetaDataCacheMisses; +} + namespace DB { @@ -32,6 +41,11 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace Setting +{ +extern const SettingsBool 
input_format_parquet_use_metadata_cache; +} + static NamesAndTypesList getHeaderForParquetMetadata() { NamesAndTypesList names_and_types{ @@ -130,10 +144,35 @@ void checkHeader(const Block & header) static std::shared_ptr getFileMetadata( ReadBuffer & in, const FormatSettings & format_settings, - std::atomic & is_stopped) + std::atomic & is_stopped, + ParquetMetadataInputFormat::Cache metadata_cache) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); - return parquet::ReadMetaData(arrow_file); + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( + metadata_cache.key, + [&]() + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + ); + + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + + return parquet_file_metadata; + + } ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_) @@ -148,7 +187,7 @@ Chunk ParquetMetadataInputFormat::read() if (done) return res; - auto metadata = getFileMetadata(*in, format_settings, is_stopped); + auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache); const auto & header = getPort().getHeader(); auto names_and_types = getHeaderForParquetMetadata(); @@ -489,6 +528,12 @@ void ParquetMetadataInputFormat::resetParser() done = false; } +void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; +} + ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_) { diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h index 5d2d89898596..5cd3f4173bf5 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h @@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat void resetParser() override; + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + + struct Cache + { + String key; + bool use_cache = false; + }; + private: Chunk read() override; @@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat const FormatSettings format_settings; bool done = false; std::atomic is_stopped{0}; + + Cache metadata_cache; }; class ParquetMetadataSchemaReader : public ISchemaReader diff --git a/src/Storages/Cache/ObjectStorageListObjectsCache.cpp b/src/Storages/Cache/ObjectStorageListObjectsCache.cpp new file mode 100644 index 000000000000..a7aec57d9161 --- /dev/null +++ 
b/src/Storages/Cache/ObjectStorageListObjectsCache.cpp @@ -0,0 +1,210 @@ +#include +#include +#include +#include + +namespace ProfileEvents +{ +extern const Event ObjectStorageListObjectsCacheHits; +extern const Event ObjectStorageListObjectsCacheMisses; +extern const Event ObjectStorageListObjectsCacheExactMatchHits; +extern const Event ObjectStorageListObjectsCachePrefixMatchHits; +} + +namespace DB +{ + +template +class ObjectStorageListObjectsCachePolicy : public TTLCachePolicy +{ +public: + using BasePolicy = TTLCachePolicy; + using typename BasePolicy::MappedPtr; + using typename BasePolicy::KeyMapped; + using BasePolicy::cache; + + ObjectStorageListObjectsCachePolicy() + : BasePolicy(CurrentMetrics::end(), CurrentMetrics::end(), std::make_unique()) + { + } + + std::optional getWithKey(const Key & key) override + { + if (const auto it = cache.find(key); it != cache.end()) + { + if (!IsStaleFunction()(it->first)) + { + return std::make_optional({it->first, it->second}); + } + // found a stale entry, remove it but don't return. We still want to perform the prefix matching search + BasePolicy::remove(it->first); + } + + if (const auto it = findBestMatchingPrefixAndRemoveExpiredEntries(key); it != cache.end()) + { + return std::make_optional({it->first, it->second}); + } + + return std::nullopt; + } + +private: + auto findBestMatchingPrefixAndRemoveExpiredEntries(Key key) + { + while (!key.prefix.empty()) + { + if (const auto it = cache.find(key); it != cache.end()) + { + if (IsStaleFunction()(it->first)) + { + BasePolicy::remove(it->first); + } + else + { + return it; + } + } + + key.prefix.pop_back(); + } + + return cache.end(); + } +}; + +ObjectStorageListObjectsCache::Key::Key( + const String & storage_description_, + const String & bucket_, + const String & prefix_, + const std::chrono::steady_clock::time_point & expires_at_, + std::optional user_id_) + : storage_description(storage_description_), bucket(bucket_), prefix(prefix_), expires_at(expires_at_), user_id(user_id_) {} + +bool ObjectStorageListObjectsCache::Key::operator==(const Key & other) const +{ + return storage_description == other.storage_description && bucket == other.bucket && prefix == other.prefix; +} + +size_t ObjectStorageListObjectsCache::KeyHasher::operator()(const Key & key) const +{ + std::size_t seed = 0; + + boost::hash_combine(seed, key.storage_description); + boost::hash_combine(seed, key.bucket); + boost::hash_combine(seed, key.prefix); + + return seed; +} + +bool ObjectStorageListObjectsCache::IsStale::operator()(const Key & key) const +{ + return key.expires_at < std::chrono::steady_clock::now(); +} + +size_t ObjectStorageListObjectsCache::WeightFunction::operator()(const Value & value) const +{ + std::size_t weight = 0; + + for (const auto & object : value) + { + const auto object_metadata = object->metadata; + weight += object->relative_path.capacity() + sizeof(object_metadata); + + // variable size + if (object_metadata) + { + weight += object_metadata->etag.capacity(); + weight += object_metadata->attributes.size() * (sizeof(std::string) * 2); + + for (const auto & [k, v] : object_metadata->attributes) + { + weight += k.capacity() + v.capacity(); + } + } + } + + return weight; +} + +ObjectStorageListObjectsCache::ObjectStorageListObjectsCache() + : cache(std::make_unique>()) +{ +} + +void ObjectStorageListObjectsCache::set( + const Key & key, + const std::shared_ptr & value) +{ + auto key_with_ttl = key; + key_with_ttl.expires_at = std::chrono::steady_clock::now() + 
std::chrono::seconds(ttl_in_seconds); + + cache.set(key_with_ttl, value); +} + +void ObjectStorageListObjectsCache::clear() +{ + cache.clear(); +} + +std::optional ObjectStorageListObjectsCache::get(const Key & key, bool filter_by_prefix) +{ + const auto pair = cache.getWithKey(key); + + if (!pair) + { + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheMisses); + return {}; + } + + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheHits); + + if (pair->key == key) + { + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheExactMatchHits); + return *pair->mapped; + } + + ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCachePrefixMatchHits); + + if (!filter_by_prefix) + { + return *pair->mapped; + } + + Value filtered_objects; + + filtered_objects.reserve(pair->mapped->size()); + + for (const auto & object : *pair->mapped) + { + if (object->relative_path.starts_with(key.prefix)) + { + filtered_objects.push_back(object); + } + } + + return filtered_objects; +} + +void ObjectStorageListObjectsCache::setMaxSizeInBytes(std::size_t size_in_bytes_) +{ + cache.setMaxSizeInBytes(size_in_bytes_); +} + +void ObjectStorageListObjectsCache::setMaxCount(std::size_t count) +{ + cache.setMaxCount(count); +} + +void ObjectStorageListObjectsCache::setTTL(std::size_t ttl_in_seconds_) +{ + ttl_in_seconds = ttl_in_seconds_; +} + +ObjectStorageListObjectsCache & ObjectStorageListObjectsCache::instance() +{ + static ObjectStorageListObjectsCache instance; + return instance; +} + +} diff --git a/src/Storages/Cache/ObjectStorageListObjectsCache.h b/src/Storages/Cache/ObjectStorageListObjectsCache.h new file mode 100644 index 000000000000..6cb6c3694d93 --- /dev/null +++ b/src/Storages/Cache/ObjectStorageListObjectsCache.h @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class ObjectStorageListObjectsCache +{ + friend class ObjectStorageListObjectsCacheTest; +public: + ObjectStorageListObjectsCache(const ObjectStorageListObjectsCache &) = delete; + ObjectStorageListObjectsCache(ObjectStorageListObjectsCache &&) noexcept = delete; + + ObjectStorageListObjectsCache& operator=(const ObjectStorageListObjectsCache &) = delete; + ObjectStorageListObjectsCache& operator=(ObjectStorageListObjectsCache &&) noexcept = delete; + + static ObjectStorageListObjectsCache & instance(); + + struct Key + { + Key( + const String & storage_description_, + const String & bucket_, + const String & prefix_, + const std::chrono::steady_clock::time_point & expires_at_ = std::chrono::steady_clock::now(), + std::optional user_id_ = std::nullopt); + + std::string storage_description; + std::string bucket; + std::string prefix; + std::chrono::steady_clock::time_point expires_at; + std::optional user_id; + + bool operator==(const Key & other) const; + }; + + using Value = StorageObjectStorage::ObjectInfos; + struct KeyHasher + { + size_t operator()(const Key & key) const; + }; + + struct IsStale + { + bool operator()(const Key & key) const; + }; + + struct WeightFunction + { + size_t operator()(const Value & value) const; + }; + + using Cache = CacheBase; + + void set( + const Key & key, + const std::shared_ptr & value); + + std::optional get(const Key & key, bool filter_by_prefix = true); + + void clear(); + + void setMaxSizeInBytes(std::size_t size_in_bytes_); + void setMaxCount(std::size_t count); + void setTTL(std::size_t ttl_in_seconds_); + +private: + ObjectStorageListObjectsCache(); + + Cache cache; + size_t ttl_in_seconds 
{0}; +}; + +} diff --git a/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp b/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp new file mode 100644 index 000000000000..3b719d4df3e3 --- /dev/null +++ b/src/Storages/Cache/tests/gtest_object_storage_list_objects_cache.cpp @@ -0,0 +1,160 @@ +#include +#include +#include +#include + +namespace DB +{ + +class ObjectStorageListObjectsCacheTest : public ::testing::Test +{ +protected: + void SetUp() override + { + cache = std::unique_ptr(new ObjectStorageListObjectsCache()); + cache->setTTL(3); + cache->setMaxCount(100); + cache->setMaxSizeInBytes(1000000); + } + + std::unique_ptr cache; + static ObjectStorageListObjectsCache::Key default_key; + + static std::shared_ptr createTestValue(const std::vector& paths) + { + auto value = std::make_shared(); + for (const auto & path : paths) + { + value->push_back(std::make_shared(path)); + } + return value; + } +}; + +ObjectStorageListObjectsCache::Key ObjectStorageListObjectsCacheTest::default_key {"default", "test-bucket", "test-prefix/"}; + +TEST_F(ObjectStorageListObjectsCacheTest, BasicSetAndGet) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt", "test-prefix/file2.txt"}); + + cache->set(default_key, value); + + auto result = cache->get(default_key).value(); + + ASSERT_EQ(result.size(), 2); + EXPECT_EQ(result[0]->getPath(), "test-prefix/file1.txt"); + EXPECT_EQ(result[1]->getPath(), "test-prefix/file2.txt"); +} + +TEST_F(ObjectStorageListObjectsCacheTest, CacheMiss) +{ + cache->clear(); + + EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, ClearCache) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt", "test-prefix/file2.txt"}); + + cache->set(default_key, value); + cache->clear(); + + EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, PrefixMatching) +{ + cache->clear(); + + auto short_prefix_key = default_key; + short_prefix_key.prefix = "parent/"; + + auto mid_prefix_key = default_key; + mid_prefix_key.prefix = "parent/child/"; + + auto long_prefix_key = default_key; + long_prefix_key.prefix = "parent/child/grandchild/"; + + auto value = createTestValue( + { + "parent/child/grandchild/file1.txt", + "parent/child/grandchild/file2.txt"}); + + cache->set(mid_prefix_key, value); + + auto result1 = cache->get(mid_prefix_key).value(); + EXPECT_EQ(result1.size(), 2); + + auto result2 = cache->get(long_prefix_key).value(); + EXPECT_EQ(result2.size(), 2); + + EXPECT_FALSE(cache->get(short_prefix_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, PrefixFiltering) +{ + cache->clear(); + + auto key_with_short_prefix = default_key; + key_with_short_prefix.prefix = "parent/"; + + auto key_with_mid_prefix = default_key; + key_with_mid_prefix.prefix = "parent/child1/"; + + auto value = createTestValue({ + "parent/file1.txt", + "parent/child1/file2.txt", + "parent/child2/file3.txt" + }); + + cache->set(key_with_short_prefix, value); + + auto result = cache->get(key_with_mid_prefix, true).value(); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0]->getPath(), "parent/child1/file2.txt"); +} + +TEST_F(ObjectStorageListObjectsCacheTest, TTLExpiration) +{ + cache->clear(); + auto value = createTestValue({"test-prefix/file1.txt"}); + + cache->set(default_key, value); + + // Verify we can get it immediately + auto result1 = cache->get(default_key).value(); + EXPECT_EQ(result1.size(), 1); + + std::this_thread::sleep_for(std::chrono::seconds(4)); + + 
EXPECT_FALSE(cache->get(default_key)); +} + +TEST_F(ObjectStorageListObjectsCacheTest, BestPrefixMatch) +{ + cache->clear(); + + auto short_prefix_key = default_key; + short_prefix_key.prefix = "a/b/"; + + auto mid_prefix_key = default_key; + mid_prefix_key.prefix = "a/b/c/"; + + auto long_prefix_key = default_key; + long_prefix_key.prefix = "a/b/c/d/"; + + auto short_prefix = createTestValue({"a/b/c/d/file1.txt", "a/b/c/file1.txt", "a/b/file2.txt"}); + auto mid_prefix = createTestValue({"a/b/c/d/file1.txt", "a/b/c/file1.txt"}); + + cache->set(short_prefix_key, short_prefix); + cache->set(mid_prefix_key, mid_prefix); + + // should pick mid_prefix, which has size 2. filter_by_prefix=false so we can assert by size + auto result = cache->get(long_prefix_key, false).value(); + EXPECT_EQ(result.size(), 2u); +} + +} diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index c0dce62becaf..df4f5ed3a45b 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -34,12 +34,16 @@ class KeysIterator : public IObjectIterator return nullptr; auto key = data_files[current_index]; - auto object_metadata = object_storage->getObjectMetadata(key); if (callback) - callback(FileProgress(0, object_metadata.size_bytes)); + { + /// Too expensive to always load the size for the metadata, + /// because it requires an API call to the external storage. + /// In many cases only the keys are needed. + callback(FileProgress(0, 1)); + } - return std::make_shared(key, std::move(object_metadata)); + return std::make_shared(key, std::nullopt); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index e7f7a16858f3..e59d7199f563 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -15,6 +15,7 @@ #include #include "Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h" #include +#include #include #include @@ -104,7 +105,8 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, const ContextPtr & local_context, - LoggerPtr log) + LoggerPtr log, + CompressionMethod compression_method) { auto create_fn = [&]() { @@ -115,7 +117,14 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( if (cache_ptr) read_settings.enable_filesystem_cache = false; - auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log, read_settings); + auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log); + + std::unique_ptr buf; + if (compression_method != CompressionMethod::None) + buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method); + else + buf = std::move(source_buf); + String json_str; readJSONObjectPossiblyInvalid(json_str, *buf); return json_str; @@ -274,7 +283,30 @@ Int32 IcebergMetadata::parseTableSchema( } } -static std::pair getMetadataFileAndVersion(const std::string & path) +struct MetadataFileWithInfo +{ + Int32 version; + String path; + CompressionMethod compression_method; +}; + +static CompressionMethod getCompressionMethodFromMetadataFile(const String & path) +{ + constexpr std::string_view metadata_suffix = ".metadata.json"; + + auto compression_method = 
chooseCompressionMethod(path, "auto"); + + /// NOTE you may be surprised, but some metadata files store the compression extension not at the end of the file name, + /// but somewhere in the middle of it, before the metadata.json suffix. + /// The history of Iceberg metadata files may not be that long, but it is already full of surprises. + /// Example of a weird engineering decision: 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json + if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix)) + compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto"); + + return compression_method; +} + +static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path) { String file_name(path.begin() + path.find_last_of('/') + 1, path.end()); String version_str; @@ -289,7 +321,10 @@ throw Exception( ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name); - return std::make_pair(std::stoi(version_str), path); + return MetadataFileWithInfo{ + .version = std::stoi(version_str), + .path = path, + .compression_method = getCompressionMethodFromMetadataFile(path)}; } enum class MostRecentMetadataFileSelectionWay @@ -300,7 +335,7 @@ struct ShortMetadataFileInfo { - UInt32 version; + Int32 version; UInt64 last_updated_ms; String path; }; @@ -312,7 +347,7 @@ struct ShortMetadataFileInfo * 1) v.metadata.json, where V - metadata version. * 2) -.metadata.json, where V - metadata version */ -static std::pair getLatestMetadataFileAndVersion( +static MetadataFileWithInfo getLatestMetadataFileAndVersion( const ObjectStoragePtr & object_storage, StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, @@ -336,10 +371,10 @@ static std::pair getLatestMetadataFileAndVersion( metadata_files_with_versions.reserve(metadata_files.size()); for (const auto & path : metadata_files) { - auto [version, metadata_file_path] = getMetadataFileAndVersion(path); + auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path); if (need_all_metadata_files_parsing) { - auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log); + auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method); if (table_uuid.has_value()) { if (metadata_file_object->has(f_table_uuid)) @@ -389,10 +424,11 @@ static std::pair getLatestMetadataFileAndVersion( [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; }); } }(); - return {latest_metadata_file_info.version, latest_metadata_file_info.path}; + + return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)}; } -static std::pair getLatestOrExplicitMetadataFileAndVersion( +static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( const ObjectStoragePtr & object_storage, StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, @@ -459,7 +495,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context) std::lock_guard lock(mutex); - const auto [metadata_version, metadata_file_path] + const auto [metadata_version, metadata_file_path, 
compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); bool metadata_file_changed = false; @@ -469,7 +505,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context) metadata_file_changed = true; } - auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log); + auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method); chassert(format_version == metadata_object->getValue(f_format_version)); auto previous_snapshot_id = relevant_snapshot_id; @@ -662,9 +698,9 @@ DataLakeMetadataPtr IcebergMetadata::create( else LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false."); - const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get()); + const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get()); - Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log); + Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method); auto format_version = object->getValue(f_format_version); return std::make_unique(object_storage, configuration_ptr, local_context, metadata_version, format_version, object, cache_ptr); @@ -734,7 +770,7 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con { auto configuration_ptr = configuration.lock(); - const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); + const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); chassert([&]() { @@ -742,7 +778,7 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con return metadata_version == last_metadata_version; }()); - auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log); + auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method); chassert([&]() { SharedLockGuard lock(mutex); diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 92527050936c..a99369bb8a34 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -75,10 +75,7 @@ std::optional ReadBufferIterator::tryGetColumnsFromCache( const auto & object_info = (*it); auto get_last_mod_time = [&] -> std::optional { - const auto & path = object_info->isArchive() ? 
object_info->getPathToArchive() : object_info->getPath(); - if (!object_info->metadata) - object_info->metadata = object_storage->tryGetObjectMetadata(path); - + object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); return object_info->metadata ? std::optional(object_info->metadata->last_modified.epochTime()) : std::nullopt; @@ -150,7 +147,6 @@ std::unique_ptr ReadBufferIterator::recreateLastReadBuffer() { auto context = getContext(); - const auto & path = current_object_info->isArchive() ? current_object_info->getPathToArchive() : current_object_info->getPath(); auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator")); const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->getCompressionMethod()); @@ -249,6 +245,8 @@ ReadBufferIterator::Data ReadBufferIterator::next() prev_read_keys_size = read_keys.size(); } + current_object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); + if (query_settings.skip_empty_files && current_object_info->metadata && current_object_info->metadata->size_bytes == 0) continue; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 01b3240f1802..2f21601a984a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -65,6 +65,7 @@ namespace Setting extern const SettingsBool use_iceberg_partition_pruning; extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsBool table_engine_read_through_distributed_cache; + extern const SettingsBool use_object_storage_list_objects_cache; } namespace ErrorCodes @@ -172,11 +173,36 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( query_settings.ignore_non_existent_file, skip_object_metadata, file_progress_callback); } else + { + std::shared_ptr object_iterator = nullptr; + std::unique_ptr cache_ptr = nullptr; + + if (local_context->getSettingsRef()[Setting::use_object_storage_list_objects_cache] && object_storage->supportsListObjectsCache()) + { + auto & cache = ObjectStorageListObjectsCache::instance(); + ObjectStorageListObjectsCache::Key cache_key {object_storage->getDescription(), configuration->getNamespace(), configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix())}; + + if (auto objects_info = cache.get(cache_key, /*filter_by_prefix=*/ false)) + { + object_iterator = std::make_shared(std::move(*objects_info)); + } + else + { + cache_ptr = std::make_unique(cache, cache_key); + object_iterator = object_storage->iterate(configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix()), query_settings.list_object_keys_size); + } + } + else + { + object_iterator = object_storage->iterate(configuration->getRawPath().cutGlobs(configuration->supportsPartialPathPrefix()), query_settings.list_object_keys_size); + } + /// Iterate through disclosed globs and make a source for each file iterator = std::make_unique( - object_storage, configuration, predicate, virtual_columns, hive_columns, - local_context, is_archive ? nullptr : read_keys, query_settings.list_object_keys_size, - query_settings.throw_on_zero_files_match, file_progress_callback); + object_iterator, configuration, predicate, virtual_columns, hive_columns, + local_context, is_archive ? 
nullptr : read_keys, + query_settings.throw_on_zero_files_match, file_progress_callback, std::move(cache_ptr)); + } } else if (configuration->supportsFileIterator()) { @@ -470,21 +496,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (object_info->getPath().empty()) return {}; - if (!object_info->metadata) - { - const auto & path = object_info->isArchive() ? object_info->getPathToArchive() : object_info->getPath(); - - if (query_settings.ignore_non_existent_file) - { - auto metadata = object_storage->tryGetObjectMetadata(path); - if (!metadata) - return {}; - - object_info->metadata = metadata; - } - else - object_info->metadata = object_storage->getObjectMetadata(path); - } + object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); @@ -574,6 +586,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (need_only_count) input_format->needOnlyCount(); + if (!object_info->getPath().empty()) + input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); + builder.init(Pipe(input_format)); if (auto transformer = configuration->getSchemaTransformer(context_, object_info->getPath())) @@ -795,18 +810,18 @@ std::unique_ptr StorageObjectStorageSource::createReadBu } StorageObjectStorageSource::GlobIterator::GlobIterator( - ObjectStoragePtr object_storage_, + const ObjectStorageIteratorPtr & object_storage_iterator_, ConfigurationPtr configuration_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns_, const NamesAndTypesList & hive_columns_, ContextPtr context_, ObjectInfos * read_keys_, - size_t list_object_keys_size, bool throw_on_zero_files_match_, - std::function file_progress_callback_) + std::function file_progress_callback_, + std::unique_ptr list_cache_) : WithContext(context_) - , object_storage(object_storage_) + , object_storage_iterator(object_storage_iterator_) , configuration(configuration_) , virtual_columns(virtual_columns_) , hive_columns(hive_columns_) @@ -815,6 +830,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( , read_keys(read_keys_) , local_context(context_) , file_progress_callback(file_progress_callback_) + , list_cache(std::move(list_cache_)) { const auto & reading_path = configuration->getPathForRead(); if (reading_path.hasGlobs()) @@ -822,8 +838,6 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( const auto & key_with_globs = reading_path; const auto key_prefix = reading_path.cutGlobs(configuration->supportsPartialPathPrefix()); - object_storage_iterator = object_storage->iterate(key_prefix, list_object_keys_size); - matcher = std::make_unique(makeRegexpPatternFromGlobs(key_with_globs.path)); if (!matcher->ok()) { @@ -887,11 +901,21 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne auto result = object_storage_iterator->getCurrentBatchAndScheduleNext(); if (!result.has_value()) { + if (list_cache) + { + list_cache->set(std::move(object_list)); + } is_finished = true; return {}; } new_batch = std::move(result.value()); + + if (list_cache) + { + object_list.insert(object_list.end(), new_batch.begin(), new_batch.end()); + } + for (auto it = new_batch.begin(); it != new_batch.end();) { if (!recursive && !re2::RE2::FullMatch((*it)->getPath(), *matcher)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h 
b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 6221c1508100..6b08b02f9245 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB @@ -171,17 +172,31 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithContext { public: + struct ListObjectsCacheWithKey + { + ListObjectsCacheWithKey(ObjectStorageListObjectsCache & cache_, const ObjectStorageListObjectsCache::Key & key_) : cache(cache_), key(key_) {} + + void set(ObjectStorageListObjectsCache::Value && value) const + { + cache.set(key, std::make_shared(std::move(value))); + } + + private: + ObjectStorageListObjectsCache & cache; + ObjectStorageListObjectsCache::Key key; + }; + GlobIterator( - ObjectStoragePtr object_storage_, + const ObjectStorageIteratorPtr & object_storage_iterator_, ConfigurationPtr configuration_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns_, const NamesAndTypesList & hive_columns_, ContextPtr context_, ObjectInfos * read_keys_, - size_t list_object_keys_size, bool throw_on_zero_files_match_, - std::function file_progress_callback_ = {}); + std::function file_progress_callback_ = {}, + std::unique_ptr list_cache_ = nullptr); ~GlobIterator() override = default; @@ -194,7 +209,7 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon void createFilterAST(const String & any_key); void fillBufferForKey(const std::string & uri_key); - const ObjectStoragePtr object_storage; + ObjectStorageIteratorPtr object_storage_iterator; const ConfigurationPtr configuration; const NamesAndTypesList virtual_columns; const NamesAndTypesList hive_columns; @@ -206,7 +221,6 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon ObjectInfos object_infos; ObjectInfos * read_keys; ExpressionActionsPtr filter_expr; - ObjectStorageIteratorPtr object_storage_iterator; bool recursive{false}; std::vector expanded_keys; std::vector::iterator expanded_keys_iter; @@ -219,6 +233,8 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon const ContextPtr local_context; std::function file_progress_callback; + std::unique_ptr list_cache; + ObjectInfos object_list; }; class StorageObjectStorageSource::KeysIterator : public IObjectIterator diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 30549ead3554..8adfdaa805ec 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -1,5 +1,6 @@ import logging import os +import subprocess import uuid import time from datetime import datetime, timezone @@ -744,7 +745,7 @@ def make_query_from_function( # Cluster Query with node1 as coordinator and storage type as arg select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function( run_on_cluster=True, - storage_type_as_arg=True, + storage_type_as_arg=True, ) # Cluster Query with node1 as coordinator and storage type in named collection @@ -2044,7 +2045,7 @@ def test_metadata_file_selection_from_version_hint(started_cluster, format_versi spark.sql( f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)" ) - + # test the case where version_hint.text file contains just the version number with 
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 30549ead3554..8adfdaa805ec 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import subprocess
 import uuid
 import time
 from datetime import datetime, timezone
@@ -744,7 +745,7 @@ def make_query_from_function(
     # Cluster Query with node1 as coordinator and storage type as arg
     select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function(
         run_on_cluster=True,
-        storage_type_as_arg=True, 
+        storage_type_as_arg=True,
     )
 
     # Cluster Query with node1 as coordinator and storage type in named collection
@@ -2044,7 +2045,7 @@ def test_metadata_file_selection_from_version_hint(started_cluster, format_versi
     spark.sql(
         f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
     )
-    
+
     # test the case where version_hint.text file contains just the version number
     with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "w") as f:
         f.write('5')
@@ -3402,3 +3403,42 @@ def execute_spark_query(query: str):
     table_select_expression = table_creation_expression
 
     instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")
+
+
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_compressed_metadata(started_cluster, storage_type):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str()
+
+    table_properties = {
+        "write.metadata.compression": "gzip"
+    }
+
+    df = spark.createDataFrame([
+        (1, "Alice"),
+        (2, "Bob")
+    ], ["id", "name"])
+
+    # for some reason write.metadata.compression is not working :(
+    df.writeTo(TABLE_NAME) \
+        .tableProperty("write.metadata.compression", "gzip") \
+        .using("iceberg") \
+        .create()
+
+    # manually compress the metadata file before upload, so this scenario is still exercised
+    subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True)
+
+    # Weird, but the compression extension really sits in the middle of the file name, not at the end...
+    subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True)
+
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
+
+    assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
\ No newline at end of file
diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference
index 268959bb444c..b504ab6f4aa1 100644
--- a/tests/queries/0_stateless/01271_show_privileges.reference
+++ b/tests/queries/0_stateless/01271_show_privileges.reference
@@ -133,6 +133,8 @@ SYSTEM DROP PAGE CACHE ['SYSTEM DROP PAGE CACHE','DROP PAGE CACHE'] GLOBAL SYSTE
 SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
+SYSTEM DROP PARQUET METADATA CACHE ['SYSTEM DROP PARQUET METADATA CACHE'] GLOBAL SYSTEM DROP CACHE
+SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE ['SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
 SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
 SYSTEM RELOAD USERS ['RELOAD USERS'] GLOBAL SYSTEM RELOAD
diff --git a/tests/queries/0_stateless/02995_settings_25_2_3.tsv b/tests/queries/0_stateless/02995_settings_25_2_3.tsv
index e926ef502a2a..ca227db9355a 100644
--- a/tests/queries/0_stateless/02995_settings_25_2_3.tsv
+++ b/tests/queries/0_stateless/02995_settings_25_2_3.tsv
@@ -538,6 +538,7 @@ input_format_parquet_bloom_filter_push_down 0
 input_format_parquet_case_insensitive_column_matching 0
 input_format_parquet_enable_row_group_prefetch 1
 input_format_parquet_filter_push_down 1
+input_format_parquet_use_metadata_cache 1
 input_format_parquet_import_nested 0
 input_format_parquet_local_file_min_bytes_for_seek 8192
 input_format_parquet_max_block_size 65409
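Both new privileges hang off `SYSTEM DROP CACHE`, as the reference above shows, so eviction rights can be granted narrowly. A sketch with a hypothetical `cache_admin` user:

```sql
-- `cache_admin` is illustrative; the privilege names are the ones added above.
GRANT SYSTEM DROP PARQUET METADATA CACHE ON *.* TO cache_admin;
GRANT SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE ON *.* TO cache_admin;

-- With either grant, the corresponding cache can be evicted explicitly:
SYSTEM DROP PARQUET METADATA CACHE;
SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE;
```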
diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
new file mode 100644
index 000000000000..c87ad9008b60
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
@@ -0,0 +1,8 @@
+10
+10
+10
+10
+10
+10
+0
+10
diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
new file mode 100644
index 000000000000..2a1934e7c963
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
@@ -0,0 +1,61 @@
+-- Tags: no-parallel, no-fasttest
+
+DROP TABLE IF EXISTS t_parquet_03262;
+
+CREATE TABLE t_parquet_03262 (a UInt64)
+ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
+PARTITION BY a;
+
+INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache';
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata)
+SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_format_metadata_cache';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_format_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SYSTEM DROP PARQUET METADATA CACHE;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ParquetMetaDataCacheMisses']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+DROP TABLE t_parquet_03262;
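One scenario 03299 leaves implicit: because the cache key is `path:etag` (see the `setStorageRelatedUniqueKey` hunk earlier in this diff), overwriting an object changes its etag, so the stale entry is simply never matched and no explicit `SYSTEM DROP` is needed. A sketch continuing the test's naming, with the `structure` argument assumed for the insert:

```sql
-- Rewriting the object changes its etag, so the next read is counted as a
-- ParquetMetaDataCacheMisses rather than served as a stale hit.
INSERT INTO TABLE FUNCTION s3(s3_conn, filename = 'test_03262_1', format = 'Parquet', structure = 'a UInt64')
SELECT 1 SETTINGS s3_truncate_on_insert = 1;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_1', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache = 1, optimize_count_from_files = 0;
```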
+SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{1..2}.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1;
+test/dir_a/dir_b/t_03377_sample_1.parquet	1
+test/dir_a/dir_b/t_03377_sample_2.parquet	2
+-- Make sure the filtering did not interfere with the cached values
+SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_*.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1;
+test/dir_a/dir_b/t_03377_sample_1.parquet	1
+test/dir_a/dir_b/t_03377_sample_2.parquet	2
+test/dir_a/dir_b/t_03377_sample_3.parquet	3
+SYSTEM FLUSH LOGS;
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'cold_list_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+0
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+0
+SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'even_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+0
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'even_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'still_exact_match_after_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit
+FROM system.query_log
+where log_comment = 'still_exact_match_after_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'after_drop'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+0
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'after_drop'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+1
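The reference above pins down the matching rules: an identical glob is an exact-match hit; a glob whose cut prefix extends a cached prefix (`dir_a/dir_b` under `dir_`) is a prefix-match hit, served by re-filtering the cached listing; and a shorter prefix (`d`) is a miss, presumably because the cached `dir_` listing cannot be proven complete for the wider keyspace, while the original `dir_` entry stays valid. Annotated against the queries in the test that follows:

```sql
-- s3_conn assumed; these mirror the test below, with the expected cache outcome noted.
SELECT * FROM s3(s3_conn, filename='dir_**.parquet') FORMAT Null
SETTINGS use_object_storage_list_objects_cache=1;   -- listed once, cached under prefix 'dir_'
SELECT * FROM s3(s3_conn, filename='dir_a/dir_b**.parquet') FORMAT Null
SETTINGS use_object_storage_list_objects_cache=1;   -- prefix-match hit: 'dir_a/dir_b' starts with 'dir_'
SELECT * FROM s3(s3_conn, filename='d**.parquet') FORMAT Null
SETTINGS use_object_storage_list_objects_cache=1;   -- miss: 'd' is shorter than the cached prefix
```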
diff --git a/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql
new file mode 100644
index 000000000000..9638faa88d23
--- /dev/null
+++ b/tests/queries/0_stateless/03377_object_storage_list_objects_cache.sql
@@ -0,0 +1,115 @@
+-- Tags: no-parallel, no-fasttest
+
+SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE;
+
+INSERT INTO TABLE FUNCTION s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{_partition_id}.parquet', format='Parquet', structure='id UInt64') PARTITION BY id SETTINGS s3_truncate_on_insert=1 VALUES (1), (2), (3);
+
+SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='cold_list_cache';
+SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_exact_cache';
+SELECT * FROM s3(s3_conn, filename='dir_a/dir_b**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_prefix_match_cache';
+SELECT * FROM s3(s3_conn, filename='dirr_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='warm_list_cache_miss'; -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
+SELECT * FROM s3(s3_conn, filename='d**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='even_shorter_prefix';
+SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='still_exact_match_after_shorter_prefix';
+SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE;
+SELECT * FROM s3(s3_conn, filename='dir_**.parquet') Format Null SETTINGS use_object_storage_list_objects_cache=1, log_comment='after_drop';
+
+-- { echoOn }
+
+-- The cached key should be `dir_`, and that includes all three files: 1, 2 and 3. Cache should return all three, but ClickHouse should filter out the third.
+SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_{1..2}.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1;
+
+-- Make sure the filtering did not interfere with the cached values
+SELECT _path, * FROM s3(s3_conn, filename='dir_a/dir_b/t_03377_sample_*.parquet') order by id SETTINGS use_object_storage_list_objects_cache=1;
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'cold_list_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_exact_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCachePrefixMatchHits'] > 0 as prefix_match_hit
+FROM system.query_log
+where log_comment = 'warm_list_prefix_match_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'even_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'even_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'still_exact_match_after_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheExactMatchHits'] > 0 as exact_match_hit
+FROM system.query_log
+where log_comment = 'still_exact_match_after_shorter_prefix'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheHits'] > 0 as hit
+FROM system.query_log
+where log_comment = 'after_drop'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;
+
+SELECT ProfileEvents['ObjectStorageListObjectsCacheMisses'] > 0 as miss
+FROM system.query_log
+where log_comment = 'after_drop'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1;