[pull] master from apache:master #28
…ndalone cluster when dynamic allocation disabled
### What changes were proposed in this pull request?
Currently, stage-level scheduling works for YARN/K8s/standalone clusters when dynamic allocation is enabled: the Spark application acquires executors with different resource profiles and assigns tasks to executors with the same resource profile id.
This PR proposes to add stage-level scheduling when dynamic allocation is off. In this case, the Spark application only has executors with the default resource profile, but different `Stages` can still customize their task resource requests, which must be compatible with the executor resources of the default resource profile. All these `Stages` with different task resource requests then reuse/share the same set of default-resource-profile executors.
Specifically, this PR:
1. Introduces a new special `ResourceProfile`, `TaskResourceProfile`, which describes per-stage task resource requests when dynamic allocation is off. Tasks bound to a `TaskResourceProfile` reuse executors with the default resource profile, and an exception is thrown if those executors cannot fulfill the task resource requests.
```scala
class TaskResourceProfile(override val taskResources: Map[String, TaskResourceRequest])
  extends ResourceProfile(
    ResourceProfile.getOrCreateDefaultProfile(SparkEnv.get.conf).executorResources,
    taskResources)
```
2. `DAGScheduler` and `TaskScheduler` schedule tasks with a customized `ResourceProfile` based on the resource profile type and resource profile id: task sets with a `TaskResourceProfile` can be scheduled to executors with `DEFAULT_RESOURCE_PROFILE_ID`, while other task sets can only be scheduled to executors with exactly the same resource profile id. A usage sketch is shown below.
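As a rough illustration (not part of the PR description itself), assuming an existing `SparkContext` `sc` and that a profile built from only task requirements via the standard stage-level scheduling API (`ResourceProfileBuilder`, `TaskResourceRequests`, `RDD.withResources`) is treated as a `TaskResourceProfile` here, usage could look like this:
```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Request 2 CPU cores per task; executor resources are inherited from the
// default resource profile since only task requirements are specified.
val taskReqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(taskReqs).build

// With dynamic allocation off, no new executors are requested: the stage runs
// on the shared default-profile executors, and an exception is raised if they
// cannot satisfy the per-task request.
val doubled = sc.parallelize(1 to 100, numSlices = 10)
  .withResources(rp)
  .map(_ * 2)
  .collect()
```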
### Why are the changes needed?
When dynamic allocation is disabled, we can also leverage stage-level scheduling to customize task resource requests for different stages.
### Does this PR introduce _any_ user-facing change?
Spark users can specify a `TaskResourceProfile` to customize task resource requests for different stages when dynamic allocation is off.
### How was this patch tested?
New UTs added.
Closes #37268 from ivoson/stage-schedule-dynamic-off.
Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Huang Tengfei <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
…nd inline_outer
### What changes were proposed in this pull request?
Adds Scala and Python bindings for the SQL functions `inline` and `inline_outer`.
### Why are the changes needed?
Currently these functions can only be used via SQL or through `expr`. This makes it a little easier to use them with the DataFrame APIs.
### Does this PR introduce _any_ user-facing change?
Exposes new functions directly instead of only through SQL.
### How was this patch tested?
Updated existing inline tests to use the new Scala binding instead of calling through SQL expressions.
Closes #37770 from Kimahriman/inline-bindings.
Authored-by: Adam Binford <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
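Following up on the bindings above, a hedged sketch of the new DataFrame-API usage, assuming an active `SparkSession` named `spark` and an illustrative array-of-structs column:
```scala
import org.apache.spark.sql.functions.{col, expr, inline}

// An illustrative column holding an array of structs to explode into rows.
val df = spark.sql("SELECT array(struct(1, 'a'), struct(2, 'b')) AS arr")

// Previously reachable only through SQL / expr:
df.select(expr("inline(arr)")).show()

// With the new binding, callable directly from the DataFrame API:
df.select(inline(col("arr"))).show()
```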
…andas regression
### What changes were proposed in this pull request?
This PR proposes to skip the `DataFrame.corr_with` test when the `other` is a `pyspark.pandas.Series` and the `method` is "spearman" or "pearson", since there is a regression in pandas 1.5.0 for those cases.
### Why are the changes needed?
There are some regressions in pandas 1.5.0, so we're not going to match the behavior for those cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested with pandas 1.5.0 and confirmed the tests pass.
Closes #38031 from itholic/SPARK-40589.
Authored-by: itholic <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request?
The PR proposes to fix `CategoricalIndex.append` to match the behavior of pandas.
### Why are the changes needed?
Because the current behavior is different from pandas 1.5.0.
### Does this PR introduce _any_ user-facing change?
The behavior of the API is changed as below:
**Before**
```python
>>> psidx1 = ps.CategoricalIndex(["x", "y", "z"], categories=["z", "y", "x", "w"])
>>> psidx3 = ps.Index(["y", "x", "w", "z"])
>>> psidx1.append(psidx3.astype("category"))
CategoricalIndex(['x', 'y', 'z', 'y', 'x', 'w', 'z'], categories=['z', 'y', 'x', 'w'], ordered=False, dtype='category')
```
**After**
```python
>>> psidx1 = ps.CategoricalIndex(["x", "y", "z"], categories=["z", "y", "x", "w"])
>>> psidx3 = ps.Index(["y", "x", "w", "z"])
>>> psidx1.append(psidx3.astype("category"))
CategoricalIndex(['x', 'y', 'z', 'x', 'y', 'z', 'w'], categories=['z', 'y', 'x', 'w'], ordered=False, dtype='category')
```
### How was this patch tested?
Manually checked that the existing tests pass with pandas 1.5.0.
Closes #38015 from itholic/SPARK-40577.
Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Eliminate `to_pandas` warnings in tests by changing `to_pandas()` to `_to_pandas()`.
### Why are the changes needed?
When a test containing `to_pandas` fails, it may print tons of warnings; for example, in https://github.com/zhengruifeng/spark/actions/runs/3142284988/jobs/5106178199 the same warning was printed from line 1243 to line 3985 of the log.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated tests.
Closes #38042 from zhengruifeng/ps_test_to_pandas.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…GACY_ERROR_TEMP_1200-1299
### What changes were proposed in this pull request?
In the PR, I propose to migrate 100 compilation errors onto temporary error classes with the prefix `_LEGACY_ERROR_TEMP_12xx`. The error message will not include the error classes; in this way we preserve the existing behaviour.
### Why are the changes needed?
The migration onto temporary error classes allows us to gather statistics about errors and detect the most popular error classes. After that we can prioritise the migration work. The new error class name prefix `_LEGACY_ERROR_TEMP_` proposed here marks the error as developer-facing, not user-facing. Developers can still get the error class programmatically via the `SparkThrowable` interface, so they can build error infra with it. End users won't see the error class in the message. This allows us to do the error migration very quickly, and we can refine the error classes and mark them as user-facing later (naming them properly, adding tests, etc.).
### Does this PR introduce _any_ user-facing change?
No. The error messages should be almost the same by default.
### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "test:testOnly *SQLQuerySuite"
```
Closes #38027 from MaxGekk/legacy-error-temp-compliation-1200.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
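For illustration only (the failing query below is hypothetical, not taken from the PR), a sketch of how a developer can read the error class programmatically through `SparkThrowable`, assuming an active `SparkSession` named `spark`:
```scala
import org.apache.spark.SparkThrowable

try {
  // Hypothetical query that fails analysis with a migrated error.
  spark.sql("SELECT explode(1)").collect()
} catch {
  case e: SparkThrowable =>
    // Prints the error class (e.g. a _LEGACY_ERROR_TEMP_12xx value for errors
    // migrated by this change) even though the message itself does not show it.
    println(e.getErrorClass)
}
```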
…ngSuite`
### What changes were proposed in this pull request?
Similar to SPARK-39869, this PR adds an explicit `System.gc()` before each case of `HivePartitionFilteringSuite` to avoid `OutOfMemoryError` as far as possible.
### Why are the changes needed?
Fix a flaky test:
```
2022-09-29T16:23:50.4263170Z [info] org.apache.spark.sql.hive.client.HivePartitionFilteringSuites *** ABORTED *** (26 minutes, 32 seconds)
2022-09-29T16:23:50.4340944Z [info]   java.lang.reflect.InvocationTargetException:
2022-09-29T16:23:50.4341736Z [info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-09-29T16:23:50.4344319Z [info]   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2022-09-29T16:23:50.4345108Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:315)
2022-09-29T16:23:50.4346070Z [info]   at org.apache.spark.sql.hive.client.HiveClientBuilder$.buildClient(HiveClientBuilder.scala:50)
2022-09-29T16:23:50.4347512Z [info]   at org.apache.spark.sql.hive.client.HiveVersionSuite.buildClient(HiveVersionSuite.scala:48)
2022-09-29T16:23:50.4348463Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.init(HivePartitionFilteringSuite.scala:73)
2022-09-29T16:23:50.4349656Z [info]   at org.apache.spark.sql.hive.client.HivePartitionFilteringSuite.beforeAll(HivePartitionFilteringSuite.scala:118)
  ... (scalatest and sbt runner frames elided)
2022-09-29T16:23:50.4369387Z [info]   Cause: java.lang.OutOfMemoryError: Metaspace
2022-09-29T16:23:50.4369882Z [info]   at java.lang.ClassLoader.defineClass1(Native Method)
2022-09-29T16:23:50.4374670Z [info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-29T16:23:50.4375323Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:269)
2022-09-29T16:23:50.4376066Z [info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:258)
2022-09-29T16:23:50.4377204Z [info]   at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:150)
2022-09-29T16:23:50.4377825Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl$.newHiveConf(HiveClientImpl.scala:1284)
2022-09-29T16:23:50.4378489Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:171)
2022-09-29T16:23:50.4379144Z [info]   at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:136)
  ... (remaining reflection and suite setup frames elided)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GitHub Actions.
Closes #38057 from LuciferYang/SPARK-40619.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
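The actual change lives in `HivePartitionFilteringSuite`; as a minimal, illustrative ScalaTest sketch of the idea (suite and test names are made up), an explicit GC hint before each case could be expressed like this:
```scala
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

class GcHintedSuite extends AnyFunSuite with BeforeAndAfterEach {

  override def beforeEach(): Unit = {
    // Hint the JVM to collect before each heavyweight case, reducing Metaspace
    // pressure from repeatedly loaded isolated Hive client classes.
    System.gc()
    super.beforeEach()
  }

  test("illustrative case") {
    assert(1 + 1 == 2)
  }
}
```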
…y.sum`
### What changes were proposed in this pull request?
Implement `numeric_only` and `min_count` in `GroupBy.sum`
### Why are the changes needed?
for API coverage
### Does this PR introduce _any_ user-facing change?
New parameters `numeric_only` and `min_count`:
```
In [2]: df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], "C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]})
In [3]: df.groupby("A").sum(numeric_only=False).sort_index()
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: GroupBy.sum() can only support numeric and bool columns even ifnumeric_only=False, skip unsupported columns: ['D']
warnings.warn(message, PandasAPIOnSparkAdviceWarning)
B C
A
1 1 6
2 1 8
In [4]: df._to_pandas().groupby("A").sum(numeric_only=False).sort_index()
Out[4]:
B C D
A
1 1 6 ab
2 1 8 aa
In [5]: df.groupby("D").sum(min_count=3).sort_index()
Out[5]:
A B C
D
a 5.0 2.0 11.0
b NaN NaN NaN
In [6]: df._to_pandas().groupby("D").sum(min_count=3).sort_index()
Out[6]:
A B C
D
a 5.0 2.0 11.0
b NaN NaN NaN
```
### How was this patch tested?
Added UTs.
Closes #38060 from zhengruifeng/ps_groupby_sum_numonly_mc.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
pull bot pushed a commit that referenced this pull request on Apr 22, 2023:
…onnect
### What changes were proposed in this pull request?
Implement Arrow-optimized Python UDFs in Spark Connect. Please see apache#39384 for motivation and performance improvements of Arrow-optimized Python UDFs.
### Why are the changes needed?
Parity with vanilla PySpark.
### Does this PR introduce _any_ user-facing change?
Yes. In Spark Connect Python Client, users can:
1. Set `useArrow` parameter True to enable Arrow optimization for a specific Python UDF.
```sh
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```
2. Enable `spark.sql.execution.pythonUDF.arrow.enabled` Spark Conf to make all Python UDFs Arrow-optimized.
```sh
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```
### How was this patch tested?
Parity unit tests.
Closes apache#40725 from xinrong-meng/connect_arrow_py_udf.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>