
Commit 74e09d6

Merge branch 'SPARK-20865' into 'spark_2.1'
[SPARK-20865] Structured streaming DataFrame cache/unpersist throws an error. Calling cache on a structured streaming Dataset currently fails; operations such as cache and unpersist should instead be ignored, with a warning logged. resolve apache#74 See merge request !65
2 parents 782a22e + 5938d76 commit 74e09d6
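The merge makes cache, persist, and unpersist (and checkpoint) warn-and-skip instead of failing on streaming Datasets. A minimal sketch of the affected call pattern, assuming a local Spark 2.1 session and a hypothetical socket source on localhost:9999 (these names are illustrative, not part of the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object StreamingCacheNoOp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming-cache-noop")
      .getOrCreate()

    // isStreaming is true for any Dataset built from readStream.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Per the commit message these calls previously failed on a streaming Dataset;
    // with this merge they only log, e.g. "Persist is no-op in queries with streaming sources."
    lines.persist(StorageLevel.MEMORY_ONLY)
    lines.cache()
    lines.unpersist(blocking = false)

    spark.stop()
  }
}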


2 files changed (+55, -30 lines)


sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 45 additions & 27 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
@@ -156,7 +157,7 @@ class Dataset[T] private[sql](
     @transient val sparkSession: SparkSession,
     @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
     encoder: Encoder[T])
-  extends Serializable {
+  extends Serializable with Logging {
 
   queryExecution.assertAnalyzed()
 
@@ -503,35 +504,40 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def checkpoint(eager: Boolean): Dataset[T] = {
-    val internalRdd = queryExecution.toRdd.map(_.copy())
-    internalRdd.checkpoint()
+    if (isStreaming) {
+      logWarning("Checkpoint is no-op in queries with streaming sources.")
+      this
+    } else {
+      val internalRdd = queryExecution.toRdd.map(_.copy())
+      internalRdd.checkpoint()
 
-    if (eager) {
-      internalRdd.count()
-    }
+      if (eager) {
+        internalRdd.count()
+      }
 
-    val physicalPlan = queryExecution.executedPlan
+      val physicalPlan = queryExecution.executedPlan
 
-    // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
-    // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
-    // joins.
-    def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
-      partitioning match {
-        case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
-        case p => p
+      // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
+      // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
+      // joins.
+      def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
+        partitioning match {
+          case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
+          case p => p
+        }
       }
-    }
 
-    val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
+      val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
 
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(
-        logicalPlan.output,
-        internalRdd,
-        outputPartitioning,
-        physicalPlan.outputOrdering
-      )(sparkSession)).as[T]
+      Dataset.ofRows(
+        sparkSession,
+        LogicalRDD(
+          logicalPlan.output,
+          internalRdd,
+          outputPartitioning,
+          physicalPlan.outputOrdering
+        )(sparkSession)).as[T]
+    }
   }
 
   /**
@@ -2493,7 +2499,11 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(): this.type = {
-    sparkSession.sharedState.cacheManager.cacheQuery(this)
+    if (isStreaming) {
+      logWarning("Persist is no-op in queries with streaming sources.")
+    } else {
+      sparkSession.sharedState.cacheManager.cacheQuery(this)
+    }
     this
   }
 
@@ -2515,7 +2525,11 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(newLevel: StorageLevel): this.type = {
-    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    if (isStreaming) {
+      logWarning("Persist is no-op in queries with streaming sources.")
+    } else {
+      sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    }
     this
   }
 
@@ -2540,7 +2554,11 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+    if (isStreaming) {
+      logWarning("Unpersist is no-op in queries with streaming sources.")
+    } else {
+      sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+    }
     this
   }
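With the guard above, checkpoint on a streaming Dataset only logs a warning and returns this; the batch path is unchanged. A minimal sketch of the surviving batch behaviour, assuming a local session and a hypothetical checkpoint directory /tmp/spark-checkpoints:

import org.apache.spark.sql.SparkSession

object EagerCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("eager-checkpoint-sketch")
      .getOrCreate()
    import spark.implicits._

    // Reliable checkpointing requires a checkpoint directory on the SparkContext.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    val ds = spark.range(0, 1000).as[Long]

    // eager = true checkpoints and counts the underlying RDD immediately; the result
    // is a new Dataset backed by the checkpointed LogicalRDD, cutting the lineage.
    val checkpointed = ds.checkpoint(eager = true)
    println(checkpointed.count())

    spark.stop()
  }
}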

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 10 additions & 3 deletions
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
+import java.nio.channels.ClosedChannelException
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -73,7 +75,12 @@ private[state] class HDFSBackedStateStoreProvider(
     hadoopConf: Configuration
   ) extends StateStoreProvider with Logging {
 
-  type MapType = java.util.HashMap[UnsafeRow, UnsafeRow]
+  // ConcurrentHashMap is used because it generates fail-safe iterators on filtering
+  // - The iterator is weakly consistent with the map, i.e., iterator's data reflect the values in
+  //   the map when the iterator was created
+  // - Any updates to the map while iterating through the filtered iterator does not throw
+  //   java.util.ConcurrentModificationException
+  type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
 
   /** Implementation of [[StateStore]] API which is backed by a HDFS-compatible file system */
   class HDFSBackedStateStore(val version: Long, mapToUpdate: MapType)
@@ -208,7 +215,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def toString(): String = {
-      s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+      s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
     }
   }
 
@@ -565,7 +572,7 @@ private[state] class HDFSBackedStateStoreProvider(
       val nameParts = path.getName.split("\\.")
      if (nameParts.size == 2) {
        val version = nameParts(0).toLong
-        nameParts(1).toLowerCase match {
+        nameParts(1).toLowerCase(Locale.ROOT) match {
          case "delta" =>
            // ignore the file otherwise, snapshot file already exists for that batch id
            if (!versionToFiles.contains(version)) {
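The MapType change above relies on ConcurrentHashMap's weakly consistent iterators: updating the map while an iterator is open does not throw java.util.ConcurrentModificationException, unlike java.util.HashMap's fail-fast iterators. A small standalone sketch of that contrast (plain String keys stand in for UnsafeRow; this is not code from the commit):

import java.util.ConcurrentModificationException

object WeaklyConsistentIteration {
  def main(args: Array[String]): Unit = {
    // java.util.HashMap: fail-fast iterator; structural modification mid-iteration throws.
    val hashMap = new java.util.HashMap[String, String]()
    hashMap.put("a", "1")
    hashMap.put("b", "2")
    try {
      val it = hashMap.entrySet().iterator()
      while (it.hasNext) {
        it.next()
        hashMap.put("c", "3") // modification while iterating
      }
    } catch {
      case _: ConcurrentModificationException =>
        println("HashMap iterator failed fast")
    }

    // java.util.concurrent.ConcurrentHashMap: weakly consistent iterator; the same
    // update pattern completes without an exception and reflects the map roughly as
    // of iterator creation.
    val chm = new java.util.concurrent.ConcurrentHashMap[String, String]()
    chm.put("a", "1")
    chm.put("b", "2")
    val it2 = chm.entrySet().iterator()
    while (it2.hasNext) {
      it2.next()
      chm.put("c", "3") // no ConcurrentModificationException
    }
    println("ConcurrentHashMap iteration completed without exception")
  }
}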
