diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 1793bb9e2e..38d5c01c1e 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -60,6 +60,10 @@ make benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark
To run TPC-H or TPC-DS micro benchmarks, please follow the instructions
in the respective source code, e.g., `CometTPCHQueryBenchmark`.
+## Style
+
+You can fix Scala style issues using spotless by running `make format`.
+
## Debugging
Comet is a multi-language project with native code written in Rust and JVM code written in Java and Scala.
It is possible to debug both native and JVM code concurrently as described in the [DEBUGGING guide](DEBUGGING.md)
diff --git a/Makefile b/Makefile
index fe13fbd9b0..51e8e09d68 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,11 @@ bench:
RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
format:
./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
- ./mvnw spotless:apply $(PROFILES)
+ ./mvnw spotless:apply $(PROFILES) -Pspark-3.5
+ ./mvnw spotless:apply $(PROFILES) -Pspark-3.4
+ ./mvnw spotless:apply $(PROFILES) -Pspark-3.3
+ ./mvnw spotless:apply $(PROFILES) -Pspark-3.2
+
core-amd64:
rustup target add x86_64-apple-darwin
diff --git a/pom.xml b/pom.xml
index d7cd0764ec..b7b2387d79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,7 @@ under the License.
common
+ shims
spark
spark-integration
@@ -46,8 +47,8 @@ under the License.
4.7.2
3.2.9
2.0.2
- 3.4.2
- 3.4
+ 3.5.1
+ 3.5
provided
3.19.6
1.13.1
@@ -516,6 +517,19 @@ under the License.
2.12.17
3.4
+ 3.4.2
+ 1.13.1
+ 11
+ ${java.version}
+ ${java.version}
+
+
+
+
+ spark-3.5
+
+ 2.12.17
+ 3.5
1.13.1
11
${java.version}
diff --git a/settings.xml b/settings.xml
new file mode 100644
index 0000000000..af394ae400
--- /dev/null
+++ b/settings.xml
@@ -0,0 +1,5 @@
+
+
+ spark-3.4
+
+
diff --git a/shims/pom.xml b/shims/pom.xml
new file mode 100644
index 0000000000..c859eefd12
--- /dev/null
+++ b/shims/pom.xml
@@ -0,0 +1,128 @@
+
+
+
+
+
+
+ 4.0.0
+
+ org.apache.comet
+ comet-parent-spark${spark.version.short}_${scala.binary.version}
+ 0.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ comet-spark-per-spark-shims
+ comet-spark-per-spark-shims
+ pom
+
+
+
+ false
+
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+
+
+ org.scala-lang
+ scala-library
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+
+ put-client-artifacts-in-a-property
+ pre-integration-test
+
+ build-classpath
+
+
+ true
+ ;
+ comet-artifacts
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+
+ check-jar-contents
+ integration-test
+
+ exec
+
+
+ bash
+ ${project.build.testOutputDirectory}
+ false
+
+ ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh
+ ${comet-artifacts}
+
+
+
+
+
+
+
+
+
+
+ spark-3.2
+
+ pre-spark-3.5
+
+
+
+ spark-3.3
+
+ pre-spark-3.5
+
+
+
+ spark-3.4
+
+ pre-spark-3.5
+
+
+
+ spark-3.5
+
+ spark-3.5
+
+
+
+
diff --git a/shims/pre-spark-3.5/pom.xml b/shims/pre-spark-3.5/pom.xml
new file mode 100644
index 0000000000..3bd925bdf2
--- /dev/null
+++ b/shims/pre-spark-3.5/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+
+
+
+ 4.0.0
+
+ org.apache.comet
+ comet-spark-per-spark-shims
+ 0.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}
+ comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+
+
+ org.scala-lang
+ scala-library
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+
+ put-client-artifacts-in-a-property
+ pre-integration-test
+
+ build-classpath
+
+
+ true
+ ;
+ comet-artifacts
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+
+ check-jar-contents
+ integration-test
+
+ exec
+
+
+ bash
+ ${project.build.testOutputDirectory}
+ false
+
+ ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh
+ ${comet-artifacts}
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+
diff --git a/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala
new file mode 100644
index 0000000000..523c4aa92a
--- /dev/null
+++ b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.shims
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources._
+
/**
 * Partitioning shim for Spark versions prior to 3.5, where the
 * `PartitionedFileUtil` methods take an explicit `filePath` argument.
 *
 * Declared as an `object` (not a `class`) so call sites such as
 * `CometScanExec` can invoke `PartitionShim.getPartitionedFile(...)`
 * statically, matching the spark-3.5 shim of the same name.
 */
object PartitionShim {

  // NOTE(review): `FileStatusWithMetadata` was only introduced in Spark 3.5;
  // for pre-3.5 builds these parameters likely need to be
  // `org.apache.hadoop.fs.FileStatus` instead — confirm against the target
  // Spark version's API before release.

  /**
   * Builds a single `PartitionedFile` for `file` with the given partition
   * values, passing the file's path explicitly as required by the pre-3.5
   * `PartitionedFileUtil.getPartitionedFile` signature.
   */
  def getPartitionedFile(
      file: FileStatusWithMetadata,
      partitionValues: InternalRow): PartitionedFile = {
    // Was `f.getPath`, which does not compile — no `f` is in scope here;
    // the parameter is named `file`.
    PartitionedFileUtil.getPartitionedFile(file, file.getPath, partitionValues)
  }

  /**
   * Splits `file` into one or more `PartitionedFile`s of at most
   * `maxSplitBytes` each (when `isSplitable`), forwarding the file path
   * explicitly as required by the pre-3.5 `PartitionedFileUtil.splitFiles`
   * signature.
   */
  def splitFiles(
      sparkSession: SparkSession,
      file: FileStatusWithMetadata,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] = {
    PartitionedFileUtil.splitFiles(
      sparkSession = sparkSession,
      file = file,
      filePath = file.getPath,
      isSplitable = isSplitable,
      maxSplitBytes = maxSplitBytes,
      partitionValues = partitionValues)
  }
}
diff --git a/shims/spark-3.5/pom.xml b/shims/spark-3.5/pom.xml
new file mode 100644
index 0000000000..3bd925bdf2
--- /dev/null
+++ b/shims/spark-3.5/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+
+
+
+ 4.0.0
+
+ org.apache.comet
+ comet-spark-per-spark-shims
+ 0.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}
+ comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+
+
+ org.scala-lang
+ scala-library
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+
+ put-client-artifacts-in-a-property
+ pre-integration-test
+
+ build-classpath
+
+
+ true
+ ;
+ comet-artifacts
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+
+ check-jar-contents
+ integration-test
+
+ exec
+
+
+ bash
+ ${project.build.testOutputDirectory}
+ false
+
+ ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh
+ ${comet-artifacts}
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+
diff --git a/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala
new file mode 100644
index 0000000000..ed09bccfe0
--- /dev/null
+++ b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.shims
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources._
+
/**
 * Partitioning shim for Spark 3.5+, where `PartitionedFileUtil` derives the
 * file path from the `FileStatusWithMetadata` itself and no longer accepts a
 * separate `filePath` argument.
 */
object PartitionShim {

  /** Wraps a single file, plus its partition values, in a `PartitionedFile`. */
  def getPartitionedFile(
      file: FileStatusWithMetadata,
      partitionValues: InternalRow): PartitionedFile =
    PartitionedFileUtil.getPartitionedFile(file, partitionValues)

  /**
   * Splits `file` into `PartitionedFile`s no larger than `maxSplitBytes`
   * (when `isSplitable`), delegating directly to Spark's
   * `PartitionedFileUtil.splitFiles`.
   */
  def splitFiles(
      sparkSession: SparkSession,
      file: FileStatusWithMetadata,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] =
    PartitionedFileUtil.splitFiles(
      sparkSession = sparkSession,
      file = file,
      isSplitable = isSplitable,
      maxSplitBytes = maxSplitBytes,
      partitionValues = partitionValues)
}
diff --git a/spark/pom.xml b/spark/pom.xml
index 7e54fde060..c5c0c948af 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -50,6 +50,17 @@ under the License.
+
+ org.apache.comet
+ comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}
+ ${project.version}
+
+
+ org.apache.arrow
+ *
+
+
+
org.apache.spark
spark-sql_${scala.binary.version}
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index ac871cf60b..7add7c7267 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -36,8 +36,8 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
import org.apache.spark.util.SerializableConfiguration
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 693af125b9..7d10c6714a 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -36,8 +36,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index 5994dfb41e..b0eeb58751 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -38,7 +38,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, CaseInsensitiveMap, DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, RebaseSpec}
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 4bf01f0f43..75e0ec3211 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.collection._
import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
-import org.apache.comet.shims.{ShimCometScanExec, ShimFileFormat}
+import org.apache.comet.shims.{PartitionShim, ShimCometScanExec, ShimFileFormat}
/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
@@ -267,7 +267,7 @@ case class CometScanExec(
selectedPartitions
.flatMap { p =>
p.files.map { f =>
- PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
+ PartitionShim.getPartitionedFile(f, p.values)
}
}
.groupBy { f =>
@@ -354,10 +354,9 @@ case class CometScanExec(
// SPARK-39634: Allow file splitting in combination with row index generation once
// the fix for PARQUET-2161 is available.
!isNeededForSchema(requiredSchema)
- PartitionedFileUtil.splitFiles(
+ PartitionShim.splitFiles(
sparkSession = relation.sparkSession,
file = file,
- filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values)