Skip to content

Conversation

@pull
Copy link

@pull pull bot commented Sep 16, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

HeartSaVioR and others added 3 commits September 16, 2022 05:31
…out in PySpark

### What changes were proposed in this pull request?

This PR introduces GroupStateImpl and GroupStateTimeout in PySpark, and updates Scala codebase to support convenient conversion between PySpark implementation and Scala implementation.

Co-authored with HyukjinKwon .

This is a breakdown PR of #37863.

### Why are the changes needed?

This change will be leveraged in SPARK-40434.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A. We will make sure test suites are constructed via E2E manner under SPARK-40431.

Closes #37889 from HeartSaVioR/SPARK-40432.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes #37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR sets the upperbound for `pyzmq` as `<24.0.0` in our CI Python linter job. The new release seems having a problem (zeromq/pyzmq@2d3327d).

### Why are the changes needed?

To fix the linter build failure. See https://github.com/apache/spark/actions/runs/3063515551/jobs/4947782771

```
      /tmp/timer_created_0ftep6.c: In function ‘main’:
      /tmp/timer_created_0ftep6.c:2:5: warning: implicit declaration of function ‘timer_create’ [-Wimplicit-function-declaration]
          2 |     timer_create();
            |     ^~~~~~~~~~~~
      x86_64-linux-gnu-gcc -pthread tmp/timer_created_0ftep6.o -L/usr/lib/x86_64-linux-gnu -o a.out
      /usr/bin/ld: tmp/timer_created_0ftep6.o: in function `main':
      /tmp/timer_created_0ftep6.c:2: undefined reference to `timer_create'
      collect2: error: ld returned 1 exit status
      no timer_create, linking librt
      ************************************************
      building 'zmq.libzmq' extension
      creating build/temp.linux-x86_64-cpython-39/buildutils
      creating build/temp.linux-x86_64-cpython-39/bundled
      creating build/temp.linux-x86_64-cpython-39/bundled/zeromq
      creating build/temp.linux-x86_64-cpython-39/bundled/zeromq/src
      x86_64-linux-gnu-g++ -pthread -std=c++11 -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -fPIC -DZMQ_HAVE_CURVE=1 -DZMQ_USE_TWEETNACL=1 -DZMQ_USE_EPOLL=1 -DZMQ_IOTHREADS_USE_EPOLL=1 -DZMQ_POLL_BASED_ON_POLL=1 -Ibundled/zeromq/include -Ibundled -I/usr/include/python3.9 -c buildutils/initlibzmq.cpp -o build/temp.linux-x86_64-cpython-39/buildutils/initlibzmq.o
      buildutils/initlibzmq.cpp:10:10: fatal error: Python.h: No such file or directory
         10 | #include "Python.h"
            |          ^~~~~~~~~~
      compilation terminated.
      error: command '/usr/bin/x86_64-linux-gnu-g++' failed with exit code 1
      [end of output]

  note: This error originates from a subprocess, and is likely not a problem with pip.
  ERROR: Failed building wheel for pyzmq
ERROR: Could not build wheels for pyzmq, which is required to install pyproject.toml-based projects
```

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PRs should validate it.

Closes #37904 from HyukjinKwon/fix-linter.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@pull pull bot added the ⤵️ pull label Sep 16, 2022
@wangyum wangyum merged commit 254bd80 into wangyum:master Sep 16, 2022
pull bot pushed a commit that referenced this pull request Apr 22, 2023
…onnect

### What changes were proposed in this pull request?
Implement Arrow-optimized Python UDFs in Spark Connect.

Please see apache#39384 for motivation and  performance improvements of Arrow-optimized Python UDFs.

### Why are the changes needed?
Parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
Yes. In Spark Connect Python Client, users can:

1. Set `useArrow` parameter True to enable Arrow optimization for a specific Python UDF.

```sh
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```

2. Enable `spark.sql.execution.pythonUDF.arrow.enabled` Spark Conf to make all Python UDFs Arrow-optimized.

```sh
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

```

### How was this patch tested?
Parity unit tests.

Closes apache#40725 from xinrong-meng/connect_arrow_py_udf.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
pull bot pushed a commit that referenced this pull request Jan 28, 2025
This is a trivial change to replace the loop index from `int` to `long`. Surprisingly, microbenchmark shows more than double performance uplift.

Analysis
--------
The hot loop of `arrayEquals` method is simplifed as below. Loop index `i` is defined as `int`, it's compared with `length`, which is a `long`, to determine if the loop should end.
```
public static boolean arrayEquals(
    Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
  ......
  int i = 0;
  while (i <= length - 8) {
    if (Platform.getLong(leftBase, leftOffset + i) !=
        Platform.getLong(rightBase, rightOffset + i)) {
          return false;
    }
    i += 8;
  }
  ......
}
```

Strictly speaking, there's a code bug here. If `length` is greater than 2^31 + 8, this loop will never end because `i` as a 32 bit integer is at most 2^31 - 1. But compiler must consider this behaviour as intentional and generate code strictly match the logic. It prevents compiler from generating optimal code.

Defining loop index `i` as `long` corrects this issue. Besides more accurate code logic, JIT is able to optimize this code much more aggressively. From microbenchmark, this trivial change improves performance significantly on both Arm and x86 platforms.

Benchmark
---------
Source code:
https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a

Result on Arm Neoverse N2:
```
Benchmark                             Mode  Cnt    Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  674.313 ± 0.213  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10  313.563 ± 2.338  ns/op
```

Result on Intel Cascake Lake:
```
Benchmark                             Mode  Cnt     Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  1130.695 ± 0.168  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10   461.979 ± 0.097  ns/op
```

Deep dive
---------
Dive deep to the machine code level, we can see why the big gap. Listed below are arm64 assembly generated by Openjdk-17 C2 compiler.

For `int i`, the machine code is similar to source code, no deep optimization. Safepoint polling is expensive in this short loop.
```
// jit c2 machine code snippet
  0x0000ffff81ba8904:   mov        w15, wzr              // int i = 0
  0x0000ffff81ba8908:   nop
  0x0000ffff81ba890c:   nop
loop:
  0x0000ffff81ba8910:   ldr        x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i)
  0x0000ffff81ba8914:   ldr        x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i)
  0x0000ffff81ba8918:   cmp        x10, x14
  0x0000ffff81ba891c:   b.ne       0x0000ffff81ba899c    // return false if not equal
  0x0000ffff81ba8920:   ldr        x14, [x28, #848]      // x14 -> safepoint
  0x0000ffff81ba8924:   add        w15, w15, #0x8        // i += 8
  0x0000ffff81ba8928:   ldr        wzr, [x14]            // safepoint polling
  0x0000ffff81ba892c:   sxtw       x10, w15              // extend i to long
  0x0000ffff81ba8930:   cmp        x10, x11
  0x0000ffff81ba8934:   b.le       0x0000ffff81ba8910    // if (i <= length - 8) goto loop
```

For `long i`, JIT is able to do much more aggressive optimization. E.g, below code snippet unrolls the loop by four.
```
// jit c2 machine code snippet
unrolled_loop:
  0x0000ffff91de6fe0:   sxtw       x10, w7
  0x0000ffff91de6fe4:   add        x23, x22, x10
  0x0000ffff91de6fe8:   add        x24, x21, x10
  0x0000ffff91de6fec:   ldr        x13, [x23]          // unroll-1
  0x0000ffff91de6ff0:   ldr        x14, [x24]
  0x0000ffff91de6ff4:   cmp        x13, x14
  0x0000ffff91de6ff8:   b.ne       0x0000ffff91de70a8
  0x0000ffff91de6ffc:   ldr        x13, [x23, #8]      // unroll-2
  0x0000ffff91de7000:   ldr        x14, [x24, #8]
  0x0000ffff91de7004:   cmp        x13, x14
  0x0000ffff91de7008:   b.ne       0x0000ffff91de70b4
  0x0000ffff91de700c:   ldr        x13, [x23, #16]     // unroll-3
  0x0000ffff91de7010:   ldr        x14, [x24, #16]
  0x0000ffff91de7014:   cmp        x13, x14
  0x0000ffff91de7018:   b.ne       0x0000ffff91de70a4
  0x0000ffff91de701c:   ldr        x13, [x23, #24]     // unroll-4
  0x0000ffff91de7020:   ldr        x14, [x24, #24]
  0x0000ffff91de7024:   cmp        x13, x14
  0x0000ffff91de7028:   b.ne       0x0000ffff91de70b0
  0x0000ffff91de702c:   add        w7, w7, #0x20
  0x0000ffff91de7030:   cmp        w7, w11
  0x0000ffff91de7034:   b.lt       0x0000ffff91de6fe0
```

### What changes were proposed in this pull request?

A trivial change to replace loop index `i` of method `arrayEquals` from `int` to `long`.

### Why are the changes needed?

To improve performance and fix a possible bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49568 from cyb70289/arrayEquals.

Authored-by: Yibo Cai <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
pull bot pushed a commit that referenced this pull request Jul 21, 2025
…ingBuilder`

### What changes were proposed in this pull request?

This PR aims to improve `toString` by `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.

### Why are the changes needed?

Since Java 9, `String Concatenation` has been handled better by default.

| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |

For example, this PR improves `OpenBlocks` like the following. Both Java source code and byte code are simplified a lot by utilizing JEP-280 properly.

**CODE CHANGE**
```java

- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-   .append("appId", appId)
-   .append("execId", execId)
-   .append("blockIds", Arrays.toString(blockIds))
-   .toString();
+ return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+     Arrays.toString(blockIds) + "]";
```

**BEFORE**
```
  public java.lang.String toString();
    Code:
       0: new           #39                 // class org/apache/commons/lang3/builder/ToStringBuilder
       3: dup
       4: aload_0
       5: getstatic     #41                 // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
       8: invokespecial #47                 // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
      11: ldc           #50                 // String appId
      13: aload_0
      14: getfield      #7                  // Field appId:Ljava/lang/String;
      17: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      20: ldc           #55                 // String execId
      22: aload_0
      23: getfield      #13                 // Field execId:Ljava/lang/String;
      26: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      29: ldc           #56                 // String blockIds
      31: aload_0
      32: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      35: invokestatic  #57                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      38: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      41: invokevirtual #61                 // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
      44: areturn
```

**AFTER**
```
  public java.lang.String toString();
    Code:
       0: aload_0
       1: getfield      #7                  // Field appId:Ljava/lang/String;
       4: aload_0
       5: getfield      #13                 // Field execId:Ljava/lang/String;
       8: aload_0
       9: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      12: invokestatic  #39                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      15: invokedynamic #43,  0             // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
      20: areturn
```

### Does this PR introduce _any_ user-facing change?

No. This is an `toString` implementation improvement.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51572 from dongjoon-hyun/SPARK-52880.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants