Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ci/jobs/functional_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def main():
is_flaky_check = False
is_bugfix_validation = False
is_s3_storage = False
is_azure_storage = False
is_database_replicated = False
is_shared_catalog = False
is_encrypted_storage = True # randomize it
runner_options = ""
info = Info()

Expand All @@ -163,6 +165,8 @@ def main():

if "s3 storage" in to:
is_s3_storage = True
if "azure" in to:
is_azure_storage = True
if "DatabaseReplicated" in to:
is_database_replicated = True
if "SharedCatalog" in to:
Expand All @@ -180,6 +184,10 @@ def main():
print("Disable azure for a local run")
config_installs_args += " --no-azure"

if (is_azure_storage or is_s3_storage) and is_encrypted_storage:
config_installs_args += " --encrypted-storage"
runner_options += f" --encrypted-storage"

ch_path = args.ch_path

stop_watch = Utils.Stopwatch()
Expand Down
28 changes: 3 additions & 25 deletions src/Backups/BackupCoordinationReplicatedTables.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#if CLICKHOUSE_CLOUD
#include <Storages/SharedMergeTree/SharedMergeTreeMutationEntry.h>
#endif
#include <Common/Exception.h>

#include <boost/range/adaptor/map.hpp>
Expand Down Expand Up @@ -292,13 +295,6 @@ void BackupCoordinationReplicatedTables::prepare() const
auto part_info = MergeTreePartInfo::fromPartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
[[maybe_unused]] const auto & partition_id = part_info.getPartitionId();

auto & min_data_versions_by_partition = table_info.min_data_versions_by_partition;
auto it2 = min_data_versions_by_partition.find(partition_id);
if (it2 == min_data_versions_by_partition.end())
min_data_versions_by_partition[partition_id] = part_info.getDataVersion();
else
it2->second = std::min(it2->second, part_info.getDataVersion());

table_info.covered_parts_finder->addPartInfo(std::move(part_info), part_replicas.replica_names[0]);
}

Expand All @@ -310,24 +306,6 @@ void BackupCoordinationReplicatedTables::prepare() const
const auto & chosen_replica_name = *part_replicas.replica_names[chosen_index];
table_info.part_names_by_replica_name[chosen_replica_name].push_back(part_name);
}

/// Remove finished or unrelated mutations.
std::unordered_map<String, String> unfinished_mutations;
for (const auto & [mutation_id, mutation_entry_str] : table_info.mutations)
{
auto mutation_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_entry_str, mutation_id);
std::map<String, Int64> new_block_numbers;
for (const auto & [partition_id, block_number] : mutation_entry.block_numbers)
{
auto it = table_info.min_data_versions_by_partition.find(partition_id);
if ((it != table_info.min_data_versions_by_partition.end()) && (it->second < block_number))
new_block_numbers[partition_id] = block_number;
}
mutation_entry.block_numbers = std::move(new_block_numbers);
if (!mutation_entry.block_numbers.empty())
unfinished_mutations[mutation_id] = mutation_entry.toString();
}
table_info.mutations = unfinished_mutations;
}
catch (Exception & e)
{
Expand Down
1 change: 0 additions & 1 deletion src/Backups/BackupCoordinationReplicatedTables.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ class BackupCoordinationReplicatedTables
std::map<String /* part_name */, PartReplicas> replicas_by_part_name; /// Should be ordered because we need this map to be in the same order on every replica.
mutable std::unordered_map<String /* replica_name> */, Strings> part_names_by_replica_name;
std::unique_ptr<CoveredPartsFinder> covered_parts_finder;
mutable std::unordered_map<String, Int64> min_data_versions_by_partition;
mutable std::unordered_map<String, String> mutations;
String replica_name_to_store_mutations;
std::unordered_set<String> data_paths;
Expand Down
8 changes: 7 additions & 1 deletion src/Backups/BackupEntriesCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ namespace Setting
extern const SettingsUInt64 backup_restore_keeper_retry_max_backoff_ms;
extern const SettingsUInt64 backup_restore_keeper_max_retries;
extern const SettingsSeconds lock_acquire_timeout;

/// Cloud only
extern const SettingsBool cloud_mode;
}

namespace ErrorCodes
Expand Down Expand Up @@ -112,7 +115,10 @@ BackupEntriesCollector::BackupEntriesCollector(
context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
, max_sleep_before_next_attempt_to_collect_metadata(
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
, compare_collected_metadata(
context->getConfigRef().getBool("backups.compare_collected_metadata",
!context->getSettingsRef()[Setting::cloud_mode])) /// Collected metadata shouldn't be compared by default in our Cloud
/// (because in the Cloud only Replicated databases are used)
, log(getLogger("BackupEntriesCollector"))
, zookeeper_retries_info(
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],
Expand Down
4 changes: 4 additions & 0 deletions src/Backups/BackupEntryWrappedWith.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class BackupEntryWrappedWith : public IBackupEntry
/// Forwards `isEncryptedByDisk()` to `entry` and returns its answer unchanged.
bool isEncryptedByDisk() const override
{
    return entry->isEncryptedByDisk();
}
/// Forwards `isFromFile()` to `entry` and returns its answer unchanged.
bool isFromFile() const override
{
    return entry->isFromFile();
}
/// Forwards `isFromImmutableFile()` to `entry` and returns its answer unchanged.
bool isFromImmutableFile() const override
{
    return entry->isFromImmutableFile();
}
/// Forwards `isFromRemoteFile()` to `entry` and returns its answer unchanged.
bool isFromRemoteFile() const override
{
    return entry->isFromRemoteFile();
}
/// Forwards `getEndpointURI()` to `entry` and returns the resulting string.
String getEndpointURI() const override
{
    return entry->getEndpointURI();
}
/// Forwards `getNamespace()` to `entry` and returns the resulting string.
String getNamespace() const override
{
    return entry->getNamespace();
}
/// Forwards `getRemotePath()` to `entry` and returns the resulting string.
String getRemotePath() const override
{
    return entry->getRemotePath();
}
/// Forwards `getFilePath()` to `entry` and returns the resulting string.
String getFilePath() const override
{
    return entry->getFilePath();
}
/// Forwards `getDisk()` to `entry` and returns the disk pointer it yields.
DiskPtr getDisk() const override
{
    return entry->getDisk();
}

Expand Down
6 changes: 6 additions & 0 deletions src/Backups/BackupFileInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ BackupFileInfo buildFileInfoForBackupEntry(
info.size = backup_entry->getSize();
info.encrypted_by_disk = backup_entry->isEncryptedByDisk();

if (backup_entry->isFromRemoteFile())
{
info.object_key = backup_entry->getRemotePath();
return info;
}

/// We don't set `info.data_file_name` and `info.data_file_index` in this function because they're set during backup coordination
/// (see the class BackupCoordinationFileInfos).

Expand Down
23 changes: 19 additions & 4 deletions src/Backups/BackupIO_AzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <Poco/Util/AbstractConfiguration.h>
#include <azure/storage/blobs/blob_options.hpp>
#include <azure/core/context.hpp>

#include <filesystem>

Expand Down Expand Up @@ -65,6 +66,16 @@ static bool compareAuthMethod (AzureBlobStorage::AuthMethod auth_method_a, Azure
Azure::Core::Credentials::TokenRequestContext tokenRequestContext;
return managed_identity_a->get()->GetToken(tokenRequestContext, {}).Token == managed_identity_b->get()->GetToken(tokenRequestContext, {}).Token;
}

const auto * static_credential_a = std::get_if<std::shared_ptr<AzureBlobStorage::StaticCredential>>(&auth_method_a);
const auto * static_credential_b = std::get_if<std::shared_ptr<AzureBlobStorage::StaticCredential>>(&auth_method_b);

if (static_credential_a && static_credential_b)
{
Azure::Core::Credentials::TokenRequestContext tokenRequestContext;
auto az_context = Azure::Core::Context();
return static_credential_a->get()->GetToken(tokenRequestContext, az_context).Token == static_credential_b->get()->GetToken(tokenRequestContext, az_context).Token;
}
}
catch (const Azure::Core::Credentials::AuthenticationException & e)
{
Expand All @@ -83,7 +94,7 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false, ""}
, connection_params(connection_params_)
, blob_path(blob_path_)
{
Expand All @@ -95,8 +106,10 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
connection_params.auth_method,
std::move(client_ptr),
std::move(settings_ptr),
connection_params,
connection_params.getContainer(),
connection_params.getConnectionURL());
connection_params.getConnectionURL(),
/*common_key_prefix*/ "");

client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
Expand Down Expand Up @@ -177,7 +190,7 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
const ContextPtr & context_,
bool attempt_to_create_container)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false, ""}
, connection_params(connection_params_)
, blob_path(blob_path_)
{
Expand All @@ -192,8 +205,10 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
connection_params.auth_method,
std::move(client_ptr),
std::move(settings_ptr),
connection_params,
connection_params.getContainer(),
connection_params.getConnectionURL());
connection_params.getConnectionURL(),
/*common_key_prefix*/ "");


client = object_storage->getAzureBlobStorageClient();
Expand Down
44 changes: 38 additions & 6 deletions src/Backups/BackupIO_S3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ namespace S3AuthSetting
extern const S3AuthSettingsString server_side_encryption_customer_key_base64;
extern const S3AuthSettingsBool use_environment_credentials;
extern const S3AuthSettingsBool use_insecure_imds_request;

extern const S3AuthSettingsString role_arn;
extern const S3AuthSettingsString role_session_name;
extern const S3AuthSettingsString http_client;
extern const S3AuthSettingsString service_account;
extern const S3AuthSettingsString metadata_service;
extern const S3AuthSettingsString request_token_path;
}

namespace S3RequestSetting
Expand All @@ -69,6 +76,8 @@ namespace
const S3::URI & s3_uri,
const String & access_key_id,
const String & secret_access_key,
String role_arn,
String role_session_name,
const S3Settings & settings,
const ContextPtr & context)
{
Expand All @@ -84,6 +93,12 @@ namespace
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const Settings & local_settings = context->getSettingsRef();

if (role_arn.empty())
{
role_arn = settings.auth_settings[S3AuthSetting::role_arn];
role_session_name = settings.auth_settings[S3AuthSetting::role_session_name];
}

S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings[S3AuthSetting::region],
context->getRemoteHostFilter(),
Expand All @@ -108,6 +123,11 @@ namespace
client_configuration.http_max_field_name_size = request_settings[S3RequestSetting::http_max_field_name_size];
client_configuration.http_max_field_value_size = request_settings[S3RequestSetting::http_max_field_value_size];

client_configuration.http_client = settings.auth_settings[S3AuthSetting::http_client];
client_configuration.service_account = settings.auth_settings[S3AuthSetting::service_account];
client_configuration.metadata_service = settings.auth_settings[S3AuthSetting::metadata_service];
client_configuration.request_token_path = settings.auth_settings[S3AuthSetting::request_token_path];

S3::ClientSettings client_settings{
.use_virtual_addressing = s3_uri.is_virtual_hosted_style,
.disable_checksum = local_settings[Setting::s3_disable_checksum],
Expand All @@ -128,7 +148,10 @@ namespace
settings.auth_settings[S3AuthSetting::use_environment_credentials],
settings.auth_settings[S3AuthSetting::use_insecure_imds_request],
settings.auth_settings[S3AuthSetting::expiration_window_seconds],
settings.auth_settings[S3AuthSetting::no_sign_request]
settings.auth_settings[S3AuthSetting::no_sign_request],
std::move(role_arn),
std::move(role_session_name),
/*sts_endpoint_override=*/""
});
}

Expand All @@ -150,14 +173,16 @@ BackupReaderS3::BackupReaderS3(
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
const String & role_arn,
const String & role_session_name,
bool allow_s3_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool is_internal_backup)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false, ""}
{
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());

Expand All @@ -170,7 +195,7 @@ BackupReaderS3::BackupReaderS3(
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;

client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, role_arn, role_session_name, s3_settings, context_);

if (auto blob_storage_system_log = context_->getBlobStorageLog())
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
Expand Down Expand Up @@ -247,6 +272,8 @@ BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
const String & role_arn,
const String & role_session_name,
bool allow_s3_native_copy,
const String & storage_class_name,
const ReadSettings & read_settings_,
Expand All @@ -255,7 +282,7 @@ BackupWriterS3::BackupWriterS3(
bool is_internal_backup)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false, ""}
, s3_capabilities(getCapabilitiesFromConfig(context_->getConfigRef(), "s3"))
{
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());
Expand All @@ -270,7 +297,8 @@ BackupWriterS3::BackupWriterS3(
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;
s3_settings.request_settings[S3RequestSetting::storage_class_name] = storage_class_name;

client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, role_arn, role_session_name, s3_settings, context_);

if (auto blob_storage_system_log = context_->getBlobStorageLog())
{
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
Expand Down Expand Up @@ -308,6 +336,10 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
[&]
{
LOG_TRACE(log, "Falling back to copy file {} from disk {} to S3 through buffers", src_path, src_disk->getName());

if (copy_encrypted)
return src_disk->readEncryptedFile(src_path, read_settings);

return src_disk->readFile(src_path, read_settings);
});
return; /// copied!
Expand All @@ -320,7 +352,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src

void BackupWriterS3::copyFile(const String & destination, const String & source, size_t size)
{
LOG_TRACE(log, "Copying file inside backup from {} to {} ", source, destination);
LOG_TRACE(log, "Copying file inside backup from {} to {}", source, destination);

const auto source_key = fs::path(s3_uri.key) / source;
copyS3File(
Expand Down
4 changes: 4 additions & 0 deletions src/Backups/BackupIO_S3.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class BackupReaderS3 : public BackupReaderDefault
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
const String & role_arn,
const String & role_session_name,
bool allow_s3_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
Expand Down Expand Up @@ -54,6 +56,8 @@ class BackupWriterS3 : public BackupWriterDefault
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
const String & role_arn,
const String & role_session_name,
bool allow_s3_native_copy,
const String & storage_class_name,
const ReadSettings & read_settings_,
Expand Down
Loading
Loading