
Commit d6ee69e

gatorsmile authored and cloud-fan committed
[SPARK-22488][SQL] Fix the view resolution issue in the SparkSession internal table() API
## What changes were proposed in this pull request?

The current internal `table()` API of `SparkSession` bypasses the Analyzer and directly calls the `sessionState.catalog.lookupRelation` API. This skips the view resolution logic in our Analyzer rule `ResolveRelations`. This internal API is widely used by various DDL commands as well as by public and internal APIs. Users might get a strange error caused by view resolution when the default database is different:

```
Table or view not found: t1; line 1 pos 14
org.apache.spark.sql.AnalysisException: Table or view not found: t1; line 1 pos 14
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
```

This PR fixes the issue by forcing the API to use `ResolveRelations` to resolve the table.

## How was this patch tested?

Added a test case and modified the existing test cases.

Author: gatorsmile <[email protected]>

Closes #19713 from gatorsmile/viewResolution.
1 parent 154351e commit d6ee69e
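
For context, a minimal reproduction of the failure, mirroring the regression test added to `SQLViewSuite` below (the table and view names are taken from that test):

```scala
// Create a view in the default database, then switch the current database.
sql("USE default")
sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
sql("CREATE VIEW v1 AS SELECT * FROM t1")
sql("CREATE DATABASE IF NOT EXISTS db2")
sql("USE db2")

// Before this fix, the internal table() API skipped ResolveRelations, so the
// view's underlying table t1 was looked up against the current database (db2)
// and the call failed with "Table or view not found: t1".
spark.table("default.v1")   // after the fix: returns Row(1) as expected
```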

File tree

8 files changed (+43 −18 lines)


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion
```diff
@@ -733,7 +733,7 @@ test_that("test cache, uncache and clearCache", {
   expect_true(dropTempView("table1"))

   expect_error(uncacheTable("foo"),
-               "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
+               "Error in uncacheTable : analysis error - Table or view not found: foo")
 })

 test_that("insertInto() on a registered table", {
```

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
@@ -621,7 +622,7 @@ class SparkSession private(
   }

   private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
+    Dataset.ofRows(self, UnresolvedRelation(tableIdent))
   }

   /* ----------------- *
```
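
Design note: `Dataset.ofRows` runs the given plan through the session's analyzer, so wrapping the identifier in `UnresolvedRelation` is enough to route the lookup through the `ResolveRelations` rule, which resolves a view against the database recorded in its metadata rather than the session's current database. A simplified sketch of the new flow (`Dataset.ofRows` is `private[sql]`, so this only compiles inside the `org.apache.spark.sql` package):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// table() now builds an unresolved node instead of resolving eagerly...
val plan = UnresolvedRelation(TableIdentifier("v1", Some("default")))
// ...and analysis, triggered by Dataset.ofRows, substitutes the resolved
// view/table plan via ResolveRelations.
val df = Dataset.ofRows(spark, plan)
```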

sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -54,10 +54,8 @@ case class UncacheTableCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableId = tableIdent.quotedString
-    try {
+    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
       sparkSession.catalog.uncacheTable(tableId)
-    } catch {
-      case _: NoSuchTableException if ifExists => // don't throw
     }
     Seq.empty[Row]
   }
```
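
Design note: the table lookup behind `uncacheTable` now goes through the analyzer, which reports a missing table as a plain `AnalysisException` rather than a `NoSuchTableException`, so the old `catch` clause would likely no longer fire; the explicit `tableExists` guard preserves the `IF EXISTS` semantics. Expected behavior, per the `CachedTableSuite` test below:

```scala
// No-op when the table is missing and IF EXISTS is given:
sql("UNCACHE TABLE IF EXISTS nonexistantTable")

// Without IF EXISTS, the command now fails during analysis:
// org.apache.spark.sql.AnalysisException: Table or view not found: nonexistantTable
sql("UNCACHE TABLE nonexistantTable")
```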

sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala

Lines changed: 11 additions & 5 deletions
```diff
@@ -35,31 +35,36 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
   private var globalTempDB: String = _

   test("basic semantic") {
+    val expectedErrorMsg = "not found"
     try {
       sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")

       // If there is no database in table name, we should try local temp view first, if not found,
       // try table/view in current database, which is "default" in this case. So we expect
       // NoSuchTableException here.
-      intercept[NoSuchTableException](spark.table("src"))
+      var e = intercept[AnalysisException](spark.table("src")).getMessage
+      assert(e.contains(expectedErrorMsg))

       // Use qualified name to refer to the global temp view explicitly.
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))

       // Table name without database will never refer to a global temp view.
-      intercept[NoSuchTableException](sql("DROP VIEW src"))
+      e = intercept[AnalysisException](sql("DROP VIEW src")).getMessage
+      assert(e.contains(expectedErrorMsg))

       sql(s"DROP VIEW $globalTempDB.src")
       // The global temp view should be dropped successfully.
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))

       // We can also use Dataset API to create global temp view
       Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))

       // Use qualified name to rename a global temp view.
       sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
       checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))

       // Use qualified name to alter a global temp view.
@@ -68,7 +73,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {

       // We can also use Catalog API to drop global temp view
       spark.catalog.dropGlobalTempView("src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src2")).getMessage
+      assert(e.contains(expectedErrorMsg))

       // We can also use Dataset API to replace global temp view
       Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src")
```

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -679,4 +679,19 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       assert(spark.table("v").schema.head.name == "cBa")
     }
   }
+
+  test("sparkSession API view resolution with different default database") {
+    withDatabase("db2") {
+      withView("v1") {
+        withTable("t1") {
+          sql("USE default")
+          sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
+          sql("CREATE VIEW v1 AS SELECT * FROM t1")
+          sql("CREATE DATABASE IF NOT EXISTS db2")
+          sql("USE db2")
+          checkAnswer(spark.table("default.v1"), Row(1))
+        }
+      }
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -825,10 +825,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     spark.range(10).createOrReplaceTempView("tab1")
     sql("ALTER TABLE tab1 RENAME TO tab2")
     checkAnswer(spark.table("tab2"), spark.range(10).toDF())
-    intercept[NoSuchTableException] { spark.table("tab1") }
+    val e = intercept[AnalysisException](spark.table("tab1")).getMessage
+    assert(e.contains("Table or view not found"))
     sql("ALTER VIEW tab2 RENAME TO tab1")
     checkAnswer(spark.table("tab1"), spark.range(10).toDF())
-    intercept[NoSuchTableException] { spark.table("tab2") }
+    intercept[AnalysisException] { spark.table("tab2") }
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -326,7 +326,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       assert(ColumnsRequired.set === requiredColumnNames)

       val table = spark.table("oneToTenFiltered")
-      val relation = table.queryExecution.logical.collectFirst {
+      val relation = table.queryExecution.analyzed.collectFirst {
         case LogicalRelation(r, _, _, _) => r
       }.get
```
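
This change follows from the `table()` fix: `spark.table` now returns a `Dataset` whose pre-analysis plan is just an `UnresolvedRelation`, so the `LogicalRelation` only appears once the analyzer has run. A sketch of the difference:

```scala
val table = spark.table("oneToTenFiltered")
// Before analysis, the plan is only the unresolved relation:
table.queryExecution.logical    // UnresolvedRelation `oneToTenFiltered`
// After ResolveRelations has run, the data source relation is present:
table.queryExecution.analyzed   // contains LogicalRelation(...)
```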

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 9 additions & 5 deletions
```diff
@@ -102,14 +102,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   }

   test("uncache of nonexistant tables") {
+    val expectedErrorMsg = "Table or view not found: nonexistantTable"
     // make sure table doesn't exist
-    intercept[NoSuchTableException](spark.table("nonexistantTable"))
-    intercept[NoSuchTableException] {
+    var e = intercept[AnalysisException](spark.table("nonexistantTable")).getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       spark.catalog.uncacheTable("nonexistantTable")
-    }
-    intercept[NoSuchTableException] {
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistantTable")
-    }
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
     sql("UNCACHE TABLE IF EXISTS nonexistantTable")
   }
```
