
Conversation


@pull pull bot commented Sep 22, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

HyukjinKwon and others added 11 commits September 22, 2022 11:44
…ple in element_at

### What changes were proposed in this pull request?

This PR is a follow-up of #37850 that removes a non-ANSI-compliant example in `element_at`.

### Why are the changes needed?

The ANSI build fails to run the example.

https://github.com/apache/spark/actions/runs/3094607589/jobs/5008176959

```
    Caused by: org.apache.spark.SparkArrayIndexOutOfBoundsException: [INVALID_ARRAY_INDEX_IN_ELEMENT_AT] The index -4 is out of bounds. The array has 3 elements. Use `try_element_at` to tolerate accessing element at invalid index and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
    	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidElementAtIndexError(QueryExecutionErrors.scala:264)
    	...

/usr/local/pypy/pypy3.7/lib-python/3/runpy.py:125: RuntimeWarning: 'pyspark.sql.functions' found in sys.modules after import of package 'pyspark.sql', but prior to execution of 'pyspark.sql.functions'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
/__w/spark/spark/python/pyspark/context.py:310: FutureWarning: Python 3.7 support is deprecated in Spark 3.4.
  warnings.warn("Python 3.7 support is deprecated in Spark 3.4.", FutureWarning)
**********************************************************************
   1 of   6 in pyspark.sql.functions.element_at

```
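For context, a minimal sketch (not from the PR) of the ANSI-safe alternative the error message points to; `try_element_at` is called through `expr` here since only the SQL function is assumed to be available:

```python
# Hedged sketch: under spark.sql.ansi.enabled=true, element_at with an out-of-range
# index raises INVALID_ARRAY_INDEX_IN_ELEMENT_AT, while try_element_at returns NULL.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")

df = spark.createDataFrame([([1, 2, 3],)], ["data"])
df.select(F.expr("try_element_at(data, -4)").alias("value")).show()   # value: NULL
# df.select(F.element_at("data", -4)).show()  # would fail with the error shown above
```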

### Does this PR introduce _any_ user-facing change?

No. The example added is not exposed to end users yet.

### How was this patch tested?
Manually tested with the ANSI configuration (`spark.sql.ansi.enabled`) enabled.

Closes #37959 from HyukjinKwon/SPARK-40142-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Fixes encoding of classes that use companion object constructors in the interpreted path. Without this change, the spec that is added in this change would fail with:
```
...
  Cause: java.lang.RuntimeException: Error while decoding: java.lang.RuntimeException: Couldn't find a valid constructor on interface org.apache.spark.sql.catalyst.ScroogeLikeExample
newInstance(interface org.apache.spark.sql.catalyst.ScroogeLikeExample)
  at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1199)
...
```

As far as I can tell, this bug has existed since the initial implementation in SPARK-8288 (#23062).

The existing spec that tested this part of the code incorrectly provided an outerPointer, which hid the bug from that test.

### Why are the changes needed?
Fixes a bug; the new spec in ExpressionEncoderSuite shows that this is in fact a bug.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes a bug.

### How was this patch tested?
New and existing specs in ExpressionEncoderSuite and ObjectExpressionsSuite.

Closes #37837 from eejbyfeldt/spark-40385.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…nning in parallel

### What changes were proposed in this pull request?

Currently, the defaultJoin method in BroadcastNestedLoopJoinExec first collects notMatchedBroadcastRows and then collects matchedStreamRows. The two steps could run in parallel instead of serially, as illustrated by the sketch below.
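As a rough, hypothetical illustration of the idea (plain Python, not the actual Scala implementation), two independent collection steps can be submitted concurrently so the total latency is roughly max(t1, t2) rather than t1 + t2:

```python
# Hypothetical stand-ins for the two collection steps; the names mirror the PR description
# but the bodies are placeholders, not Spark code.
from concurrent.futures import ThreadPoolExecutor
import time

def collect_not_matched_broadcast_rows():
    time.sleep(0.1)                 # stand-in for scanning the broadcast side
    return ["broadcast-row"]

def collect_matched_stream_rows():
    time.sleep(0.1)                 # stand-in for scanning the streamed side
    return ["stream-row"]

with ThreadPoolExecutor(max_workers=2) as pool:
    not_matched = pool.submit(collect_not_matched_broadcast_rows)
    matched = pool.submit(collect_matched_stream_rows)
    result = not_matched.result() + matched.result()
print(result)
```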

### Why are the changes needed?
Make defaultJoin in BroadcastNestedLoopJoinExec run its two collection steps in parallel.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
UT.

Closes #37930 from xingchaozh/SPARK-40487.

Authored-by: Xingchao, Zhang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
### What changes were proposed in this pull request?

This PR proposes to introduce the new API `applyInPandasWithState` in PySpark, which provides the functionality to perform arbitrary stateful processing in Structured Streaming.

This will be a pair API with applyInPandas: applyInPandas in PySpark covers the use case of flatMapGroups in the Scala/Java API, while applyInPandasWithState covers the use case of flatMapGroupsWithState.

The signature of the API follows:

```
# call this function after groupBy
def applyInPandasWithState(
    self,
    func: "PandasGroupedMapFunctionWithState",
    outputStructType: Union[StructType, str],
    stateStructType: Union[StructType, str],
    outputMode: str,
    timeoutConf: str,
) -> DataFrame
```

and the signature of user function follows:

```
def func(
    key: Tuple,
    pdf_iter: Iterator[pandas.DataFrame],
    state: GroupStateImpl
) -> Iterator[pandas.DataFrame]
```

(Please refer to the code diff for the function doc of the new function. A hypothetical usage sketch is shown below.)
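A minimal, hypothetical usage sketch (the schemas, names, mode and timeout strings, and the running-count logic are illustrative assumptions, not taken from the PR):

```python
# A user function that keeps a running count per key in state and emits it for each batch.
# state.exists, state.get and state.update follow GroupState semantics; the output and
# state schemas below are assumptions for this sketch.
import pandas as pd

output_schema = "key string, count long"
state_schema = "count long"

def count_fn(key, pdf_iter, state):
    running = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        running += len(pdf)
    state.update((running,))
    yield pd.DataFrame({"key": [key[0]], "count": [running]})

# On a streaming DataFrame grouped by "key" (output mode and timeout strings assumed):
# df.groupBy("key").applyInPandasWithState(
#     count_fn, output_schema, state_schema, "Update", "NoTimeout")
```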

Major design choices which differ from existing APIs:

1. The new API is untyped, while flatMapGroupsWithState is a typed API.

This is based on the nature of the Python language: it is duck-typed, and type definitions are just hints. We don't have an implementation of a typed API for PySpark DataFrame.

This leads us to design the API to be untyped, meaning all types for (input, state, output) should be Row-compatible. While we don't require end users to deal with `Row` directly, the model they use for state and output must be convertible to Row with the default encoder. If they want a Python type for state that is not compatible with Row (e.g. a custom class), they need to pickle it and use BinaryType to store it.

This requires end users to specify the types of state and output via Spark SQL schemas in the method.

Note that this helps to ensure compatibility of state data across Spark versions, as long as the encoders for 1) Python type -> Python Row and 2) Python Row -> UnsafeRow are not changed. We won't change the underlying data layout for UnsafeRow, as it would break all existing stateful queries.

2. The new API provides Pandas DataFrames to the user function, while flatMapGroupsWithState provides an iterator of rows.

We decided to follow the user experience applyInPandas provides, for both consistency and performance (Arrow batching, vectorization, etc). This leads us to design the user function to leverage pandas DataFrames rather than an iterator of rows. While this makes the UX inconsistent with the Scala/Java API, we don't think it will be a problem since Pandas is considered the de facto standard for Python data scientists.

3. The new API provides an iterator of Pandas DataFrames to the user function and also requires it to return an iterator of Pandas DataFrames, to address scalability.

There is a known limitation of applyInPandas: scalability. It basically requires the data for a specific group to fit into memory. During the design phase of the new API, we decided to address scalability rather than inherit the limitation.

To address scalability, we tweak the user function to receive an iterator (generator) of Pandas DataFrames instead of a single Pandas DataFrame, and also to return an iterator (generator) of Pandas DataFrames. We think this does not hurt the UX too much, as a for loop and yield are enough to deal with the iterators.

From the implementation perspective, we split the data for a specific group into multiple chunks, where each chunk is stored and sent as "an" Arrow RecordBatch and then finally materialized as "a" pandas DataFrame. This way, as long as end users don't materialize many pandas DataFrames from the iterator at the same time, only one chunk is materialized into memory, which is scalable. Similar logic applies to the output of the user function, hence it is scalable as well.

4. The new API also bin-packs the data from multiple groups into "an" Arrow RecordBatch.

Given the API is mainly used for streaming workloads, it is quite likely that the volume of data in a specific group is not large enough to benefit from Arrow columnar batching, which would hurt performance. To address this, we also do the opposite of what we do for scalability: bin-packing. That is, an Arrow RecordBatch can contain data for multiple groups, as well as part of the data for a specific group. This addresses both concerns together, scalability and performance.

Note that we are not implementing all of the features the Scala/Java API provides in the initial phase; e.g. support for batch queries and support for initial state are left as TODOs.

### Why are the changes needed?

PySpark users don't have a way to perform arbitrary stateful processing in Structured Streaming and are forced to use either Java or Scala, which is unacceptable for users in many cases. This PR enables PySpark users to do it without moving to the Java/Scala world.

### Does this PR introduce _any_ user-facing change?

Yes. We are exposing a new public API in PySpark which performs arbitrary stateful processing.

### How was this patch tested?

N/A. We will make sure test suites are constructed via E2E manner under [SPARK-40431](https://issues.apache.org/jira/browse/SPARK-40431) - #37894

Closes #37893 from HeartSaVioR/SPARK-40434-on-top-of-SPARK-40433-SPARK-40432.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
 Add resampling to API references

### Why are the changes needed?
Docs for `resampling` are missing.

### Does this PR introduce _any_ user-facing change?
yes, new docs

### How was this patch tested?
existing UT

Closes #37948 from zhengruifeng/ps_doc_resampling.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request?
Upgrade Apache Kafka from 3.2.1 to 3.2.3

[Release notes](https://downloads.apache.org/kafka/3.2.3/RELEASE_NOTES.html)

### Why are the changes needed?
[Memory Allocation with Excessive Size Value](https://security.snyk.io/vuln/SNYK-JAVA-ORGAPACHEKAFKA-3027430)
[CVE-2022-34917](https://www.cve.org/CVERecord?id=CVE-2022-34917)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

Closes #37958 from bjornjorgensen/kafka3.2.2.

Authored-by: Bjørn <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…in PySpark

### What changes were proposed in this pull request?

This PR adds the test suites for #37893, applyInPandasWithState. The new test suite mostly ports E2E test cases from the existing [flatMapGroupsWithState](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala) suite.

### Why are the changes needed?

Tests were intentionally left out of #37893 to reduce the size of the change, and this PR fills the gap.

### Does this PR introduce _any_ user-facing change?

No, test only.

### How was this patch tested?

New test suites.

Closes #37894 from HeartSaVioR/SPARK-40435-on-top-of-SPARK-40434-SPARK-40433-SPARK-40432.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request?
In the PR, I propose to migrate all parsing errors onto temporary error classes with the prefix `_LEGACY_ERROR_TEMP_`. The error messages will not include the error classes, so in this way we preserve the existing behaviour.

### Why are the changes needed?
The migration onto temporary error classes allows gathering statistics about errors and detecting the most popular error classes.

### Does this PR introduce _any_ user-facing change?
No. The error messages should be almost the same by default.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
$ build/sbt "test:testOnly *ExpressionParserSuite"
```

Closes #37916 from MaxGekk/legacy-error-temp.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?

This PR addresses the review comments from HyukjinKwon's last round of review in #37893.

### Why are the changes needed?

Better documentation and removing unnecessary code.

### Does this PR introduce _any_ user-facing change?

Slight documentation change.

### How was this patch tested?

N/A

Closes #37964 from HeartSaVioR/SPARK-40434-followup.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request?
Implement `ddof` in `Series.cov` by switching to `SF.covar`.

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
Yes, `ddof` is supported now:

```
        >>> s1 = ps.Series([0.90010907, 0.13484424, 0.62036035])
        >>> s2 = ps.Series([0.12528585, 0.26962463, 0.51111198])
        >>> with ps.option_context("compute.ops_on_diff_frames", True):
        ...     s1.cov(s2)
        -0.016857...
        >>> with ps.option_context("compute.ops_on_diff_frames", True):
        ...     s1.cov(s2, ddof=2)
        -0.033715...
```

### How was this patch tested?
added UT

Closes #37953 from zhengruifeng/ps_ser_cov.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…o error classes

### What changes were proposed in this pull request?
In the PR, I propose to use error classes in the case of type check failure in JSON expressions.

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages and improves the searchability of errors.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```

Closes #37902 from MaxGekk/type-check-fail-json-csv.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
zhengruifeng and others added 2 commits September 22, 2022 21:04
### What changes were proposed in this pull request?
Remove `pyspark.pandas.ml`

### Why are the changes needed?
`pyspark.pandas.ml` is no longer needed, since we implemented correlations based on Spark SQL:

1. Pearson correlation implemented in #37845
2. Spearman correlation implemented in #37874

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
updated suites

Closes #37968 from zhengruifeng/ps_del_ml.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Exceptions thrown when writing to datasources (e.g. `FileFormatWriter.write` and `WriteToDataSourceV2.writeWithV2`) are wrapped with `SparkException("Job aborted.")` or `SparkException("Writing job aborted")`.

This provides little extra information, but generates a long stacktrace and makes the debugging process more difficult.

This change removes the wrapping, and the unused error class `WRITING_JOB_ABORTED`.

### Why are the changes needed?
This is to simplify the stacktrace thrown when writing to a datasource fails.

### Does this PR introduce _any_ user-facing change?
When exceptions are thrown in datasource writes, the wrapping with `SparkException` will be removed and stacktraces will be simplified.

### How was this patch tested?
Existing tests.

Closes #37931 from bozhang2820/spark-40488.

Authored-by: Bo Zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@github-actions github-actions bot added the AVRO label Sep 22, 2022
wbo4958 and others added 2 commits September 22, 2022 20:59
### What changes were proposed in this pull request?

``` scala
val df = spark.range(0, 100, 1, 50).repartition(4)
val v = df.rdd.mapPartitions { iter =>
  Iterator.single(iter.length)
}.collect()
println(v.mkString(","))
```

The above simple code outputs `50,0,0,50`, which means there is no data in partition 1 and partition 2.

RoundRobin seems to ensure that the records are distributed evenly *within the same partition*, but does not guarantee it across partitions.

Below is the code that generates the key:

``` scala
      case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a random partition.
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
        (row: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
          position
        }
```

In this case, there are 50 partitions and each partition only contains 2 elements. The issue for RoundRobin here is that it always starts with position=2.

See the output of Random
``` scala
scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2.
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
```

Similarly, the Random code below also outputs the same value for every seed:

``` scala
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
```

Consider partition 0: its elements are [0, 1], so at shuffle write time element 0 gets key (2 + 1) % 4 = 3 and element 1 gets key (3 + 1) % 4 = 0.
Consider partition 1: its elements are [2, 3], so element 2 gets key (2 + 1) % 4 = 3 and element 3 gets key (3 + 1) % 4 = 0.

The same calculation applies to the remaining partitions, since the starting position is always 2 in this case.

So, as you can see, each partition writes its elements to partitions 0 and 3, which leaves partitions 1 and 2 without any data.

This PR changes the starting position of RoundRobin. The default position calculated by `new Random(partitionId).nextInt(numPartitions)` may be the same for different partitions, which means each partition writes its data to the same keys at shuffle write time, and some keys may not get any data in some special cases.

### Why are the changes needed?

The PR fixes the data skew issue for these special cases.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Will add some tests and watch CI pass

Closes #37855 from wbo4958/roundrobin-data-skew.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
implement a new expression `CollectTopK`, which uses [`Array`](https://github.com/apache/spark/blob/58e07e0f4cca1e3a6387a7e0c57faeb6c5ec9ef5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala#L71-L81) instead of `BoundedPriorityQueue` in ser/deser

### Why are the changes needed?
Reduce the shuffle size of ALS in prediction

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing testsuites

Closes #37918 from zhengruifeng/sql_collect_topk.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
@github-actions github-actions bot added the ML label Sep 22, 2022
LucaCanali and others added 5 commits September 22, 2022 10:09
…to arrow-based conversion

### What changes were proposed in this pull request?
This proposes to add support for ArrayType of nested StructType to arrow-based conversion.
This allows Pandas UDFs, mapInArrow UDFs, and toPandas to operate on columns of type Array of Struct, via arrow serialization.
It appears to me that pyarrow 2.0.0 allows serializing arrays of structs (while pyarrow 1.0.0 throws an error for this type of data).
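A hedged sketch of what this enables (assumes pyarrow >= 2.0.0 and that struct elements arrive in pandas as dict-like values):

```python
# A pandas UDF over a column of type array<struct<a:int, b:int>>; this column type was
# previously not supported by the arrow-based conversion path.
import pandas as pd
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.sql(
    "SELECT array(named_struct('a', 1, 'b', 2), named_struct('a', 3, 'b', 4)) AS arr")

@F.pandas_udf("long")
def count_elems(s: pd.Series) -> pd.Series:
    # each value is the whole array for one row, serialized through arrow
    return s.apply(len)

df.select(count_elems("arr")).show()   # expected: 2
```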

### Why are the changes needed?
This extends the usability of pandas_udf.

### Does this PR introduce _any_ user-facing change?
Pandas_udf and mapInArrow will be able to operate on data of type Array of Struct.
toPandas will be able to use arrow serialization for Array of Struct (when spark.sql.execution.arrow.pyspark.enabled=true)

### How was this patch tested?
A test has been added.

Closes #35391 from LucaCanali/supportPandasUDFArrayStruct.

Authored-by: Luca Canali <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
### What changes were proposed in this pull request?

The PR improves field name resolution in `CreateStruct` when using `struct()` with fields from `named_struct` or `map` and makes "index" notation consistent with "dot" notation. Here is an example:

```sql
select struct(a['x'], a['y']), struct(a.x, a.y) from (select named_struct('x', 1, 'y', 2) as a)
```

As you can observe, the first struct has "col1" and "col2" names while the second struct has "x" and "y" which correspond to the parent struct fields.

Before:
```scala
root
 |-- struct(a.x, a.y): struct (nullable = false)
 |    |-- col1: integer (nullable = false)
 |    |-- col2: integer (nullable = false)
 |-- struct(a.x, a.y): struct (nullable = false)
 |    |-- x: integer (nullable = false)
 |    |-- y: integer (nullable = false)

+----------------+----------------+
|struct(a.x, a.y)|struct(a.x, a.y)|
+----------------+----------------+
|{1, 2}          |{1, 2}          |
+----------------+----------------+
```

This PR makes those two examples consistent and both structs will have the following schema.

After:
```scala
root
 |-- struct(a.x, a.y): struct (nullable = false)
 |    |-- x: integer (nullable = false)
 |    |-- y: integer (nullable = false)
 |-- struct(a.x, a.y): struct (nullable = false)
 |    |-- x: integer (nullable = false)
 |    |-- y: integer (nullable = false)
```

### Why are the changes needed?

Makes the behaviour consistent between `struct(a.x)` and `struct(a['x'])`.

### Does this PR introduce _any_ user-facing change?

Yes, the column names returned by `struct()` function will be different after this patch.

### How was this patch tested?

I added unit tests to verify the fix.

Closes #37965 from sadikovi/SPARK-40527.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
This PR aims to upgrade zstd-jni from 1.5.2-3 to 1.5.2-4.

### Why are the changes needed?
1.5.2-3 VS 1.5.2-4
<img width="909" alt="image" src="https://user-images.githubusercontent.com/15246973/191741246-9a3b22dc-18ed-45f1-9d2e-fc1c8c1647c3.png">

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #37970 from panbingkun/upgrade_zstd_jni.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…ion slow

### What changes were proposed in this pull request?

This PR will run `scheduleShuffleMergeFinalize()` and send `finalizeShuffleMerge` RPCs in two threads, and stop all work after `PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT` regardless of success or failure.

Currently we only call `removeShufflePushMergerLocation` when a shuffle fetch fails; this PR also prevents these merger nodes from being selected as mergeLocations when creating connections fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.
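A simplified, hypothetical illustration (plain Python, not the Spark code) of bounding the finalization work by a timeout:

```python
# Submit the finalize RPCs concurrently and stop waiting once the timeout expires,
# regardless of how many mergers have responded. All names are stand-ins.
from concurrent.futures import ThreadPoolExecutor, wait

def finalize_shuffle_merge(merger):
    return f"finalized on {merger}"          # stand-in for the finalizeShuffleMerge RPC

mergers = ["merger-1", "merger-2", "merger-3"]
pool = ThreadPoolExecutor(max_workers=len(mergers))
futures = [pool.submit(finalize_shuffle_merge, m) for m in mergers]
done, not_done = wait(futures, timeout=10)   # stand-in for the merge-results timeout
results = [f.result() for f in done]         # proceed with whatever finished in time
pool.shutdown(wait=False)                    # don't block on stragglers
```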

### Why are the changes needed?

DAGScheduler finalizes each shuffle map stage in one `shuffle-merge-finalizer` thread and locks `clientPool.locks[clientIndex]` when creating a connection to the ESS merger node, so the other `shuffle-merge-finalizer` threads (one stage per thread) wait for `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY`.
Although reducing `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY` helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add UT

Closes #37533 from wankunde/SPARK-40096.

Authored-by: Kun Wan <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…` accept arbitrary integers

### What changes were proposed in this pull request?
Add a new `std` expression to support arbitrary integral `ddof`.

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
Yes, it accepts `ddof` other than {0, 1}.

before
```
In [4]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

In [5]: df.std(ddof=2)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In [5], line 1
----> 1 df.std(ddof=2)

File ~/Dev/spark/python/pyspark/pandas/generic.py:1866, in Frame.std(self, axis, skipna, ddof, numeric_only)
   1803 def std(
   1804     self,
   1805     axis: Optional[Axis] = None,
   (...)
   1808     numeric_only: bool = None,
   1809 ) -> Union[Scalar, "Series"]:
   1810     """
   1811     Return sample standard deviation.
   1812
   (...)
   1864     0.816496580927726
   1865     """
-> 1866     assert ddof in (0, 1)
   1868     axis = validate_axis(axis)
   1870     if numeric_only is None and axis == 0:

AssertionError:
```

after:
```
In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

In [4]: df.std(ddof=2)
Out[4]:
a    1.414214
b    0.141421
dtype: float64

In [5]: df.to_pandas().std(ddof=2)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[5]:
a    1.414214
b    0.141421
dtype: float64

```
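For reference, a plain-Python sketch of what `ddof` means (following the pandas/NumPy convention, not taken from the Spark code): the variance divisor is `n - ddof`, so `ddof=0` gives the population variance, `ddof=1` the unbiased sample variance, and larger values are now also accepted.

```python
def variance(xs, ddof=1):
    # delta degrees of freedom: divide by (n - ddof) instead of n
    n = len(xs)
    mean = sum(xs) / n
    return sum((x - mean) ** 2 for x in xs) / (n - ddof)

def std(xs, ddof=1):
    return variance(xs, ddof) ** 0.5

# Matches column 'a' in the example above: [1.0, 2.0, 3.0] with ddof=2 -> 1.414214
print(round(std([1.0, 2.0, 3.0], ddof=2), 6))
```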

### How was this patch tested?
added testsuites

Closes #37974 from zhengruifeng/ps_std_ddof.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…rom `log4j.properties` to `log4j2.properties`

### What changes were proposed in this pull request?
This PR just renames `logConfFile` in `BaseYarnClusterSuite` from `log4j.properties` to `log4j2.properties`.

### Why are the changes needed?
Make the log configuration defined in `BaseYarnClusterSuite#LOG4J_CONF` be used during testing.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass Github Actions
- Manual test

For example, run

```
mvn clean test -pl resource-managers/yarn -DwildcardSuites=org.apache.spark.deploy.yarn.YarnClusterSuite -Phadoop-3 -Pyarn
```

The test data will be in `resource-managers/yarn/target/test/data/org.apache.spark.deploy.yarn.YarnClusterSuite/yarn-xxxx` as follows:

```
nodeattributes  org.apache.spark.deploy.yarn.YarnClusterSuite-localDir-nm-0_0	org.apache.spark.deploy.yarn.YarnClusterSuite-logDir-nm-0_0
```

**Before**

All of the `log4j2.properties` content uploaded for UTs is as follows:

```
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the file target/unit-tests.log
rootLogger.level = debug
rootLogger.appenderRef.file.ref = File

appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.append = true
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex

# Silence verbose logs from 3rd-party libraries.
logger.netty.name = io.netty
logger.netty.level = info
```

This is not the configuration defined in `BaseYarnClusterSuite#LOG4J_CONF`, and both `stderr` and `stdout` in the log directory are empty files.

**After**

The `log4j2.properties` content uploaded for UTs is as follows:

```
rootLogger.level = debug
rootLogger.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.eclipse.name = org.eclipse.jetty
logger.eclipse.level = warn
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = warn
logger.mortbay.name = org.mortbay
logger.mortbay.level = warn
```

The content is consistent with that defined in `BaseYarnClusterSuite#LOG4J_CONF`, and there are Spark job logs in `stderr`.

Closes #37971 from LuciferYang/logfilename.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@github-actions github-actions bot added the YARN label Sep 23, 2022
zhengruifeng and others added 6 commits September 23, 2022 14:54
…` accept arbitrary integers

### What changes were proposed in this pull request?
Add a new `var` expression to support arbitrary integral `ddof`.

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
Yes, it accepts `ddof` other than {0, 1}.

before
```
In [1]: import pyspark.pandas as ps

In [2]: import numpy as np

In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

In [4]: df.var(ddof=2)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In [4], line 1
----> 1 df.var(ddof=2)

File ~/Dev/spark/python/pyspark/pandas/generic.py:1958, in Frame.var(self, axis, ddof, numeric_only)
   1904 def var(
   1905     self, axis: Optional[Axis] = None, ddof: int = 1, numeric_only: bool = None
   1906 ) -> Union[Scalar, "Series"]:
   1907     """
   1908     Return unbiased variance.
   1909
   (...)
   1956     0.6666666666666666
   1957     """
-> 1958     assert ddof in (0, 1)
   1960     axis = validate_axis(axis)
   1962     if numeric_only is None and axis == 0:

AssertionError:
```

after
```
In [4]: df.var(ddof=2)
Out[4]:
a    2.00
b    0.02
dtype: float64

In [5]: df.to_pandas().var(ddof=2)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[5]:
a    2.00
b    0.02
dtype: float64

```

### How was this patch tested?
added UT

Closes #37975 from zhengruifeng/ps_var_ddof.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…vior on columns with mixed dates and timestamps

### What changes were proposed in this pull request?

This PR corrects the behavior of how columns with mixed dates and timestamps are handled in CSV schema inference and data parsing.
- If the user specifies a timestamp format, this type of column will always be inferred as `StringType`.
- If no timestamp format is specified by the user, we will try to infer this type of column as `TimestampType` if possible, otherwise it is inferred as `StringType`.

Here are the semantics of the changes:
- In `CSVInferSchema`
  - Remove the attempts to infer field as `DateType` when `prefersDate=true` and `typeSoFar=TimestampType/TimestampNTZType`
  - Change the dataType merging behavior between `DateType` and `TimestampType/TimestampNTZType`:
    - If the `timestampFormat/timestampNTZFormat` is given, merge the two types into StringType
    - Otherwise
      - if the `dateFormat` could be parsed by the lenient timestamp formatter, merge the two types into `TimestampType/TimestampNTZType`
      - otherwise, merge the two types into `StringType`

- In `UnivocityParser`, remove the attempts to parse field as Date if it failed to be parsed as Timestamp.

As an additional change, this PR also turns the default value of `prefersDate` to `true`. A hedged illustration of the new inference rules is shown below.
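A hedged illustration of these rules (the expected schemas are per the description above, assuming schema inference is enabled):

```python
# Without a user-supplied timestampFormat, a column mixing dates and timestamps should be
# merged into TimestampType; with an explicit timestampFormat it should fall back to string.
import os, tempfile
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = os.path.join(tempfile.mkdtemp(), "mixed.csv")
with open(path, "w") as f:
    f.write("2022-01-01\n2022-01-02 10:00:00\n")

spark.read.option("inferSchema", True).csv(path).printSchema()
# expected per the rules above: _c0: timestamp

spark.read.options(inferSchema=True, timestampFormat="yyyy-MM-dd HH:mm:ss") \
    .csv(path).printSchema()
# expected per the rules above: _c0: string
```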

### Why are the changes needed?
Simplify CSV dateTime inference logic.

### Does this PR introduce _any_ user-facing change?
No compared to the previous PR.

### How was this patch tested?

Closes #37933 from xiaonanyang-db/SPARK-40474.

Authored-by: xiaonanyang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
When converting the native table metadata representation `CatalogTable` to `HiveTable`, make sure the bucket spec uses an existing column.

### Does this PR introduce _any_ user-facing change?
The Hive metastore seems to be non-case-preserving for columns but case-preserving for the bucket spec, which means the following table creation:
```
  CREATE TABLE t(
    c STRING,
    B_C STRING
  )
  PARTITIONED BY (p_c STRING)
  CLUSTERED BY (B_C) INTO 4 BUCKETS
  STORED AS PARQUET
```
followed by a query:
```
  SELECT * FROM t
```
fails with:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
```

### Why are the changes needed?
Bug fix.

### How was this patch tested?
Added new UT.

Closes #36027 from peter-toth/SPARK-38717-handle-upper-case-bucket-spec.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…not be created if the input rows is empty

### What changes were proposed in this pull request?
When `AggregatingAccumulator` serializes the aggregate buffer, it may throw an NPE.
Here is one test case that reproduces this error:
```
val namedObservation = Observation("named")
val df = spark.range(1, 10, 1, 10)
val observed_df = df.observe(
  namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
observed_df.collect()
```
It throws an exception as follows:
```
13:45:10.976 ERROR org.apache.spark.util.Utils: Exception encountered
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:641)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:602)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:205)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1245)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1456)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Why are the changes needed?
Fix a bug.
After my investigation, the root cause is that the buffer of `AggregatingAccumulator` will not be created if the input rows are empty.

### Does this PR introduce _any_ user-facing change?
Yes.
Users will see the correct results.

### How was this patch tested?
New test case.

Closes #37977 from beliefer/SPARK-37203_followup.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?

From the bug description, it looks like the fallback storage doesn't read the block fully and therefore causes `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This is an attempt to fix this by reading the underlying stream fully.
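A small sketch of the read-fully pattern the fix describes (plain Python for illustration; the actual change is in the Scala fallback storage reader):

```python
# A single read() may return fewer bytes than requested, so loop until the buffer is full
# (or EOF); otherwise downstream decompression sees a truncated, "corrupted" block.
def read_fully(stream, n):
    buf = bytearray()
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError(f"expected {n} bytes, got only {len(buf)}")
        buf.extend(chunk)
    return bytes(buf)
```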

### Why are the changes needed?

Fix a bug documented in SPARK-39200

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Wrote a unit test

Closes #37960 from ukby1234/SPARK-39200.

Authored-by: Frank Yin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR fixes all dead links in sparkr-vignettes.Rmd.

### Why are the changes needed?

binary-release-hadoop3.log logs:
```
yumwangLM-SHC-16508156 output % tail -n 30 binary-release-hadoop3.log
* this is package ‘SparkR’ version ‘3.3.1’
* package encoding: UTF-8
* checking CRAN incoming feasibility ... NOTE
Maintainer: ‘The Apache Software Foundation <devspark.apache.org>’

New submission

Package was archived on CRAN

CRAN repository db overrides:
  X-CRAN-Comment: Archived on 2021-06-28 as issues were not corrected
    in time.

  Should use tools::R_user_dir().

Found the following (possibly) invalid URLs:
  URL: https://spark.apache.org/docs/latest/api/R/column_aggregate_functions.html
    From: inst/doc/sparkr-vignettes.html
    Status: 404
    Message: Not Found
  URL: https://spark.apache.org/docs/latest/api/R/read.df.html
    From: inst/doc/sparkr-vignettes.html
    Status: 404
    Message: Not Found
  URL: https://spark.apache.org/docs/latest/api/R/sparkR.session.html
    From: inst/doc/sparkr-vignettes.html
    Status: 404
    Message: Not Found
* checking package namespace information ... OK
* checking package dependencies ...%
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

manual test.

Closes #37983 from wangyum/fix-links.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@github-actions github-actions bot added the R label Sep 23, 2022
### What changes were proposed in this pull request?

This PR fixes dead links in the documentation.

### Why are the changes needed?

Correct the document.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

manual test.

Closes #37981 from wangyum/SPARK-40322.

Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@pull pull bot merged commit 8ccd41f into wangyum:master Sep 23, 2022
pull bot pushed a commit that referenced this pull request Jan 17, 2023
…elper#replaceNotIncludedMsg` to remove `@hashCode`

### What changes were proposed in this pull request?
The daily build GA `Master Scala 2.13 + Hadoop 3 + JDK 8` failed after apache#39468 was merged; the failed tests include:

- org.apache.spark.sql.SQLQueryTestSuite
- org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite

and the failure error message is similar to the following:

```
2023-01-11T01:03:46.3478323Z 01:03:46.347 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs:
2023-01-11T01:03:46.3677880Z [info]- explain.sql *** FAILED *** (348 milliseconds)
2023-01-11T01:03:46.3678710Z [info]  explain.sql
2023-01-11T01:03:46.3679479Z [info]  Expected "...es.CatalogFileIndex[7d811218, [key, val]
2023-01-11T01:03:46.3680509Z [info]  +- Project [key#x, val#x]
2023-01-11T01:03:46.3681033Z [info]     +- SubqueryAlias spark_catalog.default.explain_temp4
2023-01-11T01:03:46.3684259Z [info]        +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
2023-01-11T01:03:46.3684922Z [info]
2023-01-11T01:03:46.3685766Z [info]  == Optimized Logical Plan ==
2023-01-11T01:03:46.3687590Z [info]  InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex7d811218, [key, val]
2023-01-11T01:03:46.3688465Z [info]  +- WriteFiles
2023-01-11T01:03:46.3690929Z [info]     +- Sort [val#x ASC NULLS FIRST], false
2023-01-11T01:03:46.3691387Z [info]        +- Project [key#x, empty2null(val#x) AS val#x]
2023-01-11T01:03:46.3692078Z [info]           +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
2023-01-11T01:03:46.3692549Z [info]
2023-01-11T01:03:46.3693443Z [info]  == Physical Plan ==
2023-01-11T01:03:46.3695233Z [info]  Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex7d811218], [key, val]
2023-01-11T01:03:46.3696100Z [info]  +- Writ...", but got "...es.CatalogFileIndex[cdfa8472, [key, val]
2023-01-11T01:03:46.3698327Z [info]  +- Project [key#x, val#x]
2023-01-11T01:03:46.3698881Z [info]     +- SubqueryAlias spark_catalog.default.explain_temp4
2023-01-11T01:03:46.3699680Z [info]        +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
2023-01-11T01:03:46.3704986Z [info]
2023-01-11T01:03:46.3705457Z [info]  == Optimized Logical Plan ==
2023-01-11T01:03:46.3717140Z [info]  InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndexcdfa8472, [key, val]
2023-01-11T01:03:46.3718309Z [info]  +- WriteFiles
2023-01-11T01:03:46.3718964Z [info]     +- Sort [val#x ASC NULLS FIRST], false
2023-01-11T01:03:46.3719752Z [info]        +- Project [key#x, empty2null(val#x) AS val#x]
2023-01-11T01:03:46.3723046Z [info]           +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
2023-01-11T01:03:46.3723598Z [info]
2023-01-11T01:03:46.3726955Z [info]  == Physical Plan ==
2023-01-11T01:03:46.3728111Z [info]  Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndexcdfa8472], [key, val]
2023-01-11T01:03:46.3898445Z [info]  +- Writ..." Result did not match for query #21
2023-01-11T01:03:46.3902948Z [info]  EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 (SQLQueryTestSuite.scala:495)
2023-01-11T01:03:46.3903881Z [info]  org.scalatest.exceptions.TestFailedException:
2023-01-11T01:03:46.3904492Z [info]  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
2023-01-11T01:03:46.3905449Z [info]  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
2023-01-11T01:03:46.3906493Z [info]  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
2023-01-11T01:03:46.3907683Z [info]  at org.scalatest.Assertions.assertResult(Assertions.scala:847)
2023-01-11T01:03:46.3908243Z [info]  at org.scalatest.Assertions.assertResult$(Assertions.scala:842)
2023-01-11T01:03:46.3908812Z [info]  at org.scalatest.funsuite.AnyFunSuite.assertResult(AnyFunSuite.scala:1564)
2023-01-11T01:03:46.3910011Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runQueries$11(SQLQueryTestSuite.scala:495)
2023-01-11T01:03:46.3910611Z [info]  at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
2023-01-11T01:03:46.3911163Z [info]  at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
2023-01-11T01:03:46.3912094Z [info]  at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
2023-01-11T01:03:46.3912781Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runQueries$9(SQLQueryTestSuite.scala:486)
2023-01-11T01:03:46.3913371Z [info]  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
2023-01-11T01:03:46.3914237Z [info]  at org.scalatest.Assertions.withClue(Assertions.scala:1065)
2023-01-11T01:03:46.3915165Z [info]  at org.scalatest.Assertions.withClue$(Assertions.scala:1052)
2023-01-11T01:03:46.3915725Z [info]  at org.scalatest.funsuite.AnyFunSuite.withClue(AnyFunSuite.scala:1564)
2023-01-11T01:03:46.3916341Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.runQueries(SQLQueryTestSuite.scala:462)
2023-01-11T01:03:46.3917485Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runTest$35(SQLQueryTestSuite.scala:364)
2023-01-11T01:03:46.3918517Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runTest$35$adapted(SQLQueryTestSuite.scala:362)
2023-01-11T01:03:46.3919102Z [info]  at scala.collection.immutable.List.foreach(List.scala:333)
2023-01-11T01:03:46.3919675Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.runTest(SQLQueryTestSuite.scala:362)
2023-01-11T01:03:46.3921754Z [info]  at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$createScalaTestCase$6(SQLQueryTestSuite.scala:269)
2023-01-11T01:03:46.3922358Z [info]  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
2023-01-11T01:03:46.3923784Z [info]  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
2023-01-11T01:03:46.3924473Z [info]  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
2023-01-11T01:03:46.3925286Z [info]  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
2023-01-11T01:03:46.3926199Z [info]  at org.scalatest.Transformer.apply(Transformer.scala:22)
2023-01-11T01:03:46.3927071Z [info]  at org.scalatest.Transformer.apply(Transformer.scala:20)
2023-01-11T01:03:46.3928583Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
2023-01-11T01:03:46.3929225Z [info]  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:207)
2023-01-11T01:03:46.3930091Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
2023-01-11T01:03:46.3933329Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
2023-01-11T01:03:46.3933893Z [info]  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
2023-01-11T01:03:46.3934875Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
2023-01-11T01:03:46.3935479Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
2023-01-11T01:03:46.3936453Z [info]  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:66)
2023-01-11T01:03:46.3937318Z [info]  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
2023-01-11T01:03:46.3940707Z [info]  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
2023-01-11T01:03:46.3941350Z [info]  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:66)
2023-01-11T01:03:46.3941962Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
2023-01-11T01:03:46.3943332Z [info]  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
2023-01-11T01:03:46.3944504Z [info]  at scala.collection.immutable.List.foreach(List.scala:333)
2023-01-11T01:03:46.3950194Z [info]  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
2023-01-11T01:03:46.3950748Z [info]  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
2023-01-11T01:03:46.3951912Z [info]  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
2023-01-11T01:03:46.3952515Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
2023-01-11T01:03:46.3953476Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
2023-01-11T01:03:46.3954069Z [info]  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
2023-01-11T01:03:46.3966445Z [info]  at org.scalatest.Suite.run(Suite.scala:1114)
2023-01-11T01:03:46.3967583Z [info]  at org.scalatest.Suite.run$(Suite.scala:1096)
2023-01-11T01:03:46.3968377Z [info]  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
2023-01-11T01:03:46.3969537Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
2023-01-11T01:03:46.3970510Z [info]  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
2023-01-11T01:03:46.3971298Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
2023-01-11T01:03:46.3972182Z [info]  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
2023-01-11T01:03:46.3973529Z [info]  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:66)
2023-01-11T01:03:46.3974433Z [info]  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
2023-01-11T01:03:46.3977778Z [info]  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2023-01-11T01:03:46.3984781Z [info]  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2023-01-11T01:03:46.3985521Z [info]  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:66)
2023-01-11T01:03:46.3986684Z [info]  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2023-01-11T01:03:46.3987264Z [info]  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
2023-01-11T01:03:46.3987774Z [info]  at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
2023-01-11T01:03:46.3988269Z [info]  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2023-01-11T01:03:46.3989260Z [info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2023-01-11T01:03:46.3996895Z [info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2023-01-11T01:03:46.3997550Z [info]  at java.lang.Thread.run(Thread.java:750)
```

The reason for the failure is that the result of `CatalogFileIndex` printed when using Scala 2.12 is different from Scala 2.13:

When using Scala 2.12, the `hashCode` of `CatalogFileIndex` is `7d811218`

```
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex7d811218, [key, val]
```

When using Scala 2.13, the `hashCode` of `CatalogFileIndex` is `cdfa8472`

```
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndexcdfa8472, [key, val]

```

So this adds a new `replaceAll` action to `SQLQueryTestHelper#replaceNotIncludedMsg` to remove the `hashCode`, making `CatalogFileIndex` print the same result when using Scala 2.12 and 2.13. A rough illustration of the idea follows.
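A rough illustration of the normalization idea in Python (hypothetical; the actual change is a `replaceAll` in the Scala helper):

```python
# Strip the per-JVM object hash appended to names like "CatalogFileIndex7d811218" so that
# golden-file comparisons don't depend on the Scala/JVM version. The regex is illustrative.
import re

def strip_object_hash(plan: str) -> str:
    return re.sub(r"(CatalogFileIndex)@?[0-9a-f]{8}", r"\1", plan)

print(strip_object_hash("...datasources.CatalogFileIndex7d811218, [key, val]"))
# ...datasources.CatalogFileIndex, [key, val]
```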

### Why are the changes needed?
Make the daily build GA `Master Scala 2.13 + Hadoop 3 + JDK 8` run successfully.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual test with this pr:

```
gh pr checkout 39598
dev/change-scala-version.sh 2.13
build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z explain.sql" -Pscala-2.13
build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z explain-aqe.sql" -Pscala-2.13
build/sbt -Phive-thriftserver "hive-thriftserver/testOnly org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite -- -z explain.sql" -Pscala-2.13
build/sbt -Phive-thriftserver "hive-thriftserver/testOnly org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite -- -z explain-aqe.sql" -Pscala-2.13
```

Closes apache#39598 from LuciferYang/SPARK-41708-FOLLOWUP.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
pull bot pushed a commit that referenced this pull request Nov 18, 2023
### What changes were proposed in this pull request?

This PR aims to add `Python 3.10` to Infra docker images.

### Why are the changes needed?

This is a preparation to add a daily `Python 3.10` GitHub Action job later for Apache Spark 4.0.0.

Note that Python 3.10 is installed in the last step to avoid the following issue, which happens when we install Python 3.9 and 3.10 in the same stage via the package manager.
```
#21 13.03 ERROR: Cannot uninstall 'blinker'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
#21 ERROR: process "/bin/sh -c python3.9 -m pip install numpy 'pyarrow>=14.0.0' 'pandas<=2.1.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'" did not complete successfully: exit code: 1
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. I verified that the Python CI is not affected and still uses Python 3.9.5 only.
```
========================================================================
Running PySpark tests
========================================================================
Running PySpark tests. Output is in /__w/spark/spark/python/unit-tests.log
Will test against the following Python executables: ['python3.9']
Will test the following Python modules: ['pyspark-errors']
python3.9 python_implementation is CPython
python3.9 version is: Python 3.9.5
Starting test(python3.9): pyspark.errors.tests.test_errors (temp output: /__w/spark/spark/python/target/fd967f24-3607-4aa6-8190-3f8d7de522e1/python3.9__pyspark.errors.tests.test_errors___zauwgy1.log)
Finished test(python3.9): pyspark.errors.tests.test_errors (0s)
Tests passed in 0 seconds
```

2. Pass `Base Image Build` step for new Python 3.10.

![Screenshot 2023-11-16 at 10 53 37 AM](https://github.com/apache/spark/assets/9700541/6bbb3461-c5f0-4d60-94f6-7cd8df0594ed)

3. Since the new Python 3.10 is not used in CI, we need to validate it like the following.

```
$ docker run -it --rm ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-6895105871 python3.10 --version
Python 3.10.13
```

```
$ docker run -it --rm ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-6895105871 python3.10 -m pip freeze
alembic==1.12.1
annotated-types==0.6.0
blinker==1.7.0
certifi==2019.11.28
chardet==3.0.4
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==2.2.1
contourpy==1.2.0
coverage==7.3.2
cycler==0.12.1
databricks-cli==0.18.0
dbus-python==1.2.16
deepspeed==0.12.3
distro-info==0.23+ubuntu1.1
docker==6.1.3
entrypoints==0.4
et-xmlfile==1.1.0
filelock==3.9.0
Flask==3.0.0
fonttools==4.44.3
gitdb==4.0.11
GitPython==3.1.40
googleapis-common-protos==1.56.4
greenlet==3.0.1
grpcio==1.56.2
grpcio-status==1.48.2
gunicorn==21.2.0
hjson==3.1.0
idna==2.8
importlib-metadata==6.8.0
itsdangerous==2.1.2
Jinja2==3.1.2
joblib==1.3.2
kiwisolver==1.4.5
lxml==4.9.3
Mako==1.3.0
Markdown==3.5.1
MarkupSafe==2.1.3
matplotlib==3.8.1
memory-profiler==0.60.0
mlflow==2.8.1
mpmath==1.3.0
networkx==3.0
ninja==1.11.1.1
numpy==1.26.2
oauthlib==3.2.2
openpyxl==3.1.2
packaging==23.2
pandas==2.1.3
Pillow==10.1.0
plotly==5.18.0
protobuf==3.20.3
psutil==5.9.6
py-cpuinfo==9.0.0
pyarrow==14.0.1
pydantic==2.5.1
pydantic_core==2.14.3
PyGObject==3.36.0
PyJWT==2.8.0
pynvml==11.5.0
pyparsing==3.1.1
python-apt==2.0.1+ubuntu0.20.4.1
python-dateutil==2.8.2
pytz==2023.3.post1
PyYAML==6.0.1
querystring-parser==1.2.4
requests==2.31.0
requests-unixsocket==0.2.0
scikit-learn==1.1.3
scipy==1.11.3
six==1.14.0
smmap==5.0.1
SQLAlchemy==2.0.23
sqlparse==0.4.4
sympy==1.12
tabulate==0.9.0
tenacity==8.2.3
threadpoolctl==3.2.0
torch==2.0.1+cpu
torcheval==0.0.7
torchvision==0.15.2+cpu
tqdm==4.66.1
typing_extensions==4.8.0
tzdata==2023.3
unattended-upgrades==0.1
unittest-xml-reporting==3.2.0
urllib3==2.1.0
websocket-client==1.6.4
Werkzeug==3.0.1
zipp==3.17.0
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43840 from dongjoon-hyun/SPARK-45953.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
pull bot pushed a commit that referenced this pull request Jul 3, 2025
…pressions in `buildAggExprList`

### What changes were proposed in this pull request?

Trim aliases before matching Sort/Having/Filter expressions with a semantically equal expression from the Aggregate below in `buildAggExprList`.

### Why are the changes needed?
For a query like:
```
SELECT course, year, GROUPING(course) FROM courseSales GROUP BY CUBE(course, year) ORDER BY GROUPING(course)
```

Plan after `ResolveReferences` and before `ResolveAggregateFunctions` looks like:

```
!Sort [cast((shiftright(tempresolvedcolumn(spark_grouping_id#18L, spark_grouping_id, false), 1) & 1) as tinyint) AS grouping(course)#22 ASC NULLS FIRST], true
 +- Aggregate [course#19, year#20, spark_grouping_id#18L], [course#19, year#20, cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint) AS grouping(course)#21 AS grouping(course)#15]
....
```
Because the aggregate list has an `Alias(Alias(cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint)))` expression, the expression from `SortOrder` won't get matched as semantically equal, and it will result in adding an unnecessary `Project`. By stripping inner aliases from the aggregate list (which are going to be removed anyway in `CleanupAliases`), we can match the `SortOrder` expression and resolve it as `grouping(course)#15`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51339 from mihailotim-db/mihailotim-db/fix_inner_aliases_semi_structured.

Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>