18 changes: 18 additions & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
@@ -68,6 +68,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
@@ -189,6 +190,7 @@ namespace Setting
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
extern const SettingsBool use_hive_partitioning;
}

namespace ServerSetting
@@ -1965,6 +1967,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P

if (expressions.second_stage || from_aggregation_stage)
{
if (settings[Setting::use_hive_partitioning]
&& !expressions.first_stage
&& expressions.hasWhere())
{
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
{
auto object_filter_step = std::make_unique<ObjectFilterStep>(
query_plan.getCurrentHeader(),
expressions.before_where->dag.clone(),
getSelectQuery().where()->getColumnName());

object_filter_step->setStepDescription("WHERE");
query_plan.addStep(std::move(object_filter_step));
}
}

if (from_aggregation_stage)
{
/// No need to aggregate anything, since this was done on remote shards.
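The hunk above adds an ObjectFilterStep carrying the query's WHERE expression when the plan root is a ReadFromCluster step and use_hive_partitioning is enabled, so hive-partition values encoded in object paths can be used to prune files before cluster nodes read them. The sketch below only illustrates how such pruning would be exercised from the test side; the bucket layout, the path-derived column `year`, and the data are hypothetical and not part of this change.

import uuid

# Illustrative sketch only (not from the PR): assumes objects are laid out as
# .../hive/year=<N>/*.csv so that `year` becomes a path-derived hive-partition column.
def test_hive_partition_pruning_sketch(started_cluster):
    node = started_cluster.instances["s0_0_0"]

    query_id = str(uuid.uuid4())
    # With use_hive_partitioning=1, the WHERE on `year` can be evaluated against
    # object paths, so only files under year=2024/ need to be read by the cluster.
    result = node.query(
        """
        SELECT count()
        FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/hive/year=*/*.csv',
            'minio', 'minio123', 'CSV', 'value UInt32')
        WHERE year = 2024
        SETTINGS use_hive_partitioning = 1
        """,
        query_id=query_id,
    )
    assert int(result) >= 0
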
140 changes: 73 additions & 67 deletions tests/integration/test_s3_cluster/test.py
@@ -721,8 +721,79 @@ def test_remote_no_hedged(started_cluster):
assert TSV(pure_s3) == TSV(s3_distributed)


def test_hive_partitioning(started_cluster):
def test_distributed_s3_table_engine(started_cluster):
node = started_cluster.instances["s0_0_0"]

resp_def = node.query(
"""
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
"""
)

node.query("DROP TABLE IF EXISTS single_node");
node.query(
"""
CREATE TABLE single_node
(name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
"""
)
query_id_engine_single_node = str(uuid.uuid4())
resp_engine_single_node = node.query(
"""
SELECT * FROM single_node ORDER BY (name, value, polygon)
""",
query_id = query_id_engine_single_node
)
assert resp_def == resp_engine_single_node

node.query("DROP TABLE IF EXISTS distributed");
node.query(
"""
CREATE TABLE distributed
(name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
SETTINGS object_storage_cluster='cluster_simple'
"""
)
query_id_engine_distributed = str(uuid.uuid4())
resp_engine_distributed = node.query(
"""
SELECT * FROM distributed ORDER BY (name, value, polygon)
""",
query_id = query_id_engine_distributed
)
assert resp_def == resp_engine_distributed

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")

hosts_engine_single_node = node.query(
f"""
SELECT uniq(hostname)
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
"""
)
assert int(hosts_engine_single_node) == 1
hosts_engine_distributed = node.query(
f"""
SELECT uniq(hostname)
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
"""
)
assert int(hosts_engine_distributed) == 3


@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1])
def test_hive_partitioning(started_cluster, allow_experimental_analyzer):
node = started_cluster.instances["s0_0_0"]

node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}")

for i in range(1, 5):
exists = node.query(
f"""
@@ -846,69 +917,4 @@ def test_hive_partitioning(started_cluster):
cluster_optimized_traffic = int(cluster_optimized_traffic)
assert cluster_optimized_traffic == optimized_traffic


def test_distributed_s3_table_engine(started_cluster):
node = started_cluster.instances["s0_0_0"]

resp_def = node.query(
"""
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
"""
)

node.query("DROP TABLE IF EXISTS single_node");
node.query(
"""
CREATE TABLE single_node
(name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
"""
)
query_id_engine_single_node = str(uuid.uuid4())
resp_engine_single_node = node.query(
"""
SELECT * FROM single_node ORDER BY (name, value, polygon)
""",
query_id = query_id_engine_single_node
)
assert resp_def == resp_engine_single_node

node.query("DROP TABLE IF EXISTS distributed");
node.query(
"""
CREATE TABLE distributed
(name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
SETTINGS object_storage_cluster='cluster_simple'
"""
)
query_id_engine_distributed = str(uuid.uuid4())
resp_engine_distributed = node.query(
"""
SELECT * FROM distributed ORDER BY (name, value, polygon)
""",
query_id = query_id_engine_distributed
)
assert resp_def == resp_engine_distributed

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")

hosts_engine_single_node = node.query(
f"""
SELECT uniq(hostname)
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
"""
)
assert int(hosts_engine_single_node) == 1
hosts_engine_distributed = node.query(
f"""
SELECT uniq(hostname)
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
"""
)
assert int(hosts_engine_distributed) == 3
node.query("SET allow_experimental_analyzer = DEFAULT")