@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import scala.collection.immutable

private[spark] object CatalystConf {
  val CASE_SENSITIVE = "spark.sql.caseSensitive"
}

private[spark] trait CatalystConf {
Reviewer comment (Contributor):
I think the catalyst conf can be as simple as a set of abstract methods that need to be implemented by some concrete conf.

trait CatalystConf {
  def caseSensitiveAnalysis: Boolean
}

We can have a trivial one for testing, e.g. case class SimpleConf(caseSensitiveAnalysis: Boolean) extends CatalystConf, and SQLConf can mix this trait in as you are already doing.
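A minimal sketch of that proposed shape (illustration only, using the reviewer's names; this is not the code in this diff):

trait CatalystConf {
  def caseSensitiveAnalysis: Boolean
}

case class SimpleConf(caseSensitiveAnalysis: Boolean) extends CatalystConf

// SQLConf would mix the trait in and derive the flag from its settings map,
// assuming a spark.sql.caseSensitive entry as used elsewhere in this patch.
class SQLConf extends CatalystConf {
  private val settings = scala.collection.mutable.Map[String, String]()
  def setConf(key: String, value: String): Unit = settings.put(key, value)
  def caseSensitiveAnalysis: Boolean =
    settings.getOrElse("spark.sql.caseSensitive", "true").toBoolean
}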

Reviewer comment (Contributor):
Also, most interfaces in catalyst are not private since this package is not part of our standard compatibility guarantees.

  def setConf(key: String, value: String): Unit
  def getConf(key: String): String
  def getConf(key: String, defaultValue: String): String
  def getAllConfs: immutable.Map[String, String]
}

/**
* A trivial conf that is empty. Used for testing when all
* relations are already filled in and the analyser needs only to resolve attribute references.
*/
object EmptyConf extends CatalystConf {
  def setConf(key: String, value: String): Unit = {
    throw new UnsupportedOperationException
  }

  def getConf(key: String): String = {
    throw new UnsupportedOperationException
  }

  def getConf(key: String, defaultValue: String): String = {
    throw new UnsupportedOperationException
  }

  def getAllConfs: immutable.Map[String, String] = {
    throw new UnsupportedOperationException
  }
}
@@ -19,18 +19,19 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyser needs only to resolve attribute
* references.
*/
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleConf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -39,11 +40,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
*/
class Analyzer(catalog: Catalog,
Reviewer comment (Contributor):
Existing: this indentation is wrong.

registry: FunctionRegistry,
caseSensitive: Boolean,
conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
def resolver: Resolver = {
if (conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}
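For context, Resolver is just a name-equality function, so this conf lookup only switches between exact and case-insensitive string comparison. Roughly (a sketch of the definitions in the analysis package object, not part of this diff):

type Resolver = (String, String) => Boolean
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)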

val fixedPoint = FixedPoint(maxIterations)

@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf

/**
* Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
@@ -32,7 +34,7 @@ class NoSuchTableException extends Exception
*/
trait Catalog {

def caseSensitive: Boolean
val conf: CatalystConf

def tableExists(tableIdentifier: Seq[String]): Boolean

@@ -55,7 +57,7 @@ trait Catalog {
def unregisterAllTables(): Unit

protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (!caseSensitive) {
if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
tableIdentifier.map(_.toLowerCase)
} else {
tableIdentifier
@@ -76,7 +78,7 @@ trait Catalog {
}
}
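Side note on processTableIdentifier above: with case sensitivity disabled, every part of a table identifier is lower-cased before lookup. A tiny standalone illustration (it mirrors the logic rather than calling the protected method):

val caseSensitive = false
val tableIdentifier = Seq("MyDb", "MyTable")
val normalized =
  if (!caseSensitive) tableIdentifier.map(_.toLowerCase) else tableIdentifier
// normalized == Seq("mydb", "mytable")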

class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()

override def registerTable(
@@ -162,7 +164,7 @@ trait OverrideCatalog extends Catalog {
}

abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
val dbName = if (!caseSensitive) {
val dbName = if (!conf.getConf(CatalystConf.CASE_SENSITIVE).toBoolean) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {
databaseName
@@ -205,7 +207,7 @@ trait OverrideCatalog extends Catalog {
*/
object EmptyCatalog extends Catalog {

val caseSensitive: Boolean = true
override val conf: CatalystConf = EmptyConf

def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.test

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.mutable

/** A CatalystConf that can be used for local testing. */
class SimpleConf extends CatalystConf {
  val map = mutable.Map[String, String]()

  def setConf(key: String, value: String): Unit = {
    map.put(key, value)
  }

  def getConf(key: String): String = {
    map(key)
  }

  def getConf(key: String, defaultValue: String): String = {
    map.getOrElse(key, defaultValue)
  }

  def getAllConfs: immutable.Map[String, String] = {
    map.toMap
  }
}
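A quick usage sketch (hypothetical test code, relying only on the CatalystConf.CASE_SENSITIVE key defined above):

val conf = new SimpleConf
conf.setConf(CatalystConf.CASE_SENSITIVE, "false")
assert(conf.getConf(CatalystConf.CASE_SENSITIVE, "true") == "false")
assert(conf.getAllConfs == Map(CatalystConf.CASE_SENSITIVE -> "false"))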
@@ -23,17 +23,22 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseSensitiveCatalog = new SimpleCatalog(true)
val caseInsensitiveCatalog = new SimpleCatalog(false)
val caseSensitiveConf = new SimpleConf()
caseSensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "true")
val caseInsensitiveConf = new SimpleConf()
caseInsensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "false")
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
val caseSensitiveAnalyze =
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf)
val caseInsensitiveAnalyze =
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf)

val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val testRelation2 = LocalRelation(
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.scalatest.{BeforeAndAfter, FunSuite}

class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val catalog = new SimpleCatalog(false)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
val conf = new SimpleConf
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
6 changes: 4 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.JavaConversions._

@@ -69,7 +71,8 @@ private[spark] object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] class SQLConf extends Serializable {

private[sql] class SQLConf extends Serializable with CatalystConf {
import SQLConf._

/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -220,4 +223,3 @@ private[sql] class SQLConf extends Serializable {
settings.clear()
}
}

4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -104,14 +104,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs

@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

@transient
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)

@transient
protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = true) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
ExtractPythonUdfs ::
sources.PreWriteCheck(catalog) ::
11 changes: 10 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}

@@ -1049,4 +1049,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
rdd.toDF().registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}

test("SPARK-4699 case sensitivity SQL query") {
setConf(CatalystConf.CASE_SENSITIVE, "false")
val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("testTable1")
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
setConf(CatalystConf.CASE_SENSITIVE, "true")
}
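One suggestion on the test above: if checkAnswer throws, the conf is left case-insensitive for any test that runs afterwards. A hedged sketch of the same test with the restore guarded by try/finally (assuming the suite's existing setConf, checkAnswer, and TestData helpers):

test("SPARK-4699 case sensitivity SQL query") {
  setConf(CatalystConf.CASE_SENSITIVE, "false")
  try {
    val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
    sparkContext.parallelize(data).toDF().registerTempTable("testTable1")
    checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
  } finally {
    setConf(CatalystConf.CASE_SENSITIVE, "true")
  }
}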
}
@@ -18,22 +18,13 @@
package org.apache.spark.sql.sources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfter

abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
// Case sensitivity is not configurable yet, but we want to test some edge cases.
// TODO: Remove when it is configurable
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
@transient
override protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedResolutionRules =
PreWriteCheck(catalog) ::
PreInsertCastAndRename ::
Nil
}
}
}
// We want to test some edge cases.
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)

caseInsensisitiveContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
}
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.types._

/**
@@ -53,6 +54,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}

/* By default it should be case insensitive to match Hive */
conf.setConf(CatalystConf.CASE_SENSITIVE, "false")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -249,7 +253,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
override protected[sql] lazy val catalog =
new HiveMetastoreCatalog(this, conf) with OverrideCatalog

// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
Expand All @@ -261,7 +266,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.CreateTables ::
@@ -41,12 +41,14 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils

/* Implicit conversions */
import scala.collection.JavaConversions._

private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
private[hive] class HiveMetastoreCatalog(hive: HiveContext, val conf: CatalystConf)
extends Catalog with Logging {
import org.apache.spark.sql.hive.HiveMetastoreTypes._

/** Connection to hive metastore. Usages should lock on `this`. */
@@ -337,6 +337,12 @@ class SQLQuerySuite extends QueryTest {
}
}

test("SPARK-4699 HiveContext should be case insensitive by default") {
checkAnswer(
sql("SELECT KEY FROM Src ORDER BY value"),
sql("SELECT key FROM src ORDER BY value").collect().toSeq)
}

test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
val schema = StructType(
StructField("s",