Skip to content

Commit c60e2bf

Browse files
committed
address comments
1 parent b8b3a3c commit c60e2bf

File tree

4 files changed

+13
-18
lines changed

4 files changed

+13
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
207207
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
208208
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
209209
source = provider, conf = sparkSession.sessionState.conf)
210-
val pathsOption = {
210+
val pathsOption = if (paths.isEmpty) {
211+
None
212+
} else {
211213
val objectMapper = new ObjectMapper()
212-
"path" -> objectMapper.writeValueAsString(paths.toArray)
214+
Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
213215
}
214-
// TODO: remove this option.
215-
val checkFilesExistsOption = "check_files_exist" -> "true"
216-
val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
216+
// TODO SPARK-27113: remove this option.
217+
val checkFilesExistsOpt = "check_files_exist" -> "true"
218+
val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
217219
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
218220
val table = userSpecifiedSchema match {
219221
case Some(schema) => provider.getTable(dsOptions, schema)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
261261
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
262262
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
263263
provider, session.sessionState.conf)
264-
// TODO: remove this option.
264+
// TODO SPARK-27113: remove this option.
265265
val checkFilesExistsOption = "check_files_exist" -> "false"
266266
val options = sessionOptions ++ extraOptions + checkFilesExistsOption
267267
val dsOptions = new CaseInsensitiveStringMap(options.asJava)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2
1818

19-
import java.io.IOException
20-
2119
import com.fasterxml.jackson.databind.ObjectMapper
2220

2321
import org.apache.spark.sql.SparkSession
@@ -43,14 +41,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
4341

4442
protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
4543
val objectMapper = new ObjectMapper()
46-
Option(map.get("path")).map { pathStr =>
47-
try {
48-
val paths = objectMapper.readValue(pathStr, classOf[Array[String]])
49-
paths.toSeq
50-
} catch {
51-
case _: IOException => Seq(pathStr)
52-
}
53-
}.getOrElse {
44+
Option(map.get("paths")).map { pathStr =>
45+
objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
46+
}.orElse(Option(map.get("path")).map(Seq(_))).getOrElse {
5447
throw new IllegalArgumentException("'path' must be given when reading files.")
5548
}
5649
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
308308
eventually(timeout(streamingTimeout)) {
309309
// Write options should not be set.
310310
assert(!LastWriteOptions.options.containsKey(readOptionName))
311-
assert(LastReadOptions.options.get(readOptionName) == "true")
311+
assert(LastReadOptions.options.getBoolean(readOptionName, false))
312312
}
313313
}
314314
}
@@ -319,7 +319,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
319319
eventually(timeout(streamingTimeout)) {
320320
// Read options should not be set.
321321
assert(!LastReadOptions.options.containsKey(writeOptionName))
322-
assert(LastWriteOptions.options.get(writeOptionName) == "true")
322+
assert(LastWriteOptions.options.getBoolean(writeOptionName, false))
323323
}
324324
}
325325
}

0 commit comments

Comments
 (0)