[SPARK-36879][SQL] Support Parquet v2 data page encoding (DELTA_BINARY_PACKED) for the vectorized path #34471
Conversation
Jenkins, retest this please
sunchao left a comment:
Thanks @parthchandra for working on this! I left some comments.
.gitignore (outdated)
nit: unrelated change?
Removed
+1 to @sunchao's comment. Please remove this from this PR.
ditto
Done
this doesn't seem to be used anywhere
But not for long (there's a horrible pun in here somewhere).
I need this for the vectorized implementation of DeltaByteArrayReader (which I did not include, to make review easier).
in that case can we put this together with the follow-up PR?
I just knew you would say that :). Done.
maybe add some comments for this? what are c, rowId and val for?
done
seems it's better to have an abstract class inheriting ValuesReader and VectorizedValuesReader with this default behavior defined, rather than repeating the same thing in all the different value readers.
this can be done separately though.
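For context, a rough sketch of the shape being suggested (the class name and the particular defaults are illustrative, not necessarily the code that was merged):

```java
// Hypothetical base class: extends Parquet's ValuesReader and implements
// Spark's VectorizedValuesReader, defining the common "unsupported" defaults
// once so each concrete value reader only overrides what it supports.
import org.apache.parquet.column.values.ValuesReader;

public abstract class VectorizedReaderBase extends ValuesReader
    implements VectorizedValuesReader {

  @Override
  public byte readByte() {
    throw new UnsupportedOperationException("readByte is not supported");
  }

  @Override
  public int readInteger() {
    throw new UnsupportedOperationException("readInteger is not supported");
  }

  // ... the same default for each remaining single-value read method.
}
```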
Done
why return 0 here?
Why indeed. (IntelliJ-generated code for unimplemented methods.)
Fixed
ditto
Fixed
I think we'll need to implement these too.
Oh dear. I implemented only the methods the original PR had implemented. On closer look, we also need support for the byte, short, date, timestamp, year-month interval, and day-time interval data types, which are stored as int32 or int64.
Perf note: rebased dates and timestamps are a backward-compatibility fix and incur the penalty of checking whether each value needs to be rebased.
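To illustrate the overhead being described, a hypothetical sketch (the constant and helper below are illustrative stand-ins, not Spark's actual rebase API): the rebase-aware path must branch on every value, while the plain path copies values straight through.

```java
// Hypothetical illustration of the per-value cost of the rebase check.
final class RebaseCheckSketch {
  private static final int SWITCH_DAY = -141427; // illustrative constant

  static int readDate(int days, boolean needsRebase) {
    if (needsRebase && days < SWITCH_DAY) {
      return rebaseDays(days); // extra work only for legacy (Julian) values
    }
    return days; // fast path: value used as-is
  }

  private static int rebaseDays(int days) {
    return days; // placeholder for the Julian-to-Gregorian conversion
  }
}
```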
I think this can be done more efficiently; for instance, we don't need to unpack the bits anymore, and don't need to compute the original value from the delta, etc.
I think we do. The original unit tests interleave reads and skips. To continue reading after a skip, we need to have read the previous value.
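For context, a minimal illustration (not Spark's actual reader) of why skipping still decodes: each value is reconstructed from its predecessor, so a skip is effectively a read whose result is discarded, which is what the no-op writer in readValues(total, null, -1, (w, r, v) -> {}) above expresses.

```java
// Minimal sketch: value[i] = value[i-1] + delta, so the running previous
// value must stay current even for skipped positions.
final class DeltaSkipSketch {
  private long previousValue;

  DeltaSkipSketch(long firstValue) {
    this.previousValue = firstValue;
  }

  // Decode the next value from its (already bit-unpacked) delta.
  long read(long delta) {
    previousValue += delta;
    return previousValue;
  }

  // Skip = read with the result thrown away; a later read still sees the
  // correct previous value.
  void skip(long delta) {
    read(delta);
  }
}
```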
cc @sadikovi @viirya @dongjoon-hyun too
Thank you for the review @sunchao! Let me address the comments.
Could you use two-space indentation like the rest of this file, @parthchandra?
Done
The import order is a little strange. Could you group the java imports (lines 30 and 19) together as the first group?
Done
nit: Let's remove the redundant empty line.
Done
Let's make these two lines into a single liner.
- readValues(total, null, -1, (w, r, v) -> {
- });
+ readValues(total, null, -1, (w, r, v) -> {});
Done
Could you put this at the beginning before int remaining = total;?
Done
Looks like it may take some time to address some of the review comments. Marking this PR as draft in the meantime.
sunchao left a comment:
Thanks a lot for updating this @parthchandra! Overall this looks pretty good. I think we just need to address the issue with the benchmark and attach the result to the PR. You can find out how to get benchmark results using the GitHub workflow here.
nit: maybe revise this message a bit, since "total value count is + valuesRead" looks a bit confusing.
done
This won't work yet because of the BooleanType added recently.
Error:
[error] Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'sum(parquetv2table.id)' due to data type mismatch: function sum requires numeric or interval types, not boolean; line 1 pos 7;
[error] 'Aggregate [unresolvedalias(sum(id#40), None)]
[error] +- SubqueryAlias parquetv2table
[error] +- View (`parquetV2Table`, [id#40])
[error] +- Relation [id#40] parquet
nit: remove?
yes. Done
nit: indentation
OK.
nit: these seem redundant
Removed
hmm why this change?
Uh. Rebase issue. Fixed.
@parthchandra could you address the unit test failures?
The unit tests are failing in parts that I am not familiar with. Previously, re-running the tests had worked, but this time the tests fail every time. Can I get some help figuring out where the problem is?
If you click the SparkQA test build link, you should see the failed tests.
Thank you @sunchao. I had gone through the log and failed to see the test(s) that had failed. One of the unit tests was checking for the error message that accompanied an exception, and as part of the review I had changed the error message! Updated the test. The tests should pass now.
sunchao left a comment:
LGTM
Committed to master branch, thanks @parthchandra!
saveAsCsvTable(testDf, dir.getCanonicalPath + "/csv")
saveAsJsonTable(testDf, dir.getCanonicalPath + "/json")
saveAsParquetTable(testDf, dir.getCanonicalPath + "/parquet")
saveAsParquetV2Table(testDf, dir.getCanonicalPath + "/parquetV2")
Maybe we should update the benchmark result of DataSourceReadBenchmark, @parthchandra.
I found that there are still unsupported encodings in Data Page V2, such as RLE for Boolean. It seems it is not yet time to update the benchmark; please ignore my previous comment.
@LuciferYang I was getting ready to set up a PR for the RLE/Boolean encoding and noticed that you have done so. Thank you!
Adding back the benchmark in a new PR.
private ByteBufferInputStream in;

// temporary buffers used by readByte, readShort, readInteger, and readLong
byte byteVal;
Should these 4 fields be private?
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
  val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain")
    .select($"dict".cast(catalystType), $"plain".cast(catalystType))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
Seq.tabulate(N)(_ => Row(nonRebased)))
}
}
}
Wrong indentation too

What changes were proposed in this pull request?
Implements a vectorized version of the Parquet reader for the DELTA_BINARY_PACKED encoding.
This PR includes a previous PR for this issue, which passed the read request through to the Parquet implementation and was not vectorized. The current PR builds on top of that PR (hence both are included).
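For readers unfamiliar with the encoding, a simplified sketch of the core reconstruction step (per the Parquet format spec; Spark's actual VectorizedDeltaBinaryPackedReader additionally parses the block and miniblock headers and bulk bit-unpacks the deltas):

```java
// Each miniblock stores bit-packed deltas relative to the block's minDelta;
// values are rebuilt by a running sum starting from the header's first value:
//   value[i] = value[i-1] + minDelta + storedDelta[i]
final class DeltaBinaryPackedSketch {
  static long[] decode(long firstValue, long minDelta, long[] storedDeltas) {
    long[] values = new long[storedDeltas.length + 1];
    values[0] = firstValue;
    for (int i = 0; i < storedDeltas.length; i++) {
      values[i + 1] = values[i] + minDelta + storedDeltas[i];
    }
    return values;
  }
}
```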
Why are the changes needed?
Currently, Spark throws an exception when reading data with this encoding if the vectorized reader is enabled.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Additional unit tests for the encoding, for both long and integer types (mirroring the unit tests in the Parquet implementation).