diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index 3bd43f410861..6271b698797f 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -65,7 +65,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync } private: - bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override + bool getBatchAndCheckNext(PathsWithMetadata & batch) override { ProfileEvents::increment(ProfileEvents::AzureListObjects); if (client->IsClientForDisk()) @@ -78,7 +78,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync for (const auto & blob : blobs_list) { - batch.emplace_back(std::make_shared( + batch.emplace_back(std::make_shared( blob.Name, ObjectMetadata{ static_cast(blob.BlobSize), @@ -160,7 +160,7 @@ ObjectStorageIteratorPtr AzureObjectStorage::iterate(const std::string & path_pr return std::make_shared(path_prefix, client_ptr, max_keys ? max_keys : settings_ptr->list_object_keys_size); } -void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void AzureObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { auto client_ptr = client.get(); @@ -182,7 +182,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith for (const auto & blob : blobs_list) { - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( blob.Name, ObjectMetadata{ static_cast(blob.BlobSize), diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index a958907771cc..1e4ed18b4605 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -37,7 +37,7 @@ class AzureObjectStorage : public IObjectStorage bool supportsListObjectsCache() override { return true; } - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; /// Sanitizer build may crash with max_keys=1; this looks like a false positive. 
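For illustration, a minimal caller sketch of the renamed listing API; the prefix, limit and logger below are assumptions for the example, not part of this change:

    /// Sketch: list up to 1000 objects under a prefix into the renamed container.
    PathsWithMetadata children;
    object_storage->listObjects("store/table/data/", children, /* max_keys */ 1000);
    for (const auto & child : children)
        /// Each entry is a shared_ptr<PathWithMetadata>; metadata is filled by listObjects() here.
        LOG_TEST(log, "found {} ({} bytes)", child->getPath(), child->metadata->size_bytes);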
ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override; diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index 92e6d413ac5f..f0ceecefd37a 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -193,7 +193,7 @@ void CachedObjectStorage::copyObject( // NOLINT object_storage->copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes); } -void CachedObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void CachedObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { object_storage->listObjects(path, children, max_keys); } diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h index 68cce9f2ccb6..34c2b7f2054f 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h @@ -64,7 +64,7 @@ class CachedObjectStorage final : public IObjectStorage IObjectStorage & object_storage_to, std::optional object_to_attributes = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; ObjectMetadata getObjectMetadata(const std::string & path) const override; diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp index 5dea86c49027..443a8b4c6920 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp @@ -167,7 +167,7 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) co return metadata; } -void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void HDFSObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { initializeHDFSFS(); LOG_TEST(log, "Trying to list files for {}", path); @@ -203,7 +203,7 @@ void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithM } else { - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( String(file_path), ObjectMetadata{ static_cast(ls.file_info[i].mSize), diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h index 733407e236ef..c3fe9ce0cf6c 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h @@ -92,7 +92,7 @@ class HDFSObjectStorage : public IObjectStorage, public HDFSErrorWrapper const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; String getObjectsNamespace() const override { return ""; } diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index d126bae13da4..a40f4f4c0ddf 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ 
b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -32,12 +32,12 @@ const MetadataStorageMetrics & IObjectStorage::getMetadataStorageMetrics() const bool IObjectStorage::existsOrHasAnyChild(const std::string & path) const { - RelativePathsWithMetadata files; + PathsWithMetadata files; listObjects(path, files, 1); return !files.empty(); } -void IObjectStorage::listObjects(const std::string &, RelativePathsWithMetadata &, size_t) const +void IObjectStorage::listObjects(const std::string &, PathsWithMetadata &, size_t) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "listObjects() is not supported"); } @@ -45,7 +45,7 @@ void IObjectStorage::listObjects(const std::string &, RelativePathsWithMetadata ObjectStorageIteratorPtr IObjectStorage::iterate(const std::string & path_prefix, size_t max_keys) const { - RelativePathsWithMetadata files; + PathsWithMetadata files; listObjects(path_prefix, files, max_keys); return std::make_shared(std::move(files)); @@ -104,9 +104,15 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } -RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional metadata_) +PathWithMetadata::PathWithMetadata( + const String & task_string, + std::optional metadata_, + std::optional absolute_path_, + std::optional object_storage_to_use_) : metadata(std::move(metadata_)) , command(task_string) + , absolute_path(absolute_path_) + , object_storage_to_use(object_storage_to_use_) { if (!command.isParsed()) relative_path = task_string; @@ -119,14 +125,20 @@ RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, s } } -RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_) +PathWithMetadata::PathWithMetadata( + const DataFileInfo & info, + std::optional metadata_, + std::optional absolute_path_, + std::optional object_storage_to_use_) : metadata(std::move(metadata_)) + , absolute_path(absolute_path_) + , object_storage_to_use(object_storage_to_use_) { relative_path = info.file_path; file_meta_info = info.file_meta_info; } -void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) +void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) { if (!metadata) { @@ -143,7 +155,7 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, boo } } -RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) { Poco::JSON::Parser parser; try @@ -167,7 +179,7 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std } } -std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const +std::string PathWithMetadata::CommandInTaskResponse::toString() const { Poco::JSON::Object json; diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index c455bdd449cd..4a123d8f7ad4 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -106,7 +106,7 @@ struct DataFileInfo; class DataFileMetaInfo; using DataFileMetaInfoPtr = std::shared_ptr; -struct RelativePathWithMetadata +struct PathWithMetadata { class CommandInTaskResponse { @@ -138,18 +138,30 @@ struct RelativePathWithMetadata std::optional metadata; CommandInTaskResponse command; std::optional file_meta_info; + std::optional 
absolute_path; + std::optional object_storage_to_use = std::nullopt; - RelativePathWithMetadata() = default; + PathWithMetadata() = default; - explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt); - explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_ = std::nullopt); + explicit PathWithMetadata( + const String & task_string, + std::optional metadata_ = std::nullopt, + std::optional absolute_path_ = std::nullopt, + std::optional object_storage_to_use_ = std::nullopt); - virtual ~RelativePathWithMetadata() = default; + explicit PathWithMetadata( + const DataFileInfo & info, + std::optional metadata_ = std::nullopt, + std::optional absolute_path_ = std::nullopt, + std::optional object_storage_to_use_ = std::nullopt); + + virtual ~PathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } virtual std::string getPath() const { return relative_path; } + virtual std::optional getAbsolutePath() const { return absolute_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } @@ -160,6 +172,8 @@ struct RelativePathWithMetadata void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file); const CommandInTaskResponse & getCommand() const { return command; } + + std::optional getObjectStorage() const { return object_storage_to_use; } }; struct ObjectKeyWithMetadata @@ -175,8 +189,8 @@ struct ObjectKeyWithMetadata {} }; -using RelativePathWithMetadataPtr = std::shared_ptr; -using RelativePathsWithMetadata = std::vector; +using PathWithMetadataPtr = std::shared_ptr; +using PathsWithMetadata = std::vector; using ObjectKeysWithMetadata = std::vector; class IObjectStorageIterator; @@ -217,7 +231,7 @@ class IObjectStorage virtual bool existsOrHasAnyChild(const std::string & path) const; /// List objects recursively by certain prefix. - virtual void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const; + virtual void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const; /// List objects recursively by certain prefix. Use it instead of listObjects, if you want to list objects lazily. 
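As a sketch of how the two new optional fields are meant to be used together (the variable names and literal paths are assumptions, not taken from this diff):

    /// Sketch: an object that physically lives outside the table's primary bucket --
    /// keep the storage-relative key, the absolute URI, and the storage to read it from.
    auto info = std::make_shared<PathWithMetadata>(
        "warehouse/tbl/data/part-0.parquet",                    /// key inside that storage
        std::nullopt,                                           /// metadata resolved lazily
        "s3://other-bucket/warehouse/tbl/data/part-0.parquet",  /// absolute URI
        secondary_storage);                                     /// ObjectStoragePtr serving other-bucket

    if (auto storage = info->getObjectStorage())
        info->loadMetadata(*storage, /* ignore_non_existent_file */ false);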
virtual ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const; diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 2c444954d538..40e83286e416 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -151,7 +151,7 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & path) c return object_metadata; } -void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t/* max_keys */) const +void LocalObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t/* max_keys */) const { if (!fs::exists(path) || !fs::is_directory(path)) return; @@ -164,7 +164,7 @@ void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWith continue; } - children.emplace_back(std::make_shared(entry.path(), getObjectMetadata(entry.path()))); + children.emplace_back(std::make_shared(entry.path(), getObjectMetadata(entry.path()))); } } diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h index a8a9fe321894..3b2a8b31d562 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h @@ -62,7 +62,7 @@ class LocalObjectStorage : public IObjectStorage ObjectMetadata getObjectMetadata(const std::string & path) const override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; bool existsOrHasAnyChild(const std::string & path) const override; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp index 168615ab0c79..2a88eb812daf 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp @@ -123,7 +123,7 @@ std::vector MetadataStorageFromPlainObjectStorage::listDirectory(co { auto key_prefix = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */).serialize(); - RelativePathsWithMetadata files; + PathsWithMetadata files; std::string absolute_key = key_prefix; if (!absolute_key.ends_with('/')) absolute_key += '/'; diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp index 3d939ce92302..bec76452b2d5 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp @@ -9,7 +9,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -RelativePathWithMetadataPtr ObjectStorageIteratorFromList::current() +PathWithMetadataPtr ObjectStorageIteratorFromList::current() { if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.h b/src/Disks/ObjectStorages/ObjectStorageIterator.h index d814514ddcc9..b62992f6a719 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.h +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.h @@ -16,10 +16,10 @@ class IObjectStorageIterator virtual bool isValid() = 0; /// Return the current element. 
- virtual RelativePathWithMetadataPtr current() = 0; + virtual PathWithMetadataPtr current() = 0; /// This will initiate prefetching the next batch in background, so it can be obtained faster when needed. - virtual std::optional getCurrentBatchAndScheduleNext() = 0; + virtual std::optional getCurrentBatchAndScheduleNext() = 0; /// Returns the number of elements in the batches that were fetched so far. virtual size_t getAccumulatedSize() const = 0; @@ -36,7 +36,7 @@ class IObjectStorageIterator /// Return the current batch of elements. /// It is unspecified how batches are formed. /// But this method can be used for more efficient processing. - virtual RelativePathsWithMetadata currentBatch() = 0; + virtual PathsWithMetadata currentBatch() = 0; }; using ObjectStorageIteratorPtr = std::shared_ptr; @@ -45,7 +45,7 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator { public: /// Everything is represented by just a single batch. - explicit ObjectStorageIteratorFromList(RelativePathsWithMetadata && batch_) + explicit ObjectStorageIteratorFromList(PathsWithMetadata && batch_) : batch(std::move(batch_)) , batch_iterator(batch.begin()) {} @@ -59,11 +59,11 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator bool isValid() override { return batch_iterator != batch.end(); } - RelativePathWithMetadataPtr current() override; + PathWithMetadataPtr current() override; - RelativePathsWithMetadata currentBatch() override { return batch; } + PathsWithMetadata currentBatch() override { return batch; } - std::optional getCurrentBatchAndScheduleNext() override + std::optional getCurrentBatchAndScheduleNext() override { if (batch.empty()) return {}; @@ -76,8 +76,8 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator size_t getAccumulatedSize() const override { return batch.size(); } private: - RelativePathsWithMetadata batch; - RelativePathsWithMetadata::iterator batch_iterator; + PathsWithMetadata batch; + PathsWithMetadata::iterator batch_iterator; }; } diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp index 2d2e8cd2c1a5..c488e8596aaa 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp @@ -112,7 +112,7 @@ bool IObjectStorageIteratorAsync::isValid() return !is_finished; } -RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current() +PathWithMetadataPtr IObjectStorageIteratorAsync::current() { std::lock_guard lock(mutex); @@ -123,7 +123,7 @@ RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current() } -RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() +PathsWithMetadata IObjectStorageIteratorAsync::currentBatch() { std::lock_guard lock(mutex); @@ -133,7 +133,7 @@ RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() return current_batch; } -std::optional IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext() +std::optional IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext() { std::lock_guard lock(mutex); diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h index 013714151245..b8a0d6e0249a 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h @@ -23,24 +23,24 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator bool isValid() override; - RelativePathWithMetadataPtr current() 
override; - RelativePathsWithMetadata currentBatch() override; + PathWithMetadataPtr current() override; + PathsWithMetadata currentBatch() override; void next() override; void nextBatch() override; size_t getAccumulatedSize() const override; - std::optional getCurrentBatchAndScheduleNext() override; + std::optional getCurrentBatchAndScheduleNext() override; void deactivate(); protected: /// This method fetches the next batch, and returns true if there are more batches after it. - virtual bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) = 0; + virtual bool getBatchAndCheckNext(PathsWithMetadata & batch) = 0; struct BatchAndHasNext { - RelativePathsWithMetadata batch; + PathsWithMetadata batch; bool has_next; }; @@ -55,8 +55,8 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator ThreadPool list_objects_pool; ThreadPoolCallbackRunnerUnsafe list_objects_scheduler; std::future outcome_future; - RelativePathsWithMetadata current_batch; - RelativePathsWithMetadata::iterator current_batch_iterator; + PathsWithMetadata current_batch; + PathsWithMetadata::iterator current_batch_iterator; std::atomic accumulated_size = 0; }; diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 430d99231461..ff7ea7d55a58 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -132,7 +132,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync } private: - bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override + bool getBatchAndCheckNext(PathsWithMetadata & batch) override { ProfileEvents::increment(ProfileEvents::S3ListObjects); ProfileEvents::increment(ProfileEvents::DiskS3ListObjects); @@ -148,7 +148,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync for (const auto & object : objects) { ObjectMetadata metadata{static_cast(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), object.GetETag(), {}}; - batch.emplace_back(std::make_shared(object.GetKey(), std::move(metadata))); + batch.emplace_back(std::make_shared(object.GetKey(), std::move(metadata))); } /// It returns false when all objects were returned @@ -246,7 +246,7 @@ ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefi return std::make_shared(uri.bucket, path_prefix, client.get(), max_keys); } -void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void S3ObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { auto settings_ptr = s3_settings.get(); @@ -274,7 +274,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet break; for (const auto & object : objects) - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( object.GetKey(), ObjectMetadata{ static_cast(object.GetSize()), diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index a7efb0809984..fbeb32916280 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -79,7 +79,7 @@ class S3ObjectStorage : public IObjectStorage size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, const WriteSettings & write_settings = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const 
std::string & path, PathsWithMetadata & children, size_t max_keys) const override; ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override; diff --git a/src/Storages/ObjectStorage/DataLakes/Common.cpp b/src/Storages/ObjectStorage/DataLakes/Common.cpp index d2bc6ce6be59..bbfc6b2562a6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Common.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Common.cpp @@ -14,7 +14,7 @@ std::vector listFiles( const String & prefix, const String & suffix) { auto key = std::filesystem::path(configuration.getPathForRead().path) / prefix; - RelativePathsWithMetadata files_with_metadata; + PathsWithMetadata files_with_metadata; object_storage.listObjects(key, files_with_metadata, 0); Strings res; for (const auto & file_with_metadata : files_with_metadata) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index d70a05e6a325..467e4a6e389d 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -356,7 +356,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, IDataLakeMetadata * getExternalMetadata() override { return getImpl().getExternalMetadata(); } std::shared_ptr getInitialSchemaByPath(ContextPtr context, const String & path) const override - { return getImpl().getInitialSchemaByPath(context, path); } + { return getImpl().getInitialSchemaByPath(context, path); } std::shared_ptr getSchemaTransformer(ContextPtr context, const String & path) const override { return getImpl().getSchemaTransformer(context, path); } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index c4228f57cc98..7b99532fdedf 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -58,6 +58,7 @@ struct DataFileInfo { std::string file_path; std::optional file_meta_info; + std::optional absolute_uri; explicit DataFileInfo(const std::string & file_path_) : file_path(file_path_) {} @@ -123,7 +124,7 @@ class IDataLakeMetadata : boost::noncopyable virtual std::optional sortingKey(ContextPtr) const { return {}; } protected: - ObjectIterator createKeysIterator( + virtual ObjectIterator createKeysIterator( DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 19c6ac74be91..cba030532c7e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -13,12 +13,12 @@ #include #include #include -#include "Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h" +#include +#include #include #include #include -#include #include #include #include @@ -752,8 +752,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object); relevant_snapshot = IcebergSnapshot{ - getManifestList(local_context, getProperFilePathFromMetadataInfo( - snapshot->getValue(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())), + getManifestList(local_context, 
snapshot->getValue(f_manifest_list)), relevant_snapshot_id, total_rows, total_bytes, partition_key, sorting_key}; if (!snapshot->has(f_schema_id)) @@ -863,7 +862,12 @@ std::optional IcebergMetadata::getSchemaVersionByFileIfOutdated(String da { auto schema_id_it = schema_id_by_data_file.find(data_path); if (schema_id_it == schema_id_by_data_file.end()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file for data file: {}", data_path); + { + std::string error_msg = ""; + for (const auto & sch : schema_id_by_data_file) + error_msg += "Schema id: " + std::to_string(sch.second) + " for file: " + sch.first + "\n"; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file for data file: {}.\n Contents:\n{}", data_path, error_msg); + } auto schema_id = schema_id_it->second; if (schema_id == relevant_snapshot_schema_id) @@ -905,7 +909,7 @@ void IcebergMetadata::initializeSchemasFromManifestList(ContextPtr local_context for (const auto & manifest_file_entry : manifest_file_ptr->getFiles()) { if (std::holds_alternative(manifest_file_entry.file)) - schema_id_by_data_file.emplace(std::get(manifest_file_entry.file).file_name, manifest_file_ptr->getSchemaId()); + schema_id_by_data_file.emplace(makeAbsolutePath(table_location, std::get(manifest_file_entry.file).file_name), manifest_file_ptr->getSchemaId()); } } @@ -918,28 +922,32 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context, if (configuration_ptr == nullptr) throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired"); + const String full_filename = makeAbsolutePath(table_location, filename); + auto create_fn = [&]() { - StorageObjectStorage::ObjectInfo object_info(filename); + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, filename, object_storage, secondary_storages, local_context); + + StorageObjectStorage::ObjectInfo object_info(key, std::nullopt, full_filename); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (manifest_cache) read_settings.enable_filesystem_cache = false; - auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log, read_settings); - AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context)); + auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, storage_to_use, local_context, log, read_settings); + AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), key, getFormatSettings(local_context)); ManifestFileCacheKeys manifest_file_cache_keys; for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i) { - const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace()); + const std::string manifest_file_path = makeAbsolutePath(table_location, manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet()); Int64 added_sequence_number = 0; if (format_version > 1) added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet(); - manifest_file_cache_keys.emplace_back(manifest_file_name, added_sequence_number); + 
manifest_file_cache_keys.emplace_back(manifest_file_path, added_sequence_number); } /// We only return the list of {file name, seq number} for cache. /// Because ManifestList holds a list of ManifestFilePtr which consume much memory space. @@ -949,7 +957,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context, ManifestFileCacheKeys manifest_file_cache_keys; if (manifest_cache) - manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn); + manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn); else manifest_file_cache_keys = create_fn(); return manifest_file_cache_keys; @@ -1049,35 +1057,40 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const { auto configuration_ptr = configuration.lock(); + const String full_filename = makeAbsolutePath(table_location, filename); + auto create_fn = [&]() { - ObjectInfo manifest_object_info(filename); + // Select proper storage and key for the manifest file + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, full_filename, object_storage, secondary_storages, local_context); + + + ObjectInfo manifest_object_info(key, std::nullopt, full_filename); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (manifest_cache) read_settings.enable_filesystem_cache = false; - auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings); - AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context)); - auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, filename); + auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings); + AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), full_filename, getFormatSettings(local_context)); + auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, full_filename); schema_processor.addIcebergTableSchema(schema_object); return std::make_shared( manifest_file_deserializer, format_version, - configuration_ptr->getPathForRead().path, schema_id, schema_object, schema_processor, inherited_sequence_number, table_location, - configuration_ptr->getNamespace(), local_context); }; if (manifest_cache) { - auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn); + auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn); schema_processor.addIcebergTableSchema(manifest_file->getSchemaObject()); return manifest_file; } @@ -1220,6 +1233,77 @@ std::optional IcebergMetadata::sortingKey(ContextPtr) const } +namespace +{ +class IcebergKeysIterator : public IObjectIterator +{ +public: + IcebergKeysIterator( + DataFileInfos && data_files_, + const std::string & table_location_, + ObjectStoragePtr object_storage_, + std::map & secondary_storages_, + IDataLakeMetadata::FileProgressCallback callback_, + ContextPtr local_context_, + StorageObjectStorage::ConfigurationPtr configuration_ptr_) + : data_files(data_files_) + , table_location(table_location_) + , 
object_storage(object_storage_) + , secondary_storages(secondary_storages_) + , callback(callback_) + , local_context(local_context_) + , configuration_ptr(configuration_ptr_) + { + } + + size_t estimatedKeysCount() override + { + return data_files.size(); + } + + ObjectInfoPtr next(size_t) override + { + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= data_files.size()) + return nullptr; + + const auto & data_file_info = data_files[current_index]; + + auto absolute_path = data_file_info.absolute_uri.value_or(makeAbsolutePath(table_location, data_file_info.file_path)); + + // Route to correct storage + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, data_file_info.absolute_uri.value_or(data_file_info.file_path), object_storage, secondary_storages, local_context); + + auto object_metadata = storage_to_use->getObjectMetadata(key); + + if (callback) + callback(FileProgress(0, object_metadata.size_bytes)); + + return std::make_shared(key, std::move(object_metadata), absolute_path, storage_to_use); + } + +private: + DataFileInfos data_files; + const String table_location; + ObjectStoragePtr object_storage; + std::map & secondary_storages; + std::atomic index = 0; + IDataLakeMetadata::FileProgressCallback callback; + ContextPtr local_context; + StorageObjectStorage::ConfigurationPtr configuration_ptr; +}; +} + +ObjectIterator IcebergMetadata::createIcebergKeysIterator( + DataFileInfos && data_files_, + ObjectStoragePtr, + IDataLakeMetadata::FileProgressCallback callback_, + ContextPtr local_context) +{ + return std::make_shared(std::move(data_files_), table_location, object_storage, secondary_storages, callback_, local_context, configuration.lock()); +} + ObjectIterator IcebergMetadata::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, @@ -1227,7 +1311,8 @@ ObjectIterator IcebergMetadata::iterate( ContextPtr local_context) const { SharedLockGuard lock(mutex); - return createKeysIterator(getDataFilesImpl(filter_dag, local_context), object_storage, callback); + /// Yes, it is terrible, but OK for proof of concept + return const_cast(this)->createIcebergKeysIterator(getDataFilesImpl(filter_dag, local_context), object_storage, callback, local_context); } NamesAndTypesList IcebergMetadata::getTableSchema() const diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 670e8d6638e9..c723c55aadbe 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -84,6 +84,12 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext std::optional sortingKey(ContextPtr) const override; protected: + ObjectIterator createIcebergKeysIterator( + DataFileInfos && data_files_, + ObjectStoragePtr, + IDataLakeMetadata::FileProgressCallback callback_, + ContextPtr local_context); + ObjectIterator iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, @@ -92,6 +98,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext private: const ObjectStoragePtr object_storage; + mutable std::map secondary_storages; // Sometimes data or manifests can be located on another storage const ConfigurationObserverPtr configuration; mutable IcebergSchemaProcessor schema_processor; LoggerPtr log; @@ -130,6 +137,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext std::optional 
getRelevantManifestList(const Poco::JSON::Object::Ptr & metadata); Iceberg::ManifestFilePtr tryGetManifestFile(const String & filename) const; + }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 3f94ad70fe6e..d4894e54ec0c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -4,10 +4,11 @@ #if USE_AVRO #include -#include #include #include +#include + #include #include #include @@ -122,15 +123,14 @@ using namespace DB; ManifestFileContent::ManifestFileContent( const AvroForIcebergDeserializer & manifest_file_deserializer, Int32 format_version_, - const String & common_path, Int32 schema_id_, Poco::JSON::Object::Ptr schema_object_, const IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, - const String & table_location, - const String & common_namespace, + std::string table_location_, DB::ContextPtr context) { + this->table_location = std::move(table_location_); this->schema_id = schema_id_; this->schema_object = schema_object_; @@ -193,11 +193,7 @@ ManifestFileContent::ManifestFileContent( } const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet()); - const auto file_path = getProperFilePathFromMetadataInfo( - manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), - common_path, - table_location, - common_namespace); + const auto file_path = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(); /// NOTE: This is weird, because in manifest file partition looks like this: /// { @@ -287,7 +283,7 @@ ManifestFileContent::ManifestFileContent( columns_infos[column_id].hyperrectangle.emplace(*left, true, *right, true); } - FileEntry file = FileEntry{DataFileEntry{file_path}}; + FileEntry file = FileEntry{DataFileEntry{makeAbsolutePath(table_location, file_path)}}; Int64 added_sequence_number = 0; if (format_version_ > 1) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 9d49c9ef548d..c7b6ac039b59 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -88,13 +88,11 @@ class ManifestFileContent explicit ManifestFileContent( const AvroForIcebergDeserializer & manifest_file_deserializer, Int32 format_version_, - const String & common_path, Int32 schema_id_, Poco::JSON::Object::Ptr schema_object_, const DB::IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, - const std::string & table_location, - const std::string & common_namespace, + std::string table_location_, DB::ContextPtr context); const std::vector & getFiles() const; @@ -124,6 +122,8 @@ class ManifestFileContent std::set column_ids_which_have_bounds; + std::string table_location; + }; using ManifestFilePtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 33af1fe07eaa..397606655518 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -6,8 +6,18 @@ #include #include +#include +#include + #include +# include "boost/filesystem/path.hpp" + +# include 
"boost/geometry/index/detail/predicates.hpp" + +# include "Poco/String.h" +# include "boost/property_tree/ptree_fwd.hpp" + using namespace DB; @@ -22,89 +32,8 @@ extern const int BAD_ARGUMENTS; namespace Iceberg { - using namespace DB; -// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. -// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro -// Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo( - std::string_view data_path, - std::string_view common_path, - std::string_view table_location, - std::string_view common_namespace) -{ - auto trim_backward_slash = [](std::string_view str) -> std::string_view - { - if (str.ends_with('/')) - { - return str.substr(0, str.size() - 1); - } - return str; - }; - auto trim_forward_slash = [](std::string_view str) -> std::string_view - { - if (str.starts_with('/')) - { - return str.substr(1); - } - return str; - }; - common_path = trim_backward_slash(common_path); - table_location = trim_backward_slash(table_location); - if (data_path.starts_with(table_location) && table_location.ends_with(common_path)) - { - return std::filesystem::path{common_path} / trim_forward_slash(data_path.substr(table_location.size())); - } - - - auto pos = data_path.find(common_path); - size_t good_pos = std::string::npos; - while (pos != std::string::npos) - { - auto potential_position = pos + common_path.size(); - if ((std::string_view(data_path.data() + potential_position, 6) == "/data/") - || (std::string_view(data_path.data() + potential_position, 10) == "/metadata/")) - { - good_pos = pos; - break; - } - size_t new_pos = data_path.find(common_path, pos + 1); - if (new_pos == std::string::npos) - { - break; - } - pos = new_pos; - } - - - if (good_pos != std::string::npos) - { - return std::string{data_path.substr(good_pos)}; - } - else if (pos != std::string::npos) - { - return std::string{data_path.substr(pos)}; - } - else - { - /// Data files can have different path - pos = data_path.find("://"); - if (pos == std::string::npos) - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); - pos = data_path.find('/', pos + 3); - if (pos == std::string::npos) - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); - if (data_path.substr(pos + 1).starts_with(common_namespace)) - { - auto new_pos = data_path.find('/', pos + 1); - if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path - pos = new_pos; - } - return std::string(data_path.substr(pos)); - } -} - } #endif diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 300df4492aa6..9be5ee891fec 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -1,7 +1,10 @@ #pragma once + +#include #include "config.h" +#include #include #include @@ -10,11 +13,7 @@ namespace Iceberg { -std::string getProperFilePathFromMetadataInfo( - std::string_view data_path, - std::string_view common_path, - std::string_view table_location, - std::string_view common_namespace); +/// THis file is clean now, but I left it as is as in future versions there will be some logic } diff --git a/src/Storages/ObjectStorage/IObjectIterator.h 
b/src/Storages/ObjectStorage/IObjectIterator.h index f48935f06168..92a7325b3f58 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -3,8 +3,8 @@ namespace DB { -using ObjectInfo = RelativePathWithMetadata; -using ObjectInfoPtr = std::shared_ptr; +using ObjectInfo = PathWithMetadata; +using ObjectInfoPtr = std::shared_ptr; struct IObjectIterator { diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index f12c6bd6077f..5d5a9bff7a60 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -45,7 +45,7 @@ class StorageObjectStorage : public IStorage class Configuration; using ConfigurationPtr = std::shared_ptr; using ConfigurationObserverPtr = std::weak_ptr; - using ObjectInfo = RelativePathWithMetadata; + using ObjectInfo = PathWithMetadata; using ObjectInfoPtr = std::shared_ptr; using ObjectInfos = std::vector; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 3e124b3c6f8a..70c294096bcc 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include #include + #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -155,7 +157,8 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( auto distributed_iterator = std::make_unique( local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads], - local_context); + local_context, + object_storage); if (is_archive) return std::make_shared(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr); @@ -484,6 +487,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ObjectInfoPtr object_info; auto query_settings = configuration->getQuerySettings(context_); + ObjectStoragePtr storage_to_use = object_storage; + bool not_a_path = false; do @@ -511,7 +516,10 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (object_info->getPath().empty()) return {}; - object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); + if (auto storage_for_file = object_info->getObjectStorage(); storage_for_file.has_value()) + storage_to_use = storage_for_file.value(); + + object_info->loadMetadata(storage_to_use, query_settings.ignore_non_existent_file); } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); @@ -706,12 +714,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else { compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); - read_buf = createReadBuffer(*object_info, object_storage, context_, log); + read_buf = createReadBuffer(*object_info, storage_to_use, context_, log); } Block initial_header = read_from_format_info.format_header; - - if (auto initial_schema = configuration->getInitialSchemaByPath(context_, object_info->getPath())) + if (auto initial_schema = configuration->getInitialSchemaByPath( + context_, object_info->getAbsolutePath().value_or(object_info->getPath()))) { Block sample_header; for (const auto & [name, type] : *initial_schema) @@ -747,7 +755,7 @@ StorageObjectStorageSource::ReaderHolder 
StorageObjectStorageSource::createReade builder.init(Pipe(input_format)); - if (auto transformer = configuration->getSchemaTransformer(context_, object_info->getPath())) + if (auto transformer = configuration->getSchemaTransformer(context_, object_info->getAbsolutePath().value_or(object_info->getPath()))) { auto schema_modifying_actions = std::make_shared(transformer->clone()); builder.addSimpleTransform([&](const Block & header) @@ -800,6 +808,8 @@ std::unique_ptr StorageObjectStorageSource::createReadBu const auto & settings = context_->getSettingsRef(); const auto & effective_read_settings = read_settings.has_value() ? read_settings.value() : context_->getReadSettings(); + ObjectStoragePtr storage_to_use = object_info.getObjectStorage().value_or(object_storage); + bool use_distributed_cache = false; #if ENABLE_DISTRIBUTED_CACHE ObjectStorageConnectionInfoPtr connection_info; @@ -807,7 +817,7 @@ std::unique_ptr StorageObjectStorageSource::createReadBu && DistributedCache::Registry::instance().isReady( effective_read_settings.distributed_cache_settings.read_only_from_current_az)) { - connection_info = object_storage->getConnectionInfo(); + connection_info = storage_to_use->getConnectionInfo(); if (connection_info) use_distributed_cache = true; } @@ -820,15 +830,15 @@ std::unique_ptr StorageObjectStorageSource::createReadBu filesystem_cache_name = settings[Setting::filesystem_cache_name].value; use_filesystem_cache = effective_read_settings.enable_filesystem_cache && !filesystem_cache_name.empty() - && (object_storage->getType() == ObjectStorageType::Azure - || object_storage->getType() == ObjectStorageType::S3); + && (storage_to_use->getType() == ObjectStorageType::Azure + || storage_to_use->getType() == ObjectStorageType::S3); } /// We need object metadata for two cases: /// 1. object size suggests whether we need to use prefetch /// 2. 
object etag suggests a cache key in case we use filesystem cache if (!object_info.metadata) - object_info.metadata = object_storage->getObjectMetadata(object_info.getPath()); + object_info.metadata = storage_to_use->getObjectMetadata(object_info.getPath()); const auto & object_size = object_info.metadata->size_bytes; @@ -866,9 +876,9 @@ std::unique_ptr StorageObjectStorageSource::createReadBu { const std::string path = object_info.getPath(); StoredObject object(path, "", object_size); - auto read_buffer_creator = [object, nested_buffer_read_settings, object_storage]() + auto read_buffer_creator = [object, nested_buffer_read_settings, storage_to_use]() { - return object_storage->readObject(object, nested_buffer_read_settings); + return storage_to_use->readObject(object, nested_buffer_read_settings); }; impl = std::make_unique( @@ -901,9 +911,9 @@ std::unique_ptr StorageObjectStorageSource::createReadBu const auto cache_key = FileCacheKey::fromKey(hash.get128()); auto cache = FileCacheFactory::instance().get(filesystem_cache_name); - auto read_buffer_creator = [path = object_info.getPath(), object_size, modified_read_settings, object_storage]() + auto read_buffer_creator = [path = object_info.getPath(), object_size, modified_read_settings, storage_to_use]() { - return object_storage->readObject(StoredObject(path, "", object_size), modified_read_settings); + return storage_to_use->readObject(StoredObject(path, "", object_size), modified_read_settings); }; modified_read_settings.filesystem_cache_boundary_alignment = settings[Setting::filesystem_cache_boundary_alignment]; @@ -933,7 +943,7 @@ std::unique_ptr StorageObjectStorageSource::createReadBu } if (!impl) - impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), modified_read_settings); + impl = storage_to_use->readObject(StoredObject(object_info.getPath(), "", object_size), modified_read_settings); if (!use_async_buffer) return impl; @@ -1236,9 +1246,10 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc } StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( - const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_) + const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_, ObjectStoragePtr object_storage_) : WithContext(context_) , callback(callback_) + , object_storage(object_storage_) { if (!getContext()->isSwarmModeEnabled()) { @@ -1262,11 +1273,38 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( buffer.reserve(max_threads_count); for (auto & key_future : keys) { - auto key = key_future.get(); - if (key.empty()) + auto raw = key_future.get(); + if (raw.empty()) continue; - buffer.emplace_back(std::make_shared(key, std::nullopt)); + if (isRelativePath(raw)) + { + buffer.emplace_back(std::make_shared(raw, std::nullopt)); + continue; + } + + /// The callback may return a service command such as {"retry_after_us":499871} instead of an object path; try to resolve it below and fall back to storing the raw string. + std::cerr << "\npath: " << raw << "\n"; + if (object_storage) + std::cerr << "\nstorage: " << object_storage->getName() << ", " << object_storage->getObjectsNamespace() << "\n"; + auto [storage_to_use, key] = resolveObjectStorageForPath( + "", + raw, + object_storage, + secondary_storages, + getContext() + ); + + if (key.empty()) /// Not a valid key/path, maybe it is "retry_after_us". Store as is.
+ { + std::cerr << "\nin buffer: raw: " << raw << "\n"; + buffer.emplace_back(std::make_shared(raw, std::nullopt)); + } + else + { + std::cerr << "\nin buffer: key: " << key << ", raw: " << raw << "\n"; + buffer.emplace_back(std::make_shared(key, std::nullopt, raw, storage_to_use)); + } } } @@ -1281,11 +1319,29 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator return nullptr; } - auto key = callback(); - if (key.empty()) + auto raw = callback(); + if (raw.empty()) return nullptr; - return std::make_shared(key, std::nullopt); + if (isRelativePath(raw)) + return std::make_shared(raw, std::nullopt); + + auto [storage_to_use, key] = resolveObjectStorageForPath( + "", + raw, + object_storage, + secondary_storages, + getContext() + ); + + if (key.empty()) /// Not a valid key/path, maybe it is "retry_after_us". Store as is. + { + std::cerr << "\nraw: " << raw << "\n"; + return std::make_shared(raw, std::nullopt); + } + + std::cerr << "\nkey: " << key << ", raw: " << raw << "\n"; + return std::make_shared(key, std::nullopt, raw, storage_to_use); } return buffer[current_index]; @@ -1339,7 +1395,7 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o /* path_to_archive */object_info->getPath(), /* archive_read_function */[=, this]() { - return StorageObjectStorageSource::createReadBuffer(*object_info, object_storage, getContext(), log); + return StorageObjectStorageSource::createReadBuffer(*object_info, object_info->getObjectStorage().value_or(object_storage), getContext(), log); }, /* archive_size */size); } @@ -1359,7 +1415,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->metadata) - archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath()); + { + ObjectStoragePtr storage_to_use = archive_object->getObjectStorage().value_or(object_storage); + archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath()); + } archive_reader = createArchiveReader(archive_object); file_enumerator = archive_reader->firstFile(); @@ -1384,7 +1443,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->metadata) - archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath()); + { + ObjectStoragePtr storage_to_use = archive_object->getObjectStorage().value_or(object_storage); + archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath()); + } archive_reader = createArchiveReader(archive_object); if (!archive_reader->fileExists(path_in_archive)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index c8284ab7d997..7f289e982a7c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -166,7 +166,7 @@ class StorageObjectStorageSource : public SourceWithKeyCondition class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, WithContext { public: - ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_); + ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_, ObjectStoragePtr object_storage_); ObjectInfoPtr next(size_t) override; @@ -177,6 +177,8 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, Wit ReadTaskCallback callback; ObjectInfos 
buffer; std::atomic_size_t index = 0; + ObjectStoragePtr object_storage; + mutable std::map secondary_storages; // Sometimes data can be located on a different storage }; class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithContext diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 482b3a6bcb31..7a200b11df34 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -167,13 +167,13 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile } else { - file_path = object_info->getPath(); + file_path = object_info->getAbsolutePath().value_or(object_info->getPath()); } auto file_meta_info = object_info->getFileMetaInfo(); if (file_meta_info.has_value()) { - RelativePathWithMetadata::CommandInTaskResponse response; + PathWithMetadata::CommandInTaskResponse response; response.setFilePath(file_path); response.setFileMetaInfo(file_meta_info.value()); file_path = response.toString(); } @@ -250,7 +250,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess /// All unprocessed files owned by alive replicas with recent activity /// Need to retry after (oldest_activity - activity_limit) microseconds - RelativePathWithMetadata::CommandInTaskResponse response; + PathWithMetadata::CommandInTaskResponse response; response.setRetryAfterUs(oldest_activity - activity_limit); return response.toString(); } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 6e68b3e6302f..38509694369e 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -1,6 +1,10 @@ #include #include +#include #include +#include +#include + namespace DB { @@ -11,6 +15,73 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace +{ + +inline std::string normalizeSchema(const std::string & schema) +{ + auto schema_lowercase = Poco::toLower(schema); + + if (schema_lowercase == "s3a" || schema_lowercase == "s3n") + schema_lowercase = "s3"; + else if (schema_lowercase == "wasb" || schema_lowercase == "wasbs" || schema_lowercase == "abfss") + schema_lowercase = "abfs"; + + return schema_lowercase; +} + +inline std::string endpoint_cache_key(const std::string & normalized_scheme, const std::string & authority) +{ + return normalized_scheme + "://" + authority; +} + +static std::string factoryTypeForScheme(const std::string & normalized_scheme) +{ + if (normalized_scheme == "s3") return "s3"; + if (normalized_scheme == "abfs") return "azure"; + if (normalized_scheme == "hdfs") return "hdfs"; + if (normalized_scheme == "file") return "local"; + return ""; +} + +} + +SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri) +{ + if (uri.empty()) + return; + + // scheme://authority/path + if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos) + { + scheme = Poco::toLower(uri.substr(0, scheme_sep)); + auto rest = uri.substr(scheme_sep + 3); // skip :// + + // authority is up to next '/' + auto slash = rest.find('/'); + if (slash == std::string_view::npos) + { + authority = std::string(rest); + key = "/"; //
+SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri)
+{
+ if (uri.empty())
+ return;
+
+ // scheme://authority/path
+ if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos)
+ {
+ scheme = Poco::toLower(uri.substr(0, scheme_sep));
+ auto rest = uri.substr(scheme_sep + 3); // skip ://
+
+ // authority is up to next '/'
+ auto slash = rest.find('/');
+ if (slash == std::string_view::npos)
+ {
+ authority = std::string(rest);
+ key = "/"; // FIXME: no path component -- such a URI is most likely malformed, consider throwing an exception here
+ return;
+ }
+ authority = std::string(rest.substr(0, slash));
+ key = std::string(rest.substr(++slash)); // do not keep leading '/'
+ return;
+ }
+
+ // If the URI has no scheme and starts with '/', it is an absolute path on the local filesystem: file:///path
+ if (uri.front() == '/')
+ {
+ scheme = "file";
+ key = std::string(uri);
+ return;
+ }
+
+ // Relative path, return as is
+ key = std::string(uri);
+}
+
 std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
 const IObjectStorage & object_storage,
 const StorageObjectStorage::Configuration & configuration,
@@ -48,7 +119,7 @@ void resolveSchemaAndFormat(
 ColumnsDescription & columns,
 ObjectStoragePtr object_storage,
 StorageObjectStorage::ConfigurationPtr configuration,
- std::optional<FormatSettings> format_settings,
+ const std::optional<FormatSettings> & format_settings,
 std::string & sample_path,
 const ContextPtr & context)
 {
@@ -109,4 +180,263 @@ void validateSupportedColumns(
 }
 }

+bool isSameStorageSchema(const std::string & schema1, const std::string & schema2)
+{
+ return normalizeSchema(schema1) == normalizeSchema(schema2);
+}
+
+std::string extractStorageType(const std::string & path)
+{
+ if (path.empty())
+ return "";
+
+ // Absolute POSIX path -> file
+ if (path.front() == '/')
+ return "file";
+
+ // Look for "scheme://..."
+ if (auto pos = path.find("://"); pos != std::string_view::npos && pos > 0)
+ return Poco::toLower(path.substr(0, pos));
+
+ // Tolerant: "scheme:..." with no slash before the colon (avoid Windows drive letters like "C:\")
+ {
+ auto colon_pos = path.find(':');
+ auto slash_pos = path.find('/');
+ if (colon_pos != std::string_view::npos && (slash_pos == std::string_view::npos || colon_pos < slash_pos))
+ {
+ auto maybe_scheme = path.substr(0, colon_pos);
+ // Heuristic: treat a single-letter prefix followed by ":\" or ":/" as a Windows drive, not a scheme
+ if (!(maybe_scheme.size() == 1 && colon_pos + 1 < path.size() && (path[colon_pos + 1] == '\\' || path[colon_pos + 1] == '/')))
+ return Poco::toLower(maybe_scheme);
+ }
+ }
+
+ // Relative path or unknown
+ return "";
+}
+
+bool isRelativePath(const std::string & path)
+{
+ if (path.empty())
+ return true;
+
+ // Non-relative if it has a scheme (e.g., s3://, file://)
+ if (!extractStorageType(path).empty())
+ return false;
+
+ return true;
+}
+
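+// Editor's illustration, derived from the definitions above (not part of the original patch):
+//   SchemeAuthorityKey("s3://bucket/a/b") -> scheme "s3", authority "bucket", key "a/b"
+//   SchemeAuthorityKey("file:///var/x")   -> scheme "file", authority "", key "var/x"
+//   SchemeAuthorityKey("/abs/p")          -> scheme "file", authority "", key "/abs/p"
+//   extractStorageType("S3A://bucket/k")  -> "s3a" (lowercased, not normalized)
+//   isRelativePath("data/part-0.parquet") -> true; isRelativePath("s3://bucket/k") -> false
+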
std::string("/") : base.key; + if (!base_dir.empty() && base_dir.back() != '/') + base_dir.push_back('/'); + + std::string rel = path; + if (!rel.empty() && rel.front() == '/') + rel.erase(0, 1); + + std::string abs_path = base_dir + rel; + if (abs_path.empty() || abs_path.front() != '/') + abs_path.insert(abs_path.begin(), '/'); + + if (!base.scheme.empty()) + return base.scheme + "://" + base.authority + abs_path; + + return std::string("file://") + abs_path; +} + +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + std::map & secondary_storages, + const DB::ContextPtr & context) +{ + if (isRelativePath(path)) + { + // For relative paths, we need to construct the "full" key: table location + relative path + std::string full_key = path; + + if (!table_location.empty()) + { + SchemeAuthorityKey base{table_location}; + if (!base.key.empty()) + { + // Combine base key with relative path + full_key = base.key; + if (!full_key.empty() && full_key.back() != '/') + full_key += '/'; + + std::string rel_path = path; + if (!rel_path.empty() && rel_path.front() == '/') + rel_path = rel_path.substr(1); + + full_key += rel_path; + } + } + + if (base_storage) + std::cerr << "\nRelative path: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << ". full_key: " << full_key << "\n"; + else + std::cerr << "\nRelative, returning base, which is: NULL\n"; + return {base_storage, full_key}; // Relative path definitely goes to base storage + } + + SchemeAuthorityKey base{table_location}; + SchemeAuthorityKey target{path}; + + if (target.scheme.empty()) + { + if (base_storage) + std::cerr << "\nNo scheme, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n"; + else + + std::cerr << "\nNo scheme, returning base, which is: NULL\n"; + return {base_storage, target.key}; + } + + const std::string base_norm = normalizeSchema(base.scheme); + const std::string target_norm = normalizeSchema(target.scheme); + + // For S3 URIs, use S3::URI to properly URLs: https://s3.amazonaws.com/..... + #if USE_AWS_S3 + if (target_norm == "s3" || target_norm == "https" || target_norm == "http") + { + try + { + S3::URI s3_uri(path); + + if (base_norm == "s3" || base_norm == "https" || base_norm == "http") + { + S3::URI base_s3_uri(table_location); + + if (s3_uri.bucket == base_s3_uri.bucket && s3_uri.endpoint == base_s3_uri.endpoint) + { + std::cerr << "\nPath: " << path << "\n"; + if (base_storage) + std::cerr << "\nSame S3 location, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n"; + else + std::cerr << "\nSame S3 location, returning base, which is: NULL\n"; + return {base_storage, s3_uri.key}; + } + else + { + std::cerr << "\nS3 storages are different:\n" << base_s3_uri.bucket << " @ " << base_s3_uri.endpoint + << " vs " << s3_uri.bucket << " @ " << s3_uri.endpoint << "\n"; + } + } + + const std::string cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint); + + if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end()) + { + if (it->second) + std::cerr << "\nfound in cache: " << it->second->getName() << ", " << it->second->getObjectsNamespace() << "\n"; + else + std::cerr << "\nfound in cache: NULL\n"; + return {it->second, s3_uri.key}; + } + + /// TODO: maybe do not invent new configuration. 
+std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
+ const std::string & table_location,
+ const std::string & path,
+ const DB::ObjectStoragePtr & base_storage,
+ std::map<std::string, DB::ObjectStoragePtr> & secondary_storages,
+ const DB::ContextPtr & context)
+{
+ if (isRelativePath(path))
+ {
+ // For relative paths, we need to construct the "full" key: table location + relative path
+ std::string full_key = path;
+
+ if (!table_location.empty())
+ {
+ SchemeAuthorityKey base{table_location};
+ if (!base.key.empty())
+ {
+ // Combine base key with relative path
+ full_key = base.key;
+ if (!full_key.empty() && full_key.back() != '/')
+ full_key += '/';
+
+ std::string rel_path = path;
+ if (!rel_path.empty() && rel_path.front() == '/')
+ rel_path = rel_path.substr(1);
+
+ full_key += rel_path;
+ }
+ }
+
+ if (base_storage)
+ std::cerr << "\nRelative path: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << ". full_key: " << full_key << "\n";
+ else
+ std::cerr << "\nRelative, returning base, which is: NULL\n";
+ return {base_storage, full_key}; // Relative path definitely goes to base storage
+ }
+
+ SchemeAuthorityKey base{table_location};
+ SchemeAuthorityKey target{path};
+
+ if (target.scheme.empty())
+ {
+ if (base_storage)
+ std::cerr << "\nNo scheme, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n";
+ else
+ std::cerr << "\nNo scheme, returning base, which is: NULL\n";
+ return {base_storage, target.key};
+ }
+
+ const std::string base_norm = normalizeSchema(base.scheme);
+ const std::string target_norm = normalizeSchema(target.scheme);
+
+ // For S3 URIs, use S3::URI to properly parse URLs like https://s3.amazonaws.com/.....
+ #if USE_AWS_S3
+ if (target_norm == "s3" || target_norm == "https" || target_norm == "http")
+ {
+ try
+ {
+ S3::URI s3_uri(path);
+
+ if (base_norm == "s3" || base_norm == "https" || base_norm == "http")
+ {
+ S3::URI base_s3_uri(table_location);
+
+ if (s3_uri.bucket == base_s3_uri.bucket && s3_uri.endpoint == base_s3_uri.endpoint)
+ {
+ std::cerr << "\nPath: " << path << "\n";
+ if (base_storage)
+ std::cerr << "\nSame S3 location, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n";
+ else
+ std::cerr << "\nSame S3 location, returning base, which is: NULL\n";
+ return {base_storage, s3_uri.key};
+ }
+ else
+ {
+ std::cerr << "\nS3 storages are different:\n" << base_s3_uri.bucket << " @ " << base_s3_uri.endpoint
+ << " vs " << s3_uri.bucket << " @ " << s3_uri.endpoint << "\n";
+ }
+ }
+
+ const std::string cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint);
+
+ if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end())
+ {
+ if (it->second)
+ std::cerr << "\nfound in cache: " << it->second->getName() << ", " << it->second->getObjectsNamespace() << "\n";
+ else
+ std::cerr << "\nfound in cache: NULL\n";
+ return {it->second, s3_uri.key};
+ }
+
+ /// TODO: maybe do not invent a new configuration. Use the old one and clean up later
+ Poco::AutoPtr<Poco::Util::MapConfiguration> cfg(new Poco::Util::MapConfiguration);
+
+ const std::string config_prefix = "object_storages." + cache_key;
+
+ cfg->setString(config_prefix + ".object_storage_type", "s3");
+
+ // Use the full endpoint or construct it from the bucket
+ std::string endpoint = s3_uri.endpoint.empty()
+ ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com")
+ : s3_uri.endpoint;
+ cfg->setString(config_prefix + ".endpoint", endpoint);
+
+ auto & factory = DB::ObjectStorageFactory::instance();
+
+ DB::ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
+
+ secondary_storages.emplace(cache_key, storage);
+ if (storage)
+ std::cerr << "\ncreated new S3 storage: " << storage->getName() << ", " << storage->getObjectsNamespace() << ":\ndescr: " << storage->getDescription() << "\n";
+ else
+ std::cerr << "\ncreated new S3 storage: it is NULL\n";
+ return {storage, s3_uri.key};
+ }
+ catch (...)
+ {
+ // If S3::URI parsing fails, fall back to the generic logic below
+ }
+ }
+ #endif
+
+ // Reuse the base storage if scheme and authority (bucket) match
+ if (base_norm == target_norm && base.authority == target.authority)
+ {
+ if (base_storage)
+ std::cerr << "\nSame location, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n";
+ else
+ std::cerr << "\nSame location, returning base, which is: NULL\n";
+ return {base_storage, target.key};
+ }
+ else
+ {
+ std::cerr << "\nStorages are different:\n" << base.scheme << "://" << base.authority
+ << " vs " << target.scheme << "://" << target.authority << "\n";
+ }
+
+ const std::string cache_key = endpoint_cache_key(target_norm, target.authority);
+ if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end())
+ {
+ if (it->second)
+ std::cerr << "\nfound in cache: " << it->second->getName() << ", " << it->second->getObjectsNamespace() << "\n";
+ else
+ std::cerr << "\nfound in cache: NULL\n";
+ return {it->second, target.key};
+ }
+
+ /// TODO: maybe do not invent a new configuration. Use the old one and clean up later
+ Poco::AutoPtr<Poco::Util::MapConfiguration> cfg(new Poco::Util::MapConfiguration);
+
+ const std::string type_for_factory = factoryTypeForScheme(target_norm);
+ if (type_for_factory.empty())
+ throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported storage scheme '{}' in path '{}'", target_norm, path);
+
+ const std::string config_prefix = "object_storages." + cache_key;
+
+ cfg->setString(config_prefix + ".object_storage_type", type_for_factory);
+
+ if (target_norm == "s3" || target_norm == "abfs")
+ {
+ cfg->setString(config_prefix + ".endpoint", target_norm + "://" + target.authority);
+ }
+ else if (target_norm == "hdfs")
+ {
+ // HDFS endpoint must end with '/'
+ auto endpoint = target_norm + "://" + target.authority;
+ if (!endpoint.empty() && endpoint.back() != '/')
+ endpoint.push_back('/');
+ cfg->setString(config_prefix + ".endpoint", endpoint);
+ }
+ // No extra config needed for local storage (file://)
+
+ auto & factory = DB::ObjectStorageFactory::instance();
+
+ // Also use the cache key as a (unique) storage name
+ DB::ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
+
+ secondary_storages.emplace(cache_key, storage);
+ if (storage)
+ std::cerr << "\ncreated new storage: " << storage->getName() << ", " << storage->getObjectsNamespace() << ":\ndescr: " << storage->getDescription() << "\n";
+ else
+ std::cerr << "\ncreated new storage: it is NULL\n";
+ return {storage, target.key};
+}
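+// Editor's usage sketch (hypothetical call, not part of the original patch;
+// `base_object_storage` and `context` are assumed to exist in the caller):
+//
+//     std::map<std::string, DB::ObjectStoragePtr> secondary;
+//     auto [storage, key] = resolveObjectStorageForPath(
+//         /* table_location */ "s3://warehouse-bucket/tbl",
+//         /* path */ "s3://other-bucket/data/part-0.parquet",
+//         /* base_storage */ base_object_storage,
+//         secondary,
+//         context);
+//     // The buckets differ, so a secondary S3 storage is created (or reused from `secondary`)
+//     // and `key` becomes "data/part-0.parquet".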
+
 }
diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h
index 7631d92173db..4bb6cc425756 100644
--- a/src/Storages/ObjectStorage/Utils.h
+++ b/src/Storages/ObjectStorage/Utils.h
@@ -1,11 +1,26 @@
 #pragma once
 #include "StorageObjectStorage.h"
+#include 
+
 namespace DB
 {
 class IObjectStorage;

+// A URI split into components
+// s3://bucket/a/b -> scheme="s3", authority="bucket", key="a/b"
+// file:///var/x -> scheme="file", authority="", key="var/x"
+// /abs/p -> scheme="file", authority="", key="/abs/p"
+struct SchemeAuthorityKey
+{
+ explicit SchemeAuthorityKey(const std::string & uri);
+
+ std::string scheme;
+ std::string authority;
+ std::string key;
+};
+
 std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
 const IObjectStorage & object_storage,
 const StorageObjectStorage::Configuration & configuration,
@@ -17,7 +32,7 @@ void resolveSchemaAndFormat(
 ColumnsDescription & columns,
 ObjectStoragePtr object_storage,
 StorageObjectStorage::ConfigurationPtr configuration,
- std::optional<FormatSettings> format_settings,
+ const std::optional<FormatSettings> & format_settings,
 std::string & sample_path,
 const ContextPtr & context);

@@ -25,4 +40,24 @@ void validateSupportedColumns(
 ColumnsDescription & columns,
 const StorageObjectStorage::Configuration & configuration);

+bool isRelativePath(const std::string & path);
+
+bool isSameStorageSchema(const std::string & schema1, const std::string & schema2);
+
+// Returns the lowercased URI scheme / storage type of the path (e.g. "s3://..." -> "s3");
+// returns an empty string for relative paths
+std::string extractStorageType(const std::string & path);
+
+std::string makeAbsolutePath(const std::string & table_location, const std::string & path);
+
+// Resolve which object storage a file should be read from:
+// * If the path is relative, it is always read from the "base" storage
+// * Otherwise, the base storage is reused when the path's scheme and authority match the table location
+// * Otherwise, a suitable storage is looked up in (or created and cached into) secondary_storages
+std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
+ const std::string & table_location,
+ const std::string & path,
+ const DB::ObjectStoragePtr & base_storage,
+ std::map<std::string, DB::ObjectStoragePtr> & secondary_storages,
+ const DB::ContextPtr & context);
+
 }