From fb3ca68948f0cfc01e115d2ef224d4d1d5d84734 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 21 Apr 2021 22:24:38 -0700 Subject: [PATCH 1/2] [SPARK-35181][CORE] Use zstd for spark.io.compression.codec by default This PR aims to use `zstd` as `spark.io.compression.codec` instead of `lz4` in order to reduce disk I/O and traffic during shuffle processing and worker decommission storage migration (between executors and to external storage). - Since SPARK-29434 and SPARK-29576, Apache Spark 3.0+ uses ZSTD instead of `GZIP` for MapOutputStatus serialization/deserialization. - Since SPARK-34503, Apache Spark 3.2 uses ZSTD for `spark.eventLog.compression.codec` by default. **BEFORE** **AFTER** --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- .../test/scala/org/apache/spark/io/CompressionCodecSuite.scala | 2 +- .../spark/sql/execution/CoalesceShufflePartitionsSuite.scala | 3 ++- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 ++ 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 613a66d2d5ac..b867c8083e17 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1717,7 +1717,7 @@ package object config { "the codec") .version("0.8.0") .stringConf - .createWithDefaultString("lz4") + .createWithDefaultString("zstd") private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE = ConfigBuilder("spark.io.compression.zstd.bufferSize") diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 18520ff96a59..e01e4a03f1d2 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -47,7 +47,7 @@ class CompressionCodecSuite 
extends SparkFunSuite { test("default compression codec") { val codec = CompressionCodec.createCodec(conf) - assert(codec.getClass === classOf[LZ4CompressionCodec]) + assert(codec.getClass === classOf[ZStdCompressionCodec]) testCodec(codec) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index e3688686a187..e4238a126f8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED +import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, IO_ENCRYPTION_ENABLED} import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql._ import org.apache.spark.sql.execution.adaptive._ @@ -66,6 +66,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl .setAppName("test") .set(UI_ENABLED, false) .set(IO_ENCRYPTION_ENABLED, enableIOEncryption) + .set(IO_COMPRESSION_CODEC.key, "lz4") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5") .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 16cc4a79e65f..606b112d7be0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -52,6 +52,8 @@ class AdaptiveQueryExecSuite import 
testImplicits._ + override protected def sparkConf = super.sparkConf.set("spark.io.compression.codec", "lz4") + setupTestData() private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = { From 70cb3362c947861df1670ba29c334cb579c40360 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 27 Jun 2021 10:30:15 -0700 Subject: [PATCH 2/2] Disable bufferPool --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b867c8083e17..bc6464baeb4c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1734,7 +1734,7 @@ package object config { .doc("If true, enable buffer pool of ZSTD JNI library.") .version("3.2.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) private[spark] val IO_COMPRESSION_ZSTD_LEVEL = ConfigBuilder("spark.io.compression.zstd.level")