[pull] master from apache:master #51
Merged
…o extract event time from the window column

### What changes were proposed in this pull request?
This PR introduces a window_time function to extract the streaming event time from a window column produced by the window aggregating operators. This is one step in the sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming, as described in https://issues.apache.org/jira/browse/SPARK-40821

### Why are the changes needed?
The window_time function is a convenience function to compute the correct event time for window aggregate records. Records produced by window aggregating operators have no explicit event time, only a window column of type StructType { start: TimestampType, end: TimestampType }, where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1. The event time is necessary when chaining other stateful operators after the window aggregating operators.

### Does this PR introduce _any_ user-facing change?
Yes: the PR introduces a new window_time SQL function for both the Scala and Python APIs.

### How was this patch tested?
Added new unit tests.

Closes #38288 from alex-balikov/SPARK-40821-time-window.

Authored-by: Alex Balikov <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
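A rough illustration of how the new function is meant to be used when chaining event-time processing after a windowed aggregate. This is a minimal sketch assuming Spark 3.4+; the rate-source pipeline, column names, and watermark values are illustrative and not taken from the PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WindowTimeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("window_time-sketch").getOrCreate()
    import spark.implicits._

    // Toy rate source: emits rows with (timestamp, value) columns.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    // The windowed aggregate emits a `window` struct column but no event-time column.
    val windowed = events
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "5 seconds"))
      .agg(count("*").as("cnt"))

    // window_time recovers an event time (window.end minus one unit of precision),
    // so further event-time based processing can be chained on the aggregated output.
    val withEventTime = windowed
      .select($"window", $"cnt", window_time($"window").as("eventTime"))
      .withWatermark("eventTime", "10 seconds")

    withEventTime.writeStream.format("console").outputMode("append").start().awaitTermination()
  }
}
```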
### What changes were proposed in this pull request?
Upgrade fabric8io kubernetes-client from 6.1.1 to 6.2.0.

### Why are the changes needed?
[Release notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.2.0)
[Snakeyaml version should be updated to mitigate CVE-2022-28857](fabric8io/kubernetes-client#4383)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #38348 from bjornjorgensen/kubernetes-client6.2.0.

Authored-by: Bjørn <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Reimplement `crosstab` with dataframe operations.

### Why are the changes needed?
1, do not truncate the sql plan;
2, much more scalable;
3, the existing implementation (added in v1.5.0) collects distinct `col1, col2` pairs to the driver, while `pivot` (added in v2.4.0) only collects distinct `col2`, which is much smaller (see the sketch after this entry);

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs and manual checks.

Closes #38340 from zhengruifeng/sql_stat_crosstab.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
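To make the pivot-based idea concrete, here is a minimal Scala sketch. It is not the PR's actual implementation, just the equivalent dataframe-only shape using the standard `crosstab` and `pivot` APIs; the toy data is made up.

```scala
import org.apache.spark.sql.SparkSession

object CrosstabSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("crosstab-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 2), ("b", 1), ("b", 1)).toDF("c1", "c2")

    // The public API under discussion.
    df.stat.crosstab("c1", "c2").show()

    // Rough dataframe-only equivalent: pivot only needs the distinct values of c2,
    // which is what makes this shape cheaper than collecting all distinct
    // (c1, c2) pairs to the driver.
    df.groupBy("c1").pivot("c2").count().na.fill(0).show()
  }
}
```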
### What changes were proposed in this pull request?
Reimplement `summary` with dataframe operations
### Why are the changes needed?
1, do not truncate the sql plan any more;
2, enable sql optimization like column pruning:
```
scala> val df = spark.range(0, 3, 1, 10).withColumn("value", lit("str"))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: string]
scala> df.summary("max", "50%").show
+-------+---+-----+
|summary| id|value|
+-------+---+-----+
| max| 2| str|
| 50%| 1| null|
+-------+---+-----+
scala> df.summary("max", "50%").select("id").show
+---+
| id|
+---+
| 2|
| 1|
+---+
scala> df.summary("max", "50%").select("id").queryExecution.optimizedPlan
res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [element_at(id#367, summary#376, None, false) AS id#371]
+- Generate explode([max,50%]), false, [summary#376]
+- Aggregate [map(max, cast(max(id#153L) as string), 50%, cast(percentile_approx(id#153L, [0.5], 10000, 0, 0)[0] as string)) AS id#367]
+- Range (0, 3, step=1, splits=Some(10))
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs and manual checks.
Closes #38346 from zhengruifeng/sql_stat_summary.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Purging old entries in both the offset log and commit log will be done asynchronously. For every micro-batch, older entries in both the offset log and commit log are deleted so that the logs do not grow without bound. Please reference the logic here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L539

The time spent performing these log purges is grouped with the “walCommit” execution time in the StreamingProgressListener metrics. Around two thirds of the “walCommit” execution time is spent performing these purge operations, so making them asynchronous will also reduce latency. Also, the purges do not necessarily need to run on every micro-batch. Once they are executed asynchronously, they do not block micro-batch execution, and a new purge does not need to start until the current one has finished; the purges can happen essentially in the background (see the sketch after this entry). We only have to synchronize the purges with the offset WAL commits and completion commits so that there are no concurrent modifications of the offset log and commit log.

### Why are the changes needed?
Decrease micro-batch processing latency.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #38313 from jerrypeng/SPARK-40849.

Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
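A minimal Scala sketch of the background-purge idea described above: only one purge runs at a time and it never blocks the micro-batch that triggered it. All names here (`AsyncLogPurgeSketch`, `purgeExecutor`, `purgeRunning`) are hypothetical and do not mirror the actual `MicroBatchExecution` changes.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not the MicroBatchExecution code.
object AsyncLogPurgeSketch {
  private val purgeExecutor = Executors.newSingleThreadExecutor()
  private val purgeRunning = new AtomicBoolean(false)

  def purgeAsync(thresholdBatchId: Long)(purge: Long => Unit): Unit = {
    // If a purge is already in flight, skip this batch; purging on every
    // micro-batch is not required for correctness.
    if (purgeRunning.compareAndSet(false, true)) {
      purgeExecutor.submit(new Runnable {
        override def run(): Unit = {
          try {
            // The caller supplies the actual purge work, e.g. deleting offset log
            // and commit log entries below thresholdBatchId under the proper locks.
            purge(thresholdBatchId)
          } finally {
            purgeRunning.set(false)
          }
        }
      })
    }
  }
}
```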
### What changes were proposed in this pull request?
remove unused imports
### Why are the changes needed?
```
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:24:78: Unused import
[error] import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, EvalMode, GenericInternalRow}
[error] ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:26:52: Unused import
[error] import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
[error] ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:31:38: Unused import
[error] import org.apache.spark.unsafe.types.UTF8String
[error] ^
[error] three errors found
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manually build
Closes #38362 from zhengruifeng/sql_clean_unused_imports.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…on client

### What changes were proposed in this pull request?
Following up on #38276, this PR improves both the `distinct()` and `dropDuplicates` DataFrame APIs in the Python client, both of which depend on the `Deduplicate` plan in the Connect proto.

### Why are the changes needed?
Improve API coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #38327 from amaliujia/python_deduplicate.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…edicates

### What changes were proposed in this pull request?
This PR supports correlated non-equality predicates in subqueries. It leverages the DecorrelateInnerQuery framework to decorrelate subqueries with non-equality predicates. DecorrelateInnerQuery inserts domain joins in the query plan, and the rule RewriteCorrelatedScalarSubquery rewrites the domain joins into actual joins with the outer query. Note that correlated non-equality predicates can lead to query plans with non-equality join conditions, which may be planned as a broadcast NL join or a cartesian product.

### Why are the changes needed?
To improve subquery support in Spark.

### Does this PR introduce _any_ user-facing change?
Yes. Before this PR, Spark does not allow correlated non-equality predicates in subqueries. For example:
```sql
SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1
```
This will throw an exception: `Correlated column is not allowed in a non-equality predicate`. After this PR, this query runs successfully.

### How was this patch tested?
Unit tests and SQL query tests.

Closes #38135 from allisonwang-db/spark-36114-non-equality-pred.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…nSubquery

### What changes were proposed in this pull request?
This PR modifies the optimizer rule `OptimizeOneRowRelationSubquery` to always collapse projects and inline non-volatile expressions.

### Why are the changes needed?
SPARK-39699 made `CollapseProject` more conservative. This has impacted correlated subqueries that Spark used to be able to support. For example, Spark used to be able to execute this correlated subquery:
```sql
SELECT (
  SELECT array_sort(a, (i, j) -> rank[i] - rank[j]) AS sorted
  FROM (SELECT MAP('a', 1, 'b', 2) rank)
) FROM t1
```
But after SPARK-39699, it throws the exception `Unexpected operator Join Inner` because the projects inside the subquery can no longer be collapsed. We should always inline expressions when possible, to support a broader range of correlated subqueries and avoid adding expensive domain joins.

### Does this PR introduce _any_ user-facing change?
Yes. It allows Spark to execute more types of correlated subqueries.

### How was this patch tested?
Unit test.

Closes #38260 from allisonwang-db/spark-40800-inline-expr-subquery.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…rtifact to v3

### What changes were proposed in this pull request?
Upgrade actions/cache to v3 and actions/upload-artifact to v3.

### Why are the changes needed?
- actions/cache v3: moves from node 12 to node 16 and cleans up the `set-output` warning
- actions/upload-artifact v3: moves from node 12 to node 16 and cleans up the `set-output` warning

### Does this PR introduce _any_ user-facing change?
No, dev only.

### How was this patch tested?
CI passed.

Closes #38353 from Yikun/SPARK-40881.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Yikun Jiang <[email protected]>
…on specified

### What changes were proposed in this pull request?
Upgrade actions/setup-java to v3 with the distribution specified.

### Why are the changes needed?
- The `distribution` input is required since v2; keep `zulu` (the same distribution as v1): https://github.com/actions/setup-java/releases/tag/v2.0.0
- https://github.com/actions/setup-java/releases/tag/v3.0.0: upgrades node
- https://github.com/actions/setup-java/releases/tag/v3.6.0: cleans up the set-output warning

### Does this PR introduce _any_ user-facing change?
No, dev only.

### How was this patch tested?
CI passed.

Closes #38354 from Yikun/SPARK-40882.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Yikun Jiang <[email protected]>
…signmentPolicy

### What changes were proposed in this pull request?
Extract the insertion field-cast check method so that partition values can be validated in PartitioningUtils.normalizePartitionSpec.

### Why are the changes needed?
Insertion follows the behavior of the config `spark.sql.storeAssignmentPolicy`, which by default fails if the value cannot be cast to the target data type. Altering partitions should also follow it. For example:
```SQL
CREATE TABLE t (c int) USING PARQUET PARTITIONED BY(p int);

-- This DDL should fail but worked:
ALTER TABLE t ADD PARTITION(p='aaa');

-- FAILED, which follows spark.sql.storeAssignmentPolicy
INSERT INTO t PARTITION(p='aaa') SELECT 1
```

### Does this PR introduce _any_ user-facing change?
Yes, the added partition value will follow the behavior of `storeAssignmentPolicy`. To restore the previous behavior, set spark.sql.legacy.skipPartitionSpecTypeValidation = true.

### How was this patch tested?
Added a test.

Closes #38257 from ulysses-you/verify-partition.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…on window_time

### What changes were proposed in this pull request?
This PR fixes the incorrect "available since" version for the new function `window_time` to 3.4.0, which is the upcoming release for the master branch.

### Why are the changes needed?
The version information is incorrect.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

Closes #38368 from HeartSaVioR/SPARK-40821-follow-up-minor-version-fix.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
### What changes are proposed in this pull request?
This proposes to add SQLMetrics instrumentation for Python UDF execution, including Pandas UDFs and related operations such as MapInPandas and MapInArrow. The proposed metrics are:
- data sent to Python workers
- data returned from Python workers
- number of output rows

### Why are the changes needed?
This aims at improving monitoring and performance troubleshooting of Python UDFs. In particular, it is intended as an aid to answer performance-related questions such as: why is the UDF slow? How much work has been done so far?

### Does this PR introduce _any_ user-facing change?
SQL metrics are made available in the Web UI (screenshots omitted).

### How was this patch tested?
Manually tested; a Python unit test and a Scala unit test have been added. Example code used for testing:
```
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
    time.sleep(0.02)
    return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()
```
This is used to test with more data pushed to the Python workers:
```
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
    time.sleep(0.02)
    return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()
```
This (from the Spark doc) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:
```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
```
This is for testing BatchEvalPython and metrics related to data transfer (bytes sent and received):
```
from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
    return col1 * col1

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
```

Closes #33559 from LucaCanali/pythonUDFKeySQLMetrics.

Authored-by: Luca Canali <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…rSuite

### What changes were proposed in this pull request?
This PR aims to replace 'intercept' with 'Check error classes' in TableIdentifierParserSuite.

### Why are the changes needed?
The changes improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "test:testOnly *TableIdentifierParserSuite"
```

Closes #38364 from panbingkun/SPARK-40891.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
Add the following missing APIs to the references:
- StorageLevel.MEMORY_AND_DISK_DESER
- TaskContext.cpus
- BarrierTaskContext.cpus

### Why are the changes needed?
They were missing in the reference documentation.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Manual check; for `BarrierTaskContext.cpus`:
```
In [10]: from pyspark import BarrierTaskContext

In [11]: rdd = spark.sparkContext.parallelize([1])

In [12]: rdd.barrier().mapPartitions(lambda _: [BarrierTaskContext.get().cpus()]).collect()
Out[12]: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```

Closes #38373 from zhengruifeng/py_doc_missing.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…module-scala

### What changes were proposed in this pull request?
Remove the unnecessary guava exclusion from jackson-module-scala.

### Why are the changes needed?
The exclusion was added in SPARK-6149; recent versions of jackson-module-scala no longer depend on guava, so the exclusion can be removed. https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12/2.13.3

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT.

Closes #37405 from pan3793/SPARK-39977.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
… bash scripts

This fixes two problems that affect development in a Windows shell environment, such as `cygwin` or `msys2`.

### The fixed build error
Running `./build/sbt packageBin` from a Windows cygwin `bash` session fails. This occurs if `WSL` is installed, because `project\SparkBuild.scala` creates a `bash` process, but `WSL bash` is called even though `cygwin bash` appears earlier in the `PATH`. In addition, file path arguments to bash contain backslashes. The fix is to ensure that the correct `bash` is called, and that arguments passed to `bash` are passed with slashes rather than backslashes.

### The build error message:
```bash
./build/sbt packageBin
```
<pre>
[info] compiling 9 Java sources to C:\Users\philwalk\workspace\spark\common\sketch\target\scala-2.12\classes ...
/bin/bash: C:Usersphilwalkworkspacesparkcore/../build/spark-build-info: No such file or directory
[info] compiling 1 Scala source to C:\Users\philwalk\workspace\spark\tools\target\scala-2.12\classes ...
[info] compiling 5 Scala sources to C:\Users\philwalk\workspace\spark\mllib-local\target\scala-2.12\classes ...
[info] Compiling 5 protobuf files to C:\Users\philwalk\workspace\spark\connector\connect\target\scala-2.12\src_managed\main
[error] stack trace is suppressed; run last core / Compile / managedResources for the full output
[error] (core / Compile / managedResources) Nonzero exit value: 127
[error] Total time: 42 s, completed Oct 8, 2022, 4:49:12 PM
sbt:spark-parent>
sbt:spark-parent> last core /Compile /managedResources
last core /Compile /managedResources
[error] java.lang.RuntimeException: Nonzero exit value: 127
[error] at scala.sys.package$.error(package.scala:30)
[error] at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
[error] at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
[error] at Core$.$anonfun$settings$4(SparkBuild.scala:604)
[error] at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] at sbt.Execute.work(Execute.scala:291)
[error] at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[error] at java.base/java.lang.Thread.run(Thread.java:834)
[error] (core / Compile / managedResources) Nonzero exit value: 127
</pre>

### bash scripts fail when run from `cygwin` or `msys2`
The other problem fixed by this PR is to address the issues preventing the `bash` scripts (`spark-shell`, `spark-submit`, etc.) from being used in Windows `SHELL` environments.

The problem is that the bash version of `spark-class` fails in a Windows shell environment, as a result of `launcher/src/main/java/org/apache/spark/launcher/Main.java` not following the convention expected by `spark-class`, and also appending CR to line endings. The resulting error message is not helpful. There are two parts to this fix:
1. modify `Main.java` to treat a `SHELL` session on Windows as a `bash` session
2. remove the appended CR character when parsing the output produced by `Main.java`

### Does this PR introduce _any_ user-facing change?
These changes should NOT affect anyone who is not trying to build or run bash scripts from a Windows SHELL environment.

### How was this patch tested?
Manual tests were performed to verify both changes.

### related JIRA issues
The following 2 JIRA issues were created. Both are fixed by this PR, and both are linked to it.
- Bug SPARK-40739: "sbt packageBin" fails in cygwin or other windows bash session
- Bug SPARK-40738: spark-shell fails with "bad array"

Closes #38228 from philwalk/windows-shell-env-fixes.

Authored-by: Phil <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
…stead of manually write MockMaker

### What changes were proposed in this pull request?
This PR aims to use `mockito-inline` instead of a manually written `MockMaker` resource.

### Why are the changes needed?
`mockito-inline` is the more recommended [way](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#39) to use mockito to mock final types, enums and final methods, and the `mllib` and `mllib-local` modules already use `mockito-inline`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Manual test: run `build/sbt clean "sql/testOnly *QueryExecutionErrorsSuite"` with Java 8u352, 11.0.17 and 17.0.5; all 3 Java versions passed.

Closes #38372 from LuciferYang/SPARK-40391.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
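For context, a minimal sketch of why `mockito-inline` helps: with it on the test classpath, final classes and methods can be mocked without a hand-written `MockMaker` resource file. `FinalGreeter` is a made-up class for illustration, not part of the Spark test suites.

```scala
import org.mockito.Mockito.{mock, when}

// Made-up final class: without an inline mock maker, mocking it would fail.
final class FinalGreeter {
  def greet(name: String): String = s"hello, $name"
}

object MockitoInlineSketch {
  def main(args: Array[String]): Unit = {
    // With org.mockito:mockito-inline on the test classpath there is no need for a
    // hand-written org.mockito.plugins.MockMaker resource file.
    val greeter = mock(classOf[FinalGreeter])
    when(greeter.greet("spark")).thenReturn("mocked")
    println(greeter.greet("spark")) // prints "mocked"
  }
}
```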
### What changes were proposed in this pull request?
To be able to modify incoming requests for the Spark Connect GRPC service, for example to translate metadata from the HTTP/2 request into values in the proto message, the GRPC service needs to be configured with an interceptor.
This patch adds two ways to configure interceptors for the GRPC service. First, we can now configure interceptors in the `SparkConnectInterceptorRegistry` by adding a value to the `interceptorChain` like in the example below:
```
object SparkConnectInterceptorRegistry {
// Contains the list of configured interceptors.
private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
interceptor[LoggingInterceptor](classOf[LoggingInterceptor])
)
// ...
}
```
The second way to configure interceptors is via Spark configuration values at startup. A new config key has been added for this: `spark.connect.grpc.interceptor.classes`. Its value is a comma-separated list of classes that are added as interceptors to the service.
```
./bin/pyspark --conf spark.connect.grpc.interceptor.classes=com.my.important.LoggingInterceptor
```
During startup all of the interceptors are added in order to the `NettyServerBuilder`.
```
// Add all registered interceptors to the server builder.
SparkConnectInterceptorRegistry.chainInterceptors(sb)
```
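For reference, an interceptor registered in either way is a standard `io.grpc.ServerInterceptor`. The sketch below shows the general shape of a logging interceptor like the `LoggingInterceptor` referenced above; it is not the class shipped in this PR.

```scala
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Illustrative only: the general shape of a gRPC server interceptor.
class LoggingInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    // Inspect the HTTP/2 metadata here, e.g. to copy header values into the request context.
    println(s"RPC ${call.getMethodDescriptor.getFullMethodName} headers: $headers")
    next.startCall(call, headers)
  }
}
```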
### Why are the changes needed?
Provide a configurable and extensible way to configure interceptors.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit Tests
Closes #38320 from grundprinzip/SPARK-40857.
Lead-authored-by: Martin Grund <[email protected]>
Co-authored-by: Martin Grund <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…to error classes

### What changes were proposed in this pull request?
This PR replaces TypeCheckFailure with DataTypeMismatch in type checks in the math expressions, including:
- hash.scala (HashExpression)
- mathExpressions.scala (RoundBase)

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.

### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.

### How was this patch tested?
- Add new UT
- Update existing UT
- Pass GA

Closes #38332 from panbingkun/SPARK-40750.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
… quick submission of drivers

### What changes were proposed in this pull request?
##### Quick submission of drivers in tests to the mesos scheduler results in dropped drivers
Queued drivers in `MesosClusterScheduler` are ordered based on `MesosDriverDescription`, and the ordering used checks priority first (if different), followed by a comparison of submission time. For two driver submissions with the same priority made in quick succession (such that the submission time is identical due to the millisecond granularity of Date), this results in dropping the second `MesosDriverDescription` from `queuedDrivers`, since `driverOrdering` returns `0` when comparing the two descriptions. This PR fixes the more immediate issue with the tests.

### Why are the changes needed?
Flaky tests; [see here](https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg) for an example.

### Does this PR introduce _any_ user-facing change?
No. Only the tests are fixed for now; as mesos support is deprecated, the scheduler itself is not changed to address this.

### How was this patch tested?
Fixes unit tests.

Closes #38378 from mridulm/fix_MesosClusterSchedulerSuite.

Authored-by: Mridul <mridulatgmail.com>
Signed-off-by: Dongjoon Hyun <[email protected]>
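A toy Scala reproduction of the ordering problem described above. The collection and class names are illustrative, not the scheduler's actual internals; it only shows why an ordering that compares two distinct submissions as equal makes a sorted set silently drop one of them.

```scala
import scala.collection.mutable

// `id` is not part of the ordering, so two submissions with the same priority and
// the same millisecond timestamp compare as equal.
case class Submission(priority: Int, submittedAtMs: Long, id: String)

object OrderingDropSketch {
  private val byPriorityThenTime: Ordering[Submission] =
    Ordering.by((s: Submission) => (s.priority, s.submittedAtMs))

  def main(args: Array[String]): Unit = {
    val queued = mutable.TreeSet.empty[Submission](byPriorityThenTime)
    queued += Submission(priority = 1, submittedAtMs = 1000L, id = "driver-1")
    queued += Submission(priority = 1, submittedAtMs = 1000L, id = "driver-2")
    println(queued.size) // 1: the second driver is silently dropped
  }
}
```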
pull bot pushed a commit that referenced this pull request on Jul 21, 2025:
…ingBuilder`

### What changes were proposed in this pull request?
This PR aims to improve `toString` by using `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.

### Why are the changes needed?
Since Java 9, `String Concatenation` has been handled better by default.

| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |

For example, this PR improves `OpenBlocks` like the following. Both the Java source code and the byte code are simplified a lot by utilizing JEP-280 properly.

**CODE CHANGE**
```java
-    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-      .append("appId", appId)
-      .append("execId", execId)
-      .append("blockIds", Arrays.toString(blockIds))
-      .toString();
+    return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+      Arrays.toString(blockIds) + "]";
```

**BEFORE**
```
public java.lang.String toString();
  Code:
     0: new           #39  // class org/apache/commons/lang3/builder/ToStringBuilder
     3: dup
     4: aload_0
     5: getstatic     #41  // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
     8: invokespecial #47  // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
    11: ldc           #50  // String appId
    13: aload_0
    14: getfield      #7   // Field appId:Ljava/lang/String;
    17: invokevirtual #51  // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    20: ldc           #55  // String execId
    22: aload_0
    23: getfield      #13  // Field execId:Ljava/lang/String;
    26: invokevirtual #51  // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    29: ldc           #56  // String blockIds
    31: aload_0
    32: getfield      #16  // Field blockIds:[Ljava/lang/String;
    35: invokestatic  #57  // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
    38: invokevirtual #51  // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    41: invokevirtual #61  // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
    44: areturn
```

**AFTER**
```
public java.lang.String toString();
  Code:
     0: aload_0
     1: getfield      #7   // Field appId:Ljava/lang/String;
     4: aload_0
     5: getfield      #13  // Field execId:Ljava/lang/String;
     8: aload_0
     9: getfield      #16  // Field blockIds:[Ljava/lang/String;
    12: invokestatic  #39  // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
    15: invokedynamic #43, 0  // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
    20: areturn
```

### Does this PR introduce _any_ user-facing change?
No. This is a `toString` implementation improvement.

### How was this patch tested?
Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51572 from dongjoon-hyun/SPARK-52880.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
See Commits and Changes for more details.
Created by
pull[bot]
Can you help keep this open source service alive? 💖 Please sponsor : )