diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 1793bb9e2e..38d5c01c1e 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -60,6 +60,10 @@ make benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark To run TPC-H or TPC-DS micro benchmarks, please follow the instructions in the respective source code, e.g., `CometTPCHQueryBenchmark`. +## Style + +You can fix Scala style issues using spotless by running `make format`. + ## Debugging Comet is a multi-language project with native code written in Rust and JVM code written in Java and Scala. It is possible to debug both native and JVM code concurrently as described in the [DEBUGGING guide](DEBUGGING.md) diff --git a/Makefile b/Makefile index fe13fbd9b0..51e8e09d68 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,11 @@ bench: RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS)) format: ./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES) - ./mvnw spotless:apply $(PROFILES) + ./mvnw spotless:apply $(PROFILES) -Pspark-3.5 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.4 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.3 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.2 + core-amd64: rustup target add x86_64-apple-darwin diff --git a/pom.xml b/pom.xml index d7cd0764ec..b7b2387d79 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ under the License. common + shims spark spark-integration @@ -46,8 +47,8 @@ under the License. 4.7.2 3.2.9 2.0.2 - 3.4.2 - 3.4 + 3.5.1 + 3.5 provided 3.19.6 1.13.1 @@ -516,6 +517,19 @@ under the License. 
2.12.17 3.4 + 3.4.2 + 1.13.1 + 11 + ${java.version} + ${java.version} + + + + + spark-3.5 + + 2.12.17 + 3.5 1.13.1 11 ${java.version} diff --git a/settings.xml b/settings.xml new file mode 100644 index 0000000000..af394ae400 --- /dev/null +++ b/settings.xml @@ -0,0 +1,5 @@ + + + spark-3.4 + + diff --git a/shims/pom.xml b/shims/pom.xml new file mode 100644 index 0000000000..c859eefd12 --- /dev/null +++ b/shims/pom.xml @@ -0,0 +1,128 @@ + + + + + + + 4.0.0 + + org.apache.comet + comet-parent-spark${spark.version.short}_${scala.binary.version} + 0.1.0-SNAPSHOT + ../pom.xml + + + comet-spark-per-spark-shims + comet-spark-per-spark-shims + pom + + + + false + + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.scala-lang + scala-library + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + put-client-artifacts-in-a-property + pre-integration-test + + build-classpath + + + true + ; + comet-artifacts + + + + + + org.codehaus.mojo + exec-maven-plugin + + + check-jar-contents + integration-test + + exec + + + bash + ${project.build.testOutputDirectory} + false + + ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh + ${comet-artifacts} + + + + + + + + + + + spark-3.2 + + pre-spark-3.5 + + + + spark-3.3 + + pre-spark-3.5 + + + + spark-3.4 + + pre-spark-3.5 + + + + spark-3.5 + + spark-3.5 + + + + diff --git a/shims/pre-spark-3.5/pom.xml b/shims/pre-spark-3.5/pom.xml new file mode 100644 index 0000000000..3bd925bdf2 --- /dev/null +++ b/shims/pre-spark-3.5/pom.xml @@ -0,0 +1,99 @@ + + + + + + + 4.0.0 + + org.apache.comet + comet-spark-per-spark-shims + 0.1.0-SNAPSHOT + ../pom.xml + + + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.scala-lang + scala-library + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + 
put-client-artifacts-in-a-property + pre-integration-test + + build-classpath + + + true + ; + comet-artifacts + + + + + + org.codehaus.mojo + exec-maven-plugin + + + check-jar-contents + integration-test + + exec + + + bash + ${project.build.testOutputDirectory} + false + + ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh + ${comet-artifacts} + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + diff --git a/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala new file mode 100644 index 0000000000..523c4aa92a --- /dev/null +++ b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ + +object PartitionShim { + + def getPartitionedFile( + file: org.apache.hadoop.fs.FileStatus, + partitionValues: InternalRow): PartitionedFile = { + PartitionedFileUtil.getPartitionedFile(file, file.getPath, partitionValues) + } + + def splitFiles( + sparkSession: SparkSession, + file: org.apache.hadoop.fs.FileStatus, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + filePath = file.getPath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partitionValues) + } +} diff --git a/shims/spark-3.5/pom.xml b/shims/spark-3.5/pom.xml new file mode 100644 index 0000000000..3bd925bdf2 --- /dev/null +++ b/shims/spark-3.5/pom.xml @@ -0,0 +1,99 @@ + + + + + + + 4.0.0 + + org.apache.comet + comet-spark-per-spark-shims + 0.1.0-SNAPSHOT + ../pom.xml + + + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.scala-lang + scala-library + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + put-client-artifacts-in-a-property + pre-integration-test + + build-classpath + + + true + ; + comet-artifacts + + + + + + org.codehaus.mojo + exec-maven-plugin + + + check-jar-contents + integration-test + + exec + + + bash + ${project.build.testOutputDirectory} + false + + ${project.basedir}/../dev/ensure-jars-have-correct-contents.sh + ${comet-artifacts} + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + diff --git a/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala
b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala new file mode 100644 index 0000000000..ed09bccfe0 --- /dev/null +++ b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ + +object PartitionShim { + + def getPartitionedFile( + file: FileStatusWithMetadata, + partitionValues: InternalRow): PartitionedFile = { + PartitionedFileUtil.getPartitionedFile(file, partitionValues) + } + + def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partitionValues) + } +} diff --git a/spark/pom.xml b/spark/pom.xml index 7e54fde060..c5c0c948af 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -50,6 +50,17 @@ under the License. + + org.apache.comet + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + ${project.version} + + + org.apache.arrow + * + + + org.apache.spark spark-sql_${scala.binary.version} diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index ac871cf60b..7add7c7267 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -36,8 +36,8 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import 
org.apache.spark.sql.internal._ +import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{DateType, StructType, TimestampType} import org.apache.spark.util.SerializableConfiguration diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala index 693af125b9..7d10c6714a 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala @@ -36,8 +36,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal._ +import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala index 5994dfb41e..b0eeb58751 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala @@ -38,7 +38,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, CaseInsensitiveMap, DateTimeUtils, IntervalUtils} import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, 
rebaseGregorianToJulianMicros, RebaseSpec} -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal._ +import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.sources import org.apache.spark.unsafe.types.UTF8String diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 4bf01f0f43..75e0ec3211 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} -import org.apache.comet.shims.{ShimCometScanExec, ShimFileFormat} +import org.apache.comet.shims.{PartitionShim, ShimCometScanExec, ShimFileFormat} /** * Comet physical scan node for DataSource V1. Most of the code here follow Spark's @@ -267,7 +267,7 @@ case class CometScanExec( selectedPartitions .flatMap { p => p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + PartitionShim.getPartitionedFile(f, p.values) } } .groupBy { f => @@ -354,10 +354,9 @@ case class CometScanExec( // SPARK-39634: Allow file splitting in combination with row index generation once // the fix for PARQUET-2161 is available. !isNeededForSchema(requiredSchema) - PartitionedFileUtil.splitFiles( + PartitionShim.splitFiles( sparkSession = relation.sparkSession, file = file, - filePath = filePath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, partitionValues = partition.values)