Skip to content

Commit f2f6a91

Browse files
authored
Merge branch 'antalya-25.3' into feature/antalya-25.3/iceberg_metadata_file_path_for_swarm
2 parents 17f79a2 + 1abef61 commit f2f6a91

File tree

6 files changed

+120
-18
lines changed

6 files changed

+120
-18
lines changed

src/Core/Settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5719,7 +5719,7 @@ Query Iceberg table using the specific snapshot id.
57195719
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
57205720
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)
57215721
)", 0) \
5722-
DECLARE(Bool, use_iceberg_partition_pruning, false, R"(
5722+
DECLARE(Bool, use_iceberg_partition_pruning, true, R"(
57235723
Use Iceberg partition pruning for Iceberg tables
57245724
)", 0) \
57255725
DECLARE(Bool, allow_deprecated_snowflake_conversion_functions, false, R"(

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7070
{
7171
// Altinity Antalya modifications atop of 25.3
7272
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
73+
{"use_iceberg_partition_pruning", false, true, "Enable Iceberg partition pruning by default."},
7374
});
7475
addSettingsChanges(settings_changes_history, "25.2.1.20000",
7576
{
@@ -134,7 +135,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
134135
{"distributed_cache_connect_max_tries", 20, 20, "Cloud only"},
135136
{"query_plan_use_new_logical_join_step", false, false, "New join step, internal change"},
136137
{"distributed_cache_min_bytes_for_seek", 0, 0, "New private setting."},
137-
{"use_iceberg_partition_pruning", false, false, "New setting"},
138+
{"use_iceberg_partition_pruning", false, false, "New setting for Iceberg partition pruning."},
138139
{"max_bytes_ratio_before_external_group_by", 0.0, 0.5, "Enable automatic spilling to disk by default."},
139140
{"max_bytes_ratio_before_external_sort", 0.0, 0.5, "Enable automatic spilling to disk by default."},
140141
{"min_external_sort_block_bytes", 0., 100_MiB, "New setting."},

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
1414
#include <Storages/ObjectStorage/StorageObjectStorageSettings.h>
1515
#include <Interpreters/ExpressionActions.h>
16+
#include <IO/CompressedReadBufferWrapper.h>
1617

1718
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
1819
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
@@ -106,10 +107,16 @@ std::string normalizeUuid(const std::string & uuid)
106107
}
107108

108109
Poco::JSON::Object::Ptr
109-
readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log)
110+
readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log, CompressionMethod compression_method)
110111
{
111112
ObjectInfo object_info(metadata_file_path);
112-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
113+
auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
114+
115+
std::unique_ptr<ReadBuffer> buf;
116+
if (compression_method != CompressionMethod::None)
117+
buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
118+
else
119+
buf = std::move(source_buf);
113120

114121
String json_str;
115122
readJSONObjectPossiblyInvalid(json_str, *buf);
@@ -263,7 +270,30 @@ Int32 IcebergMetadata::parseTableSchema(
263270
}
264271
}
265272

266-
static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & path)
273+
struct MetadataFileWithInfo
274+
{
275+
Int32 version;
276+
String path;
277+
CompressionMethod compression_method;
278+
};
279+
280+
static CompressionMethod getCompressionMethodFromMetadataFile(const String & path)
281+
{
282+
constexpr std::string_view metadata_suffix = ".metadata.json";
283+
284+
auto compression_method = chooseCompressionMethod(path, "auto");
285+
286+
/// NOTE you will be surprised, but some metadata files store compression not in the end of the file name,
287+
/// but somewhere in the middle of the file name, before metadata.json suffix.
288+
/// Maybe history of Iceberg metadata files is not so long, but it is already full of surprises.
289+
/// Example of weird engineering decisions: 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json
290+
if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix))
291+
compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto");
292+
293+
return compression_method;
294+
}
295+
296+
static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path)
267297
{
268298
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
269299
String version_str;
@@ -278,7 +308,10 @@ static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & pa
278308
throw Exception(
279309
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
280310

281-
return std::make_pair(std::stoi(version_str), path);
311+
return MetadataFileWithInfo{
312+
.version = std::stoi(version_str),
313+
.path = path,
314+
.compression_method = getCompressionMethodFromMetadataFile(path)};
282315
}
283316

284317
enum class MostRecentMetadataFileSelectionWay
@@ -289,7 +322,7 @@ enum class MostRecentMetadataFileSelectionWay
289322

290323
struct ShortMetadataFileInfo
291324
{
292-
UInt32 version;
325+
Int32 version;
293326
UInt64 last_updated_ms;
294327
String path;
295328
};
@@ -301,7 +334,7 @@ struct ShortMetadataFileInfo
301334
* 1) v<V>.metadata.json, where V - metadata version.
302335
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
303336
*/
304-
static std::pair<Int32, String> getLatestMetadataFileAndVersion(
337+
static MetadataFileWithInfo getLatestMetadataFileAndVersion(
305338
const ObjectStoragePtr & object_storage,
306339
const StorageObjectStorage::Configuration & configuration,
307340
const ContextPtr & local_context,
@@ -324,10 +357,10 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
324357
metadata_files_with_versions.reserve(metadata_files.size());
325358
for (const auto & path : metadata_files)
326359
{
327-
auto [version, metadata_file_path] = getMetadataFileAndVersion(path);
360+
auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path);
328361
if (need_all_metadata_files_parsing)
329362
{
330-
auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log);
363+
auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
331364
if (table_uuid.has_value())
332365
{
333366
if (metadata_file_object->has("table-uuid"))
@@ -377,10 +410,11 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
377410
[](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; });
378411
}
379412
}();
380-
return {latest_metadata_file_info.version, latest_metadata_file_info.path};
413+
414+
return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)};
381415
}
382416

383-
static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(
417+
static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion(
384418
const ObjectStoragePtr & object_storage,
385419
const StorageObjectStorage::Configuration & configuration,
386420
const ContextPtr & local_context,
@@ -425,14 +459,14 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
425459
{
426460
auto configuration_ptr = configuration.lock();
427461

428-
const auto [metadata_version, metadata_file_path]
462+
const auto [metadata_version, metadata_file_path, compression_method]
429463
= getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
430464

431465
bool metadata_file_changed = false;
432466
if (last_metadata_version != metadata_version)
433467
{
434468
last_metadata_version = metadata_version;
435-
last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log);
469+
last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
436470
metadata_file_changed = true;
437471
}
438472

@@ -594,12 +628,18 @@ DataLakeMetadataPtr IcebergMetadata::create(
594628
else
595629
LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false.");
596630

597-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
631+
const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
598632

599633
auto create_fn = [&]()
600634
{
601635
ObjectInfo object_info(metadata_file_path); // NOLINT
602-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
636+
auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
637+
638+
std::unique_ptr<ReadBuffer> buf;
639+
if (compression_method != CompressionMethod::None)
640+
buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
641+
else
642+
buf = std::move(source_buf);
603643

604644
String json_str;
605645
readJSONObjectPossiblyInvalid(json_str, *buf);

tests/integration/test_storage_iceberg/test.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import os
3+
import subprocess
34
import uuid
45
import time
56
from datetime import datetime, timezone
@@ -711,6 +712,26 @@ def add_df(mode):
711712
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
712713
)
713714

715+
# Cluster Query with node1 as coordinator
716+
table_function_expr_cluster = get_creation_expression(
717+
storage_type,
718+
TABLE_NAME,
719+
started_cluster,
720+
table_function=True,
721+
run_on_cluster=True,
722+
)
723+
724+
select_cluster = (
725+
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
726+
)
727+
728+
# Simple size check
729+
assert len(select_regular) == 600
730+
assert len(select_cluster) == 600
731+
732+
# Actual check
733+
assert select_cluster == select_regular
734+
714735
def make_query_from_function(
715736
run_on_cluster=False,
716737
alt_syntax=False,
@@ -3247,3 +3268,43 @@ def check_validity_and_get_prunned_files(select_expression):
32473268

32483269
for query in queries:
32493270
assert check_validity_and_get_prunned_files(query) > 0
3271+
3272+
3273+
@pytest.mark.parametrize("storage_type", ["local", "s3"])
3274+
def test_compressed_metadata(started_cluster, storage_type):
3275+
instance = started_cluster.instances["node1"]
3276+
spark = started_cluster.spark_session
3277+
TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str()
3278+
3279+
table_properties = {
3280+
"write.metadata.compression": "gzip"
3281+
}
3282+
3283+
df = spark.createDataFrame([
3284+
(1, "Alice"),
3285+
(2, "Bob")
3286+
], ["id", "name"])
3287+
3288+
# for some reason write.metadata.compression is not working :(
3289+
df.writeTo(TABLE_NAME) \
3290+
.tableProperty("write.metadata.compression", "gzip") \
3291+
.using("iceberg") \
3292+
.create()
3293+
3294+
# manual compression of metadata file before upload, still test some scenarios
3295+
subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True)
3296+
3297+
# Weird but compression extension is really in the middle of the file name, not in the end...
3298+
subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True)
3299+
3300+
default_upload_directory(
3301+
started_cluster,
3302+
storage_type,
3303+
f"/iceberg_data/default/{TABLE_NAME}/",
3304+
f"/iceberg_data/default/{TABLE_NAME}/",
3305+
)
3306+
3307+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
3308+
3309+
assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
3310+
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
clickhouse.com
1+
<link rel="icon" href="data:image/svg+xml;base64,PHN2ZyB

tests/queries/0_stateless/01528_play.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
44
# shellcheck source=../shell_config.sh
55
. "$CURDIR"/../shell_config.sh
66

7-
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/play" | grep -o -F 'clickhouse.com'
7+
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/play" | grep -o -F '<link rel="icon" href="data:image/svg+xml;base64,PHN2ZyB'

0 commit comments

Comments
 (0)