
Commit b22ea80

Add new method for getting pushed down filters in Parquet file reader
1 parent 1a8c093 commit b22ea80

File tree

3 files changed: +277 −133 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -372,13 +372,13 @@ class ParquetFileFormat
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
-          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
+          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .flatMap(parquetFilters.createFilter(_))
           .reduceOption(FilterApi.and)
       } else {
         None
```
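The net effect of this hunk: the file schema is bound once when `ParquetFilters` is constructed, so each `createFilter` call takes only the predicate. A minimal sketch of the reworked call-site, assuming the surrounding method's `filters` and `footerFileMetaData` are in scope; the flag values below are placeholders, not Spark's session defaults:

```scala
// Hypothetical illustration of the reworked call-site; flag values are
// illustrative only, the real ones come from SQLConf.
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(
  parquetSchema,   // the schema now travels with the instance
  /* pushDownDate = */ true,
  /* pushDownTimestamp = */ true,
  /* pushDownDecimal = */ true,
  /* pushDownStringStartWith = */ true,
  /* pushDownInFilterThreshold = */ 10,
  /* isCaseSensitive = */ false)

// Each predicate converts independently; unsupported ones yield None and are
// dropped by flatMap, then the survivors are AND-ed into one Parquet predicate.
val pushed = filters
  .flatMap(parquetFilters.createFilter(_))
  .reduceOption(FilterApi.and)
```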

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 111 additions & 79 deletions
```diff
@@ -41,12 +41,38 @@ import org.apache.spark.unsafe.types.UTF8String
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(
+    schema: MessageType,
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean) {
+  // A map which contains parquet field name and data type, if predicate push down applies.
+  private val nameToParquetField: Map[String, ParquetField] = {
+    // Here we don't flatten the fields in the nested schema but just look up through
+    // root fields. Currently, accessing to nested fields does not push down filters
+    // and it does not support to create filters for them.
+    val primitiveFields =
+      schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetField(f.getName,
+          ParquetSchemaType(f.getOriginalType,
+            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+      }
+    if (caseSensitive) {
+      primitiveFields.toMap
+    } else {
+      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
+      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
+      // See: SPARK-25132.
+      val dedupPrimitiveFields =
+        primitiveFields
+          .groupBy(_._1.toLowerCase(Locale.ROOT))
+          .filter(_._2.size == 1)
+          .mapValues(_.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
+    }
+  }
 
   /**
    * Holds a single field information stored in the underlying parquet file.
```
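The case-insensitive branch silently drops ambiguous names rather than guessing between them. A standalone sketch of just that dedup step, with toy field names and types elided to strings:

```scala
import java.util.Locale

// Toy stand-in for the primitiveFields pairs built above; the real values
// would be ParquetField instances, not strings.
val primitiveFields = Seq("A" -> "int32", "a" -> "int64", "b" -> "binary")

val dedupPrimitiveFields = primitiveFields
  .groupBy(_._1.toLowerCase(Locale.ROOT)) // bucket by lower-cased name
  .filter(_._2.size == 1)                 // keep only unambiguous buckets
  .map { case (k, v) => k -> v.head._2 }

// "A" and "a" collide, so neither is eligible for pushdown (SPARK-25132);
// only "b" survives: Map(b -> binary)
println(dedupPrimitiveFields)
```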
```diff
@@ -361,96 +387,104 @@ private[parquet] class ParquetFilters(
         FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
-  /**
-   * Returns a map, which contains parquet field name and data type, if predicate push down applies.
-   */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
-    // Here we don't flatten the fields in the nested schema but just look up through
-    // root fields. Currently, accessing to nested fields does not push down filters
-    // and it does not support to create filters for them.
-    val primitiveFields =
-      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetField(f.getName,
-          ParquetSchemaType(f.getOriginalType,
-            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
-      }
-    if (caseSensitive) {
-      primitiveFields.toMap
-    } else {
-      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
-      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
-      // See: SPARK-25132.
-      val dedupPrimitiveFields =
-        primitiveFields
-          .groupBy(_._1.toLowerCase(Locale.ROOT))
-          .filter(_._2.size == 1)
-          .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields)
+  // Returns filters that can be pushed down when reading Parquet files.
+  def convertibleFilters(filters: Seq[sources.Filter]): Seq[sources.Filter] = {
+    filters.flatMap(convertibleFiltersHelper(_, canPartialPushDown = true))
+  }
+
+  private def convertibleFiltersHelper(
+      predicate: sources.Filter,
+      canPartialPushDown: Boolean): Option[sources.Filter] = {
+    predicate match {
+      case sources.And(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        (leftResultOptional, rightResultOptional) match {
+          case (Some(leftResult), Some(rightResult)) => Some(sources.And(leftResult, rightResult))
+          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+          case _ => None
+        }
+
+      case sources.Or(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
+          None
+        } else {
+          Some(sources.Or(leftResultOptional.get, rightResultOptional.get))
+        }
+      case sources.Not(pred) =>
+        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        resultOptional.map(sources.Not)
+
+      case other =>
+        if (createFilter(other).isDefined) {
+          Some(other)
+        } else {
+          None
+        }
     }
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
-  def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val nameToParquetField = getFieldMap(schema)
-    createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
+  def createFilter(predicate: sources.Filter): Option[FilterPredicate] = {
+    createFilterHelper(predicate, canPartialPushDownConjuncts = true)
+  }
+
+  // Parquet's type in the given file should be matched to the value's type
+  // in the pushed filter in order to push down the filter to Parquet.
+  private def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+    value == null || (nameToParquetField(name).fieldType match {
+      case ParquetBooleanType => value.isInstanceOf[JBoolean]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetLongType => value.isInstanceOf[JLong]
+      case ParquetFloatType => value.isInstanceOf[JFloat]
+      case ParquetDoubleType => value.isInstanceOf[JDouble]
+      case ParquetStringType => value.isInstanceOf[String]
+      case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+      case ParquetDateType => value.isInstanceOf[Date]
+      case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
+        value.isInstanceOf[Timestamp]
+      case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case _ => false
+    })
+  }
+
+  // Decimal type must make sure that filter value's scale matched the file.
+  // If doesn't matched, which would cause data corruption.
+  private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+    case decimal: JBigDecimal =>
+      decimal.scale == decimalMeta.getScale
+    case _ => false
+  }
+
+  // Parquet does not allow dots in the column name because dots are used as a column path
+  // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+  // with missing columns. The incorrect results could be got from Parquet when we push down
+  // filters for the column having dots in the names. Thus, we do not push down such filters.
+  // See SPARK-20364.
+  private def canMakeFilterOn(name: String, value: Any): Boolean = {
+    nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
   }
 
   /**
-   * @param nameToParquetField a map from the field name to its field name and data type.
-   *                           This only includes the root fields whose types are primitive types.
    * @param predicate the input filter predicates. Not all the predicates can be pushed down.
    * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
    *                                    down safely. Pushing ONLY one side of AND down is safe to
    *                                    do at the top level or none of its ancestors is NOT and OR.
    * @return the Parquet-native filter predicates that are eligible for pushdown.
    */
   private def createFilterHelper(
-      nameToParquetField: Map[String, ParquetField],
       predicate: sources.Filter,
       canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
-    // Decimal type must make sure that filter value's scale matched the file.
-    // If doesn't matched, which would cause data corruption.
-    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
-      case decimal: JBigDecimal =>
-        decimal.scale == decimalMeta.getScale
-      case _ => false
-    }
-
-    // Parquet's type in the given file should be matched to the value's type
-    // in the pushed filter in order to push down the filter to Parquet.
-    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
-      value == null || (nameToParquetField(name).fieldType match {
-        case ParquetBooleanType => value.isInstanceOf[JBoolean]
-        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
-        case ParquetLongType => value.isInstanceOf[JLong]
-        case ParquetFloatType => value.isInstanceOf[JFloat]
-        case ParquetDoubleType => value.isInstanceOf[JDouble]
-        case ParquetStringType => value.isInstanceOf[String]
-        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
-        case ParquetDateType => value.isInstanceOf[Date]
-        case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
-          value.isInstanceOf[Timestamp]
-        case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case _ => false
-      })
-    }
-
-    // Parquet does not allow dots in the column name because dots are used as a column path
-    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
-    // with missing columns. The incorrect results could be got from Parquet when we push down
-    // filters for the column having dots in the names. Thus, we do not push down such filters.
-    // See SPARK-20364.
-    def canMakeFilterOn(name: String, value: Any): Boolean = {
-      nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
-    }
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
```
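The `canPartialPushDown` flag is the heart of the new `convertibleFilters` method: under AND, keeping only one convertible side is still sound, since any row matching the conjunction also matches each conjunct, but under OR or NOT it is not. A hypothetical walk-through, assuming `parquetFilters` is built as in the earlier sketch over a file with a primitive column `a` and a dotted column `b.c`, which `canMakeFilterOn` rejects per SPARK-20364:

```scala
import org.apache.spark.sql.sources

val ok  = sources.GreaterThan("a", 1) // convertible leaf predicate
val bad = sources.EqualTo("b.c", 2)   // rejected: dot in the column name

// AND: the convertible side survives alone (partial pushdown is sound here).
parquetFilters.convertibleFilters(Seq(sources.And(ok, bad)))
// => Seq(GreaterThan(a,1))

// OR: dropping one side would widen the predicate, so it is all-or-nothing.
parquetFilters.convertibleFilters(Seq(sources.Or(ok, bad)))
// => Seq()

// NOT flips the semantics, so the helper recurses with
// canPartialPushDown = false and the unsupported leg sinks the whole predicate.
parquetFilters.convertibleFilters(Seq(sources.Not(sources.And(ok, bad))))
// => Seq()
```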
```diff
@@ -515,9 +549,9 @@ private[parquet] class ParquetFilters(
         // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
         // can be safely removed.
         val lhsFilterOption =
-          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
+          createFilterHelper(lhs, canPartialPushDownConjuncts)
         val rhsFilterOption =
-          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          createFilterHelper(rhs, canPartialPushDownConjuncts)
 
         (lhsFilterOption, rhsFilterOption) match {
           case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
```
```diff
@@ -539,14 +573,12 @@ private[parquet] class ParquetFilters(
         // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
         // As per the logical in And predicate, we can push down (a1 OR b1).
         for {
-          lhsFilter <-
-            createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
-          rhsFilter <-
-            createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          lhsFilter <- createFilterHelper(lhs, canPartialPushDownConjuncts)
+          rhsFilter <- createFilterHelper(rhs, canPartialPushDownConjuncts)
         } yield FilterApi.or(lhsFilter, rhsFilter)
 
       case sources.Not(pred) =>
-        createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
+        createFilterHelper(pred, canPartialPushDownConjuncts = false)
           .map(FilterApi.not)
 
       case sources.In(name, values) if canMakeFilterOn(name, values.head)
```
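One subtlety worth calling out from the type guard this commit hoists into the class body: `isDecimalMatched` compares the filter value's scale against the scale recorded in the file's `DecimalMetadata` and skips pushdown on any mismatch, since comparing differently scaled unscaled values would corrupt results. A tiny standalone illustration of the check; the file scale here is invented:

```scala
import java.math.{BigDecimal => JBigDecimal}

// Stand-in for DecimalMetadata.getScale of the column as written in the file.
val fileScale = 3

// Mirrors isDecimalMatched: only a java.math.BigDecimal with the exact scale
// is allowed through to the Parquet filter.
def scaleMatches(value: Any): Boolean = value match {
  case d: JBigDecimal => d.scale == fileScale
  case _ => false
}

scaleMatches(new JBigDecimal("1.500")) // true: scale 3 matches the file
scaleMatches(new JBigDecimal("1.50"))  // false: scale 2, pushdown is skipped
scaleMatches(BigDecimal("1.500"))      // false: scala.math.BigDecimal, wrong type
```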
