1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -210,6 +210,7 @@ enum class AccessType : uint8_t
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
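Each `M(...)` entry in this list is consumed by X-macro expansions elsewhere, so the single added line defines the `ALTER_EXPORT_PART` enum value, its SQL spellings (`ALTER EXPORT PART`, `EXPORT PART`), its scope (`TABLE`), and the parent grant (`ALTER_TABLE`) that implies it. A rough standalone model of the pattern, with simplified macro and field names rather than the actual ClickHouse macros:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified model of the X-macro list in AccessType.h: one entry carries the
// name, the SQL aliases, the scope, and the parent grant that implies it.
#define APPLY_FOR_ACCESS_TYPES(M) \
    M(ALTER_MOVE_PARTITION,  "ALTER MOVE PART, MOVE PARTITION, MOVE PART", "TABLE", "ALTER_TABLE") \
    M(ALTER_EXPORT_PART,     "ALTER EXPORT PART, EXPORT PART",             "TABLE", "ALTER_TABLE") \
    M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION",          "TABLE", "ALTER_TABLE")

enum class AccessType : uint8_t
{
#define DECLARE_ENUM(name, aliases, scope, parent) name,
    APPLY_FOR_ACCESS_TYPES(DECLARE_ENUM)
#undef DECLARE_ENUM
};

struct AccessTypeInfo { std::string name, aliases, scope, parent; };

static const std::vector<AccessTypeInfo> access_type_infos = {
#define DECLARE_INFO(name, aliases, scope, parent) {#name, aliases, scope, parent},
    APPLY_FOR_ACCESS_TYPES(DECLARE_INFO)
#undef DECLARE_INFO
};

int main()
{
    // One added M(...) line yields both the enum value and its metadata.
    for (const auto & info : access_type_infos)
        std::cout << info.name << ": aliases [" << info.aliases << "], scope "
                  << info.scope << ", implied by " << info.parent << '\n';
}
```

The real header drives several such expansions (the enum itself, alias parsing, the grant hierarchy), which is why the new grant needs only this one line.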
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake)
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
@@ -10,6 +10,7 @@
M(Merge, "Number of executing background merges") \
M(MergeParts, "Number of source parts participating in current background merges") \
M(Move, "Number of currently executing moves") \
M(Export, "Number of currently executing exports") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \
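CurrentMetrics entries are gauges rather than counters: `Export` goes up while an export is in flight and back down when it finishes, so the metric shows exports running right now. A standalone sketch of the usual scoped-increment pattern; the types here are illustrative, not the actual CurrentMetrics API:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Illustrative gauge: the value reflects operations currently in flight.
std::atomic<int64_t> current_exports{0};

struct ScopedMetricIncrement
{
    std::atomic<int64_t> & metric;
    explicit ScopedMetricIncrement(std::atomic<int64_t> & m) : metric(m) { ++metric; }
    ~ScopedMetricIncrement() { --metric; }
};

void exportPart()
{
    ScopedMetricIncrement guard(current_exports);  // "Export" rises for the duration of the export
    std::cout << "exports in flight: " << current_exports << '\n';
}  // guard destroyed: the gauge drops back

int main()
{
    exportPart();
    std::cout << "exports in flight afterwards: " << current_exports << '\n';
}
```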
6 changes: 6 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -28,6 +28,10 @@
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \
M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \
M(PartsExports, "Number of successful part exports.", ValueType::Number) \
M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \
M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \
M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \
M(FailedQuery, "Number of failed queries.", ValueType::Number) \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \
@@ -171,6 +175,8 @@
M(MergesThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_merges_bandwidth_for_server' throttling.", ValueType::Microseconds) \
M(MutationsThrottlerBytes, "Bytes passed through 'max_mutations_bandwidth_for_server' throttler.", ValueType::Bytes) \
M(MutationsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_mutations_bandwidth_for_server' throttling.", ValueType::Microseconds) \
M(ExportsThrottlerBytes, "Bytes passed through 'max_exports_bandwidth_for_server' throttler.", ValueType::Bytes) \
M(ExportsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_exports_bandwidth_for_server' throttling.", ValueType::Microseconds) \
M(QueryRemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth' throttler.", ValueType::Bytes) \
M(QueryRemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth' throttling.", ValueType::Microseconds) \
M(QueryRemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth' throttler.", ValueType::Bytes) \
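The first hunk adds cumulative counters: successful exports, failed exports, exports rejected because the target already exists, and total wall-clock time spent exporting. The second adds the bytes/sleep pair reported by the new server-wide exports throttler (wired up in Context.cpp below). A simplified standalone sketch of how an export routine might feed the counters; the names and the exact accounting rules are assumptions, not taken from this PR:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// Illustrative counterparts of PartsExports / PartsExportFailures /
// PartsExportDuplicated / PartsExportTotalMilliseconds. Whether a duplicate
// also bumps the generic failure counter is a guess, not taken from the PR.
struct ExportCounters
{
    uint64_t exports = 0, failures = 0, duplicated = 0, total_ms = 0;
};

enum class ExportResult { Ok, Failed, TargetAlreadyExists };

void recordExport(ExportCounters & counters, ExportResult result, std::chrono::milliseconds elapsed)
{
    counters.total_ms += static_cast<uint64_t>(elapsed.count());
    switch (result)
    {
        case ExportResult::Ok:                  ++counters.exports;    break;
        case ExportResult::Failed:              ++counters.failures;   break;
        case ExportResult::TargetAlreadyExists: ++counters.duplicated; break;
    }
}

int main()
{
    ExportCounters counters;
    recordExport(counters, ExportResult::Ok, std::chrono::milliseconds(120));
    recordExport(counters, ExportResult::TargetAlreadyExists, std::chrono::milliseconds(5));
    std::cout << "ok=" << counters.exports << " dup=" << counters.duplicated
              << " failed=" << counters.failures << " total_ms=" << counters.total_ms << '\n';
}
```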
1 change: 1 addition & 0 deletions src/Core/ServerSettings.cpp
@@ -112,6 +112,7 @@ namespace DB
DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \
DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \
DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \
6 changes: 6 additions & 0 deletions src/Core/Settings.cpp
@@ -6854,6 +6854,9 @@ Possible values:
)", 0) \
DECLARE(Bool, use_roaring_bitmap_iceberg_positional_deletes, false, R"(
Use roaring bitmap for iceberg positional deletes.
)", 0) \
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
Overwrite the file if it already exists when exporting a MergeTree part.
)", 0) \
\
/* ####################################################### */ \
@@ -7030,6 +7033,9 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"(
Experimental support for exporting MergeTree parts.
)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \
\
DECLARE(String, promql_database, "", R"(
Specifies the database name used by the 'promql' dialect. Empty string means the current database.
10 changes: 10 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -132,6 +132,16 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
/// RELEASE CLOSED
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
3 changes: 2 additions & 1 deletion src/Databases/DatabaseReplicated.cpp
@@ -2245,7 +2245,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
{
if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter()
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot())
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()
|| alter->isExportPartAlter())
return false;

if (has_many_shards() || !is_replicated_table(query_ptr))
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -129,6 +129,8 @@ struct RelativePathWithMetadata
virtual ~RelativePathWithMetadata() = default;

virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); }
virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); }

virtual std::string getPath() const { return relative_path; }
virtual bool isArchive() const { return false; }
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
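The new `getFileNameWithoutExtension` helper is a thin wrapper over `std::filesystem::path::stem()`, i.e. the file name with its last extension removed. A quick standalone comparison of `filename()` and `stem()`; the example paths are made up:

```cpp
#include <filesystem>
#include <iostream>

int main()
{
    std::filesystem::path relative_path = "data/table_a/all_1_1_0.parquet";

    std::cout << relative_path.filename() << '\n';  // "all_1_1_0.parquet"  -> getFileName()
    std::cout << relative_path.stem() << '\n';      // "all_1_1_0"          -> getFileNameWithoutExtension()

    // stem() strips only the last extension:
    std::cout << std::filesystem::path("part.tar.gz").stem() << '\n';  // "part.tar"
}
```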
15 changes: 15 additions & 0 deletions src/Interpreters/Context.cpp
@@ -35,6 +35,7 @@
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/MergeTree/ExportList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -324,6 +325,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server;
extern const ServerSettingsUInt64 max_merges_bandwidth_for_server;
extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server;
extern const ServerSettingsUInt64 max_exports_bandwidth_for_server;
extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server;
extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server;
extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server;
@@ -503,6 +505,7 @@ struct ContextSharedPart : boost::noncopyable
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
@@ -544,6 +547,8 @@ struct ContextSharedPart : boost::noncopyable
mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations
mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges

mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports

MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk.
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
@@ -1051,6 +1056,9 @@ struct ContextSharedPart : boost::noncopyable

if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server])
merges_throttler = std::make_shared<Throttler>(bandwidth, ProfileEvents::MergesThrottlerBytes, ProfileEvents::MergesThrottlerSleepMicroseconds);

if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server])
exports_throttler = std::make_shared<Throttler>(bandwidth, ProfileEvents::ExportsThrottlerBytes, ProfileEvents::ExportsThrottlerSleepMicroseconds);
}
};

@@ -1208,6 +1216,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ExportsList & Context::getExportsList() { return shared->exports_list; }
const ExportsList & Context::getExportsList() const { return shared->exports_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
@@ -4148,6 +4158,11 @@ ThrottlerPtr Context::getMergesThrottler() const
return shared->merges_throttler;
}

ThrottlerPtr Context::getExportsThrottler() const
{
return shared->exports_throttler;
}

void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const
{
if (read_bandwidth)
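The exports throttler is created only when `max_exports_bandwidth_for_server` is non-zero, is shared server-wide like the merges and mutations throttlers next to it, and reports into the two ExportsThrottler* profile events added above. A simplified standalone model of this kind of bytes-per-second throttle; unlike the real shared Throttler it is single-threaded and does no locking:

```cpp
#include <chrono>
#include <cstdint>
#include <thread>

// Simplified bytes-per-second throttle: callers report how many bytes they
// processed and sleep just long enough to stay under the configured limit.
class SimpleThrottler
{
public:
    explicit SimpleThrottler(uint64_t max_bytes_per_second)
        : max_speed(max_bytes_per_second), start(std::chrono::steady_clock::now()) {}

    void add(uint64_t bytes)
    {
        if (max_speed == 0)
            return;  // zero means unlimited, as for max_exports_bandwidth_for_server

        total_bytes += bytes;
        double elapsed = std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
        double expected = static_cast<double>(total_bytes) / static_cast<double>(max_speed);
        if (expected > elapsed)
            std::this_thread::sleep_for(std::chrono::duration<double>(expected - elapsed));
    }

private:
    uint64_t max_speed;
    uint64_t total_bytes = 0;
    std::chrono::steady_clock::time_point start;
};

int main()
{
    SimpleThrottler exports_throttler(10 * 1024 * 1024);  // cap combined export reads at ~10 MiB/s
    for (int i = 0; i < 3; ++i)
        exports_throttler.add(4 * 1024 * 1024);           // each chunk read by some export
}
```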
5 changes: 5 additions & 0 deletions src/Interpreters/Context.h
@@ -89,6 +89,7 @@ class AsynchronousMetrics;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ExportsList;
class ReplicatedFetchList;
class RefreshSet;
class Cluster;
@@ -1165,6 +1166,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
MovesList & getMovesList();
const MovesList & getMovesList() const;

ExportsList & getExportsList();
const ExportsList & getExportsList() const;

ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;

@@ -1659,6 +1663,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>

ThrottlerPtr getMutationsThrottler() const;
ThrottlerPtr getMergesThrottler() const;
ThrottlerPtr getExportsThrottler() const;

void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;
void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;
3 changes: 2 additions & 1 deletion src/Interpreters/DDLWorker.cpp
@@ -752,7 +752,8 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, const Stora
alter->isFreezeAlter() ||
alter->isUnlockSnapshot() ||
alter->isMovePartitionToDiskOrVolumeAlter() ||
alter->isCommentAlter())
alter->isCommentAlter() ||
alter->isExportPartAlter())
return false;
}

6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
@@ -539,6 +539,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table);
break;
}
case ASTAlterCommand::EXPORT_PART:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::FETCH_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);
4 changes: 3 additions & 1 deletion src/Interpreters/PartLog.cpp
@@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"MovePart", static_cast<Int8>(MOVE_PART)},
{"MergePartsStart", static_cast<Int8>(MERGE_PARTS_START)},
{"MutatePartStart", static_cast<Int8>(MUTATE_PART_START)},
{"ExportPart", static_cast<Int8>(EXPORT_PART)},
}
);

@@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription()
"RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)."
"MutatePartStart — Mutating of a data part has started, "
"MutatePart — Mutating of a data part has finished, "
"MovePart — Moving the data part from the one disk to another one."},
"MovePart — Moving the data part from the one disk to another one."
"ExportPart — Exporting the data part from a MergeTree table into a target table that represents external storage (e.g., object storage or a data lake).."},
{"merge_reason", std::move(merge_reason_datatype),
"The reason for the event with type MERGE_PARTS. Can have one of the following values: "
"NotAMerge — The current event has the type other than MERGE_PARTS, "
1 change: 1 addition & 0 deletions src/Interpreters/PartLog.h
@@ -30,6 +30,7 @@ struct PartLogElement
MOVE_PART = 6,
MERGE_PARTS_START = 7,
MUTATE_PART_START = 8,
EXPORT_PART = 9,
};

/// Copy of MergeAlgorithm since values are written to disk.
2 changes: 2 additions & 0 deletions src/Interpreters/executeDDLQueryOnCluster.cpp
@@ -53,6 +53,8 @@ bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
ASTAlterCommand::ATTACH_PARTITION,
/// Usually followed by ATTACH PARTITION
ASTAlterCommand::FETCH_PARTITION,
/// Data operation that should be executed locally on each replica
ASTAlterCommand::EXPORT_PART,
/// Logical error
ASTAlterCommand::NO_TYPE,
};
28 changes: 28 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
@@ -358,6 +358,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
ostr << quoteString(move_destination_name);
}
}
else if (type == ASTAlterCommand::EXPORT_PART)
{
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << "PART "
<< (settings.hilite ? hilite_none : "");
partition->format(ostr, settings, state, frame);
ostr << " TO ";
switch (move_destination_type)
{
case DataDestinationType::TABLE:
ostr << "TABLE ";
if (!to_database.empty())
{
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table)
<< (settings.hilite ? hilite_none : "");
return;
default:
break;
}

}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
@@ -627,6 +650,11 @@ bool ASTAlterQuery::isMovePartitionToDiskOrVolumeAlter() const
return false;
}

bool ASTAlterQuery::isExportPartAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::EXPORT_PART);
}


/** Get the text that identifies this element. */
String ASTAlterQuery::getID(char delim) const
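The EXPORT_PART branch of `formatImpl` above renders the command back as `EXPORT PART <part> TO TABLE [<db>.]<table>`. A standalone sketch of the same stream-concatenation logic, with highlighting and back-quoting stripped and made-up part/table names:

```cpp
#include <iostream>
#include <sstream>
#include <string>

// Simplified echo of the EXPORT_PART branch of ASTAlterCommand::formatImpl:
// keywords, the part literal and the optional database prefix are appended
// piece by piece to one output stream.
std::string formatExportPart(const std::string & part, const std::string & to_database, const std::string & to_table)
{
    std::ostringstream ostr;
    ostr << "EXPORT PART ";  // trailing space so the part literal does not stick to the keyword
    ostr << '\'' << part << '\'';
    ostr << " TO ";
    ostr << "TABLE ";
    if (!to_database.empty())
        ostr << to_database << '.';
    ostr << to_table;
    return ostr.str();
}

int main()
{
    std::cout << formatExportPart("all_1_1_0", "s3_db", "exported_parts") << '\n';
    // EXPORT PART 'all_1_1_0' TO TABLE s3_db.exported_parts
}
```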
3 changes: 3 additions & 0 deletions src/Parsers/ASTAlterQuery.h
@@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST
FREEZE_ALL,
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,

DELETE,
UPDATE,
@@ -263,6 +264,8 @@ class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCl

bool isMovePartitionToDiskOrVolumeAlter() const;

bool isExportPartAlter() const;

bool isCommentAlter() const;

String getID(char) const override;
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
@@ -332,6 +332,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \