Skip to content

Commit 7953fcd

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-34700][SQL] SessionCatalog's temporary view related APIs should take/return more concrete types
### What changes were proposed in this pull request? Now that all the temporary views are wrapped with `TemporaryViewRelation`(#31273, #31652, and #31825), this PR proposes to update `SessionCatalog`'s APIs for temporary views to take or return more concrete types. APIs that will take `TemporaryViewRelation` instead of `LogicalPlan`: ``` createTempView, createGlobalTempView, alterTempViewDefinition ``` APIs that will return `TemporaryViewRelation` instead of `LogicalPlan`: ``` getRawTempView, getRawGlobalTempView ``` APIs that will return `View` instead of `LogicalPlan`: ``` getTempView, getGlobalTempView, lookupTempView ``` ### Why are the changes needed? Internal refactoring to work with more concrete types. ### Does this PR introduce _any_ user-facing change? No, this is internal refactoring. ### How was this patch tested? Updated existing tests affected by the refactoring. Closes #31906 from imback82/use_temporary_view_relation. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent e4bb975 commit 7953fcd

File tree

11 files changed

+115
-110
lines changed

11 files changed

+115
-110
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import javax.annotation.concurrent.GuardedBy
2222
import scala.collection.mutable
2323

2424
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
25-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2625
import org.apache.spark.sql.catalyst.util.StringUtils
2726
import org.apache.spark.sql.errors.QueryCompilationErrors
2827

@@ -40,12 +39,12 @@ class GlobalTempViewManager(val database: String) {
4039

4140
/** List of view definitions, mapping from view name to logical plan. */
4241
@GuardedBy("this")
43-
private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
42+
private val viewDefinitions = new mutable.HashMap[String, TemporaryViewRelation]
4443

4544
/**
4645
* Returns the global view definition which matches the given name, or None if not found.
4746
*/
48-
def get(name: String): Option[LogicalPlan] = synchronized {
47+
def get(name: String): Option[TemporaryViewRelation] = synchronized {
4948
viewDefinitions.get(name)
5049
}
5150

@@ -55,7 +54,7 @@ class GlobalTempViewManager(val database: String) {
5554
*/
5655
def create(
5756
name: String,
58-
viewDefinition: LogicalPlan,
57+
viewDefinition: TemporaryViewRelation,
5958
overrideIfExists: Boolean): Unit = synchronized {
6059
if (!overrideIfExists && viewDefinitions.contains(name)) {
6160
throw new TempTableAlreadyExistsException(name)
@@ -68,7 +67,7 @@ class GlobalTempViewManager(val database: String) {
6867
*/
6968
def update(
7069
name: String,
71-
viewDefinition: LogicalPlan): Boolean = synchronized {
70+
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
7271
if (viewDefinitions.contains(name)) {
7372
viewDefinitions.put(name, viewDefinition)
7473
true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class SessionCatalog(
101101

102102
/** List of temporary views, mapping from table name to their logical plan. */
103103
@GuardedBy("this")
104-
protected val tempViews = new mutable.HashMap[String, LogicalPlan]
104+
protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]
105105

106106
// Note: we track current database here because certain operations do not explicitly
107107
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
@@ -573,21 +573,21 @@ class SessionCatalog(
573573
*/
574574
def createTempView(
575575
name: String,
576-
tableDefinition: LogicalPlan,
576+
viewDefinition: TemporaryViewRelation,
577577
overrideIfExists: Boolean): Unit = synchronized {
578578
val table = formatTableName(name)
579579
if (tempViews.contains(table) && !overrideIfExists) {
580580
throw new TempTableAlreadyExistsException(name)
581581
}
582-
tempViews.put(table, tableDefinition)
582+
tempViews.put(table, viewDefinition)
583583
}
584584

585585
/**
586586
* Create a global temporary view.
587587
*/
588588
def createGlobalTempView(
589589
name: String,
590-
viewDefinition: LogicalPlan,
590+
viewDefinition: TemporaryViewRelation,
591591
overrideIfExists: Boolean): Unit = {
592592
globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
593593
}
@@ -598,7 +598,7 @@ class SessionCatalog(
598598
*/
599599
def alterTempViewDefinition(
600600
name: TableIdentifier,
601-
viewDefinition: LogicalPlan): Boolean = synchronized {
601+
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
602602
val viewName = formatTableName(name.table)
603603
if (name.database.isEmpty) {
604604
if (tempViews.contains(viewName)) {
@@ -617,14 +617,14 @@ class SessionCatalog(
617617
/**
618618
* Return a local temporary view exactly as it was stored.
619619
*/
620-
def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
620+
def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized {
621621
tempViews.get(formatTableName(name))
622622
}
623623

624624
/**
625625
* Generate a [[View]] operator from the temporary view stored.
626626
*/
627-
def getTempView(name: String): Option[LogicalPlan] = synchronized {
627+
def getTempView(name: String): Option[View] = synchronized {
628628
getRawTempView(name).map(getTempViewPlan)
629629
}
630630

@@ -635,14 +635,14 @@ class SessionCatalog(
635635
/**
636636
* Return a global temporary view exactly as it was stored.
637637
*/
638-
def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
638+
def getRawGlobalTempView(name: String): Option[TemporaryViewRelation] = {
639639
globalTempViewManager.get(formatTableName(name))
640640
}
641641

642642
/**
643643
* Generate a [[View]] operator from the global temporary view stored.
644644
*/
645-
def getGlobalTempView(name: String): Option[LogicalPlan] = {
645+
def getGlobalTempView(name: String): Option[View] = {
646646
getRawGlobalTempView(name).map(getTempViewPlan)
647647
}
648648

@@ -680,25 +680,10 @@ class SessionCatalog(
680680
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
681681
val table = formatTableName(name.table)
682682
if (name.database.isEmpty) {
683-
tempViews.get(table).map {
684-
case TemporaryViewRelation(metadata, _) => metadata
685-
case plan =>
686-
CatalogTable(
687-
identifier = TableIdentifier(table),
688-
tableType = CatalogTableType.VIEW,
689-
storage = CatalogStorageFormat.empty,
690-
schema = plan.output.toStructType)
691-
}.getOrElse(getTableMetadata(name))
683+
tempViews.get(table).map(_.tableMeta).getOrElse(getTableMetadata(name))
692684
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
693-
globalTempViewManager.get(table).map {
694-
case TemporaryViewRelation(metadata, _) => metadata
695-
case plan =>
696-
CatalogTable(
697-
identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
698-
tableType = CatalogTableType.VIEW,
699-
storage = CatalogStorageFormat.empty,
700-
schema = plan.output.toStructType)
701-
}.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
685+
globalTempViewManager.get(table).map(_.tableMeta)
686+
.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
702687
} else {
703688
getTableMetadata(name)
704689
}
@@ -834,21 +819,9 @@ class SessionCatalog(
834819
}
835820
}
836821

837-
private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
838-
plan match {
839-
case TemporaryViewRelation(tableMeta, None) =>
840-
fromCatalogTable(tableMeta, isTempView = true)
841-
case TemporaryViewRelation(tableMeta, Some(plan)) =>
842-
View(desc = tableMeta, isTempView = true, child = plan)
843-
case other => other
844-
}
845-
}
846-
847-
def getTempViewSchema(plan: LogicalPlan): StructType = {
848-
plan match {
849-
case viewInfo: TemporaryViewRelation => viewInfo.tableMeta.schema
850-
case v => v.schema
851-
}
822+
private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
823+
case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
824+
case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
852825
}
853826

854827
private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
@@ -909,7 +882,7 @@ class SessionCatalog(
909882
isTempView(nameParts.asTableIdentifier)
910883
}
911884

912-
def lookupTempView(name: TableIdentifier): Option[LogicalPlan] = {
885+
def lookupTempView(name: TableIdentifier): Option[View] = {
913886
val tableName = formatTableName(name.table)
914887
if (name.database.isEmpty) {
915888
tempViews.get(tableName).map(getTempViewPlan)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,64 @@ import java.net.URI
2121
import java.util.Locale
2222

2323
import org.apache.spark.sql.AnalysisException
24-
import org.apache.spark.sql.catalyst.QueryPlanningTracker
25-
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
24+
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
25+
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog, TemporaryViewRelation}
26+
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
2627
import org.apache.spark.sql.catalyst.parser.ParseException
2728
import org.apache.spark.sql.catalyst.plans.PlanTest
2829
import org.apache.spark.sql.catalyst.plans.logical._
2930
import org.apache.spark.sql.catalyst.rules.Rule
30-
import org.apache.spark.sql.internal.SQLConf
31+
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
32+
import org.apache.spark.sql.types.StructType
3133

3234
trait AnalysisTest extends PlanTest {
3335

3436
protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = Nil
3537

38+
protected def createTempView(
39+
catalog: SessionCatalog,
40+
name: String,
41+
plan: LogicalPlan,
42+
overrideIfExists: Boolean): Unit = {
43+
val identifier = TableIdentifier(name)
44+
val metadata = createTempViewMetadata(identifier, plan.schema)
45+
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
46+
catalog.createTempView(name, viewDefinition, overrideIfExists)
47+
}
48+
49+
protected def createGlobalTempView(
50+
catalog: SessionCatalog,
51+
name: String,
52+
plan: LogicalPlan,
53+
overrideIfExists: Boolean): Unit = {
54+
val globalDb = Some(SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE))
55+
val identifier = TableIdentifier(name, globalDb)
56+
val metadata = createTempViewMetadata(identifier, plan.schema)
57+
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
58+
catalog.createGlobalTempView(name, viewDefinition, overrideIfExists)
59+
}
60+
61+
private def createTempViewMetadata(
62+
identifier: TableIdentifier,
63+
schema: StructType): CatalogTable = {
64+
CatalogTable(
65+
identifier = identifier,
66+
tableType = CatalogTableType.VIEW,
67+
storage = CatalogStorageFormat.empty,
68+
schema = schema,
69+
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
70+
}
71+
3672
protected def getAnalyzer: Analyzer = {
3773
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)
3874
catalog.createDatabase(
3975
CatalogDatabase("default", "", new URI("loc"), Map.empty),
4076
ignoreIfExists = false)
41-
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
42-
catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
43-
catalog.createTempView("TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
44-
catalog.createGlobalTempView("TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
45-
catalog.createGlobalTempView("TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
77+
createTempView(catalog, "TaBlE", TestRelations.testRelation, overrideIfExists = true)
78+
createTempView(catalog, "TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
79+
createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
80+
createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
81+
createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
4682
new Analyzer(catalog) {
4783
override val extendedResolutionRules = EliminateSubqueryAliases +: extendedAnalysisRules
4884
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter {
4949
private val f: Expression = UnresolvedAttribute("f")
5050
private val b: Expression = UnresolvedAttribute("b")
5151

52-
before {
53-
catalog.createTempView("table", relation, overrideIfExists = true)
54-
}
55-
5652
private def checkType(expression: Expression, expectedType: DataType): Unit = {
5753
val plan = Project(Seq(Alias(expression, "c")()), relation)
5854
assert(analyzer.execute(plan).schema.fields(0).dataType === expectedType)

0 commit comments

Comments
 (0)