
Commit 9f759f2

Merge branch 'releases/24.8.14' into customizations/24.8.14
2 parents: 90cb6cf + 1681701

File tree

10 files changed: +200 -4 lines


src/Core/SettingsEnums.cpp

Lines changed: 7 additions & 0 deletions
@@ -249,4 +249,11 @@ IMPLEMENT_SETTING_ENUM(
     GroupArrayActionWhenLimitReached,
     ErrorCodes::BAD_ARGUMENTS,
     {{"throw", GroupArrayActionWhenLimitReached::THROW}, {"discard", GroupArrayActionWhenLimitReached::DISCARD}})
+
+IMPLEMENT_SETTING_ENUM(
+    SearchOrphanedPartsDisks,
+    ErrorCodes::BAD_ARGUMENTS,
+    {{"any", SearchOrphanedPartsDisks::ANY},
+     {"local", SearchOrphanedPartsDisks::LOCAL},
+     {"none", SearchOrphanedPartsDisks::NONE}})
 }

src/Core/SettingsEnums.h

Lines changed: 8 additions & 0 deletions
@@ -354,4 +354,12 @@ enum class GroupArrayActionWhenLimitReached : uint8_t
 };
 DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
 
+enum class SearchOrphanedPartsDisks : uint8_t
+{
+    NONE,
+    LOCAL,
+    ANY
+};
+
+DECLARE_SETTING_ENUM(SearchOrphanedPartsDisks)
 }

src/Interpreters/DatabaseCatalog.cpp

Lines changed: 21 additions & 1 deletion
@@ -14,6 +14,8 @@
 #include <Databases/DatabaseOnDisk.h>
 #include <Disks/IDisk.h>
 #include <Storages/StorageMemory.h>
+#include <Storages/MergeTree/MergeTreeSettings.h>
+#include <Storages/MergeTree/MergeTreeData.h>
 #include <Core/BackgroundSchedulePool.h>
 #include <Parsers/formatAST.h>
 #include <IO/ReadHelpers.h>
@@ -1370,11 +1372,29 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
         table.table->drop();
     }
 
+    /// Check if we are interested in a particular disk
+    /// or it is better to bypass it, e.g. to avoid interactions with a remote storage.
+    auto is_disk_eligible_for_search = [this](DiskPtr disk, std::shared_ptr<MergeTreeData> storage)
+    {
+        bool is_disk_eligible = !disk->isReadOnly();
+
+        /// Disk is not actually used by the MergeTree table
+        if (is_disk_eligible && storage && !storage->getStoragePolicy()->tryGetVolumeIndexByDiskName(disk->getName()).has_value())
+        {
+            SearchOrphanedPartsDisks mode = storage->getSettings()->search_orphaned_parts_disks;
+            is_disk_eligible = mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote());
+        }
+
+        LOG_TRACE(log, "is disk {} eligible for search: {}", disk->getName(), is_disk_eligible);
+        return is_disk_eligible;
+    };
+
     /// Even if table is not loaded, try remove its data from disks.
     for (const auto & [disk_name, disk] : getContext()->getDisksMap())
     {
         String data_path = "store/" + getPathForUUID(table.table_id.uuid);
-        if (disk->isReadOnly() || !disk->exists(data_path))
+        auto table_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table.table);
+        if (!is_disk_eligible_for_search(disk, table_merge_tree) || !disk->exists(data_path))
             continue;
 
         LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 10 additions & 3 deletions
@@ -1683,6 +1683,14 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
     return loaded_parts;
 }
 
+bool MergeTreeData::isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const
+{
+    SearchOrphanedPartsDisks mode = getSettings()->search_orphaned_parts_disks;
+    bool is_disk_eligible = !disk->isBroken() && !disk->isCustomDisk() && (mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote()));
+
+    LOG_TRACE(log, "is disk {} eligible for search: {} (mode {})", disk->getName(), is_disk_eligible, mode);
+    return is_disk_eligible;
+}
 
 void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
 {
@@ -1697,7 +1705,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
     /// Only check if user did touch storage configuration for this table.
     if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
     {
-        /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
+        /// Check extra (AKA orphaned) parts on different disks, in order not to miss data parts on undefined disks.
         std::unordered_set<String> defined_disk_names;
 
         for (const auto & disk_ptr : disks)
@@ -1731,14 +1739,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
         std::unordered_set<String> skip_check_disks;
         for (const auto & [disk_name, disk] : getContext()->getDisksMap())
         {
-            if (disk->isBroken() || disk->isCustomDisk())
+            if (!isDiskEligibleForOrphanedPartsSearch(disk))
             {
                 skip_check_disks.insert(disk_name);
                 continue;
             }
 
             bool is_disk_defined = defined_disk_names.contains(disk_name);
-
             if (!is_disk_defined && disk->exists(relative_data_path))
             {
                 /// There still a chance that underlying disk is defined in storage policy
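To see where the new helper slots in, the orphaned-parts sanity check in loadDataParts() boils down to the loop sketched below. This is a simplified illustration rather than the actual implementation; defined_disk_names, skip_check_disks and relative_data_path are names taken from the diff, and the disk objects are assumed to expose an exists() check as IDisk does.

# Simplified sketch of the loadDataParts() orphaned-parts check (illustration only).
def find_suspicious_disks(disks_map, defined_disk_names, relative_data_path, is_eligible):
    """Return disks that are outside the table's storage policy yet contain the
    table's data path; such disks may hold orphaned parts. Disks rejected by
    is_eligible (broken, custom, or excluded by search_orphaned_parts_disks)
    are skipped entirely."""
    skip_check_disks = set()
    suspicious = []
    for disk_name, disk in disks_map.items():
        if not is_eligible(disk):
            skip_check_disks.add(disk_name)
            continue
        if disk_name not in defined_disk_names and disk.exists(relative_data_path):
            suspicious.append(disk_name)
    return suspicious, skip_check_disks

With search_orphaned_parts_disks set to "none" every disk is skipped, and with "local" only local disks are inspected, so neither mode touches S3; the integration test below relies on this when MinIO is unavailable.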

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 5 additions & 0 deletions
@@ -1752,6 +1752,11 @@ class MergeTreeData : public IStorage, public WithMutableContext
 
     void checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const;
     void checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const;
+
+    /// Whether the disk should be searched for orphaned parts (ones that belong to the table based on file names,
+    /// but are located on disks that are not part of the table's storage policy).
+    /// Sometimes it is better to bypass a disk, e.g. to avoid interactions with a remote storage.
+    bool isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const;
 };
 
 /// RAII struct to record big parts that are submerging or emerging.

src/Storages/MergeTree/MergeTreeSettings.h

Lines changed: 1 addition & 0 deletions
@@ -195,6 +195,7 @@ struct Settings;
     M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
     M(Bool, enable_block_number_column, false, "Enable persisting column _block_number for each row.", 0) ALIAS(allow_experimental_block_number_column) \
     M(Bool, enable_block_offset_column, false, "Enable persisting column _block_offset for each row.", 0) \
+    M(SearchOrphanedPartsDisks, search_orphaned_parts_disks, SearchOrphanedPartsDisks::ANY, "ClickHouse scans all disks for orphaned parts on any ATTACH or CREATE TABLE, in order not to miss data parts on undefined (not included in the policy) disks. Orphaned parts originate from potentially unsafe storage reconfiguration, e.g. when a disk was excluded from the storage policy. This setting limits the scope of disks to search by disk traits. Possible values: any - the scope is not limited; local - the scope is limited to local disks; none - empty scope, do not search.", 0) \
     \
     /** Experimental/work in progress feature. Unsafe for production. */ \
     M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
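Because search_orphaned_parts_disks is a MergeTree setting, it can be supplied per table in the SETTINGS clause of CREATE TABLE, which is exactly how the integration test below exercises it. The following is a minimal usage sketch in the style of that test; the table name is made up, node stands for a ClickHouseCluster instance, and no_s3 is one of the policies from the test's storage configuration.

# Hypothetical usage, mirroring the integration test below:
# restrict the orphaned-parts scan for this table to local disks only.
def create_table_with_local_scan(node, table_name="example_table"):
    node.query(
        f"""
        CREATE TABLE {table_name} (id Int64, data String)
        ENGINE = MergeTree()
        ORDER BY id
        SETTINGS storage_policy = 'no_s3', search_orphaned_parts_disks = 'local'
        """
    )

Per-table control matters here because the scan happens on every ATTACH or CREATE of the table, so the cost of probing unrelated remote disks can be avoided table by table.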

tests/integration/test_search_orphaned_parts/__init__.py

Whitespace-only changes.
tests/integration/test_search_orphaned_parts/configs/storage_conf.xml

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <default>
                <keep_free_space_bytes>1024</keep_free_space_bytes>
            </default>
            <disk_s3_plain>
                <type>s3_plain</type>
                <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <skip_access_check>true</skip_access_check>
            </disk_s3_plain>
            <local_disk_1>
                <path>/internal/1/</path>
            </local_disk_1>
            <local_disk_2>
                <type>local_blob_storage</type>
                <path>/internal/2/</path>
            </local_disk_2>
            <!-- The plain_rewritable disk is not able to start w/o S3, since it tries to load metadata at start -->
            <!-- <s3_plain_rewritable> -->
            <!--     <type>object_storage</type> -->
            <!--     <object_storage_type>s3</object_storage_type> -->
            <!--     <metadata_type>plain_rewritable</metadata_type> -->
            <!--     <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain_rewritable/</endpoint> -->
            <!--     <access_key_id>minio</access_key_id> -->
            <!--     <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key> -->
            <!--     <skip_access_check>true</skip_access_check> -->
            <!-- </s3_plain_rewritable> -->
            <local_cache>
                <type>cache</type>
                <disk>local_disk_2</disk>
                <path>/local_cache/</path>
                <cache_policy>LRU</cache_policy>
                <max_size>12M</max_size>
                <max_file_segment_size>100K</max_file_segment_size>
                <boundary_alignment>100K</boundary_alignment>
                <cache_on_write_operations>1</cache_on_write_operations>
            </local_cache>
        </disks>
        <policies>
            <default>
                <volumes>
                    <vol1>
                        <disk>default</disk>
                    </vol1>
                </volumes>
            </default>
            <no_s3>
                <volumes>
                    <main>
                        <disk>local_disk_1</disk>
                    </main>
                </volumes>
            </no_s3>
            <local_cache>
                <volumes>
                    <main>
                        <disk>local_cache</disk>
                    </main>
                </volumes>
            </local_cache>
        </policies>
    </storage_configuration>
</clickhouse>
tests/integration/test_search_orphaned_parts/configs/users.xml

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
<clickhouse>
    <profiles>
        <default>
            <enable_s3_requests_logging>1</enable_s3_requests_logging>
            <s3_retry_attempts>5</s3_retry_attempts>
            <s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
            <s3_validate_request_settings>0</s3_validate_request_settings>
        </default>
    </profiles>
</clickhouse>
tests/integration/test_search_orphaned_parts/test.py

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
import logging

import pytest

from helpers.cluster import ClickHouseCluster


def get_cluster(with_minio):
    cluster = ClickHouseCluster(__file__)
    cluster.add_instance(
        "node",
        main_configs=["configs/storage_conf.xml"],
        user_configs=["configs/users.xml"],
        with_minio=with_minio,
        stay_alive=True,
        # remote database disk adds MinIO implicitly
        # FIXME: disable with_remote_database_disk if with_minio is set to False explicitly
    )
    logging.info("Starting cluster...")
    cluster.start()
    logging.info("Cluster started")

    return cluster


# ClickHouse checks for extra (AKA orphaned) parts on different disks, in order not to miss data parts on undefined disks.
# The test verifies how the search for orphaned parts works if there is no connection to MinIO.
# The following is expected:
# * search_orphaned_parts_disks is `none` - does not search s3, the query is successful
# * search_orphaned_parts_disks is `local` - does not search s3, the query is successful
# * search_orphaned_parts_disks is `any` - searches s3, the query throws if there is no MinIO
# Note that disk_s3_plain is a configured disk that is not used in either the no_s3 or the local_cache policy.
@pytest.mark.parametrize("with_minio", [True, False])
def test_search_orphaned_parts(with_minio):
    table_name = "t1"

    try:
        cluster = get_cluster(with_minio)

        node = cluster.instances["node"]

        for search_mode in ["any", "local", "none"]:
            for storage_policy in ["no_s3", "local_cache"]:
                node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

                if search_mode == "any" and not with_minio:
                    assert "Code: 499. DB::Exception" in node.query_and_get_error(
                        f"""
                        CREATE TABLE {table_name} (
                            id Int64,
                            data String
                        ) ENGINE=MergeTree()
                        PARTITION BY id % 10
                        ORDER BY id
                        SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}'
                        """
                    )
                else:
                    node.query(
                        f"""
                        CREATE TABLE {table_name} (
                            id Int64,
                            data String
                        ) ENGINE=MergeTree()
                        PARTITION BY id % 10
                        ORDER BY id
                        SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}'
                        """
                    )
                node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

    finally:
        cluster.shutdown()
