Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}.sum
}

test("withColumn doesn't invalidate cached dataframe") {
var evalCount = 0
val myUDF = udf((x: String) => { evalCount += 1; "result" })
val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
df.cache()
Copy link
Contributor Author

@icexelloss icexelloss Jun 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the point where evalCount is incremented (because the optimizer executes the UDF locally on the driver); the rest of the test has no effect on the value of evalCount.

evalCount is copied (by value) into the closure, so it is not possible to change the value of the evalCount defined on the driver by incrementing it in the UDF: each closure has its own copy of evalCount.


df.collect()
assert(evalCount === 1)

df.collect()
assert(evalCount === 1)

val df2 = df.withColumn("newColumn", lit(1))
df2.collect()

// We should not reevaluate the cached dataframe
assert(evalCount === 1)
}

test("cache temp table") {
withTempView("tempTable") {
testData.select('key).createOrReplaceTempView("tempTable")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.spark.sql

import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.storage.StorageLevel


class DatasetCacheSuite extends QueryTest with SharedSQLContext {
class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits {
import testImplicits._

test("get storage level") {
Expand Down Expand Up @@ -96,4 +99,37 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
agged.unpersist()
assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
}

test("persist and then withColumn") {
  // The derived frame is deliberately built *before* the base frame is cached: the test
  // checks that a Dataset derived via withColumn is still reported as cached afterwards.
  val base = Seq(("test", 1)).toDF("s", "i")
  val widened = base.withColumn("newColumn", lit(1))

  // Mark the base frame for caching; both frames must be reported as cached already,
  // i.e. before any action has run.
  base.cache()
  assertCached(base)
  assertCached(widened)

  // Run an action on the base frame, then re-check the derived frame.
  base.count()
  assertCached(widened)

  // Drop the cache again and confirm the base frame no longer has a storage level.
  base.unpersist()
  assert(base.storageLevel == StorageLevel.NONE)
}

test("cache UDF result correctly") {
  // Every UDF invocation sleeps for 10 s, which is longer than the 5 s limit used below,
  // so even a single re-evaluation of the UDF during collect() would exceed the limit.
  val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))

  df.cache()
  try {
    // Run an action so the UDF column is computed while the cache is being filled.
    df.count()
    assertCached(df2)

    // udf has been evaluated during caching, and thus should not be re-evaluated here
    failAfter(5.seconds) {
      df2.collect()
    }
  } finally {
    // Release the cache even when one of the checks above fails, so that a failure here
    // does not leave cached data behind in the session shared with the other tests.
    df.unpersist()
  }
  assert(df.storageLevel == StorageLevel.NONE)
}
}