Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class IColumn;
M(Bool, move_all_conditions_to_prewhere, true, "Move all viable conditions from WHERE to PREWHERE", 0) \
M(Bool, enable_multiple_prewhere_read_steps, true, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \
M(Bool, move_primary_key_columns_to_end_of_prewhere, true, "Move PREWHERE conditions containing primary key columns to the end of AND chain. It is likely that these conditions are taken into account during primary key analysis and thus will not contribute a lot to PREWHERE filtering.", 0) \
M(Bool, allow_reorder_prewhere_conditions, true, "When moving conditions from WHERE to PREWHERE, allow reordering them to optimize filtering.", 0) /* #71539 via https://github.com/Altinity/ClickHouse/pull/640 */ \
\
M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \
Expand Down Expand Up @@ -738,7 +739,7 @@ class IColumn;
M(Bool, query_plan_push_down_limit, true, "Allow to move LIMITs down in the query plan", 0) \
M(Bool, query_plan_split_filter, true, "Allow to split filters in the query plan", 0) \
M(Bool, query_plan_merge_expressions, true, "Allow to merge expressions in the query plan", 0) \
M(Bool, query_plan_merge_filters, false, "Allow to merge filters in the query plan", 0) \
M(Bool, query_plan_merge_filters, true, "Allow to merge filters in the query plan", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
M(Bool, query_plan_convert_outer_join_to_inner_join, true, "Allow to convert OUTER JOIN to INNER JOIN if filter after JOIN always filters default values", 0) \
M(Bool, query_plan_optimize_prewhere, true, "Allow to push down filter to PREWHERE expression for supported storages", 0) \
Expand Down
26 changes: 10 additions & 16 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,24 @@ String ClickHouseVersion::toString() const
/// Note: please check if the key already exists to prevent duplicate entries.
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
{
{"24.12",
{"24.8.14",
{
{"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."}, // #71539 via https://github.com/Altinity/ClickHouse/pull/640
{"allow_reorder_prewhere_conditions", false, true, "New setting."}, // #71539 via https://github.com/Altinity/ClickHouse/pull/640
}
},
{"24.11",
{"24.8.11",
{
{"push_external_roles_in_interserver_queries", false, true, "New setting."}
}
},
{"24.10",
{
}
},
{"24.9",
{
{"enable_parallel_replicas", false, false, "Parallel replicas with read tasks became the Beta tier feature."},
{"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"},
{"parallel_replicas_mark_segment_size", 128, 0, "Value for this setting now determined automatically"},
{"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"}
{"enable_parallel_replicas", false, false, "Parallel replicas with read tasks became the Beta tier feature."}, // #63151 via https://github.com/Altinity/ClickHouse/pull/542
{"parallel_replicas_mark_segment_size", 128, 0, "Value for this setting now determined automatically"}, // #68424 via https://github.com/Altinity/ClickHouse/pull/542
{"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"}, // #64448 via https://github.com/Altinity/ClickHouse/pull/542
{"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"}, // #63151 via https://github.com/Altinity/ClickHouse/pull/542
{"push_external_roles_in_interserver_queries", false, true, "New setting."}, // #70332 via https://github.com/Altinity/ClickHouse/pull/542
}
},
{"24.8",
{
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"enable_named_columns_in_function_tuple", false, false, "Retroactively disabled by default due to critical bugs."},
{"rows_before_aggregation", false, false, "Provide exact value for rows_before_aggregation statistic, represents the number of rows read before aggregation"},
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
Expand Down
8 changes: 7 additions & 1 deletion src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ namespace DB

BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
const auto & query_settings = from->getSettingsRef();
BuildQueryPipelineSettings settings;
settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
settings.actions_settings = ExpressionActionsSettings::fromSettings(query_settings, CompileExpressions::yes);
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();

/// Setting query_plan_merge_filters is enabled by default.
/// But it can brake short-circuit without splitting filter step into smaller steps.
/// So, enable and disable this optimizations together.
settings.enable_multiple_filters_transforms_for_and_chain = query_settings.query_plan_merge_filters;
return settings;
}

Expand Down
2 changes: 2 additions & 0 deletions src/Processors/QueryPlan/BuildQueryPipelineSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ using QueryStatusPtr = std::shared_ptr<QueryStatus>;

struct BuildQueryPipelineSettings
{
bool enable_multiple_filters_transforms_for_and_chain = true;

ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
Expand Down
137 changes: 136 additions & 1 deletion src/Processors/QueryPlan/FilterStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunction.h>
#include <stack>
#include <ranges>

namespace DB
{
Expand Down Expand Up @@ -33,6 +38,92 @@ static ITransformingStep::Traits getTraits(const ActionsDAG & expression, const
};
}

static bool isTrivialSubtree(const ActionsDAG::Node * node)
{
while (node->type == ActionsDAG::ActionType::ALIAS)
node = node->children.at(0);

return node->type != ActionsDAG::ActionType::FUNCTION && node->type != ActionsDAG::ActionType::ARRAY_JOIN;
}

struct ActionsAndName
{
ActionsDAG dag;
std::string name;
};

static ActionsAndName splitSingleAndFilter(ActionsDAG & dag, const ActionsDAG::Node * filter_node)
{
auto split_result = dag.split({filter_node}, true);
dag = std::move(split_result.second);

const auto * split_filter_node = split_result.split_nodes_mapping[filter_node];
auto filter_type = removeLowCardinality(split_filter_node->result_type);
if (!filter_type->onlyNull() && !isUInt8(removeNullable(filter_type)))
{
DataTypePtr cast_type = std::make_shared<DataTypeUInt8>();
if (filter_type->isNullable())
cast_type = std::make_shared<DataTypeNullable>(std::move(cast_type));

split_filter_node = &split_result.first.addCast(*split_filter_node, cast_type, {});
}

split_result.first.getOutputs().emplace(split_result.first.getOutputs().begin(), split_filter_node);
auto name = split_filter_node->result_name;
return ActionsAndName{std::move(split_result.first), std::move(name)};
}

/// Try to split the left most AND atom to a separate DAG.
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
{
const auto * filter = &dag.findInOutputs(filter_name);
while (filter->type == ActionsDAG::ActionType::ALIAS)
filter = filter->children.at(0);

if (filter->type != ActionsDAG::ActionType::FUNCTION || filter->function_base->getName() != "and")
return {};

const ActionsDAG::Node * condition_to_split = nullptr;
std::stack<const ActionsDAG::Node *> nodes;
nodes.push(filter);
while (!nodes.empty())
{
const auto * node = nodes.top();
nodes.pop();

if (node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and")
{
/// The order is important. We should take the left-most atom, so put conditions on stack in reverse order.
for (const auto * child : node->children | std::ranges::views::reverse)
nodes.push(child);

continue;
}

if (isTrivialSubtree(node))
continue;

/// Do not split subtree if it's the last non-trivial one.
/// So, split the first found condition only when there is a another one found.
if (condition_to_split)
return splitSingleAndFilter(dag, condition_to_split);

condition_to_split = node;
}

return {};
}

std::vector<ActionsAndName> splitAndChainIntoMultipleFilters(ActionsDAG & dag, const std::string & filter_name)
{
std::vector<ActionsAndName> res;

while (auto condition = trySplitSingleAndFilter(dag, filter_name))
res.push_back(std::move(*condition));

return res;
}

FilterStep::FilterStep(
const DataStream & input_stream_,
ActionsDAG actions_dag_,
Expand All @@ -59,6 +150,23 @@ FilterStep::FilterStep(

void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
std::vector<ActionsAndName> and_atoms;

/// Splitting AND filter condition to steps under the setting, which is enabled with merge_filters optimization.
/// This is needed to support short-circuit properly.
if (settings.enable_multiple_filters_transforms_for_and_chain && !actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);

for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, and_atom.name, true, on_totals);
});
}

auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings());

pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
Expand All @@ -85,18 +193,45 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
void FilterStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, settings.indent_char);

auto cloned_dag = actions_dag.clone();

std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);

for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
settings.out << prefix << "AND column: " << and_atom.name << '\n';
expression->describeActions(settings.out, prefix);
}

settings.out << prefix << "Filter column: " << filter_column_name;

if (remove_filter_column)
settings.out << " (removed)";
settings.out << '\n';

auto expression = std::make_shared<ExpressionActions>(actions_dag.clone());
auto expression = std::make_shared<ExpressionActions>(std::move(cloned_dag));
expression->describeActions(settings.out, prefix);
}

void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
{
auto cloned_dag = actions_dag.clone();

std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);

for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
map.add("AND column", and_atom.name);
map.add("Expression", expression->toTree());
}

map.add("Filter Column", filter_column_name);
map.add("Removes Filter", remove_filter_column);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct QueryPlanOptimizationSettings
bool merge_expressions = true;

/// If merge-filters optimization is enabled.
bool merge_filters = false;
bool merge_filters = true;

/// If filter push down optimization is enabled.
bool filter_push_down = true;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ namespace ProfileEvents

namespace DB
{

namespace ErrorCodes
{
extern const int INDEX_NOT_USED;
Expand All @@ -155,6 +154,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
&& (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
.enable_multiple_prewhere_read_steps = settings.enable_multiple_prewhere_read_steps,
.force_short_circuit_execution = settings.query_plan_merge_filters
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions(
prewhere_info,
actions_settings,
reader_settings.enable_multiple_prewhere_read_steps);
reader_settings.enable_multiple_prewhere_read_steps, reader_settings.force_short_circuit_execution);

for (const auto & step : prewhere_actions.steps)
add_step(*step);
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeIOSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ struct MergeTreeReaderSettings
bool use_asynchronous_read_from_pool = false;
/// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps.
bool enable_multiple_prewhere_read_steps = false;
/// In case of multiple prewhere steps, execute filtering earlier to support short-circuit properly.
bool force_short_circuit_execution = false;
/// If true, try to lower size of read buffer according to granule size and compressed block size.
bool adjust_read_buffer_size = true;
/// If true, it's allowed to read the whole part without reading marks.
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/MergeTree/MergeTreeSelectProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
, algorithm(std::move(algorithm_))
, prewhere_info(prewhere_info_)
, actions_settings(actions_settings_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps))
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps, reader_settings_.force_short_circuit_execution))
, reader_settings(reader_settings_)
, block_size_params(block_size_params_)
, result_header(transformHeader(pool->getHeader(), prewhere_info))
Expand Down Expand Up @@ -126,9 +126,9 @@ String MergeTreeSelectProcessor::getName() const
return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName());
}

bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere);
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere, bool force_short_circuit_execution);

PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps)
PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps, bool force_short_circuit_execution)
{
PrewhereExprInfo prewhere_actions;
if (prewhere_info)
Expand All @@ -149,7 +149,7 @@ PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr pr
}

if (!enable_multiple_prewhere_read_steps ||
!tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions))
!tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions, force_short_circuit_execution))
{
PrewhereExprStep prewhere_step
{
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeSelectProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class MergeTreeSelectProcessor : private boost::noncopyable
static PrewhereExprInfo getPrewhereActions(
PrewhereInfoPtr prewhere_info,
const ExpressionActionsSettings & actions_settings,
bool enable_multiple_prewhere_read_steps);
bool enable_multiple_prewhere_read_steps,
bool force_short_circuit_execution);

void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }

Expand Down
Loading
Loading