@pull pull bot commented Sep 30, 2022

See Commits and Changes for more details.


ivoson and others added 7 commits September 30, 2022 09:04
…ndalone cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
Currently, stage-level scheduling works for YARN/K8s/standalone clusters only when dynamic allocation is enabled: the Spark app acquires executors with different resource profiles and assigns tasks to executors with the same resource profile id.

This PR proposes to add stage-level scheduling when dynamic allocation is off. In this case, the Spark app will only have executors with the default resource profile, but different `Stages` can still customize their task resource requests, which must be compatible with the default resource profile's executor resources. All these `Stages` with different task resource requests will reuse/share the same set of executors with the default resource profile.

This PR proposes to:
1. Introduce a new special `ResourceProfile`, `TaskResourceProfile`, which can be used to describe different task resource requests when dynamic allocation is off. Tasks bound to a `TaskResourceProfile` will reuse executors with the default resource profile.
An `Exception` is thrown if executors with the default resource profile cannot fulfill the task resource requests.
```scala
class TaskResourceProfile(override val taskResources: Map[String, TaskResourceRequest])
  extends ResourceProfile(
    ResourceProfile.getOrCreateDefaultProfile(SparkEnv.get.conf).executorResources,
    taskResources)
```
2. `DAGScheduler` and `TaskScheduler` will schedule tasks with a customized `ResourceProfile` based on the resource profile type and resource profile id: taskSets with a `TaskResourceProfile` can be scheduled to executors with `DEFAULT_RESOURCE_PROFILE_ID`, while other taskSets can only be scheduled to executors with exactly the same resource profile id (see the PySpark sketch below).
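
As an illustration only, here is a minimal PySpark sketch of the intended usage, assuming that a profile built from task requests alone maps to the new `TaskResourceProfile` when dynamic allocation is off; the `pyspark.resource` builder APIs below already exist and are not added by this PR:

```python
from pyspark import SparkContext
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

sc = SparkContext.getOrCreate()

# Only task requirements are specified; executors keep the default profile.
# Assumption: with dynamic allocation off, this maps to a TaskResourceProfile,
# so the stage reuses the default-resource-profile executors.
task_reqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(task_reqs).build

rdd = sc.parallelize(range(100), 8).withResources(rp)
print(rdd.map(lambda x: x * 2).sum())
```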

### Why are the changes needed?
When dynamic allocation is disabled, we can also leverage stage-level scheduling to customize task resource requests for different stages.

### Does this PR introduce _any_ user-facing change?
Spark users can specify `TaskResourceProfile` to customize task resource requests for different stages when dynamic allocation is off.

### How was this patch tested?
New UTs added.

Closes #37268 from ivoson/stage-schedule-dynamic-off.

Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Huang Tengfei <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
…nd inline_outer

### What changes were proposed in this pull request?

Adds Scala and Python bindings for the SQL functions `inline` and `inline_outer`.

### Why are the changes needed?

Currently these functions can only be used via SQL or through the `expr` function. This makes it a little easier to use them with the DataFrame APIs.

### Does this PR introduce _any_ user-facing change?

Exposes new functions directly instead of only through SQL.
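
For illustration, a small PySpark sketch of the new bindings; the column name and data below are made up for this example:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: an array of structs to explode into rows.
df = spark.createDataFrame(
    [([(1, "a"), (2, "b")],)],
    "arr: array<struct<n: int, s: string>>",
)

# Previously this required expr("inline(arr)"); with the new bindings the
# functions can be called directly from the DataFrame API.
df.select(F.inline("arr")).show()
df.select(F.inline_outer("arr")).show()
```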

### How was this patch tested?

Updated existing inline tests to use the new Scala binding instead of calling the functions through SQL expressions.

Closes #37770 from Kimahriman/inline-bindings.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…andas regression

### What changes were proposed in this pull request?

This PR proposes to skip the `DataFrame.corr_with` test when the `other` is a `pyspark.pandas.Series` and the `method` is "spearman" or "pearson", since there is a regression in pandas 1.5.0 for those cases.

### Why are the changes needed?

There are some regressions in pandas 1.5.0, so we're not going to match the behavior for those cases.
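
For illustration only, a version-gated skip along these lines; the exact decorator, condition, and test names in the suite may differ:

```python
import unittest

import pandas as pd
from packaging.version import Version


class CorrWithTests(unittest.TestCase):
    # Illustrative only: skip when the installed pandas carries the 1.5.0
    # regression for "spearman"/"pearson" against a Series.
    @unittest.skipIf(
        Version(pd.__version__) >= Version("1.5.0"),
        "pandas 1.5.0 regression in corr_with against a Series",
    )
    def test_corr_with_series(self):
        pass  # the real test body lives in the pandas-on-Spark test suite
```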

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested with pandas 1.5.0 and confirmed the tests pass.

Closes #38031 from itholic/SPARK-40589.

Authored-by: itholic <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request?

The PR proposes to fix `CategoricalIndex.append` to match the behavior of pandas.

### Why are the changes needed?

Because the current behavior is different from pandas 1.5.0.

### Does this PR introduce _any_ user-facing change?

The behavior of the API changes as below:

**Before**
```python
>>> psidx1 = ps.CategoricalIndex(["x", "y", "z"], categories=["z", "y", "x", "w"])
>>> psidx3 = ps.Index(["y", "x", "w", "z"])
>>> psidx1.append(psidx3.astype("category"))
CategoricalIndex(['x', 'y', 'z', 'y', 'x', 'w', 'z'], categories=['z', 'y', 'x', 'w'], ordered=False, dtype='category')
```
**After**
```python
>>> psidx1 = ps.CategoricalIndex(["x", "y", "z"], categories=["z", "y", "x", "w"])
>>> psidx3 = ps.Index(["y", "x", "w", "z"])
>>> psidx1.append(psidx3.astype("category"))
CategoricalIndex(['x', 'y', 'z', 'x', 'y', 'z', 'w'], categories=['z', 'y', 'x', 'w'], ordered=False, dtype='category')
```
### How was this patch tested?

Manually checked that the existing tests pass with pandas 1.5.0.

Closes #38015 from itholic/SPARK-40577.

Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Eliminate `to_pandas` warnings in tests by changing `to_pandas()` to `_to_pandas()`.
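
For context, a minimal sketch of the difference, assuming `_to_pandas()` is the internal variant without the advice warning, as its use elsewhere in these commits suggests:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3]})

# Public API: may emit a PandasAPIOnSparkAdviceWarning about collecting
# all data to the driver.
pdf = psdf.to_pandas()

# Internal variant used by the tests: same conversion, but without the
# advice warning (assumption based on its usage in this PR).
pdf = psdf._to_pandas()
```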

### Why are the changes needed?
When a test containing `to_pandas` fails, it may print tons of warnings. For example, the run at https://github.com/zhengruifeng/spark/actions/runs/3142284988/jobs/5106178199 printed this warning repeatedly from line 1243 to line 3985:

<img width="1170" alt="image" src="https://user-images.githubusercontent.com/7322292/192949507-ae1d3677-6ba4-4d80-84b1-44884fb1988b.png">

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated tests.

Closes #38042 from zhengruifeng/ps_test_to_pandas.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…GACY_ERROR_TEMP_1200-1299

### What changes were proposed in this pull request?
In the PR, I propose to migrate 100 compilation errors to temporary error classes with the prefix `_LEGACY_ERROR_TEMP_12xx`. The error messages will not include the error class names, so in this way we preserve the existing behaviour.

### Why are the changes needed?
The migration to temporary error classes allows us to gather statistics about errors and detect the most common error classes. After that we can prioritise the work on further migration.

The new error class name prefix `_LEGACY_ERROR_TEMP_` proposed here kind of marks the error as developer-facing, not user-facing. Developers can still get the error class programmatically via the `SparkThrowable` interface, so that they can build error infra with it. End users won't see the error class in the message. This allows us to do the error migration very quickly, and we can refine the error classes and mark them as user-facing later (naming them properly, adding tests, etc.).

### Does this PR introduce _any_ user-facing change?
No. The error messages should be almost the same by default.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "test:testOnly *SQLQuerySuite"
```

Closes #38027 from MaxGekk/legacy-error-temp-compliation-1200.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
…ngSuite`

### What changes were proposed in this pull request?
Similar to SPARK-39869, this PR explicitly adds a `System.gc()` call before each test case in `HivePartitionFilteringSuite` to avoid `OutOfMemoryError` as far as possible.

### Why are the changes needed?
Fix flaky test.

```
2022-09-29T16:23:50.4263170Z [info] org.apache.spark.sql.hive.client.HivePartitionFilteringSuites *** ABORTED *** (26 minutes, 32 seconds)
2022-09-29T16:23:50.4340944Z [info]   java.lang.reflect.InvocationTargetException:
2022-09-29T16:23:50.4341736Z [info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-09-29T16:23:50.4342537Z [info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-09-29T16:23:50.4343543Z [info]   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-09-29T16:23:50.4344319Z [info]   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2022-09-29T16:23:50.4345108Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:315)
2022-09-29T16:23:50.4346070Z [info]   at org.apache.spark.sql.hive.client.HiveClientBuilder$.buildClient(HiveClientBuilder.scala:50)
2022-09-29T16:23:50.4347512Z [info]   at org.apache.spark.sql.hive.client.HiveVersionSuite.buildClient(HiveVersionSuite.scala:48)
2022-09-29T16:23:50.4348463Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.init(HivePartitionFilteringSuite.scala:73)
2022-09-29T16:23:50.4349656Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.beforeAll(HivePartitionFilteringSuite.scala:118)
2022-09-29T16:23:50.4350533Z [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
2022-09-29T16:23:50.4351500Z [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2022-09-29T16:23:50.4352219Z [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2022-09-29T16:23:50.4353147Z [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:66)
2022-09-29T16:23:50.4353841Z [info]   at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1178)
2022-09-29T16:23:50.4354737Z [info]   at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1225)
2022-09-29T16:23:50.4355475Z [info]   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
2022-09-29T16:23:50.4356464Z [info]   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
2022-09-29T16:23:50.4357212Z [info]   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
2022-09-29T16:23:50.4358108Z [info]   at org.scalatest.Suite.runNestedSuites(Suite.scala:1223)
2022-09-29T16:23:50.4358777Z [info]   at org.scalatest.Suite.runNestedSuites$(Suite.scala:1156)
2022-09-29T16:23:50.4359870Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuites.runNestedSuites(HivePartitionFilteringSuites.scala:24)
2022-09-29T16:23:50.4360679Z [info]   at org.scalatest.Suite.run(Suite.scala:1111)
2022-09-29T16:23:50.4361498Z [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
2022-09-29T16:23:50.4362487Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuites.run(HivePartitionFilteringSuites.scala:24)
2022-09-29T16:23:50.4363571Z [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2022-09-29T16:23:50.4364320Z [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:516)
2022-09-29T16:23:50.4365208Z [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
2022-09-29T16:23:50.4365870Z [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2022-09-29T16:23:50.4366831Z [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2022-09-29T16:23:50.4368396Z [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2022-09-29T16:23:50.4368925Z [info]   at java.lang.Thread.run(Thread.java:750)
2022-09-29T16:23:50.4369387Z [info]   Cause: java.lang.OutOfMemoryError: Metaspace
2022-09-29T16:23:50.4369882Z [info]   at java.lang.ClassLoader.defineClass1(Native Method)
2022-09-29T16:23:50.4370399Z [info]   at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
2022-09-29T16:23:50.4370965Z [info]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
2022-09-29T16:23:50.4371538Z [info]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
2022-09-29T16:23:50.4372072Z [info]   at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
2022-09-29T16:23:50.4372586Z [info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
2022-09-29T16:23:50.4373083Z [info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
2022-09-29T16:23:50.4373604Z [info]   at java.security.AccessController.doPrivileged(Native Method)
2022-09-29T16:23:50.4374148Z [info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
2022-09-29T16:23:50.4374670Z [info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-29T16:23:50.4375323Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:269)
2022-09-29T16:23:50.4376066Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:258)
2022-09-29T16:23:50.4376672Z [info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
2022-09-29T16:23:50.4377204Z [info]   at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:150)
2022-09-29T16:23:50.4377825Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl$.newHiveConf(HiveClientImpl.scala:1284)
2022-09-29T16:23:50.4378489Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:171)
2022-09-29T16:23:50.4379144Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:136)
2022-09-29T16:23:50.4379774Z [info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-09-29T16:23:50.4380475Z [info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-09-29T16:23:50.4381298Z [info]   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-09-29T16:23:50.4381965Z [info]   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2022-09-29T16:23:50.4382648Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:315)
2022-09-29T16:23:50.4383383Z [info]   at org.apache.spark.sql.hive.client.HiveClientBuilder$.buildClient(HiveClientBuilder.scala:50)
2022-09-29T16:23:50.4384079Z [info]   at org.apache.spark.sql.hive.client.HiveVersionSuite.buildClient(HiveVersionSuite.scala:48)
2022-09-29T16:23:50.4384868Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.init(HivePartitionFilteringSuite.scala:73)
2022-09-29T16:23:50.4385686Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.beforeAll(HivePartitionFilteringSuite.scala:118)
2022-09-29T16:23:50.4386560Z [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
2022-09-29T16:23:50.4387139Z [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2022-09-29T16:23:50.4387696Z [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2022-09-29T16:23:50.4388245Z [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:66)
2022-09-29T16:23:50.4388781Z [info]   at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1178)
2022-09-29T16:23:50.4389309Z [info]   at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1225)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #38057 from LuciferYang/SPARK-40619.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…y.sum`

### What changes were proposed in this pull request?
Implement `numeric_only` and `min_count` in `GroupBy.sum`

### Why are the changes needed?
For API coverage.

### Does this PR introduce _any_ user-facing change?
Yes, the new parameters `numeric_only` and `min_count` are exposed:

```
In [2]: df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], "C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]})

In [3]: df.groupby("A").sum(numeric_only=False).sort_index()
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: GroupBy.sum() can only support numeric and bool columns even ifnumeric_only=False, skip unsupported columns: ['D']
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)

   B  C
A
1  1  6
2  1  8

In [4]: df._to_pandas().groupby("A").sum(numeric_only=False).sort_index()
Out[4]:
   B  C   D
A
1  1  6  ab
2  1  8  aa

In [5]: df.groupby("D").sum(min_count=3).sort_index()
Out[5]:
     A    B     C
D
a  5.0  2.0  11.0
b  NaN  NaN   NaN

In [6]: df._to_pandas().groupby("D").sum(min_count=3).sort_index()
Out[6]:
     A    B     C
D
a  5.0  2.0  11.0
b  NaN  NaN   NaN

```

### How was this patch tested?
Added UTs.

Closes #38060 from zhengruifeng/ps_groupby_sum_numonly_mc.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@pull pull bot merged commit 014165f into wangyum:master Sep 30, 2022
pull bot pushed a commit that referenced this pull request Apr 22, 2023
…onnect

### What changes were proposed in this pull request?
Implement Arrow-optimized Python UDFs in Spark Connect.

Please see apache#39384 for the motivation and performance improvements of Arrow-optimized Python UDFs.

### Why are the changes needed?
Parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
Yes. In the Spark Connect Python Client, users can:

1. Set the `useArrow` parameter to `True` to enable Arrow optimization for a specific Python UDF.

```python
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```

2. Enable the `spark.sql.execution.pythonUDF.arrow.enabled` Spark conf to make all Python UDFs Arrow-optimized.

```python
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

```

### How was this patch tested?
Parity unit tests.

Closes apache#40725 from xinrong-meng/connect_arrow_py_udf.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>