Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


Expand All @@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* @since 1.3.0
*/
@InterfaceStability.Stable
class AnalysisException protected[sql] (
class AnalysisException(
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
Expand All @@ -36,8 +37,19 @@ class AnalysisException protected[sql] (
val cause: Option[Throwable] = None)
extends Exception(message, cause.orNull) with Serializable {

def withPlan(plan: LogicalPlan): AnalysisException = {
withPosition(plan.origin.line, plan.origin.startPosition, Option(plan))
}

def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
val newException = new AnalysisException(message, line, startPosition)
withPosition(line, startPosition, None)
}

private def withPosition(
line: Option[Int],
startPosition: Option[Int],
plan: Option[LogicalPlan]): AnalysisException = {
val newException = new AnalysisException(message, line, startPosition, plan)
newException.setStackTrace(getStackTrace)
newException
}
Expand All @@ -55,3 +67,113 @@ class AnalysisException protected[sql] (
s"$message;$lineAnnotation$positionAnnotation"
}
}

object AnalysisException {
/**
* Create a no such database analysis exception.
*/
def noSuchDatabase(db: String): AnalysisException = {
new AnalysisException(s"Database '$db' not found")
}

/**
* Create a database already exists analysis exception.
*/
def databaseAlreadyExists(db: String): AnalysisException = {
new AnalysisException(s"Database '$db' already exists")
}

/**
* Create a no such table analysis exception.
*/
def noSuchTable(db: String, table: String): AnalysisException = {
new AnalysisException(s"Table or view '$table' not found in database '$db'")
}

/**
* Create a table already exists analysis exception.
*/
def tableAlreadyExists(db: String, table: String): AnalysisException = {
new AnalysisException(s"Table or view '$table' already exists in database '$db'")
}

/**
* Create a temporary table already exists analysis exception.
*/
def tempTableAlreadyExists(table: String): AnalysisException = {
new AnalysisException(s"Temporary table '$table' already exists")
}

/**
* Create a no such partition analysis exception.
*/
def noSuchPartition(db: String, table: String, spec: TablePartitionSpec): AnalysisException = {
new AnalysisException(
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
}

/**
* Create a partition already exists analysis exception.
*/
def partitionAlreadyExists(
db: String,
table: String,
spec: TablePartitionSpec): AnalysisException = {
new AnalysisException(
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
}

/**
* Create a no such partitions analysis exception.
*/
def noSuchPartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec]): AnalysisException = {
new AnalysisException(
s"The following partitions not found in table '$table' database '$db':\n"
+ specs.mkString("\n===\n"))
}

/**
* Create a partitions already exists analysis exception.
*/
def partitionsAlreadyExists(
db: String,
table: String,
specs: Seq[TablePartitionSpec]): AnalysisException = {
new AnalysisException(
s"The following partitions already exists in table '$table' database '$db':\n"
+ specs.mkString("\n===\n"))
}

/**
* Create a no such function exception.
*/
def noSuchFunction(db: String, func: String): AnalysisException = {
new AnalysisException(
s"Undefined function: '$func'. This function is neither a registered temporary " +
s"function nor a permanent function registered in the database '$db'.")
}

/**
* Create a function already exists analysis exception.
*/
def functionAlreadyExists(db: String, func: String): AnalysisException = {
new AnalysisException(s"Function '$func' already exists in database '$db'")
}

/**
* Create a no such permanent function exception.
*/
def noSuchPermanentFunction(db: String, func: String): AnalysisException = {
new AnalysisException(s"Function '$func' not found in database '$db'")
}

/**
* Create a no such temporary function exception.
*/
def noSuchTempFunction(func: String): AnalysisException = {
new AnalysisException(s"Temporary function '$func' not found")
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,7 @@ class Analyzer(
try {
catalog.lookupRelation(tableIdentWithDb)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}")
// If the database is defined and that database is not found, throw an AnalysisException.
// Note that if the database is not defined, it is possible we are looking up a temp view.
case e: NoSuchDatabaseException =>
u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
s"database ${e.db} doesn't exsits.")
case a: AnalysisException => throw a.withPlan(u)
}
}

Expand Down Expand Up @@ -1122,7 +1116,9 @@ class Analyzer(
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
case f: UnresolvedFunction if !catalog.functionExists(f.name) =>
withPosition(f) {
throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName)
throw AnalysisException.noSuchFunction(
f.name.database.getOrElse("default"),
f.name.funcName)
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

Expand All @@ -28,32 +28,32 @@ import org.apache.spark.sql.types.StructType
* can be accessed in multiple threads. This is an external catalog because it is expected to
* interact with external systems.
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
* Implementations should throw an [[AnalysisException]] when databases don't exist.
*/
abstract class ExternalCatalog {
import CatalogTypes.TablePartitionSpec

protected def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
throw new NoSuchDatabaseException(db)
throw AnalysisException.noSuchDatabase(db)
}
}

protected def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new NoSuchTableException(db = db, table = table)
throw AnalysisException.noSuchTable(db, table)
}
}

protected def requireFunctionExists(db: String, funcName: String): Unit = {
if (!functionExists(db, funcName)) {
throw new NoSuchFunctionException(db = db, func = funcName)
throw AnalysisException.noSuchFunction(db, funcName)
}
}

protected def requireFunctionNotExists(db: String, funcName: String): Unit = {
if (functionExists(db, funcName)) {
throw new FunctionAlreadyExistsException(db = db, func = funcName)
throw AnalysisException.functionAlreadyExists(db, funcName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils

Expand Down Expand Up @@ -58,7 +57,7 @@ class GlobalTempViewManager(val database: String) {
viewDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
if (!overrideIfExists && viewDefinitions.contains(name)) {
throw new TempTableAlreadyExistsException(name)
throw AnalysisException.tempTableAlreadyExists(name)
}
viewDefinitions.put(name, viewDefinition)
}
Expand Down
Loading