
Commit 92bfd9a

gengliangwang authored and cloud-fan committed
[SPARK-28757][SQL] File table location should include both values of option path and paths
### What changes were proposed in this pull request?

If both options `path` and `paths` are passed to file data source v2, both values of the options should be included as the target paths.

### Why are the changes needed?

In the V1 implementation, the file table location includes both values of the options `path` and `paths`. In the refactoring of #24025, the value of option `path` is ignored if `paths` is specified. We should make it consistent with V1.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit test

Closes #25473 from gengliangwang/fixPathOption.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent c48e381 commit 92bfd9a
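
For context, a minimal usage sketch of the behavior this change restores. The directory names below are hypothetical placeholders; the point is that the location given through the `path` option is kept alongside the varargs locations, matching V1:

```scala
// Hypothetical directories. The varargs to .parquet(...) are passed on as
// the JSON-encoded `paths` option; with this fix the `path` option is
// appended rather than dropped, so the scan covers all three directories.
val df = spark.read
  .option("path", "/data/a")
  .parquet("/data/b", "/data/c")
```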

File tree

2 files changed: +27 −4 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

Lines changed: 3 additions & 4 deletions

@@ -43,11 +43,10 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
 
   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
-    Option(map.get("paths")).map { pathStr =>
+    val paths = Option(map.get("paths")).map { pathStr =>
       objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
-    }.getOrElse {
-      Option(map.get("path")).toSeq
-    }
+    }.getOrElse(Seq.empty)
+    paths ++ Option(map.get("path")).toSeq
   }
 
   protected def getTableName(paths: Seq[String]): String = {
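
To make the merged lookup concrete, here is a standalone sketch of the post-fix logic (a hypothetical `getPathsSketch` over a plain Scala `Map`, standing in for Spark's `CaseInsensitiveStringMap`; the JSON decoding mirrors the `ObjectMapper` call above):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// Values from the JSON-encoded "paths" option come first; the single
// "path" option, if present, is appended instead of being discarded.
def getPathsSketch(options: Map[String, String]): Seq[String] = {
  val objectMapper = new ObjectMapper()
  val paths = options.get("paths").map { pathStr =>
    objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
  }.getOrElse(Seq.empty)
  paths ++ options.get("path").toSeq
}

// getPathsSketch(Map("path" -> "/data/a", "paths" -> """["/data/b","/data/c"]"""))
// == Seq("/data/b", "/data/c", "/data/a")
```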

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 24 additions & 0 deletions

@@ -29,6 +29,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -707,6 +710,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("File table location should include both values of option `path` and `paths`") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withTempPaths(3) { paths =>
+        paths.zipWithIndex.foreach { case (path, index) =>
+          Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)
+        }
+        val df = spark
+          .read
+          .option("path", paths.head.getCanonicalPath)
+          .parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
+        df.queryExecution.optimizedPlan match {
+          case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) =>
+            assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
+          case _ =>
+            throw new AnalysisException("Can not match ParquetTable in the query.")
+        }
+        checkAnswer(df, Seq(0, 1, 2).map(Row(_)))
+      }
+    }
+  }
 }
 
 object TestingUDT {
