[SPARK-17075][SQL][followup] fix filter estimation issues #17148
```diff
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 import scala.collection.immutable.HashSet
 import scala.collection.mutable
+import scala.math.BigDecimal.RoundingMode

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
```
```diff
@@ -52,17 +53,19 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
   def estimate: Option[Statistics] = {
     if (childStats.rowCount.isEmpty) return None

-    // save a mutable copy of colStats so that we can later change it recursively
+    // Save a mutable copy of colStats so that we can later change it recursively.
     colStatsMap.setInitValues(childStats.attributeStats)

-    // estimate selectivity of this filter predicate
-    val filterSelectivity: Double = calculateFilterSelectivity(plan.condition) match {
-      case Some(percent) => percent
-      // for not-supported condition, set filter selectivity to a conservative estimate 100%
-      case None => 1.0
-    }
+    // Estimate selectivity of this filter predicate, and update column stats if needed.
+    // For not-supported condition, set filter selectivity to a conservative estimate 100%
+    val filterSelectivity: Double = calculateFilterSelectivity(plan.condition).getOrElse(1.0)

-    val newColStats = colStatsMap.toColumnStats
+    val newColStats = if (filterSelectivity == 0) {
+      // The output is empty, we don't need to keep column stats.
+      AttributeMap[ColumnStat](Nil)
+    } else {
+      colStatsMap.toColumnStats
+    }

     val filteredRowCount: BigInt =
       EstimationUtils.ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
```
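For intuition, the row-count part of `estimate` can be sketched in isolation. A minimal standalone example (the helper name `estimateFilteredRowCount` is illustrative, not from the PR; it assumes `EstimationUtils.ceil` rounds a `BigDecimal` up to a `BigInt`):

```scala
import scala.math.BigDecimal.RoundingMode

// Illustrative sketch of ceil(childRowCount * selectivity) from the hunk above.
def estimateFilteredRowCount(childRowCount: BigInt, selectivity: Double): BigInt =
  (BigDecimal(childRowCount) * selectivity).setScale(0, RoundingMode.CEILING).toBigInt()

estimateFilteredRowCount(BigInt(1000), 0.25)  // 250
estimateFilteredRowCount(BigInt(1000), 0.0)   // 0 -- the case where column stats are dropped
```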
```diff
@@ -74,12 +77,14 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
   }

   /**
-   * Returns a percentage of rows meeting a compound condition in Filter node.
-   * A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT.
+   * Returns a percentage of rows meeting a condition in Filter node.
+   * If it's a single condition, we calculate the percentage directly.
+   * If it's a compound condition, it is decomposed into multiple single conditions linked with
+   * AND, OR, NOT.
    * For logical AND conditions, we need to update stats after a condition estimation
    * so that the stats will be more accurate for subsequent estimation. This is needed for
    * range condition such as (c > 40 AND c <= 50)
-   * For logical OR conditions, we do not update stats after a condition estimation.
+   * For logical OR and NOT conditions, we do not update stats after a condition estimation.
    *
    * @param condition the compound logical expression
    * @param update a boolean flag to specify if we need to update ColumnStat of a column
```
```diff
@@ -90,34 +95,29 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
   def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = {
     condition match {
       case And(cond1, cond2) =>
-        // For ease of debugging, we compute percent1 and percent2 in 2 statements.
-        val percent1 = calculateFilterSelectivity(cond1, update)
-        val percent2 = calculateFilterSelectivity(cond2, update)
-        (percent1, percent2) match {
-          case (Some(p1), Some(p2)) => Some(p1 * p2)
-          case (Some(p1), None) => Some(p1)
-          case (None, Some(p2)) => Some(p2)
-          case (None, None) => None
-        }
+        val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(1.0)
+        val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(1.0)
+        Some(percent1 * percent2)

       case Or(cond1, cond2) =>
-        // For ease of debugging, we compute percent1 and percent2 in 2 statements.
-        val percent1 = calculateFilterSelectivity(cond1, update = false)
-        val percent2 = calculateFilterSelectivity(cond2, update = false)
-        (percent1, percent2) match {
-          case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2)))
-          case (Some(p1), None) => Some(1.0)
-          case (None, Some(p2)) => Some(1.0)
-          case (None, None) => None
-        }
+        val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(1.0)
+        val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0)
+        Some(percent1 + percent2 - (percent1 * percent2))

-      case Not(cond) => calculateFilterSelectivity(cond, update = false) match {
-        case Some(percent) => Some(1.0 - percent)
-        // for not-supported condition, set filter selectivity to a conservative estimate 100%
-        case None => None
-      }
+      case Not(And(cond1, cond2)) =>
+        calculateFilterSelectivity(Or(Not(cond1), Not(cond2)), update = false)
+
+      case Not(Or(cond1, cond2)) =>
+        calculateFilterSelectivity(And(Not(cond1), Not(cond2)), update = false)

-      case _ => calculateSingleCondition(condition, update)
+      case Not(cond) =>
+        calculateFilterSelectivity(cond, update = false) match {
+          case Some(percent) => Some(1.0 - percent)
+          case None => None
+        }
+
+      case _ =>
+        calculateSingleCondition(condition, update)
     }
   }
```
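The combination rules in this match can be sanity-checked with a small standalone sketch (illustrative code, assuming the two conditions are independent):

```scala
// Selectivity combination for compound predicates, assuming independent conditions.
def andSelectivity(p1: Double, p2: Double): Double = p1 * p2
def orSelectivity(p1: Double, p2: Double): Double = p1 + p2 - p1 * p2
def notSelectivity(p: Double): Double = 1.0 - p

andSelectivity(0.5, 0.2)                                 // 0.1
orSelectivity(0.5, 0.2)                                  // 0.6
// NOT (a AND b) is estimated via De Morgan's law as (NOT a) OR (NOT b):
orSelectivity(notSelectivity(0.5), notSelectivity(0.2))  // 0.9 == 1.0 - 0.1
```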
```diff
@@ -225,12 +225,12 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
     }

     val percent = if (isNull) {
-      nullPercent.toDouble
+      nullPercent
     } else {
-      1.0 - nullPercent.toDouble
+      1.0 - nullPercent
     }

-    Some(percent)
+    Some(percent.toDouble)
   }

   /**
```
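With illustrative numbers (not from the PR), a column having 20 nulls out of 100 rows gives an `IS NULL` selectivity of 0.2 and an `IS NOT NULL` selectivity of 0.8:

```scala
// Illustrative null-fraction based selectivity for IS NULL / IS NOT NULL.
val rowCount = BigDecimal(100)
val nullCount = BigDecimal(20)
val nullPercent = nullCount / rowCount                               // 0.2

val isNullSelectivity = nullPercent.toDouble                         // 0.2
val isNotNullSelectivity = (BigDecimal(1.0) - nullPercent).toDouble  // 0.8
```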
```diff
@@ -249,17 +249,19 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
       attr: Attribute,
       literal: Literal,
       update: Boolean): Option[Double] = {
+    if (!colStatsMap.contains(attr)) {
+      logDebug("[CBO] No statistics for " + attr)
+      return None
+    }
+
     attr.dataType match {
-      case _: NumericType | DateType | TimestampType =>
+      case _: NumericType | DateType | TimestampType | BooleanType =>
         evaluateBinaryForNumeric(op, attr, literal, update)
       case StringType | BinaryType =>
         // TODO: It is difficult to support other binary comparisons for String/Binary
         // type without min/max and advanced statistics like histogram.
         logDebug("[CBO] No range comparison statistics for String/Binary type " + attr)
         None
-      case _ =>
-        // TODO: support boolean type.
-        None
     }
   }
```
```diff
@@ -291,6 +293,10 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
    * Returns a percentage of rows meeting an equality (=) expression.
    * This method evaluates the equality predicate for all data types.
    *
+   * For EqualNullSafe (<=>), if the literal is not null, result will be the same as EqualTo;
+   * if the literal is null, the condition will be changed to IsNull after optimization.
+   * So we don't need specific logic for EqualNullSafe here.
+   *
    * @param attr an Attribute (or a column)
    * @param literal a literal value (or constant)
    * @param update a boolean flag to specify if we need to update ColumnStat of a given column
```
```diff
@@ -323,7 +329,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
         colStatsMap(attr) = newStats
       }

-      Some(1.0 / ndv.toDouble)
+      Some((1.0 / BigDecimal(ndv)).toDouble)
```
**Review comment:** why this?

**Reply:** ndv is a `BigInt`, its range is bigger than double, so it is safer to do the division in `BigDecimal` and only convert the final result to `Double`.
```diff
     } else {
       Some(0.0)
     }
```
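The reply above can be illustrated with a deliberately extreme value (illustrative only; a real ndv would never be this large, the point is just the boundary behavior of the two conversions):

```scala
// Why (1.0 / BigDecimal(ndv)).toDouble is more robust than 1.0 / ndv.toDouble.
val ndv = BigInt(10).pow(309)                  // larger than Double.MaxValue (~1.8e308)

ndv.toDouble                                   // Double.PositiveInfinity
1.0 / ndv.toDouble                             // 0.0 -- the selectivity collapses to zero

(BigDecimal(1.0) / BigDecimal(ndv)).toDouble   // ~1e-309, still a positive selectivity
```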
```diff
@@ -394,12 +400,12 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {

     // return the filter selectivity. Without advanced statistics such as histograms,
     // we have to assume uniform distribution.
-    Some(math.min(1.0, newNdv.toDouble / ndv.toDouble))
+    Some(math.min(1.0, (BigDecimal(newNdv) / BigDecimal(ndv)).toDouble))
   }

   /**
    * Returns a percentage of rows meeting a binary comparison expression.
-   * This method evaluate expression for Numeric columns only.
+   * This method evaluate expression for Numeric/Date/Timestamp/Boolean columns.
    *
    * @param op a binary comparison operator such as =, <, <=, >, >=
    * @param attr an Attribute (or a column)
```
```diff
@@ -414,53 +420,66 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
       literal: Literal,
       update: Boolean): Option[Double] = {

-    var percent = 1.0
     val colStat = colStatsMap(attr)
-    val statsRange =
-      Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
+    val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
+    val max = BigDecimal(statsRange.max)
+    val min = BigDecimal(statsRange.min)
+    val ndv = BigDecimal(colStat.distinctCount)

     // determine the overlapping degree between predicate range and column's range
-    val literalValueBD = BigDecimal(literal.value.toString)
+    val numericLiteral = if (literal.dataType == BooleanType) {
+      if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
+    } else {
+      BigDecimal(literal.value.toString)
+    }
     val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
       case _: LessThan =>
-        (literalValueBD <= statsRange.min, literalValueBD > statsRange.max)
+        (numericLiteral <= min, numericLiteral > max)
       case _: LessThanOrEqual =>
-        (literalValueBD < statsRange.min, literalValueBD >= statsRange.max)
+        (numericLiteral < min, numericLiteral >= max)
       case _: GreaterThan =>
-        (literalValueBD >= statsRange.max, literalValueBD < statsRange.min)
+        (numericLiteral >= max, numericLiteral < min)
       case _: GreaterThanOrEqual =>
-        (literalValueBD > statsRange.max, literalValueBD <= statsRange.min)
+        (numericLiteral > max, numericLiteral <= min)
     }

+    var percent = BigDecimal(1.0)
     if (noOverlap) {
       percent = 0.0
     } else if (completeOverlap) {
       percent = 1.0
     } else {
-      // this is partial overlap case
-      val literalDouble = literalValueBD.toDouble
-      val maxDouble = BigDecimal(statsRange.max).toDouble
-      val minDouble = BigDecimal(statsRange.min).toDouble
-
+      // This is the partial overlap case:
       // Without advanced statistics like histogram, we assume uniform data distribution.
       // We just prorate the adjusted range over the initial range to compute filter selectivity.
-      // For ease of computation, we convert all relevant numeric values to Double.
+      assert(max > min)
```
**Review comment (on `assert(max > min)`):** why? it's possible that max == min here.

**Reply:** it's in the partial overlap case; if max == min, it must be either no overlap or complete overlap for a binary expression (see the noOverlap/completeOverlap checks above).
```diff
       percent = op match {
         case _: LessThan =>
-          (literalDouble - minDouble) / (maxDouble - minDouble)
+          if (numericLiteral == max) {
+            // If the literal value is right on the boundary, we can minus the part of the
+            // boundary value (1/ndv).
+            1.0 - 1.0 / ndv
+          } else {
+            (numericLiteral - min) / (max - min)
+          }
         case _: LessThanOrEqual =>
-          if (literalValueBD == BigDecimal(statsRange.min)) {
-            1.0 / colStat.distinctCount.toDouble
+          if (numericLiteral == min) {
+            // The boundary value is the only satisfying value.
+            1.0 / ndv
           } else {
-            (literalDouble - minDouble) / (maxDouble - minDouble)
+            (numericLiteral - min) / (max - min)
           }
         case _: GreaterThan =>
-          (maxDouble - literalDouble) / (maxDouble - minDouble)
+          if (numericLiteral == min) {
+            1.0 - 1.0 / ndv
+          } else {
+            (max - numericLiteral) / (max - min)
+          }
         case _: GreaterThanOrEqual =>
-          if (literalValueBD == BigDecimal(statsRange.max)) {
-            1.0 / colStat.distinctCount.toDouble
+          if (numericLiteral == max) {
+            1.0 / ndv
           } else {
-            (maxDouble - literalDouble) / (maxDouble - minDouble)
+            (max - numericLiteral) / (max - min)
           }
       }
```

**Review comment (on the boundary branches above):** can you add some comments to explain this special case?

**Reply:** ok. updated.
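As a concrete instance of the boundary rule just discussed (made-up numbers): for a column with min = 1, max = 10 and 10 distinct values, `c < 10` puts the literal exactly on the upper bound, so instead of the naive (10 - 1) / (10 - 1) = 1.0 the estimate subtracts the boundary value's share, 1 - 1/10 = 0.9:

```scala
// Illustrative partial-overlap selectivity for c < literal, with the boundary special case.
val min = BigDecimal(1)
val max = BigDecimal(10)
val ndv = BigDecimal(10)

def lessThanSelectivity(literal: BigDecimal): BigDecimal =
  if (literal == max) BigDecimal(1.0) - BigDecimal(1.0) / ndv  // literal sits on the boundary
  else (literal - min) / (max - min)                           // plain proration

lessThanSelectivity(BigDecimal(10))  // 0.9
lessThanSelectivity(BigDecimal(5))   // (5 - 1) / (10 - 1), roughly 0.44
```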
```diff
@@ -469,22 +488,25 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
         val newValue = convertBoundValue(attr.dataType, literal.value)
         var newMax = colStat.max
         var newMin = colStat.min
+        var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
+        if (newNdv < 1) newNdv = 1
+
         op match {
-          case _: GreaterThan => newMin = newValue
-          case _: GreaterThanOrEqual => newMin = newValue
-          case _: LessThan => newMax = newValue
-          case _: LessThanOrEqual => newMax = newValue
+          case _: GreaterThan | _: GreaterThanOrEqual =>
+            // If new ndv is 1, then new max must be equal to new min.
+            newMin = if (newNdv == 1) newMax else newValue
+          case _: LessThan | _: LessThanOrEqual =>
+            newMax = if (newNdv == 1) newMin else newValue
         }

-        val newNdv = math.max(math.round(colStat.distinctCount.toDouble * percent), 1)
-        val newStats = colStat.copy(distinctCount = newNdv, min = newMin,
-          max = newMax, nullCount = 0)
+        val newStats =
+          colStat.copy(distinctCount = newNdv, min = newMin, max = newMax, nullCount = 0)

        colStatsMap(attr) = newStats
      }
    }

-    Some(percent)
+    Some(percent.toDouble)
   }
 }
```
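To make the stats update concrete, an illustrative walk-through with made-up numbers: a column with min = 1, max = 100 and 100 distinct values, filtered by `c > 40` with an estimated selectivity of 0.6, keeps max = 100, moves min up to 40 and scales the distinct count to 60:

```scala
import scala.math.BigDecimal.RoundingMode

// Illustrative only: scaling ndv and tightening the range after `c > 40`.
val ndv = BigDecimal(100)
val percent = BigDecimal(0.6)

var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()  // 60
if (newNdv < 1) newNdv = 1

// For a GreaterThan predicate the lower bound moves up to the literal (40) while the
// upper bound stays at 100; if newNdv had collapsed to 1, min would be set equal to max.
```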
**Review comment (on the early `return None` added in `evaluateBinary`):** why return?

**Reply:** If we don't have stats, there's no need to go through the logic below.