
Commit 73cab50

liancheng authored and CodingCat committed
[SPARK-9743] [SQL] Fixes JSONRelation refreshing
PR apache#7696 added two `HadoopFsRelation.refresh()` calls ([this][1] and [this][2]) in `DataSourceStrategy` to make the test case `InsertSuite.save directly to the path of a JSON table` pass. However, this forces every `HadoopFsRelation` table scan to do a refresh, which can be super expensive for tables with a large number of partitions.

The reason the original test case fails without the `refresh()` calls is that the old JSON relation builds the base RDD with the input paths, while `HadoopFsRelation` provides `FileStatus`es of leaf files. With the old JSON relation, we could create a temporary table based on a path, write data to that path, and then read the newly written data without refreshing the table. This is no longer true for `HadoopFsRelation`.

This PR removes those two expensive refresh calls and moves the refresh into `JSONRelation` to fix this issue. We might want to update the `HadoopFsRelation` interface to provide better support for this use case.

[1]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L63
[2]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L91

Author: Cheng Lian <[email protected]>

Closes apache#8035 from liancheng/spark-9743/fix-json-relation-refreshing and squashes the following commits:

ec1957d [Cheng Lian] Fixes JSONRelation refreshing
1 parent 16f99c3 commit 73cab50
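
As a rough illustration (not part of this commit) of the scenario the fix targets, the sketch below registers a temporary JSON table on a path, writes directly to that path, and reads the table again without an explicit refresh; with this change, `JSONRelation.buildScan` calls `refresh()` itself before scanning. The Spark setup, table name, path, and data are assumptions chosen for illustration, loosely mirroring the InsertSuite test case.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch, assuming a local Spark 1.5-style environment.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("json-refresh-sketch"))
val sqlContext = new SQLContext(sc)

val path = "/tmp/json-table"  // hypothetical location backing the JSON table

sqlContext.sql(
  s"""
     |CREATE TEMPORARY TABLE jsonTable (a int, b string)
     |USING org.apache.spark.sql.json.DefaultSource
     |OPTIONS (path '$path')
   """.stripMargin)

// Write new data directly to the table's underlying path, bypassing the table.
val rdd = sc.parallelize((1 to 10).map(i => s"""{"a": $i, "b": "str$i"}"""))
sqlContext.read.json(rdd).write.mode("overwrite").format("json").save(path)

// Reading the table now picks up the freshly written files, because
// JSONRelation.buildScan refreshes the cached file listing before scanning.
sqlContext.sql("SELECT a, b FROM jsonTable").show()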

File tree

4 files changed: +21, -12 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 0 additions & 2 deletions

@@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      t.refresh()
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

       logInfo {
@@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      t.refresh()
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 15 additions & 4 deletions

@@ -22,20 +22,22 @@ import java.io.CharArrayWriter
 import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{Text, LongWritable, NullWritable}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+
 import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.util.SerializableConfiguration

 private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {

@@ -108,6 +110,15 @@ private[sql] class JSONRelation(
     jsonSchema
   }

+  override private[sql] def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+    refresh()
+    super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
+  }
+
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion

@@ -555,7 +555,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }

-  private[sql] final def buildScan(
+  private[sql] def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 5 additions & 5 deletions

@@ -32,9 +32,9 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {

   var path: File = null

-  override def beforeAll: Unit = {
+  override def beforeAll(): Unit = {
     path = Utils.createTempDir()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     caseInsensitiveContext.read.json(rdd).registerTempTable("jt")
     sql(
       s"""
@@ -46,7 +46,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
       """.stripMargin)
   }

-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     caseInsensitiveContext.dropTempTable("jsonTable")
     caseInsensitiveContext.dropTempTable("jt")
     Utils.deleteRecursively(path)
@@ -110,7 +110,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )

     // Writing the table to less part files.
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
     caseInsensitiveContext.read.json(rdd1).registerTempTable("jt1")
     sql(
       s"""
@@ -122,7 +122,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )

     // Writing the table to more part files.
-    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
     caseInsensitiveContext.read.json(rdd2).registerTempTable("jt2")
     sql(
       s"""
