diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index fc5cf2cb4e06..843d8ee7a72f 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -210,6 +210,7 @@ enum class AccessType : uint8_t enabled implicitly by the grant ALTER_TABLE */\ M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\ M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \ + M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \ M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dcf07466941..411942d62a60 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure) add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) add_headers_and_sources(dbms Storages/ObjectStorage/Local) +add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 8b65070a5fde..da72fd5f4653 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -10,6 +10,7 @@ M(Merge, "Number of executing background merges") \ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ + M(Export, "Number of currently executing exports") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 1839ae632e47..610b732cafd2 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -28,6 +28,10 @@ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \ M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \ M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \ + M(PartsExports, "Number of successful part exports.", ValueType::Number) \ + M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \ + M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \ + M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \ M(FailedQuery, "Number of failed queries.", ValueType::Number) \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \ M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \ @@ -171,6 +175,8 @@ M(MergesThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_merges_bandwidth_for_server' throttling.", ValueType::Microseconds) \ M(MutationsThrottlerBytes, "Bytes passed through 
'max_mutations_bandwidth_for_server' throttler.", ValueType::Bytes) \ M(MutationsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_mutations_bandwidth_for_server' throttling.", ValueType::Microseconds) \ + M(ExportsThrottlerBytes, "Bytes passed through 'max_exports_bandwidth_for_server' throttler.", ValueType::Bytes) \ + M(ExportsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_exports_bandwidth_for_server' throttling.", ValueType::Microseconds) \ M(QueryRemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth' throttler.", ValueType::Bytes) \ M(QueryRemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth' throttling.", ValueType::Microseconds) \ M(QueryRemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth' throttler.", ValueType::Bytes) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 573778cf7e51..8d0a31538dc3 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -112,6 +112,7 @@ namespace DB DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \ DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \ DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \ + DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 5a9531ba861a..8ef7a64bad9e 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6870,6 +6870,9 @@ Possible values: )", 0) \ DECLARE(Bool, use_roaring_bitmap_iceberg_positional_deletes, false, R"( Use roaring bitmap for iceberg positional deletes. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( +Overwrite the destination file if it already exists when exporting a MergeTree part. )", 0) \ \ /* ####################################################### */ \ @@ -7080,6 +7083,9 @@ Execute request to object storage as remote on one of object_storage_cluster nod DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"( Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation. )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \ + DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"( +Allow the experimental ALTER TABLE ... EXPORT PART query that exports a MergeTree part into another table. +)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \ \ DECLARE(String, promql_database, "", R"( Specifies the database name used by the 'promql' dialect.
Empty string means the current database. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 177de3cbcb34..5915d3f3714d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -159,6 +159,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 3ac52499ed66..c089390cfb1a 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -2245,7 +2245,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, if (const auto * alter = query_ptr->as()) { if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter() - || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()) + || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot() + || alter->isExportPartAlter()) return false; if (has_many_shards() || !is_replicated_table(query_ptr)) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 264f36a00588..7a3a050903fa 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -162,6 +162,8 @@ struct RelativePathWithMetadata virtual ~RelativePathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } + virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } + virtual std::string getPath() const { return relative_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 27405614f8f4..c9f6e9ad84b6 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -157,6 +158,8 @@ namespace ProfileEvents extern const Event BackupThrottlerSleepMicroseconds; extern const Event MergesThrottlerBytes; extern const Event MergesThrottlerSleepMicroseconds; + extern const Event ExportsThrottlerBytes; + extern const Event ExportsThrottlerSleepMicroseconds; extern const Event MutationsThrottlerBytes; extern const Event MutationsThrottlerSleepMicroseconds; extern const Event QueryLocalReadThrottlerBytes; @@ -325,6 +328,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server; extern const ServerSettingsUInt64 max_merges_bandwidth_for_server; extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server; + extern const ServerSettingsUInt64 max_exports_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server; extern const ServerSettingsUInt64 
max_replicated_fetches_network_bandwidth_for_server; @@ -504,6 +508,7 @@ struct ContextSharedPart : boost::noncopyable GlobalOvercommitTracker global_overcommit_tracker; MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree) + ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree) ReplicatedFetchList replicated_fetch_list; RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView) ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections. @@ -545,6 +550,8 @@ struct ContextSharedPart : boost::noncopyable mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges + mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports + MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk. LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup @@ -1055,6 +1062,9 @@ struct ContextSharedPart : boost::noncopyable if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server]) merges_throttler = std::make_shared(bandwidth, ProfileEvents::MergesThrottlerBytes, ProfileEvents::MergesThrottlerSleepMicroseconds); + + if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server]) + exports_throttler = std::make_shared(bandwidth, ProfileEvents::ExportsThrottlerBytes, ProfileEvents::ExportsThrottlerSleepMicroseconds); } }; @@ -1212,6 +1222,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } MovesList & Context::getMovesList() { return shared->moves_list; } const MovesList & Context::getMovesList() const { return shared->moves_list; } +ExportsList & Context::getExportsList() { return shared->exports_list; } +const ExportsList & Context::getExportsList() const { return shared->exports_list; } ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; } const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; } RefreshSet & Context::getRefreshSet() { return shared->refresh_set; } @@ -4155,6 +4167,11 @@ ThrottlerPtr Context::getMergesThrottler() const return shared->merges_throttler; } +ThrottlerPtr Context::getExportsThrottler() const +{ + return shared->exports_throttler; +} + void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const { if (read_bandwidth) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 9429ab5690ee..583dd6509ebe 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -89,6 +89,7 @@ class AsynchronousMetrics; class BackgroundSchedulePool; class MergeList; class MovesList; +class ExportsList; class ReplicatedFetchList; class RefreshSet; class Cluster; @@ -1166,6 +1167,9 @@ class Context: public ContextData, public std::enable_shared_from_this MovesList & getMovesList(); const MovesList & getMovesList() const; + ExportsList & getExportsList(); + const ExportsList & getExportsList() const; + ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; @@ -1671,6 +1675,7 @@ 
class Context: public ContextData, public std::enable_shared_from_this ThrottlerPtr getMutationsThrottler() const; ThrottlerPtr getMergesThrottler() const; + ThrottlerPtr getExportsThrottler() const; void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index adc8c01a0294..28182a3c772c 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -752,7 +752,8 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, const Stora alter->isFreezeAlter() || alter->isUnlockSnapshot() || alter->isMovePartitionToDiskOrVolumeAlter() || - alter->isCommentAlter()) + alter->isCommentAlter() || + alter->isExportPartAlter()) return false; } diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 841e9b768d44..988df44e7049 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -539,6 +539,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table); break; } + case ASTAlterCommand::EXPORT_PART: + { + required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); + break; + } case ASTAlterCommand::FETCH_PARTITION: { required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index 02c6f6e573b0..aca3b4cf6870 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription() {"MovePart", static_cast(MOVE_PART)}, {"MergePartsStart", static_cast(MERGE_PARTS_START)}, {"MutatePartStart", static_cast(MUTATE_PART_START)}, + {"ExportPart", static_cast(EXPORT_PART)}, } ); @@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription() "RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)." "MutatePartStart — Mutating of a data part has started, " "MutatePart — Mutating of a data part has finished, " - "MovePart — Moving the data part from the one disk to another one."}, + "MovePart — Moving the data part from one disk to another, " + "ExportPart — Exporting a data part from a MergeTree table into a target table backed by external storage (e.g., object storage or a data lake)."}, {"merge_reason", std::move(merge_reason_datatype), "The reason for the event with type MERGE_PARTS. Can have one of the following values: " "NotAMerge — The current event has the type other than MERGE_PARTS, " diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 44d2fb413c5f..4f58069dae55 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -30,6 +30,7 @@ struct PartLogElement MOVE_PART = 6, MERGE_PARTS_START = 7, MUTATE_PART_START = 8, + EXPORT_PART = 9, }; /// Copy of MergeAlgorithm since values are written to disk.
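Taken together, the grammar, access-control, and part-log changes above add up to the following usage flow (a sketch; `db.mt_source`, `db.s3_dest`, the part name, and the user name are placeholders, not part of this diff):

    SET allow_experimental_export_merge_tree_part = 1;

    -- Access checks from InterpreterAlterQuery.cpp: ALTER EXPORT PART on the
    -- source table plus INSERT on the destination table.
    GRANT ALTER EXPORT PART ON db.mt_source TO exporter;
    GRANT INSERT ON db.s3_dest TO exporter;

    -- EXPORT PART takes a part name literal and a destination table.
    ALTER TABLE db.mt_source EXPORT PART 'all_1_1_0' TO TABLE db.s3_dest;

    -- If the target object already exists, the export fails with
    -- FILE_ALREADY_EXISTS unless overwriting is opted into:
    SET export_merge_tree_part_overwrite_file_if_exists = 1;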
diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 46ef1aaafee3..eb2315253a0f 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -53,6 +53,8 @@ bool isSupportedAlterTypeForOnClusterDDLQuery(int type) ASTAlterCommand::ATTACH_PARTITION, /// Usually followed by ATTACH PARTITION ASTAlterCommand::FETCH_PARTITION, + /// Data operation that should be executed locally on each replica + ASTAlterCommand::EXPORT_PART, /// Logical error ASTAlterCommand::NO_TYPE, }; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 32f2156b5cde..30b5f9dca156 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -358,6 +358,26 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PART) + { + ostr << "EXPORT PART "; + partition->format(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << backQuoteIfNeed(to_database) << "."; + } + ostr << backQuoteIfNeed(to_table); + return; + default: + break; + } + + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (replace ? "REPLACE" : "ATTACH") << " PARTITION " @@ -627,6 +647,11 @@ bool ASTAlterQuery::isMovePartitionToDiskOrVolumeAlter() const return false; } +bool ASTAlterQuery::isExportPartAlter() const +{ + return isOneCommandTypeOnly(ASTAlterCommand::EXPORT_PART); +} + /** Get the text that identifies this element. */ String ASTAlterQuery::getID(char delim) const diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 3867a86cf797..d8d502cb87c6 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PART, DELETE, UPDATE, @@ -263,6 +264,8 @@ class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCl bool isMovePartitionToDiskOrVolumeAlter() const; + bool isExportPartAlter() const; + bool isCommentAlter() const; String getID(char) const override; diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 057aad6fffea..58694bde8984 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -332,6 +332,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 4bb76c0d2e4b..775d495492cf 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PART); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -535,6 +536,23 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, 
Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } + else if (s_export_part.ignore(pos, expected)) + { + if (!parser_string_and_substituion.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PART; + command->part = true; + + if (!s_to_table.ignore(pos, expected)) + { + return false; + } + + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; + } else if (s_move_partition.ignore(pos, expected)) { if (!parser_partition.parse(pos, command_partition, expected)) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 0be6d30f4e7c..0e2f897fb617 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -264,19 +264,6 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column; } -std::string WildcardPartitionStrategy::getPathForRead( - const std::string & prefix) -{ - return prefix; -} - -std::string WildcardPartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - return PartitionedSink::replaceWildcards(prefix, partition_key); -} - HiveStylePartitionStrategy::HiveStylePartitionStrategy( KeyDescription partition_key_description_, const Block & sample_block_, @@ -296,41 +283,6 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); } -std::string HiveStylePartitionStrategy::getPathForRead(const std::string & prefix) -{ - return prefix + "**." + Poco::toLower(file_format); -} - -std::string HiveStylePartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - std::string path; - - if (!prefix.empty()) - { - path += prefix; - if (path.back() != '/') - { - path += '/'; - } - } - - /// Not adding '/' because buildExpressionHive() always adds a trailing '/' - path += partition_key; - - /* - * File extension is toLower(format) - * This isn't ideal, but I guess multiple formats can be specified and introduced. - * So I think it is simpler to keep it this way. - * - * Or perhaps implement something like `IInputFormat::getFileExtension()` - */ - path += std::to_string(generateSnowflakeID()) + "." 
+ Poco::toLower(file_format); - - return path; -} - ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) { Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index bc90d7f03461..606122b4ae71 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -29,8 +29,12 @@ struct IPartitionStrategy virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; - virtual std::string getPathForRead(const std::string & prefix) = 0; - virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0; + ColumnPtr computePartitionKey(Block & block) const + { + actions_with_column_name.actions->execute(block); + + return block.getByName(actions_with_column_name.column_name).column; + } virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) { @@ -53,6 +57,7 @@ struct IPartitionStrategy const KeyDescription partition_key_description; const Block sample_block; ContextPtr context; + PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -89,11 +94,6 @@ struct WildcardPartitionStrategy : IPartitionStrategy WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; - -private: - PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -111,8 +111,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy bool partition_columns_in_data_file_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; Block getFormatHeader() override; @@ -121,7 +119,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy const std::string file_format; const bool partition_columns_in_data_file; std::unordered_set partition_columns_name_set; - PartitionExpressionActionsAndColumnName actions_with_column_name; Block block_without_partition_columns; }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 181a5812fa2e..cb2bc3afb85a 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -453,6 +453,27 @@ class IStorage : public std::enable_shared_from_this, public TypePromo ContextPtr /*context*/, bool /*async_insert*/); + virtual bool supportsImport() const + { + return false; + } + + /* + It is currently only implemented in StorageObjectStorage. + It is meant to be used to import MergeTree data parts into object storage. It is similar to the write API, + but it does not re-partition the data and it allows the caller to set the file name. + */ + virtual SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } + + /** Writes the data to a table in distributed manner.
* It is supposed that implementation looks into SELECT part of the query and executes distributed * INSERT SELECT if it is possible with current storage as a receiver and query SELECT part as a producer. diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp new file mode 100644 index 000000000000..0239f841dc69 --- /dev/null +++ b/src/Storages/MergeTree/ExportList.cpp @@ -0,0 +1,66 @@ +#include + +namespace DB +{ + +ExportsListElement::ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & target_file_name_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context) +: source_table_id(source_table_id_) +, destination_table_id(destination_table_id_) +, part_size(part_size_) +, part_name(part_name_) +, destination_file_path(target_file_name_) +, total_rows_to_read(total_rows_to_read_) +, total_size_bytes_compressed(total_size_bytes_compressed_) +, total_size_bytes_uncompressed(total_size_bytes_uncompressed_) +, create_time(create_time_) +{ + thread_group = ThreadGroup::createForMergeMutate(context); +} + +ExportsListElement::~ExportsListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&thread_group->memory_tracker); +} + +ExportInfo ExportsListElement::getInfo() const +{ + ExportInfo res; + res.source_database = source_table_id.database_name; + res.source_table = source_table_id.table_name; + res.destination_database = destination_table_id.database_name; + res.destination_table = destination_table_id.table_name; + res.part_name = part_name; + res.destination_file_path = destination_file_path; + res.rows_read = rows_read; + res.total_rows_to_read = total_rows_to_read; + res.total_size_bytes_compressed = total_size_bytes_compressed; + res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; + res.bytes_read_uncompressed = bytes_read_uncompressed; + res.memory_usage = getMemoryUsage(); + res.peak_memory_usage = getPeakMemoryUsage(); + res.create_time = create_time; + res.elapsed = elapsed; + return res; +} + +UInt64 ExportsListElement::getMemoryUsage() const +{ + return thread_group->memory_tracker.get(); +} + +UInt64 ExportsListElement::getPeakMemoryUsage() const +{ + return thread_group->memory_tracker.getPeak(); +} + +} diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h new file mode 100644 index 000000000000..ade18b69480c --- /dev/null +++ b/src/Storages/MergeTree/ExportList.h @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ + extern const Metric Export; +} + +namespace DB +{ + +struct ExportInfo +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + String part_name; + String destination_file_path; + UInt64 rows_read; + UInt64 total_rows_to_read; + UInt64 total_size_bytes_compressed; + UInt64 total_size_bytes_uncompressed; + UInt64 bytes_read_uncompressed; + UInt64 memory_usage; + UInt64 peak_memory_usage; + time_t create_time = 0; + Float64 elapsed; +}; + +struct ExportsListElement : private boost::noncopyable +{ + const StorageID source_table_id; + const StorageID destination_table_id; + const UInt64 part_size; + const String part_name; + const String destination_file_path; + UInt64 rows_read {0}; + UInt64 total_rows_to_read {0}; + 
UInt64 total_size_bytes_compressed {0}; + UInt64 total_size_bytes_uncompressed {0}; + UInt64 bytes_read_uncompressed {0}; + time_t create_time {0}; + Float64 elapsed {0}; + + Stopwatch watch; + ThreadGroupPtr thread_group; + + ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & destination_file_path_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context); + + ~ExportsListElement(); + + ExportInfo getInfo() const; + + UInt64 getMemoryUsage() const; + UInt64 getPeakMemoryUsage() const; +}; + + +class ExportsList final : public BackgroundProcessList +{ +private: + using Parent = BackgroundProcessList; + +public: + ExportsList() + : Parent(CurrentMetrics::Export) + {} +}; + +using ExportsListEntry = BackgroundProcessListEntry; + +} diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f6082afbb577..fbbe56256a0f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -114,6 +119,7 @@ #include #include #include +#include #include #include @@ -156,6 +162,10 @@ namespace ProfileEvents extern const Event LoadedDataPartsMicroseconds; extern const Event RestorePartsSkippedFiles; extern const Event RestorePartsSkippedBytes; + extern const Event PartsExports; + extern const Event PartsExportTotalMilliseconds; + extern const Event PartsExportFailures; + extern const Event PartsExportDuplicated; } namespace CurrentMetrics @@ -199,6 +209,10 @@ namespace Setting extern const SettingsUInt64 min_insert_block_size_rows; extern const SettingsUInt64 min_insert_block_size_bytes; extern const SettingsBool apply_patch_parts; + extern const SettingsBool allow_experimental_export_merge_tree_part; + extern const SettingsUInt64 min_bytes_to_use_direct_io; + extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; + extern const SettingsBool output_format_parallel_formatting; } namespace MergeTreeSetting @@ -316,6 +330,8 @@ namespace ErrorCodes extern const int CANNOT_FORGET_PARTITION; extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES; + extern const int UNKNOWN_TABLE; + extern const int FILE_ALREADY_EXISTS; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -4490,8 +4506,6 @@ void MergeTreeData::changeSettings( { if (new_settings) { - bool has_storage_policy_changed = false; - const auto & new_changes = new_settings->as().changes; StoragePolicyPtr new_storage_policy = nullptr; @@ -4530,8 +4544,6 @@ void MergeTreeData::changeSettings( disk->createDirectories(fs::path(relative_data_path) / DETACHED_DIR_NAME); } /// FIXME how would that be done while reloading configuration??? 
- - has_storage_policy_changed = true; } } } @@ -4548,9 +4560,6 @@ void MergeTreeData::changeSettings( StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); new_metadata.setSettingsChanges(new_settings); setInMemoryMetadata(new_metadata); - - if (has_storage_policy_changed) - startBackgroundMovesIfNeeded(); } } @@ -6189,6 +6198,236 @@ void MergeTreeData::movePartitionToTable(const PartitionCommand & command, Conte movePartitionToTable(dest_storage, command.partition, query_context); } +void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextPtr query_context) +{ + if (!query_context->getSettingsRef()[Setting::allow_experimental_export_merge_tree_part]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Exporting merge tree part is experimental. Set `allow_experimental_export_merge_tree_part` to enable it"); + } + + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + if (dest_storage->getStorageID() == this->getStorageID()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed"); + } + + if (!dest_storage->supportsImport()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName()); + + auto query_to_string = [] (const ASTPtr & ast) + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + auto src_snapshot = getInMemoryMetadataPtr(); + auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); + + if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical())) + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + + if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); + + auto part_name = command.partition->as().value.safeGet(); + + auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); + + if (!part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'", + part_name, getStorageID().getFullTableName()); + + { + MergeTreeExportManifest manifest( + dest_storage->getStorageID(), + part, + query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + + std::lock_guard lock(export_manifests_mutex); + + if (!export_manifests.emplace(std::move(manifest)).second) + { + throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", + part_name, dest_storage->getStorageID().getFullTableName()); + } + } + + background_moves_assignee.trigger(); +} + +void MergeTreeData::exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context) +{ + auto metadata_snapshot = getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); + + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export; + + NamesAndTypesList partition_columns; + if (metadata_snapshot->hasPartitionKey()) + { + const auto & partition_key = 
metadata_snapshot->getPartitionKey(); + if (!partition_key.column_names.empty()) + partition_columns = partition_key.expression->getRequiredColumnsWithTypes(); + } + + auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns); + + auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext()); + if (!destination_storage) + { + std::lock_guard inner_lock(export_manifests_mutex); + + const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs(); + export_manifests.erase(manifest); + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); + } + + SinkToStoragePtr sink; + std::string destination_file_path; + + try + { + auto context_copy = Context::createCopy(local_context); + context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + + sink = destination_storage->import( + manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), + block_with_partition_values, + destination_file_path, + manifest.overwrite_file_if_exists, + context_copy); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) + { + ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); + } + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); + export_manifests.erase(manifest); + return; + } + + bool apply_deleted_mask = true; + bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > 0 && manifest.data_part->getBytesOnDisk() >= local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io]; + bool prefetch = false; + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = manifest.data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + manifest.data_part, + mutations_snapshot, + local_context); + + QueryPlan plan_for_part; + + createReadFromPartStep( + read_type, + plan_for_part, + *this, + storage_snapshot, + RangesInDataPart(manifest.data_part), + alter_conversions, + nullptr, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + local_context, + getLogger("ExportPartition")); + + auto exports_list_entry = getContext()->getExportsList().insert( + getStorageID(), + manifest.destination_storage_id, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->name, + destination_file_path, + manifest.data_part->rows_count, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->getBytesUncompressedOnDisk(), + manifest.create_time, + local_context); + + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); + + QueryPlanOptimizationSettings optimization_settings(local_context); + auto pipeline_settings = BuildQueryPipelineSettings(local_context); + auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + + builder->setProgressCallback([&exports_list_entry](const Progress & progress) + { + (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; + (*exports_list_entry)->rows_read += progress.read_rows; + (*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds(); + }); + + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); +
+ pipeline.complete(sink); + + try + { + CompletedPipelineExecutor exec(pipeline); + exec.execute(); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + {}, + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + ProfileEvents::increment(ProfileEvents::PartsExports); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name)); + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + throw; + } +} + void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName()); @@ -6240,6 +6479,11 @@ Pipe MergeTreeData::alterPartition( } } break; + case PartitionCommand::EXPORT_PART: + { + exportPartToTable(command, query_context); + break; + } case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); @@ -8571,6 +8815,32 @@ std::pair MergeTreeData::cloneAn return std::make_pair(dst_data_part, std::move(temporary_directory_lock)); } +std::vector MergeTreeData::getExportsStatus() const +{ + std::lock_guard lock(export_manifests_mutex); + std::vector result; + + auto source_database = getStorageID().database_name; + auto source_table = getStorageID().table_name; + + for (const auto & manifest : export_manifests) + { + MergeTreeExportStatus status; + + status.source_database = source_database; + status.source_table = source_table; + status.destination_database = manifest.destination_storage_id.database_name; + status.destination_table = manifest.destination_storage_id.table_name; + status.create_time = manifest.create_time; + status.part_name = manifest.data_part->name; + + result.emplace_back(std::move(status)); + } + + return result; +} + + bool MergeTreeData::canUseAdaptiveGranularity() const { const auto settings = getSettings(); @@ -8848,7 +9118,8 @@ void MergeTreeData::writePartLog( const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters) + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry) try { auto table_id = getStorageID(); @@ -8916,6 +9187,13 @@ try part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.peak_memory_usage = (*merge_entry)->getMemoryTracker().getPeak(); } + else if (exports_entry) + { + part_log_elem.rows_read = (*exports_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; + part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); + part_log_elem.path_on_disk = (*exports_entry)->destination_file_path; + } if 
(profile_counters) { @@ -8957,21 +9235,43 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) { - if (parts_mover.moves_blocker.isCancelled()) - return false; + if (!parts_mover.moves_blocker.isCancelled()) + { + auto moving_tagger = selectPartsForMove(); + if (!moving_tagger->parts_to_move.empty()) + { + assignee.scheduleMoveTask(std::make_shared( + [this, moving_tagger] () mutable + { + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); + WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + }, moves_assignee_trigger, getStorageID())); + return true; + } + } - auto moving_tagger = selectPartsForMove(); - if (moving_tagger->parts_to_move.empty()) - return false; + std::lock_guard lock(export_manifests_mutex); - assignee.scheduleMoveTask(std::make_shared( - [this, moving_tagger] () mutable + for (auto & manifest : export_manifests) + { + if (manifest.in_progress) { - ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); - WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; - }, moves_assignee_trigger, getStorageID())); - return true; + continue; + } + + manifest.in_progress = assignee.scheduleMoveTask(std::make_shared( + [this, manifest] () mutable { + exportPartToTableImpl(manifest, getContext()); + return true; + }, + moves_assignee_trigger, + getStorageID())); + + return manifest.in_progress; + } + + return false; } bool MergeTreeData::areBackgroundMovesNeeded() const @@ -9189,6 +9489,10 @@ bool MergeTreeData::canUsePolymorphicParts() const return canUsePolymorphicParts(*getSettings(), unused); } +void MergeTreeData::startBackgroundMoves() +{ + background_moves_assignee.start(); +} void MergeTreeData::checkDropOrRenameCommandDoesntAffectInProgressMutations( const AlterCommand & command, const std::map & unfinished_mutations, ContextPtr local_context) const diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 2cd69c086473..7072b8d52e82 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,8 @@ #include #include #include +#include +#include #include #include @@ -979,6 +982,12 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + void exportPartToTable(const PartitionCommand & command, ContextPtr query_context); + + void exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context); + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. 
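The writePartLog() wiring above means every export, successful or not, leaves an EXPORT_PART row in the part log, and the earlier metric and profile-event hunks expose in-flight and cumulative counters. A monitoring sketch (standard system.part_log / system.metrics / system.events columns assumed):

    SELECT event_time, database, table, part_name, path_on_disk, error
    FROM system.part_log
    WHERE event_type = 'ExportPart'
    ORDER BY event_time DESC;

    SELECT value FROM system.metrics WHERE metric = 'Export';                -- currently running exports
    SELECT event, value FROM system.events WHERE event LIKE 'PartsExport%';  -- totals, failures, duplicates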
@@ -1056,6 +1065,7 @@ class MergeTreeData : public IStorage, public WithMutableContext const WriteSettings & write_settings); virtual std::vector getMutationsStatus() const = 0; + std::vector getExportsStatus() const; /// Returns true if table can create new parts with adaptive granularity /// Has additional constraint in replicated version @@ -1241,6 +1251,10 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; + mutable std::mutex export_manifests_mutex; + + std::set export_manifests; + PinnedPartUUIDsPtr getPinnedPartUUIDs() const; /// Schedules background job to like merge/mutate/fetch an executor @@ -1359,6 +1373,8 @@ class MergeTreeData : public IStorage, public WithMutableContext are_columns_and_secondary_indices_sizes_calculated = false; } + void startBackgroundMoves(); + /// Engine-specific methods BrokenPartCallback broken_part_callback; @@ -1614,7 +1630,8 @@ class MergeTreeData : public IStorage, public WithMutableContext const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters); + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry = nullptr); /// If part is assigned to merge or mutation (possibly replicated) /// Should be overridden by children, because they can have different @@ -1825,8 +1842,6 @@ class MergeTreeData : public IStorage, public WithMutableContext bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const; - virtual void startBackgroundMovesIfNeeded() = 0; - bool allow_nullable_key = false; bool allow_reverse_key = false; diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h new file mode 100644 index 000000000000..36831fd132ba --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +namespace DB +{ + +struct MergeTreeExportManifest +{ + using DataPartPtr = std::shared_ptr; + + + MergeTreeExportManifest( + const StorageID & destination_storage_id_, + const DataPartPtr & data_part_, + bool overwrite_file_if_exists_, + bool parallel_formatting_) + : destination_storage_id(destination_storage_id_), + data_part(data_part_), + overwrite_file_if_exists(overwrite_file_if_exists_), + parallel_formatting(parallel_formatting_), + create_time(time(nullptr)) {} + + StorageID destination_storage_id; + DataPartPtr data_part; + bool overwrite_file_if_exists; + bool parallel_formatting; + + time_t create_time; + mutable bool in_progress = false; + + bool operator<(const MergeTreeExportManifest & rhs) const + { + // Lexicographic comparison: first compare destination storage, then part name + auto lhs_storage = destination_storage_id.getQualifiedName(); + auto rhs_storage = rhs.destination_storage_id.getQualifiedName(); + + if (lhs_storage != rhs_storage) + return lhs_storage < rhs_storage; + + return data_part->name < rhs.data_part->name; + } + + bool operator==(const MergeTreeExportManifest & rhs) const + { + return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName() + && data_part->name == rhs.data_part->name; + } +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeExportStatus.h b/src/Storages/MergeTree/MergeTreeExportStatus.h new file mode 100644 index 000000000000..e71a2f15e6ed --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportStatus.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + 
+ +namespace DB +{ + +struct MergeTreeExportStatus +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + time_t create_time = 0; + std::string part_name; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index a4ab9066bb33..3037f67b23ac 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -466,6 +466,22 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl } } +Block MergeTreePartition::getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const +{ + chassert(partition_columns.size() == value.size()); + + Block result; + + std::size_t i = 0; + for (const auto & partition_column : partition_columns) + { + auto column = partition_column.type->createColumnConst(1, value[i++]); + result.insert({column, partition_column.type, partition_column.name}); + } + + return result; +} + NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) { auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index 4338b216cdb8..811cfdc2a90c 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -60,6 +60,8 @@ struct MergeTreePartition void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + Block getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const; + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index d0385d1c7d33..cc43895f0c76 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -168,6 +168,9 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( addThrottler(read_settings.remote_throttler, context->getMergesThrottler()); addThrottler(read_settings.local_throttler, context->getMergesThrottler()); break; + case Export: + read_settings.local_throttler = context->getExportsThrottler(); + break; } MergeTreeReaderSettings reader_settings = diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index abba230d9e79..a858adf33bb5 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -15,6 +15,7 @@ enum MergeTreeSequentialSourceType { Mutation, Merge, + Export, }; /// Create stream for reading single part from MergeTree. 
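MergeTreePartition::getBlockWithPartitionValues() above produces a one-row block of constant partition columns, which exportPartToTableImpl() hands to the destination's hive partition strategy (see computePartitionKey(Block &) in IPartitionStrategy.h earlier in this diff) to compute the object prefix. For a hypothetical table partitioned by a `year` column, the chain works out as shown in the following SQL-comment sketch, with the file name and extension composed by the path generator introduced below:

    -- part:             2024_1_1_0
    -- partition value:  2024
    -- one-row block:    (year: Const(2024))
    -- hive prefix:      year=2024/
    -- final object:     <table path>/year=2024/2024_1_1_0_<part checksum>.parquet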
diff --git a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h new file mode 100644 index 000000000000..1ed503cbf3c6 --- /dev/null +++ b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h @@ -0,0 +1,79 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + struct ObjectStorageFilePathGenerator + { + virtual ~ObjectStorageFilePathGenerator() = default; + std::string getPathForWrite(const std::string & partition_id) const { + return getPathForWrite(partition_id, ""); + } + virtual std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const = 0; + virtual std::string getPathForRead() const = 0; + }; + + struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const override + { + return PartitionedSink::replaceWildcards(raw_path, partition_id); + } + + std::string getPathForRead() const override + { + return raw_path; + } + + private: + std::string raw_path; + + }; + + struct ObjectStorageAppendFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageAppendFilePathGenerator( + const std::string & raw_path_, + const std::string & file_format_) + : raw_path(raw_path_), file_format(Poco::toLower(file_format_)){} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & file_name_override) const override + { + std::string result; + + result += raw_path; + + if (raw_path.back() != '/') + { + result += "/"; + } + + /// Not adding '/' because buildExpressionHive() always adds a trailing '/' + result += partition_id; + + const auto file_name = file_name_override.empty() ? std::to_string(generateSnowflakeID()) : file_name_override; + + result += file_name + "." + file_format; + + return result; + } + + std::string getPathForRead() const override + { + return raw_path + "**." 
+ file_format; + } + + private: + std::string raw_path; + std::string file_format; + }; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index aed11bfa5b6a..915a5897dd2d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -32,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -55,6 +55,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; + extern const int FILE_ALREADY_EXISTS; } String StorageObjectStorage::getPathSample(ContextPtr context) @@ -432,7 +433,8 @@ SinkToStoragePtr StorageObjectStorage::write( if (configuration->getPartitionStrategy()) { - return std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + auto sink_creator = std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + return std::make_shared(configuration->partition_strategy, sink_creator, local_context, sample_block); } auto paths = configuration->getPaths(); @@ -464,6 +466,46 @@ bool StorageObjectStorage::optimize( return configuration->optimize(metadata_snapshot, context, format_settings); } +bool StorageObjectStorage::supportsImport() const +{ + return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE; +} + +SinkToStoragePtr StorageObjectStorage::import( + const std::string & file_name, + Block & block_with_partition_values, + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr local_context) +{ + std::string partition_key; + + if (configuration->partition_strategy) + { + const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values); + + if (!column_with_partition_key->empty()) + { + partition_key = column_with_partition_key->getDataAt(0).toString(); + } + } + + destination_file_path = configuration->getPathForWrite(partition_key, file_name).path; + + if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path))) + { + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path); + } + + return std::make_shared( + destination_file_path, + object_storage, + configuration, + format_settings, + std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), + local_context); +} + void StorageObjectStorage::truncate( const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, @@ -635,5 +677,5 @@ void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands, configuration->checkAlterIsPossible(commands); } - } + diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index d5b6dd026cde..9c118913ef46 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -7,6 +7,7 @@ #include #include #include +#include "Storages/ObjectStorage/ObjectStorageFilePathGenerator.h" #include #include #include @@ -77,6 +78,16 @@ class StorageObjectStorage : public IStorage ContextPtr context, bool async_insert) override; + + bool supportsImport() const override; + + SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* 
block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; + void truncate( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 7920ecc74b48..efae7b129d7a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -169,6 +169,11 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( metadata.setColumns(columns); metadata.setConstraints(constraints_); + if (configuration->getPartitionStrategy()) + { + metadata.partition_key = configuration->getPartitionStrategy()->getPartitionKeyDescription(); + } + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns)); setInMemoryMetadata(metadata); @@ -551,6 +556,26 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } +bool StorageObjectStorageCluster::supportsImport() const +{ + if (pure_storage) + return pure_storage->supportsImport(); + return false; +} + +SinkToStoragePtr StorageObjectStorageCluster::import( + const std::string & file_name, + Block & block_with_partition_values, + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr context) +{ + if (pure_storage) + return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); + + return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); +} + void StorageObjectStorageCluster::readFallBackToPure( QueryPlan & query_plan, const Names & column_names, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 5141db6638df..f8b05846f09c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -122,6 +122,14 @@ class StorageObjectStorageCluster : public IStorageCluster void onActionLockRemove(StorageActionBlockType action_type) override; + bool supportsImport() const override; + + SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; bool prefersLargeBlocks() const override; private: diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp index 475de8c31392..f030b31c6a3e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp @@ -88,6 +88,17 @@ void StorageObjectStorageConfiguration::initialize( } } + if (partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) + { + file_path_generator = std::make_shared( + getRawPath().path, + format); + } + else + { + file_path_generator = std::make_shared(getRawPath().path); + } + if (format == "auto") { if (isDataLakeConfiguration()) @@ -105,8 +116,7 @@ void StorageObjectStorageConfiguration::initialize( else FormatFactory::instance().checkFormatName(format); - /// It might be changed on 
`StorageObjectStorageConfiguration::initPartitionStrategy` - read_path = getRawPath(); + read_path = file_path_generator->getPathForRead(); initialized = true; } @@ -124,7 +134,6 @@ void StorageObjectStorageConfiguration::initPartitionStrategy(ASTPtr partition_b if (partition_strategy) { - read_path = partition_strategy->getPathForRead(getRawPath().path); LOG_DEBUG(getLogger("StorageObjectStorageConfiguration"), "Initialized partition strategy {}", magic_enum::enum_name(partition_strategy_type)); } } @@ -136,14 +145,12 @@ const StorageObjectStorageConfiguration::Path & StorageObjectStorageConfiguratio StorageObjectStorageConfiguration::Path StorageObjectStorageConfiguration::getPathForWrite(const std::string & partition_id) const { - auto raw_path = getRawPath(); - - if (!partition_strategy) - { - return raw_path; - } + return getPathForWrite(partition_id, /* filename_override */ ""); +} - return Path {partition_strategy->getPathForWrite(raw_path.path, partition_id)}; +StorageObjectStorageConfiguration::Path StorageObjectStorageConfiguration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const +{ + return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; } bool StorageObjectStorageConfiguration::Path::hasPartitionWildcard() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index d607a9f424e3..ec9c9c10b9ab 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB { @@ -94,6 +95,8 @@ class StorageObjectStorageConfiguration // Path used for writing, it should not be globbed and might contain a partition key virtual Path getPathForWrite(const std::string & partition_id = "") const; + Path getPathForWrite(const std::string & partition_id, const std::string & filename_override) const; + virtual void setPathForRead(const Path & path) { read_path = path; @@ -271,15 +274,16 @@ class StorageObjectStorageConfiguration return false; } -private: - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE; + std::shared_ptr partition_strategy; /// Whether partition column values are contained in the actual data. /// An alternative is hive partitioning, where they are contained in the file path. 
bool partition_columns_in_data_file = true; - std::shared_ptr partition_strategy; + +private: + String format = "auto"; + String compression_method = "auto"; + String structure = "auto"; protected: bool initialized = false; @@ -288,6 +292,8 @@ class StorageObjectStorageConfiguration // Path used for reading, by default it is the same as `getRawPath` // When using `partition_strategy=hive`, a recursive reading pattern is appended, e.g. 'table_root/**.parquet' Path read_path; + + std::shared_ptr file_path_generator; }; using StorageObjectStorageConfigurationPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index b8c5daf8a565..48525da19ec7 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -142,8 +142,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( std::optional format_settings_, SharedHeader sample_block_, ContextPtr context_) - : PartitionedSink(configuration_->getPartitionStrategy(), context_, sample_block_) - , object_storage(object_storage_) + : object_storage(object_storage_) , configuration(configuration_) , query_settings(configuration_->getQuerySettings(context_)) , format_settings(format_settings_) @@ -176,7 +175,7 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String object_storage, configuration, format_settings, - std::make_shared(partition_strategy->getFormatHeader()), + std::make_shared(configuration->partition_strategy->getFormatHeader()), context ); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index f4a775030715..39873998ad7a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -8,6 +8,8 @@ namespace DB { class StorageObjectStorageSink : public SinkToStorage { +friend class StorageObjectStorageImporterSink; + public: StorageObjectStorageSink( const std::string & path_, @@ -41,7 +43,7 @@ class StorageObjectStorageSink : public SinkToStorage void cancelBuffers(); }; -class PartitionedStorageObjectStorageSink : public PartitionedSink +class PartitionedStorageObjectStorageSink : public PartitionedSink::SinkCreator { public: PartitionedStorageObjectStorageSink( diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index c12da89d7ed4..96f49a60e511 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -130,6 +130,16 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.with_name = command_ast->with_name; return res; } + if (command_ast->type == ASTAlterCommand::EXPORT_PART) + { + PartitionCommand res; + res.type = EXPORT_PART; + res.partition = command_ast->partition->clone(); + res.part = command_ast->part; + res.to_database = command_ast->to_database; + res.to_table = command_ast->to_table; + return res; + } return {}; } @@ -171,6 +181,8 @@ std::string PartitionCommand::typeToString() const return "UNFREEZE ALL"; case PartitionCommand::Type::REPLACE_PARTITION: return "REPLACE PARTITION"; + case PartitionCommand::Type::EXPORT_PART: + return "EXPORT PART"; default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Uninitialized partition command"); } diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 917e510f24b4..15d2a7fb869f 100644 --- a/src/Storages/PartitionCommands.h +++ 
b/src/Storages/PartitionCommands.h @@ -33,6 +33,7 @@ struct PartitionCommand UNFREEZE_ALL_PARTITIONS, UNFREEZE_PARTITION, REPLACE_PARTITION, + EXPORT_PART, }; Type type = UNKNOWN; diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index 078237483154..2a3df191dd92 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -26,10 +26,12 @@ namespace ErrorCodes PartitionedSink::PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, SharedHeader source_header_) : SinkToStorage(source_header_) , partition_strategy(partition_strategy_) + , sink_creator(sink_creator_) , context(context_) , source_header(source_header_) { @@ -41,7 +43,7 @@ SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key) auto it = partition_id_to_sink.find(partition_key); if (it == partition_id_to_sink.end()) { - auto sink = createSinkForPartition(partition_key.toString()); + auto sink = sink_creator->createSinkForPartition(partition_key.toString()); std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, sink); } diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index 444624ba6c8e..bc446477e9dd 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -17,10 +17,17 @@ namespace DB class PartitionedSink : public SinkToStorage { public: + struct SinkCreator + { + virtual ~SinkCreator() = default; + virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; + }; + static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}"; PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, SharedHeader source_header_); @@ -34,16 +41,15 @@ class PartitionedSink : public SinkToStorage void onFinish() override; - virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; - static void validatePartitionKey(const String & str, bool allow_slash); static String replaceWildcards(const String & haystack, const String & partition_id); + protected: std::shared_ptr partition_strategy; - private: + std::shared_ptr sink_creator; ContextPtr context; SharedHeader source_header; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index d57ca4f996a6..28e9da937032 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1989,7 +1989,7 @@ class StorageFileSink final : public SinkToStorage, WithContext std::unique_lock lock; }; -class PartitionedStorageFileSink : public PartitionedSink +class PartitionedStorageFileSink : public PartitionedSink::SinkCreator { public: PartitionedStorageFileSink( @@ -2004,7 +2004,7 @@ class PartitionedStorageFileSink : public PartitionedSink const String format_name_, ContextPtr context_, int flags_) - : PartitionedSink(partition_strategy_, context_, std::make_shared(metadata_snapshot_->getSampleBlock())) + : partition_strategy(partition_strategy_) , path(path_) , metadata_snapshot(metadata_snapshot_) , table_name_for_log(table_name_for_log_) @@ -2020,11 +2020,12 @@ class PartitionedStorageFileSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string filepath = partition_strategy->getPathForWrite(path, partition_id); + const auto file_path_generator = std::make_shared(path); + std::string filepath = file_path_generator->getPathForWrite(partition_id); fs::create_directories(fs::path(filepath).parent_path()); - validatePartitionKey(filepath, true); + 
PartitionedSink::validatePartitionKey(filepath, true); checkCreationIsAllowed(context, context->getUserFilesPath(), filepath, /*can_be_directory=*/ true); return std::make_shared( metadata_snapshot, @@ -2041,6 +2042,7 @@ class PartitionedStorageFileSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String path; StorageMetadataPtr metadata_snapshot; String table_name_for_log; @@ -2092,7 +2094,7 @@ SinkToStoragePtr StorageFile::write( has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, metadata_snapshot, getStorageID().getNameForLogs(), @@ -2104,6 +2106,13 @@ SinkToStoragePtr StorageFile::write( format_name, context, flags); + + return std::make_shared( + partition_strategy, + sink_creator, + context, + std::make_shared(metadata_snapshot->getSampleBlock()) + ); } String path; @@ -2129,6 +2138,7 @@ SinkToStoragePtr StorageFile::write( String new_path; do { + new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos)); ++index; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 45331190bc1d..a46b85364be0 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -120,6 +120,7 @@ namespace ErrorCodes extern const int TABLE_IS_READ_ONLY; extern const int TOO_MANY_PARTS; extern const int PART_IS_LOCKED; + extern const int INCOMPATIBLE_COLUMNS; } namespace ActionLocks @@ -209,7 +210,7 @@ void StorageMergeTree::startup() try { background_operations_assignee.start(); - startBackgroundMovesIfNeeded(); + startBackgroundMoves(); startOutdatedAndUnexpectedDataPartsLoadingTask(); } catch (...) @@ -2812,12 +2813,6 @@ MutationCounters StorageMergeTree::getMutationCounters() const return mutation_counters; } -void StorageMergeTree::startBackgroundMovesIfNeeded() -{ - if (areBackgroundMovesNeeded()) - background_moves_assignee.start(); -} - std::unique_ptr StorageMergeTree::getDefaultSettings() const { return std::make_unique(getContext()->getMergeTreeSettings()); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 00cbc7acdad8..0bffa6ead7d3 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -286,8 +286,6 @@ class StorageMergeTree final : public MergeTreeData std::unique_ptr fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock); std::unique_ptr fillNewPartNameAndResetLevel(MutableDataPartPtr & part, DataPartsLock & lock); - void startBackgroundMovesIfNeeded() override; - BackupEntries backupMutations(UInt64 version, const String & data_path_in_backup) const; /// Attaches restored parts to the storage. 
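Note for reviewers: the PartitionedSink changes above invert the old inheritance scheme. Storages no longer subclass PartitionedSink and override createSinkForPartition; per-partition sink construction now lives in a standalone PartitionedSink::SinkCreator that the storage passes to PartitionedSink alongside the partition strategy. Below is a minimal, self-contained sketch of that split; Sink, PrintingSink and PartitionFanOut are toy stand-ins for illustration, not the ClickHouse classes.

#include <iostream>
#include <map>
#include <memory>
#include <string>

/// Toy stand-in for SinkToStorage.
struct Sink
{
    virtual ~Sink() = default;
    virtual void consume(const std::string & row) = 0;
};
using SinkPtr = std::shared_ptr<Sink>;

/// Mirrors PartitionedSink::SinkCreator: building a partition's sink is a
/// pluggable strategy instead of a virtual method on the fan-out sink itself.
struct SinkCreator
{
    virtual ~SinkCreator() = default;
    virtual SinkPtr createSinkForPartition(const std::string & partition_id) = 0;
};

struct PrintingSink : Sink
{
    explicit PrintingSink(std::string partition_id_) : partition_id(std::move(partition_id_)) {}
    void consume(const std::string & row) override { std::cout << partition_id << " <- " << row << '\n'; }
    std::string partition_id;
};

struct PrintingSinkCreator : SinkCreator
{
    SinkPtr createSinkForPartition(const std::string & partition_id) override
    {
        return std::make_shared<PrintingSink>(partition_id);
    }
};

/// Mirrors PartitionedSink::getSinkForPartitionKey: sinks are created lazily
/// through the injected creator and cached per partition key.
struct PartitionFanOut
{
    explicit PartitionFanOut(std::shared_ptr<SinkCreator> creator_) : creator(std::move(creator_)) {}

    SinkPtr getSinkForPartitionKey(const std::string & key)
    {
        auto it = partition_id_to_sink.find(key);
        if (it == partition_id_to_sink.end())
            it = partition_id_to_sink.emplace(key, creator->createSinkForPartition(key)).first;
        return it->second;
    }

    std::shared_ptr<SinkCreator> creator;
    std::map<std::string, SinkPtr> partition_id_to_sink;
};

int main()
{
    PartitionFanOut fan_out(std::make_shared<PrintingSinkCreator>());
    fan_out.getSinkForPartitionKey("year=2020/")->consume("1,2020");
    fan_out.getSinkForPartitionKey("year=2021/")->consume("4,2021");
    fan_out.getSinkForPartitionKey("year=2020/")->consume("2,2020"); /// reuses the cached sink
}

The same decoupling is what lets StorageObjectStorage::import build a single leaf StorageObjectStorageSink directly for a precomputed partition key, without instantiating the fan-out at all.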
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3a4374a6c9dc..de377150f21e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5634,7 +5634,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooK restarting_thread.start(true); }); - startBackgroundMovesIfNeeded(); + startBackgroundMoves(); part_moves_between_shards_orchestrator.start(); @@ -9842,13 +9842,6 @@ MutationCounters StorageReplicatedMergeTree::getMutationCounters() const return queue.getMutationCounters(); } -void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded() -{ - if (areBackgroundMovesNeeded()) - background_moves_assignee.start(); -} - - std::unique_ptr StorageReplicatedMergeTree::getDefaultSettings() const { return std::make_unique(getContext()->getReplicatedMergeTreeSettings()); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 487c3a3f44c0..5abf14c1400d 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -955,8 +955,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override; - void startBackgroundMovesIfNeeded() override; - /// Attaches restored parts to the storage. void attachRestoredParts(MutableDataPartsVector && parts) override; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index c3cd88dd6014..5c80831421f4 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -727,7 +727,7 @@ void StorageURLSink::cancelBuffers() write_buf->cancel(); } -class PartitionedStorageURLSink : public PartitionedSink +class PartitionedStorageURLSink : public PartitionedSink::SinkCreator { public: PartitionedStorageURLSink( @@ -741,7 +741,7 @@ class PartitionedStorageURLSink : public PartitionedSink const CompressionMethod compression_method_, const HTTPHeaderEntries & headers_, const String & http_method_) - : PartitionedSink(partition_strategy_, context_, std::make_shared(sample_block_)) + : partition_strategy(partition_strategy_) , uri(uri_) , format(format_) , format_settings(format_settings_) @@ -756,7 +756,8 @@ class PartitionedStorageURLSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string partition_path = partition_strategy->getPathForWrite(uri, partition_id); + const auto file_path_generator = std::make_shared(uri); + std::string partition_path = file_path_generator->getPathForWrite(partition_id); context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); return std::make_shared( @@ -764,6 +765,7 @@ class PartitionedStorageURLSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String uri; const String format; const std::optional format_settings; @@ -1445,7 +1447,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, uri, format_name, @@ -1456,6 +1458,8 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad compression_method, headers, http_method); + + return std::make_shared(partition_strategy, sink_creator, context, std::make_shared(metadata_snapshot->getSampleBlock())); } return std::make_shared( diff --git 
a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp new file mode 100644 index 000000000000..979fa21708d6 --- /dev/null +++ b/src/Storages/System/StorageSystemExports.cpp @@ -0,0 +1,66 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +ColumnsDescription StorageSystemExports::getColumnsDescription() +{ + return ColumnsDescription + { + {"source_database", std::make_shared(), "Name of the source database."}, + {"source_table", std::make_shared(), "Name of the source table."}, + {"destination_database", std::make_shared(), "Name of the destination database."}, + {"destination_table", std::make_shared(), "Name of the destination table."}, + {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, + {"part_name", std::make_shared(), "Name of the part."}, + {"destination_file_path", std::make_shared(), "File path where the part is being exported."}, + {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, + {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, + {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, + {"total_size_bytes_compressed", std::make_shared(), "The total size of the compressed data in the exported part."}, + {"total_size_bytes_uncompressed", std::make_shared(), "The total size of the uncompressed data in the exported part."}, + {"bytes_read_uncompressed", std::make_shared(), "The number of uncompressed bytes read from the exported part."}, + {"memory_usage", std::make_shared(), "Current memory usage in bytes for the export operation."}, + {"peak_memory_usage", std::make_shared(), "Peak memory usage in bytes during the export operation."}, + }; +} + +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const +{ + const auto access = context->getAccess(); + const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES); + + for (const auto & export_info : context->getExportsList().get()) + { + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, export_info.source_database, export_info.source_table)) + continue; + + size_t i = 0; + res_columns[i++]->insert(export_info.source_database); + res_columns[i++]->insert(export_info.source_table); + res_columns[i++]->insert(export_info.destination_database); + res_columns[i++]->insert(export_info.destination_table); + res_columns[i++]->insert(export_info.create_time); + res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.destination_file_path); + res_columns[i++]->insert(export_info.elapsed); + res_columns[i++]->insert(export_info.rows_read); + res_columns[i++]->insert(export_info.total_rows_to_read); + res_columns[i++]->insert(export_info.total_size_bytes_compressed); + res_columns[i++]->insert(export_info.total_size_bytes_uncompressed); + res_columns[i++]->insert(export_info.bytes_read_uncompressed); + res_columns[i++]->insert(export_info.memory_usage); + res_columns[i++]->insert(export_info.peak_memory_usage); + } +} + +} diff --git a/src/Storages/System/StorageSystemExports.h b/src/Storages/System/StorageSystemExports.h new file mode 100644 index 000000000000..e13fbfa26aaa --- /dev/null +++ b/src/Storages/System/StorageSystemExports.h @@ -0,0 +1,25 @@ +#pragma once + +#include + + +namespace DB +{ + +class 
Context; + + +class StorageSystemExports final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemExports"; } + + static ColumnsDescription getColumnsDescription(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const override; +}; + +} diff --git a/src/Storages/System/StorageSystemMerges.cpp b/src/Storages/System/StorageSystemMerges.cpp index 0fca5dc84a2b..c8c569ff4696 100644 --- a/src/Storages/System/StorageSystemMerges.cpp +++ b/src/Storages/System/StorageSystemMerges.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 9e996d5ca2b9..5a3a4d30599d 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -103,6 +104,7 @@ #include #include #include +#include #include #include @@ -208,6 +210,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "dimensional_metrics", "Contains dimensional metrics, which have multiple dimensions (labels) to provide more granular information. For example, counting failed merges by their error code. This table is always up to date."); attach(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row."); attach(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row."); + attach(context, system_database, "exports", "Contains a list of currently executing exports of MergeTree table parts and their progress. Each export operation is represented by a single row."); attach(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row."); attachNoDescription(context, system_database, "replicas", "Contains information and status of all table replicas on current server. 
Each replica is represented by a single row."); attach(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica."); diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 21fc0800752e..1bdf1cd250fd 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -43,6 +43,7 @@ ALTER TTL ['ALTER MODIFY TTL','MODIFY TTL'] TABLE ALTER TABLE ALTER MATERIALIZE TTL ['MATERIALIZE TTL'] TABLE ALTER TABLE ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING','RESET SETTING'] TABLE ALTER TABLE ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE +ALTER EXPORT PART ['ALTER EXPORT PART','EXPORT PART'] TABLE ALTER TABLE ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE ALTER UNLOCK SNAPSHOT ['UNLOCK SNAPSHOT'] TABLE ALTER TABLE diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference new file mode 100644 index 000000000000..d9089d37dd99 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -0,0 +1,16 @@ +---- Export 2020_1_1_0 and 2021_2_2_0 +---- Both data parts should appear +1 2020 +2 2020 +3 2020 +4 2021 +---- Export the same part again, it should be idempotent +1 2020 +2 2020 +3 2020 +4 2021 +---- Data in roundtrip MergeTree table (should match s3_table) +1 2020 +2 2020 +3 2020 +4 2021 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh new file mode 100755 index 000000000000..1b7efb2d0379 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +mt_table="mt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +mt_table_roundtrip="mt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" + +query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "INSERT INTO $mt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" +echo "---- Export 2020_1_1_0 and 2021_2_2_0" +query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +echo "---- Both data parts should appear" +query "SELECT * FROM $s3_table ORDER BY id" + +echo "---- Export the same part again, it should be idempotent" +query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +query "SELECT * FROM $s3_table ORDER BY id" + +query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip MergeTree table (should match s3_table)" +query "SELECT * FROM $s3_table ORDER BY id" + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..a61c066e8789 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -0,0 +1,22 @@ +-- Tags: no-parallel, no-fasttest + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_mt_table VALUES (1, 2020); + +-- Create a table with a different partition key and export a partition to it. It should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; + +ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} + +DROP TABLE 03572_invalid_schema_table; + +-- The only partition strategy that supports exports is hive. 
Wildcard should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year); + +ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference new file mode 100644 index 000000000000..07f1ec6376a6 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference @@ -0,0 +1,16 @@ +---- Get actual part names and export them +---- Both data parts should appear +1 2020 +2 2020 +3 2020 +4 2021 +---- Export the same part again, it should be idempotent +1 2020 +2 2020 +3 2020 +4 2021 +---- Data in roundtrip ReplicatedMergeTree table (should match s3_table) +1 2020 +2 2020 +3 2020 +4 2021 diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh new file mode 100755 index 000000000000..4709faf6c2e4 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +# Tags: replica, no-parallel, no-replicated-database, no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +rmt_table="rmt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +rmt_table_roundtrip="rmt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" + +query "CREATE TABLE $rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table', 'replica1') PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "INSERT INTO $rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" + +echo "---- Get actual part names and export them" +part_2020=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2020' ORDER BY name LIMIT 1" | tr -d '\n') +part_2021=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2021' ORDER BY name LIMIT 1" | tr -d '\n') + +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2021' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +echo "---- Both data parts should appear" +query "SELECT * FROM $s3_table ORDER BY id" + +echo "---- Export the same part again, it should be idempotent" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +query "SELECT * FROM $s3_table ORDER BY id" + +query "CREATE TABLE $rmt_table_roundtrip ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table_roundtrip', 
'replica1') PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip ReplicatedMergeTree table (should match s3_table)" +query "SELECT * FROM $rmt_table_roundtrip ORDER BY id" + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..f8f23532f0a7 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -0,0 +1,22 @@ +-- Tags: no-parallel, no-fasttest + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt/03572_rmt_table', 'replica1') PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_rmt_table VALUES (1, 2020); + +-- Create a table with a different partition key and export a partition to it. It should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; + +ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} + +DROP TABLE 03572_invalid_schema_table; + +-- The only partition strategy that supports exports is hive. Wildcard should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year); + +ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table;
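A note on where the destination paths in these tests come from: with partition_strategy='hive', ObjectStorageAppendFilePathGenerator builds write paths as table_root/<hive partition prefix><file name>.<format> and reads everything back through a recursive '**.<format>' glob. Below is a self-contained sketch of those two rules, simplified from the generator at the top of this change: the real code lowercases the format and falls back to a snowflake ID when no file name override is given, and reusing the part name as the file name during export is an assumption made here purely for illustration.

#include <iostream>
#include <string>

/// Write path: table_root/<hive partition prefix><file_name>.<format>.
/// partition_id is expected to already end with '/' (buildExpressionHive adds it);
/// raw_path is assumed non-empty, as in the original generator.
std::string pathForWrite(const std::string & raw_path, const std::string & partition_id,
                         const std::string & file_name, const std::string & file_format)
{
    std::string result = raw_path;
    if (result.back() != '/')
        result += '/';
    result += partition_id;
    result += file_name + "." + file_format;
    return result;
}

/// Read path: a recursive glob over the whole table root.
std::string pathForRead(const std::string & raw_path, const std::string & file_format)
{
    return raw_path + "**." + file_format;
}

int main()
{
    /// Hypothetical layout for exporting part 2020_1_1_0 into a hive-partitioned
    /// table rooted at "s3_table":
    std::cout << pathForWrite("s3_table", "year=2020/", "2020_1_1_0", "parquet") << '\n';
    /// prints: s3_table/year=2020/2020_1_1_0.parquet

    std::cout << pathForRead("s3_table/", "parquet") << '\n';
    /// prints: s3_table/**.parquet
}

Under that assumption, re-exporting the same part maps to the same object key, which would let the server detect the duplicate (see the PartsExportDuplicated profile event and the export_merge_tree_part_overwrite_file_if_exists setting) rather than silently writing a second file, consistent with the idempotency cases exercised in the tests above.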