Skip to content

Conversation

@kanzhang
Copy link
Contributor

@kanzhang kanzhang commented Jul 9, 2014

JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to #455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.

  • Added RDD methods saveAsSequenceFile, saveAsHadoopFile and saveAsHadoopDataset, for both old and new MapReduce APIs.
  • Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
  • No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to Object[] and they get pickled to Python tuples.
  • Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc @MLnick @mateiz @ahirreddy @pwendell

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16446/

@kanzhang kanzhang changed the title [SPARK-2024] Add saveAsSequenceFile and saveAsHadoopFile to PySpark [SPARK-2024] Add saveAsSequenceFile to PySpark Jul 9, 2014
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16468/

@davies
Copy link
Contributor

davies commented Jul 11, 2014

LGTM, awesome!

@mateiz
Copy link
Contributor

mateiz commented Jul 12, 2014

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Jul 12, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16588/consoleFull

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this work before and get removed now, or was it a mistake in the docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mateiz we don't handle arrays currently and this is also the case for Scala API. The reason is ArrayWritable class doesn't have a no-arg constructor for creating an empty instance upon reading. User needs to create subtypes. Although we could add subtypes for handling primitive arrays, that makes Spark a dependency for users, which we probably don't want to do.

For conversion between arrays and ArrayWritable subtypes, when reading we can convert automatically as long as the subtype is on the class path. However, when writing we can't convert arrays to ArrayWritable subtypes automatically since we don't know which subtype to use. User needs to specify custom converters.

We should look into ArrayPrimitiveWritable, which is not available in Hadoop v1.0.4.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, it looks like in Scala we can write them but not read them. It's probably fine to remove them from the table then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't write arrays in Scala either (the implicit conversion from Array to ArrayWritable is marked private). Otherwise, it can be awkward as we can't read it back since ArrayWritable doesn't have a no-arg constructor. For user supplied ArrayWritable subtypes, we can read them, it's just they won't be implicitly converted. Essentially the same support as we have in Python.

@mateiz
Copy link
Contributor

mateiz commented Jul 12, 2014

This looks awesome, thanks for putting it together! One comment I have though is that we should add more test coverage, to make sure we cover all the data types supported. Instead of doing this in doc comments, which gets unwieldy, you can do it in python/pyspark/tests.py, which is a standalone test file. Just make sure we have tests that cover each supported data type in sequence files.

@MLnick you should look at this too when you have a chance.

@MLnick
Copy link
Contributor

MLnick commented Jul 12, 2014

I have had a quick look over and will try to do a more detailed one this weekend.

High level looks good, 2 comments so far:

  1. Agree with Matei that I think the tests should live in tests.py as opposed to docstrings, and add tests for other datatypes in a similar manner to the input format tests
  2. Would be great to add a couple of examples of using the custom Converter in reverse for output. Again, a Cassandra and HBase example in similar vein to the input format examples would be valuable I think

Will provide any more feedback as I go through it in more detail.

(btw thanks for fixing up the ArrayWritable stuff too).

@SparkQA
Copy link

SparkQA commented Jul 12, 2014

QA results for PR 1338:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
dict of public properties (via JavaBean getters and setters) class for the class type

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16588/consoleFull

@kanzhang
Copy link
Contributor Author

@davies @mateiz @MLnick thanks for the review and suggestions. I'll try to add standalone tests for every data type.

@MLnick
Copy link
Contributor

MLnick commented Jul 16, 2014

Great - I will review in more detail after that. Would be great to get this
merged before 1.1 freeze so PySpark I/O for inputformat and outputformat is
in for the next release!

On Tue, Jul 15, 2014 at 1:07 AM, kanzhang [email protected] wrote:

@MLnick https://github.com/MLnick I'll see if I can add couple output
converter examples as well. Thx.


Reply to this email directly or view it on GitHub
#1338 (comment).

@kanzhang
Copy link
Contributor Author

@MLnick I'm thinking of removing the tests and programming guide entry for custom classes (JavaBeans). It seems to be a feature of Pyrolite and I can't think of any obvious use of it in the context of RDDs. For example, Pyrolite maps a JavaBean to a dict of its attributes in Python, but one can't go reverse. Listing it as a supported data type may add confusion to users. Thoughts?

@mateiz
Copy link
Contributor

mateiz commented Jul 21, 2014

Regarding the JavaBeans, is there a reason to believe Pyrolite won't support them in the future? Or are you just suggesting to remove it because we can't also save data? That would be a bit of a regression for the reading side, though maybe InputFormats that return JavaBeans are not that common.

@MLnick
Copy link
Contributor

MLnick commented Jul 22, 2014

@kanzhang @mateiz Yeah this is one issue with Pyrolite vs MsgPack. MsgPack supported case classes out the box, which would likely be a bit more common that beans.

I'd say that custom serde via Converter will be far more common (as we've already seen with various Avro commentary etc).

Thinking about it some more, I would be ok to remove from the docs. This would still be available as undocumented functionality so if relevant use cases did come up on the mailing list, we could point to it and in the unlikely case that there was demand we could simply document it as read-only functionality.

Bearing in mind this is also still marked experimental and we'll need to see how users use it in the wild a bit and make any amendments as required.

@kanzhang
Copy link
Contributor Author

@MLnick I merely removed it from programming guide. The functionality (and your test) is still there should anyone wants to try it.

@mateiz I meant when reading JavaBeans, you get a dict of attributes to values on the Python side. But you can't turn around and save it as JavaBeans from Python. What you save is a Java Map since that's what Pyrolite will pickle a dict to. I was trying to confirm the same asymmetry on the saving side (i.e., saving a Python custom object as Java Map, and reads it back as a Python dict), but I got the following exception and gave up.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33.0:2 failed 1 times, most recent failure: Exception failure in TID 70 on host localhost: net.razorvine.pickle.InvalidOpcodeException: opcode not implemented: OBJ
        net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:223)
        net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
        net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)

@SparkQA
Copy link

SparkQA commented Jul 22, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16994/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 22, 2014

QA results for PR 1338:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16994/consoleFull

@kanzhang
Copy link
Contributor Author

Major changes for the updated patch.

  1. Replaced doctests with standalone tests
  2. Fixed converter for converting BytesWritables and added read/write tests for BytesWritable and byte arrays
  3. Added HBase and Cassandra output format and converter examples
  4. I used to inspect array element types and try to convert Object[] to array of primitive types whenever possible (so that they get pickled to Python arrays, whereas Object[] gets pickled to Python tuples). But I removed that code, since I can't determine element types for empty arrays. Users have to supply custom converters if they want Java arrays to appear as Python arrays (if they know their array types a priori).
  5. No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon deserializing. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to Object[] and they get pickled to Python tuples.

@kanzhang
Copy link
Contributor Author

@pwendell I renamed file HBaseConverter.scala to HBaseConverters.scala. Now I failed Scala style checks. How can I fix it? Thx.

@mateiz
Copy link
Contributor

mateiz commented Jul 23, 2014

The style check error is different, see https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16994/consoleFull. It's a bit hidden in there but it says:

error file=/home/jenkins/workspace/SparkPullRequestBuilder@7/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala message=There should be a space after the plus (+) sign line=34 column=19

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the style checker seems to be complaining about this +, which is a mistake in the style checker. You can add a space after the + for now. But do we really need covariance here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(For better or worse, we don't really use it elsewhere in Spark)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mateiz thanks, Matei. I saw it but I couldn't believe that was the reason :-). I added the + sign because some of our converters have more specific types like [Any, Writable] and the compiler complains when assigning them to where [Any, Any] is required. I don't have a strong preference here and could change them back to [Any, Any]. Let me know.

@SparkQA
Copy link

SparkQA commented Jul 23, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17042/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1338:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17384/consoleFull

@kanzhang
Copy link
Contributor Author

Now I have got the following error, since saveAsHadoopFile has 11 params. Can relax it a bit?

Scalastyle checks failed at following occurrences:
error file=/home/jenkins/workspace/SparkPullRequestBuilder@3/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala message=The number of parameters should not exceed 10 line=627 column=6

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17386/consoleFull

@kanzhang
Copy link
Contributor Author

Nevermind. I'm refactoring.

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1338:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17386/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17389/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1338:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17389/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17419/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1338. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17424/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1338:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17419/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1338:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17424/consoleFull

@JoshRosen
Copy link
Contributor

I think we should remove the batchSerialized arguments from PythonRDD's saveAs* methods and add a batchSerialized field to PythonRDD's constructor, since it's an attribute of the RDD itself rather than an option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This batchSerialized-respecting unpickling logic should probably live in its own function so that it can also be used by pythonToJavaMap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we defer this refactoring to when we update pythonToJavaMap, since I don't want to touch SchemaRDD code in this patch?

@kanzhang
Copy link
Contributor Author

I think we should remove the batchSerialized arguments from PythonRDD's saveAs* methods and add a batchSerialized field to PythonRDD's constructor, since it's an attribute of the RDD itself rather than an option.

Problem with that is currently PythonRDD objects are only created by PipelinedRDD, whereas in other cases (e.g., PythonRDD.readRDDFromFile and SchemaRDD.javaToPython), _jrdd (or JavaRDD[Array[Byte]]) are created directly without PythonRDD objects. I feel the change to use PythonRDD everywhere is too big for this patch. Maybe a followup JIRA?

@JoshRosen
Copy link
Contributor

Ah, I see. I don't mind deferring that refactoring to a later patch. I'll create some PySpark refactoring JIRAs later.

@JoshRosen
Copy link
Contributor

I've merged this. Thanks!

@kanzhang kanzhang changed the title [SPARK-2024] Add saveAsSequenceFile to PySpark [SPARK-2024] [PySpark] Add saveAsSequenceFile to PySpark Jul 30, 2014
@kanzhang kanzhang changed the title [SPARK-2024] [PySpark] Add saveAsSequenceFile to PySpark [SPARK-2024] Add saveAsSequenceFile to PySpark Jul 30, 2014
@asfgit asfgit closed this in 94d1f46 Jul 30, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to apache#455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs.

* Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.

* No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to ```Object[]``` and they get pickled to Python tuples.

* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <[email protected]>

Closes apache#1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
@kanzhang kanzhang deleted the SPARK-2024 branch December 12, 2014 01:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants