Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation

import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.math.BigDecimal.RoundingMode

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -52,17 +53,19 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
def estimate: Option[Statistics] = {
if (childStats.rowCount.isEmpty) return None

// save a mutable copy of colStats so that we can later change it recursively
// Save a mutable copy of colStats so that we can later change it recursively.
colStatsMap.setInitValues(childStats.attributeStats)

// estimate selectivity of this filter predicate
val filterSelectivity: Double = calculateFilterSelectivity(plan.condition) match {
case Some(percent) => percent
// for not-supported condition, set filter selectivity to a conservative estimate 100%
case None => 1.0
}
// Estimate selectivity of this filter predicate, and update column stats if needed.
// For not-supported condition, set filter selectivity to a conservative estimate 100%
val filterSelectivity: Double = calculateFilterSelectivity(plan.condition).getOrElse(1.0)

val newColStats = colStatsMap.toColumnStats
val newColStats = if (filterSelectivity == 0) {
// The output is empty, we don't need to keep column stats.
AttributeMap[ColumnStat](Nil)
} else {
colStatsMap.toColumnStats
}

val filteredRowCount: BigInt =
EstimationUtils.ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
Expand All @@ -74,12 +77,14 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
}

/**
* Returns a percentage of rows meeting a compound condition in Filter node.
* A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT.
* Returns a percentage of rows meeting a condition in Filter node.
* If it's a single condition, we calculate the percentage directly.
* If it's a compound condition, it is decomposed into multiple single conditions linked with
* AND, OR, NOT.
* For logical AND conditions, we need to update stats after a condition estimation
* so that the stats will be more accurate for subsequent estimation. This is needed for
* range condition such as (c > 40 AND c <= 50)
* For logical OR conditions, we do not update stats after a condition estimation.
* For logical OR and NOT conditions, we do not update stats after a condition estimation.
*
* @param condition the compound logical expression
* @param update a boolean flag to specify if we need to update ColumnStat of a column
Expand All @@ -90,34 +95,29 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = {
condition match {
case And(cond1, cond2) =>
// For ease of debugging, we compute percent1 and percent2 in 2 statements.
val percent1 = calculateFilterSelectivity(cond1, update)
val percent2 = calculateFilterSelectivity(cond2, update)
(percent1, percent2) match {
case (Some(p1), Some(p2)) => Some(p1 * p2)
case (Some(p1), None) => Some(p1)
case (None, Some(p2)) => Some(p2)
case (None, None) => None
}
val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(1.0)
val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(1.0)
Some(percent1 * percent2)

case Or(cond1, cond2) =>
// For ease of debugging, we compute percent1 and percent2 in 2 statements.
val percent1 = calculateFilterSelectivity(cond1, update = false)
val percent2 = calculateFilterSelectivity(cond2, update = false)
(percent1, percent2) match {
case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2)))
case (Some(p1), None) => Some(1.0)
case (None, Some(p2)) => Some(1.0)
case (None, None) => None
}
val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(1.0)
val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0)
Some(percent1 + percent2 - (percent1 * percent2))

case Not(cond) => calculateFilterSelectivity(cond, update = false) match {
case Some(percent) => Some(1.0 - percent)
// for not-supported condition, set filter selectivity to a conservative estimate 100%
case None => None
}
case Not(And(cond1, cond2)) =>
calculateFilterSelectivity(Or(Not(cond1), Not(cond2)), update = false)

case Not(Or(cond1, cond2)) =>
calculateFilterSelectivity(And(Not(cond1), Not(cond2)), update = false)

case _ => calculateSingleCondition(condition, update)
case Not(cond) =>
calculateFilterSelectivity(cond, update = false) match {
case Some(percent) => Some(1.0 - percent)
case None => None
}

case _ =>
calculateSingleCondition(condition, update)
}
}

Expand Down Expand Up @@ -225,12 +225,12 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
}

val percent = if (isNull) {
nullPercent.toDouble
nullPercent
} else {
1.0 - nullPercent.toDouble
1.0 - nullPercent
}

Some(percent)
Some(percent.toDouble)
}

/**
Expand All @@ -249,17 +249,19 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
attr: Attribute,
literal: Literal,
update: Boolean): Option[Double] = {
if (!colStatsMap.contains(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have stats, there's no need to go through the logic below.

}

attr.dataType match {
case _: NumericType | DateType | TimestampType =>
case _: NumericType | DateType | TimestampType | BooleanType =>
evaluateBinaryForNumeric(op, attr, literal, update)
case StringType | BinaryType =>
// TODO: It is difficult to support other binary comparisons for String/Binary
// type without min/max and advanced statistics like histogram.
logDebug("[CBO] No range comparison statistics for String/Binary type " + attr)
None
case _ =>
// TODO: support boolean type.
None
}
}

Expand Down Expand Up @@ -291,6 +293,10 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
* Returns a percentage of rows meeting an equality (=) expression.
* This method evaluates the equality predicate for all data types.
*
* For EqualNullSafe (<=>), if the literal is not null, result will be the same as EqualTo;
* if the literal is null, the condition will be changed to IsNull after optimization.
* So we don't need specific logic for EqualNullSafe here.
*
* @param attr an Attribute (or a column)
* @param literal a literal value (or constant)
* @param update a boolean flag to specify if we need to update ColumnStat of a given column
Expand Down Expand Up @@ -323,7 +329,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
colStatsMap(attr) = newStats
}

Some(1.0 / ndv.toDouble)
Some((1.0 / BigDecimal(ndv)).toDouble)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ndv is a BigInt, its range is bigger than double, so toDouble is not safe here, while 1/ndv is in (0, 1), so toDouble is safe

} else {
Some(0.0)
}
Expand Down Expand Up @@ -394,12 +400,12 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo

// return the filter selectivity. Without advanced statistics such as histograms,
// we have to assume uniform distribution.
Some(math.min(1.0, newNdv.toDouble / ndv.toDouble))
Some(math.min(1.0, (BigDecimal(newNdv) / BigDecimal(ndv)).toDouble))
}

/**
* Returns a percentage of rows meeting a binary comparison expression.
* This method evaluate expression for Numeric columns only.
* This method evaluate expression for Numeric/Date/Timestamp/Boolean columns.
*
* @param op a binary comparison operator uch as =, <, <=, >, >=
* @param attr an Attribute (or a column)
Expand All @@ -414,53 +420,66 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
literal: Literal,
update: Boolean): Option[Double] = {

var percent = 1.0
val colStat = colStatsMap(attr)
val statsRange =
Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
val max = BigDecimal(statsRange.max)
val min = BigDecimal(statsRange.min)
val ndv = BigDecimal(colStat.distinctCount)

// determine the overlapping degree between predicate range and column's range
val literalValueBD = BigDecimal(literal.value.toString)
val numericLiteral = if (literal.dataType == BooleanType) {
if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
} else {
BigDecimal(literal.value.toString)
}
val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
case _: LessThan =>
(literalValueBD <= statsRange.min, literalValueBD > statsRange.max)
(numericLiteral <= min, numericLiteral > max)
case _: LessThanOrEqual =>
(literalValueBD < statsRange.min, literalValueBD >= statsRange.max)
(numericLiteral < min, numericLiteral >= max)
case _: GreaterThan =>
(literalValueBD >= statsRange.max, literalValueBD < statsRange.min)
(numericLiteral >= max, numericLiteral < min)
case _: GreaterThanOrEqual =>
(literalValueBD > statsRange.max, literalValueBD <= statsRange.min)
(numericLiteral > max, numericLiteral <= min)
}

var percent = BigDecimal(1.0)
if (noOverlap) {
percent = 0.0
} else if (completeOverlap) {
percent = 1.0
} else {
// this is partial overlap case
val literalDouble = literalValueBD.toDouble
val maxDouble = BigDecimal(statsRange.max).toDouble
val minDouble = BigDecimal(statsRange.min).toDouble

// This is the partial overlap case:
// Without advanced statistics like histogram, we assume uniform data distribution.
// We just prorate the adjusted range over the initial range to compute filter selectivity.
// For ease of computation, we convert all relevant numeric values to Double.
assert(max > min)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? it's possible after WHERE a = 1 that max and min are same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's in the partial overlap case, if max == min, it must be either no overlap or complete overlap for a binary expression. see here:

val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
      case _: LessThan =>
        (numericLiteral <= min, numericLiteral > max)
      case _: LessThanOrEqual =>
        (numericLiteral < min, numericLiteral >= max)
      case _: GreaterThan =>
        (numericLiteral >= max, numericLiteral < min)
      case _: GreaterThanOrEqual =>
        (numericLiteral > max, numericLiteral <= min)
    }

percent = op match {
case _: LessThan =>
(literalDouble - minDouble) / (maxDouble - minDouble)
if (numericLiteral == max) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comments to explain this special case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. updated.

// If the literal value is right on the boundary, we can minus the part of the
// boundary value (1/ndv).
1.0 - 1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
}
case _: LessThanOrEqual =>
if (literalValueBD == BigDecimal(statsRange.min)) {
1.0 / colStat.distinctCount.toDouble
if (numericLiteral == min) {
// The boundary value is the only satisfying value.
1.0 / ndv
} else {
(literalDouble - minDouble) / (maxDouble - minDouble)
(numericLiteral - min) / (max - min)
}
case _: GreaterThan =>
(maxDouble - literalDouble) / (maxDouble - minDouble)
if (numericLiteral == min) {
1.0 - 1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
}
case _: GreaterThanOrEqual =>
if (literalValueBD == BigDecimal(statsRange.max)) {
1.0 / colStat.distinctCount.toDouble
if (numericLiteral == max) {
1.0 / ndv
} else {
(maxDouble - literalDouble) / (maxDouble - minDouble)
(max - numericLiteral) / (max - min)
}
}

Expand All @@ -469,22 +488,25 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
val newValue = convertBoundValue(attr.dataType, literal.value)
var newMax = colStat.max
var newMin = colStat.min
var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
if (newNdv < 1) newNdv = 1

op match {
case _: GreaterThan => newMin = newValue
case _: GreaterThanOrEqual => newMin = newValue
case _: LessThan => newMax = newValue
case _: LessThanOrEqual => newMax = newValue
case _: GreaterThan | _: GreaterThanOrEqual =>
// If new ndv is 1, then new max must be equal to new min.
newMin = if (newNdv == 1) newMax else newValue
case _: LessThan | _: LessThanOrEqual =>
newMax = if (newNdv == 1) newMin else newValue
}

val newNdv = math.max(math.round(colStat.distinctCount.toDouble * percent), 1)
val newStats = colStat.copy(distinctCount = newNdv, min = newMin,
max = newMax, nullCount = 0)
val newStats =
colStat.copy(distinctCount = newNdv, min = newMin, max = newMax, nullCount = 0)

colStatsMap(attr) = newStats
}
}

Some(percent)
Some(percent.toDouble)
}

}
Expand Down
Loading