Skip to content

Commit 8e85393

Browse files
xiaonanyang-db authored and
cloud-fan committed
[SPARK-40667][SQL] Refactor File Data Source Options
### What changes were proposed in this pull request? Code refactor on all File data source options: - `TextOptions` - `CSVOptions` - `JSONOptions` - `AvroOptions` - `ParquetOptions` - `OrcOptions` - `FileIndex` related options Change semantics: - First, we introduce a new trait `DataSourceOptions`, which defines the following functions: - `newOption(name)`: Register a new option - `newOption(name, alternative)`: Register a new option with alternative - `getAllValidOptions`: retrieve all valid options - `isValidOption(name)`: validate a given option name - `getAlternativeOption(name)`: get alternative option name if any - Then, for each class above - Create/update its companion object to extend from the trait above and register all valid options within it. - Update places where name strings are used directly to fetch option values to use those option constants instead. - Add a unit test for each file data source options ### Why are the changes needed? Currently for each file data source, all options are placed sparsely in the options class and there is no clear list of all options supported. As more and more options are added, the readability gets worse. Thus, we want to refactor those codes so that - we can easily get a list of supported options for each data source - enforce better practice for adding new options going forward. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes apache#38113 from xiaonanyang-db/SPARK-40667. Authored-by: xiaonanyang-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 9ddd734 commit 8e85393

File tree

19 files changed

+461
-138
lines changed

19 files changed

+461
-138
lines changed

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.SparkSession
28-
import org.apache.spark.sql.catalyst.FileSourceOptions
28+
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
2929
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
3030
import org.apache.spark.sql.internal.SQLConf
3131

@@ -37,6 +37,8 @@ private[sql] class AvroOptions(
3737
@transient val conf: Configuration)
3838
extends FileSourceOptions(parameters) with Logging {
3939

40+
import AvroOptions._
41+
4042
def this(parameters: Map[String, String], conf: Configuration) = {
4143
this(CaseInsensitiveMap(parameters), conf)
4244
}
@@ -54,8 +56,8 @@ private[sql] class AvroOptions(
5456
* instead of "string" type in the default converted schema.
5557
*/
5658
val schema: Option[Schema] = {
57-
parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
58-
val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => {
59+
parameters.get(AVRO_SCHEMA).map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
60+
val avroUrlSchema = parameters.get(AVRO_SCHEMA_URL).map(url => {
5961
log.debug("loading avro schema from url: " + url)
6062
val fs = FileSystem.get(new URI(url), conf)
6163
val in = fs.open(new Path(url))
@@ -75,20 +77,20 @@ private[sql] class AvroOptions(
7577
* whose field names do not match. Defaults to false.
7678
*/
7779
val positionalFieldMatching: Boolean =
78-
parameters.get("positionalFieldMatching").exists(_.toBoolean)
80+
parameters.get(POSITIONAL_FIELD_MATCHING).exists(_.toBoolean)
7981

8082
/**
8183
* Top level record name in write result, which is required in Avro spec.
8284
* See https://avro.apache.org/docs/1.11.1/specification/#schema-record .
8385
* Default value is "topLevelRecord"
8486
*/
85-
val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
87+
val recordName: String = parameters.getOrElse(RECORD_NAME, "topLevelRecord")
8688

8789
/**
8890
* Record namespace in write result. Default value is "".
8991
* See Avro spec for details: https://avro.apache.org/docs/1.11.1/specification/#schema-record .
9092
*/
91-
val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
93+
val recordNamespace: String = parameters.getOrElse(RECORD_NAMESPACE, "")
9294

9395
/**
9496
* The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
@@ -104,7 +106,7 @@ private[sql] class AvroOptions(
104106
ignoreFilesWithoutExtensionByDefault)
105107

106108
parameters
107-
.get(AvroOptions.ignoreExtensionKey)
109+
.get(IGNORE_EXTENSION)
108110
.map(_.toBoolean)
109111
.getOrElse(!ignoreFilesWithoutExtension)
110112
}
@@ -116,21 +118,21 @@ private[sql] class AvroOptions(
116118
* taken into account. If the former one is not set too, the `snappy` codec is used by default.
117119
*/
118120
val compression: String = {
119-
parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
121+
parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
120122
}
121123

122124
val parseMode: ParseMode =
123-
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
125+
parameters.get(MODE).map(ParseMode.fromString).getOrElse(FailFastMode)
124126

125127
/**
126128
* The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
127129
*/
128130
val datetimeRebaseModeInRead: String = parameters
129-
.get(AvroOptions.DATETIME_REBASE_MODE)
131+
.get(DATETIME_REBASE_MODE)
130132
.getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
131133
}
132134

133-
private[sql] object AvroOptions {
135+
private[sql] object AvroOptions extends DataSourceOptions {
134136
def apply(parameters: Map[String, String]): AvroOptions = {
135137
val hadoopConf = SparkSession
136138
.getActiveSession
@@ -139,11 +141,17 @@ private[sql] object AvroOptions {
139141
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
140142
}
141143

142-
val ignoreExtensionKey = "ignoreExtension"
143-
144+
val IGNORE_EXTENSION = newOption("ignoreExtension")
145+
val MODE = newOption("mode")
146+
val RECORD_NAME = newOption("recordName")
147+
val COMPRESSION = newOption("compression")
148+
val AVRO_SCHEMA = newOption("avroSchema")
149+
val AVRO_SCHEMA_URL = newOption("avroSchemaUrl")
150+
val RECORD_NAMESPACE = newOption("recordNamespace")
151+
val POSITIONAL_FIELD_MATCHING = newOption("positionalFieldMatching")
144152
// The option controls rebasing of the DATE and TIMESTAMP values between
145153
// Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Avro
146154
// datasource similarly to the SQL config `spark.sql.avro.datetimeRebaseModeInRead`,
147155
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
148-
val DATETIME_REBASE_MODE = "datetimeRebaseMode"
156+
val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
149157
}

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.Job
3434
import org.apache.spark.SparkException
3535
import org.apache.spark.internal.Logging
3636
import org.apache.spark.sql.SparkSession
37-
import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
37+
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
3838
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
3939
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
4040
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
@@ -50,8 +50,8 @@ private[sql] object AvroUtils extends Logging {
5050
val conf = spark.sessionState.newHadoopConfWithOptions(options)
5151
val parsedOptions = new AvroOptions(options, conf)
5252

53-
if (parsedOptions.parameters.contains(ignoreExtensionKey)) {
54-
logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " +
53+
if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) {
54+
logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " +
5555
"general data source option pathGlobFilter for filtering file names.")
5656
}
5757
// User can specify an optional avro json schema.

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,13 +1804,13 @@ abstract class AvroSuite
18041804
spark
18051805
.read
18061806
.format("avro")
1807-
.option(AvroOptions.ignoreExtensionKey, false)
1807+
.option(AvroOptions.IGNORE_EXTENSION, false)
18081808
.load(dir.getCanonicalPath)
18091809
.count()
18101810
}
18111811
val deprecatedEvents = logAppender.loggingEvents
18121812
.filter(_.getMessage.getFormattedMessage.contains(
1813-
s"Option ${AvroOptions.ignoreExtensionKey} is deprecated"))
1813+
s"Option ${AvroOptions.IGNORE_EXTENSION} is deprecated"))
18141814
assert(deprecatedEvents.size === 1)
18151815
}
18161816
}
@@ -2272,6 +2272,20 @@ abstract class AvroSuite
22722272
checkAnswer(df2, df.collect().toSeq)
22732273
}
22742274
}
2275+
2276+
test("SPARK-40667: validate Avro Options") {
2277+
assert(AvroOptions.getAllOptions.size == 9)
2278+
// Please add validation on any new Avro options here
2279+
assert(AvroOptions.isValidOption("ignoreExtension"))
2280+
assert(AvroOptions.isValidOption("mode"))
2281+
assert(AvroOptions.isValidOption("recordName"))
2282+
assert(AvroOptions.isValidOption("compression"))
2283+
assert(AvroOptions.isValidOption("avroSchema"))
2284+
assert(AvroOptions.isValidOption("avroSchemaUrl"))
2285+
assert(AvroOptions.isValidOption("recordNamespace"))
2286+
assert(AvroOptions.isValidOption("positionalFieldMatching"))
2287+
assert(AvroOptions.isValidOption("datetimeRebaseMode"))
2288+
}
22752289
}
22762290

22772291
class AvroV1Suite extends AvroSuite {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

/**
 * Interface that a data source's companion object mixes in to:
 *  - register a new option name, optionally with an alternative name
 *  - retrieve all registered option names
 *  - validate a given option name
 *  - look up the alternative name of an option, if any
 */
trait DataSourceOptions {
  // Maps each registered option name to its alternative name, if one exists.
  // When an alternative is registered, both names are inserted, each pointing
  // at the other, so that either spelling validates and resolves.
  private val validOptions = collection.mutable.Map[String, Option[String]]()

  /**
   * Register a new option.
   * @param name Option name
   * @return the registered option name, so it can be assigned to a constant
   */
  protected def newOption(name: String): String = {
    validOptions += (name -> None)
    name
  }

  /**
   * Register a new option together with an alternative name. Both names become
   * valid options, each recorded as the other's alternative.
   * @param name Option name
   * @param alternative Alternative option name
   * @return the primary option name, for consistency with `newOption(name)`
   */
  protected def newOption(name: String, alternative: String): String = {
    // Register both of the options
    validOptions += (name -> Some(alternative))
    validOptions += (alternative -> Some(name))
    name
  }

  /**
   * @return All data source options and their alternatives if any
   */
  def getAllOptions: scala.collection.Set[String] = validOptions.keySet

  /**
   * @param name Option name to be validated
   * @return if the given Option name is valid
   */
  def isValidOption(name: String): Boolean = validOptions.contains(name)

  /**
   * @param name Option name
   * @return Alternative option name if any
   */
  def getAlternativeOption(name: String): Option[String] = validOptions.get(name).flatten
}

0 commit comments

Comments
 (0)