diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp
index 363293933d53..c7ab78030411 100644
--- a/src/Databases/DataLake/ICatalog.cpp
+++ b/src/Databases/DataLake/ICatalog.cpp
@@ -84,13 +84,19 @@ void TableMetadata::setLocation(const std::string & location_)
     auto pos_to_path = location_.substr(pos_to_bucket).find('/');
 
     if (pos_to_path == std::string::npos)
-        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
-
-    pos_to_path = pos_to_bucket + pos_to_path;
+    { // empty path
+        location_without_path = location_;
+        path.clear();
+        bucket = location_.substr(pos_to_bucket);
+    }
+    else
+    {
+        pos_to_path = pos_to_bucket + pos_to_path;
 
-    location_without_path = location_.substr(0, pos_to_path);
-    path = location_.substr(pos_to_path + 1);
-    bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
+        location_without_path = location_.substr(0, pos_to_path);
+        path = location_.substr(pos_to_path + 1);
+        bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
+    }
 
     LOG_TEST(getLogger("TableMetadata"),
              "Parsed location without path: {}, path: {}",
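`TableMetadata::setLocation` previously threw `NOT_IMPLEMENTED` for a location with nothing after the bucket; the new branch treats that as an empty path. A minimal standalone sketch of the intended parse results, assuming `pos_to_bucket` points just past the scheme (the helper and sample locations are illustrative, not part of the patch):

```cpp
#include <cassert>
#include <string>

// Hypothetical helper mirroring the setLocation() logic above.
struct ParsedLocation { std::string bucket, path, location_without_path; };

ParsedLocation parseLocation(const std::string & location)
{
    const auto pos_to_bucket = location.find("://") + 3;
    auto pos_to_path = location.substr(pos_to_bucket).find('/');
    if (pos_to_path == std::string::npos) // empty path, e.g. "s3://bucket"
        return {location.substr(pos_to_bucket), "", location};
    pos_to_path = pos_to_bucket + pos_to_path;
    return {location.substr(pos_to_bucket, pos_to_path - pos_to_bucket),
            location.substr(pos_to_path + 1),
            location.substr(0, pos_to_path)};
}

int main()
{
    assert(parseLocation("s3://bucket/db/table").bucket == "bucket");
    assert(parseLocation("s3://bucket/db/table").path == "db/table");
    assert(parseLocation("s3://bucket").bucket == "bucket");
    assert(parseLocation("s3://bucket").path.empty()); // previously NOT_IMPLEMENTED
}
```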
diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp
index 3006c68e6083..7e385f034c98 100644
--- a/src/Databases/DataLake/RestCatalog.cpp
+++ b/src/Databases/DataLake/RestCatalog.cpp
@@ -272,7 +272,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
 {
     const auto & context = getContext();
 
-    Poco::URI url(base_url / endpoint);
+    Poco::URI url(base_url / endpoint, false);
 
     if (!params.empty())
         url.setQueryParameters(params);
 
@@ -511,7 +511,9 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
     for (size_t i = 0; i < identifiers_object->size(); ++i)
     {
         const auto current_table_json = identifiers_object->get(static_cast<UInt32>(i)).extract<Poco::JSON::Object::Ptr>();
-        const auto table_name = current_table_json->get("name").extract<String>();
+        const auto table_name_raw = current_table_json->get("name").extract<String>();
+        std::string table_name;
+        Poco::URI::encode(table_name_raw, "/", table_name);
         tables.push_back(base_namespace + "."
             + table_name);
 
         if (limit && tables.size() >= limit)
@@ -700,7 +702,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
     };
     }
 
-    Poco::URI url(endpoint);
+    Poco::URI url(endpoint, false);
 
     auto wb = DB::BuilderRWBufferFromHTTP(url)
         .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
        .withMethod(method)
diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp
index 28bc4d52f45f..5fe470f9c19f 100644
--- a/src/IO/S3/Client.cpp
+++ b/src/IO/S3/Client.cpp
@@ -465,7 +465,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
     auto bucket_uri = getURIForBucket(bucket);
     if (!bucket_uri)
     {
-        if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
+        if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
             return *maybe_error;
 
         if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -670,7 +670,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
     if (auto uri = getURIForBucket(bucket); uri.has_value())
         request.overrideURI(std::move(*uri));
 
-    bool found_new_endpoint = false;
     // if we found correct endpoint after 301 responses, update the cache for future requests
     SCOPE_EXIT(
@@ -973,12 +972,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
 }
 
 // Do a list request because head requests don't have body in response
-std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
+// S3 Tables don't support ListObjects, so as a workaround we issue a ranged GetObject instead
+std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
 {
-    ListObjectsV2Request req;
+    GetObjectRequest req;
     req.SetBucket(bucket);
-    req.SetMaxKeys(1);
-    auto result = ListObjectsV2(req);
+    req.SetKey(key);
+    req.SetRange("bytes=0-1");
+    auto result = GetObject(req);
+
     if (result.IsSuccess())
         return std::nullopt;
 
     return result.GetError();
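`HeadObject` responses carry no error body, so the client cannot learn the correct endpoint from a 301 on a HEAD request; it first probes the bucket with a request that does return a body. `ListObjectsV2` used to serve as that probe, but S3 Tables buckets reject it, hence the switch to a ranged `GetObject` on the key being queried. A sketch of the probe against the plain AWS SDK, outside ClickHouse's `Client` wrapper (bucket and key values are the caller's; error handling trimmed):

```cpp
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <optional>
#include <string>

// Returns the S3 error (which carries the redirect/region details) if the probe
// fails, or nullopt if the current endpoint already works.
std::optional<Aws::S3::S3Error> probeBucketEndpoint(
    const Aws::S3::S3Client & client, const std::string & bucket, const std::string & key)
{
    Aws::S3::Model::GetObjectRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetRange("bytes=0-1"); // keep the probe cheap: at most two bytes transferred
    auto result = client.GetObject(req);
    if (result.IsSuccess())
        return std::nullopt;
    return result.GetError();
}
```

Unlike `ListObjectsV2` with `MaxKeys = 1`, a `GetObject` probe needs a concrete key, which is why `updateURIForBucketForHead` now takes the key from the original HEAD request.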
+ "us-gov-east-1", + "us-gov-west-1" + }; + + /// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2' + /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility + if (region.substr(0, 3) == "s3-") + region = region.substr(3); + + return regions.contains(region); +} + void URI::addRegionToURI(const std::string ®ion) { if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos) + { + if (pos > 0) + { /// Check if region is already in endpoint to avoid add it second time + auto prev_pos = endpoint.find_last_of("/.", pos - 1); + if (prev_pos == std::string::npos) + prev_pos = 0; + else + ++prev_pos; + std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos); + if (isAWSRegion(endpoint_region)) + return; + } endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos); + } } void URI::validateBucket(const String & bucket, const Poco::URI & uri) diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index 9220a8209045..8af05c177807 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -41,6 +41,10 @@ struct URI static void validateBucket(const std::string & bucket, const Poco::URI & uri); static void validateKey(const std::string & key, const Poco::URI & uri); + + /// Returns true if 'region' string is an AWS S3 region + /// https://docs.aws.amazon.com/general/latest/gr/s3.html + static bool isAWSRegion(std::string_view region); }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index a92630e1bb05..eda67873dc80 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -316,7 +316,10 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec persistent_components, local_context, getProperFilePathFromMetadataInfo( - snapshot->getValue(f_manifest_list), configuration_ptr->getPathForRead().path, persistent_components.table_location), + snapshot->getValue(f_manifest_list), + configuration_ptr->getPathForRead().path, + persistent_components.table_location, + configuration_ptr->getNamespace()), log), relevant_snapshot_id, total_rows, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 00c9e34d010c..1fdde4972ef0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -150,6 +150,7 @@ ManifestFileContent::ManifestFileContent( Int64 inherited_sequence_number, Int64 inherited_snapshot_id, const String & table_location, + const String & common_namespace, DB::ContextPtr context, const String & path_to_manifest_file_) : path_to_manifest_file(path_to_manifest_file_) @@ -286,7 +287,11 @@ ManifestFileContent::ManifestFileContent( const auto file_path_key = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(); - const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), common_path, table_location); + const auto file_path = getProperFilePathFromMetadataInfo( + manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), + common_path, + table_location, + common_namespace); /// NOTE: 
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index a92630e1bb05..eda67873dc80 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -316,7 +316,10 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
             persistent_components,
             local_context,
             getProperFilePathFromMetadataInfo(
-                snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, persistent_components.table_location),
+                snapshot->getValue<String>(f_manifest_list),
+                configuration_ptr->getPathForRead().path,
+                persistent_components.table_location,
+                configuration_ptr->getNamespace()),
             log),
         relevant_snapshot_id,
         total_rows,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
index 00c9e34d010c..1fdde4972ef0 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
@@ -150,6 +150,7 @@ ManifestFileContent::ManifestFileContent(
     Int64 inherited_sequence_number,
     Int64 inherited_snapshot_id,
     const String & table_location,
+    const String & common_namespace,
    DB::ContextPtr context,
     const String & path_to_manifest_file_)
     : path_to_manifest_file(path_to_manifest_file_)
@@ -286,7 +287,11 @@ ManifestFileContent::ManifestFileContent(
         const auto file_path_key = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>();
 
-        const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
+        const auto file_path = getProperFilePathFromMetadataInfo(
+            manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
+            common_path,
+            table_location,
+            common_namespace);
 
         /// NOTE:
         /// This is weird, because in manifest file partition looks like this:
         /// {
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
index ac12be343439..0bfdb758ee22 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
@@ -115,6 +115,7 @@ class ManifestFileContent : public boost::noncopyable
         Int64 inherited_sequence_number,
         Int64 inherited_snapshot_id,
         const std::string & table_location,
+        const std::string & common_namespace,
         DB::ContextPtr context,
         const String & path_to_manifest_file_);
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp
index a0f3c628d774..9372a13e2074 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp
@@ -103,6 +103,7 @@ Iceberg::ManifestFilePtr getManifestFile(
             inherited_sequence_number,
             inherited_snapshot_id,
             persistent_table_components.table_location,
+            configuration->getNamespace(),
             local_context,
             filename);
     };
@@ -160,7 +161,10 @@ ManifestFileCacheKeys getManifestList(
         const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<String>();
         const auto manifest_file_name = getProperFilePathFromMetadataInfo(
-            file_path, configuration_ptr->getPathForRead().path, persistent_table_components.table_location);
+            file_path,
+            configuration_ptr->getPathForRead().path,
+            persistent_table_components.table_location,
+            configuration_ptr->getNamespace());
         Int64 added_sequence_number = 0;
         auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id);
         if (added_snapshot_id.isNull())
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
index 8cc23f59d729..260c50ea4080 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
@@ -166,7 +166,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra
 // This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
 // For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
 // Common path should end with "" or "/".
-std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
+std::string getProperFilePathFromMetadataInfo(
+    std::string_view data_path,
+    std::string_view common_path,
+    std::string_view table_location,
+    std::string_view common_namespace)
 {
     auto trim_backward_slash = [](std::string_view str) -> std::string_view
     {
@@ -231,7 +235,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
     }
     else
     {
-        throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
+        /// Data files can have a different path
+        pos = data_path.find("://");
+        if (pos == std::string::npos)
+            throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
+        pos = data_path.find('/', pos + 3);
+        if (pos == std::string::npos)
+            throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
+        if (data_path.substr(pos + 1).starts_with(common_namespace))
+        {
+            auto new_pos = data_path.find('/', pos + 1);
+            if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
+                pos = new_pos;
+        }
+        return std::string(data_path.substr(pos));
     }
 }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
index 52c2b19b049d..c77bfc6d5c76 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
@@ -31,7 +31,11 @@ void writeMessageToFile(
     std::function cleanup,
     DB::CompressionMethod compression_method = DB::CompressionMethod::None);
 
-std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
+std::string getProperFilePathFromMetadataInfo(
+    std::string_view data_path,
+    std::string_view common_path,
+    std::string_view table_location,
+    std::string_view common_namespace);
 
 struct TransformAndArgument
 {
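Previously a data path that did not contain the table's common path was a hard error. The new fallback strips the scheme and bucket, and, when the first remaining path segment is exactly the catalog namespace, strips that segment too. A standalone sketch of just the fallback branch (error handling omitted; the paths and the `"ns"` namespace are illustrative):

```cpp
#include <cassert>
#include <string>
#include <string_view>

std::string fallbackPath(std::string_view data_path, std::string_view common_namespace)
{
    auto pos = data_path.find("://");
    pos = data_path.find('/', pos + 3); // first '/' after the bucket
    if (data_path.substr(pos + 1).starts_with(common_namespace))
    {
        auto new_pos = data_path.find('/', pos + 1);
        if (new_pos - pos == common_namespace.length() + 1) // whole segment == namespace
            pos = new_pos;
    }
    return std::string(data_path.substr(pos));
}

int main()
{
    // Namespace segment present after the bucket: stripped along with the bucket.
    assert(fallbackPath("s3://warehouse/ns/data/f.parquet", "ns") == "/data/f.parquet");
    // First segment is not the namespace: only the bucket is stripped.
    assert(fallbackPath("s3://warehouse/other/data/f.parquet", "ns") == "/other/data/f.parquet");
}
```

The `starts_with` check alone would also match a segment that merely begins with the namespace (e.g. `ns2`), which is why the length comparison against `common_namespace.length() + 1` is needed.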
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index 662b51e0f687..ed368abb762a 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -560,6 +560,7 @@ def test_create(started_cluster):
     node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_name}` VALUES ('AAPL');", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
     assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "AAPL\n"
 
+
 def test_drop_table(started_cluster):
     node = started_cluster.instances["node1"]
 
@@ -575,3 +576,27 @@ def test_drop_table(started_cluster):
     drop_clickhouse_iceberg_table(node, root_namespace, table_name)
 
     assert len(catalog.list_tables(root_namespace)) == 0
+
+
+def test_table_with_slash(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    # pyiceberg (as of version 0.9.1) has a bug with table names containing slashes,
+    # see https://github.com/apache/iceberg-python/issues/2462,
+    # so we need to encode the name manually
+    table_raw_suffix = "table/foo"
+    table_encoded_suffix = "table%2Ffoo"
+
+    test_ref = f"test_list_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_{table_raw_suffix}"
+    table_encoded_name = f"{test_ref}_{table_encoded_suffix}"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    create_table(catalog, root_namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n"
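The `%2F` in the table name the test queries mirrors what `RestCatalog::parseTables` now does on the catalog side with `Poco::URI::encode`, where `/` is passed as a reserved character so that it gets percent-encoded. A minimal standalone check of that call (outside the catalog code):

```cpp
#include <Poco/URI.h>
#include <cassert>
#include <string>

int main()
{
    std::string encoded;
    // '/' is listed in the reserved set, so it is escaped as %2F.
    Poco::URI::encode("table/foo", "/", encoded);
    assert(encoded == "table%2Ffoo");
}
```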