
Conversation

@patrick-nicholson

What changes were proposed in this pull request?

In my experience, pushing pandas.DataFrames into pyspark.DataFrames very quickly runs up against size issues. These can usually be remedied by changing configuration parameters (e.g., spark.rpc.message.maxSize), but it is much more convenient to change the level of parallelization used during RDD creation. This option is already available in sparkContext.parallelize; this pull request exposes it to sparkSession.createDataFrame.
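For illustration, usage with this patch would look roughly like the sketch below (spark is an existing SparkSession; the DataFrame contents and the slice count are placeholders):

import pandas as pd

pandas_df = pd.DataFrame({"x": range(1000000), "y": 1.0})

# With the proposed keyword, the level of parallelization is set at creation
# time instead of tuning spark.rpc.message.maxSize or parallelizing by hand.
df = spark.createDataFrame(pandas_df, numSlices=100)
df.rdd.getNumPartitions()  # 100 under the proposed change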

How was this patch tested?

I have been using a patch implementing this change for a while. I'm only exposing a keyword argument used by an underlying function to the user.

@gatorsmile
Member

ok to test

@SparkQA

SparkQA commented May 9, 2017

Test build #76698 has finished for PR 17926 at commit c9a6348.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 9, 2017

Test build #76700 has finished for PR 17926 at commit 6ef9fdd.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 9, 2017

Test build #76701 has finished for PR 17926 at commit 4a9d58d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

This seems to add functionality rather than being a trivial fix. I think we need a JIRA.

@HyukjinKwon
Member

I think this is a rather niche case and we can work around it by parallelizing outside:

>>> df = spark.createDataFrame(spark.sparkContext.parallelize([[1],[2],[3],[4],[5]], numSlices=5))
>>> df.rdd.getNumPartitions()
5

Also, this looks like it only applies when the data is not an RDD. I think this is confusing if a user sets the option and it does not take effect in some cases, unless the user reads the documentation.

@srowen
Member

srowen commented May 10, 2017

Does this cause any incompatibility with existing code?

@HyukjinKwon
Member

I don't think so (this is Python ...), for both positional and keyword arguments. (If the new numSlices were added in the middle of the argument list it would break positional calls, but this one adds it at the end.)
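To make that concrete, a quick sketch (assuming the current 2.x signature createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True); the values are placeholders):

# Existing positional calls keep their meaning because the new argument is appended last.
spark.createDataFrame(data, schema, 0.5)           # 0.5 still binds to samplingRatio
spark.createDataFrame(data, schema, numSlices=8)   # the new option is opt-in by keyword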

@patrick-nicholson
Author

patrick-nicholson commented May 10, 2017

This seems to add functionality rather than being a trivial fix. I think we need a JIRA.

It's up to you. All I'm doing is passing a keyword argument from one preexisting public method to another. I don't view that as adding functionality, but I am not the arbiter of such things.

I think this is a rather niche case and we can work around it by parallelizing outside.

It has been a rather common case for me, since I often work with pandas.DataFrames of millions of rows and many columns of mixed types (where numeric columns are implicitly numpy types rather than base Python types). It can be worked around outside by manually performing the steps inside createDataFrame:

df = spark.createDataFrame(
    spark.sparkContext.parallelize(
        [r.tolist() for r in pandas_df.to_records(index=False)], numSlices=5),
    schema=[str(_) for _ in pandas_df.columns])

Again, I don't see the proposed change as adding any functionality, just exposing machinery already in place for distributing Python data to an RDD in a consistent way for convenience.

Also, this looks like it only applies when the data is not an RDD. I think this is confusing if a user sets the option and it does not take effect in some cases, unless the user reads the documentation.

Given that RDD and local data are necessarily different and that createDataFrame already has separate code paths for RDD and local Python data, I don't know how this can be avoided.

@HyukjinKwon
Member

No, it is not virtually the same before and after (and we would also need a regression test), so it needs a JIRA - see http://spark.apache.org/contributing.html. Adding a parameter to createDataFrame adds functionality to this API that did not exist before.

As you said, this can be done in a single line like that; you could just make a wrapper function for it on the application side in a few lines:

def createDataFrame(data, numSlices, **kwargs):
    return spark.createDataFrame(
        spark.sparkContext.parallelize(data, numSlices=numSlices), **kwargs)
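
For instance, calling that wrapper might look like this (the sample data, slice count, and column names are placeholders):

df = createDataFrame([(1, "a"), (2, "b"), (3, "c")], numSlices=3, schema=["id", "label"])
df.rdd.getNumPartitions()  # 3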

I am not sure it is worth adding this parameter. It looks like there is potential for confusion among users, and the workaround is so easy.

@gatorsmile
Member

How about adding this workaround to the function description of createDataFrame for now? In the future, we can change the interface if more people need this.

Thanks!
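
One possible shape for such a note in the createDataFrame docstring (illustrative wording only, not a final proposal):

.. note:: To control the number of partitions of the resulting DataFrame
    when the data is local, parallelize it first, e.g.
    spark.createDataFrame(sc.parallelize(data, numSlices=10), schema)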

@felixcheung
Member

FYI we added numPartitions in R - but that's primarily because we don't have sc.parallelize
https://github.com/apache/spark/blob/master/R/pkg/R/SQLContext.R#L190
