Skip to content

Commit 35e5d83

Browse files
committed
another cleaned attempt
fix schema id cache population correct splitting of URI into parts tmp: lazy way to debug cluster fix storage schema normalization store per-file object_storage_ptr in object_info tmp upd upd fix for local make new storages properly remove cloneObjectStorage
1 parent 89e8c2a commit 35e5d83

File tree

8 files changed

+262
-167
lines changed

8 files changed

+262
-167
lines changed

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,16 @@ struct PathWithMetadata
125125
std::optional<ObjectMetadata> metadata;
126126
CommandInTaskResponse command;
127127
String absolute_path;
128+
std::optional<ObjectStoragePtr> object_storage_to_use = std::nullopt;
128129

129130
PathWithMetadata() = default;
130131

131-
explicit PathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt, String absolute_path_ = "")
132+
explicit PathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt, String absolute_path_ = "", std::optional<ObjectStoragePtr> object_storage_to_use_ = std::nullopt)
132133
: metadata(std::move(metadata_))
133134
, command(task_string)
135+
, absolute_path(absolute_path_)
136+
, object_storage_to_use(object_storage_to_use_)
134137
{
135-
absolute_path = absolute_path_;
136138
if (!command.is_parsed())
137139
{
138140
relative_path = task_string;
@@ -152,6 +154,8 @@ struct PathWithMetadata
152154

153155
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
154156
const CommandInTaskResponse & getCommand() const { return command; }
157+
158+
std::optional<ObjectStoragePtr> getObjectStorage() const { return object_storage_to_use; }
155159
};
156160

157161
struct ObjectKeyWithMetadata
@@ -291,14 +295,6 @@ class IObjectStorage
291295
/// buckets in S3. If object storage doesn't have any namepaces return empty string.
292296
virtual String getObjectsNamespace() const = 0;
293297

294-
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(
295-
const std::string &,
296-
const Poco::Util::AbstractConfiguration &,
297-
const std::string &, ContextPtr)
298-
{
299-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'cloneObjectStorage' is not implemented");
300-
}
301-
302298
/// Generate blob name for passed absolute local path.
303299
/// Path can be generated either independently or based on `path`.
304300
virtual ObjectStorageKey generateObjectKeyForPath(const std::string & path, const std::optional<std::string> & key_prefix) const = 0;

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
356356
IDataLakeMetadata * getExternalMetadata() override { return getImpl().getExternalMetadata(); }
357357

358358
std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(ContextPtr context, const String & path) const override
359-
{ return getImpl().getInitialSchemaByPath(context, path); }
359+
{ return getImpl().getInitialSchemaByPath(context, path); }
360360

361361
std::shared_ptr<const ActionsDAG> getSchemaTransformer(ContextPtr context, const String & path) const override
362362
{ return getImpl().getSchemaTransformer(context, path); }

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 36 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,12 @@ std::optional<Int32> IcebergMetadata::getSchemaVersionByFileIfOutdated(String da
866866
{
867867
auto schema_id_it = schema_id_by_data_file.find(data_path);
868868
if (schema_id_it == schema_id_by_data_file.end())
869-
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file for data file: {}", data_path);
869+
{
870+
std::string error_msg = "";
871+
for (const auto & sch : schema_id_by_data_file)
872+
error_msg += "Schema id: " + std::to_string(sch.second) + " for file: " + sch.first + "\n";
873+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file for data file: {}.\n Contents:\n{}", data_path, error_msg);
874+
}
870875

871876
auto schema_id = schema_id_it->second;
872877
if (schema_id == relevant_snapshot_schema_id)
@@ -908,7 +913,7 @@ void IcebergMetadata::initializeSchemasFromManifestList(ContextPtr local_context
908913
for (const auto & manifest_file_entry : manifest_file_ptr->getFiles())
909914
{
910915
if (std::holds_alternative<DataFileEntry>(manifest_file_entry.file))
911-
schema_id_by_data_file.emplace(std::get<DataFileEntry>(manifest_file_entry.file).file_name, manifest_file_ptr->getSchemaId());
916+
schema_id_by_data_file.emplace(Iceberg::makeAbsolutePath(table_location, std::get<DataFileEntry>(manifest_file_entry.file).file_name), manifest_file_ptr->getSchemaId());
912917
}
913918
}
914919

@@ -921,56 +926,32 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
921926
if (configuration_ptr == nullptr)
922927
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
923928

929+
const String full_filename = Iceberg::makeAbsolutePath(table_location, filename);
930+
924931
auto create_fn = [&]()
925932
{
926-
auto base_table_uri_parsed = Iceberg::parseUri(table_location);
927-
auto file_uri_parsed = Iceberg::parseUri(filename);
928-
929-
ObjectStoragePtr storage_to_use = object_storage;
930-
String key_or_path = filename;
931-
932-
if (!file_uri_parsed.scheme.empty() && !base_table_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme)
933-
{
934-
if (file_uri_parsed.authority == base_table_uri_parsed.authority)
935-
{
936-
// Same namespace as table_location -> use primary storage and strip leading '/'
937-
key_or_path = file_uri_parsed.path;
938-
if (!key_or_path.empty() && key_or_path.front() == '/')
939-
key_or_path.erase(0, 1);
940-
}
941-
else if (!file_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority.empty())
942-
{
943-
// Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
944-
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
945-
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
946-
storage_to_use = ObjectStoragePtr(cloned.release());
947-
948-
key_or_path = file_uri_parsed.path;
949-
if (!key_or_path.empty() && key_or_path.front() == '/')
950-
key_or_path.erase(0, 1);
951-
}
952-
}
953-
// TODO: what if storage type is different?
933+
auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath(
934+
table_location, filename, object_storage, secondary_storages, local_context);
954935

955-
StorageObjectStorage::ObjectInfo object_info(key_or_path, std::nullopt, filename);
936+
StorageObjectStorage::ObjectInfo object_info(key, std::nullopt, full_filename);
956937

957938
auto read_settings = local_context->getReadSettings();
958939
/// Do not utilize filesystem cache if more precise cache enabled
959940
if (manifest_cache)
960941
read_settings.enable_filesystem_cache = false;
961942

962943
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, storage_to_use, local_context, log, read_settings);
963-
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), file_uri_parsed.path, getFormatSettings(local_context));
944+
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), key, getFormatSettings(local_context));
964945

965946
ManifestFileCacheKeys manifest_file_cache_keys;
966947

967948
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
968949
{
969-
const std::string manifest_file_name = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
950+
const std::string manifest_file_path = Iceberg::makeAbsolutePath(table_location, manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>());
970951
Int64 added_sequence_number = 0;
971952
if (format_version > 1)
972953
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
973-
manifest_file_cache_keys.emplace_back(manifest_file_name, added_sequence_number);
954+
manifest_file_cache_keys.emplace_back(manifest_file_path, added_sequence_number);
974955
}
975956
/// We only return the list of {file name, seq number} for cache.
976957
/// Because ManifestList holds a list of ManifestFilePtr which consume much memory space.
@@ -980,7 +961,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
980961

981962
ManifestFileCacheKeys manifest_file_cache_keys;
982963
if (manifest_cache)
983-
manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn);
964+
manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn);
984965
else
985966
manifest_file_cache_keys = create_fn();
986967
return manifest_file_cache_keys;
@@ -1080,46 +1061,25 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
10801061
{
10811062
auto configuration_ptr = configuration.lock();
10821063

1064+
const String full_filename = Iceberg::makeAbsolutePath(table_location, filename);
1065+
10831066
auto create_fn = [&]()
10841067
{
1085-
auto base_table_uri_parsed = Iceberg::parseUri(table_location);
1086-
auto file_uri_parsed = Iceberg::parseUri(filename);
1068+
// Select proper storage and key for the manifest file
1069+
auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath(
1070+
table_location, full_filename, object_storage, secondary_storages, local_context);
10871071

1088-
ObjectStoragePtr storage_to_use = object_storage;
1089-
String key_or_path = filename;
10901072

1091-
if (!file_uri_parsed.scheme.empty() && !base_table_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme)
1092-
{
1093-
if (file_uri_parsed.authority == base_table_uri_parsed.authority)
1094-
{
1095-
// Same namespace as table_location -> use primary storage and strip leading '/'
1096-
key_or_path = file_uri_parsed.path;
1097-
if (!key_or_path.empty() && key_or_path.front() == '/')
1098-
key_or_path.erase(0, 1);
1099-
}
1100-
else if (!file_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority.empty())
1101-
{
1102-
// Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
1103-
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
1104-
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
1105-
storage_to_use = ObjectStoragePtr(cloned.release());
1106-
1107-
key_or_path = file_uri_parsed.path;
1108-
if (!key_or_path.empty() && key_or_path.front() == '/')
1109-
key_or_path.erase(0, 1);
1110-
}
1111-
}
1112-
1113-
ObjectInfo manifest_object_info(key_or_path, std::nullopt, filename);
1073+
ObjectInfo manifest_object_info(key, std::nullopt, full_filename);
11141074

11151075
auto read_settings = local_context->getReadSettings();
11161076
/// Do not utilize filesystem cache if more precise cache enabled
11171077
if (manifest_cache)
11181078
read_settings.enable_filesystem_cache = false;
11191079

11201080
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings);
1121-
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context));
1122-
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, filename);
1081+
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), full_filename, getFormatSettings(local_context));
1082+
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, full_filename);
11231083
schema_processor.addIcebergTableSchema(schema_object);
11241084
return std::make_shared<ManifestFileContent>(
11251085
manifest_file_deserializer,
@@ -1128,12 +1088,13 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
11281088
schema_object,
11291089
schema_processor,
11301090
inherited_sequence_number,
1091+
table_location,
11311092
local_context);
11321093
};
11331094

11341095
if (manifest_cache)
11351096
{
1136-
auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn);
1097+
auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn);
11371098
schema_processor.addIcebergTableSchema(manifest_file->getSchemaObject());
11381099
return manifest_file;
11391100
}
@@ -1275,12 +1236,14 @@ class IcebergKeysIterator : public IObjectIterator
12751236
public:
12761237
IcebergKeysIterator(
12771238
Strings && data_files_,
1239+
const std::string & table_location_,
12781240
ObjectStoragePtr object_storage_,
12791241
std::map<String, ObjectStoragePtr> & secondary_storages_,
12801242
IDataLakeMetadata::FileProgressCallback callback_,
12811243
ContextPtr local_context_,
12821244
StorageObjectStorage::ConfigurationPtr configuration_ptr_)
12831245
: data_files(data_files_)
1246+
, table_location(table_location_)
12841247
, object_storage(object_storage_)
12851248
, secondary_storages(secondary_storages_)
12861249
, callback(callback_)
@@ -1300,33 +1263,23 @@ class IcebergKeysIterator : public IObjectIterator
13001263
if (current_index >= data_files.size())
13011264
return nullptr;
13021265

1303-
auto file_uri = data_files[current_index];
1304-
auto file_uri_parsed = Iceberg::parseUri(file_uri);
1305-
1306-
ObjectStoragePtr storage_to_use;
1266+
const auto & raw_path = data_files[current_index];
13071267

1308-
if (file_uri_parsed.authority == object_storage->getObjectsNamespace())
1309-
storage_to_use = object_storage;
1310-
else if (secondary_storages.contains(file_uri_parsed.authority))
1311-
storage_to_use = secondary_storages.at(file_uri_parsed.authority);
1312-
else
1313-
{
1314-
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
1315-
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
1316-
storage_to_use = ObjectStoragePtr(cloned.release());
1317-
secondary_storages[file_uri_parsed.authority] = storage_to_use;
1318-
}
1268+
// Route to correct storage
1269+
auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath(
1270+
table_location, raw_path, object_storage, secondary_storages, local_context);
13191271

1320-
auto object_metadata = storage_to_use->getObjectMetadata(file_uri_parsed.path);
1272+
auto object_metadata = storage_to_use->getObjectMetadata(key);
13211273

13221274
if (callback)
13231275
callback(FileProgress(0, object_metadata.size_bytes));
13241276

1325-
return std::make_shared<ObjectInfo>(file_uri_parsed.path, std::move(object_metadata), file_uri);
1277+
return std::make_shared<ObjectInfo>(key, std::move(object_metadata), raw_path, storage_to_use);
13261278
}
13271279

13281280
private:
13291281
Strings data_files;
1282+
const String table_location;
13301283
ObjectStoragePtr object_storage;
13311284
std::map<String, ObjectStoragePtr> & secondary_storages;
13321285
std::atomic<size_t> index = 0;
@@ -1342,7 +1295,7 @@ ObjectIterator IcebergMetadata::createIcebergKeysIterator(
13421295
IDataLakeMetadata::FileProgressCallback callback_,
13431296
ContextPtr local_context)
13441297
{
1345-
return std::make_shared<IcebergKeysIterator>(std::move(data_files_), object_storage, secondary_storages, callback_, local_context, configuration.lock());
1298+
return std::make_shared<IcebergKeysIterator>(std::move(data_files_), table_location, object_storage, secondary_storages, callback_, local_context, configuration.lock());
13461299
}
13471300

13481301
ObjectIterator IcebergMetadata::iterate(

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ ManifestFileContent::ManifestFileContent(
126126
Poco::JSON::Object::Ptr schema_object_,
127127
const IcebergSchemaProcessor & schema_processor,
128128
Int64 inherited_sequence_number,
129+
std::string table_location_,
129130
DB::ContextPtr context)
130131
{
132+
this->table_location = std::move(table_location_);
131133
this->schema_id = schema_id_;
132134
this->schema_object = schema_object_;
133135

@@ -280,7 +282,7 @@ ManifestFileContent::ManifestFileContent(
280282
columns_infos[column_id].hyperrectangle.emplace(*left, true, *right, true);
281283
}
282284

283-
FileEntry file = FileEntry{DataFileEntry{file_path}};
285+
FileEntry file = FileEntry{DataFileEntry{Iceberg::makeAbsolutePath(table_location, file_path)}};
284286

285287
Int64 added_sequence_number = 0;
286288
if (format_version_ > 1)

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class ManifestFileContent
9292
Poco::JSON::Object::Ptr schema_object_,
9393
const DB::IcebergSchemaProcessor & schema_processor,
9494
Int64 inherited_sequence_number,
95+
std::string table_location_,
9596
DB::ContextPtr context);
9697

9798
const std::vector<ManifestFileEntry> & getFiles() const;
@@ -121,6 +122,8 @@ class ManifestFileContent
121122

122123
std::set<Int32> column_ids_which_have_bounds;
123124

125+
std::string table_location;
126+
124127
};
125128

126129
using ManifestFilePtr = std::shared_ptr<const ManifestFileContent>;

0 commit comments

Comments
 (0)