Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,19 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');

if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

pos_to_path = pos_to_bucket + pos_to_path;
{ // empty path
location_without_path = location_;
path.clear();
bucket = location_.substr(pos_to_bucket);
}
else
{
pos_to_path = pos_to_bucket + pos_to_path;

location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
}

LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
Expand Down
8 changes: 5 additions & 3 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
{
const auto & context = getContext();

Poco::URI url(base_url / endpoint);
Poco::URI url(base_url / endpoint, false);
if (!params.empty())
url.setQueryParameters(params);

Expand Down Expand Up @@ -511,7 +511,9 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
for (size_t i = 0; i < identifiers_object->size(); ++i)
{
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
const auto table_name = current_table_json->get("name").extract<String>();
const auto table_name_raw = current_table_json->get("name").extract<String>();
std::string table_name;
Poco::URI::encode(table_name_raw, "/", table_name);

tables.push_back(base_namespace + "." + table_name);
if (limit && tables.size() >= limit)
Expand Down Expand Up @@ -700,7 +702,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
};
}

Poco::URI url(endpoint);
Poco::URI url(endpoint, false);
auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(method)
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;

if (auto region = getRegionForBucket(bucket); !region.empty())
Expand Down Expand Up @@ -670,7 +670,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));


bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
Expand Down Expand Up @@ -973,12 +972,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
}

// Do a list request because head requests don't have body in response
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
{
ListObjectsV2Request req;
GetObjectRequest req;
req.SetBucket(bucket);
req.SetMaxKeys(1);
auto result = ListObjectsV2(req);
req.SetKey(key);
req.SetRange("bytes=0-1");
auto result = GetObject(req);

if (result.IsSuccess())
return std::nullopt;
return result.GetError();
Expand Down
2 changes: 1 addition & 1 deletion src/IO/S3/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class Client : private Aws::S3::S3Client

void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;

std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;

Expand Down
62 changes: 62 additions & 0 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
validateKey(key, uri);
}

bool URI::isAWSRegion(std::string_view region)
{
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
static const std::unordered_set<std::string_view> regions = {
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-4",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-east-2",
"ap-southeast-7",
"ap-northeast-1",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"il-central-1",
"mx-central-1",
"me-south-1",
"me-central-1",
"sa-east-1",
"us-gov-east-1",
"us-gov-west-1"
};

/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
if (region.substr(0, 3) == "s3-")
region = region.substr(3);

return regions.contains(region);
}

void URI::addRegionToURI(const std::string &region)
{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
{
if (pos > 0)
{ /// Check if region is already in endpoint to avoid add it second time
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
if (prev_pos == std::string::npos)
prev_pos = 0;
else
++prev_pos;
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
if (isAWSRegion(endpoint_region))
return;
}
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
}
}

void URI::validateBucket(const String & bucket, const Poco::URI & uri)
Expand Down
4 changes: 4 additions & 0 deletions src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct URI

static void validateBucket(const std::string & bucket, const Poco::URI & uri);
static void validateKey(const std::string & key, const Poco::URI & uri);

/// Returns true if 'region' string is an AWS S3 region
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
static bool isAWSRegion(std::string_view region);
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
persistent_components,
local_context,
getProperFilePathFromMetadataInfo(
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, persistent_components.table_location),
snapshot->getValue<String>(f_manifest_list),
configuration_ptr->getPathForRead().path,
persistent_components.table_location,
configuration_ptr->getNamespace()),
log),
relevant_snapshot_id,
total_rows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ ManifestFileContent::ManifestFileContent(
Int64 inherited_sequence_number,
Int64 inherited_snapshot_id,
const String & table_location,
const String & common_namespace,
DB::ContextPtr context,
const String & path_to_manifest_file_)
: path_to_manifest_file(path_to_manifest_file_)
Expand Down Expand Up @@ -286,7 +287,11 @@ ManifestFileContent::ManifestFileContent(

const auto file_path_key
= manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>();
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
const auto file_path = getProperFilePathFromMetadataInfo(
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
common_path,
table_location,
common_namespace);

/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ManifestFileContent : public boost::noncopyable
Int64 inherited_sequence_number,
Int64 inherited_snapshot_id,
const std::string & table_location,
const std::string & common_namespace,
DB::ContextPtr context,
const String & path_to_manifest_file_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Iceberg::ManifestFilePtr getManifestFile(
inherited_sequence_number,
inherited_snapshot_id,
persistent_table_components.table_location,
configuration->getNamespace(),
local_context,
filename);
};
Expand Down Expand Up @@ -160,7 +161,10 @@ ManifestFileCacheKeys getManifestList(
const std::string file_path
= manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
const auto manifest_file_name = getProperFilePathFromMetadataInfo(
file_path, configuration_ptr->getPathForRead().path, persistent_table_components.table_location);
file_path,
configuration_ptr->getPathForRead().path,
persistent_table_components.table_location,
configuration_ptr->getNamespace());
Int64 added_sequence_number = 0;
auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id);
if (added_snapshot_id.isNull())
Expand Down
21 changes: 19 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
// Common path should end with "<table_name>" or "<table_name>/".
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
std::string getProperFilePathFromMetadataInfo(
std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace)
{
auto trim_backward_slash = [](std::string_view str) -> std::string_view
{
Expand Down Expand Up @@ -231,7 +235,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
}
else
{
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
/// Data files can have different path
pos = data_path.find("://");
if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
pos = data_path.find('/', pos + 3);
if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
if (data_path.substr(pos + 1).starts_with(common_namespace))
{
auto new_pos = data_path.find('/', pos + 1);
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
pos = new_pos;
}
return std::string(data_path.substr(pos));
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ void writeMessageToFile(
std::function<void()> cleanup,
DB::CompressionMethod compression_method = DB::CompressionMethod::None);

std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
std::string getProperFilePathFromMetadataInfo(
std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace);

struct TransformAndArgument
{
Expand Down
25 changes: 25 additions & 0 deletions tests/integration/test_database_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ def test_create(started_cluster):
node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_name}` VALUES ('AAPL');", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "AAPL\n"


def test_drop_table(started_cluster):
node = started_cluster.instances["node1"]

Expand All @@ -575,3 +576,27 @@ def test_drop_table(started_cluster):

drop_clickhouse_iceberg_table(node, root_namespace, table_name)
assert len(catalog.list_tables(root_namespace)) == 0


def test_table_with_slash(started_cluster):
node = started_cluster.instances["node1"]

# pyiceberg at current moment (version 0.9.1) has a bug with table names with slashes
# see https://github.com/apache/iceberg-python/issues/2462
# so we need to encode it manually
table_raw_suffix = "table/foo"
table_encoded_suffix = "table%2Ffoo"

test_ref = f"test_list_tables_{uuid.uuid4()}"
table_name = f"{test_ref}_{table_raw_suffix}"
table_encoded_name = f"{test_ref}_{table_encoded_suffix}"
root_namespace = f"{test_ref}_namespace"

catalog = load_catalog_impl(started_cluster)
catalog.create_namespace(root_namespace)

create_table(catalog, root_namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER)

create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n"
Loading