From 3aa85abb260e15ca68e6b5b676f0c931623821f6 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 7 Aug 2025 21:12:50 +0000 Subject: [PATCH 1/3] Merge pull request #84710 from ilejn/search_orphaned_parts Limit disks to search for orphaned parts --- src/Core/SettingsEnums.cpp | 7 ++ src/Core/SettingsEnums.h | 8 +++ src/Interpreters/DatabaseCatalog.cpp | 22 +++++- src/Storages/MergeTree/MergeTreeData.cpp | 13 +++- src/Storages/MergeTree/MergeTreeData.h | 5 ++ src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../test_search_orphaned_parts/__init__.py | 0 .../configs/storage_conf.xml | 66 +++++++++++++++++ .../configs/users.xml | 10 +++ .../test_search_orphaned_parts/test.py | 72 +++++++++++++++++++ 10 files changed, 200 insertions(+), 4 deletions(-) create mode 100644 tests/integration/test_search_orphaned_parts/__init__.py create mode 100644 tests/integration/test_search_orphaned_parts/configs/storage_conf.xml create mode 100644 tests/integration/test_search_orphaned_parts/configs/users.xml create mode 100644 tests/integration/test_search_orphaned_parts/test.py diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 1762022102dd..84487ff5087b 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -249,4 +249,11 @@ IMPLEMENT_SETTING_ENUM( GroupArrayActionWhenLimitReached, ErrorCodes::BAD_ARGUMENTS, {{"throw", GroupArrayActionWhenLimitReached::THROW}, {"discard", GroupArrayActionWhenLimitReached::DISCARD}}) + +IMPLEMENT_SETTING_ENUM( + SearchOrphanedPartsDisks, + ErrorCodes::BAD_ARGUMENTS, + {{"any", SearchOrphanedPartsDisks::ANY}, + {"local", SearchOrphanedPartsDisks::LOCAL}, + {"none", SearchOrphanedPartsDisks::NONE}}) } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d8d77504b85f..2c92fe6f6432 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -354,4 +354,12 @@ enum class GroupArrayActionWhenLimitReached : uint8_t }; DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached) +enum 
class SearchOrphanedPartsDisks : uint8_t +{ + NONE, + LOCAL, + ANY +}; + +DECLARE_SETTING_ENUM(SearchOrphanedPartsDisks) } diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index fb4fad85f66d..bf43688d464b 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include #include #include @@ -1370,11 +1372,29 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) table.table->drop(); } + /// Check if we are interested in a particular disk + /// or it is better to bypass it e.g. to avoid interactions with a remote storage + auto is_disk_eligible_for_search = [this](DiskPtr disk, std::shared_ptr storage) + { + bool is_disk_eligible = !disk->isReadOnly(); + + /// Disk is not actually used by MergeTree table + if (is_disk_eligible && storage && !storage->getStoragePolicy()->tryGetVolumeIndexByDiskName(disk->getName()).has_value()) + { + SearchOrphanedPartsDisks mode = storage->getSettings()->search_orphaned_parts_disks; + is_disk_eligible = mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote()); + } + + LOG_TRACE(log, "is disk {} eligible for search: {}", disk->getName(), is_disk_eligible); + return is_disk_eligible; + }; + /// Even if table is not loaded, try remove its data from disks. 
for (const auto & [disk_name, disk] : getContext()->getDisksMap()) { String data_path = "store/" + getPathForUUID(table.table_id.uuid); - if (disk->isReadOnly() || !disk->exists(data_path)) + auto table_merge_tree = std::dynamic_pointer_cast(table.table); + if (!is_disk_eligible_for_search(disk, table_merge_tree) || !disk->exists(data_path)) continue; LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 76215c0f8673..4061f791763f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1683,6 +1683,14 @@ std::vector MergeTreeData::loadDataPartsFromDisk( return loaded_parts; } +bool MergeTreeData::isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const +{ + SearchOrphanedPartsDisks mode = getSettings()->search_orphaned_parts_disks; + bool is_disk_eligible = !disk->isBroken() && !disk->isCustomDisk() && (mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote())); + + LOG_TRACE(log, "is disk {} eligible for search: {} (mode {})", disk->getName(), is_disk_eligible, mode); + return is_disk_eligible; +} void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional> expected_parts) { @@ -1697,7 +1705,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optionalisDefaultPolicy() && !skip_sanity_checks) { - /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks. + /// Check extra (AKA orphaned) parts on different disks, in order to not allow to miss data parts at undefined disks. 
std::unordered_set defined_disk_names; for (const auto & disk_ptr : disks) @@ -1731,14 +1739,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional skip_check_disks; for (const auto & [disk_name, disk] : getContext()->getDisksMap()) { - if (disk->isBroken() || disk->isCustomDisk()) + if (!isDiskEligibleForOrphanedPartsSearch(disk)) { skip_check_disks.insert(disk_name); continue; } bool is_disk_defined = defined_disk_names.contains(disk_name); - if (!is_disk_defined && disk->exists(relative_data_path)) { /// There still a chance that underlying disk is defined in storage policy diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index dc37d5e7dadf..af14d7c9d98f 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1752,6 +1752,11 @@ class MergeTreeData : public IStorage, public WithMutableContext void checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const; void checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const; + + /// Whether the disk should be searched for orphaned parts (ones that belong to a table based on file names, but are located + /// on disks that are not a part of storage policy of the table). + /// Sometimes it is better to bypass a disk e.g. to avoid interactions with a remote storage + bool isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const; }; /// RAII struct to record big parts that are submerging or emerging. 
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index ea8b92da752e..26110b089101 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -195,6 +195,7 @@ struct Settings; M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \ M(Bool, enable_block_number_column, false, "Enable persisting column _block_number for each row.", 0) ALIAS(allow_experimental_block_number_column) \ M(Bool, enable_block_offset_column, false, "Enable persisting column _block_offset for each row.", 0) \ + M(SearchOrphanedPartsDisks, search_orphaned_parts_disks, SearchOrphanedPartsDisks::ANY, "ClickHouse scans all disks for orphaned parts upon any ATTACH or CREATE table in order to not allow to miss data parts at undefined (not included in policy) disks. Orphaned parts originates from potentially unsafe storage reconfiguration, e.g. if a disk was excluded from storage policy. This setting limits scope of disks to search by traits of the disks. Possible values: - any - scope is not limited, - local - scope is limited by local disks, - none - empty scope, do not search", 0) \ \ /** Experimental/work in progress feature. Unsafe for production. */ \ M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. 
Does not take into account sharding expressions.", 0) \ diff --git a/tests/integration/test_search_orphaned_parts/__init__.py b/tests/integration/test_search_orphaned_parts/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml b/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml new file mode 100644 index 000000000000..28fae70569a6 --- /dev/null +++ b/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml @@ -0,0 +1,66 @@ + + + + + 1024 + + + s3_plain + http://minio1:9001/root/data/disks/disk_s3_plain/ + minio + minio123 + true + + + /internal/1/ + + + local_blob_storage + /internal/2/ + + + + + + + + + + + + + cache + local_disk_2 + /local_cache/ + LRU + 12M + 100K + 100K + 1 + + + + + + + default + + + + + +
+ local_disk_1 +
+
+
+ + +
+ local_cache +
+
+
+
+
+
diff --git a/tests/integration/test_search_orphaned_parts/configs/users.xml b/tests/integration/test_search_orphaned_parts/configs/users.xml new file mode 100644 index 000000000000..5a46eca567df --- /dev/null +++ b/tests/integration/test_search_orphaned_parts/configs/users.xml @@ -0,0 +1,10 @@ + + + + 1 + 5 + 0 + 0 + + + diff --git a/tests/integration/test_search_orphaned_parts/test.py b/tests/integration/test_search_orphaned_parts/test.py new file mode 100644 index 000000000000..654c13501cd4 --- /dev/null +++ b/tests/integration/test_search_orphaned_parts/test.py @@ -0,0 +1,72 @@ +import logging +import pytest + +from helpers.cluster import ClickHouseCluster + + +def get_cluster(with_minio): + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node", + main_configs=["configs/storage_conf.xml"], + user_configs=["configs/users.xml"], + with_minio=with_minio, + stay_alive=True, + # remote database disk adds MinIO implicitly + # FIXME: disable with_remote_database_disk if with_minio set to False explicitly + ) + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + return cluster + + +# ClickHouse checks extra (AKA orphaned) parts on different disks, in order to not allow to miss data parts at undefined disks. +# The test verifies how the search of orphaned parts works if there is no connection to MinIO. +# The following is expected +# * search_orphaned_parts_disks is `none` - does not search s3, the query is successful +# * search_orphaned_parts_disks is `local` - does not search s3, the query is successful +# * search_orphaned_parts_disks is `any` - searches s3, the query throws if no MinIO +# Note that disk_s3_plain is a configured disk that is not used either in no_s3 or local_cache policies. 
+@pytest.mark.parametrize("with_minio", [True, False]) +def test_search_orphaned_parts(with_minio): + table_name = "t1" + + try: + cluster = get_cluster(with_minio) + + node = cluster.instances["node"] + + for search_mode in ["any", "local", "none"]: + for storage_policy in ["no_s3", "local_cache"]: + node.query(f"DROP TABLE IF EXISTS {table_name} SYNC") + + if search_mode == "any" and not with_minio: + assert "Code: 499. DB::Exception" in node.query_and_get_error( + f""" + CREATE TABLE {table_name} ( + id Int64, + data String + ) ENGINE=MergeTree() + PARTITION BY id % 10 + ORDER BY id + SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}' + """ + ) + else: + node.query( + f""" + CREATE TABLE {table_name} ( + id Int64, + data String + ) ENGINE=MergeTree() + PARTITION BY id % 10 + ORDER BY id + SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}' + """ + ) + node.query(f"DROP TABLE IF EXISTS {table_name} SYNC") + + finally: + cluster.shutdown() From 2a08417a7a7d6fad4d9a1eef8cf7561b8173c43d Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 3 Oct 2025 16:09:31 +0000 Subject: [PATCH 2/3] Trigger CI From 41f323ba2338a807988f07507fecbe3af59e2df5 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Thu, 9 Oct 2025 15:33:27 +0200 Subject: [PATCH 3/3] poke CI