Commit ca0d2ed

fix
1 parent 448a063 commit ca0d2ed

3 files changed: +67 -15 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 23 additions & 5 deletions
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
-        }
+      // If `clearCache` is false (which means the recache request comes from a non-cascading
+      // cache invalidation) and the cache buffer has already been loaded, we do not need to
+      // re-compile a physical plan, because the old plan will not be used any more by the
+      // CacheManager, although it still lives in compiled `Dataset`s and could still work.
+      // Otherwise, either `clearCache` is true, in which case we have to clear the cache
+      // buffer and re-compile the physical plan, or it is a non-cascading cache invalidation
+      // and the cache buffer is still empty, in which case we can get a more efficient new
+      // plan by removing the dependency on the previously removed cache entries.
+      // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+      // status test and may not return the most accurate cache buffer state. So the
+      // worst-case scenarios are:
+      // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false; we
+      //    then clear the buffer and build a new plan. This is inefficient but doesn't
+      //    affect correctness.
+      // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true; we
+      //    then keep the entry as it is, which means the physical plan has already been
+      //    re-compiled in another thread.
+      val buildNewPlan =
+        clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+      if (condition(cd.plan) && buildNewPlan) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
         val plan = spark.sessionState.executePlan(cd.plan).executedPlan
         val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+          cacheBuilder = cd.cachedRepresentation
+            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
           logicalPlan = cd.plan)
         needToRecache += cd.copy(cachedRepresentation = newCache)
       }
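For context, a minimal, self-contained Scala sketch of the decision rule this hunk introduces; `Builder`, `shouldBuildNewPlan`, and `RecacheDecisionSketch` are hypothetical names for illustration, not Spark's actual classes:

object RecacheDecisionSketch {
  // Hypothetical stand-in for CachedRDDBuilder: it just holds a nullable buffer reference.
  final class Builder(private var buffers: AnyRef) {
    // Non-locking status test, mirroring isCachedColumnBuffersLoaded in the hunk above.
    def isCachedColumnBuffersLoaded: Boolean = buffers != null
    def clearCache(): Unit = { buffers = null }
  }

  // Re-plan only when the caller asks to clear the cache, or when the buffer was never
  // loaded; a loaded buffer keeps serving the old plan after a non-cascading invalidation.
  def shouldBuildNewPlan(clearCache: Boolean, builder: Builder): Boolean =
    clearCache || !builder.isCachedColumnBuffersLoaded

  def main(args: Array[String]): Unit = {
    val loaded = new Builder(new AnyRef)
    val empty = new Builder(null)
    assert(!shouldBuildNewPlan(clearCache = false, loaded)) // loaded buffer: keep entry as-is
    assert(shouldBuildNewPlan(clearCache = false, empty))   // empty buffer: cheap to re-plan
    assert(shouldBuildNewPlan(clearCache = true, loaded))   // cascading invalidation: rebuild
  }
}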

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 2 additions & 8 deletions
@@ -74,14 +74,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
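The `copy(cachedPlan = plan)(_cachedColumnBuffers = null)` call in CacheManager.scala relies on how Scala generates `copy` for a case class with two parameter lists: the second list gets no default values, so its arguments must be supplied explicitly and are not carried over from the original instance. A minimal sketch with a hypothetical `BuilderLike` class (not Spark's `CachedRDDBuilder`):

// Hypothetical BuilderLike: a case class with a second, curried parameter list, like
// CachedRDDBuilder's _cachedColumnBuffers.
case class BuilderLike(name: String)(val _cachedColumnBuffers: AnyRef) {
  def isCachedColumnBuffersLoaded: Boolean = _cachedColumnBuffers != null
}

object CopySketch {
  def main(args: Array[String]): Unit = {
    val loaded = BuilderLike("t1")(new AnyRef)
    assert(loaded.isCachedColumnBuffersLoaded)
    // copy's second parameter list has no defaults, so the buffer reference must be
    // supplied fresh here; passing null is what resets the cache buffer.
    val replanned = loaded.copy(name = "t1-replanned")(_cachedColumnBuffers = null)
    assert(!replanned.isCachedColumnBuffersLoaded)
  }
}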

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 42 additions & 2 deletions
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
 
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already had data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached, with a new physical plan rid of the
+    // dependency on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }
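At the user level, the behavior this test pins down can be sketched roughly as follows against a local session; this is an illustrative sketch assuming the Spark 2.4-era Dataset API, not part of the commit:

import org.apache.spark.sql.SparkSession

object NonCascadingUnpersistSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = spark.range(0, 5).toDF("a")
    val dfLoaded = df.filter($"a" > 1)

    df.cache()
    dfLoaded.cache()
    // Materialize dfLoaded's buffer, so unpersisting df should leave its cache intact.
    dfLoaded.collect()

    // Non-cascading invalidation: df's cache entry is dropped, dfLoaded's survives.
    df.unpersist(blocking = true)
    assert(!df.storageLevel.useMemory)
    assert(dfLoaded.storageLevel.useMemory)

    spark.stop()
  }
}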
