Skip to content

Commit e477a8e

Browse files
committed
[SPARK-25143][SQL] Support data source name mapping configuration
1 parent 162326c commit e477a8e

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,13 @@ object DataSource extends Logging {
609609

610610
/** Given a provider name, look up the data source class definition. */
611611
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
612-
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
612+
val customBackwardCompatibilityMap =
613+
conf.getAllConfs
614+
.filter(_._1.startsWith("spark.sql.datasource.map"))
615+
.map{ case (k, v) => (k.replaceFirst("^spark.sql.datasource.map.", ""), v) }
616+
val compatibilityMap = backwardCompatibilityMap ++ customBackwardCompatibilityMap
617+
618+
val provider1 = compatibilityMap.getOrElse(provider, provider) match {
613619
case name if name.equalsIgnoreCase("orc") &&
614620
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
615621
classOf[OrcFileFormat].getCanonicalName

sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,28 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
8282
}
8383
assert(error.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
8484
}
85+
86+
test("support custom mapping for data source names") {
87+
val csv = classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat]
88+
89+
// Map a new data source name to a built-in data source
90+
withSQLConf("spark.sql.datasource.map.myDatasource" -> csv.getCanonicalName) {
91+
assert(getProvidingClass("myDatasource") === csv)
92+
}
93+
94+
// Map a existing built-in data source name to new data source
95+
val testDataSource = classOf[TestDataSource]
96+
withSQLConf(
97+
"spark.sql.datasource.map.org.apache.spark.sql.avro" -> testDataSource.getCanonicalName,
98+
"spark.sql.datasource.map.com.databricks.spark.csv" -> testDataSource.getCanonicalName,
99+
"spark.sql.datasource.map.com.databricks.spark.avro" -> testDataSource.getCanonicalName) {
100+
assert(getProvidingClass("org.apache.spark.sql.avro") === testDataSource)
101+
assert(getProvidingClass("com.databricks.spark.csv") === testDataSource)
102+
assert(getProvidingClass("com.databricks.spark.avro") === testDataSource)
103+
}
104+
}
105+
}
106+
107+
/**
 * Minimal [[DataSourceRegister]] implementation used by the custom-mapping test above to
 * stand in for an external data source that built-in names can be remapped to.
 */
class TestDataSource extends DataSourceRegister {
  // Short alias under which this source would normally be looked up.
  override def shortName(): String = "test"
}

0 commit comments

Comments
 (0)