Skip to content

Commit dfa56f4

Browse files
authored
Merge pull request #950 from Altinity/feature/antalya-25.6.5/fix_remote_calls
25.6.5 Antalya port #583, #584, #703, #720 - fixes for s3Cluster distributed calls
2 parents af5b44d + 56a23e6 commit dfa56f4

File tree

19 files changed

+443
-52
lines changed

19 files changed

+443
-52
lines changed

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ void executeQuery(
455455
not_optimized_cluster->getName());
456456

457457
read_from_remote->setStepDescription("Read from remote replica");
458+
read_from_remote->setIsRemoteFunction(is_remote_function);
458459
plan->addStep(std::move(read_from_remote));
459460
plan->addInterpreterContext(new_context);
460461
plans.emplace_back(std::move(plan));

src/Interpreters/Context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2878,8 +2878,11 @@ void Context::setCurrentQueryId(const String & query_id)
28782878

28792879
client_info.current_query_id = query_id_to_set;
28802880

2881-
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
2881+
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
2882+
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
2883+
{
28822884
client_info.initial_query_id = client_info.current_query_id;
2885+
}
28832886
}
28842887

28852888
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
#include <Processors/QueryPlan/TotalsHavingStep.h>
7272
#include <Processors/QueryPlan/WindowStep.h>
7373
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
74+
#include <Processors/QueryPlan/ObjectFilterStep.h>
7475
#include <Processors/Sources/NullSource.h>
7576
#include <Processors/Sources/SourceFromSingleChunk.h>
7677
#include <Processors/Transforms/AggregatingTransform.h>
@@ -83,6 +84,7 @@
8384
#include <Storages/StorageValues.h>
8485
#include <Storages/StorageView.h>
8586
#include <Storages/ReadInOrderOptimizer.h>
87+
#include <Storages/IStorageCluster.h>
8688

8789
#include <Columns/Collator.h>
8890
#include <Columns/ColumnAggregateFunction.h>
@@ -195,6 +197,7 @@ namespace Setting
195197
extern const SettingsUInt64 max_rows_to_transfer;
196198
extern const SettingsOverflowMode transfer_overflow_mode;
197199
extern const SettingsString implicit_table_at_top_level;
200+
extern const SettingsBool use_hive_partitioning;
198201
}
199202

200203
namespace ServerSetting
@@ -1972,6 +1975,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
19721975

19731976
if (expressions.second_stage || from_aggregation_stage)
19741977
{
1978+
if (settings[Setting::use_hive_partitioning]
1979+
&& !expressions.first_stage
1980+
&& expressions.hasWhere())
1981+
{
1982+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1983+
{
1984+
auto object_filter_step = std::make_unique<ObjectFilterStep>(
1985+
query_plan.getCurrentHeader(),
1986+
expressions.before_where->dag.clone(),
1987+
getSelectQuery().where()->getColumnName());
1988+
1989+
object_filter_step->setStepDescription("WHERE");
1990+
query_plan.addStep(std::move(object_filter_step));
1991+
}
1992+
}
1993+
19751994
if (from_aggregation_stage)
19761995
{
19771996
/// No need to aggregate anything, since this was done on remote shards.

src/Planner/Planner.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include <Processors/QueryPlan/WindowStep.h>
4040
#include <Processors/QueryPlan/ReadNothingStep.h>
4141
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
42+
#include <Processors/QueryPlan/ObjectFilterStep.h>
4243
#include <QueryPipeline/QueryPipelineBuilder.h>
4344

4445
#include <Interpreters/Context.h>
@@ -52,6 +53,7 @@
5253
#include <Storages/StorageDistributed.h>
5354
#include <Storages/StorageDummy.h>
5455
#include <Storages/StorageMerge.h>
56+
#include <Storages/IStorageCluster.h>
5557

5658
#include <AggregateFunctions/IAggregateFunction.h>
5759

@@ -143,6 +145,7 @@ namespace Setting
143145
extern const SettingsUInt64 max_rows_to_transfer;
144146
extern const SettingsOverflowMode transfer_overflow_mode;
145147
extern const SettingsBool enable_parallel_blocks_marshalling;
148+
extern const SettingsBool use_hive_partitioning;
146149
}
147150

148151
namespace ServerSetting
@@ -452,6 +455,19 @@ void addFilterStep(
452455
query_plan.addStep(std::move(where_step));
453456
}
454457

458+
void addObjectFilterStep(QueryPlan & query_plan,
459+
FilterAnalysisResult & filter_analysis_result,
460+
const std::string & step_description)
461+
{
462+
auto actions = std::move(filter_analysis_result.filter_actions->dag);
463+
464+
auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
465+
std::move(actions),
466+
filter_analysis_result.filter_column_name);
467+
where_step->setStepDescription(step_description);
468+
query_plan.addStep(std::move(where_step));
469+
}
470+
455471
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
456472
const AggregationAnalysisResult & aggregation_analysis_result,
457473
const QueryAnalysisResult & query_analysis_result,
@@ -1754,6 +1770,16 @@ void Planner::buildPlanForQueryNode()
17541770

17551771
if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
17561772
{
1773+
if (settings[Setting::use_hive_partitioning]
1774+
&& !query_processing_info.isFirstStage()
1775+
&& expression_analysis_result.hasWhere())
1776+
{
1777+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1778+
{
1779+
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
1780+
}
1781+
}
1782+
17571783
if (query_processing_info.isFromAggregationState())
17581784
{
17591785
/// Aggregation was performed on remote shards
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include <Processors/QueryPlan/ObjectFilterStep.h>
2+
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
3+
#include <Processors/QueryPlan/Serialization.h>
4+
#include <Processors/Transforms/FilterTransform.h>
5+
#include <IO/Operators.h>
6+
7+
#include <memory>
8+
9+
namespace DB
10+
{
11+
12+
namespace ErrorCodes
13+
{
14+
extern const int INCORRECT_DATA;
15+
}
16+
17+
ObjectFilterStep::ObjectFilterStep(
18+
const Header & input_header_,
19+
ActionsDAG actions_dag_,
20+
String filter_column_name_)
21+
: actions_dag(std::move(actions_dag_))
22+
, filter_column_name(std::move(filter_column_name_))
23+
{
24+
input_headers.emplace_back(input_header_);
25+
output_header = input_headers.front();
26+
}
27+
28+
QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
29+
{
30+
return std::move(pipelines.front());
31+
}
32+
33+
void ObjectFilterStep::updateOutputHeader()
34+
{
35+
output_header = input_headers.front();
36+
}
37+
38+
void ObjectFilterStep::serialize(Serialization & ctx) const
39+
{
40+
writeStringBinary(filter_column_name, ctx.out);
41+
42+
actions_dag.serialize(ctx.out, ctx.registry);
43+
}
44+
45+
std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
46+
{
47+
if (ctx.input_headers.size() != 1)
48+
throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");
49+
50+
String filter_column_name;
51+
readStringBinary(filter_column_name, ctx.in);
52+
53+
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
54+
55+
return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
56+
}
57+
58+
void registerObjectFilterStep(QueryPlanStepRegistry & registry)
59+
{
60+
registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
61+
}
62+
63+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
#include <Processors/QueryPlan/IQueryPlanStep.h>
3+
#include <Interpreters/ActionsDAG.h>
4+
5+
namespace DB
6+
{
7+
8+
/// Implements WHERE operation.
9+
class ObjectFilterStep : public IQueryPlanStep
10+
{
11+
public:
12+
ObjectFilterStep(
13+
const Header & input_header_,
14+
ActionsDAG actions_dag_,
15+
String filter_column_name_);
16+
17+
String getName() const override { return "ObjectFilter"; }
18+
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
19+
20+
const ActionsDAG & getExpression() const { return actions_dag; }
21+
ActionsDAG & getExpression() { return actions_dag; }
22+
const String & getFilterColumnName() const { return filter_column_name; }
23+
24+
void serialize(Serialization & ctx) const override;
25+
26+
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
27+
28+
private:
29+
void updateOutputHeader() override;
30+
31+
ActionsDAG actions_dag;
32+
String filter_column_name;
33+
};
34+
35+
}

src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <Processors/QueryPlan/FilterStep.h>
44
#include <Processors/QueryPlan/LimitStep.h>
55
#include <Processors/QueryPlan/SourceStepWithFilter.h>
6+
#include <Processors/QueryPlan/ObjectFilterStep.h>
67

78
namespace DB::QueryPlanOptimizations
89
{
@@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
4142
/// So this is likely not needed.
4243
continue;
4344
}
45+
else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
46+
{
47+
source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
48+
}
4449
else
4550
{
4651
break;

src/Processors/QueryPlan/QueryPlanStepRegistry.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry);
4949
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
5050
void registerExtremesStep(QueryPlanStepRegistry & registry);
5151
void registerJoinStep(QueryPlanStepRegistry & registry);
52+
void registerObjectFilterStep(QueryPlanStepRegistry & registry);
5253

5354
void registerReadFromTableStep(QueryPlanStepRegistry & registry);
5455
void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry);
@@ -73,6 +74,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
7374

7475
registerReadFromTableStep(registry);
7576
registerReadFromTableFunctionStep(registry);
77+
registerObjectFilterStep(registry);
7678
}
7779

7880
}

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,8 @@ void ReadFromRemote::addLazyPipe(
510510
my_stage = stage, my_storage = storage,
511511
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
512512
query_tree = shard.query_tree, planner_context = shard.planner_context,
513-
pushed_down_filters, parallel_marshalling_threads]() mutable
513+
pushed_down_filters, parallel_marshalling_threads,
514+
my_is_remote_function = is_remote_function]() mutable
514515
-> QueryPipelineBuilder
515516
{
516517
auto current_settings = my_context->getSettingsRef();
@@ -597,6 +598,8 @@ void ReadFromRemote::addLazyPipe(
597598
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
598599
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
599600
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan);
601+
remote_query_executor->setRemoteFunction(my_is_remote_function);
602+
remote_query_executor->setShardCount(my_shard_count);
600603

601604
auto pipe = createRemoteSourcePipe(
602605
remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads);
@@ -687,6 +690,8 @@ void ReadFromRemote::addPipe(
687690
priority_func);
688691
remote_query_executor->setLogger(log);
689692
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
693+
remote_query_executor->setRemoteFunction(is_remote_function);
694+
remote_query_executor->setShardCount(shard_count);
690695

691696
if (!table_func_ptr)
692697
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -707,6 +712,8 @@ void ReadFromRemote::addPipe(
707712
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
708713
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
709714
remote_query_executor->setLogger(log);
715+
remote_query_executor->setRemoteFunction(is_remote_function);
716+
remote_query_executor->setShardCount(shard_count);
710717

711718
if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled)
712719
{

src/Processors/QueryPlan/ReadFromRemote.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
4646

4747
void enableMemoryBoundMerging();
4848
void enforceAggregationInOrder(const SortDescription & sort_description);
49+
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
4950

5051
bool hasSerializedPlan() const;
5152

@@ -63,6 +64,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
6364
UInt32 shard_count;
6465
const String cluster_name;
6566
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
67+
bool is_remote_function = false;
6668

6769
Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const Header & out_header);
6870

0 commit comments

Comments
 (0)