@@ -41,12 +41,38 @@ import org.apache.spark.unsafe.types.UTF8String
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(
+    schema: MessageType,
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean) {
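With this change the file schema is passed to the constructor, so a ParquetFilters instance is tied to a single Parquet file. A hedged construction sketch (the flag values below are hypothetical literals; in Spark they are read from SQLConf, and the schema would come from the file footer):

import org.apache.parquet.schema.MessageType

val fileSchema: MessageType = footer.getFileMetaData.getSchema  // footer assumed in scope
val parquetFilters = new ParquetFilters(
  fileSchema,
  pushDownDate = true,
  pushDownTimestamp = true,
  pushDownDecimal = true,
  pushDownStartWith = true,
  pushDownInFilterThreshold = 10,
  caseSensitive = false)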
+  // A map from a Parquet field name to its field info, used when predicate push-down applies.
+  private val nameToParquetField: Map[String, ParquetField] = {
+    // Here we don't flatten the fields in the nested schema but just look up the root
+    // fields. Currently, accessing nested fields does not push down filters, and creating
+    // filters for them is not supported.
+    val primitiveFields =
+      schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetField(f.getName,
+          ParquetSchemaType(f.getOriginalType,
+            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+      }
+    if (caseSensitive) {
+      primitiveFields.toMap
+    } else {
+      // Don't consider ambiguity here, i.e. when more than one field matches in
+      // case-insensitive mode; just skip pushdown for those fields, since they would
+      // trigger an exception when reading anyway. See SPARK-25132.
+      val dedupPrimitiveFields =
+        primitiveFields
+          .groupBy(_._1.toLowerCase(Locale.ROOT))
+          .filter(_._2.size == 1)
+          .mapValues(_.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
+    }
+  }
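The case-insensitive branch drops ambiguous names rather than guessing between them. A minimal, self-contained sketch of that dedup rule, using hypothetical field names in place of a real Parquet schema:

import java.util.Locale

val primitiveFields = Seq("id" -> "INT32", "Name" -> "BINARY", "NAME" -> "BINARY")

// Group by the lower-cased name; any group with more than one entry is ambiguous in
// case-insensitive mode and is left out of the pushdown map.
val dedupPrimitiveFields = primitiveFields
  .groupBy(_._1.toLowerCase(Locale.ROOT))
  .filter(_._2.size == 1)
  .mapValues(_.head._2)

assert(dedupPrimitiveFields.keySet == Set("id"))  // "Name"/"NAME" are skipped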
 
   /**
    * Holds information about a single field stored in the underlying Parquet file.
@@ -361,96 +387,104 @@ private[parquet] class ParquetFilters(
         FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
-  /**
-   * Returns a map, which contains parquet field name and data type, if predicate push down applies.
-   */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
-    // Here we don't flatten the fields in the nested schema but just look up through
-    // root fields. Currently, accessing to nested fields does not push down filters
-    // and it does not support to create filters for them.
-    val primitiveFields =
-      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetField(f.getName,
-          ParquetSchemaType(f.getOriginalType,
-            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
-      }
-    if (caseSensitive) {
-      primitiveFields.toMap
-    } else {
-      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
-      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
-      // See: SPARK-25132.
-      val dedupPrimitiveFields =
-        primitiveFields
-          .groupBy(_._1.toLowerCase(Locale.ROOT))
-          .filter(_._2.size == 1)
-          .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields)
+  // Returns filters that can be pushed down when reading Parquet files.
+  def convertibleFilters(filters: Seq[sources.Filter]): Seq[sources.Filter] = {
+    filters.flatMap(convertibleFiltersHelper(_, canPartialPushDown = true))
+  }
+
+  private def convertibleFiltersHelper(
+      predicate: sources.Filter,
+      canPartialPushDown: Boolean): Option[sources.Filter] = {
+    predicate match {
+      case sources.And(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        (leftResultOptional, rightResultOptional) match {
+          case (Some(leftResult), Some(rightResult)) => Some(sources.And(leftResult, rightResult))
+          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+          case _ => None
+        }
+
+      case sources.Or(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
+          None
+        } else {
+          Some(sources.Or(leftResultOptional.get, rightResultOptional.get))
+        }
+      case sources.Not(pred) =>
+        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        resultOptional.map(sources.Not)
+
+      case other =>
+        if (createFilter(other).isDefined) {
+          Some(other)
+        } else {
+          None
+        }
     }
   }
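A hedged usage sketch of the new entry point, assuming parquetFilters is an instance built for a file whose schema contains a pushable column "a" (a filter on "a.b" is never convertible because of the dotted-name rule below):

import org.apache.spark.sql.sources.{And, EqualTo, Or}

val supported   = EqualTo("a", 1)    // createFilter(...) is defined for this
val unsupported = EqualTo("a.b", 1)  // dotted name, createFilter(...) returns None

// AND may keep just the convertible side; OR has to give up the whole predicate.
parquetFilters.convertibleFilters(Seq(And(supported, unsupported)))  // Seq(EqualTo("a", 1))
parquetFilters.convertibleFilters(Seq(Or(supported, unsupported)))   // Seq()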
 
   /**
    * Converts data source filters to Parquet filter predicates.
    */
-  def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val nameToParquetField = getFieldMap(schema)
-    createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
+  def createFilter(predicate: sources.Filter): Option[FilterPredicate] = {
+    createFilterHelper(predicate, canPartialPushDownConjuncts = true)
+  }
+
+  // The Parquet type in the given file must match the value's type in the pushed filter
+  // in order to push the filter down to Parquet.
+  private def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+    value == null || (nameToParquetField(name).fieldType match {
+      case ParquetBooleanType => value.isInstanceOf[JBoolean]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetLongType => value.isInstanceOf[JLong]
+      case ParquetFloatType => value.isInstanceOf[JFloat]
+      case ParquetDoubleType => value.isInstanceOf[JDouble]
+      case ParquetStringType => value.isInstanceOf[String]
+      case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+      case ParquetDateType => value.isInstanceOf[Date]
+      case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
+        value.isInstanceOf[Timestamp]
+      case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case _ => false
+    })
+  }
+
+  // For the decimal type, the filter value's scale must match the scale recorded in the file;
+  // a mismatch would cause data corruption.
+  private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+    case decimal: JBigDecimal =>
+      decimal.scale == decimalMeta.getScale
+    case _ => false
+  }
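For decimals, the check above is purely on scale. A hypothetical illustration (in the real code path, DecimalMetadata comes from the Parquet file footer):

import java.math.{BigDecimal => JBigDecimal}

val fileScale = 2                              // scale recorded in the file's decimal metadata
new JBigDecimal("12.34").scale == fileScale    // true  -> the filter value may be pushed down
new JBigDecimal("12.345").scale == fileScale   // false -> pushdown is skipped for this filter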
+
+  // Parquet does not allow dots in the column name because dots are used as a column path
+  // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates
+  // with missing columns, so pushing down a filter for a column whose name contains dots
+  // could return incorrect results. Thus, we do not push down such filters.
+  // See SPARK-20364.
+  private def canMakeFilterOn(name: String, value: Any): Boolean = {
+    nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
   }
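The dot check is deliberately blunt: a top-level column literally named "a.b" cannot be distinguished from the nested path a -> b once the predicate is handed to Parquet, so any dotted name is skipped. A one-line illustration with hypothetical names:

Seq("a", "a.b").map(n => n -> !n.contains("."))  // Seq(("a", true), ("a.b", false))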
 
   /**
-   * @param nameToParquetField a map from the field name to its field name and data type.
-   *                           This only includes the root fields whose types are primitive types.
    * @param predicate the input filter predicates. Not all the predicates can be pushed down.
    * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
    *                                    down safely. Pushing ONLY one side of AND down is safe to
    *                                    do at the top level or when none of its ancestors is NOT or OR.
    * @return the Parquet-native filter predicates that are eligible for pushdown.
    */
   private def createFilterHelper(
-      nameToParquetField: Map[String, ParquetField],
       predicate: sources.Filter,
       canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
-    // Decimal type must make sure that filter value's scale matched the file.
-    // If doesn't matched, which would cause data corruption.
-    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
-      case decimal: JBigDecimal =>
-        decimal.scale == decimalMeta.getScale
-      case _ => false
-    }
-
-    // Parquet's type in the given file should be matched to the value's type
-    // in the pushed filter in order to push down the filter to Parquet.
-    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
-      value == null || (nameToParquetField(name).fieldType match {
-        case ParquetBooleanType => value.isInstanceOf[JBoolean]
-        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
-        case ParquetLongType => value.isInstanceOf[JLong]
-        case ParquetFloatType => value.isInstanceOf[JFloat]
-        case ParquetDoubleType => value.isInstanceOf[JDouble]
-        case ParquetStringType => value.isInstanceOf[String]
-        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
-        case ParquetDateType => value.isInstanceOf[Date]
-        case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
-          value.isInstanceOf[Timestamp]
-        case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case _ => false
-      })
-    }
-
-    // Parquet does not allow dots in the column name because dots are used as a column path
-    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
-    // with missing columns. The incorrect results could be got from Parquet when we push down
-    // filters for the column having dots in the names. Thus, we do not push down such filters.
-    // See SPARK-20364.
-    def canMakeFilterOn(name: String, value: Any): Boolean = {
-      nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
-    }
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -515,9 +549,9 @@ private[parquet] class ParquetFilters(
         // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
         // can be safely removed.
         val lhsFilterOption =
-          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
+          createFilterHelper(lhs, canPartialPushDownConjuncts)
         val rhsFilterOption =
-          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          createFilterHelper(rhs, canPartialPushDownConjuncts)
 
         (lhsFilterOption, rhsFilterOption) match {
           case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
@@ -539,14 +573,12 @@ private[parquet] class ParquetFilters(
         // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
         // As per the logic in the And predicate, we can push down (a1 OR b1).
         for {
-          lhsFilter <-
-            createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
-          rhsFilter <-
-            createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          lhsFilter <- createFilterHelper(lhs, canPartialPushDownConjuncts)
+          rhsFilter <- createFilterHelper(rhs, canPartialPushDownConjuncts)
         } yield FilterApi.or(lhsFilter, rhsFilter)
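For a concrete instance of the distribution argument above, suppose a1 is x > 1, b1 is x < 0, and a2 and b2 cannot be converted. Every row satisfying (a1 AND a2) OR (b1 AND b2) also satisfies (a1 OR b1), i.e. x > 1 OR x < 0, so pushing down only (a1 OR b1) can merely keep extra rows and never drops matching ones. Pushing down a1 or b1 alone, however, would be unsafe, which is why the Or case requires both sides to convert.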
 
       case sources.Not(pred) =>
-        createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
+        createFilterHelper(pred, canPartialPushDownConjuncts = false)
           .map(FilterApi.not)
 
       case sources.In(name, values) if canMakeFilterOn(name, values.head)