10 changes: 8 additions & 2 deletions src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
{
public:
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
size_t size() const override { return arguments ? arguments->size() : 0; }
std::unique_ptr<Argument> at(size_t n) const override { return std::make_unique<ArgumentTreeNode>(arguments->at(n).get()); }
size_t size() const override
{ /// size without skipped indexes
return arguments ? arguments->size() - skippedSize() : 0;
}
std::unique_ptr<Argument> at(size_t n) const override
{ /// n is a relative index; some positions can be skipped
return std::make_unique<ArgumentTreeNode>(arguments->at(getRealIndex(n)).get());
}
private:
const QueryTreeNodes * arguments = nullptr;
};
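The helpers `skippedSize()` and `getRealIndex(n)` are referenced here but defined outside this hunk (presumably on the `AbstractFunction` base, elsewhere in the PR). A minimal sketch of the mapping they would need to implement, assuming the skipped positions are tracked in an ordered set:

```cpp
#include <cstddef>
#include <set>

/// Sketch only (not from the PR): relative-to-real index mapping when some
/// argument positions are skipped. `skipped_indexes` is an assumed member.
struct SkippedIndexMappingSketch
{
    std::set<size_t> skipped_indexes;

    size_t skippedSize() const { return skipped_indexes.size(); }

    /// n counts only non-skipped arguments; caller guarantees n < size().
    size_t getRealIndex(size_t n) const
    {
        size_t real = 0;
        while (true)
        {
            if (!skipped_indexes.contains(real))
            {
                if (n == 0)
                    return real;
                --n;
            }
            ++real;
        }
    }
};
```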
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
@@ -6970,6 +6970,15 @@ Enable PRQL - an alternative to SQL.
)", EXPERIMENTAL) \
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
Trigger processors to spill data into external storage adaptively. Grace join is supported at present.
)", EXPERIMENTAL) \
DECLARE(String, object_storage_cluster, "", R"(
Cluster used to make distributed requests to object storages with the alternative syntax.
)", EXPERIMENTAL) \
DECLARE(UInt64, object_storage_max_nodes, 0, R"(
Limit on the number of hosts used for a request in object storage cluster table functions (azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.).
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, true, R"(
Allow experimental delta-kernel-rs implementation.
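Taken together, the two new settings name an execution cluster and cap how many of its hosts a single request may use. A hedged sketch of a consumer, assuming the generated `Setting::` constants and a `ContextPtr context` in scope (the real wiring lives in the storage layer, not in this file):

```cpp
/// Sketch under assumptions: Setting::object_storage_cluster and
/// Setting::object_storage_max_nodes are generated from the DECLAREs above.
const auto & settings = context->getSettingsRef();
const String cluster_name = settings[Setting::object_storage_cluster].value;
const UInt64 max_hosts = settings[Setting::object_storage_max_nodes];

if (!cluster_name.empty())
{
    /// Treat every replica as an independent shard, then cap the host count
    /// (0 means "all hosts in cluster", per the documentation above).
    auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(
        settings, /* max_replicas_from_shard */ 0, max_hosts);
    /// ... dispatch the read over cluster->getShardsInfo() ...
}
```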
9 changes: 9 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -133,6 +133,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
/// RELEASE CLOSED
});
addSettingsChanges(settings_changes_history, "25.6.5.2000",
{
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.6",
{
/// RELEASE CLOSED
17 changes: 10 additions & 7 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -22,6 +22,7 @@
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
@@ -47,6 +48,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString oauth_server_uri;
extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body;
extern const DatabaseDataLakeSettingsBool vended_credentials;
extern const DatabaseDataLakeSettingsString object_storage_cluster;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
@@ -176,7 +178,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
return catalog_impl;
}

std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfiguration(
StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const
{
@@ -423,24 +425,25 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
StorageObjectStorageConfiguration::initialize(*configuration, args, context_copy, /* with_table_structure */false);
configuration->initialize(args, context_copy, /* with_table_structure */false);

return std::make_shared<StorageObjectStorage>(
auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;

return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
context_copy,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* partition_by */nullptr,
context_copy,
/* comment */"",
getFormatSettings(context_copy),
LoadingStrictnessLevel::CREATE,
getCatalog(),
/* if_not_exists*/true,
/* is_datalake_query*/true,
/* distributed_processing */false,
/* partition_by */nullptr,
/* is_table_function */false,
/* lazy_init */true);
}

2 changes: 1 addition & 1 deletion src/Databases/DataLake/DatabaseDataLake.h
@@ -88,7 +88,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
void validateSettings();
std::shared_ptr<DataLake::ICatalog> getCatalog() const;

std::shared_ptr<StorageObjectStorageConfiguration> getConfiguration(
StorageObjectStorageConfigurationPtr getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const;

2 changes: 1 addition & 1 deletion src/Databases/DataLake/GlueCatalog.cpp
@@ -451,7 +451,7 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false);
configuration->initialize(args, getContext(), false);

auto object_storage = configuration->createObjectStorage(getContext(), true);
const auto & read_settings = getContext()->getReadSettings();
64 changes: 44 additions & 20 deletions src/Disks/DiskType.cpp
@@ -12,7 +12,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

MetadataStorageType metadataTypeFromString(const String & type)
MetadataStorageType metadataTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "local")
@@ -60,25 +60,49 @@ std::string DataSourceDescription::toString() const
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
{
switch (object_storage_type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
case ObjectStorageType::Max:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
}
}
return DB::toString(object_storage_type);
}
}

ObjectStorageType objectStorageTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "s3")
return ObjectStorageType::S3;
if (check_type == "hdfs")
return ObjectStorageType::HDFS;
if (check_type == "azure_blob_storage" || check_type == "azure")
return ObjectStorageType::Azure;
if (check_type == "local_blob_storage" || check_type == "local")
return ObjectStorageType::Local;
if (check_type == "web")
return ObjectStorageType::Web;
if (check_type == "none")
return ObjectStorageType::None;

throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"Unknown object storage type: {}", type);
}

std::string toString(ObjectStorageType type)
{
switch (type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
case ObjectStorageType::Max:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
}
}

}
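The refactoring makes `toString(ObjectStorageType)` and the new `objectStorageTypeFromString` round-trip for every concrete type, with the parser additionally accepting the short aliases. An illustrative check, not part of the PR:

```cpp
#include <cassert>

void objectStorageTypeRoundTrip()
{
    using DB::ObjectStorageType;

    /// toString() output is always parseable back to the same enum value.
    assert(DB::objectStorageTypeFromString(DB::toString(ObjectStorageType::S3)) == ObjectStorageType::S3);
    assert(DB::objectStorageTypeFromString(DB::toString(ObjectStorageType::Web)) == ObjectStorageType::Web);

    /// Short aliases are accepted on the parsing side only.
    assert(DB::objectStorageTypeFromString("azure") == ObjectStorageType::Azure);
    assert(DB::objectStorageTypeFromString("local") == ObjectStorageType::Local);
}
```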
6 changes: 4 additions & 2 deletions src/Disks/DiskType.h
@@ -36,8 +36,10 @@ enum class MetadataStorageType : uint8_t
Memory,
};

MetadataStorageType metadataTypeFromString(const String & type);
String toString(DataSourceType data_source_type);
MetadataStorageType metadataTypeFromString(const std::string & type);

ObjectStorageType objectStorageTypeFromString(const std::string & type);
std::string toString(ObjectStorageType type);

struct DataSourceDescription
{
37 changes: 31 additions & 6 deletions src/Interpreters/Cluster.cpp
@@ -732,9 +732,9 @@ void Cluster::initMisc()
}
}

std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
}

std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@@ -783,7 +783,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &

}

Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -805,6 +805,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti

if (address.is_local)
info.local_addresses.push_back(address);
addresses_with_failover.emplace_back(Addresses({address}));

auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings[Setting::distributed_connections_pool_size]),
@@ -828,9 +829,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.per_replica_pools = {std::move(pool)};
info.default_database = address.default_database;

addresses_with_failover.emplace_back(Addresses{address});

slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@@ -852,10 +850,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
secret = from.secret;
name = from.name;

constrainShardInfoAndAddressesToMaxHosts(max_hosts);

for (size_t i = 0; i < shards_info.size(); ++i)
slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);

initMisc();
}


void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
{
if (max_hosts == 0 || shards_info.size() <= max_hosts)
return;

pcg64_fast gen{randomSeed()};
std::shuffle(shards_info.begin(), shards_info.end(), gen);
shards_info.resize(max_hosts);

AddressesWithFailover addresses_with_failover_;

UInt32 shard_num = 0;
for (auto & shard_info : shards_info)
{
addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
shard_info.shard_num = ++shard_num;
}

addresses_with_failover.swap(addresses_with_failover_);
}


Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t> & indices)
{
for (size_t index : indices)
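Note the invariant `constrainShardInfoAndAddressesToMaxHosts` restores: after the shuffle and truncation, `shard_num` values are renumbered to stay contiguous and 1-based, and `addresses_with_failover` is rebuilt in matching order. A self-contained analogue on plain types (illustration only, not the actual ClickHouse classes):

```cpp
#include <algorithm>
#include <cstdint>
#include <random>
#include <string>
#include <vector>

struct MiniShard { uint32_t shard_num; };

/// Mirrors the shuffle-truncate-renumber logic of the member function above.
void constrainToMaxHosts(std::vector<MiniShard> & shards,
                         std::vector<std::string> & addresses,
                         size_t max_hosts)
{
    if (max_hosts == 0 || shards.size() <= max_hosts)
        return; /// 0 means "no limit", mirroring the setting's semantics

    std::mt19937_64 gen{std::random_device{}()};
    std::shuffle(shards.begin(), shards.end(), gen);
    shards.resize(max_hosts);

    std::vector<std::string> kept;
    uint32_t shard_num = 0;
    for (auto & shard : shards)
    {
        kept.push_back(addresses[shard.shard_num - 1]); /// old 1-based index
        shard.shard_num = ++shard_num;                  /// renumber contiguously
    }
    addresses.swap(kept);
}
```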
7 changes: 5 additions & 2 deletions src/Interpreters/Cluster.h
@@ -270,7 +270,7 @@ class Cluster
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;

/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;

/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@@ -296,7 +296,7 @@

/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);

void addShard(
const Settings & settings,
@@ -308,6 +308,9 @@ class Cluster
ShardInfoInsertPathForInternalReplication insert_paths = {},
bool internal_replication = false);

/// Reduce the size of the cluster to at most max_hosts hosts
void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);

/// Inter-server secret
String secret;

3 changes: 1 addition & 2 deletions src/Interpreters/InterpreterCreateQuery.cpp
@@ -1937,8 +1937,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());

if (!table_function->canBeUsedToCreateTable())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", table_function->getName());
table_function->validateUseToCreateTable();

/// In case of CREATE AS table_function() query we should use global context
/// in storage creation because there will be no query context on server startup
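The inline check is replaced by a validation hook, so individual table functions can raise more specific errors. A plausible default implementation, inferred from the code this diff removes rather than taken from the PR:

```cpp
/// Sketch: default validation equivalent to the removed call-site check.
void ITableFunction::validateUseToCreateTable() const
{
    if (!canBeUsedToCreateTable())
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Table function '{}' cannot be used to create a table", getName());
}
```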