[SPARK-27418][SQL] Migrate Parquet to File Data Source V2 #24327
Conversation
Test build #104442 has finished for PR 24327 at commit …
Test build #104503 has finished for PR 24327 at commit …
Force-pushed 336ac92 to 99b8575.
Test build #104550 has finished for PR 24327 at commit …
Force-pushed 99b8575 to cf6837c.
Test build #104662 has finished for PR 24327 at commit …
Force-pushed cf6837c to ab70c37.
            
          
Resolved review thread (outdated) on sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
Test build #104816 has finished for PR 24327 at commit …
Test build #104823 has finished for PR 24327 at commit …
Force-pushed b3b04b0 to 138344e.
Test build #104841 has finished for PR 24327 at commit …
This is ready for review. @cloud-fan @HyukjinKwon @dongjoon-hyun
        
          
Resolved review thread (outdated) on sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
              
          
Resolved review thread (outdated) on ...c/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
Test build #104948 has finished for PR 24327 at commit …
        
          
Resolved review thread (outdated) on sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
              
          
Resolved review thread (outdated) on sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
              
          
Resolved review thread (outdated) on sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
              
          
Two resolved review threads (outdated) on sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
Force-pushed 30d88cb to c6e602f.
Test build #105005 has finished for PR 24327 at commit …
        
          
Resolved review thread (outdated) on ...e/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
Test build #105040 has finished for PR 24327 at commit …
Test build #105042 has finished for PR 24327 at commit …
The test case "org.apache.spark.sql.streaming.FileStreamSinkSuite.writing with aggregation" becomes flaky with this PR.
retest this please.
}

private def createRowBaseReader(file: PartitionedFile): ParquetRecordReader[UnsafeRow] = {
  buildReaderBase(file, createRowBaseReader0).asInstanceOf[ParquetRecordReader[UnsafeRow]]
buildReaderBase is parameterized, but the result is still cast. Why not parameterize it so that it returns ParquetRecordReader[UnsafeRow] and avoid the cast? I think these casts should be removed.
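For illustration, a minimal sketch of the kind of parameterization being suggested. This is not the actual ParquetPartitionReaderFactory API: the signatures of buildReaderBase and createRowBaseReader0 are simplified assumptions, and the real methods take more arguments.

```scala
// Sketch only: if buildReaderBase is parameterized on the concrete reader type,
// the call sites no longer need asInstanceOf. Signatures are assumed/simplified.
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.parquet.hadoop.ParquetRecordReader
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

private def buildReaderBase[T <: RecordReader[_, _]](
    file: PartitionedFile,
    buildReader: PartitionedFile => T): T = {
  // shared setup (Hadoop conf, footer reading, filter construction) would happen here
  buildReader(file)
}

// Placeholder stub for this sketch; the real method builds the row-based reader.
private def createRowBaseReader0(file: PartitionedFile): ParquetRecordReader[UnsafeRow] = ???

// No cast: the type parameter is fixed by createRowBaseReader0's return type.
private def createRowBaseReader(file: PartitionedFile): ParquetRecordReader[UnsafeRow] =
  buildReaderBase(file, createRowBaseReader0)
```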
// The actual filter push down happens in [[ParquetPartitionReaderFactory]].
// It requires the Parquet physical schema to determine whether a filter is convertible.
// So here we simply mark that all the filters are pushed down.
This comment isn't correct. All filters that can be converted to Parquet are pushed down.
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
Looks like this will also hit SPARK-27960. I think this is okay for now. No need to block Parquet to fix it.
However, it would be good to follow up with a suite of SQL tests for each v2 implementation that validates overall behavior, like reporting the metastore schema after a table is created.
val committerClass =
  conf.getClass(
    SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
Does v2 also use Parquet _metadata files?
I think it is disabled by default.
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

// SPARK-15719: Disables writing Parquet summary files by default.
If they are disabled by default in v1, why allow writing them in v2?
I think the behavior in V1 and V2 is the same: by default, "parquet.summary.metadata.level" is set to "NONE" and no summary file is written. If the conf "parquet.summary.metadata.level" is set by the user and spark.sql.parquet.output.committer.class is set correctly, then the summary file will be written.
See: https://issues.apache.org/jira/browse/SPARK-15719
Why should v2 support deprecated metadata files?
I think it is consistent with V1 here.
The value of parquet.summary.metadata.level is ALL by default. As per SPARK-15719, we should set it to NONE by default in Spark.
If users explicitly set the conf parquet.summary.metadata.level to ALL or COMMON_ONLY, Spark should write the metadata files.
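As a concrete illustration of the behavior described above, here is a hedged sketch of how a user could opt back in to summary files. The property names follow parquet-mr and Spark, but please verify them against the version in use; the app name and output path are arbitrary.

```scala
// Sketch: re-enabling Parquet summary files. By default Spark sets
// parquet.summary.metadata.level to NONE (SPARK-15719), so no _metadata /
// _common_metadata files are written.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-summary-files") // example app name
  .master("local[*]")
  .getOrCreate()

// Ask parquet-mr to write summary files again, at the Hadoop conf level.
spark.sparkContext.hadoopConfiguration
  .set("parquet.summary.metadata.level", "COMMON_ONLY")

// The output committer must be a ParquetOutputCommitter (Spark's default)
// for the summary files to actually be produced.
spark.conf.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.execution.datasources.parquet.ParquetOutputCommitter")

// Example write; /tmp/parquet_summary_demo is an arbitrary path.
spark.range(10).write.mode("overwrite").parquet("/tmp/parquet_summary_demo")
```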
Test build #106277 has finished for PR 24327 at commit …
@rdblue Thanks for the review. I have addressed all your comments. Any other concerns?
retest this please.
Test build #106380 has finished for PR 24327 at commit …
retest this please.
Test build #106391 has finished for PR 24327 at commit …
@dongjoon-hyun Would you help do a final review and merge this one? Thanks!
private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
  val vectorizedReader =
    buildReaderBase(file, createVectorizedReader0).asInstanceOf[VectorizedParquetRecordReader]
@gengliangwang, why is this cast here? I expected it to be removed when the one in createRowBaseReader was removed.
This is because we need to call the initBatch and enableReturningBatches methods of VectorizedParquetRecordReader here. We can't just change the return type to RecordReader[Void, Object].
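A minimal sketch of why the concrete type matters, simplified from the PR: partitionSchema, returningBatch, and createVectorizedReader0 are stand-ins for members assumed to exist on the factory, and buildReaderBase is assumed to be parameterized as in the earlier sketch.

```scala
// Sketch: initBatch and enableReturningBatches are only defined on
// VectorizedParquetRecordReader, not on RecordReader[Void, Object], so the
// vectorized path has to hold on to the concrete type.
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.types.StructType

// Assumed members of the surrounding factory (stubs for this sketch):
private val partitionSchema: StructType = new StructType()
private val returningBatch: Boolean = true
private def createVectorizedReader0(file: PartitionedFile): VectorizedParquetRecordReader = ???

private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
  // With a parameterized buildReaderBase (see earlier sketch), no cast is needed here.
  val vectorizedReader = buildReaderBase(file, createVectorizedReader0)
  // These calls are specific to VectorizedParquetRecordReader:
  vectorizedReader.initBatch(partitionSchema, file.partitionValues)
  if (returningBatch) {
    vectorizedReader.enableReturningBatches()
  }
  vectorizedReader
}
```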
  vectorizedReader
}

private def createVectorizedReader0(
No biggie, but I'd rename it to createParquetVectorizedReader.
  sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

lazy val _pushedFilters = {
Not a big deal here either, but I'd rename it to pushedParquetFilters.
  new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
  pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
parquetFilters.convertibleFilters(this.filters).toArray
Sorry if I missed some context. What's the difference between ParquetFilters.convertibleFilters and ParquetFilters.createFilters? It seems like the logic is duplicated.
ParquetFilters.convertibleFilters returns Seq[org.apache.spark.sql.sources.Filter], while ParquetFilters.createFilters returns org.apache.parquet.filter2.predicate.FilterPredicate.
The overlap between the two methods is only in the handling of the And/Or/Not operators.
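To make the division of labor concrete, a hedged sketch of how the two methods are typically combined. The per-filter conversion method is written here as createFilter returning Option[FilterPredicate]; exact names and signatures may differ from the version in this PR, and parquetFilters / filters are taken from the snippet above.

```scala
// Sketch: convertibleFilters is used for reporting (pushedFilters / explain),
// while the per-filter conversion builds the parquet-mr predicate that is
// actually handed to the reader.
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources.Filter

// Filters that Parquet can evaluate, still expressed as Spark source Filters:
val convertible: Seq[Filter] = parquetFilters.convertibleFilters(filters)

// The same filters converted and AND-ed into a single parquet-mr predicate:
val pushed: Option[FilterPredicate] = convertible
  .flatMap(f => parquetFilters.createFilter(f))
  .reduceOption((l, r) => FilterApi.and(l, r))
```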
Test build #106522 has finished for PR 24327 at commit …
Merged to master. For #24327 (comment), I think it's confusing. Let me take a look and see if I can make it simpler separately.
@dongjoon-hyun, @rdblue, @cloud-fan, let me know if there are any major comments to address that I missed. If that's not easily fixable, I don't mind reverting it as well.
@dongjoon-hyun @rdblue @cloud-fan @mallman @HyukjinKwon @gatorsmile @jaceklaskowski Thanks for the review!
assertDF(df)
// TODO: fix file source V2 as well.
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
  val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
How is this related to Parquet?
[info]   Decoded objects do not match expected objects:
[info]   expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
[info]   actual:   WrappedArray(9, 0, 10, 1, 2, 8, 3, 6, 7, 5, 4)
[info]   assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
[info]   +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
[info]      +- getcolumnbyordinal(0, LongType) (QueryTest.scala:70)
We need to fix the read path for streaming output.
What changes were proposed in this pull request?
Migrate Parquet to File Data Source V2
How was this patch tested?
Unit test
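For anyone trying the change out, a hedged usage sketch: the user-facing read/write API is unchanged, and the V1 fallback conf used in the tests above can opt Parquet back into the old path. The literal conf key changed during development (and was later consolidated), so it is referenced through SQLConf here; the app name and path are arbitrary.

```scala
// Sketch: end users read and write Parquet exactly as before; underneath,
// Parquet now goes through the File Data Source V2 path by default.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder()
  .appName("parquet-v2-demo") // example app name
  .master("local[*]")
  .getOrCreate()

// Writes and reads use ParquetDataSourceV2 after this PR.
spark.range(100).write.mode("overwrite").parquet("/tmp/parquet_v2_demo")
val df = spark.read.parquet("/tmp/parquet_v2_demo")
df.show(5)

// Opt Parquet back into the V1 FileFormat-based read path, as the test above does.
// Verify the conf name against the Spark version in use.
spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "parquet")
```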