-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-2010] Support for nested data in PySpark SQL #1041
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Merged build triggered. |
|
Merged build started. |
|
Merged build finished. |
|
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15651/ |
|
@pwendell the failure doesn't seem to be related to this patch. Could we re-test? |
|
Jenkins, retest this please. |
|
Merged build triggered. |
|
Merged build started. |
|
Merged build finished. |
|
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15660/ |
|
Once it passes Jenkins, LGTM |
|
I think you have to rebase against master, it's including extra commits that you probably don't want in jenkins: [SPARK-2010] Support for nested data in PySpark SQL (commit: 95689830ed50819bfcbaf6cf9697c2710d7b457a) (detail) https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15660/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I think a key question here is if nested dictionaries are Maps or Structs. Right now the outermost dict is treated like a struct with named attributes, so I'm kind of inclined to continue that trend. Although, I think then we would want some way to create a map. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked too deeply, my initial understanding is that Structs are for user-defined Objects (if we are going to support them in Python), while MapType should suffice if we access dictionaries only as maps. Specifically,
- Struct stores its fields in order, which makes it usable as schema for the outermost dict, as we need to match schema with data stored in an array for each row. Speaking of ordering, how can we be sure when we map dictionary values to an array, they are all in the same order?
val rowRdd = rdd.mapPartitions { iter =>
iter.map { map =>
new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
}
}
-
Structs can only be used for dicts whose the keys are Strings, right?
-
I don't know enough about access pattern to tell if it is safe to label Set as ArrayType since Set is unordered?
|
@ahirreddy thanks, rebased just now. |
|
Merged build triggered. |
|
Merged build started. |
|
Merged build finished. All automated tests passed. |
|
All automated tests passed. |
python/pyspark/sql.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this line before the previous unit test block?
|
Merged build triggered. |
|
Merged build started. |
|
@rxin updated patch based on your review comments. Thx! |
|
Merged build finished. All automated tests passed. |
|
All automated tests passed. |
|
Thanks. I've merged this in master & branch-1.0. |
JIRA issue https://issues.apache.org/jira/browse/SPARK-2010 This PR adds support for nested collection types in PySpark SQL, including array, dict, list, set, and tuple. Example, ``` >>> from array import array >>> from pyspark.sql import SQLContext >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([ ... {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}] True >>> rdd = sc.parallelize([ ... {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == \ ... [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}] True ``` Author: Kan Zhang <[email protected]> Closes #1041 from kanzhang/SPARK-2010 and squashes the following commits: 1b2891d [Kan Zhang] [SPARK-2010] minor doc change and adding a TODO 504f27e [Kan Zhang] [SPARK-2010] Support for nested data in PySpark SQL (cherry picked from commit 4fdb491) Signed-off-by: Reynold Xin <[email protected]>
JIRA issue https://issues.apache.org/jira/browse/SPARK-2010 This PR adds support for nested collection types in PySpark SQL, including array, dict, list, set, and tuple. Example, ``` >>> from array import array >>> from pyspark.sql import SQLContext >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([ ... {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}] True >>> rdd = sc.parallelize([ ... {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == \ ... [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}] True ``` Author: Kan Zhang <[email protected]> Closes apache#1041 from kanzhang/SPARK-2010 and squashes the following commits: 1b2891d [Kan Zhang] [SPARK-2010] minor doc change and adding a TODO 504f27e [Kan Zhang] [SPARK-2010] Support for nested data in PySpark SQL
JIRA issue https://issues.apache.org/jira/browse/SPARK-2010 This PR adds support for nested collection types in PySpark SQL, including array, dict, list, set, and tuple. Example, ``` >>> from array import array >>> from pyspark.sql import SQLContext >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([ ... {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}] True >>> rdd = sc.parallelize([ ... {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == \ ... [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}] True ``` Author: Kan Zhang <[email protected]> Closes apache#1041 from kanzhang/SPARK-2010 and squashes the following commits: 1b2891d [Kan Zhang] [SPARK-2010] minor doc change and adding a TODO 504f27e [Kan Zhang] [SPARK-2010] Support for nested data in PySpark SQL
…g the shuffle service for released executors (#1041) * [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself. To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically: - Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders. - Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service), is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue opening up these files. Both of these changes are done after creating a file so that umasks don't affect the resulting permissions. External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications. No. New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add. Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks. Authored-by: Adam Binford <[email protected]> Signed-off-by: Thomas Graves <[email protected]> * [SPARK-37618][CORE][FOLLOWUP] Support cleaning up shuffle blocks from external shuffle service Fix test failure in build. Depending on the umask of the process running tests (which is typically inherited from the user's default umask), the group writable bit for the files/directories could be set or unset. The test was assuming that by default the umask will be restrictive (and so files/directories wont be group writable). Since this is not a valid assumption, we use jnr to change the umask of the process to be more restrictive - so that the test can validate the behavior change - and reset it back once the test is done. Fix test failure in build No Adds jnr as a test scoped dependency, which does not bring in any other new dependency (asm is already a dep in spark). ``` [INFO] +- com.github.jnr:jnr-posix:jar:3.0.9:test [INFO] | +- com.github.jnr:jnr-ffi:jar:2.0.1:test [INFO] | | +- com.github.jnr:jffi:jar:1.2.7:test [INFO] | | +- com.github.jnr:jffi:jar:native:1.2.7:test [INFO] | | +- org.ow2.asm:asm:jar:5.0.3:test [INFO] | | +- org.ow2.asm:asm-commons:jar:5.0.3:test [INFO] | | +- org.ow2.asm:asm-analysis:jar:5.0.3:test [INFO] | | +- org.ow2.asm:asm-tree:jar:5.0.3:test [INFO] | | +- org.ow2.asm:asm-util:jar:5.0.3:test [INFO] | | \- com.github.jnr:jnr-x86asm:jar:1.0.2:test [INFO] | \- com.github.jnr:jnr-constants:jar:0.8.6:test ``` Modification to existing test. Tested on Linux, skips test when native posix env is not found. Closes #36473 from mridulm/fix-SPARK-37618-test. Authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: Sean Owen <[email protected]> * Fix ut failure Co-authored-by: Adam Binford <[email protected]> Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
…rors with manageSSLKeys.sh (apache#1041) Co-authored-by: Egor Krivokon <>
…rors with manageSSLKeys.sh (apache#1041) Co-authored-by: Egor Krivokon <>
JIRA issue https://issues.apache.org/jira/browse/SPARK-2010
This PR adds support for nested collection types in PySpark SQL, including
array, dict, list, set, and tuple. Example,