Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ set(
statistics/statistics_collector.hpp
)

set(
QUERY_COMPILER_SOURCES
compiler/optimizer/strategy/pqp_partial_aggregation_rule.cpp
compiler/optimizer/strategy/pqp_partial_aggregation_rule.hpp
)

set(
SHARED_SOURCES
aggregate_traits.hpp
Expand Down
125 changes: 125 additions & 0 deletions src/lib/compiler/optimizer/strategy/pqp_partial_aggregation_rule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "pqp_partial_aggregation_rule.hpp"

#include <numeric>

#include "compiler/physical_query_plan/aggregate_operator_proxy.hpp"
#include "compiler/physical_query_plan/pqp_utils.hpp"
#include "compiler/plan_utils.hpp"
#include "expression/aggregate_expression.hpp"
#include "expression/expression_functional.hpp"
#include "expression/expression_utils.hpp"
#include "expression/pqp_column_expression.hpp"
#include "types.hpp"

using namespace skyrise::expression_functional; // NOLINT(google-build-using-namespace)

namespace skyrise {

const std::string& PqpPartialAggregationRule::Name() const {
static const std::string rule_name = "PqpPartialAggregationRule";
return rule_name;
}

void PqpPartialAggregationRule::ApplyTo(const std::shared_ptr<AbstractOperatorProxy>& pqp_root) const {
// Traverse PQP for pipeline-breaking AggregateOperatorProxies
VisitPqp(pqp_root, [&](const auto& operator_proxy) {
if (operator_proxy->Type() != OperatorType::kAggregate || !operator_proxy->IsPipelineBreaker()) {
return PqpVisitation::kVisitInputs;
}

// Pre-aggregation should happen in parallel to be useful.
if (operator_proxy->LeftInput()->OutputObjectsCount() == 1) {
return PqpVisitation::kVisitInputs;
}

// Perform optimization, if applicable.
auto aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(operator_proxy);
if (PqpPartialAggregationRule::AllowsPartialAggregation(aggregate_proxy)) {
PqpPartialAggregationRule::InsertPartialAggregation(aggregate_proxy);
}

return PqpVisitation::kVisitInputs;
});
}

bool PqpPartialAggregationRule::AllowsPartialAggregation(
const std::shared_ptr<AggregateOperatorProxy>& aggregate_proxy) {
// Currently, we only support SUM, MIN, MAX, COUNT and COUNT(*) aggregates for pre-aggregation.
for (const auto& expression : aggregate_proxy->Aggregates()) {
Assert(expression->type_ == ExpressionType::kAggregate, "Expression should have ExpressionType::kAggregate.");
const auto aggregate_expression = std::static_pointer_cast<AggregateExpression>(expression);

switch (aggregate_expression->aggregate_function_) {
case AggregateFunction::kAny:
// Since ANY is not an actual aggregate, c.f. DependentGroupByReductionRule, it does not block pre-aggregation.
break;
case AggregateFunction::kAvg:
// AVGs do not support pre-aggregation. However, they can be computed as SUM(a) / COUNT(a), which the
// LqpAverageRewriteRule takes care of.
return false;
case AggregateFunction::kCount:
break;
case AggregateFunction::kCountDistinct:
return false;
case AggregateFunction::kMax:
case AggregateFunction::kMin:
break;
case AggregateFunction::kStandardDeviationSample:
return false;
case AggregateFunction::kSum:
break;
default:
Fail("Unsupported AggregateFunction.");
}
}
return true;
}

void PqpPartialAggregationRule::InsertPartialAggregation(std::shared_ptr<AggregateOperatorProxy>& aggregate_proxy) {
/**
* (1) Create duplicate AggregateOperatorProxy for pre-aggregation, and push it below the original/final aggregation.
*/
auto pre_aggregate_operator_proxy = AggregateOperatorProxy::Make(aggregate_proxy->GroupByColumnIds(),
ExpressionsDeepCopy(aggregate_proxy->Aggregates()));
pre_aggregate_operator_proxy->SetComment("Pre-Aggregate");
std::static_pointer_cast<AggregateOperatorProxy>(pre_aggregate_operator_proxy)->SetIsPipelineBreaker(false);
PlanInsertNodeBelow<AbstractOperatorProxy>(aggregate_proxy, PlanInputSide::kLeft, pre_aggregate_operator_proxy);

/**
* (2) Update final aggregation's group-by column ids to match pre-aggregation's output group-by column ids.
*/
std::iota(aggregate_proxy->groupby_column_ids_.begin(), aggregate_proxy->groupby_column_ids_.end(), 0);

/**
* (3) Update final aggregation's AggregateExpressions.
*/
size_t groupby_column_ids_count = aggregate_proxy->GroupByColumnIds().size();
size_t aggregate_count = aggregate_proxy->Aggregates().size();
auto& aggregates = aggregate_proxy->aggregates_;

for (size_t i = 0; i < aggregate_count; ++i) {
auto aggregate_expression = std::static_pointer_cast<AggregateExpression>(aggregates[i]);
Assert(aggregate_expression->Argument()->type_ == ExpressionType::kPqpColumn,
"Expected aggregate argument to have ExpressionType::kPqpColumn");
const auto argument = std::static_pointer_cast<PqpColumnExpression>(aggregate_expression->Argument());

// Update aggregate argument to match the output of the pre-aggregation.
// - Group-By columns are moved to the front indices.
// - Column data types might change in the course of the pre-aggregation.
// For example, a DataType::kFloat column becomes a DataType::kDouble column after a SUM aggregation.
const ColumnId updated_column_id = i + groupby_column_ids_count;
const DataType updated_data_type = aggregate_expression->GetDataType();
const auto updated_argument = PqpColumn_(updated_column_id, updated_data_type, argument->is_nullable_, argument->column_name_);

// Replace COUNT with SUM
auto aggregate_function = aggregate_expression->aggregate_function_;
if (aggregate_function == AggregateFunction::kCount) {
aggregate_function = AggregateFunction::kSum;
}

// Replace AggregateExpression
aggregates[i] = std::make_shared<AggregateExpression>(aggregate_function, updated_argument);
}
}

} // namespace skyrise
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "compiler/optimizer/abstract_rule.hpp"
#include "compiler/physical_query_plan/abstract_operator_proxy.hpp"

namespace skyrise {

class AggregateOperatorProxy;

/**
* Searches for AggregateOperatorProxies, which are defined as pipeline-breakers. Duplicates them to allow for
* pre-aggregation.
*/
class PqpPartialAggregationRule : public AbstractRule {
public:
const std::string& Name() const override;
void ApplyTo(const std::shared_ptr<AbstractOperatorProxy>& pqp_root) const override;

protected:
static bool AllowsPartialAggregation(const std::shared_ptr<AggregateOperatorProxy>& aggregate_proxy);
static void InsertPartialAggregation(std::shared_ptr<AggregateOperatorProxy>& aggregate_proxy);
};

} // namespace skyrise
2 changes: 2 additions & 0 deletions src/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ set(
benchmark/lib/lambda/lambda_benchmark_result_test.cpp
benchmark/lib/lambda/lambda_benchmark_runner_test.cpp

lib/compiler/optimizer/strategy/pqp_partial_aggregation_rule_test.cpp

lib/compiler/physical_query_plan/aggregate_operator_proxy_test.cpp
lib/compiler/physical_query_plan/alias_operator_proxy_test.cpp
lib/compiler/physical_query_plan/exchange_operator_proxy_test.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include "compiler/optimizer/strategy/pqp_partial_aggregation_rule.hpp"

#include <string>
#include <vector>

#include <gtest/gtest.h>

#include "compiler/optimizer/strategy/strategy_base_test.hpp"
#include "compiler/physical_query_plan/aggregate_operator_proxy.hpp"
#include "compiler/physical_query_plan/import_operator_proxy.hpp"
#include "expression/aggregate_expression.hpp"
#include "expression/expression_functional.hpp"
#include "expression/expression_utils.hpp"
#include "expression/pqp_column_expression.hpp"
#include "types.hpp"

namespace skyrise {

using namespace skyrise::expression_functional; // NOLINT(google-build-using-namespace)

class PqpPartialAggregationRuleTest : public StrategyBaseTest {
public:
void SetUp() override {
rule_ = std::make_shared<PqpPartialAggregationRule>();

std::vector<ColumnId> column_ids = {ColumnId{0}, ColumnId{1}, ColumnId{2}, ColumnId{3}};
const std::vector<std::string> object_keys{"partition1", "partition2", "partition3"};
import_proxy_ = ImportOperatorProxy::Make("dummy_bucket", object_keys, column_ids);

a_ = PqpColumn_(ColumnId{0}, DataType::kLong, false, "a");
b_ = PqpColumn_(ColumnId{1}, DataType::kLong, false, "b");
c_ = PqpColumn_(ColumnId{2}, DataType::kLong, false, "c");
d_ = PqpColumn_(ColumnId{3}, DataType::kFloat, false, "d");
}

static void VerifyAggregate(const std::shared_ptr<AbstractExpression>& expression,
const AggregateFunction expected_function, const ColumnId expected_column_id,
const DataType expected_data_type, const std::string& expected_column_name) {
ASSERT_EQ(expression->type_, ExpressionType::kAggregate);
const auto aggregate_expression = std::static_pointer_cast<AggregateExpression>(expression);

EXPECT_EQ(aggregate_expression->aggregate_function_, expected_function);
ASSERT_EQ(aggregate_expression->Argument()->type_, ExpressionType::kPqpColumn);
const auto argument_pqp_expression =
std::static_pointer_cast<PqpColumnExpression>(aggregate_expression->Argument());
EXPECT_EQ(argument_pqp_expression->column_id_, expected_column_id);
EXPECT_EQ(argument_pqp_expression->data_type_, expected_data_type);
EXPECT_EQ(argument_pqp_expression->column_name_, expected_column_name);
}

protected:
std::shared_ptr<AbstractRule> rule_;
std::shared_ptr<ImportOperatorProxy> import_proxy_;
std::shared_ptr<PqpColumnExpression> a_;
std::shared_ptr<PqpColumnExpression> b_;
std::shared_ptr<PqpColumnExpression> c_;
std::shared_ptr<PqpColumnExpression> d_;
};

TEST_F(PqpPartialAggregationRuleTest, PartialAggregation) {
const std::vector<ColumnId> groupby_column_ids = {ColumnId{1}, ColumnId{2}};
const auto aggregates = ExpressionVector_(Sum_(d_), Min_(a_));
// clang-format off
auto pqp =
AggregateOperatorProxy::Make(groupby_column_ids, aggregates,
import_proxy_);
// clang-format on

StrategyBaseTest::ApplyRule(rule_, pqp);

EXPECT_EQ(pqp->Type(), OperatorType::kAggregate);
EXPECT_EQ(pqp->LeftInput()->Type(), OperatorType::kAggregate);
EXPECT_EQ(pqp->LeftInput()->LeftInput()->Type(), OperatorType::kImport);
{
auto pre_aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(pqp->LeftInput());
EXPECT_EQ(pre_aggregate_proxy->Comment(), "Pre-Aggregate");
EXPECT_FALSE(pre_aggregate_proxy->IsPipelineBreaker());

EXPECT_EQ(pre_aggregate_proxy->GroupByColumnIds(), groupby_column_ids);
EXPECT_TRUE(ExpressionsEqual(pre_aggregate_proxy->Aggregates(), aggregates));

VerifyAggregate(pre_aggregate_proxy->Aggregates().at(0), AggregateFunction::kSum, ColumnId{3}, DataType::kFloat, "d");
VerifyAggregate(pre_aggregate_proxy->Aggregates().at(1), AggregateFunction::kMin, ColumnId{0}, a_->data_type_, "a");
}
{
auto aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(pqp);
EXPECT_TRUE(aggregate_proxy->Comment().empty());
EXPECT_TRUE(aggregate_proxy->IsPipelineBreaker());

EXPECT_EQ(aggregate_proxy->GroupByColumnIds(), std::vector<ColumnId>({ColumnId{0}, ColumnId{1}}));
EXPECT_FALSE(ExpressionsEqual(aggregate_proxy->Aggregates(), aggregates));

// After SUM aggregations, DataType::kFloat input columns become DataType::kDouble aggregates, c.f. AggregateTraits.
VerifyAggregate(aggregate_proxy->Aggregates().at(0), AggregateFunction::kSum, ColumnId{2}, DataType::kDouble ,"d");
VerifyAggregate(aggregate_proxy->Aggregates().at(1), AggregateFunction::kMin, ColumnId{3}, a_->data_type_ ,"a");
}
}

TEST_F(PqpPartialAggregationRuleTest, PartialAggregationCounts) {
// COUNTs become SUMs in the final aggregation
const std::vector<ColumnId> groupby_column_ids = {ColumnId{2}, ColumnId{3}};
const auto aggregates = ExpressionVector_(CountStarPqp_(), Min_(a_), Count_(b_));
// clang-format off
auto pqp =
AggregateOperatorProxy::Make(groupby_column_ids, aggregates,
import_proxy_);
// clang-format on

StrategyBaseTest::ApplyRule(rule_, pqp);

EXPECT_EQ(pqp->Type(), OperatorType::kAggregate);
EXPECT_EQ(pqp->LeftInput()->Type(), OperatorType::kAggregate);
EXPECT_EQ(pqp->LeftInput()->LeftInput()->Type(), OperatorType::kImport);
{
auto pre_aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(pqp->LeftInput());
EXPECT_EQ(pre_aggregate_proxy->GroupByColumnIds(), groupby_column_ids);
EXPECT_TRUE(ExpressionsEqual(pre_aggregate_proxy->Aggregates(), aggregates));

VerifyAggregate(pre_aggregate_proxy->Aggregates().at(0), AggregateFunction::kCount, ColumnId{kInvalidColumnId},
DataType::kLong, "*");
VerifyAggregate(pre_aggregate_proxy->Aggregates().at(1), AggregateFunction::kMin, ColumnId{0}, a_->data_type_,"a");
VerifyAggregate(pre_aggregate_proxy->Aggregates().at(2), AggregateFunction::kCount, ColumnId{1}, b_->data_type_ ,"b");
}
{
auto aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(pqp);
EXPECT_EQ(aggregate_proxy->GroupByColumnIds(), std::vector<ColumnId>({ColumnId{0}, ColumnId{1}}));
EXPECT_FALSE(ExpressionsEqual(aggregate_proxy->Aggregates(), aggregates));

VerifyAggregate(aggregate_proxy->Aggregates().at(0), AggregateFunction::kSum, ColumnId{2}, DataType::kLong, "*");
VerifyAggregate(aggregate_proxy->Aggregates().at(1), AggregateFunction::kMin, ColumnId{3}, a_->data_type_ ,"a");
VerifyAggregate(aggregate_proxy->Aggregates().at(2), AggregateFunction::kSum, ColumnId{4}, b_->data_type_,"b");
}
}

TEST_F(PqpPartialAggregationRuleTest, PartialAggregationCountDistinct) {
// The rule should not touch aggregates involving unsupported AggregateExpressions, such as AVG or COUNT DISTINCT.
const std::vector<ColumnId> groupby_column_ids = {ColumnId{1}, ColumnId{2}};
const auto aggregates = ExpressionVector_(CountDistinct_(a_), Sum_(d_));
// clang-format off
auto pqp =
AggregateOperatorProxy::Make(groupby_column_ids, aggregates,
import_proxy_);
// clang-format on

StrategyBaseTest::ApplyRule(rule_, pqp);

EXPECT_EQ(pqp->Type(), OperatorType::kAggregate);
EXPECT_EQ(pqp->LeftInput()->Type(), OperatorType::kImport);

auto aggregate_proxy = std::static_pointer_cast<AggregateOperatorProxy>(pqp);
EXPECT_EQ(aggregate_proxy->GroupByColumnIds(), groupby_column_ids);
EXPECT_TRUE(ExpressionsEqual(aggregate_proxy->Aggregates(), aggregates));
}

} // namespace skyrise