Conversation


@vgankidi vgankidi commented Jun 27, 2019

What changes were proposed in this pull request?

This adds a materialize API to DataFrames. materialize is a Spark action that lets the user explicitly tell Spark that the DataFrame has already been computed. Once a DataFrame is materialized, Spark skips all stages prior to the materialize point when the DataFrame is reused later on.
Please refer to SPARK-28188 for the rationale behind adding the materialize API.
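
For illustration, a rough sketch of how the proposed API might be used (the input path and column names are made up; materialize is the method proposed in this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("materialize-example").getOrCreate()
import spark.implicits._

// Illustrative input path and columns; any DataFrame with an expensive
// upstream computation would do.
val events = spark.read.parquet("/data/events")
val counts = events.groupBy("user_id").count()   // shuffle boundary

// Proposed API: run the plan up to this point once, so later uses of
// `materialized` pick up from the last shuffle instead of recomputing
// the whole query.
val materialized = counts.materialize()

materialized.filter($"count" > 10).show()
materialized.orderBy($"count".desc).show()
```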

How was this patch tested?

Tested manually

@srowen
Member

srowen commented Jun 30, 2019

I don't think we should add this. It's already very common to call .count() or .mapPartitions with a no-op to do this. I do think there are use cases for proactively materializing, but it's overused too.
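
For context, a minimal sketch of the existing workarounds mentioned above, assuming an existing DataFrame df:

```scala
// Common workaround today: force computation with an action whose result
// is thrown away.
df.count()

// Or walk every partition of the underlying RDD with a no-op.
df.rdd.foreachPartition(_ => ())
```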

@danielcweeks

@srowen I think the point here is to provide an explicit way of materializing data so that it can be reused in later stages as opposed to relying on side-effects of operations that produce the same result.

This is a really common issue, so having a clear expression of the intent really helps to disambiguate the logic.

*/
def cache(): this.type = persist()

def materialize(): RDD[T] = withScope{
Member

Please fix the PR title. This is an RDD API, not a DataFrame API.

Contributor

This PR also includes Dataset.materialize, which calls this RDD version.
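
Based on the excerpt above and the runJob discussion later in this thread, a hedged sketch of what such an RDD-level action could look like; this is an assumption about the shape of the change, not the PR's actual code:

```scala
import org.apache.spark.rdd.RDD

// Assumed shape of the RDD-level action: run a no-op job that consumes
// every partition so all upstream stages are computed, then return the
// same RDD so callers keep building on the already-computed lineage.
def materialize[T](rdd: RDD[T]): RDD[T] = {
  rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => iter.foreach(_ => ()))
  rdd
}
```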

@dongjoon-hyun
Member

I agree with @srowen, but since this is a legitimate community request, ping @mateiz, @rxin, @gatorsmile, @cloud-fan, too.

@dongjoon-hyun dongjoon-hyun changed the title SPARK-28188 Materialize Dataframe API [SPARK-28188] Materialize Dataframe API Jul 3, 2019
@rxin
Contributor

rxin commented Jul 3, 2019

Did I miss something? How does "runJob" materialize the query plan / rdd?

@rdblue
Contributor

rdblue commented Jul 3, 2019

@rxin, this runs the query up to the point where materialize is called. The underlying RDD can then pick up from the last shuffle the next time it is used. This works better than caching in most cases when using dynamic allocation because executors are not sitting idle, but work can be resumed and shared across queries. We could rename the method if that would be more clear.

@srowen, I've seen this suggested on the dev list a few times and I think it is a good idea to add it. There is no guarantee that count does the same thing -- it could be optimized -- and it is a little tricky to get this to work with the Dataset API. This version creates a new DataFrame from the underlying RDD so that the work is reused from the last shuffle, instead of allowing the planner to re-optimize with later changes (usually projections) and discard the intermediate result. We have found this really useful for better control over the planner, as well as to cache data using the shuffle system.
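
A rough sketch of the hand-rolled version of this pattern as it can be done today, assuming the plan contains at least one shuffle (the helper name is made up):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hand-rolled approximation: convert to an RDD, run it once so the shuffle
// output exists, then wrap the same RDD in a new DataFrame so later queries
// build on the computed lineage instead of re-planning the original query.
def materializeByHand(spark: SparkSession, df: DataFrame): DataFrame = {
  val rdd = df.rdd      // fixes the physical plan up to this point
  rdd.count()           // action: computes all stages, populating shuffle files
  spark.createDataFrame(rdd, df.schema)
}
```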

@rxin
Contributor

rxin commented Jul 4, 2019

Got it. But the name is plain wrong because the function doesn't materialize the DataFrame. As a matter of fact, if you run this "materialize" on a query plan without a shuffle, it becomes a completely useless job that wastes a lot of resources.

I've seen a different use case myself: I want to measure the execution time of a query plan without materializing the data, invoking any I/O, or adding any other overhead. What I ended up doing was implementing a simple data sink that doesn't write anything.

Looks like that can be used here as well?

@cloud-fan
Contributor

+1 for the no-op sink, I think it should be the same as the no-op runJob here.

@vgankidi
Author

vgankidi commented Jul 4, 2019

@rxin Yes, materializing a query plan without a shuffle just wastes resources. We usually recommend repartitioning the DataFrame before invoking materialize to add a shuffle. I think this can be used for your use case of measuring the execution time of a query plan. What name would you suggest for this API?
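
A tiny sketch of that recommendation, assuming an existing DataFrame df; the partition count is arbitrary and materialize is the API proposed in this PR:

```scala
// The repartition introduces a shuffle, so the materialized work can
// actually be picked up from shuffle files later.
val materialized = df.repartition(64).materialize()
```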

@rxin
Contributor

rxin commented Jul 4, 2019

Something like writing to a no-op sink ...

@vgankidi
Author

Updated the PR with the change in name.

@cloud-fan
Contributor

It's simple to write to the noop sink: df.write.format("noop").save. Why do we need this extra public API?
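
For reference, a minimal sketch of that noop-sink pattern, assuming a Spark version that ships the built-in noop format and an existing DataFrame df; mode("overwrite") sidesteps the default ErrorIfExists save-mode check:

```scala
// The whole query runs, but nothing is written anywhere; useful for timing
// a plan without materializing output.
df.write.format("noop").mode("overwrite").save()
```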

@felixcheung
Member

felixcheung commented Jul 15, 2019

> @rxin, this runs the query up to the point where materialize is called. The underlying RDD can then pick up from the last shuffle the next time it is used. This works better than caching in most cases when using dynamic allocation because executors are not sitting idle, but work can be resumed and shared across queries. We could rename the method if that would be more clear.
>
> @srowen, I've seen this suggested on the dev list a few times and I think it is a good idea to add it. There is no guarantee that count does the same thing -- it could be optimized -- and it is a little tricky to get this to work with the Dataset API. This version creates a new DataFrame from the underlying RDD so that the work is reused from the last shuffle, instead of allowing the planner to re-optimize with later changes (usually projections) and discard the intermediate result. We have found this really useful for better control over the planner, as well as to cache data using the shuffle system.

I have to agree with this - I've seen count() or cache() misused too many times, and too many times people have had to go back, clean up, and remove all the calls to count(). So much so that I'm planning to write an optimizer rule to remove them. I'm only partly kidding.

Maybe this isn't the right API for it, and that's OK; let's improve it then and make a good suggestion to the community/contributor.

I'm not sure df.write.format("noop").save is a good suggestion for the general Spark user.

@rdblue
Contributor

rdblue commented Jul 15, 2019

I think this should be an action, not a sink. A no-op sink is just another way to misuse existing APIs for a different purpose. And worse, a noop sink doesn't actually accomplish the goal: this call returns a DataFrame that will reuse the data stored in shuffle servers. A noop sink would not work for DataFrames because you have to get Spark to reuse the same underlying RDD that has already been run.

@cloud-fan
Contributor

cloud-fan commented Jul 16, 2019

I've spent more time understanding the use case, and I think table cache would be a better choice here:

  1. Disk vs. memory: you can set the storage level to disk-only with more than one copy, which is more reliable than shuffle files.
  2. Shuffle service: it supports RDD blocks as well, thanks to [SPARK-27677][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation #24499.

In addition, the table cache has more advantages:

  1. It can work for any DataFrame, even without shuffles.
  2. Other queries can benefit from the table cache automatically.

You do have a point that table cache is lazy, but we can add a flag to control it, e.g. def cache(eager: Boolean = false).
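
A short sketch of this alternative with today's API, assuming an existing DataFrame df; the count() stands in for the proposed eager flag, which does not exist yet:

```scala
import org.apache.spark.storage.StorageLevel

// Persist to disk-only with two replicas, then trigger eager
// materialization with a cheap action.
val cached = df.persist(StorageLevel.DISK_ONLY_2)
cached.count()

// Later reuse reads the persisted blocks instead of recomputing the plan.
cached.show()
```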

@rdblue
Contributor

rdblue commented Jul 16, 2019

@cloud-fan, I wasn't aware of #24499. Serving cache blocks from the shuffle service sounds like it could be a good alternative solution. I like that it would serve whole blocks instead of shards. We would definitely need a way to eagerly cache.

@AmplabJenkins

Can one of the admins verify this patch?

@dongjoon-hyun
Member

Hi, all.
Given the previous discussion and the long inactivity, I'll close this PR.
@vgankidi, feel free to reopen it if you have a different opinion.
