From d2b3bc6374cde4f8d4ebeedd7612f51d18f13806 Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Thu, 14 Dec 2017 18:50:23 +0100
Subject: [PATCH 1/4] [SPARK-22938] Assert that SQLConf.get is accessed only on the driver.

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4f77c54a7af57..cb902a24b7183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,6 +27,7 @@ import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
@@ -70,7 +71,10 @@ object SQLConf {
    * Default config. Only used when there is no active SparkSession for the thread.
    * See [[get]] for more information.
    */
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
+    // assert that we're only accessing it on the driver.
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
+
     override def initialValue: SQLConf = new SQLConf
   }
 

From d5ecab95004ceef5b6682252fd4dd5e6f3d516b6 Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Tue, 2 Jan 2018 16:01:49 +0100
Subject: [PATCH 2/4] remarks

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cb902a24b7183..a21a497f478f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -72,8 +73,10 @@ object SQLConf {
    * See [[get]] for more information.
    */
   private lazy val fallbackConf = new ThreadLocal[SQLConf] {
-    // assert that we're only accessing it on the driver.
-    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
+    if (Utils.isTesting && SparkEnv.get != null) {
+      // assert that we're only accessing it on the driver.
+      assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
+    }
 
     override def initialValue: SQLConf = new SQLConf
   }
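After PATCH 2 the guard has the shape it keeps for the rest of the series: the assertion fires only when Utils.isTesting is set, so production code paths never pay for it, and only when SparkEnv.get is non-null, which avoids a NullPointerException in code paths that run before any SparkEnv exists. Below is a dependency-free sketch of that guard; EnvInfo, currentEnv, and isTesting are hypothetical stand-ins for SparkEnv, SparkEnv.get, and the private[spark] Utils.isTesting (which looks at the SPARK_TESTING environment variable and the spark.testing system property), and the "driver" literal stands in for the private[spark] SparkContext.DRIVER_IDENTIFIER.

// Dependency-free sketch of the guard from PATCH 2; every name below is a
// stand-in for the Spark internals named in the lead-in above.
object DriverGuardSketch {
  final case class EnvInfo(executorId: String)  // stand-in for SparkEnv
  @volatile var currentEnv: EnvInfo = null      // stand-in for SparkEnv.get

  // Stand-in for the private[spark] Utils.isTesting.
  private def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  def assertOnDriver(): Unit = {
    // Both conditions matter: isTesting keeps the check out of production
    // runs, and the null check covers code paths with no environment yet.
    if (isTesting && currentEnv != null) {
      assert(currentEnv.executorId == "driver",
        "should only be accessed on the driver")
    }
  }
}

Note that inside the ThreadLocal subclass of PATCH 2, the assertion sits in the anonymous class's constructor, so it runs once, when the lazy val fallbackConf is first initialized, rather than on every access; PATCH 3 below moves it into SQLConf's own constructor so that every instantiation is checked.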
From 43ecd303325d5abaf26a7e2ff0fdf0af479eb353 Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Tue, 2 Jan 2018 16:32:45 +0100
Subject: [PATCH 3/4] try putting it in SQLConf constructor

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a21a497f478f1..e30d056c59c85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -73,11 +73,6 @@ object SQLConf {
    * See [[get]] for more information.
    */
   private lazy val fallbackConf = new ThreadLocal[SQLConf] {
-    if (Utils.isTesting && SparkEnv.get != null) {
-      // assert that we're only accessing it on the driver.
-      assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
-    }
-
     override def initialValue: SQLConf = new SQLConf
   }
 
@@ -1094,6 +1089,11 @@ object SQLConf {
 class SQLConf extends Serializable with Logging {
   import SQLConf._
 
+  if (Utils.isTesting && SparkEnv.get != null) {
+    // assert that we're only accessing it on the driver.
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
+  }
+
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())

From 1ecb766a42bafd481674df4c8c06a23977142d6e Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Tue, 2 Jan 2018 16:55:42 +0100
Subject: [PATCH 4/4] assert msg

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e30d056c59c85..80cdc61484c0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1091,7 +1091,8 @@ class SQLConf extends Serializable with Logging {
 
   if (Utils.isTesting && SparkEnv.get != null) {
     // assert that we're only accessing it on the driver.
-    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER,
+      "SQLConf should only be created and accessed on the driver.")
   }
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
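The end state of the series can be modeled as a small self-contained program: the guard lives in the conf's constructor, so every explicit instantiation is checked, including the one made by the ThreadLocal's initialValue, and the assertion now carries a message. GuardedConf and the executorId variable below are hypothetical stand-ins for SQLConf and SparkEnv.get; this is a sketch of the pattern, not Spark code.

// Self-contained model of the final shape (PATCH 3 plus PATCH 4);
// GuardedConf and executorId are stand-ins for SQLConf and SparkEnv.get.
object ConstructorGuardDemo {
  @volatile var executorId: Option[String] = None  // stand-in for SparkEnv.get

  class GuardedConf extends Serializable {
    // Constructor-level guard: runs for every `new GuardedConf`.
    if (sys.props.contains("spark.testing") && executorId.isDefined) {
      assert(executorId.contains("driver"),
        "GuardedConf should only be created and accessed on the driver.")
    }
  }

  // The ThreadLocal fallback no longer needs its own check: its
  // initialValue goes through the guarded constructor.
  private val fallbackConf = new ThreadLocal[GuardedConf] {
    override def initialValue: GuardedConf = new GuardedConf
  }

  def main(args: Array[String]): Unit = {
    sys.props("spark.testing") = "true"  // the flag Utils.isTesting reacts to

    executorId = Some("driver")
    fallbackConf.get()                   // fine on the driver

    executorId = Some("executor-1")
    try new GuardedConf                  // trips the assertion
    catch { case e: AssertionError => println(s"caught: ${e.getMessage}") }
  }
}

One caveat worth noting: because SQLConf is Serializable, a conf serialized on the driver and deserialized on an executor bypasses the constructor entirely, so this guard catches fresh instantiations and accesses rather than deserialized copies.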