Conversation

@kanzhang
Contributor

JIRA issue https://issues.apache.org/jira/browse/SPARK-2010

This PR adds support for nested collection types in PySpark SQL, including
array, dict, list, set, and tuple. For example:

```
>>> from array import array
>>> from pyspark.sql import SQLContext
>>> sqlCtx = SQLContext(sc)
>>> rdd = sc.parallelize([
...         {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
...         {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
...                    {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
True
>>> rdd = sc.parallelize([
...         {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
...         {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == \
... [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
...  {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
True
```
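
For completeness, a small usage sketch (assuming the `srdd` and the row ordering from the last doctest above):

```
# Collected rows come back as plain Python values, so nested collections
# can be inspected directly (sketch only; assumes the srdd defined above).
rows = srdd.collect()
first = rows[0]
first["f1"][0]     # -> [1, 2], an element of the nested list of lists
first["f3"][1]     # -> 2, tuple element access
```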

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15651/

@kanzhang
Contributor Author

@pwendell the failure doesn't seem to be related to this patch. Could we re-test?

@aarondav
Contributor

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15660/

@ahirreddy
Contributor

Once it passes Jenkins, LGTM

@ahirreddy
Contributor

I think you have to rebase against master; it's including extra commits that you probably don't want in Jenkins:

[SPARK-2010] Support for nested data in PySpark SQL (commit: 95689830ed50819bfcbaf6cf9697c2710d7b457a) (detail)
[SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not re... (commit: 2966044) (detail)

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15660/

Contributor

So I think a key question here is whether nested dictionaries are Maps or Structs. Right now the outermost dict is treated like a struct with named attributes, so I'm kind of inclined to continue that trend. Although, I think we would then want some way to create a map. What do you think?
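
To make the distinction concrete, a rough illustration (not tied to any particular API) of the two readings of a nested dict:

```
# Illustration only: the same nested dict, read two ways.
person = {"name": "alice", "scores": {"math": 1.0, "cs": 2.0}}

# Struct-like reading: the keys form a fixed set of named fields, so every
# row is expected to carry the same keys ("name", "scores").
#
# Map-like reading (the treatment nested dicts get in this patch, as I
# understand it): the keys are data, so different rows may carry different keys.
other = {"name": "bob", "scores": {"bio": 3.0}}   # fine as a map, awkward as a struct
```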

Contributor Author

I haven't looked too deeply, but my initial understanding is that Structs are for user-defined objects (if we are going to support them in Python), while MapType should suffice if we access dictionaries only as maps. Specifically,

  1. Struct stores its fields in order, which makes it usable as the schema for the outermost dict, since we need to match the schema against the data stored in an array for each row. Speaking of ordering, when we map dictionary values to an array, how can we be sure they all end up in the same order (see the sketch after this list)?
    val rowRdd = rdd.mapPartitions { iter =>
      iter.map { map =>
        new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
      }
    }
  2. Structs can only be used for dicts whose keys are Strings, right?

  3. I don't know enough about the access pattern to tell whether it is safe to label Set as ArrayType, since Set is unordered.
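
To spell out the ordering concern in point 1, a minimal sketch (my illustration, not code from this patch):

```
# If each row dict is flattened to a positional array, every row has to use
# the same key order, otherwise values won't line up with the schema fields.
rows = [{"f1": 1, "f2": "a"}, {"f2": "b", "f1": 2}]
field_names = sorted(rows[0].keys())               # pick one fixed order, e.g. sorted keys
flattened = [[r[k] for k in field_names] for r in rows]
# flattened == [[1, "a"], [2, "b"]] regardless of each dict's insertion order
```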

@kanzhang
Contributor Author

@ahirreddy thanks, rebased just now.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15787/

Contributor

can we move this line before the previous unit test block?

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@kanzhang
Contributor Author

@rxin updated the patch based on your review comments. Thanks!

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15803/

@rxin
Contributor

rxin commented Jun 16, 2014

Thanks. I've merged this in master & branch-1.0.

asfgit closed this in 4fdb491 on Jun 16, 2014
asfgit pushed a commit that referenced this pull request Jun 16, 2014

Author: Kan Zhang <[email protected]>

Closes #1041 from kanzhang/SPARK-2010 and squashes the following commits:

1b2891d [Kan Zhang] [SPARK-2010] minor doc change and adding a TODO
504f27e [Kan Zhang] [SPARK-2010] Support for nested data in PySpark SQL

(cherry picked from commit 4fdb491)
Signed-off-by: Reynold Xin <[email protected]>
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
kanzhang deleted the SPARK-2010 branch on December 12, 2014 at 01:32
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
wangyum pushed a commit that referenced this pull request May 26, 2023
…g the shuffle service for released executors (#1041)

* [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.

To make this change work in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:
- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue in opening up these files.

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.
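
For illustration, roughly what "adjust the bits after creation" looks like (a generic Python sketch with hypothetical paths, not the Spark code):

```
# Generic sketch, not the Spark code: mark a block manager sub directory
# group writable and a shuffle file world readable after they are created,
# so the process umask cannot strip the bits.
import os
import stat

subdir = "/tmp/blockmgr-example/0a"                 # hypothetical paths
os.makedirs(subdir, exist_ok=True)
shuffle_file = os.path.join(subdir, "shuffle_0_0_0.data")
open(shuffle_file, "wb").close()

os.chmod(subdir, os.stat(subdir).st_mode | stat.S_IWGRP)              # group writable dir
os.chmod(shuffle_file, os.stat(shuffle_file).st_mode | stat.S_IROTH)  # world readable file
```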

External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.

No.

New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>

* [SPARK-37618][CORE][FOLLOWUP] Support cleaning up shuffle blocks from external shuffle service

Fix test failure in build.
Depending on the umask of the process running tests (which is typically inherited from the user's default umask), the group writable bit for the files/directories could be set or unset. The test was assuming that by default the umask would be restrictive (and so files/directories won't be group writable). Since this is not a valid assumption, we use jnr to change the umask of the process to be more restrictive - so that the test can validate the behavior change - and reset it back once the test is done.
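
Roughly the idea, sketched in Python rather than the jnr-posix call the test actually uses:

```
# Sketch only: force a restrictive umask around a check, then restore it.
import os
import stat

old_umask = os.umask(0o027)                 # restrictive: strip group-write from new files
try:
    path = "/tmp/umask-probe"               # hypothetical probe file
    if os.path.exists(path):
        os.remove(path)
    with open(path, "w") as f:
        f.write("probe")
    mode = os.stat(path).st_mode
    assert not (mode & stat.S_IWGRP)        # group-write was stripped by the umask
finally:
    os.umask(old_umask)                     # always restore the process umask
```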

Fix test failure in build

No

Adds jnr as a test scoped dependency, which does not bring in any other new dependency (asm is already a dep in spark).
```
[INFO] +- com.github.jnr:jnr-posix:jar:3.0.9:test
[INFO] |  +- com.github.jnr:jnr-ffi:jar:2.0.1:test
[INFO] |  |  +- com.github.jnr:jffi:jar:1.2.7:test
[INFO] |  |  +- com.github.jnr:jffi:jar:native:1.2.7:test
[INFO] |  |  +- org.ow2.asm:asm:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-commons:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-analysis:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-tree:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-util:jar:5.0.3:test
[INFO] |  |  \- com.github.jnr:jnr-x86asm:jar:1.0.2:test
[INFO] |  \- com.github.jnr:jnr-constants:jar:0.8.6:test
```

Modification to existing test.
Tested on Linux, skips test when native posix env is not found.

Closes #36473 from mridulm/fix-SPARK-37618-test.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Sean Owen <[email protected]>

* Fix ut failure

Co-authored-by: Adam Binford <[email protected]>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
udaynpusa pushed a commit to mapr/spark that referenced this pull request Jan 30, 2024
…rors with manageSSLKeys.sh (apache#1041)

Co-authored-by: Egor Krivokon <>
mapr-devops pushed a commit to mapr/spark that referenced this pull request May 8, 2025
…rors with manageSSLKeys.sh (apache#1041)

Co-authored-by: Egor Krivokon <>