@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{ArrayType, StructField, StructType, IntegerType}
+import org.apache.spark.sql.types._
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
@@ -66,9 +66,7 @@ class Analyzer(catalog: Catalog,
       typeCoercionRules ++
       extendedRules : _*),
     Batch("Check Analysis", Once,
-      CheckResolution ::
-      CheckAggregation ::
-      Nil : _*),
+      CheckResolution),
     Batch("AnalysisOperators", fixedPoint,
       EliminateAnalysisOperators)
   )
@@ -77,21 +75,70 @@ class Analyzer(catalog: Catalog,
    * Makes sure all attributes and logical plans have been resolved.
    */
   object CheckResolution extends Rule[LogicalPlan] {
+    def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
+
     def apply(plan: LogicalPlan): LogicalPlan = {
-      plan.transformUp {
-        case p if p.expressions.exists(!_.resolved) =>
-          val missing = p.expressions.filterNot(_.resolved).map(_.prettyString).mkString(",")
-          val from = p.inputSet.map(_.name).mkString("{", ", ", "}")
-
-          throw new AnalysisException(s"Cannot resolve '$missing' given input columns $from")
-        case p if !p.resolved && p.childrenResolved =>
-          throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
-      } match {
-        // As a backstop, use the root node to check that the entire plan tree is resolved.
-        case p if !p.resolved =>
-          throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
-        case p => p
+      plan.foreachUp {
+        case operator: LogicalPlan =>
+          operator transformAllExpressions {
+            case a: Attribute if !a.resolved =>
+              val from = operator.inputSet.map(_.name).mkString("{", ", ", "}")
+              failAnalysis(s"cannot resolve '$a' given input columns $from")
+
+            case c: Cast if !c.resolved =>
+              failAnalysis(
+                s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
+
+            case b: BinaryExpression if !b.resolved =>
+              failAnalysis(
+                s"invalid expression ${b.prettyString} " +
+                s"between ${b.left.simpleString} and ${b.right.simpleString}")
+          }
+
+          operator match {
+            case f: Filter if f.condition.dataType != BooleanType =>
+              failAnalysis(s"filter expression '${f.condition.prettyString}' is not a boolean.")
+
+            case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
+              def isValidAggregateExpression(expr: Expression): Boolean = expr match {
+                case _: AggregateExpression => true
+                case e: Attribute => groupingExprs.contains(e)
+                case e if groupingExprs.contains(e) => true
+                case e if e.references.isEmpty => true
+                case e => e.children.forall(isValidAggregateExpression)
+              }
+
+              aggregateExprs.find { e =>
+                !isValidAggregateExpression(e.transform {
+                  // Should trim aliases around `GetField`s. These aliases are introduced while
+                  // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
+                  // (Should we just turn `GetField` into a `NamedExpression`?)
+                  case Alias(g: GetField, _) => g
+                })
+              }.foreach { e =>
+                failAnalysis(s"expression '$e' must be an aggregate function or appear in group by")
+              }
+
+              aggregatePlan
+
+            case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) =>
+              val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",")
+              val input = o.inputSet.map(_.prettyString).mkString(",")
+
+              failAnalysis(s"resolved attributes $missingAttributes missing from $input")
+
+            // Catch all
+            case o if !o.resolved =>
+              failAnalysis(s"unresolved operator ${operator.simpleString}")
+
+            case _ => // Analysis successful!
+          }
       }
+
+      plan
     }
   }
 
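To make the merged aggregation check above easier to follow, here is a minimal, self-contained sketch of the rule it enforces, written against a hypothetical toy expression type (Expr, Attr, Lit, Sum, Add) rather than Catalyst's Expression, Attribute, Literal and AggregateExpression classes: an output expression is accepted only if it is an aggregate function, appears in the grouping expressions, or is built solely from such expressions and constants.

// A toy sketch of the grouping check, not Catalyst code: Expr/Attr/Lit/Sum/Add are
// hypothetical stand-ins for Expression, Attribute, Literal and AggregateExpression.
object AggregateCheckSketch extends App {
  sealed trait Expr { def children: Seq[Expr] }
  case class Attr(name: String)           extends Expr { def children = Nil }
  case class Lit(value: Int)              extends Expr { def children = Nil }
  case class Sum(child: Expr)             extends Expr { def children = Seq(child) } // aggregate stand-in
  case class Add(left: Expr, right: Expr) extends Expr { def children = Seq(left, right) }

  // Attributes referenced anywhere under an expression (plays the role of e.references).
  def references(e: Expr): Set[Attr] = e match {
    case a: Attr => Set(a)
    case other   => other.children.flatMap(references).toSet
  }

  // Mirrors isValidAggregateExpression in the diff above.
  def isValid(groupingExprs: Seq[Expr], expr: Expr): Boolean = expr match {
    case _: Sum                         => true
    case e: Attr                        => groupingExprs.contains(e)
    case e if groupingExprs.contains(e) => true
    case e if references(e).isEmpty     => true
    case e => e.children.forall(c => isValid(groupingExprs, c))
  }

  val grouping = Seq(Attr("dept"))
  println(isValid(grouping, Add(Attr("dept"), Lit(1)))) // true: grouping column plus a constant
  println(isValid(grouping, Sum(Attr("salary"))))       // true: wrapped in an aggregate function
  println(isValid(grouping, Attr("salary")))            // false: neither aggregated nor grouped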
@@ -192,37 +239,6 @@ class Analyzer(catalog: Catalog,
     }
   }
 
-  /**
-   * Checks for non-aggregated attributes with aggregation
-   */
-  object CheckAggregation extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = {
-      plan.transform {
-        case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
-          def isValidAggregateExpression(expr: Expression): Boolean = expr match {
-            case _: AggregateExpression => true
-            case e: Attribute => groupingExprs.contains(e)
-            case e if groupingExprs.contains(e) => true
-            case e if e.references.isEmpty => true
-            case e => e.children.forall(isValidAggregateExpression)
-          }
-
-          aggregateExprs.find { e =>
-            !isValidAggregateExpression(e.transform {
-              // Should trim aliases around `GetField`s. These aliases are introduced while
-              // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
-              // (Should we just turn `GetField` into a `NamedExpression`?)
-              case Alias(g: GetField, _) => g
-            })
-          }.foreach { e =>
-            throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e")
-          }
-
-          aggregatePlan
-      }
-    }
-  }
-
   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
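For readers comparing the two traversal styles, the sketch below (a toy Node type and a checkUp helper standing in for LogicalPlan and foreachUp; not Catalyst code) illustrates why the rewrite prefers a side-effect-only, bottom-up walk over transformUp plus a root-level backstop: the deepest failing operator is reported first, and the plan itself is returned unchanged.

// Toy sketch of the fail-fast, bottom-up check; Node, checkUp and this AnalysisException
// are hypothetical stand-ins, not Spark's actual classes.
object ForeachUpSketch extends App {
  case class Node(name: String, resolved: Boolean, children: Seq[Node] = Nil)

  class AnalysisException(msg: String) extends Exception(msg)
  def failAnalysis(msg: String): Nothing = throw new AnalysisException(msg)

  // Post-order traversal (children before parent), mirroring foreachUp: the deepest
  // unresolved node is reported instead of a cascade of parent failures.
  def checkUp(node: Node)(check: Node => Unit): Unit = {
    node.children.foreach(c => checkUp(c)(check))
    check(node)
  }

  val plan = Node("Project", resolved = false,
    Seq(Node("Filter", resolved = false, Seq(Node("Relation", resolved = true)))))

  try {
    checkUp(plan) { n => if (!n.resolved) failAnalysis(s"unresolved operator ${n.name}") }
  } catch {
    case e: AnalysisException => println(s"caught: ${e.getMessage}") // reports the Filter first
  }
}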