@rshkv rshkv commented May 23, 2020

Taking a few Arrow fixes adjacent to but not necessary for #683.

  • SPARK-27629 Prevent Unpickler from intervening each unpickling
  • SPARK-28003 spark.createDataFrame with Arrow doesn't work with pandas.NaT
  • SPARK-28041 Increase minimum supported Pandas to 0.23.2 (we have a few internal pins to sort out before that)
  • SPARK-28240 Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark

Cherry-picks below are the commit and its dependencies for SPARK-28881, a correctness issue where Arrow serialization would return an empty dataframe when the result size exceeds spark.driver.maxResultSize:

  • SPARK-26922 Set socket timeout consistently in Arrow optimization
  • SPARK-27102 Remove the references to Python's Scala codes in R's Scala codes
  • SPARK-23961 pyspark toLocalIterator throws an exception
  • SPARK-28881 toPandas with Arrow should not return a DataFrame when the result size exceeds spark.driver.maxResultSize
  • SPARK-27992 PySpark socket server should sync with JVM connection thread future

viirya and others added 12 commits May 23, 2020 20:49
[SPARK-27629] Prevent Unpickler from intervening each unpickling

In SPARK-27612, a correctness issue was reported: when protocol 4 is used to pickle Python objects, the unpickled objects turn out wrong. A temporary fix was proposed by not using the highest protocol.

It was found that Opcodes.MEMOIZE appeared among the opcodes emitted by protocol 4, and it was suspected to cause this issue.

A deeper dive found that Opcodes.MEMOIZE stores objects into an internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes, and stored objects interfere with the next round of unpickling if the map is not cleared.
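For illustration, the MEMOIZE opcodes are easy to see in the pickle stream itself using CPython's `pickletools` (this inspects the bytes Python produces; the Unpickler in question on the JVM side is Pyrolite's):

```python
import pickle
import pickletools

# Protocol 4 emits MEMOIZE opcodes, which instruct an Unpickler to
# cache objects in its internal memo map.
payload = pickle.dumps([["a"], ["a"]], protocol=4)
pickletools.dis(payload)  # the disassembly includes MEMOIZE entries
```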

We had two options:

1. Continue to reuse the Unpickler, but call its close after each unpickling.
2. Do not reuse the Unpickler, and create a new Unpickler object for each unpickling.

This patch takes option 1.

Passes the test added in SPARK-27612 (apache#24519).

Closes apache#24521 from viirya/SPARK-27629.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
This increases the minimum supported version of Pandas to 0.23.2. Using a lower version raises the error `Pandas >= 0.23.2 must be installed; however, your version was 0.XX`. A workaround for using pyarrow with Pandas 0.19.2 was also removed.
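A minimal sketch of this kind of version gate (PySpark's actual check lives in `pyspark.sql.utils`; the exact body below is illustrative, not the patch itself):

```python
from distutils.version import LooseVersion

minimum_pandas_version = "0.23.2"  # the new floor

def require_minimum_pandas_version():
    """Raise ImportError if Pandas is missing or below the minimum (sketch)."""
    try:
        import pandas
    except ImportError:
        raise ImportError(
            "Pandas >= %s must be installed; however, it was not found."
            % minimum_pandas_version)
    if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
        raise ImportError(
            "Pandas >= %s must be installed; however, your version was %s."
            % (minimum_pandas_version, pandas.__version__))
```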

Existing Tests

Closes apache#24867 from BryanCutler/pyspark-increase-min-pandas-SPARK-28041.

Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…from pandas with Arrow

## What changes were proposed in this pull request?

This patch removes `fillna(0)` when creating ArrowBatch from a pandas Series.

With `fillna(0)`, the original code would turn a timestamp type into object type, which pyarrow complains about later:
```
>>> s = pd.Series([pd.NaT, pd.Timestamp('2015-01-01')])
>>> s.dtypes
dtype('<M8[ns]')
>>> s.fillna(0)
0                      0
1    2015-01-01 00:00:00
dtype: object
```
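Without the `fillna(0)`, pyarrow can take the series directly and maps `NaT` to a null while keeping the timestamp type. A quick illustration (assuming pyarrow is installed; not the patched Spark code itself):

```python
import pandas as pd
import pyarrow as pa

s = pd.Series([pd.NaT, pd.Timestamp('2015-01-01')])
# NaT becomes a null in the Arrow array; the type stays timestamp[ns].
print(pa.Array.from_pandas(s))
```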

## How was this patch tested?

Added `test_timestamp_nat`

Closes apache#24844 from icexelloss/SPARK-28003-arrow-nat.

Authored-by: Li Jin <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
[SPARK-28240] Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark

## What changes were proposed in this pull request?

In Python 2.7 with the latest PyArrow and Pandas, the error message is slightly different from Python 3's. This PR simply fixes the test.

```
======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.ArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
    self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"

======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
    self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"

```
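A hypothetical sketch of the kind of adjustment involved (not the exact patch): match on the part of the message that is stable across Python versions rather than the Python-3-only `integer.*required.*got.*str` wording.

```python
# Hypothetical: `pdf` and `wrong_schema` come from the test fixture.
# "an integer is required" appears in both the Python 2.7 and
# Python 3 error texts, so this pattern matches either.
with self.assertRaisesRegexp(Exception, "integer.*required"):
    self.spark.createDataFrame(pdf, schema=wrong_schema)
```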

## How was this patch tested?

Manually tested.

```
cd python
./run-tests --python-executables=python --modules pyspark-sql
```

Closes apache#25042 from HyukjinKwon/SPARK-28240.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
## What changes were proposed in this pull request?

This PR sets the socket timeout consistently across the Arrow optimization via `SPARKR_BACKEND_CONNECTION_TIMEOUT`. There appears to be only one place left.

## How was this patch tested?

Existing tests should cover.

Closes apache#23971 from HyukjinKwon/SPARK-26922.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
[SPARK-27102] Remove the references to Python's Scala codes in R's Scala codes

## What changes were proposed in this pull request?

Currently, R's Scala code happens to refer to Python's Scala code for deduplication. This is a bit odd; for instance, when we hit an exception from R, it shows a Python-related code path, which makes it confusing to debug. There should rather be one code base that R and Python share.

This PR proposes:

1. Make a `SocketAuthServer` and move `PythonServer` so that `PythonRDD` and `RRDD` can share it.
2. Move `readRDDFromFile` and `readRDDFromInputStream` into `JavaRDD`.
3. Reuse `RAuthHelper` and remove `RSocketAuthHelper` in `RRDD`.
4. Rename `getEncryptionEnabled` to `isEncryptionEnabled` while I am here.

So, now, the places below:

- `sql/core/src/main/scala/org/apache/spark/sql/api/r`
- `core/src/main/scala/org/apache/spark/api/r`
- `mllib/src/main/scala/org/apache/spark/ml/r`

no longer refer to Python's Scala code.

## How was this patch tested?

Existing tests should cover this.

Closes apache#24023 from HyukjinKwon/SPARK-27102.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
[SPARK-23961] Fix error when toLocalIterator goes out of scope and properly raise errors from worker

This fixes an error when a PySpark local iterator, for both RDD and DataFrames, goes out of scope and the connection is closed before fully consuming the iterator. The error occurs on the JVM in the serving thread, when Python closes the local socket while the JVM is writing to it. This usually happens when there is enough data to fill the socket read buffer, causing the write call to block.

Additionally, this fixes a problem when an error occurs in the Python worker and the collect job is cancelled with an exception. Previously, the Python driver was never notified of the error, so the user could get a partial result (iteration until the error) and the application would continue. With this change, an error in the worker is sent to the Python iterator and then raised.

The change here introduces a protocol for PySpark local iterators that works as follows:

1) The local socket connection is made when the iterator is created
2) When iterating, Python first sends a request for partition data as a non-zero integer
3) While the JVM local iterator over partitions has next, it triggers a job to collect the next partition
4) The JVM sends a non-zero response to indicate it has the next partition to send
5) The next partition is sent to Python and read by the PySpark deserializer
6) After sending the entire partition, an `END_OF_DATA_SECTION` is sent to Python, which stops the deserializer and allows Python to make another request
7) When the JVM gets a request from Python but has already consumed its local iterator, it will send a zero response to Python and both will close the socket cleanly
8) If an error occurs in the worker, a negative response is sent to Python followed by the error message. Python will then raise a RuntimeError with the message, stopping iteration.
9) When the PySpark local iterator is garbage-collected, it will read any remaining data from the current partition (this is data that has already been collected) and send a request of zero to tell the JVM to stop collection jobs and close the connection.

Steps 1, 3, 5, 6 are the same as before. Step 8 was completely missing before because errors in the worker were never communicated back to Python. The other steps add synchronization to allow for a clean closing of the socket, with a small trade-off in performance for each partition. This is mainly because the JVM does not start collecting partition data until it receives a request to do so, where before it would eagerly write all data until the socket receive buffer is full.
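A condensed sketch of the Python-side loop these steps imply (the status values, helper names, and error framing here are assumptions for illustration, not the actual PySpark source):

```python
import struct

END_OF_STREAM = 0  # assumed value: JVM's local iterator is exhausted

def read_int(stream):
    # Read a big-endian 4-byte integer, as the JVM writes it.
    return struct.unpack("!i", stream.read(4))[0]

def local_iterator(stream, deserializer):
    while True:
        # Step 2: request the next partition with a non-zero integer.
        stream.write(struct.pack("!i", 1))
        stream.flush()
        # Steps 4/7/8: the JVM answers with a status code.
        status = read_int(stream)
        if status == END_OF_STREAM:
            break  # step 7: both sides close the socket cleanly
        if status < 0:
            # Step 8: a negative response is followed by the error message.
            raise RuntimeError(stream.read().decode("utf-8"))
        # Steps 5/6: read items until the deserializer hits
        # END_OF_DATA_SECTION, then loop back and request more.
        for item in deserializer.load_stream(stream):
            yield item
```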

Added new unit tests for DataFrame and RDD `toLocalIterator` and tested not fully consuming the iterator. Manual tests with Python 2.7 and 3.6.

Closes apache#24070 from BryanCutler/pyspark-toLocalIterator-clean-stop-SPARK-23961.

Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
[SPARK-27992] Allow Python to join with connection thread to propagate errors

## What changes were proposed in this pull request?

Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if the Spark job running in the background serving thread errors, the error is caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it only catches a SparkException, it won't handle an error that occurs in the serializer, and each method has to have its own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving port and authentication info, which allows the Python caller to join with the serving thread. During the call to join, the serving thread's Future is completed either successfully or with an exception. In the latter case, the exception is propagated to Python through the Py4J call.
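In PySpark terms, the calling pattern looks roughly like this (a sketch of code inside a `toPandas`-like method; `_load_from_socket` and `ArrowCollectSerializer` are PySpark internals assumed available, and the details are illustrative rather than exact):

```python
def _collect_as_arrow(jdf):
    # The JVM call now also returns the server object to join on.
    port, auth_secret, jsocket_auth_server = jdf.collectAsArrowToPython()
    try:
        return list(_load_from_socket((port, auth_secret),
                                      ArrowCollectSerializer()))
    finally:
        # Join with the serving thread. If its Future completed with an
        # exception, this Py4J call re-raises it on the Python side.
        jsocket_auth_server.getResult()
```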

## How was this patch tested?

Existing tests

Closes apache#24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.

Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
[SPARK-28881] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize

### What changes were proposed in this pull request?
This PR proposes to add a test case for:

```bash
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

which can result in partial results (see apache#25593 (comment)). This regression appeared between Spark 2.3 and Spark 2.4 and was later fixed accidentally.

### Why are the changes needed?
To prevent the same regression in the future.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Test was added.

Closes apache#25594 from HyukjinKwon/SPARK-28881.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
[SPARK-28881][FOLLOW-UP] Use SparkSession(SparkContext(...)) to prevent the Spark conf from affecting other tests

### What changes were proposed in this pull request?

This PR proposes to match the test with branch-2.4. See apache#25593 (comment)

It seems that using `SparkSession.builder` with a Spark conf can affect other tests.
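A sketch of the pattern the title refers to: build the session from an explicit `SparkContext` so the conf stays scoped to the test (values here are illustrative, not the patched test):

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Constructing the session from a dedicated SparkContext instead of
# SparkSession.builder keeps this conf from leaking into other tests.
conf = SparkConf().set("spark.driver.maxResultSize", "1m")
session = SparkSession(SparkContext(conf=conf))
try:
    session.range(10).toPandas()  # exercise the code under test
finally:
    session.stop()
```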

### Why are the changes needed?
To match branch-2.4 and to make backporting easier.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Test was fixed.

Closes apache#25603 from HyukjinKwon/SPARK-28881-followup.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request?

This PR updates plugins to latest versions.

### Why are the changes needed?

This brings bug fixes like the following.
- https://issues.apache.org/jira/projects/MCOMPILER/versions/12343484 (maven-compiler-plugin)
- https://issues.apache.org/jira/projects/MJAVADOC/versions/12345060 (maven-javadoc-plugin)
- https://issues.apache.org/jira/projects/MCHECKSTYLE/versions/12342397 (maven-checkstyle-plugin)
- https://checkstyle.sourceforge.io/releasenotes.html#Release_8.25 (checkstyle)
- https://checkstyle.sourceforge.io/releasenotes.html#Release_8.24 (checkstyle)

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins building and testing with the existing code.

Closes apache#26117 from dongjoon-hyun/SPARK-29470.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This adds the `typesafe` bintray repo for `sbt-mima-plugin`.

### Why are the changes needed?

Since Oct 21, the following plugin causes [Jenkins failures](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.4-test-sbt-hadoop-2.6/611/console) due to the missing jar.

- `branch-2.4`: `sbt-mima-plugin:0.1.17` is missing.
- `master`: `sbt-mima-plugin:0.3.0` is missing.

These versions of `sbt-mima-plugin` seem to have been removed from the old repo.

```
$ rm -rf ~/.ivy2/

$ build/sbt scalastyle test:scalastyle
...
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	::          UNRESOLVED DEPENDENCIES         ::
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	:: com.typesafe#sbt-mima-plugin;0.1.17: not found
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Check the `GitHub Action` linter result; this PR should pass. Alternatively, check manually.
(Note that the Jenkins PR builder hasn't failed until now due to its local cache.)

Closes apache#26217 from dongjoon-hyun/SPARK-29560.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@rshkv rshkv changed the title Arrow fixes Misc PyArrow fixes Jun 4, 2020
@rshkv rshkv requested a review from robert3005 June 4, 2020 19:40
@rshkv rshkv merged commit 1270813 into master Jun 5, 2020
@rshkv rshkv deleted the wr/arrow-fixes branch June 5, 2020 10:27