[SPARK-28188] Materialize Dataframe API #24991
Conversation
I don't think we should add this. It's already very common to …

@srowen I think the point here is to provide an explicit way of materializing data so that it can be reused in later stages, as opposed to relying on side effects of operations that happen to produce the same result. This is a really common issue, so having a clear expression of the intent helps to disambiguate the logic.
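For readers following along, a minimal sketch of the two styles being contrasted; `materialize()` is the API proposed by this PR, not an existing Spark method:

```scala
// today: force computation via the side effect of an unrelated action
df.cache()
df.count()        // runs a job only to populate the cache

// proposed in this PR: state the intent directly
df.materialize()
```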
Diff context from RDD.scala (the new method alongside the existing cache):

       */
      def cache(): this.type = persist()

      def materialize(): RDD[T] = withScope {
Please fix the PR title. This is an RDD API, not a DataFrame API.
This PR also includes Dataset.materialize, which calls this RDD version.
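The full diff isn't visible here, but based on the fragment above and the runJob discussion below, the RDD-side method plausibly looks something like this (a sketch, not the verbatim PR code):

```scala
// sketch: compute every partition, discarding the rows, so that any shuffle
// output along the way is written and can be reused by later jobs
def materialize(): RDD[T] = withScope {
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(_ => ()))
  this
}
```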
I agree with @srowen, but since this is a legitimate community request, ping @mateiz, @rxin, @gatorsmile, @cloud-fan, too.
Did I miss something? How does "runJob" materialize the query plan / RDD?
@rxin, this runs the query up to the point where …

@srowen, I've seen this suggested on the dev list a few times and I think it is a good idea to add it. There is no guarantee that …
Got it. But the name is plain wrong, because the function doesn't materialize the DataFrame. As a matter of fact, if you run this "materialize" on a query plan without a shuffle, it becomes a completely useless job that wastes a lot of resources. I've seen a different use case myself: I wanted to measure the execution time of a query plan without materializing the data, invoking any I/O, or adding any other overhead. What I ended up doing was implementing a simple data sink that doesn't write anything. Looks like that could be used here as well?
+1 for the no-op sink. I think it should be the same as the no-op …
@rxin Yes, materializing a query plan without a shuffle just wastes resources. We usually recommend repartitioning the DataFrame before invoking materialize, to add a shuffle. I think this can be used for your use case of measuring the execution time of a query plan. What name would you suggest for this API?
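A sketch of the recommended usage, assuming the materialize API from this PR (the DataFrame and column names are illustrative):

```scala
import org.apache.spark.sql.functions.col

// repartition first so the plan contains a shuffle; materialize then leaves
// shuffle output behind that later jobs can reuse
val prepared = df.repartition(200).materialize()

prepared.filter(col("status") === "ok").count()  // earlier stages are skipped
prepared.groupBy("status").count().show()        // likewise
```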
Something like writing to a no-op sink ...

Updated the PR with the change in name.
It's simple to write to the noop sink:
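The snippet itself didn't survive extraction; the usual form with the built-in noop data source (available in Spark 3.0+) is:

```scala
// runs the whole query but writes nothing
df.write.format("noop").mode("overwrite").save()
```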
I have to agree with this. I've seen … Maybe this isn't the API for it, and that's OK; let's improve it then and make a good suggestion to the community/contributor, etc. I'm not sure …
I think this should be an action, not a sink. A no-op sink is just another way to misuse existing APIs for a different purpose. And worse, a no-op sink doesn't actually accomplish the goal. This call returns a DataFrame that will reuse the data stored in shuffle servers. A no-op sink would not work for DataFrames, because you have to get Spark to reuse the same underlying RDD that has been run.
I've spent more time understanding the use case, and I think table cache is a better choice here.

In addition, table cache has more advantages: …

You do have a point that table cache is lazy, but we can add a flag to control it, e.g. …
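The example after "e.g." was lost in extraction; a hypothetical shape for such a flag (not an actual Spark API) might be:

```scala
// hypothetical eager-cache flag, not an existing Spark API; today the
// closest equivalent is df.cache() followed by an action such as df.count()
val cached = df.cache(eager = true)
```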
@cloud-fan, I wasn't aware of #24499. Serving cache blocks from the shuffle service sounds like it could be a good alternative solution. I like that it would serve whole blocks instead of shards. We would definitely need a way to eagerly cache.
Can one of the admins verify this patch?
Hi, All. …
What changes were proposed in this pull request?
Add a materialize API to DataFrames. materialize is a Spark action. It lets Spark know explicitly that the DataFrame has already been computed, so that when the DataFrame is reused later on, Spark skips all stages prior to the materialize call.
Please refer to SPARK-28188 for the rationale behind adding the materialize API.
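To illustrate the described behavior (materialize is the API proposed in this PR; the DataFrame names are illustrative):

```scala
// materialize runs the plan as a Spark action; later actions on `joined`
// skip the stages it has already computed
val joined = ordersDf.join(usersDf, "user_id").materialize()

joined.count()                               // reuses the computed stages
joined.select("user_id").distinct().count()  // likewise
```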
How was this patch tested?
Tested manually