Commit 143c05f

Merge pull request #898 from Altinity/feature/antalya-25.3/iceberg_metadata_file_path_for_swarm
Send iceberg_metadata_file_path setting to swarm node
2 parents 1abef61 + f2f6a91 commit 143c05f

2 files changed: +412 -296 lines changed
Lines changed: 396 additions & 0 deletions
@@ -0,0 +1,396 @@
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StorageFactory.h>
#include <Common/logger_useful.h>
#include <Storages/ColumnsDescription.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSetQuery.h>
#include <Disks/DiskType.h>

#include <memory>
#include <string>

#include <Common/ErrorCodes.h>

#include <fmt/ranges.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int FORMAT_VERSION_TOO_OLD;
    extern const int LOGICAL_ERROR;
}

namespace StorageObjectStorageSetting
{
    extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
    /// Explicit path to the Iceberg metadata file, relative to the table path.
    extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
}


template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
void DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::update(ObjectStoragePtr object_storage, ContextPtr local_context)
{
    BaseStorageConfiguration::update(object_storage, local_context);

    bool existed = current_metadata != nullptr;

    if (updateMetadataObjectIfNeeded(object_storage, local_context))
    {
        if (hasExternalDynamicMetadata() && existed)
        {
            throw Exception(
                ErrorCodes::FORMAT_VERSION_TOO_OLD,
                "Metadata is not consistent with the metadata that was used to infer the table schema. Please retry the query.");
        }
    }
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
std::optional<ColumnsDescription> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::tryGetTableStructureFromMetadata() const
{
    if (!current_metadata)
        return std::nullopt;
    auto schema_from_metadata = current_metadata->getTableSchema();
    if (!schema_from_metadata.empty())
    {
        return ColumnsDescription(std::move(schema_from_metadata));
    }
    return std::nullopt;
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
std::optional<String> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::tryGetSamplePathFromMetadata() const
{
    if (!current_metadata)
        return std::nullopt;
    auto data_files = current_metadata->getDataFiles();
    if (!data_files.empty())
        return data_files[0];
    return std::nullopt;
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
std::optional<size_t> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::totalRows()
{
    if (!current_metadata)
        return {};

    return current_metadata->totalRows();
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
std::shared_ptr<NamesAndTypesList> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::getInitialSchemaByPath(const String & data_path) const
{
    if (!current_metadata)
        return {};
    return current_metadata->getInitialSchemaByPath(data_path);
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
std::shared_ptr<const ActionsDAG> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::getSchemaTransformer(const String & data_path) const
{
    if (!current_metadata)
        return {};
    return current_metadata->getSchemaTransformer(data_path);
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
bool DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::hasExternalDynamicMetadata()
{
    return BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes]
        && current_metadata
        && current_metadata->supportsExternalMetadataChange();
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
ColumnsDescription DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::updateAndGetCurrentSchema(
    ObjectStoragePtr object_storage,
    ContextPtr context)
{
    BaseStorageConfiguration::update(object_storage, context);
    updateMetadataObjectIfNeeded(object_storage, context);
    return ColumnsDescription{current_metadata->getTableSchema()};
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
ReadFromFormatInfo DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::prepareReadingFromFormat(
    ObjectStoragePtr object_storage,
    const Strings & requested_columns,
    const StorageSnapshotPtr & storage_snapshot,
    bool supports_subset_of_columns,
    ContextPtr local_context)
{
    auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
    if (!current_metadata)
    {
        current_metadata = DataLakeMetadata::create(
            object_storage,
            weak_from_this(),
            local_context);
    }
    auto read_schema = current_metadata->getReadSchema();
    if (!read_schema.empty())
    {
        /// There is a difference between the "table schema" and the "read schema".
        /// The "table schema" is the schema from the data lake table metadata,
        /// while the "read schema" is the schema from the data files.
        /// In most cases they are the same.
        /// TODO: Try to hide this logic inside IDataLakeMetadata.

        const auto read_schema_names = read_schema.getNames();
        const auto table_schema_names = current_metadata->getTableSchema().getNames();
        chassert(read_schema_names.size() == table_schema_names.size());

        if (read_schema_names != table_schema_names)
        {
            LOG_TEST(log, "Read schema: {}, table schema: {}, requested columns: {}",
                fmt::join(read_schema_names, ", "),
                fmt::join(table_schema_names, ", "),
                fmt::join(info.requested_columns.getNames(), ", "));

            auto column_name_mapping = [&]()
            {
                std::map<std::string, std::string> result;
                for (size_t i = 0; i < read_schema_names.size(); ++i)
                    result[table_schema_names[i]] = read_schema_names[i];
                return result;
            }();

            /// Go through the requested columns and change each column name
            /// from the table schema to the corresponding name from the read schema.

            std::vector<NameAndTypePair> read_columns;
            for (const auto & column_name : info.requested_columns)
            {
                const auto pos = info.format_header.getPositionByName(column_name.name);
                auto column = info.format_header.getByPosition(pos);
                column.name = column_name_mapping.at(column_name.name);
                info.format_header.setColumn(pos, column);

                read_columns.emplace_back(column.name, column.type);
            }
            info.requested_columns = NamesAndTypesList(read_columns.begin(), read_columns.end());
        }
    }

    return info;
}
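
/// Illustrative example (editorial note, not part of the original commit):
/// suppose a column was renamed in the Iceberg metadata, so the table schema
/// is (id, name) while the data files were written as (id, user_name). For a
/// query requesting "name", column_name_mapping above becomes
/// {"name" -> "user_name"}, and the loop rewrites both format_header and
/// requested_columns to "user_name", so the format reader looks up the
/// physical column name actually stored in the data files.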

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
bool DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::updateMetadataObjectIfNeeded(
    ObjectStoragePtr object_storage,
    ContextPtr context)
{
    if (!current_metadata)
    {
        current_metadata = DataLakeMetadata::create(
            object_storage,
            weak_from_this(),
            context);
        return true;
    }

    if (current_metadata->supportsUpdate())
    {
        return current_metadata->update(context);
    }

    auto new_metadata = DataLakeMetadata::create(
        object_storage,
        weak_from_this(),
        context);

    if (*current_metadata != *new_metadata)
    {
        current_metadata = std::move(new_metadata);
        return true;
    }
    else
    {
        return false;
    }
}

template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
ASTPtr DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::createArgsWithAccessData() const
{
    auto res = BaseStorageConfiguration::createArgsWithAccessData();

    auto iceberg_metadata_file_path = BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path];

    if (iceberg_metadata_file_path.changed)
    {
        auto * arguments = res->template as<ASTExpressionList>();
        if (!arguments)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");

        bool has_settings = false;

        for (auto & arg : arguments->children)
        {
            if (auto * settings = arg->template as<ASTSetQuery>())
            {
                has_settings = true;
                settings->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
                break;
            }
        }

        if (!has_settings)
        {
            std::shared_ptr<ASTSetQuery> settings_ast = std::make_shared<ASTSetQuery>();
            settings_ast->is_standalone = false;
            settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
            arguments->children.push_back(settings_ast);
        }
    }

    return res;
}
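
/// Editorial sketch (not part of the original commit) of the effect of
/// createArgsWithAccessData() above: the injected non-standalone ASTSetQuery
/// serializes into the engine arguments that the initiator ships to a swarm
/// node, so the node reuses the metadata file the initiator already resolved
/// instead of resolving it again. With hypothetical argument values, the
/// shipped table function would look like:
///
///     icebergS3('https://bucket.s3.amazonaws.com/table/', '<key-id>', '<secret>',
///               SETTINGS iceberg_metadata_file_path = 'metadata/v3.metadata.json')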

#if USE_AVRO
# if USE_AWS_S3
template class DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
# endif
# if USE_AZURE_BLOB_STORAGE
template class DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
# endif
# if USE_HDFS
template class DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
# endif
template class DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
#endif
#if USE_PARQUET && USE_AWS_S3
template class DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
#endif
#if USE_PARQUET
template class DataLakeConfiguration<StorageLocalConfiguration, DeltaLakeMetadata>;
#endif
#if USE_AWS_S3
template class DataLakeConfiguration<StorageS3Configuration, HudiMetadata>;
#endif

ObjectStorageType StorageIcebergConfiguration::extractDynamicStorageType(
    ASTs & args, ContextPtr context, ASTPtr * type_arg) const
{
    static const auto storage_type_name = "storage_type";

    if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
    {
        if (named_collection->has(storage_type_name))
        {
            return objectStorageTypeFromString(named_collection->get<String>(storage_type_name));
        }
    }

    auto type_it = args.end();

    /// S3 by default for backward compatibility:
    /// Iceberg without storage_type means IcebergS3.
    ObjectStorageType type = ObjectStorageType::S3;

    for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it)
    {
        const auto * type_ast_function = (*arg_it)->as<ASTFunction>();

        if (type_ast_function && type_ast_function->name == "equals"
            && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2)
        {
            auto name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();

            if (name && name->name() == storage_type_name)
            {
                if (type_it != args.end())
                {
                    throw Exception(
                        ErrorCodes::BAD_ARGUMENTS,
                        "DataLake can have only one key-value argument: storage_type='type'.");
                }

                auto value = type_ast_function->arguments->children[1]->as<ASTLiteral>();

                if (!value)
                {
                    throw Exception(
                        ErrorCodes::BAD_ARGUMENTS,
                        "DataLake parameter 'storage_type' has wrong type, string literal expected.");
                }

                if (value->value.getType() != Field::Types::String)
                {
                    throw Exception(
                        ErrorCodes::BAD_ARGUMENTS,
                        "DataLake parameter 'storage_type' has wrong value type, string expected.");
                }

                type = objectStorageTypeFromString(value->value.safeGet<String>());

                type_it = arg_it;
            }
        }
    }

    if (type_it != args.end())
    {
        if (type_arg)
            *type_arg = *type_it;
        args.erase(type_it);
    }

    return type;
}
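
/// Illustrative example (editorial note, not part of the original commit):
/// given an engine definition such as Iceberg('<url>', storage_type = 'azure'),
/// the loop above recognizes the storage_type key-value argument, validates
/// that its value is a string literal, erases it from args, and returns
/// ObjectStorageType::Azure; when the argument is absent, the type stays S3
/// for backward compatibility.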

void StorageIcebergConfiguration::createDynamicStorage(ObjectStorageType type)
{
    if (impl)
    {
        if (impl->getType() == type)
            return;

        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage");
    }

    switch (type)
    {
# if USE_AWS_S3
        case ObjectStorageType::S3:
            impl = std::make_unique<StorageS3IcebergConfiguration>();
            break;
# endif
# if USE_AZURE_BLOB_STORAGE
        case ObjectStorageType::Azure:
            impl = std::make_unique<StorageAzureIcebergConfiguration>();
            break;
# endif
# if USE_HDFS
        case ObjectStorageType::HDFS:
            impl = std::make_unique<StorageHDFSIcebergConfiguration>();
            break;
# endif
        case ObjectStorageType::Local:
            impl = std::make_unique<StorageLocalIcebergConfiguration>();
            break;
        default:
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported DataLake storage {}", type);
    }
}

StorageObjectStorage::Configuration & StorageIcebergConfiguration::getImpl() const
{
    if (!impl)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized");

    return *impl;
}

ASTPtr StorageIcebergConfiguration::createArgsWithAccessData() const
{
    return getImpl().createArgsWithAccessData();
}

}
